OperationStatus

This commit is contained in:
Kent Yao 2020-10-21 16:06:38 +08:00
parent 6e88ce9ef7
commit 53b3fdb616
5 changed files with 243 additions and 168 deletions

View File

@@ -92,7 +92,9 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
warn(s"Ignore exception in terminal state with $statementId: $e")
} else {
setState(OperationState.ERROR)
throw KyuubiSQLException(s"Error operating $opType: ${e.getMessage}", e)
val ke = KyuubiSQLException(s"Error operating $opType: ${e.getMessage}", e)
setOperationException(ke)
throw ke
}
}
}

View File

@@ -22,7 +22,7 @@ import java.util.Locale
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hive.service.rpc.thrift.TCLIService
import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TOpenSessionReq, TSessionHandle}
import org.apache.spark.sql.SparkSession
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.TSocket
@@ -41,6 +41,8 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
System.setProperty("spark.sql.warehouse.dir", warehousePath.toString)
System.setProperty("spark.sql.hive.metastore.sharedPrefixes", "org.apache.hive.jdbc")
protected val user = System.getProperty("user.name")
protected val spark: SparkSession = SparkSQLEngine.createSpark()
protected var engine: SparkSQLEngine = _
@@ -67,7 +69,6 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
protected def withMultipleConnectionJdbcStatement(
tableNames: String*)(fs: (Statement => Unit)*): Unit = {
val user = System.getProperty("user.name")
val connections = fs.map { _ => DriverManager.getConnection(jdbcUrl, user, "") }
val statements = connections.map(_.createStatement())
@@ -91,7 +92,6 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
}
protected def withDatabases(dbNames: String*)(fs: (Statement => Unit)*): Unit = {
val user = System.getProperty("user.name")
val connections = fs.map { _ => DriverManager.getConnection(jdbcUrl, user, "") }
val statements = connections.map(_.createStatement())
@@ -129,4 +129,25 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
socket.close()
}
}
/**
 * Opens a Thrift session for `user` (password "anonymous"), hands the client
 * and the resulting session handle to `f`, and always attempts to close the
 * session afterwards. A failure while closing is logged rather than rethrown,
 * so it cannot mask an exception raised by `f` itself.
 */
protected def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = {
  withThriftClient { client =>
    val openReq = new TOpenSessionReq()
    openReq.setUsername(user)
    openReq.setPassword("anonymous")
    val handle = client.OpenSession(openReq).getSessionHandle
    try {
      f(client, handle)
    } finally {
      try {
        client.CloseSession(new TCloseSessionReq(handle))
      } catch {
        case e: Exception => error(s"Failed to close $handle", e)
      }
    }
  }
}
}

View File

@@ -34,6 +34,7 @@ import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
class SparkOperationSuite extends WithSparkSQLEngine {
private val currentCatalog = spark.sessionState.catalogManager.currentCatalog
private val dftSchema = "default"
@@ -723,167 +724,6 @@ class SparkOperationSuite extends WithSparkSQLEngine {
}
}
test("basic open | execute | close") {
withThriftClient { client =>
val req = new TOpenSessionReq()
req.setUsername("kentyao")
req.setPassword("anonymous")
val tOpenSessionResp = client.OpenSession(req)
val tExecuteStatementReq = new TExecuteStatementReq()
tExecuteStatementReq.setSessionHandle( tOpenSessionResp.getSessionHandle)
tExecuteStatementReq.setStatement("set -v")
val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)
val tFetchResultsReq = new TFetchResultsReq()
tFetchResultsReq.setOperationHandle(tExecuteStatementResp.getOperationHandle)
tFetchResultsReq.setFetchType(1)
tFetchResultsReq.setMaxRows(1000)
val tFetchResultsResp = client.FetchResults(tFetchResultsReq)
val logs = tFetchResultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala
assert(logs.exists(_.contains(classOf[ExecuteStatement].getCanonicalName)))
tFetchResultsReq.setFetchType(0)
val tFetchResultsResp1 = client.FetchResults(tFetchResultsReq)
val rs = tFetchResultsResp1.getResults.getColumns.get(0).getStringVal.getValues.asScala
assert(rs.contains("spark.sql.shuffle.partitions"))
val tCloseSessionReq = new TCloseSessionReq()
tCloseSessionReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
val tCloseSessionResp = client.CloseSession(tCloseSessionReq)
assert(tCloseSessionResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
}
}
test("set session conf") {
withThriftClient { client =>
val req = new TOpenSessionReq()
req.setUsername("kentyao")
req.setPassword("anonymous")
val conf = Map(
"use:database" -> "default",
"spark.sql.shuffle.partitions" -> "4",
"set:hiveconf:spark.sql.autoBroadcastJoinThreshold" -> "-1",
"set:hivevar:spark.sql.adaptive.enabled" -> "true")
req.setConfiguration(conf.asJava)
val tOpenSessionResp = client.OpenSession(req)
val tExecuteStatementReq = new TExecuteStatementReq()
tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
tExecuteStatementReq.setStatement("set")
val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)
val tFetchResultsReq = new TFetchResultsReq()
tFetchResultsReq.setOperationHandle(tExecuteStatementResp.getOperationHandle)
tFetchResultsReq.setFetchType(0)
tFetchResultsReq.setMaxRows(1000)
val tFetchResultsResp1 = client.FetchResults(tFetchResultsReq)
val columns = tFetchResultsResp1.getResults.getColumns
val rs = columns.get(0).getStringVal.getValues.asScala.zip(
columns.get(1).getStringVal.getValues.asScala)
rs foreach {
case ("spark.sql.shuffle.partitions", v) => assert(v === "4")
case ("spark.sql.autoBroadcastJoinThreshold", v) => assert(v === "-1")
case ("spark.sql.adaptive.enabled", v) => assert(v.toBoolean)
case _ =>
}
assert(spark.conf.get("spark.sql.shuffle.partitions") === "200")
val tCloseSessionReq = new TCloseSessionReq()
tCloseSessionReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
val tCloseSessionResp = client.CloseSession(tCloseSessionReq)
assert(tCloseSessionResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
}
}
test("set session conf - static") {
withThriftClient { client =>
val req = new TOpenSessionReq()
req.setUsername("kentyao")
req.setPassword("anonymous")
val conf = Map("use:database" -> "default", "spark.sql.globalTempDatabase" -> "temp")
req.setConfiguration(conf.asJava)
val tOpenSessionResp = client.OpenSession(req)
val status = tOpenSessionResp.getStatus
assert(status.getStatusCode === TStatusCode.ERROR_STATUS)
assert(status.getErrorMessage.contains("spark.sql.globalTempDatabase"))
}
}
test("set session conf - wrong database") {
withThriftClient { client =>
val req = new TOpenSessionReq()
req.setUsername("kentyao")
req.setPassword("anonymous")
val conf = Map("use:database" -> "default2")
req.setConfiguration(conf.asJava)
val tOpenSessionResp = client.OpenSession(req)
val status = tOpenSessionResp.getStatus
assert(status.getStatusCode === TStatusCode.ERROR_STATUS)
assert(status.getErrorMessage.contains("Database 'default2' does not exist"))
}
}
test("not allow to operate closed session or operation") {
withThriftClient { client =>
val req = new TOpenSessionReq()
req.setUsername("kentyao")
req.setPassword("anonymous")
val tOpenSessionResp = client.OpenSession(req)
val tExecuteStatementReq = new TExecuteStatementReq()
tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
tExecuteStatementReq.setStatement("set")
val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)
val tCloseOperationReq = new TCloseOperationReq(tExecuteStatementResp.getOperationHandle)
val tCloseOperationResp = client.CloseOperation(tCloseOperationReq)
assert(tCloseOperationResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
val tFetchResultsReq = new TFetchResultsReq()
tFetchResultsReq.setOperationHandle(tExecuteStatementResp.getOperationHandle)
tFetchResultsReq.setFetchType(0)
tFetchResultsReq.setMaxRows(1000)
val tFetchResultsResp = client.FetchResults(tFetchResultsReq)
assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(tFetchResultsResp.getStatus.getErrorMessage startsWith "Invalid OperationHandle" +
" [type=EXECUTE_STATEMENT, identifier:")
val tCloseSessionReq = new TCloseSessionReq()
tCloseSessionReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
val tCloseSessionResp = client.CloseSession(tCloseSessionReq)
assert(tCloseSessionResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
val tExecuteStatementResp1 = client.ExecuteStatement(tExecuteStatementReq)
val status = tExecuteStatementResp1.getStatus
assert(status.getStatusCode === TStatusCode.ERROR_STATUS)
assert(status.getErrorMessage startsWith s"Invalid SessionHandle [")
}
}
test("cancel operation") {
withThriftClient { client =>
val req = new TOpenSessionReq()
req.setUsername("kentyao")
req.setPassword("anonymous")
val tOpenSessionResp = client.OpenSession(req)
val tExecuteStatementReq = new TExecuteStatementReq()
tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
tExecuteStatementReq.setStatement("set")
val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)
val tCancelOperationReq = new TCancelOperationReq(tExecuteStatementResp.getOperationHandle)
val tCancelOperationResp = client.CancelOperation(tCancelOperationReq)
assert(tCancelOperationResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
val tFetchResultsReq = new TFetchResultsReq()
tFetchResultsReq.setOperationHandle(tExecuteStatementResp.getOperationHandle)
tFetchResultsReq.setFetchType(0)
tFetchResultsReq.setMaxRows(1000)
val tFetchResultsResp = client.FetchResults(tFetchResultsReq)
assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
}
}
test("Hive JDBC Database MetaData API Auditing") {
withJdbcStatement() { statement =>
val metaData = statement.getConnection.getMetaData
@@ -1066,4 +906,184 @@ class SparkOperationSuite extends WithSparkSQLEngine {
assert(!metaData.supportsRefCursors)
}
}
test("get operation status") {
  // A cheap, deterministic statement to drive the operation through.
  val sql = "select date_sub(date'2011-11-11', '1')"
  withSessionHandle { (client, handle) =>
    val execReq = new TExecuteStatementReq()
    execReq.setSessionHandle(handle)
    execReq.setStatement(sql)
    val execResp = client.ExecuteStatement(execReq)

    val statusReq = new TGetOperationStatusReq()
    statusReq.setOperationHandle(execResp.getOperationHandle)
    val statusResp = client.GetOperationStatus(statusReq)

    // The status RPC itself succeeds, the operation has already finished,
    // and a SELECT must advertise a result set.
    assert(statusResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
    assert(statusResp.getOperationState === TOperationState.FINISHED_STATE)
    assert(statusResp.isHasResultSet)
  }
}
test("basic open | execute | close") {
  withThriftClient { client =>
    // Open a session for a fixed user.
    val openReq = new TOpenSessionReq()
    openReq.setUsername("kentyao")
    openReq.setPassword("anonymous")
    val openResp = client.OpenSession(openReq)

    // Execute `set -v` in that session.
    val execReq = new TExecuteStatementReq()
    execReq.setSessionHandle(openResp.getSessionHandle)
    execReq.setStatement("set -v")
    val execResp = client.ExecuteStatement(execReq)

    // Fetch type 1 returns the operation log, which should mention the
    // operation implementation class.
    val fetchReq = new TFetchResultsReq()
    fetchReq.setOperationHandle(execResp.getOperationHandle)
    fetchReq.setFetchType(1)
    fetchReq.setMaxRows(1000)
    val logResp = client.FetchResults(fetchReq)
    val logs = logResp.getResults.getColumns.get(0).getStringVal.getValues.asScala
    assert(logs.exists(_.contains(classOf[ExecuteStatement].getCanonicalName)))

    // Fetch type 0 returns the actual rows of `set -v`.
    fetchReq.setFetchType(0)
    val resultResp = client.FetchResults(fetchReq)
    val rows = resultResp.getResults.getColumns.get(0).getStringVal.getValues.asScala
    assert(rows.contains("spark.sql.shuffle.partitions"))

    // Closing the session must be acknowledged with SUCCESS.
    val closeReq = new TCloseSessionReq()
    closeReq.setSessionHandle(openResp.getSessionHandle)
    val closeResp = client.CloseSession(closeReq)
    assert(closeResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
  }
}
// Verifies that conf entries passed in TOpenSessionReq take effect inside the
// session — plain keys, `set:hiveconf:`-prefixed and `set:hivevar:`-prefixed
// keys alike — while the shared SparkSession's own conf stays untouched.
test("set session conf") {
withThriftClient { client =>
val req = new TOpenSessionReq()
req.setUsername("kentyao")
req.setPassword("anonymous")
// Session-level overrides: a database switch plus three SQL configs,
// each spelled with a different supported key prefix.
val conf = Map(
"use:database" -> "default",
"spark.sql.shuffle.partitions" -> "4",
"set:hiveconf:spark.sql.autoBroadcastJoinThreshold" -> "-1",
"set:hivevar:spark.sql.adaptive.enabled" -> "true")
req.setConfiguration(conf.asJava)
val tOpenSessionResp = client.OpenSession(req)
// Run a bare `set` to read back the effective session configuration.
val tExecuteStatementReq = new TExecuteStatementReq()
tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
tExecuteStatementReq.setStatement("set")
val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)
val tFetchResultsReq = new TFetchResultsReq()
tFetchResultsReq.setOperationHandle(tExecuteStatementResp.getOperationHandle)
tFetchResultsReq.setFetchType(0)
tFetchResultsReq.setMaxRows(1000)
val tFetchResultsResp1 = client.FetchResults(tFetchResultsReq)
// Column 0 carries config keys and column 1 the corresponding values;
// zip them into (key, value) pairs for checking.
val columns = tFetchResultsResp1.getResults.getColumns
val rs = columns.get(0).getStringVal.getValues.asScala.zip(
columns.get(1).getStringVal.getValues.asScala)
rs foreach {
case ("spark.sql.shuffle.partitions", v) => assert(v === "4")
case ("spark.sql.autoBroadcastJoinThreshold", v) => assert(v === "-1")
case ("spark.sql.adaptive.enabled", v) => assert(v.toBoolean)
case _ =>
}
// The override is scoped to the session: the shared SparkSession still
// reports the default shuffle-partition count.
assert(spark.conf.get("spark.sql.shuffle.partitions") === "200")
val tCloseSessionReq = new TCloseSessionReq()
tCloseSessionReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
val tCloseSessionResp = client.CloseSession(tCloseSessionReq)
assert(tCloseSessionResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
}
}
test("set session conf - static") {
  withThriftClient { client =>
    // A static SQL config cannot be overridden at session-open time, so
    // OpenSession is expected to fail and name the offending key.
    val openReq = new TOpenSessionReq()
    openReq.setUsername("kentyao")
    openReq.setPassword("anonymous")
    openReq.setConfiguration(
      Map("use:database" -> "default", "spark.sql.globalTempDatabase" -> "temp").asJava)
    val status = client.OpenSession(openReq).getStatus
    assert(status.getStatusCode === TStatusCode.ERROR_STATUS)
    assert(status.getErrorMessage.contains("spark.sql.globalTempDatabase"))
  }
}
test("set session conf - wrong database") {
  withThriftClient { client =>
    // Pointing the session at a nonexistent database must fail
    // OpenSession with a descriptive error message.
    val openReq = new TOpenSessionReq()
    openReq.setUsername("kentyao")
    openReq.setPassword("anonymous")
    openReq.setConfiguration(Map("use:database" -> "default2").asJava)
    val status = client.OpenSession(openReq).getStatus
    assert(status.getStatusCode === TStatusCode.ERROR_STATUS)
    assert(status.getErrorMessage.contains("Database 'default2' does not exist"))
  }
}
// Verifies handle lifecycle enforcement: once an operation is closed its
// handle is rejected for further RPCs, and once a session is closed new
// statements against its handle are rejected as well.
test("not allow to operate closed session or operation") {
withThriftClient { client =>
val req = new TOpenSessionReq()
req.setUsername("kentyao")
req.setPassword("anonymous")
val tOpenSessionResp = client.OpenSession(req)
val tExecuteStatementReq = new TExecuteStatementReq()
tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
tExecuteStatementReq.setStatement("set")
val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)
// Close the operation first; the close itself must succeed.
val tCloseOperationReq = new TCloseOperationReq(tExecuteStatementResp.getOperationHandle)
val tCloseOperationResp = client.CloseOperation(tCloseOperationReq)
assert(tCloseOperationResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
// Fetching from the now-closed operation handle must fail with an
// "Invalid OperationHandle" error.
val tFetchResultsReq = new TFetchResultsReq()
tFetchResultsReq.setOperationHandle(tExecuteStatementResp.getOperationHandle)
tFetchResultsReq.setFetchType(0)
tFetchResultsReq.setMaxRows(1000)
val tFetchResultsResp = client.FetchResults(tFetchResultsReq)
assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(tFetchResultsResp.getStatus.getErrorMessage startsWith "Invalid OperationHandle" +
" [type=EXECUTE_STATEMENT, identifier:")
// Close the session (must succeed), then re-execute on the stale session
// handle: that must fail with an "Invalid SessionHandle" error.
val tCloseSessionReq = new TCloseSessionReq()
tCloseSessionReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
val tCloseSessionResp = client.CloseSession(tCloseSessionReq)
assert(tCloseSessionResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
val tExecuteStatementResp1 = client.ExecuteStatement(tExecuteStatementReq)
val status = tExecuteStatementResp1.getStatus
assert(status.getStatusCode === TStatusCode.ERROR_STATUS)
assert(status.getErrorMessage startsWith s"Invalid SessionHandle [")
}
}
test("cancel operation") {
  withThriftClient { client =>
    val openReq = new TOpenSessionReq()
    openReq.setUsername("kentyao")
    openReq.setPassword("anonymous")
    val openResp = client.OpenSession(openReq)

    val execReq = new TExecuteStatementReq()
    execReq.setSessionHandle(openResp.getSessionHandle)
    execReq.setStatement("set")
    val execResp = client.ExecuteStatement(execReq)

    // Cancelling the running operation must be acknowledged with SUCCESS.
    val cancelReq = new TCancelOperationReq(execResp.getOperationHandle)
    val cancelResp = client.CancelOperation(cancelReq)
    assert(cancelResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)

    // Unlike a closed operation, a cancelled handle still accepts
    // FetchResults without error.
    val fetchReq = new TFetchResultsReq()
    fetchReq.setOperationHandle(execResp.getOperationHandle)
    fetchReq.setFetchType(0)
    fetchReq.setMaxRows(1000)
    val fetchResp = client.FetchResults(fetchReq)
    assert(fetchResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
  }
}
}

View File

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.operation
import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException}
/** Unit tests for the `OperationStatus` case class. */
class OperationStatusSuite extends KyuubiFunSuite {

  test("operation status") {
    // A freshly built status carries no exception ...
    val initial = OperationStatus(OperationState.INITIALIZED, 0, 0, hasResultSet = false)
    assert(initial.exception.isEmpty)
    // ... and `copy` can attach one without touching the other fields.
    val failed = initial.copy(exception = Some(KyuubiSQLException("nothing")))
    assert(failed.exception.get.getMessage === "nothing")
  }
}

View File

@@ -47,11 +47,12 @@ abstract class KyuubiOperation(
warn(s"Ignore exception in terminal state with $statementId: $e")
} else {
setState(OperationState.ERROR)
e match {
case kse: KyuubiSQLException => throw kse
val ke = e match {
case kse: KyuubiSQLException => kse
case _ =>
throw KyuubiSQLException(s"Error $action $opType: ${e.getMessage}", e)
KyuubiSQLException(s"Error $action $opType: ${e.getMessage}", e)
}
setOperationException(ke)
}
}
}