[KYUUBI #6997] Get the latest batch app info after submit process terminated to prevent batch ERROR due to engine submit timeout

### Why are the changes needed?

We encountered the following issue.
For Spark on YARN, with this configuration:
```
spark.yarn.submit.waitAppCompletion=false
kyuubi.engine.yarn.submit.timeout=PT10M
```

Due to a network issue, the application submission was very slow.

The application was only submitted after 15 minutes.
<img width="1430" alt="image" src="https://github.com/user-attachments/assets/a326c3d1-4d39-42da-b6aa-cad5f8e7fc4b" />

<img width="1350" alt="image" src="https://github.com/user-attachments/assets/8e20056a-bd71-4515-a5e3-f881509a34b2" />

The batch then went directly from the PENDING state to the ERROR state, because the application state was reported as NOT_FOUND once the submission had exceeded `kyuubi.engine.yarn.submit.timeout`.

a54ee39ab3/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala (L99-L106)
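
The referenced lines choose between UNKNOWN and NOT_FOUND depending on how long ago the submission started. A minimal sketch of that decision, assuming it only compares the elapsed time with the configured timeout; `SubmitTimeoutSketch`, `AppState`, and `stateWhenAppMissing` are hypothetical names for illustration, not Kyuubi APIs:

```scala
import java.time.Duration

object SubmitTimeoutSketch {
  // Simplified stand-in for Kyuubi's ApplicationState; only the two states relevant here.
  sealed trait AppState
  case object Unknown extends AppState
  case object NotFound extends AppState

  // Hypothetical helper: the state to report while the resource manager
  // has no application for the batch yet.
  def stateWhenAppMissing(elapsed: Duration, submitTimeout: Duration): AppState =
    if (elapsed.compareTo(submitTimeout) < 0) Unknown else NotFound

  // kyuubi.engine.yarn.submit.timeout=PT10M is an ISO-8601 duration (10 minutes).
  val submitTimeout: Duration = Duration.parse("PT10M")
}
```

Under these assumptions, any lookup made more than 10 minutes into the 15-minute submission described above would report `NotFound`, even though the submit process is still working.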

<img width="1727" alt="image" src="https://github.com/user-attachments/assets/20a2987c-675c-4136-a107-001f30b1b217" />

Here is the operation event:
<img width="1727" alt="image" src="https://github.com/user-attachments/assets/e2bab9c3-a959-4e2b-a207-813ae6489b30" />

However, according to the batch log, the application status at that point was actually `PENDING`:
```
:2025-03-21 17:36:19.350 INFO [KyuubiSessionManager-exec-pool: Thread-176922] org.apache.kyuubi.operation.BatchJobSubmission: Batch report for bbba09c8-3704-4a87-8394-9bcbbd39cc34, Some(ApplicationInfo(application_1741747369441_2258235,6042072c-e8fa-425d-a6a3-3d5bbb4ec1e3-275732_6042072c-e8fa-425d-a6a3-3d5bbb4ec1e3-275732.e3a34b86-7fc7-43ea-b4a5-1b6f27df54b5.0_20250322002147.stm,PENDING,Some(https://apollo-rno-rm-2.vip.hadoop.ebay.com:50030/proxy/application_1741747369441_2258235/),Some()))
```

So we should fetch the batch application info once more after the submission process has terminated and before checking whether the application failed. Using the current application information prevents the following corner case:
1. the application submission takes longer than `kyuubi.engine.yarn.submit.timeout`, so the app state is reported as NOT_FOUND
2. the application report cannot be retrieved before the submission process terminates
3. the batch state then goes directly from PENDING to ERROR.
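
The corner case above can be reproduced with a small simulation. This is only a sketch under stated assumptions: `FakeProcess`, the scripted state iterator, and `monitor` are hypothetical stand-ins for the submit process, the application lookup, and the monitoring loop, and the scenario assumes the process exits during the wait that follows the NOT_FOUND lookup.

```scala
object SubmitMonitorSketch {
  // Simplified stand-in for Kyuubi's ApplicationState.
  sealed trait AppState
  case object Unknown extends AppState
  case object NotFound extends AppState
  case object Pending extends AppState

  // Hypothetical submit process that stays alive for a fixed number of polls.
  final class FakeProcess(private var pollsUntilExit: Int) {
    def isAlive: Boolean = pollsUntilExit > 0
    def tick(): Unit = if (pollsUntilExit > 0) pollsUntilExit -= 1
  }

  // Runs the monitoring loop against a scripted sequence of lookup results.
  // `refreshAfterExit` toggles the extra lookup after the process has terminated.
  // Left(state): the batch would be failed; Right(state): the batch may continue.
  def monitor(
      states: Iterator[AppState],
      process: FakeProcess,
      refreshAfterExit: Boolean): Either[AppState, AppState] = {
    var current: AppState = Unknown
    def refresh(): Unit = if (states.hasNext) current = states.next()
    def failed: Boolean = current == NotFound // NOT_FOUND is treated as a failed state

    while (process.isAlive && !failed) {
      refresh()
      process.tick() // stands in for process.waitFor(interval)
    }
    if (refreshAfterExit && !process.isAlive) refresh()
    if (failed) Left(current) else Right(current)
  }
}
```

With the scripted lookups `Unknown, NotFound, Pending` and a process that exits after two polls, `monitor(..., refreshAfterExit = false)` returns `Left(NotFound)`, while `refreshAfterExit = true` returns `Right(Pending)`.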

Conclusion:

The application state transition was:

UNKNOWN(before submit timeout) -> NOT_FOUND(reach submit timeout) -> processExit -> batchOpError -> PENDING(updateApplicationInfoMetadataIfNeeded) -> UNKNOWN(batchError but app not terminated)

After this PR, it should be:

UNKNOWN(before submit timeout) -> NOT_FOUND(reach submit timeout) -> processExit -> PENDING(after process terminated) -> ....
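
The trailing UNKNOWN in the pre-fix sequence comes from the rule that a batch in ERROR whose application has not terminated is reported as UNKNOWN. A minimal sketch of that mapping, with simplified, hypothetical state types (`OpState`, `AppState`, `reportedAppState`) and the assumption that only `Finished` and `Failed` count as terminated here:

```scala
object ReportedStateSketch {
  // Simplified stand-ins for Kyuubi's OperationState and ApplicationState.
  sealed trait OpState
  case object OpPending extends OpState
  case object OpRunning extends OpState
  case object OpError extends OpState

  sealed trait AppState
  case object Unknown extends AppState
  case object Pending extends AppState
  case object Running extends AppState
  case object Finished extends AppState
  case object Failed extends AppState

  // Assumption for this sketch: only these two states are terminal.
  def isTerminated(state: AppState): Boolean = state == Finished || state == Failed

  // An errored batch with a still-live application is reported as Unknown;
  // in every other case the application state is passed through unchanged.
  def reportedAppState(op: OpState, app: AppState): AppState =
    if (op == OpError && !isTerminated(app)) Unknown else app
}
```

In this model, `reportedAppState(OpError, Pending)` gives `Unknown`, which corresponds to the last step of the old sequence; once the batch no longer enters ERROR prematurely, the `Pending` state is passed through.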

### How was this patch tested?

Existing GA.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #6997 from turboFei/app_not_found_v2.

Closes #6997

370cf49e9 [Wang, Fei] v2
912ec28ca [Wang, Fei] nit
3c376f922 [Wang, Fei] log the op ex
d9cbdb87d [Wang, Fei] fix app not found

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
Wang, Fei 2025-03-24 12:53:22 -07:00
parent 2080c2186c
commit 196b47e32a
2 changed files with 44 additions and 33 deletions


@@ -124,6 +124,7 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
 
   protected def setOperationException(opEx: KyuubiSQLException): Unit = {
     this.operationException = opEx
+    withOperationLog(error(s"Error operating $opType: ${opEx.getMessage}", opEx))
   }
 
   def getOperationJobProgress: TProgressUpdateResp = operationJobProgress


@@ -165,6 +165,8 @@ class BatchJobSubmission(
       opState: OperationState,
       appState: ApplicationState.ApplicationState): ApplicationState.ApplicationState = {
     if (opState == OperationState.ERROR && !ApplicationState.isTerminated(appState)) {
+      withOperationLog(error(s"Batch $batchId state is $opState," +
+        s" but the application state is $appState and not terminated, set to UNKNOWN."))
       ApplicationState.UNKNOWN
     } else {
       appState
@@ -250,50 +252,58 @@ class BatchJobSubmission(
   private def submitAndMonitorBatchJob(): Unit = {
     var appStatusFirstUpdated = false
     var lastStarvationCheckTime = createTime
+
+    def doUpdateApplicationInfoMetadataIfNeeded(): Unit = {
+      updateApplicationInfoMetadataIfNeeded()
+      if (!appStatusFirstUpdated) {
+        // only the ApplicationInfo with non-empty id indicates that batch is RUNNING
+        if (applicationId(_applicationInfo).isDefined) {
+          setStateIfNotCanceled(OperationState.RUNNING)
+          updateBatchMetadata()
+          appStatusFirstUpdated = true
+        } else {
+          val currentTime = System.currentTimeMillis()
+          if (currentTime - lastStarvationCheckTime > applicationStarvationTimeout) {
+            lastStarvationCheckTime = currentTime
+            warn(s"Batch[$batchId] has not started, check the Kyuubi server to ensure" +
+              s" that batch jobs can be submitted.")
+          }
+        }
+      }
+    }
+
     try {
       info(s"Submitting $batchType batch[$batchId] job:\n$builder")
       val process = builder.start
-      while (!applicationFailed(_applicationInfo) && process.isAlive) {
-        updateApplicationInfoMetadataIfNeeded()
-        if (!appStatusFirstUpdated) {
-          // only the ApplicationInfo with non-empty id indicates that batch is RUNNING
-          if (applicationId(_applicationInfo).isDefined) {
-            setStateIfNotCanceled(OperationState.RUNNING)
-            updateBatchMetadata()
-            appStatusFirstUpdated = true
-          } else {
-            val currentTime = System.currentTimeMillis()
-            if (currentTime - lastStarvationCheckTime > applicationStarvationTimeout) {
-              lastStarvationCheckTime = currentTime
-              warn(s"Batch[$batchId] has not started, check the Kyuubi server to ensure" +
-                s" that batch jobs can be submitted.")
-            }
-          }
-        }
+      while (process.isAlive && !applicationFailed(_applicationInfo)) {
+        doUpdateApplicationInfoMetadataIfNeeded()
         process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS)
       }
 
+      if (!process.isAlive) {
+        doUpdateApplicationInfoMetadataIfNeeded()
+      }
+
       if (applicationFailed(_applicationInfo)) {
         Utils.terminateProcess(process, applicationStartupDestroyTimeout)
         throw new KyuubiException(s"Batch job failed: ${_applicationInfo}")
-      } else {
-        process.waitFor()
-        if (process.exitValue() != 0) {
-          throw new KyuubiException(s"Process exit with value ${process.exitValue()}")
-        }
+      }
 
-        while (!appStarted && applicationId(_applicationInfo).isEmpty &&
-          !applicationTerminated(_applicationInfo)) {
-          Thread.sleep(applicationCheckInterval)
-          updateApplicationInfoMetadataIfNeeded()
-        }
+      if (process.waitFor() != 0) {
+        throw new KyuubiException(s"Process exit with value ${process.exitValue}")
+      }
 
-        applicationId(_applicationInfo) match {
-          case Some(appId) => monitorBatchJob(appId)
-          case None if !appStarted =>
-            throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
-          case None =>
-        }
+      while (!appStarted && applicationId(_applicationInfo).isEmpty &&
+        !applicationTerminated(_applicationInfo)) {
+        Thread.sleep(applicationCheckInterval)
+        doUpdateApplicationInfoMetadataIfNeeded()
+      }
+
+      applicationId(_applicationInfo) match {
+        case Some(appId) => monitorBatchJob(appId)
+        case None if !appStarted =>
+          throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
+        case None =>
       }
     } finally {
       val waitCompletion = batchConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION.key)