[KYUUBI #456] Can not cancel SQL job if the timeout value is small

![ulysses-you](https://badgen.net/badge/Hello/ulysses-you/green) [![Closes #461](https://badgen.net/badge/Preview/Closes%20%23461/blue)](https://github.com/yaooqinn/kyuubi/pull/461) ![10](https://badgen.net/badge/%2B/10/red) ![17](https://badgen.net/badge/-/17/green) ![6](https://badgen.net/badge/commits/6/yellow) ![Test Plan](https://badgen.net/badge/Missing/Test%20Plan/ff0000) ![Feature](https://badgen.net/badge/Label/Feature/) [<img width="16" alt="Powered by Pull Request Badge" src="https://user-images.githubusercontent.com/1393946/111216524-d2bb8e00-85d4-11eb-821b-ed4c00989c02.png">](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info)<!-- PR-BADGE: PLEASE DO NOT REMOVE THIS COMMENT -->

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/yaooqinn/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
* avoid condition race
* avoid job leak

### _How was this patch tested?_
pass `org.apache.kyuubi.engine.spark.SparkEngineSuites`
close #456

Closes #461 from ulysses-you/thread.

970ba7a [ulysses-you] fix
8786779 [ulysses-you] npe
d82321e [ulysses-you] remove
c28e671 [ulysses-you] interrupt execute statement
8dccf73 [ulysses-you] cleanup
29857e3 [ulysses-you] sync

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit 066075ac74)
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
ulysses-you 2021-03-26 23:50:24 +08:00 committed by Kent Yao
parent 1503af361b
commit a00b9fa9d0
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
3 changed files with 10 additions and 17 deletions

View File

@ -19,8 +19,6 @@ package org.apache.kyuubi.engine.spark.operation
import java.util.concurrent.{RejectedExecutionException, TimeUnit}
import scala.util.control.NonFatal
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._
@ -116,18 +114,8 @@ class ExecuteStatement(
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
timeoutExecutor.schedule(new Runnable {
override def run(): Unit = {
try {
if (getStatus.state != OperationState.TIMEOUT) {
info(s"Query with $statementId timed out after $queryTimeout seconds")
cleanup(OperationState.TIMEOUT)
}
} catch {
case NonFatal(e) =>
setOperationException(KyuubiSQLException(e))
error(s"Error cancelling the query after timeout: $queryTimeout seconds")
} finally {
timeoutExecutor.shutdown()
}
cleanup(OperationState.TIMEOUT)
timeoutExecutor.shutdown()
}
}, queryTimeout, TimeUnit.SECONDS)
}

View File

@ -49,9 +49,10 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
protected def resultSchema: StructType
protected def cleanup(targetState: OperationState): Unit = synchronized {
protected def cleanup(targetState: OperationState): Unit = state.synchronized {
if (!isTerminalState(state)) {
setState(targetState)
Option(getBackgroundHandle).foreach(_.cancel(true))
spark.sparkContext.cancelJobGroup(statementId)
}
}
@ -92,7 +93,11 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
if (cancel) spark.sparkContext.cancelJobGroup(statementId)
state.synchronized {
val errMsg = KyuubiSQLException.stringifyException(e)
if (isTerminalState(state)) {
if (state == OperationState.TIMEOUT) {
val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
setOperationException(ke)
throw ke
} else if (isTerminalState(state)) {
warn(s"Ignore exception in terminal state with $statementId: $errMsg")
} else {
setState(OperationState.ERROR)

View File

@ -74,7 +74,7 @@ abstract class AbstractOperation(opType: OperationType, session: Session)
var timeCost = ""
newState match {
case RUNNING => startTime = System.currentTimeMillis()
case ERROR | FINISHED | CANCELED =>
case ERROR | FINISHED | CANCELED | TIMEOUT =>
completedTime = System.currentTimeMillis()
timeCost = s", time taken: ${(completedTime - startTime) / 1000.0} seconds"
case _ =>