catch ConnectionLossException too
This commit is contained in:
parent
2204994ebf
commit
07de3c0d00
@ -105,9 +105,12 @@
|
||||
<groupId>org.apache.directory.server</groupId>
|
||||
<artifactId>apacheds-service</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.curator</groupId>
|
||||
<artifactId>curator-test</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
|
||||
<build>
|
||||
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
|
||||
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
|
||||
|
||||
@ -97,6 +97,12 @@ object KyuubiConf {
|
||||
.intConf
|
||||
.createWithDefault(3)
|
||||
|
||||
// How long to wait for this server's znode to be created on ZooKeeper.
// Declared as a millisecond time config; the default is 120 seconds expressed in milliseconds.
val HA_ZOOKEEPER_ZNODE_CREATION_TIMEOUT: ConfigEntry[Long] =
  KyuubiConfigBuilder("spark.kyuubi.ha.zk.znode.creation.timeout")
    .doc("ZooKeeper znode's creation timeout (in milliseconds).")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefault(TimeUnit.SECONDS.toMillis(120L))
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Operation Log //
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@ -183,10 +183,7 @@ object KyuubiAuthFactory {
|
||||
|
||||
if (!proxyUser.equalsIgnoreCase(realUser)) {
|
||||
ProxyUsers.refreshSuperUserGroupsConfiguration(hadoopConf)
|
||||
ProxyUsers.authorize(
|
||||
UserGroupInformation.createProxyUser(proxyUser, sessionUgi),
|
||||
ipAddress,
|
||||
hadoopConf)
|
||||
ProxyUsers.authorize(UserGroupInformation.createProxyUser(proxyUser, sessionUgi), ipAddress)
|
||||
}
|
||||
} catch {
|
||||
case e: IOException =>
|
||||
|
||||
@ -30,27 +30,29 @@ import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode
|
||||
import org.apache.curator.retry.ExponentialBackoffRetry
|
||||
import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation}
|
||||
import org.apache.hadoop.security.authentication.util.KerberosUtil
|
||||
import org.apache.hive.common.util.HiveVersionInfo
|
||||
import org.apache.spark.{KyuubiSparkUtil, SparkConf}
|
||||
import org.apache.spark.KyuubiConf._
|
||||
import org.apache.zookeeper._
|
||||
import org.apache.zookeeper.KeeperException.{ConnectionLossException, NodeExistsException}
|
||||
import org.apache.zookeeper.data.ACL
|
||||
|
||||
import yaooqinn.kyuubi.Logging
|
||||
import yaooqinn.kyuubi.{Logging, _}
|
||||
import yaooqinn.kyuubi.server.{FrontendService, KyuubiServer}
|
||||
import yaooqinn.kyuubi.service.ServiceException
|
||||
|
||||
object HighAvailabilityUtils extends Logging {
|
||||
private[kyuubi] object HighAvailabilityUtils extends Logging {
|
||||
|
||||
private[this] val ZOOKEEPER_PATH_SEPARATOR = "/"
|
||||
private[this] val ZK_PATH_SEPARATOR = "/"
|
||||
|
||||
private[this] var zooKeeperClient: CuratorFramework = _
|
||||
private[this] var zkClient: CuratorFramework = _
|
||||
private[this] var znode: PersistentEphemeralNode = _
|
||||
private[this] var znodePath: String = _
|
||||
// Set to true only when deregistration happens
|
||||
private[this] var deregisteredWithZooKeeper = false
|
||||
|
||||
def isSupportDynamicServiceDiscovery(conf: SparkConf): Boolean = {
|
||||
conf.get(HA_ENABLED.key).toBoolean && conf.get(HA_ZOOKEEPER_QUORUM.key).split(",").nonEmpty
|
||||
conf.getBoolean(HA_ENABLED.key, defaultValue = false) &&
|
||||
conf.get(HA_ZOOKEEPER_QUORUM.key, "").nonEmpty
|
||||
}
|
||||
|
||||
@throws[Exception]
|
||||
@ -63,67 +65,69 @@ object HighAvailabilityUtils extends Logging {
|
||||
val sessionTimeout = conf.getTimeAsMs(HA_ZOOKEEPER_SESSION_TIMEOUT.key).toInt
|
||||
val baseSleepTime = conf.getTimeAsMs(HA_ZOOKEEPER_CONNECTION_BASESLEEPTIME.key).toInt
|
||||
val maxRetries = conf.get(HA_ZOOKEEPER_CONNECTION_MAX_RETRIES.key).toInt
|
||||
val znodeTimeout = conf.getTimeAsSeconds(HA_ZOOKEEPER_ZNODE_CREATION_TIMEOUT.key)
|
||||
|
||||
// Create a CuratorFramework instance to be used as the ZooKeeper client
|
||||
// Use the zooKeeperAclProvider to create appropriate ACLs
|
||||
zooKeeperClient =
|
||||
zkClient =
|
||||
CuratorFrameworkFactory.builder.connectString(zooKeeperEnsemble)
|
||||
.sessionTimeoutMs(sessionTimeout)
|
||||
.aclProvider(zooKeeperAclProvider)
|
||||
.retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
|
||||
.build
|
||||
zooKeeperClient.start()
|
||||
zkClient.start()
|
||||
// Create the parent znodes recursively; ignore if the parent already exists.
|
||||
try {
|
||||
zooKeeperClient
|
||||
zkClient
|
||||
.create
|
||||
.creatingParentsIfNeeded
|
||||
.withMode(CreateMode.PERSISTENT)
|
||||
.forPath(ZOOKEEPER_PATH_SEPARATOR + rootNamespace)
|
||||
.forPath(ZK_PATH_SEPARATOR + rootNamespace)
|
||||
info("Created the root name space: " + rootNamespace + " on ZooKeeper for KyuubiServer")
|
||||
} catch {
|
||||
case e: ConnectionLossException =>
|
||||
throwServiceEx( s"ZooKeeper is still unreachable after ${sessionTimeout / 1000}s", e)
|
||||
case _: NodeExistsException =>
|
||||
case e: KeeperException =>
|
||||
if (e.code ne KeeperException.Code.NODEEXISTS) {
|
||||
error("Unable to create KyuubiServer namespace: " + rootNamespace + " on ZooKeeper", e)
|
||||
throw e
|
||||
}
|
||||
throwServiceEx( s"Unable to create KyuubiServer namespace $rootNamespace on ZooKeeper", e)
|
||||
}
|
||||
// Create a znode under the rootNamespace parent for this instance of the server
|
||||
// Znode name: serverUri=host:port;version=versionInfo;sequence=sequenceNumber
|
||||
try {
|
||||
val pathPrefix = ZOOKEEPER_PATH_SEPARATOR + rootNamespace + ZOOKEEPER_PATH_SEPARATOR +
|
||||
"serverUri=" + instanceURI + ";" +
|
||||
"version=" + HiveVersionInfo.getVersion + ";" + "sequence="
|
||||
var znodeData = ""
|
||||
znodeData = instanceURI
|
||||
val pathPrefix = ZK_PATH_SEPARATOR + rootNamespace + ZK_PATH_SEPARATOR +
|
||||
"serverUri=" + instanceURI + ";" + "version=" + KYUUBI_VERSION + ";" + "sequence="
|
||||
val znodeData = instanceURI
|
||||
val znodeDataUTF8 = znodeData.getBytes(Charset.forName("UTF-8"))
|
||||
znode = new PersistentEphemeralNode(
|
||||
zooKeeperClient,
|
||||
zkClient,
|
||||
PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL,
|
||||
pathPrefix,
|
||||
znodeDataUTF8)
|
||||
znode.start()
|
||||
// We'll wait for 120s for node creation
|
||||
val znodeCreationTimeout = 120
|
||||
if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) {
|
||||
throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted")
|
||||
if (!znode.waitForInitialCreate(znodeTimeout, TimeUnit.SECONDS)) {
|
||||
throw new ServiceException("Max znode creation wait time: " + znodeTimeout + "s exhausted")
|
||||
}
|
||||
setDeregisteredWithZooKeeper(false)
|
||||
znodePath = znode.getActualPath
|
||||
// Set a watch on the znode
|
||||
if (zooKeeperClient.checkExists.usingWatcher(new DeRegisterWatcher(server))
|
||||
if (zkClient.checkExists.usingWatcher(new DeRegisterWatcher(server))
|
||||
.forPath(znodePath) == null) {
|
||||
// No node exists, throw exception
|
||||
throw new Exception("Unable to create znode for this KyuubiServer instance on ZooKeeper.")
|
||||
throwServiceEx("Unable to create znode for this KyuubiServer instance on ZooKeeper.")
|
||||
}
|
||||
info("Created a znode on ZooKeeper for KyuubiServer uri: " + instanceURI)
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
error("Unable to create a znode for this server instance", e)
|
||||
if (znode != null) znode.close()
|
||||
throw e
|
||||
throwServiceEx("Unable to create a znode for this server instance", e)
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Log the failure through `error(...)` and then abort by throwing a [[ServiceException]]
 * that carries the same message and cause.
 *
 * The return type is `Nothing` because this method never returns normally; that lets it be
 * used in any expression position (for example as the sole body of a `catch` case).
 *
 * @param msg description of the failure, used for both the log entry and the exception
 * @param e   underlying cause, or `null` when there is none
 * @throws ServiceException always
 */
private[this] def throwServiceEx(msg: String, e: Exception = null): Nothing = {
  error(msg, e)
  throw new ServiceException(msg, e)
}
|
||||
|
||||
/**
|
||||
* Get the ensemble server addresses from the configuration. The format is: host1:port,
|
||||
* host2:port...
|
||||
|
||||
@ -0,0 +1,109 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package yaooqinn.kyuubi.ha
|
||||
|
||||
import org.apache.curator.test.TestingServer
|
||||
import org.apache.spark.{KyuubiConf, SparkConf, SparkFunSuite}
|
||||
import org.apache.spark.KyuubiConf.HA_ZOOKEEPER_CONNECTION_MAX_RETRIES
|
||||
import org.apache.zookeeper.KeeperException.ConnectionLossException
|
||||
import org.scalatest.{BeforeAndAfterEach, ConfigMap}
|
||||
|
||||
import yaooqinn.kyuubi.server.KyuubiServer
|
||||
import yaooqinn.kyuubi.service.ServiceException
|
||||
|
||||
/**
 * Tests for [[HighAvailabilityUtils]] run against an in-process Curator [[TestingServer]].
 *
 * All server-registration tests share one mutable `conf`; every test therefore sets the
 * quorum (and, where relevant, the client port) it needs instead of relying on whatever a
 * previously executed test left behind.
 */
class HighAvailabilityUtilsSuite extends SparkFunSuite with BeforeAndAfterEach {

  // Port the embedded ZooKeeper server is bound to.
  private val zkPort = 2181
  // A port the tests treat as "wrong": connecting to it is expected to fail.
  private val wrongPort = 2000

  var zkServer: TestingServer = _
  var connectString: String = _
  val conf = new SparkConf(loadDefaults = true)
  KyuubiServer.setupCommonConfig(conf)
  conf.set(KyuubiConf.FRONTEND_BIND_PORT.key, "0")
  var server: KyuubiServer = _

  override def beforeAll(configMap: ConfigMap): Unit = {
    super.beforeAll(configMap)
    zkServer = new TestingServer(zkPort, true)
    connectString = zkServer.getConnectString
    conf.set(KyuubiConf.HA_ZOOKEEPER_QUORUM.key, connectString)
    // Short timeouts and a single retry keep the failure-path tests fast.
    conf.set(KyuubiConf.HA_ZOOKEEPER_ZNODE_CREATION_TIMEOUT.key, "1s")
    conf.set(KyuubiConf.HA_ZOOKEEPER_SESSION_TIMEOUT.key, "3s")
    conf.set(HA_ZOOKEEPER_CONNECTION_MAX_RETRIES.key, "1")
  }

  override def afterAll(): Unit = {
    try {
      // close() stops the server and also removes its temporary data directory.
      if (zkServer != null) zkServer.close()
    } finally {
      super.afterAll()
    }
  }

  override def afterEach(): Unit = {
    try {
      // Not every test starts a server, and a test may fail before assigning one.
      if (server != null) server.stop()
    } finally {
      // Forget the instance so a later test that creates none does not stop it again.
      server = null
      super.afterEach()
    }
  }

  /** Host part of the testing server's `host:port` connect string. */
  private def zkHost: String = connectString.split(":")(0)

  /**
   * Create, initialize and start a fresh [[KyuubiServer]] from the shared `conf`.
   * The instance is stored in `server` so that `afterEach` can stop it.
   */
  private def startServer(): KyuubiServer = {
    server = new KyuubiServer()
    server.init(conf)
    server.start()
    server
  }

  test("Add Server Instance To ZooKeeper with host:port") {
    conf.set(KyuubiConf.HA_ZOOKEEPER_QUORUM.key, connectString)
    HighAvailabilityUtils.addServerInstanceToZooKeeper(startServer())
  }

  test("Add Server Instance To ZooKeeper with wrong port") {
    // The quorum already carries host:port; registration is expected to succeed even though
    // the separate client-port setting is wrong (presumably it is ignored in that case --
    // verify against the quorum-resolution code).
    conf.set(KyuubiConf.HA_ZOOKEEPER_QUORUM.key, connectString)
    conf.set(KyuubiConf.HA_ZOOKEEPER_CLIENT_PORT.key, wrongPort.toString)
    HighAvailabilityUtils.addServerInstanceToZooKeeper(startServer())
  }

  test("Add Server Instance To ZooKeeper with host and port") {
    conf.set(KyuubiConf.HA_ZOOKEEPER_QUORUM.key, zkHost)
    conf.set(KyuubiConf.HA_ZOOKEEPER_CLIENT_PORT.key, zkPort.toString)
    HighAvailabilityUtils.addServerInstanceToZooKeeper(startServer())
  }

  test("Add Server Instance To ZooKeeper with host and wrong port") {
    conf.set(KyuubiConf.HA_ZOOKEEPER_QUORUM.key, zkHost)
    conf.set(KyuubiConf.HA_ZOOKEEPER_CLIENT_PORT.key, wrongPort.toString)
    val started = startServer()
    val e = intercept[ServiceException](HighAvailabilityUtils.addServerInstanceToZooKeeper(started))
    assert(e.getCause.isInstanceOf[ConnectionLossException])
  }

  test("Add Server Instance To ZooKeeper with wrong host and right port") {
    conf.set(KyuubiConf.HA_ZOOKEEPER_QUORUM.key, zkHost + "1")
    conf.set(KyuubiConf.HA_ZOOKEEPER_CLIENT_PORT.key, zkPort.toString)
    val started = startServer()
    val e = intercept[ServiceException](HighAvailabilityUtils.addServerInstanceToZooKeeper(started))
    assert(e.getCause.isInstanceOf[ConnectionLossException])
  }

  test("Is Support Dynamic Service Discovery") {
    // Uses its own SparkConf so the shared one is left untouched.
    val freshConf = new SparkConf(loadDefaults = true)
    assert(!HighAvailabilityUtils.isSupportDynamicServiceDiscovery(freshConf))
    KyuubiServer.setupCommonConfig(freshConf)
    assert(!HighAvailabilityUtils.isSupportDynamicServiceDiscovery(freshConf))
    freshConf.set(KyuubiConf.HA_ENABLED.key, "true")
    assert(!HighAvailabilityUtils.isSupportDynamicServiceDiscovery(freshConf))
    freshConf.set(KyuubiConf.HA_ZOOKEEPER_QUORUM.key, "localhost")
    assert(HighAvailabilityUtils.isSupportDynamicServiceDiscovery(freshConf))
  }
}
|
||||
Loading…
Reference in New Issue
Block a user