[KYUUBI #4155] Reduce the application info call for batch

### _Why are the changes needed?_

Reduce the application info call for batch.
- If terminated and applicationInfo is defined, return applicationInfo directly.
- For batch report, return the existing applicationInfo directly.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4155 from turboFei/terminate_state.

Closes #4155

9d7e16121 [fwang12] comment
a0d70a633 [fwang12] Fix style
d9814c5b4 [fwang12] get or fetch
e547ff071 [fwang12] refine the variable
f9130e30e [fwang12] refactor code
5913d2419 [fwang12] fix ut
3b2772672 [fwang12] reduce app info call
a001dd9c4 [fwang12] do not call yarn for batch report
beaa54b32 [fwang12] if terminated, do not call

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
This commit is contained in:
fwang12 2023-01-16 07:44:15 +08:00
parent 2613a1ba74
commit b420243a1f
6 changed files with 42 additions and 35 deletions

View File

@ -206,7 +206,7 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
val batchJobSubmissionOp = session.batchJobSubmissionOp
eventually(timeout(3.minutes), interval(50.milliseconds)) {
val appInfo = batchJobSubmissionOp.currentApplicationInfo
val appInfo = batchJobSubmissionOp.getOrFetchCurrentApplicationInfo
assert(appInfo.nonEmpty)
assert(appInfo.exists(_.state == RUNNING))
assert(appInfo.exists(_.name.startsWith(driverPodNamePrefix)))

View File

@ -32,7 +32,7 @@ import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationState.{CANCELED, OperationState, RUNNING}
import org.apache.kyuubi.operation.OperationState.{isTerminal, CANCELED, OperationState, RUNNING}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.server.metadata.api.Metadata
import org.apache.kyuubi.session.KyuubiBatchSessionImpl
@ -69,7 +69,11 @@ class BatchJobSubmission(
private[kyuubi] val batchId: String = session.handle.identifier.toString
private var applicationInfo: Option[ApplicationInfo] = None
@volatile private var _applicationInfo: Option[ApplicationInfo] = None
def getOrFetchCurrentApplicationInfo: Option[ApplicationInfo] = _applicationInfo match {
case Some(_) => _applicationInfo
case None => currentApplicationInfo
}
private var killMessage: KillResponse = (false, "UNKNOWN")
def getKillMessage: KillResponse = killMessage
@ -97,7 +101,8 @@ class BatchJobSubmission(
}
}
override private[kyuubi] def currentApplicationInfo: Option[ApplicationInfo] = {
override protected def currentApplicationInfo: Option[ApplicationInfo] = {
if (isTerminal(state) && _applicationInfo.nonEmpty) return _applicationInfo
// only the ApplicationInfo with non-empty id is valid for the operation
val applicationInfo =
applicationManager.getApplicationInfo(builder.clusterManager(), batchId).filter(_.id != null)
@ -127,13 +132,13 @@ class BatchJobSubmission(
}
if (isTerminalState(state)) {
if (applicationInfo.isEmpty) {
applicationInfo =
if (_applicationInfo.isEmpty) {
_applicationInfo =
Option(ApplicationInfo(id = null, name = null, state = ApplicationState.NOT_FOUND))
}
}
applicationInfo.foreach { status =>
_applicationInfo.foreach { status =>
val metadataToUpdate = Metadata(
identifier = batchId,
state = state.toString,
@ -154,7 +159,7 @@ class BatchJobSubmission(
private def setStateIfNotCanceled(newState: OperationState): Unit = state.synchronized {
if (state != CANCELED) {
setState(newState)
applicationInfo.filter(_.id != null).foreach { ai =>
_applicationInfo.filter(_.id != null).foreach { ai =>
session.getSessionEvent.foreach(_.engineId = ai.id)
}
if (newState == RUNNING) {
@ -184,8 +189,8 @@ class BatchJobSubmission(
// submitted batch application.
recoveryMetadata.map { metadata =>
if (metadata.state == OperationState.PENDING.toString) {
applicationInfo = currentApplicationInfo
applicationInfo.map(_.id) match {
_applicationInfo = currentApplicationInfo
_applicationInfo.map(_.id) match {
case Some(null) =>
submitAndMonitorBatchJob()
case Some(appId) =>
@ -226,10 +231,10 @@ class BatchJobSubmission(
try {
info(s"Submitting $batchType batch[$batchId] job:\n$builder")
val process = builder.start
applicationInfo = currentApplicationInfo
while (!applicationFailed(applicationInfo) && process.isAlive) {
_applicationInfo = currentApplicationInfo
while (!applicationFailed(_applicationInfo) && process.isAlive) {
if (!appStatusFirstUpdated) {
if (applicationInfo.isDefined) {
if (_applicationInfo.isDefined) {
setStateIfNotCanceled(OperationState.RUNNING)
updateBatchMetadata()
appStatusFirstUpdated = true
@ -243,19 +248,19 @@ class BatchJobSubmission(
}
}
process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS)
applicationInfo = currentApplicationInfo
_applicationInfo = currentApplicationInfo
}
if (applicationFailed(applicationInfo)) {
if (applicationFailed(_applicationInfo)) {
process.destroyForcibly()
throw new RuntimeException(s"Batch job failed: $applicationInfo")
throw new RuntimeException(s"Batch job failed: ${_applicationInfo}")
} else {
process.waitFor()
if (process.exitValue() != 0) {
throw new KyuubiException(s"Process exit with value ${process.exitValue()}")
}
Option(applicationInfo.map(_.id)).foreach {
Option(_applicationInfo.map(_.id)).foreach {
case Some(appId) => monitorBatchJob(appId)
case _ =>
}
@ -267,30 +272,30 @@ class BatchJobSubmission(
private def monitorBatchJob(appId: String): Unit = {
info(s"Monitoring submitted $batchType batch[$batchId] job: $appId")
if (applicationInfo.isEmpty) {
applicationInfo = currentApplicationInfo
if (_applicationInfo.isEmpty) {
_applicationInfo = currentApplicationInfo
}
if (state == OperationState.PENDING) {
setStateIfNotCanceled(OperationState.RUNNING)
}
if (applicationInfo.isEmpty) {
if (_applicationInfo.isEmpty) {
info(s"The $batchType batch[$batchId] job: $appId not found, assume that it has finished.")
} else if (applicationFailed(applicationInfo)) {
throw new RuntimeException(s"$batchType batch[$batchId] job failed: $applicationInfo")
} else if (applicationFailed(_applicationInfo)) {
throw new RuntimeException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
} else {
updateBatchMetadata()
// TODO: add limit for max batch job submission lifetime
while (applicationInfo.isDefined && !applicationTerminated(applicationInfo)) {
while (_applicationInfo.isDefined && !applicationTerminated(_applicationInfo)) {
Thread.sleep(applicationCheckInterval)
val newApplicationStatus = currentApplicationInfo
if (newApplicationStatus.map(_.state) != applicationInfo.map(_.state)) {
applicationInfo = newApplicationStatus
info(s"Batch report for $batchId, $applicationInfo")
if (newApplicationStatus.map(_.state) != _applicationInfo.map(_.state)) {
_applicationInfo = newApplicationStatus
info(s"Batch report for $batchId, ${_applicationInfo}")
}
}
if (applicationFailed(applicationInfo)) {
throw new RuntimeException(s"$batchType batch[$batchId] job failed: $applicationInfo")
if (applicationFailed(_applicationInfo)) {
throw new RuntimeException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
}
}
}

View File

@ -31,7 +31,7 @@ import org.apache.kyuubi.util.ThriftUtils
abstract class KyuubiApplicationOperation(session: Session) extends KyuubiOperation(session) {
private[kyuubi] def currentApplicationInfo: Option[ApplicationInfo]
protected def currentApplicationInfo: Option[ApplicationInfo]
override def getResultSetMetadata: TGetResultSetMetadataResp = {
val schema = new TTableSchema()

View File

@ -33,7 +33,7 @@ class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Bool
}
override def getOperationLog: Option[OperationLog] = Option(_operationLog)
override private[kyuubi] def currentApplicationInfo: Option[ApplicationInfo] = {
override protected def currentApplicationInfo: Option[ApplicationInfo] = {
Option(client).map { cli =>
ApplicationInfo(
cli.engineId.orNull,

View File

@ -68,7 +68,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
private def buildBatch(session: KyuubiBatchSessionImpl): Batch = {
val batchOp = session.batchJobSubmissionOp
val batchOpStatus = batchOp.getStatus
val batchAppStatus = batchOp.currentApplicationInfo
val batchAppStatus = batchOp.getOrFetchCurrentApplicationInfo
val name = Option(batchOp.batchName).getOrElse(batchAppStatus.map(_.name).orNull)
var appId: String = null

View File

@ -23,8 +23,8 @@ import scala.concurrent.duration._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols.FrontendProtocol
import org.apache.kyuubi.engine.{ApplicationState, YarnApplicationOperation}
import org.apache.kyuubi.engine.ApplicationState._
import org.apache.kyuubi.engine.YarnApplicationOperation
import org.apache.kyuubi.operation.{FetchOrientation, HiveJDBCTestHelper, OperationState}
import org.apache.kyuubi.operation.OperationState.ERROR
import org.apache.kyuubi.server.MiniYarnService
@ -117,7 +117,7 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
val batchJobSubmissionOp = session.batchJobSubmissionOp
eventually(timeout(3.minutes), interval(50.milliseconds)) {
val appInfo = batchJobSubmissionOp.currentApplicationInfo
val appInfo = batchJobSubmissionOp.getOrFetchCurrentApplicationInfo
assert(appInfo.nonEmpty)
assert(appInfo.exists(_.id.startsWith("application_")))
}
@ -152,7 +152,7 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
val appUrl = rows("url")
val appError = rows("error")
val appInfo2 = batchJobSubmissionOp.currentApplicationInfo.get
val appInfo2 = batchJobSubmissionOp.getOrFetchCurrentApplicationInfo.get
assert(appId === appInfo2.id)
assert(appName === appInfo2.name)
assert(appState === appInfo2.state.toString)
@ -175,7 +175,9 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
val batchJobSubmissionOp = session.batchJobSubmissionOp
eventually(timeout(3.minutes), interval(50.milliseconds)) {
assert(batchJobSubmissionOp.currentApplicationInfo.isEmpty)
assert(batchJobSubmissionOp.getOrFetchCurrentApplicationInfo.exists(_.id == null))
assert(batchJobSubmissionOp.getOrFetchCurrentApplicationInfo.exists(
_.state == ApplicationState.NOT_FOUND))
assert(batchJobSubmissionOp.getStatus.state === OperationState.ERROR)
}
}