diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala index 6cace4452..ad7191c09 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala @@ -52,6 +52,8 @@ class KyuubiSyncThriftClient private ( @volatile private var _engineUrl: Option[String] = _ @volatile private var _engineName: Option[String] = _ + private[kyuubi] def engineConnectionClosed: Boolean = !protocol.getTransport.isOpen + private val lock = new ReentrantLock() // Visible for testing. @@ -84,7 +86,7 @@ class KyuubiSyncThriftClient private ( "engine-alive-probe-" + _aliveProbeSessionHandle) val task = new Runnable { override def run(): Unit = { - if (!remoteEngineBroken && protocol.getTransport.isOpen) { + if (!remoteEngineBroken && !engineConnectionClosed) { engineAliveProbeClient.foreach { client => val tGetInfoReq = new TGetInfoReq() tGetInfoReq.setSessionHandle(_aliveProbeSessionHandle) @@ -134,7 +136,7 @@ class KyuubiSyncThriftClient private ( * Lock every rpc call to send them sequentially */ private def withLockAcquired[T](block: => T): T = Utils.withLockRequired(lock) { - if (!protocol.getTransport.isOpen) { + if (engineConnectionClosed) { throw KyuubiSQLException.connectionDoesNotExist() } block diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index 237eb3ca6..809be86f3 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -291,8 +291,9 @@ class KyuubiSessionImpl( var engineAliveMaxFailCount = 3 var engineAliveFailCount = 0 - def checkEngineAlive(): Boolean = { + def checkEngineConnectionAlive(): Boolean = { try { + if (Option(client).exists(_.engineConnectionClosed)) return false if (!aliveProbeEnabled) return true getInfo(TGetInfoType.CLI_DBMS_VER) engineLastAlive = System.currentTimeMillis() diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index 6e74bedbf..d2547bca9 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -63,7 +63,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { private var batchLimiter: Option[SessionLimiter] = None lazy val (signingPrivateKey, signingPublicKey) = SignUtils.generateKeyPair() - private val engineAliveChecker = + private val engineConnectionAliveChecker = ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-engine-alive-checker") override def initialize(conf: KyuubiConf): Unit = { @@ -350,7 +350,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { val interval = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL) val checkTask: Runnable = () => { allSessions().foreach { session => - if (!session.asInstanceOf[KyuubiSessionImpl].checkEngineAlive()) { + if (!session.asInstanceOf[KyuubiSessionImpl].checkEngineConnectionAlive()) { try { closeSession(session.handle) logger.info(s"The session ${session.handle} has been closed " + @@ -362,7 +362,11 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { } } } - engineAliveChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS) + engineConnectionAliveChecker.scheduleWithFixedDelay( + checkTask, + interval, + interval, + TimeUnit.MILLISECONDS) } }