add and rm some tests
This commit is contained in:
parent
7864f6210a
commit
cf255c79c6
@ -57,7 +57,7 @@ install:
|
||||
# - build/mvn --no-transfer-progress clean verify -pl :kyuubi-common,:kyuubi-ha,:kyuubi-main,:kyuubi-spark-sql-engine,:kyuubi-codecov,:kyuubi-download,:kyuubi-assembly -Dmaven.javadoc.skip=true -B -V
|
||||
|
||||
script:
|
||||
- mvn clean verify -pl '!:kyuubi-server,!:kyuubi-thrift' -Dmaven.javadoc.skip=true -B -V
|
||||
- build/mvn clean verify -pl '!:kyuubi-server,!:kyuubi-thrift' -Dmaven.javadoc.skip=true -B -V
|
||||
|
||||
after_script:
|
||||
- rm -r $HOME/.m2/repository/org/apache/kyuubi
|
||||
|
||||
@ -47,12 +47,6 @@ abstract class AbstractBackendService(name: String)
|
||||
sessionManager.getSession(sessionHandle).getInfo(infoType)
|
||||
}
|
||||
|
||||
override def executeStatement(
|
||||
sessionHandle: SessionHandle,
|
||||
statement: String): OperationHandle = {
|
||||
sessionManager.getSession(sessionHandle).executeStatement(statement)
|
||||
}
|
||||
|
||||
override def executeStatement(
|
||||
sessionHandle: SessionHandle,
|
||||
statement: String,
|
||||
@ -60,12 +54,6 @@ abstract class AbstractBackendService(name: String)
|
||||
sessionManager.getSession(sessionHandle).executeStatement(statement, queryTimeout)
|
||||
}
|
||||
|
||||
override def executeStatementAsync(
|
||||
sessionHandle: SessionHandle,
|
||||
statement: String): OperationHandle = {
|
||||
sessionManager.getSession(sessionHandle).executeStatementAsync(statement)
|
||||
}
|
||||
|
||||
override def executeStatementAsync(
|
||||
sessionHandle: SessionHandle,
|
||||
statement: String,
|
||||
|
||||
@ -63,6 +63,7 @@ abstract class AbstractService(serviceName: String) extends Service with Logging
|
||||
override def stop(): Unit = {
|
||||
state match {
|
||||
case LATENT | INITIALIZED | STOPPED =>
|
||||
warn(s"Service[$serviceName] is not started($state) yet.")
|
||||
case _ =>
|
||||
ensureCurrentState(STARTED)
|
||||
changeState(STOPPED)
|
||||
@ -112,9 +113,7 @@ abstract class AbstractService(serviceName: String) extends Service with Logging
|
||||
private def ensureCurrentState(currentState: ServiceState): Unit = {
|
||||
if (state ne currentState) {
|
||||
throw new IllegalStateException(
|
||||
s"""
|
||||
|For this operation, the current service state must be $currentState instead of $state
|
||||
""".stripMargin)
|
||||
s"For this operation, the current service state must be $currentState instead of $state")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -46,16 +46,10 @@ trait BackendService {
|
||||
|
||||
def getInfo(sessionHandle: SessionHandle, infoType: TGetInfoType): TGetInfoValue
|
||||
|
||||
def executeStatement(
|
||||
sessionHandle: SessionHandle,
|
||||
statement: String): OperationHandle
|
||||
def executeStatement(
|
||||
sessionHandle: SessionHandle,
|
||||
statement: String,
|
||||
queryTimeout: Long): OperationHandle
|
||||
def executeStatementAsync(
|
||||
sessionHandle: SessionHandle,
|
||||
statement: String): OperationHandle
|
||||
def executeStatementAsync(
|
||||
sessionHandle: SessionHandle,
|
||||
statement: String,
|
||||
|
||||
@ -59,7 +59,7 @@ class HadoopThriftAuthBridgeServer(secretMgr: KyuubiDelegationTokenManager) {
|
||||
null,
|
||||
SaslRpcServer.SASL_DEFAULT_REALM,
|
||||
saslProps,
|
||||
SaslDigestCallbackHandler(secretMgr))
|
||||
new SaslDigestCallbackHandler(secretMgr))
|
||||
factory
|
||||
}
|
||||
|
||||
@ -69,7 +69,7 @@ class HadoopThriftAuthBridgeServer(secretMgr: KyuubiDelegationTokenManager) {
|
||||
* the SASL transport.
|
||||
*/
|
||||
def wrapTransportFactory(transFactory: TTransportFactory): TTransportFactory = {
|
||||
TUGIAssumingTransportFactory(ugi, transFactory)
|
||||
new TUGIAssumingTransportFactory(ugi, transFactory)
|
||||
}
|
||||
|
||||
/**
|
||||
@ -78,14 +78,14 @@ class HadoopThriftAuthBridgeServer(secretMgr: KyuubiDelegationTokenManager) {
|
||||
* the SASL transport.
|
||||
*/
|
||||
def wrapProcessor(processor: TProcessor): TProcessor = {
|
||||
TUGIAssumingProcessor(processor, secretMgr, userProxy = true)
|
||||
new TUGIAssumingProcessor(processor, secretMgr, userProxy = true)
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrap a TProcessor to capture the client information like connecting userid, ip etc
|
||||
*/
|
||||
def wrapNonAssumingProcessor(processor: TProcessor): TProcessor = {
|
||||
TUGIAssumingProcessor(processor, secretMgr, userProxy = false)
|
||||
new TUGIAssumingProcessor(processor, secretMgr, userProxy = false)
|
||||
}
|
||||
|
||||
def getRemoteAddress: InetAddress = REMOTE_ADDRESS.get
|
||||
@ -119,7 +119,7 @@ object HadoopThriftAuthBridgeServer {
|
||||
* This is used on the server side to assume the server's Principal when accepting
|
||||
* clients.
|
||||
*/
|
||||
case class TUGIAssumingTransportFactory(
|
||||
class TUGIAssumingTransportFactory(
|
||||
ugi: UserGroupInformation,
|
||||
wrapped: TTransportFactory) extends TTransportFactory {
|
||||
|
||||
@ -139,7 +139,7 @@ object HadoopThriftAuthBridgeServer {
|
||||
*
|
||||
* This is used on the server side to set the UGI for each specific call.
|
||||
*/
|
||||
case class TUGIAssumingProcessor(
|
||||
class TUGIAssumingProcessor(
|
||||
wrapped: TProcessor,
|
||||
secretMgr: KyuubiDelegationTokenManager,
|
||||
userProxy: Boolean) extends TProcessor with Logging {
|
||||
@ -204,7 +204,7 @@ object HadoopThriftAuthBridgeServer {
|
||||
/**
|
||||
* From Apache Hive
|
||||
*/
|
||||
case class SaslDigestCallbackHandler(secretMgr: KyuubiDelegationTokenManager)
|
||||
class SaslDigestCallbackHandler(secretMgr: KyuubiDelegationTokenManager)
|
||||
extends CallbackHandler with Logging {
|
||||
|
||||
def getPasswd(identifer: KyuubiDelegationTokenIdentifier): Array[Char] = {
|
||||
|
||||
@ -21,6 +21,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
|
||||
import org.apache.hive.service.rpc.thrift.TProtocolVersion._
|
||||
|
||||
import org.apache.kyuubi.KyuubiFunSuite
|
||||
import org.apache.kyuubi.cli.HandleIdentifier
|
||||
import org.apache.kyuubi.operation.OperationType._
|
||||
|
||||
class OperationHandleSuite extends KyuubiFunSuite {
|
||||
@ -40,5 +41,11 @@ class OperationHandleSuite extends KyuubiFunSuite {
|
||||
assert(!t1.isHasResultSet)
|
||||
h1.setHasResultSet(true)
|
||||
assert(h1.toTOperationHandle.isHasResultSet)
|
||||
assert(h1 !== null)
|
||||
assert(h1 !== new Integer(1))
|
||||
val h3 = OperationHandle(h1.identifier, GET_CATALOGS, HIVE_CLI_SERVICE_PROTOCOL_V10)
|
||||
assert(h3 !== h1, "different types")
|
||||
val h4 = OperationHandle(HandleIdentifier(), h1.typ, HIVE_CLI_SERVICE_PROTOCOL_V10)
|
||||
assert(h4 !== h1)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,120 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package yaooqinn.kyuubi.auth
|
||||
|
||||
import javax.security.auth.login.LoginException
|
||||
|
||||
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException
|
||||
import org.apache.kyuubi.KyuubiSQLException
|
||||
import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf, SparkFunSuite}
|
||||
import yaooqinn.kyuubi.service.ServiceException
|
||||
import yaooqinn.kyuubi.utils.ReflectUtils
|
||||
|
||||
import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
|
||||
|
||||
class KyuubiAuthFactorySuite extends SparkFunSuite {
|
||||
|
||||
test("testVerifyProxyAccess") {
|
||||
val conf = new SparkConf(true)
|
||||
val hadoopConf = KyuubiSparkUtil.newConfiguration(conf)
|
||||
val user = UserGroupInformation.getCurrentUser.getShortUserName
|
||||
KyuubiAuthenticationFactory.verifyProxyAccess(user, user, "localhost", hadoopConf)
|
||||
val e = intercept[KyuubiSQLException](
|
||||
KyuubiAuthenticationFactory.verifyProxyAccess(user, "proxy-user", "localhost", hadoopConf))
|
||||
val msg = "Failed to validate proxy privilege"
|
||||
assert(e.getMessage.contains(msg))
|
||||
assert(e.getCause.isInstanceOf[AuthorizationException])
|
||||
hadoopConf.set(s"hadoop.proxyuser.$user.groups", "*")
|
||||
val e2 = intercept[KyuubiSQLException](
|
||||
KyuubiAuthenticationFactory.verifyProxyAccess(user, "proxy-user", "localhost", hadoopConf))
|
||||
assert(e2.getMessage.contains(msg))
|
||||
assert(e2.getCause.getMessage.contains("Unauthorized connection for super-user"))
|
||||
hadoopConf.set(s"hadoop.proxyuser.$user.hosts", "*")
|
||||
KyuubiAuthenticationFactory.verifyProxyAccess(user, "proxy-user", "localhost", hadoopConf)
|
||||
}
|
||||
|
||||
test("test HS2_PROXY_USER") {
|
||||
assert(KyuubiAuthFactory.HS2_PROXY_USER === "hive.server2.proxy.user")
|
||||
}
|
||||
|
||||
test("AuthType NONE") {
|
||||
val conf = new SparkConf(true)
|
||||
KyuubiSparkUtil.setupCommonConfig(conf)
|
||||
val auth = new KyuubiAuthFactory(conf)
|
||||
val saslServer = ReflectUtils.getFieldValue(auth, "saslServer")
|
||||
assert(saslServer === None)
|
||||
assert(auth.getRemoteUser === None)
|
||||
assert(auth.getIpAddress === None)
|
||||
val user = UserGroupInformation.getCurrentUser.getShortUserName
|
||||
val e = intercept[KyuubiSQLException](auth.getDelegationToken(user, user))
|
||||
assert(e.getMessage.contains("Delegation token only supported over kerberos authentication"))
|
||||
val e1 = intercept[KyuubiSQLException](auth.cancelDelegationToken(""))
|
||||
assert(e1.getMessage.contains("Delegation token only supported over kerberos authentication"))
|
||||
val e2 = intercept[KyuubiSQLException](auth.renewDelegationToken(""))
|
||||
assert(e2.getMessage.contains("Delegation token only supported over kerberos authentication"))
|
||||
}
|
||||
|
||||
test("AuthType Other") {
|
||||
val conf = new SparkConf(true).set(KyuubiConf.AUTHENTICATION_METHOD.key, "other")
|
||||
KyuubiSparkUtil.setupCommonConfig(conf)
|
||||
val e = intercept[ServiceException](new KyuubiAuthFactory(conf))
|
||||
assert(e.getMessage === "Unsupported authentication method: OTHER")
|
||||
}
|
||||
|
||||
test("AuthType LDAP") {
|
||||
val conf = new SparkConf(true).set(KyuubiConf.AUTHENTICATION_METHOD.key, "LDAP")
|
||||
KyuubiSparkUtil.setupCommonConfig(conf)
|
||||
val authFactory = new KyuubiAuthFactory(conf)
|
||||
assert(authFactory.getIpAddress.isEmpty)
|
||||
}
|
||||
|
||||
test("AuthType KERBEROS without keytab/principal") {
|
||||
val conf = new SparkConf(true).set(KyuubiConf.AUTHENTICATION_METHOD.key, "KERBEROS")
|
||||
KyuubiSparkUtil.setupCommonConfig(conf)
|
||||
val e = intercept[ServiceException](new KyuubiAuthFactory(conf))
|
||||
assert(e.getMessage === "spark.yarn.keytab and spark.yarn.principal are not configured " +
|
||||
"properly for KERBEROS Authentication method")
|
||||
}
|
||||
|
||||
test("AuthType KERBEROS with keytab/principal ioe") {
|
||||
val conf = new SparkConf(true)
|
||||
.set(KyuubiConf.AUTHENTICATION_METHOD.key, "KERBEROS")
|
||||
.set(KyuubiSparkUtil.KEYTAB, "kent.keytab")
|
||||
.set(KyuubiSparkUtil.PRINCIPAL, "kent")
|
||||
KyuubiSparkUtil.setupCommonConfig(conf)
|
||||
val auth = new KyuubiAuthFactory(conf)
|
||||
val saslServer = ReflectUtils.getFieldValue(auth, "saslServer")
|
||||
saslServer match {
|
||||
case Some(server) =>
|
||||
assert(server.isInstanceOf[HadoopThriftAuthBridge.Server])
|
||||
intercept[LoginException](auth.getAuthTransFactory)
|
||||
case None => assert(false, "server could not be none")
|
||||
}
|
||||
|
||||
val user = UserGroupInformation.getCurrentUser.getShortUserName
|
||||
val e = intercept[KyuubiSQLException](auth.getDelegationToken(user, user))
|
||||
assert(e.getMessage.contains(s"Error retrieving delegation token for user $user"))
|
||||
val e1 = intercept[KyuubiSQLException](auth.cancelDelegationToken(""))
|
||||
assert(e1.getMessage.contains("Error canceling delegation token"))
|
||||
val e2 = intercept[KyuubiSQLException](auth.renewDelegationToken(""))
|
||||
assert(e2.getMessage.contains("Error renewing delegation token"))
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,133 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package yaooqinn.kyuubi.operation
|
||||
|
||||
import org.apache.hive.service.cli.thrift.{TOperationType, TProtocolVersion}
|
||||
import org.apache.spark.SparkFunSuite
|
||||
|
||||
import yaooqinn.kyuubi.cli.HandleIdentifier
|
||||
|
||||
class OperationHandleSuite extends SparkFunSuite {
|
||||
|
||||
import TProtocolVersion._
|
||||
|
||||
test("operation handle basic tests") {
|
||||
val handle1 = new OperationHandle(EXECUTE_STATEMENT, HIVE_CLI_SERVICE_PROTOCOL_V8)
|
||||
assert(!handle1.isHasResultSet)
|
||||
handle1.setHasResultSet(true)
|
||||
assert(handle1.isHasResultSet)
|
||||
assert(handle1.toTOperationHandle.isHasResultSet)
|
||||
assert(handle1.getOperationType === EXECUTE_STATEMENT)
|
||||
assert(handle1.getProtocolVersion === HIVE_CLI_SERVICE_PROTOCOL_V8)
|
||||
|
||||
val handle2 = new OperationHandle(
|
||||
handle1.toTOperationHandle, HIVE_CLI_SERVICE_PROTOCOL_V8)
|
||||
assert(handle2.isHasResultSet)
|
||||
assert(handle2.toTOperationHandle.isHasResultSet)
|
||||
assert(handle2.getOperationType === EXECUTE_STATEMENT)
|
||||
assert(handle2.getProtocolVersion === HIVE_CLI_SERVICE_PROTOCOL_V8)
|
||||
assert(handle1 equals handle2)
|
||||
|
||||
val handle3 = new OperationHandle(handle2.toTOperationHandle)
|
||||
assert(handle3.isHasResultSet)
|
||||
assert(handle3.toTOperationHandle.isHasResultSet)
|
||||
assert(handle3.getOperationType === EXECUTE_STATEMENT)
|
||||
assert(handle3.getProtocolVersion === HIVE_CLI_SERVICE_PROTOCOL_V1)
|
||||
assert(handle3 === handle2)
|
||||
|
||||
assert(handle1.toTOperationHandle.getOperationType === TOperationType.EXECUTE_STATEMENT)
|
||||
assert(handle1.toString === handle3.toString)
|
||||
assert(handle1.toTOperationHandle.getOperationId ===
|
||||
handle1.getHandleIdentifier.toTHandleIdentifier)
|
||||
}
|
||||
|
||||
test("operation handle to string") {
|
||||
val opType = EXECUTE_STATEMENT
|
||||
val protocol = HIVE_CLI_SERVICE_PROTOCOL_V8
|
||||
val handle = new OperationHandle(opType, protocol)
|
||||
val hStr = handle.toString
|
||||
assert(hStr.startsWith(classOf[OperationHandle].getSimpleName))
|
||||
assert(hStr.contains(opType.toString))
|
||||
assert(hStr.contains(handle.getOperationType.toString))
|
||||
assert(hStr.contains(handle.getHandleIdentifier.toString))
|
||||
}
|
||||
|
||||
test("operation handle equals") {
|
||||
val opType = EXECUTE_STATEMENT
|
||||
val protocol = HIVE_CLI_SERVICE_PROTOCOL_V8
|
||||
val handle = new OperationHandle(opType, protocol)
|
||||
assert(!handle.equals(new Object()))
|
||||
assert(handle.equals(handle))
|
||||
val handle2 = new OperationHandle(opType, protocol)
|
||||
assert(!handle.equals(handle2))
|
||||
val handle3 = new OperationHandle(handle.toTOperationHandle)
|
||||
assert(handle.equals(handle3))
|
||||
val handle4 = new OperationHandle(handle.toTOperationHandle,
|
||||
HIVE_CLI_SERVICE_PROTOCOL_V7)
|
||||
assert(handle.equals(handle4))
|
||||
val ctor = classOf[OperationHandle].getDeclaredConstructor(
|
||||
classOf[OperationType], classOf[TProtocolVersion], classOf[HandleIdentifier])
|
||||
ctor.setAccessible(true)
|
||||
val handle5 = ctor.newInstance(GET_TYPE_INFO, protocol, handle.getHandleIdentifier)
|
||||
assert(handle5.isInstanceOf[OperationHandle])
|
||||
assert(handle5.getOperationType !== handle.getOperationType)
|
||||
assert(handle5.getHandleIdentifier === handle.getHandleIdentifier)
|
||||
assert(handle5.getProtocolVersion === handle.getProtocolVersion)
|
||||
assert(!handle.equals(handle5))
|
||||
}
|
||||
|
||||
test("operation handle hash code") {
|
||||
val opType = EXECUTE_STATEMENT
|
||||
val protocol = HIVE_CLI_SERVICE_PROTOCOL_V8
|
||||
val handle = new OperationHandle(opType, protocol)
|
||||
val prime = 31
|
||||
assert(handle.hashCode ===
|
||||
prime * (prime * 1 + handle.getHandleIdentifier.hashCode) + opType.hashCode())
|
||||
}
|
||||
|
||||
test("operation handle get protocol version") {
|
||||
val opType = EXECUTE_STATEMENT
|
||||
val protocol = HIVE_CLI_SERVICE_PROTOCOL_V8
|
||||
val handle = new OperationHandle(opType, protocol)
|
||||
assert(handle.getProtocolVersion === protocol)
|
||||
assert(new OperationHandle(handle.toTOperationHandle).getProtocolVersion ===
|
||||
HIVE_CLI_SERVICE_PROTOCOL_V1)
|
||||
assert(new OperationHandle(handle.toTOperationHandle,
|
||||
HIVE_CLI_SERVICE_PROTOCOL_V1).getProtocolVersion === HIVE_CLI_SERVICE_PROTOCOL_V1)
|
||||
}
|
||||
|
||||
test("operation handle has result set") {
|
||||
val opType = EXECUTE_STATEMENT
|
||||
val protocol = HIVE_CLI_SERVICE_PROTOCOL_V8
|
||||
val handle = new OperationHandle(opType, protocol)
|
||||
assert(!handle.isHasResultSet)
|
||||
handle.setHasResultSet(false)
|
||||
assert(!handle.isHasResultSet)
|
||||
handle.setHasResultSet(true)
|
||||
assert(handle.isHasResultSet)
|
||||
}
|
||||
|
||||
test("operation handle to tOperationType") {
|
||||
val opType = EXECUTE_STATEMENT
|
||||
val protocol = HIVE_CLI_SERVICE_PROTOCOL_V8
|
||||
val handle = new OperationHandle(opType, protocol)
|
||||
assert(!handle.toTOperationHandle.isHasResultSet)
|
||||
handle.setHasResultSet(true)
|
||||
assert(handle.toTOperationHandle.isHasResultSet)
|
||||
}
|
||||
}
|
||||
@ -1,92 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package yaooqinn.kyuubi.operation
|
||||
|
||||
import org.apache.hive.service.cli.thrift.TOperationType
|
||||
import org.apache.spark.SparkFunSuite
|
||||
|
||||
class OperationTypeSuite extends SparkFunSuite {
|
||||
|
||||
test("EXECUTE_STATEMENT") {
|
||||
val tOpType = TOperationType.EXECUTE_STATEMENT
|
||||
assert(OperationType.getOperationType(tOpType) === EXECUTE_STATEMENT)
|
||||
assert(OperationType.getOperationType(tOpType).toTOperationType === tOpType)
|
||||
assert(EXECUTE_STATEMENT.toTOperationType === tOpType)
|
||||
}
|
||||
|
||||
test("GET_CATALOGS") {
|
||||
val tOpType = TOperationType.GET_CATALOGS
|
||||
assert(OperationType.getOperationType(tOpType) === GET_CATALOGS)
|
||||
assert(OperationType.getOperationType(tOpType).toTOperationType === tOpType)
|
||||
assert(GET_CATALOGS.toTOperationType === tOpType)
|
||||
}
|
||||
|
||||
test("GET_TYPE_INFO") {
|
||||
val tOpType = TOperationType.GET_TYPE_INFO
|
||||
assert(OperationType.getOperationType(tOpType) === GET_TYPE_INFO)
|
||||
assert(OperationType.getOperationType(tOpType).toTOperationType === tOpType)
|
||||
assert(GET_TYPE_INFO.toTOperationType === tOpType)
|
||||
}
|
||||
|
||||
test("GET_SCHEMAS") {
|
||||
val tOpType = TOperationType.GET_SCHEMAS
|
||||
assert(OperationType.getOperationType(tOpType) === GET_SCHEMAS)
|
||||
assert(OperationType.getOperationType(tOpType).toTOperationType === tOpType)
|
||||
assert(GET_SCHEMAS.toTOperationType === tOpType)
|
||||
}
|
||||
|
||||
test("GET_TABLES") {
|
||||
val tOpType = TOperationType.GET_TABLES
|
||||
assert(OperationType.getOperationType(tOpType) === GET_TABLES)
|
||||
assert(OperationType.getOperationType(tOpType).toTOperationType === tOpType)
|
||||
assert(GET_TABLES.toTOperationType === tOpType)
|
||||
}
|
||||
|
||||
test("GET_TABLE_TYPES") {
|
||||
val tOpType = TOperationType.GET_TABLE_TYPES
|
||||
assert(OperationType.getOperationType(tOpType) === GET_TABLE_TYPES)
|
||||
assert(OperationType.getOperationType(tOpType).toTOperationType === tOpType)
|
||||
assert(GET_TABLE_TYPES.toTOperationType === tOpType)
|
||||
}
|
||||
|
||||
test("GET_COLUMNS") {
|
||||
val tOpType = TOperationType.GET_COLUMNS
|
||||
assert(OperationType.getOperationType(tOpType) === GET_COLUMNS)
|
||||
assert(OperationType.getOperationType(tOpType).toTOperationType === tOpType)
|
||||
assert(GET_COLUMNS.toTOperationType === tOpType)
|
||||
}
|
||||
|
||||
test("GET_FUNCTIONS") {
|
||||
val tOpType = TOperationType.GET_FUNCTIONS
|
||||
assert(OperationType.getOperationType(tOpType) === GET_FUNCTIONS)
|
||||
assert(OperationType.getOperationType(tOpType).toTOperationType === tOpType)
|
||||
assert(GET_FUNCTIONS.toTOperationType === tOpType)
|
||||
}
|
||||
|
||||
test("UNKNOWN_OPERATION") {
|
||||
val tOpType = TOperationType.UNKNOWN
|
||||
assert(OperationType.getOperationType(tOpType) === UNKNOWN_OPERATION)
|
||||
assert(OperationType.getOperationType(tOpType).toTOperationType === tOpType)
|
||||
assert(UNKNOWN_OPERATION.toTOperationType === tOpType)
|
||||
val unknownOperation = new OperationType {
|
||||
override def toTOperationType: TOperationType = TOperationType.findByValue(9)
|
||||
}
|
||||
assert(unknownOperation.toTOperationType === null)
|
||||
assert(OperationType.getOperationType(TOperationType.findByValue(9)) === UNKNOWN_OPERATION)
|
||||
}
|
||||
}
|
||||
@ -1,481 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package yaooqinn.kyuubi.server
|
||||
|
||||
import java.net.InetAddress
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.hive.service.cli.thrift._
|
||||
import org.apache.kyuubi.KyuubiSQLException
|
||||
import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf, SparkFunSuite}
|
||||
import org.apache.spark.KyuubiConf._
|
||||
import org.scalatest.Matchers
|
||||
import yaooqinn.kyuubi.SecuredFunSuite
|
||||
import yaooqinn.kyuubi.metrics.MetricsSystem
|
||||
import yaooqinn.kyuubi.operation.OperationHandle
|
||||
import yaooqinn.kyuubi.service.{ServiceException, State}
|
||||
import yaooqinn.kyuubi.session.SessionHandle
|
||||
|
||||
class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSuite {
|
||||
|
||||
private val server: KyuubiServer = new KyuubiServer()
|
||||
private val user = KyuubiSparkUtil.getCurrentUserName
|
||||
private val conf = new SparkConf(loadDefaults = true).setAppName("fe test")
|
||||
KyuubiSparkUtil.setupCommonConfig(conf)
|
||||
conf.remove(KyuubiSparkUtil.CATALOG_IMPL)
|
||||
conf.setMaster("local").set(FRONTEND_BIND_PORT.key, "0")
|
||||
|
||||
server.init(conf)
|
||||
server.start()
|
||||
|
||||
override def afterAll(): Unit = {
|
||||
Option(server).foreach(_.stop())
|
||||
super.afterAll()
|
||||
}
|
||||
|
||||
test("get port num") {
|
||||
val feService = new FrontendService(server.beService)
|
||||
feService.getPortNumber should be(0)
|
||||
feService.init(conf)
|
||||
feService.getPortNumber should not be 0
|
||||
}
|
||||
|
||||
test("get server ip addr") {
|
||||
val feService = new FrontendService(server.beService)
|
||||
feService.getServerIPAddress should be(null)
|
||||
feService.init(conf)
|
||||
feService.getServerIPAddress should not be null
|
||||
}
|
||||
|
||||
|
||||
test("fe tserver event handler") {
|
||||
withFEServiceAndHandle { case (fe, handle) =>
|
||||
val handler = new fe.FeTServerEventHandler
|
||||
val context = new fe.FeServiceServerContext()
|
||||
context.setSessionHandle(new SessionHandle(handle))
|
||||
handler.createContext(null, null)
|
||||
handler.processContext(context, null, null)
|
||||
handler.deleteContext(context, null, null)
|
||||
}
|
||||
}
|
||||
|
||||
test("fe tserver event handler with metrics") {
|
||||
withFEServiceAndHandle { case (fe, handle) =>
|
||||
MetricsSystem.init(conf)
|
||||
val handler = new fe.FeTServerEventHandler
|
||||
val context = new fe.FeServiceServerContext()
|
||||
context.setSessionHandle(new SessionHandle(handle))
|
||||
handler.createContext(null, null)
|
||||
handler.processContext(context, null, null)
|
||||
handler.deleteContext(context, null, null)
|
||||
MetricsSystem.close()
|
||||
}
|
||||
}
|
||||
|
||||
test("open session, execute sql and get results") {
|
||||
|
||||
withFEServiceAndHandle { case (fe, handle) =>
|
||||
val req2 = new TExecuteStatementReq(handle, "show databases")
|
||||
val resp2 = fe.ExecuteStatement(req2)
|
||||
resp2.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
val req3 = new TGetOperationStatusReq(resp2.getOperationHandle)
|
||||
val resp3 = fe.GetOperationStatus(req3)
|
||||
resp3.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
while(fe.GetOperationStatus(req3)
|
||||
.getOperationState.getValue < TOperationState.FINISHED_STATE.getValue) {
|
||||
Thread.sleep(10)
|
||||
}
|
||||
Thread.sleep(2000)
|
||||
val req4 = new TFetchResultsReq(resp2.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
val resp4 = fe.FetchResults(req4)
|
||||
resp4.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
resp4.getResults.getRows.get(0).getColVals.get(0).getStringVal.getValue should be("default")
|
||||
assert(!resp4.getResults.isSetColumns)
|
||||
val req5 = new TGetResultSetMetadataReq(resp2.getOperationHandle)
|
||||
val resp5 = fe.GetResultSetMetadata(req5)
|
||||
resp5.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
resp5.getSchema.getColumns.get(0).getColumnName should be("databaseName")
|
||||
val req7 = new TCancelOperationReq(resp2.getOperationHandle)
|
||||
val resp7 = fe.CancelOperation(req7)
|
||||
resp7.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
val req6 = new TCloseOperationReq(resp2.getOperationHandle)
|
||||
val resp6 = fe.CloseOperation(req6)
|
||||
resp6.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
val req9 = new TCancelOperationReq(resp2.getOperationHandle)
|
||||
val resp9 = fe.CancelOperation(req9)
|
||||
resp9.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
|
||||
|
||||
val reqInfo1 = new TGetInfoReq(handle, TGetInfoType.CLI_DBMS_NAME)
|
||||
val respInfo1 = fe.GetInfo(reqInfo1)
|
||||
respInfo1.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
|
||||
val reqInfo2 = new TGetInfoReq(handle, TGetInfoType.CLI_ACCESSIBLE_PROCEDURES)
|
||||
val respInfo2 = fe.GetInfo(reqInfo2)
|
||||
respInfo2.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
|
||||
respInfo2.getStatus.getErrorMessage should
|
||||
include(TGetInfoType.CLI_ACCESSIBLE_PROCEDURES.toString)
|
||||
|
||||
val req8 = new TCloseSessionReq(handle)
|
||||
val resp8 = fe.CloseSession(req8)
|
||||
resp8.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
|
||||
// after session closed
|
||||
val resp10 = fe.CloseSession(req8)
|
||||
resp10.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
|
||||
resp10.getStatus.getErrorMessage should include("does not exist!")
|
||||
val resp11 = fe.ExecuteStatement(req2)
|
||||
resp11.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
|
||||
fe.GetOperationStatus(req3).getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
|
||||
fe.FetchResults(req4).getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
|
||||
fe.GetResultSetMetadata(req5)
|
||||
.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
|
||||
fe.CancelOperation(req7).getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
|
||||
fe.CloseOperation(req6).getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
|
||||
|
||||
val tOpenSessionReq = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
|
||||
tOpenSessionReq.setUsername("yaooqinn")
|
||||
tOpenSessionReq.setPassword("passwd")
|
||||
tOpenSessionReq.setConfiguration(
|
||||
Map("hive.server2.proxy.user" -> "kent").asJava)
|
||||
val tOpenSessionResp = fe.OpenSession(tOpenSessionReq)
|
||||
tOpenSessionResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
}
|
||||
}
|
||||
|
||||
test("execute statement sync") {
|
||||
withFEServiceAndHandle { case (fe, handle) =>
|
||||
val req = new TExecuteStatementReq(handle, "show databases")
|
||||
req.setRunAsync(false)
|
||||
val resp = fe.ExecuteStatement(req)
|
||||
val req2 = new TGetOperationStatusReq(resp.getOperationHandle)
|
||||
val statusResp = fe.GetOperationStatus(req2)
|
||||
statusResp.getOperationState should be(TOperationState.FINISHED_STATE)
|
||||
val fReq = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
val fRes = fe.FetchResults(fReq)
|
||||
val rows = fRes.getResults.getRows
|
||||
rows.get(0).getColVals.get(0).getStringVal.getValue should be("default")
|
||||
}
|
||||
}
|
||||
|
||||
test("execute statement async") {
|
||||
withFEServiceAndHandle { case (fe, handle) =>
|
||||
val req = new TExecuteStatementReq(handle, "show databases")
|
||||
req.setRunAsync(true)
|
||||
val resp = fe.ExecuteStatement(req)
|
||||
val statusReq = new TGetOperationStatusReq(resp.getOperationHandle)
|
||||
while(fe.GetOperationStatus(statusReq)
|
||||
.getOperationState.getValue < TOperationState.FINISHED_STATE.getValue) {
|
||||
Thread.sleep(10)
|
||||
}
|
||||
Thread.sleep(2000)
|
||||
val fReq = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
val fRes = fe.FetchResults(fReq)
|
||||
val rows = fRes.getResults.getRows
|
||||
rows.get(0).getColVals.get(0).getStringVal.getValue should be("default")
|
||||
}
|
||||
}
|
||||
|
||||
test("alter database") {
|
||||
val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => {
|
||||
val req = new TExecuteStatementReq(handle,
|
||||
"alter database default set dbproperties ('kent'='yao')")
|
||||
val resp = fe.ExecuteStatement(req)
|
||||
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
|
||||
val tFetchResultsReq =
|
||||
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
Thread.sleep(5000)
|
||||
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
|
||||
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
tFetchResultsResp.getResults.getRows.size() should be(0)
|
||||
}
|
||||
withFEServiceAndHandle(block)
|
||||
withFEServiceAndHandleInc(block)
|
||||
withFEServiceAndHandleIncAndCal(block)
|
||||
}
|
||||
|
||||
test("alter schema") {
|
||||
val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => {
|
||||
val req = new TExecuteStatementReq(handle,
|
||||
"alter schema default set dbproperties ('kent'='yao')")
|
||||
val resp = fe.ExecuteStatement(req)
|
||||
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
|
||||
val tFetchResultsReq =
|
||||
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
|
||||
Thread.sleep(5000)
|
||||
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
|
||||
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
tFetchResultsResp.getResults.getRows.size() should be(0)
|
||||
}
|
||||
withFEServiceAndHandle(block)
|
||||
withFEServiceAndHandleInc(block)
|
||||
withFEServiceAndHandleIncAndCal(block)
|
||||
}
|
||||
|
||||
test("alter table name") {
|
||||
val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => {
|
||||
val ct = new TExecuteStatementReq(handle,
|
||||
"create table if not exists default.src(key int) using parquet")
|
||||
fe.ExecuteStatement(ct)
|
||||
Thread.sleep(5000)
|
||||
val req = new TExecuteStatementReq(handle, "alter table default.src rename to default.src2")
|
||||
val resp = fe.ExecuteStatement(req)
|
||||
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
|
||||
val tFetchResultsReq =
|
||||
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
val dt = new TExecuteStatementReq(handle, "drop table src2")
|
||||
fe.ExecuteStatement(dt)
|
||||
Thread.sleep(5000)
|
||||
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
|
||||
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
tFetchResultsResp.getResults.getRows.size() should be(0)
|
||||
}
|
||||
withFEServiceAndHandle(block)
|
||||
}
|
||||
|
||||
test("alter table name inc") {
|
||||
val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => {
|
||||
val ct = new TExecuteStatementReq(handle,
|
||||
"create table if not exists default.src3(key int) using parquet")
|
||||
fe.ExecuteStatement(ct)
|
||||
Thread.sleep(5000)
|
||||
val req = new TExecuteStatementReq(handle, "alter table default.src3 rename to default.src4")
|
||||
val resp = fe.ExecuteStatement(req)
|
||||
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
|
||||
val tFetchResultsReq =
|
||||
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
val dt = new TExecuteStatementReq(handle, "drop table src4")
|
||||
fe.ExecuteStatement(dt)
|
||||
Thread.sleep(5000)
|
||||
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
|
||||
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
tFetchResultsResp.getResults.getRows.size() should be(0)
|
||||
}
|
||||
withFEServiceAndHandleInc(block)
|
||||
}
|
||||
|
||||
test("alter table set properties") {
|
||||
val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => {
|
||||
val ct = new TExecuteStatementReq(handle, "create table default.src(key int) using parquet")
|
||||
fe.ExecuteStatement(ct)
|
||||
Thread.sleep(5000)
|
||||
val req = new TExecuteStatementReq(handle,
|
||||
"alter table default.src set tblproperties ('kent'='yao')")
|
||||
val resp = fe.ExecuteStatement(req)
|
||||
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
|
||||
val tFetchResultsReq =
|
||||
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
|
||||
Thread.sleep(5000)
|
||||
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
|
||||
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
tFetchResultsResp.getResults.getRows.size() should be(0)
|
||||
}
|
||||
withFEServiceAndHandle(block)
|
||||
withFEServiceAndHandleInc(block)
|
||||
}
|
||||
|
||||
test("alter table unset properties") {
|
||||
withFEServiceAndHandle { (fe, handle) =>
|
||||
val ct = new TExecuteStatementReq(handle,
|
||||
"create table default.src(key int) using parquet tblproperties ('kent'='yao')")
|
||||
fe.ExecuteStatement(ct)
|
||||
Thread.sleep(5000)
|
||||
val req = new TExecuteStatementReq(handle,
|
||||
"alter table default.src unset tblproperties if exists ('kent', 'yao')")
|
||||
val resp = fe.ExecuteStatement(req)
|
||||
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
|
||||
val tFetchResultsReq =
|
||||
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
|
||||
Thread.sleep(5000)
|
||||
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
|
||||
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
tFetchResultsResp.getResults.getRows.size() should be(0)
|
||||
}
|
||||
}
|
||||
|
||||
test("add jar hdfs") {
|
||||
withFEServiceAndHandle { (fe, handle) =>
|
||||
val req = new TExecuteStatementReq(handle, "add jar hdfs://a/b/test.jar")
|
||||
val resp = fe.ExecuteStatement(req)
|
||||
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
|
||||
val tFetchResultsReq =
|
||||
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
|
||||
Thread.sleep(5000)
|
||||
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
|
||||
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
|
||||
}
|
||||
}
|
||||
|
||||
test("add jar local") {
|
||||
withFEServiceAndHandle { (fe, handle) =>
|
||||
val req = new TExecuteStatementReq(handle, "add jar file://a/b/test.jar")
|
||||
val resp = fe.ExecuteStatement(req)
|
||||
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
|
||||
val tFetchResultsReq =
|
||||
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
|
||||
Thread.sleep(5000)
|
||||
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
|
||||
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
tFetchResultsResp.getResults.getRows.get(0).getColVals.get(0).getI32Val.getValue should be(0)
|
||||
}
|
||||
}
|
||||
|
||||
test("create temporary function") {
|
||||
withFEServiceAndHandle { (fe, handle) =>
|
||||
val req = new TExecuteStatementReq(handle,
|
||||
"create temporary function testfunc as 'testClass' using jar 'hdfs://a/b/test.jar'")
|
||||
val resp = fe.ExecuteStatement(req)
|
||||
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
|
||||
val tFetchResultsReq =
|
||||
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
|
||||
Thread.sleep(5000)
|
||||
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
|
||||
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
|
||||
}
|
||||
}
|
||||
|
||||
test("select") {
|
||||
val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => {
|
||||
val kyuubiSession = server.beService.getSessionManager.getSession(new SessionHandle(handle))
|
||||
kyuubiSession.sparkSession.sql(
|
||||
"create table if not exists default.select_tbl(key int) using parquet")
|
||||
val ct = new TExecuteStatementReq(handle, "select * from default.select_tbl")
|
||||
val tExecuteStatementResp = fe.ExecuteStatement(ct)
|
||||
val statusReq = new TGetOperationStatusReq(tExecuteStatementResp.getOperationHandle)
|
||||
|
||||
while(fe.GetOperationStatus(statusReq)
|
||||
.getOperationState.getValue < TOperationState.FINISHED_STATE.getValue) {
|
||||
Thread.sleep(10)
|
||||
}
|
||||
Thread.sleep(2000)
|
||||
|
||||
val tFetchResultsReq = new TFetchResultsReq(
|
||||
tExecuteStatementResp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
|
||||
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
|
||||
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
tFetchResultsResp.getResults.getRows.size() should be(0)
|
||||
}
|
||||
withFEServiceAndHandleIncAndCal(block)
|
||||
withFEServiceAndHandleInc(block)
|
||||
withFEServiceAndHandle(block)
|
||||
withFEServiceAndHandleAndResultLimit(block)
|
||||
}
|
||||
|
||||
test("select with exception") {
|
||||
val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => {
|
||||
val kyuubiSession = server.beService.getSessionManager.getSession(new SessionHandle(handle))
|
||||
kyuubiSession.sparkSession.sql(
|
||||
"create table if not exists default.select_tbl(key int) using parquet")
|
||||
val ct = new TExecuteStatementReq(handle, "select * from default.select_tbl")
|
||||
val tExecuteStatementResp = fe.ExecuteStatement(ct)
|
||||
val statusReq = new TGetOperationStatusReq(tExecuteStatementResp.getOperationHandle)
|
||||
|
||||
while(fe.GetOperationStatus(statusReq)
|
||||
.getOperationState.getValue < TOperationState.FINISHED_STATE.getValue) {
|
||||
Thread.sleep(10)
|
||||
}
|
||||
Thread.sleep(2000)
|
||||
|
||||
val tFetchResultsReq = new TFetchResultsReq(
|
||||
tExecuteStatementResp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
|
||||
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
|
||||
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
|
||||
}
|
||||
|
||||
withFEServiceAndHandleAndException(block)
|
||||
}
|
||||
|
||||
def withFEServiceAndHandle(block: (FrontendService, TSessionHandle) => Unit): Unit = {
|
||||
val feService = server.feService
|
||||
val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
|
||||
req.setUsername(user)
|
||||
val resp = feService.OpenSession(req)
|
||||
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
val handle = resp.getSessionHandle
|
||||
block(feService, handle)
|
||||
}
|
||||
|
||||
def withFEServiceAndHandleAndResultLimit(
|
||||
block: (FrontendService, TSessionHandle) => Unit): Unit = {
|
||||
val feService = server.feService
|
||||
val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
|
||||
req.setUsername(user)
|
||||
req.setConfiguration(
|
||||
Map("set:hivevar:" + KyuubiConf.OPERATION_RESULT_LIMIT.key -> "1").asJava)
|
||||
val resp = feService.OpenSession(req)
|
||||
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
val handle = resp.getSessionHandle
|
||||
block(feService, handle)
|
||||
}
|
||||
|
||||
def withFEServiceAndHandleAndException(
|
||||
block: (FrontendService, TSessionHandle) => Unit): Unit = {
|
||||
val feService = server.feService
|
||||
val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
|
||||
req.setUsername(user)
|
||||
req.setConfiguration(
|
||||
Map("set:hivevar:" + KyuubiConf.OPERATION_RESULT_LIMIT.key -> "invaild put").asJava)
|
||||
val resp = feService.OpenSession(req)
|
||||
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
val handle = resp.getSessionHandle
|
||||
block(feService, handle)
|
||||
}
|
||||
|
||||
def withFEServiceAndHandleInc(block: (FrontendService, TSessionHandle) => Unit): Unit = {
|
||||
val feService = server.feService
|
||||
val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
|
||||
req.setUsername(user)
|
||||
req.setConfiguration(
|
||||
Map("set:hivevar:" + KyuubiConf.OPERATION_INCREMENTAL_COLLECT.key -> "true").asJava)
|
||||
val resp = feService.OpenSession(req)
|
||||
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
val handle = resp.getSessionHandle
|
||||
block(feService, handle)
|
||||
}
|
||||
|
||||
def withFEServiceAndHandleIncAndCal(block: (FrontendService, TSessionHandle) => Unit): Unit = {
|
||||
val feService = server.feService
|
||||
val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
|
||||
req.setUsername(user)
|
||||
req.setConfiguration(
|
||||
Map("set:hivevar:" + KyuubiConf.OPERATION_INCREMENTAL_COLLECT.key -> "true",
|
||||
"set:hivevar:" + KyuubiConf.OPERATION_INCREMENTAL_RDD_PARTITIONS_LIMIT.key -> "-1")
|
||||
.asJava)
|
||||
val resp = feService.OpenSession(req)
|
||||
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
|
||||
val handle = resp.getSessionHandle
|
||||
block(feService, handle)
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user