From 4ed570459050e753f537a7cf329fc7447f12f524 Mon Sep 17 00:00:00 2001 From: yanghua Date: Mon, 30 Aug 2021 09:34:12 +0800 Subject: [PATCH] [KYUUBI #983] Refactor the basic abstraction of frontend service MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit … ### _Why are the changes needed?_ ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #984 from yanghua/KYUUBI-983. Closes #983 db953b8a [yanghua] Rename KyuubiFrontendService to be KyuubiFrontendServices 8c3d7f2a [yanghua] [KYUUBI #983] Refactor the basic abstraction of frontend service Authored-by: yanghua Signed-off-by: Cheng Pan --- .../kyuubi/engine/spark/SparkSQLEngine.scala | 5 ++++- .../kyuubi/service/AbstractFrontendService.scala | 2 +- .../org/apache/kyuubi/service/Serverable.scala | 4 +--- .../org/apache/kyuubi/service/NoopServer.scala | 11 +++++++++-- .../authentication/PlainSASLHelperSuite.scala | 2 +- ...dService.scala => KyuubiFrontendServices.scala} | 14 +++++++------- .../org/apache/kyuubi/server/KyuubiServer.scala | 7 ++++++- 7 files changed, 29 insertions(+), 16 deletions(-) rename kyuubi-server/src/main/scala/org/apache/kyuubi/server/{KyuubiFrontendService.scala => KyuubiFrontendServices.scala} (78%) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index 4126c7e72..bed9f7bc9 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -43,7 +43,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin private val OOMHook = new Runnable { override def run(): Unit = stop() } private val eventLogging = new EventLoggingService(this) override val backendService = new SparkSQLBackendService(spark) - override val frontendService = new ThriftFrontendService(backendService, OOMHook) + val frontendService = new ThriftFrontendService(backendService, OOMHook) override val discoveryService: Service = new EngineServiceDiscovery(this) override protected def supportsServiceDiscovery: Boolean = { @@ -54,6 +54,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin val listener = new SparkSQLEngineListener(this) spark.sparkContext.addSparkListener(listener) addService(eventLogging) + addService(frontendService) super.initialize(conf) eventLogging.onEvent(engineStatus.copy(state = ServiceState.INITIALIZED.id)) } @@ -76,6 +77,8 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin countDownLatch.countDown() } + override def connectionUrl: String = frontendService.connectionUrl() + def engineId: String = { spark.sparkContext.applicationAttemptId.getOrElse(spark.sparkContext.applicationId) } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractFrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractFrontendService.scala index 635ba6787..a65ede7e9 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractFrontendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractFrontendService.scala @@ -21,7 +21,7 @@ package org.apache.kyuubi.service * A basic abstraction for frontend services. */ abstract class AbstractFrontendService(name: String, be: BackendService) - extends CompositeService(name) { + extends AbstractService(name) { def connectionUrl(server: Boolean = false): String diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala index f18994ecb..b36b2f418 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala @@ -26,16 +26,14 @@ abstract class Serverable(name: String) extends CompositeService(name) { private val started = new AtomicBoolean(false) val backendService: AbstractBackendService - val frontendService: AbstractFrontendService protected def supportsServiceDiscovery: Boolean val discoveryService: Service - def connectionUrl: String = frontendService.connectionUrl() + def connectionUrl: String override def initialize(conf: KyuubiConf): Unit = synchronized { this.conf = conf addService(backendService) - addService(frontendService) if (supportsServiceDiscovery) { // Service Discovery depends on the frontend service to be ready addService(discoveryService) diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala index 233e11263..644c8bafb 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala @@ -18,13 +18,19 @@ package org.apache.kyuubi.service import org.apache.kyuubi.KyuubiException +import org.apache.kyuubi.config.KyuubiConf class NoopServer extends Serverable("noop") { private val OOMHook = new Runnable { override def run(): Unit = stop() } override val backendService = new NoopBackendService - override val frontendService = new ThriftFrontendService(backendService, OOMHook) + val frontendService = new ThriftFrontendService(backendService, OOMHook) + + override def initialize(conf: KyuubiConf): Unit = { + addService(frontendService) + super.initialize(conf) + } override def start(): Unit = { super.start() @@ -37,7 +43,8 @@ class NoopServer extends Serverable("noop") { throw new KyuubiException("no need to stop me") } - override val discoveryService: Service = backendService override protected val supportsServiceDiscovery: Boolean = false + + override def connectionUrl: String = frontendService.connectionUrl() } diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/PlainSASLHelperSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/PlainSASLHelperSuite.scala index 1b6171519..b57b3165b 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/PlainSASLHelperSuite.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/PlainSASLHelperSuite.scala @@ -32,7 +32,7 @@ class PlainSASLHelperSuite extends KyuubiFunSuite { val server = new NoopServer() val conf = KyuubiConf().set(KyuubiConf.FRONTEND_BIND_PORT, 0) server.initialize(conf) - val service = server.getServices(1).asInstanceOf[ThriftFrontendService] + val service = server.getServices(0).asInstanceOf[ThriftFrontendService] val tProcessorFactory = PlainSASLHelper.getProcessFactory(service) val tSocket = new TSocket("0.0.0.0", 0) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiFrontendServices.scala similarity index 78% rename from kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiFrontendService.scala rename to kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiFrontendServices.scala index f1f34bce1..3b722ed8f 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiFrontendService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiFrontendServices.scala @@ -19,30 +19,30 @@ package org.apache.kyuubi.server import org.apache.kyuubi.Logging import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.service.{AbstractFrontendService, BackendService, ServiceState, ThriftFrontendService} +import org.apache.kyuubi.service.{AbstractFrontendService, BackendService, CompositeService, ServiceState, ThriftFrontendService} /** * A kyuubi frontend service is a kind of composite service, which used to * composite multiple frontend services for kyuubi server. */ -class KyuubiFrontendService private(name: String, be: BackendService) - extends AbstractFrontendService(name, be) with Logging { +class KyuubiFrontendServices private(name: String, be: BackendService) + extends CompositeService(name) with Logging { private val OOMHook = new Runnable { override def run(): Unit = stop() } def this(be: BackendService) = { - this(classOf[KyuubiFrontendService].getSimpleName, be) + this(classOf[KyuubiFrontendServices].getSimpleName, be) } - override def connectionUrl(server: Boolean): String = { + def connectionUrl(server: Boolean): String = { getServiceState match { case s @ ServiceState.LATENT => throw new IllegalStateException(s"Illegal Service State: $s") case _ => val defaultFEService = getServices(0).asInstanceOf[AbstractFrontendService] - if (defaultFEService != null) { + if (defaultFEService != null && defaultFEService.isInstanceOf[ThriftFrontendService]) { defaultFEService.connectionUrl(server) } else { - throw new IllegalStateException("Can not find frontend services!") + throw new IllegalStateException("Can not find thrift frontend services!") } } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala index e955331c0..d97f8273a 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala @@ -82,7 +82,7 @@ class KyuubiServer(name: String) extends Serverable(name) { def this() = this(classOf[KyuubiServer].getSimpleName) override val backendService: AbstractBackendService = new KyuubiBackendService() - override val frontendService = new KyuubiFrontendService(backendService) + val frontendService = new KyuubiFrontendServices(backendService) private val eventLoggingService: EventLoggingService = new EventLoggingService override protected def supportsServiceDiscovery: Boolean = { ServiceDiscovery.supportServiceDiscovery(conf) @@ -98,9 +98,14 @@ class KyuubiServer(name: String) extends Serverable(name) { addService(new MetricsSystem) } + addService(frontendService) + super.initialize(conf) } override protected def stopServer(): Unit = {} + override def connectionUrl: String = { + frontendService.connectionUrl(true) + } }