diff --git a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java index e39fb7ce8..63136f00d 100644 --- a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java +++ b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java @@ -144,7 +144,7 @@ public class DfsPartitionReader implements PartitionReader { } else { dataFilePath = new Path(location.getStorageInfo().getFilePath()); dfsInputStream = hadoopFs.open(dataFilePath); - chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location)); + chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(location)); } this.startChunkIndex = startChunkIndex == -1 ? 0 : startChunkIndex; this.endChunkIndex = @@ -181,15 +181,19 @@ public class DfsPartitionReader implements PartitionReader { } } - private List getChunkOffsetsFromUnsortedIndex(CelebornConf conf, PartitionLocation location) + private List getChunkOffsetsFromUnsortedIndex(PartitionLocation location) throws IOException { List offsets; - try (FSDataInputStream indexInputStream = - hadoopFs.open(new Path(Utils.getIndexFilePath(location.getStorageInfo().getFilePath())))) { - offsets = new ArrayList<>(); - int offsetCount = indexInputStream.readInt(); - for (int i = 0; i < offsetCount; i++) { - offsets.add(indexInputStream.readLong()); + String indexPath = Utils.getIndexFilePath(location.getStorageInfo().getFilePath()); + try (FSDataInputStream indexInputStream = hadoopFs.open(new Path(indexPath))) { + long indexSize = hadoopFs.getFileStatus(new Path(indexPath)).getLen(); + byte[] indexBuffer = new byte[(int) indexSize]; + indexInputStream.readFully(0L, indexBuffer); + ByteBuffer buffer = ByteBuffer.wrap(indexBuffer); + int offsetSize = buffer.getInt(); + offsets = new ArrayList<>(offsetSize); + for (int i = 0; i < offsetSize; i++) { + offsets.add(buffer.getLong()); } } return offsets;