Support EmbeddedZkServer

This commit is contained in:
Kent Yao 2020-06-12 17:18:46 +08:00
parent 15d179507f
commit 5100d8a519
20 changed files with 447 additions and 21 deletions

1
.gitignore vendored
View File

@ -42,3 +42,4 @@ ldap
kyuubi-server/kyuubi-kdc/
metrics/report.json
metrics/.report.json.crc
/kyuubi-ha/embedded_zookeeper/

View File

@ -24,7 +24,7 @@
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>0.8.0-SNAPSHOT</version>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@ -36,7 +36,7 @@
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common</artifactId>
<version>0.8.0-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>

View File

@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi</artifactId>
<version>0.8.0-SNAPSHOT</version>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -18,10 +18,12 @@
package org.apache.kyuubi
import java.io.{File, InputStreamReader, IOException}
import java.net.{URI, URISyntaxException}
import java.nio.charset.StandardCharsets
import java.util.Properties
import java.util.{Properties, UUID}
import scala.collection.JavaConverters._
import scala.util.{Success, Try}
private[kyuubi] object Utils extends Logging {
@ -61,4 +63,61 @@ private[kyuubi] object Utils extends Logging {
}
}.getOrElse(Map.empty)
}
/**
* Return a well-formed URI for the file described by a user input string.
*
* If the supplied path does not contain a scheme, or is a relative path, it will be
* converted into an absolute path with a file:// scheme.
*/
def resolveURI(path: String): URI = {
try {
val uri = new URI(path)
if (uri.getScheme != null) {
return uri
}
// make sure to handle if the path has a fragment (applies to yarn
// distributed cache)
if (uri.getFragment != null) {
val absoluteURI = new File(uri.getPath).getAbsoluteFile.toURI
return new URI(absoluteURI.getScheme, absoluteURI.getHost, absoluteURI.getPath,
uri.getFragment)
}
} catch {
case _: URISyntaxException =>
}
new File(path).getAbsoluteFile.toURI
}
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
/**
* Create a directory inside the given parent directory. The directory is guaranteed to be
* newly created, and is not marked for automatic deletion.
*/
def createDirectory(root: String, namePrefix: String = "kyuubi"): File = {
(0 until MAX_DIR_CREATION_ATTEMPTS).foreach { _ =>
val dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString)
Try { dir.mkdirs() } match {
case Success(_) => return dir
case _ =>
}
}
throw new IOException("Failed to create a temp directory (under " + root + ") after " +
MAX_DIR_CREATION_ATTEMPTS + " attempts!")
}
/**
* Create a temporary directory inside the given parent directory. The directory will be
* automatically deleted when the VM shuts down.
*/
def createTempDir(
root: String = System.getProperty("java.io.tmpdir"),
namePrefix: String = "kyuubi"): File = {
val dir = createDirectory(root, namePrefix)
dir.deleteOnExit()
dir
}
}

View File

@ -23,6 +23,7 @@ import org.apache.kyuubi.Utils
case class KyuubiConf(loadSysDefault: Boolean = true) {
private val settings = new ConcurrentHashMap[String, String]()
private lazy val reader: ConfigProvider = new ConfigProvider(settings)
if (loadSysDefault) {
loadSysProps()
@ -35,6 +36,11 @@ case class KyuubiConf(loadSysDefault: Boolean = true) {
this
}
def set[T](entry: ConfigEntry[T], value: T): KyuubiConf = {
settings.put(entry.key, entry.strConverter(value))
this
}
def set(key: String, value: String): KyuubiConf = {
require(key != null)
require(value != null)
@ -42,6 +48,10 @@ case class KyuubiConf(loadSysDefault: Boolean = true) {
this
}
def get[T](config: ConfigEntry[T]): T = {
config.readFrom(reader)
}
}
object KyuubiConf {
@ -51,4 +61,27 @@ object KyuubiConf {
/** the default file that contains kyuubi properties */
final val KYUUBI_CONF_FILE_NAME = "kyuubi-defaults.conf"
final val KYUUBI_HOME = "KYUUBI_HOME"
def buildConf(key: String): ConfigBuilder = ConfigBuilder(KYUUBI_PREFIX + key)
val EMBEDDED_ZK_PORT: ConfigEntry[Int] =
buildConf("embedded.zk.port")
.doc("The port of the embedded zookeeper server")
.version("1.0.0")
.intConf.checkValue(_ >= 0, s"The value of $EMBEDDED_ZK_PORT must be >= 0")
.createWithDefault(2181)
val EMBEDDED_ZK_TEMP_DIR: ConfigEntry[String] =
buildConf("embedded.zk.directory")
.doc("The temporary directory for the embedded zookeeper server")
.version("1.0.0")
.stringConf
.createWithDefault(Utils.resolveURI("embedded_zookeeper").getRawPath)
val HA_ZK_QUORUM: OptionalConfigEntry[Seq[String]] =
buildConf("ha.zk.quorum")
.version("1.0.0")
.stringConf
.toSequence
.createOptional
}

View File

@ -22,20 +22,20 @@ import org.apache.kyuubi.config.KyuubiConf
abstract class AbstractService(serviceName: String) extends Service with Logging {
import ServiceState._
private var conf: KyuubiConf = _
private var state: ServiceState = NEW
private var startTime: Long = _
protected var conf: KyuubiConf = _
protected var state: ServiceState = LATENT
protected var startTime: Long = _
/**
* Initialize the service.
*
* The transition must be from [[NEW]]to [[INITIALIZED]] unless the
* The transition must be from [[LATENT]]to [[INITIALIZED]] unless the
* operation failed and an exception was raised.
*
* @param conf the configuration of the service
*/
override def initialize(conf: KyuubiConf): Unit = {
ensureCurrentState(NEW)
ensureCurrentState(LATENT)
this.conf = conf
changeState(INITIALIZED)
info(s"Service[$serviceName] is initialized.")
@ -62,7 +62,7 @@ abstract class AbstractService(serviceName: String) extends Service with Logging
*/
override def stop(): Unit = {
state match {
case NEW | INITIALIZED | STOPPED =>
case LATENT | INITIALIZED | STOPPED =>
case _ =>
ensureCurrentState(STARTED)
changeState(STOPPED)

View File

@ -24,7 +24,7 @@ trait Service {
/**
* Initialize the service.
*
* The transition must be from [[NEW]]to [[INITIALIZED]] unless the
* The transition must be from [[LATENT]]to [[INITIALIZED]] unless the
* operation failed and an exception was raised.
*
* @param conf the configuration of the service

View File

@ -24,7 +24,7 @@ private[kyuubi] object ServiceState extends Enumeration {
type ServiceState = Value
val
/** Constructed but not initialized */ NEW,
/** Constructed but not initialized */ LATENT,
/** Initialized but not started or stopped */ INITIALIZED,
/** Started and not stopped */ STARTED,
/** Stopped. No further state transitions are permitted */ STOPPED = Value

View File

@ -0,0 +1,36 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set everything to be logged to the file target/unit-tests.log
log4j.rootLogger=DEBUG, CA, FA
#Console Appender
log4j.appender.CA=org.apache.log4j.ConsoleAppender
log4j.appender.CA.layout=org.apache.log4j.PatternLayout
log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
log4j.appender.CA.Threshold = WARN
#File Appender
log4j.appender.FA=org.apache.log4j.FileAppender
log4j.appender.FA.append=false
log4j.appender.FA.file=target/unit-tests.log
log4j.appender.FA.layout=org.apache.log4j.PatternLayout
log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{2}: %m%n
# Set the logger level of File Appender to WARN
log4j.appender.FA.Threshold = DEBUG

View File

@ -49,4 +49,25 @@ class UtilsSuite extends KyuubiFunSuite {
assert(props("kyuubi.yes") === "yes")
assert(!props.contains("kyuubi.no"))
}
test("resolveURI") {
def assertResolves(before: String, after: String): Unit = {
// This should test only single paths
assert(before.split(",").length === 1)
def resolve(uri: String): String = Utils.resolveURI(uri).toString
assert(resolve(before) === after)
assert(resolve(after) === after)
// Repeated invocations of resolveURI should yield the same result
assert(resolve(resolve(after)) === after)
assert(resolve(resolve(resolve(after))) === after)
}
assertResolves("hdfs:/root/spark.jar", "hdfs:/root/spark.jar")
assertResolves("hdfs:///root/spark.jar#app.jar", "hdfs:///root/spark.jar#app.jar")
assertResolves("file:/C:/path/to/file.txt", "file:/C:/path/to/file.txt")
assertResolves("file:///C:/path/to/file.txt", "file:///C:/path/to/file.txt")
assertResolves("file:/C:/file.txt#alias.txt", "file:/C:/file.txt#alias.txt")
assertResolves("file:foo", "file:foo")
assertResolves("file:foo:baby", "file:foo:baby")
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.config
import org.apache.kyuubi.KyuubiFunSuite
class KyuubiConfSuite extends KyuubiFunSuite {
import KyuubiConf._
test("kyuubi conf defaults") {
val conf = new KyuubiConf()
assert(conf.get(EMBEDDED_ZK_PORT) === 2181)
assert(conf.get(EMBEDDED_ZK_TEMP_DIR).endsWith("embedded_zookeeper"))
assert(conf.get(HA_ZK_QUORUM) === None)
}
}

59
kyuubi-ha/pom.xml Normal file
View File

@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kyuubi-ha</artifactId>
<packaging>jar</packaging>
<name>Kyuubi Project High Availability</name>
<dependencies>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.ha.server
import java.io.File
import org.apache.curator.test.TestingServer
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.service.AbstractService
/**
* An embedded zookeeper server for testing and quick try with Kyuubi with external
* zookeeper cluster.
*
* @note Avoid to use this for production purpose
*
* @param name the service name
*/
class EmbeddedZkServer private(name: String) extends AbstractService(name) {
def this() = this(classOf[EmbeddedZkServer].getSimpleName)
private var server: TestingServer = _
override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
val port = conf.get(KyuubiConf.EMBEDDED_ZK_PORT)
val dataDir = conf.get(KyuubiConf.EMBEDDED_ZK_TEMP_DIR)
server = new TestingServer(port, new File(dataDir), false)
super.initialize(conf)
}
override def start(): Unit = {
server.start()
super.start()
}
override def stop(): Unit = {
if (server != null) {
server.close()
}
server = null
super.stop()
}
def getConnectString: String = {
if (server == null) {
null
} else {
server.getConnectString
}
}
}

View File

@ -0,0 +1,36 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set everything to be logged to the file target/unit-tests.log
log4j.rootLogger=DEBUG, CA, FA
#Console Appender
log4j.appender.CA=org.apache.log4j.ConsoleAppender
log4j.appender.CA.layout=org.apache.log4j.PatternLayout
log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
log4j.appender.CA.Threshold = WARN
#File Appender
log4j.appender.FA=org.apache.log4j.FileAppender
log4j.appender.FA.append=false
log4j.appender.FA.file=target/unit-tests.log
log4j.appender.FA.layout=org.apache.log4j.PatternLayout
log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{2}: %m%n
# Set the logger level of File Appender to WARN
log4j.appender.FA.Threshold = DEBUG

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.ha.server
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.imps.CuratorFrameworkState
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.service.ServiceState._
class EmbeddedZkServerSuite extends KyuubiFunSuite {
test("embedded zookeeper server") {
val zkServer = new EmbeddedZkServer()
assert(zkServer.getConf == null)
assert(zkServer.getName === zkServer.getClass.getSimpleName)
assert(zkServer.getServiceState === LATENT)
val conf = KyuubiConf()
zkServer.initialize(conf)
assert(zkServer.getConf === conf)
assert(zkServer.getServiceState === INITIALIZED)
assert(zkServer.getConnectString.endsWith("2181"))
assert(zkServer.getStartTime === 0)
zkServer.start()
assert(zkServer.getServiceState === STARTED)
assert(zkServer.getConnectString.endsWith("2181"))
assert(zkServer.getStartTime !== 0)
zkServer.stop()
assert(zkServer.getServiceState === STOPPED)
}
test("connect test with embedded zookeeper") {
val zkServer = new EmbeddedZkServer()
zkServer.initialize(KyuubiConf())
zkServer.start()
val zkClient = CuratorFrameworkFactory.builder()
.connectString(zkServer.getConnectString)
.sessionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build
zkClient.start()
assert(zkClient.getState === CuratorFrameworkState.STARTED)
assert(zkClient.getZookeeperClient.blockUntilConnectedOrTimedOut())
}
}

View File

@ -22,11 +22,10 @@
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>0.8.0-SNAPSHOT</version>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-hive-thrift</artifactId>
<name>Kyuubi Project Hive Thrift IDL</name>
<packaging>jar</packaging>

View File

@ -22,7 +22,7 @@
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>0.8.0-SNAPSHOT</version>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -22,7 +22,7 @@
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>0.8.0-SNAPSHOT</version>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -22,7 +22,7 @@
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>0.8.0-SNAPSHOT</version>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

25
pom.xml
View File

@ -23,7 +23,7 @@
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi</artifactId>
<name>Kyuubi Project Parent</name>
<version>0.8.0-SNAPSHOT</version>
<version>1.0.0-SNAPSHOT</version>
<modules>
<module>kyuubi-common</module>
<module>kyuubi-thrift</module>
@ -31,6 +31,7 @@
<module>kyuubi-assembly</module>
<module>kyuubi-hive-thrift</module>
<module>kyuubi-spark-sql-engine</module>
<module>kyuubi-ha</module>
</modules>
<packaging>pom</packaging>
@ -70,7 +71,7 @@
<jpam.version>1.1</jpam.version>
<jars.target.dir>${project.build.directory}/scala-${scala.binary.version}/jars</jars.target.dir>
<apacheds.version>2.0.0-M15</apacheds.version>
<curator.version>2.6.0</curator.version>
<curator.version>2.7.1</curator.version>
<codahale.metrics.version>3.1.2</codahale.metrics.version>
<commons-lang.version>2.6</commons-lang.version>
<libthrift.version>0.12.0</libthrift.version>
@ -250,6 +251,13 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>${curator.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
@ -407,7 +415,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.version}</version>
<version>${maven.version}</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
@ -489,11 +497,15 @@
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<version>1.0</version>
<version>2.0.0</version>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>TestSuite.txt</filereports>
<systemProperties>
<log4j.configuration>src/test/resources/log4j.properties</log4j.configuration>
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
</systemProperties>
</configuration>
<executions>
<execution>
@ -637,6 +649,11 @@
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>