diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index c3aa472fa..376aae4c2 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -191,6 +191,7 @@ kyuubi\.frontend
\.worker\.keepalive\.time|
<undefined>
|The sessionId will be attached to zookeeper node when engine started, and the kyuubi server will check it cyclically.
|string
|1.3.2
kyuubi\.ha\.zookeeper
\.acl\.enabled|false
|Set to true if the zookeeper ensemble is kerberized
|boolean
|1.0.0
kyuubi\.ha\.zookeeper
\.connection\.base\.retry
\.wait|1000
|Initial amount of time to wait between retries to the zookeeper ensemble
|int
|1.0.0
kyuubi\.ha\.zookeeper
\.connection\.max
\.retries|3
|Max retry times for connecting to the zookeeper ensemble
|int
|1.0.0
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala
index dd27f099f..cd8af4e0e 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala
@@ -94,7 +94,7 @@ private[kyuubi] class ServiceControlCli extends Logging {
currentServerNodes.foreach { sn =>
info(s"Exposing server instance:${sn.instance} with version:${sn.version}" +
s" from $fromNamespace to $toNamespace")
- val newNode = createZkServiceNode(
+ val newNode = createServiceNode(
kyuubiConf, zc, args.cliArgs.namespace, sn.instance, sn.version, true)
exposedServiceNodes += sn.copy(
namespace = toNamespace,
diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala
index 2e4655ffb..4ecdf4a41 100644
--- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala
+++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala
@@ -159,7 +159,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
test("test render zookeeper service node info") {
val title = "test render"
- val nodes = Seq(ServiceNodeInfo("/kyuubi", "serviceNode", "localhost", 10000, Some("version")))
+ val nodes = Seq(
+ ServiceNodeInfo("/kyuubi", "serviceNode", "localhost", 10000, Some("version"), None))
val renderedInfo = renderServiceNodesInfo(title, nodes, true)
val expected = {
s"\n $title " +
@@ -187,8 +188,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
System.setProperty(HA_ZK_NAMESPACE.key, uniqueNamespace)
withZkClient(conf) { framework =>
- createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
- createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
+ createServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
+ createServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
val newNamespace = getUniqueNamespace()
val args = Array(
@@ -198,8 +199,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
)
val expectedCreatedNodes = Seq(
- ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION)),
- ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION))
+ ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None),
+ ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None)
)
testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedCreatedNodes, false))
@@ -244,8 +245,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(KyuubiConf.FRONTEND_BIND_PORT, 0)
withZkClient(conf) { framework =>
- createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
- createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
+ createServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
+ createServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
val args = Array(
"list", "server",
@@ -254,8 +255,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
)
val expectedNodes = Seq(
- ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION)),
- ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION))
+ ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None),
+ ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None)
)
testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, false))
@@ -272,8 +273,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(KyuubiConf.FRONTEND_BIND_PORT, 0)
withZkClient(conf) { framework =>
- createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
- createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
+ createServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
+ createServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
val args = Array(
"get", "server",
@@ -284,7 +285,7 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
)
val expectedNodes = Seq(
- ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION))
+ ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None)
)
testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, false))
@@ -302,8 +303,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
withZkClient(conf) { framework =>
withZkClient(conf) { zc =>
- createZkServiceNode(conf, zc, uniqueNamespace, "localhost:10000", external = true)
- createZkServiceNode(conf, zc, uniqueNamespace, "localhost:10001", external = true)
+ createServiceNode(conf, zc, uniqueNamespace, "localhost:10000", external = true)
+ createServiceNode(conf, zc, uniqueNamespace, "localhost:10001", external = true)
}
val args = Array(
@@ -315,7 +316,7 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
)
val expectedDeletedNodes = Seq(
- ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION))
+ ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None)
)
testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedDeletedNodes, false))
@@ -332,8 +333,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(KyuubiConf.FRONTEND_BIND_PORT, 0)
withZkClient(conf) { framework =>
- createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
- createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
+ createServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
+ createServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
val args = Array(
"list", "server",
@@ -343,8 +344,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
)
val expectedNodes = Seq(
- ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION)),
- ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION))
+ ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None),
+ ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None)
)
testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, true))
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
index 94fd91678..9cc829a2b 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
@@ -21,7 +21,7 @@ import java.time.Duration
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf}
+import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf, OptionalConfigEntry}
import org.apache.kyuubi.ha.client.RetryPolicies
object HighAvailabilityConf {
@@ -99,4 +99,12 @@ object HighAvailabilityConf {
.timeConf
.checkValue(_ > 0, "Must be positive")
.createWithDefault(Duration.ofSeconds(120).toMillis)
+
+ val HA_ZK_ENGINE_REF_ID: OptionalConfigEntry[String] =
+ buildConf("ha.engine.session.id")
+ .doc("The sessionId will be attached to zookeeper node when engine started, " +
+ "and the kyuubi server will check it cyclically.")
+ .version("1.3.2")
+ .stringConf
+ .createOptional
}
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
index f67151a5f..8df1878c5 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
@@ -98,7 +98,7 @@ abstract class ServiceDiscovery (
override def start(): Unit = {
val instance = server.connectionUrl
- _serviceNode = createZkServiceNode(conf, zkClient, namespace, instance)
+ _serviceNode = createServiceNode(conf, zkClient, namespace, instance)
// Set a watch on the serviceNode
val watcher = new DeRegisterWatcher
if (zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath) == null) {
@@ -170,6 +170,15 @@ object ServiceDiscovery extends Logging {
}
}
+ def getEngineBySessionId(
+ zkClient: CuratorFramework,
+ namespace: String,
+ sessionId: String): Option[(String, Int)] = {
+ getServiceNodesInfo(zkClient, namespace, silent = true)
+ .find(_.createSessionId.exists(_.equals(sessionId)))
+ .map(data => (data.host, data.port))
+ }
+
def getServiceNodesInfo(
zkClient: CuratorFramework,
namespace: String,
@@ -185,8 +194,9 @@ object ServiceDiscovery extends Logging {
val host = strings.head
val port = strings(1).toInt
val version = p.split(";").find(_.startsWith("version=")).map(_.stripPrefix("version="))
+ val sessionId = p.split(";").find(_.startsWith("session=")).map(_.stripPrefix("session="))
info(s"Get service instance:$instance and version:$version under $namespace")
- ServiceNodeInfo(namespace, p, host, port, version)
+ ServiceNodeInfo(namespace, p, host, port, version, sessionId)
}
} catch {
case _: Exception if silent => Nil
@@ -196,7 +206,7 @@ object ServiceDiscovery extends Logging {
}
}
- def createZkServiceNode(
+ def createServiceNode(
conf: KyuubiConf,
zkClient: CuratorFramework,
namespace: String,
@@ -215,26 +225,22 @@ object ServiceDiscovery extends Logging {
case e: KeeperException =>
throw new KyuubiException(s"Failed to create namespace '$ns'", e)
}
+
+ val session = conf.get(HA_ZK_ENGINE_REF_ID)
+ .map(sid => s"session=$sid;").getOrElse("")
val pathPrefix = ZKPaths.makePath(
namespace,
- s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};sequence=")
+ s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};${session}sequence=")
var serviceNode: PersistentNode = null
+ val createMode = if (external) CreateMode.PERSISTENT_SEQUENTIAL
+ else CreateMode.EPHEMERAL_SEQUENTIAL
try {
- if (external) {
- serviceNode = new PersistentNode(
- zkClient,
- CreateMode.PERSISTENT_SEQUENTIAL,
- false,
- pathPrefix,
- instance.getBytes(StandardCharsets.UTF_8))
- } else {
- serviceNode = new PersistentNode(
- zkClient,
- CreateMode.EPHEMERAL_SEQUENTIAL,
- false,
- pathPrefix,
- instance.getBytes(StandardCharsets.UTF_8))
- }
+ serviceNode = new PersistentNode(
+ zkClient,
+ createMode,
+ false,
+ pathPrefix,
+ instance.getBytes(StandardCharsets.UTF_8))
serviceNode.start()
val znodeTimeout = conf.get(HA_ZK_NODE_TIMEOUT)
if (!serviceNode.waitForInitialCreate(znodeTimeout, TimeUnit.MILLISECONDS)) {
@@ -258,6 +264,7 @@ case class ServiceNodeInfo(
nodeName: String,
host: String,
port: Int,
- version: Option[String]) {
+ version: Option[String],
+ createSessionId: Option[String]) {
def instance: String = s"$host:$port"
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 4ec387b97..ccac30930 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -30,7 +30,9 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INIT_TIMEOUT, ENGINE_SHARE_LEVEL, ENGINE_SHARE_LEVEL_SUB_DOMAIN}
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, SERVER, ShareLevel}
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
+import org.apache.kyuubi.ha.client.ServiceDiscovery.getEngineBySessionId
import org.apache.kyuubi.ha.client.ServiceDiscovery.getServerHost
import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, ENGINE_TOTAL}
import org.apache.kyuubi.metrics.MetricsSystem
@@ -123,12 +125,9 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI
}
}
- private def get(zkClient: CuratorFramework): Option[(String, Int)] = {
- getServerHost(zkClient, engineSpace)
- }
-
private def create(zkClient: CuratorFramework): (String, Int) = tryWithLock(zkClient) {
- var engineRef = get(zkClient)
+ // TODO: improve this after support engine pool. (KYUUBI #918)
+ var engineRef = getServerHost(zkClient, engineSpace)
// Get the engine address ahead if another process has succeeded
if (engineRef.nonEmpty) return engineRef.get
@@ -137,6 +136,7 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI
conf.set(SparkProcessBuilder.TAG_KEY,
conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
conf.set(HA_ZK_NAMESPACE, engineSpace)
+ conf.set(HA_ZK_ENGINE_REF_ID, sessionId)
val builder = new SparkProcessBuilder(appUser, conf)
MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
try {
@@ -163,7 +163,7 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI
s"Timeout($timeout ms) to launched Spark with $builder",
builder.getError)
}
- engineRef = get(zkClient)
+ engineRef = getEngineBySessionId(zkClient, engineSpace, sessionId)
}
engineRef.get
} finally {
@@ -177,9 +177,10 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI
* Get the engine ref from engine space first first or create a new one
*/
def getOrCreate(zkClient: CuratorFramework): (String, Int) = {
- get(zkClient).getOrElse {
- create(zkClient)
- }
+ getServerHost(zkClient, engineSpace)
+ .getOrElse {
+ create(zkClient)
+ }
}
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala
index e4e0eb472..c78927009 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala
@@ -27,12 +27,12 @@ class InitializeSQLSuite extends WithKyuubiServer with JDBCTestUtils {
KyuubiConf()
.set(ENGINE_INITIALIZE_SQL.key,
"CREATE DATABASE IF NOT EXISTS INIT_DB;" +
- "CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" +
- "INSERT OVERWRITE TABLE INIT_DB.test VALUES (1);")
+ "CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" +
+ "INSERT OVERWRITE TABLE INIT_DB.test VALUES (1);")
.set(ENGINE_SESSION_INITIALIZE_SQL.key,
"CREATE DATABASE IF NOT EXISTS INIT_DB;" +
- "CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" +
- "INSERT INTO INIT_DB.test VALUES (2);")
+ "CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" +
+ "INSERT INTO INIT_DB.test VALUES (2);")
}
override def afterAll(): Unit = {