[CELEBORN-2047] Reuse FileChannel/FSDataInputStream in PartitionDataReader
Some checks failed
Celeborn Cpp Integration Test / celeborn_cpp_check_lint (push) Has been cancelled
Celeborn Cpp Integration Test / celeborn_cpp_unit_test (push) Has been cancelled
Celeborn Cpp Integration Test / celeborn_cpp_integration_test (push) Has been cancelled
Grafana Dashboard CI / lint (push) Has been cancelled
Integration Test / celeborn_integration_test (push) Has been cancelled
Celeborn CI / service (11) (push) Has been cancelled
Celeborn CI / service (17) (push) Has been cancelled
Celeborn CI / service (8) (push) Has been cancelled
Celeborn CI / spark2 (8, 2.4) (push) Has been cancelled
Celeborn CI / spark3 (11, org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn CI / spark3 (11, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.0) (push) Has been cancelled
Celeborn CI / spark3 (11, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.1) (push) Has been cancelled
Celeborn CI / spark3 (11, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.2) (push) Has been cancelled
Celeborn CI / spark3 (11, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.3) (push) Has been cancelled
Celeborn CI / spark3 (11, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.4) (push) Has been cancelled
Celeborn CI / spark3 (11, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn CI / spark3 (17, org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn CI / spark3 (17, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.3) (push) Has been cancelled
Celeborn CI / spark3 (17, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.4) (push) Has been cancelled
Celeborn CI / spark3 (17, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn CI / spark3 (8, org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn CI / spark3 (8, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.0) (push) Has been cancelled
Celeborn CI / spark3 (8, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.1) (push) Has been cancelled
Celeborn CI / spark3 (8, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.2) (push) Has been cancelled
Celeborn CI / spark3 (8, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.3) (push) Has been cancelled
Celeborn CI / spark3 (8, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.4) (push) Has been cancelled
Celeborn CI / spark3 (8, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn CI / spark4 (17, org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO, 4.0) (push) Has been cancelled
Celeborn CI / flink1 (1.16, 11) (push) Has been cancelled
Celeborn CI / flink1 (1.16, 8) (push) Has been cancelled
Celeborn CI / flink1 (1.17, 11) (push) Has been cancelled
Celeborn CI / flink1 (1.17, 8) (push) Has been cancelled
Celeborn CI / flink1 (1.18, 11) (push) Has been cancelled
Celeborn CI / flink1 (1.18, 8) (push) Has been cancelled
Celeborn CI / flink1 (1.19, 11) (push) Has been cancelled
Celeborn CI / flink1 (1.19, 8) (push) Has been cancelled
Celeborn CI / flink1 (1.20, 11) (push) Has been cancelled
Celeborn CI / flink1 (1.20, 8) (push) Has been cancelled
Celeborn CI / flink2 (2.0, 11) (push) Has been cancelled
Celeborn CI / flink2 (2.0, 17) (push) Has been cancelled
Celeborn CI / flink2 (2.1, 11) (push) Has been cancelled
Celeborn CI / flink2 (2.1, 17) (push) Has been cancelled
Celeborn CI / mr (11) (push) Has been cancelled
Celeborn CI / mr (8) (push) Has been cancelled
Celeborn SBT CI / service (11, 2.12.15) (push) Has been cancelled
Celeborn SBT CI / service (11, 2.13.5) (push) Has been cancelled
Celeborn SBT CI / service (17, 2.12.15) (push) Has been cancelled
Celeborn SBT CI / service (17, 2.13.5) (push) Has been cancelled
Celeborn SBT CI / service (8, 2.12.15) (push) Has been cancelled
Celeborn SBT CI / service (8, 2.13.5) (push) Has been cancelled
Celeborn SBT CI / spark2 (8, 2.11.12, 2.4) (push) Has been cancelled
Celeborn SBT CI / spark2 (8, 2.12.10, 2.4) (push) Has been cancelled
Celeborn SBT CI / spark3 (11, 2.12.10, 2.12, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.0) (push) Has been cancelled
Celeborn SBT CI / spark3 (11, 2.12.10, 2.12, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.1) (push) Has been cancelled
Celeborn SBT CI / spark3 (11, 2.12.15, 2.12, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.2) (push) Has been cancelled
Celeborn SBT CI / spark3 (11, 2.12.15, 2.12, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.3) (push) Has been cancelled
Celeborn SBT CI / spark3 (11, 2.12.17, 2.12, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.4) (push) Has been cancelled
Celeborn SBT CI / spark3 (11, 2.12.18, 2.12, org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn SBT CI / spark3 (11, 2.12.18, 2.12, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn SBT CI / spark3 (11, 2.13.5, 2.13, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.2) (push) Has been cancelled
Celeborn SBT CI / spark3 (11, 2.13.8, 2.13, org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn SBT CI / spark3 (11, 2.13.8, 2.13, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.3) (push) Has been cancelled
Celeborn SBT CI / spark3 (11, 2.13.8, 2.13, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.4) (push) Has been cancelled
Celeborn SBT CI / spark3 (11, 2.13.8, 2.13, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn SBT CI / spark3 (17, 2.12.15, 2.12, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.3) (push) Has been cancelled
Celeborn SBT CI / spark3 (17, 2.12.17, 2.12, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.4) (push) Has been cancelled
Celeborn SBT CI / spark3 (17, 2.12.18, 2.12, org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn SBT CI / spark3 (17, 2.12.18, 2.12, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn SBT CI / spark3 (17, 2.13.8, 2.13, org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn SBT CI / spark3 (17, 2.13.8, 2.13, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.3) (push) Has been cancelled
Celeborn SBT CI / spark3 (17, 2.13.8, 2.13, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.4) (push) Has been cancelled
Celeborn SBT CI / spark3 (17, 2.13.8, 2.13, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.12.10, 2.12, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.0) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.12.10, 2.12, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.1) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.12.15, 2.12, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.2) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.12.15, 2.12, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.3) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.12.17, 2.12, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.4) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.12.18, 2.12, org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.12.18, 2.12, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.13.5, 2.13, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.2) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.13.8, 2.13, org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.13.8, 2.13, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.3) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.13.8, 2.13, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.4) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.13.8, 2.13, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn SBT CI / spark4 (17, 2.13.16, 2.13, org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO, 4.0) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.16, 11) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.16, 8) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.17, 11) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.17, 8) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.18, 11) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.18, 8) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.19, 11) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.19, 8) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.20, 11) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.20, 8) (push) Has been cancelled
Celeborn SBT CI / flink2 (2.0, 11) (push) Has been cancelled
Celeborn SBT CI / flink2 (2.0, 17) (push) Has been cancelled
Celeborn SBT CI / flink2 (2.1, 11) (push) Has been cancelled
Celeborn SBT CI / flink2 (2.1, 17) (push) Has been cancelled
Celeborn SBT CI / mr (11) (push) Has been cancelled
Celeborn SBT CI / mr (8) (push) Has been cancelled
Celeborn SBT CI / openapi-codegen-check (11) (push) Has been cancelled
Style check / License (push) Has been cancelled
Lint Check for Web / lint (push) Has been cancelled

### What changes were proposed in this pull request?

Reuse FileChannel/FSDataInputStream in PartitionDataReader to avoid opening too many files.

### Why are the changes needed?

In previous implementations, different PartitionDataReader instances reused the same FileChannel/FSDataInputStream, but #3349
changed this logic, so we need to maintain logical consistency.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Manual test.

Closes #3445 from Alibaba-HZY/mirror-fix.

Authored-by: daowu.hzy <daowu.hzy@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
This commit is contained in:
daowu.hzy 2025-09-03 16:14:56 +08:00 committed by mingji
parent 449cb5d588
commit 3efc60cc9f
4 changed files with 51 additions and 11 deletions

View File

@ -35,7 +35,12 @@ public class DfsPartitionDataReader extends PartitionDataReader {
private final FSDataInputStream indexInputStream;
public DfsPartitionDataReader(
DiskFileInfo fileInfo, ByteBuffer headerBuffer, ByteBuffer indexBuffer) throws IOException {
DiskFileInfo fileInfo,
FSDataInputStream dataInputStream,
FSDataInputStream indexInputStream,
ByteBuffer headerBuffer,
ByteBuffer indexBuffer)
throws IOException {
super(fileInfo, headerBuffer, indexBuffer);
FileSystem fileSystem =
StorageManager.hadoopFs()
@ -43,8 +48,8 @@ public class DfsPartitionDataReader extends PartitionDataReader {
fileInfo.isHdfs()
? StorageInfo.Type.HDFS
: fileInfo.isS3() ? StorageInfo.Type.S3 : StorageInfo.Type.OSS);
this.dataInputStream = fileSystem.open(fileInfo.getDfsPath());
this.indexInputStream = fileSystem.open(fileInfo.getDfsIndexPath());
this.dataInputStream = dataInputStream;
this.indexInputStream = indexInputStream;
this.dataFileSize = fileSystem.getFileStatus(fileInfo.getDfsPath()).getLen();
this.indexFileSize = fileSystem.getFileStatus(fileInfo.getDfsIndexPath()).getLen();
}

View File

@ -25,7 +25,6 @@ import io.netty.buffer.ByteBuf;
import org.apache.commons.io.IOUtils;
import org.apache.celeborn.common.meta.DiskFileInfo;
import org.apache.celeborn.common.util.FileChannelUtils;
import org.apache.celeborn.common.util.Utils;
public class LocalPartitionDataReader extends PartitionDataReader {
@ -34,10 +33,15 @@ public class LocalPartitionDataReader extends PartitionDataReader {
private final FileChannel indexFileChannel;
public LocalPartitionDataReader(
DiskFileInfo fileInfo, ByteBuffer headerBuffer, ByteBuffer indexBuffer) throws IOException {
DiskFileInfo fileInfo,
FileChannel dataFileChannel,
FileChannel indexFileChannel,
ByteBuffer headerBuffer,
ByteBuffer indexBuffer)
throws IOException {
super(fileInfo, headerBuffer, indexBuffer);
this.dataFileChanel = FileChannelUtils.openReadableFileChannel(fileInfo.getFilePath());
this.indexFileChannel = FileChannelUtils.openReadableFileChannel(fileInfo.getIndexPath());
this.dataFileChanel = dataFileChannel;
this.indexFileChannel = indexFileChannel;
this.dataFileSize = dataFileChanel.size();
this.indexFileSize = indexFileChannel.size();
}

View File

@ -18,6 +18,7 @@
package org.apache.celeborn.service.deploy.worker.storage;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
@ -27,11 +28,14 @@ import java.util.function.Consumer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.meta.DiskFileInfo;
import org.apache.celeborn.common.meta.MapFileMeta;
import org.apache.celeborn.common.util.FileChannelUtils;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.service.deploy.worker.memory.BufferQueue;
@ -46,6 +50,10 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis
protected final ExecutorService readExecutor;
protected final ConcurrentHashMap<Long, MapPartitionDataReader> readers =
JavaUtils.newConcurrentHashMap();
private FileChannel dataFileChanel;
private FileChannel indexChannel;
private FSDataInputStream hdfsDataInputStream;
private FSDataInputStream hdfsIndexInputStream;
private volatile boolean isReleased = false;
private final BufferQueue bufferQueue = new BufferQueue();
private AtomicBoolean bufferQueueInitialized = new AtomicBoolean(false);
@ -90,6 +98,20 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis
String.format("worker-map-partition-%s-reader", mapFileMeta.getMountPoint()),
false));
if (diskFileInfo.isDFS()) {
this.hdfsDataInputStream =
StorageManager.hadoopFs()
.get(diskFileInfo.getStorageType())
.open(new Path(diskFileInfo.getFilePath()));
this.hdfsIndexInputStream =
StorageManager.hadoopFs()
.get(diskFileInfo.getStorageType())
.open(new Path(diskFileInfo.getIndexPath()));
} else {
this.dataFileChanel = FileChannelUtils.openReadableFileChannel(diskFileInfo.getFilePath());
this.indexChannel = FileChannelUtils.openReadableFileChannel(diskFileInfo.getIndexPath());
}
MemoryManager.instance().addReadBufferTargetChangeListener(this);
}
@ -165,7 +187,7 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis
}
protected void openReader(MapPartitionDataReader reader) throws IOException {
reader.open();
reader.open(dataFileChanel, indexChannel, hdfsDataInputStream, hdfsIndexInputStream);
}
public synchronized void readBuffers() {

View File

@ -20,6 +20,7 @@ package org.apache.celeborn.service.deploy.worker.storage;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
@ -30,6 +31,7 @@ import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import org.apache.hadoop.fs.FSDataInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -126,12 +128,19 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader
this.readFinished = false;
}
public void open() throws IOException {
public void open(
FileChannel dataFileChannel,
FileChannel indexFileChannel,
FSDataInputStream dataInputStream,
FSDataInputStream indexInputStream)
throws IOException {
if (!isOpen) {
this.partitionDataReader =
fileInfo.isDFS()
? new DfsPartitionDataReader(fileInfo, headerBuffer, indexBuffer)
: new LocalPartitionDataReader(fileInfo, headerBuffer, indexBuffer);
? new DfsPartitionDataReader(
fileInfo, dataInputStream, indexInputStream, headerBuffer, indexBuffer)
: new LocalPartitionDataReader(
fileInfo, dataFileChannel, indexFileChannel, headerBuffer, indexBuffer);
// index is (offset,length)
long indexRegionSize = mapFileMeta.getNumSubpartitions() * (long) INDEX_ENTRY_SIZE;
this.numRegions =