diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index 1ac4f5d39..a845b4472 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -265,6 +265,15 @@ kyuubi.engine.user.isolated.spark.session.idle.interval|PT1M|The interval to che kyuubi.engine.user.isolated.spark.session.idle.timeout|PT6H|If kyuubi.engine.user.isolated.spark.session is false, we will release the spark session if its corresponding user is inactive after this configured timeout.|duration|1.6.0 +### Event + +Key | Default | Meaning | Type | Since +--- | --- | --- | --- | --- +kyuubi.event.async.pool.keepalive.time|PT1M|Time(ms) that an idle async thread of the async event handler thread pool will wait for a new task to arrive before terminating|duration|1.7.0 +kyuubi.event.async.pool.size|8|Number of threads in the async event handler thread pool|int|1.7.0 +kyuubi.event.async.pool.wait.queue.size|100|Size of the wait queue for the async event handler thread pool|int|1.7.0 + + ### Frontend Key | Default | Meaning | Type | Since diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 8400613e7..d6ec9f86a 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -2120,4 +2120,26 @@ object KyuubiConf { _.toSet.subsetOf(Set("JSON", "JDBC", "CUSTOM")), "Unsupported event loggers") .createWithDefault(Seq("JSON")) + + val ASYNC_EVENT_HANDLER_POLL_SIZE: ConfigEntry[Int] = + buildConf("kyuubi.event.async.pool.size") + .doc("Number of threads in the async event handler thread pool") + .version("1.7.0") + .intConf + .createWithDefault(8) + + val ASYNC_EVENT_HANDLER_WAIT_QUEUE_SIZE: ConfigEntry[Int] = + buildConf("kyuubi.event.async.pool.wait.queue.size") + .doc("Size of the wait queue for the async event handler thread pool") + .version("1.7.0") + .intConf + .createWithDefault(100) + + val ASYNC_EVENT_HANDLER_KEEPALIVE_TIME: ConfigEntry[Long] = + buildConf("kyuubi.event.async.pool.keepalive.time") + .doc("Time(ms) that an idle async thread of the async event handler thread pool will wait" + + " for a new task to arrive before terminating") + .version("1.7.0") + .timeConf + .createWithDefault(Duration.ofSeconds(60).toMillis) } diff --git a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventBus.scala b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventBus.scala index 9e127f82e..e854e40a7 100644 --- a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventBus.scala +++ b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventBus.scala @@ -18,13 +18,15 @@ package org.apache.kyuubi.events import scala.collection.mutable -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} import scala.reflect.{classTag, ClassTag} import scala.util.Try import org.apache.kyuubi.Logging +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.{ASYNC_EVENT_HANDLER_KEEPALIVE_TIME, ASYNC_EVENT_HANDLER_POLL_SIZE, ASYNC_EVENT_HANDLER_WAIT_QUEUE_SIZE} import org.apache.kyuubi.events.handler.EventHandler +import org.apache.kyuubi.util.ThreadUtils /** * The [[EventBus]] is responsible for triggering Kyuubi event, registering event handlers and @@ -42,6 +44,18 @@ sealed trait EventBus { object EventBus extends Logging { private val defaultEventBus = EventBusLive() + private val conf: KyuubiConf = KyuubiConf().loadFileDefaults() + + private val poolSize = conf.get(ASYNC_EVENT_HANDLER_POLL_SIZE) + private val waitQueueSize = conf.get(ASYNC_EVENT_HANDLER_WAIT_QUEUE_SIZE) + private val keepAliveMs = conf.get(ASYNC_EVENT_HANDLER_KEEPALIVE_TIME) + + implicit private lazy val asyncEventExecutionContext: ExecutionContextExecutorService = + ExecutionContext.fromExecutorService(ThreadUtils.newDaemonQueuedThreadPool( + poolSize, + waitQueueSize, + keepAliveMs, + "async-event-handler-pool")) def apply(): EventBus = EventBusLive()