[KYUUBI #933] Enhance the detection mechanism for engine startup

### 1. Description

`EngineRef` uses the existence of a znode to determine whether an engine started successfully (it simply takes the last znode), which can lead to inconsistencies. To improve the detection mechanism, the id of the session that started the engine is added to the znode. `EngineRef` will then check for that session id in a loop after an engine has been launched.

### 2. Here are two options
**Option I, add sessionid to the data of znode**
![image](https://user-images.githubusercontent.com/86483005/128994276-5c9c7839-0a21-4253-8989-110b88ac0e74.png)

**Option II, add sessionid to the path of znode**
![image](https://user-images.githubusercontent.com/86483005/129527282-46f7f726-e110-4d87-9272-dda9b9f325bf.png)

### 3. Solution

In order to be consistent with the design of server znode, prefer to choose _Option II_ .

**Demo**
```
/kyuubi
- serviceUri=bigdata:10009;version=1.3.0-SNAPSHOT;sequence=0000000000
/kyuubi_USER/test
- serviceUri=bigdata:39869;version=1.3.0-SNAPSHOT;session=8178069f-0b22-4d06-b1c0-908094769397;sequence=0000000000
```

Closes #935 from timothy65535/ky-933.

Closes #933

ab5d6d80 [timothy65535] add issue id
d21cb990 [timothy65535] compatible with back
568acf21 [timothy65535] fix error
3db8ba7f [timothy65535] move EngineRef get to Discovery
827f3780 [timothy65535] rename get to getEngineBySessionId
88229314 [timothy65535] update session id using internal
ab06adf8 [timothy65535] improve EngineRef get logic
6a1cc39a [timothy65535] update session version
c94b6e55 [timothy65535] rename sessionId to createSessionId
0cc6aae2 [timothy65535] update ha conf
d24a152f [timothy65535] update ha conf
1a67c864 [timothy65535] fix initialize sql suite
b3b502de [timothy65535] [KYUUBI #933] Enhance the detection mechanism for engine startup

Authored-by: timothy65535 <timothy65535@163.com>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
timothy65535 2021-08-19 11:10:21 +08:00 committed by Kent Yao
parent 636d60aa71
commit 32b18ba19c
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
6 changed files with 72 additions and 54 deletions

View File

@ -94,7 +94,7 @@ private[kyuubi] class ServiceControlCli extends Logging {
currentServerNodes.foreach { sn =>
info(s"Exposing server instance:${sn.instance} with version:${sn.version}" +
s" from $fromNamespace to $toNamespace")
val newNode = createZkServiceNode(
val newNode = createServiceNode(
kyuubiConf, zc, args.cliArgs.namespace, sn.instance, sn.version, true)
exposedServiceNodes += sn.copy(
namespace = toNamespace,

View File

@ -159,7 +159,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
test("test render zookeeper service node info") {
val title = "test render"
val nodes = Seq(ServiceNodeInfo("/kyuubi", "serviceNode", "localhost", 10000, Some("version")))
val nodes = Seq(
ServiceNodeInfo("/kyuubi", "serviceNode", "localhost", 10000, Some("version"), None))
val renderedInfo = renderServiceNodesInfo(title, nodes, true)
val expected = {
s"\n $title " +
@ -187,8 +188,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
System.setProperty(HA_ZK_NAMESPACE.key, uniqueNamespace)
withZkClient(conf) { framework =>
createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
createServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
val newNamespace = getUniqueNamespace()
val args = Array(
@ -198,8 +199,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
)
val expectedCreatedNodes = Seq(
ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION)),
ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION))
ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None),
ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None)
)
testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedCreatedNodes, false))
@ -244,8 +245,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(KyuubiConf.FRONTEND_BIND_PORT, 0)
withZkClient(conf) { framework =>
createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
createServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
val args = Array(
"list", "server",
@ -254,8 +255,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
)
val expectedNodes = Seq(
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION)),
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION))
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None),
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None)
)
testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, false))
@ -272,8 +273,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(KyuubiConf.FRONTEND_BIND_PORT, 0)
withZkClient(conf) { framework =>
createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
createServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
val args = Array(
"get", "server",
@ -284,7 +285,7 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
)
val expectedNodes = Seq(
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION))
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None)
)
testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, false))
@ -302,8 +303,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
withZkClient(conf) { framework =>
withZkClient(conf) { zc =>
createZkServiceNode(conf, zc, uniqueNamespace, "localhost:10000", external = true)
createZkServiceNode(conf, zc, uniqueNamespace, "localhost:10001", external = true)
createServiceNode(conf, zc, uniqueNamespace, "localhost:10000", external = true)
createServiceNode(conf, zc, uniqueNamespace, "localhost:10001", external = true)
}
val args = Array(
@ -315,7 +316,7 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
)
val expectedDeletedNodes = Seq(
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION))
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None)
)
testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedDeletedNodes, false))
@ -332,8 +333,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(KyuubiConf.FRONTEND_BIND_PORT, 0)
withZkClient(conf) { framework =>
createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
createServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
val args = Array(
"list", "server",
@ -343,8 +344,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
)
val expectedNodes = Seq(
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION)),
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION))
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None),
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None)
)
testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, true))

View File

@ -21,7 +21,7 @@ import java.time.Duration
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf}
import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf, OptionalConfigEntry}
import org.apache.kyuubi.ha.client.RetryPolicies
object HighAvailabilityConf {
@ -99,4 +99,13 @@ object HighAvailabilityConf {
.timeConf
.checkValue(_ > 0, "Must be positive")
.createWithDefault(Duration.ofSeconds(120).toMillis)
val HA_ZK_ENGINE_SESSION_ID: OptionalConfigEntry[String] =
buildConf("ha.engine.session.id")
.doc("The sessionId will be attached to zookeeper node when engine started, " +
"and the kyuubi server will check it cyclically.")
.internal
.version("1.4.0")
.stringConf
.createOptional
}

View File

@ -98,7 +98,7 @@ abstract class ServiceDiscovery (
override def start(): Unit = {
val instance = server.connectionUrl
_serviceNode = createZkServiceNode(conf, zkClient, namespace, instance)
_serviceNode = createServiceNode(conf, zkClient, namespace, instance)
// Set a watch on the serviceNode
val watcher = new DeRegisterWatcher
if (zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath) == null) {
@ -170,6 +170,15 @@ object ServiceDiscovery extends Logging {
}
}
/**
 * Look up the engine instance that was registered by a specific session.
 *
 * Lists the service nodes under `namespace` via `getServiceNodesInfo` (called with
 * `silent = true`) and picks the first node whose `createSessionId` equals `sessionId`.
 *
 * @param zkClient the Curator client used to query the service nodes
 * @param namespace the namespace whose service nodes are inspected
 * @param sessionId the session id to match against each node's `createSessionId`
 * @return the `(host, port)` of the first matching node, or `None` if no node matches
 */
def getEngineBySessionId(
zkClient: CuratorFramework,
namespace: String,
sessionId: String): Option[(String, Int)] = {
getServiceNodesInfo(zkClient, namespace, silent = true)
// nodes registered without a session id carry `None` and therefore never match
.find(_.createSessionId.exists(_.equals(sessionId)))
.map(data => (data.host, data.port))
}
def getServiceNodesInfo(
zkClient: CuratorFramework,
namespace: String,
@ -185,8 +194,9 @@ object ServiceDiscovery extends Logging {
val host = strings.head
val port = strings(1).toInt
val version = p.split(";").find(_.startsWith("version=")).map(_.stripPrefix("version="))
val sessionId = p.split(";").find(_.startsWith("session=")).map(_.stripPrefix("session="))
info(s"Get service instance:$instance and version:$version under $namespace")
ServiceNodeInfo(namespace, p, host, port, version)
ServiceNodeInfo(namespace, p, host, port, version, sessionId)
}
} catch {
case _: Exception if silent => Nil
@ -196,7 +206,7 @@ object ServiceDiscovery extends Logging {
}
}
def createZkServiceNode(
def createServiceNode(
conf: KyuubiConf,
zkClient: CuratorFramework,
namespace: String,
@ -215,26 +225,22 @@ object ServiceDiscovery extends Logging {
case e: KeeperException =>
throw new KyuubiException(s"Failed to create namespace '$ns'", e)
}
val session = conf.get(HA_ZK_ENGINE_SESSION_ID)
.map(sid => s"session=$sid;").getOrElse("")
val pathPrefix = ZKPaths.makePath(
namespace,
s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};sequence=")
s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};${session}sequence=")
var serviceNode: PersistentNode = null
val createMode = if (external) CreateMode.PERSISTENT_SEQUENTIAL
else CreateMode.EPHEMERAL_SEQUENTIAL
try {
if (external) {
serviceNode = new PersistentNode(
zkClient,
CreateMode.PERSISTENT_SEQUENTIAL,
false,
pathPrefix,
instance.getBytes(StandardCharsets.UTF_8))
} else {
serviceNode = new PersistentNode(
zkClient,
CreateMode.EPHEMERAL_SEQUENTIAL,
false,
pathPrefix,
instance.getBytes(StandardCharsets.UTF_8))
}
serviceNode = new PersistentNode(
zkClient,
createMode,
false,
pathPrefix,
instance.getBytes(StandardCharsets.UTF_8))
serviceNode.start()
val znodeTimeout = conf.get(HA_ZK_NODE_TIMEOUT)
if (!serviceNode.waitForInitialCreate(znodeTimeout, TimeUnit.MILLISECONDS)) {
@ -258,6 +264,7 @@ case class ServiceNodeInfo(
nodeName: String,
host: String,
port: Int,
version: Option[String]) {
version: Option[String],
createSessionId: Option[String]) {
def instance: String = s"$host:$port"
}

View File

@ -30,7 +30,9 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INIT_TIMEOUT, ENGINE_SHARE_LEVEL, ENGINE_SHARE_LEVEL_SUB_DOMAIN}
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, SERVER, ShareLevel}
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_SESSION_ID
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
import org.apache.kyuubi.ha.client.ServiceDiscovery.getEngineBySessionId
import org.apache.kyuubi.ha.client.ServiceDiscovery.getServerHost
import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, ENGINE_TOTAL}
import org.apache.kyuubi.metrics.MetricsSystem
@ -123,12 +125,9 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI
}
}
private def get(zkClient: CuratorFramework): Option[(String, Int)] = {
getServerHost(zkClient, engineSpace)
}
private def create(zkClient: CuratorFramework): (String, Int) = tryWithLock(zkClient) {
var engineRef = get(zkClient)
// TODO: improve this after support engine pool. (KYUUBI #918)
var engineRef = getServerHost(zkClient, engineSpace)
// Get the engine address ahead if another process has succeeded
if (engineRef.nonEmpty) return engineRef.get
@ -137,6 +136,7 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI
conf.set(SparkProcessBuilder.TAG_KEY,
conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
conf.set(HA_ZK_NAMESPACE, engineSpace)
conf.set(HA_ZK_ENGINE_SESSION_ID, sessionId)
val builder = new SparkProcessBuilder(appUser, conf)
MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
try {
@ -163,7 +163,7 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI
s"Timeout($timeout ms) to launched Spark with $builder",
builder.getError)
}
engineRef = get(zkClient)
engineRef = getEngineBySessionId(zkClient, engineSpace, sessionId)
}
engineRef.get
} finally {
@ -177,9 +177,10 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI
* Get the engine ref from engine space first or create a new one
*/
def getOrCreate(zkClient: CuratorFramework): (String, Int) = {
get(zkClient).getOrElse {
create(zkClient)
}
getServerHost(zkClient, engineSpace)
.getOrElse {
create(zkClient)
}
}
}

View File

@ -27,12 +27,12 @@ class InitializeSQLSuite extends WithKyuubiServer with JDBCTestUtils {
KyuubiConf()
.set(ENGINE_INITIALIZE_SQL.key,
"CREATE DATABASE IF NOT EXISTS INIT_DB;" +
"CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" +
"INSERT OVERWRITE TABLE INIT_DB.test VALUES (1);")
"CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" +
"INSERT OVERWRITE TABLE INIT_DB.test VALUES (1);")
.set(ENGINE_SESSION_INITIALIZE_SQL.key,
"CREATE DATABASE IF NOT EXISTS INIT_DB;" +
"CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" +
"INSERT INTO INIT_DB.test VALUES (2);")
"CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" +
"INSERT INTO INIT_DB.test VALUES (2);")
}
override def afterAll(): Unit = {