[KYUUBI #930] Extract the zookeeper part from ServiceDiscovery

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
### Description

`ServiceDiscovery` contains `discovery` and `zookeeper` part, `ServiceDiscovery` should only has one responsibility.

![image](https://user-images.githubusercontent.com/86483005/129339811-9cfaeabf-10dc-4962-9d59-4ca7835ccce5.png)

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #931 from timothy65535/ky-930.

Closes #930

29d48921 [timothy65535] Add ZooKeeperClientProvider
17fbe5a2 [timothy65535] [KYUUBI #930] Extract the zookeeper part from ServiceDiscovery

Authored-by: timothy65535 <timothy65535@163.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
timothy65535 2021-08-14 23:11:18 +08:00 committed by Cheng Pan
parent 7e2d67ed61
commit 9efa78b65e
No known key found for this signature in database
GPG Key ID: F07E6C29ED4E2E5B
9 changed files with 125 additions and 108 deletions

View File

@ -22,7 +22,7 @@ import org.apache.curator.framework.CuratorFramework
import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_ACL_ENABLED, HA_ZK_NAMESPACE, HA_ZK_QUORUM}
import org.apache.kyuubi.ha.client.ServiceDiscovery
import org.apache.kyuubi.ha.client.ZooKeeperClientProvider
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
@ -62,7 +62,7 @@ trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
}
def withZkClient(f: CuratorFramework => Unit): Unit = {
ServiceDiscovery.withZkClient(kyuubiConf)(f)
ZooKeeperClientProvider.withZkClient(kyuubiConf)(f)
}
protected def getDiscoveryConnectionString: String = {

View File

@ -25,7 +25,7 @@ import org.apache.curator.utils.ZKPaths
import org.apache.kyuubi.Logging
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo}
import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo, ZooKeeperClientProvider}
private[ctl] object ServiceControlAction extends Enumeration {
type ServiceControlAction = Value
@ -43,6 +43,7 @@ private[ctl] object ServiceControlObject extends Enumeration {
private[kyuubi] class ServiceControlCli extends Logging {
import ServiceControlCli._
import ServiceDiscovery._
import ZooKeeperClientProvider._
private var verbose: Boolean = false

View File

@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_NAMESPACE, HA_ZK_QUORUM}
import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo}
import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo, ZooKeeperClientProvider}
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
trait TestPrematureExit {
@ -87,6 +87,7 @@ trait TestPrematureExit {
class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
import ServiceControlCli._
import ServiceDiscovery._
import ZooKeeperClientProvider._
val zkServer = new EmbeddedZookeeper()
val conf: KyuubiConf = KyuubiConf()

View File

@ -17,22 +17,18 @@
package org.apache.kyuubi.ha.client
import java.io.{File, IOException}
import java.io.IOException
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import javax.security.auth.login.Configuration
import scala.collection.JavaConverters._
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.nodes.PersistentNode
import org.apache.curator.framework.state.{ConnectionState, ConnectionStateListener}
import org.apache.curator.framework.state.ConnectionState.{CONNECTED, LOST, RECONNECTED}
import org.apache.curator.retry._
import org.apache.curator.utils.ZKPaths
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.JaasConfiguration
import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher}
import org.apache.zookeeper.CreateMode.PERSISTENT
import org.apache.zookeeper.KeeperException.NodeExistsException
@ -41,7 +37,7 @@ import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.service.{AbstractService, Serverable}
import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
import org.apache.kyuubi.util.ThreadUtils
/**
* A abstract service for service discovery
@ -54,6 +50,7 @@ abstract class ServiceDiscovery (
server: Serverable) extends AbstractService(name) {
import ServiceDiscovery._
import ZooKeeperClientProvider._
private var _zkClient: CuratorFramework = _
private var _serviceNode: PersistentNode = _
@ -73,7 +70,6 @@ abstract class ServiceDiscovery (
_namespace = conf.get(HA_ZK_NAMESPACE)
val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
setUpZooKeeperAuth(conf)
_zkClient = buildZookeeperClient(conf)
zkClient.getConnectionStateListenable.addListener(new ConnectionStateListener {
private val isConnected = new AtomicBoolean(false)
@ -157,96 +153,9 @@ abstract class ServiceDiscovery (
object ServiceDiscovery extends Logging {
import RetryPolicies._
private final lazy val connectionChecker =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("zk-connection-checker")
def buildZookeeperClient(conf: KyuubiConf): CuratorFramework = {
val connectionStr = conf.get(HA_ZK_QUORUM)
val sessionTimeout = conf.get(HA_ZK_SESSION_TIMEOUT)
val connectionTimeout = conf.get(HA_ZK_CONN_TIMEOUT)
val baseSleepTime = conf.get(HA_ZK_CONN_BASE_RETRY_WAIT)
val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
val retryPolicyName = conf.get(HA_ZK_CONN_RETRY_POLICY)
val retryPolicy = RetryPolicies.withName(retryPolicyName) match {
case ONE_TIME => new RetryOneTime(baseSleepTime)
case N_TIME => new RetryNTimes(maxRetries, baseSleepTime)
case BOUNDED_EXPONENTIAL_BACKOFF =>
new BoundedExponentialBackoffRetry(baseSleepTime, maxSleepTime, maxRetries)
case UNTIL_ELAPSED => new RetryUntilElapsed(maxSleepTime, baseSleepTime)
case _ => new ExponentialBackoffRetry(baseSleepTime, maxRetries)
}
CuratorFrameworkFactory.builder()
.connectString(connectionStr)
.sessionTimeoutMs(sessionTimeout)
.connectionTimeoutMs(connectionTimeout)
.aclProvider(new ZooKeeperACLProvider(conf))
.retryPolicy(retryPolicy)
.build()
}
/**
* Create a [[CuratorFramework]] instance to be used as the ZooKeeper client
* Use the [[ZooKeeperACLProvider]] to create appropriate ACLs
*/
def startZookeeperClient(conf: KyuubiConf): CuratorFramework = {
val connectionStr = conf.get(HA_ZK_QUORUM)
val sessionTimeout = conf.get(HA_ZK_SESSION_TIMEOUT)
val connectionTimeout = conf.get(HA_ZK_CONN_TIMEOUT)
val retryPolicy = new ExponentialBackoffRetry(1000, 3)
val client = CuratorFrameworkFactory.builder()
.connectString(connectionStr)
.sessionTimeoutMs(sessionTimeout)
.connectionTimeoutMs(connectionTimeout)
.retryPolicy(retryPolicy)
.build()
client.start()
client
}
/**
* Creates a zookeeper client before calling `f` and close it after calling `f`.
*/
def withZkClient(conf: KyuubiConf)(f: CuratorFramework => Unit): Unit = {
val zkClient = startZookeeperClient(conf)
try {
f(zkClient)
} finally {
try {
zkClient.close()
} catch {
case e: IOException => error("Failed to release the zkClient", e)
}
}
}
/**
* For a kerberized cluster, we dynamically set up the client's JAAS conf.
*
* @param conf SparkConf
* @return
*/
@throws[Exception]
def setUpZooKeeperAuth(conf: KyuubiConf): Unit = {
if (conf.get(HA_ZK_ACL_ENABLED)) {
val keyTabFile = conf.get(KyuubiConf.SERVER_KEYTAB)
val maybePrincipal = conf.get(KyuubiConf.SERVER_PRINCIPAL)
val kerberized = maybePrincipal.isDefined && keyTabFile.isDefined
if (UserGroupInformation.isSecurityEnabled && kerberized) {
if (!new File(keyTabFile.get).exists()) {
throw new IOException(s"${KyuubiConf.SERVER_KEYTAB.key} does not exists")
}
System.setProperty("zookeeper.sasl.clientconfig", "KyuubiZooKeeperClient")
var principal = maybePrincipal.get
principal = KyuubiHadoopUtils.getServerPrincipal(principal)
val jaasConf = new JaasConfiguration("KyuubiZooKeeperClient", principal, keyTabFile.get)
Configuration.setConfiguration(jaasConf)
}
}
}
def supportServiceDiscovery(conf: KyuubiConf): Boolean = {
val zkEnsemble = conf.get(HA_ZK_QUORUM)
zkEnsemble != null && zkEnsemble.nonEmpty

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.ha.client
import java.io.{File, IOException}
import javax.security.auth.login.Configuration
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry._
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.JaasConfiguration
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.util.KyuubiHadoopUtils
object ZooKeeperClientProvider extends Logging {
import RetryPolicies._
/**
* Create a [[CuratorFramework]] instance to be used as the ZooKeeper client
* Use the [[ZooKeeperACLProvider]] to create appropriate ACLs
*/
def buildZookeeperClient(conf: KyuubiConf): CuratorFramework = {
setUpZooKeeperAuth(conf)
val connectionStr = conf.get(HA_ZK_QUORUM)
val sessionTimeout = conf.get(HA_ZK_SESSION_TIMEOUT)
val connectionTimeout = conf.get(HA_ZK_CONN_TIMEOUT)
val baseSleepTime = conf.get(HA_ZK_CONN_BASE_RETRY_WAIT)
val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
val retryPolicyName = conf.get(HA_ZK_CONN_RETRY_POLICY)
val retryPolicy = RetryPolicies.withName(retryPolicyName) match {
case ONE_TIME => new RetryOneTime(baseSleepTime)
case N_TIME => new RetryNTimes(maxRetries, baseSleepTime)
case BOUNDED_EXPONENTIAL_BACKOFF =>
new BoundedExponentialBackoffRetry(baseSleepTime, maxSleepTime, maxRetries)
case UNTIL_ELAPSED => new RetryUntilElapsed(maxSleepTime, baseSleepTime)
case _ => new ExponentialBackoffRetry(baseSleepTime, maxRetries)
}
CuratorFrameworkFactory.builder()
.connectString(connectionStr)
.sessionTimeoutMs(sessionTimeout)
.connectionTimeoutMs(connectionTimeout)
.aclProvider(new ZooKeeperACLProvider(conf))
.retryPolicy(retryPolicy)
.build()
}
/**
* Creates a zookeeper client before calling `f` and close it after calling `f`.
*/
def withZkClient(conf: KyuubiConf)(f: CuratorFramework => Unit): Unit = {
val zkClient = buildZookeeperClient(conf)
try {
zkClient.start()
f(zkClient)
} finally {
try {
zkClient.close()
} catch {
case e: IOException => error("Failed to release the zkClient", e)
}
}
}
/**
* For a kerberized cluster, we dynamically set up the client's JAAS conf.
*
* @param conf SparkConf
* @return
*/
@throws[Exception]
def setUpZooKeeperAuth(conf: KyuubiConf): Unit = {
if (conf.get(HA_ZK_ACL_ENABLED)) {
val keyTabFile = conf.get(KyuubiConf.SERVER_KEYTAB)
val maybePrincipal = conf.get(KyuubiConf.SERVER_PRINCIPAL)
val kerberized = maybePrincipal.isDefined && keyTabFile.isDefined
if (UserGroupInformation.isSecurityEnabled && kerberized) {
if (!new File(keyTabFile.get).exists()) {
throw new IOException(s"${KyuubiConf.SERVER_KEYTAB.key} does not exists")
}
System.setProperty("zookeeper.sasl.clientconfig", "KyuubiZooKeeperClient")
var principal = maybePrincipal.get
principal = KyuubiHadoopUtils.getServerPrincipal(principal)
val jaasConf = new JaasConfiguration("KyuubiZooKeeperClient", principal, keyTabFile.get)
Configuration.setConfiguration(jaasConf)
}
}
}
}

View File

@ -34,7 +34,7 @@ import org.apache.kyuubi.service.{NoopServer, Serverable, ServiceState}
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
class ServiceDiscoverySuite extends KerberizedTestHelper {
import ServiceDiscovery._
import ZooKeeperClientProvider._
val zkServer = new EmbeddedZookeeper()
val conf: KyuubiConf = KyuubiConf()
@ -118,7 +118,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
conf.set(KyuubiConf.SERVER_PRINCIPAL, principal)
conf.set(HighAvailabilityConf.HA_ZK_ACL_ENABLED, true)
ServiceDiscovery.setUpZooKeeperAuth(conf)
ZooKeeperClientProvider.setUpZooKeeperAuth(conf)
val configuration = Configuration.getConfiguration
val entries = configuration.getAppConfigurationEntry("KyuubiZooKeeperClient")
@ -130,7 +130,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
assert(options("useKeyTab").toString.toBoolean)
conf.set(KyuubiConf.SERVER_KEYTAB, keytab.getName)
val e = intercept[IOException](ServiceDiscovery.setUpZooKeeperAuth(conf))
val e = intercept[IOException](ZooKeeperClientProvider.setUpZooKeeperAuth(conf))
assert(e.getMessage === s"${KyuubiConf.SERVER_KEYTAB.key} does not exists")
}
}

View File

@ -28,7 +28,7 @@ import org.apache.kyuubi.client.KyuubiSyncThriftClient
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.EngineRef
import org.apache.kyuubi.ha.client.ServiceDiscovery._
import org.apache.kyuubi.ha.client.ZooKeeperClientProvider._
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.service.authentication.PlainSASLHelper

View File

@ -22,7 +22,6 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.client.ServiceDiscovery
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.KyuubiOperationManager
@ -34,7 +33,6 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
val operationManager = new KyuubiOperationManager()
override def initialize(conf: KyuubiConf): Unit = {
ServiceDiscovery.setUpZooKeeperAuth(conf)
super.initialize(conf)
}

View File

@ -24,7 +24,7 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.ServiceDiscovery
import org.apache.kyuubi.ha.client.ZooKeeperClientProvider
import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.util.NamedThreadFactory
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
@ -106,7 +106,7 @@ class EngineRefSuite extends KyuubiFunSuite {
val r1 = new Runnable {
override def run(): Unit = {
ServiceDiscovery.withZkClient(conf) { client =>
ZooKeeperClientProvider.withZkClient(conf) { client =>
val hp = engine.getOrCreate(client)
port1 = hp._2
}
@ -115,7 +115,7 @@ class EngineRefSuite extends KyuubiFunSuite {
val r2 = new Runnable {
override def run(): Unit = {
ServiceDiscovery.withZkClient(conf) { client =>
ZooKeeperClientProvider.withZkClient(conf) { client =>
val hp = engine.getOrCreate(client)
port2 = hp._2
}