From f590fb275d3b5a3630b0e734ada43417e0bf71a3 Mon Sep 17 00:00:00 2001 From: xxx <953396112@qq.com> Date: Wed, 27 Aug 2025 14:16:59 +0800 Subject: [PATCH] =?UTF-8?q?[CELEBORN-2122]=20Avoiding=20multiple=20accesse?= =?UTF-8?q?s=20to=20HDFS=20when=20retrieving=20in=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …dexes in DfsPartitionReader ### What changes were proposed in this pull request? Avoiding multiple accesses to HDFS when retrieving indexes in DfsPartitionReader ### Why are the changes needed? This optimization method improves read performance by reducing the number of interactions with HDFS, merging multiple small I/O operations into a single large I/O operation. Especially when index files are small, the strategy of reading the entire file at once can significantly reduce the number of I/O operations. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? CI Closes #3443 from xy2953396112/CELEBORN-2122. Authored-by: xxx <953396112@qq.com> Signed-off-by: mingji --- .../client/read/DfsPartitionReader.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java index e39fb7ce8..63136f00d 100644 --- a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java +++ b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java @@ -144,7 +144,7 @@ public class DfsPartitionReader implements PartitionReader { } else { dataFilePath = new Path(location.getStorageInfo().getFilePath()); dfsInputStream = hadoopFs.open(dataFilePath); - chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location)); + chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(location)); } this.startChunkIndex = startChunkIndex == -1 ? 0 : startChunkIndex; this.endChunkIndex = @@ -181,15 +181,19 @@ public class DfsPartitionReader implements PartitionReader { } } - private List getChunkOffsetsFromUnsortedIndex(CelebornConf conf, PartitionLocation location) + private List getChunkOffsetsFromUnsortedIndex(PartitionLocation location) throws IOException { List offsets; - try (FSDataInputStream indexInputStream = - hadoopFs.open(new Path(Utils.getIndexFilePath(location.getStorageInfo().getFilePath())))) { - offsets = new ArrayList<>(); - int offsetCount = indexInputStream.readInt(); - for (int i = 0; i < offsetCount; i++) { - offsets.add(indexInputStream.readLong()); + String indexPath = Utils.getIndexFilePath(location.getStorageInfo().getFilePath()); + try (FSDataInputStream indexInputStream = hadoopFs.open(new Path(indexPath))) { + long indexSize = hadoopFs.getFileStatus(new Path(indexPath)).getLen(); + byte[] indexBuffer = new byte[(int) indexSize]; + indexInputStream.readFully(0L, indexBuffer); + ByteBuffer buffer = ByteBuffer.wrap(indexBuffer); + int offsetSize = buffer.getInt(); + offsets = new ArrayList<>(offsetSize); + for (int i = 0; i < offsetSize; i++) { + offsets.add(buffer.getLong()); } } return offsets;