FrontendServiceSuite get operation status

This commit is contained in:
Kent Yao 2020-10-22 17:00:13 +08:00
parent 9e2c092fc4
commit fb1b806fdf
5 changed files with 181 additions and 13 deletions

View File

@ -42,7 +42,8 @@ object OperationType extends Enumeration {
case TOperationType.GET_TABLE_TYPES => GET_TABLE_TYPES
case TOperationType.GET_COLUMNS => GET_COLUMNS
case TOperationType.GET_FUNCTIONS => GET_FUNCTIONS
case _ => UNKNOWN_OPERATION
case other =>
throw new UnsupportedOperationException(s"Unsupported Operation type: ${other.toString}")
}
}
@ -56,7 +57,8 @@ object OperationType extends Enumeration {
case GET_TABLE_TYPES => TOperationType.GET_TABLE_TYPES
case GET_COLUMNS => TOperationType.GET_COLUMNS
case GET_FUNCTIONS => TOperationType.GET_FUNCTIONS
case _ => TOperationType.UNKNOWN
case other =>
throw new UnsupportedOperationException(s"Unsupported Operation type: ${other.toString}")
}
}
}

View File

@ -23,23 +23,46 @@ import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn, TTableSchema}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationType.OperationType
import org.apache.kyuubi.session.Session
class NoopOperation(typ: OperationType, session: Session)
class NoopOperation(typ: OperationType, session: Session, shouldFail: Boolean = false)
extends AbstractOperation(typ, session) {
override protected def runInternal(): Unit = {}
override protected def runInternal(): Unit = {
setState(OperationState.RUNNING)
if (shouldFail) {
val exception = KyuubiSQLException("noop operation err")
setOperationException(exception)
setState(OperationState.ERROR)
}
setHasResultSet(true)
}
override protected def beforeRun(): Unit = {}
override protected def beforeRun(): Unit = {
setState(OperationState.PENDING)
}
override protected def afterRun(): Unit = {}
override protected def afterRun(): Unit = {
if (!OperationState.isTerminal(state)) {
setState(OperationState.FINISHED)
}
override def cancel(): Unit = {}
}
override def close(): Unit = {}
override def cancel(): Unit = {
setState(OperationState.CANCELED)
override def getResultSetSchema: TTableSchema = new TTableSchema()
}
override def close(): Unit = {
setState(OperationState.CLOSED)
}
override def getResultSetSchema: TTableSchema = {
new TTableSchema()
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
val col = TColumn.stringVal(new TStringColumn(Seq(typ.toString).asJava, ByteBuffer.allocate(0)))

View File

@ -19,6 +19,7 @@ package org.apache.kyuubi.operation
import org.apache.hive.service.rpc.thrift.TRowSet
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.Session
@ -57,7 +58,7 @@ class NoopOperationManager extends OperationManager("noop") {
schemaName: String,
tableName: String,
tableTypes: java.util.List[String]): Operation = {
val operation = new NoopOperation(OperationType.GET_TABLES, session)
val operation = new NoopOperation(OperationType.GET_TABLES, session, schemaName == "invalid")
addOperation(operation)
}

View File

@ -35,7 +35,8 @@ class OperationTypeSuite extends KyuubiFunSuite {
assert(get(TOperationType.GET_TABLE_TYPES) === GET_TABLE_TYPES)
assert(get(TOperationType.GET_COLUMNS) === GET_COLUMNS)
assert(get(TOperationType.GET_FUNCTIONS) === GET_FUNCTIONS)
assert(get(TOperationType.UNKNOWN) === UNKNOWN_OPERATION)
val e = intercept[UnsupportedOperationException](get(TOperationType.UNKNOWN))
assert(e.getMessage === "Unsupported Operation type: UNKNOWN")
}
test("toTOperationType") {
@ -48,6 +49,7 @@ class OperationTypeSuite extends KyuubiFunSuite {
assert(to(GET_TABLE_TYPES) === TOperationType.GET_TABLE_TYPES)
assert(to(GET_COLUMNS) === TOperationType.GET_COLUMNS)
assert(to(GET_FUNCTIONS) === TOperationType.GET_FUNCTIONS)
assert(to(UNKNOWN_OPERATION) === TOperationType.UNKNOWN)
val e = intercept[UnsupportedOperationException](to(UNKNOWN_OPERATION))
assert(e.getMessage === "Unsupported Operation type: UNKNOWN_OPERATION")
}
}

View File

@ -19,13 +19,15 @@ package org.apache.kyuubi.service
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TFetchOrientation, TFetchResultsReq, TGetCatalogsReq, TGetSchemasReq, TOpenSessionReq, TOperationHandle, TOperationType, TSessionHandle, TStatus, TStatusCode}
import org.apache.hive.service.rpc.thrift._
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.TSocket
import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.{OperationHandle, OperationType}
import org.apache.kyuubi.service.authentication.PlainSASLHelper
import org.apache.kyuubi.session.SessionHandle
class FrontendServiceSuite extends KyuubiFunSuite {
@ -121,6 +123,13 @@ class FrontendServiceSuite extends KyuubiFunSuite {
assert(opHandle.getOperationType === TOperationType.GET_CATALOGS)
assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
checkOperationResult(client, opHandle)
req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle)
val resp1 = client.GetCatalogs(req)
assert(resp1.getOperationHandle === null)
assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(resp1.getStatus.getSqlState === null)
assert(resp1.getStatus.getErrorMessage startsWith "Invalid SessionHandle")
}
}
@ -132,6 +141,137 @@ class FrontendServiceSuite extends KyuubiFunSuite {
assert(opHandle.getOperationType === TOperationType.GET_SCHEMAS)
assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
checkOperationResult(client, opHandle)
req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle)
val resp1 = client.GetSchemas(req)
assert(resp1.getOperationHandle === null)
assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(resp1.getStatus.getSqlState === null)
assert(resp1.getStatus.getErrorMessage startsWith "Invalid SessionHandle")
}
}
test("get tables") {
withSessionHandle { (client, handle) =>
val req = new TGetTablesReq(handle)
val resp = client.GetTables(req)
val opHandle = resp.getOperationHandle
assert(opHandle.getOperationType === TOperationType.GET_TABLES)
assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
checkOperationResult(client, opHandle)
req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle)
val resp1 = client.GetTables(req)
assert(resp1.getOperationHandle === null)
assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(resp1.getStatus.getSqlState === null)
assert(resp1.getStatus.getErrorMessage startsWith "Invalid SessionHandle")
}
}
test("get table types") {
withSessionHandle { (client, handle) =>
val req = new TGetTableTypesReq(handle)
val resp = client.GetTableTypes(req)
val opHandle = resp.getOperationHandle
assert(opHandle.getOperationType === TOperationType.GET_TABLE_TYPES)
assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
checkOperationResult(client, opHandle)
req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle)
val resp1 = client.GetTableTypes(req)
assert(resp1.getOperationHandle === null)
assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(resp1.getStatus.getSqlState === null)
assert(resp1.getStatus.getErrorMessage startsWith "Invalid SessionHandle")
}
}
test("get columns") {
withSessionHandle { (client, handle) =>
val req = new TGetColumnsReq(handle)
val resp = client.GetColumns(req)
val opHandle = resp.getOperationHandle
assert(opHandle.getOperationType === TOperationType.GET_COLUMNS)
assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
checkOperationResult(client, opHandle)
req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle)
val resp1 = client.GetColumns(req)
assert(resp1.getOperationHandle === null)
assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(resp1.getStatus.getSqlState === null)
assert(resp1.getStatus.getErrorMessage startsWith "Invalid SessionHandle")
}
}
test("get functions") {
withSessionHandle { (client, handle) =>
val req = new TGetFunctionsReq(handle, "sum")
val resp = client.GetFunctions(req)
val opHandle = resp.getOperationHandle
assert(opHandle.getOperationType === TOperationType.GET_FUNCTIONS)
assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
checkOperationResult(client, opHandle)
req.setSessionHandle(SessionHandle(BackendService.SERVER_VERSION).toTSessionHandle)
val resp1 = client.GetFunctions(req)
assert(resp1.getOperationHandle === null)
assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(resp1.getStatus.getSqlState === null)
assert(resp1.getStatus.getErrorMessage startsWith "Invalid SessionHandle")
}
}
test("get primary keys") {
withSessionHandle { (client, handle) =>
val req = new TGetPrimaryKeysReq(handle)
val resp = client.GetPrimaryKeys(req)
assert(resp.getOperationHandle === null)
assert(resp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(resp.getStatus.getSqlState === null)
assert(resp.getStatus.getErrorMessage startsWith "Feature is not available")
}
}
test("get cross reference") {
withSessionHandle { (client, handle) =>
val req = new TGetCrossReferenceReq(handle)
val resp = client.GetCrossReference(req)
assert(resp.getOperationHandle === null)
assert(resp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(resp.getStatus.getSqlState === null)
assert(resp.getStatus.getErrorMessage startsWith "Feature is not available")
}
}
test("get operation status") {
withSessionHandle { (client, handle) =>
val opHandle =
OperationHandle(OperationType.EXECUTE_STATEMENT, BackendService.SERVER_VERSION)
val req = new TGetOperationStatusReq(opHandle)
val resp = client.GetOperationStatus(req)
assert(resp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(resp.getStatus.getSqlState === null)
assert(resp.getStatus.getErrorMessage startsWith "Invalid OperationHandle")
val req1 = new TGetTablesReq(handle)
val resp1 = client.GetTables(req1)
val opHandle1 = resp1.getOperationHandle
val req2 = new TGetOperationStatusReq(opHandle1)
val resp2 = client.GetOperationStatus(req2)
assert(resp2.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
assert(resp2.getOperationState === TOperationState.FINISHED_STATE)
req1.setSchemaName("invalid")
val resp3 = client.GetTables(req1)
req2.setOperationHandle(resp3.getOperationHandle)
val resp4 = client.GetOperationStatus(req2)
assert(resp4.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
assert(resp4.getOperationState === TOperationState.ERROR_STATE)
assert(resp4.getErrorMessage === "noop operation err")
}
}
}