From d2be0f34e88d68e3600a40e2a67746a91f2fa03a Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 12 May 2021 13:19:15 +0800 Subject: [PATCH] [KYUUBI #629] Refine distributed lock to lock only engine bootstrap ### _Why are the changes needed?_ 1. refine the zk lock to lock only during engine bootstrapping, not wrap the part that only gets the address of engines. This can significantly reduce overhead. 2. add tests for the locking proccess. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request Closes #629 from yaooqinn/lock. Closes #629 a1b4e5f1 [Kent Yao] Refine distributed lock to lock only engine bootstrap a6738652 [Kent Yao] Refine distributed lock to lock only engine bootstrap ff238e6f [Kent Yao] Refine distributed lock to lock only engine bootstrap f3979fce [Kent Yao] Refine distributed lock to lock only engine bootstrap e8ca9954 [Kent Yao] Refine distributed lock to lock only engine bootstrap Authored-by: Kent Yao Signed-off-by: Cheng Pan <379377944@qq.com> --- .../kyuubi/ha/client/ServiceDiscovery.scala | 50 ++--- .../org/apache/kyuubi/engine/EngineName.scala | 73 ------- .../org/apache/kyuubi/engine/EngineRef.scala | 191 ++++++++++++++++++ .../kyuubi/session/KyuubiSessionImpl.scala | 84 +------- .../kyuubi/engine/EngineNameSuite.scala | 73 ------- .../apache/kyuubi/engine/EngineRefSuite.scala | 134 ++++++++++++ 6 files changed, 344 insertions(+), 261 deletions(-) delete mode 100644 kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineName.scala create mode 100644 kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala delete mode 100644 kyuubi-main/src/test/scala/org/apache/kyuubi/engine/EngineNameSuite.scala create mode 100644 kyuubi-main/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala index 7177b2c4e..adb147c9a 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala @@ -26,7 +26,6 @@ import javax.security.auth.login.Configuration import scala.collection.JavaConverters._ import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} -import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex import org.apache.curator.framework.recipes.nodes.PersistentNode import org.apache.curator.framework.state.{ConnectionState, ConnectionStateListener} import org.apache.curator.framework.state.ConnectionState.{CONNECTED, LOST, RECONNECTED} @@ -38,7 +37,7 @@ import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher} import org.apache.zookeeper.CreateMode.PERSISTENT import org.apache.zookeeper.KeeperException.NodeExistsException -import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, KyuubiSQLException, Logging} +import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, Logging} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.ha.HighAvailabilityConf._ import org.apache.kyuubi.service.{AbstractService, Serverable} @@ -47,12 +46,13 @@ import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils} /** * A abstract service for service discovery * - * @param name the name of the service itself + * @param name the name of the service itself * @param server the instance uri a service that used to publish itself */ -abstract class ServiceDiscovery private ( +abstract class ServiceDiscovery private( name: String, server: Serverable) extends AbstractService(name) { + import ServiceDiscovery._ def this(server: Serverable) = @@ -66,7 +66,9 @@ abstract class ServiceDiscovery private ( private var _namespace: String = _ def zkClient: CuratorFramework = _zkClient + def serviceNode: PersistentNode = _serviceNode + def namespace: String = _namespace override def initialize(conf: KyuubiConf): Unit = { @@ -104,7 +106,6 @@ abstract class ServiceDiscovery private ( override def start(): Unit = { val instance = server.connectionUrl _serviceNode = createZkServiceNode(conf, zkClient, namespace, instance) - // Set a watch on the serviceNode val watcher = new DeRegisterWatcher if (zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath) == null) { @@ -117,7 +118,6 @@ abstract class ServiceDiscovery private ( override def stop(): Unit = { closeServiceNode() - if (zkClient != null) zkClient.close() super.stop() } @@ -155,9 +155,11 @@ abstract class ServiceDiscovery private ( } } } + } object ServiceDiscovery extends Logging { + import RetryPolicies._ private final lazy val connectionChecker = @@ -171,7 +173,6 @@ object ServiceDiscovery extends Logging { val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT) val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES) val retryPolicyName = conf.get(HA_ZK_CONN_RETRY_POLICY) - val retryPolicy = RetryPolicies.withName(retryPolicyName) match { case ONE_TIME => new RetryOneTime(baseSleepTime) case N_TIME => new RetryNTimes(maxRetries, baseSleepTime) @@ -180,7 +181,6 @@ object ServiceDiscovery extends Logging { case UNTIL_ELAPSED => new RetryUntilElapsed(maxSleepTime, baseSleepTime) case _ => new ExponentialBackoffRetry(baseSleepTime, maxRetries) } - CuratorFrameworkFactory.builder() .connectString(connectionStr) .sessionTimeoutMs(sessionTimeout) @@ -258,7 +258,7 @@ object ServiceDiscovery extends Logging { def getServerHost(zkClient: CuratorFramework, namespace: String): Option[(String, Int)] = { // TODO: use last one because to avoid touching some maybe-crashed engines // We need a big improvement here. - getServiceNodesInfo(zkClient, namespace, Some(1)) match { + getServiceNodesInfo(zkClient, namespace, Some(1), silent = true) match { case Seq(sn) => Some((sn.host, sn.port)) case _ => None } @@ -267,7 +267,8 @@ object ServiceDiscovery extends Logging { def getServiceNodesInfo( zkClient: CuratorFramework, namespace: String, - sizeOpt: Option[Int] = None): Seq[ServiceNodeInfo] = { + sizeOpt: Option[Int] = None, + silent: Boolean = false): Seq[ServiceNodeInfo] = { try { val hosts = zkClient.getChildren.forPath(namespace) val size = sizeOpt.getOrElse(hosts.size()) @@ -282,9 +283,10 @@ object ServiceDiscovery extends Logging { ServiceNodeInfo(namespace, p, host, port, version) } } catch { + case _: Exception if silent => Nil case e: Exception => error(s"Failed to get service node info", e) - Seq.empty + Nil } } @@ -303,7 +305,7 @@ object ServiceDiscovery extends Logging { .withMode(PERSISTENT) .forPath(ns) } catch { - case _: NodeExistsException => // do nothing + case _: NodeExistsException => // do nothing case e: KeeperException => throw new KyuubiException(s"Failed to create namespace '$ns'", e) } @@ -343,30 +345,6 @@ object ServiceDiscovery extends Logging { } serviceNode } - - def withLock( - zkClient: CuratorFramework, - lockPath: String, - lockTimeout: Long)(f: => Unit): Unit = { - var lock: InterProcessSemaphoreMutex = null - try { - try { - lock = new InterProcessSemaphoreMutex(zkClient, ZKPaths.makePath(lockPath, "lock")) - lock.acquire(lockTimeout, TimeUnit.MILLISECONDS) - } catch { - case e: Exception => throw KyuubiSQLException(s"Lock failed on path [$lockPath]", e) - } - f - } finally { - try { - if (lock != null) { - lock.release() - } - } catch { - case _: Exception => - } - } - } } case class ServiceNodeInfo( diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineName.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineName.scala deleted file mode 100644 index ff143032c..000000000 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineName.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.engine - -import org.apache.curator.utils.ZKPaths - -import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, ShareLevel} -import org.apache.kyuubi.session.SessionHandle - -/** - * The default engine name, kyuubi_[USER|CONNECTION|SERVER]_username_subdomain?_sessionId - * - * @param shareLevel Share level of the engine - * @param user Launch user of the engine - * @param sessionId Id of the corresponding session in which the engine is created - */ -private[kyuubi] class EngineName private ( - shareLevel: ShareLevel, - user: String, - sessionId: String, - subDomain: Option[String]) { - - val defaultEngineName: String = shareLevel match { - case CONNECTION => s"kyuubi_${shareLevel}_${user}_$sessionId" - case _ => subDomain match { - case Some(domain) => s"kyuubi_${shareLevel}_${user}_${domain}_$sessionId" - case _ => s"kyuubi_${shareLevel}_${user}_$sessionId" - } - } - - def getEngineSpace(prefix: String): String = { - shareLevel match { - case CONNECTION => ZKPaths.makePath(s"${prefix}_$shareLevel", user, sessionId) - case _ => subDomain match { - case Some(domain) => ZKPaths.makePath(s"${prefix}_$shareLevel", user, domain) - case None => ZKPaths.makePath(s"${prefix}_$shareLevel", user) - } - } - } - - def getZkLockPath(prefix: String): String = { - assert(shareLevel != CONNECTION) - subDomain match { - case Some(domain) => ZKPaths.makePath(s"${prefix}_$shareLevel", "lock", user, domain) - case None => ZKPaths.makePath(s"${prefix}_$shareLevel", "lock", user) - } - } -} - -private[kyuubi] object EngineName { - def apply( - shareLevel: ShareLevel, - user: String, - handle: SessionHandle, - subDomain: Option[String]): EngineName = { - new EngineName(shareLevel, user, handle.identifier.toString, subDomain) - } -} diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala new file mode 100644 index 000000000..13c6af7ef --- /dev/null +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine + +import java.util.concurrent.TimeUnit + +import com.codahale.metrics.MetricRegistry +import com.google.common.annotations.VisibleForTesting +import org.apache.curator.framework.CuratorFramework +import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex +import org.apache.curator.utils.ZKPaths + +import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils} +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INIT_TIMEOUT, ENGINE_SHARE_LEVEL, ENGINE_SHARE_LEVEL_SUB_DOMAIN} +import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, SERVER, ShareLevel} +import org.apache.kyuubi.engine.spark.SparkProcessBuilder +import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE +import org.apache.kyuubi.ha.client.ServiceDiscovery.getServerHost +import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, ENGINE_TOTAL} +import org.apache.kyuubi.metrics.MetricsSystem +import org.apache.kyuubi.session.SessionHandle + +/** + * The description and functionality of an engine at server side + * + * @param conf Engine configuration + * @param user Caller of the engine + * @param sessionId Id of the corresponding session in which the engine is created + */ +private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionId: String) + extends Logging { + + // The corresponding ServerSpace where the engine belongs to + private val serverSpace: String = conf.get(HA_ZK_NAMESPACE) + + private val timeout: Long = conf.get(ENGINE_INIT_TIMEOUT) + + // Share level of the engine + private val shareLevel: ShareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL)) + + private val subDomain: Option[String] = conf.get(ENGINE_SHARE_LEVEL_SUB_DOMAIN) + + // Launcher of the engine + private val appUser: String = shareLevel match { + case SERVER => Utils.currentUser + case _ => user + } + + /** + * The default engine name, used as default `spark.app.name` if not set + */ + @VisibleForTesting + private[kyuubi] val defaultEngineName: String = shareLevel match { + case CONNECTION => s"kyuubi_${shareLevel}_${appUser}_$sessionId" + case _ => subDomain match { + case Some(domain) => s"kyuubi_${shareLevel}_${appUser}_${domain}_$sessionId" + case _ => s"kyuubi_${shareLevel}_${appUser}_$sessionId" + } + } + + /** + * The EngineSpace used to expose itself to the KyuubiServers in `serverSpace` + * + * For `CONNECTION` share level: + * /$serverSpace_CONNECTION/$user/$sessionId + * For `USER` share level: + * /$serverSpace_USER/$user[/$subDomain] + * + */ + @VisibleForTesting + private[kyuubi] lazy val engineSpace: String = shareLevel match { + case CONNECTION => ZKPaths.makePath(s"${serverSpace}_$shareLevel", appUser, sessionId) + case _ => subDomain match { + case Some(domain) => ZKPaths.makePath(s"${serverSpace}_$shareLevel", appUser, domain) + case None => ZKPaths.makePath(s"${serverSpace}_$shareLevel", appUser) + } + } + + /** + * The distributed lock path used to ensure only once engine being created for non-CONNECTION + * share level. + */ + private def tryWithLock[T](zkClient: CuratorFramework)(f: => T): T = shareLevel match { + case CONNECTION => f + case _ => + val lockPath = + ZKPaths.makePath(s"${serverSpace}_$shareLevel", "lock", appUser, subDomain.orNull) + var lock: InterProcessSemaphoreMutex = null + try { + try { + lock = new InterProcessSemaphoreMutex(zkClient, lockPath) + // Acquire a lease. If no leases are available, this method blocks until either the + // maximum number of leases is increased or another client/process closes a lease + lock.acquire(timeout, TimeUnit.MILLISECONDS) + } catch { + case e: Exception => throw KyuubiSQLException(s"Lock failed on path [$lockPath]", e) + } + f + } finally { + try { + if (lock != null) { + lock.release() + } + } catch { + case _: Exception => + } + } + } + + private def get(zkClient: CuratorFramework): Option[(String, Int)] = { + getServerHost(zkClient, engineSpace) + } + + private def create(zkClient: CuratorFramework): (String, Int) = tryWithLock(zkClient) { + var engineRef = get(zkClient) + // Get the engine address ahead if another process has succeeded + if (engineRef.nonEmpty) return engineRef.get + + conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName) + // tag is a seq type with comma-separated + conf.set(SparkProcessBuilder.TAG_KEY, + conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI") + conf.set(HA_ZK_NAMESPACE, engineSpace) + val builder = new SparkProcessBuilder(appUser, conf) + MetricsSystem.tracing(_.incAndGetCount(ENGINE_TOTAL)) + try { + info(s"Launching engine:\n$builder") + val process = builder.start + val started = System.currentTimeMillis() + var exitValue: Option[Int] = None + while (engineRef.isEmpty) { + if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) { + exitValue = Some(process.exitValue()) + if (exitValue.get != 0) { + val error = builder.getError + MetricsSystem.tracing { ms => + ms.incAndGetCount(MetricRegistry.name(ENGINE_FAIL, appUser)) + ms.incAndGetCount( + MetricRegistry.name(ENGINE_FAIL, error.getClass.getSimpleName)) + } + throw error + } + } + if (started + timeout <= System.currentTimeMillis()) { + process.destroyForcibly() + MetricsSystem.tracing(_.incAndGetCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser))) + throw KyuubiSQLException( + s"Timeout($timeout) to launched Spark with $builder", + builder.getError) + } + engineRef = get(zkClient) + } + engineRef.get + } finally { + // we must close the process builder whether session open is success or failure since + // we have a log capture thread in process builder. + builder.close() + } + } + + /** + * Get the engine ref from engine space first first or create a new one + */ + def getOrCreate(zkClient: CuratorFramework): (String, Int) = { + get(zkClient).getOrElse { + create(zkClient) + } + } +} + +private[kyuubi] object EngineRef { + def apply(conf: KyuubiConf, user: String, handle: SessionHandle): EngineRef = { + new EngineRef(conf, user, handle.identifier.toString) + } +} diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index 4dd12fe8a..14be45d2c 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -17,8 +17,6 @@ package org.apache.kyuubi.session -import java.util.concurrent.TimeUnit - import scala.collection.JavaConverters._ import com.codahale.metrics.MetricRegistry @@ -27,13 +25,10 @@ import org.apache.thrift.TException import org.apache.thrift.protocol.TBinaryProtocol import org.apache.thrift.transport.{TSocket, TTransport} -import org.apache.kyuubi.{KyuubiSQLException, Utils} +import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ -import org.apache.kyuubi.engine.{EngineName, ShareLevel} -import org.apache.kyuubi.engine.ShareLevel.{SERVER, ShareLevel} -import org.apache.kyuubi.engine.spark.SparkProcessBuilder -import org.apache.kyuubi.ha.HighAvailabilityConf._ +import org.apache.kyuubi.engine.EngineRef import org.apache.kyuubi.ha.client.ServiceDiscovery._ import org.apache.kyuubi.metrics.MetricsConstants._ import org.apache.kyuubi.metrics.MetricsSystem @@ -55,28 +50,12 @@ class KyuubiSessionImpl( case (key, value) => sessionConf.set(key, value) } - private val shareLevel: ShareLevel = ShareLevel.withName(sessionConf.get(ENGINE_SHARE_LEVEL)) - private val subDomain: Option[String] = sessionConf.get(ENGINE_SHARE_LEVEL_SUB_DOMAIN) - - private val appUser: String = shareLevel match { - case SERVER => Utils.currentUser - case _ => user - } - - private val zkNamespace: String = sessionConf.get(HA_ZK_NAMESPACE) - - private val boundAppName: EngineName = EngineName(shareLevel, appUser, handle, subDomain) - - private val appZkNamespace: String = boundAppName.getEngineSpace(zkNamespace) - - private val timeout: Long = sessionConf.get(ENGINE_INIT_TIMEOUT) + private val engine: EngineRef = EngineRef(sessionConf, user, handle) private var transport: TTransport = _ private var client: TCLIService.Client = _ private var remoteSessionHandle: TSessionHandle = _ - private def appZkLockPath: String = boundAppName.getZkLockPath(zkNamespace) - override def open(): Unit = { MetricsSystem.tracing { ms => ms.incAndGetCount(CONN_TOTAL) @@ -84,61 +63,8 @@ class KyuubiSessionImpl( } super.open() withZkClient(sessionConf) { zkClient => - logSessionInfo(s"Connected to Zookeeper") - def tryOpenSession: Unit = getServerHost(zkClient, appZkNamespace) match { - case Some((host, port)) => openSession(host, port) - case None => - sessionConf.setIfMissing(SparkProcessBuilder.APP_KEY, boundAppName.defaultEngineName) - // tag is a seq type with comma-separated - sessionConf.set(SparkProcessBuilder.TAG_KEY, - sessionConf.getOption(SparkProcessBuilder.TAG_KEY) - .map(_ + ",").getOrElse("") + "KYUUBI") - sessionConf.set(HA_ZK_NAMESPACE, appZkNamespace) - val builder = new SparkProcessBuilder(appUser, sessionConf) - MetricsSystem.tracing(_.incAndGetCount(ENGINE_TOTAL)) - try { - logSessionInfo(s"Launching engine:\n$builder") - val process = builder.start - var sh = getServerHost(zkClient, appZkNamespace) - val started = System.currentTimeMillis() - var exitValue: Option[Int] = None - while (sh.isEmpty) { - if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) { - exitValue = Some(process.exitValue()) - if (exitValue.get != 0) { - val error = builder.getError - MetricsSystem.tracing { ms => - ms.incAndGetCount(MetricRegistry.name(ENGINE_FAIL, user)) - ms.incAndGetCount( - MetricRegistry.name(ENGINE_FAIL, error.getClass.getSimpleName)) - } - throw error - } - } - if (started + timeout <= System.currentTimeMillis()) { - process.destroyForcibly() - MetricsSystem.tracing(_.incAndGetCount(MetricRegistry.name(ENGINE_TIMEOUT, user))) - throw KyuubiSQLException(s"Timed out($timeout ms) to launched Spark with $builder", - builder.getError) - } - sh = getServerHost(zkClient, appZkNamespace) - } - val Some((host, port)) = sh - openSession(host, port) - } finally { - // we must close the process builder whether session open is success or failure since - // we have a log capture thread in process builder. - builder.close() - } - } - // Add lock for creating engine except ShareLevel of CONNECTION - if (shareLevel != ShareLevel.CONNECTION) { - withLock(zkClient, appZkLockPath, timeout) { - tryOpenSession - } - } else { - tryOpenSession - } + val (host, port) = engine.getOrCreate(zkClient) + openSession(host, port) } } diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/EngineNameSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/EngineNameSuite.scala deleted file mode 100644 index f18c378c6..000000000 --- a/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/EngineNameSuite.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.engine - -import org.apache.curator.utils.ZKPaths -import org.apache.hive.service.rpc.thrift.TProtocolVersion - -import org.apache.kyuubi.{KyuubiFunSuite, Utils} -import org.apache.kyuubi.session.SessionHandle - -class EngineNameSuite extends KyuubiFunSuite { - import ShareLevel._ - - test(s"${CONNECTION} shared level engine name") { - val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) - val user = Utils.currentUser - Seq(Some("suffix"), None).foreach { maybeSubDomain => - val appName = EngineName(CONNECTION, user, id, maybeSubDomain) - assert(appName.getEngineSpace("kyuubi") === - ZKPaths.makePath(s"kyuubi_$CONNECTION", user, id.identifier.toString)) - assert(appName.defaultEngineName === s"kyuubi_${CONNECTION}_${user}_${id.identifier}") - intercept[AssertionError](appName.getZkLockPath("kyuubi")) - } - } - - test(s"${USER} shared level engine name") { - val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) - val user = Utils.currentUser - val appName = EngineName(USER, user, id, None) - assert(appName.getEngineSpace("kyuubi") === - ZKPaths.makePath(s"kyuubi_$USER", user)) - assert(appName.defaultEngineName === s"kyuubi_${USER}_${user}_${id.identifier}") - assert(appName.getZkLockPath("kyuubi") === s"/kyuubi_${USER}/lock/$user") - - val appName2 = EngineName(USER, user, id, Some("abc")) - assert(appName2.getEngineSpace("kyuubi") === - ZKPaths.makePath(s"kyuubi_$USER", user, "abc")) - assert(appName2.defaultEngineName === s"kyuubi_${USER}_${user}_abc_${id.identifier}") - assert(appName2.getZkLockPath("kyuubi") === s"/kyuubi_${USER}/lock/$user/abc") - } - - test(s"${SERVER} shared level engine name") { - val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) - val user = Utils.currentUser - val appName = EngineName(SERVER, user, id, None) - assert(appName.getEngineSpace("kyuubi") === - ZKPaths.makePath(s"kyuubi_$SERVER", user)) - assert(appName.defaultEngineName === s"kyuubi_${SERVER}_${user}_${id.identifier}") - assert(appName.getZkLockPath("kyuubi") === s"/kyuubi_${SERVER}/lock/$user") - - - val appName2 = EngineName(SERVER, user, id, Some("abc")) - assert(appName2.getEngineSpace("kyuubi") === - ZKPaths.makePath(s"kyuubi_$SERVER", user, "abc")) - assert(appName2.defaultEngineName === s"kyuubi_${SERVER}_${user}_abc_${id.identifier}") - assert(appName2.getZkLockPath("kyuubi") === s"/kyuubi_${SERVER}/lock/$user/abc") - } -} diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala new file mode 100644 index 000000000..ac955b27e --- /dev/null +++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine + +import org.apache.curator.utils.ZKPaths +import org.apache.hive.service.rpc.thrift.TProtocolVersion +import org.scalatest.time.SpanSugar.convertIntToGrainOfTime + +import org.apache.kyuubi.{KyuubiFunSuite, Utils} +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.ha.HighAvailabilityConf +import org.apache.kyuubi.ha.client.ServiceDiscovery +import org.apache.kyuubi.session.SessionHandle +import org.apache.kyuubi.util.NamedThreadFactory +import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf} + +class EngineRefSuite extends KyuubiFunSuite { + import ShareLevel._ + private val zkServer = new EmbeddedZookeeper + private val conf = KyuubiConf() + val user = Utils.currentUser + + override def beforeAll(): Unit = { + val zkData = Utils.createTempDir() + conf.set(ZookeeperConf.ZK_DATA_DIR, zkData.toString) + .set(ZookeeperConf.ZK_CLIENT_PORT, 0) + zkServer.initialize(conf) + zkServer.start() + super.beforeAll() + } + + override def afterAll(): Unit = { + zkServer.stop() + super.afterAll() + } + + test(s"${CONNECTION} shared level engine name") { + val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) + Seq(None, Some("suffix")).foreach { domain => + conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, CONNECTION.toString) + domain.foreach(conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN.key, _)) + val engine = EngineRef(conf, user, id) + assert(engine.engineSpace === + ZKPaths.makePath(s"kyuubi_$CONNECTION", user, id.identifier.toString)) + assert(engine.defaultEngineName === s"kyuubi_${CONNECTION}_${user}_${id.identifier}") + } + } + + test(s"${USER} shared level engine name") { + val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) + conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString) + conf.unset(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN) + val appName = EngineRef(conf, user, id) + assert(appName.engineSpace === ZKPaths.makePath(s"kyuubi_$USER", user)) + assert(appName.defaultEngineName === s"kyuubi_${USER}_${user}_${id.identifier}") + + conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN.key, "abc") + val appName2 = EngineRef(conf, user, id) + assert(appName2.engineSpace === + ZKPaths.makePath(s"kyuubi_$USER", user, "abc")) + assert(appName2.defaultEngineName === s"kyuubi_${USER}_${user}_abc_${id.identifier}") + } + + test(s"${SERVER} shared level engine name") { + val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) + conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, SERVER.toString) + conf.unset(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN) + val appName = EngineRef(conf, user, id) + assert(appName.engineSpace === + ZKPaths.makePath(s"kyuubi_$SERVER", user)) + assert(appName.defaultEngineName === s"kyuubi_${SERVER}_${user}_${id.identifier}") + + conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN.key, "abc") + val appName2 = EngineRef(conf, user, id) + assert(appName2.engineSpace === + ZKPaths.makePath(s"kyuubi_$SERVER", user, "abc")) + assert(appName2.defaultEngineName === s"kyuubi_${SERVER}_${user}_abc_${id.identifier}") + } + + test("start and get engine address with lock") { + val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) + conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString) + conf.set(KyuubiConf.FRONTEND_BIND_PORT, 0) + conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test") + conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString) + val engine = EngineRef(conf, user, id) + + var port1 = 0 + var port2 = 0 + + val r1 = new Runnable { + override def run(): Unit = { + ServiceDiscovery.withZkClient(conf) { client => + val hp = engine.getOrCreate(client) + port1 = hp._2 + } + } + } + + val r2 = new Runnable { + override def run(): Unit = { + ServiceDiscovery.withZkClient(conf) { client => + val hp = engine.getOrCreate(client) + port2 = hp._2 + } + } + } + val factory = new NamedThreadFactory("engine-test", false) + val thread1 = factory.newThread(r1) + val thread2 = factory.newThread(r2) + thread1.start() + thread2.start() + + eventually(timeout(90.seconds), interval(1.second)) { + assert(port1 != 0, "engine started") + assert(port2 == port1, "engine shared") + } + } +}