[KYUUBI #7095] Respect terminated app state when building batch info from metadata

### Why are the changes needed?

Respect terminated app state when building batch info from metadata

This is a follow-up to https://github.com/apache/kyuubi/pull/2911; see the relevant code at:
9e40e39c39/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala (L128-L142)

1. The Kyuubi instance is unreachable during a maintenance window.
2. The batch application has already terminated, and its app state was backfilled by a peer Kyuubi instance (see #2911).
3. The batch state in the metadata table is still `PENDING` or `RUNNING`.
4. In this case, return the terminated batch state instead of `PENDING` or `RUNNING`.

### How was this patch tested?

GA and IT.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #7095 from turboFei/always_respect_appstate.

Closes #7095

ec72666c9 [Wang, Fei] rename
bc74a9c56 [Wang, Fei] if op not terminated
e786c8d9b [Wang, Fei] respect terminated app state when building batch info from metadata

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Wang, Fei 2025-06-12 10:20:44 -07:00
parent 00ff464b06
commit bada9c0411

View File

@ -91,7 +91,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
private def sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
private def buildBatch(session: KyuubiBatchSession): Batch = {
private def buildBatchFromSession(session: KyuubiBatchSession): Batch = {
val batchOp = session.batchJobSubmissionOp
val batchOpStatus = batchOp.getStatus
@ -125,7 +125,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
Map.empty[String, String].asJava)
}
private def buildBatch(
private def buildBatchFromMetadataAndAppInfo(
metadata: Metadata,
batchAppStatus: Option[ApplicationInfo]): Batch = {
batchAppStatus.map { appStatus =>
@ -316,7 +316,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
} match {
case Success(sessionHandle) =>
sessionManager.getBatchSession(sessionHandle) match {
case Some(batchSession) => buildBatch(batchSession)
case Some(batchSession) => buildBatchFromSession(batchSession)
case None => throw new IllegalStateException(
s"can not find batch $batchId from metadata store")
}
@ -349,7 +349,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
val userName = fe.getSessionUser(Map.empty[String, String])
val sessionHandle = formatSessionHandle(batchId)
sessionManager.getBatchSession(sessionHandle).map { batchSession =>
buildBatch(batchSession)
buildBatchFromSession(batchSession)
}.getOrElse {
sessionManager.getBatchMetadata(batchId).map { metadata =>
val isOperationTerminated = (StringUtils.isNotBlank(metadata.state)
@ -362,7 +362,18 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
isOperationTerminated ||
isApplicationTerminated ||
metadata.kyuubiInstance == fe.connectionUrl) {
MetadataManager.buildBatch(metadata)
if (isApplicationTerminated && !isOperationTerminated) {
buildBatchFromMetadataAndAppInfo(
metadata,
Some(ApplicationInfo(
metadata.engineId,
metadata.engineName,
metadata.appState.orNull,
Option(metadata.engineUrl),
metadata.engineError)))
} else {
MetadataManager.buildBatch(metadata)
}
} else {
val internalRestClient = getInternalRestClient(metadata.kyuubiInstance)
try {
@ -387,7 +398,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
engineState = appInfo.state.toString,
engineError = appInfo.error))
}
buildBatch(metadata, batchAppStatus)
buildBatchFromMetadataAndAppInfo(metadata, batchAppStatus)
}
}
}.getOrElse {