[CELEBORN-390][FLINK] Refine synchronization in FlinkShuffleClientImpl#updateFileGroup (#1320)
This commit is contained in:
parent
8e167c6488
commit
21bdfdb21b
@ -136,26 +136,31 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
|
||||
String applicationId, String shuffleKey, int shuffleId, int partitionId) throws IOException {
|
||||
ReduceFileGroups reduceFileGroups =
|
||||
reduceFileGroupsMap.computeIfAbsent(shuffleId, (id) -> new ReduceFileGroups());
|
||||
synchronized (reduceFileGroups) {
|
||||
if (reduceFileGroups.partitionIds != null
|
||||
&& reduceFileGroups.partitionIds.contains(partitionId)) {
|
||||
logger.debug(
|
||||
"use cached file groups for partition: {}",
|
||||
Utils.makeReducerKey(applicationId, shuffleId, partitionId));
|
||||
return reduceFileGroups;
|
||||
} else {
|
||||
// refresh file groups
|
||||
ReduceFileGroups newGroups = loadFileGroupInternal(applicationId, shuffleKey, shuffleId);
|
||||
if (newGroups == null || !newGroups.partitionIds.contains(partitionId)) {
|
||||
throw new IOException(
|
||||
"shuffle data lost for partition: "
|
||||
+ Utils.makeReducerKey(applicationId, shuffleId, partitionId));
|
||||
if (reduceFileGroups.partitionIds != null
|
||||
&& reduceFileGroups.partitionIds.contains(partitionId)) {
|
||||
logger.debug(
|
||||
"use cached file groups for partition: {}",
|
||||
Utils.makeReducerKey(applicationId, shuffleId, partitionId));
|
||||
} else {
|
||||
synchronized (reduceFileGroups) {
|
||||
if (reduceFileGroups.partitionIds != null
|
||||
&& reduceFileGroups.partitionIds.contains(partitionId)) {
|
||||
logger.debug(
|
||||
"use cached file groups for partition: {}",
|
||||
Utils.makeReducerKey(applicationId, shuffleId, partitionId));
|
||||
} else {
|
||||
// refresh file groups
|
||||
ReduceFileGroups newGroups = loadFileGroupInternal(applicationId, shuffleKey, shuffleId);
|
||||
if (newGroups == null || !newGroups.partitionIds.contains(partitionId)) {
|
||||
throw new IOException(
|
||||
"shuffle data lost for partition: "
|
||||
+ Utils.makeReducerKey(applicationId, shuffleId, partitionId));
|
||||
}
|
||||
reduceFileGroups.update(newGroups);
|
||||
}
|
||||
|
||||
reduceFileGroupsMap.put(shuffleId, newGroups);
|
||||
return newGroups;
|
||||
}
|
||||
}
|
||||
return reduceFileGroups;
|
||||
}
|
||||
|
||||
public ReadClientHandler getReadClientHandler() {
|
||||
|
||||
@ -111,9 +111,9 @@ public class ShuffleClientImpl extends ShuffleClient {
|
||||
};
|
||||
|
||||
protected static class ReduceFileGroups {
|
||||
public final Map<Integer, Set<PartitionLocation>> partitionGroups;
|
||||
public final int[] mapAttempts;
|
||||
public final Set<Integer> partitionIds;
|
||||
public Map<Integer, Set<PartitionLocation>> partitionGroups;
|
||||
public int[] mapAttempts;
|
||||
public Set<Integer> partitionIds;
|
||||
|
||||
ReduceFileGroups(
|
||||
Map<Integer, Set<PartitionLocation>> partitionGroups,
|
||||
@ -129,6 +129,12 @@ public class ShuffleClientImpl extends ShuffleClient {
|
||||
this.mapAttempts = null;
|
||||
this.partitionIds = null;
|
||||
}
|
||||
|
||||
public void update(ReduceFileGroups fileGroups) {
|
||||
partitionGroups = fileGroups.partitionGroups;
|
||||
mapAttempts = fileGroups.mapAttempts;
|
||||
partitionIds = fileGroups.partitionIds;
|
||||
}
|
||||
}
|
||||
|
||||
// key: shuffleId
|
||||
|
||||
Loading…
Reference in New Issue
Block a user