diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala index 78a016d54..e48ff6e5b 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala @@ -53,6 +53,7 @@ class ExecutePython( private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle) override def getOperationLog: Option[OperationLog] = Option(operationLog) + override protected def supportProgress: Boolean = true override protected def resultSchema: StructType = { if (result == null || result.schema.isEmpty) { @@ -80,6 +81,7 @@ class ExecutePython( try { setState(OperationState.RUNNING) info(diagnostics) + addOperationListener() val response = worker.runCode(statement) val status = response.map(_.content.status).getOrElse("UNKNOWN_STATUS") if (PythonResponse.OK_STATUS.equalsIgnoreCase(status)) { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala index c2df02728..0f63dcc06 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala @@ -56,6 +56,7 @@ class ExecuteScala( private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle) override def getOperationLog: Option[OperationLog] = Option(operationLog) + override protected def supportProgress: Boolean = true override protected def resultSchema: StructType = { if (result == null || result.schema.isEmpty) { @@ -80,6 +81,7 @@ class ExecuteScala( setState(OperationState.RUNNING) info(diagnostics) Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader) + addOperationListener() val legacyOutput = repl.getOutput if (legacyOutput.nonEmpty) { warn(s"Clearing legacy output from last interpreting:\n $legacyOutput") diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index f18916979..27b9dde4d 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -21,19 +21,14 @@ import java.util.concurrent.RejectedExecutionException import scala.collection.JavaConverters._ -import org.apache.hive.service.rpc.thrift.TProgressUpdateResp -import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener} import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.kyuubi.SparkDatasetHelper import org.apache.spark.sql.types._ import org.apache.kyuubi.{KyuubiSQLException, Logging} -import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE} +import org.apache.kyuubi.config.KyuubiConf.OPERATION_RESULT_MAX_ROWS import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._ -import org.apache.kyuubi.engine.spark.events.SparkOperationEvent -import org.apache.kyuubi.events.EventBus -import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator, OperationState, OperationStatus} -import org.apache.kyuubi.operation.OperationState.OperationState +import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator, OperationState} import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session @@ -47,26 +42,7 @@ class ExecuteStatement( private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle) override def getOperationLog: Option[OperationLog] = Option(operationLog) - - private val operationSparkListenerEnabled = - spark.conf.getOption(OPERATION_SPARK_LISTENER_ENABLED.key) match { - case Some(s) => s.toBoolean - case _ => session.sessionManager.getConf.get(OPERATION_SPARK_LISTENER_ENABLED) - } - - private val operationListener: Option[SQLOperationListener] = - if (operationSparkListenerEnabled) { - Some(new SQLOperationListener(this, spark)) - } else { - None - } - - private val progressEnable = spark.conf.getOption(SESSION_PROGRESS_ENABLE.key) match { - case Some(s) => s.toBoolean - case _ => session.sessionManager.getConf.get(SESSION_PROGRESS_ENABLE) - } - - EventBus.post(SparkOperationEvent(this)) + override protected def supportProgress: Boolean = true override protected def resultSchema: StructType = { if (result == null || result.schema.isEmpty) { @@ -91,7 +67,7 @@ class ExecuteStatement( setState(OperationState.RUNNING) info(diagnostics) Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader) - operationListener.foreach(spark.sparkContext.addSparkListener(_)) + addOperationListener() result = spark.sql(statement) iter = @@ -167,31 +143,6 @@ class ExecuteStatement( } } - override def cleanup(targetState: OperationState): Unit = { - operationListener.foreach(_.cleanup()) - super.cleanup(targetState) - } - - override def setState(newState: OperationState): Unit = { - super.setState(newState) - EventBus.post( - SparkOperationEvent(this, operationListener.flatMap(_.getExecutionId))) - } - - override def getStatus: OperationStatus = { - if (progressEnable) { - val progressMonitor = new SparkProgressMonitor(spark, statementId) - setOperationJobProgress(new TProgressUpdateResp( - progressMonitor.headers, - progressMonitor.rows, - progressMonitor.progressedPercentage, - progressMonitor.executionStatus, - progressMonitor.footerSummary, - startTime)) - } - super.getStatus - } - def setCompiledStateIfNeeded(): Unit = synchronized { if (getStatus.state == OperationState.RUNNING) { val lastAccessCompiledTime = @@ -206,12 +157,10 @@ class ExecuteStatement( } else { 0L } - super.setState(OperationState.COMPILED) + setState(OperationState.COMPILED) if (lastAccessCompiledTime > 0L) { lastAccessTime = lastAccessCompiledTime } - EventBus.post( - SparkOperationEvent(this, operationListener.flatMap(_.getExecutionId))) } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index 8ab898db5..428d7dcaf 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -20,20 +20,23 @@ package org.apache.kyuubi.engine.spark.operation import java.io.IOException import java.time.ZoneId -import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet} +import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressUpdateResp, TRowSet} +import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener} import org.apache.spark.kyuubi.SparkUtilsHelper.redact import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.types.StructType import org.apache.kyuubi.{KyuubiSQLException, Utils} import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.config.KyuubiConf.SESSION_USER_SIGN_ENABLED +import org.apache.kyuubi.config.KyuubiConf.{OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE, SESSION_USER_SIGN_ENABLED} import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_KEY, KYUUBI_SESSION_USER_SIGN, KYUUBI_STATEMENT_ID_KEY} import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SCHEDULER_POOL_KEY +import org.apache.kyuubi.engine.spark.events.SparkOperationEvent import org.apache.kyuubi.engine.spark.operation.SparkOperation.TIMEZONE_KEY import org.apache.kyuubi.engine.spark.schema.{RowSet, SchemaHelper} import org.apache.kyuubi.engine.spark.session.SparkSessionImpl -import org.apache.kyuubi.operation.{AbstractOperation, FetchIterator, OperationState} +import org.apache.kyuubi.events.EventBus +import org.apache.kyuubi.operation.{AbstractOperation, FetchIterator, OperationState, OperationStatus} import org.apache.kyuubi.operation.FetchOrientation._ import org.apache.kyuubi.operation.OperationState.OperationState import org.apache.kyuubi.operation.log.OperationLog @@ -59,7 +62,46 @@ abstract class SparkOperation(session: Session) override def redactedStatement: String = redact(spark.sessionState.conf.stringRedactionPattern, statement) + protected val operationSparkListenerEnabled = + spark.conf.getOption(OPERATION_SPARK_LISTENER_ENABLED.key) match { + case Some(s) => s.toBoolean + case _ => session.sessionManager.getConf.get(OPERATION_SPARK_LISTENER_ENABLED) + } + + protected val operationListener: Option[SQLOperationListener] = + if (operationSparkListenerEnabled) { + Some(new SQLOperationListener(this, spark)) + } else { + None + } + + protected def addOperationListener(): Unit = { + operationListener.foreach(spark.sparkContext.addSparkListener(_)) + } + + private val progressEnable = spark.conf.getOption(SESSION_PROGRESS_ENABLE.key) match { + case Some(s) => s.toBoolean + case _ => session.sessionManager.getConf.get(SESSION_PROGRESS_ENABLE) + } + + protected def supportProgress: Boolean = false + + override def getStatus: OperationStatus = { + if (progressEnable && supportProgress) { + val progressMonitor = new SparkProgressMonitor(spark, statementId) + setOperationJobProgress(new TProgressUpdateResp( + progressMonitor.headers, + progressMonitor.rows, + progressMonitor.progressedPercentage, + progressMonitor.executionStatus, + progressMonitor.footerSummary, + startTime)) + } + super.getStatus + } + override def cleanup(targetState: OperationState): Unit = state.synchronized { + operationListener.foreach(_.cleanup()) if (!isTerminalState(state)) { setState(targetState) Option(getBackgroundHandle).foreach(_.cancel(true)) @@ -78,6 +120,13 @@ abstract class SparkOperation(session: Session) s"spark.${SESSION_USER_SIGN_ENABLED.key}", SESSION_USER_SIGN_ENABLED.defaultVal.get) + EventBus.post(SparkOperationEvent(this)) + + override protected def setState(newState: OperationState): Unit = { + super.setState(newState) + EventBus.post(SparkOperationEvent(this, operationListener.flatMap(_.getExecutionId))) + } + protected def setSparkLocalProperty: (String, String) => Unit = spark.sparkContext.setLocalProperty diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala index dafdb0eb6..0aba0c7c5 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala @@ -155,7 +155,7 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") { 'aggregated-sqlstat')">