[KYUUBI #1364] [BACKPORT] [KYUUBI #933] Enhance the detection mechanism for engine startup

### _Why are the changes needed?_
Sub task of #1361.
#1176 is based on #933. In order to backport #1176, #933 has to be backported first.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1364 from zhouyifan279/1361.

Closes #1364

Closes #933

8cd735cc [ulysses-you] [BACKPORT] [KYUUBI #1176] InvalidACL appears in the engine when zookeeper acl is turned on
42d0adcc [timothy65535] [KYUUBI #933] Enhance the detection mechanism for engine startup

Lead-authored-by: timothy65535 <timothy65535@163.com>
Co-authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
This commit is contained in:
timothy65535 2021-11-12 11:47:15 +08:00 committed by ulysses-you
parent 89f1cfb141
commit 7f45a0bc4b
No known key found for this signature in database
GPG Key ID: 4C500BC62D576766
7 changed files with 72 additions and 54 deletions

View File

@ -191,6 +191,7 @@ kyuubi\.frontend<br>\.worker\.keepalive\.time|<div style='width: 65pt;word-wrap:
Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
kyuubi\.ha\.engine<br>\.session\.id|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The sessionId will be attached to zookeeper node when engine started, and the kyuubi server will check it cyclically.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.3.2</div>
kyuubi\.ha\.zookeeper<br>\.acl\.enabled|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>false</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Set to true if the zookeeper ensemble is kerberized</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.ha\.zookeeper<br>\.connection\.base\.retry<br>\.wait|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>1000</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Initial amount of time to wait between retries to the zookeeper ensemble</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.ha\.zookeeper<br>\.connection\.max<br>\.retries|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>3</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Max retry times for connecting to the zookeeper ensemble</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>

View File

@ -94,7 +94,7 @@ private[kyuubi] class ServiceControlCli extends Logging {
currentServerNodes.foreach { sn =>
info(s"Exposing server instance:${sn.instance} with version:${sn.version}" +
s" from $fromNamespace to $toNamespace")
val newNode = createZkServiceNode(
val newNode = createServiceNode(
kyuubiConf, zc, args.cliArgs.namespace, sn.instance, sn.version, true)
exposedServiceNodes += sn.copy(
namespace = toNamespace,

View File

@ -159,7 +159,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
test("test render zookeeper service node info") {
val title = "test render"
val nodes = Seq(ServiceNodeInfo("/kyuubi", "serviceNode", "localhost", 10000, Some("version")))
val nodes = Seq(
ServiceNodeInfo("/kyuubi", "serviceNode", "localhost", 10000, Some("version"), None))
val renderedInfo = renderServiceNodesInfo(title, nodes, true)
val expected = {
s"\n $title " +
@ -187,8 +188,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
System.setProperty(HA_ZK_NAMESPACE.key, uniqueNamespace)
withZkClient(conf) { framework =>
createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
createServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
val newNamespace = getUniqueNamespace()
val args = Array(
@ -198,8 +199,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
)
val expectedCreatedNodes = Seq(
ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION)),
ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION))
ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None),
ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None)
)
testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedCreatedNodes, false))
@ -244,8 +245,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(KyuubiConf.FRONTEND_BIND_PORT, 0)
withZkClient(conf) { framework =>
createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
createServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
val args = Array(
"list", "server",
@ -254,8 +255,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
)
val expectedNodes = Seq(
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION)),
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION))
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None),
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None)
)
testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, false))
@ -272,8 +273,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(KyuubiConf.FRONTEND_BIND_PORT, 0)
withZkClient(conf) { framework =>
createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
createServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
val args = Array(
"get", "server",
@ -284,7 +285,7 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
)
val expectedNodes = Seq(
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION))
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None)
)
testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, false))
@ -302,8 +303,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
withZkClient(conf) { framework =>
withZkClient(conf) { zc =>
createZkServiceNode(conf, zc, uniqueNamespace, "localhost:10000", external = true)
createZkServiceNode(conf, zc, uniqueNamespace, "localhost:10001", external = true)
createServiceNode(conf, zc, uniqueNamespace, "localhost:10000", external = true)
createServiceNode(conf, zc, uniqueNamespace, "localhost:10001", external = true)
}
val args = Array(
@ -315,7 +316,7 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
)
val expectedDeletedNodes = Seq(
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION))
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None)
)
testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedDeletedNodes, false))
@ -332,8 +333,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(KyuubiConf.FRONTEND_BIND_PORT, 0)
withZkClient(conf) { framework =>
createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createZkServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
createServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
val args = Array(
"list", "server",
@ -343,8 +344,8 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
)
val expectedNodes = Seq(
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION)),
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION))
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None),
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None)
)
testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, true))

View File

@ -21,7 +21,7 @@ import java.time.Duration
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf}
import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf, OptionalConfigEntry}
import org.apache.kyuubi.ha.client.RetryPolicies
object HighAvailabilityConf {
@ -99,4 +99,12 @@ object HighAvailabilityConf {
.timeConf
.checkValue(_ > 0, "Must be positive")
.createWithDefault(Duration.ofSeconds(120).toMillis)
val HA_ZK_ENGINE_REF_ID: OptionalConfigEntry[String] =
buildConf("ha.engine.session.id")
.doc("The sessionId will be attached to zookeeper node when engine started, " +
"and the kyuubi server will check it cyclically.")
.version("1.3.2")
.stringConf
.createOptional
}

View File

@ -98,7 +98,7 @@ abstract class ServiceDiscovery (
override def start(): Unit = {
val instance = server.connectionUrl
_serviceNode = createZkServiceNode(conf, zkClient, namespace, instance)
_serviceNode = createServiceNode(conf, zkClient, namespace, instance)
// Set a watch on the serviceNode
val watcher = new DeRegisterWatcher
if (zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath) == null) {
@ -170,6 +170,15 @@ object ServiceDiscovery extends Logging {
}
}
def getEngineBySessionId(
zkClient: CuratorFramework,
namespace: String,
sessionId: String): Option[(String, Int)] = {
getServiceNodesInfo(zkClient, namespace, silent = true)
.find(_.createSessionId.exists(_.equals(sessionId)))
.map(data => (data.host, data.port))
}
def getServiceNodesInfo(
zkClient: CuratorFramework,
namespace: String,
@ -185,8 +194,9 @@ object ServiceDiscovery extends Logging {
val host = strings.head
val port = strings(1).toInt
val version = p.split(";").find(_.startsWith("version=")).map(_.stripPrefix("version="))
val sessionId = p.split(";").find(_.startsWith("session=")).map(_.stripPrefix("session="))
info(s"Get service instance:$instance and version:$version under $namespace")
ServiceNodeInfo(namespace, p, host, port, version)
ServiceNodeInfo(namespace, p, host, port, version, sessionId)
}
} catch {
case _: Exception if silent => Nil
@ -196,7 +206,7 @@ object ServiceDiscovery extends Logging {
}
}
def createZkServiceNode(
def createServiceNode(
conf: KyuubiConf,
zkClient: CuratorFramework,
namespace: String,
@ -215,26 +225,22 @@ object ServiceDiscovery extends Logging {
case e: KeeperException =>
throw new KyuubiException(s"Failed to create namespace '$ns'", e)
}
val session = conf.get(HA_ZK_ENGINE_REF_ID)
.map(sid => s"session=$sid;").getOrElse("")
val pathPrefix = ZKPaths.makePath(
namespace,
s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};sequence=")
s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};${session}sequence=")
var serviceNode: PersistentNode = null
val createMode = if (external) CreateMode.PERSISTENT_SEQUENTIAL
else CreateMode.EPHEMERAL_SEQUENTIAL
try {
if (external) {
serviceNode = new PersistentNode(
zkClient,
CreateMode.PERSISTENT_SEQUENTIAL,
false,
pathPrefix,
instance.getBytes(StandardCharsets.UTF_8))
} else {
serviceNode = new PersistentNode(
zkClient,
CreateMode.EPHEMERAL_SEQUENTIAL,
false,
pathPrefix,
instance.getBytes(StandardCharsets.UTF_8))
}
serviceNode = new PersistentNode(
zkClient,
createMode,
false,
pathPrefix,
instance.getBytes(StandardCharsets.UTF_8))
serviceNode.start()
val znodeTimeout = conf.get(HA_ZK_NODE_TIMEOUT)
if (!serviceNode.waitForInitialCreate(znodeTimeout, TimeUnit.MILLISECONDS)) {
@ -258,6 +264,7 @@ case class ServiceNodeInfo(
nodeName: String,
host: String,
port: Int,
version: Option[String]) {
version: Option[String],
createSessionId: Option[String]) {
def instance: String = s"$host:$port"
}

View File

@ -30,7 +30,9 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INIT_TIMEOUT, ENGINE_SHARE_LEVEL, ENGINE_SHARE_LEVEL_SUB_DOMAIN}
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, SERVER, ShareLevel}
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
import org.apache.kyuubi.ha.client.ServiceDiscovery.getEngineBySessionId
import org.apache.kyuubi.ha.client.ServiceDiscovery.getServerHost
import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, ENGINE_TOTAL}
import org.apache.kyuubi.metrics.MetricsSystem
@ -123,12 +125,9 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI
}
}
private def get(zkClient: CuratorFramework): Option[(String, Int)] = {
getServerHost(zkClient, engineSpace)
}
private def create(zkClient: CuratorFramework): (String, Int) = tryWithLock(zkClient) {
var engineRef = get(zkClient)
// TODO: improve this after support engine pool. (KYUUBI #918)
var engineRef = getServerHost(zkClient, engineSpace)
// Get the engine address ahead if another process has succeeded
if (engineRef.nonEmpty) return engineRef.get
@ -137,6 +136,7 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI
conf.set(SparkProcessBuilder.TAG_KEY,
conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
conf.set(HA_ZK_NAMESPACE, engineSpace)
conf.set(HA_ZK_ENGINE_REF_ID, sessionId)
val builder = new SparkProcessBuilder(appUser, conf)
MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
try {
@ -163,7 +163,7 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI
s"Timeout($timeout ms) to launched Spark with $builder",
builder.getError)
}
engineRef = get(zkClient)
engineRef = getEngineBySessionId(zkClient, engineSpace, sessionId)
}
engineRef.get
} finally {
@ -177,9 +177,10 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI
* Get the engine ref from engine space first or create a new one
*/
def getOrCreate(zkClient: CuratorFramework): (String, Int) = {
get(zkClient).getOrElse {
create(zkClient)
}
getServerHost(zkClient, engineSpace)
.getOrElse {
create(zkClient)
}
}
}

View File

@ -27,12 +27,12 @@ class InitializeSQLSuite extends WithKyuubiServer with JDBCTestUtils {
KyuubiConf()
.set(ENGINE_INITIALIZE_SQL.key,
"CREATE DATABASE IF NOT EXISTS INIT_DB;" +
"CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" +
"INSERT OVERWRITE TABLE INIT_DB.test VALUES (1);")
"CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" +
"INSERT OVERWRITE TABLE INIT_DB.test VALUES (1);")
.set(ENGINE_SESSION_INITIALIZE_SQL.key,
"CREATE DATABASE IF NOT EXISTS INIT_DB;" +
"CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" +
"INSERT INTO INIT_DB.test VALUES (2);")
"CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" +
"INSERT INTO INIT_DB.test VALUES (2);")
}
override def afterAll(): Unit = {