diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index bf4ec52af..58bdfd382 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -17,7 +17,7 @@ package org.apache.kyuubi.engine.spark.operation -import java.util.concurrent.{RejectedExecutionException, TimeUnit} +import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit} import org.apache.spark.kyuubi.SQLOperationListener import org.apache.spark.sql.{DataFrame, SparkSession} @@ -46,6 +46,8 @@ class ExecuteStatement( spark.conf.getOption(KyuubiConf.OPERATION_SCHEDULER_POOL.key).orElse( session.sessionManager.getConf.get(KyuubiConf.OPERATION_SCHEDULER_POOL)) + private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None + private val operationLog: OperationLog = OperationLog.createOperationLog(session.handle, getHandle) override def getOperationLog: Option[OperationLog] = Option(operationLog) @@ -85,6 +87,7 @@ class ExecuteStatement( onError(cancel = true) } finally { spark.sparkContext.removeSparkListener(operationListener) + statementTimeoutCleaner.foreach(_.shutdown()) } } @@ -138,9 +141,9 @@ class ExecuteStatement( timeoutExecutor.schedule(new Runnable { override def run(): Unit = { cleanup(OperationState.TIMEOUT) - timeoutExecutor.shutdown() } }, queryTimeout, TimeUnit.SECONDS) + statementTimeoutCleaner = Some(timeoutExecutor) } } }