From cc7e239c202798c5debd7ebf548b63b42a2402fc Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Thu, 1 Jul 2021 17:49:54 +0800 Subject: [PATCH] [KYUUBI #735] Shutdown timeout cleaner as far as possible ### _Why are the changes needed?_ If some `ExecuteStatement` are canceled before timeout in a short time, the timeout cleaners will hold the resource until timeout. This PR aims to release the timeout cleaner as far as possible. ### _How was this patch tested?_ Pass `org.apache.kyuubi.engine.spark.SparkEngineSuites` Closes #735 from ulysses-you/timeout-cleaner. Closes #735 92bc29a6 [ulysses-you] fix Authored-by: ulysses-you Signed-off-by: Kent Yao --- .../kyuubi/engine/spark/operation/ExecuteStatement.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index bf4ec52af..58bdfd382 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -17,7 +17,7 @@ package org.apache.kyuubi.engine.spark.operation -import java.util.concurrent.{RejectedExecutionException, TimeUnit} +import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit} import org.apache.spark.kyuubi.SQLOperationListener import org.apache.spark.sql.{DataFrame, SparkSession} @@ -46,6 +46,8 @@ class ExecuteStatement( spark.conf.getOption(KyuubiConf.OPERATION_SCHEDULER_POOL.key).orElse( session.sessionManager.getConf.get(KyuubiConf.OPERATION_SCHEDULER_POOL)) + private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None + private val operationLog: OperationLog = OperationLog.createOperationLog(session.handle, getHandle) override def getOperationLog: Option[OperationLog] = Option(operationLog) @@ -85,6 +87,7 @@ class ExecuteStatement( onError(cancel = true) } finally { spark.sparkContext.removeSparkListener(operationListener) + statementTimeoutCleaner.foreach(_.shutdown()) } } @@ -138,9 +141,9 @@ class ExecuteStatement( timeoutExecutor.schedule(new Runnable { override def run(): Unit = { cleanup(OperationState.TIMEOUT) - timeoutExecutor.shutdown() } }, queryTimeout, TimeUnit.SECONDS) + statementTimeoutCleaner = Some(timeoutExecutor) } } }