[KYUUBI #4830][TEST] Fix flaky test "support to interrupt the thrift request if remote engine is broken"
### _Why are the changes needed?_ Move 'support to interrupt the thrift request if remote engine is broken' ut to `KyuubiOperationPerConnectionSuite` and fix it. close #4830 ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request Closes #4833 from wForget/KYUUBI-4830. Closes #4830 838021175 [wforget] test 214d22559 [wforget] fix c7b96f0e2 [wforget] fix fbf837ffd [wforget] fix e6ada6af9 [wforget] [KYUUBI #4830] Fix flaky test: KyuubiOperationPerUserSuite: max result rows Authored-by: wforget <643348094@qq.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent
474f0972a4
commit
272673a41f
@ -34,7 +34,7 @@ import org.apache.kyuubi.jdbc.KyuubiHiveDriver
|
||||
import org.apache.kyuubi.jdbc.hive.{KyuubiConnection, KyuubiSQLException}
|
||||
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
|
||||
import org.apache.kyuubi.plugin.SessionConfAdvisor
|
||||
import org.apache.kyuubi.session.{KyuubiSessionManager, SessionType}
|
||||
import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, SessionHandle, SessionType}
|
||||
|
||||
/**
|
||||
* UT with Connection level engine shared cost much time, only run basic jdbc tests.
|
||||
@ -291,6 +291,55 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
|
||||
assert(e.getMessage.contains("client should catch this exception"))
|
||||
}
|
||||
}
|
||||
|
||||
test("support to interrupt the thrift request if remote engine is broken") {
|
||||
withSessionConf(Map(
|
||||
KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true",
|
||||
KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "1000",
|
||||
KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "1000"))(Map.empty)(
|
||||
Map.empty) {
|
||||
withSessionHandle { (client, handle) =>
|
||||
val preReq = new TExecuteStatementReq()
|
||||
preReq.setStatement("select engine_name()")
|
||||
preReq.setSessionHandle(handle)
|
||||
preReq.setRunAsync(false)
|
||||
client.ExecuteStatement(preReq)
|
||||
|
||||
val sessionHandle = SessionHandle(handle)
|
||||
val session = server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
|
||||
.getSession(sessionHandle).asInstanceOf[KyuubiSessionImpl]
|
||||
|
||||
val exitReq = new TExecuteStatementReq()
|
||||
exitReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 1000L)," +
|
||||
"java_method('java.lang.System', 'exit', 1)")
|
||||
exitReq.setSessionHandle(handle)
|
||||
exitReq.setRunAsync(true)
|
||||
client.ExecuteStatement(exitReq)
|
||||
|
||||
session.sessionManager.getConf
|
||||
.set(KyuubiConf.OPERATION_STATUS_UPDATE_INTERVAL, 3000L)
|
||||
|
||||
val executeStmtReq = new TExecuteStatementReq()
|
||||
executeStmtReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 30000l)")
|
||||
executeStmtReq.setSessionHandle(handle)
|
||||
executeStmtReq.setRunAsync(false)
|
||||
val startTime = System.currentTimeMillis()
|
||||
val executeStmtResp = client.ExecuteStatement(executeStmtReq)
|
||||
assert(executeStmtResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
|
||||
assert(executeStmtResp.getStatus.getErrorMessage.contains(
|
||||
"java.net.SocketException") ||
|
||||
executeStmtResp.getStatus.getErrorMessage.contains(
|
||||
"org.apache.thrift.transport.TTransportException") ||
|
||||
executeStmtResp.getStatus.getErrorMessage.contains(
|
||||
"connection does not exist"))
|
||||
val elapsedTime = System.currentTimeMillis() - startTime
|
||||
assert(elapsedTime < 20 * 1000)
|
||||
eventually(timeout(3.seconds)) {
|
||||
assert(session.client.asyncRequestInterrupted)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class TestSessionConfAdvisor extends SessionConfAdvisor {
|
||||
|
||||
@ -29,7 +29,7 @@ import org.apache.kyuubi.config.KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX
|
||||
import org.apache.kyuubi.engine.SemanticVersion
|
||||
import org.apache.kyuubi.jdbc.hive.KyuubiStatement
|
||||
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
|
||||
import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, SessionHandle}
|
||||
import org.apache.kyuubi.session.{KyuubiSessionImpl, SessionHandle}
|
||||
import org.apache.kyuubi.zookeeper.ZookeeperConf
|
||||
|
||||
class KyuubiOperationPerUserSuite
|
||||
@ -166,52 +166,6 @@ class KyuubiOperationPerUserSuite
|
||||
assert(r1 !== r2)
|
||||
}
|
||||
|
||||
test("support to interrupt the thrift request if remote engine is broken") {
|
||||
assume(!httpMode)
|
||||
withSessionConf(Map(
|
||||
KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true",
|
||||
KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "1000",
|
||||
KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "1000"))(Map.empty)(
|
||||
Map.empty) {
|
||||
withSessionHandle { (client, handle) =>
|
||||
val preReq = new TExecuteStatementReq()
|
||||
preReq.setStatement("select engine_name()")
|
||||
preReq.setSessionHandle(handle)
|
||||
preReq.setRunAsync(false)
|
||||
client.ExecuteStatement(preReq)
|
||||
|
||||
val sessionHandle = SessionHandle(handle)
|
||||
val session = server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
|
||||
.getSession(sessionHandle).asInstanceOf[KyuubiSessionImpl]
|
||||
session.client.getEngineAliveProbeProtocol.foreach(_.getTransport.close())
|
||||
|
||||
val exitReq = new TExecuteStatementReq()
|
||||
exitReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 1000L)," +
|
||||
"java_method('java.lang.System', 'exit', 1)")
|
||||
exitReq.setSessionHandle(handle)
|
||||
exitReq.setRunAsync(true)
|
||||
client.ExecuteStatement(exitReq)
|
||||
|
||||
val executeStmtReq = new TExecuteStatementReq()
|
||||
executeStmtReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 30000l)")
|
||||
executeStmtReq.setSessionHandle(handle)
|
||||
executeStmtReq.setRunAsync(false)
|
||||
val startTime = System.currentTimeMillis()
|
||||
val executeStmtResp = client.ExecuteStatement(executeStmtReq)
|
||||
assert(executeStmtResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
|
||||
assert(executeStmtResp.getStatus.getErrorMessage.contains(
|
||||
"java.net.SocketException") ||
|
||||
executeStmtResp.getStatus.getErrorMessage.contains(
|
||||
"org.apache.thrift.transport.TTransportException") ||
|
||||
executeStmtResp.getStatus.getErrorMessage.contains(
|
||||
"connection does not exist"))
|
||||
val elapsedTime = System.currentTimeMillis() - startTime
|
||||
assert(elapsedTime < 20 * 1000)
|
||||
assert(session.client.asyncRequestInterrupted)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("max result rows") {
|
||||
Seq("true", "false").foreach { incremental =>
|
||||
Seq("thrift", "arrow").foreach { resultFormat =>
|
||||
|
||||
Loading…
Reference in New Issue
Block a user