diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala index 7177b2c4e..adb147c9a 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala @@ -26,7 +26,6 @@ import javax.security.auth.login.Configuration import scala.collection.JavaConverters._ import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} -import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex import org.apache.curator.framework.recipes.nodes.PersistentNode import org.apache.curator.framework.state.{ConnectionState, ConnectionStateListener} import org.apache.curator.framework.state.ConnectionState.{CONNECTED, LOST, RECONNECTED} @@ -38,7 +37,7 @@ import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher} import org.apache.zookeeper.CreateMode.PERSISTENT import org.apache.zookeeper.KeeperException.NodeExistsException -import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, KyuubiSQLException, Logging} +import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, Logging} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.ha.HighAvailabilityConf._ import org.apache.kyuubi.service.{AbstractService, Serverable} @@ -47,12 +46,13 @@ import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils} /** * A abstract service for service discovery * - * @param name the name of the service itself + * @param name the name of the service itself * @param server the instance uri a service that used to publish itself */ -abstract class ServiceDiscovery private ( +abstract class ServiceDiscovery private( name: String, server: Serverable) extends AbstractService(name) { + import ServiceDiscovery._ def this(server: Serverable) = @@ -66,7 +66,9 @@ abstract class ServiceDiscovery private ( private var _namespace: String = _ def zkClient: CuratorFramework = _zkClient + def serviceNode: PersistentNode = _serviceNode + def namespace: String = _namespace override def initialize(conf: KyuubiConf): Unit = { @@ -104,7 +106,6 @@ abstract class ServiceDiscovery private ( override def start(): Unit = { val instance = server.connectionUrl _serviceNode = createZkServiceNode(conf, zkClient, namespace, instance) - // Set a watch on the serviceNode val watcher = new DeRegisterWatcher if (zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath) == null) { @@ -117,7 +118,6 @@ abstract class ServiceDiscovery private ( override def stop(): Unit = { closeServiceNode() - if (zkClient != null) zkClient.close() super.stop() } @@ -155,9 +155,11 @@ abstract class ServiceDiscovery private ( } } } + } object ServiceDiscovery extends Logging { + import RetryPolicies._ private final lazy val connectionChecker = @@ -171,7 +173,6 @@ object ServiceDiscovery extends Logging { val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT) val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES) val retryPolicyName = conf.get(HA_ZK_CONN_RETRY_POLICY) - val retryPolicy = RetryPolicies.withName(retryPolicyName) match { case ONE_TIME => new RetryOneTime(baseSleepTime) case N_TIME => new RetryNTimes(maxRetries, baseSleepTime) @@ -180,7 +181,6 @@ object ServiceDiscovery extends Logging { case UNTIL_ELAPSED => new RetryUntilElapsed(maxSleepTime, baseSleepTime) case _ => new ExponentialBackoffRetry(baseSleepTime, maxRetries) } - CuratorFrameworkFactory.builder() .connectString(connectionStr) .sessionTimeoutMs(sessionTimeout) @@ -258,7 +258,7 @@ object ServiceDiscovery extends Logging { def getServerHost(zkClient: CuratorFramework, namespace: String): Option[(String, Int)] = { // TODO: use last one because to avoid touching some maybe-crashed engines // We need a big improvement here. - getServiceNodesInfo(zkClient, namespace, Some(1)) match { + getServiceNodesInfo(zkClient, namespace, Some(1), silent = true) match { case Seq(sn) => Some((sn.host, sn.port)) case _ => None } @@ -267,7 +267,8 @@ object ServiceDiscovery extends Logging { def getServiceNodesInfo( zkClient: CuratorFramework, namespace: String, - sizeOpt: Option[Int] = None): Seq[ServiceNodeInfo] = { + sizeOpt: Option[Int] = None, + silent: Boolean = false): Seq[ServiceNodeInfo] = { try { val hosts = zkClient.getChildren.forPath(namespace) val size = sizeOpt.getOrElse(hosts.size()) @@ -282,9 +283,10 @@ object ServiceDiscovery extends Logging { ServiceNodeInfo(namespace, p, host, port, version) } } catch { + case _: Exception if silent => Nil case e: Exception => error(s"Failed to get service node info", e) - Seq.empty + Nil } } @@ -303,7 +305,7 @@ object ServiceDiscovery extends Logging { .withMode(PERSISTENT) .forPath(ns) } catch { - case _: NodeExistsException => // do nothing + case _: NodeExistsException => // do nothing case e: KeeperException => throw new KyuubiException(s"Failed to create namespace '$ns'", e) } @@ -343,30 +345,6 @@ object ServiceDiscovery extends Logging { } serviceNode } - - def withLock( - zkClient: CuratorFramework, - lockPath: String, - lockTimeout: Long)(f: => Unit): Unit = { - var lock: InterProcessSemaphoreMutex = null - try { - try { - lock = new InterProcessSemaphoreMutex(zkClient, ZKPaths.makePath(lockPath, "lock")) - lock.acquire(lockTimeout, TimeUnit.MILLISECONDS) - } catch { - case e: Exception => throw KyuubiSQLException(s"Lock failed on path [$lockPath]", e) - } - f - } finally { - try { - if (lock != null) { - lock.release() - } - } catch { - case _: Exception => - } - } - } } case class ServiceNodeInfo( diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineName.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineName.scala deleted file mode 100644 index ff143032c..000000000 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineName.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.engine - -import org.apache.curator.utils.ZKPaths - -import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, ShareLevel} -import org.apache.kyuubi.session.SessionHandle - -/** - * The default engine name, kyuubi_[USER|CONNECTION|SERVER]_username_subdomain?_sessionId - * - * @param shareLevel Share level of the engine - * @param user Launch user of the engine - * @param sessionId Id of the corresponding session in which the engine is created - */ -private[kyuubi] class EngineName private ( - shareLevel: ShareLevel, - user: String, - sessionId: String, - subDomain: Option[String]) { - - val defaultEngineName: String = shareLevel match { - case CONNECTION => s"kyuubi_${shareLevel}_${user}_$sessionId" - case _ => subDomain match { - case Some(domain) => s"kyuubi_${shareLevel}_${user}_${domain}_$sessionId" - case _ => s"kyuubi_${shareLevel}_${user}_$sessionId" - } - } - - def getEngineSpace(prefix: String): String = { - shareLevel match { - case CONNECTION => ZKPaths.makePath(s"${prefix}_$shareLevel", user, sessionId) - case _ => subDomain match { - case Some(domain) => ZKPaths.makePath(s"${prefix}_$shareLevel", user, domain) - case None => ZKPaths.makePath(s"${prefix}_$shareLevel", user) - } - } - } - - def getZkLockPath(prefix: String): String = { - assert(shareLevel != CONNECTION) - subDomain match { - case Some(domain) => ZKPaths.makePath(s"${prefix}_$shareLevel", "lock", user, domain) - case None => ZKPaths.makePath(s"${prefix}_$shareLevel", "lock", user) - } - } -} - -private[kyuubi] object EngineName { - def apply( - shareLevel: ShareLevel, - user: String, - handle: SessionHandle, - subDomain: Option[String]): EngineName = { - new EngineName(shareLevel, user, handle.identifier.toString, subDomain) - } -} diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala new file mode 100644 index 000000000..13c6af7ef --- /dev/null +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine + +import java.util.concurrent.TimeUnit + +import com.codahale.metrics.MetricRegistry +import com.google.common.annotations.VisibleForTesting +import org.apache.curator.framework.CuratorFramework +import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex +import org.apache.curator.utils.ZKPaths + +import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils} +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INIT_TIMEOUT, ENGINE_SHARE_LEVEL, ENGINE_SHARE_LEVEL_SUB_DOMAIN} +import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, SERVER, ShareLevel} +import org.apache.kyuubi.engine.spark.SparkProcessBuilder +import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE +import org.apache.kyuubi.ha.client.ServiceDiscovery.getServerHost +import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, ENGINE_TOTAL} +import org.apache.kyuubi.metrics.MetricsSystem +import org.apache.kyuubi.session.SessionHandle + +/** + * The description and functionality of an engine at server side + * + * @param conf Engine configuration + * @param user Caller of the engine + * @param sessionId Id of the corresponding session in which the engine is created + */ +private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionId: String) + extends Logging { + + // The corresponding ServerSpace where the engine belongs to + private val serverSpace: String = conf.get(HA_ZK_NAMESPACE) + + private val timeout: Long = conf.get(ENGINE_INIT_TIMEOUT) + + // Share level of the engine + private val shareLevel: ShareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL)) + + private val subDomain: Option[String] = conf.get(ENGINE_SHARE_LEVEL_SUB_DOMAIN) + + // Launcher of the engine + private val appUser: String = shareLevel match { + case SERVER => Utils.currentUser + case _ => user + } + + /** + * The default engine name, used as default `spark.app.name` if not set + */ + @VisibleForTesting + private[kyuubi] val defaultEngineName: String = shareLevel match { + case CONNECTION => s"kyuubi_${shareLevel}_${appUser}_$sessionId" + case _ => subDomain match { + case Some(domain) => s"kyuubi_${shareLevel}_${appUser}_${domain}_$sessionId" + case _ => s"kyuubi_${shareLevel}_${appUser}_$sessionId" + } + } + + /** + * The EngineSpace used to expose itself to the KyuubiServers in `serverSpace` + * + * For `CONNECTION` share level: + * /$serverSpace_CONNECTION/$user/$sessionId + * For `USER` share level: + * /$serverSpace_USER/$user[/$subDomain] + * + */ + @VisibleForTesting + private[kyuubi] lazy val engineSpace: String = shareLevel match { + case CONNECTION => ZKPaths.makePath(s"${serverSpace}_$shareLevel", appUser, sessionId) + case _ => subDomain match { + case Some(domain) => ZKPaths.makePath(s"${serverSpace}_$shareLevel", appUser, domain) + case None => ZKPaths.makePath(s"${serverSpace}_$shareLevel", appUser) + } + } + + /** + * The distributed lock path used to ensure only once engine being created for non-CONNECTION + * share level. + */ + private def tryWithLock[T](zkClient: CuratorFramework)(f: => T): T = shareLevel match { + case CONNECTION => f + case _ => + val lockPath = + ZKPaths.makePath(s"${serverSpace}_$shareLevel", "lock", appUser, subDomain.orNull) + var lock: InterProcessSemaphoreMutex = null + try { + try { + lock = new InterProcessSemaphoreMutex(zkClient, lockPath) + // Acquire a lease. If no leases are available, this method blocks until either the + // maximum number of leases is increased or another client/process closes a lease + lock.acquire(timeout, TimeUnit.MILLISECONDS) + } catch { + case e: Exception => throw KyuubiSQLException(s"Lock failed on path [$lockPath]", e) + } + f + } finally { + try { + if (lock != null) { + lock.release() + } + } catch { + case _: Exception => + } + } + } + + private def get(zkClient: CuratorFramework): Option[(String, Int)] = { + getServerHost(zkClient, engineSpace) + } + + private def create(zkClient: CuratorFramework): (String, Int) = tryWithLock(zkClient) { + var engineRef = get(zkClient) + // Get the engine address ahead if another process has succeeded + if (engineRef.nonEmpty) return engineRef.get + + conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName) + // tag is a seq type with comma-separated + conf.set(SparkProcessBuilder.TAG_KEY, + conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI") + conf.set(HA_ZK_NAMESPACE, engineSpace) + val builder = new SparkProcessBuilder(appUser, conf) + MetricsSystem.tracing(_.incAndGetCount(ENGINE_TOTAL)) + try { + info(s"Launching engine:\n$builder") + val process = builder.start + val started = System.currentTimeMillis() + var exitValue: Option[Int] = None + while (engineRef.isEmpty) { + if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) { + exitValue = Some(process.exitValue()) + if (exitValue.get != 0) { + val error = builder.getError + MetricsSystem.tracing { ms => + ms.incAndGetCount(MetricRegistry.name(ENGINE_FAIL, appUser)) + ms.incAndGetCount( + MetricRegistry.name(ENGINE_FAIL, error.getClass.getSimpleName)) + } + throw error + } + } + if (started + timeout <= System.currentTimeMillis()) { + process.destroyForcibly() + MetricsSystem.tracing(_.incAndGetCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser))) + throw KyuubiSQLException( + s"Timeout($timeout) to launched Spark with $builder", + builder.getError) + } + engineRef = get(zkClient) + } + engineRef.get + } finally { + // we must close the process builder whether session open is success or failure since + // we have a log capture thread in process builder. + builder.close() + } + } + + /** + * Get the engine ref from engine space first first or create a new one + */ + def getOrCreate(zkClient: CuratorFramework): (String, Int) = { + get(zkClient).getOrElse { + create(zkClient) + } + } +} + +private[kyuubi] object EngineRef { + def apply(conf: KyuubiConf, user: String, handle: SessionHandle): EngineRef = { + new EngineRef(conf, user, handle.identifier.toString) + } +} diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index 4dd12fe8a..14be45d2c 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -17,8 +17,6 @@ package org.apache.kyuubi.session -import java.util.concurrent.TimeUnit - import scala.collection.JavaConverters._ import com.codahale.metrics.MetricRegistry @@ -27,13 +25,10 @@ import org.apache.thrift.TException import org.apache.thrift.protocol.TBinaryProtocol import org.apache.thrift.transport.{TSocket, TTransport} -import org.apache.kyuubi.{KyuubiSQLException, Utils} +import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ -import org.apache.kyuubi.engine.{EngineName, ShareLevel} -import org.apache.kyuubi.engine.ShareLevel.{SERVER, ShareLevel} -import org.apache.kyuubi.engine.spark.SparkProcessBuilder -import org.apache.kyuubi.ha.HighAvailabilityConf._ +import org.apache.kyuubi.engine.EngineRef import org.apache.kyuubi.ha.client.ServiceDiscovery._ import org.apache.kyuubi.metrics.MetricsConstants._ import org.apache.kyuubi.metrics.MetricsSystem @@ -55,28 +50,12 @@ class KyuubiSessionImpl( case (key, value) => sessionConf.set(key, value) } - private val shareLevel: ShareLevel = ShareLevel.withName(sessionConf.get(ENGINE_SHARE_LEVEL)) - private val subDomain: Option[String] = sessionConf.get(ENGINE_SHARE_LEVEL_SUB_DOMAIN) - - private val appUser: String = shareLevel match { - case SERVER => Utils.currentUser - case _ => user - } - - private val zkNamespace: String = sessionConf.get(HA_ZK_NAMESPACE) - - private val boundAppName: EngineName = EngineName(shareLevel, appUser, handle, subDomain) - - private val appZkNamespace: String = boundAppName.getEngineSpace(zkNamespace) - - private val timeout: Long = sessionConf.get(ENGINE_INIT_TIMEOUT) + private val engine: EngineRef = EngineRef(sessionConf, user, handle) private var transport: TTransport = _ private var client: TCLIService.Client = _ private var remoteSessionHandle: TSessionHandle = _ - private def appZkLockPath: String = boundAppName.getZkLockPath(zkNamespace) - override def open(): Unit = { MetricsSystem.tracing { ms => ms.incAndGetCount(CONN_TOTAL) @@ -84,61 +63,8 @@ class KyuubiSessionImpl( } super.open() withZkClient(sessionConf) { zkClient => - logSessionInfo(s"Connected to Zookeeper") - def tryOpenSession: Unit = getServerHost(zkClient, appZkNamespace) match { - case Some((host, port)) => openSession(host, port) - case None => - sessionConf.setIfMissing(SparkProcessBuilder.APP_KEY, boundAppName.defaultEngineName) - // tag is a seq type with comma-separated - sessionConf.set(SparkProcessBuilder.TAG_KEY, - sessionConf.getOption(SparkProcessBuilder.TAG_KEY) - .map(_ + ",").getOrElse("") + "KYUUBI") - sessionConf.set(HA_ZK_NAMESPACE, appZkNamespace) - val builder = new SparkProcessBuilder(appUser, sessionConf) - MetricsSystem.tracing(_.incAndGetCount(ENGINE_TOTAL)) - try { - logSessionInfo(s"Launching engine:\n$builder") - val process = builder.start - var sh = getServerHost(zkClient, appZkNamespace) - val started = System.currentTimeMillis() - var exitValue: Option[Int] = None - while (sh.isEmpty) { - if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) { - exitValue = Some(process.exitValue()) - if (exitValue.get != 0) { - val error = builder.getError - MetricsSystem.tracing { ms => - ms.incAndGetCount(MetricRegistry.name(ENGINE_FAIL, user)) - ms.incAndGetCount( - MetricRegistry.name(ENGINE_FAIL, error.getClass.getSimpleName)) - } - throw error - } - } - if (started + timeout <= System.currentTimeMillis()) { - process.destroyForcibly() - MetricsSystem.tracing(_.incAndGetCount(MetricRegistry.name(ENGINE_TIMEOUT, user))) - throw KyuubiSQLException(s"Timed out($timeout ms) to launched Spark with $builder", - builder.getError) - } - sh = getServerHost(zkClient, appZkNamespace) - } - val Some((host, port)) = sh - openSession(host, port) - } finally { - // we must close the process builder whether session open is success or failure since - // we have a log capture thread in process builder. - builder.close() - } - } - // Add lock for creating engine except ShareLevel of CONNECTION - if (shareLevel != ShareLevel.CONNECTION) { - withLock(zkClient, appZkLockPath, timeout) { - tryOpenSession - } - } else { - tryOpenSession - } + val (host, port) = engine.getOrCreate(zkClient) + openSession(host, port) } } diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/EngineNameSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/EngineNameSuite.scala deleted file mode 100644 index f18c378c6..000000000 --- a/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/EngineNameSuite.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.engine - -import org.apache.curator.utils.ZKPaths -import org.apache.hive.service.rpc.thrift.TProtocolVersion - -import org.apache.kyuubi.{KyuubiFunSuite, Utils} -import org.apache.kyuubi.session.SessionHandle - -class EngineNameSuite extends KyuubiFunSuite { - import ShareLevel._ - - test(s"${CONNECTION} shared level engine name") { - val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) - val user = Utils.currentUser - Seq(Some("suffix"), None).foreach { maybeSubDomain => - val appName = EngineName(CONNECTION, user, id, maybeSubDomain) - assert(appName.getEngineSpace("kyuubi") === - ZKPaths.makePath(s"kyuubi_$CONNECTION", user, id.identifier.toString)) - assert(appName.defaultEngineName === s"kyuubi_${CONNECTION}_${user}_${id.identifier}") - intercept[AssertionError](appName.getZkLockPath("kyuubi")) - } - } - - test(s"${USER} shared level engine name") { - val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) - val user = Utils.currentUser - val appName = EngineName(USER, user, id, None) - assert(appName.getEngineSpace("kyuubi") === - ZKPaths.makePath(s"kyuubi_$USER", user)) - assert(appName.defaultEngineName === s"kyuubi_${USER}_${user}_${id.identifier}") - assert(appName.getZkLockPath("kyuubi") === s"/kyuubi_${USER}/lock/$user") - - val appName2 = EngineName(USER, user, id, Some("abc")) - assert(appName2.getEngineSpace("kyuubi") === - ZKPaths.makePath(s"kyuubi_$USER", user, "abc")) - assert(appName2.defaultEngineName === s"kyuubi_${USER}_${user}_abc_${id.identifier}") - assert(appName2.getZkLockPath("kyuubi") === s"/kyuubi_${USER}/lock/$user/abc") - } - - test(s"${SERVER} shared level engine name") { - val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) - val user = Utils.currentUser - val appName = EngineName(SERVER, user, id, None) - assert(appName.getEngineSpace("kyuubi") === - ZKPaths.makePath(s"kyuubi_$SERVER", user)) - assert(appName.defaultEngineName === s"kyuubi_${SERVER}_${user}_${id.identifier}") - assert(appName.getZkLockPath("kyuubi") === s"/kyuubi_${SERVER}/lock/$user") - - - val appName2 = EngineName(SERVER, user, id, Some("abc")) - assert(appName2.getEngineSpace("kyuubi") === - ZKPaths.makePath(s"kyuubi_$SERVER", user, "abc")) - assert(appName2.defaultEngineName === s"kyuubi_${SERVER}_${user}_abc_${id.identifier}") - assert(appName2.getZkLockPath("kyuubi") === s"/kyuubi_${SERVER}/lock/$user/abc") - } -} diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala new file mode 100644 index 000000000..ac955b27e --- /dev/null +++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine + +import org.apache.curator.utils.ZKPaths +import org.apache.hive.service.rpc.thrift.TProtocolVersion +import org.scalatest.time.SpanSugar.convertIntToGrainOfTime + +import org.apache.kyuubi.{KyuubiFunSuite, Utils} +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.ha.HighAvailabilityConf +import org.apache.kyuubi.ha.client.ServiceDiscovery +import org.apache.kyuubi.session.SessionHandle +import org.apache.kyuubi.util.NamedThreadFactory +import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf} + +class EngineRefSuite extends KyuubiFunSuite { + import ShareLevel._ + private val zkServer = new EmbeddedZookeeper + private val conf = KyuubiConf() + val user = Utils.currentUser + + override def beforeAll(): Unit = { + val zkData = Utils.createTempDir() + conf.set(ZookeeperConf.ZK_DATA_DIR, zkData.toString) + .set(ZookeeperConf.ZK_CLIENT_PORT, 0) + zkServer.initialize(conf) + zkServer.start() + super.beforeAll() + } + + override def afterAll(): Unit = { + zkServer.stop() + super.afterAll() + } + + test(s"${CONNECTION} shared level engine name") { + val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) + Seq(None, Some("suffix")).foreach { domain => + conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, CONNECTION.toString) + domain.foreach(conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN.key, _)) + val engine = EngineRef(conf, user, id) + assert(engine.engineSpace === + ZKPaths.makePath(s"kyuubi_$CONNECTION", user, id.identifier.toString)) + assert(engine.defaultEngineName === s"kyuubi_${CONNECTION}_${user}_${id.identifier}") + } + } + + test(s"${USER} shared level engine name") { + val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) + conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString) + conf.unset(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN) + val appName = EngineRef(conf, user, id) + assert(appName.engineSpace === ZKPaths.makePath(s"kyuubi_$USER", user)) + assert(appName.defaultEngineName === s"kyuubi_${USER}_${user}_${id.identifier}") + + conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN.key, "abc") + val appName2 = EngineRef(conf, user, id) + assert(appName2.engineSpace === + ZKPaths.makePath(s"kyuubi_$USER", user, "abc")) + assert(appName2.defaultEngineName === s"kyuubi_${USER}_${user}_abc_${id.identifier}") + } + + test(s"${SERVER} shared level engine name") { + val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) + conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, SERVER.toString) + conf.unset(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN) + val appName = EngineRef(conf, user, id) + assert(appName.engineSpace === + ZKPaths.makePath(s"kyuubi_$SERVER", user)) + assert(appName.defaultEngineName === s"kyuubi_${SERVER}_${user}_${id.identifier}") + + conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN.key, "abc") + val appName2 = EngineRef(conf, user, id) + assert(appName2.engineSpace === + ZKPaths.makePath(s"kyuubi_$SERVER", user, "abc")) + assert(appName2.defaultEngineName === s"kyuubi_${SERVER}_${user}_abc_${id.identifier}") + } + + test("start and get engine address with lock") { + val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) + conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString) + conf.set(KyuubiConf.FRONTEND_BIND_PORT, 0) + conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test") + conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString) + val engine = EngineRef(conf, user, id) + + var port1 = 0 + var port2 = 0 + + val r1 = new Runnable { + override def run(): Unit = { + ServiceDiscovery.withZkClient(conf) { client => + val hp = engine.getOrCreate(client) + port1 = hp._2 + } + } + } + + val r2 = new Runnable { + override def run(): Unit = { + ServiceDiscovery.withZkClient(conf) { client => + val hp = engine.getOrCreate(client) + port2 = hp._2 + } + } + } + val factory = new NamedThreadFactory("engine-test", false) + val thread1 = factory.newThread(r1) + val thread2 = factory.newThread(r2) + thread1.start() + thread2.start() + + eventually(timeout(90.seconds), interval(1.second)) { + assert(port1 != 0, "engine started") + assert(port2 == port1, "engine shared") + } + } +}