diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala index a2be2483a..379c28490 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala @@ -22,7 +22,7 @@ import org.apache.curator.framework.CuratorFramework import org.apache.kyuubi.Utils import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_ACL_ENABLED, HA_ZK_NAMESPACE, HA_ZK_QUORUM} -import org.apache.kyuubi.ha.client.ServiceDiscovery +import org.apache.kyuubi.ha.client.ZooKeeperClientProvider import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf} trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine { @@ -62,7 +62,7 @@ trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine { } def withZkClient(f: CuratorFramework => Unit): Unit = { - ServiceDiscovery.withZkClient(kyuubiConf)(f) + ZooKeeperClientProvider.withZkClient(kyuubiConf)(f) } protected def getDiscoveryConnectionString: String = { diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala index b870f0977..dd27f099f 100644 --- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala +++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala @@ -25,7 +25,7 @@ import org.apache.curator.utils.ZKPaths import org.apache.kyuubi.Logging import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.ha.HighAvailabilityConf._ -import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo} +import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo, ZooKeeperClientProvider} private[ctl] object ServiceControlAction extends Enumeration { type ServiceControlAction = Value @@ -43,6 +43,7 @@ private[ctl] object ServiceControlObject extends Enumeration { private[kyuubi] class ServiceControlCli extends Logging { import ServiceControlCli._ import ServiceDiscovery._ + import ZooKeeperClientProvider._ private var verbose: Boolean = false diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala index c68455a1f..2e4655ffb 100644 --- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala +++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala @@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_NAMESPACE, HA_ZK_QUORUM} -import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo} +import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo, ZooKeeperClientProvider} import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf} trait TestPrematureExit { @@ -87,6 +87,7 @@ trait TestPrematureExit { class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { import ServiceControlCli._ import ServiceDiscovery._ + import ZooKeeperClientProvider._ val zkServer = new EmbeddedZookeeper() val conf: KyuubiConf = KyuubiConf() diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala index 7f226caaa..f67151a5f 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala @@ -17,22 +17,18 @@ package org.apache.kyuubi.ha.client -import java.io.{File, IOException} +import java.io.IOException import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean -import javax.security.auth.login.Configuration import scala.collection.JavaConverters._ -import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} +import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.recipes.nodes.PersistentNode import org.apache.curator.framework.state.{ConnectionState, ConnectionStateListener} import org.apache.curator.framework.state.ConnectionState.{CONNECTED, LOST, RECONNECTED} -import org.apache.curator.retry._ import org.apache.curator.utils.ZKPaths -import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.JaasConfiguration import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher} import org.apache.zookeeper.CreateMode.PERSISTENT import org.apache.zookeeper.KeeperException.NodeExistsException @@ -41,7 +37,7 @@ import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, Logging} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.ha.HighAvailabilityConf._ import org.apache.kyuubi.service.{AbstractService, Serverable} -import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils} +import org.apache.kyuubi.util.ThreadUtils /** * A abstract service for service discovery @@ -54,6 +50,7 @@ abstract class ServiceDiscovery ( server: Serverable) extends AbstractService(name) { import ServiceDiscovery._ + import ZooKeeperClientProvider._ private var _zkClient: CuratorFramework = _ private var _serviceNode: PersistentNode = _ @@ -73,7 +70,6 @@ abstract class ServiceDiscovery ( _namespace = conf.get(HA_ZK_NAMESPACE) val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT) val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES) - setUpZooKeeperAuth(conf) _zkClient = buildZookeeperClient(conf) zkClient.getConnectionStateListenable.addListener(new ConnectionStateListener { private val isConnected = new AtomicBoolean(false) @@ -157,96 +153,9 @@ abstract class ServiceDiscovery ( object ServiceDiscovery extends Logging { - import RetryPolicies._ - private final lazy val connectionChecker = ThreadUtils.newDaemonSingleThreadScheduledExecutor("zk-connection-checker") - def buildZookeeperClient(conf: KyuubiConf): CuratorFramework = { - val connectionStr = conf.get(HA_ZK_QUORUM) - val sessionTimeout = conf.get(HA_ZK_SESSION_TIMEOUT) - val connectionTimeout = conf.get(HA_ZK_CONN_TIMEOUT) - val baseSleepTime = conf.get(HA_ZK_CONN_BASE_RETRY_WAIT) - val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT) - val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES) - val retryPolicyName = conf.get(HA_ZK_CONN_RETRY_POLICY) - val retryPolicy = RetryPolicies.withName(retryPolicyName) match { - case ONE_TIME => new RetryOneTime(baseSleepTime) - case N_TIME => new RetryNTimes(maxRetries, baseSleepTime) - case BOUNDED_EXPONENTIAL_BACKOFF => - new BoundedExponentialBackoffRetry(baseSleepTime, maxSleepTime, maxRetries) - case UNTIL_ELAPSED => new RetryUntilElapsed(maxSleepTime, baseSleepTime) - case _ => new ExponentialBackoffRetry(baseSleepTime, maxRetries) - } - CuratorFrameworkFactory.builder() - .connectString(connectionStr) - .sessionTimeoutMs(sessionTimeout) - .connectionTimeoutMs(connectionTimeout) - .aclProvider(new ZooKeeperACLProvider(conf)) - .retryPolicy(retryPolicy) - .build() - } - - /** - * Create a [[CuratorFramework]] instance to be used as the ZooKeeper client - * Use the [[ZooKeeperACLProvider]] to create appropriate ACLs - */ - def startZookeeperClient(conf: KyuubiConf): CuratorFramework = { - val connectionStr = conf.get(HA_ZK_QUORUM) - val sessionTimeout = conf.get(HA_ZK_SESSION_TIMEOUT) - val connectionTimeout = conf.get(HA_ZK_CONN_TIMEOUT) - val retryPolicy = new ExponentialBackoffRetry(1000, 3) - val client = CuratorFrameworkFactory.builder() - .connectString(connectionStr) - .sessionTimeoutMs(sessionTimeout) - .connectionTimeoutMs(connectionTimeout) - .retryPolicy(retryPolicy) - .build() - client.start() - client - } - - /** - * Creates a zookeeper client before calling `f` and close it after calling `f`. - */ - def withZkClient(conf: KyuubiConf)(f: CuratorFramework => Unit): Unit = { - val zkClient = startZookeeperClient(conf) - try { - f(zkClient) - } finally { - try { - zkClient.close() - } catch { - case e: IOException => error("Failed to release the zkClient", e) - } - } - } - - /** - * For a kerberized cluster, we dynamically set up the client's JAAS conf. - * - * @param conf SparkConf - * @return - */ - @throws[Exception] - def setUpZooKeeperAuth(conf: KyuubiConf): Unit = { - if (conf.get(HA_ZK_ACL_ENABLED)) { - val keyTabFile = conf.get(KyuubiConf.SERVER_KEYTAB) - val maybePrincipal = conf.get(KyuubiConf.SERVER_PRINCIPAL) - val kerberized = maybePrincipal.isDefined && keyTabFile.isDefined - if (UserGroupInformation.isSecurityEnabled && kerberized) { - if (!new File(keyTabFile.get).exists()) { - throw new IOException(s"${KyuubiConf.SERVER_KEYTAB.key} does not exists") - } - System.setProperty("zookeeper.sasl.clientconfig", "KyuubiZooKeeperClient") - var principal = maybePrincipal.get - principal = KyuubiHadoopUtils.getServerPrincipal(principal) - val jaasConf = new JaasConfiguration("KyuubiZooKeeperClient", principal, keyTabFile.get) - Configuration.setConfiguration(jaasConf) - } - } - } - def supportServiceDiscovery(conf: KyuubiConf): Boolean = { val zkEnsemble = conf.get(HA_ZK_QUORUM) zkEnsemble != null && zkEnsemble.nonEmpty diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala new file mode 100644 index 000000000..c9a7fbcc8 --- /dev/null +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.ha.client + +import java.io.{File, IOException} +import javax.security.auth.login.Configuration + +import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} +import org.apache.curator.retry._ +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.JaasConfiguration + +import org.apache.kyuubi.Logging +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.ha.HighAvailabilityConf._ +import org.apache.kyuubi.util.KyuubiHadoopUtils + +object ZooKeeperClientProvider extends Logging { + + import RetryPolicies._ + + /** + * Create a [[CuratorFramework]] instance to be used as the ZooKeeper client + * Use the [[ZooKeeperACLProvider]] to create appropriate ACLs + */ + def buildZookeeperClient(conf: KyuubiConf): CuratorFramework = { + setUpZooKeeperAuth(conf) + val connectionStr = conf.get(HA_ZK_QUORUM) + val sessionTimeout = conf.get(HA_ZK_SESSION_TIMEOUT) + val connectionTimeout = conf.get(HA_ZK_CONN_TIMEOUT) + val baseSleepTime = conf.get(HA_ZK_CONN_BASE_RETRY_WAIT) + val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT) + val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES) + val retryPolicyName = conf.get(HA_ZK_CONN_RETRY_POLICY) + val retryPolicy = RetryPolicies.withName(retryPolicyName) match { + case ONE_TIME => new RetryOneTime(baseSleepTime) + case N_TIME => new RetryNTimes(maxRetries, baseSleepTime) + case BOUNDED_EXPONENTIAL_BACKOFF => + new BoundedExponentialBackoffRetry(baseSleepTime, maxSleepTime, maxRetries) + case UNTIL_ELAPSED => new RetryUntilElapsed(maxSleepTime, baseSleepTime) + case _ => new ExponentialBackoffRetry(baseSleepTime, maxRetries) + } + CuratorFrameworkFactory.builder() + .connectString(connectionStr) + .sessionTimeoutMs(sessionTimeout) + .connectionTimeoutMs(connectionTimeout) + .aclProvider(new ZooKeeperACLProvider(conf)) + .retryPolicy(retryPolicy) + .build() + } + + /** + * Creates a zookeeper client before calling `f` and close it after calling `f`. + */ + def withZkClient(conf: KyuubiConf)(f: CuratorFramework => Unit): Unit = { + val zkClient = buildZookeeperClient(conf) + try { + zkClient.start() + f(zkClient) + } finally { + try { + zkClient.close() + } catch { + case e: IOException => error("Failed to release the zkClient", e) + } + } + } + + /** + * For a kerberized cluster, we dynamically set up the client's JAAS conf. + * + * @param conf SparkConf + * @return + */ + @throws[Exception] + def setUpZooKeeperAuth(conf: KyuubiConf): Unit = { + if (conf.get(HA_ZK_ACL_ENABLED)) { + val keyTabFile = conf.get(KyuubiConf.SERVER_KEYTAB) + val maybePrincipal = conf.get(KyuubiConf.SERVER_PRINCIPAL) + val kerberized = maybePrincipal.isDefined && keyTabFile.isDefined + if (UserGroupInformation.isSecurityEnabled && kerberized) { + if (!new File(keyTabFile.get).exists()) { + throw new IOException(s"${KyuubiConf.SERVER_KEYTAB.key} does not exists") + } + System.setProperty("zookeeper.sasl.clientconfig", "KyuubiZooKeeperClient") + var principal = maybePrincipal.get + principal = KyuubiHadoopUtils.getServerPrincipal(principal) + val jaasConf = new JaasConfiguration("KyuubiZooKeeperClient", principal, keyTabFile.get) + Configuration.setConfiguration(jaasConf) + } + } + } +} diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala index ae7d70412..8e870efe7 100644 --- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala +++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala @@ -34,7 +34,7 @@ import org.apache.kyuubi.service.{NoopServer, Serverable, ServiceState} import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf} class ServiceDiscoverySuite extends KerberizedTestHelper { - import ServiceDiscovery._ + import ZooKeeperClientProvider._ val zkServer = new EmbeddedZookeeper() val conf: KyuubiConf = KyuubiConf() @@ -118,7 +118,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper { conf.set(KyuubiConf.SERVER_PRINCIPAL, principal) conf.set(HighAvailabilityConf.HA_ZK_ACL_ENABLED, true) - ServiceDiscovery.setUpZooKeeperAuth(conf) + ZooKeeperClientProvider.setUpZooKeeperAuth(conf) val configuration = Configuration.getConfiguration val entries = configuration.getAppConfigurationEntry("KyuubiZooKeeperClient") @@ -130,7 +130,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper { assert(options("useKeyTab").toString.toBoolean) conf.set(KyuubiConf.SERVER_KEYTAB, keytab.getName) - val e = intercept[IOException](ServiceDiscovery.setUpZooKeeperAuth(conf)) + val e = intercept[IOException](ZooKeeperClientProvider.setUpZooKeeperAuth(conf)) assert(e.getMessage === s"${KyuubiConf.SERVER_KEYTAB.key} does not exists") } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index a15d84d42..536464acd 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -28,7 +28,7 @@ import org.apache.kyuubi.client.KyuubiSyncThriftClient import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.engine.EngineRef -import org.apache.kyuubi.ha.client.ServiceDiscovery._ +import org.apache.kyuubi.ha.client.ZooKeeperClientProvider._ import org.apache.kyuubi.metrics.MetricsConstants._ import org.apache.kyuubi.metrics.MetricsSystem import org.apache.kyuubi.service.authentication.PlainSASLHelper diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index 93030f0bf..bd2aeabdd 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -22,7 +22,6 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.ha.client.ServiceDiscovery import org.apache.kyuubi.metrics.MetricsConstants._ import org.apache.kyuubi.metrics.MetricsSystem import org.apache.kyuubi.operation.KyuubiOperationManager @@ -34,7 +33,6 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { val operationManager = new KyuubiOperationManager() override def initialize(conf: KyuubiConf): Unit = { - ServiceDiscovery.setUpZooKeeperAuth(conf) super.initialize(conf) } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala index 5969c5738..53a46d514 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala @@ -24,7 +24,7 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime import org.apache.kyuubi.{KyuubiFunSuite, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.ha.HighAvailabilityConf -import org.apache.kyuubi.ha.client.ServiceDiscovery +import org.apache.kyuubi.ha.client.ZooKeeperClientProvider import org.apache.kyuubi.session.SessionHandle import org.apache.kyuubi.util.NamedThreadFactory import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf} @@ -106,7 +106,7 @@ class EngineRefSuite extends KyuubiFunSuite { val r1 = new Runnable { override def run(): Unit = { - ServiceDiscovery.withZkClient(conf) { client => + ZooKeeperClientProvider.withZkClient(conf) { client => val hp = engine.getOrCreate(client) port1 = hp._2 } @@ -115,7 +115,7 @@ class EngineRefSuite extends KyuubiFunSuite { val r2 = new Runnable { override def run(): Unit = { - ServiceDiscovery.withZkClient(conf) { client => + ZooKeeperClientProvider.withZkClient(conf) { client => val hp = engine.getOrCreate(client) port2 = hp._2 }