[KYUUBI #4798] Allows BatchJobSubmission to run in sync mode

### _Why are the changes needed?_

Currently, BatchJobSubmission is only allowed to run in async mode. This PR makes `shouldRunAsync` configurable and allows BatchJobSubmission to run in sync mode. (To minimize the change, in sync mode the real submission and monitoring still happen on the exec pool; the BatchJobSubmission just blocks until the batch is finished.)

This PR also refactors the constructor parameters of `KyuubiBatchSessionImpl`, unwrapping the `BatchRequest` into individual parameters to make it fit the Batch V2 design.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4798 from pan3793/batch-sync.

Closes #4798

38eee2708 [Cheng Pan] Allows BatchJobSubmission run in sync mode

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Cheng Pan 2023-05-07 19:31:45 +08:00
parent 3108c8e1a5
commit ae3b81395c
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
4 changed files with 98 additions and 79 deletions

View File

@ -58,12 +58,11 @@ class BatchJobSubmission(
className: String,
batchConf: Map[String, String],
batchArgs: Seq[String],
recoveryMetadata: Option[Metadata])
recoveryMetadata: Option[Metadata],
override val shouldRunAsync: Boolean)
extends KyuubiApplicationOperation(session) {
import BatchJobSubmission._
override def shouldRunAsync: Boolean = true
private val _operationLog = OperationLog.createOperationLog(session, getHandle)
private val applicationManager = session.sessionManager.applicationManager
@ -131,17 +130,10 @@ class BatchJobSubmission(
session.sessionConf.get(KyuubiConf.BATCH_APPLICATION_STARVATION_TIMEOUT)
private def updateBatchMetadata(): Unit = {
val endTime =
if (isTerminalState(state)) {
lastAccessTime
} else {
0L
}
val endTime = if (isTerminalState(state)) lastAccessTime else 0L
if (isTerminalState(state)) {
if (_applicationInfo.isEmpty) {
_applicationInfo = Some(ApplicationInfo.NOT_FOUND)
}
if (isTerminalState(state) && _applicationInfo.isEmpty) {
_applicationInfo = Some(ApplicationInfo.NOT_FOUND)
}
_applicationInfo.foreach { appInfo =>
@ -187,27 +179,24 @@ class BatchJobSubmission(
override protected def runInternal(): Unit = session.handleSessionException {
val asyncOperation: Runnable = () => {
try {
if (recoveryMetadata.exists(_.peerInstanceClosed)) {
setState(OperationState.CANCELED)
} else {
// If it is in recovery mode, only re-submit batch job if previous state is PENDING and
// fail to fetch the status including appId from resource manager. Otherwise, monitor the
// submitted batch application.
recoveryMetadata.map { metadata =>
if (metadata.state == OperationState.PENDING.toString) {
_applicationInfo = currentApplicationInfo()
applicationId(_applicationInfo) match {
case Some(appId) => monitorBatchJob(appId)
case None => submitAndMonitorBatchJob()
}
} else {
monitorBatchJob(metadata.engineId)
recoveryMetadata match {
case Some(metadata) if metadata.peerInstanceClosed =>
setState(OperationState.CANCELED)
case Some(metadata) if metadata.state == OperationState.PENDING.toString =>
// In recovery mode, only submit batch job when previous state is PENDING
// and fail to fetch the status including appId from resource manager.
// Otherwise, monitor the submitted batch application.
_applicationInfo = currentApplicationInfo()
applicationId(_applicationInfo) match {
case Some(appId) => monitorBatchJob(appId)
case None => submitAndMonitorBatchJob()
}
}.getOrElse {
case Some(metadata) =>
monitorBatchJob(metadata.engineId)
case None =>
submitAndMonitorBatchJob()
}
setStateIfNotCanceled(OperationState.FINISHED)
}
setStateIfNotCanceled(OperationState.FINISHED)
} catch {
onError()
} finally {
@ -225,6 +214,7 @@ class BatchJobSubmission(
updateBatchMetadata()
}
}
if (!shouldRunAsync) getBackgroundHandle.get()
}
private def submitAndMonitorBatchJob(): Unit = {

View File

@ -81,7 +81,8 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
className: String,
batchConf: Map[String, String],
batchArgs: Seq[String],
recoveryMetadata: Option[Metadata]): BatchJobSubmission = {
recoveryMetadata: Option[Metadata],
shouldRunAsync: Boolean): BatchJobSubmission = {
val operation = new BatchJobSubmission(
session,
batchType,
@ -90,7 +91,8 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
className,
batchConf,
batchArgs,
recoveryMetadata)
recoveryMetadata,
shouldRunAsync)
addOperation(operation)
operation
}

View File

@ -21,7 +21,6 @@ import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.client.api.v1.dto.BatchRequest
import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.engine.KyuubiApplicationManager
@ -38,8 +37,14 @@ class KyuubiBatchSessionImpl(
conf: Map[String, String],
override val sessionManager: KyuubiSessionManager,
val sessionConf: KyuubiConf,
batchRequest: BatchRequest,
recoveryMetadata: Option[Metadata] = None)
batchType: String,
batchName: Option[String],
resource: String,
className: String,
batchConf: Map[String, String],
batchArgs: Seq[String],
recoveryMetadata: Option[Metadata] = None,
shouldRunAsync: Boolean)
extends KyuubiSession(
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
user,
@ -68,42 +73,41 @@ class KyuubiBatchSessionImpl(
override val sessionIdleTimeoutThreshold: Long =
sessionManager.getConf.get(KyuubiConf.BATCH_SESSION_IDLE_TIMEOUT)
override val normalizedConf: Map[String, String] = {
sessionConf.getBatchConf(batchRequest.getBatchType) ++
sessionManager.validateBatchConf(batchRequest.getConf.asScala.toMap)
}
override val normalizedConf: Map[String, String] =
sessionConf.getBatchConf(batchType) ++ sessionManager.validateBatchConf(batchConf)
private val optimizedConf: Map[String, String] = {
val optimizedConf: Map[String, String] = {
val confOverlay = sessionManager.sessionConfAdvisor.getConfOverlay(
user,
normalizedConf.asJava)
if (confOverlay != null) {
val overlayConf = new KyuubiConf(false)
confOverlay.asScala.foreach { case (k, v) => overlayConf.set(k, v) }
normalizedConf ++ overlayConf.getBatchConf(batchRequest.getBatchType)
normalizedConf ++ overlayConf.getBatchConf(batchType)
} else {
warn(s"the server plugin return null value for user: $user, ignore it")
normalizedConf
}
}
override lazy val name: Option[String] = Option(batchRequest.getName).orElse(
optimizedConf.get(KyuubiConf.SESSION_NAME.key))
override lazy val name: Option[String] =
batchName.filterNot(_.trim.isEmpty).orElse(optimizedConf.get(KyuubiConf.SESSION_NAME.key))
// whether the resource file is from uploading
private[kyuubi] val isResourceUploaded: Boolean = batchRequest.getConf
.getOrDefault(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, "false").toBoolean
private[kyuubi] val isResourceUploaded: Boolean =
batchConf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, "false").toBoolean
private[kyuubi] lazy val batchJobSubmissionOp = sessionManager.operationManager
.newBatchJobSubmissionOperation(
this,
batchRequest.getBatchType,
batchType,
name.orNull,
batchRequest.getResource,
batchRequest.getClassName,
resource,
className,
optimizedConf,
batchRequest.getArgs.asScala,
recoveryMetadata)
batchArgs,
recoveryMetadata,
shouldRunAsync)
private def waitMetadataRequestsRetryCompletion(): Unit = {
val batchId = batchJobSubmissionOp.batchId
@ -127,14 +131,11 @@ class KyuubiBatchSessionImpl(
override def checkSessionAccessPathURIs(): Unit = {
KyuubiApplicationManager.checkApplicationAccessPaths(
batchRequest.getBatchType,
batchType,
optimizedConf,
sessionManager.getConf)
if (batchRequest.getResource != SparkProcessBuilder.INTERNAL_RESOURCE
&& !isResourceUploaded) {
KyuubiApplicationManager.checkApplicationAccessPath(
batchRequest.getResource,
sessionManager.getConf)
if (resource != SparkProcessBuilder.INTERNAL_RESOURCE && !isResourceUploaded) {
KyuubiApplicationManager.checkApplicationAccessPath(resource, sessionManager.getConf)
}
}
@ -150,13 +151,13 @@ class KyuubiBatchSessionImpl(
ipAddress = ipAddress,
kyuubiInstance = connectionUrl,
state = OperationState.PENDING.toString,
resource = batchRequest.getResource,
className = batchRequest.getClassName,
resource = resource,
className = className,
requestName = name.orNull,
requestConf = optimizedConf,
requestArgs = batchRequest.getArgs.asScala,
requestArgs = batchArgs,
createTime = createTime,
engineType = batchRequest.getBatchType,
engineType = batchType,
clusterManager = batchJobSubmissionOp.builder.clusterManager())
// there is a chance that operation failed w/ duplicated key error

View File

@ -124,23 +124,38 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
}
}
private def createBatchSession(
// scalastyle:off
def createBatchSession(
user: String,
password: String,
ipAddress: String,
conf: Map[String, String],
batchRequest: BatchRequest,
recoveryMetadata: Option[Metadata] = None): KyuubiBatchSessionImpl = {
batchType: String,
batchName: Option[String],
resource: String,
className: String,
batchConf: Map[String, String],
batchArgs: Seq[String],
recoveryMetadata: Option[Metadata] = None,
shouldRunAsync: Boolean): KyuubiBatchSessionImpl = {
// scalastyle:on
val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
val sessionConf = this.getConf.getUserDefaults(user)
new KyuubiBatchSessionImpl(
username,
password,
ipAddress,
conf,
this,
this.getConf.getUserDefaults(user),
batchRequest,
recoveryMetadata)
sessionConf,
batchType,
batchName,
resource,
className,
batchConf,
batchArgs,
recoveryMetadata,
shouldRunAsync)
}
private[kyuubi] def openBatchSession(batchSession: KyuubiBatchSessionImpl): SessionHandle = {
@ -178,8 +193,21 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
password: String,
ipAddress: String,
conf: Map[String, String],
batchRequest: BatchRequest): SessionHandle = {
val batchSession = createBatchSession(user, password, ipAddress, conf, batchRequest)
batchRequest: BatchRequest,
shouldRunAsync: Boolean = true): SessionHandle = {
val batchSession = createBatchSession(
user,
password,
ipAddress,
conf,
batchRequest.getBatchType,
Option(batchRequest.getName),
batchRequest.getResource,
batchRequest.getClassName,
batchRequest.getConf.asScala.toMap,
batchRequest.getArgs.asScala,
None,
shouldRunAsync)
openBatchSession(batchSession)
}
@ -246,21 +274,19 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
kyuubiInstance,
0,
Int.MaxValue).map { metadata =>
val batchRequest = new BatchRequest(
metadata.engineType,
metadata.resource,
metadata.className,
metadata.requestName,
metadata.requestConf.asJava,
metadata.requestArgs.asJava)
createBatchSession(
metadata.username,
"anonymous",
metadata.ipAddress,
metadata.requestConf,
batchRequest,
Some(metadata))
metadata.engineType,
Option(metadata.requestName),
metadata.resource,
metadata.className,
metadata.requestConf,
metadata.requestArgs,
Some(metadata),
shouldRunAsync = true)
}).getOrElse(Seq.empty)
}
}