diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index d0e2bcba2..77a81accb 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -289,7 +289,8 @@ kyuubi\.session\.engine
\.check\.interval|
PT30M
|engine timeout, the engine will self-terminate when it's not accessed for this duration
|duration
|1.0.0
kyuubi\.session\.engine
\.initialize\.timeout|PT3M
|Timeout for starting the background engine, e.g. SparkSQLEngine.
|duration
|1.0.0
kyuubi\.session\.engine
\.log\.timeout|PT24H
|If we use Spark as the engine then the session submit log is the console output of spark-submit. We will retain the session submit log until over the config value.
|duration
|1.1.0
-kyuubi\.session\.engine
\.login\.timeout|PT15S
|The timeout(ms) of creating the connection to remote sql query engine
|duration
|1.0.0
+kyuubi\.session\.engine
\.login\.timeout|PT15S
|The timeout of creating the connection to remote sql query engine
|duration
|1.0.0
+kyuubi\.session\.engine
\.request\.timeout|PT1M
|The timeout of awaiting response after sending request to remote sql query engine
|duration
|1.4.0
kyuubi\.session\.engine
\.share\.level|USER
|(deprecated) - Using kyuubi.engine.share.level instead
|string
|1.0.0
kyuubi\.session\.engine
\.spark\.main\.resource|<undefined>
|The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default
|string
|1.0.0
kyuubi\.session\.engine
\.startup\.error\.max
\.size|8192
|During engine bootstrapping, if error occurs, using this config to limit the length error message(characters).
|int
|1.1.0
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index ffdb7af41..47353690d 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -509,11 +509,17 @@ object KyuubiConf {
.createOptional
val ENGINE_LOGIN_TIMEOUT: ConfigEntry[Long] = buildConf("session.engine.login.timeout")
- .doc("The timeout(ms) of creating the connection to remote sql query engine")
+ .doc("The timeout of creating the connection to remote sql query engine")
.version("1.0.0")
.timeConf
.createWithDefault(Duration.ofSeconds(15).toMillis)
+ val ENGINE_REQUEST_TIMEOUT: ConfigEntry[Long] = buildConf("session.engine.request.timeout")
+ .doc("The timeout of awaiting response after sending request to remote sql query engine")
+ .version("1.4.0")
+ .timeConf
+ .createWithDefault(Duration.ofSeconds(60).toMillis)
+
val ENGINE_INIT_TIMEOUT: ConfigEntry[Long] = buildConf("session.engine.initialize.timeout")
.doc("Timeout for starting the background engine, e.g. SparkSQLEngine.")
.version("1.0.0")
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index e3fdb9b60..6df933b83 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -77,7 +77,10 @@ class ExecuteStatement(
ms.incCount(STATEMENT_OPEN)
ms.incCount(STATEMENT_TOTAL)
}
- _remoteOpHandle = client.executeStatement(statement, shouldRunAsync, queryTimeout)
+ // We need to avoid executing query in sync mode, because there is no heartbeat mechanism
+ // in thrift protocol, in sync mode, we cannot distinguish between long-run query and
+ // engine crash without response before socket read timeout.
+ _remoteOpHandle = client.executeStatement(statement, true, queryTimeout)
} catch onError()
}
@@ -86,7 +89,7 @@ class ExecuteStatement(
var statusResp: TGetOperationStatusResp = null
var currentAttempts = 0
- def getOperationStatusWithRetry: Unit = {
+ def fetchOperationStatusWithRetry(): Unit = {
try {
statusResp = client.getOperationStatus(_remoteOpHandle)
currentAttempts = 0 // reset attempts whenever get touch with engine again
@@ -104,7 +107,7 @@ class ExecuteStatement(
}
// initialize operation status
- while (statusResp == null) { getOperationStatusWithRetry }
+ while (statusResp == null) { fetchOperationStatusWithRetry() }
var isComplete = false
while (!isComplete) {
@@ -116,7 +119,7 @@ class ExecuteStatement(
remoteState match {
case INITIALIZED_STATE | PENDING_STATE | RUNNING_STATE =>
isComplete = false
- getOperationStatusWithRetry
+ fetchOperationStatusWithRetry()
case FINISHED_STATE =>
setState(OperationState.FINISHED)
@@ -164,22 +167,15 @@ class ExecuteStatement(
}
override protected def runInternal(): Unit = {
- if (shouldRunAsync) {
- executeStatement()
- val sessionManager = session.sessionManager
- val asyncOperation = new Runnable {
- override def run(): Unit = waitStatementComplete()
- }
- try {
- val backgroundOperation =
- sessionManager.submitBackgroundOperation(asyncOperation)
- setBackgroundHandle(backgroundOperation)
- } catch onError("submitting query in background, query rejected")
- } else {
- setState(OperationState.RUNNING)
- executeStatement()
- setState(OperationState.FINISHED)
- }
+ executeStatement()
+ val sessionManager = session.sessionManager
+ val asyncOperation: Runnable = () => waitStatementComplete()
+ try {
+ val opHandle = sessionManager.submitBackgroundOperation(asyncOperation)
+ setBackgroundHandle(opHandle)
+ } catch onError("submitting query in background, query rejected")
+
+ if (!shouldRunAsync) getBackgroundHandle.get()
}
override def setState(newState: OperationState): Unit = {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index cb8bf706b..d847b0c85 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -80,8 +80,9 @@ class KyuubiSessionImpl(
private def openSession(host: String, port: Int): Unit = {
val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous")
val loginTimeout = sessionConf.get(ENGINE_LOGIN_TIMEOUT).toInt
+ val requestTimeout = sessionConf.get(ENGINE_REQUEST_TIMEOUT).toInt
transport = PlainSASLHelper.getPlainTransport(
- user, passwd, new TSocket(host, port, loginTimeout))
+ user, passwd, new TSocket(host, port, requestTimeout, loginTimeout))
if (!transport.isOpen) {
transport.open()
logSessionInfo(s"Connected to engine [$host:$port]")
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
index e0011b1ca..0a16da929 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -37,7 +37,7 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
KyuubiConf().set(KyuubiConf.ENGINE_SHARE_LEVEL, "connection")
}
- test("KYUUBI #647 - engine crash") {
+ test("KYUUBI #647 - async query causes engine crash") {
withSessionHandle { (client, handle) =>
val executeStmtReq = new TExecuteStatementReq()
executeStmtReq.setStatement("select java_method('java.lang.System', 'exit', 1)")
@@ -65,4 +65,36 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
assert(verboseMessage.contains("Failed to detect the root cause"))
}
}
+
+ test("client sync query cost time longer than engine.request.timeout") {
+ withSessionConf(Map(
+ KyuubiConf.ENGINE_REQUEST_TIMEOUT.key -> "PT5S"
+ ))(Map.empty)(Map.empty) {
+ withSessionHandle { (client, handle) =>
+ val executeStmtReq = new TExecuteStatementReq()
+ executeStmtReq.setStatement("select java_method('java.lang.Thread', 'sleep', 6000L)")
+ executeStmtReq.setSessionHandle(handle)
+ executeStmtReq.setRunAsync(false)
+ val executeStmtResp = client.ExecuteStatement(executeStmtReq)
+ val getOpStatusReq = new TGetOperationStatusReq(executeStmtResp.getOperationHandle)
+ val getOpStatusResp = client.GetOperationStatus(getOpStatusReq)
+ assert(getOpStatusResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
+ assert(getOpStatusResp.getOperationState === TOperationState.FINISHED_STATE)
+ }
+ }
+ }
+
+ test("sync query causes engine crash") {
+ withSessionHandle { (client, handle) =>
+ val executeStmtReq = new TExecuteStatementReq()
+ executeStmtReq.setStatement("select java_method('java.lang.System', 'exit', 1)")
+ executeStmtReq.setSessionHandle(handle)
+ executeStmtReq.setRunAsync(false)
+ val executeStmtResp = client.ExecuteStatement(executeStmtReq)
+ assert(executeStmtResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
+ assert(executeStmtResp.getOperationHandle === null)
+ assert(executeStmtResp.getStatus.getErrorMessage contains
+ "Caused by: java.net.SocketException: Broken pipe (Write failed)")
+ }
+ }
}