From 48ef2f0401132088f16cf4df83eb0b62e4521fe8 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 25 Oct 2018 10:29:20 +0800 Subject: [PATCH 1/3] fix #109 add meaningful msg for front service port confliction --- .../kyuubi/cli/HandleIdentifier.scala | 16 +- .../kyuubi/server/FrontendService.scala | 6 +- .../kyuubi/service/AbstractService.scala | 2 +- .../kyuubi/spark/SparkSessionWithUGI.scala | 1 - .../kyuubi/cli/HandleIdentifierSuite.scala | 2 +- .../kyuubi/server/FrontendServiceSuite.scala | 180 ++++++++++++++++++ 6 files changed, 188 insertions(+), 19 deletions(-) create mode 100644 kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/cli/HandleIdentifier.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/cli/HandleIdentifier.scala index c97d83a60..f496d848e 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/cli/HandleIdentifier.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/cli/HandleIdentifier.scala @@ -22,23 +22,13 @@ import java.util.UUID import org.apache.hive.service.cli.thrift.THandleIdentifier -case class HandleIdentifier(publicId: UUID, secretId: UUID) { +class HandleIdentifier(val publicId: UUID, val secretId: UUID) { def this() = this(UUID.randomUUID(), UUID.randomUUID()) def this(guid: ByteBuffer, secret: ByteBuffer) = - this( - if (guid == null) { - UUID.randomUUID() - } else { - new UUID(guid.getLong(), guid.getLong()) - }, - if (secret == null) { - UUID.randomUUID() - } else { - new UUID(secret.getLong(), secret.getLong()) - }) - + this(Option(guid).map(id => new UUID(id.getLong(), id.getLong())).getOrElse(UUID.randomUUID()), + Option(secret).map(id => new UUID(id.getLong(), id.getLong())).getOrElse(UUID.randomUUID())) def this(tHandleId: THandleIdentifier) = this(tHandleId.bufferForGuid(), tHandleId.bufferForSecret()) diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/server/FrontendService.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/server/FrontendService.scala index 1efd1b550..bfcfab201 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/server/FrontendService.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/server/FrontendService.scala @@ -57,7 +57,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe private[this] var serverEventHandler: TServerEventHandler = _ private[this] var currentServerContext: ThreadLocal[ServerContext] = _ - private[this] var server: Option[TServer] = _ + private[this] var server: Option[TServer] = None private[this] var portNum = 0 private[this] var serverIPAddress: InetAddress = _ private[this] var serverSocket: ServerSocket = _ @@ -116,7 +116,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) val serverHost = conf.get(FRONTEND_BIND_HOST.key) try { - if (serverHost != null && !serverHost.isEmpty) { + if (serverHost.nonEmpty) { serverIPAddress = InetAddress.getByName(serverHost) } else { serverIPAddress = InetAddress.getLocalHost @@ -124,7 +124,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe portNum = conf.get(FRONTEND_BIND_PORT.key).toInt serverSocket = new ServerSocket(portNum, 1, serverIPAddress) } catch { - case e: Exception => throw new ServiceException(e) + case e: Exception => throw new ServiceException(e.getMessage + ": " + portNum, e) } portNum = serverSocket.getLocalPort super.init(conf) diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/service/AbstractService.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/service/AbstractService.scala index 509f74e02..3ec36ad53 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/service/AbstractService.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/service/AbstractService.scala @@ -98,7 +98,7 @@ abstract class AbstractService(name: String) extends Service with Logging { if (state ne currentState) { throw new IllegalStateException( s""" - |For this operation, the current service state must be $state instead of $currentState + |For this operation, the current service state must be $currentState instead of $state """.stripMargin) } } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala index 14aa7c39c..4e74f8df6 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala @@ -17,7 +17,6 @@ package yaooqinn.kyuubi.spark -import java.lang.reflect.UndeclaredThrowableException import java.security.PrivilegedExceptionAction import java.util.concurrent.TimeUnit diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/cli/HandleIdentifierSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/cli/HandleIdentifierSuite.scala index 72d8eaae3..673d775ef 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/cli/HandleIdentifierSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/cli/HandleIdentifierSuite.scala @@ -28,7 +28,7 @@ class HandleIdentifierSuite extends SparkFunSuite { // default constructor val pid = UUID.randomUUID() val sid = UUID.randomUUID() - val handleId1 = HandleIdentifier(pid, sid) + val handleId1 = new HandleIdentifier(pid, sid) assert(pid === handleId1.getPublicId) assert(sid === handleId1.getSecretId) assert(handleId1.toTHandleIdentifier.bufferForGuid() !== null) diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala new file mode 100644 index 000000000..b68c483cc --- /dev/null +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package yaooqinn.kyuubi.server + +import java.net.InetAddress + +import org.apache.hive.service.cli.thrift._ +import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite} +import org.apache.spark.KyuubiConf._ +import org.scalatest.Matchers + +import yaooqinn.kyuubi.service.{ServiceException, State} +import yaooqinn.kyuubi.session.SessionHandle + +class FrontendServiceSuite extends SparkFunSuite with Matchers { + + private val beService = new BackendService() + private val sessionHandle = new SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8) + private def tHandle: TSessionHandle = sessionHandle.toTSessionHandle + private val user = KyuubiSparkUtil.getCurrentUserName + private val catalog = "test_catalog" + private val tbl = "test_tbl" + private val schema = "test_schema" + private val col = "test_col" + private val conf = new SparkConf(loadDefaults = true).setAppName("fe test") + KyuubiSparkUtil.setupCommonConfig(conf) + conf.remove(KyuubiSparkUtil.CATALOG_IMPL) + conf.setMaster("local").set(FRONTEND_BIND_PORT.key, "0") + + test(" test new fe service") { + val feService = new FrontendService(beService) + feService.getConf should be(null) + feService.getStartTime should be(0) + feService.getServiceState should be(State.NOT_INITED) + feService.getName should be(classOf[FrontendService].getSimpleName) + feService.getServerIPAddress should be(null) + feService.getPortNumber should be(0) + val catalogsResp = feService.GetCatalogs(new TGetCatalogsReq(tHandle)) + catalogsResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) + catalogsResp.getStatus.getErrorMessage should be("Method Not Implemented!") + val columnsReq = new TGetColumnsReq(tHandle) + columnsReq.setCatalogName(catalog) + columnsReq.setSchemaName(schema) + columnsReq.setTableName(tbl) + columnsReq.setColumnName(col) + val columnsResp = feService.GetColumns(columnsReq) + columnsResp.getStatus.getErrorMessage should be("Method Not Implemented!") + val getDelegationTokenResp = + feService.GetDelegationToken(new TGetDelegationTokenReq(tHandle, user, user)) + getDelegationTokenResp.getStatus.getErrorMessage should be("Delegation token is not supported") + } + + test("test init fe service") { + val feService = new FrontendService(beService) + feService.init(conf) + feService.getConf should be(conf) + feService.getServiceState should be(State.INITED) + feService.getPortNumber should not be 0 + val conf1 = new SparkConf(loadDefaults = true) + .set(FRONTEND_BIND_HOST.key, "") + .set(FRONTEND_BIND_PORT.key, "10009") + val feService2 = new FrontendService(beService) + feService2.init(conf1) + feService2.getServerIPAddress should be(InetAddress.getLocalHost) + intercept[ServiceException]( + feService2.init(conf1)).getMessage should be("Address already in use: 10009") + } + + test("start fe service") { + val feService = new FrontendService(beService) + intercept[IllegalStateException](feService.start()) + feService.init(conf) + feService.start() + feService.getConf should be(conf) + feService.getStartTime should not be 0 + feService.getServiceState should be(State.STARTED) + } + + test("stop fe service") { + val feService = new FrontendService(beService) + feService.stop() + feService.getServiceState should be(State.NOT_INITED) + feService.init(conf) + feService.stop() + feService.getServiceState should be(State.INITED) + feService.start() + feService.stop() + feService.getServiceState should be(State.STOPPED) + } + + test("get catalogs") { + val feService = new FrontendService(beService) + val req = new TGetCatalogsReq(tHandle) + val resp = feService.GetCatalogs(req) + resp.getStatus.getErrorMessage should be("Method Not Implemented!") + } + + test("get schemas") { + val feService = new FrontendService(beService) + val schemasReq = new TGetSchemasReq(tHandle) + schemasReq.setCatalogName(catalog) + schemasReq.setSchemaName(schema) + val resp = feService.GetSchemas(schemasReq) + resp.getStatus.getErrorMessage should be("Method Not Implemented!") + } + + test("get tables") { + val feService = new FrontendService(beService) + val req = new TGetTablesReq(tHandle) + req.setCatalogName(catalog) + req.setSchemaName(schema) + req.setTableName(tbl) + val resp = feService.GetTables(req) + resp.getStatus.getErrorMessage should be("Method Not Implemented!") + } + + test("get columns") { + val feService = new FrontendService(beService) + val req = new TGetColumnsReq(tHandle) + req.setCatalogName(catalog) + req.setSchemaName(schema) + req.setTableName(tbl) + req.setColumnName(col) + val resp = feService.GetColumns(req) + resp.getStatus.getErrorMessage should be("Method Not Implemented!") + } + + test("get type info") { + val feService = new FrontendService(beService) + val req = new TGetTypeInfoReq(tHandle) + val resp = feService.GetTypeInfo(req) + resp.getStatus.getErrorMessage should be("Method Not Implemented!") + } + + test("get port num") { + val feService = new FrontendService(beService) + feService.getPortNumber should be(0) + feService.init(conf) + feService.getPortNumber should not be 0 + } + + test("get server ip addr") { + val feService = new FrontendService(beService) + feService.getServerIPAddress should be(null) + feService.init(conf) + feService.getServerIPAddress should not be null + } + + test("fe service server context") { + val feService = new FrontendService(beService) + val context = new feService.FeServiceServerContext() + context.setSessionHandle(sessionHandle) + context.getSessionHandle should be(sessionHandle) + } + + test("fe tserver event handler") { + val feService = new FrontendService(beService) + val handler = new feService.FeTServerEventHandler + val context = new feService.FeServiceServerContext() + context.setSessionHandle(sessionHandle) + handler.createContext(null, null) + handler.processContext(context, null, null) + handler.deleteContext(context, null, null) + } +} From f297be1214ca3410ad2220af7d48a72d231c79e9 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 25 Oct 2018 10:43:18 +0800 Subject: [PATCH 2/3] fix travis ut --- .../scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala index b68c483cc..b29ddc8c6 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala @@ -65,7 +65,7 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers { getDelegationTokenResp.getStatus.getErrorMessage should be("Delegation token is not supported") } - test("test init fe service") { + test("init fe service") { val feService = new FrontendService(beService) feService.init(conf) feService.getConf should be(conf) @@ -78,7 +78,7 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers { feService2.init(conf1) feService2.getServerIPAddress should be(InetAddress.getLocalHost) intercept[ServiceException]( - feService2.init(conf1)).getMessage should be("Address already in use: 10009") + feService2.init(conf1)).getMessage should include("10009") } test("start fe service") { From c1a39552a73b54bb1053fac86ba807c980a582eb Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 25 Oct 2018 11:26:41 +0800 Subject: [PATCH 3/3] add ut --- .../src/main/scala/yaooqinn/kyuubi/cli/Handle.scala | 2 +- .../src/test/scala/yaooqinn/kyuubi/cli/HandleSuite.scala | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/cli/Handle.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/cli/Handle.scala index b1b2431e3..d954dfc61 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/cli/Handle.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/cli/Handle.scala @@ -44,7 +44,7 @@ abstract class Handle(val handleId: HandleIdentifier) { if (other.handleId != null) { return false } - } else if (!(handleId == other.handleId)) { + } else if (handleId != other.handleId) { return false } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/cli/HandleSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/cli/HandleSuite.scala index 0da7c31f2..762a52172 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/cli/HandleSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/cli/HandleSuite.scala @@ -27,6 +27,14 @@ class HandleSuite extends SparkFunSuite { val handle3 = TestHandle3(handle2.handleId.toTHandleIdentifier) assert(handle1 === handle2) assert(handle1 === handle3) + val handle4 = TestHandle2(null) + assert(handle4.hashCode === 31) + assert(!handle4.equals(null)) + assert(!handle4.equals(new Object)) + assert(!handle2.equals(handle4)) + assert(handle4.equals(handle4)) + assert(!handle4.equals(handle2)) + assert(handle2.equals(handle2)) } }