From 6a23f88b00e03b36f8f68c526128b166f866dc6f Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Wed, 6 Sep 2023 02:51:43 +0800 Subject: [PATCH] [KYUUBI #5243] Distinguish metadata between batch impl v2 and recovery ### _Why are the changes needed?_ The `recoveryMetadata` is not accurate after batch impl is introduced. This PR proposes to rename `recoveryMetadata` to `metadata` and introduce a dedicated flay `fromRecovery` to distinguish metadata between them. This PR also partially reverts #4798, by removing unnecessary constructor parameters `shouldRunAsync` and `batchConf` ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No. Closes #5243 from pan3793/meta-recov. Closes #5243 0718fbefe [Cheng Pan] nit b8358464c [Cheng Pan] simplify a2d6519c6 [Cheng Pan] fix test 2dad868bd [Cheng Pan] refactor f83d2a602 [Cheng Pan] Distinguish batch impl v2 metadata from recovery Authored-by: Cheng Pan Signed-off-by: Cheng Pan --- .../spark/SparkOnKubernetesTestsSuite.scala | 3 - .../kyuubi/operation/BatchJobSubmission.scala | 45 +++++------ .../operation/KyuubiOperationManager.scala | 6 +- .../kyuubi/server/KyuubiBatchService.scala | 14 +--- .../server/api/v1/BatchesResource.scala | 1 - .../kyuubi/session/KyuubiBatchSession.scala | 75 +++++++++++-------- .../kyuubi/session/KyuubiSessionManager.scala | 22 ++---- .../kyuubi/WithKyuubiServerOnYarn.scala | 2 - .../ServerJsonLoggingEventHandlerSuite.scala | 3 +- .../server/api/v1/BatchesResourceSuite.scala | 22 +++--- .../server/rest/client/BatchCliSuite.scala | 12 +-- 11 files changed, 101 insertions(+), 104 deletions(-) diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala index 037681a3f..3f591e604 100644 --- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala +++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala @@ -19,7 +19,6 @@ package org.apache.kyuubi.kubernetes.test.spark import java.util.UUID -import scala.collection.JavaConverters._ import scala.concurrent.duration._ import org.apache.hadoop.conf.Configuration @@ -149,7 +148,6 @@ class KyuubiOperationKubernetesClusterClientModeSuite "kyuubi", "passwd", "localhost", - batchRequest.getConf.asScala.toMap, batchRequest) eventually(timeout(3.minutes), interval(50.milliseconds)) { @@ -217,7 +215,6 @@ class KyuubiOperationKubernetesClusterClusterModeSuite "runner", "passwd", "localhost", - batchRequest.getConf.asScala.toMap, batchRequest) // wait for driver pod start diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index ac723b2c6..4ea609540 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -58,11 +58,12 @@ class BatchJobSubmission( className: String, batchConf: Map[String, String], batchArgs: Seq[String], - recoveryMetadata: Option[Metadata], - override val shouldRunAsync: Boolean) + metadata: Option[Metadata]) extends KyuubiApplicationOperation(session) { import BatchJobSubmission._ + override def shouldRunAsync: Boolean = true + private val _operationLog = OperationLog.createOperationLog(session, getHandle) private val applicationManager = session.sessionManager.applicationManager @@ -75,7 +76,7 @@ class BatchJobSubmission( private var killMessage: KillResponse = (false, "UNKNOWN") def getKillMessage: KillResponse = killMessage - @volatile private var _appStartTime = recoveryMetadata.map(_.engineOpenTime).getOrElse(0L) + @volatile private var _appStartTime = metadata.map(_.engineOpenTime).getOrElse(0L) def appStartTime: Long = _appStartTime def appStarted: Boolean = _appStartTime > 0 @@ -184,21 +185,24 @@ class BatchJobSubmission( override protected def runInternal(): Unit = session.handleSessionException { val asyncOperation: Runnable = () => { try { - recoveryMetadata match { + metadata match { case Some(metadata) if metadata.peerInstanceClosed => setState(OperationState.CANCELED) case Some(metadata) if metadata.state == OperationState.PENDING.toString => - // In recovery mode, only submit batch job when previous state is PENDING - // and fail to fetch the status including appId from resource manager. - // Otherwise, monitor the submitted batch application. + // case 1: new batch job created using batch impl v2 + // case 2: batch job from recovery, do submission only when previous state is + // PENDING and fail to fetch the status by appId from resource manager, which + // is similar with case 1; otherwise, monitor the submitted batch application. _applicationInfo = currentApplicationInfo() applicationId(_applicationInfo) match { - case Some(appId) => monitorBatchJob(appId) case None => submitAndMonitorBatchJob() + case Some(appId) => monitorBatchJob(appId) } case Some(metadata) => + // batch job from recovery which was submitted monitorBatchJob(metadata.engineId) case None => + // brand-new job created using batch impl v1 submitAndMonitorBatchJob() } setStateIfNotCanceled(OperationState.FINISHED) @@ -219,7 +223,6 @@ class BatchJobSubmission( updateBatchMetadata() } } - if (!shouldRunAsync) getBackgroundHandle.get() } private def submitAndMonitorBatchJob(): Unit = { @@ -295,19 +298,19 @@ class BatchJobSubmission( } if (_applicationInfo.isEmpty) { info(s"The $batchType batch[$batchId] job: $appId not found, assume that it has finished.") - } else if (applicationFailed(_applicationInfo)) { + return + } + if (applicationFailed(_applicationInfo)) { + throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}") + } + updateBatchMetadata() + // TODO: add limit for max batch job submission lifetime + while (_applicationInfo.isDefined && !applicationTerminated(_applicationInfo)) { + Thread.sleep(applicationCheckInterval) + updateApplicationInfoMetadataIfNeeded() + } + if (applicationFailed(_applicationInfo)) { throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}") - } else { - updateBatchMetadata() - // TODO: add limit for max batch job submission lifetime - while (_applicationInfo.isDefined && !applicationTerminated(_applicationInfo)) { - Thread.sleep(applicationCheckInterval) - updateApplicationInfoMetadataIfNeeded() - } - - if (applicationFailed(_applicationInfo)) { - throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}") - } } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala index 8ae9c91f8..739c99cd7 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala @@ -81,8 +81,7 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam className: String, batchConf: Map[String, String], batchArgs: Seq[String], - recoveryMetadata: Option[Metadata], - shouldRunAsync: Boolean): BatchJobSubmission = { + metadata: Option[Metadata]): BatchJobSubmission = { val operation = new BatchJobSubmission( session, batchType, @@ -91,8 +90,7 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam className, batchConf, batchArgs, - recoveryMetadata, - shouldRunAsync) + metadata) addOperation(operation) operation } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala index 7ed2ab8e1..bf10a68fa 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala @@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicBoolean import org.apache.kyuubi.config.KyuubiConf.BATCH_SUBMITTER_THREADS import org.apache.kyuubi.operation.OperationState import org.apache.kyuubi.server.metadata.MetadataManager -import org.apache.kyuubi.server.metadata.api.Metadata import org.apache.kyuubi.service.{AbstractService, Serverable} import org.apache.kyuubi.session.KyuubiSessionManager import org.apache.kyuubi.util.ThreadUtils @@ -81,16 +80,9 @@ class KyuubiBatchService( Option(metadata.requestName), metadata.resource, metadata.className, - metadata.requestConf, metadata.requestArgs, - Some(metadata), // TODO some logic need to fix since it's not from recovery - shouldRunAsync = true) - val metadataForUpdate = Metadata( - identifier = batchId, - kyuubiInstance = kyuubiInstance, - requestConf = batchSession.optimizedConf, - clusterManager = batchSession.batchJobSubmissionOp.builder.clusterManager()) - metadataManager.updateMetadata(metadataForUpdate, asyncRetryOnError = false) + Some(metadata), + fromRecovery = false) val sessionHandle = sessionManager.openBatchSession(batchSession) var submitted = false while (!submitted) { // block until batch job submitted @@ -113,7 +105,7 @@ class KyuubiBatchService( // } if (!submitted) Thread.sleep(1000) } - info(s"$batchId is submitted.") + info(s"$batchId is submitted or finished.") } } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index d7e8b615a..12db68aeb 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -269,7 +269,6 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { userName, "anonymous", ipAddress, - request.getConf.asScala.toMap, request) } match { case Success(sessionHandle) => diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala index 014bbced3..c8563bca1 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala @@ -41,10 +41,9 @@ class KyuubiBatchSession( batchName: Option[String], resource: String, className: String, - batchConf: Map[String, String], batchArgs: Seq[String], - recoveryMetadata: Option[Metadata] = None, - shouldRunAsync: Boolean) + metadata: Option[Metadata] = None, + fromRecovery: Boolean) extends KyuubiSession( TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1, user, @@ -55,11 +54,11 @@ class KyuubiBatchSession( override val sessionType: SessionType = SessionType.BATCH override val handle: SessionHandle = { - val batchId = recoveryMetadata.map(_.identifier).getOrElse(conf(KYUUBI_BATCH_ID_KEY)) + val batchId = metadata.map(_.identifier).getOrElse(conf(KYUUBI_BATCH_ID_KEY)) SessionHandle.fromUUID(batchId) } - override def createTime: Long = recoveryMetadata.map(_.createTime).getOrElse(super.createTime) + override def createTime: Long = metadata.map(_.createTime).getOrElse(super.createTime) override def getNoOperationTime: Long = { if (batchJobSubmissionOp != null && !OperationState.isTerminal( @@ -74,7 +73,7 @@ class KyuubiBatchSession( sessionManager.getConf.get(KyuubiConf.BATCH_SESSION_IDLE_TIMEOUT) override val normalizedConf: Map[String, String] = - sessionConf.getBatchConf(batchType) ++ sessionManager.validateBatchConf(batchConf) + sessionConf.getBatchConf(batchType) ++ sessionManager.validateBatchConf(conf) val optimizedConf: Map[String, String] = { val confOverlay = sessionManager.sessionConfAdvisor.getConfOverlay( @@ -95,7 +94,7 @@ class KyuubiBatchSession( // whether the resource file is from uploading private[kyuubi] val isResourceUploaded: Boolean = - batchConf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, "false").toBoolean + conf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, "false").toBoolean private[kyuubi] lazy val batchJobSubmissionOp = sessionManager.operationManager .newBatchJobSubmissionOperation( @@ -106,8 +105,7 @@ class KyuubiBatchSession( className, optimizedConf, batchArgs, - recoveryMetadata, - shouldRunAsync) + metadata) private def waitMetadataRequestsRetryCompletion(): Unit = { val batchId = batchJobSubmissionOp.batchId @@ -122,7 +120,9 @@ class KyuubiBatchSession( } private val sessionEvent = KyuubiSessionEvent(this) - recoveryMetadata.foreach(metadata => sessionEvent.engineId = metadata.engineId) + if (fromRecovery) { + metadata.foreach { m => sessionEvent.engineId = m.engineId } + } EventBus.post(sessionEvent) override def getSessionEvent: Option[KyuubiSessionEvent] = { @@ -142,32 +142,47 @@ class KyuubiBatchSession( override def open(): Unit = handleSessionException { traceMetricsOnOpen() - if (recoveryMetadata.isEmpty) { + lazy val kubernetesInfo: Map[String, String] = { val appMgrInfo = batchJobSubmissionOp.builder.appMgrInfo() - val kubernetesInfo = appMgrInfo.kubernetesInfo.context.map { context => + appMgrInfo.kubernetesInfo.context.map { context => Map(KyuubiConf.KUBERNETES_CONTEXT.key -> context) }.getOrElse(Map.empty) ++ appMgrInfo.kubernetesInfo.namespace.map { namespace => Map(KyuubiConf.KUBERNETES_NAMESPACE.key -> namespace) }.getOrElse(Map.empty) - val metaData = Metadata( - identifier = handle.identifier.toString, - sessionType = sessionType, - realUser = realUser, - username = user, - ipAddress = ipAddress, - kyuubiInstance = connectionUrl, - state = OperationState.PENDING.toString, - resource = resource, - className = className, - requestName = name.orNull, - requestConf = optimizedConf ++ kubernetesInfo, // save the kubernetes info into request conf - requestArgs = batchArgs, - createTime = createTime, - engineType = batchType, - clusterManager = batchJobSubmissionOp.builder.clusterManager()) + } - // there is a chance that operation failed w/ duplicated key error - sessionManager.insertMetadata(metaData) + (metadata, fromRecovery) match { + case (Some(initialMetadata), false) => + // new batch job created using batch impl v2 + val metadataToUpdate = Metadata( + identifier = initialMetadata.identifier, + kyuubiInstance = connectionUrl, + requestName = name.orNull, + requestConf = optimizedConf ++ kubernetesInfo, // save the kubernetes info + clusterManager = batchJobSubmissionOp.builder.clusterManager()) + sessionManager.updateMetadata(metadataToUpdate) + case (None, _) => + // new batch job created using batch impl v1 + val newMetadata = Metadata( + identifier = handle.identifier.toString, + sessionType = sessionType, + realUser = realUser, + username = user, + ipAddress = ipAddress, + kyuubiInstance = connectionUrl, + state = OperationState.PENDING.toString, + resource = resource, + className = className, + requestName = name.orNull, + requestConf = optimizedConf ++ kubernetesInfo, // save the kubernetes info + requestArgs = batchArgs, + createTime = createTime, + engineType = batchType, + clusterManager = batchJobSubmissionOp.builder.clusterManager()) + + // there is a chance that operation failed w/ duplicated key error + sessionManager.insertMetadata(newMetadata) + case _ => } checkSessionAccessPathURIs() diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index 19259bb1b..8d3234699 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -144,10 +144,9 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { batchName: Option[String], resource: String, className: String, - batchConf: Map[String, String], batchArgs: Seq[String], - recoveryMetadata: Option[Metadata] = None, - shouldRunAsync: Boolean): KyuubiBatchSession = { + metadata: Option[Metadata] = None, + fromRecovery: Boolean): KyuubiBatchSession = { // scalastyle:on val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous") val sessionConf = this.getConf.getUserDefaults(user) @@ -162,10 +161,9 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { batchName, resource, className, - batchConf, batchArgs, - recoveryMetadata, - shouldRunAsync) + metadata, + fromRecovery) } private[kyuubi] def openBatchSession(batchSession: KyuubiBatchSession): SessionHandle = { @@ -202,22 +200,19 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { user: String, password: String, ipAddress: String, - conf: Map[String, String], - batchRequest: BatchRequest, - shouldRunAsync: Boolean = true): SessionHandle = { + batchRequest: BatchRequest): SessionHandle = { val batchSession = createBatchSession( user, password, ipAddress, - conf, + batchRequest.getConf.asScala.toMap, batchRequest.getBatchType, Option(batchRequest.getName), batchRequest.getResource, batchRequest.getClassName, - batchRequest.getConf.asScala.toMap, batchRequest.getArgs.asScala.toSeq, None, - shouldRunAsync) + fromRecovery = false) openBatchSession(batchSession) } @@ -313,10 +308,9 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { Option(metadata.requestName), metadata.resource, metadata.className, - metadata.requestConf, metadata.requestArgs, Some(metadata), - shouldRunAsync = true) + fromRecovery = true) }).getOrElse(Seq.empty) } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala index 7a4bfea1b..e4382a859 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala @@ -116,7 +116,6 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD "kyuubi", "passwd", "localhost", - batchRequest.getConf.asScala.toMap, batchRequest) val session = sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSession] @@ -180,7 +179,6 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD "kyuubi", "passwd", "localhost", - batchRequest.getConf.asScala.toMap, batchRequest) val session = sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSession] diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala index 7c79d6a87..2f794ed48 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala @@ -135,13 +135,12 @@ class ServerJsonLoggingEventHandlerSuite extends WithKyuubiServer with HiveJDBCT } } - val batchRequest = newSparkBatchRequest() + val batchRequest = newSparkBatchRequest(Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)) val sessionMgr = server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager] val batchSessionHandle = sessionMgr.openBatchSession( Utils.currentUser, "kyuubi", "127.0.0.1", - Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString), batchRequest) withSessionConf()(Map.empty)(Map("spark.sql.shuffle.partitions" -> "2")) { withJdbcStatement() { statement => diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala index 504b9ac26..3a47461a2 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala @@ -358,12 +358,12 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite "kyuubi", "kyuubi", InetAddress.getLocalHost.getCanonicalHostName, - Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString), newBatchRequest( "spark", sparkBatchTestResource.get, "", - "")) + "", + Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))) sessionManager.openSession( TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11, "", @@ -380,22 +380,22 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite "kyuubi", "kyuubi", InetAddress.getLocalHost.getCanonicalHostName, - Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString), newBatchRequest( "spark", sparkBatchTestResource.get, "", - "")) + "", + Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))) sessionManager.openBatchSession( "kyuubi", "kyuubi", InetAddress.getLocalHost.getCanonicalHostName, - Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString), newBatchRequest( "spark", sparkBatchTestResource.get, "", - "")) + "", + Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))) val response2 = webTarget.path("api/v1/batches") .queryParam("batchType", "spark") @@ -780,12 +780,14 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite .be.sessionManager.asInstanceOf[KyuubiSessionManager] val e = intercept[Exception] { + val conf = Map( + KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString, + "spark.jars" -> "disAllowPath") sessionManager.openBatchSession( "kyuubi", "kyuubi", InetAddress.getLocalHost.getCanonicalHostName, - Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString), - newSparkBatchRequest(Map("spark.jars" -> "disAllowPath"))) + newSparkBatchRequest(conf)) } val sessionHandleRegex = "\\[\\S*]".r val batchId = sessionHandleRegex.findFirstMatchIn(e.getMessage).get.group(0) @@ -803,12 +805,12 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite "kyuubi", "kyuubi", InetAddress.getLocalHost.getCanonicalHostName, - Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString), newBatchRequest( "spark", sparkBatchTestResource.get, "", - uniqueName)) + uniqueName, + Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))) val response = webTarget.path("api/v1/batches") .queryParam("batchName", uniqueName) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala index 0c44fc3a8..bcf8c450e 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala @@ -290,12 +290,12 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat "kyuubi", "kyuubi", InetAddress.getLocalHost.getCanonicalHostName, - Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString), newBatchRequest( "spark", "", "", - "")) + "", + Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))) sessionManager.openSession( TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11, "", @@ -312,22 +312,22 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat "kyuubi", "kyuubi", InetAddress.getLocalHost.getCanonicalHostName, - Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString), newBatchRequest( "spark", "", "", - "")) + "", + Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))) sessionManager.openBatchSession( "kyuubi", "kyuubi", InetAddress.getLocalHost.getCanonicalHostName, - Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString), newBatchRequest( "spark", "", "", - "")) + "", + Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))) val listArgs = Array( "list",