diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
index bf10a68fa..2bfbbce2a 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.server
 import java.util.concurrent.atomic.AtomicBoolean
 
 import org.apache.kyuubi.config.KyuubiConf.BATCH_SUBMITTER_THREADS
+import org.apache.kyuubi.engine.ApplicationState
 import org.apache.kyuubi.operation.OperationState
 import org.apache.kyuubi.server.metadata.MetadataManager
 import org.apache.kyuubi.service.{AbstractService, Serverable}
@@ -83,26 +84,28 @@ class KyuubiBatchService(
               metadata.requestArgs,
               Some(metadata),
               fromRecovery = false)
-            val sessionHandle = sessionManager.openBatchSession(batchSession)
+            sessionManager.openBatchSession(batchSession)
             var submitted = false
             while (!submitted) { // block until batch job submitted
-              submitted = sessionManager.getBatchSession(sessionHandle).map { batchSession =>
-                val batchState = batchSession.batchJobSubmissionOp.getStatus.state
-                batchState == OperationState.RUNNING || OperationState.isTerminal(batchState)
-              }.getOrElse {
-                error(s"Batch Session $batchId is not existed, marked as finished")
-                true
+              submitted = metadataManager.getBatchSessionMetadata(batchId) match {
+                case Some(metadata) if OperationState.isTerminal(metadata.opState) =>
+                  true
+                case Some(metadata) if metadata.opState == OperationState.RUNNING =>
+                  metadata.appState match {
+                    // app that is not submitted to resource manager
+                    case None | Some(ApplicationState.NOT_FOUND) => false
+                    // app that is pending in resource manager
+                    case Some(ApplicationState.PENDING) => false
+                    // not sure, added for safe
+                    case Some(ApplicationState.UNKNOWN) => false
+                    case _ => true
+                  }
+                case Some(_) =>
+                  false
+                case None =>
+                  error(s"$batchId does not existed in metastore, assume it is finished")
+                  true
               }
-              // should we always treat metastore as the single of truth?
-              //
-              // submitted = metadataManager.getBatchSessionMetadata(batchId) match {
-              //   case Some(metadata) =>
-              //     val batchState = OperationState.withName(metadata.state)
-              //     batchState == OperationState.RUNNING || OperationState.isTerminal(batchState)
-              //   case None =>
-              //     error(s"$batchId does not existed in metastore, assume it is finished")
-              //     true
-              // }
               if (!submitted) Thread.sleep(1000)
             }
             info(s"$batchId is submitted or finished.")
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
index 12759f8cc..3e3d94828 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
@@ -18,7 +18,10 @@
 package org.apache.kyuubi.server.metadata.api
 
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.ApplicationManagerInfo
+import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState}
+import org.apache.kyuubi.engine.ApplicationState.ApplicationState
+import org.apache.kyuubi.operation.OperationState
+import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.session.SessionType.SessionType
 
 /**
@@ -82,4 +85,11 @@ case class Metadata(
       requestConf.get(KyuubiConf.KUBERNETES_CONTEXT.key),
       requestConf.get(KyuubiConf.KUBERNETES_NAMESPACE.key))
   }
+
+  def opState: OperationState = {
+    assert(state != null, "invalid state, a normal batch record must have non-null state")
+    OperationState.withName(state)
+  }
+
+  def appState: Option[ApplicationState] = Option(engineState).map(ApplicationState.withName)
 }