From 00ddcd39bb9062f79c26bb14dcb719fe31513a10 Mon Sep 17 00:00:00 2001 From: liangbowen Date: Mon, 21 Aug 2023 19:59:43 +0800 Subject: [PATCH] [KYUUBI #5184] [Improvement] Rename Kyuubi's StageInfo to SparkStageInfo to fix class mismatch ### _Why are the changes needed?_ - Fix a class mismatch when compiling on Scala 2.13, caused by an implicit class reference to `StageInfo`. The compilation fails with a type mismatch if the compiler classloader loads Spark's `org.apache.spark.scheduler.StageInfo` ahead of Kyuubi's `org.apache.spark.kyuubi.StageInfo`. - Change the `var` integers `numActiveTasks` and `numCompleteTasks` to `AtomicInteger`, preventing possible concurrent inconsistency ``` [ERROR] [Error] /Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:56: type mismatch; found : java.util.concurrent.ConcurrentHashMap[org.apache.spark.kyuubi.StageAttempt,org.apache.spark.scheduler.StageInfo] required: java.util.concurrent.ConcurrentHashMap[org.apache.spark.kyuubi.StageAttempt,org.apache.spark.kyuubi.StageInfo] [INFO] [Info] : java.util.concurrent.ConcurrentHashMap[org.apache.spark.kyuubi.StageAttempt,org.apache.spark.scheduler.StageInfo] <: java.util.concurrent.ConcurrentHashMap[org.apache.spark.kyuubi.StageAttempt,org.apache.spark.kyuubi.StageInfo]? [INFO] [Info] : false [ERROR] [Error] /Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:126: not enough arguments for constructor StageInfo: (stageId: Int, attemptId: Int, name: String, numTasks: Int, rddInfos: Seq[org.apache.spark.storage.RDDInfo], parentIds: Seq[Int], details: String, taskMetrics: org.apache.spark.executor.TaskMetrics, taskLocalityPreferences: Seq[Seq[org.apache.spark.scheduler.TaskLocation]], shuffleDepId: Option[Int], resourceProfileId: Int, isPushBasedShuffleEnabled: Boolean, shuffleMergerCount: Int): org.apache.spark.scheduler.StageInfo. 
Unspecified value parameters name, numTasks, rddInfos... [ERROR] [Error] /Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:148: value numActiveTasks is not a member of org.apache.spark.scheduler.StageInfo [ERROR] [Error] /Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:156: value numActiveTasks is not a member of org.apache.spark.scheduler.StageInfo [ERROR] [Error] /Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:158: value numCompleteTasks is not a member of org.apache.spark.scheduler.StageInfo ``` ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No. Closes #5184 from bowenliang123/spark-stage-info. 
Closes #5184 fd0b9b564 [liangbowen] update d410491f3 [liangbowen] rename Kyuubi's StageInfo to SparkStageInfo preventing class mismatch Authored-by: liangbowen Signed-off-by: liangbowen --- .../spark/kyuubi/SQLOperationListener.scala | 18 +++++++++--------- .../spark/kyuubi/SparkConsoleProgressBar.scala | 6 +++--- .../org/apache/spark/kyuubi/StageStatus.scala | 10 ++++++---- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala index 1a57fcf29..4e4a940d2 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala @@ -45,7 +45,7 @@ class SQLOperationListener( private val operationId: String = operation.getHandle.identifier.toString private lazy val activeJobs = new java.util.HashSet[Int]() - private lazy val activeStages = new ConcurrentHashMap[StageAttempt, StageInfo]() + private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]() private var executionId: Option[Long] = None private val conf: KyuubiConf = operation.getSession.sessionManager.getConf @@ -120,10 +120,10 @@ class SQLOperationListener( val stageInfo = stageSubmitted.stageInfo val stageId = stageInfo.stageId val attemptNumber = stageInfo.attemptNumber() - val stageAttempt = StageAttempt(stageId, attemptNumber) + val stageAttempt = SparkStageAttempt(stageId, attemptNumber) activeStages.put( stageAttempt, - new StageInfo(stageId, stageInfo.numTasks)) + new SparkStageInfo(stageId, stageInfo.numTasks)) withOperationLog { info(s"Query [$operationId]: Stage $stageId.$attemptNumber started " + s"with ${stageInfo.numTasks} tasks, ${activeStages.size()} active stages running") @@ -134,7 +134,7 @@ class 
SQLOperationListener( override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { val stageInfo = stageCompleted.stageInfo - val stageAttempt = StageAttempt(stageInfo.stageId, stageInfo.attemptNumber()) + val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber()) activeStages.synchronized { if (activeStages.remove(stageAttempt) != null) { withOperationLog(super.onStageCompleted(stageCompleted)) @@ -143,19 +143,19 @@ class SQLOperationListener( } override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = activeStages.synchronized { - val stageAttempt = StageAttempt(taskStart.stageId, taskStart.stageAttemptId) + val stageAttempt = SparkStageAttempt(taskStart.stageId, taskStart.stageAttemptId) if (activeStages.containsKey(stageAttempt)) { - activeStages.get(stageAttempt).numActiveTasks += 1 + activeStages.get(stageAttempt).numActiveTasks.getAndIncrement() super.onTaskStart(taskStart) } } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = activeStages.synchronized { - val stageAttempt = StageAttempt(taskEnd.stageId, taskEnd.stageAttemptId) + val stageAttempt = SparkStageAttempt(taskEnd.stageId, taskEnd.stageAttemptId) if (activeStages.containsKey(stageAttempt)) { - activeStages.get(stageAttempt).numActiveTasks -= 1 + activeStages.get(stageAttempt).numActiveTasks.getAndDecrement() if (taskEnd.reason == org.apache.spark.Success) { - activeStages.get(stageAttempt).numCompleteTasks += 1 + activeStages.get(stageAttempt).numCompleteTasks.getAndIncrement() } super.onTaskEnd(taskEnd) } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index fc2ebd5f8..dc8b493cc 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -29,7 +29,7 @@ import org.apache.kyuubi.operation.Operation class SparkConsoleProgressBar( operation: Operation, - liveStages: ConcurrentHashMap[StageAttempt, StageInfo], + liveStages: ConcurrentHashMap[SparkStageAttempt, SparkStageInfo], updatePeriodMSec: Long, timeFormat: String) extends Logging { @@ -77,7 +77,7 @@ class SparkConsoleProgressBar( * after your last output, keeps overwriting itself to hold in one line. The logging will follow * the progress bar, then progress bar will be showed in next line without overwrite logs. */ - private def show(now: Long, stages: Seq[StageInfo]): Unit = { + private def show(now: Long, stages: Seq[SparkStageInfo]): Unit = { val width = TerminalWidth / stages.size val bar = stages.map { s => val total = s.numTasks @@ -86,7 +86,7 @@ class SparkConsoleProgressBar( val w = width - header.length - tailer.length val bar = if (w > 0) { - val percent = w * s.numCompleteTasks / total + val percent = w * s.numCompleteTasks.get / total (0 until w).map { i => if (i < percent) "=" else if (i == percent) ">" else " " }.mkString("") diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala index 144570862..2ea9c3fda 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala @@ -17,11 +17,13 @@ package org.apache.spark.kyuubi -case class StageAttempt(stageId: Int, stageAttemptId: Int) { +import java.util.concurrent.atomic.AtomicInteger + +case class SparkStageAttempt(stageId: Int, stageAttemptId: Int) { override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" } -class StageInfo(val stageId: Int, val numTasks: Int) { - var numActiveTasks = 0 
- var numCompleteTasks = 0 +class SparkStageInfo(val stageId: Int, val numTasks: Int) { + var numActiveTasks = new AtomicInteger(0) + var numCompleteTasks = new AtomicInteger(0) }