diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala index e63c37045..798618e4c 100644 --- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala +++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala @@ -206,7 +206,7 @@ class KyuubiOperationKubernetesClusterClusterModeSuite val batchJobSubmissionOp = session.batchJobSubmissionOp eventually(timeout(3.minutes), interval(50.milliseconds)) { - val appInfo = batchJobSubmissionOp.currentApplicationInfo + val appInfo = batchJobSubmissionOp.getOrFetchCurrentApplicationInfo assert(appInfo.nonEmpty) assert(appInfo.exists(_.state == RUNNING)) assert(appInfo.exists(_.name.startsWith(driverPodNamePrefix))) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index e99b3292c..c1bcf6cec 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -32,7 +32,7 @@ import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN import org.apache.kyuubi.metrics.MetricsSystem import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation -import org.apache.kyuubi.operation.OperationState.{CANCELED, OperationState, RUNNING} +import org.apache.kyuubi.operation.OperationState.{isTerminal, CANCELED, OperationState, RUNNING} import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.server.metadata.api.Metadata import org.apache.kyuubi.session.KyuubiBatchSessionImpl @@ -69,7 +69,11 @@ class BatchJobSubmission( private[kyuubi] val batchId: String = session.handle.identifier.toString - private var applicationInfo: Option[ApplicationInfo] = None + @volatile private var _applicationInfo: Option[ApplicationInfo] = None + def getOrFetchCurrentApplicationInfo: Option[ApplicationInfo] = _applicationInfo match { + case Some(_) => _applicationInfo + case None => currentApplicationInfo + } private var killMessage: KillResponse = (false, "UNKNOWN") def getKillMessage: KillResponse = killMessage @@ -97,7 +101,8 @@ class BatchJobSubmission( } } - override private[kyuubi] def currentApplicationInfo: Option[ApplicationInfo] = { + override protected def currentApplicationInfo: Option[ApplicationInfo] = { + if (isTerminal(state) && _applicationInfo.nonEmpty) return _applicationInfo // only the ApplicationInfo with non-empty id is valid for the operation val applicationInfo = applicationManager.getApplicationInfo(builder.clusterManager(), batchId).filter(_.id != null) @@ -127,13 +132,13 @@ class BatchJobSubmission( } if (isTerminalState(state)) { - if (applicationInfo.isEmpty) { - applicationInfo = + if (_applicationInfo.isEmpty) { + _applicationInfo = Option(ApplicationInfo(id = null, name = null, state = ApplicationState.NOT_FOUND)) } } - applicationInfo.foreach { status => + _applicationInfo.foreach { status => val metadataToUpdate = Metadata( identifier = batchId, state = state.toString, @@ -154,7 +159,7 @@ class BatchJobSubmission( private def setStateIfNotCanceled(newState: OperationState): Unit = state.synchronized { if (state != CANCELED) { setState(newState) - applicationInfo.filter(_.id != null).foreach { ai => + _applicationInfo.filter(_.id != null).foreach { ai => session.getSessionEvent.foreach(_.engineId = ai.id) } if (newState == RUNNING) { @@ -184,8 +189,8 @@ class BatchJobSubmission( // submitted batch application. recoveryMetadata.map { metadata => if (metadata.state == OperationState.PENDING.toString) { - applicationInfo = currentApplicationInfo - applicationInfo.map(_.id) match { + _applicationInfo = currentApplicationInfo + _applicationInfo.map(_.id) match { case Some(null) => submitAndMonitorBatchJob() case Some(appId) => @@ -226,10 +231,10 @@ class BatchJobSubmission( try { info(s"Submitting $batchType batch[$batchId] job:\n$builder") val process = builder.start - applicationInfo = currentApplicationInfo - while (!applicationFailed(applicationInfo) && process.isAlive) { + _applicationInfo = currentApplicationInfo + while (!applicationFailed(_applicationInfo) && process.isAlive) { if (!appStatusFirstUpdated) { - if (applicationInfo.isDefined) { + if (_applicationInfo.isDefined) { setStateIfNotCanceled(OperationState.RUNNING) updateBatchMetadata() appStatusFirstUpdated = true @@ -243,19 +248,19 @@ class BatchJobSubmission( } } process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS) - applicationInfo = currentApplicationInfo + _applicationInfo = currentApplicationInfo } - if (applicationFailed(applicationInfo)) { + if (applicationFailed(_applicationInfo)) { process.destroyForcibly() - throw new RuntimeException(s"Batch job failed: $applicationInfo") + throw new RuntimeException(s"Batch job failed: ${_applicationInfo}") } else { process.waitFor() if (process.exitValue() != 0) { throw new KyuubiException(s"Process exit with value ${process.exitValue()}") } - Option(applicationInfo.map(_.id)).foreach { + Option(_applicationInfo.map(_.id)).foreach { case Some(appId) => monitorBatchJob(appId) case _ => } @@ -267,30 +272,30 @@ class BatchJobSubmission( private def monitorBatchJob(appId: String): Unit = { info(s"Monitoring submitted $batchType batch[$batchId] job: $appId") - if (applicationInfo.isEmpty) { - applicationInfo = currentApplicationInfo + if (_applicationInfo.isEmpty) { + _applicationInfo = currentApplicationInfo } if (state == OperationState.PENDING) { setStateIfNotCanceled(OperationState.RUNNING) } - if (applicationInfo.isEmpty) { + if (_applicationInfo.isEmpty) { info(s"The $batchType batch[$batchId] job: $appId not found, assume that it has finished.") - } else if (applicationFailed(applicationInfo)) { - throw new RuntimeException(s"$batchType batch[$batchId] job failed: $applicationInfo") + } else if (applicationFailed(_applicationInfo)) { + throw new RuntimeException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}") } else { updateBatchMetadata() // TODO: add limit for max batch job submission lifetime - while (applicationInfo.isDefined && !applicationTerminated(applicationInfo)) { + while (_applicationInfo.isDefined && !applicationTerminated(_applicationInfo)) { Thread.sleep(applicationCheckInterval) val newApplicationStatus = currentApplicationInfo - if (newApplicationStatus.map(_.state) != applicationInfo.map(_.state)) { - applicationInfo = newApplicationStatus - info(s"Batch report for $batchId, $applicationInfo") + if (newApplicationStatus.map(_.state) != _applicationInfo.map(_.state)) { + _applicationInfo = newApplicationStatus + info(s"Batch report for $batchId, ${_applicationInfo}") } } - if (applicationFailed(applicationInfo)) { - throw new RuntimeException(s"$batchType batch[$batchId] job failed: $applicationInfo") + if (applicationFailed(_applicationInfo)) { + throw new RuntimeException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}") } } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala index cf10b2da4..b864f0101 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala @@ -31,7 +31,7 @@ import org.apache.kyuubi.util.ThriftUtils abstract class KyuubiApplicationOperation(session: Session) extends KyuubiOperation(session) { - private[kyuubi] def currentApplicationInfo: Option[ApplicationInfo] + protected def currentApplicationInfo: Option[ApplicationInfo] override def getResultSetMetadata: TGetResultSetMetadataResp = { val schema = new TTableSchema() diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala index 0444b92fd..3d9b4937f 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala @@ -33,7 +33,7 @@ class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Bool } override def getOperationLog: Option[OperationLog] = Option(_operationLog) - override private[kyuubi] def currentApplicationInfo: Option[ApplicationInfo] = { + override protected def currentApplicationInfo: Option[ApplicationInfo] = { Option(client).map { cli => ApplicationInfo( cli.engineId.orNull, diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index 487362d96..c00fb95f6 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -68,7 +68,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { private def buildBatch(session: KyuubiBatchSessionImpl): Batch = { val batchOp = session.batchJobSubmissionOp val batchOpStatus = batchOp.getStatus - val batchAppStatus = batchOp.currentApplicationInfo + val batchAppStatus = batchOp.getOrFetchCurrentApplicationInfo val name = Option(batchOp.batchName).getOrElse(batchAppStatus.map(_.name).orNull) var appId: String = null diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala index 727c5545e..27e769d29 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala @@ -23,8 +23,8 @@ import scala.concurrent.duration._ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols.FrontendProtocol +import org.apache.kyuubi.engine.{ApplicationState, YarnApplicationOperation} import org.apache.kyuubi.engine.ApplicationState._ -import org.apache.kyuubi.engine.YarnApplicationOperation import org.apache.kyuubi.operation.{FetchOrientation, HiveJDBCTestHelper, OperationState} import org.apache.kyuubi.operation.OperationState.ERROR import org.apache.kyuubi.server.MiniYarnService @@ -117,7 +117,7 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD val batchJobSubmissionOp = session.batchJobSubmissionOp eventually(timeout(3.minutes), interval(50.milliseconds)) { - val appInfo = batchJobSubmissionOp.currentApplicationInfo + val appInfo = batchJobSubmissionOp.getOrFetchCurrentApplicationInfo assert(appInfo.nonEmpty) assert(appInfo.exists(_.id.startsWith("application_"))) } @@ -152,7 +152,7 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD val appUrl = rows("url") val appError = rows("error") - val appInfo2 = batchJobSubmissionOp.currentApplicationInfo.get + val appInfo2 = batchJobSubmissionOp.getOrFetchCurrentApplicationInfo.get assert(appId === appInfo2.id) assert(appName === appInfo2.name) assert(appState === appInfo2.state.toString) @@ -175,7 +175,9 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD val batchJobSubmissionOp = session.batchJobSubmissionOp eventually(timeout(3.minutes), interval(50.milliseconds)) { - assert(batchJobSubmissionOp.currentApplicationInfo.isEmpty) + assert(batchJobSubmissionOp.getOrFetchCurrentApplicationInfo.exists(_.id == null)) + assert(batchJobSubmissionOp.getOrFetchCurrentApplicationInfo.exists( + _.state == ApplicationState.NOT_FOUND)) assert(batchJobSubmissionOp.getStatus.state === OperationState.ERROR) } }