[KYUUBI #3801] Correctly calculate active stages in SQLOperationListener

### _Code of Conduct_
- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

### _Search before asking_
- [x] I have searched in the [issues](https://github.com/apache/incubator-kyuubi/issues?q=is%3Aissue) and found no similar issues.

### _Describe the bug_
activeStages key is StageAttempt, and its value is StageInfo. The contains function defaults to 'containsValue'

### _Affects Version(s)_
master/1.7/1.6

### _Are you willing to submit PR?_
- [x] Yes. I can submit a PR independently to fix.
- [ ] Yes. I would be willing to submit a PR with guidance from the Kyuubi community to fix.
- [ ] No. I cannot submit a PR at this time.

Closes #3801 from lcy999/bug_1113.

Closes #3801

d6d96d30 [lcy999] active stage can't calculate the number of tasks correctly in sql operation listener

Authored-by: lcy999 <1501989483@qq.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
lcy999 2022-11-13 11:30:22 +00:00 committed by Cheng Pan
parent be82aca237
commit 979881d687

View File

@ -144,7 +144,7 @@ class SQLOperationListener(
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = activeStages.synchronized {
val stageAttempt = StageAttempt(taskStart.stageId, taskStart.stageAttemptId)
if (activeStages.contains(stageAttempt)) {
if (activeStages.containsKey(stageAttempt)) {
activeStages.get(stageAttempt).numActiveTasks += 1
super.onTaskStart(taskStart)
}
@ -152,7 +152,7 @@ class SQLOperationListener(
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = activeStages.synchronized {
val stageAttempt = StageAttempt(taskEnd.stageId, taskEnd.stageAttemptId)
if (activeStages.contains(stageAttempt)) {
if (activeStages.containsKey(stageAttempt)) {
activeStages.get(stageAttempt).numActiveTasks -= 1
if (taskEnd.reason == org.apache.spark.Success) {
activeStages.get(stageAttempt).numCompleteTasks += 1