From dd2cd516d72631b464bcec2d4bcafc29d61d3880 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Thu, 14 Sep 2023 12:13:20 +0800 Subject: [PATCH] [KYUUBI #5220][FOLLOWUP] Batch submitted considers application state ### _Why are the changes needed?_ This PR aims to fix the `SparkSubmit` concurrency limit implemented in #5220. The "submitted" judgment in #5220 only considered `OperationState`; `ApplicationState` should be taken into account too. For instance, if a batch is pending in the `ACCEPTED` state, the `SparkSubmit` process won't exit until the application changes to the `RUNNING` or `FAILED` state. In such a case, the `OperationState` is `RUNNING` and the `ApplicationState` is `PENDING`, so the batch should not be treated as "submitted". Additionally, this PR treats the metastore, instead of `SessionManager`, as the single source of truth for batches. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No. Closes #5279 from pan3793/5220-followup. 
Closes #5220 903abc6d4 [Cheng Pan] Fix 0af6738d2 [Cheng Pan] [KYUUBI #5220][FOLLOWUP] Batch submmited should not contain application pending state Authored-by: Cheng Pan Signed-off-by: Cheng Pan --- .../kyuubi/server/KyuubiBatchService.scala | 37 ++++++++++--------- .../kyuubi/server/metadata/api/Metadata.scala | 12 +++++- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala index bf10a68fa..2bfbbce2a 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala @@ -20,6 +20,7 @@ package org.apache.kyuubi.server import java.util.concurrent.atomic.AtomicBoolean import org.apache.kyuubi.config.KyuubiConf.BATCH_SUBMITTER_THREADS +import org.apache.kyuubi.engine.ApplicationState import org.apache.kyuubi.operation.OperationState import org.apache.kyuubi.server.metadata.MetadataManager import org.apache.kyuubi.service.{AbstractService, Serverable} @@ -83,26 +84,28 @@ class KyuubiBatchService( metadata.requestArgs, Some(metadata), fromRecovery = false) - val sessionHandle = sessionManager.openBatchSession(batchSession) + sessionManager.openBatchSession(batchSession) var submitted = false while (!submitted) { // block until batch job submitted - submitted = sessionManager.getBatchSession(sessionHandle).map { batchSession => - val batchState = batchSession.batchJobSubmissionOp.getStatus.state - batchState == OperationState.RUNNING || OperationState.isTerminal(batchState) - }.getOrElse { - error(s"Batch Session $batchId is not existed, marked as finished") - true + submitted = metadataManager.getBatchSessionMetadata(batchId) match { + case Some(metadata) if OperationState.isTerminal(metadata.opState) => + true + case Some(metadata) if metadata.opState == OperationState.RUNNING => + 
metadata.appState match { + // app that is not submitted to resource manager + case None | Some(ApplicationState.NOT_FOUND) => false + // app that is pending in resource manager + case Some(ApplicationState.PENDING) => false + // not sure, added for safe + case Some(ApplicationState.UNKNOWN) => false + case _ => true + } + case Some(_) => + false + case None => + error(s"$batchId does not existed in metastore, assume it is finished") + true } - // should we always treat metastore as the single of truth? - // - // submitted = metadataManager.getBatchSessionMetadata(batchId) match { - // case Some(metadata) => - // val batchState = OperationState.withName(metadata.state) - // batchState == OperationState.RUNNING || OperationState.isTerminal(batchState) - // case None => - // error(s"$batchId does not existed in metastore, assume it is finished") - // true - // } if (!submitted) Thread.sleep(1000) } info(s"$batchId is submitted or finished.") diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala index 12759f8cc..3e3d94828 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala @@ -18,7 +18,10 @@ package org.apache.kyuubi.server.metadata.api import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.engine.ApplicationManagerInfo +import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState} +import org.apache.kyuubi.engine.ApplicationState.ApplicationState +import org.apache.kyuubi.operation.OperationState +import org.apache.kyuubi.operation.OperationState.OperationState import org.apache.kyuubi.session.SessionType.SessionType /** @@ -82,4 +85,11 @@ case class Metadata( requestConf.get(KyuubiConf.KUBERNETES_CONTEXT.key), requestConf.get(KyuubiConf.KUBERNETES_NAMESPACE.key)) } + + def opState: 
OperationState = { + assert(state != null, "invalid state, a normal batch record must have non-null state") + OperationState.withName(state) + } + + def appState: Option[ApplicationState] = Option(engineState).map(ApplicationState.withName) }