diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/StatementEvent.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/StatementEvent.scala new file mode 100644 index 000000000..026f6b021 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/StatementEvent.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.events + +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.types.StructType + +/** + * + * @param statementId: the identifier of operationHandler + * @param statement: the sql that you execute + * @param appId: application id a.k.a, the unique id for engine + * @param sessionId: the identifier of a session + * @param state: store each state that the sql has + * @param stateTime: the time that the sql's state change + * @param queryExecution: contains logicPlan and physicalPlan + * @param exeception: caught exeception if have + */ +case class StatementEvent( + statementId: String, + statement: String, + appId: String, + sessionId: String, + var state: String, + var stateTime: Long, + var queryExecution: String = "", + var exeception: String = "") extends KyuubiEvent { + + override def schema: StructType = Encoders.product[StatementEvent].schema +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index a932d4377..509608215 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -19,8 +19,6 @@ package org.apache.kyuubi.engine.spark.operation import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit} -import scala.collection.mutable.Map - import org.apache.spark.kyuubi.SQLOperationListener import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types._ @@ -28,8 +26,7 @@ import org.apache.spark.sql.types._ import org.apache.kyuubi.{KyuubiSQLException, Logging} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.spark.{ArrayFetchIterator, KyuubiSparkUtil} -import org.apache.kyuubi.engine.spark.monitor.KyuubiStatementMonitor -import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiStatementInfo +import org.apache.kyuubi.engine.spark.events.{EventLoggingService, StatementEvent} import org.apache.kyuubi.operation.{OperationState, OperationType} import org.apache.kyuubi.operation.OperationState.OperationState import org.apache.kyuubi.operation.log.OperationLog @@ -62,10 +59,10 @@ class ExecuteStatement( private val operationListener: SQLOperationListener = new SQLOperationListener(this, spark) - private val kyuubiStatementInfo = KyuubiStatementInfo( + val statementEvent: StatementEvent = StatementEvent( statementId, statement, spark.sparkContext.applicationId, - session.handle.identifier, Map(state -> lastAccessTime)) - KyuubiStatementMonitor.putStatementInfoIntoQueue(kyuubiStatementInfo) + session.handle.identifier.toString, state.toString, lastAccessTime) + EventLoggingService.onEvent(statementEvent) override protected def resultSchema: StructType = { if (result == null || result.schema.isEmpty) { @@ -95,7 +92,8 @@ class ExecuteStatement( result = spark.sql(statement) // TODO( #921): COMPILED need consider eagerly executed commands setState(OperationState.COMPILED) - kyuubiStatementInfo.queryExecution = result.queryExecution + statementEvent.queryExecution = result.queryExecution.toString() + EventLoggingService.onEvent(statementEvent) debug(result.queryExecution) iter = new ArrayFetchIterator(result.collect()) setState(OperationState.FINISHED) @@ -171,11 +169,14 @@ class ExecuteStatement( override def setState(newState: OperationState): Unit = { super.setState(newState) - kyuubiStatementInfo.stateToTime.put(newState, lastAccessTime) + statementEvent.state = newState.toString + statementEvent.stateTime = lastAccessTime + EventLoggingService.onEvent(statementEvent) } override def setOperationException(opEx: KyuubiSQLException): Unit = { super.setOperationException(opEx) - kyuubiStatementInfo.exception = opEx + statementEvent.exeception = opEx.toString + EventLoggingService.onEvent(statementEvent) } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala index 126103b48..ea6a97c41 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala @@ -17,7 +17,7 @@ package org.apache.kyuubi.engine.spark -import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap} +import java.util.concurrent.ConcurrentHashMap import org.apache.hive.service.rpc.thrift._ import org.apache.hive.service.rpc.thrift.TCLIService.Iface @@ -27,7 +27,7 @@ import org.scalatest.PrivateMethodTester import org.scalatest.time.SpanSugar._ import org.apache.kyuubi.engine.spark.monitor.KyuubiStatementMonitor -import org.apache.kyuubi.engine.spark.monitor.entity.{KyuubiJobInfo, KyuubiStatementInfo} +import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiJobInfo import org.apache.kyuubi.operation.{HiveJDBCTests, OperationHandle} class KyuubiStatementMonitorSuite extends WithSparkSQLEngine with HiveJDBCTests @@ -36,32 +36,6 @@ class KyuubiStatementMonitorSuite extends WithSparkSQLEngine with HiveJDBCTests override protected def jdbcUrl: String = getJdbcUrl override def withKyuubiConf: Map[String, String] = Map.empty - test("add kyuubiStatementInfo into queue") { - var baseSql = "select timestamp'2021-06-0" - val total: Int = 7 - // Clear kyuubiStatementQueue first - val getQueue = PrivateMethod[ - ArrayBlockingQueue[KyuubiStatementInfo]](Symbol("kyuubiStatementQueue"))() - val kyuubiStatementQueue = KyuubiStatementMonitor.invokePrivate(getQueue) - kyuubiStatementQueue.clear() - withSessionHandle { (client, handle) => - - for ( a <- 1 to total ) { - val sql = baseSql + a + "'" - val req = new TExecuteStatementReq() - req.setSessionHandle(handle) - req.setStatement(sql) - val tExecuteStatementResp = client.ExecuteStatement(req) - val operationHandle = tExecuteStatementResp.getOperationHandle - - val kyuubiStatementInfo = kyuubiStatementQueue.poll() - assert( - kyuubiStatementInfo.statementId === OperationHandle(operationHandle).identifier.toString) - assert(sql === kyuubiStatementInfo.statement) - } - } - } - test("add kyuubiJobInfo into queue and remove them when threshold reached") { val sql = "select timestamp'2021-06-01'" val getJobMap = PrivateMethod[ diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala index c67442c54..142994d18 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala @@ -20,10 +20,13 @@ package org.apache.kyuubi.engine.spark.events import java.nio.charset.StandardCharsets import java.nio.file.{Files, Paths} +import org.apache.hive.service.rpc.thrift.TExecuteStatementReq +import org.scalatest.time.SpanSugar._ + import org.apache.kyuubi.Utils import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, WithSparkSQLEngine} -import org.apache.kyuubi.operation.JDBCTestUtils +import org.apache.kyuubi.operation.{JDBCTestUtils, OperationHandle} class EventLoggingServiceSuite extends WithSparkSQLEngine with JDBCTestUtils { import EventLoggerType._ @@ -83,4 +86,28 @@ class EventLoggingServiceSuite extends WithSparkSQLEngine with JDBCTestUtils { assert(rs.getInt("totalOperations") === 3) } } + + test("statementEvent: generate, dump and query") { + val statementEventPath = Paths.get(logRoot.toString, "statement", engine.engineId + ".json") + val sql = "select timestamp'2021-06-01'" + withSessionHandle { (client, handle) => + + val table = statementEventPath.getParent + val req = new TExecuteStatementReq() + req.setSessionHandle(handle) + req.setStatement(sql) + val tExecuteStatementResp = client.ExecuteStatement(req) + val opHandle = tExecuteStatementResp.getOperationHandle + val statementId = OperationHandle(opHandle).identifier.toString + + eventually(timeout(60.seconds), interval(5.seconds)) { + val result = spark.sql(s"select * from `json`.`${table}`") + .where(s"statementId = '${statementId}'") + + assert(result.select("statementId").first().get(0) === statementId) + assert(result.count() >= 1) + assert(result.select("statement").first().get(0) === sql) + } + } + } }