[CELEBORN-1120] ShuffleClientImpl should close batchReviveRequestScheduler of ReviveManager

### What changes were proposed in this pull request?

`ShuffleClientImpl#shutdown` now closes the `batchReviveRequestScheduler` of `ReviveManager` by calling the new `ReviveManager#close` method.

### Why are the changes needed?

After the shuffle client is closed, `ReviveManager` still schedules invocations of `ShuffleClientImpl#reviveBatch`, which causes a `NullPointerException`. Therefore, `ShuffleClientImpl` should close the `batchReviveRequestScheduler` of `ReviveManager` to avoid this `NullPointerException`.

```
23/11/08 18:09:25,819 [batch-revive-scheduler] ERROR ShuffleClientImpl: Exception raised while reviving for shuffle 0 partitionIds 1988, epochs 0,.
java.lang.NullPointerException
	at org.apache.celeborn.client.ShuffleClientImpl.reviveBatch(ShuffleClientImpl.java:705)
	at org.apache.celeborn.client.ReviveManager.lambda$new$1(ReviveManager.java:94)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
23/11/08 18:09:25,844 [celeborn-retry-sender-6] ERROR ShuffleClientImpl: Push data to xx.xx.xx.xx:9092 failed for shuffle 0 map 216 attempt 0 partition 1988 batch 2623, remain revive times 4.
org.apache.celeborn.common.exception.CelebornIOException: PUSH_DATA_CONNECTION_EXCEPTION_PRIMARY then revive but REVIVE_FAILED, revive status 12(REVIVE_FAILED), old location: PartitionLocation[
  id-epoch:1988-0
  host-rpcPort-pushPort-fetchPort-replicatePort:xx.xx.xx.xx-9091-9092-9093-9094
  mode:PRIMARY
  peer:(empty)
  storage hint:StorageInfo{type=MEMORY, mountPoint='/tmp/storage', finalResult=false, filePath=}
  mapIdBitMap:null]
	at org.apache.celeborn.client.ShuffleClientImpl.submitRetryPushData(ShuffleClientImpl.java:261)
	at org.apache.celeborn.client.ShuffleClientImpl.access$600(ShuffleClientImpl.java:62)
	at org.apache.celeborn.client.ShuffleClientImpl$3.lambda$onFailure$1(ShuffleClientImpl.java:1045)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No.

Closes #2084 from SteNicholas/CELEBORN-1120.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
This commit is contained in:
SteNicholas 2023-11-10 11:44:47 +08:00 committed by Fu Chen
parent 02cea042a0
commit eb1be3fbf8
2 changed files with 25 additions and 19 deletions

View File

@ -22,6 +22,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -35,17 +37,16 @@ class ReviveManager {
private static final Logger logger = LoggerFactory.getLogger(ReviveManager.class);
LinkedBlockingQueue<ReviveRequest> requestQueue = new LinkedBlockingQueue<>();
private final long interval;
private final int batchSize;
ShuffleClientImpl shuffleClient;
private ScheduledExecutorService batchReviveRequestScheduler =
private final ScheduledExecutorService batchReviveRequestScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("batch-revive-scheduler");
public ReviveManager(ShuffleClientImpl shuffleClient, CelebornConf conf) {
this.shuffleClient = shuffleClient;
this.interval = conf.clientPushReviveInterval();
this.batchSize = conf.clientPushReviveBatchSize();
long interval = conf.clientPushReviveInterval();
batchReviveRequestScheduler.scheduleWithFixedDelay(
() -> {
Map<Integer, Set<ReviveRequest>> shuffleMap = new HashMap<>();
@ -124,4 +125,8 @@ class ReviveManager {
logger.error("Exception when put into requests!", e);
}
}
/**
 * Shuts down the {@code batch-revive-scheduler} executor held by this manager.
 *
 * <p>Delegates to {@code ThreadUtils.shutdown}, passing a duration of 800ms.
 */
public void close() {
  final Duration shutdownDuration = Duration.apply("800ms");
  ThreadUtils.shutdown(batchReviveRequestScheduler, shutdownDuration);
}
}

View File

@ -73,8 +73,8 @@ public class ShuffleClientImpl extends ShuffleClient {
private final int registerShuffleMaxRetries;
private final long registerShuffleRetryWaitMs;
private int maxReviveTimes;
private boolean testRetryRevive;
private final int maxReviveTimes;
private final boolean testRetryRevive;
private final int pushBufferMaxSize;
protected final long pushDataTimeout;
@ -113,7 +113,7 @@ public class ShuffleClientImpl extends ShuffleClient {
protected final String appUniqueId;
private ThreadLocal<Compressor> compressorThreadLocal =
private final ThreadLocal<Compressor> compressorThreadLocal =
new ThreadLocal<Compressor>() {
@Override
protected Compressor initialValue() {
@ -601,13 +601,13 @@ public class ShuffleClientImpl extends ShuffleClient {
}
/**
* check if a newer PartitionLocation(with larger epoch) exists in local cache
* Check if a newer PartitionLocation(with larger epoch) exists in local cache.
*
* @param shuffleMap
* @param partitionId
* @param epoch
* @param wait whether to wait for some time for a newer PartitionLocation
* @return
* @param shuffleMap The mapping between shuffle id and partition location.
* @param partitionId The id of partition.
* @param epoch The epoch of revive.
* @param wait Whether to wait for some time for a newer partition location.
* @return whether newer partition location exists in local cache.
*/
boolean newerPartitionLocationExists(
Map<Integer, PartitionLocation> shuffleMap, int partitionId, int epoch, boolean wait) {
@ -675,12 +675,10 @@ public class ShuffleClientImpl extends ShuffleClient {
attemptId,
partitionId);
return true;
} else if (results == null
|| !results.containsKey(partitionId)
|| results.get(partitionId) != StatusCode.SUCCESS.getValue()) {
return false;
} else {
return true;
return results != null
&& results.containsKey(partitionId)
&& results.get(partitionId) == StatusCode.SUCCESS.getValue();
}
}
@ -1595,7 +1593,7 @@ public class ShuffleClientImpl extends ShuffleClient {
throws IOException {
ReduceFileGroups fileGroups = loadFileGroup(shuffleId, partitionId);
if (fileGroups.partitionGroups.size() == 0
if (fileGroups.partitionGroups.isEmpty()
|| !fileGroups.partitionGroups.containsKey(partitionId)) {
logger.warn("Shuffle data is empty for shuffle {} partition {}.", shuffleId, partitionId);
return CelebornInputStream.empty();
@ -1622,6 +1620,9 @@ public class ShuffleClientImpl extends ShuffleClient {
@Override
public void shutdown() {
if (null != reviveManager) {
reviveManager.close();
}
if (null != rpcEnv) {
rpcEnv.shutdown();
}
@ -1666,7 +1667,7 @@ public class ShuffleClientImpl extends ShuffleClient {
logger.debug("Push data failed cause message: {}", message);
StatusCode cause;
if (message == null) {
logger.error("Push data throw unexpected exception: {}", message);
logger.error("Push data throw unexpected exception");
cause = StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE;
} else if (message.startsWith(StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA.name())) {
cause = StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA;