ServiceDiscovery for kyuubi servver

This commit is contained in:
Kent Yao 2020-12-28 13:05:35 +08:00 committed by Kent Yao
parent a0f900a629
commit 17fe83778e
No known key found for this signature in database
GPG Key ID: A4F0BE81C89B595B
4 changed files with 28 additions and 14 deletions

View File

@ -37,6 +37,15 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
def this(spark: SparkSession) = this(classOf[SparkSQLEngine].getSimpleName, spark)
override private[kyuubi] val backendService = new SparkSQLBackendService(spark)
private val discoveryService = new ServiceDiscovery(this)
override def initialize(conf: KyuubiConf): Unit = {
super.initialize(conf)
if (ServiceDiscovery.supportServiceDiscovery(conf)) {
addService(discoveryService)
discoveryService.initialize(conf)
}
}
override protected def stopServer(): Unit = {
countDownLatch.countDown()
@ -88,16 +97,6 @@ object SparkSQLEngine extends Logging {
engine
}
def exposeEngine(engine: SparkSQLEngine): Unit = {
val needExpose = kyuubiConf.get(HA_ZK_QUORUM).nonEmpty
if (needExpose) {
val serviceDiscovery = new ServiceDiscovery(engine)
serviceDiscovery.initialize(kyuubiConf)
serviceDiscovery.start()
sys.addShutdownHook(serviceDiscovery.stop())
}
}
def main(args: Array[String]): Unit = {
SignalRegister.registerLogger(logger)
var spark: SparkSession = null
@ -105,7 +104,6 @@ object SparkSQLEngine extends Logging {
try {
spark = createSpark()
engine = startEngine(spark)
exposeEngine(engine)
info(KyuubiSparkUtil.diagnostics(spark))
// blocking main thread
countDownLatch.await()

View File

@ -79,6 +79,12 @@
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>

View File

@ -131,7 +131,7 @@ class ServiceDiscovery private (
throw new KyuubiException(s"Unable to create znode for this Kyuubi instance[$instance]" +
s" on ZooKeeper.")
}
info("Created a serviceNode on ZooKeeper for KyuubiServer uri: " + instance)
info(s"Created a ${serviceNode.getActualPath} on ZooKeeper for KyuubiServer uri: " + instance)
} catch {
case e: Exception =>
if (serviceNode != null) {
@ -245,4 +245,9 @@ object ServiceDiscovery {
}
}
}
def supportServiceDiscovery(conf: KyuubiConf): Boolean = {
val zkEnsemble = conf.get(HA_ZK_QUORUM)
zkEnsemble != null && zkEnsemble.nonEmpty
}
}

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.ServiceDiscovery
import org.apache.kyuubi.ha.server.EmbeddedZkServer
import org.apache.kyuubi.service.{AbstractBackendService, KinitAuxiliaryService, Serverable}
import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
@ -32,8 +33,7 @@ object KyuubiServer extends Logging {
private val zkServer = new EmbeddedZkServer()
def startServer(conf: KyuubiConf): KyuubiServer = {
val zkEnsemble = conf.get(HA_ZK_QUORUM)
if (zkEnsemble == null || zkEnsemble.isEmpty) {
if (!ServiceDiscovery.supportServiceDiscovery(conf)) {
zkServer.initialize(conf)
zkServer.start()
sys.addShutdownHook(zkServer.stop())
@ -80,11 +80,16 @@ class KyuubiServer(name: String) extends Serverable(name) {
def this() = this(classOf[KyuubiServer].getSimpleName)
override private[kyuubi] val backendService: AbstractBackendService = new KyuubiBackendService()
private val discoveryService = new ServiceDiscovery(this)
override def initialize(conf: KyuubiConf): Unit = {
val kinit = new KinitAuxiliaryService()
addService(kinit)
super.initialize(conf)
if (ServiceDiscovery.supportServiceDiscovery(conf)) {
addService(discoveryService)
discoveryService.initialize(conf)
}
}
override protected def stopServer(): Unit = KyuubiServer.zkServer.stop()