[KYUUBI #3836] [SPARK] Support pyspark batch job by restful api
### _Why are the changes needed?_

Submit pyspark batch job by restful api

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #3836 from leoluan2009/pyspark-1.

Closes #3836

550021ac [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
357691c2 [Xuedong Luan] fix comment
7dfdbe24 [Xuedong Luan] fix comment
31bda178 [Xuedong Luan] [WIP] Support pyspark batch job by restful api

Lead-authored-by: Xuedong Luan <luanxuedong2009@gmail.com>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit: 00d2d2eb67 (parent: ce11e35822)
kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -124,7 +124,11 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {

   /** Get all batch conf as map */
   def getBatchConf(batchType: String): Map[String, String] = {
-    getAllWithPrefix(s"$KYUUBI_BATCH_CONF_PREFIX.${batchType.toLowerCase(Locale.ROOT)}", "")
+    val normalizedBatchType = batchType.toLowerCase(Locale.ROOT) match {
+      case "pyspark" => "spark"
+      case other => other.toLowerCase(Locale.ROOT)
+    }
+    getAllWithPrefix(s"$KYUUBI_BATCH_CONF_PREFIX.$normalizedBatchType", "")
   }

   /**
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -178,7 +178,7 @@ object KyuubiApplicationManager {
       appConf: Map[String, String],
       kyuubiConf: KyuubiConf): Unit = {
     applicationType.toUpperCase(Locale.ROOT) match {
-      case appType if appType.startsWith("SPARK") => checkSparkAccessPaths(appConf, kyuubiConf)
+      case appType if appType.contains("SPARK") => checkSparkAccessPaths(appConf, kyuubiConf)
       case appType if appType.startsWith("FLINK") => // TODO: check flink app access local paths
       case _ =>
     }
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -206,6 +206,7 @@ object SparkProcessBuilder {
     "spark.yarn.jars",
     "spark.yarn.dist.files",
     "spark.yarn.dist.pyFiles",
+    "spark.submit.pyFiles",
     "spark.yarn.dist.jars",
     "spark.yarn.dist.archives",
     "spark.kerberos.keytab",
kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -77,7 +77,7 @@ class BatchJobSubmission(
   @VisibleForTesting
   private[kyuubi] val builder: ProcBuilder = {
     Option(batchType).map(_.toUpperCase(Locale.ROOT)) match {
-      case Some("SPARK") =>
+      case Some("SPARK") | Some("PYSPARK") =>
        new SparkBatchProcessBuilder(
          session.user,
          session.sessionConf,
kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -163,7 +163,9 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
       supportedBatchType(request.getBatchType),
       s"${request.getBatchType} is not in the supported list: $SUPPORTED_BATCH_TYPES}")
     require(request.getResource != null, "resource is a required parameter")
-    require(request.getClassName != null, "classname is a required parameter")
+    if (request.getBatchType.equalsIgnoreCase("SPARK")) {
+      require(request.getClassName != null, "classname is a required parameter for SPARK")
+    }
     request.setBatchType(request.getBatchType.toUpperCase(Locale.ROOT))

     val userName = fe.getSessionUser(request.getConf.asScala.toMap)
@@ -366,7 +368,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
   }

 object BatchesResource {
-  val SUPPORTED_BATCH_TYPES = Seq("SPARK")
+  val SUPPORTED_BATCH_TYPES = Seq("SPARK", "PYSPARK")
   val VALID_BATCH_STATES = Seq(
     OperationState.PENDING,
     OperationState.RUNNING,
Loading…
Reference in New Issue
Block a user