From 5320cee50b68bc90604503e2ee0d559a5c76c79b Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 23 Oct 2020 00:26:40 +0800 Subject: [PATCH] set random port for KyuubiServerSuite --- .../kyuubi/service/FrontendService.scala | 39 +++++++++---------- .../kyuubi/server/KyuubiServerSuite.scala | 2 +- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/FrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/FrontendService.scala index dda8abe24..218857147 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/FrontendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/FrontendService.scala @@ -21,6 +21,7 @@ import java.net.{InetAddress, ServerSocket} import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ +import scala.language.implicitConversions import org.apache.hadoop.conf.Configuration import org.apache.hive.service.rpc.thrift._ @@ -72,14 +73,15 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab authFactory = new KyuubiAuthenticationFactory(conf) val transFactory = authFactory.getTTransportFactory val tProcFactory = authFactory.getTProcessorFactory(this) - val serverSocket = new TServerSocket(new ServerSocket(portNum, 1, serverAddr)) - portNum = serverSocket.getServerSocket.getLocalPort + val serverSocket = new ServerSocket(portNum, -1, serverAddr) + portNum = serverSocket.getLocalPort + val tServerSocket = new TServerSocket(serverSocket) val maxMessageSize = conf.get(FRONTEND_MAX_MESSAGE_SIZE) val requestTimeout = conf.get(FRONTEND_LOGIN_TIMEOUT).toInt val beBackoffSlotLength = conf.get(FRONTEND_LOGIN_BACKOFF_SLOT_LENGTH).toInt - val args = new TThreadPoolServer.Args(serverSocket) + val args = new TThreadPoolServer.Args(tServerSocket) .processorFactory(tProcFactory) .transportFactory(transFactory) .protocolFactory(new TBinaryProtocol.Factory) @@ -498,28 +500,25 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } class FeTServerEventHandler extends TServerEventHandler { + implicit def toFeServiceServerContext(context: ServerContext): FeServiceServerContext = { + context.asInstanceOf[FeServiceServerContext] + } + override def deleteContext(context: ServerContext, in: TProtocol, out: TProtocol): Unit = { - context match { - case fc: FeServiceServerContext => - val handle = fc.getSessionHandle - if (handle != null) { - info(s"Session [$handle] disconnected without closing properly, close it now") - try { - be.closeSession(handle) - } catch { - case e: KyuubiSQLException => - warn("Failed closing session", e) - } - } - case _ => + val handle = context.getSessionHandle + if (handle != null) { + info(s"Session [$handle] disconnected without closing properly, close it now") + try { + be.closeSession(handle) + } catch { + case e: KyuubiSQLException => + warn("Failed closing session", e) + } } } override def processContext(context: ServerContext, in: TTransport, out: TTransport): Unit = { - context match { - case fc: FeServiceServerContext => CURRENT_SERVER_CONTEXT.set(fc) - case _ => - } + CURRENT_SERVER_CONTEXT.set(context) } override def preServe(): Unit = {} diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala index 2a85e4b16..4c890ebe7 100644 --- a/kyuubi-main/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala +++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala @@ -25,7 +25,7 @@ class KyuubiServerSuite extends KyuubiFunSuite { test("kyuubi server basic") { val server = new KyuubiServer() - val conf = KyuubiConf() + val conf = KyuubiConf().set(KyuubiConf.FRONTEND_BIND_PORT, 0) assert(server.getServices.isEmpty) assert(server.getServiceState === LATENT) server.initialize(conf)