[KYUUBI #4777] Deregister event handlers when stopping server with event handler made auto-closeable

### _Why are the changes needed?_

- deregister event handlers when stopping Kyuubiserver, by deregister them from EventBus's handler Registry
- change `EventHandler` from `type` to `trait` and make it extending  `AutoCloseable`
- implement `close` method in `JsonLoggingEventHandler` for closing writers and streams

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4777 from bowenliang123/closable-eventlogger.

Closes #4777

db1ad5d73 [liangbowen] make EventBus.deregisterAll method synchronized
648471ba1 [liangbowen] update
d28931d3c [liangbowen] re-register event loggers in ut
7121fa33a [liangbowen] make EventHandler closable, and de-register all event handlers when stopping server

Authored-by: liangbowen <liangbowen@gf.com.cn>
Signed-off-by: liangbowen <liangbowen@gf.com.cn>
This commit is contained in:
liangbowen 2023-04-28 09:59:48 +08:00
parent 4762edc622
commit 430f6d5901
5 changed files with 40 additions and 4 deletions

View File

@ -40,6 +40,8 @@ sealed trait EventBus {
def register[T <: KyuubiEvent: ClassTag](eventHandler: EventHandler[T]): EventBus
def registerAsync[T <: KyuubiEvent: ClassTag](eventHandler: EventHandler[T]): EventBus
def deregisterAll(): Unit = {}
}
object EventBus extends Logging {
@ -68,6 +70,10 @@ object EventBus extends Logging {
def registerAsync[T <: KyuubiEvent: ClassTag](et: EventHandler[T]): EventBus =
defaultEventBus.registerAsync[T](et)
def deregisterAll(): Unit = synchronized {
defaultEventBus.deregisterAll()
}
private case class EventBusLive() extends EventBus {
private[this] lazy val eventHandlerRegistry = new Registry
private[this] lazy val asyncEventHandlerRegistry = new Registry
@ -96,6 +102,11 @@ object EventBus extends Logging {
asyncEventHandlerRegistry.register(et)
this
}
override def deregisterAll(): Unit = {
eventHandlerRegistry.deregisterAll()
asyncEventHandlerRegistry.deregisterAll()
}
}
private class Registry {
@ -122,5 +133,10 @@ object EventBus extends Logging {
} yield parent
clazz :: parents
}
def deregisterAll(): Unit = {
eventHandlers.values.flatten.foreach(_.close())
eventHandlers.clear()
}
}
}

View File

@ -65,6 +65,17 @@ class JsonLoggingEventHandler(
stream.foreach(_.hflush())
}
override def close(): Unit = {
writers.values.foreach { case (writer, stream) =>
writer.flush()
stream.foreach(_.hflush())
writer.close()
stream.foreach(_.close())
}
writers.clear()
fs = null
}
private def getOrUpdate(event: KyuubiEvent): Logger = synchronized {
val partitions = event.partitions.map(kv => s"${kv._1}=${kv._2}").mkString(Path.SEPARATOR)
writers.getOrElseUpdate(
@ -108,6 +119,7 @@ class JsonLoggingEventHandler(
}
object JsonLoggingEventHandler {
val JSON_LOG_DIR_PERM: FsPermission = new FsPermission(Integer.parseInt("770", 8).toShort)
val JSON_LOG_FILE_PERM: FsPermission = new FsPermission(Integer.parseInt("660", 8).toShort)
private val JSON_LOG_DIR_PERM: FsPermission = new FsPermission(Integer.parseInt("770", 8).toShort)
private val JSON_LOG_FILE_PERM: FsPermission =
new FsPermission(Integer.parseInt("660", 8).toShort)
}

View File

@ -18,5 +18,9 @@
package org.apache.kyuubi.events
package object handler {
type EventHandler[T <: KyuubiEvent] = T => Unit
trait EventHandler[T <: KyuubiEvent] extends AutoCloseable {
def apply(event: T): Unit
def close(): Unit = {}
}
}

View File

@ -193,5 +193,7 @@ class KyuubiServer(name: String) extends Serverable(name) {
ServerEventHandlerRegister.registerEventLoggers(conf)
}
override protected def stopServer(): Unit = {}
override protected def stopServer(): Unit = {
EventBus.deregisterAll()
}
}

View File

@ -33,6 +33,7 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi._
import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.events.ServerEventHandlerRegister
import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.operation.OperationState._
import org.apache.kyuubi.server.KyuubiServer
@ -197,6 +198,7 @@ class ServerJsonLoggingEventHandlerSuite extends WithKyuubiServer with HiveJDBCT
server.initialize(conf)
server.start()
server.stop()
ServerEventHandlerRegister.registerEventLoggers(conf) // register event loggers again
val hostName = InetAddress.getLocalHost.getCanonicalHostName
val kyuubiServerInfoPath =