From 7a79b16cdf2deae68768a37ecddf748bed046bb2 Mon Sep 17 00:00:00 2001 From: Zen Date: Thu, 24 Dec 2020 19:02:40 +0800 Subject: [PATCH] fix #259 engine scope Squashed commit of the following: commit bb598b101da43b145bd4c95976e8decd39a02210 Merge: 54e730c c4cf523 Author: zen Date: Thu Dec 24 10:53:42 2020 +0800 Merge remote-tracking branch 'origin/pr2-engine-scope' into pr2-engine-scope # Conflicts: # externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala commit 54e730c5b26ffb147e5c1266bdd036460cc20d82 Merge: 62d1575 512fbc5 Author: zen Date: Thu Dec 24 10:51:21 2020 +0800 Merge branch 'master' into pr2-engine-scope # Conflicts: # externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala commit c4cf523dc5bc409ea7ac742292d5e6e2d6421b8a Author: zen Date: Mon Dec 21 23:10:12 2020 +0800 merge master and fix the conflicts commit c53b166f1eb3e8a698f498e1d123c815cef5d665 Merge: 62d1575 e8bc44c Author: zen Date: Mon Dec 21 23:02:26 2020 +0800 Merge branch 'master' into pr2-engine-scope # Conflicts: # externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala commit 62d15751ea6b213319501bcfac442d21d0cd5ca6 Author: zen Date: Thu Dec 10 15:40:48 2020 +0800 fix zkNamespace commit 7e3e432318ed931b1ad16c839c4a027e00bc6ef9 Author: zen Date: Thu Dec 10 14:34:34 2020 +0800 Support for configuring the session scope of SparkSQLEngine. --- .../kyuubi/engine/spark/SparkSQLEngine.scala | 24 ++++- .../org/apache/kyuubi/config/KyuubiConf.scala | 13 +++ .../apache/kyuubi/engine/EngineAppName.scala | 88 +++++++++++++++++++ .../apache/kyuubi/engine/EngineScope.scala | 26 ++++++ .../kyuubi/engine/EngineAppNameSuite.scala | 71 +++++++++++++++ .../kyuubi/session/KyuubiSessionImpl.scala | 5 +- 6 files changed, 223 insertions(+), 4 deletions(-) create mode 100644 kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineAppName.scala create mode 100644 kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineScope.scala create mode 100644 kyuubi-common/src/test/scala/org/apache/kyuubi/engine/EngineAppNameSuite.scala diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index 81ad368ba..e4ed0ed01 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.SparkSession import org.apache.kyuubi.{Logging, Utils} import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.engine.{EngineAppName, EngineScope} import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch import org.apache.kyuubi.ha.HighAvailabilityConf._ import org.apache.kyuubi.ha.client.{RetryPolicies, ServiceDiscovery} @@ -38,6 +39,11 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession) override private[kyuubi] val backendService = new SparkSQLBackendService(spark) + val appName: EngineAppName = EngineAppName.parseAppName( + spark.conf.get(EngineAppName.SPARK_APP_NAME_KEY), + SparkSQLEngine.kyuubiConf + ) + override protected def stopServer(): Unit = { countDownLatch.countDown() spark.stop() @@ -59,7 +65,7 @@ object SparkSQLEngine extends Logging { val appName = s"kyuubi_${user}_spark_${Instant.now}" - sparkConf.setAppName(appName) + sparkConf.setIfMissing(EngineAppName.SPARK_APP_NAME_KEY, appName) kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_BIND_PORT, 0) kyuubiConf.setIfMissing(HA_ZK_CONN_RETRY_POLICY, RetryPolicies.N_TIME.toString) @@ -93,10 +99,22 @@ object SparkSQLEngine extends Logging { val needExpose = kyuubiConf.get(HA_ZK_QUORUM).nonEmpty if (needExpose) { val zkNamespacePrefix = kyuubiConf.get(HA_ZK_NAMESPACE) - val serviceDiscovery = new ServiceDiscovery(engine, s"$zkNamespacePrefix-$user") + val zkNamespace = engine.appName.makeZkPath(zkNamespacePrefix) + val serviceDiscovery = new ServiceDiscovery(engine, zkNamespace.substring(1)) serviceDiscovery.initialize(kyuubiConf) serviceDiscovery.start() - sys.addShutdownHook(serviceDiscovery.stop()) + sys.addShutdownHook({ + serviceDiscovery.stop() + if (EngineScope.SESSION.equals(engine.appName.getEngineScope)) { + val zkClient = ServiceDiscovery.startZookeeperClient(kyuubiConf) + try { + info(s"Deleting engine service's namespace: $zkNamespace") + zkClient.delete().forPath(zkNamespace) + } finally { + zkClient.close() + } + } + }) } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 6afef5fa5..f7fab1516 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import org.apache.kyuubi.{Logging, Utils} +import org.apache.kyuubi.engine.EngineScope import org.apache.kyuubi.service.authentication.{AuthTypes, SaslQOP} case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging { @@ -471,4 +472,16 @@ object KyuubiConf { .version("1.0.0") .timeConf .createWithDefault(Duration.ofSeconds(5).toMillis) + + val ENGINE_SCOPE: ConfigEntry[String] = buildConf("session.engine.scope") + .doc("The engine session scope.
    " + + "
  • S: One engine per kyuubi session in kyuubi cluster.
  • " + + "
  • U: One engine per user in kyuubi cluster.
  • " + + "
  • G: One engine per group in kyuubi cluster.
  • " + + "
  • K: One engine per kyuubi server in kyuubi cluster.
") + .version("1.0.0") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(EngineScope.values.map(_.toString)) + .createWithDefault(EngineScope.USER.toString) } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineAppName.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineAppName.scala new file mode 100644 index 000000000..254345386 --- /dev/null +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineAppName.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine + +import org.apache.curator.utils.ZKPaths + +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.ENGINE_SCOPE +import org.apache.kyuubi.engine.EngineScope.{EngineScope, SESSION} + +class EngineAppName(user: String, sessionId: String, conf: KyuubiConf) { + + import EngineAppName._ + + private val engineScope = EngineScope.withName(conf.get(ENGINE_SCOPE)) + + def getEngineScope: EngineScope = engineScope + + /** + * kyuubi_[KGUS]_user_sessionid + * + * @return engine app name + */ + def generateAppName(): String = { + StringBuilder.newBuilder.append(APP_NAME_PREFIX) + .append(DELIMITER).append(engineScope) + .append(DELIMITER).append(user) + .append(DELIMITER).append(sessionId).mkString + } + + /** + * if engine scope is [S] return + * /[zkNamespace]-engine/S/[user]/[sessionId] + * else return + * /[zkNamespace]-engine/[engineScope]/[user] + * + * @param zkNamespace zk root path + * @return engine zk path + */ + def makeZkPath(zkNamespace: String): String = { + val namespace = zkNamespace + "-" + ZK_NAMESPACE_SUFFIX + engineScope match { + case SESSION => + ZKPaths.makePath(namespace, engineScope.toString, user, sessionId) + case _ => + ZKPaths.makePath(namespace, engineScope.toString, user) + } + } + +} + +object EngineAppName { + + private val APP_NAME_PREFIX = "kyuubi" + + private val ZK_NAMESPACE_SUFFIX = "engine" + + private val DELIMITER = "_" + + val SPARK_APP_NAME_KEY = "spark.app.name" + + def apply(user: String, sessionId: String, conf: KyuubiConf): EngineAppName = + new EngineAppName(user, sessionId, conf) + + def parseAppName(appName: String, conf: KyuubiConf): EngineAppName = { + val params = appName.split(DELIMITER) + val clone = conf.clone + clone.set(ENGINE_SCOPE, params(1)) + EngineAppName(params(2), params(3), conf) + } + +} + diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineScope.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineScope.scala new file mode 100644 index 000000000..b9c55f1fc --- /dev/null +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineScope.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine + +object EngineScope extends Enumeration { + type EngineScope = Value + val SESSION = Value("S") + val USER = Value("U") + val GROUP = Value("G") + val SERVER = Value("K") +} diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/engine/EngineAppNameSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/engine/EngineAppNameSuite.scala new file mode 100644 index 000000000..838be9555 --- /dev/null +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/engine/EngineAppNameSuite.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine + +import org.apache.kyuubi.KyuubiFunSuite +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.engine.EngineScope.EngineScope + +class EngineAppNameSuite extends KyuubiFunSuite { + + private val kyuubiConf: KyuubiConf = KyuubiConf() + private val zkNamespace: String = "kyuubi" + private val user: String = "hive" + private val handle: String = "a9938028-667d-4006-993e-0bdb5a14ae91" + + override def beforeAll(): Unit = { + super.beforeAll() + } + + + test("SparkSQLEngineAppName") { + + // SESSION SCOPE + val sessionScopeAppName = "kyuubi_S_hive_a9938028-667d-4006-993e-0bdb5a14ae91" + val sessionScopeZkPath = "/kyuubi-engine/S/hive/a9938028-667d-4006-993e-0bdb5a14ae91" + checkAppNameAndZkPath(EngineScope.SESSION, sessionScopeAppName, sessionScopeZkPath) + + // USER SCOPE + val userScopeAppName = "kyuubi_U_hive_a9938028-667d-4006-993e-0bdb5a14ae91" + val userScopeZkPath = "/kyuubi-engine/U/hive" + checkAppNameAndZkPath(EngineScope.USER, userScopeAppName, userScopeZkPath) + + // GROUP SCOPE + val groupScopeAppName = "kyuubi_G_hive_a9938028-667d-4006-993e-0bdb5a14ae91" + val groupScopeZkPath = "/kyuubi-engine/G/hive" + checkAppNameAndZkPath(EngineScope.GROUP, groupScopeAppName, groupScopeZkPath) + + // SERVER SCOPE + val serverScopeAppName = "kyuubi_K_hive_a9938028-667d-4006-993e-0bdb5a14ae91" + val serverScopeZkPath = "/kyuubi-engine/K/hive" + checkAppNameAndZkPath(EngineScope.SERVER, serverScopeAppName, serverScopeZkPath) + + } + + private def checkAppNameAndZkPath(scope: EngineScope, + expectAppName: String, expectZkPath: String): Unit = { + kyuubiConf.set(KyuubiConf.ENGINE_SCOPE, scope.toString) + val engine = EngineAppName(user, handle, kyuubiConf.getUserDefaults(user)) + assert(engine.generateAppName() === expectAppName) + assert(engine.makeZkPath(zkNamespace) === expectZkPath) + val zkPath = EngineAppName.parseAppName(expectAppName, kyuubiConf).makeZkPath(zkNamespace) + assert(zkPath === expectZkPath) + } + + +} diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index 26df13798..eef0a62b1 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -32,6 +32,7 @@ import org.apache.thrift.transport.{TSocket, TTransport} import org.apache.kyuubi._ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ +import org.apache.kyuubi.engine.EngineAppName import org.apache.kyuubi.engine.spark.SparkProcessBuilder import org.apache.kyuubi.ha.client.ServiceDiscovery import org.apache.kyuubi.service.authentication.PlainSASLHelper @@ -54,10 +55,12 @@ class KyuubiSessionImpl( case ("use:database", _) => case (key, value) => sessionConf.set(key, value) } + sessionConf.set(EngineAppName.SPARK_APP_NAME_KEY, engineAppName.generateAppName()) } + private val engineAppName = EngineAppName(user, handle.identifier.toString, sessionConf) private val timeout: Long = sessionConf.get(ENGINE_INIT_TIMEOUT) - private val zkNamespace = s"$zkNamespacePrefix-$user" + private val zkNamespace = engineAppName.makeZkPath(zkNamespacePrefix) private val zkPath = ZKPaths.makePath(null, zkNamespace) private lazy val zkClient = ServiceDiscovery.startZookeeperClient(sessionConf)