diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala index d0f1f065d..979594c23 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala @@ -34,7 +34,7 @@ import org.apache.kyuubi.jdbc.KyuubiHiveDriver import org.apache.kyuubi.jdbc.hive.{KyuubiConnection, KyuubiSQLException} import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem} import org.apache.kyuubi.plugin.SessionConfAdvisor -import org.apache.kyuubi.session.{KyuubiSessionManager, SessionType} +import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, SessionHandle, SessionType} /** * UT with Connection level engine shared cost much time, only run basic jdbc tests. @@ -291,6 +291,55 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe assert(e.getMessage.contains("client should catch this exception")) } } + + test("support to interrupt the thrift request if remote engine is broken") { + withSessionConf(Map( + KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true", + KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "1000", + KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "1000"))(Map.empty)( + Map.empty) { + withSessionHandle { (client, handle) => + val preReq = new TExecuteStatementReq() + preReq.setStatement("select engine_name()") + preReq.setSessionHandle(handle) + preReq.setRunAsync(false) + client.ExecuteStatement(preReq) + + val sessionHandle = SessionHandle(handle) + val session = server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager] + .getSession(sessionHandle).asInstanceOf[KyuubiSessionImpl] + + val exitReq = new TExecuteStatementReq() + exitReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 1000L)," + + "java_method('java.lang.System', 'exit', 1)") + exitReq.setSessionHandle(handle) + exitReq.setRunAsync(true) + client.ExecuteStatement(exitReq) + + session.sessionManager.getConf + .set(KyuubiConf.OPERATION_STATUS_UPDATE_INTERVAL, 3000L) + + val executeStmtReq = new TExecuteStatementReq() + executeStmtReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 30000l)") + executeStmtReq.setSessionHandle(handle) + executeStmtReq.setRunAsync(false) + val startTime = System.currentTimeMillis() + val executeStmtResp = client.ExecuteStatement(executeStmtReq) + assert(executeStmtResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) + assert(executeStmtResp.getStatus.getErrorMessage.contains( + "java.net.SocketException") || + executeStmtResp.getStatus.getErrorMessage.contains( + "org.apache.thrift.transport.TTransportException") || + executeStmtResp.getStatus.getErrorMessage.contains( + "connection does not exist")) + val elapsedTime = System.currentTimeMillis() - startTime + assert(elapsedTime < 20 * 1000) + eventually(timeout(3.seconds)) { + assert(session.client.asyncRequestInterrupted) + } + } + } + } } class TestSessionConfAdvisor extends SessionConfAdvisor { diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala index 21bf56b4f..87a2dc7d2 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala @@ -29,7 +29,7 @@ import org.apache.kyuubi.config.KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX import org.apache.kyuubi.engine.SemanticVersion import org.apache.kyuubi.jdbc.hive.KyuubiStatement import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem} -import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, SessionHandle} +import org.apache.kyuubi.session.{KyuubiSessionImpl, SessionHandle} import org.apache.kyuubi.zookeeper.ZookeeperConf class KyuubiOperationPerUserSuite @@ -166,52 +166,6 @@ class KyuubiOperationPerUserSuite assert(r1 !== r2) } - test("support to interrupt the thrift request if remote engine is broken") { - assume(!httpMode) - withSessionConf(Map( - KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true", - KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "1000", - KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "1000"))(Map.empty)( - Map.empty) { - withSessionHandle { (client, handle) => - val preReq = new TExecuteStatementReq() - preReq.setStatement("select engine_name()") - preReq.setSessionHandle(handle) - preReq.setRunAsync(false) - client.ExecuteStatement(preReq) - - val sessionHandle = SessionHandle(handle) - val session = server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager] - .getSession(sessionHandle).asInstanceOf[KyuubiSessionImpl] - session.client.getEngineAliveProbeProtocol.foreach(_.getTransport.close()) - - val exitReq = new TExecuteStatementReq() - exitReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 1000L)," + - "java_method('java.lang.System', 'exit', 1)") - exitReq.setSessionHandle(handle) - exitReq.setRunAsync(true) - client.ExecuteStatement(exitReq) - - val executeStmtReq = new TExecuteStatementReq() - executeStmtReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 30000l)") - executeStmtReq.setSessionHandle(handle) - executeStmtReq.setRunAsync(false) - val startTime = System.currentTimeMillis() - val executeStmtResp = client.ExecuteStatement(executeStmtReq) - assert(executeStmtResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) - assert(executeStmtResp.getStatus.getErrorMessage.contains( - "java.net.SocketException") || - executeStmtResp.getStatus.getErrorMessage.contains( - "org.apache.thrift.transport.TTransportException") || - executeStmtResp.getStatus.getErrorMessage.contains( - "connection does not exist")) - val elapsedTime = System.currentTimeMillis() - startTime - assert(elapsedTime < 20 * 1000) - assert(session.client.asyncRequestInterrupted) - } - } - } - test("max result rows") { Seq("true", "false").foreach { incremental => Seq("thrift", "arrow").foreach { resultFormat =>