From f6d5069bb619dac76c8b08ef0a2f19f52fa633f3 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 3 Aug 2021 15:51:04 +0800 Subject: [PATCH] [KYUUBI #885] Add Event for Engine ### _Why are the changes needed?_ ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request Closes #885 from yaooqinn/engineevent. Closes #885 6a7ac341 [Kent Yao] refine 41baf0e7 [Kent Yao] refine 2c141a06 [Kent Yao] test history server d1b0823a [Kent Yao] test history server 3aae8417 [Kent Yao] dump the final engine status 257e50f5 [Kent Yao] Add Event for Engine f1f1f0e8 [Kent Yao] Add Event for Engine 85addc6e [Kent Yao] Add Event for Engine cece5888 [Kent Yao] Add Event for Engine Authored-by: Kent Yao Signed-off-by: Kent Yao --- docs/deployment/settings.md | 2 + externals/kyuubi-spark-sql-engine/pom.xml | 24 ++-- .../kyuubi/engine/spark/SparkSQLEngine.scala | 54 +++++--- .../engine/spark/events/EngineEvent.scala | 118 ++++++++++++++++++ .../engine/spark/events/EventLogger.scala | 24 ++++ .../engine/spark/events/EventLoggerType.scala | 26 ++++ .../spark/events/EventLoggingService.scala | 75 +++++++++++ .../engine/spark/events/JsonEventLogger.scala | 87 +++++++++++++ .../engine/spark/events/JsonProtocol.scala | 32 +++++ .../engine/spark/events/KyuubiEvent.scala | 31 +++++ .../spark/kyuubi/SparkContextHelper.scala | 42 +++++++ .../engine/spark/WithSparkSQLEngine.scala | 3 +- .../events/EventLoggingServiceSuite.scala | 67 ++++++++++ .../org/apache/kyuubi/config/KyuubiConf.scala | 24 ++++ 14 files changed, 579 insertions(+), 30 deletions(-) create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEvent.scala create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLogger.scala create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggerType.scala create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/JsonEventLogger.scala create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/JsonProtocol.scala create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/KyuubiEvent.scala create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala create mode 100644 externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index 274d2542d..605418eba 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -146,6 +146,8 @@ kyuubi\.engine
\.deregister\.exception
\.classes|
|
A comma separated list of exception messages. If there is any exception thrown, whose message or stacktrace matches the specified message list, the engine would deregister itself.
|
seq
|
1.2.0
kyuubi\.engine
\.deregister\.exception
\.ttl|
PT30M
|
Time to live(TTL) for exceptions pattern specified in kyuubi.engine.deregister.exception.classes and kyuubi.engine.deregister.exception.messages to deregister engines. Once the total error count hits the kyuubi.engine.deregister.job.max.failures within the TTL, an engine will deregister itself and wait for self-terminated. Otherwise, we suppose that the engine has recovered from temporary failures.
|
duration
|
1.2.0
kyuubi\.engine
\.deregister\.job\.max
\.failures|
4
|
Number of failures of job before deregistering the engine.
|
int
|
1.2.0
+kyuubi\.engine\.event
\.json\.log\.path|
/tmp/kyuubi/events
|
The location of all the engine events go for the builtin JSON logger
|
string
|
1.3.0
+kyuubi\.engine\.event
\.loggers|
|
A comma separated list of engine history loggers, where engine/session/operation etc events go.
  • SPARK: the events will be written to the spark history events
  • JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
  • JDBC: to be done
  • CUSTOM: to be done.
|
seq
|
1.3.0
kyuubi\.engine
\.initialize\.sql|
SHOW DATABASES
|
SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.
|
string
|
1.2.0
kyuubi\.engine\.session
\.initialize\.sql|
SHOW DATABASES
|
SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.
|
string
|
1.3.0
kyuubi\.engine\.share
\.level|
USER
|
Engines will be shared in different levels, available configs are:
  • CONNECTION: engine will not be shared but only used by the current client connection
  • USER: engine will be shared by all sessions created by a unique username, see also kyuubi.engine.share.level.sub.domain
  • SERVER: the App will be shared by Kyuubi servers
|
string
|
1.2.0
diff --git a/externals/kyuubi-spark-sql-engine/pom.xml b/externals/kyuubi-spark-sql-engine/pom.xml index 7063cb51e..d4f519105 100644 --- a/externals/kyuubi-spark-sql-engine/pom.xml +++ b/externals/kyuubi-spark-sql-engine/pom.xml @@ -56,6 +56,18 @@ provided + + com.fasterxml.jackson.module + jackson-module-scala_${scala.binary.version} + provided + + + + com.fasterxml.jackson.core + jackson-databind + provided + + org.apache.kyuubi kyuubi-common @@ -151,18 +163,6 @@ test - - com.fasterxml.jackson.core - jackson-databind - test - - - - com.fasterxml.jackson.module - jackson-module-scala_${scala.binary.version} - test - - org.apache.kyuubi kyuubi-zookeeper diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index c8e683424..3c18cf20a 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -29,23 +29,30 @@ import org.apache.kyuubi.Logging import org.apache.kyuubi.Utils._ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch +import org.apache.kyuubi.engine.spark.events.{EngineEvent, EventLoggingService} import org.apache.kyuubi.ha.HighAvailabilityConf._ import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, RetryPolicies, ServiceDiscovery} -import org.apache.kyuubi.service.{Serverable, Service} +import org.apache.kyuubi.service.{Serverable, Service, ServiceState} import org.apache.kyuubi.util.SignalRegister case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine") { + + lazy val engineStatus: EngineEvent = EngineEvent(this) + + private val eventLogging = new EventLoggingService(this) override val backendService = new SparkSQLBackendService(spark) + override val discoveryService: Service = new EngineServiceDiscovery(this) + override protected def supportsServiceDiscovery: Boolean = { ServiceDiscovery.supportServiceDiscovery(conf) } - override val discoveryService: Service = new EngineServiceDiscovery(this) - override def initialize(conf: KyuubiConf): Unit = { val listener = new SparkSQLEngineListener(this) spark.sparkContext.addSparkListener(listener) + addService(eventLogging) super.initialize(conf) + eventLogging.onEvent(engineStatus.copy(state = ServiceState.INITIALIZED.id)) } override def start(): Unit = { @@ -53,11 +60,22 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin // Start engine self-terminating checker after all services are ready and it can be reached by // all servers in engine spaces. backendService.sessionManager.startTerminatingChecker() + eventLogging.onEvent(engineStatus.copy(state = ServiceState.STARTED.id)) + } + + override def stop(): Unit = { + eventLogging.onEvent( + engineStatus.copy(state = ServiceState.STOPPED.id, endTime = System.currentTimeMillis())) + super.stop() } override protected def stopServer(): Unit = { countDownLatch.countDown() } + + def engineId: String = { + spark.sparkContext.applicationAttemptId.getOrElse(spark.sparkContext.applicationId) + } } object SparkSQLEngine extends Logging { @@ -114,31 +132,33 @@ object SparkSQLEngine extends Logging { session } - def startEngine(spark: SparkSession): SparkSQLEngine = { - val engine = new SparkSQLEngine(spark) - engine.initialize(kyuubiConf) - engine.start() - // Stop engine before SparkContext stopped to avoid calling a stopped SparkContext - addShutdownHook(() => engine.stop(), SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1) - currentEngine = Some(engine) - EngineTab(engine) - engine + def startEngine(spark: SparkSession): Unit = { + currentEngine = Some(new SparkSQLEngine(spark)) + currentEngine.foreach { engine => + engine.initialize(kyuubiConf) + engine.start() + // Stop engine before SparkContext stopped to avoid calling a stopped SparkContext + addShutdownHook(() => engine.stop(), SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1) + EngineTab(engine) + info(engine.engineStatus) + } } def main(args: Array[String]): Unit = { SignalRegister.registerLogger(logger) var spark: SparkSession = null - var engine: SparkSQLEngine = null try { spark = createSpark() - engine = startEngine(spark) - info(KyuubiSparkUtil.diagnostics) + startEngine(spark) // blocking main thread countDownLatch.await() } catch { case t: Throwable => - error("Error start SparkSQLEngine", t) - if (engine != null) { + currentEngine.foreach { engine => + val status = + engine.engineStatus.copy(diagnostic = s"Error State SparkSQL Engine ${t.getMessage}") + EventLoggingService.onEvent(status) + error(status, t) engine.stop() } } finally { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEvent.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEvent.scala new file mode 100644 index 000000000..6cfbcb6b4 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEvent.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.events + +import java.util.Date + +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.types.StructType + +import org.apache.kyuubi.config.KyuubiConf._ +import org.apache.kyuubi.engine.spark.SparkSQLEngine +import org.apache.kyuubi.service.ServiceState + +/** + * + * @param applicationId application id a.k.a, the unique id for engine + * @param applicationName the application name + * @param owner the application user + * @param shareLevel the share level for this engine + * @param connectionUrl the jdbc connection string + * @param master the master type, yarn, k8s, local etc. + * @param sparkVersion short version of spark distribution + * @param webUrl the tracking url of this engine + * @param startTime start time + * @param endTime end time + * @param state the engine state + * @param diagnostic caught exceptions if any + * @param settings collection of all configurations of spark and kyuubi + */ +case class EngineEvent( + applicationId: String, + attemptId: Option[String], + applicationName: String, + owner: String, + shareLevel: String, + connectionUrl: String, + master: String, + sparkVersion: String, + webUrl: String, + startTime: Long, + endTime: Long, + state: Int, + diagnostic: String, + settings: Map[String, String]) extends KyuubiEvent { + + override def eventType: String = "engine" + + override def schema: StructType = Encoders.product[EngineEvent].schema + + override def toJson: String = JsonProtocol.productToJson(this) + + override def toString: String = { + // need to consider deploy mode and cluster to get core and mem + val driverCores = settings.getOrElse("spark.driver.cores", 1) + val driverMemory = settings.getOrElse("spark.driver.memory", "1g") + val executorCore = settings.getOrElse("spark.executor.cores", 2) + val executorMemory = settings.getOrElse("spark.executor.memory", "1g") + val dae = settings.getOrElse("spark.dynamicAllocation.enabled", "false").toBoolean + val maxExecutors = if (dae) { + settings.getOrElse("spark.dynamicAllocation.maxExecutors", Int.MaxValue) + } else { + settings.getOrElse("spark.executor.instances", 2) + } + s""" + | Spark application name: $applicationName + | application ID: $applicationId + | application web UI: $webUrl + | master: $master + | version: $sparkVersion + | driver: [cpu: $driverCores, mem: $driverMemory] + | executor: [cpu: $executorCore, mem: $executorMemory MB, maxNum: $maxExecutors] + | Start time: ${new Date(startTime)} + | ${if (endTime != -1L) "End time: " + new Date(endTime) else ""} + | User: $owner (shared mode: $shareLevel) + | State: ${ServiceState(state)} + | ${if (diagnostic.nonEmpty) "Diagnostic: " + diagnostic else ""}""".stripMargin + } +} + +object EngineEvent { + + def apply(engine: SparkSQLEngine): EngineEvent = { + val sc = engine.spark.sparkContext + val webUrl = sc.getConf.getOption( + "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES") + .orElse(sc.uiWebUrl).getOrElse("") + new EngineEvent( + sc.applicationId, + sc.applicationAttemptId, + sc.appName, + sc.sparkUser, + engine.getConf.get(ENGINE_SHARE_LEVEL), + engine.connectionUrl, + sc.master, + sc.version, + webUrl, + sc.startTime, + endTime = -1L, + state = 0, + diagnostic = "", + sc.getConf.getAll.toMap ++ engine.getConf.getAll) + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLogger.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLogger.scala new file mode 100644 index 000000000..4b39330c4 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLogger.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.events + +trait EventLogger { + + def logEvent(kyuubiEvent: KyuubiEvent): Unit + +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggerType.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggerType.scala new file mode 100644 index 000000000..690b93429 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggerType.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.events + +object EventLoggerType extends Enumeration { + + type EventLoggerType = Value + + // TODO: Only SPARK is done now + val SPARK, JSON, DB, CUSTOM = Value +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala new file mode 100644 index 000000000..95d9f32e7 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.events + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.kyuubi.SparkContextHelper + +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.engine.spark.SparkSQLEngine +import org.apache.kyuubi.engine.spark.events.EventLoggingService._service +import org.apache.kyuubi.service.CompositeService + +class EventLoggingService(engine: SparkSQLEngine) + extends CompositeService("EventLogging") { + + private val eventLoggers = new ArrayBuffer[EventLogger]() + + def onEvent(event: KyuubiEvent): Unit = { + eventLoggers.foreach(_.logEvent(event)) + } + + override def initialize(conf: KyuubiConf): Unit = { + conf.get(KyuubiConf.ENGINE_EVENT_LOGGERS) + .map(EventLoggerType.withName) + .foreach { + case EventLoggerType.SPARK => + eventLoggers += SparkContextHelper.createSparkHistoryLogger(engine.spark.sparkContext) + case EventLoggerType.JSON => + val jsonEventLogger = new JsonEventLogger(engine.engineId) + addService(jsonEventLogger) + eventLoggers += jsonEventLogger + case logger => + // TODO: Add more implementations + throw new IllegalArgumentException(s"Unrecognized event logger: $logger") + } + super.initialize(conf) + } + + override def start(): Unit = { + _service = Some(this) + super.start() + } + + override def stop(): Unit = { + _service = None + super.stop() + } + +} + +object EventLoggingService { + + private var _service: Option[EventLoggingService] = None + + def onEvent(event: KyuubiEvent): Unit = { + _service.foreach(_.onEvent(event)) + } +} + diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/JsonEventLogger.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/JsonEventLogger.scala new file mode 100644 index 000000000..ce4ab2fa5 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/JsonEventLogger.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.events + +import java.io.{IOException, PrintWriter} +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Path, Paths} +import java.nio.file.attribute.{PosixFilePermission, PosixFilePermissions} + +import org.apache.kyuubi.Logging +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.engine.spark.events.JsonEventLogger.{JSON_LOG_DIR_PERM, JSON_LOG_FILE_PERM} +import org.apache.kyuubi.service.AbstractService + +/** + * This event logger logs Kyuubi engine events in JSON file format. + * The hierarchical directory structure is {ENGINE_EVENT_JSON_LOG_PATH}/{eventType}/{logName}.json + * The {eventType} is based on core concepts of the Kyuubi systems, e.g. engine/session/statement + * @param logName the engine id formed of appId + attemptId(if any) + */ +class JsonEventLogger(logName: String) + extends AbstractService("JsonEventLogger") with EventLogger with Logging { + + private var logRoot: Path = _ + private val writers = new scala.collection.mutable.HashMap[String, PrintWriter]() + + private def getOrUpdate(event: KyuubiEvent): PrintWriter = synchronized { + writers.getOrElseUpdate(event.eventType, { + val eventDir = Files.createDirectories(Paths.get(logRoot.toString, event.eventType)) + Files.setPosixFilePermissions(eventDir, JSON_LOG_DIR_PERM) + val eventPath = Files.createFile(Paths.get(eventDir.toString, logName + ".json")) + // TODO: make it support Hadoop compatible filesystems + val newWriter = new PrintWriter(Files.newBufferedWriter(eventPath, StandardCharsets.UTF_8)) + Files.setPosixFilePermissions(eventPath, JSON_LOG_FILE_PERM) + newWriter + }) + } + + override def initialize(conf: KyuubiConf): Unit = synchronized { + logRoot = Paths.get(conf.get(KyuubiConf.ENGINE_EVENT_JSON_LOG_PATH)).toAbsolutePath + Files.setPosixFilePermissions(logRoot, PosixFilePermissions.fromString("rwxrwxr--")) + super.initialize(conf) + } + + override def stop(): Unit = synchronized { + writers.foreach { case (name, writer) => + try { + writer.close() + } catch { + case e: IOException => error(s"File to close $name's event writer", e) + } + } + super.stop() + } + + override def logEvent(kyuubiEvent: KyuubiEvent): Unit = kyuubiEvent match { + case e: EngineEvent => + val writer = getOrUpdate(e) + // scalastyle:off println + writer.println(e.toJson) + // scalastyle:on println + writer.flush() + case _ => // TODO: add extra events handling here + } +} + +object JsonEventLogger { + val JSON_LOG_DIR_PERM: java.util.Set[PosixFilePermission] = + PosixFilePermissions.fromString("rwxrwxr--") + val JSON_LOG_FILE_PERM: java.util.Set[PosixFilePermission] = + PosixFilePermissions.fromString("rwxr--r--") +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/JsonProtocol.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/JsonProtocol.scala new file mode 100644 index 000000000..be8e4f985 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/JsonProtocol.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.events + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +object JsonProtocol { + + private val mapper = new ObjectMapper().registerModule(DefaultScalaModule) + + def productToJson[T <: KyuubiEvent](value: T): String = mapper.writeValueAsString(value) + + def jsonToEvent(jsonValue: String): KyuubiEvent = { + mapper.readValue(jsonValue, classOf[KyuubiEvent]) + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/KyuubiEvent.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/KyuubiEvent.scala new file mode 100644 index 000000000..42fbca18e --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/KyuubiEvent.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.events + +import org.apache.spark.scheduler.SparkListenerEvent +import org.apache.spark.sql.types.StructType + +trait KyuubiEvent extends SparkListenerEvent { + + def eventType: String + + def schema: StructType + + def toJson: String + +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala new file mode 100644 index 000000000..4cd50a24c --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kyuubi + +import org.apache.spark.SparkContext + +import org.apache.kyuubi.engine.spark.events.{EventLogger, KyuubiEvent} + +/** + * A place to invoke non-public APIs of [[SparkContext]], anything to be added here need to + * think twice + */ +object SparkContextHelper { + def createSparkHistoryLogger(sc: SparkContext): EventLogger = { + new SparkHistoryEventLogger(sc) + } +} + +/** + * A [[EventLogger]] that logs everything to SparkHistory + * @param sc SparkContext + */ +private class SparkHistoryEventLogger(sc: SparkContext) extends EventLogger { + override def logEvent(kyuubiEvent: KyuubiEvent): Unit = { + sc.eventLogger.foreach(_.onOtherEvent(kyuubiEvent)) + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala index 605255b63..a88ff7dfa 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala @@ -54,7 +54,8 @@ trait WithSparkSQLEngine extends KyuubiFunSuite { SparkSession.clearActiveSession() SparkSession.clearDefaultSession() spark = SparkSQLEngine.createSpark() - engine = SparkSQLEngine.startEngine(spark) + SparkSQLEngine.startEngine(spark) + engine = SparkSQLEngine.currentEngine.get connectionUrl = engine.connectionUrl } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala new file mode 100644 index 000000000..f1665c57e --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.events + +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Paths} + +import org.apache.kyuubi.Utils +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.engine.spark.WithSparkSQLEngine +import org.apache.kyuubi.operation.JDBCTestUtils + +class EventLoggingServiceSuite extends WithSparkSQLEngine with JDBCTestUtils { + import EventLoggerType._ + + private val logRoot = Utils.createTempDir() + override def withKyuubiConf: Map[String, String] = Map( + KyuubiConf.ENGINE_EVENT_LOGGERS.key -> s"$JSON,$SPARK", + KyuubiConf.ENGINE_EVENT_JSON_LOG_PATH.key -> logRoot.toString, + "spark.eventLog.enabled" -> "true", + "spark.eventLog.dir" -> logRoot.toString + ) + + override protected def jdbcUrl: String = getJdbcUrl + + test("round-trip for event logging service") { + val engineEventPath = Paths.get(logRoot.toString, "engine", engine.engineId + ".json") + val reader = Files.newBufferedReader(engineEventPath, StandardCharsets.UTF_8) + + val readEvent = JsonProtocol.jsonToEvent(reader.readLine()) + assert(readEvent.isInstanceOf[KyuubiEvent]) + + withJdbcStatement() { statement => + val table = engineEventPath.getParent + val resultSet = statement.executeQuery(s"SELECT * FROM `json`.`${table}`") + while (resultSet.next()) { + assert(resultSet.getString("Event") === classOf[EngineEvent].getCanonicalName) + assert(resultSet.getString("applicationId") === spark.sparkContext.applicationId) + assert(resultSet.getString("master") === spark.sparkContext.master) + } + val table2 = table.getParent + val rs2 = statement.executeQuery(s"SELECT * FROM `json`.`${table2}`" + + s" where Event = '${classOf[EngineEvent].getCanonicalName}'") + while (rs2.next()) { + assert(rs2.getString("Event") === classOf[EngineEvent].getCanonicalName) + assert(rs2.getString("applicationId") === spark.sparkContext.applicationId) + assert(rs2.getString("master") === spark.sparkContext.master) + } + } + } + +} diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 36ef367a1..95f6afc2a 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -678,4 +678,28 @@ object KyuubiConf { .version("1.3.0") .booleanConf .createWithDefault(false) + + val ENGINE_EVENT_JSON_LOG_PATH: ConfigEntry[String] = + buildConf("engine.event.json.log.path") + .doc("The location of all the engine events go for the builtin JSON logger") + .version("1.3.0") + .stringConf + .createWithDefault("/tmp/kyuubi/events") + + val ENGINE_EVENT_LOGGERS: ConfigEntry[Seq[String]] = + buildConf("engine.event.loggers") + .doc("A comma separated list of engine history loggers, where engine/session/operation etc" + + " events go.
    " + + "
  • SPARK: the events will be written to the spark history events
  • " + + s"
  • JSON: the events will be written to the location of" + + s" ${ENGINE_EVENT_JSON_LOG_PATH.key}
  • " + + s"
  • JDBC: to be done
  • " + + s"
  • CUSTOM: to be done.
") + .version("1.3.0") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .toSequence + .checkValue(_.toSet.subsetOf(Set("SPARK", "JSON", "JDBC", "CUSTOM")), + "Unsupported event loggers") + .createWithDefault(Nil) }