[KYUUBI #6843] [FOLLOWUP] Fix 'query-timeout-thread' thread leak

### Why are the changes needed?

If the session manager's ThreadPoolExecutor refuses to execute the asyncOperation, then we need to shut down the query-timeout-thread in the catch block. This should also be done in JDBC and the CHAT engine.

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?

Closes #6873 from lsm1/branch-followup-6843.

Closes #6843

aed9088c8 [senmiaoliu] fix query timeout checker leak in chat engine and jdbc engine

Authored-by: senmiaoliu <senmiaoliu@trip.com>
Signed-off-by: senmiaoliu <senmiaoliu@trip.com>
This commit is contained in:
senmiaoliu 2025-01-10 10:30:00 +08:00
parent a051253774
commit 622190197d
2 changed files with 30 additions and 7 deletions

View File

@ -16,7 +16,9 @@
*/
package org.apache.kyuubi.engine.chat.operation
import org.apache.kyuubi.Logging
import java.util.concurrent.RejectedExecutionException
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.chat.provider.ChatProvider
import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationState}
import org.apache.kyuubi.operation.log.OperationLog
@ -41,9 +43,19 @@ class ExecuteStatement(
executeStatement()
}
}
val chatSessionManager = session.sessionManager
val backgroundHandle = chatSessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundHandle)
try {
val chatSessionManager = session.sessionManager
val backgroundHandle = chatSessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
setState(OperationState.ERROR)
val ke =
KyuubiSQLException("Error submitting query in background, query rejected", rejected)
setOperationException(ke)
shutdownTimeoutMonitor()
throw ke
}
} else {
executeStatement()
}

View File

@ -17,6 +17,7 @@
package org.apache.kyuubi.engine.jdbc.operation
import java.sql.{Connection, Statement, Types}
import java.util.concurrent.RejectedExecutionException
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.jdbc.schema.{Column, Row, Schema}
@ -50,9 +51,19 @@ class ExecuteStatement(
executeStatement()
}
}
val jdbcSessionManager = session.sessionManager
val backgroundHandle = jdbcSessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundHandle)
try {
val jdbcSessionManager = session.sessionManager
val backgroundHandle = jdbcSessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
setState(OperationState.ERROR)
val ke =
KyuubiSQLException("Error submitting query in background, query rejected", rejected)
setOperationException(ke)
shutdownTimeoutMonitor()
throw ke
}
} else {
executeStatement()
}