diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index 81ad368ba..e4ed0ed01 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.SparkSession import org.apache.kyuubi.{Logging, Utils} import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.engine.{EngineAppName, EngineScope} import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch import org.apache.kyuubi.ha.HighAvailabilityConf._ import org.apache.kyuubi.ha.client.{RetryPolicies, ServiceDiscovery} @@ -38,6 +39,11 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession) override private[kyuubi] val backendService = new SparkSQLBackendService(spark) + val appName: EngineAppName = EngineAppName.parseAppName( + spark.conf.get(EngineAppName.SPARK_APP_NAME_KEY), + SparkSQLEngine.kyuubiConf + ) + override protected def stopServer(): Unit = { countDownLatch.countDown() spark.stop() @@ -59,7 +65,7 @@ object SparkSQLEngine extends Logging { val appName = s"kyuubi_${user}_spark_${Instant.now}" - sparkConf.setAppName(appName) + sparkConf.setIfMissing(EngineAppName.SPARK_APP_NAME_KEY, appName) kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_BIND_PORT, 0) kyuubiConf.setIfMissing(HA_ZK_CONN_RETRY_POLICY, RetryPolicies.N_TIME.toString) @@ -93,10 +99,22 @@ object SparkSQLEngine extends Logging { val needExpose = kyuubiConf.get(HA_ZK_QUORUM).nonEmpty if (needExpose) { val zkNamespacePrefix = kyuubiConf.get(HA_ZK_NAMESPACE) - val serviceDiscovery = new ServiceDiscovery(engine, s"$zkNamespacePrefix-$user") + val zkNamespace = engine.appName.makeZkPath(zkNamespacePrefix) + val serviceDiscovery = new ServiceDiscovery(engine, zkNamespace.substring(1)) serviceDiscovery.initialize(kyuubiConf) serviceDiscovery.start() - sys.addShutdownHook(serviceDiscovery.stop()) + sys.addShutdownHook({ + serviceDiscovery.stop() + if (EngineScope.SESSION.equals(engine.appName.getEngineScope)) { + val zkClient = ServiceDiscovery.startZookeeperClient(kyuubiConf) + try { + info(s"Deleting engine service's namespace: $zkNamespace") + zkClient.delete().forPath(zkNamespace) + } finally { + zkClient.close() + } + } + }) } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 6afef5fa5..f7fab1516 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import org.apache.kyuubi.{Logging, Utils} +import org.apache.kyuubi.engine.EngineScope import org.apache.kyuubi.service.authentication.{AuthTypes, SaslQOP} case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging { @@ -471,4 +472,16 @@ object KyuubiConf { .version("1.0.0") .timeConf .createWithDefault(Duration.ofSeconds(5).toMillis) + + val ENGINE_SCOPE: ConfigEntry[String] = buildConf("session.engine.scope") + .doc("The engine session scope.") + .version("1.0.0") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(EngineScope.values.map(_.toString)) + .createWithDefault(EngineScope.USER.toString) } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineAppName.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineAppName.scala new file mode 100644 index 000000000..254345386 --- /dev/null +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineAppName.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine + +import org.apache.curator.utils.ZKPaths + +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.ENGINE_SCOPE +import org.apache.kyuubi.engine.EngineScope.{EngineScope, SESSION} + +class EngineAppName(user: String, sessionId: String, conf: KyuubiConf) { + + import EngineAppName._ + + private val engineScope = EngineScope.withName(conf.get(ENGINE_SCOPE)) + + def getEngineScope: EngineScope = engineScope + + /** + * kyuubi_[KGUS]_user_sessionid + * + * @return engine app name + */ + def generateAppName(): String = { + StringBuilder.newBuilder.append(APP_NAME_PREFIX) + .append(DELIMITER).append(engineScope) + .append(DELIMITER).append(user) + .append(DELIMITER).append(sessionId).mkString + } + + /** + * if engine scope is [S] return + * /[zkNamespace]-engine/S/[user]/[sessionId] + * else return + * /[zkNamespace]-engine/[engineScope]/[user] + * + * @param zkNamespace zk root path + * @return engine zk path + */ + def makeZkPath(zkNamespace: String): String = { + val namespace = zkNamespace + "-" + ZK_NAMESPACE_SUFFIX + engineScope match { + case SESSION => + ZKPaths.makePath(namespace, engineScope.toString, user, sessionId) + case _ => + ZKPaths.makePath(namespace, engineScope.toString, user) + } + } + +} + +object EngineAppName { + + private val APP_NAME_PREFIX = "kyuubi" + + private val ZK_NAMESPACE_SUFFIX = "engine" + + private val DELIMITER = "_" + + val SPARK_APP_NAME_KEY = "spark.app.name" + + def apply(user: String, sessionId: String, conf: KyuubiConf): EngineAppName = + new EngineAppName(user, sessionId, conf) + + def parseAppName(appName: String, conf: KyuubiConf): EngineAppName = { + val params = appName.split(DELIMITER) + val clone = conf.clone + clone.set(ENGINE_SCOPE, params(1)) + EngineAppName(params(2), params(3), conf) + } + +} + diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineScope.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineScope.scala new file mode 100644 index 000000000..b9c55f1fc --- /dev/null +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineScope.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine + +object EngineScope extends Enumeration { + type EngineScope = Value + val SESSION = Value("S") + val USER = Value("U") + val GROUP = Value("G") + val SERVER = Value("K") +} diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/engine/EngineAppNameSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/engine/EngineAppNameSuite.scala new file mode 100644 index 000000000..838be9555 --- /dev/null +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/engine/EngineAppNameSuite.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine + +import org.apache.kyuubi.KyuubiFunSuite +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.engine.EngineScope.EngineScope + +class EngineAppNameSuite extends KyuubiFunSuite { + + private val kyuubiConf: KyuubiConf = KyuubiConf() + private val zkNamespace: String = "kyuubi" + private val user: String = "hive" + private val handle: String = "a9938028-667d-4006-993e-0bdb5a14ae91" + + override def beforeAll(): Unit = { + super.beforeAll() + } + + + test("SparkSQLEngineAppName") { + + // SESSION SCOPE + val sessionScopeAppName = "kyuubi_S_hive_a9938028-667d-4006-993e-0bdb5a14ae91" + val sessionScopeZkPath = "/kyuubi-engine/S/hive/a9938028-667d-4006-993e-0bdb5a14ae91" + checkAppNameAndZkPath(EngineScope.SESSION, sessionScopeAppName, sessionScopeZkPath) + + // USER SCOPE + val userScopeAppName = "kyuubi_U_hive_a9938028-667d-4006-993e-0bdb5a14ae91" + val userScopeZkPath = "/kyuubi-engine/U/hive" + checkAppNameAndZkPath(EngineScope.USER, userScopeAppName, userScopeZkPath) + + // GROUP SCOPE + val groupScopeAppName = "kyuubi_G_hive_a9938028-667d-4006-993e-0bdb5a14ae91" + val groupScopeZkPath = "/kyuubi-engine/G/hive" + checkAppNameAndZkPath(EngineScope.GROUP, groupScopeAppName, groupScopeZkPath) + + // SERVER SCOPE + val serverScopeAppName = "kyuubi_K_hive_a9938028-667d-4006-993e-0bdb5a14ae91" + val serverScopeZkPath = "/kyuubi-engine/K/hive" + checkAppNameAndZkPath(EngineScope.SERVER, serverScopeAppName, serverScopeZkPath) + + } + + private def checkAppNameAndZkPath(scope: EngineScope, + expectAppName: String, expectZkPath: String): Unit = { + kyuubiConf.set(KyuubiConf.ENGINE_SCOPE, scope.toString) + val engine = EngineAppName(user, handle, kyuubiConf.getUserDefaults(user)) + assert(engine.generateAppName() === expectAppName) + assert(engine.makeZkPath(zkNamespace) === expectZkPath) + val zkPath = EngineAppName.parseAppName(expectAppName, kyuubiConf).makeZkPath(zkNamespace) + assert(zkPath === expectZkPath) + } + + +} diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index 26df13798..eef0a62b1 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -32,6 +32,7 @@ import org.apache.thrift.transport.{TSocket, TTransport} import org.apache.kyuubi._ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ +import org.apache.kyuubi.engine.EngineAppName import org.apache.kyuubi.engine.spark.SparkProcessBuilder import org.apache.kyuubi.ha.client.ServiceDiscovery import org.apache.kyuubi.service.authentication.PlainSASLHelper @@ -54,10 +55,12 @@ class KyuubiSessionImpl( case ("use:database", _) => case (key, value) => sessionConf.set(key, value) } + sessionConf.set(EngineAppName.SPARK_APP_NAME_KEY, engineAppName.generateAppName()) } + private val engineAppName = EngineAppName(user, handle.identifier.toString, sessionConf) private val timeout: Long = sessionConf.get(ENGINE_INIT_TIMEOUT) - private val zkNamespace = s"$zkNamespacePrefix-$user" + private val zkNamespace = engineAppName.makeZkPath(zkNamespacePrefix) private val zkPath = ZKPaths.makePath(null, zkNamespace) private lazy val zkClient = ServiceDiscovery.startZookeeperClient(sessionConf)