[KYUUBI #1358] Add KyuubiMySQLFrontendService stub
### _Why are the changes needed?_ Add KyuubiMySQLFrontendService stub, without Netty pipeline, part of #1334 ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1358 from pan3793/mysql-fe-stub. Closes #1358 c091d955 [Cheng Pan] nit 61abb0f6 [Cheng Pan] Address comments 9ac70456 [Cheng Pan] Update conf 3d75acdf [Cheng Pan] nit 4a4d8a24 [Cheng Pan] Address comments 0136dd52 [Cheng Pan] nit 477474ff [Cheng Pan] KyuubiMySQLFrontendService Stub Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent
1d9b98e119
commit
1061d176c8
@ -25,3 +25,6 @@ log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %p %
|
||||
# Set the default kyuubi-ctl log level to WARN. When running the kyuubi-ctl, the
|
||||
# log level for this class is used to overwrite the root logger's log level.
|
||||
log4j.logger.org.apache.kyuubi.ctl.ServiceControlCli=ERROR
|
||||
|
||||
# Analysis MySQLFrontend protocol traffic
|
||||
# log4j.logger.org.apache.kyuubi.server.mysql.codec=TRACE
|
||||
|
||||
@ -205,6 +205,12 @@ kyuubi\.frontend\.login<br>\.timeout|<div style='width: 65pt;word-wrap: break-wo
|
||||
kyuubi\.frontend\.max<br>\.message\.size|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>104857600</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>(deprecated) Maximum message size in bytes a Kyuubi server will accept.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
|
||||
kyuubi\.frontend\.max<br>\.worker\.threads|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>999</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>(deprecated) Maximum number of threads in the of frontend worker thread pool for the thrift frontend service</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
|
||||
kyuubi\.frontend\.min<br>\.worker\.threads|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>9</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>(deprecated) Minimum number of threads in the of frontend worker thread pool for the thrift frontend service</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
|
||||
kyuubi\.frontend\.mysql<br>\.bind\.host|<div style='width: 65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Hostname or IP of the machine on which to run the MySQL frontend service.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
|
||||
kyuubi\.frontend\.mysql<br>\.bind\.port|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>3309</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Port of the machine on which to run the MySQL frontend service.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
|
||||
kyuubi\.frontend\.mysql<br>\.max\.worker\.threads|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>999</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Maximum number of threads in the command execution thread pool for the MySQL frontend service</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
|
||||
kyuubi\.frontend\.mysql<br>\.min\.worker\.threads|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>9</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Minimum number of threads in the command execution thread pool for the MySQL frontend service</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
|
||||
kyuubi\.frontend\.mysql<br>\.netty\.worker\.threads|<div style='width: 65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Number of thread in the netty worker event loop of MySQL frontend service. Use min(cpu_cores, 8) in default.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
|
||||
kyuubi\.frontend\.mysql<br>\.worker\.keepalive\.time|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Time(ms) that an idle async thread of the command execution thread pool will wait for a new task to arrive before terminating in MySQL frontend service</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
|
||||
kyuubi\.frontend<br>\.protocols|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>THRIFT_BINARY</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list for all frontend protocols <ul> <li>THRIFT_BINARY - HiveServer2 compatible thrift binary protocol.</li> <li>REST - Kyuubi defined REST API(experimental).</li> </ul></div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.4.0</div>
|
||||
kyuubi\.frontend\.rest<br>\.bind\.host|<div style='width: 65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Hostname or IP of the machine on which to run the REST frontend service.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
|
||||
kyuubi\.frontend\.rest<br>\.bind\.port|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>10099</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Port of the machine on which to run the REST frontend service.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
|
||||
@ -373,6 +379,9 @@ log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %p %
|
||||
# Set the default kyuubi-ctl log level to WARN. When running the kyuubi-ctl, the
|
||||
# log level for this class is used to overwrite the root logger's log level.
|
||||
log4j.logger.org.apache.kyuubi.ctl.ServiceControlCli=ERROR
|
||||
|
||||
# Analysis MySQLFrontend protocol traffic
|
||||
# log4j.logger.org.apache.kyuubi.server.mysql.codec=TRACE
|
||||
```
|
||||
|
||||
## Other Configurations
|
||||
|
||||
@ -98,6 +98,11 @@
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-all</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-minikdc</artifactId>
|
||||
|
||||
@ -27,6 +27,7 @@ import scala.collection.JavaConverters._
|
||||
import org.apache.kyuubi.{Logging, Utils}
|
||||
import org.apache.kyuubi.engine.{EngineType, ShareLevel}
|
||||
import org.apache.kyuubi.service.authentication.{AuthTypes, SaslQOP}
|
||||
import org.apache.kyuubi.util.NettyUtils.MAX_NETTY_THREADS
|
||||
|
||||
case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
|
||||
import KyuubiConf._
|
||||
@ -136,6 +137,8 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
|
||||
FRONTEND_THRIFT_BINARY_BIND_PORT,
|
||||
FRONTEND_REST_BIND_HOST,
|
||||
FRONTEND_REST_BIND_PORT,
|
||||
FRONTEND_MYSQL_BIND_HOST,
|
||||
FRONTEND_MYSQL_BIND_PORT,
|
||||
AUTHENTICATION_METHOD,
|
||||
KINIT_INTERVAL)
|
||||
|
||||
@ -274,7 +277,6 @@ object KyuubiConf {
|
||||
s"the frontend protocol should be one or more of ${FrontendProtocols.values.mkString(",")}")
|
||||
.createWithDefault(Seq(FrontendProtocols.THRIFT_BINARY.toString))
|
||||
|
||||
@deprecated(s"using ${FRONTEND_THRIFT_BINARY_BIND_HOST.key} instead", "1.4.0")
|
||||
val FRONTEND_BIND_HOST: OptionalConfigEntry[String] = buildConf("frontend.bind.host")
|
||||
.doc("(deprecated) Hostname or IP of the machine on which to run the thrift frontend service " +
|
||||
"via binary protocol.")
|
||||
@ -300,11 +302,10 @@ object KyuubiConf {
|
||||
|
||||
val FRONTEND_THRIFT_BINARY_BIND_PORT: ConfigEntry[Int] =
|
||||
buildConf("frontend.thrift.binary.bind.port")
|
||||
.doc("Port of the machine on which to run the thrift frontend service via binary protocol.")
|
||||
.version("1.4.0")
|
||||
.fallbackConf(FRONTEND_BIND_PORT)
|
||||
.doc("Port of the machine on which to run the thrift frontend service via binary protocol.")
|
||||
.version("1.4.0")
|
||||
.fallbackConf(FRONTEND_BIND_PORT)
|
||||
|
||||
@deprecated(s"using ${FRONTEND_THRIFT_MIN_WORKER_THREADS.key} instead", "1.4.0")
|
||||
val FRONTEND_MIN_WORKER_THREADS: ConfigEntry[Int] = buildConf("frontend.min.worker.threads")
|
||||
.doc("(deprecated) Minimum number of threads in the of frontend worker thread pool for " +
|
||||
"the thrift frontend service")
|
||||
@ -319,7 +320,6 @@ object KyuubiConf {
|
||||
.version("1.4.0")
|
||||
.fallbackConf(FRONTEND_MIN_WORKER_THREADS)
|
||||
|
||||
@deprecated(s"using ${FRONTEND_THRIFT_MAX_WORKER_THREADS.key} instead", "1.4.0")
|
||||
val FRONTEND_MAX_WORKER_THREADS: ConfigEntry[Int] = buildConf("frontend.max.worker.threads")
|
||||
.doc("(deprecated) Maximum number of threads in the of frontend worker thread pool for " +
|
||||
"the thrift frontend service")
|
||||
@ -334,7 +334,6 @@ object KyuubiConf {
|
||||
.version("1.4.0")
|
||||
.fallbackConf(FRONTEND_MAX_WORKER_THREADS)
|
||||
|
||||
@deprecated(s"using ${FRONTEND_THRIFT_WORKER_KEEPALIVE_TIME.key} instead", "1.4.0")
|
||||
val FRONTEND_WORKER_KEEPALIVE_TIME: ConfigEntry[Long] =
|
||||
buildConf("frontend.worker.keepalive.time")
|
||||
.doc("(deprecated) Keep-alive time (in milliseconds) for an idle worker thread")
|
||||
@ -348,7 +347,7 @@ object KyuubiConf {
|
||||
.version("1.4.0")
|
||||
.fallbackConf(FRONTEND_WORKER_KEEPALIVE_TIME)
|
||||
|
||||
@deprecated(s"using ${FRONTEND_THRIFT_WORKER_KEEPALIVE_TIME.key} instead", "1.4.0")
|
||||
@deprecated(s"using ${FRONTEND_THRIFT_MAX_MESSAGE_SIZE.key} instead", "1.4.0")
|
||||
val FRONTEND_MAX_MESSAGE_SIZE: ConfigEntry[Int] =
|
||||
buildConf("frontend.max.message.size")
|
||||
.doc("(deprecated) Maximum message size in bytes a Kyuubi server will accept.")
|
||||
@ -486,11 +485,11 @@ object KyuubiConf {
|
||||
.transform(_.toLowerCase(Locale.ROOT))
|
||||
.createWithDefault(SaslQOP.AUTH.toString)
|
||||
|
||||
val FRONTEND_REST_BIND_HOST: OptionalConfigEntry[String] = buildConf("frontend.rest.bind.host")
|
||||
.doc("Hostname or IP of the machine on which to run the REST frontend service.")
|
||||
.version("1.4.0")
|
||||
.stringConf
|
||||
.createOptional
|
||||
val FRONTEND_REST_BIND_HOST: ConfigEntry[Option[String]] =
|
||||
buildConf("frontend.rest.bind.host")
|
||||
.doc("Hostname or IP of the machine on which to run the REST frontend service.")
|
||||
.version("1.4.0")
|
||||
.fallbackConf(FRONTEND_BIND_HOST)
|
||||
|
||||
val FRONTEND_REST_BIND_PORT: ConfigEntry[Int] = buildConf("frontend.rest.bind.port")
|
||||
.doc("Port of the machine on which to run the REST frontend service.")
|
||||
@ -499,6 +498,50 @@ object KyuubiConf {
|
||||
.checkValue(p => p == 0 || (p > 1024 && p < 65535), "Invalid Port number")
|
||||
.createWithDefault(10099)
|
||||
|
||||
val FRONTEND_MYSQL_BIND_HOST: ConfigEntry[Option[String]] =
|
||||
buildConf("frontend.mysql.bind.host")
|
||||
.doc("Hostname or IP of the machine on which to run the MySQL frontend service.")
|
||||
.version("1.4.0")
|
||||
.fallbackConf(FRONTEND_BIND_HOST)
|
||||
|
||||
val FRONTEND_MYSQL_BIND_PORT: ConfigEntry[Int] = buildConf("frontend.mysql.bind.port")
|
||||
.doc("Port of the machine on which to run the MySQL frontend service.")
|
||||
.version("1.4.0")
|
||||
.intConf
|
||||
.checkValue(p => p == 0 || (p > 1024 && p < 65535), "Invalid Port number")
|
||||
.createWithDefault(3309)
|
||||
|
||||
val FRONTEND_MYSQL_NETTY_WORKER_THREADS: OptionalConfigEntry[Int] =
|
||||
buildConf("frontend.mysql.netty.worker.threads")
|
||||
.doc("Number of thread in the netty worker event loop of MySQL frontend service. " +
|
||||
s"Use min(cpu_cores, $MAX_NETTY_THREADS) in default.")
|
||||
.version("1.4.0")
|
||||
.intConf
|
||||
.checkValue(n => n > 0 && n <= MAX_NETTY_THREADS,
|
||||
s"Invalid thread number, must in (0, $MAX_NETTY_THREADS]")
|
||||
.createOptional
|
||||
|
||||
val FRONTEND_MYSQL_MIN_WORKER_THREADS: ConfigEntry[Int] =
|
||||
buildConf("frontend.mysql.min.worker.threads")
|
||||
.doc("Minimum number of threads in the command execution thread pool for the MySQL " +
|
||||
"frontend service")
|
||||
.version("1.4.0")
|
||||
.fallbackConf(FRONTEND_MIN_WORKER_THREADS)
|
||||
|
||||
val FRONTEND_MYSQL_MAX_WORKER_THREADS: ConfigEntry[Int] =
|
||||
buildConf("frontend.mysql.max.worker.threads")
|
||||
.doc("Maximum number of threads in the command execution thread pool for the MySQL " +
|
||||
"frontend service")
|
||||
.version("1.4.0")
|
||||
.fallbackConf(FRONTEND_MAX_WORKER_THREADS)
|
||||
|
||||
val FRONTEND_MYSQL_WORKER_KEEPALIVE_TIME: ConfigEntry[Long] =
|
||||
buildConf("frontend.mysql.worker.keepalive.time")
|
||||
.doc("Time(ms) that an idle async thread of the command execution thread pool will wait" +
|
||||
" for a new task to arrive before terminating in MySQL frontend service")
|
||||
.version("1.4.0")
|
||||
.fallbackConf(FRONTEND_WORKER_KEEPALIVE_TIME)
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// SQL Engine Configuration //
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@ -0,0 +1,71 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.util
|
||||
|
||||
import java.util.concurrent.ThreadFactory
|
||||
|
||||
import scala.math.min
|
||||
|
||||
import io.netty.channel._
|
||||
import io.netty.channel.epoll._
|
||||
import io.netty.channel.nio.NioEventLoopGroup
|
||||
import io.netty.channel.socket.nio.{NioServerSocketChannel, NioSocketChannel}
|
||||
import io.netty.util.concurrent.DefaultThreadFactory
|
||||
|
||||
object NettyUtils {
|
||||
|
||||
/**
|
||||
* Specifies an upper bound on the number of Netty threads that Kyuubi requires by default.
|
||||
* In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
|
||||
* that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes
|
||||
* at a premium.
|
||||
*
|
||||
* Thus, this value should still retain maximum throughput and reduce wasted off-heap memory
|
||||
* allocation.
|
||||
*/
|
||||
val MAX_NETTY_THREADS: Int = 8
|
||||
|
||||
val EPOLL_MODE: Boolean = Epoll.isAvailable
|
||||
|
||||
val CLIENT_CHANNEL_CLASS: Class[_ <: Channel] =
|
||||
if (EPOLL_MODE) classOf[EpollSocketChannel] else classOf[NioSocketChannel]
|
||||
|
||||
val SERVER_CHANNEL_CLASS: Class[_ <: ServerChannel] =
|
||||
if (EPOLL_MODE) classOf[EpollServerSocketChannel] else classOf[NioServerSocketChannel]
|
||||
|
||||
def createThreadFactory(threadPoolPrefix: String): ThreadFactory =
|
||||
new DefaultThreadFactory(threadPoolPrefix, true)
|
||||
|
||||
def createEventLoop(numThreads: Int, threadPrefix: String): EventLoopGroup = {
|
||||
val threadFactory = createThreadFactory(threadPrefix)
|
||||
if (EPOLL_MODE) {
|
||||
new EpollEventLoopGroup(numThreads, threadFactory)
|
||||
} else {
|
||||
new NioEventLoopGroup(numThreads, threadFactory)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the default number of threads for the Netty thread pools. If numUsableCores is absent,
|
||||
* we will use Runtime get an approximate number of available cores.
|
||||
*/
|
||||
def defaultNumThreads(numUsableCores: Option[Int]): Int = numUsableCores match {
|
||||
case Some(num) => min(num, MAX_NETTY_THREADS)
|
||||
case None => min(sys.runtime.availableProcessors, MAX_NETTY_THREADS)
|
||||
}
|
||||
}
|
||||
@ -202,11 +202,6 @@
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-all</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.glassfish.jersey.core</groupId>
|
||||
<artifactId>jersey-server</artifactId>
|
||||
|
||||
@ -0,0 +1,126 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.server
|
||||
|
||||
import java.net.{InetAddress, InetSocketAddress}
|
||||
import java.util.concurrent.{ThreadPoolExecutor, TimeUnit}
|
||||
|
||||
import io.netty.bootstrap.ServerBootstrap
|
||||
import io.netty.buffer.PooledByteBufAllocator
|
||||
import io.netty.channel.{ChannelFuture, ChannelInitializer, ChannelOption}
|
||||
import io.netty.channel.socket.SocketChannel
|
||||
import io.netty.handler.logging.{LoggingHandler, LogLevel}
|
||||
|
||||
import org.apache.kyuubi.{KyuubiException, Logging, Utils}
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.KyuubiConf._
|
||||
import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service}
|
||||
import org.apache.kyuubi.util.ExecutorPoolCaptureOom
|
||||
import org.apache.kyuubi.util.NettyUtils._
|
||||
|
||||
/**
|
||||
* A frontend service implement MySQL protocol.
|
||||
*/
|
||||
class KyuubiMySQLFrontendService(override val serverable: Serverable)
|
||||
extends AbstractFrontendService("KyuubiMySQLFrontendService") with Logging {
|
||||
|
||||
private var execPool: ThreadPoolExecutor = _
|
||||
|
||||
private var serverAddr: InetAddress = _
|
||||
private var port: Int = _
|
||||
private var bootstrap: ServerBootstrap = _
|
||||
private var bindFuture: ChannelFuture = _
|
||||
|
||||
@volatile protected var isStarted = false
|
||||
|
||||
protected def oomHook: Runnable = () => serverable.stop()
|
||||
|
||||
override def initialize(conf: KyuubiConf): Unit = synchronized {
|
||||
val minThreads = conf.get(FRONTEND_MYSQL_MIN_WORKER_THREADS)
|
||||
val maxThreads = conf.get(FRONTEND_MYSQL_MAX_WORKER_THREADS)
|
||||
val keepAliveMs = conf.get(FRONTEND_MYSQL_WORKER_KEEPALIVE_TIME)
|
||||
execPool = ExecutorPoolCaptureOom(
|
||||
"mysql-exec-pool",
|
||||
minThreads, maxThreads,
|
||||
keepAliveMs,
|
||||
oomHook)
|
||||
|
||||
serverAddr = conf.get(FRONTEND_MYSQL_BIND_HOST)
|
||||
.map(InetAddress.getByName)
|
||||
.getOrElse(Utils.findLocalInetAddress)
|
||||
port = conf.get(FRONTEND_MYSQL_BIND_PORT)
|
||||
val workerThreads = defaultNumThreads(conf.get(FRONTEND_MYSQL_NETTY_WORKER_THREADS))
|
||||
val bossGroup = createEventLoop(1, "mysql-netty-boss")
|
||||
val workerGroup = createEventLoop(workerThreads, "mysql-netty-worker")
|
||||
bootstrap = new ServerBootstrap()
|
||||
.group(bossGroup, workerGroup)
|
||||
.channel(SERVER_CHANNEL_CLASS)
|
||||
.option(ChannelOption.SO_BACKLOG, Int.box(128))
|
||||
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
|
||||
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
|
||||
.childOption(ChannelOption.TCP_NODELAY, Boolean.box(true))
|
||||
.childHandler(new ChannelInitializer[SocketChannel] {
|
||||
override def initChannel(channel: SocketChannel): Unit = channel.pipeline
|
||||
.addLast(new LoggingHandler("org.apache.kyuubi.server.mysql.codec", LogLevel.TRACE))
|
||||
// TODO implement authentication, codec, command handler
|
||||
})
|
||||
super.initialize(conf)
|
||||
}
|
||||
|
||||
override def connectionUrl: String = {
|
||||
checkInitialized()
|
||||
s"${serverAddr.getCanonicalHostName}:$port"
|
||||
}
|
||||
|
||||
override def start(): Unit = synchronized {
|
||||
if (!isStarted) {
|
||||
try {
|
||||
bindFuture = bootstrap.bind(serverAddr, port)
|
||||
bindFuture.syncUninterruptibly
|
||||
port = bindFuture.channel.localAddress.asInstanceOf[InetSocketAddress].getPort
|
||||
isStarted = true
|
||||
info(s"MySQL frontend service has started at $connectionUrl.")
|
||||
} catch {
|
||||
case rethrow: Exception =>
|
||||
throw new KyuubiException("Cannot start MySQL frontend service Netty server", rethrow)
|
||||
}
|
||||
}
|
||||
super.start()
|
||||
}
|
||||
|
||||
override def stop(): Unit = synchronized {
|
||||
if (isStarted) {
|
||||
if (bindFuture != null) {
|
||||
// close is a local operation and should finish within milliseconds; timeout just to be safe
|
||||
bindFuture.channel.close.awaitUninterruptibly(10, TimeUnit.SECONDS)
|
||||
bindFuture = null
|
||||
}
|
||||
if (bootstrap != null && bootstrap.config.group != null) {
|
||||
bootstrap.config.group.shutdownGracefully
|
||||
}
|
||||
if (bootstrap != null && bootstrap.config.childGroup != null) {
|
||||
bootstrap.config.childGroup.shutdownGracefully
|
||||
}
|
||||
bootstrap = null
|
||||
isStarted = false
|
||||
}
|
||||
super.stop()
|
||||
}
|
||||
|
||||
override val discoveryService: Option[Service] = None
|
||||
}
|
||||
@ -34,12 +34,12 @@ import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service}
|
||||
* Note: Currently, it only be used in the Kyuubi Server side.
|
||||
*/
|
||||
class KyuubiRestFrontendService(override val serverable: Serverable)
|
||||
extends AbstractFrontendService("RestFrontendService") with Logging {
|
||||
extends AbstractFrontendService("KyuubiRestFrontendService") with Logging {
|
||||
|
||||
var serverAddr: InetAddress = _
|
||||
var portNum: Int = _
|
||||
var jettyServer: Server = _
|
||||
var connector: ServerConnector = _
|
||||
private var serverAddr: InetAddress = _
|
||||
private var portNum: Int = _
|
||||
private var jettyServer: Server = _
|
||||
private var connector: ServerConnector = _
|
||||
|
||||
@volatile protected var isStarted = false
|
||||
|
||||
@ -84,13 +84,13 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
|
||||
try {
|
||||
connector.start()
|
||||
jettyServer.start()
|
||||
isStarted = true
|
||||
info(s"Rest frontend service jetty server has started at ${jettyServer.getURI}.")
|
||||
} catch {
|
||||
case rethrow: Exception =>
|
||||
stopHttpServer()
|
||||
throw new KyuubiException("Cannot start rest frontend service jetty server", rethrow)
|
||||
}
|
||||
isStarted = true
|
||||
}
|
||||
|
||||
super.start()
|
||||
|
||||
@ -18,6 +18,9 @@
|
||||
# Set everything to be logged to the file target/unit-tests.log
|
||||
log4j.rootLogger=INFO, CA, FA
|
||||
|
||||
# Analysis MySQLFrontend protocol traffic
|
||||
# log4j.logger.org.apache.kyuubi.server.mysql.codec=TRACE
|
||||
|
||||
#Console Appender
|
||||
log4j.appender.CA=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.CA.layout=org.apache.log4j.PatternLayout
|
||||
|
||||
@ -57,7 +57,7 @@ trait RestFrontendTestHelper {
|
||||
val server = new NoopRestFrontendServer()
|
||||
server.stop()
|
||||
val conf = KyuubiConf()
|
||||
conf.set(KyuubiConf.FRONTEND_REST_BIND_HOST, restFrontendHost)
|
||||
conf.set(KyuubiConf.FRONTEND_REST_BIND_HOST, Some(restFrontendHost))
|
||||
|
||||
server.initialize(conf)
|
||||
server.start()
|
||||
|
||||
@ -0,0 +1,56 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.server
|
||||
|
||||
import org.apache.kyuubi.KyuubiFunSuite
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.service.NoopMySQLFrontendServer
|
||||
import org.apache.kyuubi.service.ServiceState._
|
||||
|
||||
class KyuubiMySQLFrontendServiceSuite extends KyuubiFunSuite {
|
||||
|
||||
test("Kyuubi MySQL frontend service basic") {
|
||||
val server = new NoopMySQLFrontendServer
|
||||
server.stop()
|
||||
val conf = KyuubiConf()
|
||||
assert(server.getServices.isEmpty)
|
||||
assert(server.getServiceState === LATENT)
|
||||
val e = intercept[IllegalStateException](server.frontendServices.head.connectionUrl)
|
||||
assert(e.getMessage startsWith "Illegal Service State: LATENT")
|
||||
assert(server.getConf === null)
|
||||
|
||||
server.initialize(conf)
|
||||
assert(server.getServiceState === INITIALIZED)
|
||||
val frontendService = server.frontendServices.head
|
||||
assert(frontendService.getServiceState == INITIALIZED)
|
||||
assert(server.frontendServices.head.connectionUrl.split(":").length === 2)
|
||||
assert(server.getConf === conf)
|
||||
assert(server.getStartTime === 0)
|
||||
server.stop()
|
||||
|
||||
server.start()
|
||||
assert(server.getServiceState === STARTED)
|
||||
assert(frontendService.getServiceState == STARTED)
|
||||
assert(server.getStartTime !== 0)
|
||||
|
||||
server.stop()
|
||||
assert(server.getServiceState === STOPPED)
|
||||
assert(frontendService.getServiceState == STOPPED)
|
||||
server.stop()
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.service
|
||||
|
||||
import org.apache.kyuubi.server.KyuubiMySQLFrontendService
|
||||
|
||||
class NoopMySQLFrontendServer extends AbstractNoopServer("NoopMySQLFrontendServer") {
|
||||
|
||||
override val frontendServices = Seq(new KyuubiMySQLFrontendService(this))
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user