From 53b3fdb616cb5a615223bae88c60ebd4e9ac7d1e Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 21 Oct 2020 16:06:38 +0800 Subject: [PATCH] OperationStatus --- .../spark/operation/SparkOperation.scala | 4 +- .../engine/spark/WithSparkSQLEngine.scala | 27 +- .../spark/operation/SparkOperationSuite.scala | 342 +++++++++--------- .../operation/OperationStatusSuite.scala | 31 ++ .../kyuubi/operation/KyuubiOperation.scala | 7 +- 5 files changed, 243 insertions(+), 168 deletions(-) create mode 100644 kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationStatusSuite.scala diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index 46cbfd65f..460ab248e 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -92,7 +92,9 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio warn(s"Ignore exception in terminal state with $statementId: $e") } else { setState(OperationState.ERROR) - throw KyuubiSQLException(s"Error operating $opType: ${e.getMessage}", e) + val ke = KyuubiSQLException(s"Error operating $opType: ${e.getMessage}", e) + setOperationException(ke) + throw ke } } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala index 2d0f31c10..e64d19e03 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala @@ -22,7 +22,7 @@ import java.util.Locale import org.apache.hadoop.hive.ql.metadata.Hive import org.apache.hadoop.hive.ql.session.SessionState -import org.apache.hive.service.rpc.thrift.TCLIService +import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TOpenSessionReq, TSessionHandle} import org.apache.spark.sql.SparkSession import org.apache.thrift.protocol.TBinaryProtocol import org.apache.thrift.transport.TSocket @@ -41,6 +41,8 @@ trait WithSparkSQLEngine extends KyuubiFunSuite { System.setProperty("spark.sql.warehouse.dir", warehousePath.toString) System.setProperty("spark.sql.hive.metastore.sharedPrefixes", "org.apache.hive.jdbc") + protected val user = System.getProperty("user.name") + protected val spark: SparkSession = SparkSQLEngine.createSpark() protected var engine: SparkSQLEngine = _ @@ -67,7 +69,6 @@ trait WithSparkSQLEngine extends KyuubiFunSuite { protected def withMultipleConnectionJdbcStatement( tableNames: String*)(fs: (Statement => Unit)*): Unit = { - val user = System.getProperty("user.name") val connections = fs.map { _ => DriverManager.getConnection(jdbcUrl, user, "") } val statements = connections.map(_.createStatement()) @@ -91,7 +92,6 @@ trait WithSparkSQLEngine extends KyuubiFunSuite { } protected def withDatabases(dbNames: String*)(fs: (Statement => Unit)*): Unit = { - val user = System.getProperty("user.name") val connections = fs.map { _ => DriverManager.getConnection(jdbcUrl, user, "") } val statements = connections.map(_.createStatement()) @@ -129,4 +129,25 @@ trait WithSparkSQLEngine extends KyuubiFunSuite { socket.close() } } + + protected def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = { + withThriftClient { client => + val req = new TOpenSessionReq() + req.setUsername(user) + req.setPassword("anonymous") + val resp = client.OpenSession(req) + val handle = resp.getSessionHandle + + try { + f(client, handle) + } finally { + val tCloseSessionReq = new TCloseSessionReq(handle) + try { + client.CloseSession(tCloseSessionReq) + } catch { + case e: Exception => error(s"Failed to close $handle", e) + } + } + } + } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala index 527fef9ff..8ef5a6464 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala @@ -34,6 +34,7 @@ import org.apache.kyuubi.engine.spark.WithSparkSQLEngine import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ class SparkOperationSuite extends WithSparkSQLEngine { + private val currentCatalog = spark.sessionState.catalogManager.currentCatalog private val dftSchema = "default" @@ -723,167 +724,6 @@ class SparkOperationSuite extends WithSparkSQLEngine { } } - test("basic open | execute | close") { - withThriftClient { client => - val req = new TOpenSessionReq() - req.setUsername("kentyao") - req.setPassword("anonymous") - val tOpenSessionResp = client.OpenSession(req) - - val tExecuteStatementReq = new TExecuteStatementReq() - tExecuteStatementReq.setSessionHandle( tOpenSessionResp.getSessionHandle) - tExecuteStatementReq.setStatement("set -v") - val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq) - - val tFetchResultsReq = new TFetchResultsReq() - tFetchResultsReq.setOperationHandle(tExecuteStatementResp.getOperationHandle) - tFetchResultsReq.setFetchType(1) - tFetchResultsReq.setMaxRows(1000) - val tFetchResultsResp = client.FetchResults(tFetchResultsReq) - val logs = tFetchResultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala - assert(logs.exists(_.contains(classOf[ExecuteStatement].getCanonicalName))) - - tFetchResultsReq.setFetchType(0) - val tFetchResultsResp1 = client.FetchResults(tFetchResultsReq) - val rs = tFetchResultsResp1.getResults.getColumns.get(0).getStringVal.getValues.asScala - assert(rs.contains("spark.sql.shuffle.partitions")) - - val tCloseSessionReq = new TCloseSessionReq() - tCloseSessionReq.setSessionHandle(tOpenSessionResp.getSessionHandle) - val tCloseSessionResp = client.CloseSession(tCloseSessionReq) - assert(tCloseSessionResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) - } - } - - test("set session conf") { - withThriftClient { client => - val req = new TOpenSessionReq() - req.setUsername("kentyao") - req.setPassword("anonymous") - val conf = Map( - "use:database" -> "default", - "spark.sql.shuffle.partitions" -> "4", - "set:hiveconf:spark.sql.autoBroadcastJoinThreshold" -> "-1", - "set:hivevar:spark.sql.adaptive.enabled" -> "true") - req.setConfiguration(conf.asJava) - val tOpenSessionResp = client.OpenSession(req) - - val tExecuteStatementReq = new TExecuteStatementReq() - tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle) - tExecuteStatementReq.setStatement("set") - val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq) - - val tFetchResultsReq = new TFetchResultsReq() - tFetchResultsReq.setOperationHandle(tExecuteStatementResp.getOperationHandle) - tFetchResultsReq.setFetchType(0) - tFetchResultsReq.setMaxRows(1000) - val tFetchResultsResp1 = client.FetchResults(tFetchResultsReq) - val columns = tFetchResultsResp1.getResults.getColumns - val rs = columns.get(0).getStringVal.getValues.asScala.zip( - columns.get(1).getStringVal.getValues.asScala) - rs foreach { - case ("spark.sql.shuffle.partitions", v) => assert(v === "4") - case ("spark.sql.autoBroadcastJoinThreshold", v) => assert(v === "-1") - case ("spark.sql.adaptive.enabled", v) => assert(v.toBoolean) - case _ => - } - assert(spark.conf.get("spark.sql.shuffle.partitions") === "200") - - val tCloseSessionReq = new TCloseSessionReq() - tCloseSessionReq.setSessionHandle(tOpenSessionResp.getSessionHandle) - val tCloseSessionResp = client.CloseSession(tCloseSessionReq) - assert(tCloseSessionResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) - } - } - - test("set session conf - static") { - withThriftClient { client => - val req = new TOpenSessionReq() - req.setUsername("kentyao") - req.setPassword("anonymous") - val conf = Map("use:database" -> "default", "spark.sql.globalTempDatabase" -> "temp") - req.setConfiguration(conf.asJava) - val tOpenSessionResp = client.OpenSession(req) - val status = tOpenSessionResp.getStatus - assert(status.getStatusCode === TStatusCode.ERROR_STATUS) - assert(status.getErrorMessage.contains("spark.sql.globalTempDatabase")) - } - } - - test("set session conf - wrong database") { - withThriftClient { client => - val req = new TOpenSessionReq() - req.setUsername("kentyao") - req.setPassword("anonymous") - val conf = Map("use:database" -> "default2") - req.setConfiguration(conf.asJava) - val tOpenSessionResp = client.OpenSession(req) - val status = tOpenSessionResp.getStatus - assert(status.getStatusCode === TStatusCode.ERROR_STATUS) - assert(status.getErrorMessage.contains("Database 'default2' does not exist")) - } - } - - test("not allow to operate closed session or operation") { - withThriftClient { client => - val req = new TOpenSessionReq() - req.setUsername("kentyao") - req.setPassword("anonymous") - val tOpenSessionResp = client.OpenSession(req) - - val tExecuteStatementReq = new TExecuteStatementReq() - tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle) - tExecuteStatementReq.setStatement("set") - val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq) - - val tCloseOperationReq = new TCloseOperationReq(tExecuteStatementResp.getOperationHandle) - val tCloseOperationResp = client.CloseOperation(tCloseOperationReq) - assert(tCloseOperationResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) - - val tFetchResultsReq = new TFetchResultsReq() - tFetchResultsReq.setOperationHandle(tExecuteStatementResp.getOperationHandle) - tFetchResultsReq.setFetchType(0) - tFetchResultsReq.setMaxRows(1000) - val tFetchResultsResp = client.FetchResults(tFetchResultsReq) - assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) - assert(tFetchResultsResp.getStatus.getErrorMessage startsWith "Invalid OperationHandle" + - " [type=EXECUTE_STATEMENT, identifier:") - - val tCloseSessionReq = new TCloseSessionReq() - tCloseSessionReq.setSessionHandle(tOpenSessionResp.getSessionHandle) - val tCloseSessionResp = client.CloseSession(tCloseSessionReq) - assert(tCloseSessionResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) - val tExecuteStatementResp1 = client.ExecuteStatement(tExecuteStatementReq) - - val status = tExecuteStatementResp1.getStatus - assert(status.getStatusCode === TStatusCode.ERROR_STATUS) - assert(status.getErrorMessage startsWith s"Invalid SessionHandle [") - } - } - - test("cancel operation") { - withThriftClient { client => - val req = new TOpenSessionReq() - req.setUsername("kentyao") - req.setPassword("anonymous") - val tOpenSessionResp = client.OpenSession(req) - - val tExecuteStatementReq = new TExecuteStatementReq() - tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle) - tExecuteStatementReq.setStatement("set") - val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq) - val tCancelOperationReq = new TCancelOperationReq(tExecuteStatementResp.getOperationHandle) - val tCancelOperationResp = client.CancelOperation(tCancelOperationReq) - assert(tCancelOperationResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) - val tFetchResultsReq = new TFetchResultsReq() - tFetchResultsReq.setOperationHandle(tExecuteStatementResp.getOperationHandle) - tFetchResultsReq.setFetchType(0) - tFetchResultsReq.setMaxRows(1000) - val tFetchResultsResp = client.FetchResults(tFetchResultsReq) - assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) - } - } - test("Hive JDBC Database MetaData API Auditing") { withJdbcStatement() { statement => val metaData = statement.getConnection.getMetaData @@ -1066,4 +906,184 @@ class SparkOperationSuite extends WithSparkSQLEngine { assert(!metaData.supportsRefCursors) } } + + test("get operation status") { + val sql = "select date_sub(date'2011-11-11', '1')" + + withSessionHandle { (client, handle) => + val req = new TExecuteStatementReq() + req.setSessionHandle(handle) + req.setStatement(sql) + val tExecuteStatementResp = client.ExecuteStatement(req) + val opHandle = tExecuteStatementResp.getOperationHandle + val tGetOperationStatusReq = new TGetOperationStatusReq() + tGetOperationStatusReq.setOperationHandle(opHandle) + val resp = client.GetOperationStatus(tGetOperationStatusReq) + val status = resp.getStatus + assert(status.getStatusCode === TStatusCode.SUCCESS_STATUS) + assert(resp.getOperationState === TOperationState.FINISHED_STATE) + assert(resp.isHasResultSet) + } + } + + test("basic open | execute | close") { + withThriftClient { client => + val req = new TOpenSessionReq() + req.setUsername("kentyao") + req.setPassword("anonymous") + val tOpenSessionResp = client.OpenSession(req) + + val tExecuteStatementReq = new TExecuteStatementReq() + tExecuteStatementReq.setSessionHandle( tOpenSessionResp.getSessionHandle) + tExecuteStatementReq.setStatement("set -v") + val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq) + + val tFetchResultsReq = new TFetchResultsReq() + tFetchResultsReq.setOperationHandle(tExecuteStatementResp.getOperationHandle) + tFetchResultsReq.setFetchType(1) + tFetchResultsReq.setMaxRows(1000) + val tFetchResultsResp = client.FetchResults(tFetchResultsReq) + val logs = tFetchResultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala + assert(logs.exists(_.contains(classOf[ExecuteStatement].getCanonicalName))) + + tFetchResultsReq.setFetchType(0) + val tFetchResultsResp1 = client.FetchResults(tFetchResultsReq) + val rs = tFetchResultsResp1.getResults.getColumns.get(0).getStringVal.getValues.asScala + assert(rs.contains("spark.sql.shuffle.partitions")) + + val tCloseSessionReq = new TCloseSessionReq() + tCloseSessionReq.setSessionHandle(tOpenSessionResp.getSessionHandle) + val tCloseSessionResp = client.CloseSession(tCloseSessionReq) + assert(tCloseSessionResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + } + } + + test("set session conf") { + withThriftClient { client => + val req = new TOpenSessionReq() + req.setUsername("kentyao") + req.setPassword("anonymous") + val conf = Map( + "use:database" -> "default", + "spark.sql.shuffle.partitions" -> "4", + "set:hiveconf:spark.sql.autoBroadcastJoinThreshold" -> "-1", + "set:hivevar:spark.sql.adaptive.enabled" -> "true") + req.setConfiguration(conf.asJava) + val tOpenSessionResp = client.OpenSession(req) + + val tExecuteStatementReq = new TExecuteStatementReq() + tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle) + tExecuteStatementReq.setStatement("set") + val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq) + + val tFetchResultsReq = new TFetchResultsReq() + tFetchResultsReq.setOperationHandle(tExecuteStatementResp.getOperationHandle) + tFetchResultsReq.setFetchType(0) + tFetchResultsReq.setMaxRows(1000) + val tFetchResultsResp1 = client.FetchResults(tFetchResultsReq) + val columns = tFetchResultsResp1.getResults.getColumns + val rs = columns.get(0).getStringVal.getValues.asScala.zip( + columns.get(1).getStringVal.getValues.asScala) + rs foreach { + case ("spark.sql.shuffle.partitions", v) => assert(v === "4") + case ("spark.sql.autoBroadcastJoinThreshold", v) => assert(v === "-1") + case ("spark.sql.adaptive.enabled", v) => assert(v.toBoolean) + case _ => + } + assert(spark.conf.get("spark.sql.shuffle.partitions") === "200") + + val tCloseSessionReq = new TCloseSessionReq() + tCloseSessionReq.setSessionHandle(tOpenSessionResp.getSessionHandle) + val tCloseSessionResp = client.CloseSession(tCloseSessionReq) + assert(tCloseSessionResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + } + } + + test("set session conf - static") { + withThriftClient { client => + val req = new TOpenSessionReq() + req.setUsername("kentyao") + req.setPassword("anonymous") + val conf = Map("use:database" -> "default", "spark.sql.globalTempDatabase" -> "temp") + req.setConfiguration(conf.asJava) + val tOpenSessionResp = client.OpenSession(req) + val status = tOpenSessionResp.getStatus + assert(status.getStatusCode === TStatusCode.ERROR_STATUS) + assert(status.getErrorMessage.contains("spark.sql.globalTempDatabase")) + } + } + + test("set session conf - wrong database") { + withThriftClient { client => + val req = new TOpenSessionReq() + req.setUsername("kentyao") + req.setPassword("anonymous") + val conf = Map("use:database" -> "default2") + req.setConfiguration(conf.asJava) + val tOpenSessionResp = client.OpenSession(req) + val status = tOpenSessionResp.getStatus + assert(status.getStatusCode === TStatusCode.ERROR_STATUS) + assert(status.getErrorMessage.contains("Database 'default2' does not exist")) + } + } + + test("not allow to operate closed session or operation") { + withThriftClient { client => + val req = new TOpenSessionReq() + req.setUsername("kentyao") + req.setPassword("anonymous") + val tOpenSessionResp = client.OpenSession(req) + + val tExecuteStatementReq = new TExecuteStatementReq() + tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle) + tExecuteStatementReq.setStatement("set") + val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq) + + val tCloseOperationReq = new TCloseOperationReq(tExecuteStatementResp.getOperationHandle) + val tCloseOperationResp = client.CloseOperation(tCloseOperationReq) + assert(tCloseOperationResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = new TFetchResultsReq() + tFetchResultsReq.setOperationHandle(tExecuteStatementResp.getOperationHandle) + tFetchResultsReq.setFetchType(0) + tFetchResultsReq.setMaxRows(1000) + val tFetchResultsResp = client.FetchResults(tFetchResultsReq) + assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) + assert(tFetchResultsResp.getStatus.getErrorMessage startsWith "Invalid OperationHandle" + + " [type=EXECUTE_STATEMENT, identifier:") + + val tCloseSessionReq = new TCloseSessionReq() + tCloseSessionReq.setSessionHandle(tOpenSessionResp.getSessionHandle) + val tCloseSessionResp = client.CloseSession(tCloseSessionReq) + assert(tCloseSessionResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + val tExecuteStatementResp1 = client.ExecuteStatement(tExecuteStatementReq) + + val status = tExecuteStatementResp1.getStatus + assert(status.getStatusCode === TStatusCode.ERROR_STATUS) + assert(status.getErrorMessage startsWith s"Invalid SessionHandle [") + } + } + + test("cancel operation") { + withThriftClient { client => + val req = new TOpenSessionReq() + req.setUsername("kentyao") + req.setPassword("anonymous") + val tOpenSessionResp = client.OpenSession(req) + + val tExecuteStatementReq = new TExecuteStatementReq() + tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle) + tExecuteStatementReq.setStatement("set") + val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq) + val tCancelOperationReq = new TCancelOperationReq(tExecuteStatementResp.getOperationHandle) + val tCancelOperationResp = client.CancelOperation(tCancelOperationReq) + assert(tCancelOperationResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + val tFetchResultsReq = new TFetchResultsReq() + tFetchResultsReq.setOperationHandle(tExecuteStatementResp.getOperationHandle) + tFetchResultsReq.setFetchType(0) + tFetchResultsReq.setMaxRows(1000) + val tFetchResultsResp = client.FetchResults(tFetchResultsReq) + assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + } + } } diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationStatusSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationStatusSuite.scala new file mode 100644 index 000000000..a84c063c6 --- /dev/null +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationStatusSuite.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.operation + +import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException} + +class OperationStatusSuite extends KyuubiFunSuite { + + test("operation status") { + val status = OperationStatus(OperationState.INITIALIZED, 0, 0, hasResultSet = false) + assert(status.exception.isEmpty) + val status1 = status.copy(exception = Some(KyuubiSQLException("nothing"))) + assert(status1.exception.get.getMessage === "nothing") + } + +} diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala index caf8e9398..4f09f1ffb 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala @@ -47,11 +47,12 @@ abstract class KyuubiOperation( warn(s"Ignore exception in terminal state with $statementId: $e") } else { setState(OperationState.ERROR) - e match { - case kse: KyuubiSQLException => throw kse + val ke = e match { + case kse: KyuubiSQLException => kse case _ => - throw KyuubiSQLException(s"Error $action $opType: ${e.getMessage}", e) + KyuubiSQLException(s"Error $action $opType: ${e.getMessage}", e) } + setOperationException(ke) } } }