[CELEBORN-2122] Avoiding multiple accesses to HDFS when retrieving indexes in DfsPartitionReader

### What changes were proposed in this pull request?

Avoiding multiple accesses to HDFS when retrieving indexes in DfsPartitionReader

### Why are the changes needed?

This change improves read performance by merging multiple small I/O operations into a single large one, reducing the number of round trips to HDFS. Previously, `DfsPartitionReader` issued one `readInt`/`readLong` call per chunk offset when parsing the unsorted index; now it reads the entire index file once and decodes the offsets from an in-memory buffer. The gain is most pronounced when index files are small, where reading the whole file in one operation eliminates nearly all of the per-offset round trips.
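The single-read strategy can be sketched in isolation, without HDFS: fetch the index blob in one I/O, then decode it from an in-memory `ByteBuffer`. The blob layout (an `int` count followed by that many `long` offsets) matches what the reader parses; the class and method names below are hypothetical, for illustration only.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class IndexParseSketch {

    // Decode chunk offsets from an index blob that was fetched with a single
    // read, instead of issuing one readInt/readLong call per offset.
    // Layout: [int offsetCount][long offset] * offsetCount.
    static List<Long> parseOffsets(byte[] indexBuffer) {
        ByteBuffer buffer = ByteBuffer.wrap(indexBuffer);
        int offsetCount = buffer.getInt();
        List<Long> offsets = new ArrayList<>(offsetCount);
        for (int i = 0; i < offsetCount; i++) {
            offsets.add(buffer.getLong());
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Build a fake index blob holding three offsets.
        ByteBuffer blob = ByteBuffer.allocate(4 + 3 * 8);
        blob.putInt(3).putLong(0L).putLong(1024L).putLong(4096L);

        System.out.println(parseOffsets(blob.array())); // prints [0, 1024, 4096]
    }
}
```

In the actual patch, the blob is obtained by sizing a buffer from `getFileStatus().getLen()` and filling it with a single positioned `readFully`, so the parse loop never touches the remote stream.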

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI

Closes #3443 from xy2953396112/CELEBORN-2122.

Authored-by: xxx <953396112@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Commit f590fb275d (parent d4e13b6ba2), committed 2025-08-27 14:16:59 +08:00.

```diff
@@ -144,7 +144,7 @@ public class DfsPartitionReader implements PartitionReader {
     } else {
       dataFilePath = new Path(location.getStorageInfo().getFilePath());
       dfsInputStream = hadoopFs.open(dataFilePath);
-      chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location));
+      chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(location));
     }
     this.startChunkIndex = startChunkIndex == -1 ? 0 : startChunkIndex;
     this.endChunkIndex =
```
```diff
@@ -181,15 +181,19 @@ public class DfsPartitionReader implements PartitionReader {
     }
   }

-  private List<Long> getChunkOffsetsFromUnsortedIndex(CelebornConf conf, PartitionLocation location)
+  private List<Long> getChunkOffsetsFromUnsortedIndex(PartitionLocation location)
       throws IOException {
     List<Long> offsets;
-    try (FSDataInputStream indexInputStream =
-        hadoopFs.open(new Path(Utils.getIndexFilePath(location.getStorageInfo().getFilePath())))) {
-      offsets = new ArrayList<>();
-      int offsetCount = indexInputStream.readInt();
-      for (int i = 0; i < offsetCount; i++) {
-        offsets.add(indexInputStream.readLong());
+    String indexPath = Utils.getIndexFilePath(location.getStorageInfo().getFilePath());
+    try (FSDataInputStream indexInputStream = hadoopFs.open(new Path(indexPath))) {
+      long indexSize = hadoopFs.getFileStatus(new Path(indexPath)).getLen();
+      byte[] indexBuffer = new byte[(int) indexSize];
+      indexInputStream.readFully(0L, indexBuffer);
+      ByteBuffer buffer = ByteBuffer.wrap(indexBuffer);
+      int offsetSize = buffer.getInt();
+      offsets = new ArrayList<>(offsetSize);
+      for (int i = 0; i < offsetSize; i++) {
+        offsets.add(buffer.getLong());
       }
     }
     return offsets;
```