diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/DataGenerator.scala b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/DataGenerator.scala index 8b8c5752d..fb275d86b 100644 --- a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/DataGenerator.scala +++ b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/DataGenerator.scala @@ -570,31 +570,31 @@ object DataGenerator { $"web_tax_percentage" .decimal(5, 2)) // format: on - Seq( - store_sales, - store_returns, - catalog_sales, - catalog_returns, - web_sales, - web_returns, - inventory, - catalog_page, - call_center, - customer, - customer_address, - customer_demographics, - household_demographics, - store, - warehouse, - item, - income_band, - web_site, - web_page, - date_dim, - time_dim, - ship_mode, - reason, - promotion) + Seq( + store_sales, + store_returns, + catalog_sales, + catalog_returns, + web_sales, + web_returns, + inventory, + catalog_page, + call_center, + customer, + customer_address, + customer_demographics, + household_demographics, + store, + warehouse, + item, + income_band, + web_site, + web_page, + date_dim, + time_dim, + ship_mode, + reason, + promotion) } def run(config: RunConfig): Unit = { diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala index f6780a239..a3565362e 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala @@ -136,10 +136,10 @@ abstract class FlinkOperation( setOperationException(KyuubiSQLException(errMsg)) warn(s"Ignore exception in terminal state with $statementId: $errMsg") } else { - setState(OperationState.ERROR) error(s"Error operating $opType: $errMsg", e) val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e) setOperationException(ke) + setState(OperationState.ERROR) throw ke } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala index 260976d8a..d05cd8e27 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala @@ -92,23 +92,23 @@ class EngineEventsStore(conf: KyuubiConf) { /** * store all statements events. */ - val statements = new ConcurrentHashMap[String, SparkStatementEvent] + val statements = new ConcurrentHashMap[String, SparkOperationEvent] /** * get all statement events order by startTime */ - def getStatementList: Seq[SparkStatementEvent] = { + def getStatementList: Seq[SparkOperationEvent] = { statements.values().asScala.toSeq.sortBy(_.createTime) } - def getStatement(statementId: String): Option[SparkStatementEvent] = { + def getStatement(statementId: String): Option[SparkOperationEvent] = { Option(statements.get(statementId)) } /** * save statement events and check the capacity threshold */ - def saveStatement(statementEvent: SparkStatementEvent): Unit = { + def saveStatement(statementEvent: SparkOperationEvent): Unit = { statements.put(statementEvent.statementId, statementEvent) checkStatementCapacity() } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala new file mode 100644 index 000000000..489d90124 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.events + +import org.apache.spark.sql.{DataFrame, Encoders} +import org.apache.spark.sql.types.StructType + +import org.apache.kyuubi.Utils +import org.apache.kyuubi.engine.spark.operation.SparkOperation + +/** + * A [[SparkOperationEvent]] used to tracker the lifecycle of an operation at Spark SQL Engine side. + * + * + * @param statementId the unique identifier of a single operation + * @param statement the sql that you execute + * @param shouldRunAsync the flag indicating whether the query runs synchronously or not + * @param state the current operation state + * @param eventTime the time when the event created & logged + * @param createTime the time for changing to the current operation state + * @param startTime the time the query start to time of this operation + * @param completeTime time time the query ends + * @param exception: caught exception if have + * @param sessionId the identifier of the parent session + * @param sessionUser the authenticated client user + * @param queryExecution the query execution of this operation + */ +case class SparkOperationEvent( + statementId: String, + statement: String, + shouldRunAsync: Boolean, + state: String, + eventTime: Long, + createTime: Long, + startTime: Long, + completeTime: Long, + exception: Option[Throwable], + sessionId: String, + sessionUser: String, + queryExecution: String) extends KyuubiSparkEvent { + + override def schema: StructType = Encoders.product[SparkOperationEvent].schema + override def partitions: Seq[(String, String)] = + ("day", Utils.getDateFromTimestamp(createTime)) :: Nil + + def duration: Long = { + if (completeTime == -1L) { + System.currentTimeMillis - createTime + } else { + completeTime - createTime + } + } +} + +object SparkOperationEvent { + def apply(operation: SparkOperation, result: Option[DataFrame] = None): SparkOperationEvent = { + val session = operation.getSession + val status = operation.getStatus + new SparkOperationEvent( + operation.statementId, + operation.statement, + operation.shouldRunAsync, + status.state.name(), + status.lastModified, + status.create, + status.start, + status.completed, + status.exception, + session.handle.identifier.toString, + session.user, + result.map(_.queryExecution.toString).orNull) + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkStatementEvent.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkStatementEvent.scala deleted file mode 100644 index 4995b6983..000000000 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkStatementEvent.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.engine.spark.events - -import org.apache.spark.sql.Encoders -import org.apache.spark.sql.types.StructType - -import org.apache.kyuubi.Utils - -/** - * @param statementId: the identifier of operationHandler - * @param statement: the sql that you execute - * @param appId: application id a.k.a, the unique id for engine - * @param sessionId: the identifier of a session - * @param createTime: the create time of this statement - * @param state: store each state that the sql has - * @param eventTime: the time that the sql's state change - * @param queryExecution: contains logicPlan and physicalPlan - * @param exception: caught exception if have - */ -case class SparkStatementEvent( - username: String, - statementId: String, - statement: String, - appId: String, - sessionId: String, - createTime: Long, - var state: String, - var eventTime: Long, - var completeTime: Long = -1L, - var queryExecution: String = "", - var exception: String = "") extends KyuubiSparkEvent { - - override def schema: StructType = Encoders.product[SparkStatementEvent].schema - override def partitions: Seq[(String, String)] = - ("day", Utils.getDateFromTimestamp(createTime)) :: Nil - - def duration: Long = { - if (completeTime == -1L) { - System.currentTimeMillis - createTime - } else { - completeTime - createTime - } - } -} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index 3ecdff3cc..495ae48f3 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.types._ import org.apache.kyuubi.{KyuubiSQLException, Logging} import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._ -import org.apache.kyuubi.engine.spark.events.{EventLoggingService, SparkStatementEvent} +import org.apache.kyuubi.engine.spark.events.{EventLoggingService, SparkOperationEvent} import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator, OperationState, OperationType} import org.apache.kyuubi.operation.OperationState.OperationState import org.apache.kyuubi.operation.log.OperationLog @@ -49,16 +49,7 @@ class ExecuteStatement( private val operationListener: SQLOperationListener = new SQLOperationListener(this, spark) - val statementEvent: SparkStatementEvent = SparkStatementEvent( - session.user, - statementId, - statement, - spark.sparkContext.applicationId, - session.handle.identifier.toString, - lastAccessTime, - state.toString, - lastAccessTime) - EventLoggingService.onEvent(statementEvent) + EventLoggingService.onEvent(SparkOperationEvent(this)) override protected def resultSchema: StructType = { if (result == null || result.schema.isEmpty) { @@ -87,7 +78,6 @@ class ExecuteStatement( spark.sparkContext.addSparkListener(operationListener) result = spark.sql(statement) // TODO #921: COMPILED need consider eagerly executed commands - statementEvent.queryExecution = result.queryExecution.toString() setState(OperationState.COMPILED) debug(result.queryExecution) iter = @@ -156,17 +146,6 @@ class ExecuteStatement( override def setState(newState: OperationState): Unit = { super.setState(newState) - statementEvent.state = newState.toString - statementEvent.eventTime = lastAccessTime - if (newState == OperationState.ERROR || newState == OperationState.FINISHED) { - statementEvent.completeTime = System.currentTimeMillis() - } - EventLoggingService.onEvent(statementEvent) - } - - override def setOperationException(opEx: KyuubiSQLException): Unit = { - super.setOperationException(opEx) - statementEvent.exception = opEx.toString - EventLoggingService.onEvent(statementEvent) + EventLoggingService.onEvent(SparkOperationEvent(this, Option(result))) } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index 3674fb5db..f0a6c6f32 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -130,10 +130,10 @@ abstract class SparkOperation(opType: OperationType, session: Session) setOperationException(KyuubiSQLException(errMsg)) warn(s"Ignore exception in terminal state with $statementId: $errMsg") } else { - setState(OperationState.ERROR) error(s"Error operating $opType: $errMsg", e) val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e) setOperationException(ke) + setState(OperationState.ERROR) throw ke } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala index c85d2f52c..77414dbe3 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala @@ -30,7 +30,7 @@ import org.apache.kyuubi.Logging import org.apache.kyuubi.Utils.stringifyException import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY -import org.apache.kyuubi.engine.spark.events.{EngineEventsStore, SessionEvent, SparkStatementEvent} +import org.apache.kyuubi.engine.spark.events.{EngineEventsStore, SessionEvent, SparkOperationEvent} import org.apache.kyuubi.service.{Serverable, ServiceState} /** @@ -119,7 +119,7 @@ class SparkSQLEngineListener( override def onOtherEvent(event: SparkListenerEvent): Unit = { event match { case e: SessionEvent => updateSessionStore(e) - case e: SparkStatementEvent => updateStatementStore(e) + case e: SparkOperationEvent => updateStatementStore(e) case _ => // Ignore } } @@ -128,7 +128,7 @@ class SparkSQLEngineListener( store.saveSession(event) } - private def updateStatementStore(event: SparkStatementEvent): Unit = { + private def updateStatementStore(event: SparkOperationEvent): Unit = { store.saveStatement(event) } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala index 32c7d6b35..00c5ff1d9 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala @@ -30,7 +30,7 @@ import org.apache.spark.ui.TableSourceUtil._ import org.apache.spark.ui.UIUtils._ import org.apache.kyuubi.{KYUUBI_VERSION, Utils} -import org.apache.kyuubi.engine.spark.events.{SessionEvent, SparkStatementEvent} +import org.apache.kyuubi.engine.spark.events.{SessionEvent, SparkOperationEvent} case class EnginePage(parent: EngineTab) extends WebUIPage("") { private val store = parent.engine.store @@ -280,10 +280,10 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") { private class StatementStatsPagedTable( request: HttpServletRequest, parent: EngineTab, - data: Seq[SparkStatementEvent], + data: Seq[SparkOperationEvent], subPath: String, basePath: String, - sqlStatsTableTag: String) extends PagedTable[SparkStatementEvent] { + sqlStatsTableTag: String) extends PagedTable[SparkOperationEvent] { private val (sortColumn, desc, pageSize) = getRequestTableParameters(request, sqlStatsTableTag, "Create Time") @@ -339,32 +339,32 @@ private class StatementStatsPagedTable( sqlStatsTableTag) } - override def row(sparkStatementEvent: SparkStatementEvent): Seq[Node] = { + override def row(event: SparkOperationEvent): Seq[Node] = { - {sparkStatementEvent.username} + {event.sessionUser} - {sparkStatementEvent.statementId} + {event.statementId} - {formatDate(sparkStatementEvent.createTime)} + {formatDate(event.createTime)} - {if (sparkStatementEvent.completeTime > 0) formatDate(sparkStatementEvent.completeTime)} + {if (event.completeTime > 0) formatDate(event.completeTime)} - {formatDurationVerbose(sparkStatementEvent.duration)} + {formatDurationVerbose(event.duration)} - {sparkStatementEvent.statement} + {event.statement} - {sparkStatementEvent.state} + {event.state} - {errorMessageCell(sparkStatementEvent.exception)} + {errorMessageCell(event.exception.map(Utils.stringifyException).getOrElse(""))} } @@ -421,24 +421,24 @@ private class SessionStatsTableDataSource( } private class StatementStatsTableDataSource( - info: Seq[SparkStatementEvent], + info: Seq[SparkOperationEvent], pageSize: Int, sortColumn: String, - desc: Boolean) extends PagedDataSource[SparkStatementEvent](pageSize) { + desc: Boolean) extends PagedDataSource[SparkOperationEvent](pageSize) { // Sorting SessionEvent data private val data = info.sorted(ordering(sortColumn, desc)) override def dataSize: Int = data.size - override def sliceData(from: Int, to: Int): Seq[SparkStatementEvent] = data.slice(from, to) + override def sliceData(from: Int, to: Int): Seq[SparkOperationEvent] = data.slice(from, to) /** * Return Ordering according to sortColumn and desc. */ - private def ordering(sortColumn: String, desc: Boolean): Ordering[SparkStatementEvent] = { - val ordering: Ordering[SparkStatementEvent] = sortColumn match { - case "User" => Ordering.by(_.username) + private def ordering(sortColumn: String, desc: Boolean): Ordering[SparkOperationEvent] = { + val ordering: Ordering[SparkOperationEvent] = sortColumn match { + case "User" => Ordering.by(_.sessionUser) case "Statement ID" => Ordering.by(_.statementId) case "Create Time" => Ordering.by(_.createTime) case "Finish Time" => Ordering.by(_.completeTime) diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala index 666d006b1..d84a9ef73 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala @@ -82,9 +82,45 @@ class EngineEventsStoreSuite extends KyuubiFunSuite { test("ensure that the statements are stored in order") { val store = new EngineEventsStore(KyuubiConf()) - val s1 = SparkStatementEvent("a", "ea1", "select 1", "app1", "sid1", 1L, "RUNNING", 2L) - val s2 = SparkStatementEvent("c", "ea2", "select 2", "app2", "sid1", 2L, "RUNNING", 2L) - val s3 = SparkStatementEvent("b", "ea3", "select 3", "app3", "sid1", 3L, "RUNNING", 2L) + val s1 = SparkOperationEvent( + "ea1", + "select 1", + true, + "RUNNING", + 1L, + 1L, + 1L, + 2L, + None, + "sid1", + "a", + "") + val s2 = SparkOperationEvent( + "ea2", + "select 2", + true, + "RUNNING", + 2L, + 2L, + 2L, + 4L, + None, + "sid1", + "c", + "") + val s3 = SparkOperationEvent( + "ea3", + "select 3", + true, + "RUNNING", + 3L, + 3L, + 3L, + 6L, + None, + "sid1", + "b", + "") store.saveStatement(s1) store.saveStatement(s2) @@ -101,7 +137,19 @@ class EngineEventsStoreSuite extends KyuubiFunSuite { val store = new EngineEventsStore(conf) for (i <- 1 to 5) { - val s = SparkStatementEvent("a", s"ea1${i}", "select 1", "app1", "sid1", 1L, "RUNNING", 2L) + val s = SparkOperationEvent( + s"ea1${i}", + "select 1", + true, + "RUNNING", + 1L, + 1L, + 1L, + 2L, + None, + "sid1", + "a", + "") store.saveStatement(s) } @@ -114,10 +162,58 @@ class EngineEventsStoreSuite extends KyuubiFunSuite { val store = new EngineEventsStore(conf) - store.saveStatement(SparkStatementEvent("a", "s1", "select 1", "a1", "si1", 1L, "RUNNING", -1L)) - store.saveStatement(SparkStatementEvent("a", "s2", "select 1", "a2", "si1", 2L, "RUNNING", -1L)) - store.saveStatement(SparkStatementEvent("a", "s3", "1", "a3", "si1", 3L, "ERROR", 3L, 3L)) - store.saveStatement(SparkStatementEvent("a", "s4", "select 1", "a4", "si1", 4L, "RUNNING", -1L)) + store.saveStatement(SparkOperationEvent( + "s1", + "select 1", + true, + "RUNNING", + 1L, + 1L, + 1L, + -1L, + None, + "sid1", + "a", + "")) + store.saveStatement(SparkOperationEvent( + "s2", + "select 1", + true, + "RUNNING", + 2L, + 2L, + 2L, + -1L, + None, + "sid1", + "a", + "")) + store.saveStatement(SparkOperationEvent( + "s3", + "select 1", + true, + "RUNNING", + 3L, + 3L, + 3L, + 3L, + None, + "sid1", + "a", + "")) + store.saveStatement(SparkOperationEvent( + "s4", + "select 1", + true, + "RUNNING", + 4L, + 4L, + 4L, + -1L, + None, + "sid1", + "a", + "")) assert(store.getStatementList.size == 3) assert(store.getStatementList(2).statementId == "s4") diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala index 217056bed..495ae597a 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala @@ -106,7 +106,7 @@ class EventLoggingServiceSuite extends WithSparkSQLEngine with HiveJDBCTestHelpe test("statementEvent: generate, dump and query") { val statementEventPath = Paths.get( logRoot, - "spark_statement", + "spark_operation", s"day=$currentDate", spark.sparkContext.applicationId + ".json") val sql = "select timestamp'2021-09-01';" diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala index b89f3f7cc..69239b18d 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala @@ -38,7 +38,7 @@ abstract class AbstractOperation(opType: OperationType, session: Session) session.sessionManager.getConf.get(OPERATION_IDLE_TIMEOUT) } - final protected val statementId = handle.identifier.toString + final private[kyuubi] val statementId = handle.identifier.toString override def getOperationLog: Option[OperationLog] = None diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala index 2a9bfd736..0ecb73267 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala @@ -70,7 +70,7 @@ object KyuubiOperationEvent { val session = operation.getSession val status = operation.getStatus new KyuubiOperationEvent( - operation.getHandle.identifier.toString, + operation.statementId, Option(operation.remoteOpHandle()).map(OperationHandle(_).identifier.toString).orNull, operation.statement, operation.shouldRunAsync, diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala index b8ecdd1e0..4d6098dbd 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala @@ -58,7 +58,7 @@ class EventLoggingServiceSuite extends WithKyuubiServer with HiveJDBCTestHelper val serverStatementEventPath = Paths.get(serverLogRoot, "kyuubi_operation", s"day=$currentDate", s"server-$hostName.json") val engineStatementEventPath = - Paths.get(engineLogRoot, "spark_statement", s"day=$currentDate", "*.json") + Paths.get(engineLogRoot, "spark_operation", s"day=$currentDate", "*.json") val sql = "select timestamp'2021-06-01'" withJdbcStatement() { statement => @@ -86,9 +86,9 @@ class EventLoggingServiceSuite extends WithKyuubiServer with HiveJDBCTestHelper stateIndex = 0 while (resultSet2.next()) { assert(resultSet2.getString("Event") == - "org.apache.kyuubi.engine.spark.events.SparkStatementEvent") + "org.apache.kyuubi.engine.spark.events.SparkOperationEvent") assert(resultSet2.getString("statement") == sql) - assert(resultSet2.getString("state") == engineStates(stateIndex).toString) + assert(resultSet2.getString("state") == engineStates(stateIndex).name()) stateIndex += 1 } }