FrontendServiceSuite nit
This commit is contained in:
parent
767a296594
commit
1f52dbd656
@ -98,10 +98,3 @@ trait BackendService {
|
||||
|
||||
def sessionManager: SessionManager
|
||||
}
|
||||
|
||||
object BackendService {
|
||||
final val SERVER_VERSION = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@ -160,7 +160,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
|
||||
|
||||
@throws[KyuubiSQLException]
|
||||
private def getSessionHandle(req: TOpenSessionReq, res: TOpenSessionResp): SessionHandle = {
|
||||
val protocol = getMinVersion(BackendService.SERVER_VERSION, req.getClient_protocol)
|
||||
val protocol = getMinVersion(SERVER_VERSION, req.getClient_protocol)
|
||||
res.setServerProtocolVersion(protocol)
|
||||
val userName = getUserName(req)
|
||||
val ipAddress = authFactory.getIpAddress.orNull
|
||||
@ -535,6 +535,8 @@ object FrontendService {
|
||||
|
||||
final val CURRENT_SERVER_CONTEXT = new ThreadLocal[FeServiceServerContext]()
|
||||
|
||||
final val SERVER_VERSION = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10
|
||||
|
||||
class FeServiceServerContext extends ServerContext {
|
||||
private var sessionHandle: SessionHandle = _
|
||||
|
||||
|
||||
@ -26,6 +26,7 @@ import org.apache.thrift.transport.TSocket
|
||||
import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException, Utils}
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.operation.{OperationHandle, OperationType}
|
||||
import org.apache.kyuubi.service.FrontendService.{FeServiceServerContext, SERVER_VERSION}
|
||||
import org.apache.kyuubi.service.authentication.PlainSASLHelper
|
||||
import org.apache.kyuubi.session.SessionHandle
|
||||
|
||||
@ -115,6 +116,15 @@ class FrontendServiceSuite extends KyuubiFunSuite {
|
||||
}
|
||||
}
|
||||
|
||||
test("fe service server context") {
|
||||
withSessionHandle { (_, handle) =>
|
||||
val context = new FeServiceServerContext()
|
||||
val handle1 = SessionHandle(handle)
|
||||
context.setSessionHandle(handle1)
|
||||
assert(context.getSessionHandle.toTSessionHandle === handle)
|
||||
}
|
||||
}
|
||||
|
||||
test("get info") {
|
||||
withSessionHandle { (client, handle) =>
|
||||
val req = new TGetInfoReq()
|
||||
@ -155,7 +165,7 @@ class FrontendServiceSuite extends KyuubiFunSuite {
|
||||
assert(opHandle.getOperationType === TOperationType.GET_CATALOGS)
|
||||
assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
|
||||
checkOperationResult(client, opHandle)
|
||||
req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle)
|
||||
req.setSessionHandle(SessionHandle(SERVER_VERSION).toTSessionHandle)
|
||||
|
||||
val resp1 = client.GetCatalogs(req)
|
||||
assert(resp1.getOperationHandle === null)
|
||||
@ -174,7 +184,7 @@ class FrontendServiceSuite extends KyuubiFunSuite {
|
||||
assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
|
||||
checkOperationResult(client, opHandle)
|
||||
|
||||
req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle)
|
||||
req.setSessionHandle(SessionHandle(SERVER_VERSION).toTSessionHandle)
|
||||
val resp1 = client.GetSchemas(req)
|
||||
assert(resp1.getOperationHandle === null)
|
||||
assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
|
||||
@ -192,7 +202,7 @@ class FrontendServiceSuite extends KyuubiFunSuite {
|
||||
assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
|
||||
checkOperationResult(client, opHandle)
|
||||
|
||||
req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle)
|
||||
req.setSessionHandle(SessionHandle(SERVER_VERSION).toTSessionHandle)
|
||||
val resp1 = client.GetTables(req)
|
||||
assert(resp1.getOperationHandle === null)
|
||||
assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
|
||||
@ -210,7 +220,7 @@ class FrontendServiceSuite extends KyuubiFunSuite {
|
||||
assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
|
||||
checkOperationResult(client, opHandle)
|
||||
|
||||
req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle)
|
||||
req.setSessionHandle(SessionHandle(SERVER_VERSION).toTSessionHandle)
|
||||
val resp1 = client.GetTableTypes(req)
|
||||
assert(resp1.getOperationHandle === null)
|
||||
assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
|
||||
@ -228,7 +238,7 @@ class FrontendServiceSuite extends KyuubiFunSuite {
|
||||
assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
|
||||
checkOperationResult(client, opHandle)
|
||||
|
||||
req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle)
|
||||
req.setSessionHandle(SessionHandle(SERVER_VERSION).toTSessionHandle)
|
||||
val resp1 = client.GetColumns(req)
|
||||
assert(resp1.getOperationHandle === null)
|
||||
assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
|
||||
@ -246,7 +256,7 @@ class FrontendServiceSuite extends KyuubiFunSuite {
|
||||
assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
|
||||
checkOperationResult(client, opHandle)
|
||||
|
||||
req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle)
|
||||
req.setSessionHandle(SessionHandle(SERVER_VERSION).toTSessionHandle)
|
||||
val resp1 = client.GetFunctions(req)
|
||||
assert(resp1.getOperationHandle === null)
|
||||
assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
|
||||
@ -281,7 +291,7 @@ class FrontendServiceSuite extends KyuubiFunSuite {
|
||||
test("get operation status") {
|
||||
withSessionHandle { (client, handle) =>
|
||||
val opHandle =
|
||||
OperationHandle(OperationType.EXECUTE_STATEMENT, BackendService.SERVER_VERSION)
|
||||
OperationHandle(OperationType.EXECUTE_STATEMENT, SERVER_VERSION)
|
||||
val req = new TGetOperationStatusReq(opHandle)
|
||||
val resp = client.GetOperationStatus(req)
|
||||
assert(resp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
|
||||
|
||||
@ -49,171 +49,6 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
|
||||
super.afterAll()
|
||||
}
|
||||
|
||||
test("get table types") {
|
||||
withFEServiceAndHandle { case (fe, handle) =>
|
||||
val req = new TGetTableTypesReq(handle)
|
||||
val resp = fe.GetTableTypes(req)
|
||||
val req2 = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
val resp2 = fe.FetchResults(req2)
|
||||
val rows = resp2.getResults.getRows
|
||||
rows.get(0).getColVals.get(0).getStringVal.getValue should be("MANAGED")
|
||||
rows.get(1).getColVals.get(0).getStringVal.getValue should be("VIEW")
|
||||
rows.get(2).getColVals.get(0).getStringVal.getValue should be("EXTERNAL")
|
||||
val closeReq = new TCloseSessionReq(handle)
|
||||
fe.CloseSession(closeReq)
|
||||
val afterCloseResp = fe.GetTableTypes(req)
|
||||
afterCloseResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
|
||||
}
|
||||
}
|
||||
|
||||
test("get type info") {
|
||||
withFEServiceAndHandle { case (fe, handle) =>
|
||||
val req = new TGetTypeInfoReq(handle)
|
||||
val resp = fe.GetTypeInfo(req)
|
||||
val req2 = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
val resp2 = fe.FetchResults(req2)
|
||||
val rows = resp2.getResults.getRows
|
||||
rows.size() should be(15)
|
||||
rows.get(0).getColVals.get(0).getStringVal.getValue should be("void")
|
||||
rows.get(1).getColVals.get(0).getStringVal.getValue should be("boolean")
|
||||
val closeReq = new TCloseSessionReq(handle)
|
||||
fe.CloseSession(closeReq)
|
||||
val afterCloseResp = fe.GetTypeInfo(req)
|
||||
afterCloseResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
|
||||
}
|
||||
}
|
||||
|
||||
test("get functions") {
|
||||
withFEServiceAndHandle { case (fe, handle) =>
|
||||
val req = new TGetFunctionsReq(handle, null)
|
||||
val resp = fe.GetFunctions(req)
|
||||
val req2 = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
val resp2 = fe.FetchResults(req2)
|
||||
val rows = resp2.getResults.getRows
|
||||
assert(rows.size() === 50)
|
||||
assert(rows.get(0).getColVals.get(2).getStringVal.getValue.nonEmpty)
|
||||
val closeReq = new TCloseSessionReq(handle)
|
||||
fe.CloseSession(closeReq)
|
||||
val afterCloseResp = fe.GetFunctions(req)
|
||||
afterCloseResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
|
||||
}
|
||||
}
|
||||
|
||||
test("get schemas") {
|
||||
withFEServiceAndHandle { case (fe, handle) =>
|
||||
val schemasReq = new TGetSchemasReq(handle)
|
||||
schemasReq.setCatalogName("")
|
||||
schemasReq.setSchemaName("*")
|
||||
val resp = fe.GetSchemas(schemasReq)
|
||||
val req2 = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
val resp2 = fe.FetchResults(req2)
|
||||
val rows = resp2.getResults.getRows
|
||||
rows.size() should be(1)
|
||||
rows.get(0).getColVals.get(0).getStringVal.getValue should be("default")
|
||||
val closeReq = new TCloseSessionReq(handle)
|
||||
fe.CloseSession(closeReq)
|
||||
val afterCloseResp = fe.GetSchemas(schemasReq)
|
||||
afterCloseResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
|
||||
}
|
||||
}
|
||||
|
||||
test("get tables") {
|
||||
withFEServiceAndHandle { case (fe, handle) =>
|
||||
val req = new TGetTablesReq(handle)
|
||||
req.setCatalogName("")
|
||||
req.setSchemaName("*")
|
||||
req.setTableName("*")
|
||||
val kyuubiSession = server.beService.getSessionManager.getSession(new SessionHandle(handle))
|
||||
kyuubiSession.sparkSession.sql("create table get_tables(key int, value string) using parquet")
|
||||
val resp = fe.GetTables(req)
|
||||
val req2 = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
val resp2 = fe.FetchResults(req2)
|
||||
val rows = resp2.getResults.getRows
|
||||
rows.get(0).getColVals.get(2).getStringVal.getValue should be("get_tables")
|
||||
|
||||
// schema not match
|
||||
val req3 = new TGetTablesReq(handle)
|
||||
req3.setSchemaName("a")
|
||||
req3.setTableName("*")
|
||||
val resp3 = fe.GetTables(req3)
|
||||
val req4 = new TFetchResultsReq(resp3.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
val resp4 = fe.FetchResults(req4)
|
||||
val rows2 = resp4.getResults.getRows
|
||||
rows2.size() should be(0)
|
||||
|
||||
// table name not match
|
||||
val req5 = new TGetTablesReq(handle)
|
||||
req5.setSchemaName("*")
|
||||
req5.setTableName("get_tables_2")
|
||||
val resp5 = fe.GetTables(req3)
|
||||
val req6 = new TFetchResultsReq(resp5.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
val resp6 = fe.FetchResults(req6)
|
||||
val rows3 = resp6.getResults.getRows
|
||||
rows3.size() should be(0)
|
||||
|
||||
// table type not match
|
||||
val req7 = new TGetTablesReq(handle)
|
||||
req7.setCatalogName("")
|
||||
req7.setSchemaName("*")
|
||||
req7.setTableName("*")
|
||||
req7.setTableTypes(Seq("VIEW").asJava)
|
||||
val resp7 = fe.GetTables(req7)
|
||||
val req8 = new TFetchResultsReq(resp7.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
val resp8 = fe.FetchResults(req8)
|
||||
val rows4 = resp8.getResults.getRows
|
||||
rows4.size() should be(0)
|
||||
|
||||
val closeReq = new TCloseSessionReq(handle)
|
||||
fe.CloseSession(closeReq)
|
||||
val afterCloseResp = fe.GetTables(req)
|
||||
afterCloseResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
|
||||
}
|
||||
}
|
||||
|
||||
test("get columns") {
|
||||
withFEServiceAndHandle { case (fe, handle) =>
|
||||
val kyuubiSession = server.beService.getSessionManager.getSession(new SessionHandle(handle))
|
||||
val tableName = "get_columns"
|
||||
kyuubiSession.sparkSession
|
||||
.sql("create table " + tableName + "(key int, value string) using parquet")
|
||||
val req = new TGetColumnsReq(handle)
|
||||
req.setCatalogName("")
|
||||
req.setSchemaName("*")
|
||||
req.setTableName(tableName)
|
||||
req.setColumnName(null)
|
||||
val resp = fe.GetColumns(req)
|
||||
val operation = server.beService.getSessionManager.getOperationMgr
|
||||
.getOperation(new OperationHandle(resp.getOperationHandle))
|
||||
assert(!operation.isTimedOut)
|
||||
assert(!operation.shouldRunAsync)
|
||||
val req2 = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
val resp2 = fe.FetchResults(req2)
|
||||
val rows = resp2.getResults.getRows
|
||||
intercept[KyuubiSQLException](operation.cancel())
|
||||
rows.size() should be(2)
|
||||
rows.get(0).getColVals.get(1).getStringVal.getValue should be("default")
|
||||
rows.get(0).getColVals.get(2).getStringVal.getValue should be(tableName)
|
||||
rows.get(0).getColVals.get(3).getStringVal.getValue should be("key")
|
||||
rows.get(0).getColVals.get(4).getI32Val.getValue should be(java.sql.Types.INTEGER)
|
||||
|
||||
val req3 = new TGetColumnsReq(handle)
|
||||
req3.setCatalogName("")
|
||||
req3.setSchemaName("*")
|
||||
req3.setTableName(tableName)
|
||||
req3.setColumnName("key")
|
||||
val resp3 = fe.GetColumns(req3)
|
||||
val req4 = new TFetchResultsReq(resp3.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
val resp4 = fe.FetchResults(req4)
|
||||
val rows2 = resp4.getResults.getRows
|
||||
rows2.size() should be(1)
|
||||
|
||||
val closeReq = new TCloseSessionReq(handle)
|
||||
fe.CloseSession(closeReq)
|
||||
val afterCloseResp = fe.GetColumns(req)
|
||||
afterCloseResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
|
||||
}
|
||||
}
|
||||
|
||||
test("get port num") {
|
||||
val feService = new FrontendService(server.beService)
|
||||
feService.getPortNumber should be(0)
|
||||
@ -228,13 +63,6 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
|
||||
feService.getServerIPAddress should not be null
|
||||
}
|
||||
|
||||
test("fe service server context") {
|
||||
withFEServiceAndHandle { case (fe, handle) =>
|
||||
val context = new fe.FeServiceServerContext()
|
||||
context.setSessionHandle(new SessionHandle(handle))
|
||||
context.getSessionHandle.toTSessionHandle should be(handle)
|
||||
}
|
||||
}
|
||||
|
||||
test("fe tserver event handler") {
|
||||
withFEServiceAndHandle { case (fe, handle) =>
|
||||
|
||||
Loading…
Reference in New Issue
Block a user