[KYUUBI #983] Refactor the basic abstraction of frontend service
… <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #984 from yanghua/KYUUBI-983. Closes #983 db953b8a [yanghua] Rename KyuubiFrontendService to be KyuubiFrontendServices 8c3d7f2a [yanghua] [KYUUBI #983] Refactor the basic abstraction of frontend service Authored-by: yanghua <yanghua1127@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent
cd426798bf
commit
4ed5704590
@ -43,7 +43,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
|
||||
private val OOMHook = new Runnable { override def run(): Unit = stop() }
|
||||
private val eventLogging = new EventLoggingService(this)
|
||||
override val backendService = new SparkSQLBackendService(spark)
|
||||
override val frontendService = new ThriftFrontendService(backendService, OOMHook)
|
||||
val frontendService = new ThriftFrontendService(backendService, OOMHook)
|
||||
override val discoveryService: Service = new EngineServiceDiscovery(this)
|
||||
|
||||
override protected def supportsServiceDiscovery: Boolean = {
|
||||
@ -54,6 +54,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
|
||||
val listener = new SparkSQLEngineListener(this)
|
||||
spark.sparkContext.addSparkListener(listener)
|
||||
addService(eventLogging)
|
||||
addService(frontendService)
|
||||
super.initialize(conf)
|
||||
eventLogging.onEvent(engineStatus.copy(state = ServiceState.INITIALIZED.id))
|
||||
}
|
||||
@ -76,6 +77,8 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
|
||||
countDownLatch.countDown()
|
||||
}
|
||||
|
||||
override def connectionUrl: String = frontendService.connectionUrl()
|
||||
|
||||
def engineId: String = {
|
||||
spark.sparkContext.applicationAttemptId.getOrElse(spark.sparkContext.applicationId)
|
||||
}
|
||||
|
||||
@ -21,7 +21,7 @@ package org.apache.kyuubi.service
|
||||
* A basic abstraction for frontend services.
|
||||
*/
|
||||
abstract class AbstractFrontendService(name: String, be: BackendService)
|
||||
extends CompositeService(name) {
|
||||
extends AbstractService(name) {
|
||||
|
||||
def connectionUrl(server: Boolean = false): String
|
||||
|
||||
|
||||
@ -26,16 +26,14 @@ abstract class Serverable(name: String) extends CompositeService(name) {
|
||||
private val started = new AtomicBoolean(false)
|
||||
|
||||
val backendService: AbstractBackendService
|
||||
val frontendService: AbstractFrontendService
|
||||
protected def supportsServiceDiscovery: Boolean
|
||||
val discoveryService: Service
|
||||
|
||||
def connectionUrl: String = frontendService.connectionUrl()
|
||||
def connectionUrl: String
|
||||
|
||||
override def initialize(conf: KyuubiConf): Unit = synchronized {
|
||||
this.conf = conf
|
||||
addService(backendService)
|
||||
addService(frontendService)
|
||||
if (supportsServiceDiscovery) {
|
||||
// Service Discovery depends on the frontend service to be ready
|
||||
addService(discoveryService)
|
||||
|
||||
@ -18,13 +18,19 @@
|
||||
package org.apache.kyuubi.service
|
||||
|
||||
import org.apache.kyuubi.KyuubiException
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
|
||||
class NoopServer extends Serverable("noop") {
|
||||
|
||||
private val OOMHook = new Runnable { override def run(): Unit = stop() }
|
||||
|
||||
override val backendService = new NoopBackendService
|
||||
override val frontendService = new ThriftFrontendService(backendService, OOMHook)
|
||||
val frontendService = new ThriftFrontendService(backendService, OOMHook)
|
||||
|
||||
override def initialize(conf: KyuubiConf): Unit = {
|
||||
addService(frontendService)
|
||||
super.initialize(conf)
|
||||
}
|
||||
|
||||
override def start(): Unit = {
|
||||
super.start()
|
||||
@ -37,7 +43,8 @@ class NoopServer extends Serverable("noop") {
|
||||
throw new KyuubiException("no need to stop me")
|
||||
}
|
||||
|
||||
|
||||
override val discoveryService: Service = backendService
|
||||
override protected val supportsServiceDiscovery: Boolean = false
|
||||
|
||||
override def connectionUrl: String = frontendService.connectionUrl()
|
||||
}
|
||||
|
||||
@ -32,7 +32,7 @@ class PlainSASLHelperSuite extends KyuubiFunSuite {
|
||||
val server = new NoopServer()
|
||||
val conf = KyuubiConf().set(KyuubiConf.FRONTEND_BIND_PORT, 0)
|
||||
server.initialize(conf)
|
||||
val service = server.getServices(1).asInstanceOf[ThriftFrontendService]
|
||||
val service = server.getServices(0).asInstanceOf[ThriftFrontendService]
|
||||
val tProcessorFactory = PlainSASLHelper.getProcessFactory(service)
|
||||
val tSocket = new TSocket("0.0.0.0", 0)
|
||||
|
||||
|
||||
@ -19,30 +19,30 @@ package org.apache.kyuubi.server
|
||||
|
||||
import org.apache.kyuubi.Logging
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.service.{AbstractFrontendService, BackendService, ServiceState, ThriftFrontendService}
|
||||
import org.apache.kyuubi.service.{AbstractFrontendService, BackendService, CompositeService, ServiceState, ThriftFrontendService}
|
||||
|
||||
/**
|
||||
* A kyuubi frontend service is a kind of composite service, which used to
|
||||
* composite multiple frontend services for kyuubi server.
|
||||
*/
|
||||
class KyuubiFrontendService private(name: String, be: BackendService)
|
||||
extends AbstractFrontendService(name, be) with Logging {
|
||||
class KyuubiFrontendServices private(name: String, be: BackendService)
|
||||
extends CompositeService(name) with Logging {
|
||||
|
||||
private val OOMHook = new Runnable { override def run(): Unit = stop() }
|
||||
|
||||
def this(be: BackendService) = {
|
||||
this(classOf[KyuubiFrontendService].getSimpleName, be)
|
||||
this(classOf[KyuubiFrontendServices].getSimpleName, be)
|
||||
}
|
||||
|
||||
override def connectionUrl(server: Boolean): String = {
|
||||
def connectionUrl(server: Boolean): String = {
|
||||
getServiceState match {
|
||||
case s @ ServiceState.LATENT => throw new IllegalStateException(s"Illegal Service State: $s")
|
||||
case _ =>
|
||||
val defaultFEService = getServices(0).asInstanceOf[AbstractFrontendService]
|
||||
if (defaultFEService != null) {
|
||||
if (defaultFEService != null && defaultFEService.isInstanceOf[ThriftFrontendService]) {
|
||||
defaultFEService.connectionUrl(server)
|
||||
} else {
|
||||
throw new IllegalStateException("Can not find frontend services!")
|
||||
throw new IllegalStateException("Can not find thrift frontend services!")
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -82,7 +82,7 @@ class KyuubiServer(name: String) extends Serverable(name) {
|
||||
def this() = this(classOf[KyuubiServer].getSimpleName)
|
||||
|
||||
override val backendService: AbstractBackendService = new KyuubiBackendService()
|
||||
override val frontendService = new KyuubiFrontendService(backendService)
|
||||
val frontendService = new KyuubiFrontendServices(backendService)
|
||||
private val eventLoggingService: EventLoggingService = new EventLoggingService
|
||||
override protected def supportsServiceDiscovery: Boolean = {
|
||||
ServiceDiscovery.supportServiceDiscovery(conf)
|
||||
@ -98,9 +98,14 @@ class KyuubiServer(name: String) extends Serverable(name) {
|
||||
addService(new MetricsSystem)
|
||||
}
|
||||
|
||||
addService(frontendService)
|
||||
|
||||
super.initialize(conf)
|
||||
}
|
||||
|
||||
override protected def stopServer(): Unit = {}
|
||||
|
||||
override def connectionUrl: String = {
|
||||
frontendService.connectionUrl(true)
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user