diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index 1adc44c7f..11142d41b 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -19,8 +19,6 @@ package org.apache.kyuubi.engine.spark.operation import java.util.concurrent.{RejectedExecutionException, TimeUnit} -import scala.util.control.NonFatal - import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types._ @@ -116,18 +114,8 @@ class ExecuteStatement( ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread") timeoutExecutor.schedule(new Runnable { override def run(): Unit = { - try { - if (getStatus.state != OperationState.TIMEOUT) { - info(s"Query with $statementId timed out after $queryTimeout seconds") - cleanup(OperationState.TIMEOUT) - } - } catch { - case NonFatal(e) => - setOperationException(KyuubiSQLException(e)) - error(s"Error cancelling the query after timeout: $queryTimeout seconds") - } finally { - timeoutExecutor.shutdown() - } + cleanup(OperationState.TIMEOUT) + timeoutExecutor.shutdown() } }, queryTimeout, TimeUnit.SECONDS) } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index c3397daad..4e76ed9d9 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -49,9 +49,10 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio protected def resultSchema: StructType - protected def cleanup(targetState: OperationState): Unit = synchronized { + protected def cleanup(targetState: OperationState): Unit = state.synchronized { if (!isTerminalState(state)) { setState(targetState) + Option(getBackgroundHandle).foreach(_.cancel(true)) spark.sparkContext.cancelJobGroup(statementId) } } @@ -92,7 +93,11 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio if (cancel) spark.sparkContext.cancelJobGroup(statementId) state.synchronized { val errMsg = KyuubiSQLException.stringifyException(e) - if (isTerminalState(state)) { + if (state == OperationState.TIMEOUT) { + val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg") + setOperationException(ke) + throw ke + } else if (isTerminalState(state)) { warn(s"Ignore exception in terminal state with $statementId: $errMsg") } else { setState(OperationState.ERROR) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala index 1f42e7b78..0b9f93547 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala @@ -74,7 +74,7 @@ abstract class AbstractOperation(opType: OperationType, session: Session) var timeCost = "" newState match { case RUNNING => startTime = System.currentTimeMillis() - case ERROR | FINISHED | CANCELED => + case ERROR | FINISHED | CANCELED | TIMEOUT => completedTime = System.currentTimeMillis() timeCost = s", time taken: ${(completedTime - startTime) / 1000.0} seconds" case _ =>