Improve Session Share Level
This commit is contained in:
parent
7a79b16cdf
commit
3ec0db6a40
@ -181,6 +181,7 @@ kyuubi\.session\.engine<br>\.check\.interval|<div style='width: 80pt;word-wrap:
|
||||
kyuubi\.session\.engine<br>\.idle\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT30M</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>engine timeout, the engine will self-terminate when it's not accessed for this duration</div>|<div style='width: 20pt'>1.0.0</div>
|
||||
kyuubi\.session\.engine<br>\.initialize\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Timeout for starting the background engine, e.g. SparkSQLEngine.</div>|<div style='width: 20pt'>1.0.0</div>
|
||||
kyuubi\.session\.engine<br>\.login\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT15S</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>The timeout(ms) of creating the connection to remote sql query engine</div>|<div style='width: 20pt'>1.0.0</div>
|
||||
kyuubi\.session\.engine<br>\.share\.level|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>USER</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>The SQL engine App will be shared in different levels, available configs are: <ul> <li>CONNECTION: the App will not be shared but only used by the current client connection</li> <li>USER: the App will be shared by all sessions created by a unique username</li> <li>GROUP: the App will be shared within a certain group (NOT YET)</li> <li>SERVER: the App will be shared by Kyuubi servers</li></ul></div>|<div style='width: 20pt'>1.0.0</div>
|
||||
kyuubi\.session\.engine<br>\.spark\.main\.resource|<div style='width: 80pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default</div>|<div style='width: 20pt'>1.0.0</div>
|
||||
kyuubi\.session<br>\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT6H</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>session timeout, it will be closed when it's not accessed for this duration</div>|<div style='width: 20pt'>1.0.0</div>
|
||||
|
||||
|
||||
@ -25,7 +25,6 @@ import org.apache.spark.sql.SparkSession
|
||||
|
||||
import org.apache.kyuubi.{Logging, Utils}
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.engine.{EngineAppName, EngineScope}
|
||||
import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
|
||||
import org.apache.kyuubi.ha.HighAvailabilityConf._
|
||||
import org.apache.kyuubi.ha.client.{RetryPolicies, ServiceDiscovery}
|
||||
@ -39,11 +38,6 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
|
||||
|
||||
override private[kyuubi] val backendService = new SparkSQLBackendService(spark)
|
||||
|
||||
val appName: EngineAppName = EngineAppName.parseAppName(
|
||||
spark.conf.get(EngineAppName.SPARK_APP_NAME_KEY),
|
||||
SparkSQLEngine.kyuubiConf
|
||||
)
|
||||
|
||||
override protected def stopServer(): Unit = {
|
||||
countDownLatch.countDown()
|
||||
spark.stop()
|
||||
@ -64,8 +58,7 @@ object SparkSQLEngine extends Logging {
|
||||
sparkConf.setIfMissing("spark.ui.port", "0")
|
||||
|
||||
val appName = s"kyuubi_${user}_spark_${Instant.now}"
|
||||
|
||||
sparkConf.setIfMissing(EngineAppName.SPARK_APP_NAME_KEY, appName)
|
||||
sparkConf.setIfMissing("spark.app.name", appName)
|
||||
|
||||
kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_BIND_PORT, 0)
|
||||
kyuubiConf.setIfMissing(HA_ZK_CONN_RETRY_POLICY, RetryPolicies.N_TIME.toString)
|
||||
@ -98,23 +91,11 @@ object SparkSQLEngine extends Logging {
|
||||
def exposeEngine(engine: SparkSQLEngine): Unit = {
|
||||
val needExpose = kyuubiConf.get(HA_ZK_QUORUM).nonEmpty
|
||||
if (needExpose) {
|
||||
val zkNamespacePrefix = kyuubiConf.get(HA_ZK_NAMESPACE)
|
||||
val zkNamespace = engine.appName.makeZkPath(zkNamespacePrefix)
|
||||
val serviceDiscovery = new ServiceDiscovery(engine, zkNamespace.substring(1))
|
||||
val zkNamespace = kyuubiConf.get(HA_ZK_NAMESPACE)
|
||||
val serviceDiscovery = new ServiceDiscovery(engine, zkNamespace)
|
||||
serviceDiscovery.initialize(kyuubiConf)
|
||||
serviceDiscovery.start()
|
||||
sys.addShutdownHook({
|
||||
serviceDiscovery.stop()
|
||||
if (EngineScope.SESSION.equals(engine.appName.getEngineScope)) {
|
||||
val zkClient = ServiceDiscovery.startZookeeperClient(kyuubiConf)
|
||||
try {
|
||||
info(s"Deleting engine service's namespace: $zkNamespace")
|
||||
zkClient.delete().forPath(zkNamespace)
|
||||
} finally {
|
||||
zkClient.close()
|
||||
}
|
||||
}
|
||||
})
|
||||
sys.addShutdownHook(serviceDiscovery.stop())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -24,7 +24,7 @@ import java.util.concurrent.ConcurrentHashMap
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.kyuubi.{Logging, Utils}
|
||||
import org.apache.kyuubi.engine.EngineScope
|
||||
import org.apache.kyuubi.engine.ShareLevel
|
||||
import org.apache.kyuubi.service.authentication.{AuthTypes, SaslQOP}
|
||||
|
||||
case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
|
||||
@ -473,15 +473,16 @@ object KyuubiConf {
|
||||
.timeConf
|
||||
.createWithDefault(Duration.ofSeconds(5).toMillis)
|
||||
|
||||
val ENGINE_SCOPE: ConfigEntry[String] = buildConf("session.engine.scope")
|
||||
.doc("The engine session scope.<ul>" +
|
||||
" <li>S: One engine per kyuubi session in kyuubi cluster.</li>" +
|
||||
" <li>U: One engine per user in kyuubi cluster.</li>" +
|
||||
" <li>G: One engine per group in kyuubi cluster.</li>" +
|
||||
" <li>K: One engine per kyuubi server in kyuubi cluster.</li></ul>")
|
||||
val ENGINE_SHARED_LEVEL: ConfigEntry[String] = buildConf("session.engine.share.level")
|
||||
.doc("The SQL engine App will be shared in different levels, available configs are: <ul>" +
|
||||
" <li>CONNECTION: the App will not be shared but only used by the current client" +
|
||||
" connection</li>" +
|
||||
" <li>USER: the App will be shared by all sessions created by a unique username</li>" +
|
||||
" <li>GROUP: the App will be shared within a certain group (NOT YET)</li>" +
|
||||
" <li>SERVER: the App will be shared by Kyuubi servers</li></ul>")
|
||||
.version("1.0.0")
|
||||
.stringConf
|
||||
.transform(_.toUpperCase(Locale.ROOT))
|
||||
.checkValues(EngineScope.values.map(_.toString))
|
||||
.createWithDefault(EngineScope.USER.toString)
|
||||
.checkValues(ShareLevel.values.map(_.toString))
|
||||
.createWithDefault(ShareLevel.USER.toString)
|
||||
}
|
||||
|
||||
@ -1,88 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.engine
|
||||
|
||||
import org.apache.curator.utils.ZKPaths
|
||||
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SCOPE
|
||||
import org.apache.kyuubi.engine.EngineScope.{EngineScope, SESSION}
|
||||
|
||||
class EngineAppName(user: String, sessionId: String, conf: KyuubiConf) {
|
||||
|
||||
import EngineAppName._
|
||||
|
||||
private val engineScope = EngineScope.withName(conf.get(ENGINE_SCOPE))
|
||||
|
||||
def getEngineScope: EngineScope = engineScope
|
||||
|
||||
/**
|
||||
* kyuubi_[KGUS]_user_sessionid
|
||||
*
|
||||
* @return engine app name
|
||||
*/
|
||||
def generateAppName(): String = {
|
||||
StringBuilder.newBuilder.append(APP_NAME_PREFIX)
|
||||
.append(DELIMITER).append(engineScope)
|
||||
.append(DELIMITER).append(user)
|
||||
.append(DELIMITER).append(sessionId).mkString
|
||||
}
|
||||
|
||||
/**
|
||||
* if engine scope is [S] return
|
||||
* /[zkNamespace]-engine/S/[user]/[sessionId]
|
||||
* else return
|
||||
* /[zkNamespace]-engine/[engineScope]/[user]
|
||||
*
|
||||
* @param zkNamespace zk root path
|
||||
* @return engine zk path
|
||||
*/
|
||||
def makeZkPath(zkNamespace: String): String = {
|
||||
val namespace = zkNamespace + "-" + ZK_NAMESPACE_SUFFIX
|
||||
engineScope match {
|
||||
case SESSION =>
|
||||
ZKPaths.makePath(namespace, engineScope.toString, user, sessionId)
|
||||
case _ =>
|
||||
ZKPaths.makePath(namespace, engineScope.toString, user)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object EngineAppName {
|
||||
|
||||
private val APP_NAME_PREFIX = "kyuubi"
|
||||
|
||||
private val ZK_NAMESPACE_SUFFIX = "engine"
|
||||
|
||||
private val DELIMITER = "_"
|
||||
|
||||
val SPARK_APP_NAME_KEY = "spark.app.name"
|
||||
|
||||
def apply(user: String, sessionId: String, conf: KyuubiConf): EngineAppName =
|
||||
new EngineAppName(user, sessionId, conf)
|
||||
|
||||
def parseAppName(appName: String, conf: KyuubiConf): EngineAppName = {
|
||||
val params = appName.split(DELIMITER)
|
||||
val clone = conf.clone
|
||||
clone.set(ENGINE_SCOPE, params(1))
|
||||
EngineAppName(params(2), params(3), conf)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,34 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.engine
|
||||
|
||||
/**
|
||||
* A SQL Engine APP will be shared by different levels
|
||||
*/
|
||||
object ShareLevel extends Enumeration {
|
||||
type ShareLevel = Value
|
||||
val
|
||||
/** In this level, An APP will not be shared and used only for a single session */
|
||||
CONNECTION,
|
||||
/** DEFAULT level, An APP will be shared for all sessions created by a user */
|
||||
USER,
|
||||
/** In this level, An APP will not be shared in a queue or namespace */
|
||||
GROUP,
|
||||
/** In this level, All sessions from one or more Kyuubi server's will share one single APP */
|
||||
SERVER = Value
|
||||
}
|
||||
@ -1,71 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.engine
|
||||
|
||||
import org.apache.kyuubi.KyuubiFunSuite
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.engine.EngineScope.EngineScope
|
||||
|
||||
class EngineAppNameSuite extends KyuubiFunSuite {
|
||||
|
||||
private val kyuubiConf: KyuubiConf = KyuubiConf()
|
||||
private val zkNamespace: String = "kyuubi"
|
||||
private val user: String = "hive"
|
||||
private val handle: String = "a9938028-667d-4006-993e-0bdb5a14ae91"
|
||||
|
||||
override def beforeAll(): Unit = {
|
||||
super.beforeAll()
|
||||
}
|
||||
|
||||
|
||||
test("SparkSQLEngineAppName") {
|
||||
|
||||
// SESSION SCOPE
|
||||
val sessionScopeAppName = "kyuubi_S_hive_a9938028-667d-4006-993e-0bdb5a14ae91"
|
||||
val sessionScopeZkPath = "/kyuubi-engine/S/hive/a9938028-667d-4006-993e-0bdb5a14ae91"
|
||||
checkAppNameAndZkPath(EngineScope.SESSION, sessionScopeAppName, sessionScopeZkPath)
|
||||
|
||||
// USER SCOPE
|
||||
val userScopeAppName = "kyuubi_U_hive_a9938028-667d-4006-993e-0bdb5a14ae91"
|
||||
val userScopeZkPath = "/kyuubi-engine/U/hive"
|
||||
checkAppNameAndZkPath(EngineScope.USER, userScopeAppName, userScopeZkPath)
|
||||
|
||||
// GROUP SCOPE
|
||||
val groupScopeAppName = "kyuubi_G_hive_a9938028-667d-4006-993e-0bdb5a14ae91"
|
||||
val groupScopeZkPath = "/kyuubi-engine/G/hive"
|
||||
checkAppNameAndZkPath(EngineScope.GROUP, groupScopeAppName, groupScopeZkPath)
|
||||
|
||||
// SERVER SCOPE
|
||||
val serverScopeAppName = "kyuubi_K_hive_a9938028-667d-4006-993e-0bdb5a14ae91"
|
||||
val serverScopeZkPath = "/kyuubi-engine/K/hive"
|
||||
checkAppNameAndZkPath(EngineScope.SERVER, serverScopeAppName, serverScopeZkPath)
|
||||
|
||||
}
|
||||
|
||||
private def checkAppNameAndZkPath(scope: EngineScope,
|
||||
expectAppName: String, expectZkPath: String): Unit = {
|
||||
kyuubiConf.set(KyuubiConf.ENGINE_SCOPE, scope.toString)
|
||||
val engine = EngineAppName(user, handle, kyuubiConf.getUserDefaults(user))
|
||||
assert(engine.generateAppName() === expectAppName)
|
||||
assert(engine.makeZkPath(zkNamespace) === expectZkPath)
|
||||
val zkPath = EngineAppName.parseAppName(expectAppName, kyuubiConf).makeZkPath(zkNamespace)
|
||||
assert(zkPath === expectZkPath)
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@ -28,7 +28,7 @@ import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
|
||||
trait JDBCTests extends KyuubiFunSuite {
|
||||
|
||||
protected val dftSchema = "default"
|
||||
protected val user: String = System.getProperty("user.name")
|
||||
protected val user: String = Utils.currentUser
|
||||
protected def jdbcUrl: String
|
||||
|
||||
protected def withMultipleConnectionJdbcStatement(
|
||||
|
||||
@ -28,6 +28,7 @@ import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode
|
||||
import org.apache.curator.framework.state.{ConnectionState, ConnectionStateListener}
|
||||
import org.apache.curator.framework.state.ConnectionState.{CONNECTED, LOST, RECONNECTED}
|
||||
import org.apache.curator.retry._
|
||||
import org.apache.curator.utils.ZKPaths
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.JaasConfiguration
|
||||
import org.apache.zookeeper.{KeeperException, WatchedEvent, Watcher}
|
||||
@ -90,16 +91,18 @@ class ServiceDiscovery private (
|
||||
})
|
||||
zkClient.start()
|
||||
|
||||
|
||||
val ns = ZKPaths.makePath(null, namespace)
|
||||
try {
|
||||
zkClient
|
||||
.create()
|
||||
.creatingParentsIfNeeded()
|
||||
.withMode(PERSISTENT)
|
||||
.forPath(s"/$namespace")
|
||||
.forPath(ns)
|
||||
} catch {
|
||||
case _: NodeExistsException => // do nothing
|
||||
case e: KeeperException =>
|
||||
throw new KyuubiException(s"Failed to create namespace '/$namespace'", e)
|
||||
throw new KyuubiException(s"Failed to create namespace '$ns'", e)
|
||||
}
|
||||
super.initialize(conf)
|
||||
}
|
||||
@ -107,7 +110,9 @@ class ServiceDiscovery private (
|
||||
|
||||
override def start(): Unit = {
|
||||
val instance = server.connectionUrl
|
||||
val pathPrefix = s"/$namespace/serviceUri=$instance;version=$KYUUBI_VERSION;sequence="
|
||||
val pathPrefix = ZKPaths.makePath(
|
||||
namespace,
|
||||
s"serviceUri=$instance;version=$KYUUBI_VERSION;sequence=")
|
||||
try {
|
||||
serviceNode = new PersistentEphemeralNode(
|
||||
zkClient,
|
||||
|
||||
@ -0,0 +1,41 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.engine
|
||||
|
||||
import org.apache.curator.utils.ZKPaths
|
||||
|
||||
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, ShareLevel}
|
||||
import org.apache.kyuubi.session.SessionHandle
|
||||
|
||||
case class SQLEngineAppName(
|
||||
sharedLevel: ShareLevel, user: String, sessionId: String) {
|
||||
override def toString: String = s"kyuubi_${sharedLevel}_${user}_$sessionId"
|
||||
|
||||
def getZkNamespace(prefix: String): String = {
|
||||
sharedLevel match {
|
||||
case CONNECTION => ZKPaths.makePath(s"${prefix}_$sharedLevel", user, sessionId)
|
||||
case _ => ZKPaths.makePath(s"${prefix}_$sharedLevel", user)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private[kyuubi] object SQLEngineAppName {
|
||||
def apply(sharedLevel: ShareLevel, user: String, handle: SessionHandle): SQLEngineAppName = {
|
||||
SQLEngineAppName(sharedLevel, user, handle.identifier.toString)
|
||||
}
|
||||
}
|
||||
@ -125,4 +125,5 @@ object SparkProcessBuilder {
|
||||
private final val CONF = "--conf"
|
||||
private final val CLASS = "--class"
|
||||
private final val PROXY_USER = "--proxy-user"
|
||||
final val APP_KEY = "spark.app.name"
|
||||
}
|
||||
|
||||
@ -24,16 +24,18 @@ import java.util.concurrent.TimeUnit
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.curator.utils.ZKPaths
|
||||
import org.apache.hive.service.rpc.thrift._
|
||||
import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TOpenSessionReq, TProtocolVersion, TSessionHandle}
|
||||
import org.apache.thrift.TException
|
||||
import org.apache.thrift.protocol.TBinaryProtocol
|
||||
import org.apache.thrift.transport.{TSocket, TTransport}
|
||||
|
||||
import org.apache.kyuubi._
|
||||
import org.apache.kyuubi.{KyuubiSQLException, ThriftUtils, Utils}
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.KyuubiConf._
|
||||
import org.apache.kyuubi.engine.EngineAppName
|
||||
import org.apache.kyuubi.engine.{ShareLevel, SQLEngineAppName}
|
||||
import org.apache.kyuubi.engine.ShareLevel.{SERVER, ShareLevel}
|
||||
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
|
||||
import org.apache.kyuubi.ha.HighAvailabilityConf._
|
||||
import org.apache.kyuubi.ha.client.ServiceDiscovery
|
||||
import org.apache.kyuubi.service.authentication.PlainSASLHelper
|
||||
|
||||
@ -44,25 +46,33 @@ class KyuubiSessionImpl(
|
||||
ipAddress: String,
|
||||
conf: Map[String, String],
|
||||
sessionManager: KyuubiSessionManager,
|
||||
sessionConf: KyuubiConf,
|
||||
zkNamespacePrefix: String)
|
||||
sessionConf: KyuubiConf)
|
||||
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
|
||||
|
||||
private def configureSession(): Unit = {
|
||||
private def mergeConf(): Unit = {
|
||||
conf.foreach {
|
||||
case (HIVE_VAR_PREFIX(key), value) => sessionConf.set(key, value)
|
||||
case (HIVE_CONF_PREFIX(key), value) => sessionConf.set(key, value)
|
||||
case ("use:database", _) =>
|
||||
case (key, value) => sessionConf.set(key, value)
|
||||
}
|
||||
sessionConf.set(EngineAppName.SPARK_APP_NAME_KEY, engineAppName.generateAppName())
|
||||
}
|
||||
|
||||
private val engineAppName = EngineAppName(user, handle.identifier.toString, sessionConf)
|
||||
private val timeout: Long = sessionConf.get(ENGINE_INIT_TIMEOUT)
|
||||
private val zkNamespace = engineAppName.makeZkPath(zkNamespacePrefix)
|
||||
private val zkPath = ZKPaths.makePath(null, zkNamespace)
|
||||
mergeConf()
|
||||
|
||||
private val shareLevel: ShareLevel = ShareLevel.withName(sessionConf.get(ENGINE_SHARED_LEVEL))
|
||||
|
||||
private val appUser: String = shareLevel match {
|
||||
case SERVER => Utils.currentUser
|
||||
case _ => user
|
||||
}
|
||||
|
||||
private val boundAppName: SQLEngineAppName = SQLEngineAppName(shareLevel, appUser, handle)
|
||||
|
||||
private val appZkNamespace: String = boundAppName.getZkNamespace(sessionConf.get(HA_ZK_NAMESPACE))
|
||||
|
||||
private lazy val zkClient = ServiceDiscovery.startZookeeperClient(sessionConf)
|
||||
private val timeout: Long = sessionConf.get(ENGINE_INIT_TIMEOUT)
|
||||
|
||||
private var transport: TTransport = _
|
||||
private var client: TCLIService.Client = _
|
||||
@ -70,11 +80,11 @@ class KyuubiSessionImpl(
|
||||
|
||||
private def getServerHost: Option[(String, Int)] = {
|
||||
try {
|
||||
val hosts = zkClient.getChildren.forPath(zkPath)
|
||||
val hosts = zkClient.getChildren.forPath(appZkNamespace)
|
||||
// TODO: use last one because to avoid touching some maybe-crashed engines
|
||||
// We need a big improvement here.
|
||||
hosts.asScala.lastOption.map { p =>
|
||||
val path = ZKPaths.makePath(null, zkNamespace, p)
|
||||
val path = ZKPaths.makePath(appZkNamespace, p)
|
||||
val hostPort = new String(zkClient.getData.forPath(path), StandardCharsets.UTF_8)
|
||||
val strings = hostPort.split(":")
|
||||
val host = strings.head
|
||||
@ -93,8 +103,9 @@ class KyuubiSessionImpl(
|
||||
getServerHost match {
|
||||
case Some((host, port)) => openSession(host, port)
|
||||
case None =>
|
||||
configureSession()
|
||||
val builder = new SparkProcessBuilder(user, sessionConf.toSparkPrefixedConf)
|
||||
sessionConf.set(SparkProcessBuilder.APP_KEY, boundAppName.toString)
|
||||
sessionConf.set(HA_ZK_NAMESPACE, appZkNamespace.stripPrefix(ZKPaths.PATH_SEPARATOR))
|
||||
val builder = new SparkProcessBuilder(appUser, sessionConf.toSparkPrefixedConf)
|
||||
val process = builder.start
|
||||
info(s"Launching SQL engine: $builder")
|
||||
var sh = getServerHost
|
||||
@ -114,7 +125,7 @@ class KyuubiSessionImpl(
|
||||
}
|
||||
sh = getServerHost
|
||||
}
|
||||
val Some((host, port)) = getServerHost
|
||||
val Some((host, port)) = sh
|
||||
openSession(host, port)
|
||||
}
|
||||
try {
|
||||
|
||||
@ -21,7 +21,6 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
|
||||
|
||||
import org.apache.kyuubi.KyuubiSQLException
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.ha.HighAvailabilityConf._
|
||||
import org.apache.kyuubi.ha.client.ServiceDiscovery
|
||||
import org.apache.kyuubi.operation.KyuubiOperationManager
|
||||
|
||||
@ -31,10 +30,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
|
||||
|
||||
val operationManager = new KyuubiOperationManager()
|
||||
|
||||
private var zkNamespacePrefix: String = _
|
||||
|
||||
override def initialize(conf: KyuubiConf): Unit = {
|
||||
zkNamespacePrefix = conf.get(HA_ZK_NAMESPACE)
|
||||
ServiceDiscovery.setUpZooKeeperAuth(conf)
|
||||
super.initialize(conf)
|
||||
}
|
||||
@ -55,8 +51,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
|
||||
ipAddress,
|
||||
conf,
|
||||
this,
|
||||
this.getConf.getUserDefaults(user),
|
||||
zkNamespacePrefix)
|
||||
this.getConf.getUserDefaults(user))
|
||||
val handle = sessionImpl.handle
|
||||
try {
|
||||
sessionImpl.open()
|
||||
|
||||
@ -0,0 +1,64 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.engine
|
||||
|
||||
import org.apache.curator.utils.ZKPaths
|
||||
import org.apache.hive.service.rpc.thrift.TProtocolVersion
|
||||
|
||||
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
|
||||
import org.apache.kyuubi.session.SessionHandle
|
||||
|
||||
class SQLEngineAppNameSuite extends KyuubiFunSuite {
|
||||
import ShareLevel._
|
||||
|
||||
test("SESSION_LEVEL sql engine app name") {
|
||||
val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10).identifier
|
||||
val user = Utils.currentUser
|
||||
val appName = SQLEngineAppName(CONNECTION, user, id.toString)
|
||||
assert(appName.getZkNamespace("kyuubi") ===
|
||||
ZKPaths.makePath(s"kyuubi_$CONNECTION", user, id.toString))
|
||||
assert(appName.toString === s"kyuubi_${CONNECTION}_${user}_$id")
|
||||
}
|
||||
|
||||
test("USER_LEVEL sql engine app name") {
|
||||
val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10).identifier
|
||||
val user = Utils.currentUser
|
||||
val appName = SQLEngineAppName(USER, user, id.toString)
|
||||
assert(appName.getZkNamespace("kyuubi") ===
|
||||
ZKPaths.makePath(s"kyuubi_$USER", user))
|
||||
assert(appName.toString === s"kyuubi_${USER}_${user}_$id")
|
||||
}
|
||||
|
||||
test("QUEUE_LEVEL sql engine app name") {
|
||||
val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10).identifier
|
||||
val user = Utils.currentUser
|
||||
val appName = SQLEngineAppName(GROUP, user, id.toString)
|
||||
assert(appName.getZkNamespace("kyuubi") ===
|
||||
ZKPaths.makePath(s"kyuubi_$GROUP", user))
|
||||
assert(appName.toString === s"kyuubi_${GROUP}_${user}_$id")
|
||||
}
|
||||
|
||||
test("SERVER_LEVEL sql engine app name") {
|
||||
val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10).identifier
|
||||
val user = Utils.currentUser
|
||||
val appName = SQLEngineAppName(SERVER, user, id.toString)
|
||||
assert(appName.getZkNamespace("kyuubi") ===
|
||||
ZKPaths.makePath(s"kyuubi_$SERVER", user))
|
||||
assert(appName.toString === s"kyuubi_${SERVER}_${user}_$id")
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,28 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.operation
|
||||
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
|
||||
class KyuubiOperationPerConnectionSuite extends KyuubiOperationSuite {
|
||||
|
||||
override protected val conf: KyuubiConf = {
|
||||
KyuubiConf().set(KyuubiConf.ENGINE_SHARED_LEVEL, "connection")
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,27 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.operation
|
||||
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
|
||||
class KyuubiOperationPerGroupSuite extends KyuubiOperationSuite {
|
||||
|
||||
override protected val conf: KyuubiConf = {
|
||||
KyuubiConf().set(KyuubiConf.ENGINE_SHARED_LEVEL, "group")
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,27 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.operation
|
||||
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
|
||||
class KyuubiOperationPerServerSuite extends KyuubiOperationSuite {
|
||||
|
||||
override protected val conf: KyuubiConf = {
|
||||
KyuubiConf().set(KyuubiConf.ENGINE_SHARED_LEVEL, "server")
|
||||
}
|
||||
}
|
||||
@ -15,12 +15,13 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.engine
|
||||
package org.apache.kyuubi.operation
|
||||
|
||||
object EngineScope extends Enumeration {
|
||||
type EngineScope = Value
|
||||
val SESSION = Value("S")
|
||||
val USER = Value("U")
|
||||
val GROUP = Value("G")
|
||||
val SERVER = Value("K")
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
|
||||
class KyuubiOperationPerUserSuite extends KyuubiOperationSuite {
|
||||
|
||||
override protected val conf: KyuubiConf = {
|
||||
KyuubiConf().set(KyuubiConf.ENGINE_SHARED_LEVEL, "user")
|
||||
}
|
||||
}
|
||||
@ -17,20 +17,48 @@
|
||||
|
||||
package org.apache.kyuubi.operation
|
||||
|
||||
import org.apache.kyuubi.Utils
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.ha.HighAvailabilityConf._
|
||||
import org.apache.kyuubi.ha.server.EmbeddedZkServer
|
||||
import org.apache.kyuubi.server.KyuubiServer
|
||||
|
||||
class KyuubiOperationSuite extends JDBCTests {
|
||||
abstract class KyuubiOperationSuite extends JDBCTests {
|
||||
|
||||
private val conf = KyuubiConf()
|
||||
.set(KyuubiConf.FRONTEND_BIND_PORT, 0)
|
||||
.set(KyuubiConf.ENGINE_CHECK_INTERVAL, 4000L)
|
||||
.set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 10000L)
|
||||
protected val conf: KyuubiConf
|
||||
|
||||
private val server: KyuubiServer = KyuubiServer.startServer(conf)
|
||||
private var zkServer: EmbeddedZkServer = _
|
||||
private var server: KyuubiServer = _
|
||||
|
||||
override def beforeAll(): Unit = {
|
||||
zkServer = new EmbeddedZkServer()
|
||||
conf.set(KyuubiConf.EMBEDDED_ZK_PORT, -1)
|
||||
val zkData = Utils.createTempDir()
|
||||
conf.set(KyuubiConf.EMBEDDED_ZK_TEMP_DIR, zkData.toString)
|
||||
zkServer.initialize(conf)
|
||||
zkServer.start()
|
||||
|
||||
conf.set(KyuubiConf.FRONTEND_BIND_PORT, 0)
|
||||
conf.set(KyuubiConf.ENGINE_CHECK_INTERVAL, 4000L)
|
||||
conf.set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 10000L)
|
||||
conf.set(HA_ZK_QUORUM, zkServer.getConnectString)
|
||||
conf.set(HA_ZK_ACL_ENABLED, false)
|
||||
|
||||
server = KyuubiServer.startServer(conf)
|
||||
super.beforeAll()
|
||||
}
|
||||
|
||||
override def afterAll(): Unit = {
|
||||
server.stop()
|
||||
|
||||
if (server != null) {
|
||||
server.stop()
|
||||
server = null
|
||||
}
|
||||
|
||||
if (zkServer != null) {
|
||||
zkServer.stop()
|
||||
zkServer = null
|
||||
}
|
||||
super.afterAll()
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user