[KYUUBI #2927][1.5] Fix the thread in ScheduleThreadExecutorPool can't be shutdown immediately

Close #2927 for branch-1.5

### _Why are the changes needed?_

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #2934 from iodone/dev-1.5.

Closes #2927

97d5f924 [odone] [KYUUBI #2927][1.5] fixed

Authored-by: odone <odone.zhang@gmail.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
This commit is contained in:
odone 2022-06-23 13:25:13 +08:00 committed by ulysses-you
parent 3fcff34dab
commit ff05cf8934
No known key found for this signature in database
GPG Key ID: 4C500BC62D576766
4 changed files with 33 additions and 3 deletions

View File

@ -179,7 +179,7 @@ class ExecuteStatement(
private def addTimeoutMonitor(): Unit = {
if (queryTimeout > 0) {
val timeoutExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false)
val action: Runnable = () => cleanup(OperationState.TIMEOUT)
timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
statementTimeoutCleaner = Some(timeoutExecutor)

View File

@ -127,7 +127,7 @@ class ExecuteStatement(
private def addTimeoutMonitor(): Unit = {
if (queryTimeout > 0) {
val timeoutExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false)
timeoutExecutor.schedule(
new Runnable {
override def run(): Unit = {

View File

@ -26,11 +26,15 @@ import org.apache.kyuubi.{KyuubiException, Logging}
object ThreadUtils extends Logging {
def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = {
def newDaemonSingleThreadScheduledExecutor(
threadName: String,
executeExistingDelayedTasksAfterShutdown: Boolean = true): ScheduledExecutorService = {
val threadFactory = new NamedThreadFactory(threadName, daemon = true)
val executor = new ScheduledThreadPoolExecutor(1, threadFactory)
executor.setRemoveOnCancelPolicy(true)
executor
.setExecuteExistingDelayedTasksAfterShutdownPolicy(executeExistingDelayedTasksAfterShutdown)
executor
}
def newDaemonQueuedThreadPool(

View File

@ -35,4 +35,30 @@ class ThreadUtilsSuite extends KyuubiFunSuite {
service.awaitTermination(10, TimeUnit.SECONDS)
assert(threadName startsWith "ThreadUtilsTest")
}
test("New daemon single thread scheduled executor for shutdownNow") {
val service = ThreadUtils.newDaemonSingleThreadScheduledExecutor("ThreadUtilsTest")
@volatile var threadName = ""
service.submit(new Runnable {
override def run(): Unit = {
threadName = Thread.currentThread().getName
}
})
service.shutdownNow()
service.awaitTermination(10, TimeUnit.SECONDS)
assert(threadName startsWith "")
}
test("New daemon single thread scheduled executor for cancel delayed tasks") {
val service = ThreadUtils.newDaemonSingleThreadScheduledExecutor("ThreadUtilsTest", false)
@volatile var threadName = ""
service.submit(new Runnable {
override def run(): Unit = {
threadName = Thread.currentThread().getName
}
})
service.shutdown()
service.awaitTermination(10, TimeUnit.SECONDS)
assert(threadName startsWith "")
}
}