From f9b933a66181ea73d7d296cdd2db6a928a89a867 Mon Sep 17 00:00:00 2001 From: yanghua Date: Sat, 6 Nov 2021 11:33:20 +0800 Subject: [PATCH] [KYUUBI #1340] Refactor ProcBuilder creation based on EngineType ### _Why are the changes needed?_ ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1341 from yanghua/KYUUBI-1340. Closes #1340 aa0df59a [yanghua] Fixed test failure ee2cd078 [yanghua] [KYUUBI #1340] Refactor ProcBuilder creation based on EngineType Authored-by: yanghua Signed-off-by: Kent Yao --- .../org/apache/kyuubi/engine/EngineRef.scala | 18 +++++++++++------- .../apache/kyuubi/engine/EngineRefSuite.scala | 1 + 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index b85f2d6dc..fc06bdfd5 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -32,7 +32,7 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ -import org.apache.kyuubi.engine.EngineType.EngineType +import org.apache.kyuubi.engine.EngineType.{EngineType, SPARK_SQL} import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, SERVER, ShareLevel} import org.apache.kyuubi.engine.spark.SparkProcessBuilder import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID @@ -179,13 +179,17 @@ private[kyuubi] class EngineRef( var engineRef = getServerHost(zkClient, engineSpace) if (engineRef.nonEmpty) return engineRef.get - conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName) - // tag is a seq type with comma-separated - conf.set(SparkProcessBuilder.TAG_KEY, - conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI") conf.set(HA_ZK_NAMESPACE, engineSpace) conf.set(HA_ZK_ENGINE_REF_ID, engineRefId) - val builder = new SparkProcessBuilder(appUser, conf) + val builder = engineType match { + case SPARK_SQL => + conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName) + // tag is a seq type with comma-separated + conf.set(SparkProcessBuilder.TAG_KEY, + conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI") + new SparkProcessBuilder(appUser, conf) + case _ => throw new UnsupportedOperationException(s"Unsupported engine type: ${engineType}") + } MetricsSystem.tracing(_.incCount(ENGINE_TOTAL)) try { info(s"Launching engine:\n$builder") @@ -209,7 +213,7 @@ private[kyuubi] class EngineRef( process.destroyForcibly() MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser))) throw KyuubiSQLException( - s"Timeout($timeout ms) to launched Spark with $builder. $killMessage", + s"Timeout($timeout ms) to launched $engineType engine with $builder. $killMessage", builder.getError) } engineRef = getEngineByRefId(zkClient, engineSpace, engineRefId) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala index e5aceba8e..dc3405e3f 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala @@ -166,6 +166,7 @@ class EngineRefSuite extends KyuubiFunSuite { test("start and get engine address with lock") { val id = UUID.randomUUID().toString conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString) + conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString) conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0) conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test") conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString)