From eb1be3fbf8078cc71e2208095931d9510bf00095 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Fri, 10 Nov 2023 11:44:47 +0800 Subject: [PATCH] [CELEBORN-1120] ShuffleClientImpl should close batchReviveRequestScheduler of ReviveManager ### What changes were proposed in this pull request? `ShuffleClientImpl` closes `batchReviveRequestScheduler` of `ReviveManager`. ### Why are the changes needed? After shuffle client is closed, `ReviveManager` still schedules invoker to `ShuffleClientImpl#reviveBatch`, which causes the `NullPointerException`. Therefore, `ShuffleClientImpl` should close `batchReviveRequestScheduler` of `ReviveManager` to avoid `NullPointerException`. ``` 23/11/08 18:09:25,819 [batch-revive-scheduler] ERROR ShuffleClientImpl: Exception raised while reviving for shuffle 0 partitionIds 1988, epochs 0,. java.lang.NullPointerException at org.apache.celeborn.client.ShuffleClientImpl.reviveBatch(ShuffleClientImpl.java:705) at org.apache.celeborn.client.ReviveManager.lambda$new$1(ReviveManager.java:94) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 23/11/08 18:09:25,844 [celeborn-retry-sender-6] ERROR ShuffleClientImpl: Push data to xx.xx.xx.xx:9092 failed for shuffle 0 map 216 attempt 0 partition 1988 batch 2623, remain revive times 4. org.apache.celeborn.common.exception.CelebornIOException: PUSH_DATA_CONNECTION_EXCEPTION_PRIMARY then revive but REVIVE_FAILED, revive status 12(REVIVE_FAILED), old location: PartitionLocation[ id-epoch:1988-0 host-rpcPort-pushPort-fetchPort-replicatePort:xx.xx.xx.xx-9091-9092-9093-9094 mode:PRIMARY peer:(empty) storage hint:StorageInfo{type=MEMORY, mountPoint='/tmp/storage', finalResult=false, filePath=} mapIdBitMap:null] at org.apache.celeborn.client.ShuffleClientImpl.submitRetryPushData(ShuffleClientImpl.java:261) at org.apache.celeborn.client.ShuffleClientImpl.access$600(ShuffleClientImpl.java:62) at org.apache.celeborn.client.ShuffleClientImpl$3.lambda$onFailure$1(ShuffleClientImpl.java:1045) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? No. Closes #2084 from SteNicholas/CELEBORN-1120. Authored-by: SteNicholas Signed-off-by: Fu Chen --- .../apache/celeborn/client/ReviveManager.java | 11 +++++-- .../celeborn/client/ShuffleClientImpl.java | 33 ++++++++++--------- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/client/src/main/java/org/apache/celeborn/client/ReviveManager.java b/client/src/main/java/org/apache/celeborn/client/ReviveManager.java index 47d87878d..3c52e6b9f 100644 --- a/client/src/main/java/org/apache/celeborn/client/ReviveManager.java +++ b/client/src/main/java/org/apache/celeborn/client/ReviveManager.java @@ -22,6 +22,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import scala.concurrent.duration.Duration; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,17 +37,16 @@ class ReviveManager { private static final Logger logger = LoggerFactory.getLogger(ReviveManager.class); LinkedBlockingQueue requestQueue = new LinkedBlockingQueue<>(); - private final long interval; private final int batchSize; ShuffleClientImpl shuffleClient; - private ScheduledExecutorService batchReviveRequestScheduler = + private final ScheduledExecutorService batchReviveRequestScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("batch-revive-scheduler"); public ReviveManager(ShuffleClientImpl shuffleClient, CelebornConf conf) { this.shuffleClient = shuffleClient; - this.interval = conf.clientPushReviveInterval(); this.batchSize = conf.clientPushReviveBatchSize(); + long interval = conf.clientPushReviveInterval(); batchReviveRequestScheduler.scheduleWithFixedDelay( () -> { Map> shuffleMap = new HashMap<>(); @@ -124,4 +125,8 @@ class ReviveManager { logger.error("Exception when put into requests!", e); } } + + public void close() { + ThreadUtils.shutdown(batchReviveRequestScheduler, Duration.apply("800ms")); + } } diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index 39d382d10..a300ddf5f 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -73,8 +73,8 @@ public class ShuffleClientImpl extends ShuffleClient { private final int registerShuffleMaxRetries; private final long registerShuffleRetryWaitMs; - private int maxReviveTimes; - private boolean testRetryRevive; + private final int maxReviveTimes; + private final boolean testRetryRevive; private final int pushBufferMaxSize; protected final long pushDataTimeout; @@ -113,7 +113,7 @@ public class ShuffleClientImpl extends ShuffleClient { protected final String appUniqueId; - private ThreadLocal compressorThreadLocal = + private final ThreadLocal compressorThreadLocal = new ThreadLocal() { @Override protected Compressor initialValue() { @@ -601,13 +601,13 @@ public class ShuffleClientImpl extends ShuffleClient { } /** - * check if a newer PartitionLocation(with larger epoch) exists in local cache + * Check if a newer PartitionLocation(with larger epoch) exists in local cache. * - * @param shuffleMap - * @param partitionId - * @param epoch - * @param wait whether to wait for some time for a newer PartitionLocation - * @return + * @param shuffleMap The mapping between shuffle id and partition location. + * @param partitionId The id of partition. + * @param epoch The epoch of revive. + * @param wait Whether to wait for some time for a newer partition location. + * @return whether newer partition location exists in local cache. */ boolean newerPartitionLocationExists( Map shuffleMap, int partitionId, int epoch, boolean wait) { @@ -675,12 +675,10 @@ public class ShuffleClientImpl extends ShuffleClient { attemptId, partitionId); return true; - } else if (results == null - || !results.containsKey(partitionId) - || results.get(partitionId) != StatusCode.SUCCESS.getValue()) { - return false; } else { - return true; + return results != null + && results.containsKey(partitionId) + && results.get(partitionId) == StatusCode.SUCCESS.getValue(); } } @@ -1595,7 +1593,7 @@ public class ShuffleClientImpl extends ShuffleClient { throws IOException { ReduceFileGroups fileGroups = loadFileGroup(shuffleId, partitionId); - if (fileGroups.partitionGroups.size() == 0 + if (fileGroups.partitionGroups.isEmpty() || !fileGroups.partitionGroups.containsKey(partitionId)) { logger.warn("Shuffle data is empty for shuffle {} partition {}.", shuffleId, partitionId); return CelebornInputStream.empty(); @@ -1622,6 +1620,9 @@ public class ShuffleClientImpl extends ShuffleClient { @Override public void shutdown() { + if (null != reviveManager) { + reviveManager.close(); + } if (null != rpcEnv) { rpcEnv.shutdown(); } @@ -1666,7 +1667,7 @@ public class ShuffleClientImpl extends ShuffleClient { logger.debug("Push data failed cause message: {}", message); StatusCode cause; if (message == null) { - logger.error("Push data throw unexpected exception: {}", message); + logger.error("Push data throw unexpected exception"); cause = StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE; } else if (message.startsWith(StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA.name())) { cause = StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA;