[KYUUBI #3512] Use dedicated ExecutionContext for EventBus async execution

### _Why are the changes needed?_

Fix https://github.com/apache/incubator-kyuubi/issues/3512

Currently, Kyuubi EventBus uses `scala.concurrent.ExecutionContext.Implicits.global` to execute async event handler. Generally, it's discouraged to use that global ec, instead, we should create dedicated ec for each workload, this pr aims to use dedicated ExecutionContext for EventBus async execution

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #3524 from Yikf/ec.

Closes #3512

9757372d [yikf] Use dedicated ExecutionContext for EventBus async execution

Authored-by: yikf <yikaifei1@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
yikf 2022-09-28 13:56:48 +08:00 committed by Cheng Pan
parent c77900f47a
commit 12a85fe337
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
3 changed files with 47 additions and 2 deletions

View File

@ -265,6 +265,15 @@ kyuubi.engine.user.isolated.spark.session.idle.interval|PT1M|The interval to che
kyuubi.engine.user.isolated.spark.session.idle.timeout|PT6H|If kyuubi.engine.user.isolated.spark.session is false, we will release the spark session if its corresponding user is inactive after this configured timeout.|duration|1.6.0
### Event
Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
kyuubi.event.async.pool.keepalive.time|PT1M|Time(ms) that an idle async thread of the async event handler thread pool will wait for a new task to arrive before terminating|duration|1.7.0
kyuubi.event.async.pool.size|8|Number of threads in the async event handler thread pool|int|1.7.0
kyuubi.event.async.pool.wait.queue.size|100|Size of the wait queue for the async event handler thread pool|int|1.7.0
### Frontend
Key | Default | Meaning | Type | Since

View File

@ -2120,4 +2120,26 @@ object KyuubiConf {
_.toSet.subsetOf(Set("JSON", "JDBC", "CUSTOM")),
"Unsupported event loggers")
.createWithDefault(Seq("JSON"))
val ASYNC_EVENT_HANDLER_POLL_SIZE: ConfigEntry[Int] =
buildConf("kyuubi.event.async.pool.size")
.doc("Number of threads in the async event handler thread pool")
.version("1.7.0")
.intConf
.createWithDefault(8)
val ASYNC_EVENT_HANDLER_WAIT_QUEUE_SIZE: ConfigEntry[Int] =
buildConf("kyuubi.event.async.pool.wait.queue.size")
.doc("Size of the wait queue for the async event handler thread pool")
.version("1.7.0")
.intConf
.createWithDefault(100)
val ASYNC_EVENT_HANDLER_KEEPALIVE_TIME: ConfigEntry[Long] =
buildConf("kyuubi.event.async.pool.keepalive.time")
.doc("Time(ms) that an idle async thread of the async event handler thread pool will wait" +
" for a new task to arrive before terminating")
.version("1.7.0")
.timeConf
.createWithDefault(Duration.ofSeconds(60).toMillis)
}

View File

@ -18,13 +18,15 @@
package org.apache.kyuubi.events
import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
import scala.reflect.{classTag, ClassTag}
import scala.util.Try
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ASYNC_EVENT_HANDLER_KEEPALIVE_TIME, ASYNC_EVENT_HANDLER_POLL_SIZE, ASYNC_EVENT_HANDLER_WAIT_QUEUE_SIZE}
import org.apache.kyuubi.events.handler.EventHandler
import org.apache.kyuubi.util.ThreadUtils
/**
* The [[EventBus]] is responsible for triggering Kyuubi event, registering event handlers and
@ -42,6 +44,18 @@ sealed trait EventBus {
object EventBus extends Logging {
private val defaultEventBus = EventBusLive()
private val conf: KyuubiConf = KyuubiConf().loadFileDefaults()
private val poolSize = conf.get(ASYNC_EVENT_HANDLER_POLL_SIZE)
private val waitQueueSize = conf.get(ASYNC_EVENT_HANDLER_WAIT_QUEUE_SIZE)
private val keepAliveMs = conf.get(ASYNC_EVENT_HANDLER_KEEPALIVE_TIME)
implicit private lazy val asyncEventExecutionContext: ExecutionContextExecutorService =
ExecutionContext.fromExecutorService(ThreadUtils.newDaemonQueuedThreadPool(
poolSize,
waitQueueSize,
keepAliveMs,
"async-event-handler-pool"))
def apply(): EventBus = EventBusLive()