From fb1b806fdf3d1b43e543f6c07b6139335d30fbdd Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 22 Oct 2020 17:00:13 +0800 Subject: [PATCH] FrontendServiceSuite get operation status --- .../kyuubi/operation/OperationType.scala | 6 +- .../kyuubi/operation/NoopOperation.scala | 37 ++++- .../operation/NoopOperationManager.scala | 3 +- .../kyuubi/operation/OperationTypeSuite.scala | 6 +- .../kyuubi/service/FrontendServiceSuite.scala | 142 +++++++++++++++++- 5 files changed, 181 insertions(+), 13 deletions(-) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala index 676a5abaa..e4686aa65 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala @@ -42,7 +42,8 @@ object OperationType extends Enumeration { case TOperationType.GET_TABLE_TYPES => GET_TABLE_TYPES case TOperationType.GET_COLUMNS => GET_COLUMNS case TOperationType.GET_FUNCTIONS => GET_FUNCTIONS - case _ => UNKNOWN_OPERATION + case other => + throw new UnsupportedOperationException(s"Unsupported Operation type: ${other.toString}") } } @@ -56,7 +57,8 @@ object OperationType extends Enumeration { case GET_TABLE_TYPES => TOperationType.GET_TABLE_TYPES case GET_COLUMNS => TOperationType.GET_COLUMNS case GET_FUNCTIONS => TOperationType.GET_FUNCTIONS - case _ => TOperationType.UNKNOWN + case other => + throw new UnsupportedOperationException(s"Unsupported Operation type: ${other.toString}") } } } diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala index 93131bffc..f44bc396b 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala @@ -23,23 +23,46 @@ import scala.collection.JavaConverters._ import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn, TTableSchema} +import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation import org.apache.kyuubi.operation.OperationType.OperationType import org.apache.kyuubi.session.Session -class NoopOperation(typ: OperationType, session: Session) +class NoopOperation(typ: OperationType, session: Session, shouldFail: Boolean = false) extends AbstractOperation(typ, session) { - override protected def runInternal(): Unit = {} + override protected def runInternal(): Unit = { + setState(OperationState.RUNNING) + if (shouldFail) { + val exception = KyuubiSQLException("noop operation err") + setOperationException(exception) + setState(OperationState.ERROR) + } + setHasResultSet(true) + } - override protected def beforeRun(): Unit = {} + override protected def beforeRun(): Unit = { + setState(OperationState.PENDING) + } - override protected def afterRun(): Unit = {} + override protected def afterRun(): Unit = { + if (!OperationState.isTerminal(state)) { + setState(OperationState.FINISHED) + } - override def cancel(): Unit = {} + } - override def close(): Unit = {} + override def cancel(): Unit = { + setState(OperationState.CANCELED) - override def getResultSetSchema: TTableSchema = new TTableSchema() + } + + override def close(): Unit = { + setState(OperationState.CLOSED) + } + + override def getResultSetSchema: TTableSchema = { + new TTableSchema() + } override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = { val col = TColumn.stringVal(new TStringColumn(Seq(typ.toString).asJava, ByteBuffer.allocate(0))) diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala index f26d344ad..91e23c703 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala @@ -19,6 +19,7 @@ package org.apache.kyuubi.operation import org.apache.hive.service.rpc.thrift.TRowSet +import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation import org.apache.kyuubi.session.Session @@ -57,7 +58,7 @@ class NoopOperationManager extends OperationManager("noop") { schemaName: String, tableName: String, tableTypes: java.util.List[String]): Operation = { - val operation = new NoopOperation(OperationType.GET_TABLES, session) + val operation = new NoopOperation(OperationType.GET_TABLES, session, schemaName == "invalid") addOperation(operation) } diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationTypeSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationTypeSuite.scala index c4a55724f..d78df01c7 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationTypeSuite.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationTypeSuite.scala @@ -35,7 +35,8 @@ class OperationTypeSuite extends KyuubiFunSuite { assert(get(TOperationType.GET_TABLE_TYPES) === GET_TABLE_TYPES) assert(get(TOperationType.GET_COLUMNS) === GET_COLUMNS) assert(get(TOperationType.GET_FUNCTIONS) === GET_FUNCTIONS) - assert(get(TOperationType.UNKNOWN) === UNKNOWN_OPERATION) + val e = intercept[UnsupportedOperationException](get(TOperationType.UNKNOWN)) + assert(e.getMessage === "Unsupported Operation type: UNKNOWN") } test("toTOperationType") { @@ -48,6 +49,7 @@ class OperationTypeSuite extends KyuubiFunSuite { assert(to(GET_TABLE_TYPES) === TOperationType.GET_TABLE_TYPES) assert(to(GET_COLUMNS) === TOperationType.GET_COLUMNS) assert(to(GET_FUNCTIONS) === TOperationType.GET_FUNCTIONS) - assert(to(UNKNOWN_OPERATION) === TOperationType.UNKNOWN) + val e = intercept[UnsupportedOperationException](to(UNKNOWN_OPERATION)) + assert(e.getMessage === "Unsupported Operation type: UNKNOWN_OPERATION") } } diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/FrontendServiceSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/FrontendServiceSuite.scala index 84d1dd836..a327a5885 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/FrontendServiceSuite.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/FrontendServiceSuite.scala @@ -19,13 +19,15 @@ package org.apache.kyuubi.service import scala.collection.JavaConverters._ -import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TFetchOrientation, TFetchResultsReq, TGetCatalogsReq, TGetSchemasReq, TOpenSessionReq, TOperationHandle, TOperationType, TSessionHandle, TStatus, TStatusCode} +import org.apache.hive.service.rpc.thrift._ import org.apache.thrift.protocol.TBinaryProtocol import org.apache.thrift.transport.TSocket import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException, Utils} import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.operation.{OperationHandle, OperationType} import org.apache.kyuubi.service.authentication.PlainSASLHelper +import org.apache.kyuubi.session.SessionHandle class FrontendServiceSuite extends KyuubiFunSuite { @@ -121,6 +123,13 @@ class FrontendServiceSuite extends KyuubiFunSuite { assert(opHandle.getOperationType === TOperationType.GET_CATALOGS) assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) checkOperationResult(client, opHandle) + req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle) + + val resp1 = client.GetCatalogs(req) + assert(resp1.getOperationHandle === null) + assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) + assert(resp1.getStatus.getSqlState === null) + assert(resp1.getStatus.getErrorMessage startsWith "Invalid SessionHandle") } } @@ -132,6 +141,137 @@ class FrontendServiceSuite extends KyuubiFunSuite { assert(opHandle.getOperationType === TOperationType.GET_SCHEMAS) assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) checkOperationResult(client, opHandle) + + req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle) + val resp1 = client.GetSchemas(req) + assert(resp1.getOperationHandle === null) + assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) + assert(resp1.getStatus.getSqlState === null) + assert(resp1.getStatus.getErrorMessage startsWith "Invalid SessionHandle") + } + } + + test("get tables") { + withSessionHandle { (client, handle) => + val req = new TGetTablesReq(handle) + val resp = client.GetTables(req) + val opHandle = resp.getOperationHandle + assert(opHandle.getOperationType === TOperationType.GET_TABLES) + assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + checkOperationResult(client, opHandle) + + req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle) + val resp1 = client.GetTables(req) + assert(resp1.getOperationHandle === null) + assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) + assert(resp1.getStatus.getSqlState === null) + assert(resp1.getStatus.getErrorMessage startsWith "Invalid SessionHandle") + } + } + + test("get table types") { + withSessionHandle { (client, handle) => + val req = new TGetTableTypesReq(handle) + val resp = client.GetTableTypes(req) + val opHandle = resp.getOperationHandle + assert(opHandle.getOperationType === TOperationType.GET_TABLE_TYPES) + assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + checkOperationResult(client, opHandle) + + req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle) + val resp1 = client.GetTableTypes(req) + assert(resp1.getOperationHandle === null) + assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) + assert(resp1.getStatus.getSqlState === null) + assert(resp1.getStatus.getErrorMessage startsWith "Invalid SessionHandle") + } + } + + test("get columns") { + withSessionHandle { (client, handle) => + val req = new TGetColumnsReq(handle) + val resp = client.GetColumns(req) + val opHandle = resp.getOperationHandle + assert(opHandle.getOperationType === TOperationType.GET_COLUMNS) + assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + checkOperationResult(client, opHandle) + + req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle) + val resp1 = client.GetColumns(req) + assert(resp1.getOperationHandle === null) + assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) + assert(resp1.getStatus.getSqlState === null) + assert(resp1.getStatus.getErrorMessage startsWith "Invalid SessionHandle") + } + } + + test("get functions") { + withSessionHandle { (client, handle) => + val req = new TGetFunctionsReq(handle, "sum") + val resp = client.GetFunctions(req) + val opHandle = resp.getOperationHandle + assert(opHandle.getOperationType === TOperationType.GET_FUNCTIONS) + assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + checkOperationResult(client, opHandle) + + req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle) + val resp1 = client.GetFunctions(req) + assert(resp1.getOperationHandle === null) + assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) + assert(resp1.getStatus.getSqlState === null) + assert(resp1.getStatus.getErrorMessage startsWith "Invalid SessionHandle") + } + } + + test("get primary keys") { + withSessionHandle { (client, handle) => + val req = new TGetPrimaryKeysReq(handle) + val resp = client.GetPrimaryKeys(req) + assert(resp.getOperationHandle === null) + assert(resp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) + assert(resp.getStatus.getSqlState === null) + assert(resp.getStatus.getErrorMessage startsWith "Feature is not available") + } + } + + + test("get cross reference") { + withSessionHandle { (client, handle) => + val req = new TGetCrossReferenceReq(handle) + val resp = client.GetCrossReference(req) + assert(resp.getOperationHandle === null) + assert(resp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) + assert(resp.getStatus.getSqlState === null) + assert(resp.getStatus.getErrorMessage startsWith "Feature is not available") + } + } + + test("get operation status") { + withSessionHandle { (client, handle) => + val opHandle = + OperationHandle(OperationType.EXECUTE_STATEMENT, BackendService.SERVER_VERSION) + val req = new TGetOperationStatusReq(opHandle) + val resp = client.GetOperationStatus(req) + assert(resp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) + assert(resp.getStatus.getSqlState === null) + assert(resp.getStatus.getErrorMessage startsWith "Invalid OperationHandle") + + val req1 = new TGetTablesReq(handle) + val resp1 = client.GetTables(req1) + val opHandle1 = resp1.getOperationHandle + val req2 = new TGetOperationStatusReq(opHandle1) + val resp2 = client.GetOperationStatus(req2) + assert(resp2.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + assert(resp2.getOperationState === TOperationState.FINISHED_STATE) + + req1.setSchemaName("invalid") + val resp3 = client.GetTables(req1) + req2.setOperationHandle(resp3.getOperationHandle) + val resp4 = client.GetOperationStatus(req2) + assert(resp4.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + assert(resp4.getOperationState === TOperationState.ERROR_STATE) + assert(resp4.getErrorMessage === "noop operation err") + } } }