[KYUUBI #5184] [Improvement] Rename Kyuubi's StageInfo to SparkStageInfo to fix class mismatch

### _Why are the changes needed?_

- Fix a class mismatch when compiling on Scala 2.13, caused by an ambiguous implicit class reference to `StageInfo`. The compilation fails with type-mismatch errors if the compiler's classloader resolves `StageInfo` to Spark's `org.apache.spark.scheduler.StageInfo` instead of Kyuubi's `org.apache.spark.kyuubi.StageInfo`.
- Change the mutable `var` integer fields `numActiveTasks` and `numCompleteTasks` to `AtomicInteger`, preventing possible inconsistency under concurrent updates.

```
[ERROR] [Error] /Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:56: type mismatch;
 found   : java.util.concurrent.ConcurrentHashMap[org.apache.spark.kyuubi.StageAttempt,org.apache.spark.scheduler.StageInfo]
 required: java.util.concurrent.ConcurrentHashMap[org.apache.spark.kyuubi.StageAttempt,org.apache.spark.kyuubi.StageInfo]
[INFO] [Info] : java.util.concurrent.ConcurrentHashMap[org.apache.spark.kyuubi.StageAttempt,org.apache.spark.scheduler.StageInfo] <: java.util.concurrent.ConcurrentHashMap[org.apache.spark.kyuubi.StageAttempt,org.apache.spark.kyuubi.StageInfo]?
[INFO] [Info] : false
[ERROR] [Error] /Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:126: not enough arguments for constructor StageInfo: (stageId: Int, attemptId: Int, name: String, numTasks: Int, rddInfos: Seq[org.apache.spark.storage.RDDInfo], parentIds: Seq[Int], details: String, taskMetrics: org.apache.spark.executor.TaskMetrics, taskLocalityPreferences: Seq[Seq[org.apache.spark.scheduler.TaskLocation]], shuffleDepId: Option[Int], resourceProfileId: Int, isPushBasedShuffleEnabled: Boolean, shuffleMergerCount: Int): org.apache.spark.scheduler.StageInfo.
Unspecified value parameters name, numTasks, rddInfos...
[ERROR] [Error] /Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:148: value numActiveTasks is not a member of org.apache.spark.scheduler.StageInfo
[ERROR] [Error] /Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:156: value numActiveTasks is not a member of org.apache.spark.scheduler.StageInfo
[ERROR] [Error] /Users/bw/dev/kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:158: value numCompleteTasks is not a member of org.apache.spark.scheduler.StageInfo
```

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run tests](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

No.

Closes #5184 from bowenliang123/spark-stage-info.

Closes #5184

fd0b9b564 [liangbowen] update
d410491f3 [liangbowen] rename Kyuubi's StageInfo to SparkStageInfo preventing class mismatch

Authored-by: liangbowen <liangbowen@gf.com.cn>
Signed-off-by: liangbowen <liangbowen@gf.com.cn>
This commit is contained in:
liangbowen 2023-08-21 19:59:43 +08:00
parent 7979aafe54
commit 00ddcd39bb
3 changed files with 18 additions and 16 deletions

View File

@ -45,7 +45,7 @@ class SQLOperationListener(
private val operationId: String = operation.getHandle.identifier.toString
private lazy val activeJobs = new java.util.HashSet[Int]()
private lazy val activeStages = new ConcurrentHashMap[StageAttempt, StageInfo]()
private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]()
private var executionId: Option[Long] = None
private val conf: KyuubiConf = operation.getSession.sessionManager.getConf
@ -120,10 +120,10 @@ class SQLOperationListener(
val stageInfo = stageSubmitted.stageInfo
val stageId = stageInfo.stageId
val attemptNumber = stageInfo.attemptNumber()
val stageAttempt = StageAttempt(stageId, attemptNumber)
val stageAttempt = SparkStageAttempt(stageId, attemptNumber)
activeStages.put(
stageAttempt,
new StageInfo(stageId, stageInfo.numTasks))
new SparkStageInfo(stageId, stageInfo.numTasks))
withOperationLog {
info(s"Query [$operationId]: Stage $stageId.$attemptNumber started " +
s"with ${stageInfo.numTasks} tasks, ${activeStages.size()} active stages running")
@ -134,7 +134,7 @@ class SQLOperationListener(
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
val stageInfo = stageCompleted.stageInfo
val stageAttempt = StageAttempt(stageInfo.stageId, stageInfo.attemptNumber())
val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber())
activeStages.synchronized {
if (activeStages.remove(stageAttempt) != null) {
withOperationLog(super.onStageCompleted(stageCompleted))
@ -143,19 +143,19 @@ class SQLOperationListener(
}
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = activeStages.synchronized {
val stageAttempt = StageAttempt(taskStart.stageId, taskStart.stageAttemptId)
val stageAttempt = SparkStageAttempt(taskStart.stageId, taskStart.stageAttemptId)
if (activeStages.containsKey(stageAttempt)) {
activeStages.get(stageAttempt).numActiveTasks += 1
activeStages.get(stageAttempt).numActiveTasks.getAndIncrement()
super.onTaskStart(taskStart)
}
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = activeStages.synchronized {
val stageAttempt = StageAttempt(taskEnd.stageId, taskEnd.stageAttemptId)
val stageAttempt = SparkStageAttempt(taskEnd.stageId, taskEnd.stageAttemptId)
if (activeStages.containsKey(stageAttempt)) {
activeStages.get(stageAttempt).numActiveTasks -= 1
activeStages.get(stageAttempt).numActiveTasks.getAndDecrement()
if (taskEnd.reason == org.apache.spark.Success) {
activeStages.get(stageAttempt).numCompleteTasks += 1
activeStages.get(stageAttempt).numCompleteTasks.getAndIncrement()
}
super.onTaskEnd(taskEnd)
}

View File

@ -29,7 +29,7 @@ import org.apache.kyuubi.operation.Operation
class SparkConsoleProgressBar(
operation: Operation,
liveStages: ConcurrentHashMap[StageAttempt, StageInfo],
liveStages: ConcurrentHashMap[SparkStageAttempt, SparkStageInfo],
updatePeriodMSec: Long,
timeFormat: String)
extends Logging {
@ -77,7 +77,7 @@ class SparkConsoleProgressBar(
* after your last output, keeps overwriting itself to hold in one line. The logging will follow
* the progress bar, then progress bar will be showed in next line without overwrite logs.
*/
private def show(now: Long, stages: Seq[StageInfo]): Unit = {
private def show(now: Long, stages: Seq[SparkStageInfo]): Unit = {
val width = TerminalWidth / stages.size
val bar = stages.map { s =>
val total = s.numTasks
@ -86,7 +86,7 @@ class SparkConsoleProgressBar(
val w = width - header.length - tailer.length
val bar =
if (w > 0) {
val percent = w * s.numCompleteTasks / total
val percent = w * s.numCompleteTasks.get / total
(0 until w).map { i =>
if (i < percent) "=" else if (i == percent) ">" else " "
}.mkString("")

View File

@ -17,11 +17,13 @@
package org.apache.spark.kyuubi
case class StageAttempt(stageId: Int, stageAttemptId: Int) {
import java.util.concurrent.atomic.AtomicInteger
case class SparkStageAttempt(stageId: Int, stageAttemptId: Int) {
override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)"
}
class StageInfo(val stageId: Int, val numTasks: Int) {
var numActiveTasks = 0
var numCompleteTasks = 0
class SparkStageInfo(val stageId: Int, val numTasks: Int) {
var numActiveTasks = new AtomicInteger(0)
var numCompleteTasks = new AtomicInteger(0)
}