diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index ae759de07..a683447b8 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -181,6 +181,7 @@ kyuubi\.session\.engine
\.check\.interval|
PT30M
|engine timeout, the engine will self-terminate when it's not accessed for this duration
|1.0.0
kyuubi\.session\.engine
\.initialize\.timeout|PT1M
|Timeout for starting the background engine, e.g. SparkSQLEngine.
|1.0.0
kyuubi\.session\.engine
\.login\.timeout|PT15S
|The timeout(ms) of creating the connection to remote sql query engine
|1.0.0
+kyuubi\.session\.engine
\.share\.level|USER
|The SQL engine App will be shared in different levels, available configs are:
- CONNECTION: the App will not be shared but only used by the current client connection
- USER: the App will be shared by all sessions created by a unique username
- GROUP: the App will be shared within a certain group (NOT YET)
- SERVER: the App will be shared by Kyuubi servers
|1.0.0
kyuubi\.session\.engine
\.spark\.main\.resource|<undefined>
|The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default
|1.0.0
kyuubi\.session
\.timeout|PT6H
|session timeout, it will be closed when it's not accessed for this duration
|1.0.0
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index e4ed0ed01..76311076d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.{EngineAppName, EngineScope}
import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.{RetryPolicies, ServiceDiscovery}
@@ -39,11 +38,6 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
override private[kyuubi] val backendService = new SparkSQLBackendService(spark)
- val appName: EngineAppName = EngineAppName.parseAppName(
- spark.conf.get(EngineAppName.SPARK_APP_NAME_KEY),
- SparkSQLEngine.kyuubiConf
- )
-
override protected def stopServer(): Unit = {
countDownLatch.countDown()
spark.stop()
@@ -64,8 +58,7 @@ object SparkSQLEngine extends Logging {
sparkConf.setIfMissing("spark.ui.port", "0")
val appName = s"kyuubi_${user}_spark_${Instant.now}"
-
- sparkConf.setIfMissing(EngineAppName.SPARK_APP_NAME_KEY, appName)
+ sparkConf.setIfMissing("spark.app.name", appName)
kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_BIND_PORT, 0)
kyuubiConf.setIfMissing(HA_ZK_CONN_RETRY_POLICY, RetryPolicies.N_TIME.toString)
@@ -98,23 +91,11 @@ object SparkSQLEngine extends Logging {
def exposeEngine(engine: SparkSQLEngine): Unit = {
val needExpose = kyuubiConf.get(HA_ZK_QUORUM).nonEmpty
if (needExpose) {
- val zkNamespacePrefix = kyuubiConf.get(HA_ZK_NAMESPACE)
- val zkNamespace = engine.appName.makeZkPath(zkNamespacePrefix)
- val serviceDiscovery = new ServiceDiscovery(engine, zkNamespace.substring(1))
+ val zkNamespace = kyuubiConf.get(HA_ZK_NAMESPACE)
+ val serviceDiscovery = new ServiceDiscovery(engine, zkNamespace)
serviceDiscovery.initialize(kyuubiConf)
serviceDiscovery.start()
- sys.addShutdownHook({
- serviceDiscovery.stop()
- if (EngineScope.SESSION.equals(engine.appName.getEngineScope)) {
- val zkClient = ServiceDiscovery.startZookeeperClient(kyuubiConf)
- try {
- info(s"Deleting engine service's namespace: $zkNamespace")
- zkClient.delete().forPath(zkNamespace)
- } finally {
- zkClient.close()
- }
- }
- })
+ sys.addShutdownHook(serviceDiscovery.stop())
}
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index f7fab1516..481df3d8f 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import org.apache.kyuubi.{Logging, Utils}
-import org.apache.kyuubi.engine.EngineScope
+import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.service.authentication.{AuthTypes, SaslQOP}
case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
@@ -473,15 +473,16 @@ object KyuubiConf {
.timeConf
.createWithDefault(Duration.ofSeconds(5).toMillis)
- val ENGINE_SCOPE: ConfigEntry[String] = buildConf("session.engine.scope")
- .doc("The engine session scope." +
- " - S: One engine per kyuubi session in kyuubi cluster.
" +
- " - U: One engine per user in kyuubi cluster.
" +
- " - G: One engine per group in kyuubi cluster.
" +
- " - K: One engine per kyuubi server in kyuubi cluster.
")
+ val ENGINE_SHARED_LEVEL: ConfigEntry[String] = buildConf("session.engine.share.level")
+ .doc("The SQL engine App will be shared in different levels, available configs are: " +
+ " - CONNECTION: the App will not be shared but only used by the current client" +
+ " connection
" +
+ " - USER: the App will be shared by all sessions created by a unique username
" +
+ " - GROUP: the App will be shared within a certain group (NOT YET)
" +
+ " - SERVER: the App will be shared by Kyuubi servers
")
.version("1.0.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
- .checkValues(EngineScope.values.map(_.toString))
- .createWithDefault(EngineScope.USER.toString)
+ .checkValues(ShareLevel.values.map(_.toString))
+ .createWithDefault(ShareLevel.USER.toString)
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineAppName.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineAppName.scala
deleted file mode 100644
index 254345386..000000000
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineAppName.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine
-
-import org.apache.curator.utils.ZKPaths
-
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_SCOPE
-import org.apache.kyuubi.engine.EngineScope.{EngineScope, SESSION}
-
-class EngineAppName(user: String, sessionId: String, conf: KyuubiConf) {
-
- import EngineAppName._
-
- private val engineScope = EngineScope.withName(conf.get(ENGINE_SCOPE))
-
- def getEngineScope: EngineScope = engineScope
-
- /**
- * kyuubi_[KGUS]_user_sessionid
- *
- * @return engine app name
- */
- def generateAppName(): String = {
- StringBuilder.newBuilder.append(APP_NAME_PREFIX)
- .append(DELIMITER).append(engineScope)
- .append(DELIMITER).append(user)
- .append(DELIMITER).append(sessionId).mkString
- }
-
- /**
- * if engine scope is [S] return
- * /[zkNamespace]-engine/S/[user]/[sessionId]
- * else return
- * /[zkNamespace]-engine/[engineScope]/[user]
- *
- * @param zkNamespace zk root path
- * @return engine zk path
- */
- def makeZkPath(zkNamespace: String): String = {
- val namespace = zkNamespace + "-" + ZK_NAMESPACE_SUFFIX
- engineScope match {
- case SESSION =>
- ZKPaths.makePath(namespace, engineScope.toString, user, sessionId)
- case _ =>
- ZKPaths.makePath(namespace, engineScope.toString, user)
- }
- }
-
-}
-
-object EngineAppName {
-
- private val APP_NAME_PREFIX = "kyuubi"
-
- private val ZK_NAMESPACE_SUFFIX = "engine"
-
- private val DELIMITER = "_"
-
- val SPARK_APP_NAME_KEY = "spark.app.name"
-
- def apply(user: String, sessionId: String, conf: KyuubiConf): EngineAppName =
- new EngineAppName(user, sessionId, conf)
-
- def parseAppName(appName: String, conf: KyuubiConf): EngineAppName = {
- val params = appName.split(DELIMITER)
- val clone = conf.clone
- clone.set(ENGINE_SCOPE, params(1))
- EngineAppName(params(2), params(3), conf)
- }
-
-}
-
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/ShareLevel.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/ShareLevel.scala
new file mode 100644
index 000000000..c3718cc56
--- /dev/null
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/ShareLevel.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+/**
+ * A SQL Engine APP will be shared by different levels
+ */
+object ShareLevel extends Enumeration {
+ type ShareLevel = Value
+ val
+ /** In this level, An APP will not be shared and used only for a single session */
+ CONNECTION,
+ /** DEFAULT level, An APP will be shared for all sessions created by a user */
+ USER,
+ /** In this level, An APP will not be shared in a queue or namespace */
+ GROUP,
+ /** In this level, All sessions from one or more Kyuubi server's will share one single APP */
+ SERVER = Value
+}
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/engine/EngineAppNameSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/engine/EngineAppNameSuite.scala
deleted file mode 100644
index 838be9555..000000000
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/engine/EngineAppNameSuite.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine
-
-import org.apache.kyuubi.KyuubiFunSuite
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.EngineScope.EngineScope
-
-class EngineAppNameSuite extends KyuubiFunSuite {
-
- private val kyuubiConf: KyuubiConf = KyuubiConf()
- private val zkNamespace: String = "kyuubi"
- private val user: String = "hive"
- private val handle: String = "a9938028-667d-4006-993e-0bdb5a14ae91"
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- }
-
-
- test("SparkSQLEngineAppName") {
-
- // SESSION SCOPE
- val sessionScopeAppName = "kyuubi_S_hive_a9938028-667d-4006-993e-0bdb5a14ae91"
- val sessionScopeZkPath = "/kyuubi-engine/S/hive/a9938028-667d-4006-993e-0bdb5a14ae91"
- checkAppNameAndZkPath(EngineScope.SESSION, sessionScopeAppName, sessionScopeZkPath)
-
- // USER SCOPE
- val userScopeAppName = "kyuubi_U_hive_a9938028-667d-4006-993e-0bdb5a14ae91"
- val userScopeZkPath = "/kyuubi-engine/U/hive"
- checkAppNameAndZkPath(EngineScope.USER, userScopeAppName, userScopeZkPath)
-
- // GROUP SCOPE
- val groupScopeAppName = "kyuubi_G_hive_a9938028-667d-4006-993e-0bdb5a14ae91"
- val groupScopeZkPath = "/kyuubi-engine/G/hive"
- checkAppNameAndZkPath(EngineScope.GROUP, groupScopeAppName, groupScopeZkPath)
-
- // SERVER SCOPE
- val serverScopeAppName = "kyuubi_K_hive_a9938028-667d-4006-993e-0bdb5a14ae91"
- val serverScopeZkPath = "/kyuubi-engine/K/hive"
- checkAppNameAndZkPath(EngineScope.SERVER, serverScopeAppName, serverScopeZkPath)
-
- }
-
- private def checkAppNameAndZkPath(scope: EngineScope,
- expectAppName: String, expectZkPath: String): Unit = {
- kyuubiConf.set(KyuubiConf.ENGINE_SCOPE, scope.toString)
- val engine = EngineAppName(user, handle, kyuubiConf.getUserDefaults(user))
- assert(engine.generateAppName() === expectAppName)
- assert(engine.makeZkPath(zkNamespace) === expectZkPath)
- val zkPath = EngineAppName.parseAppName(expectAppName, kyuubiConf).makeZkPath(zkNamespace)
- assert(zkPath === expectZkPath)
- }
-
-
-}
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTests.scala
index 66bc13415..ec76dea1c 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTests.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTests.scala
@@ -28,7 +28,7 @@ import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
trait JDBCTests extends KyuubiFunSuite {
protected val dftSchema = "default"
- protected val user: String = System.getProperty("user.name")
+ protected val user: String = Utils.currentUser
protected def jdbcUrl: String
protected def withMultipleConnectionJdbcStatement(
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
index 34948cb4c..1260c4271 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
@@ -28,6 +28,7 @@ import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode
import org.apache.curator.framework.state.{ConnectionState, ConnectionStateListener}
import org.apache.curator.framework.state.ConnectionState.{CONNECTED, LOST, RECONNECTED}
import org.apache.curator.retry._
+import org.apache.curator.utils.ZKPaths
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.JaasConfiguration
import org.apache.zookeeper.{KeeperException, WatchedEvent, Watcher}
@@ -90,16 +91,18 @@ class ServiceDiscovery private (
})
zkClient.start()
+
+ val ns = ZKPaths.makePath(null, namespace)
try {
zkClient
.create()
.creatingParentsIfNeeded()
.withMode(PERSISTENT)
- .forPath(s"/$namespace")
+ .forPath(ns)
} catch {
case _: NodeExistsException => // do nothing
case e: KeeperException =>
- throw new KyuubiException(s"Failed to create namespace '/$namespace'", e)
+ throw new KyuubiException(s"Failed to create namespace '$ns'", e)
}
super.initialize(conf)
}
@@ -107,7 +110,9 @@ class ServiceDiscovery private (
override def start(): Unit = {
val instance = server.connectionUrl
- val pathPrefix = s"/$namespace/serviceUri=$instance;version=$KYUUBI_VERSION;sequence="
+ val pathPrefix = ZKPaths.makePath(
+ namespace,
+ s"serviceUri=$instance;version=$KYUUBI_VERSION;sequence=")
try {
serviceNode = new PersistentEphemeralNode(
zkClient,
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/SQLEngineAppName.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/SQLEngineAppName.scala
new file mode 100644
index 000000000..e51555705
--- /dev/null
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/SQLEngineAppName.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+import org.apache.curator.utils.ZKPaths
+
+import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, ShareLevel}
+import org.apache.kyuubi.session.SessionHandle
+
+case class SQLEngineAppName(
+ sharedLevel: ShareLevel, user: String, sessionId: String) {
+ override def toString: String = s"kyuubi_${sharedLevel}_${user}_$sessionId"
+
+ def getZkNamespace(prefix: String): String = {
+ sharedLevel match {
+ case CONNECTION => ZKPaths.makePath(s"${prefix}_$sharedLevel", user, sessionId)
+ case _ => ZKPaths.makePath(s"${prefix}_$sharedLevel", user)
+ }
+ }
+}
+
+private[kyuubi] object SQLEngineAppName {
+ def apply(sharedLevel: ShareLevel, user: String, handle: SessionHandle): SQLEngineAppName = {
+ SQLEngineAppName(sharedLevel, user, handle.identifier.toString)
+ }
+}
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 01cded4c2..213460b85 100644
--- a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -125,4 +125,5 @@ object SparkProcessBuilder {
private final val CONF = "--conf"
private final val CLASS = "--class"
private final val PROXY_USER = "--proxy-user"
+ final val APP_KEY = "spark.app.name"
}
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index eef0a62b1..aa3bd1724 100644
--- a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -24,16 +24,18 @@ import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import org.apache.curator.utils.ZKPaths
-import org.apache.hive.service.rpc.thrift._
+import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TOpenSessionReq, TProtocolVersion, TSessionHandle}
import org.apache.thrift.TException
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.{TSocket, TTransport}
-import org.apache.kyuubi._
+import org.apache.kyuubi.{KyuubiSQLException, ThriftUtils, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.engine.EngineAppName
+import org.apache.kyuubi.engine.{ShareLevel, SQLEngineAppName}
+import org.apache.kyuubi.engine.ShareLevel.{SERVER, ShareLevel}
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.ServiceDiscovery
import org.apache.kyuubi.service.authentication.PlainSASLHelper
@@ -44,25 +46,33 @@ class KyuubiSessionImpl(
ipAddress: String,
conf: Map[String, String],
sessionManager: KyuubiSessionManager,
- sessionConf: KyuubiConf,
- zkNamespacePrefix: String)
+ sessionConf: KyuubiConf)
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
- private def configureSession(): Unit = {
+ private def mergeConf(): Unit = {
conf.foreach {
case (HIVE_VAR_PREFIX(key), value) => sessionConf.set(key, value)
case (HIVE_CONF_PREFIX(key), value) => sessionConf.set(key, value)
case ("use:database", _) =>
case (key, value) => sessionConf.set(key, value)
}
- sessionConf.set(EngineAppName.SPARK_APP_NAME_KEY, engineAppName.generateAppName())
}
- private val engineAppName = EngineAppName(user, handle.identifier.toString, sessionConf)
- private val timeout: Long = sessionConf.get(ENGINE_INIT_TIMEOUT)
- private val zkNamespace = engineAppName.makeZkPath(zkNamespacePrefix)
- private val zkPath = ZKPaths.makePath(null, zkNamespace)
+ mergeConf()
+
+ private val shareLevel: ShareLevel = ShareLevel.withName(sessionConf.get(ENGINE_SHARED_LEVEL))
+
+ private val appUser: String = shareLevel match {
+ case SERVER => Utils.currentUser
+ case _ => user
+ }
+
+ private val boundAppName: SQLEngineAppName = SQLEngineAppName(shareLevel, appUser, handle)
+
+ private val appZkNamespace: String = boundAppName.getZkNamespace(sessionConf.get(HA_ZK_NAMESPACE))
+
private lazy val zkClient = ServiceDiscovery.startZookeeperClient(sessionConf)
+ private val timeout: Long = sessionConf.get(ENGINE_INIT_TIMEOUT)
private var transport: TTransport = _
private var client: TCLIService.Client = _
@@ -70,11 +80,11 @@ class KyuubiSessionImpl(
private def getServerHost: Option[(String, Int)] = {
try {
- val hosts = zkClient.getChildren.forPath(zkPath)
+ val hosts = zkClient.getChildren.forPath(appZkNamespace)
// TODO: use last one because to avoid touching some maybe-crashed engines
// We need a big improvement here.
hosts.asScala.lastOption.map { p =>
- val path = ZKPaths.makePath(null, zkNamespace, p)
+ val path = ZKPaths.makePath(appZkNamespace, p)
val hostPort = new String(zkClient.getData.forPath(path), StandardCharsets.UTF_8)
val strings = hostPort.split(":")
val host = strings.head
@@ -93,8 +103,9 @@ class KyuubiSessionImpl(
getServerHost match {
case Some((host, port)) => openSession(host, port)
case None =>
- configureSession()
- val builder = new SparkProcessBuilder(user, sessionConf.toSparkPrefixedConf)
+ sessionConf.set(SparkProcessBuilder.APP_KEY, boundAppName.toString)
+ sessionConf.set(HA_ZK_NAMESPACE, appZkNamespace.stripPrefix(ZKPaths.PATH_SEPARATOR))
+ val builder = new SparkProcessBuilder(appUser, sessionConf.toSparkPrefixedConf)
val process = builder.start
info(s"Launching SQL engine: $builder")
var sh = getServerHost
@@ -114,7 +125,7 @@ class KyuubiSessionImpl(
}
sh = getServerHost
}
- val Some((host, port)) = getServerHost
+ val Some((host, port)) = sh
openSession(host, port)
}
try {
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 0b91fe2b2..6674216be 100644
--- a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -21,7 +21,6 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.ServiceDiscovery
import org.apache.kyuubi.operation.KyuubiOperationManager
@@ -31,10 +30,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
val operationManager = new KyuubiOperationManager()
- private var zkNamespacePrefix: String = _
-
override def initialize(conf: KyuubiConf): Unit = {
- zkNamespacePrefix = conf.get(HA_ZK_NAMESPACE)
ServiceDiscovery.setUpZooKeeperAuth(conf)
super.initialize(conf)
}
@@ -55,8 +51,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
ipAddress,
conf,
this,
- this.getConf.getUserDefaults(user),
- zkNamespacePrefix)
+ this.getConf.getUserDefaults(user))
val handle = sessionImpl.handle
try {
sessionImpl.open()
diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/SQLEngineAppNameSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/SQLEngineAppNameSuite.scala
new file mode 100644
index 000000000..038a5297a
--- /dev/null
+++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/SQLEngineAppNameSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+import org.apache.curator.utils.ZKPaths
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.session.SessionHandle
+
+class SQLEngineAppNameSuite extends KyuubiFunSuite {
+ import ShareLevel._
+
+ test("SESSION_LEVEL sql engine app name") {
+ val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10).identifier
+ val user = Utils.currentUser
+ val appName = SQLEngineAppName(CONNECTION, user, id.toString)
+ assert(appName.getZkNamespace("kyuubi") ===
+ ZKPaths.makePath(s"kyuubi_$CONNECTION", user, id.toString))
+ assert(appName.toString === s"kyuubi_${CONNECTION}_${user}_$id")
+ }
+
+ test("USER_LEVEL sql engine app name") {
+ val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10).identifier
+ val user = Utils.currentUser
+ val appName = SQLEngineAppName(USER, user, id.toString)
+ assert(appName.getZkNamespace("kyuubi") ===
+ ZKPaths.makePath(s"kyuubi_$USER", user))
+ assert(appName.toString === s"kyuubi_${USER}_${user}_$id")
+ }
+
+ test("QUEUE_LEVEL sql engine app name") {
+ val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10).identifier
+ val user = Utils.currentUser
+ val appName = SQLEngineAppName(GROUP, user, id.toString)
+ assert(appName.getZkNamespace("kyuubi") ===
+ ZKPaths.makePath(s"kyuubi_$GROUP", user))
+ assert(appName.toString === s"kyuubi_${GROUP}_${user}_$id")
+ }
+
+ test("SERVER_LEVEL sql engine app name") {
+ val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10).identifier
+ val user = Utils.currentUser
+ val appName = SQLEngineAppName(SERVER, user, id.toString)
+ assert(appName.getZkNamespace("kyuubi") ===
+ ZKPaths.makePath(s"kyuubi_$SERVER", user))
+ assert(appName.toString === s"kyuubi_${SERVER}_${user}_$id")
+ }
+}
diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
new file mode 100644
index 000000000..ed561e9e8
--- /dev/null
+++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import org.apache.kyuubi.config.KyuubiConf
+
+class KyuubiOperationPerConnectionSuite extends KyuubiOperationSuite {
+
+ override protected val conf: KyuubiConf = {
+ KyuubiConf().set(KyuubiConf.ENGINE_SHARED_LEVEL, "connection")
+ }
+
+}
diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerGroupSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerGroupSuite.scala
new file mode 100644
index 000000000..7bee2a741
--- /dev/null
+++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerGroupSuite.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import org.apache.kyuubi.config.KyuubiConf
+
+class KyuubiOperationPerGroupSuite extends KyuubiOperationSuite {
+
+ override protected val conf: KyuubiConf = {
+ KyuubiConf().set(KyuubiConf.ENGINE_SHARED_LEVEL, "group")
+ }
+}
diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerServerSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerServerSuite.scala
new file mode 100644
index 000000000..b7cd0b144
--- /dev/null
+++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerServerSuite.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import org.apache.kyuubi.config.KyuubiConf
+
+class KyuubiOperationPerServerSuite extends KyuubiOperationSuite {
+
+ override protected val conf: KyuubiConf = {
+ KyuubiConf().set(KyuubiConf.ENGINE_SHARED_LEVEL, "server")
+ }
+}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineScope.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
similarity index 75%
rename from kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineScope.scala
rename to kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
index b9c55f1fc..4fdf9638b 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineScope.scala
+++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
@@ -15,12 +15,13 @@
* limitations under the License.
*/
-package org.apache.kyuubi.engine
+package org.apache.kyuubi.operation
-object EngineScope extends Enumeration {
- type EngineScope = Value
- val SESSION = Value("S")
- val USER = Value("U")
- val GROUP = Value("G")
- val SERVER = Value("K")
+import org.apache.kyuubi.config.KyuubiConf
+
+class KyuubiOperationPerUserSuite extends KyuubiOperationSuite {
+
+ override protected val conf: KyuubiConf = {
+ KyuubiConf().set(KyuubiConf.ENGINE_SHARED_LEVEL, "user")
+ }
}
diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationSuite.scala
index b041b1fa8..90bc905b5 100644
--- a/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationSuite.scala
+++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationSuite.scala
@@ -17,20 +17,48 @@
package org.apache.kyuubi.operation
+import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.ha.HighAvailabilityConf._
+import org.apache.kyuubi.ha.server.EmbeddedZkServer
import org.apache.kyuubi.server.KyuubiServer
-class KyuubiOperationSuite extends JDBCTests {
+abstract class KyuubiOperationSuite extends JDBCTests {
- private val conf = KyuubiConf()
- .set(KyuubiConf.FRONTEND_BIND_PORT, 0)
- .set(KyuubiConf.ENGINE_CHECK_INTERVAL, 4000L)
- .set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 10000L)
+ protected val conf: KyuubiConf
- private val server: KyuubiServer = KyuubiServer.startServer(conf)
+ private var zkServer: EmbeddedZkServer = _
+ private var server: KyuubiServer = _
+
+ override def beforeAll(): Unit = {
+ zkServer = new EmbeddedZkServer()
+ conf.set(KyuubiConf.EMBEDDED_ZK_PORT, -1)
+ val zkData = Utils.createTempDir()
+ conf.set(KyuubiConf.EMBEDDED_ZK_TEMP_DIR, zkData.toString)
+ zkServer.initialize(conf)
+ zkServer.start()
+
+ conf.set(KyuubiConf.FRONTEND_BIND_PORT, 0)
+ conf.set(KyuubiConf.ENGINE_CHECK_INTERVAL, 4000L)
+ conf.set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 10000L)
+ conf.set(HA_ZK_QUORUM, zkServer.getConnectString)
+ conf.set(HA_ZK_ACL_ENABLED, false)
+
+ server = KyuubiServer.startServer(conf)
+ super.beforeAll()
+ }
override def afterAll(): Unit = {
- server.stop()
+
+ if (server != null) {
+ server.stop()
+ server = null
+ }
+
+ if (zkServer != null) {
+ zkServer.stop()
+ zkServer = null
+ }
super.afterAll()
}