[KYUUBI #6843] Fix 'query-timeout-thread' thread leak
### Why are the changes needed? see https://github.com/apache/kyuubi/issues/6843 If the session manager's ThreadPoolExecutor refuses to execute asyncOperation, then we need to shut down the query-timeout-thread in the catch ### How was this patch tested? 1 Use jstack to view threads on the long-lived engine side  2 Wait for all SQL statements in the engine to finish executing, and then use stack to check the number of query-timeout-thread threads, which should be empty.  ### Was this patch authored or co-authored using generative AI tooling? NO Closes #6844 from ASiegeLion/master. Closes #6843 9107a300e [liupeiyue] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak 4b3417f21 [liupeiyue] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak ef1f66bb5 [liupeiyue] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak 9e1a015f6 [liupeiyue] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak 78a9fde09 [liupeiyue] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak Authored-by: liupeiyue <liupeiyue@yy.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent
2b37c037dc
commit
a051253774
@ -163,6 +163,7 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
|
||||
val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e)
|
||||
setOperationException(ke)
|
||||
setState(OperationState.ERROR)
|
||||
shutdownTimeoutMonitor()
|
||||
throw ke
|
||||
}
|
||||
}
|
||||
|
||||
@ -127,6 +127,7 @@ class ExecutePython(
|
||||
val ke =
|
||||
KyuubiSQLException("Error submitting python in background", rejected)
|
||||
setOperationException(ke)
|
||||
shutdownTimeoutMonitor()
|
||||
throw ke
|
||||
}
|
||||
} else {
|
||||
|
||||
@ -154,6 +154,7 @@ class ExecuteScala(
|
||||
val ke =
|
||||
KyuubiSQLException("Error submitting scala in background", rejected)
|
||||
setOperationException(ke)
|
||||
shutdownTimeoutMonitor()
|
||||
throw ke
|
||||
}
|
||||
} else {
|
||||
|
||||
@ -124,6 +124,7 @@ class ExecuteStatement(
|
||||
val ke =
|
||||
KyuubiSQLException("Error submitting query in background, query rejected", rejected)
|
||||
setOperationException(ke)
|
||||
shutdownTimeoutMonitor()
|
||||
throw ke
|
||||
}
|
||||
} else {
|
||||
|
||||
@ -76,6 +76,7 @@ class ExecuteStatement(
|
||||
val ke =
|
||||
KyuubiSQLException("Error submitting query in background, query rejected", rejected)
|
||||
setOperationException(ke)
|
||||
shutdownTimeoutMonitor()
|
||||
throw ke
|
||||
}
|
||||
} else {
|
||||
|
||||
@ -95,6 +95,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
|
||||
}
|
||||
setOperationException(ke)
|
||||
setState(OperationState.ERROR)
|
||||
shutdownTimeoutMonitor()
|
||||
throw ke
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user