[KYUUBI #6335][FOLLOWUP] Using sessionId for sessionUploadFolderPath

# 🔍 Description
## Issue References 🔗

Followup for #6335

Just use the session handle identifier to build the batchResourceUploadFolderPath, and do not rely on the BatchJobSubmission operation initialization.
## Describe Your Solution 🔧

Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6629 from turboFei/batch_id.

Closes #6335

28df03d4d [liangbowen] unused import
2eee2a64f [Wang, Fei] Move to KyuubiApplicationManager
f23ecae1f [liangbowen] update batchResourceUploadFolderPath method for removing batch prefix and checking sessionId
b30a7b996 [Wang, Fei] using session id

Lead-authored-by: Wang, Fei <fwang12@ebay.com>
Co-authored-by: liangbowen <liangbowen@gf.com.cn>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Wang, Fei 2024-08-19 23:07:54 -07:00
parent 9a82eb5c12
commit 9cf22452c8
3 changed files with 13 additions and 5 deletions

View File

@ -24,6 +24,8 @@ import java.util.Locale
import scala.util.control.NonFatal
import org.apache.commons.lang3.StringUtils
import org.apache.kyuubi.{KyuubiException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.KubernetesApplicationOperation.LABEL_KYUUBI_UNIQUE_KEY
@ -211,4 +213,9 @@ object KyuubiApplicationManager {
case _ =>
}
}
def sessionUploadFolderPath(sessionId: String): Path = {
require(StringUtils.isNotBlank(sessionId))
uploadWorkDir.resolve(sessionId)
}
}

View File

@ -545,7 +545,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
resourceFileInputStream: InputStream,
resourceFileName: String,
formDataMultiPartOpt: Option[FormDataMultiPart]): Option[JPath] = {
val uploadFileFolderPath = batchResourceUploadFolderPath(batchId)
val uploadFileFolderPath = KyuubiApplicationManager.sessionUploadFolderPath(batchId)
try {
handleUploadingResourceFile(
request,
@ -642,6 +642,8 @@ object BatchesResource {
Option(batchState).exists(bt => VALID_BATCH_STATES.contains(bt.toUpperCase(Locale.ROOT)))
}
def batchResourceUploadFolderPath(batchId: String): JPath =
KyuubiApplicationManager.uploadWorkDir.resolve(s"batch-$batchId")
def batchResourceUploadFolderPath(sessionId: String): JPath = {
require(StringUtils.isNotBlank(sessionId))
KyuubiApplicationManager.uploadWorkDir.resolve(sessionId)
}
}

View File

@ -28,7 +28,6 @@ import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.api.v1.BatchesResource
import org.apache.kyuubi.server.metadata.api.Metadata
import org.apache.kyuubi.session.SessionType.SessionType
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
@ -83,7 +82,7 @@ class KyuubiBatchSession(
sessionConf.getBatchConf(batchType) ++ sessionManager.validateBatchConf(conf)
private[kyuubi] def resourceUploadFolderPath: Path =
BatchesResource.batchResourceUploadFolderPath(batchJobSubmissionOp.batchId)
KyuubiApplicationManager.sessionUploadFolderPath(handle.identifier.toString)
val optimizedConf: Map[String, String] = {
val confOverlay = sessionManager.sessionConfAdvisor.map(_.getConfOverlay(