[KYUUBI #5165][K8S][SPARK] Build Spark Driver/Executor Pod Name(Prefix) in process

### _Why are the changes needed?_

1. Print the driver pod name and the executor pod name prefix in the Kyuubi server log
2. Make sure the driver pod name stays within the length limit

close #5165
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

Closes #5168 from zwangsheng/KYUUBI#5165.

Closes #5165

290a46860 [Cheng Pan] nit
9c9dd2d38 [Cheng Pan] Revert "Add app Name"
50187cddb [zwangsheng] Add app Name
f704a9839 [Cheng Pan] app key
463904bec [Cheng Pan] revise
33d1ffd9e [Cheng Pan] Apply suggestions from code review
0b7e64da5 [Cheng Pan] Apply suggestions from code review
b3b1e08fd [zwangsheng] Fix Test
742e78461 [zwangsheng] modify long app name length to 266
90b165e95 [zwangsheng] fix test
191447058 [zwangsheng] fix test fail
05698fdb1 [zwangsheng] fix test fail
39637ae68 [zwangsheng] fix comments
23513810f [zwangsheng] fix comments
a56630179 [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
8599a1430 [zwangsheng] fix style
73aec9a03 [zwangsheng] [KYUUBI #5165][K8S][SPARK] Build Spark Driver/Executor Pod Name(Prefix) in process

Lead-authored-by: zwangsheng <2213335496@qq.com>
Co-authored-by: Cheng Pan <chengpan@apache.org>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
zwangsheng 2023-08-16 21:10:23 +08:00 committed by Cheng Pan
parent b57bc1cab6
commit f05aefb866
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
4 changed files with 119 additions and 3 deletions

View File

@ -51,7 +51,10 @@ class SparkBatchProcessBuilder(
// tag batch application
KyuubiApplicationManager.tagApplication(batchId, "spark", clusterManager(), batchKyuubiConf)
(batchKyuubiConf.getAll ++ sparkAppNameConf() ++ engineLogPathConf()).foreach { case (k, v) =>
(batchKyuubiConf.getAll ++
sparkAppNameConf() ++
engineLogPathConf() ++
appendPodNameConf(batchConf)).foreach { case (k, v) =>
buffer += CONF
buffer += s"${convertConfigKey(k)}=$v"
}

View File

@ -21,6 +21,7 @@ import java.io.{File, IOException}
import java.nio.file.Paths
import java.util.Locale
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import com.google.common.annotations.VisibleForTesting
@ -34,6 +35,7 @@ import org.apache.kyuubi.engine.ProcBuilder.KYUUBI_ENGINE_LOG_PATH_KEY
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.KubernetesUtils
import org.apache.kyuubi.util.Validator
class SparkProcessBuilder(
@ -118,7 +120,7 @@ class SparkProcessBuilder(
allConf = allConf ++ zkAuthKeytabFileConf(allConf)
}
// pass spark engine log path to spark conf
(allConf ++ engineLogPathConf).foreach { case (k, v) =>
(allConf ++ engineLogPathConf ++ appendPodNameConf(allConf)).foreach { case (k, v) =>
buffer += CONF
buffer += s"${convertConfigKey(k)}=$v"
}
@ -208,6 +210,24 @@ class SparkProcessBuilder(
kubernetesNamespace())
}
/**
 * Computes the Kubernetes pod naming entries that should be added to the Spark conf.
 *
 * Nothing is produced unless the cluster manager starts with "k8s" (case-insensitive).
 * On Kubernetes, an executor pod name prefix is generated unless `conf` already defines
 * one, and, in "cluster" deploy mode only, a driver pod name is generated unless `conf`
 * already defines one. Keys already present in `conf` are never overridden.
 *
 * @param conf the Spark conf to inspect; APP_KEY supplies the application name and
 *             defaults to "spark" when absent
 * @return only the newly generated entries (possibly empty), not a copy of `conf`
 */
def appendPodNameConf(conf: Map[String, String]): Map[String, String] = {
  val appName = conf.getOrElse(APP_KEY, "spark")
  val runsOnK8s = clusterManager().exists(_.toLowerCase(Locale.ROOT).startsWith("k8s"))
  if (!runsOnK8s) {
    Map.empty[String, String]
  } else {
    // executor prefix applies to every deploy mode on Kubernetes
    val executorPrefixEntry =
      if (conf.contains(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)) {
        None
      } else {
        val prefix = KubernetesUtils.generateExecutorPodNamePrefix(appName, engineRefId)
        Some(KUBERNETES_EXECUTOR_POD_NAME_PREFIX -> prefix)
      }
    // the driver pod name is only generated for cluster deploy mode
    val inClusterMode = deployMode().exists(_.toLowerCase(Locale.ROOT) == "cluster")
    val driverNameEntry =
      if (inClusterMode && !conf.contains(KUBERNETES_DRIVER_POD_NAME)) {
        val name = KubernetesUtils.generateDriverPodName(appName, engineRefId)
        Some(KUBERNETES_DRIVER_POD_NAME -> name)
      } else {
        None
      }
    Seq(executorPrefixEntry, driverNameEntry).flatten.toMap
  }
}
/**
 * Returns the value configured under MASTER_KEY, looking in `conf` first and falling
 * back to `defaultsConf` when `conf` has no such entry.
 */
override def clusterManager(): Option[String] = {
  val explicitMaster = conf.getOption(MASTER_KEY)
  // the fallback lookup only happens when no explicit value was found
  explicitMaster.orElse(defaultsConf.get(MASTER_KEY))
}
@ -258,6 +278,8 @@ object SparkProcessBuilder {
final val DEPLOY_MODE_KEY = "spark.submit.deployMode"
final val KUBERNETES_CONTEXT_KEY = "spark.kubernetes.context"
final val KUBERNETES_NAMESPACE_KEY = "spark.kubernetes.namespace"
final val KUBERNETES_DRIVER_POD_NAME = "spark.kubernetes.driver.pod.name"
final val KUBERNETES_EXECUTOR_POD_NAME_PREFIX = "spark.kubernetes.executor.podNamePrefix"
final val INTERNAL_RESOURCE = "spark-internal"
/**

View File

@ -18,6 +18,7 @@
package org.apache.kyuubi.util
import java.io.File
import java.util.Locale
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.base.Charsets
@ -32,6 +33,10 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
object KubernetesUtils extends Logging {
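// Executor pod names are built as "<prefix>-exec-<id>", so the prefix limit is the
// Kubernetes pod name max length (253) minus the number of digits in Int.MaxValue (10)
// minus the length of "-exec-" (6): 253 - 10 - 6 = 237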
final val EXECUTOR_POD_NAME_PREFIX_MAX_LENGTH = 237
final val DRIVER_POD_NAME_MAX_LENGTH = 253
def buildKubernetesClient(conf: KyuubiConf): Option[KubernetesClient] = {
val master = conf.get(KUBERNETES_MASTER)
@ -114,4 +119,32 @@ object KubernetesUtils extends Logging {
opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
opt2.foreach { _ => require(opt1.isEmpty, errMessage) }
}
/**
 * Builds a normalised resource name prefix from "<appName>-<engineRefId>".
 *
 * The joined string is trimmed and lower-cased, every character outside `a-z`, `0-9`
 * and `-` is replaced by `-`, runs of `-` are collapsed into one, a single leading `-`
 * is dropped, and a leading digit is replaced by `x`.
 *
 * @param appName     the application name; may contain arbitrary characters
 * @param engineRefId the engine reference id appended after the application name
 * @return the normalised prefix
 */
private def getResourceNamePrefix(appName: String, engineRefId: String): String = {
  val lowered = s"$appName-$engineRefId".trim.toLowerCase(Locale.ROOT)
  val onlyAllowedChars = lowered.replaceAll("[^a-z0-9\\-]", "-")
  val singleHyphens = onlyAllowedChars.replaceAll("-+", "-")
  val noLeadingHyphen = singleHyphens.replaceAll("^-", "")
  // a leading digit is swapped for 'x' rather than removed
  noLeadingHyphen.replaceAll("^[0-9]", "x")
}
/**
 * Generates a driver pod name of the form "kyuubi-<normalised app name and ref id>-driver".
 *
 * When that name is longer than DRIVER_POD_NAME_MAX_LENGTH, the application name is
 * dropped and "kyuubi-<engineRefId>-driver" is returned instead.
 *
 * NOTE(review): the fallback uses `engineRefId` verbatim, without normalisation or a
 * length check; this presumably relies on the ref id being a short, already valid
 * identifier - confirm against callers.
 *
 * @param appName     the application name
 * @param engineRefId the engine reference id
 * @return the driver pod name
 */
def generateDriverPodName(appName: String, engineRefId: String): String = {
  val candidate = "kyuubi-" + getResourceNamePrefix(appName, engineRefId) + "-driver"
  if (candidate.length > DRIVER_POD_NAME_MAX_LENGTH) {
    s"kyuubi-$engineRefId-driver"
  } else {
    candidate
  }
}
/**
 * Generates an executor pod name prefix of the form
 * "kyuubi-<normalised app name and ref id>".
 *
 * When that prefix is longer than EXECUTOR_POD_NAME_PREFIX_MAX_LENGTH, the application
 * name is dropped and "kyuubi-<engineRefId>" is returned instead.
 *
 * NOTE(review): the fallback uses `engineRefId` verbatim, without normalisation or a
 * length check; this presumably relies on the ref id being a short, already valid
 * identifier - confirm against callers.
 *
 * @param appName     the application name
 * @param engineRefId the engine reference id
 * @return the executor pod name prefix
 */
def generateExecutorPodNamePrefix(appName: String, engineRefId: String): String = {
  val candidate = "kyuubi-" + getResourceNamePrefix(appName, engineRefId)
  if (candidate.length > EXECUTOR_POD_NAME_PREFIX_MAX_LENGTH) {
    s"kyuubi-$engineRefId"
  } else {
    candidate
  }
}
}

View File

@ -30,7 +30,7 @@ import org.apache.kyuubi.{KerberizedTestHelper, KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_LOG_TIMEOUT, ENGINE_SPARK_MAIN_RESOURCE}
import org.apache.kyuubi.engine.ProcBuilder.KYUUBI_ENGINE_LOG_PATH_KEY
import org.apache.kyuubi.engine.spark.SparkProcessBuilder.CONF
import org.apache.kyuubi.engine.spark.SparkProcessBuilder._
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.service.ServiceUtils
@ -305,6 +305,64 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
b1.toString.contains(
s"$CONF spark.$KYUUBI_ENGINE_LOG_PATH_KEY=${b1.engineLog.getAbsolutePath}"))
}
test("[KYUUBI #5165] Test SparkProcessBuilder#appendDriverPodPrefix") {
  val engineRefId = "kyuubi-test-engine"
  val appName = "test-app"
  // Kubernetes master + cluster deploy mode, so a driver pod name should be generated
  val k8sClusterConf = conf.set(MASTER_KEY, "k8s://internal").set(DEPLOY_MODE_KEY, "cluster")
  val builder = new SparkProcessBuilder("kyuubi", k8sClusterConf, engineRefId)

  // the driver pod name the builder would add for the given Spark conf, if any
  def generatedDriverPodName(sparkConf: Map[String, String]): Option[String] =
    builder.appendPodNameConf(sparkConf).get(KUBERNETES_DRIVER_POD_NAME)

  // app name and engine ref id are both embedded in the generated name
  val appNameOnly = Map(APP_KEY -> appName)
  assert(generatedDriverPodName(appNameOnly) === Some(s"kyuubi-$appName-$engineRefId-driver"))

  // respect user specified driver pod name
  val userSpecified = appNameOnly ++ Map(KUBERNETES_DRIVER_POD_NAME -> "kyuubi-test-1-driver")
  assert(generatedDriverPodName(userSpecified) === None)

  // with this very long app name the expected result contains only the engine ref id
  val longAppName = "thisisalonglonglonglonglonglonglonglonglonglonglonglong" +
    "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
    "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
    "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
    "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
    "longlonglonglonglonglonglonglonglonglonglonglonglongappname"
  val withLongAppName = Map(APP_KEY -> longAppName)
  assert(generatedDriverPodName(withLongAppName) === Some(s"kyuubi-$engineRefId-driver"))

  // non-ASCII characters and underscores are expected to vanish from the generated name
  // scalastyle:off
  val chineseAppName = "你好_test_任务"
  // scalastyle:on
  val withChineseAppName = Map(APP_KEY -> chineseAppName)
  assert(generatedDriverPodName(withChineseAppName) ===
    Some(s"kyuubi-test-$engineRefId-driver"))
}
test("[KYUUBI #5165] Test SparkProcessBuilder#appendExecutorPodPrefix") {
  val engineRefId = "kyuubi-test-engine"
  val appName = "test-app"
  // Kubernetes master, so an executor pod name prefix should be generated
  val k8sClusterConf = conf.set(MASTER_KEY, "k8s://internal").set(DEPLOY_MODE_KEY, "cluster")
  val builder = new SparkProcessBuilder("kyuubi", k8sClusterConf, engineRefId)

  // the executor pod name prefix the builder would add for the given Spark conf, if any
  def generatedExecutorPrefix(sparkConf: Map[String, String]): Option[String] =
    builder.appendPodNameConf(sparkConf).get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)

  // app name and engine ref id are both embedded in the generated prefix
  val appNameOnly = Map(APP_KEY -> appName)
  assert(generatedExecutorPrefix(appNameOnly) === Some(s"kyuubi-$appName-$engineRefId"))

  // a prefix already present in the conf must not be regenerated
  val userSpecified = appNameOnly ++ Map(KUBERNETES_EXECUTOR_POD_NAME_PREFIX -> "kyuubi-test")
  assert(generatedExecutorPrefix(userSpecified) === None)

  // with this very long app name the expected result contains only the engine ref id
  val longAppName = "thisisalonglonglonglonglonglonglonglonglonglonglonglong" +
    "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
    "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
    "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
    "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
    "longlonglonglonglonglonglonglonglonglonglonglonglongappname"
  val withLongAppName = Map(APP_KEY -> longAppName)
  assert(generatedExecutorPrefix(withLongAppName) === Some(s"kyuubi-$engineRefId"))
}
}
class FakeSparkProcessBuilder(config: KyuubiConf)