[KYUUBI #979] Introduce AbstractFrontendService and KyuubiFrontendService
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #980 from yanghua/KPIP-1. Closes #979 e67e37a4 [yanghua] [KYUUBI 979] Introduce AbstractFrontendService and KyuubiFrontendService Authored-by: yanghua <yanghua1127@gmail.com> Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
parent
f1ae664910
commit
cb70e062c3
@ -33,15 +33,17 @@ import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
|
||||
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EventLoggingService}
|
||||
import org.apache.kyuubi.ha.HighAvailabilityConf._
|
||||
import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, RetryPolicies, ServiceDiscovery}
|
||||
import org.apache.kyuubi.service.{Serverable, Service, ServiceState}
|
||||
import org.apache.kyuubi.service.{Serverable, Service, ServiceState, ThriftFrontendService}
|
||||
import org.apache.kyuubi.util.SignalRegister
|
||||
|
||||
case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine") {
|
||||
|
||||
lazy val engineStatus: EngineEvent = EngineEvent(this)
|
||||
|
||||
private val OOMHook = new Runnable { override def run(): Unit = stop() }
|
||||
private val eventLogging = new EventLoggingService(this)
|
||||
override val backendService = new SparkSQLBackendService(spark)
|
||||
override val frontendService = new ThriftFrontendService(backendService, OOMHook)
|
||||
override val discoveryService: Service = new EngineServiceDiscovery(this)
|
||||
|
||||
override protected def supportsServiceDiscovery: Boolean = {
|
||||
|
||||
@ -0,0 +1,28 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.service
|
||||
|
||||
/**
|
||||
* A basic abstraction for frontend services.
|
||||
*/
|
||||
abstract class AbstractFrontendService(name: String, be: BackendService)
|
||||
extends CompositeService(name) {
|
||||
|
||||
def connectionUrl(server: Boolean = false): String
|
||||
|
||||
}
|
||||
@ -23,11 +23,10 @@ import org.apache.kyuubi.config.KyuubiConf
|
||||
|
||||
abstract class Serverable(name: String) extends CompositeService(name) {
|
||||
|
||||
private val OOMHook = new Runnable { override def run(): Unit = stop() }
|
||||
private val started = new AtomicBoolean(false)
|
||||
|
||||
val backendService: AbstractBackendService
|
||||
private lazy val frontendService = new ThriftFrontendService(backendService, OOMHook)
|
||||
val frontendService: AbstractFrontendService
|
||||
protected def supportsServiceDiscovery: Boolean
|
||||
val discoveryService: Service
|
||||
|
||||
|
||||
@ -38,7 +38,7 @@ import org.apache.kyuubi.session.SessionHandle
|
||||
import org.apache.kyuubi.util.{ExecutorPoolCaptureOom, KyuubiHadoopUtils, NamedThreadFactory}
|
||||
|
||||
class ThriftFrontendService private(name: String, be: BackendService, oomHook: Runnable)
|
||||
extends AbstractService(name) with TCLIService.Iface with Runnable with Logging {
|
||||
extends AbstractFrontendService(name, be) with TCLIService.Iface with Runnable with Logging {
|
||||
|
||||
import ThriftFrontendService._
|
||||
import KyuubiConf._
|
||||
@ -105,7 +105,7 @@ class ThriftFrontendService private(name: String, be: BackendService, oomHook: R
|
||||
super.initialize(conf)
|
||||
}
|
||||
|
||||
def connectionUrl(server: Boolean = false): String = {
|
||||
override def connectionUrl(server: Boolean = false): String = {
|
||||
getServiceState match {
|
||||
case s @ ServiceState.LATENT => throw new IllegalStateException(s"Illegal Service State: $s")
|
||||
case _ =>
|
||||
|
||||
@ -20,7 +20,11 @@ package org.apache.kyuubi.service
|
||||
import org.apache.kyuubi.KyuubiException
|
||||
|
||||
class NoopServer extends Serverable("noop") {
|
||||
|
||||
private val OOMHook = new Runnable { override def run(): Unit = stop() }
|
||||
|
||||
override val backendService = new NoopBackendService
|
||||
override val frontendService = new ThriftFrontendService(backendService, OOMHook)
|
||||
|
||||
override def start(): Unit = {
|
||||
super.start()
|
||||
|
||||
@ -0,0 +1,54 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.server
|
||||
|
||||
import org.apache.kyuubi.Logging
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.service.{AbstractFrontendService, BackendService, ServiceState, ThriftFrontendService}
|
||||
|
||||
/**
|
||||
* A kyuubi frontend service is a kind of composite service, which used to
|
||||
* composite multiple frontend services for kyuubi server.
|
||||
*/
|
||||
class KyuubiFrontendService private(name: String, be: BackendService)
|
||||
extends AbstractFrontendService(name, be) with Logging {
|
||||
|
||||
private val OOMHook = new Runnable { override def run(): Unit = stop() }
|
||||
|
||||
def this(be: BackendService) = {
|
||||
this(classOf[KyuubiFrontendService].getSimpleName, be)
|
||||
}
|
||||
|
||||
override def connectionUrl(server: Boolean): String = {
|
||||
getServiceState match {
|
||||
case s @ ServiceState.LATENT => throw new IllegalStateException(s"Illegal Service State: $s")
|
||||
case _ =>
|
||||
val defaultFEService = getServices(0).asInstanceOf[AbstractFrontendService]
|
||||
if (defaultFEService != null) {
|
||||
defaultFEService.connectionUrl(server)
|
||||
} else {
|
||||
throw new IllegalStateException("Can not find frontend services!")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def initialize(conf: KyuubiConf): Unit = {
|
||||
addService(new ThriftFrontendService(be, OOMHook))
|
||||
super.initialize(conf)
|
||||
}
|
||||
}
|
||||
@ -82,6 +82,7 @@ class KyuubiServer(name: String) extends Serverable(name) {
|
||||
def this() = this(classOf[KyuubiServer].getSimpleName)
|
||||
|
||||
override val backendService: AbstractBackendService = new KyuubiBackendService()
|
||||
override val frontendService = new KyuubiFrontendService(backendService)
|
||||
override protected def supportsServiceDiscovery: Boolean = {
|
||||
ServiceDiscovery.supportServiceDiscovery(conf)
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user