Introduce ServiceDiscovery

This commit is contained in:
Kent Yao 2020-06-17 16:14:20 +08:00
parent 5100d8a519
commit e1d55f8351
17 changed files with 764 additions and 34 deletions

View File

@@ -42,17 +42,81 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<!-- ut -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.directory.server</groupId>
<artifactId>apacheds-service</artifactId>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
<resource>
<!-- Include the properties file to provide the build information. -->
<directory>${project.build.directory}/extra-resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>build-info</id>
<phase>generate-resources</phase>
<configuration>
<target>
<exec executable="bash">
<arg value="${project.basedir}/../build/kyuubi-build-info"/>
<arg value="${project.build.directory}/extra-resources"/>
<arg value="${project.version}"/>
<arg value="${spark.version}"/>
</exec>
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
<execution>
<id>build-info-test</id>
<phase>generate-test-resources</phase>
<configuration>
<target>
<exec executable="bash">
<arg value="${project.basedir}/../build/kyuubi-build-info"/>
<arg value="${project.build.testOutputDirectory}"/>
<arg value="${project.version}"/>
<arg value="${spark.version}"/>
</exec>
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>

View File

@@ -20,5 +20,5 @@ package org.apache.kyuubi
/**
 * The common exception type thrown by Kyuubi components, carrying an
 * explanatory message and an optional underlying cause.
 *
 * @param message the detail message of the failure
 * @param cause the underlying throwable, or null when there is none
 */
class KyuubiException(message: String, cause: Throwable) extends Exception(message, cause) {

  /** Wrap a cause only; the message is taken from the cause itself. */
  def this(cause: Throwable) = this(cause.getMessage, cause)

  /** Build from a message only, with no underlying cause. */
  def this(message: String) = this(message, null)
}

View File

@ -41,6 +41,11 @@ case class KyuubiConf(loadSysDefault: Boolean = true) {
this
}
def set[T](entry: OptionalConfigEntry[T], value: T): KyuubiConf = {
set(entry.key, entry.rawStrConverter(value))
this
}
def set(key: String, value: String): KyuubiConf = {
require(key != null)
require(value != null)
@ -52,6 +57,15 @@ case class KyuubiConf(loadSysDefault: Boolean = true) {
config.readFrom(reader)
}
/** unset a parameter from the configuration */
def unset(key: String): KyuubiConf = {
settings.remove(key)
this
}
def unset(entry: ConfigEntry[_]): KyuubiConf = {
unset(entry.key)
}
}
object KyuubiConf {
@ -64,24 +78,66 @@ object KyuubiConf {
def buildConf(key: String): ConfigBuilder = ConfigBuilder(KYUUBI_PREFIX + key)
val EMBEDDED_ZK_PORT: ConfigEntry[Int] =
buildConf("embedded.zk.port")
.doc("The port of the embedded zookeeper server")
.version("1.0.0")
.intConf.checkValue(_ >= 0, s"The value of $EMBEDDED_ZK_PORT must be >= 0")
.createWithDefault(2181)
val EMBEDDED_ZK_PORT: ConfigEntry[Int] = buildConf("embedded.zookeeper.port")
.doc("The port of the embedded zookeeper server")
.version("1.0.0")
.intConf
.createWithDefault(2181)
val EMBEDDED_ZK_TEMP_DIR: ConfigEntry[String] =
buildConf("embedded.zk.directory")
val EMBEDDED_ZK_TEMP_DIR: ConfigEntry[String] = buildConf("embedded.zookeeper.directory")
.doc("The temporary directory for the embedded zookeeper server")
.version("1.0.0")
.stringConf
.createWithDefault(Utils.resolveURI("embedded_zookeeper").getRawPath)
val HA_ZK_QUORUM: OptionalConfigEntry[Seq[String]] =
buildConf("ha.zk.quorum")
.version("1.0.0")
.stringConf
.toSequence
.createOptional
val HA_ZK_QUORUM: ConfigEntry[String] = buildConf("ha.zookeeper.quorum")
.doc("The connection string for the zookeeper ensemble")
.version("1.0.0")
.stringConf
.createWithDefault("")
val HA_ZK_NAMESPACE: ConfigEntry[String] = buildConf("ha.zookeeper.namespace")
.doc("The connection string for the zookeeper ensemble")
.version("1.0.0")
.stringConf
.createWithDefault("")
val HA_ZK_CONNECTION_MAX_RETRIES: ConfigEntry[Int] =
buildConf("ha.zookeeper.connection.max.retries")
.doc("Max retry times for connecting to the zookeeper ensemble")
.version("1.0.0")
.intConf
.createWithDefault(3)
val HA_ZK_CONNECTION_RETRY_WAIT: ConfigEntry[Int] =
buildConf("ha.zookeeper.connection.retry.wait")
.doc("Initial amount of time to wait between retries to the zookeeper ensemble")
.version("1.0.0")
.intConf
.createWithDefault(1000)
val HA_ZK_CONNECTION_TIMEOUT: ConfigEntry[Int] = buildConf("ha.zookeeper.connection.timeout")
.doc("The timeout(ms) of creating the connection to the zookeeper ensemble")
.version("1.0.0")
.intConf
.createWithDefault(60 * 1000)
val HA_ZK_SESSION_TIMEOUT: ConfigEntry[Int] = buildConf("ha.zookeeper.session.timeout")
.doc("The timeout(ms) of a connected session to be idled")
.version("1.0.0")
.intConf
.createWithDefault(60 * 1000)
val SERVER_PRINCIPAL: OptionalConfigEntry[String] = buildConf("server.principal")
.doc("")
.version("1.0.0")
.stringConf
.createOptional
val SERVER_KEYTAB: OptionalConfigEntry[String] = buildConf("server.keytab")
.doc("")
.version("1.0.0")
.stringConf
.createOptional
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache
import java.util.Properties
import scala.util.Try
/**
 * Package-level build metadata for Kyuubi, read from the
 * `kyuubi-version-info.properties` resource that the build generates
 * (see the antrun `build-info` execution in the module pom).
 */
package object kyuubi {
// Holder object: Scala objects initialize lazily, so the properties file is
// read the first time any of the exported constants below is touched.
private object BuildInfo {
private val buildFile = "kyuubi-version-info.properties"
private val buildFileStream =
Thread.currentThread().getContextClassLoader.getResourceAsStream(buildFile)
// Fallback value for any property missing from the file.
private val unknown = "<unknown>"
private val props = new Properties()
try {
// NOTE(review): if the resource is absent, buildFileStream is null and
// props.load throws a NullPointerException, surfaced as KyuubiException.
props.load(buildFileStream)
} catch {
case e: Exception => throw new KyuubiException(e)
} finally {
// Best-effort close; Try swallows any NPE/IOException from closing.
Try(buildFileStream.close())
}
val version: String = props.getProperty("kyuubi_version", unknown)
val sparkVersion: String = props.getProperty("spark_version", unknown)
val branch: String = props.getProperty("branch", unknown)
val jar: String = props.getProperty("jar", unknown)
val revision: String = props.getProperty("revision", unknown)
val user: String = props.getProperty("user", unknown)
val repoUrl: String = props.getProperty("url", unknown)
val buildDate: String = props.getProperty("date", unknown)
}
// Public constants re-exporting the loaded build information.
val KYUUBI_VERSION: String = BuildInfo.version
val SPARK_COMPILE_VERSION: String = BuildInfo.sparkVersion
val BRANCH: String = BuildInfo.branch
val KYUUBI_JAR_NAME: String = BuildInfo.jar
val REVISION: String = BuildInfo.revision
val BUILD_USER: String = BuildInfo.user
val REPO_URL: String = BuildInfo.repoUrl
val BUILD_DATE: String = BuildInfo.buildDate
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi
import java.io.{File, IOException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.minikdc.MiniKdc
import org.apache.hadoop.security.UserGroupInformation
/**
 * Test helper that starts an embedded KDC (Hadoop MiniKdc) when the trait is
 * mixed in, and lets a test body run with Hadoop security set to KERBEROS.
 */
trait KerberizedTestHelper {
var kdc: MiniKdc = _
// Working directory for the KDC, created beside the compiled test classes.
val baseDir: File = Utils.createTempDir(
this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath, "kyuubi-kdc")
try {
val kdcConf = MiniKdc.createConf()
// Realm ends up as KYUUBI.COM (ORG_NAME.ORG_DOMAIN) with instance KyuubiKrbServer.
kdcConf.setProperty(MiniKdc.INSTANCE, "KyuubiKrbServer")
kdcConf.setProperty(MiniKdc.ORG_NAME, "KYUUBI")
kdcConf.setProperty(MiniKdc.ORG_DOMAIN, "COM")
if (kdc == null) {
kdc = new MiniKdc(kdcConf, baseDir)
kdc.start()
}
} catch {
// NOTE(review): the message only mentions the temp directory, but the
// IOException may equally come from MiniKdc start-up — consider rewording.
case e: IOException =>
throw new AssertionError("unable to create temporary directory: " + e.getMessage)
}
/**
 * Run `block` with `hadoop.security.authentication` switched to KERBEROS and
 * the krb5 realm pointed at the embedded KDC, restoring the insecure
 * configuration afterwards (asserted on both entry and exit).
 *
 * @param block the test body to execute while security is enabled
 */
def tryWithSecurityEnabled(block: => Unit): Unit = {
val conf = new Configuration()
assert(!UserGroupInformation.isSecurityEnabled)
val authType = "hadoop.security.authentication"
try {
conf.set(authType, "KERBEROS")
System.setProperty("java.security.krb5.realm", kdc.getRealm)
UserGroupInformation.setConfiguration(conf)
assert(UserGroupInformation.isSecurityEnabled)
block
} finally {
// Revert to simple auth so later tests see an insecure UGI again.
conf.unset(authType)
System.clearProperty("java.security.krb5.realm")
UserGroupInformation.setConfiguration(conf)
assert(!UserGroupInformation.isSecurityEnabled)
}
}
}

View File

@ -27,6 +27,6 @@ class KyuubiConfSuite extends KyuubiFunSuite {
val conf = new KyuubiConf()
assert(conf.get(EMBEDDED_ZK_PORT) === 2181)
assert(conf.get(EMBEDDED_ZK_TEMP_DIR).endsWith("embedded_zookeeper"))
assert(conf.get(HA_ZK_QUORUM) === None)
assert(conf.get(HA_ZK_QUORUM).isEmpty)
}
}

View File

@ -47,6 +47,13 @@
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<!-- ut -->
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common</artifactId>
@ -54,6 +61,21 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.directory.server</groupId>
<artifactId>apacheds-service</artifactId>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>

View File

@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.ha.client
import java.io.{File, IOException}
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import javax.security.auth.login.Configuration
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation}
import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.JaasConfiguration
import org.apache.zookeeper.{KeeperException, WatchedEvent, Watcher}
import org.apache.zookeeper.CreateMode.PERSISTENT
import org.apache.zookeeper.KeeperException.NodeExistsException
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.service.AbstractService
/**
 * A service that publishes a server instance to ZooKeeper for client-side
 * discovery, and de-registers it (see [[DeRegisterWatcher]]) once the backing
 * znode is deleted.
 *
 * @param name the name of the service itself
 * @param instance the instance uri a service that used to publish itself
 * @param namespace a pre-defined namespace used to publish the instance of the associate service
 * @param postHook a daemon thread started after this instance is de-registered
 *                 from ZooKeeper; this service is stopped afterwards
 */
class ServiceDiscovery private (
name: String,
instance: String,
namespace: String,
postHook: Thread) extends AbstractService(name) {
// Make sure the hook alone does not keep the JVM alive.
postHook.setDaemon(true)
def this(instance: String, namespace: String, postHook: Thread) =
this(classOf[ServiceDiscovery].getSimpleName, instance, namespace, postHook)
// Curator client built from the HA_ZK_* settings (see companion object).
private var zkClient: CuratorFramework = _
// Ephemeral-sequential znode that publishes this instance under /namespace.
private var serviceNode: PersistentEphemeralNode = _
/**
 * Set up ZooKeeper SASL auth (when kerberized), connect the client, and make
 * sure the persistent parent znode /namespace exists.
 */
override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
ServiceDiscovery.setUpZooKeeperAuth(conf)
zkClient = ServiceDiscovery.newZookeeperClient(conf)
try {
zkClient
.create()
.creatingParentsIfNeeded()
.withMode(PERSISTENT)
.forPath(s"/$namespace")
} catch {
// An already existing parent is fine; any other ZooKeeper error is fatal.
case _: NodeExistsException => // do nothing
case e: KeeperException => throw e
}
super.initialize(conf)
}
/**
 * Create the ephemeral-sequential znode carrying the instance uri and Kyuubi
 * version, wait for its creation, then watch it for deletion.
 */
override def start(): Unit = {
val pathPrefix = s"/$namespace/serviceUri=$instance;version=$KYUUBI_VERSION;sequence="
try {
serviceNode = new PersistentEphemeralNode(
zkClient,
PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL,
pathPrefix,
instance.getBytes(StandardCharsets.UTF_8))
serviceNode.start()
// Wait up to 120 seconds for the znode to actually appear.
val znodeTimeout = 120
if (!serviceNode.waitForInitialCreate(znodeTimeout, TimeUnit.SECONDS)) {
throw new KyuubiException(s"Max znode creation wait time $znodeTimeout s exhausted")
}
// Set a watch on the serviceNode
val watcher = new DeRegisterWatcher
if (zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath) == null) {
// No node exists, throw exception
throw new KyuubiException("Unable to create znode for this Kyuubi instance on ZooKeeper.")
}
info("Created a serviceNode on ZooKeeper for KyuubiServer uri: " + instance)
} catch {
// Clean up the half-created node before propagating the failure.
case e: Exception =>
if (serviceNode != null) {
serviceNode.close()
}
throw new KyuubiException(
s"Unable to create a znode for this server instance: $instance", e)
}
super.start()
}
/** Close the published znode (best effort) and the ZooKeeper client. */
override def stop(): Unit = {
if (serviceNode != null) {
try {
serviceNode.close()
} catch {
case e: IOException =>
error("Failed to close the persistent ephemeral znode" + serviceNode.getActualPath, e)
}
}
if (zkClient != null) zkClient.close()
super.stop()
}
/**
 * Watcher reacting to deletion of this instance's znode: it closes the node,
 * starts the post hook, and stops the enclosing ServiceDiscovery service.
 */
class DeRegisterWatcher extends Watcher {
override def process(event: WatchedEvent): Unit = {
if (event.getType == Watcher.Event.EventType.NodeDeleted) {
if (serviceNode != null) {
try {
serviceNode.close()
warn(s"This Kyuubi instance ${instance} is now de-registered from ZooKeeper. " +
"The server will be shut down after the last client session completes.")
} catch {
case e: IOException =>
error(s"Failed to close the persistent ephemeral znode: ${serviceNode.getActualPath}",
e)
} finally {
// Always run the hook and stop the service, even if closing failed.
postHook.start()
ServiceDiscovery.this.stop()
}
}
}
}
}
}
/** Companion utilities for building the ZooKeeper client and its Kerberos setup. */
object ServiceDiscovery {
import org.apache.kyuubi.config.KyuubiConf._
// Shared ACL provider; grants depend on UserGroupInformation.isSecurityEnabled.
private final val DEFAULT_ACL_PROVIDER = ZooKeeperACLProvider()
/**
 * Create a started [[CuratorFramework]] instance to be used as the ZooKeeper
 * client, configured from the HA_ZK_* entries (quorum, session/connection
 * timeouts, exponential-backoff retries) and using [[ZooKeeperACLProvider]]
 * to create appropriate ACLs.
 *
 * @param conf the KyuubiConf supplying the HA_ZK_* settings
 */
def newZookeeperClient(conf: KyuubiConf): CuratorFramework = {
val connectionStr = conf.get(HA_ZK_QUORUM)
val sessionTimeout = conf.get(HA_ZK_SESSION_TIMEOUT)
val connectionTimeout = conf.get(HA_ZK_CONNECTION_TIMEOUT)
val baseSleepTime = conf.get(HA_ZK_CONNECTION_RETRY_WAIT)
val maxRetries = conf.get(HA_ZK_CONNECTION_MAX_RETRIES)
val retryPolicy = new ExponentialBackoffRetry(baseSleepTime, maxRetries)
val client = CuratorFrameworkFactory.builder()
.connectString(connectionStr)
.sessionTimeoutMs(sessionTimeout)
.connectionTimeoutMs(connectionTimeout)
.aclProvider(DEFAULT_ACL_PROVIDER)
.retryPolicy(retryPolicy)
.build()
client.start()
client
}
/**
 * For a kerberized cluster, we dynamically set up the client's JAAS conf.
 * No-op unless Hadoop security is enabled AND both SERVER_PRINCIPAL and
 * SERVER_KEYTAB are set.
 *
 * @param conf the KyuubiConf supplying SERVER_PRINCIPAL / SERVER_KEYTAB
 * @throws IOException if the configured keytab file does not exist
 */
@throws[Exception]
def setUpZooKeeperAuth(conf: KyuubiConf): Unit = {
val keyTabFile = conf.get(SERVER_KEYTAB)
val maybePrincipal = conf.get(SERVER_PRINCIPAL)
val kerberized = maybePrincipal.isDefined && keyTabFile.isDefined
if (UserGroupInformation.isSecurityEnabled && kerberized) {
if (!new File(keyTabFile.get).exists()) {
throw new IOException(s"${SERVER_KEYTAB.key} does not exists")
}
// Point the ZooKeeper SASL client at our dedicated JAAS section.
System.setProperty("zookeeper.sasl.clientconfig", "KyuubiZooKeeperClient")
var principal = maybePrincipal.get
// Expands the _HOST placeholder in the principal with the local hostname.
principal = SecurityUtil.getServerPrincipal(principal, "0.0.0.0")
val jaasConf = new JaasConfiguration("KyuubiZooKeeperClient", principal, keyTabFile.get)
Configuration.setConfiguration(jaasConf)
}
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.ha.client
import org.apache.curator.framework.api.ACLProvider
import org.apache.hadoop.security.UserGroupInformation
import org.apache.zookeeper.ZooDefs
import org.apache.zookeeper.data.ACL
/**
 * ACL policy for znodes created by Kyuubi's ZooKeeper clients.
 *
 * On a security-enabled (kerberized) cluster, znodes are world-readable while
 * only the authenticated creator may create/delete/write/administer them; on
 * an insecure cluster everything is open to the world.
 */
case class ZooKeeperACLProvider() extends ACLProvider {

  /**
   * Return the ACL list to use by default.
   *
   * @return default ACL list
   */
  override def getDefaultAcl: java.util.List[ACL] = {
    val acls = new java.util.ArrayList[ACL]()
    val grants =
      if (UserGroupInformation.isSecurityEnabled) {
        // World: read-only; authenticated creator: create/delete/write/admin.
        Seq(ZooDefs.Ids.READ_ACL_UNSAFE, ZooDefs.Ids.CREATOR_ALL_ACL)
      } else {
        // Non-kerberized cluster: create/read/delete/write/admin for the world.
        Seq(ZooDefs.Ids.OPEN_ACL_UNSAFE)
      }
    grants.foreach(g => acls.addAll(g))
    acls
  }

  /** Every path uses the same policy as the default ACL. */
  override def getAclForPath(path: String): java.util.List[ACL] = getDefaultAcl
}

View File

@ -40,7 +40,10 @@ class EmbeddedZkServer private(name: String) extends AbstractService(name) {
override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
val port = conf.get(KyuubiConf.EMBEDDED_ZK_PORT)
var port = conf.get(KyuubiConf.EMBEDDED_ZK_PORT)
// When the client port is 0, the TestingServer will not randomly pick free local port to use
// So adjust it to -1 to achieve what is common cognition.
if (port == 0) port = -1
val dataDir = conf.get(KyuubiConf.EMBEDDED_ZK_TEMP_DIR)
server = new TestingServer(port, new File(dataDir), false)
super.initialize(conf)

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.ha.client
import java.io.{File, IOException}
import javax.security.auth.login.Configuration
import scala.collection.JavaConverters._
import org.apache.kyuubi.{KerberizedTestHelper, KyuubiFunSuite}
import org.apache.kyuubi.KYUUBI_VERSION
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.ha.server.EmbeddedZkServer
import org.apache.kyuubi.service.ServiceState
/**
 * Tests ServiceDiscovery against an embedded ZooKeeper server, plus the JAAS
 * set-up path under a MiniKdc-backed kerberized environment.
 */
class ServiceDiscoverySuite extends KyuubiFunSuite with KerberizedTestHelper {
val zkServer = new EmbeddedZkServer()
val conf: KyuubiConf = KyuubiConf()
override def beforeAll(): Unit = {
// Port 0 lets the embedded server pick a free port (adjusted internally to -1
// for TestingServer).
conf.set(EMBEDDED_ZK_PORT, 0)
zkServer.initialize(conf)
zkServer.start()
super.beforeAll()
}
override def afterAll(): Unit = {
// Clear settings mutated by the tests so the shared conf leaks nothing.
conf.unset(SERVER_KEYTAB)
conf.unset(SERVER_PRINCIPAL)
conf.unset(HA_ZK_QUORUM)
zkServer.stop()
super.afterAll()
}
test("set up zookeeper auth") {
tryWithSecurityEnabled {
val keytab = File.createTempFile("kentyao", ".keytab")
val principal = "kentyao/_HOST@apache.org"
conf.set(SERVER_KEYTAB, keytab.getCanonicalPath)
conf.set(SERVER_PRINCIPAL, principal)
ServiceDiscovery.setUpZooKeeperAuth(conf)
val configuration = Configuration.getConfiguration
val entries = configuration.getAppConfigurationEntry("KyuubiZooKeeperClient")
assert(entries.head.getLoginModuleName === "com.sun.security.auth.module.Krb5LoginModule")
val options = entries.head.getOptions.asScala.toMap
// _HOST must have been expanded to the local hostname by SecurityUtil.
assert(options("principal") === "kentyao/localhost@apache.org")
assert(options("useKeyTab").toString.toBoolean)
// A keytab path that does not exist must be rejected with an IOException.
conf.set(SERVER_KEYTAB, keytab.getName)
val e = intercept[IOException](ServiceDiscovery.setUpZooKeeperAuth(conf))
assert(e.getMessage === s"${SERVER_KEYTAB.key} does not exists")
}
}
test("publish instance to embedded zookeeper server") {
conf
.unset(SERVER_KEYTAB)
.unset(SERVER_PRINCIPAL)
.set(HA_ZK_QUORUM, zkServer.getConnectString)
val namespace = "kyuubiserver"
val znodeRoot = s"/$namespace"
val instance = "kentyao.apache.org:10009"
var deleted = false
// Post hook only records that de-registration triggered it.
val postHook = new Thread {
override def run(): Unit = deleted = true
}
val serviceDiscovery = new ServiceDiscovery(instance, namespace, postHook)
val framework = ServiceDiscovery.newZookeeperClient(conf)
try {
serviceDiscovery.initialize(conf)
serviceDiscovery.start()
assert(framework.checkExists().forPath("/abc") === null)
assert(framework.checkExists().forPath(znodeRoot) !== null)
val children = framework.getChildren.forPath(znodeRoot).asScala
assert(children.head ===
s"serviceUri=$instance;version=$KYUUBI_VERSION;sequence=0000000000")
// Deleting the znode should fire DeRegisterWatcher -> postHook -> stop().
children.foreach { child =>
framework.delete().forPath(s"""$znodeRoot/$child""")
}
// NOTE(review): fixed sleep to let the watcher fire; slow and potentially flaky.
Thread.sleep(5000)
assert(deleted, "Post hook called")
assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
} finally {
serviceDiscovery.stop()
framework.close()
}
}
}

View File

@ -27,17 +27,17 @@ import org.apache.curator.framework.api.ACLProvider
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation}
import org.apache.kyuubi.Logging
import org.apache.spark.{KyuubiSparkUtil, SparkConf}
import org.apache.spark.KyuubiConf._
import org.apache.zookeeper.{KeeperException, WatchedEvent, Watcher}
import org.apache.zookeeper.CreateMode.PERSISTENT
import org.apache.zookeeper.KeeperException.NodeExistsException
import yaooqinn.kyuubi.KYUUBI_VERSION
import yaooqinn.kyuubi.server.{FrontendService, KyuubiServer}
import yaooqinn.kyuubi.service.{AbstractService, ServiceException}
import org.apache.kyuubi.Logging
/**
* An abstract class provides [[KyuubiServer]] with high availability functions using Zookeeper
* dynamic service discovery.
@ -46,7 +46,7 @@ import yaooqinn.kyuubi.service.{AbstractService, ServiceException}
private[kyuubi] abstract class HighAvailableService(name: String, server: KyuubiServer)
extends AbstractService(name) with Logging {
import HighAvailableService._
import yaooqinn.kyuubi.ha.HighAvailableService._
protected final var zkClient: CuratorFramework = _
protected final var serviceRootNamespace: String = _

View File

@ -0,0 +1,36 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set everything to be logged to the file target/unit-tests.log
log4j.rootLogger=OFF, CA, FA
#Console Appender
log4j.appender.CA=org.apache.log4j.ConsoleAppender
log4j.appender.CA.layout=org.apache.log4j.PatternLayout
log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
log4j.appender.CA.Threshold = WARN
#File Appender
log4j.appender.FA=org.apache.log4j.FileAppender
log4j.appender.FA.append=false
log4j.appender.FA.file=target/unit-tests.log
log4j.appender.FA.layout=org.apache.log4j.PatternLayout
log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{2}: %m%n
# Set the logger level of File Appender to DEBUG
log4j.appender.FA.Threshold = DEBUG

View File

@ -42,6 +42,12 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-ha</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>

View File

@ -46,7 +46,8 @@ public class KyuubiThriftBinaryCliService extends ThriftCLIService {
}
@Override
public void run() {
public synchronized void init(HiveConf hiveConf) {
super.init(hiveConf);
// a hook stop the app when oom occurs
Runnable hook = new Runnable() {
@Override
@ -54,7 +55,6 @@ public class KyuubiThriftBinaryCliService extends ThriftCLIService {
stop();
}
};
try {
ExecutorPoolCaptureOom executorService = new ExecutorPoolCaptureOom(
"threadPoolName",
@ -102,6 +102,17 @@ public class KyuubiThriftBinaryCliService extends ThriftCLIService {
String msg = "Starting " + getName() + " on port "
+ portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
LOG.info(msg);
} catch (Throwable t) {
LOG.error("Error starting" + getName(), t);
stop();
}
}
protected void initializeServer() {}
@Override
public void run() {
try {
server.serve();
} catch (Throwable t) {
LOG.error("Error starting" + getName(), t);

View File

@ -19,12 +19,16 @@ package org.apache.kyuubi.spark
import scala.collection.JavaConverters._
import org.apache.hive.service.Service
import org.apache.hive.service.cli.CLIService
import org.apache.hive.service.cli.thrift.ThriftCLIService
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.client.ServiceDiscovery
object SparkSQLEngineApp {
def main(args: Array[String]): Unit = {
@ -40,7 +44,11 @@ object SparkSQLEngineApp {
var thriftCLIService: ThriftCLIService = null
var cliService: CLIService = null
server.getServices.asScala.foreach {
case t: ThriftCLIService if t.getPortNumber > 0 =>
case t: ThriftCLIService =>
if (t.getPortNumber == 0) {
// Some Spark Version
Thread.sleep(3000)
}
thriftCLIService = t
case c: CLIService => cliService = c
case _ =>
@ -48,15 +56,39 @@ object SparkSQLEngineApp {
if (thriftCLIService.getPortNumber <= 0) {
thriftCLIService.stop()
try {
thriftCLIService = new KyuubiThriftBinaryCliService(cliService)
thriftCLIService.init(server.getHiveConf)
thriftCLIService.start()
}
thriftCLIService = new KyuubiThriftBinaryCliService(cliService)
thriftCLIService.init(server.getHiveConf)
thriftCLIService.start()
}
val port = thriftCLIService.getPortNumber
val hostName = thriftCLIService.getServerIPAddress.getHostName
if (thriftCLIService == null || thriftCLIService.getServiceState != Service.STATE.STARTED) {
server.stop()
session.stop()
} else {
val port = thriftCLIService.getPortNumber
val hostName = thriftCLIService.getServerIPAddress.getHostName
val instance = s"$hostName:$port"
val kyuubiConf = KyuubiConf()
conf.getAllWithPrefix("spark.kyuubi.").foreach { case (k, v) =>
kyuubiConf.set(k.substring(6), v)
}
val postHook = new Thread {
override def run(): Unit = {
while (cliService.getSessionManager.getOpenSessionCount > 0) {
Thread.sleep(60 * 1000)
}
server.stop()
}
}
val namespace =
kyuubiConf.get(KyuubiConf.HA_ZK_NAMESPACE) + "-" + session.sparkContext.sparkUser
val serviceDiscovery = new ServiceDiscovery(instance, namespace, postHook)
serviceDiscovery.initialize(kyuubiConf)
serviceDiscovery.start()
}
}
}

31
pom.xml
View File

@ -217,6 +217,13 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>${hadoop.deps.scope}</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
@ -402,8 +409,6 @@
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<pluginManagement>
<plugins>
<plugin>
@ -431,7 +436,7 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.1</version>
<version>4.3.0</version>
<executions>
<execution>
<id>eclipse-add-source</id>
@ -455,7 +460,6 @@
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<recompileMode>incremental</recompileMode>
<useZincServer>true</useZincServer>
<args>
<arg>-unchecked</arg>
<arg>-deprecation</arg>
@ -675,6 +679,25 @@
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>create-tmp-dir</id>
<phase>generate-test-resources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<mkdir dir="${project.build.directory}/tmp" />
</target>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>