From a96df17bcdba2b9de3d4c6e520f8baf1f03c45ce Mon Sep 17 00:00:00 2001 From: fwang12 Date: Thu, 29 Dec 2022 11:13:45 +0800 Subject: [PATCH] [KYUUBI #4035] Post SparkOperationEvent for all SparkOperations and show sessionId for statements ### _Why are the changes needed?_ Before, we can only see sql query on the kyuubi query engine tab. image After this pr, we can see both scala & python code in the kyuubi query engine tab. image Also support to show sessionId. ![image](https://user-images.githubusercontent.com/6757692/209756690-b3eb5465-5d29-4983-9178-c86361eb9d13.png) It also close #2886 ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [x] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #4035 from turboFei/scala_python_event. Closes #4035 33c3a4879 [fwang12] refactor 20dfa874e [fwang12] fix ut d68fce9a3 [fwang12] support progress 8a86e3235 [fwang12] refactor 568ef8624 [fwang12] sessionId 1d623a29a [fwang12] h link 613e829bb [fwang12] show session id 2f120f1d3 [fwang12] add operation listener 529d91c89 [fwang12] move to spark operation af21769a7 [fwang12] SQL => Statement 713c593ce [fwang12] scala python Authored-by: fwang12 Signed-off-by: fwang12 --- .../spark/operation/ExecutePython.scala | 2 + .../engine/spark/operation/ExecuteScala.scala | 2 + .../spark/operation/ExecuteStatement.scala | 61 ++----------------- .../spark/operation/SparkOperation.scala | 55 ++++++++++++++++- .../org/apache/spark/ui/EnginePage.scala | 11 +++- .../apache/spark/ui/EngineSessionPage.scala | 2 +- .../org/apache/spark/ui/EngineTabSuite.scala | 2 +- 7 files changed, 73 insertions(+), 62 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala index 78a016d54..e48ff6e5b 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala @@ -53,6 +53,7 @@ class ExecutePython( private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle) override def getOperationLog: Option[OperationLog] = Option(operationLog) + override protected def supportProgress: Boolean = true override protected def resultSchema: StructType = { if (result == null || result.schema.isEmpty) { @@ -80,6 +81,7 @@ class ExecutePython( try { setState(OperationState.RUNNING) info(diagnostics) + addOperationListener() val response = worker.runCode(statement) val status = response.map(_.content.status).getOrElse("UNKNOWN_STATUS") if (PythonResponse.OK_STATUS.equalsIgnoreCase(status)) { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala index c2df02728..0f63dcc06 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala @@ -56,6 +56,7 @@ class ExecuteScala( private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle) override def getOperationLog: Option[OperationLog] = Option(operationLog) + override protected def supportProgress: Boolean = true override protected def resultSchema: StructType = { if (result == null || result.schema.isEmpty) { @@ -80,6 +81,7 @@ class ExecuteScala( setState(OperationState.RUNNING) info(diagnostics) Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader) + addOperationListener() val legacyOutput = repl.getOutput if (legacyOutput.nonEmpty) { warn(s"Clearing legacy output from last interpreting:\n $legacyOutput") diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index f18916979..27b9dde4d 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -21,19 +21,14 @@ import java.util.concurrent.RejectedExecutionException import scala.collection.JavaConverters._ -import org.apache.hive.service.rpc.thrift.TProgressUpdateResp -import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener} import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.kyuubi.SparkDatasetHelper import org.apache.spark.sql.types._ import org.apache.kyuubi.{KyuubiSQLException, Logging} -import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE} +import org.apache.kyuubi.config.KyuubiConf.OPERATION_RESULT_MAX_ROWS import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._ -import org.apache.kyuubi.engine.spark.events.SparkOperationEvent -import org.apache.kyuubi.events.EventBus -import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator, OperationState, OperationStatus} -import org.apache.kyuubi.operation.OperationState.OperationState +import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator, OperationState} import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session @@ -47,26 +42,7 @@ class ExecuteStatement( private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle) override def getOperationLog: Option[OperationLog] = Option(operationLog) - - private val operationSparkListenerEnabled = - spark.conf.getOption(OPERATION_SPARK_LISTENER_ENABLED.key) match { - case Some(s) => s.toBoolean - case _ => session.sessionManager.getConf.get(OPERATION_SPARK_LISTENER_ENABLED) - } - - private val operationListener: Option[SQLOperationListener] = - if (operationSparkListenerEnabled) { - Some(new SQLOperationListener(this, spark)) - } else { - None - } - - private val progressEnable = spark.conf.getOption(SESSION_PROGRESS_ENABLE.key) match { - case Some(s) => s.toBoolean - case _ => session.sessionManager.getConf.get(SESSION_PROGRESS_ENABLE) - } - - EventBus.post(SparkOperationEvent(this)) + override protected def supportProgress: Boolean = true override protected def resultSchema: StructType = { if (result == null || result.schema.isEmpty) { @@ -91,7 +67,7 @@ class ExecuteStatement( setState(OperationState.RUNNING) info(diagnostics) Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader) - operationListener.foreach(spark.sparkContext.addSparkListener(_)) + addOperationListener() result = spark.sql(statement) iter = @@ -167,31 +143,6 @@ class ExecuteStatement( } } - override def cleanup(targetState: OperationState): Unit = { - operationListener.foreach(_.cleanup()) - super.cleanup(targetState) - } - - override def setState(newState: OperationState): Unit = { - super.setState(newState) - EventBus.post( - SparkOperationEvent(this, operationListener.flatMap(_.getExecutionId))) - } - - override def getStatus: OperationStatus = { - if (progressEnable) { - val progressMonitor = new SparkProgressMonitor(spark, statementId) - setOperationJobProgress(new TProgressUpdateResp( - progressMonitor.headers, - progressMonitor.rows, - progressMonitor.progressedPercentage, - progressMonitor.executionStatus, - progressMonitor.footerSummary, - startTime)) - } - super.getStatus - } - def setCompiledStateIfNeeded(): Unit = synchronized { if (getStatus.state == OperationState.RUNNING) { val lastAccessCompiledTime = @@ -206,12 +157,10 @@ class ExecuteStatement( } else { 0L } - super.setState(OperationState.COMPILED) + setState(OperationState.COMPILED) if (lastAccessCompiledTime > 0L) { lastAccessTime = lastAccessCompiledTime } - EventBus.post( - SparkOperationEvent(this, operationListener.flatMap(_.getExecutionId))) } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index 8ab898db5..428d7dcaf 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -20,20 +20,23 @@ package org.apache.kyuubi.engine.spark.operation import java.io.IOException import java.time.ZoneId -import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet} +import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressUpdateResp, TRowSet} +import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener} import org.apache.spark.kyuubi.SparkUtilsHelper.redact import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.types.StructType import org.apache.kyuubi.{KyuubiSQLException, Utils} import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.config.KyuubiConf.SESSION_USER_SIGN_ENABLED +import org.apache.kyuubi.config.KyuubiConf.{OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE, SESSION_USER_SIGN_ENABLED} import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_KEY, KYUUBI_SESSION_USER_SIGN, KYUUBI_STATEMENT_ID_KEY} import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SCHEDULER_POOL_KEY +import org.apache.kyuubi.engine.spark.events.SparkOperationEvent import org.apache.kyuubi.engine.spark.operation.SparkOperation.TIMEZONE_KEY import org.apache.kyuubi.engine.spark.schema.{RowSet, SchemaHelper} import org.apache.kyuubi.engine.spark.session.SparkSessionImpl -import org.apache.kyuubi.operation.{AbstractOperation, FetchIterator, OperationState} +import org.apache.kyuubi.events.EventBus +import org.apache.kyuubi.operation.{AbstractOperation, FetchIterator, OperationState, OperationStatus} import org.apache.kyuubi.operation.FetchOrientation._ import org.apache.kyuubi.operation.OperationState.OperationState import org.apache.kyuubi.operation.log.OperationLog @@ -59,7 +62,46 @@ abstract class SparkOperation(session: Session) override def redactedStatement: String = redact(spark.sessionState.conf.stringRedactionPattern, statement) + protected val operationSparkListenerEnabled = + spark.conf.getOption(OPERATION_SPARK_LISTENER_ENABLED.key) match { + case Some(s) => s.toBoolean + case _ => session.sessionManager.getConf.get(OPERATION_SPARK_LISTENER_ENABLED) + } + + protected val operationListener: Option[SQLOperationListener] = + if (operationSparkListenerEnabled) { + Some(new SQLOperationListener(this, spark)) + } else { + None + } + + protected def addOperationListener(): Unit = { + operationListener.foreach(spark.sparkContext.addSparkListener(_)) + } + + private val progressEnable = spark.conf.getOption(SESSION_PROGRESS_ENABLE.key) match { + case Some(s) => s.toBoolean + case _ => session.sessionManager.getConf.get(SESSION_PROGRESS_ENABLE) + } + + protected def supportProgress: Boolean = false + + override def getStatus: OperationStatus = { + if (progressEnable && supportProgress) { + val progressMonitor = new SparkProgressMonitor(spark, statementId) + setOperationJobProgress(new TProgressUpdateResp( + progressMonitor.headers, + progressMonitor.rows, + progressMonitor.progressedPercentage, + progressMonitor.executionStatus, + progressMonitor.footerSummary, + startTime)) + } + super.getStatus + } + override def cleanup(targetState: OperationState): Unit = state.synchronized { + operationListener.foreach(_.cleanup()) if (!isTerminalState(state)) { setState(targetState) Option(getBackgroundHandle).foreach(_.cancel(true)) @@ -78,6 +120,13 @@ abstract class SparkOperation(session: Session) s"spark.${SESSION_USER_SIGN_ENABLED.key}", SESSION_USER_SIGN_ENABLED.defaultVal.get) + EventBus.post(SparkOperationEvent(this)) + + override protected def setState(newState: OperationState): Unit = { + super.setState(newState) + EventBus.post(SparkOperationEvent(this, operationListener.flatMap(_.getExecutionId))) + } + protected def setSparkLocalProperty: (String, String) => Unit = spark.sparkContext.setLocalProperty diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala index dafdb0eb6..0aba0c7c5 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala @@ -155,7 +155,7 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") { 'aggregated-sqlstat')">

- SQL Statistics ({numStatement}) + Statement Statistics ({numStatement})

++
@@ -340,6 +340,7 @@ private class StatementStatsPagedTable( val sqlTableHeadersAndTooltips: Seq[(String, Boolean, Option[String])] = Seq( ("User", true, None), + ("Session ID", true, None), ("Statement ID", true, None), ("Create Time", true, None), ("Finish Time", true, None), @@ -360,10 +361,17 @@ private class StatementStatsPagedTable( } override def row(event: SparkOperationEvent): Seq[Node] = { + val sessionLink = "%s/%s/session/?id=%s".format( + UIUtils.prependBaseUri(request, parent.basePath), + parent.prefix, + event.sessionId) {event.sessionUser} + + {event.sessionId} + {event.statementId} @@ -475,6 +483,7 @@ private class StatementStatsTableDataSource( private def ordering(sortColumn: String, desc: Boolean): Ordering[SparkOperationEvent] = { val ordering: Ordering[SparkOperationEvent] = sortColumn match { case "User" => Ordering.by(_.sessionUser) + case "Session ID" => Ordering.by(_.sessionId) case "Statement ID" => Ordering.by(_.statementId) case "Create Time" => Ordering.by(_.createTime) case "Finish Time" => Ordering.by(_.completeTime) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala index 9fe8e5ac8..1f34ae64f 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala @@ -153,7 +153,7 @@ case class EngineSessionPage(parent: EngineTab) 'aggregated-sqlsessionstat')">

- SQL Statistics + Statement Statistics

++
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala index 3d27935de..260dbf87e 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala @@ -132,7 +132,7 @@ class EngineTabSuite extends WithSparkSQLEngine with HiveJDBCTestHelper { val resp = EntityUtils.toString(response.getEntity) // check session section - assert(resp.contains("SQL Statistics")) + assert(resp.contains("Statement Statistics")) // check sql stats table id assert(resp.contains("sqlstat"))