From 9e1f9c2e982f0b59da20b58476c6fac15de91c32 Mon Sep 17 00:00:00 2001 From: sunfangbin Date: Fri, 12 Nov 2021 12:04:35 +0800 Subject: [PATCH] [KYUUBI #1359] Support setting zk chroot path when initialize cluster ### _Why are the changes needed?_ Users may encounter the following exception when trying to set `kyuubi.ha.zookeeper.quorum` to `localhost:2181/lakehouse` with chroot path `/lakehouse` nonexisted: ``` 2021-11-10 10:56:34.510 ERROR server.KyuubiThriftBinaryFrontendService: Error starting service KyuubiServiceDiscovery org.apache.kyuubi.KyuubiException: Failed to create namespace '/kyuubi' at org.apache.kyuubi.ha.client.ServiceDiscovery$.createServiceNode(ServiceDiscovery.scala:225) at org.apache.kyuubi.ha.client.ServiceDiscovery.start(ServiceDiscovery.scala:101) ...... Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /kyuubi at org.apache.zookeeper.KeeperException.create(KeeperException.java:114) ...... ``` It is wonderful to support this since zookeeper connection with chroot path is generally recommended in production environments. With this feture the znodes in zookeeper likes below: ``` [zk: localhost:2181(CONNECTED) 28] ls /lakehouse [kyuubi, kyuubi_USER, kyuubi_USER_SPARK_SQL] ``` ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1359 from murong00/branch-1358. Closes #1359 42459ba0 [sunfangbin] Add a test to cover the changes 0191ab66 [sunfangbin] Fail the invalid cases d04c532b [sunfangbin] address all the comments 8e7b5a64 [sunfangbin] Support setting zk chroot path when initialize cluster Authored-by: sunfangbin Signed-off-by: ulysses-you --- .../apache/kyuubi/server/KyuubiServer.scala | 44 ++++++++++++ .../kyuubi/server/KyuubiServerSuite.scala | 67 ++++++++++++++++++- 2 files changed, 110 insertions(+), 1 deletion(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala index 8eb28db1f..f27d0df9a 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala @@ -17,9 +17,15 @@ package org.apache.kyuubi.server +import java.util + import scala.util.Properties +import org.apache.curator.utils.ZKPaths import org.apache.hadoop.security.UserGroupInformation +import org.apache.zookeeper.CreateMode.PERSISTENT +import org.apache.zookeeper.KeeperException +import org.apache.zookeeper.KeeperException.NodeExistsException import org.apache.kyuubi._ import org.apache.kyuubi.config.KyuubiConf @@ -27,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_PROTOCOLS, FrontendProtocol import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols._ import org.apache.kyuubi.ha.HighAvailabilityConf._ import org.apache.kyuubi.ha.client.{ServiceDiscovery, ZooKeeperAuthTypes} +import org.apache.kyuubi.ha.client.ZooKeeperClientProvider._ import org.apache.kyuubi.metrics.{MetricsConf, MetricsSystem} import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable} import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister} @@ -42,6 +49,43 @@ object KyuubiServer extends Logging { zkServer.start() conf.set(HA_ZK_QUORUM, zkServer.getConnectString) conf.set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString) + } else { + // create chroot path if necessary + val connectionStr = conf.get(HA_ZK_QUORUM) + val addresses = connectionStr.split(",") + val slashOption = util.Arrays.copyOfRange(addresses, 0, addresses.length -1) + .toList + .find(_.contains("/")) + if (slashOption.isDefined) { + throw new IllegalArgumentException(s"Illegal zookeeper quorum '$connectionStr', " + + s"the chroot path started with / is only allowed at the end!") + } + val chrootIndex = connectionStr.indexOf("/") + val chrootOption = { + if (chrootIndex > 0) Some(connectionStr.substring(chrootIndex)) + else None + } + chrootOption.foreach { chroot => + val zkConnectionForChrootCreation = connectionStr.substring(0, chrootIndex) + val overrideQuorumConf = conf.clone.set(HA_ZK_QUORUM, zkConnectionForChrootCreation) + withZkClient(overrideQuorumConf) { zkClient => + if (zkClient.checkExists().forPath(chroot) == null) { + val chrootPath = ZKPaths.makePath(null, chroot) + try { + zkClient + .create() + .creatingParentsIfNeeded() + .withMode(PERSISTENT) + .forPath(chrootPath) + } catch { + case _: NodeExistsException => // do nothing + case e: KeeperException => + throw new KyuubiException(s"Failed to create chroot path '$chrootPath'", e) + } + } + } + info(s"Created zookeeper chroot path $chroot") + } } val server = new KyuubiServer() diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala index f749b0553..241c8dc1c 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala @@ -17,12 +17,43 @@ package org.apache.kyuubi.server -import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite} +import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite, Utils} import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.ha.HighAvailabilityConf +import org.apache.kyuubi.ha.client.ZooKeeperClientProvider import org.apache.kyuubi.service.ServiceState._ +import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf} class KyuubiServerSuite extends KyuubiFunSuite { + private var zkServer: EmbeddedZookeeper = _ + private var server: KyuubiServer = _ + + override def beforeAll(): Unit = { + val conf = KyuubiConf() + zkServer = new EmbeddedZookeeper() + conf.set(ZookeeperConf.ZK_CLIENT_PORT, 0) + val zkData = Utils.createTempDir() + conf.set(ZookeeperConf.ZK_DATA_DIR, zkData.toString) + zkServer.initialize(conf) + zkServer.start() + super.beforeAll() + Thread.sleep(1000) + } + + override def afterAll(): Unit = { + if (server != null) { + server.stop() + server = null + } + + if (zkServer != null) { + zkServer.stop() + zkServer = null + } + super.afterAll() + } + test("kyuubi server basic") { val server = new KyuubiServer() server.stop() @@ -65,4 +96,38 @@ class KyuubiServerSuite extends KyuubiFunSuite { assert(e.getMessage.contains("Failed to initialize frontend service")) assert(e.getCause.getMessage === "Invalid Port number") } + + test("invalid zookeeper quorum") { + val conf = KyuubiConf() + val quorum1 = "localhost:2181/lake,localhost:2182/lake" + conf.set(HighAvailabilityConf.HA_ZK_QUORUM, quorum1) + val exp1 = intercept[IllegalArgumentException](KyuubiServer.startServer(conf)) + assert(exp1.getMessage === s"Illegal zookeeper quorum '$quorum1', " + + s"the chroot path started with / is only allowed at the end!") + + val quorum2 = "localhost:2181/lake,localhost:2182" + conf.set(HighAvailabilityConf.HA_ZK_QUORUM, quorum2) + val exp2 = intercept[IllegalArgumentException](KyuubiServer.startServer(conf)) + assert(exp2.getMessage === s"Illegal zookeeper quorum '$quorum2', " + + s"the chroot path started with / is only allowed at the end!") + } + + test("kyuubi server starts with chroot") { + val conf = KyuubiConf() + val zkConnection = zkServer.getConnectString + val chrootPath = "/lake" + conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkConnection) + // chroot path does not exist before server start + ZooKeeperClientProvider.withZkClient(conf) { client => + assert(client.checkExists().forPath(chrootPath) == null) + } + + val zkWithChroot = zkConnection + chrootPath + val chrootConf = conf.clone.set(HighAvailabilityConf.HA_ZK_QUORUM, zkWithChroot) + server = KyuubiServer.startServer(chrootConf) + // chroot path exists after server started + ZooKeeperClientProvider.withZkClient(conf) { client => + assert(client.checkExists().forPath(chrootPath) != null) + } + } }