[KYUUBI #451] Support query auto timeout cancel on thriftserver



### _Why are the changes needed?_
Manually cherry-pick some Spark patches into Kyuubi:
1. [Support query auto timeout cancel on thriftserver](https://github.com/apache/spark/pull/29933)
2. [Add config to control if cancel invoke interrupt task on thriftserver](https://github.com/apache/spark/pull/30481)

To stay backward compatible with earlier Spark versions, we hard-code the config key instead of referring to Spark's SQLConf.

Note that the existing operation timeout (`kyuubi.operation.idle.timeout`) only closes operations that the client has stopped accessing on the engine. That is, if a query runs for a long time while its client stays alive, the query will not be cancelled. The newly added config `kyuubi.operation.query.timeout` covers this case.
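For illustration only, a deployment could combine the two new knobs in `kyuubi-defaults.conf` (the values below are hypothetical; the duration format follows the `PT…` style shown in the docs table):

```properties
# Cancel any statement that runs longer than 5 minutes
kyuubi.operation.query.timeout=PT5M
# Interrupt running Spark tasks immediately when a query is cancelled
kyuubi.operation.interrupt.on.cancel=true
```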

### _How was this patch tested?_
Add new tests.

Closes #451 from ulysses-you/query-timeout.

212f579 [ulysses-you] docs
9206538 [ulysses-you] empty flaky test
ddab9bf [ulysses-you] flaty test
1da02a0 [ulysses-you] flaty test
edfadf1 [ulysses-you] nit
3f9920b [ulysses-you] address comment
9492c48 [ulysses-you] correct timeout
5df997e [ulysses-you] nit
2124952 [ulysses-you] address comment
192fdcc [ulysses-you] fix tets
d684af6 [ulysses-you] global config
1d1adda [ulysses-you] empty
967a63e [ulysses-you] correct import
128948e [ulysses-you] add session conf in session
144d51b [ulysses-you] fix
a90248b [ulysses-you] unused import
c90386f [ulysses-you] timeout move to operation manager
d780965 [ulysses-you] update docs
a5f7138 [ulysses-you] fix test
f7c7308 [ulysses-you] config name
7f3fb3d [ulysses-you] change conf place
97a011e [ulysses-you] unnecessary change
0953a76 [ulysses-you] move test
38ac0c0 [ulysses-you] Merge branch 'master' of https://github.com/yaooqinn/kyuubi into query-timeout
71bea97 [ulysses-you] refector implementation
35ef6f9 [ulysses-you] update conf
0cad8e2 [ulysses-you] Support query auto timeout cancel on thriftserver

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit fecdba32a7)
Signed-off-by: Kent Yao <yao@apache.org>
ulysses-you 2021-03-26 14:04:09 +08:00 committed by Kent Yao
parent 68cf158279
commit 1503af361b
10 changed files with 200 additions and 17 deletions


@@ -285,6 +285,8 @@ kyuubi\.kinit<br>\.principal|<div style='width: 80pt;word-wrap: break-word;white
Key | Default | Meaning | Since
--- | --- | --- | ---
kyuubi\.operation\.idle<br>\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT3H</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Operation will be closed when it's not accessed for this duration of time</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.operation<br>\.interrupt\.on\.cancel|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished.</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.operation<br>\.query\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT0S</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Set a query duration timeout in seconds in Kyuubi. If the timeout is set to a positive value, a running query will be cancelled automatically if timeout. Otherwise the query continues to run till completion. If timeout values are set for each statement via `java.sql.Statement.setQueryTimeout` and they are smaller than this configuration value, they take precedence. If you set this timeout and prefer to cancel the queries right away without waiting task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together.</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.operation<br>\.status\.polling<br>\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT5S</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Timeout(ms) for long polling asynchronous running sql query's status</div>|<div style='width: 20pt'>1.0.0</div>
### Session


@@ -17,24 +17,32 @@
package org.apache.kyuubi.engine.spark.operation
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.{RejectedExecutionException, TimeUnit}
import scala.util.control.NonFatal
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.{ArrayFetchIterator, KyuubiSparkUtil}
import org.apache.kyuubi.operation.{OperationState, OperationType}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.ThreadUtils
class ExecuteStatement(
spark: SparkSession,
session: Session,
protected override val statement: String,
override val shouldRunAsync: Boolean)
override val shouldRunAsync: Boolean,
queryTimeout: Long)
extends SparkOperation(spark, OperationType.EXECUTE_STATEMENT, session) with Logging {
private val forceCancel =
session.sessionManager.getConf.get(KyuubiConf.OPERATION_FORCE_CANCEL)
private val operationLog: OperationLog =
OperationLog.createOperationLog(session.handle, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
@@ -63,7 +71,7 @@ class ExecuteStatement(
setState(OperationState.RUNNING)
info(KyuubiSparkUtil.diagnostics(spark))
Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
spark.sparkContext.setJobGroup(statementId, statement)
spark.sparkContext.setJobGroup(statementId, statement, forceCancel)
result = spark.sql(statement)
debug(result.queryExecution)
iter = new ArrayFetchIterator(result.collect())
@@ -76,6 +84,7 @@ class ExecuteStatement(
}
override protected def runInternal(): Unit = {
addTimeoutMonitor()
if (shouldRunAsync) {
val asyncOperation = new Runnable {
override def run(): Unit = {
@@ -100,4 +109,27 @@ class ExecuteStatement(
executeStatement()
}
}
private def addTimeoutMonitor(): Unit = {
if (queryTimeout > 0) {
val timeoutExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
timeoutExecutor.schedule(new Runnable {
override def run(): Unit = {
try {
if (getStatus.state != OperationState.TIMEOUT) {
info(s"Query with $statementId timed out after $queryTimeout seconds")
cleanup(OperationState.TIMEOUT)
}
} catch {
case NonFatal(e) =>
setOperationException(KyuubiSQLException(e))
error(s"Error cancelling the query after timeout: $queryTimeout seconds")
} finally {
timeoutExecutor.shutdown()
}
}
}, queryTimeout, TimeUnit.SECONDS)
}
}
}
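As a side note, the timeout monitor added in `addTimeoutMonitor` above follows a common single-shot scheduler pattern. A minimal self-contained sketch (hypothetical helper name, not the Kyuubi API):

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

// Single-shot timeout monitor: mirrors the shape of addTimeoutMonitor above.
// Schedules a cancellation callback once after `timeoutSecs` seconds (no-op
// when the timeout is non-positive) and shuts the scheduler down afterwards.
def scheduleTimeout(timeoutSecs: Long)(onTimeout: () => Unit): Unit = {
  if (timeoutSecs > 0) {
    val executor: ScheduledExecutorService =
      Executors.newSingleThreadScheduledExecutor()
    executor.schedule(new Runnable {
      override def run(): Unit =
        try onTimeout() finally executor.shutdown()
    }, timeoutSecs, TimeUnit.SECONDS)
  }
}
```

The real implementation additionally guards against racing with normal completion by checking the operation state before calling `cleanup(OperationState.TIMEOUT)`.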


@@ -58,7 +58,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
runAsync: Boolean,
queryTimeout: Long): Operation = {
val spark = getSparkSession(session.handle)
val operation = new ExecuteStatement(spark, session, statement, runAsync)
val operation = new ExecuteStatement(spark, session, statement, runAsync, queryTimeout)
addOperation(operation)
}


@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark
import java.sql.{SQLTimeoutException, Statement}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import org.apache.spark.TaskKilled
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.JDBCTestUtils
class SparkEngineSuites extends KyuubiFunSuite {
test("Add config to control if cancel invoke interrupt task on engine") {
Seq(true, false).foreach { force =>
withSparkJdbcStatement(Map(KyuubiConf.OPERATION_FORCE_CANCEL.key -> force.toString)) {
case (statement, spark) =>
val index = new AtomicInteger(0)
val forceCancel = new AtomicBoolean(false)
val listener = new SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
assert(taskEnd.reason.isInstanceOf[TaskKilled])
if (forceCancel.get()) {
assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime < 3000)
index.incrementAndGet()
} else {
assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime >= 4000)
index.incrementAndGet()
}
}
}
spark.sparkContext.addSparkListener(listener)
try {
statement.setQueryTimeout(3)
forceCancel.set(force)
val e1 = intercept[SQLTimeoutException] {
statement.execute("select java_method('java.lang.Thread', 'sleep', 5000L)")
}.getMessage
assert(e1.contains("Query timed out"))
eventually(Timeout(30.seconds)) {
assert(index.get() == 1)
}
} finally {
spark.sparkContext.removeSparkListener(listener)
}
}
}
}
private def withSparkJdbcStatement(
conf: Map[String, String] = Map.empty)(
statement: (Statement, SparkSession) => Unit): Unit = {
val spark = new WithSparkSuite {
override def withKyuubiConf: Map[String, String] = conf
override protected def jdbcUrl: String = getJdbcUrl
}
spark.startSparkEngine()
val tmp: Statement => Unit = { tmpStatement =>
statement(tmpStatement, spark.getSpark)
}
try {
spark.withJdbcStatement()(tmp)
} finally {
spark.stopSparkEngine()
}
}
}
trait WithSparkSuite extends WithSparkSQLEngine with JDBCTestUtils


@@ -36,7 +36,7 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
super.beforeAll()
}
protected def startSparkEngine(): Unit = {
def startSparkEngine(): Unit = {
val warehousePath = Utils.createTempDir()
val metastorePath = Utils.createTempDir()
warehousePath.toFile.delete()
@@ -63,7 +63,7 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
stopSparkEngine()
}
protected def stopSparkEngine(): Unit = {
def stopSparkEngine(): Unit = {
// we need to clean up conf since it's the global config in same jvm.
withKyuubiConf.foreach { case (k, _) =>
System.clearProperty(k)
@@ -83,4 +83,5 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
}
protected def getJdbcUrl: String = s"jdbc:hive2://$connectionUrl/;"
def getSpark: SparkSession = spark
}


@@ -502,6 +502,27 @@ object KyuubiConf {
.timeConf
.createWithDefault(Duration.ofSeconds(5).toMillis)
val OPERATION_FORCE_CANCEL: ConfigEntry[Boolean] =
buildConf("operation.interrupt.on.cancel")
.doc("When true, all running tasks will be interrupted if one cancels a query. " +
"When false, all running tasks will remain until finished.")
.version("1.2.0")
.booleanConf
.createWithDefault(true)
val OPERATION_QUERY_TIMEOUT: ConfigEntry[Long] =
buildConf("operation.query.timeout")
.doc("Set a query duration timeout in seconds in Kyuubi. If the timeout is set to " +
"a positive value, a running query will be cancelled automatically if timeout. " +
"Otherwise the query continues to run till completion. If timeout values are " +
"set for each statement via `java.sql.Statement.setQueryTimeout` and they are smaller " +
"than this configuration value, they take precedence. If you set this timeout and prefer " +
"to cancel the queries right away without waiting task to finish, consider enabling " +
s"${OPERATION_FORCE_CANCEL.key} together.")
.version("1.2.0")
.timeConf
.createWithDefault(Duration.ZERO.toMillis)
val ENGINE_SHARED_LEVEL: ConfigEntry[String] = buildConf("session.engine.share.level")
.doc("The SQL engine App will be shared in different levels, available configs are: <ul>" +
" <li>CONNECTION: the App will not be shared but only used by the current client" +


@@ -34,7 +34,7 @@ trait JDBCTestUtils extends KyuubiFunSuite {
protected val patterns = Seq("", "*", "%", null, ".*", "_*", "_%", ".%")
protected def jdbcUrl: String
protected def withMultipleConnectionJdbcStatement(
def withMultipleConnectionJdbcStatement(
tableNames: String*)(fs: (Statement => Unit)*): Unit = {
val connections = fs.map { _ => DriverManager.getConnection(jdbcUrl, user, "") }
val statements = connections.map(_.createStatement())
@@ -57,7 +57,7 @@ trait JDBCTestUtils extends KyuubiFunSuite {
}
}
protected def withDatabases(dbNames: String*)(fs: (Statement => Unit)*): Unit = {
def withDatabases(dbNames: String*)(fs: (Statement => Unit)*): Unit = {
val connections = fs.map { _ => DriverManager.getConnection(jdbcUrl, user, "") }
val statements = connections.map(_.createStatement())
@@ -75,11 +75,11 @@ trait JDBCTestUtils extends KyuubiFunSuite {
}
}
protected def withJdbcStatement(tableNames: String*)(f: Statement => Unit): Unit = {
def withJdbcStatement(tableNames: String*)(f: Statement => Unit): Unit = {
withMultipleConnectionJdbcStatement(tableNames: _*)(f)
}
protected def withThriftClient(f: TCLIService.Iface => Unit): Unit = {
def withThriftClient(f: TCLIService.Iface => Unit): Unit = {
val hostAndPort = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head.split(":")
val host = hostAndPort.head
val port = hostAndPort(1).toInt
@@ -96,7 +96,7 @@ trait JDBCTestUtils extends KyuubiFunSuite {
}
}
protected def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = {
def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = {
withThriftClient { client =>
val req = new TOpenSessionReq()
req.setUsername(user)
@@ -117,7 +117,7 @@ trait JDBCTestUtils extends KyuubiFunSuite {
}
}
protected def checkGetSchemas(
def checkGetSchemas(
rs: ResultSet, dbNames: Seq[String], catalogName: String = ""): Unit = {
var count = 0
while(rs.next()) {


@@ -17,7 +17,7 @@
package org.apache.kyuubi.operation
import java.sql.{Date, SQLException, Timestamp}
import java.sql.{Date, SQLException, SQLTimeoutException, Timestamp}
import scala.collection.JavaConverters._
@@ -327,4 +327,26 @@ trait JDBCTests extends BasicJDBCTests {
assert(metaData.getScale(1) === 0)
}
}
test("Support query auto timeout cancel on thriftserver - setQueryTimeout") {
withJdbcStatement() { statement =>
statement.setQueryTimeout(1)
val e = intercept[SQLTimeoutException] {
statement.execute("select java_method('java.lang.Thread', 'sleep', 10000L)")
}.getMessage
assert(e.contains("Query timed out after"))
statement.setQueryTimeout(0)
val rs1 = statement.executeQuery(
"select 'test', java_method('java.lang.Thread', 'sleep', 3000L)")
rs1.next()
assert(rs1.getString(1) == "test")
statement.setQueryTimeout(-1)
val rs2 = statement.executeQuery(
"select 'test', java_method('java.lang.Thread', 'sleep', 3000L)")
rs2.next()
assert(rs2.getString(1) == "test")
}
}
}


@@ -31,7 +31,8 @@ class ExecuteStatement(
client: TCLIService.Iface,
remoteSessionHandle: TSessionHandle,
override val statement: String,
override val shouldRunAsync: Boolean)
override val shouldRunAsync: Boolean,
queryTimeout: Long)
extends KyuubiOperation(
OperationType.EXECUTE_STATEMENT, session, client, remoteSessionHandle) {
@@ -64,6 +65,7 @@ class ExecuteStatement(
try {
val req = new TExecuteStatementReq(remoteSessionHandle, statement)
req.setRunAsync(shouldRunAsync)
req.setQueryTimeout(queryTimeout)
val resp = client.ExecuteStatement(req)
verifyTStatus(resp.getStatus)
_remoteOpHandle = resp.getOperationHandle


@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
import org.apache.hive.service.rpc.thrift.{TCLIService, TFetchResultsReq, TRow, TRowSet, TSessionHandle}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.{Session, SessionHandle}
import org.apache.kyuubi.util.ThriftUtils
@@ -49,6 +50,18 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
tSessionHandle
}
private def getQueryTimeout(clientQueryTimeout: Long): Long = {
// If clientQueryTimeout is smaller than systemQueryTimeout value,
// we use the clientQueryTimeout value.
val systemQueryTimeout = getConf.get(KyuubiConf.OPERATION_QUERY_TIMEOUT)
if (clientQueryTimeout > 0 &&
(systemQueryTimeout <= 0 || clientQueryTimeout < systemQueryTimeout)) {
clientQueryTimeout
} else {
systemQueryTimeout
}
}
def setConnection(
sessionHandle: SessionHandle,
client: TCLIService.Iface,
@@ -69,9 +82,9 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
queryTimeout: Long): Operation = {
val client = getThriftClient(session.handle)
val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
val operation = new ExecuteStatement(session, client, remoteSessionHandle, statement, runAsync)
val operation = new ExecuteStatement(session, client, remoteSessionHandle, statement, runAsync,
getQueryTimeout(queryTimeout))
addOperation(operation)
}
override def newGetTypeInfoOperation(session: Session): Operation = {
@@ -143,7 +156,6 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
addOperation(operation)
}
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation, maxRows: Int): TRowSet = {
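The precedence rule implemented by `getQueryTimeout` above — a positive client-side timeout set via `java.sql.Statement.setQueryTimeout` wins only when the server-side `kyuubi.operation.query.timeout` is unset or larger — can be sketched as a standalone function (an illustrative sketch, not the actual Kyuubi API):

```scala
// Resolve the effective query timeout in seconds.
// A client timeout <= 0 means "not set"; a system timeout <= 0 means
// "no server-side default". The stricter positive value wins.
def resolveQueryTimeout(clientTimeout: Long, systemTimeout: Long): Long =
  if (clientTimeout > 0 &&
      (systemTimeout <= 0 || clientTimeout < systemTimeout)) {
    clientTimeout
  } else {
    systemTimeout
  }
```

For example, a client timeout of 5 s beats a server default of 10 s, while a client timeout of 20 s is capped by that same server default.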