[KYUUBI #7121] Improve operation timeout management with configurable executors
### Why are the changes needed? The current mechanism for handling operation timeouts in Kyuubi creates a new `ScheduledExecutorService` with a dedicated thread for each operation. In scenarios with a large number of concurrent operations, this results in excessive thread creation, which consumes substantial system resources and may adversely affect server performance and stability. This PR introduces a shared `ScheduledThreadPool` within the Operation Manager to centrally schedule operation timeouts. This approach avoids the overhead of creating an excessive number of threads, thereby reducing the system load. Additionally, both the pool size and thread keep-alive time are configurable via the `OPERATION_TIMEOUT_POOL_SIZE` and `OPERATION_TIMEOUT_POOL_KEEPALIVE_TIME` parameters. ### How was this patch tested? A new unit test for `newDaemonScheduledThreadPool` was added to `ThreadUtilsSuite.scala`. Furthermore, a dedicated `TimeoutSchedulerSuite` was introduced to verify operation timeout behavior. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #7121 from wangzhigang1999/master. Closes #7121 df7688dbf [wangzhigang] Refactor timeout management configuration and improve documentation 2b03b1e68 [wangzhigang] Remove deprecated `ThreadPoolTimeoutExecutor` class following refactor of operation timeout management. 52a8a516a [wangzhigang] Refactor operation timeout management to use per-OperationManager scheduler 7e46d47f8 [wangzhigang] Refactor timeout management by introducing ThreadPoolTimeoutExecutor f7f10881a [wangzhigang] Add operation timeout management with ThreadPoolTimeoutExecutor d8cd6c7d4 [wangzhigang] Update .gitignore to exclude .bloop and .metals directories Lead-authored-by: wangzhigang <wangzhigang1999@live.cn> Co-authored-by: wangzhigang <wzg443064@alibaba-inc.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent
efe9f5552c
commit
84928184fc
2
.gitignore
vendored
2
.gitignore
vendored
@ -26,6 +26,8 @@
|
|||||||
*.swp
|
*.swp
|
||||||
*~
|
*~
|
||||||
.DS_Store
|
.DS_Store
|
||||||
|
.bloop/
|
||||||
|
.metals/
|
||||||
.cache
|
.cache
|
||||||
.classpath
|
.classpath
|
||||||
.ensime
|
.ensime
|
||||||
|
|||||||
@ -417,7 +417,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
|
|||||||
### Operation
|
### Operation
|
||||||
|
|
||||||
| Key | Default | Meaning | Type | Since |
|
| Key | Default | Meaning | Type | Since |
|
||||||
|--------------------------------------------------|---------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------|
|
|--------------------------------------------------|---------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|--------|
|
||||||
| kyuubi.operation.getTables.ignoreTableProperties | false | Speed up the `GetTables` operation by ignoring `tableTypes` query criteria, and returning table identities only. | boolean | 1.8.0 |
|
| kyuubi.operation.getTables.ignoreTableProperties | false | Speed up the `GetTables` operation by ignoring `tableTypes` query criteria, and returning table identities only. | boolean | 1.8.0 |
|
||||||
| kyuubi.operation.idle.timeout | PT3H | Operation will be closed when it's not accessed for this duration of time | duration | 1.0.0 |
|
| kyuubi.operation.idle.timeout | PT3H | Operation will be closed when it's not accessed for this duration of time | duration | 1.0.0 |
|
||||||
| kyuubi.operation.interrupt.on.cancel | true | When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished. | boolean | 1.2.0 |
|
| kyuubi.operation.interrupt.on.cancel | true | When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished. | boolean | 1.2.0 |
|
||||||
@ -438,6 +438,8 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
|
|||||||
| kyuubi.operation.scheduler.pool | <undefined> | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. | string | 1.1.1 |
|
| kyuubi.operation.scheduler.pool | <undefined> | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. | string | 1.1.1 |
|
||||||
| kyuubi.operation.spark.listener.enabled | true | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes. | boolean | 1.6.0 |
|
| kyuubi.operation.spark.listener.enabled | true | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes. | boolean | 1.6.0 |
|
||||||
| kyuubi.operation.status.polling.timeout | PT5S | Timeout(ms) for long polling asynchronous running sql query's status | duration | 1.0.0 |
|
| kyuubi.operation.status.polling.timeout | PT5S | Timeout(ms) for long polling asynchronous running sql query's status | duration | 1.0.0 |
|
||||||
|
| kyuubi.operation.timeout.pool.keepalive.time | PT1M | Keep-alive time for idle threads in the timeout scheduler pool. | duration | 1.11.0 |
|
||||||
|
| kyuubi.operation.timeout.pool.size | 8 | Number of threads in the timeout scheduler pool used for operation timeout monitoring. | int | 1.11.0 |
|
||||||
|
|
||||||
### Server
|
### Server
|
||||||
|
|
||||||
|
|||||||
@ -2212,6 +2212,20 @@ object KyuubiConf {
|
|||||||
.checkValue(_ >= 1000, "must >= 1s if set")
|
.checkValue(_ >= 1000, "must >= 1s if set")
|
||||||
.createOptional
|
.createOptional
|
||||||
|
|
||||||
|
val OPERATION_TIMEOUT_POOL_SIZE: ConfigEntry[Int] =
|
||||||
|
buildConf("kyuubi.operation.timeout.pool.size")
|
||||||
|
.doc("Number of threads in the timeout scheduler pool used for operation timeout monitoring.")
|
||||||
|
.version("1.11.0")
|
||||||
|
.intConf
|
||||||
|
.createWithDefault(8)
|
||||||
|
|
||||||
|
val OPERATION_TIMEOUT_POOL_KEEPALIVE_TIME: ConfigEntry[Long] =
|
||||||
|
buildConf("kyuubi.operation.timeout.pool.keepalive.time")
|
||||||
|
.doc("Keep-alive time for idle threads in the timeout scheduler pool.")
|
||||||
|
.version("1.11.0")
|
||||||
|
.timeConf
|
||||||
|
.createWithDefault(Duration.ofSeconds(60).toMillis)
|
||||||
|
|
||||||
val OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED: ConfigEntry[Boolean] =
|
val OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED: ConfigEntry[Boolean] =
|
||||||
buildConf("kyuubi.operation.query.timeout.monitor.enabled")
|
buildConf("kyuubi.operation.query.timeout.monitor.enabled")
|
||||||
.doc("Whether to monitor timeout query timeout check on server side.")
|
.doc("Whether to monitor timeout query timeout check on server side.")
|
||||||
|
|||||||
@ -18,7 +18,7 @@
|
|||||||
package org.apache.kyuubi.operation
|
package org.apache.kyuubi.operation
|
||||||
|
|
||||||
import java.io.IOException
|
import java.io.IOException
|
||||||
import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
|
import java.util.concurrent.{Future, ScheduledFuture}
|
||||||
import java.util.concurrent.locks.ReentrantLock
|
import java.util.concurrent.locks.ReentrantLock
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
@ -32,7 +32,6 @@ import org.apache.kyuubi.operation.OperationState._
|
|||||||
import org.apache.kyuubi.operation.log.OperationLog
|
import org.apache.kyuubi.operation.log.OperationLog
|
||||||
import org.apache.kyuubi.session.Session
|
import org.apache.kyuubi.session.Session
|
||||||
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TProgressUpdateResp, TProtocolVersion, TStatus, TStatusCode}
|
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TProgressUpdateResp, TProtocolVersion, TStatus, TStatusCode}
|
||||||
import org.apache.kyuubi.util.ThreadUtils
|
|
||||||
|
|
||||||
abstract class AbstractOperation(session: Session) extends Operation with Logging {
|
abstract class AbstractOperation(session: Session) extends Operation with Logging {
|
||||||
|
|
||||||
@ -45,7 +44,7 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
|
|||||||
|
|
||||||
final private[kyuubi] val statementId = handle.identifier.toString
|
final private[kyuubi] val statementId = handle.identifier.toString
|
||||||
|
|
||||||
private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
|
private var timeoutFuture: Option[ScheduledFuture[_]] = None
|
||||||
|
|
||||||
private val lock: ReentrantLock = new ReentrantLock()
|
private val lock: ReentrantLock = new ReentrantLock()
|
||||||
|
|
||||||
@ -60,8 +59,6 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
|
|||||||
|
|
||||||
protected def addTimeoutMonitor(queryTimeout: Long): Unit = {
|
protected def addTimeoutMonitor(queryTimeout: Long): Unit = {
|
||||||
if (queryTimeout > 0) {
|
if (queryTimeout > 0) {
|
||||||
val timeoutExecutor =
|
|
||||||
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false)
|
|
||||||
val action: Runnable = () =>
|
val action: Runnable = () =>
|
||||||
// Clients less than version 2.1 have no HIVE-4924 Patch,
|
// Clients less than version 2.1 have no HIVE-4924 Patch,
|
||||||
// no queryTimeout parameter and no TIMEOUT status.
|
// no queryTimeout parameter and no TIMEOUT status.
|
||||||
@ -74,13 +71,17 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
|
|||||||
} else {
|
} else {
|
||||||
cleanup(OperationState.TIMEOUT)
|
cleanup(OperationState.TIMEOUT)
|
||||||
}
|
}
|
||||||
timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
|
|
||||||
statementTimeoutCleaner = Some(timeoutExecutor)
|
val future = session.sessionManager.operationManager.scheduleTimeout(action, queryTimeout)
|
||||||
|
timeoutFuture = Some(future)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected def shutdownTimeoutMonitor(): Unit = {
|
protected def shutdownTimeoutMonitor(): Unit = {
|
||||||
statementTimeoutCleaner.foreach(_.shutdown())
|
timeoutFuture.foreach { future =>
|
||||||
|
session.sessionManager.operationManager.cancelTimeout(future)
|
||||||
|
}
|
||||||
|
timeoutFuture = None
|
||||||
}
|
}
|
||||||
|
|
||||||
override def getOperationLog: Option[OperationLog] = None
|
override def getOperationLog: Option[OperationLog] = None
|
||||||
|
|||||||
@ -17,6 +17,8 @@
|
|||||||
|
|
||||||
package org.apache.kyuubi.operation
|
package org.apache.kyuubi.operation
|
||||||
|
|
||||||
|
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
import org.apache.kyuubi.KyuubiSQLException
|
import org.apache.kyuubi.KyuubiSQLException
|
||||||
@ -28,6 +30,7 @@ import org.apache.kyuubi.operation.log.LogDivertAppender
|
|||||||
import org.apache.kyuubi.service.AbstractService
|
import org.apache.kyuubi.service.AbstractService
|
||||||
import org.apache.kyuubi.session.Session
|
import org.apache.kyuubi.session.Session
|
||||||
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
|
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
|
||||||
|
import org.apache.kyuubi.util.ThreadUtils
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The [[OperationManager]] manages all the operations during their lifecycle.
|
* The [[OperationManager]] manages all the operations during their lifecycle.
|
||||||
@ -40,6 +43,9 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
|
|||||||
|
|
||||||
protected def skipOperationLog: Boolean = false
|
protected def skipOperationLog: Boolean = false
|
||||||
|
|
||||||
|
/* Scheduler used for query timeout tasks */
|
||||||
|
@volatile private var timeoutScheduler: ScheduledExecutorService = _
|
||||||
|
|
||||||
def getOperationCount: Int = handleToOperation.size()
|
def getOperationCount: Int = handleToOperation.size()
|
||||||
|
|
||||||
def allOperations(): Iterable[Operation] = handleToOperation.values().asScala
|
def allOperations(): Iterable[Operation] = handleToOperation.values().asScala
|
||||||
@ -47,6 +53,32 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
|
|||||||
override def initialize(conf: KyuubiConf): Unit = {
|
override def initialize(conf: KyuubiConf): Unit = {
|
||||||
LogDivertAppender.initialize(skipOperationLog)
|
LogDivertAppender.initialize(skipOperationLog)
|
||||||
super.initialize(conf)
|
super.initialize(conf)
|
||||||
|
|
||||||
|
val timeoutPoolSize = conf.get(KyuubiConf.OPERATION_TIMEOUT_POOL_SIZE)
|
||||||
|
val timeoutPoolKeepAliveTime = conf.get(KyuubiConf.OPERATION_TIMEOUT_POOL_KEEPALIVE_TIME)
|
||||||
|
timeoutScheduler = ThreadUtils.newDaemonScheduledThreadPool(
|
||||||
|
timeoutPoolSize,
|
||||||
|
timeoutPoolKeepAliveTime,
|
||||||
|
"operation-timeout")
|
||||||
|
}
|
||||||
|
|
||||||
|
override def stop(): Unit = synchronized {
|
||||||
|
if (timeoutScheduler != null) {
|
||||||
|
ThreadUtils.shutdown(timeoutScheduler)
|
||||||
|
timeoutScheduler = null
|
||||||
|
}
|
||||||
|
super.stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Schedule a timeout task using the internal scheduler */
|
||||||
|
def scheduleTimeout(action: Runnable, timeoutSeconds: Long): ScheduledFuture[_] = {
|
||||||
|
timeoutScheduler.schedule(action, timeoutSeconds, TimeUnit.SECONDS)
|
||||||
|
}
|
||||||
|
|
||||||
|
def cancelTimeout(future: ScheduledFuture[_]): Unit = {
|
||||||
|
if (future != null && !future.isCancelled) {
|
||||||
|
future.cancel(false)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def newExecuteStatementOperation(
|
def newExecuteStatementOperation(
|
||||||
|
|||||||
@ -67,6 +67,24 @@ object ThreadUtils extends Logging {
|
|||||||
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
|
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def newDaemonScheduledThreadPool(
|
||||||
|
poolSize: Int,
|
||||||
|
keepAliveSec: Long,
|
||||||
|
prefix: String,
|
||||||
|
removeOnCancel: Boolean = true,
|
||||||
|
executeDelayedTasksAfterShutdown: Boolean = false): ScheduledThreadPoolExecutor = {
|
||||||
|
val threadFactory = new NamedThreadFactory(prefix, daemon = true)
|
||||||
|
val executor = new ScheduledThreadPoolExecutor(poolSize, threadFactory)
|
||||||
|
executor.setRemoveOnCancelPolicy(removeOnCancel)
|
||||||
|
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(executeDelayedTasksAfterShutdown)
|
||||||
|
if (keepAliveSec > 0) {
|
||||||
|
executor.setKeepAliveTime(keepAliveSec, TimeUnit.SECONDS)
|
||||||
|
executor.allowCoreThreadTimeOut(true)
|
||||||
|
}
|
||||||
|
info(s"$prefix: pool size: $poolSize, keepalive time: $keepAliveSec s")
|
||||||
|
executor
|
||||||
|
}
|
||||||
|
|
||||||
def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
|
def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
|
||||||
try {
|
try {
|
||||||
// `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
|
// `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
|
||||||
|
|||||||
@ -0,0 +1,55 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kyuubi.operation.timeout
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch
|
||||||
|
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
import org.apache.kyuubi.KyuubiFunSuite
|
||||||
|
import org.apache.kyuubi.config.KyuubiConf
|
||||||
|
import org.apache.kyuubi.operation.NoopOperationManager
|
||||||
|
|
||||||
|
class TimeoutSchedulerSuite extends KyuubiFunSuite {
|
||||||
|
|
||||||
|
test("scheduler lifecycle and functionality via OperationManager") {
|
||||||
|
val conf = new KyuubiConf()
|
||||||
|
conf.set(KyuubiConf.OPERATION_TIMEOUT_POOL_SIZE, 2)
|
||||||
|
conf.set(KyuubiConf.OPERATION_TIMEOUT_POOL_KEEPALIVE_TIME, 30000L)
|
||||||
|
|
||||||
|
val manager = new NoopOperationManager()
|
||||||
|
manager.initialize(conf)
|
||||||
|
manager.start()
|
||||||
|
|
||||||
|
val latch = new CountDownLatch(1)
|
||||||
|
val future =
|
||||||
|
manager.scheduleTimeout(new Runnable { override def run(): Unit = latch.countDown() }, 1)
|
||||||
|
|
||||||
|
// wait until latch count becomes 0
|
||||||
|
eventually(timeout(5.seconds), interval(100.millis)) {
|
||||||
|
assert(latch.getCount == 0)
|
||||||
|
}
|
||||||
|
assert(!future.isCancelled)
|
||||||
|
|
||||||
|
// Test cancellation path
|
||||||
|
val longFuture = manager.scheduleTimeout(new Runnable { override def run(): Unit = {} }, 10)
|
||||||
|
manager.cancelTimeout(longFuture)
|
||||||
|
assert(longFuture.isCancelled)
|
||||||
|
|
||||||
|
manager.stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -61,4 +61,19 @@ class ThreadUtilsSuite extends KyuubiFunSuite {
|
|||||||
service.awaitTermination(10, TimeUnit.SECONDS)
|
service.awaitTermination(10, TimeUnit.SECONDS)
|
||||||
assert(threadName startsWith "")
|
assert(threadName startsWith "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("New daemon scheduled thread pool") {
|
||||||
|
val pool = ThreadUtils.newDaemonScheduledThreadPool(2, 10, "ThreadUtilsSchedTest")
|
||||||
|
// submit a task to ensure pool operational
|
||||||
|
@volatile var ran = false
|
||||||
|
val fut = pool.schedule(
|
||||||
|
new Runnable { override def run(): Unit = ran = true },
|
||||||
|
100,
|
||||||
|
TimeUnit.MILLISECONDS)
|
||||||
|
fut.get(5, TimeUnit.SECONDS)
|
||||||
|
assert(ran)
|
||||||
|
assert(pool.getCorePoolSize == 2)
|
||||||
|
ThreadUtils.shutdown(pool)
|
||||||
|
assert(pool.isShutdown)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -147,6 +147,8 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Clean up timeout monitor when operation is cancelled
|
||||||
|
shutdownTimeoutMonitor()
|
||||||
}
|
}
|
||||||
|
|
||||||
override def close(): Unit = withLockRequired {
|
override def close(): Unit = withLockRequired {
|
||||||
@ -162,6 +164,8 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Clean up timeout monitor to prevent memory leaks
|
||||||
|
shutdownTimeoutMonitor()
|
||||||
try {
|
try {
|
||||||
// For launch engine operation, we use OperationLog to pass engine submit log but
|
// For launch engine operation, we use OperationLog to pass engine submit log but
|
||||||
// at that time we do not have remoteOpHandle
|
// at that time we do not have remoteOpHandle
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user