diff --git a/.travis.yml b/.travis.yml index 8e7049a46..c20ee207c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -57,7 +57,7 @@ install: # - build/mvn --no-transfer-progress clean verify -pl :kyuubi-common,:kyuubi-ha,:kyuubi-main,:kyuubi-spark-sql-engine,:kyuubi-codecov,:kyuubi-download,:kyuubi-assembly -Dmaven.javadoc.skip=true -B -V script: - - mvn clean verify -pl '!:kyuubi-server,!:kyuubi-thrift' -Dmaven.javadoc.skip=true -B -V + - build/mvn clean verify -pl '!:kyuubi-server,!:kyuubi-thrift' -Dmaven.javadoc.skip=true -B -V after_script: - rm -r $HOME/.m2/repository/org/apache/kyuubi diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala index 050826cfe..676fe97f3 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala @@ -47,12 +47,6 @@ abstract class AbstractBackendService(name: String) sessionManager.getSession(sessionHandle).getInfo(infoType) } - override def executeStatement( - sessionHandle: SessionHandle, - statement: String): OperationHandle = { - sessionManager.getSession(sessionHandle).executeStatement(statement) - } - override def executeStatement( sessionHandle: SessionHandle, statement: String, @@ -60,12 +54,6 @@ abstract class AbstractBackendService(name: String) sessionManager.getSession(sessionHandle).executeStatement(statement, queryTimeout) } - override def executeStatementAsync( - sessionHandle: SessionHandle, - statement: String): OperationHandle = { - sessionManager.getSession(sessionHandle).executeStatementAsync(statement) - } - override def executeStatementAsync( sessionHandle: SessionHandle, statement: String, diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractService.scala index d07f9fcf3..4403a32cc 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractService.scala @@ -63,6 +63,7 @@ abstract class AbstractService(serviceName: String) extends Service with Logging override def stop(): Unit = { state match { case LATENT | INITIALIZED | STOPPED => + warn(s"Service[$serviceName] is not started($state) yet.") case _ => ensureCurrentState(STARTED) changeState(STOPPED) @@ -112,9 +113,7 @@ abstract class AbstractService(serviceName: String) extends Service with Logging private def ensureCurrentState(currentState: ServiceState): Unit = { if (state ne currentState) { throw new IllegalStateException( - s""" - |For this operation, the current service state must be $currentState instead of $state - """.stripMargin) + s"For this operation, the current service state must be $currentState instead of $state") } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala index 1e8563c27..662a7bb2f 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala @@ -46,16 +46,10 @@ trait BackendService { def getInfo(sessionHandle: SessionHandle, infoType: TGetInfoType): TGetInfoValue - def executeStatement( - sessionHandle: SessionHandle, - statement: String): OperationHandle def executeStatement( sessionHandle: SessionHandle, statement: String, queryTimeout: Long): OperationHandle - def executeStatementAsync( - sessionHandle: SessionHandle, - statement: String): OperationHandle def executeStatementAsync( sessionHandle: SessionHandle, statement: String, diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/HadoopThriftAuthBridgeServer.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/HadoopThriftAuthBridgeServer.scala index 081c8c945..4e73bc2c1 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/HadoopThriftAuthBridgeServer.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/HadoopThriftAuthBridgeServer.scala @@ -59,7 +59,7 @@ class HadoopThriftAuthBridgeServer(secretMgr: KyuubiDelegationTokenManager) { null, SaslRpcServer.SASL_DEFAULT_REALM, saslProps, - SaslDigestCallbackHandler(secretMgr)) + new SaslDigestCallbackHandler(secretMgr)) factory } @@ -69,7 +69,7 @@ class HadoopThriftAuthBridgeServer(secretMgr: KyuubiDelegationTokenManager) { * the SASL transport. */ def wrapTransportFactory(transFactory: TTransportFactory): TTransportFactory = { - TUGIAssumingTransportFactory(ugi, transFactory) + new TUGIAssumingTransportFactory(ugi, transFactory) } /** @@ -78,14 +78,14 @@ class HadoopThriftAuthBridgeServer(secretMgr: KyuubiDelegationTokenManager) { * the SASL transport. */ def wrapProcessor(processor: TProcessor): TProcessor = { - TUGIAssumingProcessor(processor, secretMgr, userProxy = true) + new TUGIAssumingProcessor(processor, secretMgr, userProxy = true) } /** * Wrap a TProcessor to capture the client information like connecting userid, ip etc */ def wrapNonAssumingProcessor(processor: TProcessor): TProcessor = { - TUGIAssumingProcessor(processor, secretMgr, userProxy = false) + new TUGIAssumingProcessor(processor, secretMgr, userProxy = false) } def getRemoteAddress: InetAddress = REMOTE_ADDRESS.get @@ -119,7 +119,7 @@ object HadoopThriftAuthBridgeServer { * This is used on the server side to assume the server's Principal when accepting * clients. */ - case class TUGIAssumingTransportFactory( + class TUGIAssumingTransportFactory( ugi: UserGroupInformation, wrapped: TTransportFactory) extends TTransportFactory { @@ -139,7 +139,7 @@ object HadoopThriftAuthBridgeServer { * * This is used on the server side to set the UGI for each specific call. */ - case class TUGIAssumingProcessor( + class TUGIAssumingProcessor( wrapped: TProcessor, secretMgr: KyuubiDelegationTokenManager, userProxy: Boolean) extends TProcessor with Logging { @@ -204,7 +204,7 @@ object HadoopThriftAuthBridgeServer { /** * From Apache Hive */ - case class SaslDigestCallbackHandler(secretMgr: KyuubiDelegationTokenManager) + class SaslDigestCallbackHandler(secretMgr: KyuubiDelegationTokenManager) extends CallbackHandler with Logging { def getPasswd(identifer: KyuubiDelegationTokenIdentifier): Array[Char] = { diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationHandleSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationHandleSuite.scala index 795149b78..9b3e25f90 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationHandleSuite.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationHandleSuite.scala @@ -21,6 +21,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.hive.service.rpc.thrift.TProtocolVersion._ import org.apache.kyuubi.KyuubiFunSuite +import org.apache.kyuubi.cli.HandleIdentifier import org.apache.kyuubi.operation.OperationType._ class OperationHandleSuite extends KyuubiFunSuite { @@ -40,5 +41,11 @@ class OperationHandleSuite extends KyuubiFunSuite { assert(!t1.isHasResultSet) h1.setHasResultSet(true) assert(h1.toTOperationHandle.isHasResultSet) + assert(h1 !== null) + assert(h1 !== new Integer(1)) + val h3 = OperationHandle(h1.identifier, GET_CATALOGS, HIVE_CLI_SERVICE_PROTOCOL_V10) + assert(h3 !== h1, "different types") + val h4 = OperationHandle(HandleIdentifier(), h1.typ, HIVE_CLI_SERVICE_PROTOCOL_V10) + assert(h4 !== h1) } } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/auth/KyuubiAuthFactorySuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/auth/KyuubiAuthFactorySuite.scala deleted file mode 100644 index e359e2e28..000000000 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/auth/KyuubiAuthFactorySuite.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package yaooqinn.kyuubi.auth - -import javax.security.auth.login.LoginException - -import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge -import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.security.authorize.AuthorizationException -import org.apache.kyuubi.KyuubiSQLException -import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf, SparkFunSuite} -import yaooqinn.kyuubi.service.ServiceException -import yaooqinn.kyuubi.utils.ReflectUtils - -import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory - -class KyuubiAuthFactorySuite extends SparkFunSuite { - - test("testVerifyProxyAccess") { - val conf = new SparkConf(true) - val hadoopConf = KyuubiSparkUtil.newConfiguration(conf) - val user = UserGroupInformation.getCurrentUser.getShortUserName - KyuubiAuthenticationFactory.verifyProxyAccess(user, user, "localhost", hadoopConf) - val e = intercept[KyuubiSQLException]( - KyuubiAuthenticationFactory.verifyProxyAccess(user, "proxy-user", "localhost", hadoopConf)) - val msg = "Failed to validate proxy privilege" - assert(e.getMessage.contains(msg)) - assert(e.getCause.isInstanceOf[AuthorizationException]) - hadoopConf.set(s"hadoop.proxyuser.$user.groups", "*") - val e2 = intercept[KyuubiSQLException]( - KyuubiAuthenticationFactory.verifyProxyAccess(user, "proxy-user", "localhost", hadoopConf)) - assert(e2.getMessage.contains(msg)) - assert(e2.getCause.getMessage.contains("Unauthorized connection for super-user")) - hadoopConf.set(s"hadoop.proxyuser.$user.hosts", "*") - KyuubiAuthenticationFactory.verifyProxyAccess(user, "proxy-user", "localhost", hadoopConf) - } - - test("test HS2_PROXY_USER") { - assert(KyuubiAuthFactory.HS2_PROXY_USER === "hive.server2.proxy.user") - } - - test("AuthType NONE") { - val conf = new SparkConf(true) - KyuubiSparkUtil.setupCommonConfig(conf) - val auth = new KyuubiAuthFactory(conf) - val saslServer = ReflectUtils.getFieldValue(auth, "saslServer") - assert(saslServer === None) - assert(auth.getRemoteUser === None) - assert(auth.getIpAddress === None) - val user = UserGroupInformation.getCurrentUser.getShortUserName - val e = intercept[KyuubiSQLException](auth.getDelegationToken(user, user)) - assert(e.getMessage.contains("Delegation token only supported over kerberos authentication")) - val e1 = intercept[KyuubiSQLException](auth.cancelDelegationToken("")) - assert(e1.getMessage.contains("Delegation token only supported over kerberos authentication")) - val e2 = intercept[KyuubiSQLException](auth.renewDelegationToken("")) - assert(e2.getMessage.contains("Delegation token only supported over kerberos authentication")) - } - - test("AuthType Other") { - val conf = new SparkConf(true).set(KyuubiConf.AUTHENTICATION_METHOD.key, "other") - KyuubiSparkUtil.setupCommonConfig(conf) - val e = intercept[ServiceException](new KyuubiAuthFactory(conf)) - assert(e.getMessage === "Unsupported authentication method: OTHER") - } - - test("AuthType LDAP") { - val conf = new SparkConf(true).set(KyuubiConf.AUTHENTICATION_METHOD.key, "LDAP") - KyuubiSparkUtil.setupCommonConfig(conf) - val authFactory = new KyuubiAuthFactory(conf) - assert(authFactory.getIpAddress.isEmpty) - } - - test("AuthType KERBEROS without keytab/principal") { - val conf = new SparkConf(true).set(KyuubiConf.AUTHENTICATION_METHOD.key, "KERBEROS") - KyuubiSparkUtil.setupCommonConfig(conf) - val e = intercept[ServiceException](new KyuubiAuthFactory(conf)) - assert(e.getMessage === "spark.yarn.keytab and spark.yarn.principal are not configured " + - "properly for KERBEROS Authentication method") - } - - test("AuthType KERBEROS with keytab/principal ioe") { - val conf = new SparkConf(true) - .set(KyuubiConf.AUTHENTICATION_METHOD.key, "KERBEROS") - .set(KyuubiSparkUtil.KEYTAB, "kent.keytab") - .set(KyuubiSparkUtil.PRINCIPAL, "kent") - KyuubiSparkUtil.setupCommonConfig(conf) - val auth = new KyuubiAuthFactory(conf) - val saslServer = ReflectUtils.getFieldValue(auth, "saslServer") - saslServer match { - case Some(server) => - assert(server.isInstanceOf[HadoopThriftAuthBridge.Server]) - intercept[LoginException](auth.getAuthTransFactory) - case None => assert(false, "server could not be none") - } - - val user = UserGroupInformation.getCurrentUser.getShortUserName - val e = intercept[KyuubiSQLException](auth.getDelegationToken(user, user)) - assert(e.getMessage.contains(s"Error retrieving delegation token for user $user")) - val e1 = intercept[KyuubiSQLException](auth.cancelDelegationToken("")) - assert(e1.getMessage.contains("Error canceling delegation token")) - val e2 = intercept[KyuubiSQLException](auth.renewDelegationToken("")) - assert(e2.getMessage.contains("Error renewing delegation token")) - } - -} diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/OperationHandleSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/OperationHandleSuite.scala deleted file mode 100644 index 9249bf1d4..000000000 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/OperationHandleSuite.scala +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package yaooqinn.kyuubi.operation - -import org.apache.hive.service.cli.thrift.{TOperationType, TProtocolVersion} -import org.apache.spark.SparkFunSuite - -import yaooqinn.kyuubi.cli.HandleIdentifier - -class OperationHandleSuite extends SparkFunSuite { - -import TProtocolVersion._ - - test("operation handle basic tests") { - val handle1 = new OperationHandle(EXECUTE_STATEMENT, HIVE_CLI_SERVICE_PROTOCOL_V8) - assert(!handle1.isHasResultSet) - handle1.setHasResultSet(true) - assert(handle1.isHasResultSet) - assert(handle1.toTOperationHandle.isHasResultSet) - assert(handle1.getOperationType === EXECUTE_STATEMENT) - assert(handle1.getProtocolVersion === HIVE_CLI_SERVICE_PROTOCOL_V8) - - val handle2 = new OperationHandle( - handle1.toTOperationHandle, HIVE_CLI_SERVICE_PROTOCOL_V8) - assert(handle2.isHasResultSet) - assert(handle2.toTOperationHandle.isHasResultSet) - assert(handle2.getOperationType === EXECUTE_STATEMENT) - assert(handle2.getProtocolVersion === HIVE_CLI_SERVICE_PROTOCOL_V8) - assert(handle1 equals handle2) - - val handle3 = new OperationHandle(handle2.toTOperationHandle) - assert(handle3.isHasResultSet) - assert(handle3.toTOperationHandle.isHasResultSet) - assert(handle3.getOperationType === EXECUTE_STATEMENT) - assert(handle3.getProtocolVersion === HIVE_CLI_SERVICE_PROTOCOL_V1) - assert(handle3 === handle2) - - assert(handle1.toTOperationHandle.getOperationType === TOperationType.EXECUTE_STATEMENT) - assert(handle1.toString === handle3.toString) - assert(handle1.toTOperationHandle.getOperationId === - handle1.getHandleIdentifier.toTHandleIdentifier) - } - - test("operation handle to string") { - val opType = EXECUTE_STATEMENT - val protocol = HIVE_CLI_SERVICE_PROTOCOL_V8 - val handle = new OperationHandle(opType, protocol) - val hStr = handle.toString - assert(hStr.startsWith(classOf[OperationHandle].getSimpleName)) - assert(hStr.contains(opType.toString)) - assert(hStr.contains(handle.getOperationType.toString)) - assert(hStr.contains(handle.getHandleIdentifier.toString)) - } - - test("operation handle equals") { - val opType = EXECUTE_STATEMENT - val protocol = HIVE_CLI_SERVICE_PROTOCOL_V8 - val handle = new OperationHandle(opType, protocol) - assert(!handle.equals(new Object())) - assert(handle.equals(handle)) - val handle2 = new OperationHandle(opType, protocol) - assert(!handle.equals(handle2)) - val handle3 = new OperationHandle(handle.toTOperationHandle) - assert(handle.equals(handle3)) - val handle4 = new OperationHandle(handle.toTOperationHandle, - HIVE_CLI_SERVICE_PROTOCOL_V7) - assert(handle.equals(handle4)) - val ctor = classOf[OperationHandle].getDeclaredConstructor( - classOf[OperationType], classOf[TProtocolVersion], classOf[HandleIdentifier]) - ctor.setAccessible(true) - val handle5 = ctor.newInstance(GET_TYPE_INFO, protocol, handle.getHandleIdentifier) - assert(handle5.isInstanceOf[OperationHandle]) - assert(handle5.getOperationType !== handle.getOperationType) - assert(handle5.getHandleIdentifier === handle.getHandleIdentifier) - assert(handle5.getProtocolVersion === handle.getProtocolVersion) - assert(!handle.equals(handle5)) - } - - test("operation handle hash code") { - val opType = EXECUTE_STATEMENT - val protocol = HIVE_CLI_SERVICE_PROTOCOL_V8 - val handle = new OperationHandle(opType, protocol) - val prime = 31 - assert(handle.hashCode === - prime * (prime * 1 + handle.getHandleIdentifier.hashCode) + opType.hashCode()) - } - - test("operation handle get protocol version") { - val opType = EXECUTE_STATEMENT - val protocol = HIVE_CLI_SERVICE_PROTOCOL_V8 - val handle = new OperationHandle(opType, protocol) - assert(handle.getProtocolVersion === protocol) - assert(new OperationHandle(handle.toTOperationHandle).getProtocolVersion === - HIVE_CLI_SERVICE_PROTOCOL_V1) - assert(new OperationHandle(handle.toTOperationHandle, - HIVE_CLI_SERVICE_PROTOCOL_V1).getProtocolVersion === HIVE_CLI_SERVICE_PROTOCOL_V1) - } - - test("operation handle has result set") { - val opType = EXECUTE_STATEMENT - val protocol = HIVE_CLI_SERVICE_PROTOCOL_V8 - val handle = new OperationHandle(opType, protocol) - assert(!handle.isHasResultSet) - handle.setHasResultSet(false) - assert(!handle.isHasResultSet) - handle.setHasResultSet(true) - assert(handle.isHasResultSet) - } - - test("operation handle to tOperationType") { - val opType = EXECUTE_STATEMENT - val protocol = HIVE_CLI_SERVICE_PROTOCOL_V8 - val handle = new OperationHandle(opType, protocol) - assert(!handle.toTOperationHandle.isHasResultSet) - handle.setHasResultSet(true) - assert(handle.toTOperationHandle.isHasResultSet) - } -} diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/OperationTypeSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/OperationTypeSuite.scala deleted file mode 100644 index 11749cfb5..000000000 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/OperationTypeSuite.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package yaooqinn.kyuubi.operation - -import org.apache.hive.service.cli.thrift.TOperationType -import org.apache.spark.SparkFunSuite - -class OperationTypeSuite extends SparkFunSuite { - - test("EXECUTE_STATEMENT") { - val tOpType = TOperationType.EXECUTE_STATEMENT - assert(OperationType.getOperationType(tOpType) === EXECUTE_STATEMENT) - assert(OperationType.getOperationType(tOpType).toTOperationType === tOpType) - assert(EXECUTE_STATEMENT.toTOperationType === tOpType) - } - - test("GET_CATALOGS") { - val tOpType = TOperationType.GET_CATALOGS - assert(OperationType.getOperationType(tOpType) === GET_CATALOGS) - assert(OperationType.getOperationType(tOpType).toTOperationType === tOpType) - assert(GET_CATALOGS.toTOperationType === tOpType) - } - - test("GET_TYPE_INFO") { - val tOpType = TOperationType.GET_TYPE_INFO - assert(OperationType.getOperationType(tOpType) === GET_TYPE_INFO) - assert(OperationType.getOperationType(tOpType).toTOperationType === tOpType) - assert(GET_TYPE_INFO.toTOperationType === tOpType) - } - - test("GET_SCHEMAS") { - val tOpType = TOperationType.GET_SCHEMAS - assert(OperationType.getOperationType(tOpType) === GET_SCHEMAS) - assert(OperationType.getOperationType(tOpType).toTOperationType === tOpType) - assert(GET_SCHEMAS.toTOperationType === tOpType) - } - - test("GET_TABLES") { - val tOpType = TOperationType.GET_TABLES - assert(OperationType.getOperationType(tOpType) === GET_TABLES) - assert(OperationType.getOperationType(tOpType).toTOperationType === tOpType) - assert(GET_TABLES.toTOperationType === tOpType) - } - - test("GET_TABLE_TYPES") { - val tOpType = TOperationType.GET_TABLE_TYPES - assert(OperationType.getOperationType(tOpType) === GET_TABLE_TYPES) - assert(OperationType.getOperationType(tOpType).toTOperationType === tOpType) - assert(GET_TABLE_TYPES.toTOperationType === tOpType) - } - - test("GET_COLUMNS") { - val tOpType = TOperationType.GET_COLUMNS - assert(OperationType.getOperationType(tOpType) === GET_COLUMNS) - assert(OperationType.getOperationType(tOpType).toTOperationType === tOpType) - assert(GET_COLUMNS.toTOperationType === tOpType) - } - - test("GET_FUNCTIONS") { - val tOpType = TOperationType.GET_FUNCTIONS - assert(OperationType.getOperationType(tOpType) === GET_FUNCTIONS) - assert(OperationType.getOperationType(tOpType).toTOperationType === tOpType) - assert(GET_FUNCTIONS.toTOperationType === tOpType) - } - - test("UNKNOWN_OPERATION") { - val tOpType = TOperationType.UNKNOWN - assert(OperationType.getOperationType(tOpType) === UNKNOWN_OPERATION) - assert(OperationType.getOperationType(tOpType).toTOperationType === tOpType) - assert(UNKNOWN_OPERATION.toTOperationType === tOpType) - val unknownOperation = new OperationType { - override def toTOperationType: TOperationType = TOperationType.findByValue(9) - } - assert(unknownOperation.toTOperationType === null) - assert(OperationType.getOperationType(TOperationType.findByValue(9)) === UNKNOWN_OPERATION) - } -} diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala deleted file mode 100644 index 88574c464..000000000 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala +++ /dev/null @@ -1,481 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package yaooqinn.kyuubi.server - -import java.net.InetAddress - -import scala.collection.JavaConverters._ - -import org.apache.hive.service.cli.thrift._ -import org.apache.kyuubi.KyuubiSQLException -import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf, SparkFunSuite} -import org.apache.spark.KyuubiConf._ -import org.scalatest.Matchers -import yaooqinn.kyuubi.SecuredFunSuite -import yaooqinn.kyuubi.metrics.MetricsSystem -import yaooqinn.kyuubi.operation.OperationHandle -import yaooqinn.kyuubi.service.{ServiceException, State} -import yaooqinn.kyuubi.session.SessionHandle - -class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSuite { - - private val server: KyuubiServer = new KyuubiServer() - private val user = KyuubiSparkUtil.getCurrentUserName - private val conf = new SparkConf(loadDefaults = true).setAppName("fe test") - KyuubiSparkUtil.setupCommonConfig(conf) - conf.remove(KyuubiSparkUtil.CATALOG_IMPL) - conf.setMaster("local").set(FRONTEND_BIND_PORT.key, "0") - - server.init(conf) - server.start() - - override def afterAll(): Unit = { - Option(server).foreach(_.stop()) - super.afterAll() - } - - test("get port num") { - val feService = new FrontendService(server.beService) - feService.getPortNumber should be(0) - feService.init(conf) - feService.getPortNumber should not be 0 - } - - test("get server ip addr") { - val feService = new FrontendService(server.beService) - feService.getServerIPAddress should be(null) - feService.init(conf) - feService.getServerIPAddress should not be null - } - - - test("fe tserver event handler") { - withFEServiceAndHandle { case (fe, handle) => - val handler = new fe.FeTServerEventHandler - val context = new fe.FeServiceServerContext() - context.setSessionHandle(new SessionHandle(handle)) - handler.createContext(null, null) - handler.processContext(context, null, null) - handler.deleteContext(context, null, null) - } - } - - test("fe tserver event handler with metrics") { - withFEServiceAndHandle { case (fe, handle) => - MetricsSystem.init(conf) - val handler = new fe.FeTServerEventHandler - val context = new fe.FeServiceServerContext() - context.setSessionHandle(new SessionHandle(handle)) - handler.createContext(null, null) - handler.processContext(context, null, null) - handler.deleteContext(context, null, null) - MetricsSystem.close() - } - } - - test("open session, execute sql and get results") { - - withFEServiceAndHandle { case (fe, handle) => - val req2 = new TExecuteStatementReq(handle, "show databases") - val resp2 = fe.ExecuteStatement(req2) - resp2.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - val req3 = new TGetOperationStatusReq(resp2.getOperationHandle) - val resp3 = fe.GetOperationStatus(req3) - resp3.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - while(fe.GetOperationStatus(req3) - .getOperationState.getValue < TOperationState.FINISHED_STATE.getValue) { - Thread.sleep(10) - } - Thread.sleep(2000) - val req4 = new TFetchResultsReq(resp2.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - val resp4 = fe.FetchResults(req4) - resp4.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - resp4.getResults.getRows.get(0).getColVals.get(0).getStringVal.getValue should be("default") - assert(!resp4.getResults.isSetColumns) - val req5 = new TGetResultSetMetadataReq(resp2.getOperationHandle) - val resp5 = fe.GetResultSetMetadata(req5) - resp5.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - resp5.getSchema.getColumns.get(0).getColumnName should be("databaseName") - val req7 = new TCancelOperationReq(resp2.getOperationHandle) - val resp7 = fe.CancelOperation(req7) - resp7.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - val req6 = new TCloseOperationReq(resp2.getOperationHandle) - val resp6 = fe.CloseOperation(req6) - resp6.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - val req9 = new TCancelOperationReq(resp2.getOperationHandle) - val resp9 = fe.CancelOperation(req9) - resp9.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) - - val reqInfo1 = new TGetInfoReq(handle, TGetInfoType.CLI_DBMS_NAME) - val respInfo1 = fe.GetInfo(reqInfo1) - respInfo1.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - - val reqInfo2 = new TGetInfoReq(handle, TGetInfoType.CLI_ACCESSIBLE_PROCEDURES) - val respInfo2 = fe.GetInfo(reqInfo2) - respInfo2.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) - respInfo2.getStatus.getErrorMessage should - include(TGetInfoType.CLI_ACCESSIBLE_PROCEDURES.toString) - - val req8 = new TCloseSessionReq(handle) - val resp8 = fe.CloseSession(req8) - resp8.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - - // after session closed - val resp10 = fe.CloseSession(req8) - resp10.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) - resp10.getStatus.getErrorMessage should include("does not exist!") - val resp11 = fe.ExecuteStatement(req2) - resp11.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) - fe.GetOperationStatus(req3).getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) - fe.FetchResults(req4).getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) - fe.GetResultSetMetadata(req5) - .getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) - fe.CancelOperation(req7).getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) - fe.CloseOperation(req6).getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) - - val tOpenSessionReq = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1) - tOpenSessionReq.setUsername("yaooqinn") - tOpenSessionReq.setPassword("passwd") - tOpenSessionReq.setConfiguration( - Map("hive.server2.proxy.user" -> "kent").asJava) - val tOpenSessionResp = fe.OpenSession(tOpenSessionReq) - tOpenSessionResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - } - } - - test("execute statement sync") { - withFEServiceAndHandle { case (fe, handle) => - val req = new TExecuteStatementReq(handle, "show databases") - req.setRunAsync(false) - val resp = fe.ExecuteStatement(req) - val req2 = new TGetOperationStatusReq(resp.getOperationHandle) - val statusResp = fe.GetOperationStatus(req2) - statusResp.getOperationState should be(TOperationState.FINISHED_STATE) - val fReq = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - val fRes = fe.FetchResults(fReq) - val rows = fRes.getResults.getRows - rows.get(0).getColVals.get(0).getStringVal.getValue should be("default") - } - } - - test("execute statement async") { - withFEServiceAndHandle { case (fe, handle) => - val req = new TExecuteStatementReq(handle, "show databases") - req.setRunAsync(true) - val resp = fe.ExecuteStatement(req) - val statusReq = new TGetOperationStatusReq(resp.getOperationHandle) - while(fe.GetOperationStatus(statusReq) - .getOperationState.getValue < TOperationState.FINISHED_STATE.getValue) { - Thread.sleep(10) - } - Thread.sleep(2000) - val fReq = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - val fRes = fe.FetchResults(fReq) - val rows = fRes.getResults.getRows - rows.get(0).getColVals.get(0).getStringVal.getValue should be("default") - } - } - - test("alter database") { - val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => { - val req = new TExecuteStatementReq(handle, - "alter database default set dbproperties ('kent'='yao')") - val resp = fe.ExecuteStatement(req) - resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - - val tFetchResultsReq = - new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - Thread.sleep(5000) - val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) - tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - tFetchResultsResp.getResults.getRows.size() should be(0) - } - withFEServiceAndHandle(block) - withFEServiceAndHandleInc(block) - withFEServiceAndHandleIncAndCal(block) - } - - test("alter schema") { - val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => { - val req = new TExecuteStatementReq(handle, - "alter schema default set dbproperties ('kent'='yao')") - val resp = fe.ExecuteStatement(req) - resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - - val tFetchResultsReq = - new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - - Thread.sleep(5000) - val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) - tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - tFetchResultsResp.getResults.getRows.size() should be(0) - } - withFEServiceAndHandle(block) - withFEServiceAndHandleInc(block) - withFEServiceAndHandleIncAndCal(block) - } - - test("alter table name") { - val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => { - val ct = new TExecuteStatementReq(handle, - "create table if not exists default.src(key int) using parquet") - fe.ExecuteStatement(ct) - Thread.sleep(5000) - val req = new TExecuteStatementReq(handle, "alter table default.src rename to default.src2") - val resp = fe.ExecuteStatement(req) - resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - - val tFetchResultsReq = - new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - val dt = new TExecuteStatementReq(handle, "drop table src2") - fe.ExecuteStatement(dt) - Thread.sleep(5000) - val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) - tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - tFetchResultsResp.getResults.getRows.size() should be(0) - } - withFEServiceAndHandle(block) - } - - test("alter table name inc") { - val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => { - val ct = new TExecuteStatementReq(handle, - "create table if not exists default.src3(key int) using parquet") - fe.ExecuteStatement(ct) - Thread.sleep(5000) - val req = new TExecuteStatementReq(handle, "alter table default.src3 rename to default.src4") - val resp = fe.ExecuteStatement(req) - resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - - val tFetchResultsReq = - new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - val dt = new TExecuteStatementReq(handle, "drop table src4") - fe.ExecuteStatement(dt) - Thread.sleep(5000) - val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) - tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - tFetchResultsResp.getResults.getRows.size() should be(0) - } - withFEServiceAndHandleInc(block) - } - - test("alter table set properties") { - val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => { - val ct = new TExecuteStatementReq(handle, "create table default.src(key int) using parquet") - fe.ExecuteStatement(ct) - Thread.sleep(5000) - val req = new TExecuteStatementReq(handle, - "alter table default.src set tblproperties ('kent'='yao')") - val resp = fe.ExecuteStatement(req) - resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - - val tFetchResultsReq = - new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - - Thread.sleep(5000) - val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) - tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - tFetchResultsResp.getResults.getRows.size() should be(0) - } - withFEServiceAndHandle(block) - withFEServiceAndHandleInc(block) - } - - test("alter table unset properties") { - withFEServiceAndHandle { (fe, handle) => - val ct = new TExecuteStatementReq(handle, - "create table default.src(key int) using parquet tblproperties ('kent'='yao')") - fe.ExecuteStatement(ct) - Thread.sleep(5000) - val req = new TExecuteStatementReq(handle, - "alter table default.src unset tblproperties if exists ('kent', 'yao')") - val resp = fe.ExecuteStatement(req) - resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - - val tFetchResultsReq = - new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - - Thread.sleep(5000) - val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) - tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - tFetchResultsResp.getResults.getRows.size() should be(0) - } - } - - test("add jar hdfs") { - withFEServiceAndHandle { (fe, handle) => - val req = new TExecuteStatementReq(handle, "add jar hdfs://a/b/test.jar") - val resp = fe.ExecuteStatement(req) - resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - - val tFetchResultsReq = - new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - - Thread.sleep(5000) - val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) - tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) - } - } - - test("add jar local") { - withFEServiceAndHandle { (fe, handle) => - val req = new TExecuteStatementReq(handle, "add jar file://a/b/test.jar") - val resp = fe.ExecuteStatement(req) - resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - - val tFetchResultsReq = - new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - - Thread.sleep(5000) - val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) - tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - tFetchResultsResp.getResults.getRows.get(0).getColVals.get(0).getI32Val.getValue should be(0) - } - } - - test("create temporary function") { - withFEServiceAndHandle { (fe, handle) => - val req = new TExecuteStatementReq(handle, - "create temporary function testfunc as 'testClass' using jar 'hdfs://a/b/test.jar'") - val resp = fe.ExecuteStatement(req) - resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - - val tFetchResultsReq = - new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - - Thread.sleep(5000) - val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) - tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) - } - } - - test("select") { - val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => { - val kyuubiSession = server.beService.getSessionManager.getSession(new SessionHandle(handle)) - kyuubiSession.sparkSession.sql( - "create table if not exists default.select_tbl(key int) using parquet") - val ct = new TExecuteStatementReq(handle, "select * from default.select_tbl") - val tExecuteStatementResp = fe.ExecuteStatement(ct) - val statusReq = new TGetOperationStatusReq(tExecuteStatementResp.getOperationHandle) - - while(fe.GetOperationStatus(statusReq) - .getOperationState.getValue < TOperationState.FINISHED_STATE.getValue) { - Thread.sleep(10) - } - Thread.sleep(2000) - - val tFetchResultsReq = new TFetchResultsReq( - tExecuteStatementResp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - - val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) - tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - tFetchResultsResp.getResults.getRows.size() should be(0) - } - withFEServiceAndHandleIncAndCal(block) - withFEServiceAndHandleInc(block) - withFEServiceAndHandle(block) - withFEServiceAndHandleAndResultLimit(block) - } - - test("select with exception") { - val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => { - val kyuubiSession = server.beService.getSessionManager.getSession(new SessionHandle(handle)) - kyuubiSession.sparkSession.sql( - "create table if not exists default.select_tbl(key int) using parquet") - val ct = new TExecuteStatementReq(handle, "select * from default.select_tbl") - val tExecuteStatementResp = fe.ExecuteStatement(ct) - val statusReq = new TGetOperationStatusReq(tExecuteStatementResp.getOperationHandle) - - while(fe.GetOperationStatus(statusReq) - .getOperationState.getValue < TOperationState.FINISHED_STATE.getValue) { - Thread.sleep(10) - } - Thread.sleep(2000) - - val tFetchResultsReq = new TFetchResultsReq( - tExecuteStatementResp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - - val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) - tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) - } - - withFEServiceAndHandleAndException(block) - } - - def withFEServiceAndHandle(block: (FrontendService, TSessionHandle) => Unit): Unit = { - val feService = server.feService - val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1) - req.setUsername(user) - val resp = feService.OpenSession(req) - resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - val handle = resp.getSessionHandle - block(feService, handle) - } - - def withFEServiceAndHandleAndResultLimit( - block: (FrontendService, TSessionHandle) => Unit): Unit = { - val feService = server.feService - val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1) - req.setUsername(user) - req.setConfiguration( - Map("set:hivevar:" + KyuubiConf.OPERATION_RESULT_LIMIT.key -> "1").asJava) - val resp = feService.OpenSession(req) - resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - val handle = resp.getSessionHandle - block(feService, handle) - } - - def withFEServiceAndHandleAndException( - block: (FrontendService, TSessionHandle) => Unit): Unit = { - val feService = server.feService - val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1) - req.setUsername(user) - req.setConfiguration( - Map("set:hivevar:" + KyuubiConf.OPERATION_RESULT_LIMIT.key -> "invaild put").asJava) - val resp = feService.OpenSession(req) - resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - val handle = resp.getSessionHandle - block(feService, handle) - } - - def withFEServiceAndHandleInc(block: (FrontendService, TSessionHandle) => Unit): Unit = { - val feService = server.feService - val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1) - req.setUsername(user) - req.setConfiguration( - Map("set:hivevar:" + KyuubiConf.OPERATION_INCREMENTAL_COLLECT.key -> "true").asJava) - val resp = feService.OpenSession(req) - resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - val handle = resp.getSessionHandle - block(feService, handle) - } - - def withFEServiceAndHandleIncAndCal(block: (FrontendService, TSessionHandle) => Unit): Unit = { - val feService = server.feService - val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1) - req.setUsername(user) - req.setConfiguration( - Map("set:hivevar:" + KyuubiConf.OPERATION_INCREMENTAL_COLLECT.key -> "true", - "set:hivevar:" + KyuubiConf.OPERATION_INCREMENTAL_RDD_PARTITIONS_LIMIT.key -> "-1") - .asJava) - val resp = feService.OpenSession(req) - resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - val handle = resp.getSessionHandle - block(feService, handle) - } -}