diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala index 7f1be93b5..ef159bb93 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala @@ -51,7 +51,10 @@ class SparkBatchProcessBuilder( // tag batch application KyuubiApplicationManager.tagApplication(batchId, "spark", clusterManager(), batchKyuubiConf) - (batchKyuubiConf.getAll ++ sparkAppNameConf() ++ engineLogPathConf()).foreach { case (k, v) => + (batchKyuubiConf.getAll ++ + sparkAppNameConf() ++ + engineLogPathConf() ++ + appendPodNameConf(batchConf)).foreach { case (k, v) => buffer += CONF buffer += s"${convertConfigKey(k)}=$v" } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index 5998d5d4e..351eddb75 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -21,6 +21,7 @@ import java.io.{File, IOException} import java.nio.file.Paths import java.util.Locale +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import com.google.common.annotations.VisibleForTesting @@ -34,6 +35,7 @@ import org.apache.kyuubi.engine.ProcBuilder.KYUUBI_ENGINE_LOG_PATH_KEY import org.apache.kyuubi.ha.HighAvailabilityConf import org.apache.kyuubi.ha.client.AuthTypes import org.apache.kyuubi.operation.log.OperationLog +import org.apache.kyuubi.util.KubernetesUtils import org.apache.kyuubi.util.Validator class SparkProcessBuilder( @@ -118,7 +120,7 @@ class SparkProcessBuilder( allConf = allConf ++ zkAuthKeytabFileConf(allConf) } // pass spark engine log path to spark conf - (allConf ++ engineLogPathConf).foreach { case (k, v) => + (allConf ++ engineLogPathConf ++ appendPodNameConf(allConf)).foreach { case (k, v) => buffer += CONF buffer += s"${convertConfigKey(k)}=$v" } @@ -208,6 +210,24 @@ class SparkProcessBuilder( kubernetesNamespace()) } + def appendPodNameConf(conf: Map[String, String]): Map[String, String] = { + val appName = conf.getOrElse(APP_KEY, "spark") + val map = mutable.Map.newBuilder[String, String] + if (clusterManager().exists(cm => cm.toLowerCase(Locale.ROOT).startsWith("k8s"))) { + if (!conf.contains(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)) { + val prefix = KubernetesUtils.generateExecutorPodNamePrefix(appName, engineRefId) + map += (KUBERNETES_EXECUTOR_POD_NAME_PREFIX -> prefix) + } + if (deployMode().exists(_.toLowerCase(Locale.ROOT) == "cluster")) { + if (!conf.contains(KUBERNETES_DRIVER_POD_NAME)) { + val name = KubernetesUtils.generateDriverPodName(appName, engineRefId) + map += (KUBERNETES_DRIVER_POD_NAME -> name) + } + } + } + map.result().toMap + } + override def clusterManager(): Option[String] = { conf.getOption(MASTER_KEY).orElse(defaultsConf.get(MASTER_KEY)) } @@ -258,6 +278,8 @@ object SparkProcessBuilder { final val DEPLOY_MODE_KEY = "spark.submit.deployMode" final val KUBERNETES_CONTEXT_KEY = "spark.kubernetes.context" final val KUBERNETES_NAMESPACE_KEY = "spark.kubernetes.namespace" + final val KUBERNETES_DRIVER_POD_NAME = "spark.kubernetes.driver.pod.name" + final val KUBERNETES_EXECUTOR_POD_NAME_PREFIX = "spark.kubernetes.executor.podNamePrefix" final val INTERNAL_RESOURCE = "spark-internal" /** diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/util/KubernetesUtils.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/util/KubernetesUtils.scala index f9780bb16..9da3408a3 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/util/KubernetesUtils.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/util/KubernetesUtils.scala @@ -18,6 +18,7 @@ package org.apache.kyuubi.util import java.io.File +import java.util.Locale import com.fasterxml.jackson.databind.ObjectMapper import com.google.common.base.Charsets @@ -32,6 +33,10 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ object KubernetesUtils extends Logging { + // Kubernetes pod name max length - '-exec-' - Int.MAX_VALUE.length + // 253 - 10 - 6 + final val EXECUTOR_POD_NAME_PREFIX_MAX_LENGTH = 237 + final val DRIVER_POD_NAME_MAX_LENGTH = 253 def buildKubernetesClient(conf: KyuubiConf): Option[KubernetesClient] = { val master = conf.get(KUBERNETES_MASTER) @@ -114,4 +119,32 @@ object KubernetesUtils extends Logging { opt1.foreach { _ => require(opt2.isEmpty, errMessage) } opt2.foreach { _ => require(opt1.isEmpty, errMessage) } } + + private def getResourceNamePrefix(appName: String, engineRefId: String): String = { + s"$appName-$engineRefId" + .trim + .toLowerCase(Locale.ROOT) + .replaceAll("[^a-z0-9\\-]", "-") + .replaceAll("-+", "-") + .replaceAll("^-", "") + .replaceAll("^[0-9]", "x") + } + + def generateDriverPodName(appName: String, engineRefId: String): String = { + val resolvedResourceName = s"kyuubi-${getResourceNamePrefix(appName, engineRefId)}-driver" + if (resolvedResourceName.length <= DRIVER_POD_NAME_MAX_LENGTH) { + resolvedResourceName + } else { + s"kyuubi-$engineRefId-driver" + } + } + + def generateExecutorPodNamePrefix(appName: String, engineRefId: String): String = { + val resolvedResourceName = s"kyuubi-${getResourceNamePrefix(appName, engineRefId)}" + if (resolvedResourceName.length <= EXECUTOR_POD_NAME_PREFIX_MAX_LENGTH) { + resolvedResourceName + } else { + s"kyuubi-$engineRefId" + } + } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala index cd549c0f3..a4227d26e 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala @@ -30,7 +30,7 @@ import org.apache.kyuubi.{KerberizedTestHelper, KyuubiSQLException, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.{ENGINE_LOG_TIMEOUT, ENGINE_SPARK_MAIN_RESOURCE} import org.apache.kyuubi.engine.ProcBuilder.KYUUBI_ENGINE_LOG_PATH_KEY -import org.apache.kyuubi.engine.spark.SparkProcessBuilder.CONF +import org.apache.kyuubi.engine.spark.SparkProcessBuilder._ import org.apache.kyuubi.ha.HighAvailabilityConf import org.apache.kyuubi.ha.client.AuthTypes import org.apache.kyuubi.service.ServiceUtils @@ -305,6 +305,64 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { b1.toString.contains( s"$CONF spark.$KYUUBI_ENGINE_LOG_PATH_KEY=${b1.engineLog.getAbsolutePath}")) } + + test("[KYUUBI #5165] Test SparkProcessBuilder#appendDriverPodPrefix") { + val engineRefId = "kyuubi-test-engine" + val appName = "test-app" + val processBuilder = new SparkProcessBuilder( + "kyuubi", + conf.set(MASTER_KEY, "k8s://internal").set(DEPLOY_MODE_KEY, "cluster"), + engineRefId) + val conf1 = Map(APP_KEY -> "test-app") + val driverPodName1 = processBuilder.appendPodNameConf(conf1).get(KUBERNETES_DRIVER_POD_NAME) + assert(driverPodName1 === Some(s"kyuubi-$appName-$engineRefId-driver")) + // respect user specified driver pod name + val conf2 = conf1 ++ Map(KUBERNETES_DRIVER_POD_NAME -> "kyuubi-test-1-driver") + val driverPodName2 = processBuilder.appendPodNameConf(conf2).get(KUBERNETES_DRIVER_POD_NAME) + assert(driverPodName2 === None) + val longAppName = "thisisalonglonglonglonglonglonglonglonglonglonglonglong" + + "longlonglonglonglonglonglonglonglonglonglonglonglonglong" + + "longlonglonglonglonglonglonglonglonglonglonglonglonglong" + + "longlonglonglonglonglonglonglonglonglonglonglonglonglong" + + "longlonglonglonglonglonglonglonglonglonglonglonglonglong" + + "longlonglonglonglonglonglonglonglonglonglonglonglongappname" + val conf3 = Map(APP_KEY -> longAppName) + val driverPodName3 = processBuilder.appendPodNameConf(conf3).get(KUBERNETES_DRIVER_POD_NAME) + assert(driverPodName3 === Some(s"kyuubi-$engineRefId-driver")) + // scalastyle:off + val chineseAppName = "你好_test_任务" + // scalastyle:on + val conf4 = Map(APP_KEY -> chineseAppName) + val driverPodName4 = processBuilder.appendPodNameConf(conf4).get(KUBERNETES_DRIVER_POD_NAME) + assert(driverPodName4 === Some(s"kyuubi-test-$engineRefId-driver")) + } + + test("[KYUUBI #5165] Test SparkProcessBuilder#appendExecutorPodPrefix") { + val engineRefId = "kyuubi-test-engine" + val appName = "test-app" + val processBuilder = new SparkProcessBuilder( + "kyuubi", + conf.set(MASTER_KEY, "k8s://internal").set(DEPLOY_MODE_KEY, "cluster"), + engineRefId) + val conf1 = Map(APP_KEY -> "test-app") + val execPodNamePrefix1 = processBuilder + .appendPodNameConf(conf1).get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + assert(execPodNamePrefix1 === Some(s"kyuubi-$appName-$engineRefId")) + val conf2 = conf1 ++ Map(KUBERNETES_EXECUTOR_POD_NAME_PREFIX -> "kyuubi-test") + val execPodNamePrefix2 = processBuilder + .appendPodNameConf(conf2).get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + assert(execPodNamePrefix2 === None) + val longAppName = "thisisalonglonglonglonglonglonglonglonglonglonglonglong" + + "longlonglonglonglonglonglonglonglonglonglonglonglonglong" + + "longlonglonglonglonglonglonglonglonglonglonglonglonglong" + + "longlonglonglonglonglonglonglonglonglonglonglonglonglong" + + "longlonglonglonglonglonglonglonglonglonglonglonglonglong" + + "longlonglonglonglonglonglonglonglonglonglonglonglongappname" + val conf3 = Map(APP_KEY -> longAppName) + val execPodNamePrefix3 = processBuilder + .appendPodNameConf(conf3).get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + assert(execPodNamePrefix3 === Some(s"kyuubi-$engineRefId")) + } } class FakeSparkProcessBuilder(config: KyuubiConf)