[KYUUBI #885] Add Event for Engine

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request

Closes #885 from yaooqinn/engineevent.

Closes #885

6a7ac341 [Kent Yao] refine
41baf0e7 [Kent Yao] refine
2c141a06 [Kent Yao] test history server
d1b0823a [Kent Yao] test history server
3aae8417 [Kent Yao] dump the final engine status
257e50f5 [Kent Yao] Add Event for Engine
f1f1f0e8 [Kent Yao] Add Event for Engine
85addc6e [Kent Yao] Add Event for Engine
cece5888 [Kent Yao] Add Event for Engine

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
Kent Yao 2021-08-03 15:51:04 +08:00
parent 962d721842
commit f6d5069bb6
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
14 changed files with 579 additions and 30 deletions

View File

@ -146,6 +146,8 @@ kyuubi\.engine<br>\.deregister\.exception<br>\.classes|<div style='width: 65pt;w
kyuubi\.engine<br>\.deregister\.exception<br>\.messages|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of exception messages. If there is any exception thrown, whose message or stacktrace matches the specified message list, the engine would deregister itself.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.engine<br>\.deregister\.exception<br>\.ttl|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT30M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Time to live(TTL) for exceptions pattern specified in kyuubi.engine.deregister.exception.classes and kyuubi.engine.deregister.exception.messages to deregister engines. Once the total error count hits the kyuubi.engine.deregister.job.max.failures within the TTL, an engine will deregister itself and wait for self-terminated. Otherwise, we suppose that the engine has recovered from temporary failures.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.engine<br>\.deregister\.job\.max<br>\.failures|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>4</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Number of failures of job before deregistering the engine.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.engine\.event<br>\.json\.log\.path|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>/tmp/kyuubi/events</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The location of all the engine events go for the builtin JSON logger</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.3.0</div>
kyuubi\.engine\.event<br>\.loggers|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of engine history loggers, where engine/session/operation etc events go.<ul> <li>SPARK: the events will be written to the spark history events</li> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: to be done.</li></ul></div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.3.0</div>
kyuubi\.engine<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SHOW DATABASES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.engine\.session<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SHOW DATABASES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.3.0</div>
kyuubi\.engine\.share<br>\.level|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>USER</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Engines will be shared in different levels, available configs are: <ul> <li>CONNECTION: engine will not be shared but only used by the current client connection</li> <li>USER: engine will be shared by all sessions created by a unique username, see also kyuubi.engine.share.level.sub.domain</li> <li>SERVER: the App will be shared by Kyuubi servers</li></ul></div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.2.0</div>

View File

@ -56,6 +56,18 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common</artifactId>
@ -151,18 +163,6 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-zookeeper</artifactId>

View File

@ -29,23 +29,30 @@ import org.apache.kyuubi.Logging
import org.apache.kyuubi.Utils._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EventLoggingService}
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, RetryPolicies, ServiceDiscovery}
import org.apache.kyuubi.service.{Serverable, Service}
import org.apache.kyuubi.service.{Serverable, Service, ServiceState}
import org.apache.kyuubi.util.SignalRegister
case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine") {
lazy val engineStatus: EngineEvent = EngineEvent(this)
private val eventLogging = new EventLoggingService(this)
override val backendService = new SparkSQLBackendService(spark)
override val discoveryService: Service = new EngineServiceDiscovery(this)
override protected def supportsServiceDiscovery: Boolean = {
ServiceDiscovery.supportServiceDiscovery(conf)
}
override val discoveryService: Service = new EngineServiceDiscovery(this)
override def initialize(conf: KyuubiConf): Unit = {
val listener = new SparkSQLEngineListener(this)
spark.sparkContext.addSparkListener(listener)
addService(eventLogging)
super.initialize(conf)
eventLogging.onEvent(engineStatus.copy(state = ServiceState.INITIALIZED.id))
}
override def start(): Unit = {
@ -53,11 +60,22 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
// Start engine self-terminating checker after all services are ready and it can be reached by
// all servers in engine spaces.
backendService.sessionManager.startTerminatingChecker()
eventLogging.onEvent(engineStatus.copy(state = ServiceState.STARTED.id))
}
override def stop(): Unit = {
eventLogging.onEvent(
engineStatus.copy(state = ServiceState.STOPPED.id, endTime = System.currentTimeMillis()))
super.stop()
}
override protected def stopServer(): Unit = {
countDownLatch.countDown()
}
def engineId: String = {
spark.sparkContext.applicationAttemptId.getOrElse(spark.sparkContext.applicationId)
}
}
object SparkSQLEngine extends Logging {
@ -114,31 +132,33 @@ object SparkSQLEngine extends Logging {
session
}
def startEngine(spark: SparkSession): SparkSQLEngine = {
val engine = new SparkSQLEngine(spark)
engine.initialize(kyuubiConf)
engine.start()
// Stop engine before SparkContext stopped to avoid calling a stopped SparkContext
addShutdownHook(() => engine.stop(), SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1)
currentEngine = Some(engine)
EngineTab(engine)
engine
def startEngine(spark: SparkSession): Unit = {
currentEngine = Some(new SparkSQLEngine(spark))
currentEngine.foreach { engine =>
engine.initialize(kyuubiConf)
engine.start()
// Stop engine before SparkContext stopped to avoid calling a stopped SparkContext
addShutdownHook(() => engine.stop(), SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1)
EngineTab(engine)
info(engine.engineStatus)
}
}
def main(args: Array[String]): Unit = {
SignalRegister.registerLogger(logger)
var spark: SparkSession = null
var engine: SparkSQLEngine = null
try {
spark = createSpark()
engine = startEngine(spark)
info(KyuubiSparkUtil.diagnostics)
startEngine(spark)
// blocking main thread
countDownLatch.await()
} catch {
case t: Throwable =>
error("Error start SparkSQLEngine", t)
if (engine != null) {
currentEngine.foreach { engine =>
val status =
engine.engineStatus.copy(diagnostic = s"Error State SparkSQL Engine ${t.getMessage}")
EventLoggingService.onEvent(status)
error(status, t)
engine.stop()
}
} finally {

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.events
import java.util.Date
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.spark.SparkSQLEngine
import org.apache.kyuubi.service.ServiceState
/**
*
* @param applicationId application id a.k.a, the unique id for engine
* @param applicationName the application name
* @param owner the application user
* @param shareLevel the share level for this engine
* @param connectionUrl the jdbc connection string
* @param master the master type, yarn, k8s, local etc.
* @param sparkVersion short version of spark distribution
* @param webUrl the tracking url of this engine
* @param startTime start time
* @param endTime end time
* @param state the engine state
* @param diagnostic caught exceptions if any
* @param settings collection of all configurations of spark and kyuubi
*/
case class EngineEvent(
applicationId: String,
attemptId: Option[String],
applicationName: String,
owner: String,
shareLevel: String,
connectionUrl: String,
master: String,
sparkVersion: String,
webUrl: String,
startTime: Long,
endTime: Long,
state: Int,
diagnostic: String,
settings: Map[String, String]) extends KyuubiEvent {
override def eventType: String = "engine"
override def schema: StructType = Encoders.product[EngineEvent].schema
override def toJson: String = JsonProtocol.productToJson(this)
override def toString: String = {
// need to consider deploy mode and cluster to get core and mem
val driverCores = settings.getOrElse("spark.driver.cores", 1)
val driverMemory = settings.getOrElse("spark.driver.memory", "1g")
val executorCore = settings.getOrElse("spark.executor.cores", 2)
val executorMemory = settings.getOrElse("spark.executor.memory", "1g")
val dae = settings.getOrElse("spark.dynamicAllocation.enabled", "false").toBoolean
val maxExecutors = if (dae) {
settings.getOrElse("spark.dynamicAllocation.maxExecutors", Int.MaxValue)
} else {
settings.getOrElse("spark.executor.instances", 2)
}
s"""
| Spark application name: $applicationName
| application ID: $applicationId
| application web UI: $webUrl
| master: $master
| version: $sparkVersion
| driver: [cpu: $driverCores, mem: $driverMemory]
| executor: [cpu: $executorCore, mem: $executorMemory MB, maxNum: $maxExecutors]
| Start time: ${new Date(startTime)}
| ${if (endTime != -1L) "End time: " + new Date(endTime) else ""}
| User: $owner (shared mode: $shareLevel)
| State: ${ServiceState(state)}
| ${if (diagnostic.nonEmpty) "Diagnostic: " + diagnostic else ""}""".stripMargin
}
}
object EngineEvent {
def apply(engine: SparkSQLEngine): EngineEvent = {
val sc = engine.spark.sparkContext
val webUrl = sc.getConf.getOption(
"spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES")
.orElse(sc.uiWebUrl).getOrElse("")
new EngineEvent(
sc.applicationId,
sc.applicationAttemptId,
sc.appName,
sc.sparkUser,
engine.getConf.get(ENGINE_SHARE_LEVEL),
engine.connectionUrl,
sc.master,
sc.version,
webUrl,
sc.startTime,
endTime = -1L,
state = 0,
diagnostic = "",
sc.getConf.getAll.toMap ++ engine.getConf.getAll)
}
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.events
trait EventLogger {
def logEvent(kyuubiEvent: KyuubiEvent): Unit
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.events
object EventLoggerType extends Enumeration {
type EventLoggerType = Value
// TODO: Only SPARK is done now
val SPARK, JSON, DB, CUSTOM = Value
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.events
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.kyuubi.SparkContextHelper
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.SparkSQLEngine
import org.apache.kyuubi.engine.spark.events.EventLoggingService._service
import org.apache.kyuubi.service.CompositeService
class EventLoggingService(engine: SparkSQLEngine)
extends CompositeService("EventLogging") {
private val eventLoggers = new ArrayBuffer[EventLogger]()
def onEvent(event: KyuubiEvent): Unit = {
eventLoggers.foreach(_.logEvent(event))
}
override def initialize(conf: KyuubiConf): Unit = {
conf.get(KyuubiConf.ENGINE_EVENT_LOGGERS)
.map(EventLoggerType.withName)
.foreach {
case EventLoggerType.SPARK =>
eventLoggers += SparkContextHelper.createSparkHistoryLogger(engine.spark.sparkContext)
case EventLoggerType.JSON =>
val jsonEventLogger = new JsonEventLogger(engine.engineId)
addService(jsonEventLogger)
eventLoggers += jsonEventLogger
case logger =>
// TODO: Add more implementations
throw new IllegalArgumentException(s"Unrecognized event logger: $logger")
}
super.initialize(conf)
}
override def start(): Unit = {
_service = Some(this)
super.start()
}
override def stop(): Unit = {
_service = None
super.stop()
}
}
object EventLoggingService {
private var _service: Option[EventLoggingService] = None
def onEvent(event: KyuubiEvent): Unit = {
_service.foreach(_.onEvent(event))
}
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.events
import java.io.{IOException, PrintWriter}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}
import java.nio.file.attribute.{PosixFilePermission, PosixFilePermissions}
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.events.JsonEventLogger.{JSON_LOG_DIR_PERM, JSON_LOG_FILE_PERM}
import org.apache.kyuubi.service.AbstractService
/**
* This event logger logs Kyuubi engine events in JSON file format.
* The hierarchical directory structure is {ENGINE_EVENT_JSON_LOG_PATH}/{eventType}/{logName}.json
* The {eventType} is based on core concepts of the Kyuubi systems, e.g. engine/session/statement
* @param logName the engine id formed of appId + attemptId(if any)
*/
class JsonEventLogger(logName: String)
extends AbstractService("JsonEventLogger") with EventLogger with Logging {
private var logRoot: Path = _
private val writers = new scala.collection.mutable.HashMap[String, PrintWriter]()
private def getOrUpdate(event: KyuubiEvent): PrintWriter = synchronized {
writers.getOrElseUpdate(event.eventType, {
val eventDir = Files.createDirectories(Paths.get(logRoot.toString, event.eventType))
Files.setPosixFilePermissions(eventDir, JSON_LOG_DIR_PERM)
val eventPath = Files.createFile(Paths.get(eventDir.toString, logName + ".json"))
// TODO: make it support Hadoop compatible filesystems
val newWriter = new PrintWriter(Files.newBufferedWriter(eventPath, StandardCharsets.UTF_8))
Files.setPosixFilePermissions(eventPath, JSON_LOG_FILE_PERM)
newWriter
})
}
override def initialize(conf: KyuubiConf): Unit = synchronized {
logRoot = Paths.get(conf.get(KyuubiConf.ENGINE_EVENT_JSON_LOG_PATH)).toAbsolutePath
Files.setPosixFilePermissions(logRoot, PosixFilePermissions.fromString("rwxrwxr--"))
super.initialize(conf)
}
override def stop(): Unit = synchronized {
writers.foreach { case (name, writer) =>
try {
writer.close()
} catch {
case e: IOException => error(s"File to close $name's event writer", e)
}
}
super.stop()
}
override def logEvent(kyuubiEvent: KyuubiEvent): Unit = kyuubiEvent match {
case e: EngineEvent =>
val writer = getOrUpdate(e)
// scalastyle:off println
writer.println(e.toJson)
// scalastyle:on println
writer.flush()
case _ => // TODO: add extra events handling here
}
}
object JsonEventLogger {
val JSON_LOG_DIR_PERM: java.util.Set[PosixFilePermission] =
PosixFilePermissions.fromString("rwxrwxr--")
val JSON_LOG_FILE_PERM: java.util.Set[PosixFilePermission] =
PosixFilePermissions.fromString("rwxr--r--")
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.events
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
object JsonProtocol {
private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
def productToJson[T <: KyuubiEvent](value: T): String = mapper.writeValueAsString(value)
def jsonToEvent(jsonValue: String): KyuubiEvent = {
mapper.readValue(jsonValue, classOf[KyuubiEvent])
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.events
import org.apache.spark.scheduler.SparkListenerEvent
import org.apache.spark.sql.types.StructType
trait KyuubiEvent extends SparkListenerEvent {
def eventType: String
def schema: StructType
def toJson: String
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.kyuubi
import org.apache.spark.SparkContext
import org.apache.kyuubi.engine.spark.events.{EventLogger, KyuubiEvent}
/**
* A place to invoke non-public APIs of [[SparkContext]], anything to be added here need to
* think twice
*/
object SparkContextHelper {
def createSparkHistoryLogger(sc: SparkContext): EventLogger = {
new SparkHistoryEventLogger(sc)
}
}
/**
* A [[EventLogger]] that logs everything to SparkHistory
* @param sc SparkContext
*/
private class SparkHistoryEventLogger(sc: SparkContext) extends EventLogger {
override def logEvent(kyuubiEvent: KyuubiEvent): Unit = {
sc.eventLogger.foreach(_.onOtherEvent(kyuubiEvent))
}
}

View File

@ -54,7 +54,8 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
spark = SparkSQLEngine.createSpark()
engine = SparkSQLEngine.startEngine(spark)
SparkSQLEngine.startEngine(spark)
engine = SparkSQLEngine.currentEngine.get
connectionUrl = engine.connectionUrl
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.events
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
import org.apache.kyuubi.operation.JDBCTestUtils
class EventLoggingServiceSuite extends WithSparkSQLEngine with JDBCTestUtils {
import EventLoggerType._
private val logRoot = Utils.createTempDir()
override def withKyuubiConf: Map[String, String] = Map(
KyuubiConf.ENGINE_EVENT_LOGGERS.key -> s"$JSON,$SPARK",
KyuubiConf.ENGINE_EVENT_JSON_LOG_PATH.key -> logRoot.toString,
"spark.eventLog.enabled" -> "true",
"spark.eventLog.dir" -> logRoot.toString
)
override protected def jdbcUrl: String = getJdbcUrl
test("round-trip for event logging service") {
val engineEventPath = Paths.get(logRoot.toString, "engine", engine.engineId + ".json")
val reader = Files.newBufferedReader(engineEventPath, StandardCharsets.UTF_8)
val readEvent = JsonProtocol.jsonToEvent(reader.readLine())
assert(readEvent.isInstanceOf[KyuubiEvent])
withJdbcStatement() { statement =>
val table = engineEventPath.getParent
val resultSet = statement.executeQuery(s"SELECT * FROM `json`.`${table}`")
while (resultSet.next()) {
assert(resultSet.getString("Event") === classOf[EngineEvent].getCanonicalName)
assert(resultSet.getString("applicationId") === spark.sparkContext.applicationId)
assert(resultSet.getString("master") === spark.sparkContext.master)
}
val table2 = table.getParent
val rs2 = statement.executeQuery(s"SELECT * FROM `json`.`${table2}`" +
s" where Event = '${classOf[EngineEvent].getCanonicalName}'")
while (rs2.next()) {
assert(rs2.getString("Event") === classOf[EngineEvent].getCanonicalName)
assert(rs2.getString("applicationId") === spark.sparkContext.applicationId)
assert(rs2.getString("master") === spark.sparkContext.master)
}
}
}
}

View File

@ -678,4 +678,28 @@ object KyuubiConf {
.version("1.3.0")
.booleanConf
.createWithDefault(false)
val ENGINE_EVENT_JSON_LOG_PATH: ConfigEntry[String] =
buildConf("engine.event.json.log.path")
.doc("The location of all the engine events go for the builtin JSON logger")
.version("1.3.0")
.stringConf
.createWithDefault("/tmp/kyuubi/events")
val ENGINE_EVENT_LOGGERS: ConfigEntry[Seq[String]] =
buildConf("engine.event.loggers")
.doc("A comma separated list of engine history loggers, where engine/session/operation etc" +
" events go.<ul>" +
" <li>SPARK: the events will be written to the spark history events</li>" +
s" <li>JSON: the events will be written to the location of" +
s" ${ENGINE_EVENT_JSON_LOG_PATH.key}</li>" +
s" <li>JDBC: to be done</li>" +
s" <li>CUSTOM: to be done.</li></ul>")
.version("1.3.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.toSequence
.checkValue(_.toSet.subsetOf(Set("SPARK", "JSON", "JDBC", "CUSTOM")),
"Unsupported event loggers")
.createWithDefault(Nil)
}