[KYUUBI #4035] Post SparkOperationEvent for all SparkOperations and show sessionId for statements

### _Why are the changes needed?_

Before, we could only see SQL queries on the Kyuubi query engine tab.
<img width="1727" alt="image" src="https://user-images.githubusercontent.com/6757692/209688435-5fc45e5f-2aa9-4991-9cad-1268fc61fc66.png">

After this PR, we can see Scala and Python code in the Kyuubi query engine tab as well.

<img width="1727" alt="image" src="https://user-images.githubusercontent.com/6757692/209742783-9867d42d-b04d-403d-872a-c501db483ac6.png">

It also shows the session ID for each statement; the new Session ID column links to the corresponding session page.
![image](https://user-images.githubusercontent.com/6757692/209756690-b3eb5465-5d29-4983-9178-c86361eb9d13.png)

It also closes #2886.
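
At its core, the change hoists the listener and event plumbing that previously lived only in `ExecuteStatement` up into the shared `SparkOperation` base class, so every operation type (SQL, Scala, Python) posts a `SparkOperationEvent` on each state change. A condensed sketch of the resulting shape (abbreviated from the diff below; config lookups and error handling omitted):

```scala
abstract class SparkOperation(session: Session) extends AbstractOperation(session) {

  // Created once per operation when OPERATION_SPARK_LISTENER_ENABLED is on.
  protected val operationListener: Option[SQLOperationListener] =
    if (operationSparkListenerEnabled) Some(new SQLOperationListener(this, spark)) else None

  // Subclasses register the listener once they actually start running.
  protected def addOperationListener(): Unit =
    operationListener.foreach(spark.sparkContext.addSparkListener(_))

  // Operations that can report progress opt in; all three executors here do.
  protected def supportProgress: Boolean = false

  // Every state transition now emits an event, so Scala and Python
  // operations appear on the engine tab alongside SQL statements.
  override protected def setState(newState: OperationState): Unit = {
    super.setState(newState)
    EventBus.post(SparkOperationEvent(this, operationListener.flatMap(_.getExecutionId)))
  }
}
```
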
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4035 from turboFei/scala_python_event.

Closes #4035

33c3a4879 [fwang12] refactor
20dfa874e [fwang12] fix ut
d68fce9a3 [fwang12] support progress
8a86e3235 [fwang12] refactor
568ef8624 [fwang12] sessionId
1d623a29a [fwang12] h link
613e829bb [fwang12] show session id
2f120f1d3 [fwang12] add operation listener
529d91c89 [fwang12] move to spark operation
af21769a7 [fwang12] SQL => Statement
713c593ce [fwang12] scala python

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
fwang12 2022-12-29 11:13:45 +08:00
parent 86d880e9e7
commit a96df17bcd
7 changed files with 73 additions and 62 deletions

ExecutePython.scala

@@ -53,6 +53,7 @@ class ExecutePython(
private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
override protected def supportProgress: Boolean = true
override protected def resultSchema: StructType = {
if (result == null || result.schema.isEmpty) {
@@ -80,6 +81,7 @@ class ExecutePython(
try {
setState(OperationState.RUNNING)
info(diagnostics)
addOperationListener()
val response = worker.runCode(statement)
val status = response.map(_.content.status).getOrElse("UNKNOWN_STATUS")
if (PythonResponse.OK_STATUS.equalsIgnoreCase(status)) {

ExecuteScala.scala

@@ -56,6 +56,7 @@ class ExecuteScala(
private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
override protected def supportProgress: Boolean = true
override protected def resultSchema: StructType = {
if (result == null || result.schema.isEmpty) {
@@ -80,6 +81,7 @@ class ExecuteScala(
setState(OperationState.RUNNING)
info(diagnostics)
Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
addOperationListener()
val legacyOutput = repl.getOutput
if (legacyOutput.nonEmpty) {
warn(s"Clearing legacy output from last interpreting:\n $legacyOutput")

ExecuteStatement.scala

@@ -21,19 +21,14 @@ import java.util.concurrent.RejectedExecutionException
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.TProgressUpdateResp
import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.kyuubi.SparkDatasetHelper
import org.apache.spark.sql.types._
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE}
import org.apache.kyuubi.config.KyuubiConf.OPERATION_RESULT_MAX_ROWS
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
import org.apache.kyuubi.engine.spark.events.SparkOperationEvent
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator, OperationState, OperationStatus}
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator, OperationState}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -47,26 +42,7 @@ class ExecuteStatement(
private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
private val operationSparkListenerEnabled =
spark.conf.getOption(OPERATION_SPARK_LISTENER_ENABLED.key) match {
case Some(s) => s.toBoolean
case _ => session.sessionManager.getConf.get(OPERATION_SPARK_LISTENER_ENABLED)
}
private val operationListener: Option[SQLOperationListener] =
if (operationSparkListenerEnabled) {
Some(new SQLOperationListener(this, spark))
} else {
None
}
private val progressEnable = spark.conf.getOption(SESSION_PROGRESS_ENABLE.key) match {
case Some(s) => s.toBoolean
case _ => session.sessionManager.getConf.get(SESSION_PROGRESS_ENABLE)
}
EventBus.post(SparkOperationEvent(this))
override protected def supportProgress: Boolean = true
override protected def resultSchema: StructType = {
if (result == null || result.schema.isEmpty) {
@@ -91,7 +67,7 @@ class ExecuteStatement(
setState(OperationState.RUNNING)
info(diagnostics)
Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
operationListener.foreach(spark.sparkContext.addSparkListener(_))
addOperationListener()
result = spark.sql(statement)
iter =
@@ -167,31 +143,6 @@ class ExecuteStatement(
}
}
override def cleanup(targetState: OperationState): Unit = {
operationListener.foreach(_.cleanup())
super.cleanup(targetState)
}
override def setState(newState: OperationState): Unit = {
super.setState(newState)
EventBus.post(
SparkOperationEvent(this, operationListener.flatMap(_.getExecutionId)))
}
override def getStatus: OperationStatus = {
if (progressEnable) {
val progressMonitor = new SparkProgressMonitor(spark, statementId)
setOperationJobProgress(new TProgressUpdateResp(
progressMonitor.headers,
progressMonitor.rows,
progressMonitor.progressedPercentage,
progressMonitor.executionStatus,
progressMonitor.footerSummary,
startTime))
}
super.getStatus
}
def setCompiledStateIfNeeded(): Unit = synchronized {
if (getStatus.state == OperationState.RUNNING) {
val lastAccessCompiledTime =
@@ -206,12 +157,10 @@ class ExecuteStatement(
} else {
0L
}
super.setState(OperationState.COMPILED)
setState(OperationState.COMPILED)
if (lastAccessCompiledTime > 0L) {
lastAccessTime = lastAccessCompiledTime
}
EventBus.post(
SparkOperationEvent(this, operationListener.flatMap(_.getExecutionId)))
}
}

SparkOperation.scala

@@ -20,20 +20,23 @@ package org.apache.kyuubi.engine.spark.operation
import java.io.IOException
import java.time.ZoneId
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressUpdateResp, TRowSet}
import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener}
import org.apache.spark.kyuubi.SparkUtilsHelper.redact
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.SESSION_USER_SIGN_ENABLED
import org.apache.kyuubi.config.KyuubiConf.{OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE, SESSION_USER_SIGN_ENABLED}
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_KEY, KYUUBI_SESSION_USER_SIGN, KYUUBI_STATEMENT_ID_KEY}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SCHEDULER_POOL_KEY
import org.apache.kyuubi.engine.spark.events.SparkOperationEvent
import org.apache.kyuubi.engine.spark.operation.SparkOperation.TIMEZONE_KEY
import org.apache.kyuubi.engine.spark.schema.{RowSet, SchemaHelper}
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
import org.apache.kyuubi.operation.{AbstractOperation, FetchIterator, OperationState}
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.operation.{AbstractOperation, FetchIterator, OperationState, OperationStatus}
import org.apache.kyuubi.operation.FetchOrientation._
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
@@ -59,7 +62,46 @@ abstract class SparkOperation(session: Session)
override def redactedStatement: String =
redact(spark.sessionState.conf.stringRedactionPattern, statement)
protected val operationSparkListenerEnabled =
spark.conf.getOption(OPERATION_SPARK_LISTENER_ENABLED.key) match {
case Some(s) => s.toBoolean
case _ => session.sessionManager.getConf.get(OPERATION_SPARK_LISTENER_ENABLED)
}
protected val operationListener: Option[SQLOperationListener] =
if (operationSparkListenerEnabled) {
Some(new SQLOperationListener(this, spark))
} else {
None
}
protected def addOperationListener(): Unit = {
operationListener.foreach(spark.sparkContext.addSparkListener(_))
}
private val progressEnable = spark.conf.getOption(SESSION_PROGRESS_ENABLE.key) match {
case Some(s) => s.toBoolean
case _ => session.sessionManager.getConf.get(SESSION_PROGRESS_ENABLE)
}
protected def supportProgress: Boolean = false
override def getStatus: OperationStatus = {
if (progressEnable && supportProgress) {
val progressMonitor = new SparkProgressMonitor(spark, statementId)
setOperationJobProgress(new TProgressUpdateResp(
progressMonitor.headers,
progressMonitor.rows,
progressMonitor.progressedPercentage,
progressMonitor.executionStatus,
progressMonitor.footerSummary,
startTime))
}
super.getStatus
}
override def cleanup(targetState: OperationState): Unit = state.synchronized {
operationListener.foreach(_.cleanup())
if (!isTerminalState(state)) {
setState(targetState)
Option(getBackgroundHandle).foreach(_.cancel(true))
@@ -78,6 +120,13 @@ abstract class SparkOperation(session: Session)
s"spark.${SESSION_USER_SIGN_ENABLED.key}",
SESSION_USER_SIGN_ENABLED.defaultVal.get)
EventBus.post(SparkOperationEvent(this))
override protected def setState(newState: OperationState): Unit = {
super.setState(newState)
EventBus.post(SparkOperationEvent(this, operationListener.flatMap(_.getExecutionId)))
}
protected def setSparkLocalProperty: (String, String) => Unit =
spark.sparkContext.setLocalProperty
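
`SQLOperationListener` itself is outside this diff. For orientation only, here is a minimal, self-contained sketch of how a listener of this kind can recover the SQL execution id that `setState` attaches to the posted event; Spark tags jobs launched from a SQL execution with the `spark.sql.execution.id` property. The class name and structure are illustrative, not Kyuubi's actual implementation:

```scala
import java.util.concurrent.atomic.AtomicReference

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Illustrative stand-in for SQLOperationListener's execution-id tracking.
class ExecutionIdCapturingListener extends SparkListener {
  private val executionId = new AtomicReference[Option[Long]](None)

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // Jobs triggered by a SQL execution carry this local property.
    Option(jobStart.properties.getProperty("spark.sql.execution.id"))
      .foreach(id => executionId.set(Some(id.toLong)))
  }

  def getExecutionId: Option[Long] = executionId.get()
}
```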

EnginePage.scala

@@ -155,7 +155,7 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") {
'aggregated-sqlstat')">
<h4>
<span class="collapse-table-arrow arrow-open"></span>
<a>SQL Statistics ({numStatement})</a>
<a>Statement Statistics ({numStatement})</a>
</h4>
</span> ++
<div class="aggregated-sqlstat collapsible-table">
@@ -340,6 +340,7 @@ private class StatementStatsPagedTable(
val sqlTableHeadersAndTooltips: Seq[(String, Boolean, Option[String])] =
Seq(
("User", true, None),
("Session ID", true, None),
("Statement ID", true, None),
("Create Time", true, None),
("Finish Time", true, None),
@@ -360,10 +361,17 @@ private class StatementStatsPagedTable(
}
override def row(event: SparkOperationEvent): Seq[Node] = {
val sessionLink = "%s/%s/session/?id=%s".format(
UIUtils.prependBaseUri(request, parent.basePath),
parent.prefix,
event.sessionId)
<tr>
<td>
{event.sessionUser}
</td>
<td>
<a href={sessionLink}>{event.sessionId}</a>
</td>
<td>
{event.statementId}
</td>
@@ -475,6 +483,7 @@ private class StatementStatsTableDataSource(
private def ordering(sortColumn: String, desc: Boolean): Ordering[SparkOperationEvent] = {
val ordering: Ordering[SparkOperationEvent] = sortColumn match {
case "User" => Ordering.by(_.sessionUser)
case "Session ID" => Ordering.by(_.sessionId)
case "Statement ID" => Ordering.by(_.statementId)
case "Create Time" => Ordering.by(_.createTime)
case "Finish Time" => Ordering.by(_.completeTime)

EngineSessionPage.scala

@@ -153,7 +153,7 @@ case class EngineSessionPage(parent: EngineTab)
'aggregated-sqlsessionstat')">
<h4>
<span class="collapse-table-arrow arrow-open"></span>
<a>SQL Statistics</a>
<a>Statement Statistics</a>
</h4>
</span> ++
<div class="aggregated-sqlsessionstat collapsible-table">

EngineTabSuite.scala

@@ -132,7 +132,7 @@ class EngineTabSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
val resp = EntityUtils.toString(response.getEntity)
// check session section
assert(resp.contains("SQL Statistics"))
assert(resp.contains("Statement Statistics"))
// check sql stats table id
assert(resp.contains("sqlstat"))