From bada9c04115a966132d66bb0cc5e7997e97b4b1b Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Thu, 12 Jun 2025 10:20:44 -0700 Subject: [PATCH] [KYUUBI #7095] Respect terminated app state when building batch info from metadata ### Why are the changes needed? Respect terminated app state when building batch info from metadata. It is a follow-up for https://github.com/apache/kyuubi/pull/2911, https://github.com/apache/kyuubi/blob/9e40e39c39566c435ad92f0659c6120b9f2b8578/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala#L128-L142 1. if the kyuubi instance is unreachable during a maintenance window. 2. the batch app state has been terminated, and the app state was backfilled by another kyuubi instance peer, see #2911 3. the batch state in the metadata table is still PENDING/RUNNING 4. return the terminated batch state in such a case instead of `PENDING` or `RUNNING`. ### How was this patch tested? GA and IT. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #7095 from turboFei/always_respect_appstate. 
Closes #7095 ec72666c9 [Wang, Fei] rename bc74a9c56 [Wang, Fei] if op not terminated e786c8d9b [Wang, Fei] respect terminated app state when building batch info from metadata Authored-by: Wang, Fei Signed-off-by: Wang, Fei --- .../server/api/v1/BatchesResource.scala | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index 6e7fa9a33..463252755 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -91,7 +91,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { private def sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager] - private def buildBatch(session: KyuubiBatchSession): Batch = { + private def buildBatchFromSession(session: KyuubiBatchSession): Batch = { val batchOp = session.batchJobSubmissionOp val batchOpStatus = batchOp.getStatus @@ -125,7 +125,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { Map.empty[String, String].asJava) } - private def buildBatch( + private def buildBatchFromMetadataAndAppInfo( metadata: Metadata, batchAppStatus: Option[ApplicationInfo]): Batch = { batchAppStatus.map { appStatus => @@ -316,7 +316,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { } match { case Success(sessionHandle) => sessionManager.getBatchSession(sessionHandle) match { - case Some(batchSession) => buildBatch(batchSession) + case Some(batchSession) => buildBatchFromSession(batchSession) case None => throw new IllegalStateException( s"can not find batch $batchId from metadata store") } @@ -349,7 +349,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { val userName = fe.getSessionUser(Map.empty[String, 
String]) val sessionHandle = formatSessionHandle(batchId) sessionManager.getBatchSession(sessionHandle).map { batchSession => - buildBatch(batchSession) + buildBatchFromSession(batchSession) }.getOrElse { sessionManager.getBatchMetadata(batchId).map { metadata => val isOperationTerminated = (StringUtils.isNotBlank(metadata.state) @@ -362,7 +362,18 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { isOperationTerminated || isApplicationTerminated || metadata.kyuubiInstance == fe.connectionUrl) { - MetadataManager.buildBatch(metadata) + if (isApplicationTerminated && !isOperationTerminated) { + buildBatchFromMetadataAndAppInfo( + metadata, + Some(ApplicationInfo( + metadata.engineId, + metadata.engineName, + metadata.appState.orNull, + Option(metadata.engineUrl), + metadata.engineError))) + } else { + MetadataManager.buildBatch(metadata) + } } else { val internalRestClient = getInternalRestClient(metadata.kyuubiInstance) try { @@ -387,7 +398,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { engineState = appInfo.state.toString, engineError = appInfo.error)) } - buildBatch(metadata, batchAppStatus) + buildBatchFromMetadataAndAppInfo(metadata, batchAppStatus) } } }.getOrElse {