[CELEBORN-2029][FLINK] Some minor optimizations in the Flink integration
### What changes were proposed in this pull request? Some minor performance optimizations in the internal implementation ### Why are the changes needed? During our use of Flink with Celeborn, we identified several minor optimizations that can be made: 1. In the client side, the Flink-Celeborn client parses the `pushDataTimeout` configuration too frequently, which is unnecessary and cpu-intensive. 2. On the worker side, Celeborn needs to filter readers that are able for reading. However, using Java Stream's collection operations is costly in terms of performance. 3. Also on the worker side, Celeborn currently checks whether a reader can continue reading by comparing the current read offset with the total file size. This check involves retrieving the total file size, which is an expensive operation. Since this value is constant, it should be cached in memory instead of being fetched multiple times. 4. In the Flink’s hybrid shuffle integration, the `EndOfSegment` event should not be bundled with data buffers. If it is, there is a risk of data corruption or misinterpretation. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manual test. Closes #3318 from codenohup/CELEBORN-2029. Authored-by: codenohup <huangxu.walker@gmail.com> Signed-off-by: SteNicholas <programgeek@163.com>
This commit is contained in:
parent
da84baed4d
commit
feba7baec6
@ -238,7 +238,7 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
|
||||
partitionLocations,
|
||||
subPartitionIndexStart,
|
||||
subPartitionIndexEnd,
|
||||
conf.pushDataTimeoutMs());
|
||||
pushDataTimeout);
|
||||
}
|
||||
}
|
||||
|
||||
@ -448,7 +448,7 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
|
||||
.build()
|
||||
.toByteArray())
|
||||
.toByteBuffer(),
|
||||
conf.pushDataTimeoutMs());
|
||||
pushDataTimeout);
|
||||
} catch (IOException e) {
|
||||
// ioexeption revive
|
||||
return revive(shuffleId, mapId, attemptId, location);
|
||||
@ -499,7 +499,7 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
|
||||
.build()
|
||||
.toByteArray())
|
||||
.toByteBuffer(),
|
||||
conf.pushDataTimeoutMs());
|
||||
pushDataTimeout);
|
||||
} catch (IOException e) {
|
||||
// ioexeption revive
|
||||
return revive(shuffleId, mapId, attemptId, location);
|
||||
@ -580,7 +580,7 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
|
||||
.build()
|
||||
.toByteArray())
|
||||
.toByteBuffer(),
|
||||
conf.pushDataTimeoutMs());
|
||||
pushDataTimeout);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
@ -620,7 +620,7 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
|
||||
.build()
|
||||
.toByteArray())
|
||||
.toByteBuffer(),
|
||||
conf.pushDataTimeoutMs());
|
||||
pushDataTimeout);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
@ -441,6 +441,9 @@ public class CelebornTierProducerAgent implements TierProducerAgent {
|
||||
END_OF_SEGMENT,
|
||||
endSegmentMemorySegment.size());
|
||||
processBuffer(endOfSegmentBuffer, subPartitionId);
|
||||
// drain the bufferPacker to ensure that the EndOfSegment event is not bundled with the data
|
||||
// buffer
|
||||
bufferPacker.drain();
|
||||
} catch (Exception e) {
|
||||
ExceptionUtils.rethrow(e, "Failed to append end of segment event.");
|
||||
}
|
||||
|
||||
@ -441,6 +441,9 @@ public class CelebornTierProducerAgent implements TierProducerAgent {
|
||||
END_OF_SEGMENT,
|
||||
endSegmentMemorySegment.size());
|
||||
processBuffer(endOfSegmentBuffer, subPartitionId);
|
||||
// drain the bufferPacker to ensure that the EndOfSegment event is not bundled with the data
|
||||
// buffer
|
||||
bufferPacker.drain();
|
||||
} catch (Exception e) {
|
||||
ExceptionUtils.rethrow(e, "Failed to append end of segment event.");
|
||||
}
|
||||
|
||||
@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
@ -186,13 +185,17 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis
|
||||
}
|
||||
|
||||
try {
|
||||
PriorityQueue<MapPartitionDataReader> sortedReaders =
|
||||
new PriorityQueue<>(
|
||||
readers.values().stream()
|
||||
.filter(MapPartitionDataReader::shouldReadData)
|
||||
.collect(Collectors.toList()));
|
||||
for (MapPartitionDataReader reader : sortedReaders) {
|
||||
// Find all readers that can read data.
|
||||
// Avoid using Java Stream's collect() operation in this case, as the internal array used
|
||||
// by Stream.collect() may be resized and copied multiple times if the exact size of the
|
||||
// final result is not known in advance
|
||||
PriorityQueue<MapPartitionDataReader> sortedReaders = new PriorityQueue<>();
|
||||
for (MapPartitionDataReader reader : readers.values()) {
|
||||
if (!reader.shouldReadData()) {
|
||||
continue;
|
||||
}
|
||||
openReader(reader);
|
||||
sortedReaders.add(reader);
|
||||
}
|
||||
while (bufferQueue.bufferAvailable() && !sortedReaders.isEmpty()) {
|
||||
BufferRecycler bufferRecycler = new BufferRecycler(MapPartitionData.this::recycle);
|
||||
|
||||
@ -96,6 +96,10 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader
|
||||
protected boolean errorNotified;
|
||||
|
||||
private FileChannel dataFileChannel;
|
||||
|
||||
// The size of the data file, it is initialized in the open method and remains unchanged
|
||||
// afterward.
|
||||
private long dataFileChannelSize;
|
||||
private FileChannel indexFileChannel;
|
||||
|
||||
private Channel associatedChannel;
|
||||
@ -132,6 +136,7 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader
|
||||
throws IOException {
|
||||
if (!isOpen) {
|
||||
this.dataFileChannel = dataFileChannel;
|
||||
this.dataFileChannelSize = dataFileChannel.size();
|
||||
this.indexFileChannel = indexFileChannel;
|
||||
// index is (offset,length)
|
||||
long indexRegionSize = mapFileMeta.getNumSubpartitions() * (long) INDEX_ENTRY_SIZE;
|
||||
@ -340,13 +345,13 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader
|
||||
logger.debug(
|
||||
"readBuffer updateConsumingOffset, {}, {}, {}, {}",
|
||||
streamId,
|
||||
dataFileChannel.size(),
|
||||
dataFileChannelSize,
|
||||
dataConsumingOffset,
|
||||
currentPartitionRemainingBytes);
|
||||
|
||||
// if these checks fail, the partition file must be corrupted
|
||||
if (dataConsumingOffset < 0
|
||||
|| dataConsumingOffset + currentPartitionRemainingBytes > dataFileChannel.size()
|
||||
|| dataConsumingOffset + currentPartitionRemainingBytes > dataFileChannelSize
|
||||
|| currentPartitionRemainingBytes < 0) {
|
||||
throw new FileCorruptedException("File " + fileInfo.getFilePath() + " is corrupted");
|
||||
}
|
||||
@ -383,7 +388,7 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader
|
||||
logger.debug(
|
||||
"readBuffer end, {}, {}, {}, {}",
|
||||
streamId,
|
||||
dataFileChannel.size(),
|
||||
dataFileChannelSize,
|
||||
dataConsumingOffset,
|
||||
currentPartitionRemainingBytes);
|
||||
int prevDataRegion = currentDataRegion;
|
||||
@ -394,7 +399,7 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader
|
||||
logger.debug(
|
||||
"readBuffer run: {}, {}, {}, {}",
|
||||
streamId,
|
||||
dataFileChannel.size(),
|
||||
dataFileChannelSize,
|
||||
dataConsumingOffset,
|
||||
currentPartitionRemainingBytes);
|
||||
return true;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user