From 430f6d5901cfb79267db93dffaa3dc3bb598592f Mon Sep 17 00:00:00 2001 From: liangbowen Date: Fri, 28 Apr 2023 09:59:48 +0800 Subject: [PATCH] [KYUUBI #4777] Deregister event handlers when stopping server with event handler made auto-closeable ### _Why are the changes needed?_ - deregister event handlers when stopping Kyuubiserver, by deregister them from EventBus's handler Registry - change `EventHandler` from `type` to `trait` and make it extending `AutoCloseable` - implement `close` method in `JsonLoggingEventHandler` for closing writers and streams ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request Closes #4777 from bowenliang123/closable-eventlogger. Closes #4777 db1ad5d73 [liangbowen] make EventBus.deregisterAll method synchronized 648471ba1 [liangbowen] update d28931d3c [liangbowen] re-register event loggers in ut 7121fa33a [liangbowen] make EventHandler closable, and de-register all event handlers when stopping server Authored-by: liangbowen Signed-off-by: liangbowen --- .../org/apache/kyuubi/events/EventBus.scala | 16 ++++++++++++++++ .../events/handler/JsonLoggingEventHandler.scala | 16 ++++++++++++++-- .../apache/kyuubi/events/handler/package.scala | 6 +++++- .../org/apache/kyuubi/server/KyuubiServer.scala | 4 +++- .../ServerJsonLoggingEventHandlerSuite.scala | 2 ++ 5 files changed, 40 insertions(+), 4 deletions(-) diff --git a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventBus.scala b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventBus.scala index e854e40a7..063f1719e 100644 --- a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventBus.scala +++ b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventBus.scala @@ -40,6 +40,8 @@ sealed trait EventBus { def register[T <: KyuubiEvent: ClassTag](eventHandler: EventHandler[T]): EventBus def registerAsync[T <: KyuubiEvent: ClassTag](eventHandler: EventHandler[T]): EventBus + + def deregisterAll(): Unit = {} } object EventBus extends Logging { @@ -68,6 +70,10 @@ object EventBus extends Logging { def registerAsync[T <: KyuubiEvent: ClassTag](et: EventHandler[T]): EventBus = defaultEventBus.registerAsync[T](et) + def deregisterAll(): Unit = synchronized { + defaultEventBus.deregisterAll() + } + private case class EventBusLive() extends EventBus { private[this] lazy val eventHandlerRegistry = new Registry private[this] lazy val asyncEventHandlerRegistry = new Registry @@ -96,6 +102,11 @@ object EventBus extends Logging { asyncEventHandlerRegistry.register(et) this } + + override def deregisterAll(): Unit = { + eventHandlerRegistry.deregisterAll() + asyncEventHandlerRegistry.deregisterAll() + } } private class Registry { @@ -122,5 +133,10 @@ object EventBus extends Logging { } yield parent clazz :: parents } + + def deregisterAll(): Unit = { + eventHandlers.values.flatten.foreach(_.close()) + eventHandlers.clear() + } } } diff --git a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/JsonLoggingEventHandler.scala b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/JsonLoggingEventHandler.scala index f6f74de9a..77d80b152 100644 --- a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/JsonLoggingEventHandler.scala +++ b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/JsonLoggingEventHandler.scala @@ -65,6 +65,17 @@ class JsonLoggingEventHandler( stream.foreach(_.hflush()) } + override def close(): Unit = { + writers.values.foreach { case (writer, stream) => + writer.flush() + stream.foreach(_.hflush()) + writer.close() + stream.foreach(_.close()) + } + writers.clear() + fs = null + } + private def getOrUpdate(event: KyuubiEvent): Logger = synchronized { val partitions = event.partitions.map(kv => s"${kv._1}=${kv._2}").mkString(Path.SEPARATOR) writers.getOrElseUpdate( @@ -108,6 +119,7 @@ class JsonLoggingEventHandler( } object JsonLoggingEventHandler { - val JSON_LOG_DIR_PERM: FsPermission = new FsPermission(Integer.parseInt("770", 8).toShort) - val JSON_LOG_FILE_PERM: FsPermission = new FsPermission(Integer.parseInt("660", 8).toShort) + private val JSON_LOG_DIR_PERM: FsPermission = new FsPermission(Integer.parseInt("770", 8).toShort) + private val JSON_LOG_FILE_PERM: FsPermission = + new FsPermission(Integer.parseInt("660", 8).toShort) } diff --git a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/package.scala b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/package.scala index 41cf001ed..69e1fdcee 100644 --- a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/package.scala +++ b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/package.scala @@ -18,5 +18,9 @@ package org.apache.kyuubi.events package object handler { - type EventHandler[T <: KyuubiEvent] = T => Unit + trait EventHandler[T <: KyuubiEvent] extends AutoCloseable { + def apply(event: T): Unit + + def close(): Unit = {} + } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala index a7f2e8178..8bcd8d084 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala @@ -193,5 +193,7 @@ class KyuubiServer(name: String) extends Serverable(name) { ServerEventHandlerRegister.registerEventLoggers(conf) } - override protected def stopServer(): Unit = {} + override protected def stopServer(): Unit = { + EventBus.deregisterAll() + } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala index 3bdc9cd38..7c79d6a87 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala @@ -33,6 +33,7 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime import org.apache.kyuubi._ import org.apache.kyuubi.client.util.BatchUtils._ import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.events.ServerEventHandlerRegister import org.apache.kyuubi.operation.HiveJDBCTestHelper import org.apache.kyuubi.operation.OperationState._ import org.apache.kyuubi.server.KyuubiServer @@ -197,6 +198,7 @@ class ServerJsonLoggingEventHandlerSuite extends WithKyuubiServer with HiveJDBCT server.initialize(conf) server.start() server.stop() + ServerEventHandlerRegister.registerEventLoggers(conf) // register event loggers again val hostName = InetAddress.getLocalHost.getCanonicalHostName val kyuubiServerInfoPath =