[KYUUBI #903] Add event for statement

### _Why are the changes needed?_
- Through eventLog to write statementInfo into file

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request

Closes #903 from zhang1002/branch-1.2_statementEvent.

Closes #903

27cc7741 [张宇翔] 1. Add some introduce 2. change som UT
7cf73bde [张宇翔] Merge branch 'master' into branch-1.2_statementEvent
b662a989 [张宇翔] Merge remote-tracking branch 'upstream/master'
fc868609 [张宇翔] resolve conflicts
5dfcf868 [张宇翔] resolve conflicts
b04adee8 [张宇翔] remove some unused code
4c8f3b87 [张宇翔] Merge remote-tracking branch 'upstream/master'
2a4317a5 [张宇翔] remove some ut
0c474cc4 [张宇翔] Add statement event
4e05a395 [张宇翔] Add statement event
5f73e247 [张宇翔] Add statement event
8b686767 [张宇翔] Merge remote-tracking branch 'upstream/master'
cf99e309 [张宇翔] Merge remote-tracking branch 'upstream/master'
0afaa578 [张宇翔] Merge remote-tracking branch 'upstream/master'
b24fea07 [张宇翔] Merge remote-tracking branch 'upstream/master'
e517cfc5 [张宇翔] Merge remote-tracking branch 'upstream/master'
18aebe76 [张宇翔] Merge remote-tracking branch 'upstream/master'
f248bef7 [张宇翔] Merge remote-tracking branch 'upstream/master'
5ffb54f3 [张宇翔] Add kyuubi-spark-monitor module for nightly.yml

Authored-by: 张宇翔 <zhang1002@126.com>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit d0c9c8ab25)
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
张宇翔 2021-08-18 09:56:43 +08:00 committed by Kent Yao
parent bb4d1e807e
commit e4c35d5883
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
4 changed files with 86 additions and 39 deletions

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.events
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType
/**
*
* @param statementId: the identifier of operationHandler
* @param statement: the sql that you execute
* @param appId: application id a.k.a, the unique id for engine
* @param sessionId: the identifier of a session
* @param state: store each state that the sql has
* @param stateTime: the time that the sql's state change
* @param queryExecution: contains logicPlan and physicalPlan
* @param exeception: caught exeception if have
*/
case class StatementEvent(
statementId: String,
statement: String,
appId: String,
sessionId: String,
var state: String,
var stateTime: Long,
var queryExecution: String = "",
var exeception: String = "") extends KyuubiEvent {
override def schema: StructType = Encoders.product[StatementEvent].schema
}

View File

@ -19,8 +19,6 @@ package org.apache.kyuubi.engine.spark.operation
import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}
import scala.collection.mutable.Map
import org.apache.spark.kyuubi.SQLOperationListener
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._
@ -28,8 +26,7 @@ import org.apache.spark.sql.types._
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.{ArrayFetchIterator, KyuubiSparkUtil}
import org.apache.kyuubi.engine.spark.monitor.KyuubiStatementMonitor
import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiStatementInfo
import org.apache.kyuubi.engine.spark.events.{EventLoggingService, StatementEvent}
import org.apache.kyuubi.operation.{OperationState, OperationType}
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
@ -62,10 +59,10 @@ class ExecuteStatement(
private val operationListener: SQLOperationListener = new SQLOperationListener(this, spark)
private val kyuubiStatementInfo = KyuubiStatementInfo(
val statementEvent: StatementEvent = StatementEvent(
statementId, statement, spark.sparkContext.applicationId,
session.handle.identifier, Map(state -> lastAccessTime))
KyuubiStatementMonitor.putStatementInfoIntoQueue(kyuubiStatementInfo)
session.handle.identifier.toString, state.toString, lastAccessTime)
EventLoggingService.onEvent(statementEvent)
override protected def resultSchema: StructType = {
if (result == null || result.schema.isEmpty) {
@ -95,7 +92,8 @@ class ExecuteStatement(
result = spark.sql(statement)
// TODO( #921): COMPILED need consider eagerly executed commands
setState(OperationState.COMPILED)
kyuubiStatementInfo.queryExecution = result.queryExecution
statementEvent.queryExecution = result.queryExecution.toString()
EventLoggingService.onEvent(statementEvent)
debug(result.queryExecution)
iter = new ArrayFetchIterator(result.collect())
setState(OperationState.FINISHED)
@ -171,11 +169,14 @@ class ExecuteStatement(
override def setState(newState: OperationState): Unit = {
super.setState(newState)
kyuubiStatementInfo.stateToTime.put(newState, lastAccessTime)
statementEvent.state = newState.toString
statementEvent.stateTime = lastAccessTime
EventLoggingService.onEvent(statementEvent)
}
override def setOperationException(opEx: KyuubiSQLException): Unit = {
super.setOperationException(opEx)
kyuubiStatementInfo.exception = opEx
statementEvent.exeception = opEx.toString
EventLoggingService.onEvent(statementEvent)
}
}

View File

@ -17,7 +17,7 @@
package org.apache.kyuubi.engine.spark
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
import java.util.concurrent.ConcurrentHashMap
import org.apache.hive.service.rpc.thrift._
import org.apache.hive.service.rpc.thrift.TCLIService.Iface
@ -27,7 +27,7 @@ import org.scalatest.PrivateMethodTester
import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.engine.spark.monitor.KyuubiStatementMonitor
import org.apache.kyuubi.engine.spark.monitor.entity.{KyuubiJobInfo, KyuubiStatementInfo}
import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiJobInfo
import org.apache.kyuubi.operation.{HiveJDBCTests, OperationHandle}
class KyuubiStatementMonitorSuite extends WithSparkSQLEngine with HiveJDBCTests
@ -36,32 +36,6 @@ class KyuubiStatementMonitorSuite extends WithSparkSQLEngine with HiveJDBCTests
override protected def jdbcUrl: String = getJdbcUrl
override def withKyuubiConf: Map[String, String] = Map.empty
test("add kyuubiStatementInfo into queue") {
var baseSql = "select timestamp'2021-06-0"
val total: Int = 7
// Clear kyuubiStatementQueue first
val getQueue = PrivateMethod[
ArrayBlockingQueue[KyuubiStatementInfo]](Symbol("kyuubiStatementQueue"))()
val kyuubiStatementQueue = KyuubiStatementMonitor.invokePrivate(getQueue)
kyuubiStatementQueue.clear()
withSessionHandle { (client, handle) =>
for ( a <- 1 to total ) {
val sql = baseSql + a + "'"
val req = new TExecuteStatementReq()
req.setSessionHandle(handle)
req.setStatement(sql)
val tExecuteStatementResp = client.ExecuteStatement(req)
val operationHandle = tExecuteStatementResp.getOperationHandle
val kyuubiStatementInfo = kyuubiStatementQueue.poll()
assert(
kyuubiStatementInfo.statementId === OperationHandle(operationHandle).identifier.toString)
assert(sql === kyuubiStatementInfo.statement)
}
}
}
test("add kyuubiJobInfo into queue and remove them when threshold reached") {
val sql = "select timestamp'2021-06-01'"
val getJobMap = PrivateMethod[

View File

@ -20,10 +20,13 @@ package org.apache.kyuubi.engine.spark.events
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import org.apache.hive.service.rpc.thrift.TExecuteStatementReq
import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, WithSparkSQLEngine}
import org.apache.kyuubi.operation.JDBCTestUtils
import org.apache.kyuubi.operation.{JDBCTestUtils, OperationHandle}
class EventLoggingServiceSuite extends WithSparkSQLEngine with JDBCTestUtils {
import EventLoggerType._
@ -83,4 +86,28 @@ class EventLoggingServiceSuite extends WithSparkSQLEngine with JDBCTestUtils {
assert(rs.getInt("totalOperations") === 3)
}
}
test("statementEvent: generate, dump and query") {
val statementEventPath = Paths.get(logRoot.toString, "statement", engine.engineId + ".json")
val sql = "select timestamp'2021-06-01'"
withSessionHandle { (client, handle) =>
val table = statementEventPath.getParent
val req = new TExecuteStatementReq()
req.setSessionHandle(handle)
req.setStatement(sql)
val tExecuteStatementResp = client.ExecuteStatement(req)
val opHandle = tExecuteStatementResp.getOperationHandle
val statementId = OperationHandle(opHandle).identifier.toString
eventually(timeout(60.seconds), interval(5.seconds)) {
val result = spark.sql(s"select * from `json`.`${table}`")
.where(s"statementId = '${statementId}'")
assert(result.select("statementId").first().get(0) === statementId)
assert(result.count() >= 1)
assert(result.select("statement").first().get(0) === sql)
}
}
}
}