[CELEBORN-2088] Fix NPE if celeborn.client.spark.fetch.cleanFailedShuffle enabled

### What changes were proposed in this pull request?

Fix NPE if `celeborn.client.spark.fetch.cleanFailedShuffle` is true.

This PR also refines the code for `FailedShuffleCleaner`.
### Why are the changes needed?

`failedShuffleCleaner` is null on the executor side.
```
25/07/29 17:58:40 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[CoarseGrainedExecutorBackend-stop-executor,5,main]
java.lang.NullPointerException: Cannot invoke "org.apache.celeborn.spark.FailedShuffleCleaner.reset()" because "this.failedShuffleCleaner" is null
	at org.apache.spark.shuffle.celeborn.SparkShuffleManager.stop(SparkShuffleManager.java:272) ~[celeborn-client-spark-3-shaded_2.12-0.6.0-rc3.jar:?]
```

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
UT.

Closes #3401 from turboFei/fix_npe_cleaner.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Wang, Fei 2025-07-31 21:15:51 -07:00
parent 604485779c
commit 20a629a432
2 changed files with 22 additions and 24 deletions

View File

@ -34,28 +34,10 @@ private[celeborn] class FailedShuffleCleaner(lifecycleManager: LifecycleManager)
private val shufflesToBeCleaned = new LinkedBlockingQueue[Int]()
private val cleanedShuffleIds = new mutable.HashSet[Int]
private lazy val cleanInterval =
lifecycleManager.conf.clientFetchCleanFailedShuffleIntervalMS
private val cleanInterval = lifecycleManager.conf.clientFetchCleanFailedShuffleIntervalMS
private var cleanerThreadPool: ScheduledExecutorService = _
// for test
def reset(): Unit = {
shufflesToBeCleaned.clear()
cleanedShuffleIds.clear()
if (cleanerThreadPool != null) {
cleanerThreadPool.shutdownNow()
cleanerThreadPool = null
}
}
def addShuffleIdToBeCleaned(appShuffleIdentifier: String): Unit = {
val Array(appShuffleId, _, _) = SparkCommonUtils.decodeAppShuffleIdentifier(
appShuffleIdentifier)
lifecycleManager.getShuffleIdMapping.get(appShuffleId.toInt).foreach {
case (_, (celebornShuffleId, _)) => shufflesToBeCleaned.put(celebornShuffleId)
}
}
def init(): Unit = {
private def init(): Unit = {
cleanerThreadPool = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"failedShuffleCleanerThreadPool")
cleanerThreadPool.scheduleWithFixedDelay(
@ -85,9 +67,24 @@ private[celeborn] class FailedShuffleCleaner(lifecycleManager: LifecycleManager)
init()
def addShuffleIdToBeCleaned(appShuffleIdentifier: String): Unit = {
val Array(appShuffleId, _, _) = SparkCommonUtils.decodeAppShuffleIdentifier(
appShuffleIdentifier)
lifecycleManager.getShuffleIdMapping.get(appShuffleId.toInt).foreach {
case (_, (celebornShuffleId, _)) => shufflesToBeCleaned.put(celebornShuffleId)
}
}
def removeCleanedShuffleId(celebornShuffleId: Int): Unit = {
cleanedShuffleIds.remove(celebornShuffleId)
}
private var cleanerThreadPool: ScheduledExecutorService = _
def stop(): Unit = {
shufflesToBeCleaned.clear()
cleanedShuffleIds.clear()
if (cleanerThreadPool != null) {
ThreadUtils.shutdown(cleanerThreadPool)
cleanerThreadPool = null
}
}
}

View File

@ -268,8 +268,9 @@ public class SparkShuffleManager implements ShuffleManager {
_sortShuffleManager.stop();
_sortShuffleManager = null;
}
if (celebornConf.clientFetchCleanFailedShuffle()) {
failedShuffleCleaner.reset();
if (failedShuffleCleaner != null) {
failedShuffleCleaner.stop();
failedShuffleCleaner = null;
}
}