From a00b9fa9d09aa2812f7b98711ef56a983956d136 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Fri, 26 Mar 2021 23:50:24 +0800 Subject: [PATCH] [KYUUBI #456] Can not cancel SQL job if the timeout value is small ![ulysses-you](https://badgen.net/badge/Hello/ulysses-you/green) [![Closes #461](https://badgen.net/badge/Preview/Closes%20%23461/blue)](https://github.com/yaooqinn/kyuubi/pull/461) ![10](https://badgen.net/badge/%2B/10/red) ![17](https://badgen.net/badge/-/17/green) ![6](https://badgen.net/badge/commits/6/yellow) ![Test Plan](https://badgen.net/badge/Missing/Test%20Plan/ff0000) ![Feature](https://badgen.net/badge/Label/Feature/) [Powered by Pull Request Badge](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info) ### _Why are the changes needed?_ * avoid condition race * avoid job leak ### _How was this patch tested?_ pass `org.apache.kyuubi.engine.spark.SparkEngineSuites` close #456 Closes #461 from ulysses-you/thread. 970ba7a [ulysses-you] fix 8786779 [ulysses-you] npe d82321e [ulysses-you] remove c28e671 [ulysses-you] interrupt execute statement 8dccf73 [ulysses-you] cleanup 29857e3 [ulysses-you] sync Authored-by: ulysses-you Signed-off-by: Kent Yao (cherry picked from commit 066075ac746a65de51108627dad8d97538c384fd) Signed-off-by: Kent Yao --- .../spark/operation/ExecuteStatement.scala | 16 ++-------------- .../engine/spark/operation/SparkOperation.scala | 9 +++++++-- .../kyuubi/operation/AbstractOperation.scala | 2 +- 3 files changed, 10 insertions(+), 17 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index 1adc44c7f..11142d41b 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -19,8 +19,6 @@ package org.apache.kyuubi.engine.spark.operation import java.util.concurrent.{RejectedExecutionException, TimeUnit} -import scala.util.control.NonFatal - import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types._ @@ -116,18 +114,8 @@ class ExecuteStatement( ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread") timeoutExecutor.schedule(new Runnable { override def run(): Unit = { - try { - if (getStatus.state != OperationState.TIMEOUT) { - info(s"Query with $statementId timed out after $queryTimeout seconds") - cleanup(OperationState.TIMEOUT) - } - } catch { - case NonFatal(e) => - setOperationException(KyuubiSQLException(e)) - error(s"Error cancelling the query after timeout: $queryTimeout seconds") - } finally { - timeoutExecutor.shutdown() - } + cleanup(OperationState.TIMEOUT) + timeoutExecutor.shutdown() } }, queryTimeout, TimeUnit.SECONDS) } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index c3397daad..4e76ed9d9 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -49,9 +49,10 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio protected def resultSchema: StructType - protected def cleanup(targetState: OperationState): Unit = synchronized { + protected def cleanup(targetState: OperationState): Unit = state.synchronized { if (!isTerminalState(state)) { setState(targetState) + Option(getBackgroundHandle).foreach(_.cancel(true)) spark.sparkContext.cancelJobGroup(statementId) } } @@ -92,7 +93,11 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio if (cancel) spark.sparkContext.cancelJobGroup(statementId) state.synchronized { val errMsg = KyuubiSQLException.stringifyException(e) - if (isTerminalState(state)) { + if (state == OperationState.TIMEOUT) { + val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg") + setOperationException(ke) + throw ke + } else if (isTerminalState(state)) { warn(s"Ignore exception in terminal state with $statementId: $errMsg") } else { setState(OperationState.ERROR) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala index 1f42e7b78..0b9f93547 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala @@ -74,7 +74,7 @@ abstract class AbstractOperation(opType: OperationType, session: Session) var timeCost = "" newState match { case RUNNING => startTime = System.currentTimeMillis() - case ERROR | FINISHED | CANCELED => + case ERROR | FINISHED | CANCELED | TIMEOUT => completedTime = System.currentTimeMillis() timeCost = s", time taken: ${(completedTime - startTime) / 1000.0} seconds" case _ =>