diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index 3c17e25b8..ab3d47ebf 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -356,6 +356,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.kubernetes.authenticate.clientKeyFile | <undefined> | Path to the client key file for connecting to the Kubernetes API server over TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 | | kyuubi.kubernetes.authenticate.oauthToken | <undefined> | The OAuth token to use when authenticating against the Kubernetes API server. Note that unlike, the other authentication options, this must be the exact string value of the token to use for the authentication. | string | 1.7.0 | | kyuubi.kubernetes.authenticate.oauthTokenFile | <undefined> | Path to the file containing the OAuth token to use when authenticating against the Kubernetes API server. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 | +| kyuubi.kubernetes.client.initialize.list || The kubernetes client initialize list to register kubernetes resource informers during Kyuubi server startup. This ensure the Kyuubi server is promptly informed for any Kubernetes resource changes after startup. It is highly recommend to set it for multiple Kyuubi instances mode. The format is `context1:namespace1,context2:namespace2`. | seq | 1.11.0 | | kyuubi.kubernetes.context | <undefined> | The desired context from your kubernetes config file used to configure the K8s client for interacting with the cluster. | string | 1.6.0 | | kyuubi.kubernetes.context.allow.list || The allowed kubernetes context list, if it is empty, there is no kubernetes context limitation. | set | 1.8.0 | | kyuubi.kubernetes.master.address | <undefined> | The internal Kubernetes master (API server) address to be used for kyuubi. | string | 1.7.0 | diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index d27df47c0..c1604054d 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1236,6 +1236,18 @@ object KyuubiConf { .toSet() .createWithDefault(Set.empty) + val KUBERNETES_CLIENT_INITIALIZE_LIST: ConfigEntry[Seq[String]] = + buildConf("kyuubi.kubernetes.client.initialize.list") + .doc("The kubernetes client initialize list to register kubernetes resource informers" + + " during Kyuubi server startup. This ensure the Kyuubi server is promptly informed for" + + " any Kubernetes resource changes after startup. It is highly recommend to set it for" + + " multiple Kyuubi instances mode. The format is `context1:namespace1,context2:namespace2`.") + .version("1.11.0") + .serverOnly + .stringConf + .toSequence() + .createWithDefault(Nil) + val KUBERNETES_MASTER: OptionalConfigEntry[String] = buildConf("kyuubi.kubernetes.master.address") .doc("The internal Kubernetes master (API server) address to be used for kyuubi.") diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index 59faee486..55f0fa661 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -167,6 +167,29 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { TimeUnit.MILLISECONDS) cleanupCanceledAppPodExecutor = ThreadUtils.newDaemonCachedThreadPool( "cleanup-canceled-app-pod-thread") + initializeKubernetesClient(kyuubiConf) + } + + private[kyuubi] def getKubernetesClientInitializeInfo( + kyuubiConf: KyuubiConf): Seq[KubernetesInfo] = { + kyuubiConf.get(KyuubiConf.KUBERNETES_CLIENT_INITIALIZE_LIST).map { init => + val (context, namespace) = init.split(":") match { + case Array(ctx, ns) => (Some(ctx).filterNot(_.isEmpty), Some(ns).filterNot(_.isEmpty)) + case Array(ctx) => (Some(ctx).filterNot(_.isEmpty), None) + case _ => (None, None) + } + KubernetesInfo(context, namespace) + } + } + + private[kyuubi] def initializeKubernetesClient(kyuubiConf: KyuubiConf): Unit = { + getKubernetesClientInitializeInfo(kyuubiConf).foreach { kubernetesInfo => + try { + getOrCreateKubernetesClient(kubernetesInfo) + } catch { + case e: Throwable => error(s"Failed to initialize Kubernetes client for $kubernetesInfo", e) + } + } } override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala index a80f13a86..b4095c424 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala @@ -96,6 +96,11 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager case None => None } } + + private[kyuubi] def getKubernetesApplicationOperation: Option[KubernetesApplicationOperation] = { + operations.find(_.isInstanceOf[KubernetesApplicationOperation]) + .map(_.asInstanceOf[KubernetesApplicationOperation]) + } } object KyuubiApplicationManager { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala index 338ac6b41..099647265 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala @@ -124,6 +124,8 @@ object KyuubiServer extends Logging { val refreshedKubernetesConf = createKyuubiConf().getAll.filter(_._1.startsWith(KYUUBI_KUBERNETES_CONF_PREFIX)) refreshConfig("kubernetes", existedKubernetesConf, refreshedKubernetesConf) + kyuubiServer.backendService.sessionManager.asInstanceOf[KyuubiSessionManager].applicationManager + .getKubernetesApplicationOperation.foreach(_.initializeKubernetesClient(kyuubiServer.conf)) } private def refreshConfig( diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala index ab663a007..f6bfc409d 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala @@ -96,4 +96,21 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite { sparkUiPort) === s"http://$sparkDriverSvc.$kubernetesNamespace.svc.$kubernetesContext.k8s.io:$sparkUiPort") } + + test("get kubernetes client initialization info") { + val kyuubiConf = KyuubiConf() + kyuubiConf.set( + KyuubiConf.KUBERNETES_CLIENT_INITIALIZE_LIST.key, + "c1:ns1,c1:ns2,c2:ns1,c2:ns2,c1:,:ns1") + + val operation = new KubernetesApplicationOperation() + assert(operation.getKubernetesClientInitializeInfo(kyuubiConf) === + Array( + KubernetesInfo(Some("c1"), Some("ns1")), + KubernetesInfo(Some("c1"), Some("ns2")), + KubernetesInfo(Some("c2"), Some("ns1")), + KubernetesInfo(Some("c2"), Some("ns2")), + KubernetesInfo(Some("c1"), None), + KubernetesInfo(None, Some("ns1")))) + } }