[KYUUBI #4843] Support multiple kubernetes contexts and namespaces

### _Why are the changes needed?_

Close #4843
Support  to submit kyuubi engine/batch to multiple kubernetes contexts and namespaces.

In this pr, the user can config the kubernetes conf for specified kubernetes context and namespace likes below.
```
kyuubi.kubernetes.<context>.master.address
kyuubi.kubernetes.<context>.<namespace>.authenticate.oauthTokenFile
```

For example:

```
kyuubi.kubernetes.28.master.address=k8s://master
kyuubi.kubernetes.28.ns1.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/token.ns1
kyuubi.kubernetes.28.ns2.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/token.ns2
```

for k8s context=28, namespace=ns1, its kubernetes config is:
```
kyuubi.kubernetes.master.address=k8s://master
kyuubi.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/token.ns1
```

for k8s context=28, namespace=ns2, its kubernetes config is:
```
kyuubi.kubernetes.master.address=k8s://master
kyuubi.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/token.ns2
```

So that, kyuubi server can build kubernetes client for each context and namespace.
### _How was this patch tested?_
Existing kubernetes integration testing.

Closes #4984 from turboFei/k8s_client_yaml.

Closes #4843

f8ffaeeb9 [fwang12] nit
d25774288 [fwang12] comments
5ae7c8433 [fwang12] save into request conf
fd6c363db [fwang12] save
ff004a529 [fwang12] procebuilder method
6b9520bfd [fwang12] save
58850387e [fwang12] save
98df67e5f [fwang12] ut
da811697c [fwang12] fix
aa568aaa4 [fwang12] save
89656f463 [fwang12] check init
a0ef6894b [fwang12] code style
00abb6568 [fwang12] default namespace
295512987 [fwang12] k8s context namespace

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
This commit is contained in:
fwang12 2023-06-26 15:52:56 +08:00
parent 80bc028e6d
commit c2e27304fe
24 changed files with 359 additions and 141 deletions

View File

@ -307,8 +307,10 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.kubernetes.authenticate.oauthToken | &lt;undefined&gt; | The OAuth token to use when authenticating against the Kubernetes API server. Note that unlike, the other authentication options, this must be the exact string value of the token to use for the authentication. | string | 1.7.0 |
| kyuubi.kubernetes.authenticate.oauthTokenFile | &lt;undefined&gt; | Path to the file containing the OAuth token to use when authenticating against the Kubernetes API server. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 |
| kyuubi.kubernetes.context | &lt;undefined&gt; | The desired context from your kubernetes config file used to configure the K8s client for interacting with the cluster. | string | 1.6.0 |
| kyuubi.kubernetes.context.allow.list || The allowed kubernetes context list, if it is empty, there is no kubernetes context limitation. | seq | 1.8.0 |
| kyuubi.kubernetes.master.address | &lt;undefined&gt; | The internal Kubernetes master (API server) address to be used for kyuubi. | string | 1.7.0 |
| kyuubi.kubernetes.namespace | default | The namespace that will be used for running the kyuubi pods and find engines. | string | 1.7.0 |
| kyuubi.kubernetes.namespace.allow.list || The allowed kubernetes namespace list, if it is empty, there is no kubernetes namespace limitation. | seq | 1.8.0 |
| kyuubi.kubernetes.terminatedApplicationRetainPeriod | PT5M | The period for which the Kyuubi server retains application information after the application terminates. | duration | 1.7.1 |
| kyuubi.kubernetes.trust.certificates | false | If set to true then client can submit to kubernetes cluster only with token | boolean | 1.7.0 |

View File

@ -29,7 +29,7 @@ import org.apache.kyuubi._
import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_HOST
import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationOperation, KubernetesApplicationOperation}
import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo, ApplicationOperation, KubernetesApplicationOperation}
import org.apache.kyuubi.engine.ApplicationState.{FAILED, NOT_FOUND, RUNNING}
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.kubernetes.test.MiniKube
@ -44,6 +44,9 @@ abstract class SparkOnKubernetesSuiteBase
MiniKube.getKubernetesClient.getMasterUrl.toString
}
protected val appMgrInfo =
ApplicationManagerInfo(Some(s"k8s://$apiServerAddress"), Some("minikube"), None)
protected def sparkOnK8sConf: KyuubiConf = {
// TODO Support more Spark version
// Spark official docker image: https://hub.docker.com/r/apache/spark/tags
@ -150,20 +153,28 @@ class KyuubiOperationKubernetesClusterClientModeSuite
batchRequest)
eventually(timeout(3.minutes), interval(50.milliseconds)) {
val state = k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
val state = k8sOperation.getApplicationInfoByTag(
appMgrInfo,
sessionHandle.identifier.toString)
assert(state.id != null)
assert(state.name != null)
assert(state.state == RUNNING)
}
val killResponse = k8sOperation.killApplicationByTag(sessionHandle.identifier.toString)
val killResponse = k8sOperation.killApplicationByTag(
appMgrInfo,
sessionHandle.identifier.toString)
assert(killResponse._1)
assert(killResponse._2 startsWith "Succeeded to terminate:")
val appInfo = k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
val appInfo = k8sOperation.getApplicationInfoByTag(
appMgrInfo,
sessionHandle.identifier.toString)
assert(appInfo == ApplicationInfo(null, null, NOT_FOUND))
val failKillResponse = k8sOperation.killApplicationByTag(sessionHandle.identifier.toString)
val failKillResponse = k8sOperation.killApplicationByTag(
appMgrInfo,
sessionHandle.identifier.toString)
assert(!failKillResponse._1)
assert(failKillResponse._2 === ApplicationOperation.NOT_FOUND)
}
@ -212,24 +223,32 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
// wait for driver pod start
eventually(timeout(3.minutes), interval(5.second)) {
// trigger k8sOperation init here
val appInfo = k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
val appInfo = k8sOperation.getApplicationInfoByTag(
appMgrInfo,
sessionHandle.identifier.toString)
assert(appInfo.state == RUNNING)
assert(appInfo.name.startsWith(driverPodNamePrefix))
}
val killResponse = k8sOperation.killApplicationByTag(sessionHandle.identifier.toString)
val killResponse = k8sOperation.killApplicationByTag(
appMgrInfo,
sessionHandle.identifier.toString)
assert(killResponse._1)
assert(killResponse._2 endsWith "is completed")
assert(killResponse._2 contains sessionHandle.identifier.toString)
eventually(timeout(3.minutes), interval(50.milliseconds)) {
val appInfo = k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
val appInfo = k8sOperation.getApplicationInfoByTag(
appMgrInfo,
sessionHandle.identifier.toString)
// We may kill engine start but not ready
// An EOF Error occurred when the driver was starting
assert(appInfo.state == FAILED || appInfo.state == NOT_FOUND)
}
val failKillResponse = k8sOperation.killApplicationByTag(sessionHandle.identifier.toString)
val failKillResponse = k8sOperation.killApplicationByTag(
appMgrInfo,
sessionHandle.identifier.toString)
assert(!failKillResponse._1)
}
}

View File

@ -134,6 +134,31 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
getAllWithPrefix(s"$KYUUBI_BATCH_CONF_PREFIX.$normalizedBatchType", "")
}
/** Get the kubernetes conf for specified kubernetes context and namespace. */
def getKubernetesConf(context: Option[String], namespace: Option[String]): KyuubiConf = {
val conf = this.clone
context.foreach { c =>
val contextConf =
getAllWithPrefix(s"$KYUUBI_KUBERNETES_CONF_PREFIX.$c", "").map { case (suffix, value) =>
s"$KYUUBI_KUBERNETES_CONF_PREFIX.$suffix" -> value
}
val contextNamespaceConf = namespace.map { ns =>
getAllWithPrefix(s"$KYUUBI_KUBERNETES_CONF_PREFIX.$c.$ns", "").map {
case (suffix, value) =>
s"$KYUUBI_KUBERNETES_CONF_PREFIX.$suffix" -> value
}
}.getOrElse(Map.empty)
(contextConf ++ contextNamespaceConf).map { case (key, value) =>
conf.set(key, value)
}
conf.set(KUBERNETES_CONTEXT, c)
namespace.foreach(ns => conf.set(KUBERNETES_NAMESPACE, ns))
conf
}
conf
}
/**
* Retrieve key-value pairs from [[KyuubiConf]] starting with `dropped.remainder`, and put them to
* the result map with the `dropped` of key being dropped.
@ -207,6 +232,7 @@ object KyuubiConf {
final val KYUUBI_HOME = "KYUUBI_HOME"
final val KYUUBI_ENGINE_ENV_PREFIX = "kyuubi.engineEnv"
final val KYUUBI_BATCH_CONF_PREFIX = "kyuubi.batchConf"
final val KYUUBI_KUBERNETES_CONF_PREFIX = "kyuubi.kubernetes"
final val USER_DEFAULTS_CONF_QUOTE = "___"
private[this] val kyuubiConfEntriesUpdateLock = new Object
@ -1106,6 +1132,15 @@ object KyuubiConf {
.stringConf
.createOptional
val KUBERNETES_CONTEXT_ALLOW_LIST: ConfigEntry[Seq[String]] =
buildConf("kyuubi.kubernetes.context.allow.list")
.doc("The allowed kubernetes context list, if it is empty," +
" there is no kubernetes context limitation.")
.version("1.8.0")
.stringConf
.toSequence()
.createWithDefault(Nil)
val KUBERNETES_NAMESPACE: ConfigEntry[String] =
buildConf("kyuubi.kubernetes.namespace")
.doc("The namespace that will be used for running the kyuubi pods and find engines.")
@ -1113,6 +1148,15 @@ object KyuubiConf {
.stringConf
.createWithDefault("default")
val KUBERNETES_NAMESPACE_ALLOW_LIST: ConfigEntry[Seq[String]] =
buildConf("kyuubi.kubernetes.namespace.allow.list")
.doc("The allowed kubernetes namespace list, if it is empty," +
" there is no kubernetes namespace limitation.")
.version("1.8.0")
.stringConf
.toSequence()
.createWithDefault(Nil)
val KUBERNETES_MASTER: OptionalConfigEntry[String] =
buildConf("kyuubi.kubernetes.master.address")
.doc("The internal Kubernetes master (API server) address to be used for kyuubi.")

View File

@ -200,4 +200,25 @@ class KyuubiConfSuite extends KyuubiFunSuite {
assertResult(kSeq(1))("kyuubi.efg")
assertResult(kSeq(2))("kyuubi.xyz")
}
test("KYUUBI #4843 - Support multiple kubernetes contexts and namespaces") {
val kyuubiConf = KyuubiConf(false)
kyuubiConf.set("kyuubi.kubernetes.28.master.address", "k8s://master")
kyuubiConf.set(
"kyuubi.kubernetes.28.ns1.authenticate.oauthTokenFile",
"/var/run/secrets/kubernetes.io/token.ns1")
kyuubiConf.set(
"kyuubi.kubernetes.28.ns2.authenticate.oauthTokenFile",
"/var/run/secrets/kubernetes.io/token.ns2")
val kubernetesConf1 = kyuubiConf.getKubernetesConf(Some("28"), Some("ns1"))
assert(kubernetesConf1.get(KyuubiConf.KUBERNETES_MASTER) == Some("k8s://master"))
assert(kubernetesConf1.get(KyuubiConf.KUBERNETES_AUTHENTICATE_OAUTH_TOKEN_FILE) ==
Some("/var/run/secrets/kubernetes.io/token.ns1"))
val kubernetesConf2 = kyuubiConf.getKubernetesConf(Some("28"), Some("ns2"))
assert(kubernetesConf2.get(KyuubiConf.KUBERNETES_MASTER) == Some("k8s://master"))
assert(kubernetesConf2.get(KyuubiConf.KUBERNETES_AUTHENTICATE_OAUTH_TOKEN_FILE) ==
Some("/var/run/secrets/kubernetes.io/token.ns2"))
}
}

View File

@ -35,13 +35,14 @@ trait ApplicationOperation {
/**
* Called before other method to do a quick skip
*
* @param clusterManager the underlying cluster manager or just local instance
* @param appMgrInfo the application manager information
*/
def isSupported(clusterManager: Option[String]): Boolean
def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean
/**
* Kill the app/engine by the unique application tag
*
* @param appMgrInfo the application manager information
* @param tag the unique application tag for engine instance.
* For example,
* if the Hadoop Yarn is used, for spark applications,
@ -50,16 +51,20 @@ trait ApplicationOperation {
*
* @note For implementations, please suppress exceptions and always return KillResponse
*/
def killApplicationByTag(tag: String): KillResponse
def killApplicationByTag(appMgrInfo: ApplicationManagerInfo, tag: String): KillResponse
/**
* Get the engine/application status by the unique application tag
*
* @param appMgrInfo the application manager information
* @param tag the unique application tag for engine instance.
* @param submitTime engine submit to resourceManager time
* @return [[ApplicationInfo]]
*/
def getApplicationInfoByTag(tag: String, submitTime: Option[Long] = None): ApplicationInfo
def getApplicationInfoByTag(
appMgrInfo: ApplicationManagerInfo,
tag: String,
submitTime: Option[Long] = None): ApplicationInfo
}
object ApplicationState extends Enumeration {
@ -108,3 +113,22 @@ object ApplicationInfo {
object ApplicationOperation {
val NOT_FOUND = "APPLICATION_NOT_FOUND"
}
case class KubernetesInfo(context: Option[String] = None, namespace: Option[String] = None)
case class ApplicationManagerInfo(
resourceManager: Option[String],
kubernetesInfo: KubernetesInfo = KubernetesInfo())
object ApplicationManagerInfo {
final val DEFAULT_KUBERNETES_NAMESPACE = "default"
def apply(
resourceManager: Option[String],
kubernetesContext: Option[String],
kubernetesNamespace: Option[String]): ApplicationManagerInfo = {
new ApplicationManagerInfo(
resourceManager,
KubernetesInfo(kubernetesContext, kubernetesNamespace))
}
}

View File

@ -223,7 +223,7 @@ private[kyuubi] class EngineRef(
}
if (started + timeout <= System.currentTimeMillis()) {
val killMessage = engineManager.killApplication(builder.clusterManager(), engineRefId)
val killMessage = engineManager.killApplication(builder.appMgrInfo(), engineRefId)
process.destroyForcibly()
MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser)))
throw KyuubiSQLException(
@ -242,7 +242,7 @@ private[kyuubi] class EngineRef(
}
val applicationInfo = engineMgr.getApplicationInfo(
builder.clusterManager(),
builder.appMgrInfo(),
engineRefId,
Some(started))
@ -291,9 +291,9 @@ private[kyuubi] class EngineRef(
def close(): Unit = {
if (shareLevel == CONNECTION && builder != null) {
try {
val clusterManager = builder.clusterManager()
val appMgrInfo = builder.appMgrInfo()
builder.close(true)
engineManager.killApplication(clusterManager, engineRefId)
engineManager.killApplication(appMgrInfo, engineRefId)
} catch {
case e: Exception =>
warn(s"Error closing engine builder, engineRefId: $engineRefId", e)

View File

@ -41,8 +41,9 @@ class JpsApplicationOperation extends ApplicationOperation {
}
}
override def isSupported(clusterManager: Option[String]): Boolean = {
runner != null && (clusterManager.isEmpty || clusterManager.get == "local")
override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {
runner != null &&
(appMgrInfo.resourceManager.isEmpty || appMgrInfo.resourceManager.get == "local")
}
private def getEngine(tag: String): Option[String] = {
@ -80,11 +81,16 @@ class JpsApplicationOperation extends ApplicationOperation {
}
}
override def killApplicationByTag(tag: String): KillResponse = {
override def killApplicationByTag(
appMgrInfo: ApplicationManagerInfo,
tag: String): KillResponse = {
killJpsApplicationByTag(tag, true)
}
override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]): ApplicationInfo = {
override def getApplicationInfoByTag(
appMgrInfo: ApplicationManagerInfo,
tag: String,
submitTime: Option[Long]): ApplicationInfo = {
val commandOption = getEngine(tag)
if (commandOption.nonEmpty) {
val idAndCmd = commandOption.get

View File

@ -20,12 +20,14 @@ package org.apache.kyuubi.engine
import java.util.Locale
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import scala.collection.JavaConverters._
import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.informers.{ResourceEventHandler, SharedIndexInformer}
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.{KyuubiException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState, LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL}
@ -33,10 +35,16 @@ import org.apache.kyuubi.util.KubernetesUtils
class KubernetesApplicationOperation extends ApplicationOperation with Logging {
@volatile
private var kubernetesClient: KubernetesClient = _
private var enginePodInformer: SharedIndexInformer[Pod] = _
private val kubernetesClients: ConcurrentHashMap[KubernetesInfo, KubernetesClient] =
new ConcurrentHashMap[KubernetesInfo, KubernetesClient]
private val enginePodInformers: ConcurrentHashMap[KubernetesInfo, SharedIndexInformer[Pod]] =
new ConcurrentHashMap[KubernetesInfo, SharedIndexInformer[Pod]]
private var allowedContexts: Seq[String] = Seq.empty
private var allowedNamespaces: Seq[String] = Seq.empty
private var submitTimeout: Long = _
private var kyuubiConf: KyuubiConf = _
// key is kyuubi_unique_key
private val appInfoStore: ConcurrentHashMap[String, ApplicationInfo] =
@ -44,45 +52,74 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
// key is kyuubi_unique_key
private var cleanupTerminatedAppInfoTrigger: Cache[String, ApplicationState] = _
override def initialize(conf: KyuubiConf): Unit = {
info("Start initializing Kubernetes Client.")
kubernetesClient = KubernetesUtils.buildKubernetesClient(conf) match {
private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = {
val context = kubernetesInfo.context
val namespace = kubernetesInfo.namespace
if (allowedContexts.nonEmpty && !allowedContexts.contains(context)) {
throw new KyuubiException(
s"Kubernetes context $context is not in the allowed list[$allowedContexts]")
}
if (allowedNamespaces.nonEmpty && !allowedNamespaces.contains(namespace)) {
throw new KyuubiException(
s"Kubernetes namespace $namespace is not in the allowed list[$allowedNamespaces]")
}
kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo => buildKubernetesClient(kInfo))
}
private def buildKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = {
val kubernetesConf =
kyuubiConf.getKubernetesConf(kubernetesInfo.context, kubernetesInfo.namespace)
KubernetesUtils.buildKubernetesClient(kubernetesConf) match {
case Some(client) =>
info(s"Initialized Kubernetes Client connect to: ${client.getMasterUrl}")
submitTimeout = conf.get(KyuubiConf.ENGINE_KUBERNETES_SUBMIT_TIMEOUT)
// Disable resync, see https://github.com/fabric8io/kubernetes-client/discussions/5015
enginePodInformer = client.pods()
info(s"[$kubernetesInfo] Initialized Kubernetes Client connect to: ${client.getMasterUrl}")
val enginePodInformer = client.pods()
.withLabel(LABEL_KYUUBI_UNIQUE_KEY)
.inform(new SparkEnginePodEventHandler)
info("Start Kubernetes Client Informer.")
// Defer cleaning terminated application information
val retainPeriod = conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD)
cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder()
.expireAfterWrite(retainPeriod, TimeUnit.MILLISECONDS)
.removalListener((notification: RemovalNotification[String, ApplicationState]) => {
Option(appInfoStore.remove(notification.getKey)).foreach { removed =>
info(s"Remove terminated application ${removed.id} with " +
s"tag ${notification.getKey} and state ${removed.state}")
}
})
.build()
info(s"[$kubernetesInfo] Start Kubernetes Client Informer.")
enginePodInformers.put(kubernetesInfo, enginePodInformer)
client
case None =>
warn("Fail to init Kubernetes Client for Kubernetes Application Operation")
null
case None => throw new KyuubiException(s"Fail to build Kubernetes client for $kubernetesInfo")
}
}
override def isSupported(clusterManager: Option[String]): Boolean = {
// TODO add deploy mode to check whether is supported
kubernetesClient != null && clusterManager.exists(_.toLowerCase(Locale.ROOT).startsWith("k8s"))
override def initialize(conf: KyuubiConf): Unit = {
kyuubiConf = conf
info("Start initializing Kubernetes application operation.")
submitTimeout = conf.get(KyuubiConf.ENGINE_KUBERNETES_SUBMIT_TIMEOUT)
allowedContexts = conf.get(KyuubiConf.KUBERNETES_CONTEXT_ALLOW_LIST)
allowedNamespaces = conf.get(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST)
// Defer cleaning terminated application information
val retainPeriod = conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD)
cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder()
.expireAfterWrite(retainPeriod, TimeUnit.MILLISECONDS)
.removalListener((notification: RemovalNotification[String, ApplicationState]) => {
Option(appInfoStore.remove(notification.getKey)).foreach { removed =>
info(s"Remove terminated application ${removed.id} with " +
s"tag ${notification.getKey} and state ${removed.state}")
}
})
.build()
}
override def killApplicationByTag(tag: String): KillResponse = {
if (kubernetesClient == null) {
override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {
// TODO add deploy mode to check whether is supported
kyuubiConf != null &&
appMgrInfo.resourceManager.exists(_.toLowerCase(Locale.ROOT).startsWith("k8s"))
}
override def killApplicationByTag(
appMgrInfo: ApplicationManagerInfo,
tag: String): KillResponse = {
if (kyuubiConf == null) {
throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
}
debug(s"Deleting application info from Kubernetes cluster by $tag tag")
val kubernetesInfo = appMgrInfo.kubernetesInfo
val kubernetesClient = getOrCreateKubernetesClient(kubernetesInfo)
debug(s"[$kubernetesInfo] Deleting application info from Kubernetes cluster by $tag tag")
try {
val info = appInfoStore.getOrDefault(tag, ApplicationInfo.NOT_FOUND)
debug(s"Application info[tag: $tag] is in ${info.state}")
@ -90,24 +127,32 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
case NOT_FOUND | FAILED | UNKNOWN =>
(
false,
s"Target application[tag: $tag] is in ${info.state} status")
s"[$kubernetesInfo] Target application[tag: $tag] is in ${info.state} status")
case _ =>
(
!kubernetesClient.pods.withName(info.name).delete().isEmpty,
s"Operation of deleted application[appId: ${info.id} ,tag: $tag] is completed")
s"[$kubernetesInfo] Operation of deleted" +
s" application[appId: ${info.id} ,tag: $tag] is completed")
}
} catch {
case e: Exception =>
(false, s"Failed to terminate application with $tag, due to ${e.getMessage}")
(
false,
s"[$kubernetesInfo] Failed to terminate application with $tag, due to ${e.getMessage}")
}
}
override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]): ApplicationInfo = {
if (kubernetesClient == null) {
override def getApplicationInfoByTag(
appMgrInfo: ApplicationManagerInfo,
tag: String,
submitTime: Option[Long]): ApplicationInfo = {
if (kyuubiConf == null) {
throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
}
debug(s"Getting application info from Kubernetes cluster by $tag tag")
try {
// need to initialize the kubernetes client if not exists
getOrCreateKubernetesClient(appMgrInfo.kubernetesInfo)
val appInfo = appInfoStore.getOrDefault(tag, ApplicationInfo.NOT_FOUND)
(appInfo.state, submitTime) match {
// Kyuubi should wait second if pod is not be created
@ -136,19 +181,15 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}
override def stop(): Unit = {
Utils.tryLogNonFatalError {
if (enginePodInformer != null) {
enginePodInformer.stop()
enginePodInformer = null
}
enginePodInformers.asScala.foreach { case (_, informer) =>
Utils.tryLogNonFatalError(informer.stop())
}
enginePodInformers.clear()
Utils.tryLogNonFatalError {
if (kubernetesClient != null) {
kubernetesClient.close()
kubernetesClient = null
}
kubernetesClients.asScala.foreach { case (_, client) =>
Utils.tryLogNonFatalError(client.close())
}
kubernetesClients.clear()
if (cleanupTerminatedAppInfoTrigger != null) {
cleanupTerminatedAppInfoTrigger.cleanUp()

View File

@ -60,11 +60,11 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
super.stop()
}
def killApplication(resourceManager: Option[String], tag: String): KillResponse = {
def killApplication(appMgrInfo: ApplicationManagerInfo, tag: String): KillResponse = {
var (killed, lastMessage): KillResponse = (false, null)
for (operation <- operations if !killed) {
if (operation.isSupported(resourceManager)) {
val (k, m) = operation.killApplicationByTag(tag)
if (operation.isSupported(appMgrInfo)) {
val (k, m) = operation.killApplicationByTag(appMgrInfo, tag)
killed = k
lastMessage = m
}
@ -73,7 +73,7 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
val finalMessage =
if (lastMessage == null) {
s"No ${classOf[ApplicationOperation]} Service found in ServiceLoader" +
s" for $resourceManager"
s" for $appMgrInfo"
} else {
lastMessage
}
@ -81,12 +81,12 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
}
def getApplicationInfo(
clusterManager: Option[String],
appMgrInfo: ApplicationManagerInfo,
tag: String,
submitTime: Option[Long] = None): Option[ApplicationInfo] = {
val operation = operations.find(_.isSupported(clusterManager))
val operation = operations.find(_.isSupported(appMgrInfo))
operation match {
case Some(op) => Some(op.getApplicationInfoByTag(tag, submitTime))
case Some(op) => Some(op.getApplicationInfoByTag(appMgrInfo, tag, submitTime))
case None => None
}
}

View File

@ -341,6 +341,7 @@ trait ProcBuilder {
def clusterManager(): Option[String] = None
def appMgrInfo(): ApplicationManagerInfo = ApplicationManagerInfo(None)
}
object ProcBuilder extends Logging {

View File

@ -47,11 +47,14 @@ class YarnApplicationOperation extends ApplicationOperation with Logging {
info(s"Successfully initialized yarn client: ${c.getServiceState}")
}
override def isSupported(clusterManager: Option[String]): Boolean = {
yarnClient != null && clusterManager.exists(_.toLowerCase(Locale.ROOT).startsWith("yarn"))
override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {
yarnClient != null && appMgrInfo.resourceManager.exists(
_.toLowerCase(Locale.ROOT).startsWith("yarn"))
}
override def killApplicationByTag(tag: String): KillResponse = {
override def killApplicationByTag(
appMgrInfo: ApplicationManagerInfo,
tag: String): KillResponse = {
if (yarnClient != null) {
try {
val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
@ -79,7 +82,10 @@ class YarnApplicationOperation extends ApplicationOperation with Logging {
}
}
override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]): ApplicationInfo = {
override def getApplicationInfoByTag(
appMgrInfo: ApplicationManagerInfo,
tag: String,
submitTime: Option[Long]): ApplicationInfo = {
if (yarnClient != null) {
debug(s"Getting application info from Yarn cluster by $tag tag")
val reports = yarnClient.getApplications(null, null, Set(tag).asJava)

View File

@ -29,7 +29,7 @@ import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder}
import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManager, ProcBuilder}
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._
import org.apache.kyuubi.operation.log.OperationLog
@ -73,6 +73,10 @@ class FlinkProcessBuilder(
}
}
override def appMgrInfo(): ApplicationManagerInfo = {
ApplicationManagerInfo(clusterManager())
}
override protected val commands: Array[String] = {
KyuubiApplicationManager.tagApplication(engineRefId, shortName, clusterManager(), conf)

View File

@ -77,4 +77,12 @@ class SparkBatchProcessBuilder(
override def clusterManager(): Option[String] = {
batchConf.get(MASTER_KEY).orElse(super.clusterManager())
}
override def kubernetesContext(): Option[String] = {
batchConf.get(KUBERNETES_CONTEXT_KEY).orElse(super.kubernetesContext())
}
override def kubernetesNamespace(): Option[String] = {
batchConf.get(KUBERNETES_NAMESPACE_KEY).orElse(super.kubernetesNamespace())
}
}

View File

@ -27,7 +27,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder}
import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManager, ProcBuilder}
import org.apache.kyuubi.engine.KubernetesApplicationOperation.{KUBERNETES_SERVICE_HOST, KUBERNETES_SERVICE_PORT}
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.AuthTypes
@ -183,26 +183,39 @@ class SparkProcessBuilder(
override def shortName: String = "spark"
protected lazy val defaultMaster: Option[String] = {
protected lazy val defaultsConf: Map[String, String] = {
val confDir = env.getOrElse(SPARK_CONF_DIR, s"$sparkHome${File.separator}conf")
val defaults =
try {
val confFile = new File(s"$confDir${File.separator}$SPARK_CONF_FILE_NAME")
if (confFile.exists()) {
Utils.getPropertiesFromFile(Some(confFile))
} else {
Map.empty[String, String]
}
} catch {
case _: Exception =>
warn(s"Failed to load spark configurations from $confDir")
Map.empty[String, String]
try {
val confFile = new File(s"$confDir${File.separator}$SPARK_CONF_FILE_NAME")
if (confFile.exists()) {
Utils.getPropertiesFromFile(Some(confFile))
} else {
Map.empty[String, String]
}
defaults.get(MASTER_KEY)
} catch {
case _: Exception =>
warn(s"Failed to load spark configurations from $confDir")
Map.empty[String, String]
}
}
override def appMgrInfo(): ApplicationManagerInfo = {
ApplicationManagerInfo(
clusterManager(),
kubernetesContext(),
kubernetesNamespace())
}
override def clusterManager(): Option[String] = {
conf.getOption(MASTER_KEY).orElse(defaultMaster)
conf.getOption(MASTER_KEY).orElse(defaultsConf.get(MASTER_KEY))
}
def kubernetesContext(): Option[String] = {
conf.getOption(KUBERNETES_CONTEXT_KEY).orElse(defaultsConf.get(KUBERNETES_CONTEXT_KEY))
}
def kubernetesNamespace(): Option[String] = {
conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY))
}
override def validateConf: Unit = Validator.validateConf(conf)
@ -224,6 +237,8 @@ object SparkProcessBuilder {
final val APP_KEY = "spark.app.name"
final val TAG_KEY = "spark.yarn.tags"
final val MASTER_KEY = "spark.master"
final val KUBERNETES_CONTEXT_KEY = "spark.kubernetes.context"
final val KUBERNETES_NAMESPACE_KEY = "spark.kubernetes.namespace"
final val INTERNAL_RESOURCE = "spark-internal"
/**

View File

@ -107,7 +107,7 @@ class BatchJobSubmission(
}
val applicationInfo =
applicationManager.getApplicationInfo(
builder.clusterManager(),
builder.appMgrInfo(),
batchId,
Some(_submitTime))
applicationId(applicationInfo).foreach { _ =>
@ -123,7 +123,7 @@ class BatchJobSubmission(
}
private[kyuubi] def killBatchApplication(): KillResponse = {
applicationManager.killApplication(builder.clusterManager(), batchId)
applicationManager.killApplication(builder.appMgrInfo(), batchId)
}
private val applicationCheckInterval =

View File

@ -40,7 +40,7 @@ import org.apache.kyuubi.client.exception.KyuubiRestException
import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys._
import org.apache.kyuubi.engine.{ApplicationInfo, KillResponse, KyuubiApplicationManager}
import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo, KillResponse, KyuubiApplicationManager}
import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation, OperationState}
import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.server.api.v1.BatchesResource._
@ -289,7 +289,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
case e: KyuubiRestException =>
error(s"Error redirecting get batch[$batchId] to ${metadata.kyuubiInstance}", e)
val batchAppStatus = sessionManager.applicationManager.getApplicationInfo(
metadata.clusterManager,
metadata.appMgrInfo,
batchId,
// prevent that the batch be marked as terminated if application state is NOT_FOUND
Some(metadata.engineOpenTime).filter(_ > 0).orElse(Some(System.currentTimeMillis)))
@ -407,9 +407,9 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
}
}
def forceKill(clusterManager: Option[String], batchId: String): KillResponse = {
def forceKill(appMgrInfo: ApplicationManagerInfo, batchId: String): KillResponse = {
val (killed, message) = sessionManager.applicationManager
.killApplication(clusterManager, batchId)
.killApplication(appMgrInfo, batchId)
info(s"Mark batch[$batchId] closed by ${fe.connectionUrl}")
sessionManager.updateMetadata(Metadata(identifier = batchId, peerInstanceClosed = true))
(killed, message)
@ -436,12 +436,12 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
} catch {
case e: KyuubiRestException =>
error(s"Error redirecting delete batch[$batchId] to ${metadata.kyuubiInstance}", e)
val (killed, msg) = forceKill(metadata.clusterManager, batchId)
val (killed, msg) = forceKill(metadata.appMgrInfo, batchId)
new CloseBatchResponse(killed, if (killed) msg else Utils.stringifyException(e))
}
} else { // should not happen, but handle this for safe
warn(s"Something wrong on deleting batch[$batchId], try forcibly killing application")
val (killed, msg) = forceKill(metadata.clusterManager, batchId)
val (killed, msg) = forceKill(metadata.appMgrInfo, batchId)
new CloseBatchResponse(killed, msg)
}
}.getOrElse {

View File

@ -17,6 +17,8 @@
package org.apache.kyuubi.server.metadata.api
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.ApplicationManagerInfo
import org.apache.kyuubi.session.SessionType.SessionType
/**
@ -73,4 +75,11 @@ case class Metadata(
engineState: String = null,
engineError: Option[String] = None,
endTime: Long = 0L,
peerInstanceClosed: Boolean = false)
peerInstanceClosed: Boolean = false) {
def appMgrInfo: ApplicationManagerInfo = {
ApplicationManagerInfo(
clusterManager,
requestConf.get(KyuubiConf.KUBERNETES_CONTEXT.key),
requestConf.get(KyuubiConf.KUBERNETES_NAMESPACE.key))
}
}

View File

@ -143,6 +143,12 @@ class KyuubiBatchSession(
traceMetricsOnOpen()
if (recoveryMetadata.isEmpty) {
val appMgrInfo = batchJobSubmissionOp.builder.appMgrInfo()
val kubernetesInfo = appMgrInfo.kubernetesInfo.context.map { context =>
Map(KyuubiConf.KUBERNETES_CONTEXT.key -> context)
}.getOrElse(Map.empty) ++ appMgrInfo.kubernetesInfo.namespace.map { namespace =>
Map(KyuubiConf.KUBERNETES_NAMESPACE.key -> namespace)
}.getOrElse(Map.empty)
val metaData = Metadata(
identifier = handle.identifier.toString,
sessionType = sessionType,
@ -154,7 +160,7 @@ class KyuubiBatchSession(
resource = resource,
className = className,
requestName = name.orNull,
requestConf = optimizedConf,
requestConf = optimizedConf ++ kubernetesInfo, // save the kubernetes info into request conf
requestArgs = batchArgs,
createTime = createTime,
engineType = batchType,

View File

@ -26,7 +26,7 @@ import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols.FrontendProtocol
import org.apache.kyuubi.engine.{ApplicationState, YarnApplicationOperation}
import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState, YarnApplicationOperation}
import org.apache.kyuubi.engine.ApplicationState._
import org.apache.kyuubi.operation.{FetchOrientation, HiveJDBCTestHelper, OperationState}
import org.apache.kyuubi.operation.OperationState.ERROR
@ -134,11 +134,15 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
assert(metadata.map(_.engineId).get.startsWith("application_"))
}
val killResponse = yarnOperation.killApplicationByTag(sessionHandle.identifier.toString)
val appMgrInfo = ApplicationManagerInfo(Some("yarn"))
val killResponse =
yarnOperation.killApplicationByTag(appMgrInfo, sessionHandle.identifier.toString)
assert(killResponse._1)
assert(killResponse._2 startsWith "Succeeded to terminate:")
val appInfo = yarnOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
val appInfo =
yarnOperation.getApplicationInfoByTag(appMgrInfo, sessionHandle.identifier.toString)
assert(appInfo.state === KILLED)

View File

@ -38,10 +38,10 @@ class JpsApplicationOperationSuite extends KyuubiFunSuite {
jps.initialize(null)
test("JpsApplicationOperation with jstat") {
assert(jps.isSupported(None))
assert(jps.isSupported(Some("local")))
assert(!jps.killApplicationByTag(null)._1)
assert(!jps.killApplicationByTag("have a space")._1)
assert(jps.isSupported(ApplicationManagerInfo(None)))
assert(jps.isSupported(ApplicationManagerInfo(Some("local"))))
assert(!jps.killApplicationByTag(ApplicationManagerInfo(None), null)._1)
assert(!jps.killApplicationByTag(ApplicationManagerInfo(None), "have a space")._1)
val currentProcess = ManagementFactory.getRuntimeMXBean.getName
val currentPid = currentProcess.splitAt(currentProcess.indexOf("@"))._1
@ -52,16 +52,16 @@ class JpsApplicationOperationSuite extends KyuubiFunSuite {
}.start()
eventually(Timeout(10.seconds)) {
val desc1 = jps.getApplicationInfoByTag("sun.tools.jstat.Jstat")
val desc1 = jps.getApplicationInfoByTag(ApplicationManagerInfo(None), "sun.tools.jstat.Jstat")
assert(desc1.id != null)
assert(desc1.name != null)
assert(desc1.state == ApplicationState.RUNNING)
}
jps.killApplicationByTag("sun.tools.jstat.Jstat")
jps.killApplicationByTag(ApplicationManagerInfo(None), "sun.tools.jstat.Jstat")
eventually(Timeout(10.seconds)) {
val desc2 = jps.getApplicationInfoByTag("sun.tools.jstat.Jstat")
val desc2 = jps.getApplicationInfoByTag(ApplicationManagerInfo(None), "sun.tools.jstat.Jstat")
assert(desc2.id == null)
assert(desc2.name == null)
assert(desc2.state == ApplicationState.NOT_FOUND)
@ -78,25 +78,25 @@ class JpsApplicationOperationSuite extends KyuubiFunSuite {
val builder = new SparkProcessBuilder(user, conf)
builder.start
assert(jps.isSupported(builder.clusterManager()))
assert(jps.isSupported(ApplicationManagerInfo(builder.clusterManager())))
eventually(Timeout(10.seconds)) {
val desc1 = jps.getApplicationInfoByTag(id)
val desc1 = jps.getApplicationInfoByTag(ApplicationManagerInfo(None), id)
assert(desc1.id != null)
assert(desc1.name != null)
assert(desc1.state == ApplicationState.RUNNING)
val response = jps.killApplicationByTag(id)
val response = jps.killApplicationByTag(ApplicationManagerInfo(None), id)
assert(response._1, response._2)
assert(response._2 startsWith "Succeeded to terminate:")
}
eventually(Timeout(10.seconds)) {
val desc2 = jps.getApplicationInfoByTag(id)
val desc2 = jps.getApplicationInfoByTag(ApplicationManagerInfo(None), id)
assert(desc2.id == null)
assert(desc2.name == null)
assert(desc2.state == ApplicationState.NOT_FOUND)
}
val response2 = jps.killApplicationByTag(id)
val response2 = jps.killApplicationByTag(ApplicationManagerInfo(None), id)
assert(!response2._1)
assert(response2._2 === ApplicationOperation.NOT_FOUND)
}

View File

@ -29,7 +29,7 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.{KYUUBI_VERSION, WithKyuubiServer}
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf.SESSION_CONF_ADVISOR
import org.apache.kyuubi.engine.ApplicationState
import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState}
import org.apache.kyuubi.jdbc.KyuubiHiveDriver
import org.apache.kyuubi.jdbc.hive.{KyuubiConnection, KyuubiSQLException}
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
@ -233,9 +233,11 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
}
val engineId = sessionManager.allSessions().head.handle.identifier.toString
// kill the engine application and wait the engine terminate
sessionManager.applicationManager.killApplication(None, engineId)
sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None), engineId)
eventually(timeout(30.seconds), interval(100.milliseconds)) {
assert(sessionManager.applicationManager.getApplicationInfo(None, engineId)
assert(sessionManager.applicationManager.getApplicationInfo(
ApplicationManagerInfo(None),
engineId)
.exists(_.state == ApplicationState.NOT_FOUND))
}
assert(!conn.isValid(3000))

View File

@ -33,7 +33,7 @@ import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, RestFrontendTestHelper
import org.apache.kyuubi.client.api.v1.dto.{Engine, OperationData, ServerData, SessionData, SessionHandle, SessionOpenRequest}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY
import org.apache.kyuubi.engine.{ApplicationState, EngineRef, KyuubiApplicationManager}
import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState, EngineRef, KyuubiApplicationManager}
import org.apache.kyuubi.engine.EngineType.SPARK_SQL
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, USER}
import org.apache.kyuubi.ha.HighAvailabilityConf
@ -293,9 +293,10 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
}
// kill the engine application
engineMgr.killApplication(None, id)
engineMgr.killApplication(ApplicationManagerInfo(None), id)
eventually(timeout(30.seconds), interval(100.milliseconds)) {
assert(engineMgr.getApplicationInfo(None, id).exists(_.state == ApplicationState.NOT_FOUND))
assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id).exists(
_.state == ApplicationState.NOT_FOUND))
}
}
}
@ -339,9 +340,10 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
}
// kill the engine application
engineMgr.killApplication(None, id)
engineMgr.killApplication(ApplicationManagerInfo(None), id)
eventually(timeout(30.seconds), interval(100.milliseconds)) {
assert(engineMgr.getApplicationInfo(None, id).exists(_.state == ApplicationState.NOT_FOUND))
assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id).exists(
_.state == ApplicationState.NOT_FOUND))
}
}
}
@ -417,9 +419,10 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(engines(0).getSubdomain == "default")
// kill the engine application
engineMgr.killApplication(None, id)
engineMgr.killApplication(ApplicationManagerInfo(None), id)
eventually(timeout(30.seconds), interval(100.milliseconds)) {
assert(engineMgr.getApplicationInfo(None, id).exists(_.state == ApplicationState.NOT_FOUND))
assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id).exists(
_.state == ApplicationState.NOT_FOUND))
}
}
}
@ -463,9 +466,10 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(engines(0).getSubdomain == "default")
// kill the engine application
engineMgr.killApplication(None, id)
engineMgr.killApplication(ApplicationManagerInfo(None), id)
eventually(timeout(30.seconds), interval(100.milliseconds)) {
assert(engineMgr.getApplicationInfo(None, id).exists(_.state == ApplicationState.NOT_FOUND))
assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id).exists(
_.state == ApplicationState.NOT_FOUND))
}
}
}
@ -528,12 +532,12 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(result1.size == 1)
// kill the engine application
engineMgr.killApplication(None, id1)
engineMgr.killApplication(None, id2)
engineMgr.killApplication(ApplicationManagerInfo(None), id1)
engineMgr.killApplication(ApplicationManagerInfo(None), id2)
eventually(timeout(30.seconds), interval(100.milliseconds)) {
assert(engineMgr.getApplicationInfo(None, id1)
assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id1)
.exists(_.state == ApplicationState.NOT_FOUND))
assert(engineMgr.getApplicationInfo(None, id2)
assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id2)
.exists(_.state == ApplicationState.NOT_FOUND))
}
}

View File

@ -37,7 +37,7 @@ import org.apache.kyuubi.client.util.BatchUtils
import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.{ApplicationInfo, KyuubiApplicationManager}
import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo, KyuubiApplicationManager}
import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState}
@ -64,7 +64,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
}
sessionManager.getBatchesFromMetadataStore(null, null, null, 0, 0, 0, Int.MaxValue).foreach {
batch =>
sessionManager.applicationManager.killApplication(None, batch.getId)
sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None), batch.getId)
sessionManager.cleanupMetadata(batch.getId)
}
}
@ -481,7 +481,8 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
var applicationStatus: Option[ApplicationInfo] = None
eventually(timeout(5.seconds)) {
applicationStatus = sessionManager.applicationManager.getApplicationInfo(None, batchId2)
applicationStatus =
sessionManager.applicationManager.getApplicationInfo(ApplicationManagerInfo(None), batchId2)
assert(applicationStatus.isDefined)
}

View File

@ -32,6 +32,7 @@ import org.apache.kyuubi.{BatchTestHelper, RestClientTestHelper, Utils}
import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ctl.{CtlConf, TestPrematureExit}
import org.apache.kyuubi.engine.ApplicationManagerInfo
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.session.KyuubiSessionManager
@ -80,7 +81,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat
}
sessionManager.getBatchesFromMetadataStore(null, null, null, 0, 0, 0, Int.MaxValue).foreach {
batch =>
sessionManager.applicationManager.killApplication(None, batch.getId)
sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None), batch.getId)
sessionManager.cleanupMetadata(batch.getId)
}
}