From 1061d176c832967c57be60fb4c908191ddc2dfd9 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Thu, 11 Nov 2021 16:50:52 +0800 Subject: [PATCH] [KYUUBI #1358] Add KyuubiMySQLFrontendService stub ### _Why are the changes needed?_ Add KyuubiMySQLFrontendService stub, without Netty pipeline, part of #1334 ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1358 from pan3793/mysql-fe-stub. Closes #1358 c091d955 [Cheng Pan] nit 61abb0f6 [Cheng Pan] Address comments 9ac70456 [Cheng Pan] Update conf 3d75acdf [Cheng Pan] nit 4a4d8a24 [Cheng Pan] Address comments 0136dd52 [Cheng Pan] nit 477474ff [Cheng Pan] KyuubiMySQLFrontendService Stub Authored-by: Cheng Pan Signed-off-by: Cheng Pan --- conf/log4j.properties.template | 3 + docs/deployment/settings.md | 9 ++ kyuubi-common/pom.xml | 5 + .../org/apache/kyuubi/config/KyuubiConf.scala | 69 ++++++++-- .../org/apache/kyuubi/util/NettyUtils.scala | 71 ++++++++++ kyuubi-server/pom.xml | 5 - .../server/KyuubiMySQLFrontendService.scala | 126 ++++++++++++++++++ .../server/KyuubiRestFrontendService.scala | 12 +- .../src/test/resources/log4j.properties | 3 + .../kyuubi/RestFrontendTestHelper.scala | 2 +- .../KyuubiMySQLFrontendServiceSuite.scala | 56 ++++++++ .../service/NoopMySQLFrontendServer.scala | 25 ++++ 12 files changed, 361 insertions(+), 25 deletions(-) create mode 100644 kyuubi-common/src/main/scala/org/apache/kyuubi/util/NettyUtils.scala create mode 100644 kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendService.scala create mode 100644 kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendServiceSuite.scala create mode 100644 kyuubi-server/src/test/scala/org/apache/kyuubi/service/NoopMySQLFrontendServer.scala diff --git a/conf/log4j.properties.template b/conf/log4j.properties.template index 4543adbbe..82cf9b14e 100644 --- a/conf/log4j.properties.template +++ b/conf/log4j.properties.template @@ -25,3 +25,6 @@ log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %p % # Set the default kyuubi-ctl log level to WARN. When running the kyuubi-ctl, the # log level for this class is used to overwrite the root logger's log level. log4j.logger.org.apache.kyuubi.ctl.ServiceControlCli=ERROR + +# Analysis MySQLFrontend protocol traffic +# log4j.logger.org.apache.kyuubi.server.mysql.codec=TRACE diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index 8ac415fb6..1e8fe97ee 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -205,6 +205,12 @@ kyuubi\.frontend\.login
\.timeout|
104857600
|
(deprecated) Maximum message size in bytes a Kyuubi server will accept.
|
int
|
1.0.0
kyuubi\.frontend\.max
\.worker\.threads|
999
|
(deprecated) Maximum number of threads in the of frontend worker thread pool for the thrift frontend service
|
int
|
1.0.0
kyuubi\.frontend\.min
\.worker\.threads|
9
|
(deprecated) Minimum number of threads in the of frontend worker thread pool for the thrift frontend service
|
int
|
1.0.0
+kyuubi\.frontend\.mysql
\.bind\.host|
<undefined>
|
Hostname or IP of the machine on which to run the MySQL frontend service.
|
string
|
1.4.0
+kyuubi\.frontend\.mysql
\.bind\.port|
3309
|
Port of the machine on which to run the MySQL frontend service.
|
int
|
1.4.0
+kyuubi\.frontend\.mysql
\.max\.worker\.threads|
999
|
Maximum number of threads in the command execution thread pool for the MySQL frontend service
|
int
|
1.4.0
+kyuubi\.frontend\.mysql
\.min\.worker\.threads|
9
|
Minimum number of threads in the command execution thread pool for the MySQL frontend service
|
int
|
1.4.0
+kyuubi\.frontend\.mysql
\.netty\.worker\.threads|
<undefined>
|
Number of thread in the netty worker event loop of MySQL frontend service. Use min(cpu_cores, 8) in default.
|
int
|
1.4.0
+kyuubi\.frontend\.mysql
\.worker\.keepalive\.time|
PT1M
|
Time(ms) that an idle async thread of the command execution thread pool will wait for a new task to arrive before terminating in MySQL frontend service
|
duration
|
1.4.0
kyuubi\.frontend
\.protocols|
THRIFT_BINARY
|
A comma separated list for all frontend protocols
  • THRIFT_BINARY - HiveServer2 compatible thrift binary protocol.
  • REST - Kyuubi defined REST API(experimental).
|
seq
|
1.4.0
kyuubi\.frontend\.rest
\.bind\.host|
<undefined>
|
Hostname or IP of the machine on which to run the REST frontend service.
|
string
|
1.4.0
kyuubi\.frontend\.rest
\.bind\.port|
10099
|
Port of the machine on which to run the REST frontend service.
|
int
|
1.4.0
@@ -373,6 +379,9 @@ log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %p % # Set the default kyuubi-ctl log level to WARN. When running the kyuubi-ctl, the # log level for this class is used to overwrite the root logger's log level. log4j.logger.org.apache.kyuubi.ctl.ServiceControlCli=ERROR + +# Analysis MySQLFrontend protocol traffic +# log4j.logger.org.apache.kyuubi.server.mysql.codec=TRACE ``` ## Other Configurations diff --git a/kyuubi-common/pom.xml b/kyuubi-common/pom.xml index 02988bd18..90923f97e 100644 --- a/kyuubi-common/pom.xml +++ b/kyuubi-common/pom.xml @@ -98,6 +98,11 @@ jackson-databind + + io.netty + netty-all + + org.apache.hadoop hadoop-minikdc diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index f738642b2..0a3eac33c 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -27,6 +27,7 @@ import scala.collection.JavaConverters._ import org.apache.kyuubi.{Logging, Utils} import org.apache.kyuubi.engine.{EngineType, ShareLevel} import org.apache.kyuubi.service.authentication.{AuthTypes, SaslQOP} +import org.apache.kyuubi.util.NettyUtils.MAX_NETTY_THREADS case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging { import KyuubiConf._ @@ -136,6 +137,8 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging { FRONTEND_THRIFT_BINARY_BIND_PORT, FRONTEND_REST_BIND_HOST, FRONTEND_REST_BIND_PORT, + FRONTEND_MYSQL_BIND_HOST, + FRONTEND_MYSQL_BIND_PORT, AUTHENTICATION_METHOD, KINIT_INTERVAL) @@ -274,7 +277,6 @@ object KyuubiConf { s"the frontend protocol should be one or more of ${FrontendProtocols.values.mkString(",")}") .createWithDefault(Seq(FrontendProtocols.THRIFT_BINARY.toString)) - @deprecated(s"using ${FRONTEND_THRIFT_BINARY_BIND_HOST.key} instead", "1.4.0") val FRONTEND_BIND_HOST: OptionalConfigEntry[String] = buildConf("frontend.bind.host") .doc("(deprecated) Hostname or IP of the machine on which to run the thrift frontend service " + "via binary protocol.") @@ -300,11 +302,10 @@ object KyuubiConf { val FRONTEND_THRIFT_BINARY_BIND_PORT: ConfigEntry[Int] = buildConf("frontend.thrift.binary.bind.port") - .doc("Port of the machine on which to run the thrift frontend service via binary protocol.") - .version("1.4.0") - .fallbackConf(FRONTEND_BIND_PORT) + .doc("Port of the machine on which to run the thrift frontend service via binary protocol.") + .version("1.4.0") + .fallbackConf(FRONTEND_BIND_PORT) - @deprecated(s"using ${FRONTEND_THRIFT_MIN_WORKER_THREADS.key} instead", "1.4.0") val FRONTEND_MIN_WORKER_THREADS: ConfigEntry[Int] = buildConf("frontend.min.worker.threads") .doc("(deprecated) Minimum number of threads in the of frontend worker thread pool for " + "the thrift frontend service") @@ -319,7 +320,6 @@ object KyuubiConf { .version("1.4.0") .fallbackConf(FRONTEND_MIN_WORKER_THREADS) - @deprecated(s"using ${FRONTEND_THRIFT_MAX_WORKER_THREADS.key} instead", "1.4.0") val FRONTEND_MAX_WORKER_THREADS: ConfigEntry[Int] = buildConf("frontend.max.worker.threads") .doc("(deprecated) Maximum number of threads in the of frontend worker thread pool for " + "the thrift frontend service") @@ -334,7 +334,6 @@ object KyuubiConf { .version("1.4.0") .fallbackConf(FRONTEND_MAX_WORKER_THREADS) - @deprecated(s"using ${FRONTEND_THRIFT_WORKER_KEEPALIVE_TIME.key} instead", "1.4.0") val FRONTEND_WORKER_KEEPALIVE_TIME: ConfigEntry[Long] = buildConf("frontend.worker.keepalive.time") .doc("(deprecated) Keep-alive time (in milliseconds) for an idle worker thread") @@ -348,7 +347,7 @@ object KyuubiConf { .version("1.4.0") .fallbackConf(FRONTEND_WORKER_KEEPALIVE_TIME) - @deprecated(s"using ${FRONTEND_THRIFT_WORKER_KEEPALIVE_TIME.key} instead", "1.4.0") + @deprecated(s"using ${FRONTEND_THRIFT_MAX_MESSAGE_SIZE.key} instead", "1.4.0") val FRONTEND_MAX_MESSAGE_SIZE: ConfigEntry[Int] = buildConf("frontend.max.message.size") .doc("(deprecated) Maximum message size in bytes a Kyuubi server will accept.") @@ -486,11 +485,11 @@ object KyuubiConf { .transform(_.toLowerCase(Locale.ROOT)) .createWithDefault(SaslQOP.AUTH.toString) - val FRONTEND_REST_BIND_HOST: OptionalConfigEntry[String] = buildConf("frontend.rest.bind.host") - .doc("Hostname or IP of the machine on which to run the REST frontend service.") - .version("1.4.0") - .stringConf - .createOptional + val FRONTEND_REST_BIND_HOST: ConfigEntry[Option[String]] = + buildConf("frontend.rest.bind.host") + .doc("Hostname or IP of the machine on which to run the REST frontend service.") + .version("1.4.0") + .fallbackConf(FRONTEND_BIND_HOST) val FRONTEND_REST_BIND_PORT: ConfigEntry[Int] = buildConf("frontend.rest.bind.port") .doc("Port of the machine on which to run the REST frontend service.") @@ -499,6 +498,50 @@ object KyuubiConf { .checkValue(p => p == 0 || (p > 1024 && p < 65535), "Invalid Port number") .createWithDefault(10099) + val FRONTEND_MYSQL_BIND_HOST: ConfigEntry[Option[String]] = + buildConf("frontend.mysql.bind.host") + .doc("Hostname or IP of the machine on which to run the MySQL frontend service.") + .version("1.4.0") + .fallbackConf(FRONTEND_BIND_HOST) + + val FRONTEND_MYSQL_BIND_PORT: ConfigEntry[Int] = buildConf("frontend.mysql.bind.port") + .doc("Port of the machine on which to run the MySQL frontend service.") + .version("1.4.0") + .intConf + .checkValue(p => p == 0 || (p > 1024 && p < 65535), "Invalid Port number") + .createWithDefault(3309) + + val FRONTEND_MYSQL_NETTY_WORKER_THREADS: OptionalConfigEntry[Int] = + buildConf("frontend.mysql.netty.worker.threads") + .doc("Number of thread in the netty worker event loop of MySQL frontend service. " + + s"Use min(cpu_cores, $MAX_NETTY_THREADS) in default.") + .version("1.4.0") + .intConf + .checkValue(n => n > 0 && n <= MAX_NETTY_THREADS, + s"Invalid thread number, must in (0, $MAX_NETTY_THREADS]") + .createOptional + + val FRONTEND_MYSQL_MIN_WORKER_THREADS: ConfigEntry[Int] = + buildConf("frontend.mysql.min.worker.threads") + .doc("Minimum number of threads in the command execution thread pool for the MySQL " + + "frontend service") + .version("1.4.0") + .fallbackConf(FRONTEND_MIN_WORKER_THREADS) + + val FRONTEND_MYSQL_MAX_WORKER_THREADS: ConfigEntry[Int] = + buildConf("frontend.mysql.max.worker.threads") + .doc("Maximum number of threads in the command execution thread pool for the MySQL " + + "frontend service") + .version("1.4.0") + .fallbackConf(FRONTEND_MAX_WORKER_THREADS) + + val FRONTEND_MYSQL_WORKER_KEEPALIVE_TIME: ConfigEntry[Long] = + buildConf("frontend.mysql.worker.keepalive.time") + .doc("Time(ms) that an idle async thread of the command execution thread pool will wait" + + " for a new task to arrive before terminating in MySQL frontend service") + .version("1.4.0") + .fallbackConf(FRONTEND_WORKER_KEEPALIVE_TIME) + ///////////////////////////////////////////////////////////////////////////////////////////////// // SQL Engine Configuration // ///////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NettyUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NettyUtils.scala new file mode 100644 index 000000000..c5b6e2e88 --- /dev/null +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NettyUtils.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.util + +import java.util.concurrent.ThreadFactory + +import scala.math.min + +import io.netty.channel._ +import io.netty.channel.epoll._ +import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.socket.nio.{NioServerSocketChannel, NioSocketChannel} +import io.netty.util.concurrent.DefaultThreadFactory + +object NettyUtils { + + /** + * Specifies an upper bound on the number of Netty threads that Kyuubi requires by default. + * In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core + * that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes + * at a premium. + * + * Thus, this value should still retain maximum throughput and reduce wasted off-heap memory + * allocation. + */ + val MAX_NETTY_THREADS: Int = 8 + + val EPOLL_MODE: Boolean = Epoll.isAvailable + + val CLIENT_CHANNEL_CLASS: Class[_ <: Channel] = + if (EPOLL_MODE) classOf[EpollSocketChannel] else classOf[NioSocketChannel] + + val SERVER_CHANNEL_CLASS: Class[_ <: ServerChannel] = + if (EPOLL_MODE) classOf[EpollServerSocketChannel] else classOf[NioServerSocketChannel] + + def createThreadFactory(threadPoolPrefix: String): ThreadFactory = + new DefaultThreadFactory(threadPoolPrefix, true) + + def createEventLoop(numThreads: Int, threadPrefix: String): EventLoopGroup = { + val threadFactory = createThreadFactory(threadPrefix) + if (EPOLL_MODE) { + new EpollEventLoopGroup(numThreads, threadFactory) + } else { + new NioEventLoopGroup(numThreads, threadFactory) + } + } + + /** + * Returns the default number of threads for the Netty thread pools. If numUsableCores is absent, + * we will use Runtime get an approximate number of available cores. + */ + def defaultNumThreads(numUsableCores: Option[Int]): Int = numUsableCores match { + case Some(num) => min(num, MAX_NETTY_THREADS) + case None => min(sys.runtime.availableProcessors, MAX_NETTY_THREADS) + } +} diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml index 821bde624..ecf336bd1 100644 --- a/kyuubi-server/pom.xml +++ b/kyuubi-server/pom.xml @@ -202,11 +202,6 @@ - - io.netty - netty-all - - org.glassfish.jersey.core jersey-server diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendService.scala new file mode 100644 index 000000000..5c2800dd8 --- /dev/null +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendService.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.server + +import java.net.{InetAddress, InetSocketAddress} +import java.util.concurrent.{ThreadPoolExecutor, TimeUnit} + +import io.netty.bootstrap.ServerBootstrap +import io.netty.buffer.PooledByteBufAllocator +import io.netty.channel.{ChannelFuture, ChannelInitializer, ChannelOption} +import io.netty.channel.socket.SocketChannel +import io.netty.handler.logging.{LoggingHandler, LogLevel} + +import org.apache.kyuubi.{KyuubiException, Logging, Utils} +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf._ +import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service} +import org.apache.kyuubi.util.ExecutorPoolCaptureOom +import org.apache.kyuubi.util.NettyUtils._ + +/** + * A frontend service implement MySQL protocol. + */ +class KyuubiMySQLFrontendService(override val serverable: Serverable) + extends AbstractFrontendService("KyuubiMySQLFrontendService") with Logging { + + private var execPool: ThreadPoolExecutor = _ + + private var serverAddr: InetAddress = _ + private var port: Int = _ + private var bootstrap: ServerBootstrap = _ + private var bindFuture: ChannelFuture = _ + + @volatile protected var isStarted = false + + protected def oomHook: Runnable = () => serverable.stop() + + override def initialize(conf: KyuubiConf): Unit = synchronized { + val minThreads = conf.get(FRONTEND_MYSQL_MIN_WORKER_THREADS) + val maxThreads = conf.get(FRONTEND_MYSQL_MAX_WORKER_THREADS) + val keepAliveMs = conf.get(FRONTEND_MYSQL_WORKER_KEEPALIVE_TIME) + execPool = ExecutorPoolCaptureOom( + "mysql-exec-pool", + minThreads, maxThreads, + keepAliveMs, + oomHook) + + serverAddr = conf.get(FRONTEND_MYSQL_BIND_HOST) + .map(InetAddress.getByName) + .getOrElse(Utils.findLocalInetAddress) + port = conf.get(FRONTEND_MYSQL_BIND_PORT) + val workerThreads = defaultNumThreads(conf.get(FRONTEND_MYSQL_NETTY_WORKER_THREADS)) + val bossGroup = createEventLoop(1, "mysql-netty-boss") + val workerGroup = createEventLoop(workerThreads, "mysql-netty-worker") + bootstrap = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(SERVER_CHANNEL_CLASS) + .option(ChannelOption.SO_BACKLOG, Int.box(128)) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.TCP_NODELAY, Boolean.box(true)) + .childHandler(new ChannelInitializer[SocketChannel] { + override def initChannel(channel: SocketChannel): Unit = channel.pipeline + .addLast(new LoggingHandler("org.apache.kyuubi.server.mysql.codec", LogLevel.TRACE)) + // TODO implement authentication, codec, command handler + }) + super.initialize(conf) + } + + override def connectionUrl: String = { + checkInitialized() + s"${serverAddr.getCanonicalHostName}:$port" + } + + override def start(): Unit = synchronized { + if (!isStarted) { + try { + bindFuture = bootstrap.bind(serverAddr, port) + bindFuture.syncUninterruptibly + port = bindFuture.channel.localAddress.asInstanceOf[InetSocketAddress].getPort + isStarted = true + info(s"MySQL frontend service has started at $connectionUrl.") + } catch { + case rethrow: Exception => + throw new KyuubiException("Cannot start MySQL frontend service Netty server", rethrow) + } + } + super.start() + } + + override def stop(): Unit = synchronized { + if (isStarted) { + if (bindFuture != null) { + // close is a local operation and should finish within milliseconds; timeout just to be safe + bindFuture.channel.close.awaitUninterruptibly(10, TimeUnit.SECONDS) + bindFuture = null + } + if (bootstrap != null && bootstrap.config.group != null) { + bootstrap.config.group.shutdownGracefully + } + if (bootstrap != null && bootstrap.config.childGroup != null) { + bootstrap.config.childGroup.shutdownGracefully + } + bootstrap = null + isStarted = false + } + super.stop() + } + + override val discoveryService: Option[Service] = None +} diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala index 8b8704428..f620a33c2 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala @@ -34,12 +34,12 @@ import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service} * Note: Currently, it only be used in the Kyuubi Server side. */ class KyuubiRestFrontendService(override val serverable: Serverable) - extends AbstractFrontendService("RestFrontendService") with Logging { + extends AbstractFrontendService("KyuubiRestFrontendService") with Logging { - var serverAddr: InetAddress = _ - var portNum: Int = _ - var jettyServer: Server = _ - var connector: ServerConnector = _ + private var serverAddr: InetAddress = _ + private var portNum: Int = _ + private var jettyServer: Server = _ + private var connector: ServerConnector = _ @volatile protected var isStarted = false @@ -84,13 +84,13 @@ class KyuubiRestFrontendService(override val serverable: Serverable) try { connector.start() jettyServer.start() + isStarted = true info(s"Rest frontend service jetty server has started at ${jettyServer.getURI}.") } catch { case rethrow: Exception => stopHttpServer() throw new KyuubiException("Cannot start rest frontend service jetty server", rethrow) } - isStarted = true } super.start() diff --git a/kyuubi-server/src/test/resources/log4j.properties b/kyuubi-server/src/test/resources/log4j.properties index 958c9c8d1..06881c997 100644 --- a/kyuubi-server/src/test/resources/log4j.properties +++ b/kyuubi-server/src/test/resources/log4j.properties @@ -18,6 +18,9 @@ # Set everything to be logged to the file target/unit-tests.log log4j.rootLogger=INFO, CA, FA +# Analysis MySQLFrontend protocol traffic +# log4j.logger.org.apache.kyuubi.server.mysql.codec=TRACE + #Console Appender log4j.appender.CA=org.apache.log4j.ConsoleAppender log4j.appender.CA.layout=org.apache.log4j.PatternLayout diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala index d222bb8c7..8c8775283 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala @@ -57,7 +57,7 @@ trait RestFrontendTestHelper { val server = new NoopRestFrontendServer() server.stop() val conf = KyuubiConf() - conf.set(KyuubiConf.FRONTEND_REST_BIND_HOST, restFrontendHost) + conf.set(KyuubiConf.FRONTEND_REST_BIND_HOST, Some(restFrontendHost)) server.initialize(conf) server.start() diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendServiceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendServiceSuite.scala new file mode 100644 index 000000000..735863b8b --- /dev/null +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendServiceSuite.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.server + +import org.apache.kyuubi.KyuubiFunSuite +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.service.NoopMySQLFrontendServer +import org.apache.kyuubi.service.ServiceState._ + +class KyuubiMySQLFrontendServiceSuite extends KyuubiFunSuite { + + test("Kyuubi MySQL frontend service basic") { + val server = new NoopMySQLFrontendServer + server.stop() + val conf = KyuubiConf() + assert(server.getServices.isEmpty) + assert(server.getServiceState === LATENT) + val e = intercept[IllegalStateException](server.frontendServices.head.connectionUrl) + assert(e.getMessage startsWith "Illegal Service State: LATENT") + assert(server.getConf === null) + + server.initialize(conf) + assert(server.getServiceState === INITIALIZED) + val frontendService = server.frontendServices.head + assert(frontendService.getServiceState == INITIALIZED) + assert(server.frontendServices.head.connectionUrl.split(":").length === 2) + assert(server.getConf === conf) + assert(server.getStartTime === 0) + server.stop() + + server.start() + assert(server.getServiceState === STARTED) + assert(frontendService.getServiceState == STARTED) + assert(server.getStartTime !== 0) + + server.stop() + assert(server.getServiceState === STOPPED) + assert(frontendService.getServiceState == STOPPED) + server.stop() + } +} diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/service/NoopMySQLFrontendServer.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/service/NoopMySQLFrontendServer.scala new file mode 100644 index 000000000..6e6fec323 --- /dev/null +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/service/NoopMySQLFrontendServer.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.service + +import org.apache.kyuubi.server.KyuubiMySQLFrontendService + +class NoopMySQLFrontendServer extends AbstractNoopServer("NoopMySQLFrontendServer") { + + override val frontendServices = Seq(new KyuubiMySQLFrontendService(this)) +}