[KYUUBI #1340] Refactor ProcBuilder creation based on EngineType
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1341 from yanghua/KYUUBI-1340. Closes #1340 aa0df59a [yanghua] Fixed test failure ee2cd078 [yanghua] [KYUUBI #1340] Refactor ProcBuilder creation based on EngineType Authored-by: yanghua <yanghua1127@gmail.com> Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
parent
04d4179bbe
commit
f9b933a661
@ -32,7 +32,7 @@ import org.apache.hadoop.security.UserGroupInformation
|
||||
import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.KyuubiConf._
|
||||
import org.apache.kyuubi.engine.EngineType.EngineType
|
||||
import org.apache.kyuubi.engine.EngineType.{EngineType, SPARK_SQL}
|
||||
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, SERVER, ShareLevel}
|
||||
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
|
||||
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
|
||||
@ -179,13 +179,17 @@ private[kyuubi] class EngineRef(
|
||||
var engineRef = getServerHost(zkClient, engineSpace)
|
||||
if (engineRef.nonEmpty) return engineRef.get
|
||||
|
||||
conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
|
||||
// tag is a seq type with comma-separated
|
||||
conf.set(SparkProcessBuilder.TAG_KEY,
|
||||
conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
|
||||
conf.set(HA_ZK_NAMESPACE, engineSpace)
|
||||
conf.set(HA_ZK_ENGINE_REF_ID, engineRefId)
|
||||
val builder = new SparkProcessBuilder(appUser, conf)
|
||||
val builder = engineType match {
|
||||
case SPARK_SQL =>
|
||||
conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
|
||||
// tag is a seq type with comma-separated
|
||||
conf.set(SparkProcessBuilder.TAG_KEY,
|
||||
conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
|
||||
new SparkProcessBuilder(appUser, conf)
|
||||
case _ => throw new UnsupportedOperationException(s"Unsupported engine type: ${engineType}")
|
||||
}
|
||||
MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
|
||||
try {
|
||||
info(s"Launching engine:\n$builder")
|
||||
@ -209,7 +213,7 @@ private[kyuubi] class EngineRef(
|
||||
process.destroyForcibly()
|
||||
MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser)))
|
||||
throw KyuubiSQLException(
|
||||
s"Timeout($timeout ms) to launched Spark with $builder. $killMessage",
|
||||
s"Timeout($timeout ms) to launched $engineType engine with $builder. $killMessage",
|
||||
builder.getError)
|
||||
}
|
||||
engineRef = getEngineByRefId(zkClient, engineSpace, engineRefId)
|
||||
|
||||
@ -166,6 +166,7 @@ class EngineRefSuite extends KyuubiFunSuite {
|
||||
test("start and get engine address with lock") {
|
||||
val id = UUID.randomUUID().toString
|
||||
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
|
||||
conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
|
||||
conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
|
||||
conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test")
|
||||
conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user