diff --git a/client/src/main/java/org/apache/celeborn/client/ReviveManager.java b/client/src/main/java/org/apache/celeborn/client/ReviveManager.java index 47d87878d..3c52e6b9f 100644 --- a/client/src/main/java/org/apache/celeborn/client/ReviveManager.java +++ b/client/src/main/java/org/apache/celeborn/client/ReviveManager.java @@ -22,6 +22,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import scala.concurrent.duration.Duration; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,17 +37,16 @@ class ReviveManager { private static final Logger logger = LoggerFactory.getLogger(ReviveManager.class); LinkedBlockingQueue requestQueue = new LinkedBlockingQueue<>(); - private final long interval; private final int batchSize; ShuffleClientImpl shuffleClient; - private ScheduledExecutorService batchReviveRequestScheduler = + private final ScheduledExecutorService batchReviveRequestScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("batch-revive-scheduler"); public ReviveManager(ShuffleClientImpl shuffleClient, CelebornConf conf) { this.shuffleClient = shuffleClient; - this.interval = conf.clientPushReviveInterval(); this.batchSize = conf.clientPushReviveBatchSize(); + long interval = conf.clientPushReviveInterval(); batchReviveRequestScheduler.scheduleWithFixedDelay( () -> { Map> shuffleMap = new HashMap<>(); @@ -124,4 +125,8 @@ class ReviveManager { logger.error("Exception when put into requests!", e); } } + + public void close() { + ThreadUtils.shutdown(batchReviveRequestScheduler, Duration.apply("800ms")); + } } diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index 39d382d10..a300ddf5f 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -73,8 +73,8 @@ public class ShuffleClientImpl extends ShuffleClient { private final int registerShuffleMaxRetries; private final long registerShuffleRetryWaitMs; - private int maxReviveTimes; - private boolean testRetryRevive; + private final int maxReviveTimes; + private final boolean testRetryRevive; private final int pushBufferMaxSize; protected final long pushDataTimeout; @@ -113,7 +113,7 @@ public class ShuffleClientImpl extends ShuffleClient { protected final String appUniqueId; - private ThreadLocal compressorThreadLocal = + private final ThreadLocal compressorThreadLocal = new ThreadLocal() { @Override protected Compressor initialValue() { @@ -601,13 +601,13 @@ public class ShuffleClientImpl extends ShuffleClient { } /** - * check if a newer PartitionLocation(with larger epoch) exists in local cache + * Check if a newer PartitionLocation(with larger epoch) exists in local cache. * - * @param shuffleMap - * @param partitionId - * @param epoch - * @param wait whether to wait for some time for a newer PartitionLocation - * @return + * @param shuffleMap The mapping between shuffle id and partition location. + * @param partitionId The id of partition. + * @param epoch The epoch of revive. + * @param wait Whether to wait for some time for a newer partition location. + * @return whether newer partition location exists in local cache. */ boolean newerPartitionLocationExists( Map shuffleMap, int partitionId, int epoch, boolean wait) { @@ -675,12 +675,10 @@ public class ShuffleClientImpl extends ShuffleClient { attemptId, partitionId); return true; - } else if (results == null - || !results.containsKey(partitionId) - || results.get(partitionId) != StatusCode.SUCCESS.getValue()) { - return false; } else { - return true; + return results != null + && results.containsKey(partitionId) + && results.get(partitionId) == StatusCode.SUCCESS.getValue(); } } @@ -1595,7 +1593,7 @@ public class ShuffleClientImpl extends ShuffleClient { throws IOException { ReduceFileGroups fileGroups = loadFileGroup(shuffleId, partitionId); - if (fileGroups.partitionGroups.size() == 0 + if (fileGroups.partitionGroups.isEmpty() || !fileGroups.partitionGroups.containsKey(partitionId)) { logger.warn("Shuffle data is empty for shuffle {} partition {}.", shuffleId, partitionId); return CelebornInputStream.empty(); @@ -1622,6 +1620,9 @@ public class ShuffleClientImpl extends ShuffleClient { @Override public void shutdown() { + if (null != reviveManager) { + reviveManager.close(); + } if (null != rpcEnv) { rpcEnv.shutdown(); } @@ -1666,7 +1667,7 @@ public class ShuffleClientImpl extends ShuffleClient { logger.debug("Push data failed cause message: {}", message); StatusCode cause; if (message == null) { - logger.error("Push data throw unexpected exception: {}", message); + logger.error("Push data throw unexpected exception"); cause = StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE; } else if (message.startsWith(StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA.name())) { cause = StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA;