diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala index 05dd7fda9..4d5db5d4e 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala @@ -103,6 +103,7 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin protected def setOperationException(opEx: KyuubiSQLException): Unit = { this.operationException = opEx + withOperationLog(error(s"Error operating $opType: ${opEx.getMessage}", opEx)) } def getOperationJobProgress: TProgressUpdateResp = operationJobProgress diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index 771ff3b4c..f573dc0d2 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -163,6 +163,8 @@ class BatchJobSubmission( opState: OperationState, appState: ApplicationState.ApplicationState): ApplicationState.ApplicationState = { if (opState == OperationState.ERROR && !ApplicationState.isTerminated(appState)) { + withOperationLog(error(s"Batch $batchId state is $opState," + + s" but the application state is $appState and not terminated, set to UNKNOWN.")) ApplicationState.UNKNOWN } else { appState @@ -240,50 +242,58 @@ class BatchJobSubmission( private def submitAndMonitorBatchJob(): Unit = { var appStatusFirstUpdated = false var lastStarvationCheckTime = createTime + + def doUpdateApplicationInfoMetadataIfNeeded(): Unit = { + updateApplicationInfoMetadataIfNeeded() + if (!appStatusFirstUpdated) { + // only the ApplicationInfo with non-empty id indicates that batch is RUNNING + if (applicationId(_applicationInfo).isDefined) { + setStateIfNotCanceled(OperationState.RUNNING) + updateBatchMetadata() + appStatusFirstUpdated = true + } else { + val currentTime = System.currentTimeMillis() + if (currentTime - lastStarvationCheckTime > applicationStarvationTimeout) { + lastStarvationCheckTime = currentTime + warn(s"Batch[$batchId] has not started, check the Kyuubi server to ensure" + + s" that batch jobs can be submitted.") + } + } + } + } + try { info(s"Submitting $batchType batch[$batchId] job:\n$builder") val process = builder.start - while (!applicationFailed(_applicationInfo) && process.isAlive) { - updateApplicationInfoMetadataIfNeeded() - if (!appStatusFirstUpdated) { - // only the ApplicationInfo with non-empty id indicates that batch is RUNNING - if (applicationId(_applicationInfo).isDefined) { - setStateIfNotCanceled(OperationState.RUNNING) - updateBatchMetadata() - appStatusFirstUpdated = true - } else { - val currentTime = System.currentTimeMillis() - if (currentTime - lastStarvationCheckTime > applicationStarvationTimeout) { - lastStarvationCheckTime = currentTime - warn(s"Batch[$batchId] has not started, check the Kyuubi server to ensure" + - s" that batch jobs can be submitted.") - } - } - } + while (process.isAlive && !applicationFailed(_applicationInfo)) { + doUpdateApplicationInfoMetadataIfNeeded() process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS) } + if (!process.isAlive) { + doUpdateApplicationInfoMetadataIfNeeded() + } + if (applicationFailed(_applicationInfo)) { Utils.terminateProcess(process, applicationStartupDestroyTimeout) throw new KyuubiException(s"Batch job failed: ${_applicationInfo}") - } else { - process.waitFor() - if (process.exitValue() != 0) { - throw new KyuubiException(s"Process exit with value ${process.exitValue()}") - } + } - while (!appStarted && applicationId(_applicationInfo).isEmpty && - !applicationTerminated(_applicationInfo)) { - Thread.sleep(applicationCheckInterval) - updateApplicationInfoMetadataIfNeeded() - } + if (process.waitFor() != 0) { + throw new KyuubiException(s"Process exit with value ${process.exitValue}") + } - applicationId(_applicationInfo) match { - case Some(appId) => monitorBatchJob(appId) - case None if !appStarted => - throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}") - case None => - } + while (!appStarted && applicationId(_applicationInfo).isEmpty && + !applicationTerminated(_applicationInfo)) { + Thread.sleep(applicationCheckInterval) + doUpdateApplicationInfoMetadataIfNeeded() + } + + applicationId(_applicationInfo) match { + case Some(appId) => monitorBatchJob(appId) + case None if !appStarted => + throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}") + case None => } } finally { val waitCompletion = batchConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION.key)