[KYUUBI #456] Cannot cancel SQL job if the timeout value is small
 [](https://github.com/yaooqinn/kyuubi/pull/461)      [<img width="16" alt="Powered by Pull Request Badge" src="https://user-images.githubusercontent.com/1393946/111216524-d2bb8e00-85d4-11eb-821b-ed4c00989c02.png">](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info)<!-- PR-BADGE: PLEASE DO NOT REMOVE THIS COMMENT -->

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/yaooqinn/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
* avoid race condition
* avoid job leak

### _How was this patch tested?_
pass `org.apache.kyuubi.engine.spark.SparkEngineSuites`

close #456

Closes #461 from ulysses-you/thread.

970ba7a [ulysses-you] fix
8786779 [ulysses-you] npe
d82321e [ulysses-you] remove
c28e671 [ulysses-you] interrupt execute statement
8dccf73 [ulysses-you] cleanup
29857e3 [ulysses-you] sync

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
parent
768d583767
commit
066075ac74
@ -19,8 +19,6 @@ package org.apache.kyuubi.engine.spark.operation
|
||||
|
||||
import java.util.concurrent.{RejectedExecutionException, TimeUnit}
|
||||
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import org.apache.spark.sql.{DataFrame, SparkSession}
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
@ -116,18 +114,8 @@ class ExecuteStatement(
|
||||
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
|
||||
timeoutExecutor.schedule(new Runnable {
|
||||
override def run(): Unit = {
|
||||
try {
|
||||
if (getStatus.state != OperationState.TIMEOUT) {
|
||||
info(s"Query with $statementId timed out after $queryTimeout seconds")
|
||||
cleanup(OperationState.TIMEOUT)
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
setOperationException(KyuubiSQLException(e))
|
||||
error(s"Error cancelling the query after timeout: $queryTimeout seconds")
|
||||
} finally {
|
||||
timeoutExecutor.shutdown()
|
||||
}
|
||||
cleanup(OperationState.TIMEOUT)
|
||||
timeoutExecutor.shutdown()
|
||||
}
|
||||
}, queryTimeout, TimeUnit.SECONDS)
|
||||
}
|
||||
|
||||
@ -49,9 +49,10 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
|
||||
|
||||
protected def resultSchema: StructType
|
||||
|
||||
protected def cleanup(targetState: OperationState): Unit = synchronized {
|
||||
protected def cleanup(targetState: OperationState): Unit = state.synchronized {
|
||||
if (!isTerminalState(state)) {
|
||||
setState(targetState)
|
||||
Option(getBackgroundHandle).foreach(_.cancel(true))
|
||||
spark.sparkContext.cancelJobGroup(statementId)
|
||||
}
|
||||
}
|
||||
@ -92,7 +93,11 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
|
||||
if (cancel) spark.sparkContext.cancelJobGroup(statementId)
|
||||
state.synchronized {
|
||||
val errMsg = KyuubiSQLException.stringifyException(e)
|
||||
if (isTerminalState(state)) {
|
||||
if (state == OperationState.TIMEOUT) {
|
||||
val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
|
||||
setOperationException(ke)
|
||||
throw ke
|
||||
} else if (isTerminalState(state)) {
|
||||
warn(s"Ignore exception in terminal state with $statementId: $errMsg")
|
||||
} else {
|
||||
setState(OperationState.ERROR)
|
||||
|
||||
@ -74,7 +74,7 @@ abstract class AbstractOperation(opType: OperationType, session: Session)
|
||||
var timeCost = ""
|
||||
newState match {
|
||||
case RUNNING => startTime = System.currentTimeMillis()
|
||||
case ERROR | FINISHED | CANCELED =>
|
||||
case ERROR | FINISHED | CANCELED | TIMEOUT =>
|
||||
completedTime = System.currentTimeMillis()
|
||||
timeCost = s", time taken: ${(completedTime - startTime) / 1000.0} seconds"
|
||||
case _ =>
|
||||
|
||||
Loading…
Reference in New Issue
Block a user