diff --git a/externals/kyuubi-spark-sql-engine/pom.xml b/externals/kyuubi-spark-sql-engine/pom.xml
index ffc8d428e..4772600ac 100644
--- a/externals/kyuubi-spark-sql-engine/pom.xml
+++ b/externals/kyuubi-spark-sql-engine/pom.xml
@@ -90,6 +90,12 @@
${iceberg.name}
test
+
+
+ org.apache.curator
+ curator-test
+ test
+
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 0a67f7eea..901be401b 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -27,7 +27,7 @@ import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.ha.client.{RetryPolicies, ServiceDiscovery}
+import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, RetryPolicies, ServiceDiscovery}
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.SignalRegister
@@ -37,7 +37,7 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
def this(spark: SparkSession) = this(classOf[SparkSQLEngine].getSimpleName, spark)
override private[kyuubi] val backendService = new SparkSQLBackendService(spark)
- private val discoveryService = new ServiceDiscovery(this)
+ private val discoveryService = new EngineServiceDiscovery(this)
override def initialize(conf: KyuubiConf): Unit = {
val listener = new SparkSQLEngineListener(this)
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala
new file mode 100644
index 000000000..4d07ba371
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark
+
+import java.util.UUID
+
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARED_LEVEL
+import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.ShareLevel.ShareLevel
+import org.apache.kyuubi.operation.JDBCTestUtils
+import org.apache.kyuubi.service.ServiceState
+
+/**
+ * This suite tests behaviors of the Spark engine under different share levels,
+ * e.g. cleaning up the discovery service before stopping.
+ */
+abstract class ShareLevelSparkEngineSuite
+ extends WithDiscoverySparkSQLEngine with JDBCTestUtils {
+ def sharedLevel: ShareLevel
+ override def withKyuubiConf: Map[String, String] = {
+ super.withKyuubiConf ++ Map(ENGINE_SHARED_LEVEL.key-> sharedLevel.toString)
+ }
+ override protected def jdbcUrl: String = getJdbcUrl
+ override val namespace: String = {
+ // For tests, we always use a random UUID as the namespace
+ s"/kyuubi/${sharedLevel.toString}/${UUID.randomUUID().toString}"
+ }
+
+ test("check discovery service is clean up with different share level") {
+ withZkClient { zkClient =>
+ assert(engine.getServiceState == ServiceState.STARTED)
+ assert(zkClient.checkExists().forPath(namespace) != null)
+ withJdbcStatement() {_ => }
+ sharedLevel match {
+ // For CONNECTION share level, we clean up the namespace since it is always a globally unique value.
+ case ShareLevel.CONNECTION =>
+ assert(engine.getServiceState == ServiceState.STOPPED)
+ assert(zkClient.checkExists().forPath(namespace) == null)
+ case _ =>
+ assert(engine.getServiceState == ServiceState.STARTED)
+ assert(zkClient.checkExists().forPath(namespace) != null)
+ }
+ }
+ }
+}
+
+class ConnectionLevelSparkEngineSuite extends ShareLevelSparkEngineSuite {
+ override def sharedLevel: ShareLevel = ShareLevel.CONNECTION
+}
+
+class UserLevelSparkEngineSuite extends ShareLevelSparkEngineSuite {
+ override def sharedLevel: ShareLevel = ShareLevel.USER
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala
index 3d3ea7f96..f61298f8c 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark
import org.apache.kyuubi.service.ServiceState
class SparkSQLEngineListenerSuite extends WithSparkSQLEngine {
- override def conf: Map[String, String] = Map.empty
+ override def withKyuubiConf: Map[String, String] = Map.empty
test("application end") {
assert(engine.getServiceState === ServiceState.STARTED)
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala
new file mode 100644
index 000000000..acaf107b5
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark
+
+import org.apache.curator.framework.CuratorFramework
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{EMBEDDED_ZK_PORT, EMBEDDED_ZK_TEMP_DIR}
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_ACL_ENABLED, HA_ZK_NAMESPACE, HA_ZK_QUORUM}
+import org.apache.kyuubi.ha.client.ServiceDiscovery
+import org.apache.kyuubi.ha.server.EmbeddedZkServer
+
+trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
+ private var zkServer: EmbeddedZkServer = _
+ def namespace: String
+ override def withKyuubiConf: Map[String, String] = {
+ assert(zkServer != null)
+ Map(HA_ZK_QUORUM.key -> zkServer.getConnectString,
+ HA_ZK_ACL_ENABLED.key -> "false",
+ HA_ZK_NAMESPACE.key -> namespace)
+ }
+
+ override def beforeAll(): Unit = {
+ zkServer = new EmbeddedZkServer()
+ val zkData = Utils.createTempDir()
+ val tmpConf = KyuubiConf()
+ tmpConf.set(EMBEDDED_ZK_PORT, -1)
+ tmpConf.set(EMBEDDED_ZK_TEMP_DIR, zkData.toString)
+ zkServer.initialize(tmpConf)
+ zkServer.start()
+ }
+
+ override def afterAll(): Unit = {
+ if (zkServer != null) {
+ zkServer.stop()
+ }
+ }
+
+ override protected def beforeEach(): Unit = {
+ super.beforeEach()
+ startSparkEngine()
+ }
+
+ override protected def afterEach(): Unit = {
+ super.afterEach()
+ stopSparkEngine()
+ }
+
+ def withZkClient(f: CuratorFramework => Unit): Unit = {
+ val zkClient = ServiceDiscovery.startZookeeperClient(kyuubiConf)
+ try {
+ f(zkClient)
+ } finally {
+ zkClient.close()
+ }
+ }
+
+ protected def getDiscoveryConnectionString: String = {
+ if (zkServer == null) {
+ ""
+ } else {
+ zkServer.getConnectString
+ }
+ }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
index 8c19ba021..d622cdd98 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
@@ -20,11 +20,14 @@ package org.apache.kyuubi.engine.spark
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.config.KyuubiConf
trait WithSparkSQLEngine extends KyuubiFunSuite {
protected var spark: SparkSession = _
protected var engine: SparkSQLEngine = _
- def conf: Map[String, String]
+ // conf will not be loaded until the Spark engine starts
+ def withKyuubiConf: Map[String, String]
+ val kyuubiConf: KyuubiConf = SparkSQLEngine.kyuubiConf
protected var connectionUrl: String = _
@@ -42,9 +45,9 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
s"jdbc:derby:;databaseName=$metastorePath;create=true")
System.setProperty("spark.sql.warehouse.dir", warehousePath.toString)
System.setProperty("spark.sql.hive.metastore.sharedPrefixes", "org.apache.hive.jdbc")
- conf.foreach { case (k, v) =>
+ withKyuubiConf.foreach { case (k, v) =>
System.setProperty(k, v)
- SparkSQLEngine.kyuubiConf.set(k, v)
+ kyuubiConf.set(k, v)
}
SparkSession.clearActiveSession()
@@ -56,10 +59,17 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
override def afterAll(): Unit = {
super.afterAll()
+
stopSparkEngine()
}
protected def stopSparkEngine(): Unit = {
+ // We need to clean up the conf since it is a global config shared within the same JVM.
+ withKyuubiConf.foreach { case (k, _) =>
+ System.clearProperty(k)
+ kyuubiConf.unset(k)
+ }
+
if (engine != null) {
engine.stop()
engine = null
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkIcebergOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkIcebergOperationSuite.scala
index e9362e284..26efc9dac 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkIcebergOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkIcebergOperationSuite.scala
@@ -22,7 +22,7 @@ import org.apache.kyuubi.operation.BasicIcebergJDBCTests
class SparkIcebergOperationSuite extends WithSparkSQLEngine with BasicIcebergJDBCTests {
override protected def jdbcUrl: String = getJdbcUrl
- override def conf: Map[String, String] = icebergConfigs
+ override def withKyuubiConf: Map[String, String] = icebergConfigs
override def afterAll(): Unit = {
super.afterAll()
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
index 13a1fcab6..816a12d5f 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
@@ -39,7 +39,7 @@ import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
class SparkOperationSuite extends WithSparkSQLEngine with JDBCTests {
override protected def jdbcUrl: String = getJdbcUrl
- override def conf: Map[String, String] = Map.empty
+ override def withKyuubiConf: Map[String, String] = Map.empty
test("get table types") {
withJdbcStatement() { statement =>
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SessionSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SessionSuite.scala
index 5395ccdf7..69e682334 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SessionSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SessionSuite.scala
@@ -27,7 +27,7 @@ import org.apache.kyuubi.operation.JDBCTestUtils
import org.apache.kyuubi.service.ServiceState._
class SessionSuite extends WithSparkSQLEngine with JDBCTestUtils {
- override def conf: Map[String, String] = {
+ override def withKyuubiConf: Map[String, String] = {
Map(ENGINE_SHARED_LEVEL.key -> "CONNECTION")
}
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
new file mode 100644
index 000000000..3c823fffc
--- /dev/null
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ha.client
+
+import scala.util.control.NonFatal
+
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARED_LEVEL
+import org.apache.kyuubi.service.Serverable
+
+/**
+ * A service for service discovery used by the engine side.
+ *
+ * @param name the name of the service itself
+ * @param server the instance uri a service that used to publish itself
+ */
+class EngineServiceDiscovery private(
+ name: String,
+ server: Serverable) extends ServiceDiscovery(server) {
+ def this(server: Serverable) =
+ this(classOf[EngineServiceDiscovery].getSimpleName, server)
+
+ override def stop(): Unit = {
+ conf.get(ENGINE_SHARED_LEVEL) match {
+ // For CONNECTION share level, clean up the namespace in ZooKeeper to avoid disk stress.
+ case "CONNECTION" if namespace != null =>
+ try {
+ zkClient.delete().deletingChildrenIfNeeded().forPath(namespace)
+ info("Clean up discovery service due to this is connection share level.")
+ } catch {
+ case NonFatal(e) =>
+ warn("Failed to clean up Spark engine before stop.", e)
+ }
+
+ case _ =>
+ }
+ super.stop()
+ }
+}
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/KyuubiServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/KyuubiServiceDiscovery.scala
new file mode 100644
index 000000000..1363d2d29
--- /dev/null
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/KyuubiServiceDiscovery.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ha.client
+
+import org.apache.zookeeper.{WatchedEvent, Watcher}
+
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.service.Serverable
+
+/**
+ * A service for service discovery used by the Kyuubi server side.
+ * We add an extra ZooKeeper watch so that we can stop the server more gracefully.
+ *
+ * @param name the name of the service itself
+ * @param server the instance uri a service that used to publish itself
+ */
+class KyuubiServiceDiscovery private(
+ name: String,
+ server: Serverable) extends ServiceDiscovery(server) {
+ def this(server: Serverable) =
+ this(classOf[KyuubiServiceDiscovery].getSimpleName, server)
+
+ override def start(): Unit = {
+ super.start()
+ // Set a watch on the serviceNode
+ val watcher = new DeRegisterWatcher
+ if (zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath) == null) {
+ // No node exists, throw exception
+ throw new KyuubiException(s"Unable to create znode for this Kyuubi " +
+ s"instance[${server.connectionUrl}] on ZooKeeper.")
+ }
+ }
+
+ override def stopGracefully(): Unit = {
+ stop()
+ while (server.backendService != null &&
+ server.backendService.sessionManager.getOpenSessionCount > 0) {
+ Thread.sleep(1000 * 60)
+ }
+ server.stop()
+ }
+
+ class DeRegisterWatcher extends Watcher {
+ override def process(event: WatchedEvent): Unit = {
+ if (event.getType == Watcher.Event.EventType.NodeDeleted) {
+ warn(s"This Kyuubi instance ${server.connectionUrl} is now de-registered from" +
+ s" ZooKeeper. The server will be shut down after the last client session completes.")
+ stopGracefully()
+ }
+ }
+ }
+}
+
+
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
index 4bc0be549..b8cfe09a6 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import javax.security.auth.login.Configuration
import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode
@@ -33,8 +34,8 @@ import org.apache.curator.retry._
import org.apache.curator.utils.ZKPaths
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.JaasConfiguration
-import org.apache.zookeeper.{KeeperException, WatchedEvent, Watcher}
import org.apache.zookeeper.CreateMode.PERSISTENT
+import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.NodeExistsException
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException}
@@ -45,34 +46,37 @@ import org.apache.kyuubi.service.{AbstractService, Serverable}
import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
/**
- * A service for service discovery
+ * An abstract service for service discovery
*
* @param name the name of the service itself
* @param server the instance uri a service that used to publish itself
*/
-class ServiceDiscovery private (
+abstract class ServiceDiscovery private (
name: String,
server: Serverable) extends AbstractService(name) {
def this(server: Serverable) =
this(classOf[ServiceDiscovery].getSimpleName, server)
- private var zkClient: CuratorFramework = _
- private var serviceNode: PersistentEphemeralNode = _
+ private var _zkClient: CuratorFramework = _
+ private var _serviceNode: PersistentEphemeralNode = _
/**
* a pre-defined namespace used to publish the instance of the associate service
*/
- private var namespace: String = _
+ private var _namespace: String = _
+
+ def zkClient: CuratorFramework = _zkClient
+ def serviceNode: PersistentEphemeralNode = _serviceNode
+ def namespace: String = _namespace
override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
- namespace = conf.get(HA_ZK_NAMESPACE)
+ _namespace = conf.get(HA_ZK_NAMESPACE)
val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
setUpZooKeeperAuth(conf)
- zkClient = buildZookeeperClient(conf)
+ _zkClient = buildZookeeperClient(conf)
zkClient.getConnectionStateListenable.addListener(new ConnectionStateListener {
-
private val isConnected = new AtomicBoolean(false)
override def stateChanged(client: CuratorFramework, newState: ConnectionState): Unit = {
@@ -86,7 +90,6 @@ class ServiceDiscovery private (
override def run(): Unit = if (!isConnected.get()) {
error(s"Zookeeper client connection state changed to: $newState, but failed to" +
s" reconnect in ${delay / 1000} seconds. Give up retry. ")
- client.close()
stopGracefully()
}
}, delay, TimeUnit.MILLISECONDS)
@@ -116,7 +119,7 @@ class ServiceDiscovery private (
namespace,
s"serviceUri=$instance;version=$KYUUBI_VERSION;sequence=")
try {
- serviceNode = new PersistentEphemeralNode(
+ _serviceNode = new PersistentEphemeralNode(
zkClient,
PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL,
pathPrefix,
@@ -126,13 +129,6 @@ class ServiceDiscovery private (
if (!serviceNode.waitForInitialCreate(znodeTimeout, TimeUnit.SECONDS)) {
throw new KyuubiException(s"Max znode creation wait time $znodeTimeout s exhausted")
}
- // Set a watch on the serviceNode
- val watcher = new DeRegisterWatcher
- if (zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath) == null) {
- // No node exists, throw exception
- throw new KyuubiException(s"Unable to create znode for this Kyuubi instance[$instance]" +
- s" on ZooKeeper.")
- }
info(s"Created a ${serviceNode.getActualPath} on ZooKeeper for KyuubiServer uri: " + instance)
} catch {
case e: Exception =>
@@ -146,38 +142,30 @@ class ServiceDiscovery private (
}
override def stop(): Unit = {
- if (serviceNode != null) {
- try {
- serviceNode.close()
- } catch {
- case e: IOException =>
- error("Failed to close the persistent ephemeral znode" + serviceNode.getActualPath, e)
- }
- }
+ closeServiceNode()
if (zkClient != null) zkClient.close()
super.stop()
}
- private def stopGracefully(): Unit = {
- stop()
- while (server.backendService != null &&
- server.backendService.sessionManager.getOpenSessionCount > 0) {
- Thread.sleep(1000 * 60)
- }
- server.stop()
- }
-
-
- class DeRegisterWatcher extends Watcher {
- override def process(event: WatchedEvent): Unit = {
- if (event.getType == Watcher.Event.EventType.NodeDeleted) {
- warn(s"This Kyuubi instance ${server.connectionUrl} is now de-registered from" +
- s" ZooKeeper. The server will be shut down after the last client session completes.")
- ServiceDiscovery.this.stopGracefully()
+ // close the EPHEMERAL_SEQUENTIAL node in zk
+ protected def closeServiceNode(): Unit = {
+ if (_serviceNode != null) {
+ try {
+ _serviceNode.close()
+ } catch {
+ case e: IOException =>
+ error("Failed to close the persistent ephemeral znode" + serviceNode.getActualPath, e)
+ } finally {
+ _serviceNode = null
}
}
}
+
+ // Stop the server gracefully
+ def stopGracefully(): Unit = {
+ stop()
+ }
}
object ServiceDiscovery {
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala
index f3505d9f8..8683fef89 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala
@@ -66,7 +66,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
server.start()
val znodeRoot = s"/$namespace"
- val serviceDiscovery = new ServiceDiscovery(server)
+ val serviceDiscovery = new KyuubiServiceDiscovery(server)
val framework = ServiceDiscovery.startZookeeperClient(conf)
try {
serviceDiscovery.initialize(conf)
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index 4ecac8099..5d020b2b6 100644
--- a/kyuubi-main/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.ha.client.ServiceDiscovery
+import org.apache.kyuubi.ha.client.{KyuubiServiceDiscovery, ServiceDiscovery}
import org.apache.kyuubi.ha.server.EmbeddedZkServer
import org.apache.kyuubi.service.{AbstractBackendService, KinitAuxiliaryService, Serverable}
import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
@@ -79,7 +79,7 @@ class KyuubiServer(name: String) extends Serverable(name) {
def this() = this(classOf[KyuubiServer].getSimpleName)
override private[kyuubi] val backendService: AbstractBackendService = new KyuubiBackendService()
- private val discoveryService = new ServiceDiscovery(this)
+ private val discoveryService = new KyuubiServiceDiscovery(this)
override def initialize(conf: KyuubiConf): Unit = synchronized {
val kinit = new KinitAuxiliaryService()