[KYUUBI #647] Capture exceptions in async ExecuteStatement

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/NetEase/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
When ExecuteStatement running in async, if engine broken(e.g. killed by yarn), the thread just crash without `setOperationException(ke)`, then client(like `beeline`) will hang.

Close #647

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request

Manual tested on our yarn cluster: kill yarn app when executing statement.
1. `beeline` connect hiveserver2. Result: exit with error code 2
2. `beeline` connect kyuubi(before this patch). Result: hang
3. `beeline` connect kyuubi(after this patch). Result:  exit with error code 2

Closes #646 from pan3793/async-hang.

Closes #647

418fca9 [Cheng Pan] move engine crash test to KyuubiOperationPerConnectionSuite
03e5d73 [Cheng Pan] simplify ut
1651991 [Cheng Pan] tune test name
392dd03 [Cheng Pan] ut
728c040 [Cheng Pan] address comments
8031528 [Cheng Pan] Capture exceptions in async ExecuteStatement

Authored-by: Cheng Pan <379377944@qq.com>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
Cheng Pan 2021-05-27 01:11:01 +08:00 committed by Kent Yao
parent c8b2ec4f58
commit 8ce6d01b25
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
2 changed files with 19 additions and 2 deletions

View File

@ -79,7 +79,7 @@ class ExecuteStatement(
} catch onError()
}
private def waitStatementComplete(): Unit = {
private def waitStatementComplete(): Unit = try {
setState(OperationState.RUNNING)
var statusResp = client.GetOperationStatus(statusReq)
var isComplete = false
@ -119,7 +119,7 @@ class ExecuteStatement(
}
// see if anymore log could be fetched
getQueryLog()
}
} catch onError()
private def getQueryLog(): Unit = {
getOperationLog.foreach { logger =>

View File

@ -17,6 +17,8 @@
package org.apache.kyuubi.operation
import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TGetOperationStatusReq, TOperationState, TStatusCode}
import org.apache.kyuubi.WithKyuubiServer
import org.apache.kyuubi.config.KyuubiConf
@ -30,4 +32,19 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with BasicJDBCT
override protected val conf: KyuubiConf = {
KyuubiConf().set(KyuubiConf.ENGINE_SHARE_LEVEL, "connection")
}
test("KYUUBI #647 - engine crash") {
withSessionHandle { (client, handle) =>
val executeStmtReq = new TExecuteStatementReq()
executeStmtReq.setStatement("select java_method('java.lang.System', 'exit', 1)")
executeStmtReq.setSessionHandle(handle)
executeStmtReq.setRunAsync(true)
val executeStmtResp = client.ExecuteStatement(executeStmtReq)
val getOpStatusReq = new TGetOperationStatusReq(executeStmtResp.getOperationHandle)
val getOpStatusResp = client.GetOperationStatus(getOpStatusReq)
assert(getOpStatusResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
assert(getOpStatusResp.getOperationState === TOperationState.ERROR_STATE)
}
}
}