diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala index 247f0c813..a80f13a86 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala @@ -24,6 +24,8 @@ import java.util.Locale import scala.util.control.NonFatal +import org.apache.commons.lang3.StringUtils + import org.apache.kyuubi.{KyuubiException, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.KubernetesApplicationOperation.LABEL_KYUUBI_UNIQUE_KEY @@ -211,4 +213,9 @@ object KyuubiApplicationManager { case _ => } } + + def sessionUploadFolderPath(sessionId: String): Path = { + require(StringUtils.isNotBlank(sessionId)) + uploadWorkDir.resolve(sessionId) + } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index 182b28b0c..b5e98845e 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -545,7 +545,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { resourceFileInputStream: InputStream, resourceFileName: String, formDataMultiPartOpt: Option[FormDataMultiPart]): Option[JPath] = { - val uploadFileFolderPath = batchResourceUploadFolderPath(batchId) + val uploadFileFolderPath = KyuubiApplicationManager.sessionUploadFolderPath(batchId) try { handleUploadingResourceFile( request, @@ -642,6 +642,8 @@ object BatchesResource { Option(batchState).exists(bt => VALID_BATCH_STATES.contains(bt.toUpperCase(Locale.ROOT))) } - def batchResourceUploadFolderPath(batchId: String): JPath = - KyuubiApplicationManager.uploadWorkDir.resolve(s"batch-$batchId") + def batchResourceUploadFolderPath(sessionId: String): JPath = { + require(StringUtils.isNotBlank(sessionId)) + KyuubiApplicationManager.uploadWorkDir.resolve(sessionId) + } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala index 5dc3a605d..c8cce59e1 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala @@ -28,7 +28,6 @@ import org.apache.kyuubi.engine.KyuubiApplicationManager import org.apache.kyuubi.engine.spark.SparkProcessBuilder import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent} import org.apache.kyuubi.operation.OperationState -import org.apache.kyuubi.server.api.v1.BatchesResource import org.apache.kyuubi.server.metadata.api.Metadata import org.apache.kyuubi.session.SessionType.SessionType import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion @@ -83,7 +82,7 @@ class KyuubiBatchSession( sessionConf.getBatchConf(batchType) ++ sessionManager.validateBatchConf(conf) private[kyuubi] def resourceUploadFolderPath: Path = - BatchesResource.batchResourceUploadFolderPath(batchJobSubmissionOp.batchId) + KyuubiApplicationManager.sessionUploadFolderPath(handle.identifier.toString) val optimizedConf: Map[String, String] = { val confOverlay = sessionManager.sessionConfAdvisor.map(_.getConfOverlay(