From 32b18ba19c969206d3ce03637560ff84f88009d2 Mon Sep 17 00:00:00 2001 From: timothy65535 Date: Thu, 19 Aug 2021 11:10:21 +0800 Subject: [PATCH] [KYUUBI #933] Enhance the detection mechanism for engine startup ### 1. Description `EngineRef` uses the existence of a znode (the last one) to determine whether the engine started successfully, which can give inconsistent results. So, the idea is to improve the detection mechanism by adding the id of the session that started the engine to the znode. `EngineRef` will check for this sessionid in a loop after an engine is launched. ### 2. Here are two options **Option I, add sessionid to the data of znode** ![image](https://user-images.githubusercontent.com/86483005/128994276-5c9c7839-0a21-4253-8989-110b88ac0e74.png) **Option II, add sessionid to the path of znode** ![image](https://user-images.githubusercontent.com/86483005/129527282-46f7f726-e110-4d87-9272-dda9b9f325bf.png) ### 3. Solution In order to be consistent with the design of the server znode, _Option II_ is preferred. **Demo** ``` /kyuubi - serviceUri=bigdata:10009;version=1.3.0-SNAPSHOT;sequence=0000000000 /kyuubi_USER/test - serviceUri=bigdata:39869;version=1.3.0-SNAPSHOT;session=8178069f-0b22-4d06-b1c0-908094769397;sequence=0000000000 ``` Closes #935 from timothy65535/ky-933. 
Closes #933 ab5d6d80 [timothy65535] add issue id d21cb990 [timothy65535] compatible with back 568acf21 [timothy65535] fix error 3db8ba7f [timothy65535] move EngineRef get to Discovery 827f3780 [timothy65535] rename get to getEngineBySessionId 88229314 [timothy65535] update session id using internal ab06adf8 [timothy65535] improve EngineRef get logic 6a1cc39a [timothy65535] update session version c94b6e55 [timothy65535] rename sessionId to createSessionId 0cc6aae2 [timothy65535] update ha conf d24a152f [timothy65535] update ha conf 1a67c864 [timothy65535] fix initialize sql suite b3b502de [timothy65535] [KYUUBI #933] Enhance the detection mechanism for engine startup Authored-by: timothy65535 Signed-off-by: Kent Yao --- .../apache/kyuubi/ctl/ServiceControlCli.scala | 2 +- .../kyuubi/ctl/ServiceControlCliSuite.scala | 39 +++++++-------- .../kyuubi/ha/HighAvailabilityConf.scala | 11 ++++- .../kyuubi/ha/client/ServiceDiscovery.scala | 47 +++++++++++-------- .../org/apache/kyuubi/engine/EngineRef.scala | 19 ++++---- .../engine/spark/InitializeSQLSuite.scala | 8 ++-- 6 files changed, 72 insertions(+), 54 deletions(-) diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala index dd27f099f..cd8af4e0e 100644 --- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala +++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala @@ -94,7 +94,7 @@ private[kyuubi] class ServiceControlCli extends Logging { currentServerNodes.foreach { sn => info(s"Exposing server instance:${sn.instance} with version:${sn.version}" + s" from $fromNamespace to $toNamespace") - val newNode = createZkServiceNode( + val newNode = createServiceNode( kyuubiConf, zc, args.cliArgs.namespace, sn.instance, sn.version, true) exposedServiceNodes += sn.copy( namespace = toNamespace, diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala 
b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala index 2e4655ffb..4ecdf4a41 100644 --- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala +++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala @@ -159,7 +159,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { test("test render zookeeper service node info") { val title = "test render" - val nodes = Seq(ServiceNodeInfo("/kyuubi", "serviceNode", "localhost", 10000, Some("version"))) + val nodes = Seq( + ServiceNodeInfo("/kyuubi", "serviceNode", "localhost", 10000, Some("version"), None)) val renderedInfo = renderServiceNodesInfo(title, nodes, true) val expected = { s"\n $title " + @@ -187,8 +188,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { System.setProperty(HA_ZK_NAMESPACE.key, uniqueNamespace) withZkClient(conf) { framework => - createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000") - createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001") + createServiceNode(conf, framework, uniqueNamespace, "localhost:10000") + createServiceNode(conf, framework, uniqueNamespace, "localhost:10001") val newNamespace = getUniqueNamespace() val args = Array( @@ -198,8 +199,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { ) val expectedCreatedNodes = Seq( - ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION)), - ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION)) + ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None), + ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None) ) testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedCreatedNodes, false)) @@ -244,8 +245,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { .set(KyuubiConf.FRONTEND_BIND_PORT, 0) 
withZkClient(conf) { framework => - createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000") - createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001") + createServiceNode(conf, framework, uniqueNamespace, "localhost:10000") + createServiceNode(conf, framework, uniqueNamespace, "localhost:10001") val args = Array( "list", "server", @@ -254,8 +255,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { ) val expectedNodes = Seq( - ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION)), - ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION)) + ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None), + ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None) ) testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, false)) @@ -272,8 +273,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { .set(KyuubiConf.FRONTEND_BIND_PORT, 0) withZkClient(conf) { framework => - createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000") - createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001") + createServiceNode(conf, framework, uniqueNamespace, "localhost:10000") + createServiceNode(conf, framework, uniqueNamespace, "localhost:10001") val args = Array( "get", "server", @@ -284,7 +285,7 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { ) val expectedNodes = Seq( - ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION)) + ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None) ) testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, false)) @@ -302,8 +303,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { withZkClient(conf) { framework => withZkClient(conf) { zc => - 
createZkServiceNode(conf, zc, uniqueNamespace, "localhost:10000", external = true) - createZkServiceNode(conf, zc, uniqueNamespace, "localhost:10001", external = true) + createServiceNode(conf, zc, uniqueNamespace, "localhost:10000", external = true) + createServiceNode(conf, zc, uniqueNamespace, "localhost:10001", external = true) } val args = Array( @@ -315,7 +316,7 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { ) val expectedDeletedNodes = Seq( - ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION)) + ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None) ) testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedDeletedNodes, false)) @@ -332,8 +333,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { .set(KyuubiConf.FRONTEND_BIND_PORT, 0) withZkClient(conf) { framework => - createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000") - createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001") + createServiceNode(conf, framework, uniqueNamespace, "localhost:10000") + createServiceNode(conf, framework, uniqueNamespace, "localhost:10001") val args = Array( "list", "server", @@ -343,8 +344,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { ) val expectedNodes = Seq( - ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION)), - ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION)) + ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None), + ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None) ) testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, true)) diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala index 
94fd91678..fdee134d6 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala @@ -21,7 +21,7 @@ import java.time.Duration import org.apache.hadoop.security.UserGroupInformation -import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf} +import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf, OptionalConfigEntry} import org.apache.kyuubi.ha.client.RetryPolicies object HighAvailabilityConf { @@ -99,4 +99,13 @@ object HighAvailabilityConf { .timeConf .checkValue(_ > 0, "Must be positive") .createWithDefault(Duration.ofSeconds(120).toMillis) + + val HA_ZK_ENGINE_SESSION_ID: OptionalConfigEntry[String] = + buildConf("ha.engine.session.id") + .doc("The sessionId will be attached to zookeeper node when engine started, " + + "and the kyuubi server will check it cyclically.") + .internal + .version("1.4.0") + .stringConf + .createOptional } diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala index f67151a5f..df106bc41 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala @@ -98,7 +98,7 @@ abstract class ServiceDiscovery ( override def start(): Unit = { val instance = server.connectionUrl - _serviceNode = createZkServiceNode(conf, zkClient, namespace, instance) + _serviceNode = createServiceNode(conf, zkClient, namespace, instance) // Set a watch on the serviceNode val watcher = new DeRegisterWatcher if (zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath) == null) { @@ -170,6 +170,15 @@ object ServiceDiscovery extends Logging { } } + def getEngineBySessionId( + zkClient: CuratorFramework, + namespace: String, + sessionId: String): Option[(String, Int)] = { + 
getServiceNodesInfo(zkClient, namespace, silent = true) + .find(_.createSessionId.exists(_.equals(sessionId))) + .map(data => (data.host, data.port)) + } + def getServiceNodesInfo( zkClient: CuratorFramework, namespace: String, @@ -185,8 +194,9 @@ object ServiceDiscovery extends Logging { val host = strings.head val port = strings(1).toInt val version = p.split(";").find(_.startsWith("version=")).map(_.stripPrefix("version=")) + val sessionId = p.split(";").find(_.startsWith("session=")).map(_.stripPrefix("session=")) info(s"Get service instance:$instance and version:$version under $namespace") - ServiceNodeInfo(namespace, p, host, port, version) + ServiceNodeInfo(namespace, p, host, port, version, sessionId) } } catch { case _: Exception if silent => Nil @@ -196,7 +206,7 @@ object ServiceDiscovery extends Logging { } } - def createZkServiceNode( + def createServiceNode( conf: KyuubiConf, zkClient: CuratorFramework, namespace: String, @@ -215,26 +225,22 @@ object ServiceDiscovery extends Logging { case e: KeeperException => throw new KyuubiException(s"Failed to create namespace '$ns'", e) } + + val session = conf.get(HA_ZK_ENGINE_SESSION_ID) + .map(sid => s"session=$sid;").getOrElse("") val pathPrefix = ZKPaths.makePath( namespace, - s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};sequence=") + s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};${session}sequence=") var serviceNode: PersistentNode = null + val createMode = if (external) CreateMode.PERSISTENT_SEQUENTIAL + else CreateMode.EPHEMERAL_SEQUENTIAL try { - if (external) { - serviceNode = new PersistentNode( - zkClient, - CreateMode.PERSISTENT_SEQUENTIAL, - false, - pathPrefix, - instance.getBytes(StandardCharsets.UTF_8)) - } else { - serviceNode = new PersistentNode( - zkClient, - CreateMode.EPHEMERAL_SEQUENTIAL, - false, - pathPrefix, - instance.getBytes(StandardCharsets.UTF_8)) - } + serviceNode = new PersistentNode( + zkClient, + createMode, + false, + pathPrefix, + 
instance.getBytes(StandardCharsets.UTF_8)) serviceNode.start() val znodeTimeout = conf.get(HA_ZK_NODE_TIMEOUT) if (!serviceNode.waitForInitialCreate(znodeTimeout, TimeUnit.MILLISECONDS)) { @@ -258,6 +264,7 @@ case class ServiceNodeInfo( nodeName: String, host: String, port: Int, - version: Option[String]) { + version: Option[String], + createSessionId: Option[String]) { def instance: String = s"$host:$port" } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index 4ec387b97..ee34b2611 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -30,7 +30,9 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INIT_TIMEOUT, ENGINE_SHARE_LEVEL, ENGINE_SHARE_LEVEL_SUB_DOMAIN} import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, SERVER, ShareLevel} import org.apache.kyuubi.engine.spark.SparkProcessBuilder +import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_SESSION_ID import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE +import org.apache.kyuubi.ha.client.ServiceDiscovery.getEngineBySessionId import org.apache.kyuubi.ha.client.ServiceDiscovery.getServerHost import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, ENGINE_TOTAL} import org.apache.kyuubi.metrics.MetricsSystem @@ -123,12 +125,9 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI } } - private def get(zkClient: CuratorFramework): Option[(String, Int)] = { - getServerHost(zkClient, engineSpace) - } - private def create(zkClient: CuratorFramework): (String, Int) = tryWithLock(zkClient) { - var engineRef = get(zkClient) + // TODO: improve this after support engine pool. 
(KYUUBI #918) + var engineRef = getServerHost(zkClient, engineSpace) // Get the engine address ahead if another process has succeeded if (engineRef.nonEmpty) return engineRef.get @@ -137,6 +136,7 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI conf.set(SparkProcessBuilder.TAG_KEY, conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI") conf.set(HA_ZK_NAMESPACE, engineSpace) + conf.set(HA_ZK_ENGINE_SESSION_ID, sessionId) val builder = new SparkProcessBuilder(appUser, conf) MetricsSystem.tracing(_.incCount(ENGINE_TOTAL)) try { @@ -163,7 +163,7 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI s"Timeout($timeout ms) to launched Spark with $builder", builder.getError) } - engineRef = get(zkClient) + engineRef = getEngineBySessionId(zkClient, engineSpace, sessionId) } engineRef.get } finally { @@ -177,9 +177,10 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI * Get the engine ref from engine space first first or create a new one */ def getOrCreate(zkClient: CuratorFramework): (String, Int) = { - get(zkClient).getOrElse { - create(zkClient) - } + getServerHost(zkClient, engineSpace) + .getOrElse { + create(zkClient) + } } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala index e4e0eb472..c78927009 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala @@ -27,12 +27,12 @@ class InitializeSQLSuite extends WithKyuubiServer with JDBCTestUtils { KyuubiConf() .set(ENGINE_INITIALIZE_SQL.key, "CREATE DATABASE IF NOT EXISTS INIT_DB;" + - "CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" + - "INSERT OVERWRITE TABLE INIT_DB.test VALUES (1);") + "CREATE TABLE IF NOT EXISTS 
INIT_DB.test(a int) USING CSV;" + + "INSERT OVERWRITE TABLE INIT_DB.test VALUES (1);") .set(ENGINE_SESSION_INITIALIZE_SQL.key, "CREATE DATABASE IF NOT EXISTS INIT_DB;" + - "CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" + - "INSERT INTO INIT_DB.test VALUES (2);") + "CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" + + "INSERT INTO INIT_DB.test VALUES (2);") } override def afterAll(): Unit = {