[KYUUBI #1327] Fix socket timeout error when client sync execute statement cost time longer than engine.login.timeout

### _Why are the changes needed?_

Fix #1327

#### Analysis

The typical error logs are

```log
2021-11-01 11:27:54.018 INFO client.KyuubiSyncThriftClient: TCloseOperationReq(operationHandle:TOperationHandle(operationId:THandleIdentifier(guid:47 69 37 B3 13 38 48 DA 87 7A 8A B6 BD 22 FA 57, secret:C1 01 AE 0B 6F 5F 48 F1 9A F0 FD 84 E3 0F 2B 1E), operationType:EXECUTE_STATEMENT, hasResultSet:true)) succeed on engine side
2021-11-01 11:27:54.019 INFO operation.ExecuteStatement: Processing sy.yao's query[c581db45-67f2-4f15-ac0d-aaecdb713fe9]: INITIALIZED_STATE -> PENDING_STATE, statement: [...SQL]
2021-11-01 11:27:54.019 INFO operation.ExecuteStatement: Processing sy.yao's query[c581db45-67f2-4f15-ac0d-aaecdb713fe9]: PENDING_STATE -> RUNNING_STATE, statement: [...SQL]
2021-11-01 11:28:09.034 INFO operation.ExecuteStatement: Processing sy.yao's query[c581db45-67f2-4f15-ac0d-aaecdb713fe9]: RUNNING_STATE -> ERROR_STATE, statement: [...SQL], time taken: 15.015 seconds
2021-11-01 11:28:09.035 INFO operation.ExecuteStatement: Processing sy.yao's query[c581db45-67f2-4f15-ac0d-aaecdb713fe9]: ERROR_STATE -> CLOSED_STATE, statement: [...SQL]
2021-11-01 11:28:09.035 WARN server.KyuubiThriftBinaryFrontendService: Error executing statement:
org.apache.kyuubi.KyuubiSQLException: Error operating EXECUTE_STATEMENT: org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
[...omit detail stacktrace here]
```
The key points here are:

1. client execute query in sync mode, Hive JDBC client use async mode since 2.1.0 [HIVE-6535](https://issues.apache.org/jira/browse/HIVE-6535)
2. query execute cost time great than `kyuubi.session.engine.login.timeout`, which default is `15s`

Kyuubi server create Thrift clients use `kyuubi.session.engine.login.timeout` as both `socket_timeout` and `connect_timeout`, and there is no heartbeat/keepalive mechanism in Thrift Protocol layer, thus if engine does not send response to Kyuubi server in `socket_timeout`, the Thrift client(Kyuubi server) will fail with `SocketTimeoutException: Read timed out`.

In sync mode, when user send a execute statement request to Kyuubi server, Kyuubi server forword the request to the engine, engine return nothing until the execution finished or any other error happend.

In async mode, the query request passes in same way, but engine will return an `op_handle` immediately instead of waiting for query to finish, the client should use the `op_handle` to check query status periodically, in detail

1. Kyuubi server ask engine for query status, and keep the result in memory
2. User ask Kyuubi server for query status, Kyuubi server get result from memory

#### Solution

Option 1: change `socket_timeout` of Thrift clients created by Kyuubi server to infinite. This approach may cause another issue. if engine crash when executing the query, Kyuubi server will wait until the socket keepalive(controlled by OS) timeout, so the query status will always be `running` in Kyuubi server memory.

Option 2: always run a query in async mode, simulate sync mode in Server side.

This PR implement the option 2, also introduce a new conf `kyuubi.session.engine.request.timeout`

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1328 from pan3793/timeout.

Closes #1327

cc15e6d5 [Cheng Pan] Always execute statement in async mode
dc91d0d5 [Cheng Pan] Add new conf `kyuubi.session.engine.request.timeout`

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Cheng Pan 2021-11-04 18:23:10 +08:00
parent b29b39b116
commit 5421b56440
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
5 changed files with 60 additions and 24 deletions

View File

@ -289,7 +289,8 @@ kyuubi\.session\.engine<br>\.check\.interval|<div style='width: 65pt;word-wrap:
kyuubi\.session\.engine<br>\.idle\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT30M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>engine timeout, the engine will self-terminate when it's not accessed for this duration</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.initialize\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT3M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Timeout for starting the background engine, e.g. SparkSQLEngine.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.log\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT24H</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>If we use Spark as the engine then the session submit log is the console output of spark-submit. We will retain the session submit log until over the config value.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.1.0</div>
kyuubi\.session\.engine<br>\.login\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT15S</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The timeout(ms) of creating the connection to remote sql query engine</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.login\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT15S</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The timeout of creating the connection to remote sql query engine</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.request\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The timeout of awaiting response after sending request to remote sql query engine</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.session\.engine<br>\.share\.level|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>USER</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>(deprecated) - Using kyuubi.engine.share.level instead</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.spark\.main\.resource|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.startup\.error\.max<br>\.size|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>8192</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>During engine bootstrapping, if error occurs, using this config to limit the length error message(characters).</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.1.0</div>

View File

@ -509,11 +509,17 @@ object KyuubiConf {
.createOptional
val ENGINE_LOGIN_TIMEOUT: ConfigEntry[Long] = buildConf("session.engine.login.timeout")
.doc("The timeout(ms) of creating the connection to remote sql query engine")
.doc("The timeout of creating the connection to remote sql query engine")
.version("1.0.0")
.timeConf
.createWithDefault(Duration.ofSeconds(15).toMillis)
val ENGINE_REQUEST_TIMEOUT: ConfigEntry[Long] = buildConf("session.engine.request.timeout")
.doc("The timeout of awaiting response after sending request to remote sql query engine")
.version("1.4.0")
.timeConf
.createWithDefault(Duration.ofSeconds(60).toMillis)
val ENGINE_INIT_TIMEOUT: ConfigEntry[Long] = buildConf("session.engine.initialize.timeout")
.doc("Timeout for starting the background engine, e.g. SparkSQLEngine.")
.version("1.0.0")

View File

@ -77,7 +77,10 @@ class ExecuteStatement(
ms.incCount(STATEMENT_OPEN)
ms.incCount(STATEMENT_TOTAL)
}
_remoteOpHandle = client.executeStatement(statement, shouldRunAsync, queryTimeout)
// We need to avoid executing query in sync mode, because there is no heartbeat mechanism
// in thrift protocol, in sync mode, we cannot distinguish between long-run query and
// engine crash without response before socket read timeout.
_remoteOpHandle = client.executeStatement(statement, true, queryTimeout)
} catch onError()
}
@ -86,7 +89,7 @@ class ExecuteStatement(
var statusResp: TGetOperationStatusResp = null
var currentAttempts = 0
def getOperationStatusWithRetry: Unit = {
def fetchOperationStatusWithRetry(): Unit = {
try {
statusResp = client.getOperationStatus(_remoteOpHandle)
currentAttempts = 0 // reset attempts whenever get touch with engine again
@ -104,7 +107,7 @@ class ExecuteStatement(
}
// initialize operation status
while (statusResp == null) { getOperationStatusWithRetry }
while (statusResp == null) { fetchOperationStatusWithRetry() }
var isComplete = false
while (!isComplete) {
@ -116,7 +119,7 @@ class ExecuteStatement(
remoteState match {
case INITIALIZED_STATE | PENDING_STATE | RUNNING_STATE =>
isComplete = false
getOperationStatusWithRetry
fetchOperationStatusWithRetry()
case FINISHED_STATE =>
setState(OperationState.FINISHED)
@ -164,22 +167,15 @@ class ExecuteStatement(
}
override protected def runInternal(): Unit = {
if (shouldRunAsync) {
executeStatement()
val sessionManager = session.sessionManager
val asyncOperation = new Runnable {
override def run(): Unit = waitStatementComplete()
}
try {
val backgroundOperation =
sessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundOperation)
} catch onError("submitting query in background, query rejected")
} else {
setState(OperationState.RUNNING)
executeStatement()
setState(OperationState.FINISHED)
}
executeStatement()
val sessionManager = session.sessionManager
val asyncOperation: Runnable = () => waitStatementComplete()
try {
val opHandle = sessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(opHandle)
} catch onError("submitting query in background, query rejected")
if (!shouldRunAsync) getBackgroundHandle.get()
}
override def setState(newState: OperationState): Unit = {

View File

@ -80,8 +80,9 @@ class KyuubiSessionImpl(
private def openSession(host: String, port: Int): Unit = {
val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous")
val loginTimeout = sessionConf.get(ENGINE_LOGIN_TIMEOUT).toInt
val requestTimeout = sessionConf.get(ENGINE_REQUEST_TIMEOUT).toInt
transport = PlainSASLHelper.getPlainTransport(
user, passwd, new TSocket(host, port, loginTimeout))
user, passwd, new TSocket(host, port, requestTimeout, loginTimeout))
if (!transport.isOpen) {
transport.open()
logSessionInfo(s"Connected to engine [$host:$port]")

View File

@ -37,7 +37,7 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
KyuubiConf().set(KyuubiConf.ENGINE_SHARE_LEVEL, "connection")
}
test("KYUUBI #647 - engine crash") {
test("KYUUBI #647 - async query causes engine crash") {
withSessionHandle { (client, handle) =>
val executeStmtReq = new TExecuteStatementReq()
executeStmtReq.setStatement("select java_method('java.lang.System', 'exit', 1)")
@ -65,4 +65,36 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
assert(verboseMessage.contains("Failed to detect the root cause"))
}
}
test("client sync query cost time longer than engine.request.timeout") {
withSessionConf(Map(
KyuubiConf.ENGINE_REQUEST_TIMEOUT.key -> "PT5S"
))(Map.empty)(Map.empty) {
withSessionHandle { (client, handle) =>
val executeStmtReq = new TExecuteStatementReq()
executeStmtReq.setStatement("select java_method('java.lang.Thread', 'sleep', 6000L)")
executeStmtReq.setSessionHandle(handle)
executeStmtReq.setRunAsync(false)
val executeStmtResp = client.ExecuteStatement(executeStmtReq)
val getOpStatusReq = new TGetOperationStatusReq(executeStmtResp.getOperationHandle)
val getOpStatusResp = client.GetOperationStatus(getOpStatusReq)
assert(getOpStatusResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
assert(getOpStatusResp.getOperationState === TOperationState.FINISHED_STATE)
}
}
}
test("sync query causes engine crash") {
withSessionHandle { (client, handle) =>
val executeStmtReq = new TExecuteStatementReq()
executeStmtReq.setStatement("select java_method('java.lang.System', 'exit', 1)")
executeStmtReq.setSessionHandle(handle)
executeStmtReq.setRunAsync(false)
val executeStmtResp = client.ExecuteStatement(executeStmtReq)
assert(executeStmtResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(executeStmtResp.getOperationHandle === null)
assert(executeStmtResp.getStatus.getErrorMessage contains
"Caused by: java.net.SocketException: Broken pipe (Write failed)")
}
}
}