set random port for KyuubiServerSuite
This commit is contained in:
parent
1f52dbd656
commit
5320cee50b
@ -21,6 +21,7 @@ import java.net.{InetAddress, ServerSocket}
|
|||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
import scala.language.implicitConversions
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration
|
import org.apache.hadoop.conf.Configuration
|
||||||
import org.apache.hive.service.rpc.thrift._
|
import org.apache.hive.service.rpc.thrift._
|
||||||
@ -72,14 +73,15 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
|
|||||||
authFactory = new KyuubiAuthenticationFactory(conf)
|
authFactory = new KyuubiAuthenticationFactory(conf)
|
||||||
val transFactory = authFactory.getTTransportFactory
|
val transFactory = authFactory.getTTransportFactory
|
||||||
val tProcFactory = authFactory.getTProcessorFactory(this)
|
val tProcFactory = authFactory.getTProcessorFactory(this)
|
||||||
val serverSocket = new TServerSocket(new ServerSocket(portNum, 1, serverAddr))
|
val serverSocket = new ServerSocket(portNum, -1, serverAddr)
|
||||||
portNum = serverSocket.getServerSocket.getLocalPort
|
portNum = serverSocket.getLocalPort
|
||||||
|
val tServerSocket = new TServerSocket(serverSocket)
|
||||||
|
|
||||||
val maxMessageSize = conf.get(FRONTEND_MAX_MESSAGE_SIZE)
|
val maxMessageSize = conf.get(FRONTEND_MAX_MESSAGE_SIZE)
|
||||||
val requestTimeout = conf.get(FRONTEND_LOGIN_TIMEOUT).toInt
|
val requestTimeout = conf.get(FRONTEND_LOGIN_TIMEOUT).toInt
|
||||||
val beBackoffSlotLength = conf.get(FRONTEND_LOGIN_BACKOFF_SLOT_LENGTH).toInt
|
val beBackoffSlotLength = conf.get(FRONTEND_LOGIN_BACKOFF_SLOT_LENGTH).toInt
|
||||||
|
|
||||||
val args = new TThreadPoolServer.Args(serverSocket)
|
val args = new TThreadPoolServer.Args(tServerSocket)
|
||||||
.processorFactory(tProcFactory)
|
.processorFactory(tProcFactory)
|
||||||
.transportFactory(transFactory)
|
.transportFactory(transFactory)
|
||||||
.protocolFactory(new TBinaryProtocol.Factory)
|
.protocolFactory(new TBinaryProtocol.Factory)
|
||||||
@ -498,28 +500,25 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
|
|||||||
}
|
}
|
||||||
|
|
||||||
class FeTServerEventHandler extends TServerEventHandler {
|
class FeTServerEventHandler extends TServerEventHandler {
|
||||||
|
implicit def toFeServiceServerContext(context: ServerContext): FeServiceServerContext = {
|
||||||
|
context.asInstanceOf[FeServiceServerContext]
|
||||||
|
}
|
||||||
|
|
||||||
override def deleteContext(context: ServerContext, in: TProtocol, out: TProtocol): Unit = {
|
override def deleteContext(context: ServerContext, in: TProtocol, out: TProtocol): Unit = {
|
||||||
context match {
|
val handle = context.getSessionHandle
|
||||||
case fc: FeServiceServerContext =>
|
if (handle != null) {
|
||||||
val handle = fc.getSessionHandle
|
info(s"Session [$handle] disconnected without closing properly, close it now")
|
||||||
if (handle != null) {
|
try {
|
||||||
info(s"Session [$handle] disconnected without closing properly, close it now")
|
be.closeSession(handle)
|
||||||
try {
|
} catch {
|
||||||
be.closeSession(handle)
|
case e: KyuubiSQLException =>
|
||||||
} catch {
|
warn("Failed closing session", e)
|
||||||
case e: KyuubiSQLException =>
|
}
|
||||||
warn("Failed closing session", e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case _ =>
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def processContext(context: ServerContext, in: TTransport, out: TTransport): Unit = {
|
override def processContext(context: ServerContext, in: TTransport, out: TTransport): Unit = {
|
||||||
context match {
|
CURRENT_SERVER_CONTEXT.set(context)
|
||||||
case fc: FeServiceServerContext => CURRENT_SERVER_CONTEXT.set(fc)
|
|
||||||
case _ =>
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override def preServe(): Unit = {}
|
override def preServe(): Unit = {}
|
||||||
|
|||||||
@ -25,7 +25,7 @@ class KyuubiServerSuite extends KyuubiFunSuite {
|
|||||||
|
|
||||||
test("kyuubi server basic") {
|
test("kyuubi server basic") {
|
||||||
val server = new KyuubiServer()
|
val server = new KyuubiServer()
|
||||||
val conf = KyuubiConf()
|
val conf = KyuubiConf().set(KyuubiConf.FRONTEND_BIND_PORT, 0)
|
||||||
assert(server.getServices.isEmpty)
|
assert(server.getServices.isEmpty)
|
||||||
assert(server.getServiceState === LATENT)
|
assert(server.getServiceState === LATENT)
|
||||||
server.initialize(conf)
|
server.initialize(conf)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user