[KYUUBI #7027] Support to initialize kubernetes clients on kyuubi server startup

### Why are the changes needed?

This ensure the Kyuubi server is promptly informed for any Kubernetes resource changes after startup. It is highly recommend to set it for multiple Kyuubi instances mode.

### How was this patch tested?

Existing GA and Integration testing.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #7027 from turboFei/k8s_client_init.

Closes #7027

393b9960a [Wang, Fei] server only
a640278c4 [Wang, Fei] refresh

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Wang, Fei 2025-04-15 22:36:16 -07:00
parent fa99183354
commit 4fc201e85d
6 changed files with 60 additions and 0 deletions

View File

@ -356,6 +356,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.kubernetes.authenticate.clientKeyFile | &lt;undefined&gt; | Path to the client key file for connecting to the Kubernetes API server over TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 |
| kyuubi.kubernetes.authenticate.oauthToken | &lt;undefined&gt; | The OAuth token to use when authenticating against the Kubernetes API server. Note that unlike, the other authentication options, this must be the exact string value of the token to use for the authentication. | string | 1.7.0 |
| kyuubi.kubernetes.authenticate.oauthTokenFile | &lt;undefined&gt; | Path to the file containing the OAuth token to use when authenticating against the Kubernetes API server. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 |
| kyuubi.kubernetes.client.initialize.list || The kubernetes client initialize list to register kubernetes resource informers during Kyuubi server startup. This ensure the Kyuubi server is promptly informed for any Kubernetes resource changes after startup. It is highly recommend to set it for multiple Kyuubi instances mode. The format is `context1:namespace1,context2:namespace2`. | seq | 1.11.0 |
| kyuubi.kubernetes.context | &lt;undefined&gt; | The desired context from your kubernetes config file used to configure the K8s client for interacting with the cluster. | string | 1.6.0 |
| kyuubi.kubernetes.context.allow.list || The allowed kubernetes context list, if it is empty, there is no kubernetes context limitation. | set | 1.8.0 |
| kyuubi.kubernetes.master.address | &lt;undefined&gt; | The internal Kubernetes master (API server) address to be used for kyuubi. | string | 1.7.0 |

View File

@ -1236,6 +1236,18 @@ object KyuubiConf {
.toSet()
.createWithDefault(Set.empty)
val KUBERNETES_CLIENT_INITIALIZE_LIST: ConfigEntry[Seq[String]] =
buildConf("kyuubi.kubernetes.client.initialize.list")
.doc("The kubernetes client initialize list to register kubernetes resource informers" +
" during Kyuubi server startup. This ensure the Kyuubi server is promptly informed for" +
" any Kubernetes resource changes after startup. It is highly recommend to set it for" +
" multiple Kyuubi instances mode. The format is `context1:namespace1,context2:namespace2`.")
.version("1.11.0")
.serverOnly
.stringConf
.toSequence()
.createWithDefault(Nil)
val KUBERNETES_MASTER: OptionalConfigEntry[String] =
buildConf("kyuubi.kubernetes.master.address")
.doc("The internal Kubernetes master (API server) address to be used for kyuubi.")

View File

@ -167,6 +167,29 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
TimeUnit.MILLISECONDS)
cleanupCanceledAppPodExecutor = ThreadUtils.newDaemonCachedThreadPool(
"cleanup-canceled-app-pod-thread")
initializeKubernetesClient(kyuubiConf)
}
private[kyuubi] def getKubernetesClientInitializeInfo(
kyuubiConf: KyuubiConf): Seq[KubernetesInfo] = {
kyuubiConf.get(KyuubiConf.KUBERNETES_CLIENT_INITIALIZE_LIST).map { init =>
val (context, namespace) = init.split(":") match {
case Array(ctx, ns) => (Some(ctx).filterNot(_.isEmpty), Some(ns).filterNot(_.isEmpty))
case Array(ctx) => (Some(ctx).filterNot(_.isEmpty), None)
case _ => (None, None)
}
KubernetesInfo(context, namespace)
}
}
private[kyuubi] def initializeKubernetesClient(kyuubiConf: KyuubiConf): Unit = {
getKubernetesClientInitializeInfo(kyuubiConf).foreach { kubernetesInfo =>
try {
getOrCreateKubernetesClient(kubernetesInfo)
} catch {
case e: Throwable => error(s"Failed to initialize Kubernetes client for $kubernetesInfo", e)
}
}
}
override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {

View File

@ -96,6 +96,11 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
case None => None
}
}
private[kyuubi] def getKubernetesApplicationOperation: Option[KubernetesApplicationOperation] = {
operations.find(_.isInstanceOf[KubernetesApplicationOperation])
.map(_.asInstanceOf[KubernetesApplicationOperation])
}
}
object KyuubiApplicationManager {

View File

@ -124,6 +124,8 @@ object KyuubiServer extends Logging {
val refreshedKubernetesConf =
createKyuubiConf().getAll.filter(_._1.startsWith(KYUUBI_KUBERNETES_CONF_PREFIX))
refreshConfig("kubernetes", existedKubernetesConf, refreshedKubernetesConf)
kyuubiServer.backendService.sessionManager.asInstanceOf[KyuubiSessionManager].applicationManager
.getKubernetesApplicationOperation.foreach(_.initializeKubernetesClient(kyuubiServer.conf))
}
private def refreshConfig(

View File

@ -96,4 +96,21 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite {
sparkUiPort) ===
s"http://$sparkDriverSvc.$kubernetesNamespace.svc.$kubernetesContext.k8s.io:$sparkUiPort")
}
test("get kubernetes client initialization info") {
val kyuubiConf = KyuubiConf()
kyuubiConf.set(
KyuubiConf.KUBERNETES_CLIENT_INITIALIZE_LIST.key,
"c1:ns1,c1:ns2,c2:ns1,c2:ns2,c1:,:ns1")
val operation = new KubernetesApplicationOperation()
assert(operation.getKubernetesClientInitializeInfo(kyuubiConf) ===
Array(
KubernetesInfo(Some("c1"), Some("ns1")),
KubernetesInfo(Some("c1"), Some("ns2")),
KubernetesInfo(Some("c2"), Some("ns1")),
KubernetesInfo(Some("c2"), Some("ns2")),
KubernetesInfo(Some("c1"), None),
KubernetesInfo(None, Some("ns1"))))
}
}