From a3f1e51e782d5185e8ddf26fb8cce3ef85f76b64 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Mon, 21 Jul 2025 10:11:07 +0800 Subject: [PATCH] [KYUUBI #7141] Support to get spark app url with pattern `http://{{SPARK_DRIVER_POD_IP}}:{{SPARK_UI_PORT}}` ### Why are the changes needed? We are using [virtual-kubelet](https://github.com/virtual-kubelet/virtual-kubelet) for Spark on Kubernetes, so Spark pods may be allocated across Kubernetes clusters. We use the driver pod IP as the driver host, see https://github.com/apache/spark/pull/40392, which is supported since spark-3.5. The Kubernetes context and namespace are virtual, so we cannot build the app URL from the Spark driver svc. The Spark driver pod IP is accessible in our use case, so this PR builds the Spark app URL from the Spark driver pod IP and the Spark UI port. ### How was this patch tested? UT. image image ### Was this patch authored or co-authored using generative AI tooling? No. Closes #7141 from turboFei/app_url_v2. 
Closes #7141 1277952f5 [Wang, Fei] VAR d15e6bea7 [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala 1535e00ac [Wang, Fei] spark driver pod ip Lead-authored-by: Wang, Fei Co-authored-by: Cheng Pan Signed-off-by: Cheng Pan --- docs/configuration/settings.md | 2 +- .../org/apache/kyuubi/config/KyuubiConf.scala | 5 +- .../KubernetesApplicationOperation.scala | 87 ++++++++++++++++--- .../KubernetesApplicationUrlSource.scala | 23 +++++ .../KubernetesApplicationOperationSuite.scala | 20 ++++- 5 files changed, 119 insertions(+), 18 deletions(-) create mode 100644 kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationUrlSource.scala diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index 8bbbcfed6..83485a521 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -363,7 +363,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.kubernetes.master.address | <undefined> | The internal Kubernetes master (API server) address to be used for kyuubi. | string | 1.7.0 | | kyuubi.kubernetes.namespace | default | The namespace that will be used for running the kyuubi pods and find engines. | string | 1.7.0 | | kyuubi.kubernetes.namespace.allow.list || The allowed kubernetes namespace list, if it is empty, there is no kubernetes namespace limitation. | set | 1.8.0 | -| kyuubi.kubernetes.spark.appUrlPattern | http://{{SPARK_DRIVER_SVC}}.{{KUBERNETES_NAMESPACE}}.svc:{{SPARK_UI_PORT}} | The pattern to generate the spark on kubernetes application UI URL. The pattern should contain placeholders for the application variables. Available placeholders are `{{SPARK_APP_ID}}`, `{{SPARK_DRIVER_SVC}}`, `{{KUBERNETES_NAMESPACE}}`, `{{KUBERNETES_CONTEXT}}` and `{{SPARK_UI_PORT}}`. 
| string | 1.10.0 | +| kyuubi.kubernetes.spark.appUrlPattern | http://{{SPARK_DRIVER_SVC}}.{{KUBERNETES_NAMESPACE}}.svc:{{SPARK_UI_PORT}} | The pattern to generate the spark on kubernetes application UI URL. The pattern should contain placeholders for the application variables. Available placeholders are `{{SPARK_APP_ID}}`, `{{SPARK_DRIVER_SVC}}`, `{{SPARK_DRIVER_POD_IP}}`, `{{KUBERNETES_NAMESPACE}}`, `{{KUBERNETES_CONTEXT}}` and `{{SPARK_UI_PORT}}`. | string | 1.10.0 | | kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled | false | If enabled, Kyuubi server will try to create the `spark.kubernetes.file.upload.path` with permission 777 before submitting the Spark application. | boolean | 1.11.0 | | kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval | PT1M | Kyuubi server use guava cache as the cleanup trigger with time-based eviction, but the eviction would not happened until any get/put operation happened. This option schedule a daemon thread evict cache periodically. | duration | 1.8.1 | | kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.kind | NONE | Kyuubi server will delete the spark driver pod after the application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. Available options are NONE, ALL, COMPLETED and default value is None which means none of the pod will be deleted | string | 1.8.1 | diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index b1589811c..06c8e7a9d 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1398,8 +1398,9 @@ object KyuubiConf { buildConf("kyuubi.kubernetes.spark.appUrlPattern") .doc("The pattern to generate the spark on kubernetes application UI URL. " + "The pattern should contain placeholders for the application variables. 
" + - "Available placeholders are `{{SPARK_APP_ID}}`, `{{SPARK_DRIVER_SVC}}`, " + - "`{{KUBERNETES_NAMESPACE}}`, `{{KUBERNETES_CONTEXT}}` and `{{SPARK_UI_PORT}}`.") + "Available placeholders are `{{SPARK_APP_ID}}`, `{{SPARK_DRIVER_SVC}}`," + + " `{{SPARK_DRIVER_POD_IP}}`, `{{KUBERNETES_NAMESPACE}}`, `{{KUBERNETES_CONTEXT}}`" + + " and `{{SPARK_UI_PORT}}`.") .version("1.10.0") .stringConf .createWithDefault( diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index 6066c951d..afe7bcc31 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -35,6 +35,7 @@ import org.apache.kyuubi.config.KyuubiConf.{KubernetesApplicationStateSource, Ku import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource import org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL, COMPLETED, NONE} import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, KILLED, NOT_FOUND, PENDING, RUNNING, UNKNOWN} +import org.apache.kyuubi.engine.KubernetesApplicationUrlSource._ import org.apache.kyuubi.engine.KubernetesResourceEventTypes.KubernetesResourceEventType import org.apache.kyuubi.operation.OperationState import org.apache.kyuubi.server.metadata.MetadataManager @@ -65,6 +66,13 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { private def appStateContainer: String = kyuubiConf.get(KyuubiConf.KUBERNETES_APPLICATION_STATE_CONTAINER) + private lazy val sparkAppUrlPattern = kyuubiConf.get(KyuubiConf.KUBERNETES_SPARK_APP_URL_PATTERN) + private lazy val sparkAppUrlSource = if (sparkAppUrlPattern.contains(SPARK_DRIVER_SVC_PATTERN)) { + KubernetesApplicationUrlSource.SVC + } 
else { + KubernetesApplicationUrlSource.POD + } + // key is kyuubi_unique_key private val appInfoStore: ConcurrentHashMap[String, (KubernetesInfo, ApplicationInfo)] = new ConcurrentHashMap[String, (KubernetesInfo, ApplicationInfo)] @@ -143,11 +151,14 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { val enginePodInformer = client.pods() .withLabel(LABEL_KYUUBI_UNIQUE_KEY) .inform(new SparkEnginePodEventHandler(kubernetesInfo)) - info(s"[$kubernetesInfo] Start Kubernetes Client Informer.") - val engineSvcInformer = client.services() - .inform(new SparkEngineSvcEventHandler(kubernetesInfo)) + info(s"[$kubernetesInfo] Start Kubernetes Client POD Informer.") enginePodInformers.put(kubernetesInfo, enginePodInformer) - engineSvcInformers.put(kubernetesInfo, engineSvcInformer) + if (sparkAppUrlSource == KubernetesApplicationUrlSource.SVC) { + info(s"[$kubernetesInfo] Start Kubernetes Client SVC Informer.") + val engineSvcInformer = client.services() + .inform(new SparkEngineSvcEventHandler(kubernetesInfo)) + engineSvcInformers.put(kubernetesInfo, engineSvcInformer) + } client case None => throw new KyuubiException(s"Fail to build Kubernetes client for $kubernetesInfo") @@ -474,6 +485,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { id = getPodAppId(pod), name = getPodAppName(pod), state = appState, + url = appInfo.url.orElse { + getPodAppUrl(sparkAppUrlSource, sparkAppUrlPattern, kubernetesInfo, pod) + }, error = appError, podName = Some(pod.getMetadata.getName))) }.getOrElse { @@ -483,6 +497,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { id = getPodAppId(pod), name = getPodAppName(pod), state = appState, + url = getPodAppUrl(sparkAppUrlSource, sparkAppUrlPattern, kubernetesInfo, pod), error = appError, podName = Some(pod.getMetadata.getName))) } @@ -500,7 +515,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { val appUrl = 
buildSparkAppUrl( appUrlPattern, sparkAppId, - sparkDriverSvc, + sparkDriverSvc = Some(sparkDriverSvc), + sparkDriverPodIp = None, kubernetesContext, kubernetesNamespace, sparkUiPort) @@ -695,9 +711,17 @@ object KubernetesApplicationOperation extends Logging { UNKNOWN } + final private val SPARK_APP_ID_PATTERN = "{{SPARK_APP_ID}}" + final private val SPARK_DRIVER_SVC_PATTERN = "{{SPARK_DRIVER_SVC}}" + final private val SPARK_DRIVER_POD_IP_PATTERN = "{{SPARK_DRIVER_POD_IP}}" + final private val KUBERNETES_CONTEXT_PATTERN = "{{KUBERNETES_CONTEXT}}" + final private val KUBERNETES_NAMESPACE_PATTERN = "{{KUBERNETES_NAMESPACE}}" + final private val SPARK_UI_PORT_PATTERN = "{{SPARK_UI_PORT}}" + /** * Replaces all the {{SPARK_APP_ID}} occurrences with the Spark App Id, * {{SPARK_DRIVER_SVC}} occurrences with the Spark Driver Service name, + * {{SPARK_DRIVER_POD_IP}} occurrences with the Spark Driver Pod IP, * {{KUBERNETES_CONTEXT}} occurrences with the Kubernetes Context, * {{KUBERNETES_NAMESPACE}} occurrences with the Kubernetes Namespace, * and {{SPARK_UI_PORT}} occurrences with the Spark UI Port. 
@@ -705,22 +729,61 @@ object KubernetesApplicationOperation extends Logging { private[kyuubi] def buildSparkAppUrl( sparkAppUrlPattern: String, sparkAppId: String, - sparkDriverSvc: String, + sparkDriverSvc: Option[String], + sparkDriverPodIp: Option[String], kubernetesContext: String, kubernetesNamespace: String, sparkUiPort: Int): String = { - sparkAppUrlPattern - .replace("{{SPARK_APP_ID}}", sparkAppId) - .replace("{{SPARK_DRIVER_SVC}}", sparkDriverSvc) - .replace("{{KUBERNETES_CONTEXT}}", kubernetesContext) - .replace("{{KUBERNETES_NAMESPACE}}", kubernetesNamespace) - .replace("{{SPARK_UI_PORT}}", sparkUiPort.toString) + var appUrl = sparkAppUrlPattern + .replace(SPARK_APP_ID_PATTERN, sparkAppId) + .replace(KUBERNETES_CONTEXT_PATTERN, kubernetesContext) + .replace(KUBERNETES_NAMESPACE_PATTERN, kubernetesNamespace) + .replace(SPARK_UI_PORT_PATTERN, sparkUiPort.toString) + sparkDriverSvc match { + case Some(svc) => appUrl = appUrl.replace(SPARK_DRIVER_SVC_PATTERN, svc) + case None => + } + sparkDriverPodIp match { + case Some(ip) => appUrl = appUrl.replace(SPARK_DRIVER_POD_IP_PATTERN, ip) + case None => + } + appUrl } def getPodAppId(pod: Pod): String = { pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL) } + private[kyuubi] def getPodAppUrl( + sparkAppUrlSource: KubernetesApplicationUrlSource, + sparkAppUrlPattern: String, + kubernetesInfo: KubernetesInfo, + pod: Pod): Option[String] = { + sparkAppUrlSource match { + case KubernetesApplicationUrlSource.SVC => None + case KubernetesApplicationUrlSource.POD => + val podIp = Option(pod.getStatus.getPodIP).filter(_.nonEmpty) + podIp match { + case Some(ip) => pod.getSpec.getContainers.asScala.flatMap( + _.getPorts.asScala).find(_.getName == SPARK_UI_PORT_NAME) + .map(_.getContainerPort).map { sparkUiPort => + buildSparkAppUrl( + sparkAppUrlPattern, + getPodAppId(pod), + sparkDriverSvc = None, + sparkDriverPodIp = Some(ip), + kubernetesContext = kubernetesInfo.context.orNull, + kubernetesNamespace = 
kubernetesInfo.namespace.orNull, + sparkUiPort = sparkUiPort) + }.orElse { + warn(s"Spark UI port not found in pod ${pod.getMetadata.getName}") + None + } + case None => None + } + } + } + def getPodAppName(pod: Pod): String = { Option(pod.getMetadata.getLabels.get(SPARK_APP_NAME_LABEL)).getOrElse(pod.getMetadata.getName) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationUrlSource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationUrlSource.scala new file mode 100644 index 000000000..276c50b95 --- /dev/null +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationUrlSource.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.engine + +object KubernetesApplicationUrlSource extends Enumeration { + type KubernetesApplicationUrlSource = Value + val SVC, POD = Value +} diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala index de4043852..489a00be4 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala @@ -64,6 +64,7 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite { val sparkAppUrlPattern3 = "http://{{SPARK_DRIVER_SVC}}.{{KUBERNETES_NAMESPACE}}.svc" + ".{{KUBERNETES_CONTEXT}}.k8s.io:{{SPARK_UI_PORT}}" + val sparkAppUrlPattern4 = "http://{{SPARK_DRIVER_POD_IP}}:{{SPARK_UI_PORT}}" val sparkAppId = "spark-123" val sparkDriverSvc = "spark-456-driver-svc" @@ -74,7 +75,8 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite { assert(KubernetesApplicationOperation.buildSparkAppUrl( sparkAppUrlPattern1, sparkAppId, - sparkDriverSvc, + Some(sparkDriverSvc), + None, kubernetesContext, kubernetesNamespace, sparkUiPort) === s"http://$sparkAppId.ingress.balabala") @@ -82,7 +84,8 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite { assert(KubernetesApplicationOperation.buildSparkAppUrl( sparkAppUrlPattern2, sparkAppId, - sparkDriverSvc, + Some(sparkDriverSvc), + None, kubernetesContext, kubernetesNamespace, sparkUiPort) === s"http://$sparkDriverSvc.$kubernetesNamespace.svc:$sparkUiPort") @@ -90,11 +93,22 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite { assert(KubernetesApplicationOperation.buildSparkAppUrl( sparkAppUrlPattern3, sparkAppId, - sparkDriverSvc, + Some(sparkDriverSvc), + None, kubernetesContext, kubernetesNamespace, sparkUiPort) === 
s"http://$sparkDriverSvc.$kubernetesNamespace.svc.$kubernetesContext.k8s.io:$sparkUiPort") + + assert(KubernetesApplicationOperation.buildSparkAppUrl( + sparkAppUrlPattern4, + sparkAppId, + None, + Some("10.69.234.1"), + kubernetesContext, + kubernetesNamespace, + sparkUiPort) === + s"http://10.69.234.1:$sparkUiPort") } test("get kubernetes client initialization info") {