[KYUUBI #6843] Fix 'query-timeout-thread' thread leak

### Why are the changes needed?

see https://github.com/apache/kyuubi/issues/6843

If the session manager's ThreadPoolExecutor refuses to execute asyncOperation,   then we need to shut down the query-timeout-thread in the catch

### How was this patch tested?

 1 Use jstack to view threads on the long-lived engine side
![image](https://github.com/user-attachments/assets/95d3a897-001d-4250-bf13-172b6997021b)

 2  Wait for all SQL statements in the engine to finish executing, and then use stack to check the number of query-timeout-thread threads, which should be empty.
![image](https://github.com/user-attachments/assets/0afbc026-7dd3-4594-afd2-92a5ef23f6cb)

### Was this patch authored or co-authored using generative AI tooling?

NO

Closes #6844 from ASiegeLion/master.

Closes #6843

9107a300e [liupeiyue] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak
4b3417f21 [liupeiyue] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak
ef1f66bb5 [liupeiyue] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak
9e1a015f6 [liupeiyue] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak
78a9fde09 [liupeiyue] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak

Authored-by: liupeiyue <liupeiyue@yy.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
(cherry picked from commit a051253774)
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
liupeiyue 2024-12-27 18:02:16 +08:00 committed by Cheng Pan
parent cc6702da6a
commit e5bdc2139f
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
6 changed files with 6 additions and 0 deletions

View File

@ -163,6 +163,7 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e)
setOperationException(ke)
setState(OperationState.ERROR)
shutdownTimeoutMonitor()
throw ke
}
}

View File

@ -127,6 +127,7 @@ class ExecutePython(
val ke =
KyuubiSQLException("Error submitting python in background", rejected)
setOperationException(ke)
shutdownTimeoutMonitor()
throw ke
}
} else {

View File

@ -154,6 +154,7 @@ class ExecuteScala(
val ke =
KyuubiSQLException("Error submitting scala in background", rejected)
setOperationException(ke)
shutdownTimeoutMonitor()
throw ke
}
} else {

View File

@ -124,6 +124,7 @@ class ExecuteStatement(
val ke =
KyuubiSQLException("Error submitting query in background, query rejected", rejected)
setOperationException(ke)
shutdownTimeoutMonitor()
throw ke
}
} else {

View File

@ -74,6 +74,7 @@ class ExecuteStatement(
val ke =
KyuubiSQLException("Error submitting query in background, query rejected", rejected)
setOperationException(ke)
shutdownTimeoutMonitor()
throw ke
}
} else {

View File

@ -95,6 +95,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
}
setOperationException(ke)
setState(OperationState.ERROR)
shutdownTimeoutMonitor()
throw ke
}
}