[KYUUBI #4186] Spark showProgress with JobInfo
### _Why are the changes needed?_ current version, when set `kyuubi.session.engine.spark.showProgress=true`, it will show stage's progress info,but the info only show stage's detail, now we need to add job info in this, just like ``` [Stage 1:> (0 + 1) / 2] ``` to ``` [Job 1 (0 / 1) Stages] [Stage 1:> (0 + 1) / 2] ``` **this update is useful when user want know their sql execute detail** closes #4186 ### _How was this patch tested?_ - [x] Add screenshots for manual tests if appropriate **The photo show match log**  - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No Closes #5410 from davidyuan1223/improvement_add_job_log. Closes #4186 d8d03c4c0 [Cheng Pan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala a06e9a17c [david yuan] Update SparkConsoleProgressBar.scala 854408416 [david yuan] Merge branch 'apache:master' into improvement_add_job_log 963ff18b9 [david yuan] Update SparkConsoleProgressBar.scala 9e4635653 [david yuan] Update SparkConsoleProgressBar.scala 8c04dca7d [david yuan] Update SQLOperationListener.scala 39751bffa [davidyuan] fix 4f657e728 [davidyuan] fix deleted files 86756eba7 [david yuan] Merge branch 'apache:master' into improvement_add_job_log 0c9ac27b5 [davidyuan] add showProgress with jobInfo Unit Test d4434a0de [davidyuan] Revert "add showProgress with jobInfo Unit Test" 84b1aa005 [davidyuan] Revert "improvement_add_job_log fix" 66126f96e [davidyuan] Merge remote-tracking branch 'origin/improvement_add_job_log' into improvement_add_job_log 228fd9cf3 [davidyuan] add showProgress with jobInfo Unit Test 055e0ac96 [davidyuan] add showProgress with jobInfo Unit Test e4aac34bd [davidyuan] Merge remote-tracking branch 'origin/improvement_add_job_log' into improvement_add_job_log b226adad8 [davidyuan] Merge remote-tracking branch 'origin/improvement_add_job_log' into improvement_add_job_log a08799ca0 [david yuan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala a991b68c4 [david yuan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala d12046dac [davidyuan] add showProgress with jobInfo Unit Test 10a56b159 [davidyuan] add showProgress with jobInfo Unit Test a973cdde6 [davidyuan] improvement_add_job_log fix 1. provide Option[Int] with JobId e8a510891 [davidyuan] improvement_add_job_log fix 1. provide Option[Int] with JobId 7b9e874f2 [davidyuan] improvement_add_job_log fix 1. provide Option[Int] with JobId 5b4aaa8b5 [davidyuan] improvement_add_job_log fix 1. fix new end line 2. provide Option[Int] with JobId 780f9d15e [davidyuan] improvement_add_job_log fix 1. remove duplicate synchronized 2. because the activeJobs is ConcurrentHashMap, so reduce synchronized 3. fix scala code style 4. change forEach to asScala code style 5. change conf str to KyuubiConf.XXX.key 59340b713 [davidyuan] add showProgress with jobInfo Unit Test af05089d4 [davidyuan] add showProgress with jobInfo Unit Test c07535a01 [davidyuan] [Improvement] spark showProgress can briefly output info of the job #4186 d4bdec798 [yuanfuyuan] fix_4186 9fa8e73fc [davidyuan] add showProgress with jobInfo Unit Test 49debfbe3 [davidyuan] improvement_add_job_log fix 1. provide Option[Int] with JobId 5cf8714e0 [davidyuan] improvement_add_job_log fix 1. provide Option[Int] with JobId 249a422b6 [davidyuan] improvement_add_job_log fix 1. provide Option[Int] with JobId e15fc7195 [davidyuan] improvement_add_job_log fix 1. fix new end line 2. provide Option[Int] with JobId 4564ef98f [davidyuan] improvement_add_job_log fix 1. remove duplicate synchronized 2. because the activeJobs is ConcurrentHashMap, so reduce synchronized 3. fix scala code style 4. change forEach to asScala code style 5. change conf str to KyuubiConf.XXX.key 32ad0759b [davidyuan] add showProgress with jobInfo Unit Test d30492e46 [davidyuan] add showProgress with jobInfo Unit Test 6209c344e [davidyuan] [Improvement] spark showProgress can briefly output info of the job #4186 56b91a321 [yuanfuyuan] fix_4186 Lead-authored-by: davidyuan <yuanfuyuan@mafengwo.com> Co-authored-by: davidyuan <davidyuan1223@gmail.com> Co-authored-by: david yuan <51512358+davidyuan1223@users.noreply.github.com> Co-authored-by: yuanfuyuan <1406957364@qq.com> Co-authored-by: Cheng Pan <pan3793@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent
3646ae08bd
commit
ed0d9976ae
@ -20,6 +20,8 @@ package org.apache.spark.kyuubi
|
||||
import java.util.Properties
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.spark.scheduler._
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
|
||||
@ -44,7 +46,7 @@ class SQLOperationListener(
|
||||
spark: SparkSession) extends StatsReportListener with Logging {
|
||||
|
||||
private val operationId: String = operation.getHandle.identifier.toString
|
||||
private lazy val activeJobs = new java.util.HashSet[Int]()
|
||||
private lazy val activeJobs = new ConcurrentHashMap[Int, SparkJobInfo]()
|
||||
private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]()
|
||||
private var executionId: Option[Long] = None
|
||||
|
||||
@ -53,6 +55,7 @@ class SQLOperationListener(
|
||||
if (conf.get(ENGINE_SPARK_SHOW_PROGRESS)) {
|
||||
Some(new SparkConsoleProgressBar(
|
||||
operation,
|
||||
activeJobs,
|
||||
activeStages,
|
||||
conf.get(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL),
|
||||
conf.get(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT)))
|
||||
@ -79,9 +82,10 @@ class SQLOperationListener(
|
||||
}
|
||||
}
|
||||
|
||||
override def onJobStart(jobStart: SparkListenerJobStart): Unit = activeJobs.synchronized {
|
||||
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
|
||||
if (sameGroupId(jobStart.properties)) {
|
||||
val jobId = jobStart.jobId
|
||||
val stageIds = jobStart.stageInfos.map(_.stageId).toSet
|
||||
val stageSize = jobStart.stageInfos.size
|
||||
if (executionId.isEmpty) {
|
||||
executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY))
|
||||
@ -93,17 +97,19 @@ class SQLOperationListener(
|
||||
case _ =>
|
||||
}
|
||||
}
|
||||
activeJobs.put(
|
||||
jobId,
|
||||
new SparkJobInfo(stageSize, stageIds))
|
||||
withOperationLog {
|
||||
activeJobs.add(jobId)
|
||||
info(s"Query [$operationId]: Job $jobId started with $stageSize stages," +
|
||||
s" ${activeJobs.size()} active jobs running")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = activeJobs.synchronized {
|
||||
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
|
||||
val jobId = jobEnd.jobId
|
||||
if (activeJobs.remove(jobId)) {
|
||||
if (activeJobs.remove(jobId) != null) {
|
||||
val hint = jobEnd.jobResult match {
|
||||
case JobSucceeded => "succeeded"
|
||||
case _ => "failed" // TODO: Handle JobFailed(exception: Exception)
|
||||
@ -134,9 +140,18 @@ class SQLOperationListener(
|
||||
|
||||
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
|
||||
val stageInfo = stageCompleted.stageInfo
|
||||
val stageId = stageInfo.stageId
|
||||
val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber())
|
||||
activeStages.synchronized {
|
||||
if (activeStages.remove(stageAttempt) != null) {
|
||||
stageInfo.getStatusString match {
|
||||
case "succeeded" =>
|
||||
activeJobs.asScala.foreach { case (_, jobInfo) =>
|
||||
if (jobInfo.stageIds.contains(stageId)) {
|
||||
jobInfo.numCompleteStages.getAndIncrement()
|
||||
}
|
||||
}
|
||||
}
|
||||
withOperationLog(super.onStageCompleted(stageCompleted))
|
||||
}
|
||||
}
|
||||
|
||||
@ -29,6 +29,7 @@ import org.apache.kyuubi.operation.Operation
|
||||
|
||||
class SparkConsoleProgressBar(
|
||||
operation: Operation,
|
||||
liveJobs: ConcurrentHashMap[Int, SparkJobInfo],
|
||||
liveStages: ConcurrentHashMap[SparkStageAttempt, SparkStageInfo],
|
||||
updatePeriodMSec: Long,
|
||||
timeFormat: String)
|
||||
@ -72,6 +73,17 @@ class SparkConsoleProgressBar(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Use stageId to find stage's jobId
|
||||
* @param stageId
|
||||
* @return jobId (Optional)
|
||||
*/
|
||||
private def findJobId(stageId: Int): Option[Int] = {
|
||||
liveJobs.asScala.collectFirst {
|
||||
case (jobId, jobInfo) if jobInfo.stageIds.contains(stageId) => jobId
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Show progress bar in console. The progress bar is displayed in the next line
|
||||
* after your last output, keeps overwriting itself to hold in one line. The logging will follow
|
||||
@ -81,9 +93,13 @@ class SparkConsoleProgressBar(
|
||||
val width = TerminalWidth / stages.size
|
||||
val bar = stages.map { s =>
|
||||
val total = s.numTasks
|
||||
val header = s"[Stage ${s.stageId}:"
|
||||
val jobHeader = findJobId(s.stageId).map(jobId =>
|
||||
s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " +
|
||||
s"/ ${liveJobs.get(jobId).numStages}) Stages] ").getOrElse(
|
||||
"[There is no job about this stage] ")
|
||||
val header = jobHeader + s"[Stage ${s.stageId}:"
|
||||
val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]"
|
||||
val w = width - header.length - tailer.length
|
||||
val w = width + jobHeader.length - header.length - tailer.length
|
||||
val bar =
|
||||
if (w > 0) {
|
||||
val percent = w * s.numCompleteTasks.get / total
|
||||
|
||||
@ -24,6 +24,10 @@ case class SparkStageAttempt(stageId: Int, stageAttemptId: Int) {
|
||||
}
|
||||
|
||||
class SparkStageInfo(val stageId: Int, val numTasks: Int) {
|
||||
var numActiveTasks = new AtomicInteger(0)
|
||||
var numCompleteTasks = new AtomicInteger(0)
|
||||
val numActiveTasks = new AtomicInteger(0)
|
||||
val numCompleteTasks = new AtomicInteger(0)
|
||||
}
|
||||
|
||||
class SparkJobInfo(val numStages: Int, val stageIds: Set[Int]) {
|
||||
val numCompleteStages = new AtomicInteger(0)
|
||||
}
|
||||
|
||||
@ -22,13 +22,16 @@ import scala.collection.JavaConverters.asScalaBufferConverter
|
||||
import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchOrientation, TFetchResultsReq, TOperationHandle}
|
||||
import org.scalatest.time.SpanSugar._
|
||||
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.KyuubiConf.OPERATION_SPARK_LISTENER_ENABLED
|
||||
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
|
||||
import org.apache.kyuubi.operation.HiveJDBCTestHelper
|
||||
|
||||
class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
|
||||
|
||||
override def withKyuubiConf: Map[String, String] = Map.empty
|
||||
override def withKyuubiConf: Map[String, String] = Map(
|
||||
KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS.key -> "true",
|
||||
KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL.key -> "200")
|
||||
|
||||
override protected def jdbcUrl: String = getJdbcUrl
|
||||
|
||||
@ -54,6 +57,24 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp
|
||||
}
|
||||
}
|
||||
|
||||
test("operation listener with progress job info") {
|
||||
val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);"
|
||||
withSessionHandle { (client, handle) =>
|
||||
val req = new TExecuteStatementReq()
|
||||
req.setSessionHandle(handle)
|
||||
req.setStatement(sql)
|
||||
val tExecuteStatementResp = client.ExecuteStatement(req)
|
||||
val opHandle = tExecuteStatementResp.getOperationHandle
|
||||
val fetchResultsReq = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1000)
|
||||
fetchResultsReq.setFetchType(1.toShort)
|
||||
eventually(timeout(90.seconds), interval(500.milliseconds)) {
|
||||
val resultsResp = client.FetchResults(fetchResultsReq)
|
||||
val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala
|
||||
assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]")))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("SQLOperationListener configurable") {
|
||||
val sql = "select /*+ REPARTITION(3, a) */ a from values(1) t(a);"
|
||||
withSessionHandle { (client, handle) =>
|
||||
|
||||
Loading…
Reference in New Issue
Block a user