From b23c87c318a67445abb65252e547844471a65b3d Mon Sep 17 00:00:00 2001 From: zwangsheng <2213335496@qq.com> Date: Tue, 14 Mar 2023 23:01:43 +0800 Subject: [PATCH] [KYUUBI #4467][K8S][BATCH] Tolerate Driver Pod ephemerally invisible after submitting ### _Why are the changes needed?_ The following discussion assumes using Spark cluster mode w/ `waitCompletion=false`. In Spark on Yarn, the application is visible immediately after `spark-submit` is returned, but things are different in Spark on K8s: the Driver Pod is ephemerally invisible after submitting, so NOT_FOUND is returned instead of UNKNOWN or PENDING. To tolerate the above case, `kyuubi.engine.submit.timeout` is introduced, ApplicationManager will report UNKNOWN instead of NOT_FOUND during the Driver Pod scheduling period. More detail in #4467 1. Remove the `JpsApplicationOperation` fallback that `KubernetesApplicationOperation` used for handling Client Deploy Mode (`YarnApplicationOperation` doesn't handle this either) 2. Add engine submit timeout for `KubernetesApplicationOperation` so that it returns UNKNOWN status while the driver pod has not been found within the timeout. 3. Pass the engine submit time to GetApplicationInfo ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request Closes #4469 from zwangsheng/4467. 
Closes #4467 562b67463 [zwangsheng] [KYUUBI #4467] Fix Setting.md 362c43d1b [zwangsheng] [KYUUBI #4467] Fix Setting.md ac69f4d81 [zwangsheng] [KYUUBI #4467] Add Config Desc d2b9fb660 [zwangsheng] [KYUUBI #4467] save tab eac880fcf [zwangsheng] [KYUUBI #4467] Ingnore Kubernetes Operation for client mode test 7a20b97a4 [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala aa4c7716a [zwangsheng] [KYUUBI #4467] Ingnore Kubernetes Operation for client mode test c5bd888ab [zwangsheng] [KYUUBI #4467] note it test a86dcefba [zwangsheng] [KYUUBI #4467] Using default none aed7f8794 [Cheng Pan] Update docs/deployment/settings.md 490df7dc0 [zwangsheng] [KYUUBI #4467] fix complie 33f3a5be8 [zwangsheng] [KYUUBI #4467] fix comments 4745790cf [zwangsheng] [KYUUBI #4467] Fix IT Test 924cfe38e [zwangsheng] [KYUUBI #4467] Fix Setting.md 5f8aeaacc [zwangsheng] [KYUUBI #4467] KubernetesApplicationOperation Wait if not fount driver pod in limit time range Lead-authored-by: zwangsheng <2213335496@qq.com> Co-authored-by: Cheng Pan Signed-off-by: Cheng Pan --- docs/deployment/settings.md | 1 + .../spark/SparkOnKubernetesTestsSuite.scala | 3 +- .../org/apache/kyuubi/config/KyuubiConf.scala | 9 +++ .../kyuubi/engine/ApplicationOperation.scala | 3 +- .../org/apache/kyuubi/engine/EngineRef.scala | 5 +- .../engine/JpsApplicationOperation.scala | 2 +- .../KubernetesApplicationOperation.scala | 73 +++++++++++-------- .../engine/KyuubiApplicationManager.scala | 5 +- .../engine/YarnApplicationOperation.scala | 2 +- .../kyuubi/operation/BatchJobSubmission.scala | 10 ++- .../server/api/v1/BatchesResource.scala | 3 +- 11 files changed, 77 insertions(+), 39 deletions(-) diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index b34aa4c48..a2474a506 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -160,6 +160,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | 
kyuubi.engine.spark.python.env.archive | <undefined> | Portable Python env archive used for Spark engine Python language mode. | string | 1.7.0 | | kyuubi.engine.spark.python.env.archive.exec.path | bin/python | The Python exec path under the Python env archive. | string | 1.7.0 | | kyuubi.engine.spark.python.home.archive | <undefined> | Spark archive containing $SPARK_HOME/python directory, which is used to init session Python worker for Python language mode. | string | 1.7.0 | +| kyuubi.engine.submit.timeout | PT30S | Period to tolerate Driver Pod ephemerally invisible after submitting. In some Resource Managers, e.g. K8s, the Driver Pod is not visible immediately after `spark-submit` is returned. | duration | 1.7.1 | | kyuubi.engine.trino.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go. | seq | 1.7.0 | | kyuubi.engine.trino.extra.classpath | <undefined> | The extra classpath for the Trino query engine, for configuring other libs which may need by the Trino engine | string | 1.6.0 | | kyuubi.engine.trino.java.options | <undefined> | The extra Java options for the Trino query engine | string | 1.6.0 | diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala index 14db8b408..831504608 100644 --- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala +++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala @@ -125,6 +125,7 @@ class SparkClusterModeOnKubernetesSuite override protected def jdbcUrl: String = getJdbcUrl } +// [KYUUBI #4467] KubernetesApplicationOperator doesn't support client mode class 
KyuubiOperationKubernetesClusterClientModeSuite extends SparkClientModeOnKubernetesSuiteBase { private lazy val k8sOperation: KubernetesApplicationOperation = { @@ -136,7 +137,7 @@ class KyuubiOperationKubernetesClusterClientModeSuite private def sessionManager: KyuubiSessionManager = server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager] - test("Spark Client Mode On Kubernetes Kyuubi KubernetesApplicationOperation Suite") { + ignore("Spark Client Mode On Kubernetes Kyuubi KubernetesApplicationOperation Suite") { val batchRequest = newSparkBatchRequest(conf.getAll ++ Map( KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index bd8fdaa2a..1b7737344 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -2541,6 +2541,15 @@ object KyuubiConf { .booleanConf .createWithDefault(true) + val ENGINE_SUBMIT_TIMEOUT: ConfigEntry[Long] = + buildConf("kyuubi.engine.submit.timeout") + .doc("Period to tolerate Driver Pod ephemerally invisible after submitting. " + + "In some Resource Managers, e.g. K8s, the Driver Pod is not visible immediately " + + "after `spark-submit` is returned.") + .version("1.7.1") + .timeConf + .createWithDefaultString("PT30S") + /** * Holds information about keys that have been deprecated. 
* diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala index 93d495895..00db372ce 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala @@ -56,9 +56,10 @@ trait ApplicationOperation { * Get the engine/application status by the unique application tag * * @param tag the unique application tag for engine instance. + * @param submitTime engine submit to resourceManager time * @return [[ApplicationInfo]] */ - def getApplicationInfoByTag(tag: String): ApplicationInfo + def getApplicationInfoByTag(tag: String, submitTime: Option[Long] = None): ApplicationInfo } object ApplicationState extends Enumeration { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index 84b7707e8..1f38ea3c2 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -216,7 +216,10 @@ private[kyuubi] class EngineRef( // check the engine application state from engine manager and fast fail on engine terminate if (exitValue == Some(0)) { Option(engineManager).foreach { engineMgr => - engineMgr.getApplicationInfo(builder.clusterManager(), engineRefId).foreach { appInfo => + engineMgr.getApplicationInfo( + builder.clusterManager(), + engineRefId, + Some(started)).foreach { appInfo => if (ApplicationState.isTerminated(appInfo.state)) { MetricsSystem.tracing { ms => ms.incCount(MetricRegistry.name(ENGINE_FAIL, appUser)) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala index bd482b86b..ce2e05461 100644 --- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala @@ -84,7 +84,7 @@ class JpsApplicationOperation extends ApplicationOperation { killJpsApplicationByTag(tag, true) } - override def getApplicationInfoByTag(tag: String): ApplicationInfo = { + override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]): ApplicationInfo = { val commandOption = getEngine(tag) if (commandOption.nonEmpty) { val idAndCmd = commandOption.get diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index 1dd5ff83f..d0820b9ae 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -32,16 +32,15 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { @volatile private var kubernetesClient: KubernetesClient = _ - private var jpsOperation: JpsApplicationOperation = _ + + private var submitTimeout: Long = _ override def initialize(conf: KyuubiConf): Unit = { - jpsOperation = new JpsApplicationOperation - jpsOperation.initialize(conf) - info("Start initializing Kubernetes Client.") kubernetesClient = KubernetesUtils.buildKubernetesClient(conf) match { case Some(client) => info(s"Initialized Kubernetes Client connect to: ${client.getMasterUrl}") + submitTimeout = conf.get(KyuubiConf.ENGINE_SUBMIT_TIMEOUT) client case None => warn("Fail to init Kubernetes Client for Kubernetes Application Operation") @@ -50,6 +49,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { } override def isSupported(clusterManager: Option[String]): Boolean = { + // TODO add deploy mode to check whether is supported kubernetesClient != null && 
clusterManager.nonEmpty && clusterManager.get.toLowerCase.startsWith("k8s") } @@ -73,8 +73,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { s"Operation of deleted appId: ${podList.get(0).getMetadata.getName} is completed") } } else { - // client mode - jpsOperation.killApplicationByTag(tag) + ( + false, + s"Target Pod(tag: $tag) is not found, due to pod have been deleted or not created") } } catch { case e: Exception => @@ -85,32 +86,44 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { } } - override def getApplicationInfoByTag(tag: String): ApplicationInfo = { - if (kubernetesClient != null) { - debug(s"Getting application info from Kubernetes cluster by $tag tag") - try { - val podList = findDriverPodByTag(tag) - if (podList.size() != 0) { - val pod = podList.get(0) - val info = ApplicationInfo( - // spark pods always tag label `spark-app-selector:` - id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL), - name = pod.getMetadata.getName, - state = KubernetesApplicationOperation.toApplicationState(pod.getStatus.getPhase), - error = Option(pod.getStatus.getReason)) - debug(s"Successfully got application info by $tag: $info") - info - } else { - // client mode - jpsOperation.getApplicationInfoByTag(tag) - } - } catch { - case e: Exception => - error(s"Failed to get application with $tag, due to ${e.getMessage}") + override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]): ApplicationInfo = { + if (kubernetesClient == null) { + throw new IllegalStateException("Methods initialize and isSupported must be called ahead") + } + debug(s"Getting application info from Kubernetes cluster by $tag tag") + try { + val podList = findDriverPodByTag(tag) + if (podList.size() != 0) { + val pod = podList.get(0) + val info = ApplicationInfo( + // spark pods always tag label `spark-app-selector:` + id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL), + name = pod.getMetadata.getName, + state = 
KubernetesApplicationOperation.toApplicationState(pod.getStatus.getPhase), + error = Option(pod.getStatus.getReason)) + debug(s"Successfully got application info by $tag: $info") + return info + } + // The driver pod may not be created yet; tolerate that until submitTimeout elapses + submitTime match { + case Some(time) => + val elapsedTime = System.currentTimeMillis() - time + if (elapsedTime > submitTimeout) { + error(s"Can't find target driver pod by tag: $tag, " + + s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.") + ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND) + } else { + warn("Wait for driver pod to be created, " + + s"elapsed time: ${elapsedTime}ms, return UNKNOWN status") + ApplicationInfo(id = null, name = null, ApplicationState.UNKNOWN) + } + case None => ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND) } - } else { - throw new IllegalStateException("Methods initialize and isSupported must be called ahead") + } catch { + case e: Exception => + error(s"Failed to get application with $tag, due to ${e.getMessage}") + ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND) } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala index 70c130012..9b23e550d 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala @@ -84,10 +84,11 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager def getApplicationInfo( clusterManager: Option[String], - tag: String): Option[ApplicationInfo] = { + tag: String, + submitTime: Option[Long] = None): Option[ApplicationInfo] = { val operation = operations.find(_.isSupported(clusterManager)) operation match { - case Some(op) => Some(op.getApplicationInfoByTag(tag)) + case Some(op) => 
Some(op.getApplicationInfoByTag(tag, submitTime)) case None => None } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala index b38b1daa2..e836e65da 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala @@ -75,7 +75,7 @@ class YarnApplicationOperation extends ApplicationOperation with Logging { } } - override def getApplicationInfoByTag(tag: String): ApplicationInfo = { + override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]): ApplicationInfo = { if (yarnClient != null) { debug(s"Getting application info from Yarn cluster by $tag tag") val reports = yarnClient.getApplications(null, null, Set(tag).asJava) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index f061d977d..883afcf58 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -105,8 +105,16 @@ class BatchJobSubmission( override protected def currentApplicationInfo: Option[ApplicationInfo] = { if (isTerminal(state) && _applicationInfo.nonEmpty) return _applicationInfo // only the ApplicationInfo with non-empty id is valid for the operation + val submitTime = if (_appStartTime <= 0) { + System.currentTimeMillis() + } else { + _appStartTime + } val applicationInfo = - applicationManager.getApplicationInfo(builder.clusterManager(), batchId).filter(_.id != null) + applicationManager.getApplicationInfo( + builder.clusterManager(), + batchId, + Some(submitTime)).filter(_.id != null) applicationInfo.foreach { _ => if (_appStartTime <= 0) { _appStartTime = 
System.currentTimeMillis() diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index 0ef3c8bac..4814996a4 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -296,7 +296,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { error(s"Error redirecting get batch[$batchId] to ${metadata.kyuubiInstance}", e) val batchAppStatus = sessionManager.applicationManager.getApplicationInfo( metadata.clusterManager, - batchId) + batchId, + Some(metadata.createTime)) buildBatch(metadata, batchAppStatus) } }