[KYUUBI #4847][FOLLOWUP] Close the session immediately when engine connection closed
### _Why are the changes needed?_ If the session between kyuubi server and kyuubi engine has been inactive, we should close the kyuubi session as well. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request Closes #5031 from turboFei/close_session_inactive. Closes #4847 2eea080b5 [fwang12] fix 964ead778 [fwang12] check engine connection alive 62642f734 [fwang12] save Authored-by: fwang12 <fwang12@ebay.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent
7210234a1c
commit
47562b464b
@ -52,6 +52,8 @@ class KyuubiSyncThriftClient private (
|
||||
@volatile private var _engineUrl: Option[String] = _
|
||||
@volatile private var _engineName: Option[String] = _
|
||||
|
||||
private[kyuubi] def engineConnectionClosed: Boolean = !protocol.getTransport.isOpen
|
||||
|
||||
private val lock = new ReentrantLock()
|
||||
|
||||
// Visible for testing.
|
||||
@ -84,7 +86,7 @@ class KyuubiSyncThriftClient private (
|
||||
"engine-alive-probe-" + _aliveProbeSessionHandle)
|
||||
val task = new Runnable {
|
||||
override def run(): Unit = {
|
||||
if (!remoteEngineBroken && protocol.getTransport.isOpen) {
|
||||
if (!remoteEngineBroken && !engineConnectionClosed) {
|
||||
engineAliveProbeClient.foreach { client =>
|
||||
val tGetInfoReq = new TGetInfoReq()
|
||||
tGetInfoReq.setSessionHandle(_aliveProbeSessionHandle)
|
||||
@ -134,7 +136,7 @@ class KyuubiSyncThriftClient private (
|
||||
* Lock every rpc call to send them sequentially
|
||||
*/
|
||||
private def withLockAcquired[T](block: => T): T = Utils.withLockRequired(lock) {
|
||||
if (!protocol.getTransport.isOpen) {
|
||||
if (engineConnectionClosed) {
|
||||
throw KyuubiSQLException.connectionDoesNotExist()
|
||||
}
|
||||
block
|
||||
|
||||
@ -291,8 +291,9 @@ class KyuubiSessionImpl(
|
||||
var engineAliveMaxFailCount = 3
|
||||
var engineAliveFailCount = 0
|
||||
|
||||
def checkEngineAlive(): Boolean = {
|
||||
def checkEngineConnectionAlive(): Boolean = {
|
||||
try {
|
||||
if (Option(client).exists(_.engineConnectionClosed)) return false
|
||||
if (!aliveProbeEnabled) return true
|
||||
getInfo(TGetInfoType.CLI_DBMS_VER)
|
||||
engineLastAlive = System.currentTimeMillis()
|
||||
|
||||
@ -63,7 +63,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
|
||||
private var batchLimiter: Option[SessionLimiter] = None
|
||||
lazy val (signingPrivateKey, signingPublicKey) = SignUtils.generateKeyPair()
|
||||
|
||||
private val engineAliveChecker =
|
||||
private val engineConnectionAliveChecker =
|
||||
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-engine-alive-checker")
|
||||
|
||||
override def initialize(conf: KyuubiConf): Unit = {
|
||||
@ -350,7 +350,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
|
||||
val interval = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL)
|
||||
val checkTask: Runnable = () => {
|
||||
allSessions().foreach { session =>
|
||||
if (!session.asInstanceOf[KyuubiSessionImpl].checkEngineAlive()) {
|
||||
if (!session.asInstanceOf[KyuubiSessionImpl].checkEngineConnectionAlive()) {
|
||||
try {
|
||||
closeSession(session.handle)
|
||||
logger.info(s"The session ${session.handle} has been closed " +
|
||||
@ -362,7 +362,11 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
|
||||
}
|
||||
}
|
||||
}
|
||||
engineAliveChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS)
|
||||
engineConnectionAliveChecker.scheduleWithFixedDelay(
|
||||
checkTask,
|
||||
interval,
|
||||
interval,
|
||||
TimeUnit.MILLISECONDS)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user