[KYUUBI #1601] Align SparkStatementEvent to KyuubiOperationEvent

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

Align SparkStatementEvent to KyuubiOperationEvent

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1601 from yaooqinn/soe.

Closes #1601

090dee8f [Kent Yao] Merge branch 'master' of github.com:apache/incubator-kyuubi into soe
a61ce3b7 [Kent Yao] Align SparkStatementEvent to KyuubiOperationEven
efbbdf79 [Kent Yao] Align SparkStatementEvent to KyuubiOperationEven

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
This commit is contained in:
Kent Yao 2021-12-23 16:10:43 +08:00 committed by ulysses-you
parent 152e394016
commit 1b48b1874f
No known key found for this signature in database
GPG Key ID: 4C500BC62D576766
14 changed files with 257 additions and 150 deletions

View File

@ -570,31 +570,31 @@ object DataGenerator {
$"web_tax_percentage" .decimal(5, 2))
// format: on
Seq(
store_sales,
store_returns,
catalog_sales,
catalog_returns,
web_sales,
web_returns,
inventory,
catalog_page,
call_center,
customer,
customer_address,
customer_demographics,
household_demographics,
store,
warehouse,
item,
income_band,
web_site,
web_page,
date_dim,
time_dim,
ship_mode,
reason,
promotion)
Seq(
store_sales,
store_returns,
catalog_sales,
catalog_returns,
web_sales,
web_returns,
inventory,
catalog_page,
call_center,
customer,
customer_address,
customer_demographics,
household_demographics,
store,
warehouse,
item,
income_band,
web_site,
web_page,
date_dim,
time_dim,
ship_mode,
reason,
promotion)
}
def run(config: RunConfig): Unit = {

View File

@ -136,10 +136,10 @@ abstract class FlinkOperation(
setOperationException(KyuubiSQLException(errMsg))
warn(s"Ignore exception in terminal state with $statementId: $errMsg")
} else {
setState(OperationState.ERROR)
error(s"Error operating $opType: $errMsg", e)
val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e)
setOperationException(ke)
setState(OperationState.ERROR)
throw ke
}
}

View File

@ -92,23 +92,23 @@ class EngineEventsStore(conf: KyuubiConf) {
/**
* store all statements events.
*/
val statements = new ConcurrentHashMap[String, SparkStatementEvent]
val statements = new ConcurrentHashMap[String, SparkOperationEvent]
/**
* get all statement events order by startTime
*/
def getStatementList: Seq[SparkStatementEvent] = {
def getStatementList: Seq[SparkOperationEvent] = {
statements.values().asScala.toSeq.sortBy(_.createTime)
}
def getStatement(statementId: String): Option[SparkStatementEvent] = {
def getStatement(statementId: String): Option[SparkOperationEvent] = {
Option(statements.get(statementId))
}
/**
* save statement events and check the capacity threshold
*/
def saveStatement(statementEvent: SparkStatementEvent): Unit = {
def saveStatement(statementEvent: SparkOperationEvent): Unit = {
statements.put(statementEvent.statementId, statementEvent)
checkStatementCapacity()
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.events
import org.apache.spark.sql.{DataFrame, Encoders}
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.Utils
import org.apache.kyuubi.engine.spark.operation.SparkOperation
/**
 * A [[SparkOperationEvent]] used to track the lifecycle of an operation at the Spark SQL
 * engine side, covering:
 * <ul>
 *   <li>Operation Basis</li>
 *   <li>Operation Live Status</li>
 *   <li>Parent Session Id</li>
 * </ul>
 *
 * @param statementId the unique identifier of a single operation
 * @param statement the sql that you execute
 * @param shouldRunAsync the flag indicating whether the query runs synchronously or not
 * @param state the current operation state
 * @param eventTime the time when the event was created and logged
 * @param createTime the time the operation was created
 * @param startTime the time the operation started running
 * @param completeTime the time the operation finished, or -1L if it has not completed yet
 * @param exception the exception caught during execution, if any
 * @param sessionId the identifier of the parent session
 * @param sessionUser the authenticated client user
 * @param queryExecution the string form of the query execution (logical and physical plans)
 *                       of this operation, if available
 */
case class SparkOperationEvent(
    statementId: String,
    statement: String,
    shouldRunAsync: Boolean,
    state: String,
    eventTime: Long,
    createTime: Long,
    startTime: Long,
    completeTime: Long,
    exception: Option[Throwable],
    sessionId: String,
    sessionUser: String,
    queryExecution: String) extends KyuubiSparkEvent {

  // Derive the event schema from the case class shape so logged records match the fields above.
  override def schema: StructType = Encoders.product[SparkOperationEvent].schema

  // Partition logged events by day (keyed on the operation's create time).
  override def partitions: Seq[(String, String)] =
    ("day", Utils.getDateFromTimestamp(createTime)) :: Nil

  // Elapsed time of the operation: wall-clock time so far when still running
  // (completeTime == -1L sentinel), otherwise the recorded end-to-end duration.
  def duration: Long = {
    if (completeTime == -1L) {
      System.currentTimeMillis - createTime
    } else {
      completeTime - createTime
    }
  }
}
object SparkOperationEvent {

  /**
   * Build a [[SparkOperationEvent]] snapshot from a live [[SparkOperation]], capturing the
   * operation's current status and its parent session identity.
   *
   * @param operation the operation whose current status is captured
   * @param result the operation's result DataFrame, if already available; when present, its
   *               query execution (plans) is rendered into the event, otherwise null is stored
   */
  def apply(operation: SparkOperation, result: Option[DataFrame] = None): SparkOperationEvent = {
    val session = operation.getSession
    val status = operation.getStatus
    new SparkOperationEvent(
      operation.statementId,
      operation.statement,
      operation.shouldRunAsync,
      status.state.name(),
      status.lastModified,
      status.create,
      status.start,
      status.completed,
      status.exception,
      session.handle.identifier.toString,
      session.user,
      result.map(_.queryExecution.toString).orNull)
  }
}

View File

@ -1,60 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.events
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.Utils
/**
* @param statementId: the identifier of operationHandler
* @param statement: the sql that you execute
* @param appId: application id a.k.a, the unique id for engine
* @param sessionId: the identifier of a session
* @param createTime: the create time of this statement
* @param state: store each state that the sql has
* @param eventTime: the time that the sql's state change
* @param queryExecution: contains logicPlan and physicalPlan
* @param exception: caught exception if have
*/
case class SparkStatementEvent(
username: String,
statementId: String,
statement: String,
appId: String,
sessionId: String,
createTime: Long,
var state: String,
var eventTime: Long,
var completeTime: Long = -1L,
var queryExecution: String = "",
var exception: String = "") extends KyuubiSparkEvent {
override def schema: StructType = Encoders.product[SparkStatementEvent].schema
override def partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(createTime)) :: Nil
def duration: Long = {
if (completeTime == -1L) {
System.currentTimeMillis - createTime
} else {
completeTime - createTime
}
}
}

View File

@ -27,7 +27,7 @@ import org.apache.spark.sql.types._
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
import org.apache.kyuubi.engine.spark.events.{EventLoggingService, SparkStatementEvent}
import org.apache.kyuubi.engine.spark.events.{EventLoggingService, SparkOperationEvent}
import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator, OperationState, OperationType}
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
@ -49,16 +49,7 @@ class ExecuteStatement(
private val operationListener: SQLOperationListener = new SQLOperationListener(this, spark)
val statementEvent: SparkStatementEvent = SparkStatementEvent(
session.user,
statementId,
statement,
spark.sparkContext.applicationId,
session.handle.identifier.toString,
lastAccessTime,
state.toString,
lastAccessTime)
EventLoggingService.onEvent(statementEvent)
EventLoggingService.onEvent(SparkOperationEvent(this))
override protected def resultSchema: StructType = {
if (result == null || result.schema.isEmpty) {
@ -87,7 +78,6 @@ class ExecuteStatement(
spark.sparkContext.addSparkListener(operationListener)
result = spark.sql(statement)
// TODO #921: COMPILED need consider eagerly executed commands
statementEvent.queryExecution = result.queryExecution.toString()
setState(OperationState.COMPILED)
debug(result.queryExecution)
iter =
@ -156,17 +146,6 @@ class ExecuteStatement(
override def setState(newState: OperationState): Unit = {
super.setState(newState)
statementEvent.state = newState.toString
statementEvent.eventTime = lastAccessTime
if (newState == OperationState.ERROR || newState == OperationState.FINISHED) {
statementEvent.completeTime = System.currentTimeMillis()
}
EventLoggingService.onEvent(statementEvent)
}
override def setOperationException(opEx: KyuubiSQLException): Unit = {
super.setOperationException(opEx)
statementEvent.exception = opEx.toString
EventLoggingService.onEvent(statementEvent)
EventLoggingService.onEvent(SparkOperationEvent(this, Option(result)))
}
}

View File

@ -130,10 +130,10 @@ abstract class SparkOperation(opType: OperationType, session: Session)
setOperationException(KyuubiSQLException(errMsg))
warn(s"Ignore exception in terminal state with $statementId: $errMsg")
} else {
setState(OperationState.ERROR)
error(s"Error operating $opType: $errMsg", e)
val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e)
setOperationException(ke)
setState(OperationState.ERROR)
throw ke
}
}

View File

@ -30,7 +30,7 @@ import org.apache.kyuubi.Logging
import org.apache.kyuubi.Utils.stringifyException
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
import org.apache.kyuubi.engine.spark.events.{EngineEventsStore, SessionEvent, SparkStatementEvent}
import org.apache.kyuubi.engine.spark.events.{EngineEventsStore, SessionEvent, SparkOperationEvent}
import org.apache.kyuubi.service.{Serverable, ServiceState}
/**
@ -119,7 +119,7 @@ class SparkSQLEngineListener(
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case e: SessionEvent => updateSessionStore(e)
case e: SparkStatementEvent => updateStatementStore(e)
case e: SparkOperationEvent => updateStatementStore(e)
case _ => // Ignore
}
}
@ -128,7 +128,7 @@ class SparkSQLEngineListener(
store.saveSession(event)
}
private def updateStatementStore(event: SparkStatementEvent): Unit = {
private def updateStatementStore(event: SparkOperationEvent): Unit = {
store.saveStatement(event)
}
}

View File

@ -30,7 +30,7 @@ import org.apache.spark.ui.TableSourceUtil._
import org.apache.spark.ui.UIUtils._
import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
import org.apache.kyuubi.engine.spark.events.{SessionEvent, SparkStatementEvent}
import org.apache.kyuubi.engine.spark.events.{SessionEvent, SparkOperationEvent}
case class EnginePage(parent: EngineTab) extends WebUIPage("") {
private val store = parent.engine.store
@ -280,10 +280,10 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") {
private class StatementStatsPagedTable(
request: HttpServletRequest,
parent: EngineTab,
data: Seq[SparkStatementEvent],
data: Seq[SparkOperationEvent],
subPath: String,
basePath: String,
sqlStatsTableTag: String) extends PagedTable[SparkStatementEvent] {
sqlStatsTableTag: String) extends PagedTable[SparkOperationEvent] {
private val (sortColumn, desc, pageSize) =
getRequestTableParameters(request, sqlStatsTableTag, "Create Time")
@ -339,32 +339,32 @@ private class StatementStatsPagedTable(
sqlStatsTableTag)
}
override def row(sparkStatementEvent: SparkStatementEvent): Seq[Node] = {
override def row(event: SparkOperationEvent): Seq[Node] = {
<tr>
<td>
{sparkStatementEvent.username}
{event.sessionUser}
</td>
<td>
{sparkStatementEvent.statementId}
{event.statementId}
</td>
<td >
{formatDate(sparkStatementEvent.createTime)}
{formatDate(event.createTime)}
</td>
<td>
{if (sparkStatementEvent.completeTime > 0) formatDate(sparkStatementEvent.completeTime)}
{if (event.completeTime > 0) formatDate(event.completeTime)}
</td>
<td >
{formatDurationVerbose(sparkStatementEvent.duration)}
{formatDurationVerbose(event.duration)}
</td>
<td>
<span class="description-input">
{sparkStatementEvent.statement}
{event.statement}
</span>
</td>
<td>
{sparkStatementEvent.state}
{event.state}
</td>
{errorMessageCell(sparkStatementEvent.exception)}
{errorMessageCell(event.exception.map(Utils.stringifyException).getOrElse(""))}
</tr>
}
@ -421,24 +421,24 @@ private class SessionStatsTableDataSource(
}
private class StatementStatsTableDataSource(
info: Seq[SparkStatementEvent],
info: Seq[SparkOperationEvent],
pageSize: Int,
sortColumn: String,
desc: Boolean) extends PagedDataSource[SparkStatementEvent](pageSize) {
desc: Boolean) extends PagedDataSource[SparkOperationEvent](pageSize) {
// Sorting SessionEvent data
private val data = info.sorted(ordering(sortColumn, desc))
override def dataSize: Int = data.size
override def sliceData(from: Int, to: Int): Seq[SparkStatementEvent] = data.slice(from, to)
override def sliceData(from: Int, to: Int): Seq[SparkOperationEvent] = data.slice(from, to)
/**
* Return Ordering according to sortColumn and desc.
*/
private def ordering(sortColumn: String, desc: Boolean): Ordering[SparkStatementEvent] = {
val ordering: Ordering[SparkStatementEvent] = sortColumn match {
case "User" => Ordering.by(_.username)
private def ordering(sortColumn: String, desc: Boolean): Ordering[SparkOperationEvent] = {
val ordering: Ordering[SparkOperationEvent] = sortColumn match {
case "User" => Ordering.by(_.sessionUser)
case "Statement ID" => Ordering.by(_.statementId)
case "Create Time" => Ordering.by(_.createTime)
case "Finish Time" => Ordering.by(_.completeTime)

View File

@ -82,9 +82,45 @@ class EngineEventsStoreSuite extends KyuubiFunSuite {
test("ensure that the statements are stored in order") {
val store = new EngineEventsStore(KyuubiConf())
val s1 = SparkStatementEvent("a", "ea1", "select 1", "app1", "sid1", 1L, "RUNNING", 2L)
val s2 = SparkStatementEvent("c", "ea2", "select 2", "app2", "sid1", 2L, "RUNNING", 2L)
val s3 = SparkStatementEvent("b", "ea3", "select 3", "app3", "sid1", 3L, "RUNNING", 2L)
val s1 = SparkOperationEvent(
"ea1",
"select 1",
true,
"RUNNING",
1L,
1L,
1L,
2L,
None,
"sid1",
"a",
"")
val s2 = SparkOperationEvent(
"ea2",
"select 2",
true,
"RUNNING",
2L,
2L,
2L,
4L,
None,
"sid1",
"c",
"")
val s3 = SparkOperationEvent(
"ea3",
"select 3",
true,
"RUNNING",
3L,
3L,
3L,
6L,
None,
"sid1",
"b",
"")
store.saveStatement(s1)
store.saveStatement(s2)
@ -101,7 +137,19 @@ class EngineEventsStoreSuite extends KyuubiFunSuite {
val store = new EngineEventsStore(conf)
for (i <- 1 to 5) {
val s = SparkStatementEvent("a", s"ea1${i}", "select 1", "app1", "sid1", 1L, "RUNNING", 2L)
val s = SparkOperationEvent(
s"ea1${i}",
"select 1",
true,
"RUNNING",
1L,
1L,
1L,
2L,
None,
"sid1",
"a",
"")
store.saveStatement(s)
}
@ -114,10 +162,58 @@ class EngineEventsStoreSuite extends KyuubiFunSuite {
val store = new EngineEventsStore(conf)
store.saveStatement(SparkStatementEvent("a", "s1", "select 1", "a1", "si1", 1L, "RUNNING", -1L))
store.saveStatement(SparkStatementEvent("a", "s2", "select 1", "a2", "si1", 2L, "RUNNING", -1L))
store.saveStatement(SparkStatementEvent("a", "s3", "1", "a3", "si1", 3L, "ERROR", 3L, 3L))
store.saveStatement(SparkStatementEvent("a", "s4", "select 1", "a4", "si1", 4L, "RUNNING", -1L))
store.saveStatement(SparkOperationEvent(
"s1",
"select 1",
true,
"RUNNING",
1L,
1L,
1L,
-1L,
None,
"sid1",
"a",
""))
store.saveStatement(SparkOperationEvent(
"s2",
"select 1",
true,
"RUNNING",
2L,
2L,
2L,
-1L,
None,
"sid1",
"a",
""))
store.saveStatement(SparkOperationEvent(
"s3",
"select 1",
true,
"RUNNING",
3L,
3L,
3L,
3L,
None,
"sid1",
"a",
""))
store.saveStatement(SparkOperationEvent(
"s4",
"select 1",
true,
"RUNNING",
4L,
4L,
4L,
-1L,
None,
"sid1",
"a",
""))
assert(store.getStatementList.size == 3)
assert(store.getStatementList(2).statementId == "s4")

View File

@ -106,7 +106,7 @@ class EventLoggingServiceSuite extends WithSparkSQLEngine with HiveJDBCTestHelpe
test("statementEvent: generate, dump and query") {
val statementEventPath = Paths.get(
logRoot,
"spark_statement",
"spark_operation",
s"day=$currentDate",
spark.sparkContext.applicationId + ".json")
val sql = "select timestamp'2021-09-01';"

View File

@ -38,7 +38,7 @@ abstract class AbstractOperation(opType: OperationType, session: Session)
session.sessionManager.getConf.get(OPERATION_IDLE_TIMEOUT)
}
final protected val statementId = handle.identifier.toString
final private[kyuubi] val statementId = handle.identifier.toString
override def getOperationLog: Option[OperationLog] = None

View File

@ -70,7 +70,7 @@ object KyuubiOperationEvent {
val session = operation.getSession
val status = operation.getStatus
new KyuubiOperationEvent(
operation.getHandle.identifier.toString,
operation.statementId,
Option(operation.remoteOpHandle()).map(OperationHandle(_).identifier.toString).orNull,
operation.statement,
operation.shouldRunAsync,

View File

@ -58,7 +58,7 @@ class EventLoggingServiceSuite extends WithKyuubiServer with HiveJDBCTestHelper
val serverStatementEventPath =
Paths.get(serverLogRoot, "kyuubi_operation", s"day=$currentDate", s"server-$hostName.json")
val engineStatementEventPath =
Paths.get(engineLogRoot, "spark_statement", s"day=$currentDate", "*.json")
Paths.get(engineLogRoot, "spark_operation", s"day=$currentDate", "*.json")
val sql = "select timestamp'2021-06-01'"
withJdbcStatement() { statement =>
@ -86,9 +86,9 @@ class EventLoggingServiceSuite extends WithKyuubiServer with HiveJDBCTestHelper
stateIndex = 0
while (resultSet2.next()) {
assert(resultSet2.getString("Event") ==
"org.apache.kyuubi.engine.spark.events.SparkStatementEvent")
"org.apache.kyuubi.engine.spark.events.SparkOperationEvent")
assert(resultSet2.getString("statement") == sql)
assert(resultSet2.getString("state") == engineStates(stateIndex).toString)
assert(resultSet2.getString("state") == engineStates(stateIndex).name())
stateIndex += 1
}
}