[KYUUBI #1359] Support setting zk chroot path when initialize cluster

### _Why are the changes needed?_
Users may encounter the following exception when trying to set `kyuubi.ha.zookeeper.quorum` to `localhost:2181/lakehouse` with chroot path `/lakehouse` nonexisted:
```
2021-11-10 10:56:34.510 ERROR server.KyuubiThriftBinaryFrontendService: Error starting service KyuubiServiceDiscovery
org.apache.kyuubi.KyuubiException: Failed to create namespace '/kyuubi'
        at org.apache.kyuubi.ha.client.ServiceDiscovery$.createServiceNode(ServiceDiscovery.scala:225)
        at org.apache.kyuubi.ha.client.ServiceDiscovery.start(ServiceDiscovery.scala:101)
......
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /kyuubi
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:114)
......
```
It is wonderful to support this since zookeeper connection with chroot path is generally recommended in production environments. With this feture the znodes in zookeeper likes below:
```
[zk: localhost:2181(CONNECTED) 28] ls /lakehouse
[kyuubi, kyuubi_USER, kyuubi_USER_SPARK_SQL]
```
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1359 from murong00/branch-1358.

Closes #1359

42459ba0 [sunfangbin] Add a test to cover the changes
0191ab66 [sunfangbin] Fail the invalid cases
d04c532b [sunfangbin] address all the comments
8e7b5a64 [sunfangbin] Support setting zk chroot path when initialize cluster

Authored-by: sunfangbin <sunfangbin@cmss.chinamobile.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
This commit is contained in:
sunfangbin 2021-11-12 12:04:35 +08:00 committed by ulysses-you
parent 1061d176c8
commit 9e1f9c2e98
No known key found for this signature in database
GPG Key ID: 4C500BC62D576766
2 changed files with 110 additions and 1 deletions

View File

@ -17,9 +17,15 @@
package org.apache.kyuubi.server
import java.util
import scala.util.Properties
import org.apache.curator.utils.ZKPaths
import org.apache.hadoop.security.UserGroupInformation
import org.apache.zookeeper.CreateMode.PERSISTENT
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.NodeExistsException
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
@ -27,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_PROTOCOLS, FrontendProtocol
import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols._
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.{ServiceDiscovery, ZooKeeperAuthTypes}
import org.apache.kyuubi.ha.client.ZooKeeperClientProvider._
import org.apache.kyuubi.metrics.{MetricsConf, MetricsSystem}
import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable}
import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
@ -42,6 +49,43 @@ object KyuubiServer extends Logging {
zkServer.start()
conf.set(HA_ZK_QUORUM, zkServer.getConnectString)
conf.set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString)
} else {
// create chroot path if necessary
val connectionStr = conf.get(HA_ZK_QUORUM)
val addresses = connectionStr.split(",")
val slashOption = util.Arrays.copyOfRange(addresses, 0, addresses.length -1)
.toList
.find(_.contains("/"))
if (slashOption.isDefined) {
throw new IllegalArgumentException(s"Illegal zookeeper quorum '$connectionStr', " +
s"the chroot path started with / is only allowed at the end!")
}
val chrootIndex = connectionStr.indexOf("/")
val chrootOption = {
if (chrootIndex > 0) Some(connectionStr.substring(chrootIndex))
else None
}
chrootOption.foreach { chroot =>
val zkConnectionForChrootCreation = connectionStr.substring(0, chrootIndex)
val overrideQuorumConf = conf.clone.set(HA_ZK_QUORUM, zkConnectionForChrootCreation)
withZkClient(overrideQuorumConf) { zkClient =>
if (zkClient.checkExists().forPath(chroot) == null) {
val chrootPath = ZKPaths.makePath(null, chroot)
try {
zkClient
.create()
.creatingParentsIfNeeded()
.withMode(PERSISTENT)
.forPath(chrootPath)
} catch {
case _: NodeExistsException => // do nothing
case e: KeeperException =>
throw new KyuubiException(s"Failed to create chroot path '$chrootPath'", e)
}
}
}
info(s"Created zookeeper chroot path $chroot")
}
}
val server = new KyuubiServer()

View File

@ -17,12 +17,43 @@
package org.apache.kyuubi.server
import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite}
import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.ZooKeeperClientProvider
import org.apache.kyuubi.service.ServiceState._
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
class KyuubiServerSuite extends KyuubiFunSuite {
private var zkServer: EmbeddedZookeeper = _
private var server: KyuubiServer = _
override def beforeAll(): Unit = {
val conf = KyuubiConf()
zkServer = new EmbeddedZookeeper()
conf.set(ZookeeperConf.ZK_CLIENT_PORT, 0)
val zkData = Utils.createTempDir()
conf.set(ZookeeperConf.ZK_DATA_DIR, zkData.toString)
zkServer.initialize(conf)
zkServer.start()
super.beforeAll()
Thread.sleep(1000)
}
override def afterAll(): Unit = {
if (server != null) {
server.stop()
server = null
}
if (zkServer != null) {
zkServer.stop()
zkServer = null
}
super.afterAll()
}
test("kyuubi server basic") {
val server = new KyuubiServer()
server.stop()
@ -65,4 +96,38 @@ class KyuubiServerSuite extends KyuubiFunSuite {
assert(e.getMessage.contains("Failed to initialize frontend service"))
assert(e.getCause.getMessage === "Invalid Port number")
}
test("invalid zookeeper quorum") {
val conf = KyuubiConf()
val quorum1 = "localhost:2181/lake,localhost:2182/lake"
conf.set(HighAvailabilityConf.HA_ZK_QUORUM, quorum1)
val exp1 = intercept[IllegalArgumentException](KyuubiServer.startServer(conf))
assert(exp1.getMessage === s"Illegal zookeeper quorum '$quorum1', " +
s"the chroot path started with / is only allowed at the end!")
val quorum2 = "localhost:2181/lake,localhost:2182"
conf.set(HighAvailabilityConf.HA_ZK_QUORUM, quorum2)
val exp2 = intercept[IllegalArgumentException](KyuubiServer.startServer(conf))
assert(exp2.getMessage === s"Illegal zookeeper quorum '$quorum2', " +
s"the chroot path started with / is only allowed at the end!")
}
test("kyuubi server starts with chroot") {
val conf = KyuubiConf()
val zkConnection = zkServer.getConnectString
val chrootPath = "/lake"
conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkConnection)
// chroot path does not exist before server start
ZooKeeperClientProvider.withZkClient(conf) { client =>
assert(client.checkExists().forPath(chrootPath) == null)
}
val zkWithChroot = zkConnection + chrootPath
val chrootConf = conf.clone.set(HighAvailabilityConf.HA_ZK_QUORUM, zkWithChroot)
server = KyuubiServer.startServer(chrootConf)
// chroot path exists after server started
ZooKeeperClientProvider.withZkClient(conf) { client =>
assert(client.checkExists().forPath(chrootPath) != null)
}
}
}