From 7f45a0bc4b6e7ce74c75b22da6035791a77e70aa Mon Sep 17 00:00:00 2001 From: timothy65535 Date: Fri, 12 Nov 2021 11:47:15 +0800 Subject: [PATCH] [KYUUBI #1364] [BACKPORT] [KYUUBI #933] Enhance the detection mechanism for engine startup ### _Why are the changes needed?_ Sub task of #1361. #1176 is based on #933 . In order to backport #1176, #933 has to be backported first. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1364 from zhouyifan279/1361. Closes #1364 Closes #933 8cd735cc [ulysses-you] [BACKPORT] [KYUUBI #1176] InvalidACL appears in the engine when zookeeper acl is turned on 42d0adcc [timothy65535] [KYUUBI #933] Enhance the detection mechanism for engine startup Lead-authored-by: timothy65535 Co-authored-by: ulysses-you Signed-off-by: ulysses-you --- docs/deployment/settings.md | 1 + .../apache/kyuubi/ctl/ServiceControlCli.scala | 2 +- .../kyuubi/ctl/ServiceControlCliSuite.scala | 39 +++++++-------- .../kyuubi/ha/HighAvailabilityConf.scala | 10 +++- .../kyuubi/ha/client/ServiceDiscovery.scala | 47 +++++++++++-------- .../org/apache/kyuubi/engine/EngineRef.scala | 19 ++++---- .../engine/spark/InitializeSQLSuite.scala | 8 ++-- 7 files changed, 72 insertions(+), 54 deletions(-) diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index c3aa472fa..376aae4c2 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -191,6 +191,7 @@ kyuubi\.frontend
\.worker\.keepalive\.time|
<undefined>
|
The sessionId will be attached to zookeeper node when engine started, and the kyuubi server will check it cyclically.
|
string
|
1.3.2
kyuubi\.ha\.zookeeper
\.acl\.enabled|
false
|
Set to true if the zookeeper ensemble is kerberized
|
boolean
|
1.0.0
kyuubi\.ha\.zookeeper
\.connection\.base\.retry
\.wait|
1000
|
Initial amount of time to wait between retries to the zookeeper ensemble
|
int
|
1.0.0
kyuubi\.ha\.zookeeper
\.connection\.max
\.retries|
3
|
Max retry times for connecting to the zookeeper ensemble
|
int
|
1.0.0
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala index dd27f099f..cd8af4e0e 100644 --- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala +++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala @@ -94,7 +94,7 @@ private[kyuubi] class ServiceControlCli extends Logging { currentServerNodes.foreach { sn => info(s"Exposing server instance:${sn.instance} with version:${sn.version}" + s" from $fromNamespace to $toNamespace") - val newNode = createZkServiceNode( + val newNode = createServiceNode( kyuubiConf, zc, args.cliArgs.namespace, sn.instance, sn.version, true) exposedServiceNodes += sn.copy( namespace = toNamespace, diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala index 2e4655ffb..4ecdf4a41 100644 --- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala +++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala @@ -159,7 +159,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { test("test render zookeeper service node info") { val title = "test render" - val nodes = Seq(ServiceNodeInfo("/kyuubi", "serviceNode", "localhost", 10000, Some("version"))) + val nodes = Seq( + ServiceNodeInfo("/kyuubi", "serviceNode", "localhost", 10000, Some("version"), None)) val renderedInfo = renderServiceNodesInfo(title, nodes, true) val expected = { s"\n $title " + @@ -187,8 +188,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { System.setProperty(HA_ZK_NAMESPACE.key, uniqueNamespace) withZkClient(conf) { framework => - createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000") - createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001") + createServiceNode(conf, framework, uniqueNamespace, "localhost:10000") + createServiceNode(conf, framework, uniqueNamespace, "localhost:10001") val newNamespace = getUniqueNamespace() val args = Array( @@ -198,8 +199,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { ) val expectedCreatedNodes = Seq( - ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION)), - ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION)) + ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None), + ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None) ) testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedCreatedNodes, false)) @@ -244,8 +245,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { .set(KyuubiConf.FRONTEND_BIND_PORT, 0) withZkClient(conf) { framework => - createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000") - createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001") + createServiceNode(conf, framework, uniqueNamespace, "localhost:10000") + createServiceNode(conf, framework, uniqueNamespace, "localhost:10001") val args = Array( "list", "server", @@ -254,8 +255,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { ) val expectedNodes = Seq( - ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION)), - ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION)) + ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None), + ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None) ) testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, false)) @@ -272,8 +273,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { .set(KyuubiConf.FRONTEND_BIND_PORT, 0) withZkClient(conf) { framework => - createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000") - createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001") + createServiceNode(conf, framework, uniqueNamespace, "localhost:10000") + createServiceNode(conf, framework, uniqueNamespace, "localhost:10001") val args = Array( "get", "server", @@ -284,7 +285,7 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { ) val expectedNodes = Seq( - ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION)) + ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None) ) testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, false)) @@ -302,8 +303,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { withZkClient(conf) { framework => withZkClient(conf) { zc => - createZkServiceNode(conf, zc, uniqueNamespace, "localhost:10000", external = true) - createZkServiceNode(conf, zc, uniqueNamespace, "localhost:10001", external = true) + createServiceNode(conf, zc, uniqueNamespace, "localhost:10000", external = true) + createServiceNode(conf, zc, uniqueNamespace, "localhost:10001", external = true) } val args = Array( @@ -315,7 +316,7 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { ) val expectedDeletedNodes = Seq( - ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION)) + ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None) ) testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedDeletedNodes, false)) @@ -332,8 +333,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { .set(KyuubiConf.FRONTEND_BIND_PORT, 0) withZkClient(conf) { framework => - createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000") - createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001") + createServiceNode(conf, framework, uniqueNamespace, "localhost:10000") + createServiceNode(conf, framework, uniqueNamespace, "localhost:10001") val args = Array( "list", "server", @@ -343,8 +344,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { ) val expectedNodes = Seq( - ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION)), - ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION)) + ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None), + ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None) ) testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, true)) diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala index 94fd91678..9cc829a2b 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala @@ -21,7 +21,7 @@ import java.time.Duration import org.apache.hadoop.security.UserGroupInformation -import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf} +import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf, OptionalConfigEntry} import org.apache.kyuubi.ha.client.RetryPolicies object HighAvailabilityConf { @@ -99,4 +99,12 @@ object HighAvailabilityConf { .timeConf .checkValue(_ > 0, "Must be positive") .createWithDefault(Duration.ofSeconds(120).toMillis) + + val HA_ZK_ENGINE_REF_ID: OptionalConfigEntry[String] = + buildConf("ha.engine.session.id") + .doc("The sessionId will be attached to zookeeper node when engine started, " + + "and the kyuubi server will check it cyclically.") + .version("1.3.2") + .stringConf + .createOptional } diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala index f67151a5f..8df1878c5 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala @@ -98,7 +98,7 @@ abstract class ServiceDiscovery ( override def start(): Unit = { val instance = server.connectionUrl - _serviceNode = createZkServiceNode(conf, zkClient, namespace, instance) + _serviceNode = createServiceNode(conf, zkClient, namespace, instance) // Set a watch on the serviceNode val watcher = new DeRegisterWatcher if (zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath) == null) { @@ -170,6 +170,15 @@ object ServiceDiscovery extends Logging { } } + def getEngineBySessionId( + zkClient: CuratorFramework, + namespace: String, + sessionId: String): Option[(String, Int)] = { + getServiceNodesInfo(zkClient, namespace, silent = true) + .find(_.createSessionId.exists(_.equals(sessionId))) + .map(data => (data.host, data.port)) + } + def getServiceNodesInfo( zkClient: CuratorFramework, namespace: String, @@ -185,8 +194,9 @@ object ServiceDiscovery extends Logging { val host = strings.head val port = strings(1).toInt val version = p.split(";").find(_.startsWith("version=")).map(_.stripPrefix("version=")) + val sessionId = p.split(";").find(_.startsWith("session=")).map(_.stripPrefix("session=")) info(s"Get service instance:$instance and version:$version under $namespace") - ServiceNodeInfo(namespace, p, host, port, version) + ServiceNodeInfo(namespace, p, host, port, version, sessionId) } } catch { case _: Exception if silent => Nil @@ -196,7 +206,7 @@ object ServiceDiscovery extends Logging { } } - def createZkServiceNode( + def createServiceNode( conf: KyuubiConf, zkClient: CuratorFramework, namespace: String, @@ -215,26 +225,22 @@ object ServiceDiscovery extends Logging { case e: KeeperException => throw new KyuubiException(s"Failed to create namespace '$ns'", e) } + + val session = conf.get(HA_ZK_ENGINE_REF_ID) + .map(sid => s"session=$sid;").getOrElse("") val pathPrefix = ZKPaths.makePath( namespace, - s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};sequence=") + s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};${session}sequence=") var serviceNode: PersistentNode = null + val createMode = if (external) CreateMode.PERSISTENT_SEQUENTIAL + else CreateMode.EPHEMERAL_SEQUENTIAL try { - if (external) { - serviceNode = new PersistentNode( - zkClient, - CreateMode.PERSISTENT_SEQUENTIAL, - false, - pathPrefix, - instance.getBytes(StandardCharsets.UTF_8)) - } else { - serviceNode = new PersistentNode( - zkClient, - CreateMode.EPHEMERAL_SEQUENTIAL, - false, - pathPrefix, - instance.getBytes(StandardCharsets.UTF_8)) - } + serviceNode = new PersistentNode( + zkClient, + createMode, + false, + pathPrefix, + instance.getBytes(StandardCharsets.UTF_8)) serviceNode.start() val znodeTimeout = conf.get(HA_ZK_NODE_TIMEOUT) if (!serviceNode.waitForInitialCreate(znodeTimeout, TimeUnit.MILLISECONDS)) { @@ -258,6 +264,7 @@ case class ServiceNodeInfo( nodeName: String, host: String, port: Int, - version: Option[String]) { + version: Option[String], + createSessionId: Option[String]) { def instance: String = s"$host:$port" } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index 4ec387b97..ccac30930 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -30,7 +30,9 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INIT_TIMEOUT, ENGINE_SHARE_LEVEL, ENGINE_SHARE_LEVEL_SUB_DOMAIN} import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, SERVER, ShareLevel} import org.apache.kyuubi.engine.spark.SparkProcessBuilder +import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE +import org.apache.kyuubi.ha.client.ServiceDiscovery.getEngineBySessionId import org.apache.kyuubi.ha.client.ServiceDiscovery.getServerHost import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, ENGINE_TOTAL} import org.apache.kyuubi.metrics.MetricsSystem @@ -123,12 +125,9 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI } } - private def get(zkClient: CuratorFramework): Option[(String, Int)] = { - getServerHost(zkClient, engineSpace) - } - private def create(zkClient: CuratorFramework): (String, Int) = tryWithLock(zkClient) { - var engineRef = get(zkClient) + // TODO: improve this after support engine pool. (KYUUBI #918) + var engineRef = getServerHost(zkClient, engineSpace) // Get the engine address ahead if another process has succeeded if (engineRef.nonEmpty) return engineRef.get @@ -137,6 +136,7 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI conf.set(SparkProcessBuilder.TAG_KEY, conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI") conf.set(HA_ZK_NAMESPACE, engineSpace) + conf.set(HA_ZK_ENGINE_REF_ID, sessionId) val builder = new SparkProcessBuilder(appUser, conf) MetricsSystem.tracing(_.incCount(ENGINE_TOTAL)) try { @@ -163,7 +163,7 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI s"Timeout($timeout ms) to launched Spark with $builder", builder.getError) } - engineRef = get(zkClient) + engineRef = getEngineBySessionId(zkClient, engineSpace, sessionId) } engineRef.get } finally { @@ -177,9 +177,10 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI * Get the engine ref from engine space first first or create a new one */ def getOrCreate(zkClient: CuratorFramework): (String, Int) = { - get(zkClient).getOrElse { - create(zkClient) - } + getServerHost(zkClient, engineSpace) + .getOrElse { + create(zkClient) + } } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala index e4e0eb472..c78927009 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala @@ -27,12 +27,12 @@ class InitializeSQLSuite extends WithKyuubiServer with JDBCTestUtils { KyuubiConf() .set(ENGINE_INITIALIZE_SQL.key, "CREATE DATABASE IF NOT EXISTS INIT_DB;" + - "CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" + - "INSERT OVERWRITE TABLE INIT_DB.test VALUES (1);") + "CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" + + "INSERT OVERWRITE TABLE INIT_DB.test VALUES (1);") .set(ENGINE_SESSION_INITIALIZE_SQL.key, "CREATE DATABASE IF NOT EXISTS INIT_DB;" + - "CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" + - "INSERT INTO INIT_DB.test VALUES (2);") + "CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" + + "INSERT INTO INIT_DB.test VALUES (2);") } override def afterAll(): Unit = {