diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index 79512f122..ba5712728 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -105,9 +105,12 @@
org.apache.directory.server
apacheds-service
+
+ org.apache.curator
+ curator-test
+
-
target/scala-${scala.binary.version}/classes
target/scala-${scala.binary.version}/test-classes
diff --git a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala
index b94906061..5ba98c7d7 100644
--- a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala
+++ b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala
@@ -97,6 +97,12 @@ object KyuubiConf {
.intConf
.createWithDefault(3)
+ val HA_ZOOKEEPER_ZNODE_CREATION_TIMEOUT: ConfigEntry[Long] =
+ KyuubiConfigBuilder("spark.kyuubi.ha.zk.znode.creation.timeout")
+ .doc("ZooKeeper znode's creatation timeout (in milliseconds).")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefault(TimeUnit.SECONDS.toMillis(120L))
+
/////////////////////////////////////////////////////////////////////////////////////////////////
// Operation Log //
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/auth/KyuubiAuthFactory.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/auth/KyuubiAuthFactory.scala
index 93c2a1588..4a7d79d99 100644
--- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/auth/KyuubiAuthFactory.scala
+++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/auth/KyuubiAuthFactory.scala
@@ -183,10 +183,7 @@ object KyuubiAuthFactory {
if (!proxyUser.equalsIgnoreCase(realUser)) {
ProxyUsers.refreshSuperUserGroupsConfiguration(hadoopConf)
- ProxyUsers.authorize(
- UserGroupInformation.createProxyUser(proxyUser, sessionUgi),
- ipAddress,
- hadoopConf)
+ ProxyUsers.authorize(UserGroupInformation.createProxyUser(proxyUser, sessionUgi), ipAddress)
}
} catch {
case e: IOException =>
diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtils.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtils.scala
index c261f4cdb..194ebd22f 100644
--- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtils.scala
+++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtils.scala
@@ -30,27 +30,29 @@ import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation}
import org.apache.hadoop.security.authentication.util.KerberosUtil
-import org.apache.hive.common.util.HiveVersionInfo
import org.apache.spark.{KyuubiSparkUtil, SparkConf}
import org.apache.spark.KyuubiConf._
import org.apache.zookeeper._
+import org.apache.zookeeper.KeeperException.{ConnectionLossException, NodeExistsException}
import org.apache.zookeeper.data.ACL
-import yaooqinn.kyuubi.Logging
+import yaooqinn.kyuubi.{Logging, _}
import yaooqinn.kyuubi.server.{FrontendService, KyuubiServer}
+import yaooqinn.kyuubi.service.ServiceException
-object HighAvailabilityUtils extends Logging {
+private[kyuubi] object HighAvailabilityUtils extends Logging {
- private[this] val ZOOKEEPER_PATH_SEPARATOR = "/"
+ private[this] val ZK_PATH_SEPARATOR = "/"
- private[this] var zooKeeperClient: CuratorFramework = _
+ private[this] var zkClient: CuratorFramework = _
private[this] var znode: PersistentEphemeralNode = _
private[this] var znodePath: String = _
// Set to true only when deregistration happens
private[this] var deregisteredWithZooKeeper = false
def isSupportDynamicServiceDiscovery(conf: SparkConf): Boolean = {
- conf.get(HA_ENABLED.key).toBoolean && conf.get(HA_ZOOKEEPER_QUORUM.key).split(",").nonEmpty
+ conf.getBoolean(HA_ENABLED.key, defaultValue = false) &&
+ conf.get(HA_ZOOKEEPER_QUORUM.key, "").nonEmpty
}
@throws[Exception]
@@ -63,67 +65,69 @@ object HighAvailabilityUtils extends Logging {
val sessionTimeout = conf.getTimeAsMs(HA_ZOOKEEPER_SESSION_TIMEOUT.key).toInt
val baseSleepTime = conf.getTimeAsMs(HA_ZOOKEEPER_CONNECTION_BASESLEEPTIME.key).toInt
val maxRetries = conf.get(HA_ZOOKEEPER_CONNECTION_MAX_RETRIES.key).toInt
+ val znodeTimeout = conf.getTimeAsSeconds(HA_ZOOKEEPER_ZNODE_CREATION_TIMEOUT.key)
+
// Create a CuratorFramework instance to be used as the ZooKeeper client
// Use the zooKeeperAclProvider to create appropriate ACLs
- zooKeeperClient =
+ zkClient =
CuratorFrameworkFactory.builder.connectString(zooKeeperEnsemble)
.sessionTimeoutMs(sessionTimeout)
.aclProvider(zooKeeperAclProvider)
.retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
.build
- zooKeeperClient.start()
+ zkClient.start()
// Create the parent znodes recursively; ignore if the parent already exists.
try {
- zooKeeperClient
+ zkClient
.create
.creatingParentsIfNeeded
.withMode(CreateMode.PERSISTENT)
- .forPath(ZOOKEEPER_PATH_SEPARATOR + rootNamespace)
+ .forPath(ZK_PATH_SEPARATOR + rootNamespace)
info("Created the root name space: " + rootNamespace + " on ZooKeeper for KyuubiServer")
} catch {
+ case e: ConnectionLossException =>
+ throwServiceEx( s"ZooKeeper is still unreachable after ${sessionTimeout / 1000}s", e)
+ case _: NodeExistsException =>
case e: KeeperException =>
- if (e.code ne KeeperException.Code.NODEEXISTS) {
- error("Unable to create KyuubiServer namespace: " + rootNamespace + " on ZooKeeper", e)
- throw e
- }
+ throwServiceEx( s"Unable to create KyuubiServer namespace $rootNamespace on ZooKeeper", e)
}
// Create a znode under the rootNamespace parent for this instance of the server
// Znode name: serverUri=host:port;version=versionInfo;sequence=sequenceNumber
try {
- val pathPrefix = ZOOKEEPER_PATH_SEPARATOR + rootNamespace + ZOOKEEPER_PATH_SEPARATOR +
- "serverUri=" + instanceURI + ";" +
- "version=" + HiveVersionInfo.getVersion + ";" + "sequence="
- var znodeData = ""
- znodeData = instanceURI
+ val pathPrefix = ZK_PATH_SEPARATOR + rootNamespace + ZK_PATH_SEPARATOR +
+ "serverUri=" + instanceURI + ";" + "version=" + KYUUBI_VERSION + ";" + "sequence="
+ val znodeData = instanceURI
val znodeDataUTF8 = znodeData.getBytes(Charset.forName("UTF-8"))
znode = new PersistentEphemeralNode(
- zooKeeperClient,
+ zkClient,
PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL,
pathPrefix,
znodeDataUTF8)
znode.start()
- // We'll wait for 120s for node creation
- val znodeCreationTimeout = 120
- if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) {
- throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted")
+ if (!znode.waitForInitialCreate(znodeTimeout, TimeUnit.SECONDS)) {
+ throw new ServiceException("Max znode creation wait time: " + znodeTimeout + "s exhausted")
}
setDeregisteredWithZooKeeper(false)
znodePath = znode.getActualPath
// Set a watch on the znode
- if (zooKeeperClient.checkExists.usingWatcher(new DeRegisterWatcher(server))
+ if (zkClient.checkExists.usingWatcher(new DeRegisterWatcher(server))
.forPath(znodePath) == null) {
// No node exists, throw exception
- throw new Exception("Unable to create znode for this KyuubiServer instance on ZooKeeper.")
+ throwServiceEx("Unable to create znode for this KyuubiServer instance on ZooKeeper.")
}
info("Created a znode on ZooKeeper for KyuubiServer uri: " + instanceURI)
} catch {
case e: Exception =>
- error("Unable to create a znode for this server instance", e)
if (znode != null) znode.close()
- throw e
+ throwServiceEx("Unable to create a znode for this server instance", e)
}
}
+ private[this] def throwServiceEx(msg: String, e: Exception = null): Unit = {
+ error(msg, e)
+ throw new ServiceException(msg, e)
+ }
+
/**
* Get the ensemble server addresses from the configuration. The format is: host1:port,
* host2:port...
diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtilsSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtilsSuite.scala
new file mode 100644
index 000000000..766448465
--- /dev/null
+++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtilsSuite.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package yaooqinn.kyuubi.ha
+
+import org.apache.curator.test.TestingServer
+import org.apache.spark.{KyuubiConf, SparkConf, SparkFunSuite}
+import org.apache.spark.KyuubiConf.HA_ZOOKEEPER_CONNECTION_MAX_RETRIES
+import org.apache.zookeeper.KeeperException.ConnectionLossException
+import org.scalatest.{BeforeAndAfterEach, ConfigMap}
+
+import yaooqinn.kyuubi.server.KyuubiServer
+import yaooqinn.kyuubi.service.ServiceException
+
+class HighAvailabilityUtilsSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+ var zkServer: TestingServer = _
+ var connectString: String = _
+ val conf = new SparkConf(loadDefaults = true)
+ KyuubiServer.setupCommonConfig(conf)
+ conf.set(KyuubiConf.FRONTEND_BIND_PORT.key, "0")
+ var server: KyuubiServer = _
+
+ override def beforeAll(configMap: ConfigMap): Unit = {
+ zkServer = new TestingServer(2181, true)
+ connectString = zkServer.getConnectString
+ conf.set(KyuubiConf.HA_ZOOKEEPER_QUORUM.key, connectString)
+ conf.set(KyuubiConf.HA_ZOOKEEPER_ZNODE_CREATION_TIMEOUT.key, "1s")
+ conf.set(KyuubiConf.HA_ZOOKEEPER_SESSION_TIMEOUT.key, "3s")
+ conf.set(HA_ZOOKEEPER_CONNECTION_MAX_RETRIES.key, "1")
+ }
+
+ override def afterAll(): Unit = {
+ zkServer.stop()
+ }
+
+ override def afterEach(): Unit = {
+ server.stop()
+ }
+
+ test("Add Server Instance To ZooKeeper with host:port") {
+ server = new KyuubiServer()
+ server.init(conf)
+ server.start()
+ HighAvailabilityUtils.addServerInstanceToZooKeeper(server)
+ }
+
+ test("Add Server Instance To ZooKeeper with wrong port") {
+ server = new KyuubiServer()
+ conf.set(KyuubiConf.HA_ZOOKEEPER_CLIENT_PORT.key, "2000")
+ server.init(conf)
+ server.start()
+ HighAvailabilityUtils.addServerInstanceToZooKeeper(server)
+ }
+
+ test("Add Server Instance To ZooKeeper with host and port") {
+ server = new KyuubiServer()
+ conf.set(KyuubiConf.HA_ZOOKEEPER_QUORUM.key, connectString.split(":")(0))
+ conf.set(KyuubiConf.HA_ZOOKEEPER_CLIENT_PORT.key, "2181")
+ server.init(conf)
+ server.start()
+ HighAvailabilityUtils.addServerInstanceToZooKeeper(server)
+ }
+
+ test("Add Server Instance To ZooKeeper with host and wrong port") {
+ server = new KyuubiServer()
+ conf.set(KyuubiConf.HA_ZOOKEEPER_QUORUM.key, connectString.split(":")(0))
+ conf.set(KyuubiConf.HA_ZOOKEEPER_CLIENT_PORT.key, "2000")
+ server.init(conf)
+ server.start()
+ val e = intercept[ServiceException](HighAvailabilityUtils.addServerInstanceToZooKeeper(server))
+ assert(e.getCause.isInstanceOf[ConnectionLossException])
+ }
+
+ test("Add Server Instance To ZooKeeper with wrong host and right port") {
+ server = new KyuubiServer()
+ conf.set(KyuubiConf.HA_ZOOKEEPER_QUORUM.key, connectString.split(":")(0) + "1")
+ conf.set(KyuubiConf.HA_ZOOKEEPER_CLIENT_PORT.key, "2181")
+ server.init(conf)
+ server.start()
+ val e = intercept[ServiceException](HighAvailabilityUtils.addServerInstanceToZooKeeper(server))
+ assert(e.getCause.isInstanceOf[ConnectionLossException])
+ }
+
+ test("Is Support Dynamic Service Discovery") {
+ val conf = new SparkConf(loadDefaults = true)
+ assert(!HighAvailabilityUtils.isSupportDynamicServiceDiscovery(conf))
+ KyuubiServer.setupCommonConfig(conf)
+ assert(!HighAvailabilityUtils.isSupportDynamicServiceDiscovery(conf))
+ conf.set(KyuubiConf.HA_ENABLED.key, "true")
+ assert(!HighAvailabilityUtils.isSupportDynamicServiceDiscovery(conf))
+ conf.set(KyuubiConf.HA_ZOOKEEPER_QUORUM.key, "localhost")
+ assert(HighAvailabilityUtils.isSupportDynamicServiceDiscovery(conf))
+ }
+}
diff --git a/pom.xml b/pom.xml
index d1fb051a1..84d4b871e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -313,7 +313,12 @@
-
+
+ org.apache.curator
+ curator-test
+ 2.6.0
+ test
+