From 47562b464bd4b7a384ee219aabc5fe7b4e1aa364 Mon Sep 17 00:00:00 2001 From: fwang12 Date: Mon, 10 Jul 2023 12:28:10 +0800 Subject: [PATCH] [KYUUBI #4847][FOLLOWUP] Close the session immediately when engine connection closed ### _Why are the changes needed?_ If the session between kyuubi server and kyuubi engine has been inactive, we should close the kyuubi session as well. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request Closes #5031 from turboFei/close_session_inactive. Closes #4847 2eea080b5 [fwang12] fix 964ead778 [fwang12] check engine connection alive 62642f734 [fwang12] save Authored-by: fwang12 Signed-off-by: Cheng Pan --- .../apache/kyuubi/client/KyuubiSyncThriftClient.scala | 6 ++++-- .../org/apache/kyuubi/session/KyuubiSessionImpl.scala | 3 ++- .../apache/kyuubi/session/KyuubiSessionManager.scala | 10 +++++++--- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala index 6cace4452..ad7191c09 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala @@ -52,6 +52,8 @@ class KyuubiSyncThriftClient private ( @volatile private var _engineUrl: Option[String] = _ @volatile private var _engineName: Option[String] = _ + private[kyuubi] def engineConnectionClosed: Boolean = !protocol.getTransport.isOpen + private val lock = new ReentrantLock() // Visible for testing. @@ -84,7 +86,7 @@ class KyuubiSyncThriftClient private ( "engine-alive-probe-" + _aliveProbeSessionHandle) val task = new Runnable { override def run(): Unit = { - if (!remoteEngineBroken && protocol.getTransport.isOpen) { + if (!remoteEngineBroken && !engineConnectionClosed) { engineAliveProbeClient.foreach { client => val tGetInfoReq = new TGetInfoReq() tGetInfoReq.setSessionHandle(_aliveProbeSessionHandle) @@ -134,7 +136,7 @@ class KyuubiSyncThriftClient private ( * Lock every rpc call to send them sequentially */ private def withLockAcquired[T](block: => T): T = Utils.withLockRequired(lock) { - if (!protocol.getTransport.isOpen) { + if (engineConnectionClosed) { throw KyuubiSQLException.connectionDoesNotExist() } block diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index 237eb3ca6..809be86f3 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -291,8 +291,9 @@ class KyuubiSessionImpl( var engineAliveMaxFailCount = 3 var engineAliveFailCount = 0 - def checkEngineAlive(): Boolean = { + def checkEngineConnectionAlive(): Boolean = { try { + if (Option(client).exists(_.engineConnectionClosed)) return false if (!aliveProbeEnabled) return true getInfo(TGetInfoType.CLI_DBMS_VER) engineLastAlive = System.currentTimeMillis() diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index 6e74bedbf..d2547bca9 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -63,7 +63,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { private var batchLimiter: Option[SessionLimiter] = None lazy val (signingPrivateKey, signingPublicKey) = SignUtils.generateKeyPair() - private val engineAliveChecker = + private val engineConnectionAliveChecker = ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-engine-alive-checker") override def initialize(conf: KyuubiConf): Unit = { @@ -350,7 +350,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { val interval = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL) val checkTask: Runnable = () => { allSessions().foreach { session => - if (!session.asInstanceOf[KyuubiSessionImpl].checkEngineAlive()) { + if (!session.asInstanceOf[KyuubiSessionImpl].checkEngineConnectionAlive()) { try { closeSession(session.handle) logger.info(s"The session ${session.handle} has been closed " + @@ -362,7 +362,11 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { } } } - engineAliveChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS) + engineConnectionAliveChecker.scheduleWithFixedDelay( + checkTask, + interval, + interval, + TimeUnit.MILLISECONDS) } }