[KYUUBI #453] Refine ServiceDiscovery in Serverable API
 [](https://github.com/yaooqinn/kyuubi/pull/453)      [<img width="16" alt="Powered by Pull Request Badge" src="https://user-images.githubusercontent.com/1393946/111216524-d2bb8e00-85d4-11eb-821b-ed4c00989c02.png">](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info)<!-- PR-BADGE: PLEASE DO NOT REMOVE THIS COMMENT --> <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/yaooqinn/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> A service discovery should be a default module of Severable API and shall be initialized after `FroundendService` for a resolved serviceUri. it's a reflection only fix to enhance this logic ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request Closes #453 from yaooqinn/refine. 077dca0 [Kent Yao] Refine ServiceDiscovery in Serverable API Authored-by: Kent Yao <yao@apache.org> Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
parent
fc437a41a4
commit
207a7df934
@ -28,7 +28,7 @@ import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
|
||||
import org.apache.kyuubi.ha.HighAvailabilityConf._
|
||||
import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, RetryPolicies, ServiceDiscovery}
|
||||
import org.apache.kyuubi.service.Serverable
|
||||
import org.apache.kyuubi.service.{Serverable, Service}
|
||||
import org.apache.kyuubi.util.SignalRegister
|
||||
|
||||
private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
|
||||
@ -37,16 +37,16 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
|
||||
def this(spark: SparkSession) = this(classOf[SparkSQLEngine].getSimpleName, spark)
|
||||
|
||||
override private[kyuubi] val backendService = new SparkSQLBackendService(spark)
|
||||
private val discoveryService = new EngineServiceDiscovery(this)
|
||||
override protected def supportsServiceDiscovery: Boolean = {
|
||||
ServiceDiscovery.supportServiceDiscovery(conf)
|
||||
}
|
||||
|
||||
override protected val discoveryService: Service = new EngineServiceDiscovery(this)
|
||||
|
||||
override def initialize(conf: KyuubiConf): Unit = {
|
||||
val listener = new SparkSQLEngineListener(this)
|
||||
spark.sparkContext.addSparkListener(listener)
|
||||
super.initialize(conf)
|
||||
if (ServiceDiscovery.supportServiceDiscovery(conf)) {
|
||||
addService(discoveryService)
|
||||
discoveryService.initialize(conf)
|
||||
}
|
||||
}
|
||||
|
||||
override def start(): Unit = {
|
||||
|
||||
@ -28,12 +28,19 @@ abstract class Serverable(name: String) extends CompositeService(name) {
|
||||
|
||||
private[kyuubi] val backendService: AbstractBackendService
|
||||
private lazy val frontendService = new FrontendService(backendService, OOMHook)
|
||||
protected def supportsServiceDiscovery: Boolean
|
||||
protected val discoveryService: Service
|
||||
|
||||
def connectionUrl: String = frontendService.connectionUrl
|
||||
|
||||
override def initialize(conf: KyuubiConf): Unit = synchronized {
|
||||
this.conf = conf
|
||||
addService(backendService)
|
||||
addService(frontendService)
|
||||
if (supportsServiceDiscovery) {
|
||||
// Service Discovery depends on the frontend service to be ready
|
||||
addService(discoveryService)
|
||||
}
|
||||
super.initialize(conf)
|
||||
}
|
||||
|
||||
|
||||
@ -32,4 +32,8 @@ class NoopServer extends Serverable("noop") {
|
||||
override protected def stopServer(): Unit = {
|
||||
throw new KyuubiException("no need to stop me")
|
||||
}
|
||||
|
||||
|
||||
override protected val discoveryService: Service = backendService
|
||||
override protected val supportsServiceDiscovery: Boolean = false
|
||||
}
|
||||
|
||||
@ -80,7 +80,10 @@ class KyuubiServer(name: String) extends Serverable(name) {
|
||||
def this() = this(classOf[KyuubiServer].getSimpleName)
|
||||
|
||||
override private[kyuubi] val backendService: AbstractBackendService = new KyuubiBackendService()
|
||||
private val discoveryService = new KyuubiServiceDiscovery(this)
|
||||
override protected def supportsServiceDiscovery: Boolean = {
|
||||
ServiceDiscovery.supportServiceDiscovery(conf)
|
||||
}
|
||||
override protected val discoveryService = new KyuubiServiceDiscovery(this)
|
||||
|
||||
override def initialize(conf: KyuubiConf): Unit = synchronized {
|
||||
val kinit = new KinitAuxiliaryService()
|
||||
@ -91,11 +94,8 @@ class KyuubiServer(name: String) extends Serverable(name) {
|
||||
}
|
||||
|
||||
super.initialize(conf)
|
||||
if (ServiceDiscovery.supportServiceDiscovery(conf)) {
|
||||
addService(discoveryService)
|
||||
discoveryService.initialize(conf)
|
||||
}
|
||||
}
|
||||
|
||||
override protected def stopServer(): Unit = KyuubiServer.zkServer.stop()
|
||||
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user