From c2e27304feda75aa26b42e8512f5de7483104fae Mon Sep 17 00:00:00 2001 From: fwang12 Date: Mon, 26 Jun 2023 15:52:56 +0800 Subject: [PATCH] [KYUUBI #4843] Support multiple kubernetes contexts and namespaces ### _Why are the changes needed?_ Close #4843 Support to submit kyuubi engine/batch to multiple kubernetes contexts and namespaces. In this pr, the user can config the kubernetes conf for specified kubernetes context and namespace likes below. ``` kyuubi.kubernetes..master.address kyuubi.kubernetes...authenticate.oauthTokenFile ``` For example: ``` kyuubi.kubernetes.28.master.address=k8s://master kyuubi.kubernetes.28.ns1.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/token.ns1 kyuubi.kubernetes.28.ns2.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/token.ns2 ``` for k8s context=28, namespace=ns1, its kubernetes config is: ``` kyuubi.kubernetes.master.address=k8s://master kyuubi.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/token.ns1 ``` for k8s context=28, namespace=ns2, its kubernetes config is: ``` kyuubi.kubernetes.master.address=k8s://master kyuubi.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/token.ns2 ``` So that, kyuubi server can build kubernetes client for each context and namespace. ### _How was this patch tested?_ Existing kubernetes integration testing. Closes #4984 from turboFei/k8s_client_yaml. Closes #4843 f8ffaeeb9 [fwang12] nit d25774288 [fwang12] comments 5ae7c8433 [fwang12] save into request conf fd6c363db [fwang12] save ff004a529 [fwang12] procebuilder method 6b9520bfd [fwang12] save 58850387e [fwang12] save 98df67e5f [fwang12] ut da811697c [fwang12] fix aa568aaa4 [fwang12] save 89656f463 [fwang12] check init a0ef6894b [fwang12] code style 00abb6568 [fwang12] default namespace 295512987 [fwang12] k8s context namespace Authored-by: fwang12 Signed-off-by: fwang12 --- docs/deployment/settings.md | 2 + .../spark/SparkOnKubernetesTestsSuite.scala | 37 +++-- .../org/apache/kyuubi/config/KyuubiConf.scala | 44 ++++++ .../kyuubi/config/KyuubiConfSuite.scala | 21 +++ .../kyuubi/engine/ApplicationOperation.scala | 32 ++++- .../org/apache/kyuubi/engine/EngineRef.scala | 8 +- .../engine/JpsApplicationOperation.scala | 14 +- .../KubernetesApplicationOperation.scala | 135 ++++++++++++------ .../engine/KyuubiApplicationManager.scala | 14 +- .../apache/kyuubi/engine/ProcBuilder.scala | 1 + .../engine/YarnApplicationOperation.scala | 14 +- .../engine/flink/FlinkProcessBuilder.scala | 6 +- .../spark/SparkBatchProcessBuilder.scala | 8 ++ .../engine/spark/SparkProcessBuilder.scala | 47 +++--- .../kyuubi/operation/BatchJobSubmission.scala | 4 +- .../server/api/v1/BatchesResource.scala | 12 +- .../kyuubi/server/metadata/api/Metadata.scala | 11 +- .../kyuubi/session/KyuubiBatchSession.scala | 8 +- .../kyuubi/WithKyuubiServerOnYarn.scala | 10 +- .../engine/JpsApplicationOperationSuite.scala | 24 ++-- .../KyuubiOperationPerConnectionSuite.scala | 8 +- .../server/api/v1/AdminResourceSuite.scala | 30 ++-- .../server/api/v1/BatchesResourceSuite.scala | 7 +- .../server/rest/client/BatchCliSuite.scala | 3 +- 24 files changed, 359 insertions(+), 141 deletions(-) diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index 97ca5973d..ca29592e2 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -307,8 +307,10 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.kubernetes.authenticate.oauthToken | <undefined> | The OAuth token to use when authenticating against the Kubernetes API server. Note that unlike, the other authentication options, this must be the exact string value of the token to use for the authentication. | string | 1.7.0 | | kyuubi.kubernetes.authenticate.oauthTokenFile | <undefined> | Path to the file containing the OAuth token to use when authenticating against the Kubernetes API server. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 | | kyuubi.kubernetes.context | <undefined> | The desired context from your kubernetes config file used to configure the K8s client for interacting with the cluster. | string | 1.6.0 | +| kyuubi.kubernetes.context.allow.list || The allowed kubernetes context list, if it is empty, there is no kubernetes context limitation. | seq | 1.8.0 | | kyuubi.kubernetes.master.address | <undefined> | The internal Kubernetes master (API server) address to be used for kyuubi. | string | 1.7.0 | | kyuubi.kubernetes.namespace | default | The namespace that will be used for running the kyuubi pods and find engines. | string | 1.7.0 | +| kyuubi.kubernetes.namespace.allow.list || The allowed kubernetes namespace list, if it is empty, there is no kubernetes namespace limitation. | seq | 1.8.0 | | kyuubi.kubernetes.terminatedApplicationRetainPeriod | PT5M | The period for which the Kyuubi server retains application information after the application terminates. | duration | 1.7.1 | | kyuubi.kubernetes.trust.certificates | false | If set to true then client can submit to kubernetes cluster only with token | boolean | 1.7.0 | diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala index 74090bc40..6345a05f1 100644 --- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala +++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala @@ -29,7 +29,7 @@ import org.apache.kyuubi._ import org.apache.kyuubi.client.util.BatchUtils._ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_HOST -import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationOperation, KubernetesApplicationOperation} +import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo, ApplicationOperation, KubernetesApplicationOperation} import org.apache.kyuubi.engine.ApplicationState.{FAILED, NOT_FOUND, RUNNING} import org.apache.kyuubi.engine.spark.SparkProcessBuilder import org.apache.kyuubi.kubernetes.test.MiniKube @@ -44,6 +44,9 @@ abstract class SparkOnKubernetesSuiteBase MiniKube.getKubernetesClient.getMasterUrl.toString } + protected val appMgrInfo = + ApplicationManagerInfo(Some(s"k8s://$apiServerAddress"), Some("minikube"), None) + protected def sparkOnK8sConf: KyuubiConf = { // TODO Support more Spark version // Spark official docker image: https://hub.docker.com/r/apache/spark/tags @@ -150,20 +153,28 @@ class KyuubiOperationKubernetesClusterClientModeSuite batchRequest) eventually(timeout(3.minutes), interval(50.milliseconds)) { - val state = k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString) + val state = k8sOperation.getApplicationInfoByTag( + appMgrInfo, + sessionHandle.identifier.toString) assert(state.id != null) assert(state.name != null) assert(state.state == RUNNING) } - val killResponse = k8sOperation.killApplicationByTag(sessionHandle.identifier.toString) + val killResponse = k8sOperation.killApplicationByTag( + appMgrInfo, + sessionHandle.identifier.toString) assert(killResponse._1) assert(killResponse._2 startsWith "Succeeded to terminate:") - val appInfo = k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString) + val appInfo = k8sOperation.getApplicationInfoByTag( + appMgrInfo, + sessionHandle.identifier.toString) assert(appInfo == ApplicationInfo(null, null, NOT_FOUND)) - val failKillResponse = k8sOperation.killApplicationByTag(sessionHandle.identifier.toString) + val failKillResponse = k8sOperation.killApplicationByTag( + appMgrInfo, + sessionHandle.identifier.toString) assert(!failKillResponse._1) assert(failKillResponse._2 === ApplicationOperation.NOT_FOUND) } @@ -212,24 +223,32 @@ class KyuubiOperationKubernetesClusterClusterModeSuite // wait for driver pod start eventually(timeout(3.minutes), interval(5.second)) { // trigger k8sOperation init here - val appInfo = k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString) + val appInfo = k8sOperation.getApplicationInfoByTag( + appMgrInfo, + sessionHandle.identifier.toString) assert(appInfo.state == RUNNING) assert(appInfo.name.startsWith(driverPodNamePrefix)) } - val killResponse = k8sOperation.killApplicationByTag(sessionHandle.identifier.toString) + val killResponse = k8sOperation.killApplicationByTag( + appMgrInfo, + sessionHandle.identifier.toString) assert(killResponse._1) assert(killResponse._2 endsWith "is completed") assert(killResponse._2 contains sessionHandle.identifier.toString) eventually(timeout(3.minutes), interval(50.milliseconds)) { - val appInfo = k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString) + val appInfo = k8sOperation.getApplicationInfoByTag( + appMgrInfo, + sessionHandle.identifier.toString) // We may kill engine start but not ready // An EOF Error occurred when the driver was starting assert(appInfo.state == FAILED || appInfo.state == NOT_FOUND) } - val failKillResponse = k8sOperation.killApplicationByTag(sessionHandle.identifier.toString) + val failKillResponse = k8sOperation.killApplicationByTag( + appMgrInfo, + sessionHandle.identifier.toString) assert(!failKillResponse._1) } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index d51a5f5c3..669f72da0 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -134,6 +134,31 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging { getAllWithPrefix(s"$KYUUBI_BATCH_CONF_PREFIX.$normalizedBatchType", "") } + /** Get the kubernetes conf for specified kubernetes context and namespace. */ + def getKubernetesConf(context: Option[String], namespace: Option[String]): KyuubiConf = { + val conf = this.clone + context.foreach { c => + val contextConf = + getAllWithPrefix(s"$KYUUBI_KUBERNETES_CONF_PREFIX.$c", "").map { case (suffix, value) => + s"$KYUUBI_KUBERNETES_CONF_PREFIX.$suffix" -> value + } + val contextNamespaceConf = namespace.map { ns => + getAllWithPrefix(s"$KYUUBI_KUBERNETES_CONF_PREFIX.$c.$ns", "").map { + case (suffix, value) => + s"$KYUUBI_KUBERNETES_CONF_PREFIX.$suffix" -> value + } + }.getOrElse(Map.empty) + + (contextConf ++ contextNamespaceConf).map { case (key, value) => + conf.set(key, value) + } + conf.set(KUBERNETES_CONTEXT, c) + namespace.foreach(ns => conf.set(KUBERNETES_NAMESPACE, ns)) + conf + } + conf + } + /** * Retrieve key-value pairs from [[KyuubiConf]] starting with `dropped.remainder`, and put them to * the result map with the `dropped` of key being dropped. @@ -207,6 +232,7 @@ object KyuubiConf { final val KYUUBI_HOME = "KYUUBI_HOME" final val KYUUBI_ENGINE_ENV_PREFIX = "kyuubi.engineEnv" final val KYUUBI_BATCH_CONF_PREFIX = "kyuubi.batchConf" + final val KYUUBI_KUBERNETES_CONF_PREFIX = "kyuubi.kubernetes" final val USER_DEFAULTS_CONF_QUOTE = "___" private[this] val kyuubiConfEntriesUpdateLock = new Object @@ -1106,6 +1132,15 @@ object KyuubiConf { .stringConf .createOptional + val KUBERNETES_CONTEXT_ALLOW_LIST: ConfigEntry[Seq[String]] = + buildConf("kyuubi.kubernetes.context.allow.list") + .doc("The allowed kubernetes context list, if it is empty," + + " there is no kubernetes context limitation.") + .version("1.8.0") + .stringConf + .toSequence() + .createWithDefault(Nil) + val KUBERNETES_NAMESPACE: ConfigEntry[String] = buildConf("kyuubi.kubernetes.namespace") .doc("The namespace that will be used for running the kyuubi pods and find engines.") @@ -1113,6 +1148,15 @@ object KyuubiConf { .stringConf .createWithDefault("default") + val KUBERNETES_NAMESPACE_ALLOW_LIST: ConfigEntry[Seq[String]] = + buildConf("kyuubi.kubernetes.namespace.allow.list") + .doc("The allowed kubernetes namespace list, if it is empty," + + " there is no kubernetes namespace limitation.") + .version("1.8.0") + .stringConf + .toSequence() + .createWithDefault(Nil) + val KUBERNETES_MASTER: OptionalConfigEntry[String] = buildConf("kyuubi.kubernetes.master.address") .doc("The internal Kubernetes master (API server) address to be used for kyuubi.") diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala index f05e15d8a..39e68f0ec 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala @@ -200,4 +200,25 @@ class KyuubiConfSuite extends KyuubiFunSuite { assertResult(kSeq(1))("kyuubi.efg") assertResult(kSeq(2))("kyuubi.xyz") } + + test("KYUUBI #4843 - Support multiple kubernetes contexts and namespaces") { + val kyuubiConf = KyuubiConf(false) + kyuubiConf.set("kyuubi.kubernetes.28.master.address", "k8s://master") + kyuubiConf.set( + "kyuubi.kubernetes.28.ns1.authenticate.oauthTokenFile", + "/var/run/secrets/kubernetes.io/token.ns1") + kyuubiConf.set( + "kyuubi.kubernetes.28.ns2.authenticate.oauthTokenFile", + "/var/run/secrets/kubernetes.io/token.ns2") + + val kubernetesConf1 = kyuubiConf.getKubernetesConf(Some("28"), Some("ns1")) + assert(kubernetesConf1.get(KyuubiConf.KUBERNETES_MASTER) == Some("k8s://master")) + assert(kubernetesConf1.get(KyuubiConf.KUBERNETES_AUTHENTICATE_OAUTH_TOKEN_FILE) == + Some("/var/run/secrets/kubernetes.io/token.ns1")) + + val kubernetesConf2 = kyuubiConf.getKubernetesConf(Some("28"), Some("ns2")) + assert(kubernetesConf2.get(KyuubiConf.KUBERNETES_MASTER) == Some("k8s://master")) + assert(kubernetesConf2.get(KyuubiConf.KUBERNETES_AUTHENTICATE_OAUTH_TOKEN_FILE) == + Some("/var/run/secrets/kubernetes.io/token.ns2")) + } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala index a2b3d0f76..2acce39cc 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala @@ -35,13 +35,14 @@ trait ApplicationOperation { /** * Called before other method to do a quick skip * - * @param clusterManager the underlying cluster manager or just local instance + * @param appMgrInfo the application manager information */ - def isSupported(clusterManager: Option[String]): Boolean + def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean /** * Kill the app/engine by the unique application tag * + * @param appMgrInfo the application manager information * @param tag the unique application tag for engine instance. * For example, * if the Hadoop Yarn is used, for spark applications, @@ -50,16 +51,20 @@ trait ApplicationOperation { * * @note For implementations, please suppress exceptions and always return KillResponse */ - def killApplicationByTag(tag: String): KillResponse + def killApplicationByTag(appMgrInfo: ApplicationManagerInfo, tag: String): KillResponse /** * Get the engine/application status by the unique application tag * + * @param appMgrInfo the application manager information * @param tag the unique application tag for engine instance. * @param submitTime engine submit to resourceManager time * @return [[ApplicationInfo]] */ - def getApplicationInfoByTag(tag: String, submitTime: Option[Long] = None): ApplicationInfo + def getApplicationInfoByTag( + appMgrInfo: ApplicationManagerInfo, + tag: String, + submitTime: Option[Long] = None): ApplicationInfo } object ApplicationState extends Enumeration { @@ -108,3 +113,22 @@ object ApplicationInfo { object ApplicationOperation { val NOT_FOUND = "APPLICATION_NOT_FOUND" } + +case class KubernetesInfo(context: Option[String] = None, namespace: Option[String] = None) + +case class ApplicationManagerInfo( + resourceManager: Option[String], + kubernetesInfo: KubernetesInfo = KubernetesInfo()) + +object ApplicationManagerInfo { + final val DEFAULT_KUBERNETES_NAMESPACE = "default" + + def apply( + resourceManager: Option[String], + kubernetesContext: Option[String], + kubernetesNamespace: Option[String]): ApplicationManagerInfo = { + new ApplicationManagerInfo( + resourceManager, + KubernetesInfo(kubernetesContext, kubernetesNamespace)) + } +} diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index 227cdd6c8..387758714 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -223,7 +223,7 @@ private[kyuubi] class EngineRef( } if (started + timeout <= System.currentTimeMillis()) { - val killMessage = engineManager.killApplication(builder.clusterManager(), engineRefId) + val killMessage = engineManager.killApplication(builder.appMgrInfo(), engineRefId) process.destroyForcibly() MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser))) throw KyuubiSQLException( @@ -242,7 +242,7 @@ private[kyuubi] class EngineRef( } val applicationInfo = engineMgr.getApplicationInfo( - builder.clusterManager(), + builder.appMgrInfo(), engineRefId, Some(started)) @@ -291,9 +291,9 @@ private[kyuubi] class EngineRef( def close(): Unit = { if (shareLevel == CONNECTION && builder != null) { try { - val clusterManager = builder.clusterManager() + val appMgrInfo = builder.appMgrInfo() builder.close(true) - engineManager.killApplication(clusterManager, engineRefId) + engineManager.killApplication(appMgrInfo, engineRefId) } catch { case e: Exception => warn(s"Error closing engine builder, engineRefId: $engineRefId", e) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala index ce2e05461..64dacbb64 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala @@ -41,8 +41,9 @@ class JpsApplicationOperation extends ApplicationOperation { } } - override def isSupported(clusterManager: Option[String]): Boolean = { - runner != null && (clusterManager.isEmpty || clusterManager.get == "local") + override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = { + runner != null && + (appMgrInfo.resourceManager.isEmpty || appMgrInfo.resourceManager.get == "local") } private def getEngine(tag: String): Option[String] = { @@ -80,11 +81,16 @@ class JpsApplicationOperation extends ApplicationOperation { } } - override def killApplicationByTag(tag: String): KillResponse = { + override def killApplicationByTag( + appMgrInfo: ApplicationManagerInfo, + tag: String): KillResponse = { killJpsApplicationByTag(tag, true) } - override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]): ApplicationInfo = { + override def getApplicationInfoByTag( + appMgrInfo: ApplicationManagerInfo, + tag: String, + submitTime: Option[Long]): ApplicationInfo = { val commandOption = getEngine(tag) if (commandOption.nonEmpty) { val idAndCmd = commandOption.get diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index 0bd3127cb..d6dfba2fe 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -20,12 +20,14 @@ package org.apache.kyuubi.engine import java.util.Locale import java.util.concurrent.{ConcurrentHashMap, TimeUnit} +import scala.collection.JavaConverters._ + import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification} import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.informers.{ResourceEventHandler, SharedIndexInformer} -import org.apache.kyuubi.{Logging, Utils} +import org.apache.kyuubi.{KyuubiException, Logging, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN} import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState, LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL} @@ -33,10 +35,16 @@ import org.apache.kyuubi.util.KubernetesUtils class KubernetesApplicationOperation extends ApplicationOperation with Logging { - @volatile - private var kubernetesClient: KubernetesClient = _ - private var enginePodInformer: SharedIndexInformer[Pod] = _ + private val kubernetesClients: ConcurrentHashMap[KubernetesInfo, KubernetesClient] = + new ConcurrentHashMap[KubernetesInfo, KubernetesClient] + private val enginePodInformers: ConcurrentHashMap[KubernetesInfo, SharedIndexInformer[Pod]] = + new ConcurrentHashMap[KubernetesInfo, SharedIndexInformer[Pod]] + + private var allowedContexts: Seq[String] = Seq.empty + private var allowedNamespaces: Seq[String] = Seq.empty + private var submitTimeout: Long = _ + private var kyuubiConf: KyuubiConf = _ // key is kyuubi_unique_key private val appInfoStore: ConcurrentHashMap[String, ApplicationInfo] = @@ -44,45 +52,74 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { // key is kyuubi_unique_key private var cleanupTerminatedAppInfoTrigger: Cache[String, ApplicationState] = _ - override def initialize(conf: KyuubiConf): Unit = { - info("Start initializing Kubernetes Client.") - kubernetesClient = KubernetesUtils.buildKubernetesClient(conf) match { + private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = { + val context = kubernetesInfo.context + val namespace = kubernetesInfo.namespace + + if (allowedContexts.nonEmpty && !allowedContexts.contains(context)) { + throw new KyuubiException( + s"Kubernetes context $context is not in the allowed list[$allowedContexts]") + } + + if (allowedNamespaces.nonEmpty && !allowedNamespaces.contains(namespace)) { + throw new KyuubiException( + s"Kubernetes namespace $namespace is not in the allowed list[$allowedNamespaces]") + } + + kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo => buildKubernetesClient(kInfo)) + } + + private def buildKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = { + val kubernetesConf = + kyuubiConf.getKubernetesConf(kubernetesInfo.context, kubernetesInfo.namespace) + KubernetesUtils.buildKubernetesClient(kubernetesConf) match { case Some(client) => - info(s"Initialized Kubernetes Client connect to: ${client.getMasterUrl}") - submitTimeout = conf.get(KyuubiConf.ENGINE_KUBERNETES_SUBMIT_TIMEOUT) - // Disable resync, see https://github.com/fabric8io/kubernetes-client/discussions/5015 - enginePodInformer = client.pods() + info(s"[$kubernetesInfo] Initialized Kubernetes Client connect to: ${client.getMasterUrl}") + val enginePodInformer = client.pods() .withLabel(LABEL_KYUUBI_UNIQUE_KEY) .inform(new SparkEnginePodEventHandler) - info("Start Kubernetes Client Informer.") - // Defer cleaning terminated application information - val retainPeriod = conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD) - cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder() - .expireAfterWrite(retainPeriod, TimeUnit.MILLISECONDS) - .removalListener((notification: RemovalNotification[String, ApplicationState]) => { - Option(appInfoStore.remove(notification.getKey)).foreach { removed => - info(s"Remove terminated application ${removed.id} with " + - s"tag ${notification.getKey} and state ${removed.state}") - } - }) - .build() + info(s"[$kubernetesInfo] Start Kubernetes Client Informer.") + enginePodInformers.put(kubernetesInfo, enginePodInformer) client - case None => - warn("Fail to init Kubernetes Client for Kubernetes Application Operation") - null + + case None => throw new KyuubiException(s"Fail to build Kubernetes client for $kubernetesInfo") } } - override def isSupported(clusterManager: Option[String]): Boolean = { - // TODO add deploy mode to check whether is supported - kubernetesClient != null && clusterManager.exists(_.toLowerCase(Locale.ROOT).startsWith("k8s")) + override def initialize(conf: KyuubiConf): Unit = { + kyuubiConf = conf + info("Start initializing Kubernetes application operation.") + submitTimeout = conf.get(KyuubiConf.ENGINE_KUBERNETES_SUBMIT_TIMEOUT) + allowedContexts = conf.get(KyuubiConf.KUBERNETES_CONTEXT_ALLOW_LIST) + allowedNamespaces = conf.get(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST) + // Defer cleaning terminated application information + val retainPeriod = conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD) + cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder() + .expireAfterWrite(retainPeriod, TimeUnit.MILLISECONDS) + .removalListener((notification: RemovalNotification[String, ApplicationState]) => { + Option(appInfoStore.remove(notification.getKey)).foreach { removed => + info(s"Remove terminated application ${removed.id} with " + + s"tag ${notification.getKey} and state ${removed.state}") + } + }) + .build() } - override def killApplicationByTag(tag: String): KillResponse = { - if (kubernetesClient == null) { + override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = { + // TODO add deploy mode to check whether is supported + kyuubiConf != null && + appMgrInfo.resourceManager.exists(_.toLowerCase(Locale.ROOT).startsWith("k8s")) + } + + override def killApplicationByTag( + appMgrInfo: ApplicationManagerInfo, + tag: String): KillResponse = { + if (kyuubiConf == null) { throw new IllegalStateException("Methods initialize and isSupported must be called ahead") } - debug(s"Deleting application info from Kubernetes cluster by $tag tag") + val kubernetesInfo = appMgrInfo.kubernetesInfo + val kubernetesClient = getOrCreateKubernetesClient(kubernetesInfo) + debug(s"[$kubernetesInfo] Deleting application info from Kubernetes cluster by $tag tag") try { val info = appInfoStore.getOrDefault(tag, ApplicationInfo.NOT_FOUND) debug(s"Application info[tag: $tag] is in ${info.state}") @@ -90,24 +127,32 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { case NOT_FOUND | FAILED | UNKNOWN => ( false, - s"Target application[tag: $tag] is in ${info.state} status") + s"[$kubernetesInfo] Target application[tag: $tag] is in ${info.state} status") case _ => ( !kubernetesClient.pods.withName(info.name).delete().isEmpty, - s"Operation of deleted application[appId: ${info.id} ,tag: $tag] is completed") + s"[$kubernetesInfo] Operation of deleted" + + s" application[appId: ${info.id} ,tag: $tag] is completed") } } catch { case e: Exception => - (false, s"Failed to terminate application with $tag, due to ${e.getMessage}") + ( + false, + s"[$kubernetesInfo] Failed to terminate application with $tag, due to ${e.getMessage}") } } - override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]): ApplicationInfo = { - if (kubernetesClient == null) { + override def getApplicationInfoByTag( + appMgrInfo: ApplicationManagerInfo, + tag: String, + submitTime: Option[Long]): ApplicationInfo = { + if (kyuubiConf == null) { throw new IllegalStateException("Methods initialize and isSupported must be called ahead") } debug(s"Getting application info from Kubernetes cluster by $tag tag") try { + // need to initialize the kubernetes client if not exists + getOrCreateKubernetesClient(appMgrInfo.kubernetesInfo) val appInfo = appInfoStore.getOrDefault(tag, ApplicationInfo.NOT_FOUND) (appInfo.state, submitTime) match { // Kyuubi should wait second if pod is not be created @@ -136,19 +181,15 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { } override def stop(): Unit = { - Utils.tryLogNonFatalError { - if (enginePodInformer != null) { - enginePodInformer.stop() - enginePodInformer = null - } + enginePodInformers.asScala.foreach { case (_, informer) => + Utils.tryLogNonFatalError(informer.stop()) } + enginePodInformers.clear() - Utils.tryLogNonFatalError { - if (kubernetesClient != null) { - kubernetesClient.close() - kubernetesClient = null - } + kubernetesClients.asScala.foreach { case (_, client) => + Utils.tryLogNonFatalError(client.close()) } + kubernetesClients.clear() if (cleanupTerminatedAppInfoTrigger != null) { cleanupTerminatedAppInfoTrigger.cleanUp() diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala index ac7225dd8..4e121d297 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala @@ -60,11 +60,11 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager super.stop() } - def killApplication(resourceManager: Option[String], tag: String): KillResponse = { + def killApplication(appMgrInfo: ApplicationManagerInfo, tag: String): KillResponse = { var (killed, lastMessage): KillResponse = (false, null) for (operation <- operations if !killed) { - if (operation.isSupported(resourceManager)) { - val (k, m) = operation.killApplicationByTag(tag) + if (operation.isSupported(appMgrInfo)) { + val (k, m) = operation.killApplicationByTag(appMgrInfo, tag) killed = k lastMessage = m } @@ -73,7 +73,7 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager val finalMessage = if (lastMessage == null) { s"No ${classOf[ApplicationOperation]} Service found in ServiceLoader" + - s" for $resourceManager" + s" for $appMgrInfo" } else { lastMessage } @@ -81,12 +81,12 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager } def getApplicationInfo( - clusterManager: Option[String], + appMgrInfo: ApplicationManagerInfo, tag: String, submitTime: Option[Long] = None): Option[ApplicationInfo] = { - val operation = operations.find(_.isSupported(clusterManager)) + val operation = operations.find(_.isSupported(appMgrInfo)) operation match { - case Some(op) => Some(op.getApplicationInfoByTag(tag, submitTime)) + case Some(op) => Some(op.getApplicationInfoByTag(appMgrInfo, tag, submitTime)) case None => None } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala index 4c7330b4d..d30e72674 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala @@ -341,6 +341,7 @@ trait ProcBuilder { def clusterManager(): Option[String] = None + def appMgrInfo(): ApplicationManagerInfo = ApplicationManagerInfo(None) } object ProcBuilder extends Logging { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala index 1f06484fc..d87fc406a 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala @@ -47,11 +47,14 @@ class YarnApplicationOperation extends ApplicationOperation with Logging { info(s"Successfully initialized yarn client: ${c.getServiceState}") } - override def isSupported(clusterManager: Option[String]): Boolean = { - yarnClient != null && clusterManager.exists(_.toLowerCase(Locale.ROOT).startsWith("yarn")) + override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = { + yarnClient != null && appMgrInfo.resourceManager.exists( + _.toLowerCase(Locale.ROOT).startsWith("yarn")) } - override def killApplicationByTag(tag: String): KillResponse = { + override def killApplicationByTag( + appMgrInfo: ApplicationManagerInfo, + tag: String): KillResponse = { if (yarnClient != null) { try { val reports = yarnClient.getApplications(null, null, Set(tag).asJava) @@ -79,7 +82,10 @@ class YarnApplicationOperation extends ApplicationOperation with Logging { } } - override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]): ApplicationInfo = { + override def getApplicationInfoByTag( + appMgrInfo: ApplicationManagerInfo, + tag: String, + submitTime: Option[Long]): ApplicationInfo = { if (yarnClient != null) { debug(s"Getting application info from Yarn cluster by $tag tag") val reports = yarnClient.getApplications(null, null, Set(tag).asJava) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala index d8d46e427..3da8d1b1a 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala @@ -29,7 +29,7 @@ import org.apache.kyuubi._ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY -import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder} +import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManager, ProcBuilder} import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._ import org.apache.kyuubi.operation.log.OperationLog @@ -73,6 +73,10 @@ class FlinkProcessBuilder( } } + override def appMgrInfo(): ApplicationManagerInfo = { + ApplicationManagerInfo(clusterManager()) + } + override protected val commands: Array[String] = { KyuubiApplicationManager.tagApplication(engineRefId, shortName, clusterManager(), conf) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala index 4a613278d..0d20068de 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala @@ -77,4 +77,12 @@ class SparkBatchProcessBuilder( override def clusterManager(): Option[String] = { batchConf.get(MASTER_KEY).orElse(super.clusterManager()) } + + override def kubernetesContext(): Option[String] = { + batchConf.get(KUBERNETES_CONTEXT_KEY).orElse(super.kubernetesContext()) + } + + override def kubernetesNamespace(): Option[String] = { + batchConf.get(KUBERNETES_NAMESPACE_KEY).orElse(super.kubernetesNamespace()) + } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index b74eab77d..6110c0246 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.kyuubi._ import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder} +import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManager, ProcBuilder} import org.apache.kyuubi.engine.KubernetesApplicationOperation.{KUBERNETES_SERVICE_HOST, KUBERNETES_SERVICE_PORT} import org.apache.kyuubi.ha.HighAvailabilityConf import org.apache.kyuubi.ha.client.AuthTypes @@ -183,26 +183,39 @@ class SparkProcessBuilder( override def shortName: String = "spark" - protected lazy val defaultMaster: Option[String] = { + protected lazy val defaultsConf: Map[String, String] = { val confDir = env.getOrElse(SPARK_CONF_DIR, s"$sparkHome${File.separator}conf") - val defaults = - try { - val confFile = new File(s"$confDir${File.separator}$SPARK_CONF_FILE_NAME") - if (confFile.exists()) { - Utils.getPropertiesFromFile(Some(confFile)) - } else { - Map.empty[String, String] - } - } catch { - case _: Exception => - warn(s"Failed to load spark configurations from $confDir") - Map.empty[String, String] + try { + val confFile = new File(s"$confDir${File.separator}$SPARK_CONF_FILE_NAME") + if (confFile.exists()) { + Utils.getPropertiesFromFile(Some(confFile)) + } else { + Map.empty[String, String] } - defaults.get(MASTER_KEY) + } catch { + case _: Exception => + warn(s"Failed to load spark configurations from $confDir") + Map.empty[String, String] + } + } + + override def appMgrInfo(): ApplicationManagerInfo = { + ApplicationManagerInfo( + clusterManager(), + kubernetesContext(), + kubernetesNamespace()) } override def clusterManager(): Option[String] = { - conf.getOption(MASTER_KEY).orElse(defaultMaster) + conf.getOption(MASTER_KEY).orElse(defaultsConf.get(MASTER_KEY)) + } + + def kubernetesContext(): Option[String] = { + conf.getOption(KUBERNETES_CONTEXT_KEY).orElse(defaultsConf.get(KUBERNETES_CONTEXT_KEY)) + } + + def kubernetesNamespace(): Option[String] = { + conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY)) } override def validateConf: Unit = Validator.validateConf(conf) @@ -224,6 +237,8 @@ object SparkProcessBuilder { final val APP_KEY = "spark.app.name" final val TAG_KEY = "spark.yarn.tags" final val MASTER_KEY = "spark.master" + final val KUBERNETES_CONTEXT_KEY = "spark.kubernetes.context" + final val KUBERNETES_NAMESPACE_KEY = "spark.kubernetes.namespace" final val INTERNAL_RESOURCE = "spark-internal" /** diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index ab2cfa302..82562541d 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -107,7 +107,7 @@ class BatchJobSubmission( } val applicationInfo = applicationManager.getApplicationInfo( - builder.clusterManager(), + builder.appMgrInfo(), batchId, Some(_submitTime)) applicationId(applicationInfo).foreach { _ => @@ -123,7 +123,7 @@ class BatchJobSubmission( } private[kyuubi] def killBatchApplication(): KillResponse = { - applicationManager.killApplication(builder.clusterManager(), batchId) + applicationManager.killApplication(builder.appMgrInfo(), batchId) } private val applicationCheckInterval = diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index cb0c63be0..ba043f071 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -40,7 +40,7 @@ import org.apache.kyuubi.client.exception.KyuubiRestException import org.apache.kyuubi.client.util.BatchUtils._ import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiReservedKeys._ -import org.apache.kyuubi.engine.{ApplicationInfo, KillResponse, KyuubiApplicationManager} +import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo, KillResponse, KyuubiApplicationManager} import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation, OperationState} import org.apache.kyuubi.server.api.ApiRequestContext import org.apache.kyuubi.server.api.v1.BatchesResource._ @@ -289,7 +289,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { case e: KyuubiRestException => error(s"Error redirecting get batch[$batchId] to ${metadata.kyuubiInstance}", e) val batchAppStatus = sessionManager.applicationManager.getApplicationInfo( - metadata.clusterManager, + metadata.appMgrInfo, batchId, // prevent that the batch be marked as terminated if application state is NOT_FOUND Some(metadata.engineOpenTime).filter(_ > 0).orElse(Some(System.currentTimeMillis))) @@ -407,9 +407,9 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { } } - def forceKill(clusterManager: Option[String], batchId: String): KillResponse = { + def forceKill(appMgrInfo: ApplicationManagerInfo, batchId: String): KillResponse = { val (killed, message) = sessionManager.applicationManager - .killApplication(clusterManager, batchId) + .killApplication(appMgrInfo, batchId) info(s"Mark batch[$batchId] closed by ${fe.connectionUrl}") sessionManager.updateMetadata(Metadata(identifier = batchId, peerInstanceClosed = true)) (killed, message) @@ -436,12 +436,12 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { } catch { case e: KyuubiRestException => error(s"Error redirecting delete batch[$batchId] to ${metadata.kyuubiInstance}", e) - val (killed, msg) = forceKill(metadata.clusterManager, batchId) + val (killed, msg) = forceKill(metadata.appMgrInfo, batchId) new CloseBatchResponse(killed, if (killed) msg else Utils.stringifyException(e)) } } else { // should not happen, but handle this for safe warn(s"Something wrong on deleting batch[$batchId], try forcibly killing application") - val (killed, msg) = forceKill(metadata.clusterManager, batchId) + val (killed, msg) = forceKill(metadata.appMgrInfo, batchId) new CloseBatchResponse(killed, msg) } }.getOrElse { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala index 949e88abd..12759f8cc 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala @@ -17,6 +17,8 @@ package org.apache.kyuubi.server.metadata.api +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.engine.ApplicationManagerInfo import org.apache.kyuubi.session.SessionType.SessionType /** @@ -73,4 +75,11 @@ case class Metadata( engineState: String = null, engineError: Option[String] = None, endTime: Long = 0L, - peerInstanceClosed: Boolean = false) + peerInstanceClosed: Boolean = false) { + def appMgrInfo: ApplicationManagerInfo = { + ApplicationManagerInfo( + clusterManager, + requestConf.get(KyuubiConf.KUBERNETES_CONTEXT.key), + requestConf.get(KyuubiConf.KUBERNETES_NAMESPACE.key)) + } +} diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala index ded4c8bf4..014bbced3 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala @@ -143,6 +143,12 @@ class KyuubiBatchSession( traceMetricsOnOpen() if (recoveryMetadata.isEmpty) { + val appMgrInfo = batchJobSubmissionOp.builder.appMgrInfo() + val kubernetesInfo = appMgrInfo.kubernetesInfo.context.map { context => + Map(KyuubiConf.KUBERNETES_CONTEXT.key -> context) + }.getOrElse(Map.empty) ++ appMgrInfo.kubernetesInfo.namespace.map { namespace => + Map(KyuubiConf.KUBERNETES_NAMESPACE.key -> namespace) + }.getOrElse(Map.empty) val metaData = Metadata( identifier = handle.identifier.toString, sessionType = sessionType, @@ -154,7 +160,7 @@ class KyuubiBatchSession( resource = resource, className = className, requestName = name.orNull, - requestConf = optimizedConf, + requestConf = optimizedConf ++ kubernetesInfo, // save the kubernetes info into request conf requestArgs = batchArgs, createTime = createTime, engineType = batchType, diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala index 2ed413dcb..68d95dc80 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala @@ -26,7 +26,7 @@ import org.apache.kyuubi.client.util.BatchUtils._ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols.FrontendProtocol -import org.apache.kyuubi.engine.{ApplicationState, YarnApplicationOperation} +import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState, YarnApplicationOperation} import org.apache.kyuubi.engine.ApplicationState._ import org.apache.kyuubi.operation.{FetchOrientation, HiveJDBCTestHelper, OperationState} import org.apache.kyuubi.operation.OperationState.ERROR @@ -134,11 +134,15 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD assert(metadata.map(_.engineId).get.startsWith("application_")) } - val killResponse = yarnOperation.killApplicationByTag(sessionHandle.identifier.toString) + val appMgrInfo = ApplicationManagerInfo(Some("yarn")) + + val killResponse = + yarnOperation.killApplicationByTag(appMgrInfo, sessionHandle.identifier.toString) assert(killResponse._1) assert(killResponse._2 startsWith "Succeeded to terminate:") - val appInfo = yarnOperation.getApplicationInfoByTag(sessionHandle.identifier.toString) + val appInfo = + yarnOperation.getApplicationInfoByTag(appMgrInfo, sessionHandle.identifier.toString) assert(appInfo.state === KILLED) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala index a6e00bbaf..a0914afcf 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala @@ -38,10 +38,10 @@ class JpsApplicationOperationSuite extends KyuubiFunSuite { jps.initialize(null) test("JpsApplicationOperation with jstat") { - assert(jps.isSupported(None)) - assert(jps.isSupported(Some("local"))) - assert(!jps.killApplicationByTag(null)._1) - assert(!jps.killApplicationByTag("have a space")._1) + assert(jps.isSupported(ApplicationManagerInfo(None))) + assert(jps.isSupported(ApplicationManagerInfo(Some("local")))) + assert(!jps.killApplicationByTag(ApplicationManagerInfo(None), null)._1) + assert(!jps.killApplicationByTag(ApplicationManagerInfo(None), "have a space")._1) val currentProcess = ManagementFactory.getRuntimeMXBean.getName val currentPid = currentProcess.splitAt(currentProcess.indexOf("@"))._1 @@ -52,16 +52,16 @@ class JpsApplicationOperationSuite extends KyuubiFunSuite { }.start() eventually(Timeout(10.seconds)) { - val desc1 = jps.getApplicationInfoByTag("sun.tools.jstat.Jstat") + val desc1 = jps.getApplicationInfoByTag(ApplicationManagerInfo(None), "sun.tools.jstat.Jstat") assert(desc1.id != null) assert(desc1.name != null) assert(desc1.state == ApplicationState.RUNNING) } - jps.killApplicationByTag("sun.tools.jstat.Jstat") + jps.killApplicationByTag(ApplicationManagerInfo(None), "sun.tools.jstat.Jstat") eventually(Timeout(10.seconds)) { - val desc2 = jps.getApplicationInfoByTag("sun.tools.jstat.Jstat") + val desc2 = jps.getApplicationInfoByTag(ApplicationManagerInfo(None), "sun.tools.jstat.Jstat") assert(desc2.id == null) assert(desc2.name == null) assert(desc2.state == ApplicationState.NOT_FOUND) @@ -78,25 +78,25 @@ class JpsApplicationOperationSuite extends KyuubiFunSuite { val builder = new SparkProcessBuilder(user, conf) builder.start - assert(jps.isSupported(builder.clusterManager())) + assert(jps.isSupported(ApplicationManagerInfo(builder.clusterManager()))) eventually(Timeout(10.seconds)) { - val desc1 = jps.getApplicationInfoByTag(id) + val desc1 = jps.getApplicationInfoByTag(ApplicationManagerInfo(None), id) assert(desc1.id != null) assert(desc1.name != null) assert(desc1.state == ApplicationState.RUNNING) - val response = jps.killApplicationByTag(id) + val response = jps.killApplicationByTag(ApplicationManagerInfo(None), id) assert(response._1, response._2) assert(response._2 startsWith "Succeeded to terminate:") } eventually(Timeout(10.seconds)) { - val desc2 = jps.getApplicationInfoByTag(id) + val desc2 = jps.getApplicationInfoByTag(ApplicationManagerInfo(None), id) assert(desc2.id == null) assert(desc2.name == null) assert(desc2.state == ApplicationState.NOT_FOUND) } - val response2 = jps.killApplicationByTag(id) + val response2 = jps.killApplicationByTag(ApplicationManagerInfo(None), id) assert(!response2._1) assert(response2._2 === ApplicationOperation.NOT_FOUND) } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala index 9c69817a3..0c180db72 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala @@ -29,7 +29,7 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime import org.apache.kyuubi.{KYUUBI_VERSION, WithKyuubiServer} import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys} import org.apache.kyuubi.config.KyuubiConf.SESSION_CONF_ADVISOR -import org.apache.kyuubi.engine.ApplicationState +import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState} import org.apache.kyuubi.jdbc.KyuubiHiveDriver import org.apache.kyuubi.jdbc.hive.{KyuubiConnection, KyuubiSQLException} import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem} @@ -233,9 +233,11 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe } val engineId = sessionManager.allSessions().head.handle.identifier.toString // kill the engine application and wait the engine terminate - sessionManager.applicationManager.killApplication(None, engineId) + sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None), engineId) eventually(timeout(30.seconds), interval(100.milliseconds)) { - assert(sessionManager.applicationManager.getApplicationInfo(None, engineId) + assert(sessionManager.applicationManager.getApplicationInfo( + ApplicationManagerInfo(None), + engineId) .exists(_.state == ApplicationState.NOT_FOUND)) } assert(!conn.isValid(3000)) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala index da9b8ae44..f5bbe640e 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala @@ -33,7 +33,7 @@ import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, RestFrontendTestHelper import org.apache.kyuubi.client.api.v1.dto.{Engine, OperationData, ServerData, SessionData, SessionHandle, SessionOpenRequest} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY -import org.apache.kyuubi.engine.{ApplicationState, EngineRef, KyuubiApplicationManager} +import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState, EngineRef, KyuubiApplicationManager} import org.apache.kyuubi.engine.EngineType.SPARK_SQL import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, USER} import org.apache.kyuubi.ha.HighAvailabilityConf @@ -293,9 +293,10 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { } // kill the engine application - engineMgr.killApplication(None, id) + engineMgr.killApplication(ApplicationManagerInfo(None), id) eventually(timeout(30.seconds), interval(100.milliseconds)) { - assert(engineMgr.getApplicationInfo(None, id).exists(_.state == ApplicationState.NOT_FOUND)) + assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id).exists( + _.state == ApplicationState.NOT_FOUND)) } } } @@ -339,9 +340,10 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { } // kill the engine application - engineMgr.killApplication(None, id) + engineMgr.killApplication(ApplicationManagerInfo(None), id) eventually(timeout(30.seconds), interval(100.milliseconds)) { - assert(engineMgr.getApplicationInfo(None, id).exists(_.state == ApplicationState.NOT_FOUND)) + assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id).exists( + _.state == ApplicationState.NOT_FOUND)) } } } @@ -417,9 +419,10 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { assert(engines(0).getSubdomain == "default") // kill the engine application - engineMgr.killApplication(None, id) + engineMgr.killApplication(ApplicationManagerInfo(None), id) eventually(timeout(30.seconds), interval(100.milliseconds)) { - assert(engineMgr.getApplicationInfo(None, id).exists(_.state == ApplicationState.NOT_FOUND)) + assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id).exists( + _.state == ApplicationState.NOT_FOUND)) } } } @@ -463,9 +466,10 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { assert(engines(0).getSubdomain == "default") // kill the engine application - engineMgr.killApplication(None, id) + engineMgr.killApplication(ApplicationManagerInfo(None), id) eventually(timeout(30.seconds), interval(100.milliseconds)) { - assert(engineMgr.getApplicationInfo(None, id).exists(_.state == ApplicationState.NOT_FOUND)) + assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id).exists( + _.state == ApplicationState.NOT_FOUND)) } } } @@ -528,12 +532,12 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { assert(result1.size == 1) // kill the engine application - engineMgr.killApplication(None, id1) - engineMgr.killApplication(None, id2) + engineMgr.killApplication(ApplicationManagerInfo(None), id1) + engineMgr.killApplication(ApplicationManagerInfo(None), id2) eventually(timeout(30.seconds), interval(100.milliseconds)) { - assert(engineMgr.getApplicationInfo(None, id1) + assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id1) .exists(_.state == ApplicationState.NOT_FOUND)) - assert(engineMgr.getApplicationInfo(None, id2) + assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id2) .exists(_.state == ApplicationState.NOT_FOUND)) } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala index 8e0a80c4d..8a797f842 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala @@ -37,7 +37,7 @@ import org.apache.kyuubi.client.util.BatchUtils import org.apache.kyuubi.client.util.BatchUtils._ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ -import org.apache.kyuubi.engine.{ApplicationInfo, KyuubiApplicationManager} +import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo, KyuubiApplicationManager} import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem} import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState} @@ -64,7 +64,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi } sessionManager.getBatchesFromMetadataStore(null, null, null, 0, 0, 0, Int.MaxValue).foreach { batch => - sessionManager.applicationManager.killApplication(None, batch.getId) + sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None), batch.getId) sessionManager.cleanupMetadata(batch.getId) } } @@ -481,7 +481,8 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi var applicationStatus: Option[ApplicationInfo] = None eventually(timeout(5.seconds)) { - applicationStatus = sessionManager.applicationManager.getApplicationInfo(None, batchId2) + applicationStatus = + sessionManager.applicationManager.getApplicationInfo(ApplicationManagerInfo(None), batchId2) assert(applicationStatus.isDefined) } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala index ff807ef02..4e18951b5 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala @@ -32,6 +32,7 @@ import org.apache.kyuubi.{BatchTestHelper, RestClientTestHelper, Utils} import org.apache.kyuubi.client.util.BatchUtils._ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.ctl.{CtlConf, TestPrematureExit} +import org.apache.kyuubi.engine.ApplicationManagerInfo import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem} import org.apache.kyuubi.session.KyuubiSessionManager @@ -80,7 +81,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat } sessionManager.getBatchesFromMetadataStore(null, null, null, 0, 0, 0, Int.MaxValue).foreach { batch => - sessionManager.applicationManager.killApplication(None, batch.getId) + sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None), batch.getId) sessionManager.cleanupMetadata(batch.getId) } }