[KYUUBI #7141] Support to get spark app url with pattern http://{{SPARK_DRIVER_POD_IP}}:{{SPARK_UI_PORT}}

### Why are the changes needed?

We are using [virtual-kubelet](https://github.com/virtual-kubelet/virtual-kubelet) for Spark on Kubernetes, and the Spark driver pods may be allocated across Kubernetes clusters.

We use the driver pod IP as the driver host (see https://github.com/apache/spark/pull/40392), which is supported since Spark 3.5.

The Kubernetes context and namespace are virtual, so we cannot build the app URL from the Spark driver service.

Since the Spark driver pod IP is reachable in our use case, this PR builds the Spark app URL from the driver pod IP and the Spark UI port.
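Concretely, the new pattern is rendered by plain placeholder substitution; a minimal sketch (illustrative, not the exact Kyuubi code):

```scala
// Sketch: render the app URL from the driver pod IP and UI port by
// substituting the placeholders in the configured pattern.
def renderAppUrl(pattern: String, driverPodIp: String, sparkUiPort: Int): String =
  pattern
    .replace("{{SPARK_DRIVER_POD_IP}}", driverPodIp)
    .replace("{{SPARK_UI_PORT}}", sparkUiPort.toString)
```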

### How was this patch tested?

UT.

<img width="1532" height="626" alt="image" src="https://github.com/user-attachments/assets/5cb54602-9e79-40b7-b51c-0b873c17560b" />
<img width="710" height="170" alt="image" src="https://github.com/user-attachments/assets/6d1c9580-62d6-423a-a04f-dc6cdcee940a" />

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #7141 from turboFei/app_url_v2.


1277952f5 [Wang, Fei] VAR
d15e6bea7 [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
1535e00ac [Wang, Fei] spark driver pod ip

Lead-authored-by: Wang, Fei <fwang12@ebay.com>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
Wang, Fei 2025-07-21 10:11:07 +08:00 committed by Cheng Pan
parent 5f4b1f0de5
commit a3f1e51e78
5 changed files with 119 additions and 18 deletions


@@ -363,7 +363,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.kubernetes.master.address | &lt;undefined&gt; | The internal Kubernetes master (API server) address to be used for kyuubi. | string | 1.7.0 |
| kyuubi.kubernetes.namespace | default | The namespace that will be used for running the kyuubi pods and find engines. | string | 1.7.0 |
| kyuubi.kubernetes.namespace.allow.list || The allowed kubernetes namespace list, if it is empty, there is no kubernetes namespace limitation. | set | 1.8.0 |
| kyuubi.kubernetes.spark.appUrlPattern | http://{{SPARK_DRIVER_SVC}}.{{KUBERNETES_NAMESPACE}}.svc:{{SPARK_UI_PORT}} | The pattern to generate the spark on kubernetes application UI URL. The pattern should contain placeholders for the application variables. Available placeholders are `{{SPARK_APP_ID}}`, `{{SPARK_DRIVER_SVC}}`, `{{KUBERNETES_NAMESPACE}}`, `{{KUBERNETES_CONTEXT}}` and `{{SPARK_UI_PORT}}`. | string | 1.10.0 |
| kyuubi.kubernetes.spark.appUrlPattern | http://{{SPARK_DRIVER_SVC}}.{{KUBERNETES_NAMESPACE}}.svc:{{SPARK_UI_PORT}} | The pattern to generate the spark on kubernetes application UI URL. The pattern should contain placeholders for the application variables. Available placeholders are `{{SPARK_APP_ID}}`, `{{SPARK_DRIVER_SVC}}`, `{{SPARK_DRIVER_POD_IP}}`, `{{KUBERNETES_NAMESPACE}}`, `{{KUBERNETES_CONTEXT}}` and `{{SPARK_UI_PORT}}`. | string | 1.10.0 |
| kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled | false | If enabled, Kyuubi server will try to create the `spark.kubernetes.file.upload.path` with permission 777 before submitting the Spark application. | boolean | 1.11.0 |
| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval | PT1M | Kyuubi server use guava cache as the cleanup trigger with time-based eviction, but the eviction would not happened until any get/put operation happened. This option schedule a daemon thread evict cache periodically. | duration | 1.8.1 |
| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.kind | NONE | Kyuubi server will delete the spark driver pod after the application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. Available options are NONE, ALL, COMPLETED and default value is None which means none of the pod will be deleted | string | 1.8.1 |
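For example, to build the UI URL from the driver pod IP as this PR enables, one could set (a hypothetical `kyuubi-defaults.conf` fragment based on the table above):

```
kyuubi.kubernetes.spark.appUrlPattern=http://{{SPARK_DRIVER_POD_IP}}:{{SPARK_UI_PORT}}
```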


@@ -1398,8 +1398,9 @@ object KyuubiConf {
buildConf("kyuubi.kubernetes.spark.appUrlPattern")
.doc("The pattern to generate the spark on kubernetes application UI URL. " +
"The pattern should contain placeholders for the application variables. " +
"Available placeholders are `{{SPARK_APP_ID}}`, `{{SPARK_DRIVER_SVC}}`, " +
"`{{KUBERNETES_NAMESPACE}}`, `{{KUBERNETES_CONTEXT}}` and `{{SPARK_UI_PORT}}`.")
"Available placeholders are `{{SPARK_APP_ID}}`, `{{SPARK_DRIVER_SVC}}`," +
" `{{SPARK_DRIVER_POD_IP}}`, `{{KUBERNETES_NAMESPACE}}`, `{{KUBERNETES_CONTEXT}}`" +
" and `{{SPARK_UI_PORT}}`.")
.version("1.10.0")
.stringConf
.createWithDefault(


@@ -35,6 +35,7 @@ import org.apache.kyuubi.config.KyuubiConf.{KubernetesApplicationStateSource, Ku
import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource
import org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL, COMPLETED, NONE}
import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, KILLED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
import org.apache.kyuubi.engine.KubernetesApplicationUrlSource._
import org.apache.kyuubi.engine.KubernetesResourceEventTypes.KubernetesResourceEventType
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.metadata.MetadataManager
@@ -65,6 +66,13 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
private def appStateContainer: String =
kyuubiConf.get(KyuubiConf.KUBERNETES_APPLICATION_STATE_CONTAINER)
private lazy val sparkAppUrlPattern = kyuubiConf.get(KyuubiConf.KUBERNETES_SPARK_APP_URL_PATTERN)
private lazy val sparkAppUrlSource = if (sparkAppUrlPattern.contains(SPARK_DRIVER_SVC_PATTERN)) {
KubernetesApplicationUrlSource.SVC
} else {
KubernetesApplicationUrlSource.POD
}
// key is kyuubi_unique_key
private val appInfoStore: ConcurrentHashMap[String, (KubernetesInfo, ApplicationInfo)] =
new ConcurrentHashMap[String, (KubernetesInfo, ApplicationInfo)]
@@ -143,11 +151,14 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
val enginePodInformer = client.pods()
.withLabel(LABEL_KYUUBI_UNIQUE_KEY)
.inform(new SparkEnginePodEventHandler(kubernetesInfo))
info(s"[$kubernetesInfo] Start Kubernetes Client Informer.")
val engineSvcInformer = client.services()
.inform(new SparkEngineSvcEventHandler(kubernetesInfo))
info(s"[$kubernetesInfo] Start Kubernetes Client POD Informer.")
enginePodInformers.put(kubernetesInfo, enginePodInformer)
engineSvcInformers.put(kubernetesInfo, engineSvcInformer)
if (sparkAppUrlSource == KubernetesApplicationUrlSource.SVC) {
info(s"[$kubernetesInfo] Start Kubernetes Client SVC Informer.")
val engineSvcInformer = client.services()
.inform(new SparkEngineSvcEventHandler(kubernetesInfo))
engineSvcInformers.put(kubernetesInfo, engineSvcInformer)
}
client
case None => throw new KyuubiException(s"Fail to build Kubernetes client for $kubernetesInfo")
@@ -474,6 +485,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
id = getPodAppId(pod),
name = getPodAppName(pod),
state = appState,
url = appInfo.url.orElse {
getPodAppUrl(sparkAppUrlSource, sparkAppUrlPattern, kubernetesInfo, pod)
},
error = appError,
podName = Some(pod.getMetadata.getName)))
}.getOrElse {
@@ -483,6 +497,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
id = getPodAppId(pod),
name = getPodAppName(pod),
state = appState,
url = getPodAppUrl(sparkAppUrlSource, sparkAppUrlPattern, kubernetesInfo, pod),
error = appError,
podName = Some(pod.getMetadata.getName)))
}
@@ -500,7 +515,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
val appUrl = buildSparkAppUrl(
appUrlPattern,
sparkAppId,
sparkDriverSvc,
sparkDriverSvc = Some(sparkDriverSvc),
sparkDriverPodIp = None,
kubernetesContext,
kubernetesNamespace,
sparkUiPort)
@@ -695,9 +711,17 @@ object KubernetesApplicationOperation extends Logging {
UNKNOWN
}
final private val SPARK_APP_ID_PATTERN = "{{SPARK_APP_ID}}"
final private val SPARK_DRIVER_SVC_PATTERN = "{{SPARK_DRIVER_SVC}}"
final private val SPARK_DRIVER_POD_IP_PATTERN = "{{SPARK_DRIVER_POD_IP}}"
final private val KUBERNETES_CONTEXT_PATTERN = "{{KUBERNETES_CONTEXT}}"
final private val KUBERNETES_NAMESPACE_PATTERN = "{{KUBERNETES_NAMESPACE}}"
final private val SPARK_UI_PORT_PATTERN = "{{SPARK_UI_PORT}}"
/**
* Replaces all the {{SPARK_APP_ID}} occurrences with the Spark App Id,
* {{SPARK_DRIVER_SVC}} occurrences with the Spark Driver Service name,
* {{SPARK_DRIVER_POD_IP}} occurrences with the Spark Driver Pod IP,
* {{KUBERNETES_CONTEXT}} occurrences with the Kubernetes Context,
* {{KUBERNETES_NAMESPACE}} occurrences with the Kubernetes Namespace,
* and {{SPARK_UI_PORT}} occurrences with the Spark UI Port.
@@ -705,22 +729,61 @@
private[kyuubi] def buildSparkAppUrl(
sparkAppUrlPattern: String,
sparkAppId: String,
sparkDriverSvc: String,
sparkDriverSvc: Option[String],
sparkDriverPodIp: Option[String],
kubernetesContext: String,
kubernetesNamespace: String,
sparkUiPort: Int): String = {
sparkAppUrlPattern
.replace("{{SPARK_APP_ID}}", sparkAppId)
.replace("{{SPARK_DRIVER_SVC}}", sparkDriverSvc)
.replace("{{KUBERNETES_CONTEXT}}", kubernetesContext)
.replace("{{KUBERNETES_NAMESPACE}}", kubernetesNamespace)
.replace("{{SPARK_UI_PORT}}", sparkUiPort.toString)
var appUrl = sparkAppUrlPattern
.replace(SPARK_APP_ID_PATTERN, sparkAppId)
.replace(KUBERNETES_CONTEXT_PATTERN, kubernetesContext)
.replace(KUBERNETES_NAMESPACE_PATTERN, kubernetesNamespace)
.replace(SPARK_UI_PORT_PATTERN, sparkUiPort.toString)
sparkDriverSvc match {
case Some(svc) => appUrl = appUrl.replace(SPARK_DRIVER_SVC_PATTERN, svc)
case None =>
}
sparkDriverPodIp match {
case Some(ip) => appUrl = appUrl.replace(SPARK_DRIVER_POD_IP_PATTERN, ip)
case None =>
}
appUrl
}
def getPodAppId(pod: Pod): String = {
pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL)
}
private[kyuubi] def getPodAppUrl(
sparkAppUrlSource: KubernetesApplicationUrlSource,
sparkAppUrlPattern: String,
kubernetesInfo: KubernetesInfo,
pod: Pod): Option[String] = {
sparkAppUrlSource match {
case KubernetesApplicationUrlSource.SVC => None
case KubernetesApplicationUrlSource.POD =>
val podIp = Option(pod.getStatus.getPodIP).filter(_.nonEmpty)
podIp match {
case Some(ip) => pod.getSpec.getContainers.asScala.flatMap(
_.getPorts.asScala).find(_.getName == SPARK_UI_PORT_NAME)
.map(_.getContainerPort).map { sparkUiPort =>
buildSparkAppUrl(
sparkAppUrlPattern,
getPodAppId(pod),
sparkDriverSvc = None,
sparkDriverPodIp = Some(ip),
kubernetesContext = kubernetesInfo.context.orNull,
kubernetesNamespace = kubernetesInfo.namespace.orNull,
sparkUiPort = sparkUiPort)
}.orElse {
warn(s"Spark UI port not found in pod ${pod.getMetadata.getName}")
None
}
case None => None
}
}
}
def getPodAppName(pod: Pod): String = {
Option(pod.getMetadata.getLabels.get(SPARK_APP_NAME_LABEL)).getOrElse(pod.getMetadata.getName)
}
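The pod-based URL lookup added above can be sketched with a simplified pod model. The case classes below are illustrative stand-ins for the fabric8 `Pod` types, and `spark-ui` is an assumed container-port name, not necessarily the constant Kyuubi uses:

```scala
// Illustrative stand-ins for the fabric8 Pod model; not the real Kyuubi types.
case class ContainerPort(name: String, containerPort: Int)
case class SimplePod(podIp: Option[String], ports: Seq[ContainerPort])

val SPARK_UI_PORT_NAME = "spark-ui" // assumed port name

// Mirrors getPodAppUrl: only build a URL when both the pod IP and the
// Spark UI container port are present; otherwise return None.
def getPodAppUrl(pattern: String, pod: SimplePod): Option[String] =
  for {
    ip <- pod.podIp.filter(_.nonEmpty)
    uiPort <- pod.ports.find(_.name == SPARK_UI_PORT_NAME).map(_.containerPort)
  } yield pattern
    .replace("{{SPARK_DRIVER_POD_IP}}", ip)
    .replace("{{SPARK_UI_PORT}}", uiPort.toString)
```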


@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine
object KubernetesApplicationUrlSource extends Enumeration {
type KubernetesApplicationUrlSource = Value
val SVC, POD = Value
}
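A minimal sketch of how this enumeration drives the source selection, mirroring the `sparkAppUrlSource` derivation in the operation class above: if the configured pattern references the driver service placeholder, the URL is resolved from the service; otherwise it falls back to the driver pod IP.

```scala
object KubernetesApplicationUrlSource extends Enumeration {
  type KubernetesApplicationUrlSource = Value
  val SVC, POD = Value
}

// Infer the URL source from the configured pattern (illustrative sketch).
def urlSourceFor(pattern: String): KubernetesApplicationUrlSource.Value =
  if (pattern.contains("{{SPARK_DRIVER_SVC}}")) KubernetesApplicationUrlSource.SVC
  else KubernetesApplicationUrlSource.POD
```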


@@ -64,6 +64,7 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite {
val sparkAppUrlPattern3 =
"http://{{SPARK_DRIVER_SVC}}.{{KUBERNETES_NAMESPACE}}.svc" +
".{{KUBERNETES_CONTEXT}}.k8s.io:{{SPARK_UI_PORT}}"
val sparkAppUrlPattern4 = "http://{{SPARK_DRIVER_POD_IP}}:{{SPARK_UI_PORT}}"
val sparkAppId = "spark-123"
val sparkDriverSvc = "spark-456-driver-svc"
@@ -74,7 +75,8 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite {
assert(KubernetesApplicationOperation.buildSparkAppUrl(
sparkAppUrlPattern1,
sparkAppId,
sparkDriverSvc,
Some(sparkDriverSvc),
None,
kubernetesContext,
kubernetesNamespace,
sparkUiPort) === s"http://$sparkAppId.ingress.balabala")
@@ -82,7 +84,8 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite {
assert(KubernetesApplicationOperation.buildSparkAppUrl(
sparkAppUrlPattern2,
sparkAppId,
sparkDriverSvc,
Some(sparkDriverSvc),
None,
kubernetesContext,
kubernetesNamespace,
sparkUiPort) === s"http://$sparkDriverSvc.$kubernetesNamespace.svc:$sparkUiPort")
@@ -90,11 +93,22 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite {
assert(KubernetesApplicationOperation.buildSparkAppUrl(
sparkAppUrlPattern3,
sparkAppId,
sparkDriverSvc,
Some(sparkDriverSvc),
None,
kubernetesContext,
kubernetesNamespace,
sparkUiPort) ===
s"http://$sparkDriverSvc.$kubernetesNamespace.svc.$kubernetesContext.k8s.io:$sparkUiPort")
assert(KubernetesApplicationOperation.buildSparkAppUrl(
sparkAppUrlPattern4,
sparkAppId,
None,
Some("10.69.234.1"),
kubernetesContext,
kubernetesNamespace,
sparkUiPort) ===
s"http://10.69.234.1:$sparkUiPort")
}
test("get kubernetes client initialization info") {