[KYUUBI #629] Refine distributed lock to lock only engine bootstrap

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/NetEase/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

1. refine the zk lock to lock only during engine bootstrapping, not wrap the part that only gets the address of engines. This can significantly reduce overhead.

2. add tests for the locking proccess.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request

Closes #629 from yaooqinn/lock.

Closes #629

a1b4e5f1 [Kent Yao] Refine distributed lock to lock only engine bootstrap
a6738652 [Kent Yao] Refine distributed lock to lock only engine bootstrap
ff238e6f [Kent Yao] Refine distributed lock to lock only engine bootstrap
f3979fce [Kent Yao] Refine distributed lock to lock only engine bootstrap
e8ca9954 [Kent Yao] Refine distributed lock to lock only engine bootstrap

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Cheng Pan <379377944@qq.com>
This commit is contained in:
Kent Yao 2021-05-12 13:19:15 +08:00 committed by Cheng Pan
parent 0918492498
commit d2be0f34e8
6 changed files with 344 additions and 261 deletions

View File

@ -26,7 +26,6 @@ import javax.security.auth.login.Configuration
import scala.collection.JavaConverters._
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex
import org.apache.curator.framework.recipes.nodes.PersistentNode
import org.apache.curator.framework.state.{ConnectionState, ConnectionStateListener}
import org.apache.curator.framework.state.ConnectionState.{CONNECTED, LOST, RECONNECTED}
@ -38,7 +37,7 @@ import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher}
import org.apache.zookeeper.CreateMode.PERSISTENT
import org.apache.zookeeper.KeeperException.NodeExistsException
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, KyuubiSQLException, Logging}
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.service.{AbstractService, Serverable}
@ -47,12 +46,13 @@ import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
/**
* A abstract service for service discovery
*
* @param name the name of the service itself
* @param name the name of the service itself
* @param server the instance uri a service that used to publish itself
*/
abstract class ServiceDiscovery private (
abstract class ServiceDiscovery private(
name: String,
server: Serverable) extends AbstractService(name) {
import ServiceDiscovery._
def this(server: Serverable) =
@ -66,7 +66,9 @@ abstract class ServiceDiscovery private (
private var _namespace: String = _
def zkClient: CuratorFramework = _zkClient
def serviceNode: PersistentNode = _serviceNode
def namespace: String = _namespace
override def initialize(conf: KyuubiConf): Unit = {
@ -104,7 +106,6 @@ abstract class ServiceDiscovery private (
override def start(): Unit = {
val instance = server.connectionUrl
_serviceNode = createZkServiceNode(conf, zkClient, namespace, instance)
// Set a watch on the serviceNode
val watcher = new DeRegisterWatcher
if (zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath) == null) {
@ -117,7 +118,6 @@ abstract class ServiceDiscovery private (
override def stop(): Unit = {
closeServiceNode()
if (zkClient != null) zkClient.close()
super.stop()
}
@ -155,9 +155,11 @@ abstract class ServiceDiscovery private (
}
}
}
}
object ServiceDiscovery extends Logging {
import RetryPolicies._
private final lazy val connectionChecker =
@ -171,7 +173,6 @@ object ServiceDiscovery extends Logging {
val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
val retryPolicyName = conf.get(HA_ZK_CONN_RETRY_POLICY)
val retryPolicy = RetryPolicies.withName(retryPolicyName) match {
case ONE_TIME => new RetryOneTime(baseSleepTime)
case N_TIME => new RetryNTimes(maxRetries, baseSleepTime)
@ -180,7 +181,6 @@ object ServiceDiscovery extends Logging {
case UNTIL_ELAPSED => new RetryUntilElapsed(maxSleepTime, baseSleepTime)
case _ => new ExponentialBackoffRetry(baseSleepTime, maxRetries)
}
CuratorFrameworkFactory.builder()
.connectString(connectionStr)
.sessionTimeoutMs(sessionTimeout)
@ -258,7 +258,7 @@ object ServiceDiscovery extends Logging {
def getServerHost(zkClient: CuratorFramework, namespace: String): Option[(String, Int)] = {
// TODO: use last one because to avoid touching some maybe-crashed engines
// We need a big improvement here.
getServiceNodesInfo(zkClient, namespace, Some(1)) match {
getServiceNodesInfo(zkClient, namespace, Some(1), silent = true) match {
case Seq(sn) => Some((sn.host, sn.port))
case _ => None
}
@ -267,7 +267,8 @@ object ServiceDiscovery extends Logging {
def getServiceNodesInfo(
zkClient: CuratorFramework,
namespace: String,
sizeOpt: Option[Int] = None): Seq[ServiceNodeInfo] = {
sizeOpt: Option[Int] = None,
silent: Boolean = false): Seq[ServiceNodeInfo] = {
try {
val hosts = zkClient.getChildren.forPath(namespace)
val size = sizeOpt.getOrElse(hosts.size())
@ -282,9 +283,10 @@ object ServiceDiscovery extends Logging {
ServiceNodeInfo(namespace, p, host, port, version)
}
} catch {
case _: Exception if silent => Nil
case e: Exception =>
error(s"Failed to get service node info", e)
Seq.empty
Nil
}
}
@ -303,7 +305,7 @@ object ServiceDiscovery extends Logging {
.withMode(PERSISTENT)
.forPath(ns)
} catch {
case _: NodeExistsException => // do nothing
case _: NodeExistsException => // do nothing
case e: KeeperException =>
throw new KyuubiException(s"Failed to create namespace '$ns'", e)
}
@ -343,30 +345,6 @@ object ServiceDiscovery extends Logging {
}
serviceNode
}
def withLock(
zkClient: CuratorFramework,
lockPath: String,
lockTimeout: Long)(f: => Unit): Unit = {
var lock: InterProcessSemaphoreMutex = null
try {
try {
lock = new InterProcessSemaphoreMutex(zkClient, ZKPaths.makePath(lockPath, "lock"))
lock.acquire(lockTimeout, TimeUnit.MILLISECONDS)
} catch {
case e: Exception => throw KyuubiSQLException(s"Lock failed on path [$lockPath]", e)
}
f
} finally {
try {
if (lock != null) {
lock.release()
}
} catch {
case _: Exception =>
}
}
}
}
case class ServiceNodeInfo(

View File

@ -1,73 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine
import org.apache.curator.utils.ZKPaths
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, ShareLevel}
import org.apache.kyuubi.session.SessionHandle
/**
* The default engine name, kyuubi_[USER|CONNECTION|SERVER]_username_subdomain?_sessionId
*
* @param shareLevel Share level of the engine
* @param user Launch user of the engine
* @param sessionId Id of the corresponding session in which the engine is created
*/
private[kyuubi] class EngineName private (
shareLevel: ShareLevel,
user: String,
sessionId: String,
subDomain: Option[String]) {
val defaultEngineName: String = shareLevel match {
case CONNECTION => s"kyuubi_${shareLevel}_${user}_$sessionId"
case _ => subDomain match {
case Some(domain) => s"kyuubi_${shareLevel}_${user}_${domain}_$sessionId"
case _ => s"kyuubi_${shareLevel}_${user}_$sessionId"
}
}
def getEngineSpace(prefix: String): String = {
shareLevel match {
case CONNECTION => ZKPaths.makePath(s"${prefix}_$shareLevel", user, sessionId)
case _ => subDomain match {
case Some(domain) => ZKPaths.makePath(s"${prefix}_$shareLevel", user, domain)
case None => ZKPaths.makePath(s"${prefix}_$shareLevel", user)
}
}
}
def getZkLockPath(prefix: String): String = {
assert(shareLevel != CONNECTION)
subDomain match {
case Some(domain) => ZKPaths.makePath(s"${prefix}_$shareLevel", "lock", user, domain)
case None => ZKPaths.makePath(s"${prefix}_$shareLevel", "lock", user)
}
}
}
private[kyuubi] object EngineName {
def apply(
shareLevel: ShareLevel,
user: String,
handle: SessionHandle,
subDomain: Option[String]): EngineName = {
new EngineName(shareLevel, user, handle.identifier.toString, subDomain)
}
}

View File

@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine
import java.util.concurrent.TimeUnit
import com.codahale.metrics.MetricRegistry
import com.google.common.annotations.VisibleForTesting
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex
import org.apache.curator.utils.ZKPaths
import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INIT_TIMEOUT, ENGINE_SHARE_LEVEL, ENGINE_SHARE_LEVEL_SUB_DOMAIN}
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, SERVER, ShareLevel}
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
import org.apache.kyuubi.ha.client.ServiceDiscovery.getServerHost
import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, ENGINE_TOTAL}
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.session.SessionHandle
/**
* The description and functionality of an engine at server side
*
* @param conf Engine configuration
* @param user Caller of the engine
* @param sessionId Id of the corresponding session in which the engine is created
*/
private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionId: String)
extends Logging {
// The corresponding ServerSpace where the engine belongs to
private val serverSpace: String = conf.get(HA_ZK_NAMESPACE)
private val timeout: Long = conf.get(ENGINE_INIT_TIMEOUT)
// Share level of the engine
private val shareLevel: ShareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL))
private val subDomain: Option[String] = conf.get(ENGINE_SHARE_LEVEL_SUB_DOMAIN)
// Launcher of the engine
private val appUser: String = shareLevel match {
case SERVER => Utils.currentUser
case _ => user
}
/**
* The default engine name, used as default `spark.app.name` if not set
*/
@VisibleForTesting
private[kyuubi] val defaultEngineName: String = shareLevel match {
case CONNECTION => s"kyuubi_${shareLevel}_${appUser}_$sessionId"
case _ => subDomain match {
case Some(domain) => s"kyuubi_${shareLevel}_${appUser}_${domain}_$sessionId"
case _ => s"kyuubi_${shareLevel}_${appUser}_$sessionId"
}
}
/**
* The EngineSpace used to expose itself to the KyuubiServers in `serverSpace`
*
* For `CONNECTION` share level:
* /$serverSpace_CONNECTION/$user/$sessionId
* For `USER` share level:
* /$serverSpace_USER/$user[/$subDomain]
*
*/
@VisibleForTesting
private[kyuubi] lazy val engineSpace: String = shareLevel match {
case CONNECTION => ZKPaths.makePath(s"${serverSpace}_$shareLevel", appUser, sessionId)
case _ => subDomain match {
case Some(domain) => ZKPaths.makePath(s"${serverSpace}_$shareLevel", appUser, domain)
case None => ZKPaths.makePath(s"${serverSpace}_$shareLevel", appUser)
}
}
/**
* The distributed lock path used to ensure only once engine being created for non-CONNECTION
* share level.
*/
private def tryWithLock[T](zkClient: CuratorFramework)(f: => T): T = shareLevel match {
case CONNECTION => f
case _ =>
val lockPath =
ZKPaths.makePath(s"${serverSpace}_$shareLevel", "lock", appUser, subDomain.orNull)
var lock: InterProcessSemaphoreMutex = null
try {
try {
lock = new InterProcessSemaphoreMutex(zkClient, lockPath)
// Acquire a lease. If no leases are available, this method blocks until either the
// maximum number of leases is increased or another client/process closes a lease
lock.acquire(timeout, TimeUnit.MILLISECONDS)
} catch {
case e: Exception => throw KyuubiSQLException(s"Lock failed on path [$lockPath]", e)
}
f
} finally {
try {
if (lock != null) {
lock.release()
}
} catch {
case _: Exception =>
}
}
}
private def get(zkClient: CuratorFramework): Option[(String, Int)] = {
getServerHost(zkClient, engineSpace)
}
private def create(zkClient: CuratorFramework): (String, Int) = tryWithLock(zkClient) {
var engineRef = get(zkClient)
// Get the engine address ahead if another process has succeeded
if (engineRef.nonEmpty) return engineRef.get
conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
// tag is a seq type with comma-separated
conf.set(SparkProcessBuilder.TAG_KEY,
conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
conf.set(HA_ZK_NAMESPACE, engineSpace)
val builder = new SparkProcessBuilder(appUser, conf)
MetricsSystem.tracing(_.incAndGetCount(ENGINE_TOTAL))
try {
info(s"Launching engine:\n$builder")
val process = builder.start
val started = System.currentTimeMillis()
var exitValue: Option[Int] = None
while (engineRef.isEmpty) {
if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
exitValue = Some(process.exitValue())
if (exitValue.get != 0) {
val error = builder.getError
MetricsSystem.tracing { ms =>
ms.incAndGetCount(MetricRegistry.name(ENGINE_FAIL, appUser))
ms.incAndGetCount(
MetricRegistry.name(ENGINE_FAIL, error.getClass.getSimpleName))
}
throw error
}
}
if (started + timeout <= System.currentTimeMillis()) {
process.destroyForcibly()
MetricsSystem.tracing(_.incAndGetCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser)))
throw KyuubiSQLException(
s"Timeout($timeout) to launched Spark with $builder",
builder.getError)
}
engineRef = get(zkClient)
}
engineRef.get
} finally {
// we must close the process builder whether session open is success or failure since
// we have a log capture thread in process builder.
builder.close()
}
}
/**
* Get the engine ref from engine space first first or create a new one
*/
def getOrCreate(zkClient: CuratorFramework): (String, Int) = {
get(zkClient).getOrElse {
create(zkClient)
}
}
}
private[kyuubi] object EngineRef {
def apply(conf: KyuubiConf, user: String, handle: SessionHandle): EngineRef = {
new EngineRef(conf, user, handle.identifier.toString)
}
}

View File

@ -17,8 +17,6 @@
package org.apache.kyuubi.session
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import com.codahale.metrics.MetricRegistry
@ -27,13 +25,10 @@ import org.apache.thrift.TException
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.{TSocket, TTransport}
import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.{EngineName, ShareLevel}
import org.apache.kyuubi.engine.ShareLevel.{SERVER, ShareLevel}
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.engine.EngineRef
import org.apache.kyuubi.ha.client.ServiceDiscovery._
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.metrics.MetricsSystem
@ -55,28 +50,12 @@ class KyuubiSessionImpl(
case (key, value) => sessionConf.set(key, value)
}
private val shareLevel: ShareLevel = ShareLevel.withName(sessionConf.get(ENGINE_SHARE_LEVEL))
private val subDomain: Option[String] = sessionConf.get(ENGINE_SHARE_LEVEL_SUB_DOMAIN)
private val appUser: String = shareLevel match {
case SERVER => Utils.currentUser
case _ => user
}
private val zkNamespace: String = sessionConf.get(HA_ZK_NAMESPACE)
private val boundAppName: EngineName = EngineName(shareLevel, appUser, handle, subDomain)
private val appZkNamespace: String = boundAppName.getEngineSpace(zkNamespace)
private val timeout: Long = sessionConf.get(ENGINE_INIT_TIMEOUT)
private val engine: EngineRef = EngineRef(sessionConf, user, handle)
private var transport: TTransport = _
private var client: TCLIService.Client = _
private var remoteSessionHandle: TSessionHandle = _
private def appZkLockPath: String = boundAppName.getZkLockPath(zkNamespace)
override def open(): Unit = {
MetricsSystem.tracing { ms =>
ms.incAndGetCount(CONN_TOTAL)
@ -84,61 +63,8 @@ class KyuubiSessionImpl(
}
super.open()
withZkClient(sessionConf) { zkClient =>
logSessionInfo(s"Connected to Zookeeper")
def tryOpenSession: Unit = getServerHost(zkClient, appZkNamespace) match {
case Some((host, port)) => openSession(host, port)
case None =>
sessionConf.setIfMissing(SparkProcessBuilder.APP_KEY, boundAppName.defaultEngineName)
// tag is a seq type with comma-separated
sessionConf.set(SparkProcessBuilder.TAG_KEY,
sessionConf.getOption(SparkProcessBuilder.TAG_KEY)
.map(_ + ",").getOrElse("") + "KYUUBI")
sessionConf.set(HA_ZK_NAMESPACE, appZkNamespace)
val builder = new SparkProcessBuilder(appUser, sessionConf)
MetricsSystem.tracing(_.incAndGetCount(ENGINE_TOTAL))
try {
logSessionInfo(s"Launching engine:\n$builder")
val process = builder.start
var sh = getServerHost(zkClient, appZkNamespace)
val started = System.currentTimeMillis()
var exitValue: Option[Int] = None
while (sh.isEmpty) {
if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
exitValue = Some(process.exitValue())
if (exitValue.get != 0) {
val error = builder.getError
MetricsSystem.tracing { ms =>
ms.incAndGetCount(MetricRegistry.name(ENGINE_FAIL, user))
ms.incAndGetCount(
MetricRegistry.name(ENGINE_FAIL, error.getClass.getSimpleName))
}
throw error
}
}
if (started + timeout <= System.currentTimeMillis()) {
process.destroyForcibly()
MetricsSystem.tracing(_.incAndGetCount(MetricRegistry.name(ENGINE_TIMEOUT, user)))
throw KyuubiSQLException(s"Timed out($timeout ms) to launched Spark with $builder",
builder.getError)
}
sh = getServerHost(zkClient, appZkNamespace)
}
val Some((host, port)) = sh
openSession(host, port)
} finally {
// we must close the process builder whether session open is success or failure since
// we have a log capture thread in process builder.
builder.close()
}
}
// Add lock for creating engine except ShareLevel of CONNECTION
if (shareLevel != ShareLevel.CONNECTION) {
withLock(zkClient, appZkLockPath, timeout) {
tryOpenSession
}
} else {
tryOpenSession
}
val (host, port) = engine.getOrCreate(zkClient)
openSession(host, port)
}
}

View File

@ -1,73 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine
import org.apache.curator.utils.ZKPaths
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
import org.apache.kyuubi.session.SessionHandle
class EngineNameSuite extends KyuubiFunSuite {
import ShareLevel._
test(s"${CONNECTION} shared level engine name") {
val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
val user = Utils.currentUser
Seq(Some("suffix"), None).foreach { maybeSubDomain =>
val appName = EngineName(CONNECTION, user, id, maybeSubDomain)
assert(appName.getEngineSpace("kyuubi") ===
ZKPaths.makePath(s"kyuubi_$CONNECTION", user, id.identifier.toString))
assert(appName.defaultEngineName === s"kyuubi_${CONNECTION}_${user}_${id.identifier}")
intercept[AssertionError](appName.getZkLockPath("kyuubi"))
}
}
test(s"${USER} shared level engine name") {
val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
val user = Utils.currentUser
val appName = EngineName(USER, user, id, None)
assert(appName.getEngineSpace("kyuubi") ===
ZKPaths.makePath(s"kyuubi_$USER", user))
assert(appName.defaultEngineName === s"kyuubi_${USER}_${user}_${id.identifier}")
assert(appName.getZkLockPath("kyuubi") === s"/kyuubi_${USER}/lock/$user")
val appName2 = EngineName(USER, user, id, Some("abc"))
assert(appName2.getEngineSpace("kyuubi") ===
ZKPaths.makePath(s"kyuubi_$USER", user, "abc"))
assert(appName2.defaultEngineName === s"kyuubi_${USER}_${user}_abc_${id.identifier}")
assert(appName2.getZkLockPath("kyuubi") === s"/kyuubi_${USER}/lock/$user/abc")
}
test(s"${SERVER} shared level engine name") {
val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
val user = Utils.currentUser
val appName = EngineName(SERVER, user, id, None)
assert(appName.getEngineSpace("kyuubi") ===
ZKPaths.makePath(s"kyuubi_$SERVER", user))
assert(appName.defaultEngineName === s"kyuubi_${SERVER}_${user}_${id.identifier}")
assert(appName.getZkLockPath("kyuubi") === s"/kyuubi_${SERVER}/lock/$user")
val appName2 = EngineName(SERVER, user, id, Some("abc"))
assert(appName2.getEngineSpace("kyuubi") ===
ZKPaths.makePath(s"kyuubi_$SERVER", user, "abc"))
assert(appName2.defaultEngineName === s"kyuubi_${SERVER}_${user}_abc_${id.identifier}")
assert(appName2.getZkLockPath("kyuubi") === s"/kyuubi_${SERVER}/lock/$user/abc")
}
}

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine
import org.apache.curator.utils.ZKPaths
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.ServiceDiscovery
import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.util.NamedThreadFactory
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
class EngineRefSuite extends KyuubiFunSuite {
import ShareLevel._
private val zkServer = new EmbeddedZookeeper
private val conf = KyuubiConf()
val user = Utils.currentUser
override def beforeAll(): Unit = {
val zkData = Utils.createTempDir()
conf.set(ZookeeperConf.ZK_DATA_DIR, zkData.toString)
.set(ZookeeperConf.ZK_CLIENT_PORT, 0)
zkServer.initialize(conf)
zkServer.start()
super.beforeAll()
}
override def afterAll(): Unit = {
zkServer.stop()
super.afterAll()
}
test(s"${CONNECTION} shared level engine name") {
val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
Seq(None, Some("suffix")).foreach { domain =>
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, CONNECTION.toString)
domain.foreach(conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN.key, _))
val engine = EngineRef(conf, user, id)
assert(engine.engineSpace ===
ZKPaths.makePath(s"kyuubi_$CONNECTION", user, id.identifier.toString))
assert(engine.defaultEngineName === s"kyuubi_${CONNECTION}_${user}_${id.identifier}")
}
}
test(s"${USER} shared level engine name") {
val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
conf.unset(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN)
val appName = EngineRef(conf, user, id)
assert(appName.engineSpace === ZKPaths.makePath(s"kyuubi_$USER", user))
assert(appName.defaultEngineName === s"kyuubi_${USER}_${user}_${id.identifier}")
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN.key, "abc")
val appName2 = EngineRef(conf, user, id)
assert(appName2.engineSpace ===
ZKPaths.makePath(s"kyuubi_$USER", user, "abc"))
assert(appName2.defaultEngineName === s"kyuubi_${USER}_${user}_abc_${id.identifier}")
}
test(s"${SERVER} shared level engine name") {
val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, SERVER.toString)
conf.unset(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN)
val appName = EngineRef(conf, user, id)
assert(appName.engineSpace ===
ZKPaths.makePath(s"kyuubi_$SERVER", user))
assert(appName.defaultEngineName === s"kyuubi_${SERVER}_${user}_${id.identifier}")
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN.key, "abc")
val appName2 = EngineRef(conf, user, id)
assert(appName2.engineSpace ===
ZKPaths.makePath(s"kyuubi_$SERVER", user, "abc"))
assert(appName2.defaultEngineName === s"kyuubi_${SERVER}_${user}_abc_${id.identifier}")
}
test("start and get engine address with lock") {
val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
conf.set(KyuubiConf.FRONTEND_BIND_PORT, 0)
conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test")
conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString)
val engine = EngineRef(conf, user, id)
var port1 = 0
var port2 = 0
val r1 = new Runnable {
override def run(): Unit = {
ServiceDiscovery.withZkClient(conf) { client =>
val hp = engine.getOrCreate(client)
port1 = hp._2
}
}
}
val r2 = new Runnable {
override def run(): Unit = {
ServiceDiscovery.withZkClient(conf) { client =>
val hp = engine.getOrCreate(client)
port2 = hp._2
}
}
}
val factory = new NamedThreadFactory("engine-test", false)
val thread1 = factory.newThread(r1)
val thread2 = factory.newThread(r2)
thread1.start()
thread2.start()
eventually(timeout(90.seconds), interval(1.second)) {
assert(port1 != 0, "engine started")
assert(port2 == port1, "engine shared")
}
}
}