From 07de3c0d00a9b369f4deef0a2e3dc1a6c5c560c7 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 29 Jun 2018 19:41:49 +0800 Subject: [PATCH] catch ConnectionLossException tooo --- kyuubi-server/pom.xml | 5 +- .../scala/org/apache/spark/KyuubiConf.scala | 6 + .../kyuubi/auth/KyuubiAuthFactory.scala | 5 +- .../kyuubi/ha/HighAvailabilityUtils.scala | 60 +++++----- .../ha/HighAvailabilityUtilsSuite.scala | 109 ++++++++++++++++++ pom.xml | 7 +- 6 files changed, 158 insertions(+), 34 deletions(-) create mode 100644 kyuubi-server/src/test/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtilsSuite.scala diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml index 79512f122..ba5712728 100644 --- a/kyuubi-server/pom.xml +++ b/kyuubi-server/pom.xml @@ -105,9 +105,12 @@ org.apache.directory.server apacheds-service + + org.apache.curator + curator-test + - target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes diff --git a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala index b94906061..5ba98c7d7 100644 --- a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala +++ b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala @@ -97,6 +97,12 @@ object KyuubiConf { .intConf .createWithDefault(3) + val HA_ZOOKEEPER_ZNODE_CREATION_TIMEOUT: ConfigEntry[Long] = + KyuubiConfigBuilder("spark.kyuubi.ha.zk.znode.creation.timeout") + .doc("ZooKeeper znode's creatation timeout (in milliseconds).") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(TimeUnit.SECONDS.toMillis(120L)) + ///////////////////////////////////////////////////////////////////////////////////////////////// // Operation Log // ///////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/auth/KyuubiAuthFactory.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/auth/KyuubiAuthFactory.scala index 93c2a1588..4a7d79d99 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/auth/KyuubiAuthFactory.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/auth/KyuubiAuthFactory.scala @@ -183,10 +183,7 @@ object KyuubiAuthFactory { if (!proxyUser.equalsIgnoreCase(realUser)) { ProxyUsers.refreshSuperUserGroupsConfiguration(hadoopConf) - ProxyUsers.authorize( - UserGroupInformation.createProxyUser(proxyUser, sessionUgi), - ipAddress, - hadoopConf) + ProxyUsers.authorize(UserGroupInformation.createProxyUser(proxyUser, sessionUgi), ipAddress) } } catch { case e: IOException => diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtils.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtils.scala index c261f4cdb..194ebd22f 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtils.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtils.scala @@ -30,27 +30,29 @@ import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode import org.apache.curator.retry.ExponentialBackoffRetry import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation} import org.apache.hadoop.security.authentication.util.KerberosUtil -import org.apache.hive.common.util.HiveVersionInfo import org.apache.spark.{KyuubiSparkUtil, SparkConf} import org.apache.spark.KyuubiConf._ import org.apache.zookeeper._ +import org.apache.zookeeper.KeeperException.{ConnectionLossException, NodeExistsException} import org.apache.zookeeper.data.ACL -import yaooqinn.kyuubi.Logging +import yaooqinn.kyuubi.{Logging, _} import yaooqinn.kyuubi.server.{FrontendService, KyuubiServer} +import yaooqinn.kyuubi.service.ServiceException -object HighAvailabilityUtils extends Logging { +private[kyuubi] object HighAvailabilityUtils extends Logging { - private[this] val ZOOKEEPER_PATH_SEPARATOR = "/" + private[this] val ZK_PATH_SEPARATOR = "/" - private[this] var zooKeeperClient: CuratorFramework = _ + private[this] var zkClient: CuratorFramework = _ private[this] var znode: PersistentEphemeralNode = _ private[this] var znodePath: String = _ // Set to true only when deregistration happens private[this] var deregisteredWithZooKeeper = false def isSupportDynamicServiceDiscovery(conf: SparkConf): Boolean = { - conf.get(HA_ENABLED.key).toBoolean && conf.get(HA_ZOOKEEPER_QUORUM.key).split(",").nonEmpty + conf.getBoolean(HA_ENABLED.key, defaultValue = false) && + conf.get(HA_ZOOKEEPER_QUORUM.key, "").nonEmpty } @throws[Exception] @@ -63,67 +65,69 @@ object HighAvailabilityUtils extends Logging { val sessionTimeout = conf.getTimeAsMs(HA_ZOOKEEPER_SESSION_TIMEOUT.key).toInt val baseSleepTime = conf.getTimeAsMs(HA_ZOOKEEPER_CONNECTION_BASESLEEPTIME.key).toInt val maxRetries = conf.get(HA_ZOOKEEPER_CONNECTION_MAX_RETRIES.key).toInt + val znodeTimeout = conf.getTimeAsSeconds(HA_ZOOKEEPER_ZNODE_CREATION_TIMEOUT.key) + // Create a CuratorFramework instance to be used as the ZooKeeper client // Use the zooKeeperAclProvider to create appropriate ACLs - zooKeeperClient = + zkClient = CuratorFrameworkFactory.builder.connectString(zooKeeperEnsemble) .sessionTimeoutMs(sessionTimeout) .aclProvider(zooKeeperAclProvider) .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)) .build - zooKeeperClient.start() + zkClient.start() // Create the parent znodes recursively; ignore if the parent already exists. try { - zooKeeperClient + zkClient .create .creatingParentsIfNeeded .withMode(CreateMode.PERSISTENT) - .forPath(ZOOKEEPER_PATH_SEPARATOR + rootNamespace) + .forPath(ZK_PATH_SEPARATOR + rootNamespace) info("Created the root name space: " + rootNamespace + " on ZooKeeper for KyuubiServer") } catch { + case e: ConnectionLossException => + throwServiceEx( s"ZooKeeper is still unreachable after ${sessionTimeout / 1000}s", e) + case _: NodeExistsException => case e: KeeperException => - if (e.code ne KeeperException.Code.NODEEXISTS) { - error("Unable to create KyuubiServer namespace: " + rootNamespace + " on ZooKeeper", e) - throw e - } + throwServiceEx( s"Unable to create KyuubiServer namespace $rootNamespace on ZooKeeper", e) } // Create a znode under the rootNamespace parent for this instance of the server // Znode name: serverUri=host:port;version=versionInfo;sequence=sequenceNumber try { - val pathPrefix = ZOOKEEPER_PATH_SEPARATOR + rootNamespace + ZOOKEEPER_PATH_SEPARATOR + - "serverUri=" + instanceURI + ";" + - "version=" + HiveVersionInfo.getVersion + ";" + "sequence=" - var znodeData = "" - znodeData = instanceURI + val pathPrefix = ZK_PATH_SEPARATOR + rootNamespace + ZK_PATH_SEPARATOR + + "serverUri=" + instanceURI + ";" + "version=" + KYUUBI_VERSION + ";" + "sequence=" + val znodeData = instanceURI val znodeDataUTF8 = znodeData.getBytes(Charset.forName("UTF-8")) znode = new PersistentEphemeralNode( - zooKeeperClient, + zkClient, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8) znode.start() - // We'll wait for 120s for node creation - val znodeCreationTimeout = 120 - if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) { - throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted") + if (!znode.waitForInitialCreate(znodeTimeout, TimeUnit.SECONDS)) { + throw new ServiceException("Max znode creation wait time: " + znodeTimeout + "s exhausted") } setDeregisteredWithZooKeeper(false) znodePath = znode.getActualPath // Set a watch on the znode - if (zooKeeperClient.checkExists.usingWatcher(new DeRegisterWatcher(server)) + if (zkClient.checkExists.usingWatcher(new DeRegisterWatcher(server)) .forPath(znodePath) == null) { // No node exists, throw exception - throw new Exception("Unable to create znode for this KyuubiServer instance on ZooKeeper.") + throwServiceEx("Unable to create znode for this KyuubiServer instance on ZooKeeper.") } info("Created a znode on ZooKeeper for KyuubiServer uri: " + instanceURI) } catch { case e: Exception => - error("Unable to create a znode for this server instance", e) if (znode != null) znode.close() - throw e + throwServiceEx("Unable to create a znode for this server instance", e) } } + private[this] def throwServiceEx(msg: String, e: Exception = null): Unit = { + error(msg, e) + throw new ServiceException(msg, e) + } + /** * Get the ensemble server addresses from the configuration. The format is: host1:port, * host2:port... diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtilsSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtilsSuite.scala new file mode 100644 index 000000000..766448465 --- /dev/null +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtilsSuite.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package yaooqinn.kyuubi.ha + +import org.apache.curator.test.TestingServer +import org.apache.spark.{KyuubiConf, SparkConf, SparkFunSuite} +import org.apache.spark.KyuubiConf.HA_ZOOKEEPER_CONNECTION_MAX_RETRIES +import org.apache.zookeeper.KeeperException.ConnectionLossException +import org.scalatest.{BeforeAndAfterEach, ConfigMap} + +import yaooqinn.kyuubi.server.KyuubiServer +import yaooqinn.kyuubi.service.ServiceException + +class HighAvailabilityUtilsSuite extends SparkFunSuite with BeforeAndAfterEach { + + var zkServer: TestingServer = _ + var connectString: String = _ + val conf = new SparkConf(loadDefaults = true) + KyuubiServer.setupCommonConfig(conf) + conf.set(KyuubiConf.FRONTEND_BIND_PORT.key, "0") + var server: KyuubiServer = _ + + override def beforeAll(configMap: ConfigMap): Unit = { + zkServer = new TestingServer(2181, true) + connectString = zkServer.getConnectString + conf.set(KyuubiConf.HA_ZOOKEEPER_QUORUM.key, connectString) + conf.set(KyuubiConf.HA_ZOOKEEPER_ZNODE_CREATION_TIMEOUT.key, "1s") + conf.set(KyuubiConf.HA_ZOOKEEPER_SESSION_TIMEOUT.key, "3s") + conf.set(HA_ZOOKEEPER_CONNECTION_MAX_RETRIES.key, "1") + } + + override def afterAll(): Unit = { + zkServer.stop() + } + + override def afterEach(): Unit = { + server.stop() + } + + test("Add Server Instance To ZooKeeper with host:port") { + server = new KyuubiServer() + server.init(conf) + server.start() + HighAvailabilityUtils.addServerInstanceToZooKeeper(server) + } + + test("Add Server Instance To ZooKeeper with wrong port") { + server = new KyuubiServer() + conf.set(KyuubiConf.HA_ZOOKEEPER_CLIENT_PORT.key, "2000") + server.init(conf) + server.start() + HighAvailabilityUtils.addServerInstanceToZooKeeper(server) + } + + test("Add Server Instance To ZooKeeper with host and port") { + server = new KyuubiServer() + conf.set(KyuubiConf.HA_ZOOKEEPER_QUORUM.key, connectString.split(":")(0)) + conf.set(KyuubiConf.HA_ZOOKEEPER_CLIENT_PORT.key, "2181") + server.init(conf) + server.start() + HighAvailabilityUtils.addServerInstanceToZooKeeper(server) + } + + test("Add Server Instance To ZooKeeper with host and wrong port") { + server = new KyuubiServer() + conf.set(KyuubiConf.HA_ZOOKEEPER_QUORUM.key, connectString.split(":")(0)) + conf.set(KyuubiConf.HA_ZOOKEEPER_CLIENT_PORT.key, "2000") + server.init(conf) + server.start() + val e = intercept[ServiceException](HighAvailabilityUtils.addServerInstanceToZooKeeper(server)) + assert(e.getCause.isInstanceOf[ConnectionLossException]) + } + + test("Add Server Instance To ZooKeeper with wrong host and right port") { + server = new KyuubiServer() + conf.set(KyuubiConf.HA_ZOOKEEPER_QUORUM.key, connectString.split(":")(0) + "1") + conf.set(KyuubiConf.HA_ZOOKEEPER_CLIENT_PORT.key, "2181") + server.init(conf) + server.start() + val e = intercept[ServiceException](HighAvailabilityUtils.addServerInstanceToZooKeeper(server)) + assert(e.getCause.isInstanceOf[ConnectionLossException]) + } + + test("Is Support Dynamic Service Discovery") { + val conf = new SparkConf(loadDefaults = true) + assert(!HighAvailabilityUtils.isSupportDynamicServiceDiscovery(conf)) + KyuubiServer.setupCommonConfig(conf) + assert(!HighAvailabilityUtils.isSupportDynamicServiceDiscovery(conf)) + conf.set(KyuubiConf.HA_ENABLED.key, "true") + assert(!HighAvailabilityUtils.isSupportDynamicServiceDiscovery(conf)) + conf.set(KyuubiConf.HA_ZOOKEEPER_QUORUM.key, "localhost") + assert(HighAvailabilityUtils.isSupportDynamicServiceDiscovery(conf)) + } +} diff --git a/pom.xml b/pom.xml index d1fb051a1..84d4b871e 100644 --- a/pom.xml +++ b/pom.xml @@ -313,7 +313,12 @@ - + + org.apache.curator + curator-test + 2.6.0 + test +