[KYUUBI #735] Shutdown timeout cleaner as far as possible

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/NetEase/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
If some `ExecuteStatement` operations are canceled shortly before their timeout, the timeout cleaners will keep holding resources until the timeout elapses.

This PR aims to shut down the timeout cleaner as soon as possible (i.e., immediately when the statement completes, rather than waiting for the timeout to fire).

### _How was this patch tested?_
Pass `org.apache.kyuubi.engine.spark.SparkEngineSuites`

Closes #735 from ulysses-you/timeout-cleaner.

Closes #735

92bc29a6 [ulysses-you] fix

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
ulysses-you 2021-07-01 17:49:54 +08:00 committed by Kent Yao
parent 52647f0246
commit cc7e239c20
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D

View File

@ -17,7 +17,7 @@
package org.apache.kyuubi.engine.spark.operation
import java.util.concurrent.{RejectedExecutionException, TimeUnit}
import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}
import org.apache.spark.kyuubi.SQLOperationListener
import org.apache.spark.sql.{DataFrame, SparkSession}
@ -46,6 +46,8 @@ class ExecuteStatement(
spark.conf.getOption(KyuubiConf.OPERATION_SCHEDULER_POOL.key).orElse(
session.sessionManager.getConf.get(KyuubiConf.OPERATION_SCHEDULER_POOL))
private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
private val operationLog: OperationLog =
OperationLog.createOperationLog(session.handle, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
@ -85,6 +87,7 @@ class ExecuteStatement(
onError(cancel = true)
} finally {
spark.sparkContext.removeSparkListener(operationListener)
statementTimeoutCleaner.foreach(_.shutdown())
}
}
@ -138,9 +141,9 @@ class ExecuteStatement(
timeoutExecutor.schedule(new Runnable {
override def run(): Unit = {
cleanup(OperationState.TIMEOUT)
timeoutExecutor.shutdown()
}
}, queryTimeout, TimeUnit.SECONDS)
statementTimeoutCleaner = Some(timeoutExecutor)
}
}
}