From 5bce321adb30be40ba08e424c4f3054a39468807 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Mon, 24 Mar 2025 12:53:22 -0700 Subject: [PATCH] [KYUUBI #6997] Get the latest batch app info after submit process terminated to prevent batch ERROR due to engine submit timeout ### Why are the changes needed? We meet below issue: For spark on yarn: ``` spark.yarn.submit.waitAppCompletion=false kyuubi.engine.yarn.submit.timeout=PT10M ``` Due to network issue, the application submission was very slow. It was submitted after 15 minutes. image image Then the batch failed from PENDING state to ERRO state directly, due to application state NOT_FOUND(exceeds the kyuubi.engine.yarn.submit.timeout). https://github.com/apache/kyuubi/blob/a54ee39ab338e310c6b9a508ad8f14c0bd82fa0f/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala#L99-L106 image Here is the operation event: image But from the batch log, the current application status should be `PENDING`. ``` :2025-03-21 17:36:19.350 INFO [KyuubiSessionManager-exec-pool: Thread-176922] org.apache.kyuubi.operation.BatchJobSubmission: Batch report for bbba09c8-3704-4a87-8394-9bcbbd39cc34, Some(ApplicationInfo(application_1741747369441_2258235,6042072c-e8fa-425d-a6a3-3d5bbb4ec1e3-275732_6042072c-e8fa-425d-a6a3-3d5bbb4ec1e3-275732.e3a34b86-7fc7-43ea-b4a5-1b6f27df54b5.0_20250322002147.stm,PENDING,Some(https://apollo-rno-rm-2.vip.hadoop.ebay.com:50030/proxy/application_1741747369441_2258235/),Some())) ``` So, we should retrieve the batch application info after the submission process terminated before checking the application failed, to get the current application information to prevent the corner case: 1. the application submission time exceeds the `kyuubi.engine.yarn.submit.timeout` and the app state is NOT FOUND 2. can not get the application report before the submission process terminated 3. then the batch state to ERROR from PENDING directly. Conclusion: The application state transition was: UNKNOWN(before submit timeout) -> NOT_FOUND(reach submit timeout) -> processExit -> batchOpError -> PENDING(updateApplicationInfoMetadataIfNeeded) -> UNKNOWN(batchError but app not terminated) After this PR, it should be: UNKNOWN(before submit timeout) -> NOT_FOUND(reach submit timeout) -> processExit-> PENDING(after process terminated) -> .... ### How was this patch tested? Existing GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #6997 from turboFei/app_not_found_v2. Closes #6997 370cf49e9 [Wang, Fei] v2 912ec28ca [Wang, Fei] nit 3c376f922 [Wang, Fei] log the op ex d9cbdb87d [Wang, Fei] fix app not found Authored-by: Wang, Fei Signed-off-by: Wang, Fei (cherry picked from commit 196b47e32a527c4b4d6d296a28b8c6fc7d56a1be) Signed-off-by: Wang, Fei --- .../kyuubi/operation/AbstractOperation.scala | 1 + .../kyuubi/operation/BatchJobSubmission.scala | 76 +++++++++++-------- 2 files changed, 44 insertions(+), 33 deletions(-) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala index fdcc0c340..bd197f1a5 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala @@ -113,6 +113,7 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin protected def setOperationException(opEx: KyuubiSQLException): Unit = { this.operationException = opEx + withOperationLog(error(s"Error operating $opType: ${opEx.getMessage}", opEx)) } def getOperationJobProgress: TProgressUpdateResp = operationJobProgress diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index aa17c5436..9dfc3b04e 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -165,6 +165,8 @@ class BatchJobSubmission( opState: OperationState, appState: ApplicationState.ApplicationState): ApplicationState.ApplicationState = { if (opState == OperationState.ERROR && !ApplicationState.isTerminated(appState)) { + withOperationLog(error(s"Batch $batchId state is $opState," + + s" but the application state is $appState and not terminated, set to UNKNOWN.")) ApplicationState.UNKNOWN } else { appState @@ -250,50 +252,58 @@ class BatchJobSubmission( private def submitAndMonitorBatchJob(): Unit = { var appStatusFirstUpdated = false var lastStarvationCheckTime = createTime + + def doUpdateApplicationInfoMetadataIfNeeded(): Unit = { + updateApplicationInfoMetadataIfNeeded() + if (!appStatusFirstUpdated) { + // only the ApplicationInfo with non-empty id indicates that batch is RUNNING + if (applicationId(_applicationInfo).isDefined) { + setStateIfNotCanceled(OperationState.RUNNING) + updateBatchMetadata() + appStatusFirstUpdated = true + } else { + val currentTime = System.currentTimeMillis() + if (currentTime - lastStarvationCheckTime > applicationStarvationTimeout) { + lastStarvationCheckTime = currentTime + warn(s"Batch[$batchId] has not started, check the Kyuubi server to ensure" + + s" that batch jobs can be submitted.") + } + } + } + } + try { info(s"Submitting $batchType batch[$batchId] job:\n$builder") val process = builder.start - while (!applicationFailed(_applicationInfo) && process.isAlive) { - updateApplicationInfoMetadataIfNeeded() - if (!appStatusFirstUpdated) { - // only the ApplicationInfo with non-empty id indicates that batch is RUNNING - if (applicationId(_applicationInfo).isDefined) { - setStateIfNotCanceled(OperationState.RUNNING) - updateBatchMetadata() - appStatusFirstUpdated = true - } else { - val currentTime = System.currentTimeMillis() - if (currentTime - lastStarvationCheckTime > applicationStarvationTimeout) { - lastStarvationCheckTime = currentTime - warn(s"Batch[$batchId] has not started, check the Kyuubi server to ensure" + - s" that batch jobs can be submitted.") - } - } - } + while (process.isAlive && !applicationFailed(_applicationInfo)) { + doUpdateApplicationInfoMetadataIfNeeded() process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS) } + if (!process.isAlive) { + doUpdateApplicationInfoMetadataIfNeeded() + } + if (applicationFailed(_applicationInfo)) { Utils.terminateProcess(process, applicationStartupDestroyTimeout) throw new KyuubiException(s"Batch job failed: ${_applicationInfo}") - } else { - process.waitFor() - if (process.exitValue() != 0) { - throw new KyuubiException(s"Process exit with value ${process.exitValue()}") - } + } - while (!appStarted && applicationId(_applicationInfo).isEmpty && - !applicationTerminated(_applicationInfo)) { - Thread.sleep(applicationCheckInterval) - updateApplicationInfoMetadataIfNeeded() - } + if (process.waitFor() != 0) { + throw new KyuubiException(s"Process exit with value ${process.exitValue}") + } - applicationId(_applicationInfo) match { - case Some(appId) => monitorBatchJob(appId) - case None if !appStarted => - throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}") - case None => - } + while (!appStarted && applicationId(_applicationInfo).isEmpty && + !applicationTerminated(_applicationInfo)) { + Thread.sleep(applicationCheckInterval) + doUpdateApplicationInfoMetadataIfNeeded() + } + + applicationId(_applicationInfo) match { + case Some(appId) => monitorBatchJob(appId) + case None if !appStarted => + throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}") + case None => } } finally { val waitCompletion = batchConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION.key)