[CELEBORN-1720][FOLLOWUP] Fix flaky test - check whether another attempt of the fetch-failure task is running or successful

### What changes were proposed in this pull request?
Record the last reported shuffle fetch failure task id.

### Why are the changes needed?
Because the reported shuffle fetch failure task id might be cleaned up quickly after being recorded.

To prevent flaky tests, it is better to record the last reported task id for testing.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?
GitHub Actions (GA) passed 3 consecutive times.

Closes #3301 from turboFei/app_id_debug.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Wang, Fei 2025-06-06 11:11:12 -07:00
parent 5e305c3a5a
commit 60fa6d0ee7
3 changed files with 11 additions and 3 deletions

View File

@ -292,6 +292,9 @@ public class SparkUtils {
reportedStageShuffleFetchFailureTaskIds.remove(stageId + "-" + stageAttemptId);
}
// For testing only.
protected static volatile Long lastReportedShuffleFetchFailureTaskId = null;
/**
* Only used to check for the shuffle fetch failure task whether another attempt is running or
* successful. If another attempt(excluding the reported shuffle fetch failure tasks in current
@ -315,6 +318,7 @@ public class SparkUtils {
reportedStageShuffleFetchFailureTaskIds.computeIfAbsent(
stageUniqId, k -> new HashSet<>());
reportedStageTaskIds.add(taskId);
lastReportedShuffleFetchFailureTaskId = taskId;
Tuple2<TaskInfo, List<TaskInfo>> taskAttempts = getTaskAttempts(taskSetManager, taskId);

View File

@ -407,6 +407,9 @@ public class SparkUtils {
reportedStageShuffleFetchFailureTaskIds.remove(stageId + "-" + stageAttemptId);
}
// For testing only.
protected static volatile Long lastReportedShuffleFetchFailureTaskId = null;
/**
* Only used to check for the shuffle fetch failure task whether another attempt is running or
* successful. If another attempt(excluding the reported shuffle fetch failure tasks in current
@ -430,6 +433,7 @@ public class SparkUtils {
reportedStageShuffleFetchFailureTaskIds.computeIfAbsent(
stageUniqId, k -> new HashSet<>());
reportedStageTaskIds.add(taskId);
lastReportedShuffleFetchFailureTaskId = taskId;
Tuple2<TaskInfo, List<TaskInfo>> taskAttempts = getTaskAttempts(taskSetManager, taskId);

View File

@ -63,6 +63,7 @@ class SparkUtilsSuite extends AnyFunSuite
val celebornConf = SparkUtils.fromSparkConf(sparkSession.sparkContext.getConf)
val hook = new ShuffleReaderGetHooks(celebornConf, workerDirs)
TestCelebornShuffleManager.registerReaderGetHook(hook)
SparkUtils.lastReportedShuffleFetchFailureTaskId = null
try {
val sc = sparkSession.sparkContext
@ -87,9 +88,8 @@ class SparkUtilsSuite extends AnyFunSuite
val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
eventually(timeout(30.seconds), interval(0.milliseconds)) {
assert(hook.executed.get() == true)
val reportedTaskId =
SparkUtils.reportedStageShuffleFetchFailureTaskIds.values().asScala.flatMap(
_.asScala).head
val reportedTaskId = SparkUtils.lastReportedShuffleFetchFailureTaskId
assert(reportedTaskId != null)
val taskSetManager = SparkUtils.getTaskSetManager(taskScheduler, reportedTaskId)
assert(taskSetManager != null)
assert(SparkUtils.getTaskAttempts(taskSetManager, reportedTaskId)._2.size() == 1)