diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala index 918cfc571..1e8563c27 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala @@ -98,10 +98,3 @@ trait BackendService { def sessionManager: SessionManager } - -object BackendService { - final val SERVER_VERSION = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10 -} - - - diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/FrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/FrontendService.scala index effaed750..dda8abe24 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/FrontendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/FrontendService.scala @@ -160,7 +160,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab @throws[KyuubiSQLException] private def getSessionHandle(req: TOpenSessionReq, res: TOpenSessionResp): SessionHandle = { - val protocol = getMinVersion(BackendService.SERVER_VERSION, req.getClient_protocol) + val protocol = getMinVersion(SERVER_VERSION, req.getClient_protocol) res.setServerProtocolVersion(protocol) val userName = getUserName(req) val ipAddress = authFactory.getIpAddress.orNull @@ -535,6 +535,8 @@ object FrontendService { final val CURRENT_SERVER_CONTEXT = new ThreadLocal[FeServiceServerContext]() + final val SERVER_VERSION = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10 + class FeServiceServerContext extends ServerContext { private var sessionHandle: SessionHandle = _ diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/FrontendServiceSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/FrontendServiceSuite.scala index 468f886cb..ea06eeffe 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/FrontendServiceSuite.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/FrontendServiceSuite.scala @@ -26,6 +26,7 @@ import org.apache.thrift.transport.TSocket import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.operation.{OperationHandle, OperationType} +import org.apache.kyuubi.service.FrontendService.{FeServiceServerContext, SERVER_VERSION} import org.apache.kyuubi.service.authentication.PlainSASLHelper import org.apache.kyuubi.session.SessionHandle @@ -115,6 +116,15 @@ class FrontendServiceSuite extends KyuubiFunSuite { } } + test("fe service server context") { + withSessionHandle { (_, handle) => + val context = new FeServiceServerContext() + val handle1 = SessionHandle(handle) + context.setSessionHandle(handle1) + assert(context.getSessionHandle.toTSessionHandle === handle) + } + } + test("get info") { withSessionHandle { (client, handle) => val req = new TGetInfoReq() @@ -155,7 +165,7 @@ class FrontendServiceSuite extends KyuubiFunSuite { assert(opHandle.getOperationType === TOperationType.GET_CATALOGS) assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) checkOperationResult(client, opHandle) - req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle) + req.setSessionHandle(SessionHandle(SERVER_VERSION).toTSessionHandle) val resp1 = client.GetCatalogs(req) assert(resp1.getOperationHandle === null) @@ -174,7 +184,7 @@ class FrontendServiceSuite extends KyuubiFunSuite { assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) checkOperationResult(client, opHandle) - req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle) + req.setSessionHandle(SessionHandle(SERVER_VERSION).toTSessionHandle) val resp1 = client.GetSchemas(req) assert(resp1.getOperationHandle === null) assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) @@ -192,7 +202,7 @@ class FrontendServiceSuite extends KyuubiFunSuite { assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) checkOperationResult(client, opHandle) - req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle) + req.setSessionHandle(SessionHandle(SERVER_VERSION).toTSessionHandle) val resp1 = client.GetTables(req) assert(resp1.getOperationHandle === null) assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) @@ -210,7 +220,7 @@ class FrontendServiceSuite extends KyuubiFunSuite { assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) checkOperationResult(client, opHandle) - req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle) + req.setSessionHandle(SessionHandle(SERVER_VERSION).toTSessionHandle) val resp1 = client.GetTableTypes(req) assert(resp1.getOperationHandle === null) assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) @@ -228,7 +238,7 @@ class FrontendServiceSuite extends KyuubiFunSuite { assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) checkOperationResult(client, opHandle) - req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle) + req.setSessionHandle(SessionHandle(SERVER_VERSION).toTSessionHandle) val resp1 = client.GetColumns(req) assert(resp1.getOperationHandle === null) assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) @@ -246,7 +256,7 @@ class FrontendServiceSuite extends KyuubiFunSuite { assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) checkOperationResult(client, opHandle) - req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle) + req.setSessionHandle(SessionHandle(SERVER_VERSION).toTSessionHandle) val resp1 = client.GetFunctions(req) assert(resp1.getOperationHandle === null) assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) @@ -281,7 +291,7 @@ class FrontendServiceSuite extends KyuubiFunSuite { test("get operation status") { withSessionHandle { (client, handle) => val opHandle = - OperationHandle(OperationType.EXECUTE_STATEMENT, BackendService.SERVER_VERSION) + OperationHandle(OperationType.EXECUTE_STATEMENT, SERVER_VERSION) val req = new TGetOperationStatusReq(opHandle) val resp = client.GetOperationStatus(req) assert(resp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala index 8527aea80..88574c464 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala @@ -49,171 +49,6 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu super.afterAll() } - test("get table types") { - withFEServiceAndHandle { case (fe, handle) => - val req = new TGetTableTypesReq(handle) - val resp = fe.GetTableTypes(req) - val req2 = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - val resp2 = fe.FetchResults(req2) - val rows = resp2.getResults.getRows - rows.get(0).getColVals.get(0).getStringVal.getValue should be("MANAGED") - rows.get(1).getColVals.get(0).getStringVal.getValue should be("VIEW") - rows.get(2).getColVals.get(0).getStringVal.getValue should be("EXTERNAL") - val closeReq = new TCloseSessionReq(handle) - fe.CloseSession(closeReq) - val afterCloseResp = fe.GetTableTypes(req) - afterCloseResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) - } - } - - test("get type info") { - withFEServiceAndHandle { case (fe, handle) => - val req = new TGetTypeInfoReq(handle) - val resp = fe.GetTypeInfo(req) - val req2 = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - val resp2 = fe.FetchResults(req2) - val rows = resp2.getResults.getRows - rows.size() should be(15) - rows.get(0).getColVals.get(0).getStringVal.getValue should be("void") - rows.get(1).getColVals.get(0).getStringVal.getValue should be("boolean") - val closeReq = new TCloseSessionReq(handle) - fe.CloseSession(closeReq) - val afterCloseResp = fe.GetTypeInfo(req) - afterCloseResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) - } - } - - test("get functions") { - withFEServiceAndHandle { case (fe, handle) => - val req = new TGetFunctionsReq(handle, null) - val resp = fe.GetFunctions(req) - val req2 = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - val resp2 = fe.FetchResults(req2) - val rows = resp2.getResults.getRows - assert(rows.size() === 50) - assert(rows.get(0).getColVals.get(2).getStringVal.getValue.nonEmpty) - val closeReq = new TCloseSessionReq(handle) - fe.CloseSession(closeReq) - val afterCloseResp = fe.GetFunctions(req) - afterCloseResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) - } - } - - test("get schemas") { - withFEServiceAndHandle { case (fe, handle) => - val schemasReq = new TGetSchemasReq(handle) - schemasReq.setCatalogName("") - schemasReq.setSchemaName("*") - val resp = fe.GetSchemas(schemasReq) - val req2 = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - val resp2 = fe.FetchResults(req2) - val rows = resp2.getResults.getRows - rows.size() should be(1) - rows.get(0).getColVals.get(0).getStringVal.getValue should be("default") - val closeReq = new TCloseSessionReq(handle) - fe.CloseSession(closeReq) - val afterCloseResp = fe.GetSchemas(schemasReq) - afterCloseResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) - } - } - - test("get tables") { - withFEServiceAndHandle { case (fe, handle) => - val req = new TGetTablesReq(handle) - req.setCatalogName("") - req.setSchemaName("*") - req.setTableName("*") - val kyuubiSession = server.beService.getSessionManager.getSession(new SessionHandle(handle)) - kyuubiSession.sparkSession.sql("create table get_tables(key int, value string) using parquet") - val resp = fe.GetTables(req) - val req2 = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - val resp2 = fe.FetchResults(req2) - val rows = resp2.getResults.getRows - rows.get(0).getColVals.get(2).getStringVal.getValue should be("get_tables") - - // schema not match - val req3 = new TGetTablesReq(handle) - req3.setSchemaName("a") - req3.setTableName("*") - val resp3 = fe.GetTables(req3) - val req4 = new TFetchResultsReq(resp3.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - val resp4 = fe.FetchResults(req4) - val rows2 = resp4.getResults.getRows - rows2.size() should be(0) - - // table name not match - val req5 = new TGetTablesReq(handle) - req5.setSchemaName("*") - req5.setTableName("get_tables_2") - val resp5 = fe.GetTables(req3) - val req6 = new TFetchResultsReq(resp5.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - val resp6 = fe.FetchResults(req6) - val rows3 = resp6.getResults.getRows - rows3.size() should be(0) - - // table type not match - val req7 = new TGetTablesReq(handle) - req7.setCatalogName("") - req7.setSchemaName("*") - req7.setTableName("*") - req7.setTableTypes(Seq("VIEW").asJava) - val resp7 = fe.GetTables(req7) - val req8 = new TFetchResultsReq(resp7.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - val resp8 = fe.FetchResults(req8) - val rows4 = resp8.getResults.getRows - rows4.size() should be(0) - - val closeReq = new TCloseSessionReq(handle) - fe.CloseSession(closeReq) - val afterCloseResp = fe.GetTables(req) - afterCloseResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) - } - } - - test("get columns") { - withFEServiceAndHandle { case (fe, handle) => - val kyuubiSession = server.beService.getSessionManager.getSession(new SessionHandle(handle)) - val tableName = "get_columns" - kyuubiSession.sparkSession - .sql("create table " + tableName + "(key int, value string) using parquet") - val req = new TGetColumnsReq(handle) - req.setCatalogName("") - req.setSchemaName("*") - req.setTableName(tableName) - req.setColumnName(null) - val resp = fe.GetColumns(req) - val operation = server.beService.getSessionManager.getOperationMgr - .getOperation(new OperationHandle(resp.getOperationHandle)) - assert(!operation.isTimedOut) - assert(!operation.shouldRunAsync) - val req2 = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - val resp2 = fe.FetchResults(req2) - val rows = resp2.getResults.getRows - intercept[KyuubiSQLException](operation.cancel()) - rows.size() should be(2) - rows.get(0).getColVals.get(1).getStringVal.getValue should be("default") - rows.get(0).getColVals.get(2).getStringVal.getValue should be(tableName) - rows.get(0).getColVals.get(3).getStringVal.getValue should be("key") - rows.get(0).getColVals.get(4).getI32Val.getValue should be(java.sql.Types.INTEGER) - - val req3 = new TGetColumnsReq(handle) - req3.setCatalogName("") - req3.setSchemaName("*") - req3.setTableName(tableName) - req3.setColumnName("key") - val resp3 = fe.GetColumns(req3) - val req4 = new TFetchResultsReq(resp3.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) - val resp4 = fe.FetchResults(req4) - val rows2 = resp4.getResults.getRows - rows2.size() should be(1) - - val closeReq = new TCloseSessionReq(handle) - fe.CloseSession(closeReq) - val afterCloseResp = fe.GetColumns(req) - afterCloseResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) - } - } - test("get port num") { val feService = new FrontendService(server.beService) feService.getPortNumber should be(0) @@ -228,13 +63,6 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu feService.getServerIPAddress should not be null } - test("fe service server context") { - withFEServiceAndHandle { case (fe, handle) => - val context = new fe.FeServiceServerContext() - context.setSessionHandle(new SessionHandle(handle)) - context.getSessionHandle.toTSessionHandle should be(handle) - } - } test("fe tserver event handler") { withFEServiceAndHandle { case (fe, handle) =>