[KYUUBI #6335] [REST] Support uploading extra resources in creating batch jobs via REST API

# 🔍 Description
## Issue References 🔗

## Describe Your Solution 🔧
- support creating batch jobs via the REST API with extra resource files uploaded alongside the main resource
- support binding the uploaded extra resources to configs through custom config keys, e.g. `spark.submit.pyFiles` (see the usage sketch below)
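A minimal usage sketch of the new client API; the endpoint, credentials, and file paths are placeholders, while `createBatch(BatchRequest, File, List)` and `setExtraResourcesMap` are the methods added by this patch:

```scala
import java.io.File
import java.util.{Arrays, Collections}

import org.apache.kyuubi.client.{BatchRestApi, KyuubiRestClient}
import org.apache.kyuubi.client.api.v1.dto.BatchRequest

object CreateBatchWithExtraResources {
  def main(args: Array[String]): Unit = {
    // placeholder endpoint and credentials
    val client = KyuubiRestClient.builder("http://localhost:10099")
      .authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.BASIC)
      .username("user")
      .password("password")
      .build()
    val batchRestApi = new BatchRestApi(client)

    val request = new BatchRequest()
    request.setBatchType("PYSPARK")
    request.setName("pyspark-sample")
    // bind the uploaded archive to a config key by its file name
    request.setExtraResourcesMap(
      Collections.singletonMap("spark.submit.pyFiles", "pymodules.zip"))

    // the main resource plus extra resources; each extra resource is sent as a
    // multipart file part keyed by its file name, which the server matches
    // against the file names listed in extraResourcesMap
    val batch = batchRestApi.createBatch(
      request,
      new File("/tmp/app.py"),
      Arrays.asList("/tmp/pymodules.zip"))
    println(s"created batch ${batch.getId}, state ${batch.getState}")
    client.close()
  }
}
```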

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

Creating a batch job via the REST API with resource upload accepts only the single main resource file; extra resources such as Python module archives cannot be uploaded in the same request or bound to configs.

#### Behavior With This Pull Request 🎉

Extra resource files can be uploaded in the same multipart request and are bound to the config keys named in `extraResourcesMap`, e.g. a zip of Python modules bound to `spark.submit.pyFiles` for a PySpark batch job.

#### Related Unit Tests
- Added a test in `BatchRestApiSuite`: "basic batch rest client with uploading resource and extra resources"

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6335 from bowenliang123/batch-subresource.

Closes #6335

57d43d26d [Bowen Liang] nit
d866a8a17 [Bowen Liang] warn exception
20d4328a1 [Bowen Liang] log exception when exception ignored
58c402334 [Bowen Liang] rename param to ignoreException
80bc21034 [Bowen Liang] cleanup the uploaded resource folder when handling files error
3e7961124 [Bowen Liang] throw exception when file non-existed
09ac48a26 [liangbowen] pyspark extra resources

Lead-authored-by: Bowen Liang <liangbowen@gf.com.cn>
Co-authored-by: liangbowen <liangbowen@gf.com.cn>
Signed-off-by: Bowen Liang <liangbowen@gf.com.cn>
Bowen Liang 2024-08-07 14:24:02 +08:00
parent 38069464a0
commit 49d224e002
12 changed files with 296 additions and 32 deletions

Utils.scala

@@ -137,12 +137,23 @@ object Utils extends Logging {
   /**
    * Delete a directory recursively.
    */
-  def deleteDirectoryRecursively(f: File): Boolean = {
-    if (f.isDirectory) f.listFiles match {
-      case files: Array[File] => files.foreach(deleteDirectoryRecursively)
-      case _ =>
+  def deleteDirectoryRecursively(f: File, ignoreException: Boolean = true): Unit = {
+    if (f.isDirectory) {
+      val files = f.listFiles
+      if (files != null && files.nonEmpty) {
+        files.foreach(deleteDirectoryRecursively(_, ignoreException))
+      }
+    }
+    try {
+      f.delete()
+    } catch {
+      case e: Exception =>
+        if (ignoreException) {
+          warn(s"Ignoring the exception in deleting file, path: ${f.toPath}", e)
+        } else {
+          throw e
+        }
     }
-    f.delete()
   }

   /**
BatchRestApi.java

@@ -18,8 +18,9 @@
 package org.apache.kyuubi.client;

 import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
+import java.nio.file.Paths;
+import java.util.*;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kyuubi.client.api.v1.dto.*;
 import org.apache.kyuubi.client.util.JsonUtils;
 import org.apache.kyuubi.client.util.VersionUtils;
@@ -46,10 +47,29 @@ public class BatchRestApi {
   }

   public Batch createBatch(BatchRequest request, File resourceFile) {
+    return createBatch(request, resourceFile, Collections.emptyList());
+  }
+
+  public Batch createBatch(BatchRequest request, File resourceFile, List<String> extraResources) {
     setClientVersion(request);
     Map<String, MultiPart> multiPartMap = new HashMap<>();
     multiPartMap.put("batchRequest", new MultiPart(MultiPart.MultiPartType.JSON, request));
     multiPartMap.put("resourceFile", new MultiPart(MultiPart.MultiPartType.FILE, resourceFile));
+    extraResources.stream()
+        .distinct()
+        .filter(StringUtils::isNotBlank)
+        .map(
+            path -> {
+              File file = Paths.get(path).toFile();
+              if (!file.exists()) {
+                throw new RuntimeException("File does not exist, path: " + path);
+              }
+              return file;
+            })
+        .forEach(
+            file ->
+                multiPartMap.put(
+                    file.getName(), new MultiPart(MultiPart.MultiPartType.FILE, file)));
     return this.getClient().post(API_BASE_PATH, multiPartMap, Batch.class, client.getAuthHeader());
   }
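Client side, the extra resource paths are de-duplicated, blank entries are dropped, and every remaining path must exist; a nonexistent path fails fast before anything is sent. A short sketch reusing `batchRestApi` and `request` from the earlier sketch (the path is a placeholder):

```scala
import java.io.File
import java.util.Arrays

// a nonexistent extra resource path raises a RuntimeException on the client,
// before the multipart request is built or sent
try {
  batchRestApi.createBatch(request, new File("/tmp/app.py"), Arrays.asList("/tmp/missing.zip"))
} catch {
  case e: RuntimeException => println(e.getMessage) // File does not exist, path: /tmp/missing.zip
}
```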

BatchRequest.java

@@ -31,6 +31,7 @@ public class BatchRequest {
   private String name;
   private Map<String, String> conf = Collections.emptyMap();
   private List<String> args = Collections.emptyList();
+  private Map<String, String> extraResourcesMap = Collections.emptyMap();

   public BatchRequest() {}
@@ -110,6 +111,14 @@ public class BatchRequest {
     this.args = args;
   }

+  public Map<String, String> getExtraResourcesMap() {
+    return extraResourcesMap;
+  }
+
+  public void setExtraResourcesMap(Map<String, String> extraResourcesMap) {
+    this.extraResourcesMap = extraResourcesMap;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
@@ -120,13 +129,20 @@ public class BatchRequest {
         && Objects.equals(getClassName(), that.getClassName())
         && Objects.equals(getName(), that.getName())
         && Objects.equals(getConf(), that.getConf())
-        && Objects.equals(getArgs(), that.getArgs());
+        && Objects.equals(getArgs(), that.getArgs())
+        && Objects.equals(getExtraResourcesMap(), that.getExtraResourcesMap());
   }

   @Override
   public int hashCode() {
     return Objects.hash(
-        getBatchType(), getResource(), getClassName(), getName(), getConf(), getArgs());
+        getBatchType(),
+        getResource(),
+        getClassName(),
+        getName(),
+        getConf(),
+        getArgs(),
+        getExtraResourcesMap());
   }

   @Override
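Several uploaded files can be bound to one config key by listing their file names comma-separated in a single `extraResourcesMap` value (the server splits on `,`); a sketch with placeholder file names:

```scala
import scala.collection.JavaConverters._

import org.apache.kyuubi.client.api.v1.dto.BatchRequest

val request = new BatchRequest()
// both files must also be uploaded as multipart parts under these exact names
request.setExtraResourcesMap(
  Map("spark.submit.pyFiles" -> "pymodules.zip,extra_utils.py").asJava)
```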

BatchJobSubmission.scala

@@ -17,7 +17,6 @@
 package org.apache.kyuubi.operation

-import java.nio.file.{Files, Paths}
 import java.util.Locale
 import java.util.concurrent.TimeUnit
@@ -395,11 +394,7 @@ class BatchJobSubmission(
   private def cleanupUploadedResourceIfNeeded(): Unit = {
     if (session.isResourceUploaded) {
-      try {
-        Files.deleteIfExists(Paths.get(resource))
-      } catch {
-        case e: Throwable => error(s"Error deleting the uploaded resource: $resource", e)
-      }
+      Utils.deleteDirectoryRecursively(session.resourceUploadFolderPath.toFile)
     }
   }
 }
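Uploaded artifacts now live in a per-batch folder (`batch-<batch id>` under the upload work dir, via `batchResourceUploadFolderPath` added later in this patch), so cleanup is a single recursive delete instead of removing one file. A sketch of the two `Utils.deleteDirectoryRecursively` modes, with a placeholder path:

```scala
import java.io.File

import org.apache.kyuubi.Utils

// e.g. <uploadWorkDir>/batch-<batch id>/ holding app.py and pymodules.zip
val uploadFolder = new File("/path/to/uploadWorkDir/batch-xyz")

// default: best-effort, deletion failures are logged at WARN and swallowed
Utils.deleteDirectoryRecursively(uploadFolder)

// strict: rethrow the first deletion failure to the caller
Utils.deleteDirectoryRecursively(uploadFolder, ignoreException = false)
```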

BatchesResource.scala

@@ -18,6 +18,7 @@
 package org.apache.kyuubi.server.api.v1

 import java.io.InputStream
+import java.nio.file.{Path => JPath}
 import java.util
 import java.util.{Collections, Locale, UUID}
 import java.util.concurrent.ConcurrentHashMap
@@ -32,7 +33,7 @@ import io.swagger.v3.oas.annotations.media.{Content, Schema}
 import io.swagger.v3.oas.annotations.responses.ApiResponse
 import io.swagger.v3.oas.annotations.tags.Tag
 import org.apache.commons.lang3.StringUtils
-import org.glassfish.jersey.media.multipart.{FormDataContentDisposition, FormDataParam}
+import org.glassfish.jersey.media.multipart.{FormDataContentDisposition, FormDataMultiPart, FormDataParam}

 import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.client.api.v1.dto._
@@ -190,7 +191,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
   def openBatchSessionWithUpload(
       @FormDataParam("batchRequest") batchRequest: BatchRequest,
       @FormDataParam("resourceFile") resourceFileInputStream: InputStream,
-      @FormDataParam("resourceFile") resourceFileMetadata: FormDataContentDisposition): Batch = {
+      @FormDataParam("resourceFile") resourceFileMetadata: FormDataContentDisposition,
+      formDataMultiPart: FormDataMultiPart): Batch = {
     require(
       fe.getConf.get(BATCH_RESOURCE_UPLOAD_ENABLED),
       "Batch resource upload function is disabled.")
@@ -198,12 +200,12 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
       batchRequest != null,
       "batchRequest is required and please check the content type" +
         " of batchRequest is application/json")
-    val tempFile = Utils.writeToTempFile(
-      resourceFileInputStream,
-      KyuubiApplicationManager.uploadWorkDir,
-      resourceFileMetadata.getFileName)
-    batchRequest.setResource(tempFile.getPath)
-    openBatchSessionInternal(batchRequest, isResourceFromUpload = true)
+    openBatchSessionInternal(
+      batchRequest,
+      isResourceFromUpload = true,
+      resourceFileInputStream = Some(resourceFileInputStream),
+      resourceFileMetadata = Some(resourceFileMetadata),
+      formDataMultiPartOpt = Some(formDataMultiPart))
   }

   /**
@@ -215,7 +217,10 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
    */
   private def openBatchSessionInternal(
       request: BatchRequest,
-      isResourceFromUpload: Boolean = false): Batch = {
+      isResourceFromUpload: Boolean = false,
+      resourceFileInputStream: Option[InputStream] = None,
+      resourceFileMetadata: Option[FormDataContentDisposition] = None,
+      formDataMultiPartOpt: Option[FormDataMultiPart] = None): Batch = {
     require(
       supportedBatchType(request.getBatchType),
       s"${request.getBatchType} is not in the supported list: $SUPPORTED_BATCH_TYPES")
@@ -243,6 +248,14 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
         markDuplicated(batch)
       case None =>
         val batchId = userProvidedBatchId.getOrElse(UUID.randomUUID().toString)
+        if (isResourceFromUpload) {
+          handleUploadingFiles(
+            batchId,
+            request,
+            resourceFileInputStream.get,
+            resourceFileMetadata.get.getFileName,
+            formDataMultiPartOpt)
+        }
         request.setConf(
           (request.getConf.asScala ++ Map(
             KYUUBI_BATCH_ID_KEY -> batchId,
@@ -525,22 +538,110 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
       }
     }
   }

+  private def handleUploadingFiles(
+      batchId: String,
+      request: BatchRequest,
+      resourceFileInputStream: InputStream,
+      resourceFileName: String,
+      formDataMultiPartOpt: Option[FormDataMultiPart]): Option[JPath] = {
+    val uploadFileFolderPath = batchResourceUploadFolderPath(batchId)
+    try {
+      handleUploadingResourceFile(
+        request,
+        resourceFileInputStream,
+        resourceFileName,
+        uploadFileFolderPath)
+      handleUploadingExtraResourcesFiles(request, formDataMultiPartOpt, uploadFileFolderPath)
+      Some(uploadFileFolderPath)
+    } catch {
+      case e: Exception =>
+        Utils.deleteDirectoryRecursively(uploadFileFolderPath.toFile)
+        throw e
+    }
+  }
+
+  private def handleUploadingResourceFile(
+      request: BatchRequest,
+      inputStream: InputStream,
+      fileName: String,
+      uploadFileFolderPath: JPath): Unit = {
+    try {
+      val tempFile = Utils.writeToTempFile(inputStream, uploadFileFolderPath, fileName)
+      request.setResource(tempFile.getPath)
+    } catch {
+      case e: Exception =>
+        throw new RuntimeException(
+          s"Failed handling uploaded resource file $fileName: ${e.getMessage}",
+          e)
+    }
+  }
+
+  private def handleUploadingExtraResourcesFiles(
+      request: BatchRequest,
+      formDataMultiPartOpt: Option[FormDataMultiPart],
+      uploadFileFolderPath: JPath): Unit = {
+    val extraResourceMap = request.getExtraResourcesMap.asScala
+    if (extraResourceMap.nonEmpty) {
+      val fileNameSeparator = ","
+      val formDataMultiPart = formDataMultiPartOpt.get
+      val transformedExtraResourcesMap = extraResourceMap
+        .mapValues(confValue =>
+          confValue.split(fileNameSeparator).filter(StringUtils.isNotBlank(_)))
+        .filter { case (confKey, fileNames) =>
+          fileNames.nonEmpty && StringUtils.isNotBlank(confKey)
+        }.mapValues { fileNames =>
+          fileNames.map(fileName =>
+            Option(formDataMultiPart.getField(fileName))
+              .getOrElse(throw new RuntimeException(s"File part for file $fileName not found")))
+        }.map {
+          case (confKey, fileParts) =>
+            val tempFilePaths = fileParts.map { filePart =>
+              val fileName = filePart.getContentDisposition.getFileName
+              try {
+                Utils.writeToTempFile(
+                  filePart.getValueAs(classOf[InputStream]),
+                  uploadFileFolderPath,
+                  fileName).getPath
+              } catch {
+                case e: Exception =>
+                  throw new RuntimeException(
+                    s"Failed handling uploaded extra resource file $fileName: ${e.getMessage}",
+                    e)
+              }
+            }
+            (confKey, tempFilePaths.mkString(fileNameSeparator))
+        }
+
+      val conf = request.getConf
+      transformedExtraResourcesMap.foreach { case (confKey, tempFilePathStr) =>
+        conf.get(confKey) match {
+          case confValue: String if StringUtils.isNotBlank(confValue) =>
+            conf.put(confKey, List(confValue.trim, tempFilePathStr).mkString(fileNameSeparator))
+          case _ => conf.put(confKey, tempFilePathStr)
+        }
+      }
+    }
+  }
 }

 object BatchesResource {
-  val SUPPORTED_BATCH_TYPES = Seq("SPARK", "PYSPARK")
-  val VALID_BATCH_STATES = Seq(
+  private lazy val SUPPORTED_BATCH_TYPES = Set("SPARK", "PYSPARK")
+  private lazy val VALID_BATCH_STATES = Set(
     OperationState.PENDING,
     OperationState.RUNNING,
     OperationState.FINISHED,
     OperationState.ERROR,
     OperationState.CANCELED).map(_.toString)

-  def supportedBatchType(batchType: String): Boolean = {
+  private def supportedBatchType(batchType: String): Boolean = {
     Option(batchType).exists(bt => SUPPORTED_BATCH_TYPES.contains(bt.toUpperCase(Locale.ROOT)))
   }

-  def validBatchState(batchState: String): Boolean = {
+  private def validBatchState(batchState: String): Boolean = {
     Option(batchState).exists(bt => VALID_BATCH_STATES.contains(bt.toUpperCase(Locale.ROOT)))
   }
+
+  def batchResourceUploadFolderPath(batchId: String): JPath =
+    KyuubiApplicationManager.uploadWorkDir.resolve(s"batch-$batchId")
 }
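The binding rule implemented by `handleUploadingExtraResourcesFiles`: each file name in an `extraResourcesMap` value is resolved to the path of the uploaded temp file, and the joined paths are appended to any pre-existing conf value under the same key. A standalone sketch of just this merge rule (the helper name and paths are illustrative, not part of the patch):

```scala
import org.apache.commons.lang3.StringUtils

// illustrative helper mirroring the conf merge in handleUploadingExtraResourcesFiles
def bindUploadedPaths(
    conf: Map[String, String],
    confKey: String,
    uploadedPaths: Seq[String]): Map[String, String] = {
  val joined = uploadedPaths.mkString(",")
  conf.get(confKey) match {
    // keep an existing value and append the uploaded paths, comma-separated
    case Some(v) if StringUtils.isNotBlank(v) => conf + (confKey -> s"${v.trim},$joined")
    case _ => conf + (confKey -> joined)
  }
}

// bindUploadedPaths(
//   Map("spark.submit.pyFiles" -> "hdfs:///libs/base.zip"),
//   "spark.submit.pyFiles",
//   Seq("/uploadWorkDir/batch-xyz/pymodules.zip"))
// => Map("spark.submit.pyFiles" ->
//      "hdfs:///libs/base.zip,/uploadWorkDir/batch-xyz/pymodules.zip")
```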

KyuubiBatchSession.scala

@@ -17,6 +17,8 @@
 package org.apache.kyuubi.session

+import java.nio.file.Path
+
 import scala.collection.JavaConverters._

 import org.apache.kyuubi.client.util.BatchUtils._
@@ -26,6 +28,7 @@ import org.apache.kyuubi.engine.KyuubiApplicationManager
 import org.apache.kyuubi.engine.spark.SparkProcessBuilder
 import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
 import org.apache.kyuubi.operation.OperationState
+import org.apache.kyuubi.server.api.v1.BatchesResource
 import org.apache.kyuubi.server.metadata.api.Metadata
 import org.apache.kyuubi.session.SessionType.SessionType
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
@@ -79,6 +82,9 @@ class KyuubiBatchSession(
   override val normalizedConf: Map[String, String] =
     sessionConf.getBatchConf(batchType) ++ sessionManager.validateBatchConf(conf)

+  private[kyuubi] def resourceUploadFolderPath: Path =
+    BatchesResource.batchResourceUploadFolderPath(batchJobSubmissionOp.batchId)
+
   val optimizedConf: Map[String, String] = {
     val confOverlay = sessionManager.sessionConfAdvisor.map(_.getConfOverlay(
       user,
@@ -101,8 +107,8 @@ class KyuubiBatchSession(
     batchName.filterNot(_.trim.isEmpty).orElse(optimizedConf.get(KyuubiConf.SESSION_NAME.key))

   // whether the resource file is from uploading
-  private[kyuubi] val isResourceUploaded: Boolean =
-    conf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, "false").toBoolean
+  private[kyuubi] lazy val isResourceUploaded: Boolean =
+    conf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, false.toString).toBoolean

   private[kyuubi] lazy val batchJobSubmissionOp = sessionManager.operationManager
     .newBatchJobSubmissionOperation(

python/app.py

@@ -0,0 +1,20 @@
+from module1.module import func1
+from pyspark.sql import SparkSession
+from pyspark.sql.types import StructType, StructField, IntegerType
+
+if __name__ == "__main__":
+    print(f"Started running PySpark app at {func1()}")
+    spark = SparkSession.builder.appName("pyspark-sample").getOrCreate()
+    sc = spark.sparkContext
+
+    data = [1, 2, 3, 4, 5]
+    rdd = sc.parallelize(data)
+    transformed_rdd = rdd.map(lambda x: x * 2)
+    collected = transformed_rdd.collect()
+
+    df = spark.createDataFrame(transformed_rdd, IntegerType())
+    df.coalesce(1).write.format("csv").option("header", "false").save("/tmp/" + func1())
+
+    print(f"Result: {collected}")

python/module1/module.py

@@ -0,0 +1,5 @@
+from module2.module import current_time
+
+
+def func1():
+    return "result_" + current_time()

python/module2/module.py

@@ -0,0 +1,6 @@
+from datetime import datetime
+
+
+def current_time():
+    now = datetime.now()
+    return now.strftime("%Y%m%d%H%M%S")

BatchRestApiSuite.scala

@@ -16,9 +16,12 @@
  */
 package org.apache.kyuubi.server.rest.client

-import java.nio.file.Paths
+import java.io.{File, FileOutputStream}
+import java.nio.file.{Files, Path, Paths}
 import java.util.Base64
+import java.util.zip.{ZipEntry, ZipOutputStream}

+import scala.collection.JavaConverters._
+
 import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
@@ -29,6 +32,7 @@ import org.apache.kyuubi.client.exception.KyuubiRestException
 import org.apache.kyuubi.config.KyuubiReservedKeys
 import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
 import org.apache.kyuubi.session.{KyuubiSession, SessionHandle}
+import org.apache.kyuubi.util.GoldenFileUtils.getCurrentModuleHome

 class BatchRestApiSuite extends RestClientTestHelper with BatchTestHelper {
@@ -99,6 +103,86 @@ class BatchRestApiSuite extends RestClientTestHelper with BatchTestHelper {
     basicKyuubiRestClient.close()
   }

+  test("basic batch rest client with uploading resource and extra resources") {
+    def preparePyModulesZip(
+        srcFolderPath: Path,
+        targetZipFileName: String,
+        excludedFileNames: Set[String] = Set.empty[String]): String = {
+      def addFolderToZip(zos: ZipOutputStream, folder: File, parentFolder: String = ""): Unit = {
+        if (folder.isDirectory) {
+          folder.listFiles().foreach { file =>
+            val fileName = file.getName
+            if (!(excludedFileNames.contains(fileName) || fileName.startsWith("."))) {
+              if (file.isDirectory) {
+                val folderPath =
+                  if (parentFolder.isEmpty) fileName else parentFolder + "/" + fileName
+                addFolderToZip(zos, file, folderPath)
+              } else {
+                val filePath = if (parentFolder.isEmpty) fileName else parentFolder + "/" + fileName
+                zos.putNextEntry(new ZipEntry(filePath))
+                zos.write(Files.readAllBytes(file.toPath))
+                zos.closeEntry()
+              }
+            }
+          }
+        }
+      }
+
+      val zipFilePath = Paths.get(System.getProperty("java.io.tmpdir"), targetZipFileName).toString
+      val fileOutputStream = new FileOutputStream(zipFilePath)
+      val zipOutputStream = new ZipOutputStream(fileOutputStream)
+      try {
+        addFolderToZip(zipOutputStream, srcFolderPath.toFile)
+      } finally {
+        zipOutputStream.close()
+        fileOutputStream.close()
+      }
+      zipFilePath
+    }
+
+    val basicKyuubiRestClient: KyuubiRestClient =
+      KyuubiRestClient.builder(baseUri.toString)
+        .authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.BASIC)
+        .username(ldapUser)
+        .password(ldapUserPasswd)
+        .socketTimeout(5 * 60 * 1000)
+        .build()
+    val batchRestApi: BatchRestApi = new BatchRestApi(basicKyuubiRestClient)
+
+    val pythonScriptsPath = s"${getCurrentModuleHome(this)}/src/test/resources/python/"
+    val appScriptFileName = "app.py"
+    val appScriptFile = Paths.get(pythonScriptsPath, appScriptFileName).toFile
+    val modulesZipFileName = "pymodules.zip"
+    val modulesZipFile = preparePyModulesZip(
+      srcFolderPath = Paths.get(pythonScriptsPath),
+      targetZipFileName = modulesZipFileName,
+      excludedFileNames = Set(appScriptFileName))
+
+    val requestObj = newSparkBatchRequest(Map("spark.master" -> "local"))
+    requestObj.setBatchType("PYSPARK")
+    requestObj.setName("pyspark-test")
+    requestObj.setExtraResourcesMap(Map("spark.submit.pyFiles" -> modulesZipFileName).asJava)
+
+    val extraResources = List(modulesZipFile)
+    val batch: Batch = batchRestApi.createBatch(requestObj, appScriptFile, extraResources.asJava)
+    try {
+      assert(batch.getKyuubiInstance === fe.connectionUrl)
+      assert(batch.getBatchType === "PYSPARK")
+      val batchId = batch.getId
+      assert(batchId !== null)
+      eventually(timeout(1.minutes), interval(1.seconds)) {
+        val batch = batchRestApi.getBatchById(batchId)
+        assert(batch.getState == "FINISHED")
+      }
+    } finally {
+      Files.deleteIfExists(Paths.get(modulesZipFile))
+      basicKyuubiRestClient.close()
+    }
+  }
+
   test("basic batch rest client with invalid user") {
     val totalConnections =
       MetricsSystem.counterValue(MetricsConstants.REST_CONN_TOTAL).getOrElse(0L)
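The test zips `module1/` and `module2/` at the archive root because `spark.submit.pyFiles` puts the zip itself on the Python path, so `from module1.module import func1` in `app.py` resolves against the zip root. A sketch of a sanity check on that layout (the zip path is a placeholder):

```scala
import java.util.zip.ZipFile

import scala.collection.JavaConverters._

// module folders must sit at the zip root for the imports in app.py to resolve
val entries = new ZipFile("/tmp/pymodules.zip").entries().asScala.map(_.getName).toSet
assert(entries.contains("module1/module.py"))
assert(entries.contains("module2/module.py"))
```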