FrontendServiceSuite other operations

This commit is contained in:
Kent Yao 2020-10-23 11:48:37 +08:00
parent 76a7d55685
commit 15b93e28b8
4 changed files with 165 additions and 9 deletions

View File

@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn, TTableSchema}
import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, TPrimitiveTypeEntry, TRow, TRowSet, TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@ -61,7 +61,17 @@ class NoopOperation(typ: OperationType, session: Session, shouldFail: Boolean =
}
override def getResultSetSchema: TTableSchema = {
new TTableSchema()
val tColumnDesc = new TColumnDesc()
tColumnDesc.setColumnName("noop")
val desc = new TTypeDesc
desc.addToTypes(TTypeEntry.primitiveEntry(new TPrimitiveTypeEntry(TTypeId.STRING_TYPE)))
tColumnDesc.setTypeDesc(desc)
tColumnDesc.setComment("comment")
tColumnDesc.setPosition(0)
tColumnDesc
val schema = new TTableSchema()
schema.addToColumns(tColumnDesc)
schema
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {

View File

@ -17,20 +17,21 @@
package org.apache.kyuubi.operation
import org.apache.hive.service.rpc.thrift.TRowSet
import org.apache.hive.service.rpc.thrift.{TRow, TRowSet}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.Session
class NoopOperationManager extends OperationManager("noop") {
private val invalid = "invalid"
override def newExecuteStatementOperation(
session: Session,
statement: String,
runAsync: Boolean,
queryTimeout: Long): Operation = {
val operation = new NoopOperation(OperationType.EXECUTE_STATEMENT, session)
val operation =
new NoopOperation(OperationType.EXECUTE_STATEMENT, session, statement == invalid)
addOperation(operation)
}
@ -58,7 +59,7 @@ class NoopOperationManager extends OperationManager("noop") {
schemaName: String,
tableName: String,
tableTypes: java.util.List[String]): Operation = {
val operation = new NoopOperation(OperationType.GET_TABLES, session, schemaName == "invalid")
val operation = new NoopOperation(OperationType.GET_TABLES, session, schemaName == invalid)
addOperation(operation)
}
@ -89,5 +90,5 @@ class NoopOperationManager extends OperationManager("noop") {
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
maxRows: Int): TRowSet = new TRowSet()
maxRows: Int): TRowSet = new TRowSet(0, new java.util.ArrayList[TRow](0))
}

View File

@ -94,6 +94,17 @@ class FrontendServiceSuite extends KyuubiFunSuite {
val expected = handle.getOperationType.toString
val actual = resp.getResults.getColumns.get(0).getStringVal.getValues.get(0)
assert(actual === expected)
tFetchResultsReq.setFetchType(1)
val resp1 = client.FetchResults(tFetchResultsReq)
assert(resp1.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
assert(resp1.getResults.getColumnCount === 0)
val invalidHandle =
OperationHandle(OperationType.getOperationType(handle.getOperationType), SERVER_VERSION)
tFetchResultsReq.setOperationHandle(invalidHandle)
val errResp = client.FetchResults(tFetchResultsReq)
errResp.getStatus
assert(errResp.getResults === null)
assert(errResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
}
test("open session") {
@ -278,6 +289,25 @@ class FrontendServiceSuite extends KyuubiFunSuite {
}
}
test("get type info") {
withSessionHandle { (client, handle) =>
val req = new TGetTypeInfoReq()
req.setSessionHandle(handle)
val resp = client.GetTypeInfo(req)
val opHandle = resp.getOperationHandle
assert(opHandle.getOperationType === TOperationType.GET_TYPE_INFO)
assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
checkOperationResult(client, opHandle)
req.setSessionHandle(SessionHandle(SERVER_VERSION).toTSessionHandle)
val resp1 = client.GetTypeInfo(req)
assert(resp1.getOperationHandle === null)
assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(resp1.getStatus.getSqlState === null)
assert(resp1.getStatus.getErrorMessage startsWith "Invalid SessionHandle")
}
}
test("get primary keys") {
withSessionHandle { (client, handle) =>
val req = new TGetPrimaryKeysReq(handle)
@ -326,7 +356,117 @@ class FrontendServiceSuite extends KyuubiFunSuite {
assert(resp4.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
assert(resp4.getOperationState === TOperationState.ERROR_STATE)
assert(resp4.getErrorMessage === "noop operation err")
}
}
test("execute statement") {
withSessionHandle { (client, handle) =>
val req = new TExecuteStatementReq()
req.setStatement("select 1")
req.setSessionHandle(handle)
req.setRunAsync(false)
val resp = client.ExecuteStatement(req)
val opHandle = resp.getOperationHandle
assert(opHandle.getOperationType === TOperationType.EXECUTE_STATEMENT)
assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
checkOperationResult(client, opHandle)
req.setRunAsync(true)
val resp1 = client.ExecuteStatement(req)
val opHandle1 = resp1.getOperationHandle
checkOperationResult(client, opHandle1)
req.setSessionHandle(SessionHandle(SERVER_VERSION).toTSessionHandle)
val resp2 = client.ExecuteStatement(req)
assert(resp2.getOperationHandle === null)
assert(resp2.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(resp2.getStatus.getSqlState === null)
assert(resp2.getStatus.getErrorMessage startsWith "Invalid SessionHandle")
}
}
test("cancel operation") {
withSessionHandle { (client, handle) =>
val opHandle =
OperationHandle(OperationType.EXECUTE_STATEMENT, SERVER_VERSION)
val req = new TCancelOperationReq()
req.setOperationHandle(opHandle)
val resp = client.CancelOperation(req)
assert(resp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(resp.getStatus.getSqlState === null)
assert(resp.getStatus.getErrorMessage startsWith "Invalid OperationHandle")
val req1 = new TGetSchemasReq(handle)
val resp1 = client.GetSchemas(req1)
resp1.getOperationHandle
req.setOperationHandle(resp1.getOperationHandle)
val resp2 = client.CancelOperation(req)
assert(resp2.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
}
}
test("close operation") {
withSessionHandle { (client, handle) =>
val opHandle =
OperationHandle(OperationType.EXECUTE_STATEMENT, SERVER_VERSION)
val req = new TCloseOperationReq()
req.setOperationHandle(opHandle)
val resp = client.CloseOperation(req)
assert(resp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(resp.getStatus.getSqlState === null)
assert(resp.getStatus.getErrorMessage startsWith "Invalid OperationHandle")
val req1 = new TGetSchemasReq(handle)
val resp1 = client.GetSchemas(req1)
resp1.getOperationHandle
req.setOperationHandle(resp1.getOperationHandle)
val resp2 = client.CloseOperation(req)
assert(resp2.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
}
}
test("get result set meta data") {
withSessionHandle { (client, handle) =>
val req = new TGetSchemasReq(handle)
req.setSessionHandle(handle)
val operationHandle = client.GetSchemas(req).getOperationHandle
val req1 = new TGetResultSetMetadataReq(operationHandle)
val resp = client.GetResultSetMetadata(req1)
assert(resp.getSchema.getColumns.get(0).getColumnName === "noop")
assert(resp.getSchema.getColumns.get(0).getComment === "comment")
assert(resp.getSchema.getColumns.get(0).getPosition === 0)
req1.setOperationHandle(OperationHandle(OperationType.EXECUTE_STATEMENT, SERVER_VERSION))
val resp1 = client.GetResultSetMetadata(req1)
assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(resp1.getStatus.getSqlState === null)
assert(resp1.getStatus.getErrorMessage startsWith "Invalid OperationHandle")
}
}
test("Delegation token is not supported") {
withSessionHandle { (client, handle) =>
val tGetDelegationTokenReq = new TGetDelegationTokenReq()
tGetDelegationTokenReq.setSessionHandle(handle)
tGetDelegationTokenReq.setOwner(Utils.currentUser)
tGetDelegationTokenReq.setRenewer(Utils.currentUser)
val tGetDelegationTokenResp = client.GetDelegationToken(tGetDelegationTokenReq)
assert(tGetDelegationTokenResp.getDelegationToken === null)
assert(tGetDelegationTokenResp.getStatus.getErrorMessage ===
"Delegation token is not supported")
val tCancelDelegationTokenReq = new TCancelDelegationTokenReq()
tCancelDelegationTokenReq.setSessionHandle(handle)
tCancelDelegationTokenReq.setDelegationToken("")
val tCancelDelegationTokenResp = client.CancelDelegationToken(tCancelDelegationTokenReq)
assert(tCancelDelegationTokenResp.getStatus.getErrorMessage ===
"Delegation token is not supported")
val tRenewDelegationTokenReq = new TRenewDelegationTokenReq()
tRenewDelegationTokenReq.setSessionHandle(handle)
tRenewDelegationTokenReq.setDelegationToken("")
val tRenewDelegationTokenResp = client.RenewDelegationToken(tRenewDelegationTokenReq)
assert(tRenewDelegationTokenResp.getStatus.getErrorMessage ===
"Delegation token is not supported")
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.kyuubi.engine.spark
import java.nio.file.{Files, Paths}
import java.util.concurrent.TimeUnit
import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException}
import org.apache.kyuubi.config.KyuubiConf
@ -46,7 +47,11 @@ class SparkProcessBuilderSuite extends KyuubiFunSuite {
test("capture error from spark process builder") {
val processBuilder = new SparkProcessBuilder("kentyao", conf ++ Map("spark.ui.port" -> "abc"))
processBuilder.start
val proc = processBuilder.start
proc.waitFor(10, TimeUnit.SECONDS)
while (proc.isAlive) {
Thread.sleep(100)
}
Thread.sleep(5000)
val error = processBuilder.getError
assert(error.getMessage.contains(