[KYUUBI #5243] Distinguish metadata between batch impl v2 and recovery

### _Why are the changes needed?_

The `recoveryMetadata` is not accurate after batch impl is introduced. This PR proposes to rename `recoveryMetadata` to `metadata` and introduce a dedicated flay `fromRecovery` to distinguish metadata between them.

This PR also partially reverts #4798, by removing unnecessary constructor parameters `shouldRunAsync` and `batchConf`

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

No.

Closes #5243 from pan3793/meta-recov.

Closes #5243

0718fbefe [Cheng Pan] nit
b8358464c [Cheng Pan] simplify
a2d6519c6 [Cheng Pan] fix test
2dad868bd [Cheng Pan] refactor
f83d2a602 [Cheng Pan] Distinguish batch impl v2 metadata from recovery

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Cheng Pan 2023-09-06 02:51:43 +08:00
parent c3b7af0b54
commit 6a23f88b00
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
11 changed files with 101 additions and 104 deletions

View File

@ -19,7 +19,6 @@ package org.apache.kyuubi.kubernetes.test.spark
import java.util.UUID
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import org.apache.hadoop.conf.Configuration
@ -149,7 +148,6 @@ class KyuubiOperationKubernetesClusterClientModeSuite
"kyuubi",
"passwd",
"localhost",
batchRequest.getConf.asScala.toMap,
batchRequest)
eventually(timeout(3.minutes), interval(50.milliseconds)) {
@ -217,7 +215,6 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
"runner",
"passwd",
"localhost",
batchRequest.getConf.asScala.toMap,
batchRequest)
// wait for driver pod start

View File

@ -58,11 +58,12 @@ class BatchJobSubmission(
className: String,
batchConf: Map[String, String],
batchArgs: Seq[String],
recoveryMetadata: Option[Metadata],
override val shouldRunAsync: Boolean)
metadata: Option[Metadata])
extends KyuubiApplicationOperation(session) {
import BatchJobSubmission._
override def shouldRunAsync: Boolean = true
private val _operationLog = OperationLog.createOperationLog(session, getHandle)
private val applicationManager = session.sessionManager.applicationManager
@ -75,7 +76,7 @@ class BatchJobSubmission(
private var killMessage: KillResponse = (false, "UNKNOWN")
def getKillMessage: KillResponse = killMessage
@volatile private var _appStartTime = recoveryMetadata.map(_.engineOpenTime).getOrElse(0L)
@volatile private var _appStartTime = metadata.map(_.engineOpenTime).getOrElse(0L)
def appStartTime: Long = _appStartTime
def appStarted: Boolean = _appStartTime > 0
@ -184,21 +185,24 @@ class BatchJobSubmission(
override protected def runInternal(): Unit = session.handleSessionException {
val asyncOperation: Runnable = () => {
try {
recoveryMetadata match {
metadata match {
case Some(metadata) if metadata.peerInstanceClosed =>
setState(OperationState.CANCELED)
case Some(metadata) if metadata.state == OperationState.PENDING.toString =>
// In recovery mode, only submit batch job when previous state is PENDING
// and fail to fetch the status including appId from resource manager.
// Otherwise, monitor the submitted batch application.
// case 1: new batch job created using batch impl v2
// case 2: batch job from recovery, do submission only when previous state is
// PENDING and fail to fetch the status by appId from resource manager, which
// is similar with case 1; otherwise, monitor the submitted batch application.
_applicationInfo = currentApplicationInfo()
applicationId(_applicationInfo) match {
case Some(appId) => monitorBatchJob(appId)
case None => submitAndMonitorBatchJob()
case Some(appId) => monitorBatchJob(appId)
}
case Some(metadata) =>
// batch job from recovery which was submitted
monitorBatchJob(metadata.engineId)
case None =>
// brand-new job created using batch impl v1
submitAndMonitorBatchJob()
}
setStateIfNotCanceled(OperationState.FINISHED)
@ -219,7 +223,6 @@ class BatchJobSubmission(
updateBatchMetadata()
}
}
if (!shouldRunAsync) getBackgroundHandle.get()
}
private def submitAndMonitorBatchJob(): Unit = {
@ -295,19 +298,19 @@ class BatchJobSubmission(
}
if (_applicationInfo.isEmpty) {
info(s"The $batchType batch[$batchId] job: $appId not found, assume that it has finished.")
} else if (applicationFailed(_applicationInfo)) {
return
}
if (applicationFailed(_applicationInfo)) {
throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
}
updateBatchMetadata()
// TODO: add limit for max batch job submission lifetime
while (_applicationInfo.isDefined && !applicationTerminated(_applicationInfo)) {
Thread.sleep(applicationCheckInterval)
updateApplicationInfoMetadataIfNeeded()
}
if (applicationFailed(_applicationInfo)) {
throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
} else {
updateBatchMetadata()
// TODO: add limit for max batch job submission lifetime
while (_applicationInfo.isDefined && !applicationTerminated(_applicationInfo)) {
Thread.sleep(applicationCheckInterval)
updateApplicationInfoMetadataIfNeeded()
}
if (applicationFailed(_applicationInfo)) {
throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
}
}
}

View File

@ -81,8 +81,7 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
className: String,
batchConf: Map[String, String],
batchArgs: Seq[String],
recoveryMetadata: Option[Metadata],
shouldRunAsync: Boolean): BatchJobSubmission = {
metadata: Option[Metadata]): BatchJobSubmission = {
val operation = new BatchJobSubmission(
session,
batchType,
@ -91,8 +90,7 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
className,
batchConf,
batchArgs,
recoveryMetadata,
shouldRunAsync)
metadata)
addOperation(operation)
operation
}

View File

@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicBoolean
import org.apache.kyuubi.config.KyuubiConf.BATCH_SUBMITTER_THREADS
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.metadata.MetadataManager
import org.apache.kyuubi.server.metadata.api.Metadata
import org.apache.kyuubi.service.{AbstractService, Serverable}
import org.apache.kyuubi.session.KyuubiSessionManager
import org.apache.kyuubi.util.ThreadUtils
@ -81,16 +80,9 @@ class KyuubiBatchService(
Option(metadata.requestName),
metadata.resource,
metadata.className,
metadata.requestConf,
metadata.requestArgs,
Some(metadata), // TODO some logic need to fix since it's not from recovery
shouldRunAsync = true)
val metadataForUpdate = Metadata(
identifier = batchId,
kyuubiInstance = kyuubiInstance,
requestConf = batchSession.optimizedConf,
clusterManager = batchSession.batchJobSubmissionOp.builder.clusterManager())
metadataManager.updateMetadata(metadataForUpdate, asyncRetryOnError = false)
Some(metadata),
fromRecovery = false)
val sessionHandle = sessionManager.openBatchSession(batchSession)
var submitted = false
while (!submitted) { // block until batch job submitted
@ -113,7 +105,7 @@ class KyuubiBatchService(
// }
if (!submitted) Thread.sleep(1000)
}
info(s"$batchId is submitted.")
info(s"$batchId is submitted or finished.")
}
}
}

View File

@ -269,7 +269,6 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
userName,
"anonymous",
ipAddress,
request.getConf.asScala.toMap,
request)
} match {
case Success(sessionHandle) =>

View File

@ -41,10 +41,9 @@ class KyuubiBatchSession(
batchName: Option[String],
resource: String,
className: String,
batchConf: Map[String, String],
batchArgs: Seq[String],
recoveryMetadata: Option[Metadata] = None,
shouldRunAsync: Boolean)
metadata: Option[Metadata] = None,
fromRecovery: Boolean)
extends KyuubiSession(
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
user,
@ -55,11 +54,11 @@ class KyuubiBatchSession(
override val sessionType: SessionType = SessionType.BATCH
override val handle: SessionHandle = {
val batchId = recoveryMetadata.map(_.identifier).getOrElse(conf(KYUUBI_BATCH_ID_KEY))
val batchId = metadata.map(_.identifier).getOrElse(conf(KYUUBI_BATCH_ID_KEY))
SessionHandle.fromUUID(batchId)
}
override def createTime: Long = recoveryMetadata.map(_.createTime).getOrElse(super.createTime)
override def createTime: Long = metadata.map(_.createTime).getOrElse(super.createTime)
override def getNoOperationTime: Long = {
if (batchJobSubmissionOp != null && !OperationState.isTerminal(
@ -74,7 +73,7 @@ class KyuubiBatchSession(
sessionManager.getConf.get(KyuubiConf.BATCH_SESSION_IDLE_TIMEOUT)
override val normalizedConf: Map[String, String] =
sessionConf.getBatchConf(batchType) ++ sessionManager.validateBatchConf(batchConf)
sessionConf.getBatchConf(batchType) ++ sessionManager.validateBatchConf(conf)
val optimizedConf: Map[String, String] = {
val confOverlay = sessionManager.sessionConfAdvisor.getConfOverlay(
@ -95,7 +94,7 @@ class KyuubiBatchSession(
// whether the resource file is from uploading
private[kyuubi] val isResourceUploaded: Boolean =
batchConf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, "false").toBoolean
conf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, "false").toBoolean
private[kyuubi] lazy val batchJobSubmissionOp = sessionManager.operationManager
.newBatchJobSubmissionOperation(
@ -106,8 +105,7 @@ class KyuubiBatchSession(
className,
optimizedConf,
batchArgs,
recoveryMetadata,
shouldRunAsync)
metadata)
private def waitMetadataRequestsRetryCompletion(): Unit = {
val batchId = batchJobSubmissionOp.batchId
@ -122,7 +120,9 @@ class KyuubiBatchSession(
}
private val sessionEvent = KyuubiSessionEvent(this)
recoveryMetadata.foreach(metadata => sessionEvent.engineId = metadata.engineId)
if (fromRecovery) {
metadata.foreach { m => sessionEvent.engineId = m.engineId }
}
EventBus.post(sessionEvent)
override def getSessionEvent: Option[KyuubiSessionEvent] = {
@ -142,32 +142,47 @@ class KyuubiBatchSession(
override def open(): Unit = handleSessionException {
traceMetricsOnOpen()
if (recoveryMetadata.isEmpty) {
lazy val kubernetesInfo: Map[String, String] = {
val appMgrInfo = batchJobSubmissionOp.builder.appMgrInfo()
val kubernetesInfo = appMgrInfo.kubernetesInfo.context.map { context =>
appMgrInfo.kubernetesInfo.context.map { context =>
Map(KyuubiConf.KUBERNETES_CONTEXT.key -> context)
}.getOrElse(Map.empty) ++ appMgrInfo.kubernetesInfo.namespace.map { namespace =>
Map(KyuubiConf.KUBERNETES_NAMESPACE.key -> namespace)
}.getOrElse(Map.empty)
val metaData = Metadata(
identifier = handle.identifier.toString,
sessionType = sessionType,
realUser = realUser,
username = user,
ipAddress = ipAddress,
kyuubiInstance = connectionUrl,
state = OperationState.PENDING.toString,
resource = resource,
className = className,
requestName = name.orNull,
requestConf = optimizedConf ++ kubernetesInfo, // save the kubernetes info into request conf
requestArgs = batchArgs,
createTime = createTime,
engineType = batchType,
clusterManager = batchJobSubmissionOp.builder.clusterManager())
}
// there is a chance that operation failed w/ duplicated key error
sessionManager.insertMetadata(metaData)
(metadata, fromRecovery) match {
case (Some(initialMetadata), false) =>
// new batch job created using batch impl v2
val metadataToUpdate = Metadata(
identifier = initialMetadata.identifier,
kyuubiInstance = connectionUrl,
requestName = name.orNull,
requestConf = optimizedConf ++ kubernetesInfo, // save the kubernetes info
clusterManager = batchJobSubmissionOp.builder.clusterManager())
sessionManager.updateMetadata(metadataToUpdate)
case (None, _) =>
// new batch job created using batch impl v1
val newMetadata = Metadata(
identifier = handle.identifier.toString,
sessionType = sessionType,
realUser = realUser,
username = user,
ipAddress = ipAddress,
kyuubiInstance = connectionUrl,
state = OperationState.PENDING.toString,
resource = resource,
className = className,
requestName = name.orNull,
requestConf = optimizedConf ++ kubernetesInfo, // save the kubernetes info
requestArgs = batchArgs,
createTime = createTime,
engineType = batchType,
clusterManager = batchJobSubmissionOp.builder.clusterManager())
// there is a chance that operation failed w/ duplicated key error
sessionManager.insertMetadata(newMetadata)
case _ =>
}
checkSessionAccessPathURIs()

View File

@ -144,10 +144,9 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
batchName: Option[String],
resource: String,
className: String,
batchConf: Map[String, String],
batchArgs: Seq[String],
recoveryMetadata: Option[Metadata] = None,
shouldRunAsync: Boolean): KyuubiBatchSession = {
metadata: Option[Metadata] = None,
fromRecovery: Boolean): KyuubiBatchSession = {
// scalastyle:on
val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
val sessionConf = this.getConf.getUserDefaults(user)
@ -162,10 +161,9 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
batchName,
resource,
className,
batchConf,
batchArgs,
recoveryMetadata,
shouldRunAsync)
metadata,
fromRecovery)
}
private[kyuubi] def openBatchSession(batchSession: KyuubiBatchSession): SessionHandle = {
@ -202,22 +200,19 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
user: String,
password: String,
ipAddress: String,
conf: Map[String, String],
batchRequest: BatchRequest,
shouldRunAsync: Boolean = true): SessionHandle = {
batchRequest: BatchRequest): SessionHandle = {
val batchSession = createBatchSession(
user,
password,
ipAddress,
conf,
batchRequest.getConf.asScala.toMap,
batchRequest.getBatchType,
Option(batchRequest.getName),
batchRequest.getResource,
batchRequest.getClassName,
batchRequest.getConf.asScala.toMap,
batchRequest.getArgs.asScala.toSeq,
None,
shouldRunAsync)
fromRecovery = false)
openBatchSession(batchSession)
}
@ -313,10 +308,9 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
Option(metadata.requestName),
metadata.resource,
metadata.className,
metadata.requestConf,
metadata.requestArgs,
Some(metadata),
shouldRunAsync = true)
fromRecovery = true)
}).getOrElse(Seq.empty)
}
}

View File

@ -116,7 +116,6 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
"kyuubi",
"passwd",
"localhost",
batchRequest.getConf.asScala.toMap,
batchRequest)
val session = sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSession]
@ -180,7 +179,6 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
"kyuubi",
"passwd",
"localhost",
batchRequest.getConf.asScala.toMap,
batchRequest)
val session = sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSession]

View File

@ -135,13 +135,12 @@ class ServerJsonLoggingEventHandlerSuite extends WithKyuubiServer with HiveJDBCT
}
}
val batchRequest = newSparkBatchRequest()
val batchRequest = newSparkBatchRequest(Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))
val sessionMgr = server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
val batchSessionHandle = sessionMgr.openBatchSession(
Utils.currentUser,
"kyuubi",
"127.0.0.1",
Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
batchRequest)
withSessionConf()(Map.empty)(Map("spark.sql.shuffle.partitions" -> "2")) {
withJdbcStatement() { statement =>

View File

@ -358,12 +358,12 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
"kyuubi",
"kyuubi",
InetAddress.getLocalHost.getCanonicalHostName,
Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
newBatchRequest(
"spark",
sparkBatchTestResource.get,
"",
""))
"",
Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
sessionManager.openSession(
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11,
"",
@ -380,22 +380,22 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
"kyuubi",
"kyuubi",
InetAddress.getLocalHost.getCanonicalHostName,
Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
newBatchRequest(
"spark",
sparkBatchTestResource.get,
"",
""))
"",
Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
sessionManager.openBatchSession(
"kyuubi",
"kyuubi",
InetAddress.getLocalHost.getCanonicalHostName,
Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
newBatchRequest(
"spark",
sparkBatchTestResource.get,
"",
""))
"",
Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
val response2 = webTarget.path("api/v1/batches")
.queryParam("batchType", "spark")
@ -780,12 +780,14 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
.be.sessionManager.asInstanceOf[KyuubiSessionManager]
val e = intercept[Exception] {
val conf = Map(
KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString,
"spark.jars" -> "disAllowPath")
sessionManager.openBatchSession(
"kyuubi",
"kyuubi",
InetAddress.getLocalHost.getCanonicalHostName,
Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
newSparkBatchRequest(Map("spark.jars" -> "disAllowPath")))
newSparkBatchRequest(conf))
}
val sessionHandleRegex = "\\[\\S*]".r
val batchId = sessionHandleRegex.findFirstMatchIn(e.getMessage).get.group(0)
@ -803,12 +805,12 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
"kyuubi",
"kyuubi",
InetAddress.getLocalHost.getCanonicalHostName,
Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
newBatchRequest(
"spark",
sparkBatchTestResource.get,
"",
uniqueName))
uniqueName,
Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
val response = webTarget.path("api/v1/batches")
.queryParam("batchName", uniqueName)

View File

@ -290,12 +290,12 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat
"kyuubi",
"kyuubi",
InetAddress.getLocalHost.getCanonicalHostName,
Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
newBatchRequest(
"spark",
"",
"",
""))
"",
Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
sessionManager.openSession(
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11,
"",
@ -312,22 +312,22 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat
"kyuubi",
"kyuubi",
InetAddress.getLocalHost.getCanonicalHostName,
Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
newBatchRequest(
"spark",
"",
"",
""))
"",
Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
sessionManager.openBatchSession(
"kyuubi",
"kyuubi",
InetAddress.getLocalHost.getCanonicalHostName,
Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
newBatchRequest(
"spark",
"",
"",
""))
"",
Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
val listArgs = Array(
"list",