[CELEBORN-2047] Support MapPartitionData on DFS

### What changes were proposed in this pull request?

Support `MapPartitionData` on DFS.

### Why are the changes needed?

`MapPartitionData` currently supports only local storage and cannot serve partition data from DFS. Supporting `MapPartitionData` on DFS lets MapPartition mode match the capability that ReducePartition mode already has, as sketched below.
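
For illustration, a minimal sketch of how a deployment could then route MapPartition shuffle data to HDFS, using the configuration keys exercised by the new test; the path and values are placeholders, not defaults:

```scala
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.CelebornConf.{ACTIVE_STORAGE_TYPES, HDFS_DIR, WORKER_STORAGE_CREATE_FILE_POLICY}

// Illustrative only: enable HDFS as the active storage and file-creation policy
// so that MapPartition (Flink) shuffle files are created and read on DFS.
val conf = new CelebornConf()
conf.set(ACTIVE_STORAGE_TYPES.key, "HDFS")
conf.set(WORKER_STORAGE_CREATE_FILE_POLICY.key, "HDFS")
conf.set(HDFS_DIR.key, "hdfs://namenode:9000/celeborn") // hypothetical base directory
```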

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added `WordCountTestWithHDFS`, which runs the Flink word count suite with worker storage backed by a `MiniDFSCluster`.

Closes #3349 from SteNicholas/CELEBORN-2047.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Committed by: SteNicholas, 2025-07-26 22:11:32 +08:00 (committed by mingji)
parent df0def6701
commit ae40222351
15 changed files with 452 additions and 140 deletions

View File

@ -164,4 +164,8 @@ public class DiskFileInfo extends FileInfo {
public boolean isDFS() {
return Utils.isS3Path(filePath) || Utils.isOssPath(filePath) || Utils.isHdfsPath(filePath);
}
public StorageInfo.Type getStorageType() {
return storageType;
}
}

View File

@ -108,4 +108,8 @@ public abstract class FileInfo {
}
public abstract String getFilePath();
public boolean isReduceFileMeta() {
return isReduceFileMeta;
}
}

View File

@ -40,6 +40,7 @@ import com.google.protobuf.{ByteString, GeneratedMessageV3}
import io.netty.channel.unix.Errors.NativeIoException
import org.apache.commons.lang3.SystemUtils
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.hadoop.fs.FSDataInputStream
import org.roaringbitmap.RoaringBitmap
import org.apache.celeborn.common.CelebornConf
@ -1175,12 +1176,11 @@ object Utils extends Logging {
}
@throws[IOException]
def checkFileIntegrity(fileChannel: FileChannel, length: Int): Unit = {
val remainingBytes = fileChannel.size - fileChannel.position
def checkFileIntegrity(remainingBytes: Long, length: Int, filePath: String): Unit = {
if (remainingBytes < length) {
logError(
s"File remaining bytes not not enough, remaining: ${remainingBytes}, wanted: ${length}.")
throw new RuntimeException(s"File is corrupted ${fileChannel}")
s"File remaining bytes not not enough, remaining: $remainingBytes, wanted: $length.")
throw new RuntimeException(s"File is corrupted $filePath")
}
}
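
The new `checkFileIntegrity(remainingBytes, length, filePath)` signature lets local `FileChannel` readers and DFS `FSDataInputStream` readers share one integrity check. A sketch of the two call shapes, assuming the stream is already positioned and the DFS file length was captured from its `FileStatus`:

```scala
import java.nio.channels.FileChannel
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.celeborn.common.util.Utils

// Local file: remaining bytes are derived from the channel itself.
def checkLocal(channel: FileChannel, length: Int, path: String): Unit =
  Utils.checkFileIntegrity(channel.size - channel.position, length, path)

// DFS file: the total length comes from FileStatus, the position from the stream.
def checkDfs(in: FSDataInputStream, fileSize: Long, length: Int, path: String): Unit =
  Utils.checkFileIntegrity(fileSize - in.getPos, length, path)
```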

View File

@ -268,6 +268,8 @@ object Dependencies {
ExclusionRule("org.apache.httpcomponents", "httpclient"),
ExclusionRule("org.slf4j", "slf4j-log4j12")
)
val hadoopAuth = "org.apache.hadoop" % "hadoop-auth" % hadoopVersion
val hadoopHdfs = "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion
val picocli = "info.picocli" % "picocli" % picocliVersion
@ -1284,7 +1286,11 @@ trait FlinkClientProjects {
"org.apache.flink" % "flink-runtime" % flinkVersion % "test",
flinkStreamingDependency,
flinkClientsDependency,
flinkRuntimeWebDependency
flinkRuntimeWebDependency,
Dependencies.hadoopCommon % "test",
Dependencies.hadoopAuth % "test",
Dependencies.hadoopHdfs % "test->test;compile->compile",
Dependencies.jerseyServer % "test",
) ++ commonUnitTestDependencies,
(Test / envVars) += ("FLINK_VERSION", flinkVersion)
)

View File

@ -91,5 +91,36 @@
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -18,6 +18,7 @@
package org.apache.celeborn.tests.flink
import java.io.File
import java.nio.file.Files
import scala.collection.JavaConverters._
@ -26,13 +27,17 @@ import org.apache.flink.configuration.{Configuration, ExecutionOptions}
import org.apache.flink.runtime.jobgraph.JobType
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode
import org.apache.flink.util.OperatingSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hdfs.MiniDFSCluster
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.CelebornConf.{AUTH_ENABLED, INTERNAL_PORT_ENABLED}
import org.apache.celeborn.common.CelebornConf.{ACTIVE_STORAGE_TYPES, AUTH_ENABLED, HDFS_DIR, INTERNAL_PORT_ENABLED, WORKER_STORAGE_CREATE_FILE_POLICY}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.protocol.FallbackPolicy
import org.apache.celeborn.rest.v1.model.PartitionLocationData.StorageEnum
import org.apache.celeborn.service.deploy.MiniClusterFeature
import org.apache.celeborn.service.deploy.worker.Worker
@ -132,3 +137,41 @@ class WordCountTestWithAuthentication extends WordCountTestBase {
override protected def getWorkerConf: Map[String, String] = authConfig
override protected def getClientConf: Map[String, String] = Map(AUTH_ENABLED.key -> "true")
}
class WordCountTestWithHDFS extends WordCountTestBase {
private var basePath: Path = _
private var hdfsCluster: MiniDFSCluster = _
override protected def getMasterConf: Map[String, String] = Map()
override protected def getWorkerConf: Map[String, String] = hdfsConfig
override protected def getClientConf: Map[String, String] = hdfsConfig
override def createWorker(map: Map[String, String]): Worker = {
super.createWorker(map, null)
}
override def beforeAll(): Unit = {
assume(!OperatingSystem.isWindows)
val hdConf = new org.apache.hadoop.conf.Configuration()
val tmpDir = Files.createTempDirectory("celeborn-")
tmpDir.toFile.deleteOnExit()
hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpDir.toString)
hdfsCluster = new MiniDFSCluster.Builder(hdConf).build
basePath = new Path(hdfsCluster.getFileSystem.getUri.toString + "/test")
super.beforeAll()
}
override def afterAll(): Unit = {
super.afterAll()
if (hdfsCluster != null) {
hdfsCluster.getFileSystem.delete(basePath, true)
hdfsCluster.shutdown()
}
}
private def hdfsConfig = Map(
ACTIVE_STORAGE_TYPES.key -> StorageEnum.HDFS.getValue,
WORKER_STORAGE_CREATE_FILE_POLICY.key -> StorageEnum.HDFS.getValue,
HDFS_DIR.key -> basePath.toString)
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.service.deploy.worker.storage;
import java.io.IOException;
import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.celeborn.common.meta.DiskFileInfo;
import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.common.util.Utils;
public class DfsPartitionDataReader extends PartitionDataReader {
private final FSDataInputStream dataInputStream;
private final FSDataInputStream indexInputStream;
public DfsPartitionDataReader(
DiskFileInfo fileInfo, ByteBuffer headerBuffer, ByteBuffer indexBuffer) throws IOException {
super(fileInfo, headerBuffer, indexBuffer);
FileSystem fileSystem =
StorageManager.hadoopFs()
.get(
fileInfo.isHdfs()
? StorageInfo.Type.HDFS
: fileInfo.isS3() ? StorageInfo.Type.S3 : StorageInfo.Type.OSS);
this.dataInputStream = fileSystem.open(fileInfo.getDfsPath());
this.indexInputStream = fileSystem.open(fileInfo.getDfsIndexPath());
this.dataFileSize = fileSystem.getFileStatus(fileInfo.getDfsPath()).getLen();
this.indexFileSize = fileSystem.getFileStatus(fileInfo.getDfsIndexPath()).getLen();
}
@Override
public void readIndexBuffer(long targetPosition) throws IOException {
indexInputStream.seek(targetPosition);
readHeaderOrIndexBuffer(
indexInputStream,
indexBuffer,
indexFileSize,
indexBuffer.capacity(),
fileInfo.getIndexPath());
}
@Override
public void position(long targetPosition) throws IOException {
dataInputStream.seek(targetPosition);
}
@Override
public void readHeaderBuffer(int headerSize) throws IOException {
readHeaderOrIndexBuffer(
dataInputStream, headerBuffer, dataFileSize, headerSize, fileInfo.getFilePath());
}
@Override
public void readBufferIntoReadBuffer(ByteBuf buf, long fileSize, int length, String filePath)
throws IOException {
Utils.checkFileIntegrity(fileSize - dataInputStream.getPos(), length, filePath);
ByteBuffer tmpBuffer = ByteBuffer.allocate(length);
while (tmpBuffer.hasRemaining()) {
dataInputStream.read(tmpBuffer);
}
tmpBuffer.flip();
buf.writeBytes(tmpBuffer);
}
@Override
public long position() throws IOException {
return dataInputStream.getPos();
}
@Override
public void close() {
IOUtils.closeQuietly(dataInputStream);
IOUtils.closeQuietly(indexInputStream);
}
private void readHeaderOrIndexBuffer(
FSDataInputStream inputStream, ByteBuffer buffer, long fileSize, int length, String filePath)
throws IOException {
Utils.checkFileIntegrity(fileSize - inputStream.getPos(), length, filePath);
buffer.clear();
buffer.limit(length);
while (buffer.hasRemaining()) {
inputStream.read(buffer);
}
buffer.flip();
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.service.deploy.worker.storage;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import io.netty.buffer.ByteBuf;
import org.apache.commons.io.IOUtils;
import org.apache.celeborn.common.meta.DiskFileInfo;
import org.apache.celeborn.common.util.FileChannelUtils;
import org.apache.celeborn.common.util.Utils;
public class LocalPartitionDataReader extends PartitionDataReader {
private final FileChannel dataFileChanel;
private final FileChannel indexFileChannel;
public LocalPartitionDataReader(
DiskFileInfo fileInfo, ByteBuffer headerBuffer, ByteBuffer indexBuffer) throws IOException {
super(fileInfo, headerBuffer, indexBuffer);
this.dataFileChanel = FileChannelUtils.openReadableFileChannel(fileInfo.getFilePath());
this.indexFileChannel = FileChannelUtils.openReadableFileChannel(fileInfo.getIndexPath());
this.dataFileSize = dataFileChanel.size();
this.indexFileSize = indexFileChannel.size();
}
@Override
public void readIndexBuffer(long targetPosition) throws IOException {
indexFileChannel.position(targetPosition);
readHeaderOrIndexBuffer(
indexFileChannel,
indexBuffer,
indexFileSize,
indexBuffer.capacity(),
fileInfo.getIndexPath());
}
@Override
public void position(long targetPosition) throws IOException {
dataFileChanel.position(targetPosition);
}
@Override
public void readHeaderBuffer(int headerSize) throws IOException {
readHeaderOrIndexBuffer(
dataFileChanel, headerBuffer, dataFileSize, headerSize, fileInfo.getFilePath());
}
@Override
public void readBufferIntoReadBuffer(ByteBuf buf, long fileSize, int length, String filePath)
throws IOException {
Utils.checkFileIntegrity(fileSize - dataFileChanel.position(), length, filePath);
ByteBuffer tmpBuffer = ByteBuffer.allocate(length);
while (tmpBuffer.hasRemaining()) {
dataFileChanel.read(tmpBuffer);
}
tmpBuffer.flip();
buf.writeBytes(tmpBuffer);
}
@Override
public long position() throws IOException {
return dataFileChanel.position();
}
@Override
public void close() {
IOUtils.closeQuietly(dataFileChanel);
IOUtils.closeQuietly(indexFileChannel);
}
private void readHeaderOrIndexBuffer(
FileChannel channel, ByteBuffer buffer, long fileSize, int length, String filePath)
throws IOException {
Utils.checkFileIntegrity(fileSize - channel.position(), length, filePath);
buffer.clear();
buffer.limit(length);
while (buffer.hasRemaining()) {
channel.read(buffer);
}
buffer.flip();
}
}

View File

@ -18,7 +18,6 @@
package org.apache.celeborn.service.deploy.worker.storage;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
@ -28,13 +27,11 @@ import java.util.function.Consumer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.meta.DiskFileInfo;
import org.apache.celeborn.common.meta.MapFileMeta;
import org.apache.celeborn.common.util.FileChannelUtils;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.service.deploy.worker.memory.BufferQueue;
@ -49,9 +46,6 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis
protected final ExecutorService readExecutor;
protected final ConcurrentHashMap<Long, MapPartitionDataReader> readers =
JavaUtils.newConcurrentHashMap();
private FileChannel dataFileChanel;
private FileChannel indexChannel;
private long indexSize;
private volatile boolean isReleased = false;
private final BufferQueue bufferQueue = new BufferQueue();
private AtomicBoolean bufferQueueInitialized = new AtomicBoolean(false);
@ -95,9 +89,6 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis
threadsPerMountPoint,
String.format("worker-map-partition-%s-reader", mapFileMeta.getMountPoint()),
false));
this.dataFileChanel = FileChannelUtils.openReadableFileChannel(diskFileInfo.getFilePath());
this.indexChannel = FileChannelUtils.openReadableFileChannel(diskFileInfo.getIndexPath());
this.indexSize = indexChannel.size();
MemoryManager.instance().addReadBufferTargetChangeListener(this);
}
@ -174,7 +165,7 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis
}
protected void openReader(MapPartitionDataReader reader) throws IOException {
reader.open(dataFileChanel, indexChannel, indexSize);
reader.open();
}
public synchronized void readBuffers() {
@ -252,8 +243,8 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis
bufferQueue.release();
isReleased = true;
IOUtils.closeQuietly(dataFileChanel);
IOUtils.closeQuietly(indexChannel);
readers.values().forEach(MapPartitionDataReader::close);
readers.clear();
MemoryManager.instance().removeReadBufferTargetChangeListener(this);
}

View File

@ -20,7 +20,6 @@ package org.apache.celeborn.service.deploy.worker.storage;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
@ -95,13 +94,6 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader
@GuardedBy("lock")
protected boolean errorNotified;
private FileChannel dataFileChannel;
// The size of the data file, it is initialized in the open method and remains unchanged
// afterward.
private long dataFileChannelSize;
private FileChannel indexFileChannel;
private Channel associatedChannel;
private Runnable recycleStream;
@ -109,6 +101,8 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader
protected AtomicInteger numInUseBuffers = new AtomicInteger(0);
private boolean isOpen = false;
private PartitionDataReader partitionDataReader;
public MapPartitionDataReader(
int startPartitionIndex,
int endPartitionIndex,
@ -132,15 +126,16 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader
this.readFinished = false;
}
public void open(FileChannel dataFileChannel, FileChannel indexFileChannel, long indexSize)
throws IOException {
public void open() throws IOException {
if (!isOpen) {
this.dataFileChannel = dataFileChannel;
this.dataFileChannelSize = dataFileChannel.size();
this.indexFileChannel = indexFileChannel;
this.partitionDataReader =
fileInfo.isDFS()
? new DfsPartitionDataReader(fileInfo, headerBuffer, indexBuffer)
: new LocalPartitionDataReader(fileInfo, headerBuffer, indexBuffer);
// index is (offset,length)
long indexRegionSize = mapFileMeta.getNumSubpartitions() * (long) INDEX_ENTRY_SIZE;
this.numRegions = Utils.checkedDownCast(indexSize / indexRegionSize);
this.numRegions =
Utils.checkedDownCast(partitionDataReader.getIndexFileSize() / indexRegionSize);
updateConsumingOffset();
isOpen = true;
@ -285,44 +280,6 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader
return mapFileMeta.getNumSubpartitions() * (long) INDEX_ENTRY_SIZE;
}
protected void readHeaderOrIndexBuffer(FileChannel channel, ByteBuffer buffer, int length)
throws IOException {
Utils.checkFileIntegrity(channel, length);
buffer.clear();
buffer.limit(length);
while (buffer.hasRemaining()) {
channel.read(buffer);
}
buffer.flip();
}
protected void readBufferIntoReadBuffer(FileChannel channel, ByteBuf buf, int length)
throws IOException {
Utils.checkFileIntegrity(channel, length);
ByteBuffer tmpBuffer = ByteBuffer.allocate(length);
while (tmpBuffer.hasRemaining()) {
channel.read(tmpBuffer);
}
tmpBuffer.flip();
buf.writeBytes(tmpBuffer);
}
protected int readBuffer(
String filename, FileChannel channel, ByteBuffer header, ByteBuf buffer, int headerSize)
throws IOException {
readHeaderOrIndexBuffer(channel, header, headerSize);
// header is combined of mapId(4),attemptId(4),nextBatchId(4) and total Compressed Length(4)
// we need size here,so we read length directly
int bufferLength = header.getInt(12);
if (bufferLength <= 0 || bufferLength > buffer.capacity()) {
logger.error("Incorrect buffer header, buffer length: {}.", bufferLength);
throw new FileCorruptedException("File " + filename + " is corrupted");
}
buffer.writeBytes(header);
readBufferIntoReadBuffer(channel, buffer, bufferLength);
return bufferLength + headerSize;
}
protected void updateConsumingOffset() throws IOException {
while (currentPartitionRemainingBytes == 0
&& (currentDataRegion < numRegions - 1 || numRemainingPartitions > 0)) {
@ -331,10 +288,10 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader
numRemainingPartitions = endPartitionIndex - startPartitionIndex + 1;
// read the target index entry to the target index buffer
indexFileChannel.position(
long targetPosition =
currentDataRegion * getIndexRegionSize()
+ (long) startPartitionIndex * INDEX_ENTRY_SIZE);
readHeaderOrIndexBuffer(indexFileChannel, indexBuffer, indexBuffer.capacity());
+ (long) startPartitionIndex * INDEX_ENTRY_SIZE;
partitionDataReader.readIndexBuffer(targetPosition);
}
// get the data file offset and the data size
@ -345,13 +302,14 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader
logger.debug(
"readBuffer updateConsumingOffset, {}, {}, {}, {}",
streamId,
dataFileChannelSize,
partitionDataReader.getDataFileSize(),
dataConsumingOffset,
currentPartitionRemainingBytes);
// if these checks fail, the partition file must be corrupted
if (dataConsumingOffset < 0
|| dataConsumingOffset + currentPartitionRemainingBytes > dataFileChannelSize
|| dataConsumingOffset + currentPartitionRemainingBytes
> partitionDataReader.getDataFileSize()
|| currentPartitionRemainingBytes < 0) {
throw new FileCorruptedException("File " + fileInfo.getFilePath() + " is corrupted");
}
@ -360,17 +318,9 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader
private synchronized boolean readBuffer(ByteBuf buffer) throws IOException {
try {
dataFileChannel.position(dataConsumingOffset);
int readSize =
readBuffer(
fileInfo.getFilePath(),
dataFileChannel,
headerBuffer,
buffer,
headerBuffer.capacity());
int readSize = partitionDataReader.readBuffer(buffer, dataConsumingOffset);
currentPartitionRemainingBytes -= readSize;
dataConsumingOffset = dataFileChannel.position();
dataConsumingOffset = partitionDataReader.position();
logger.debug(
"readBuffer data: {}, {}, {}, {}, {}, {}",
@ -388,7 +338,7 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader
logger.debug(
"readBuffer end, {}, {}, {}, {}",
streamId,
dataFileChannelSize,
partitionDataReader.getDataFileSize(),
dataConsumingOffset,
currentPartitionRemainingBytes);
int prevDataRegion = currentDataRegion;
@ -399,7 +349,7 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader
logger.debug(
"readBuffer run: {}, {}, {}, {}",
streamId,
dataFileChannelSize,
partitionDataReader.getDataFileSize(),
dataConsumingOffset,
currentPartitionRemainingBytes);
return true;
@ -557,4 +507,8 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader
return !isReleased && !readFinished;
}
}
public void close() {
partitionDataReader.close();
}
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.service.deploy.worker.storage;
import java.io.IOException;
import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.exception.FileCorruptedException;
import org.apache.celeborn.common.meta.DiskFileInfo;
public abstract class PartitionDataReader {
private static final Logger LOG = LoggerFactory.getLogger(PartitionDataReader.class);
protected final DiskFileInfo fileInfo;
protected final ByteBuffer headerBuffer;
protected final ByteBuffer indexBuffer;
protected long dataFileSize;
protected long indexFileSize;
public PartitionDataReader(
DiskFileInfo fileInfo, ByteBuffer headerBuffer, ByteBuffer indexBuffer) {
this.fileInfo = fileInfo;
this.headerBuffer = headerBuffer;
this.indexBuffer = indexBuffer;
}
public abstract void readIndexBuffer(long targetPosition) throws IOException;
public abstract void position(long targetPosition) throws IOException;
public abstract void readHeaderBuffer(int headSize) throws IOException;
public abstract void readBufferIntoReadBuffer(
ByteBuf buf, long fileSize, int length, String filePath) throws IOException;
public abstract long position() throws IOException;
public abstract void close();
public int readBuffer(ByteBuf buffer, long dataConsumingOffset) throws IOException {
position(dataConsumingOffset);
int headerSize = headerBuffer.capacity();
readHeaderBuffer(headerSize);
// header is combined of mapId(4),attemptId(4),nextBatchId(4) and total Compressed Length(4)
// we need size here,so we read length directly
int bufferLength = headerBuffer.getInt(12);
if (bufferLength <= 0 || bufferLength > buffer.capacity()) {
LOG.error("Incorrect buffer header, buffer length: {}.", bufferLength);
throw new FileCorruptedException(
String.format("File %s is corrupted", fileInfo.getFilePath()));
}
buffer.writeBytes(headerBuffer);
readBufferIntoReadBuffer(buffer, dataFileSize, bufferLength, fileInfo.getFilePath());
return bufferLength + headerSize;
}
public long getDataFileSize() {
return dataFileSize;
}
public long getIndexFileSize() {
return indexFileSize;
}
}
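
A sketch of how this abstraction is consumed, mirroring the reader selection in `MapPartitionDataReader.open()`; the buffer sizes here are illustrative, the real ones follow the 16-byte batch header layout:

```scala
import java.nio.ByteBuffer
import io.netty.buffer.ByteBuf
import org.apache.celeborn.common.meta.DiskFileInfo
import org.apache.celeborn.service.deploy.worker.storage.{DfsPartitionDataReader, LocalPartitionDataReader, PartitionDataReader}

// Pick the concrete reader from the file info: DFS-backed files get the
// FSDataInputStream-based reader, everything else stays on FileChannel.
def openReader(fileInfo: DiskFileInfo): PartitionDataReader = {
  val headerBuffer = ByteBuffer.allocate(16) // mapId, attemptId, batchId, length
  val indexBuffer = ByteBuffer.allocate(16)  // illustrative; real size is per-subpartition
  if (fileInfo.isDFS) new DfsPartitionDataReader(fileInfo, headerBuffer, indexBuffer)
  else new LocalPartitionDataReader(fileInfo, headerBuffer, indexBuffer)
}

// readBuffer is the template method: it seeks to the offset, validates the header,
// copies header + payload into the Netty buffer, and returns the bytes consumed.
def readOnce(reader: PartitionDataReader, target: ByteBuf, offset: Long): Long = {
  val readSize = reader.readBuffer(target, offset)
  offset + readSize // next consuming offset, also available via reader.position()
}
```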

View File

@ -29,8 +29,8 @@ import org.apache.hadoop.fs.FileSystem
import org.roaringbitmap.RoaringBitmap
import org.slf4j.{Logger, LoggerFactory}
import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MapFileMeta, MemoryFileInfo, ReduceFileMeta}
import org.apache.celeborn.common.protocol.{PbPushDataHandShake, PbRegionFinish, PbRegionStart, PbSegmentStart}
import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MapFileMeta, ReduceFileMeta}
import org.apache.celeborn.common.protocol.{PbPushDataHandShake, PbRegionFinish, PbRegionStart, PbSegmentStart, StorageInfo}
import org.apache.celeborn.common.unsafe.Platform
import org.apache.celeborn.common.util.FileChannelUtils
@ -94,7 +94,7 @@ trait PartitionMetaHandler {
class MapPartitionMetaHandler(
diskFileInfo: DiskFileInfo,
notifier: FlushNotifier) extends PartitionMetaHandler {
lazy val hadoopFs: FileSystem = StorageManager.hadoopFs.get()
lazy val hadoopFs: FileSystem = StorageManager.hadoopFs.get(diskFileInfo.getStorageType)
val logger: Logger = LoggerFactory.getLogger(classOf[MapPartitionMetaHandler])
val fileMeta: MapFileMeta = diskFileInfo.getFileMeta.asInstanceOf[MapFileMeta]
var numSubpartitions = 0
@ -286,28 +286,7 @@ class MapPartitionMetaHandler(
}
override def afterClose(): Unit = {
// TODO: force flush the index file channel in scenarios which the upstream task writes and
// downstream task reads simultaneously, such as flink hybrid shuffle
if (indexBuffer != null) {
logger.debug(s"flushIndex start:${diskFileInfo.getIndexPath}")
val startTime = System.currentTimeMillis
indexBuffer.flip
notifier.checkException()
try {
if (indexBuffer.hasRemaining) {
// map partition synchronously writes file index
if (indexChannel != null) while (indexBuffer.hasRemaining) indexChannel.write(indexBuffer)
else if (diskFileInfo.isDFS) {
val dfsStream = hadoopFs.append(diskFileInfo.getDfsIndexPath)
dfsStream.write(indexBuffer.array)
dfsStream.close()
}
}
indexBuffer.clear
} finally logger.debug(
s"flushIndex end:${diskFileInfo.getIndexPath}, " +
s"cost:${System.currentTimeMillis - startTime}")
}
flushIndex()
}
override def beforeWrite(bytes: ByteBuf): Unit = {

View File

@ -43,7 +43,7 @@ import org.apache.celeborn.common.metrics.source.{AbstractSource, ThreadPoolSour
import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, StorageInfo}
import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, DiskUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, CollectionUtils, DiskUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
import org.apache.celeborn.service.deploy.worker._
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.MemoryPressureListener
@ -819,7 +819,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
}
}
}
if (null != diskOperators) {
if (CollectionUtils.isNotEmpty(diskOperators)) {
if (exitKind != CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
cleanupExpiredShuffleKey(shuffleKeySet(), false)
}

View File

@ -494,7 +494,7 @@ class DfsTierWriter(
notifier: FlushNotifier,
flusher: Flusher,
source: AbstractSource,
hdfsFileInfo: DiskFileInfo,
dfsFileInfo: DiskFileInfo,
storageType: StorageInfo.Type,
partitionDataWriterContext: PartitionDataWriterContext,
storageManager: StorageManager)
@ -503,7 +503,7 @@ class DfsTierWriter(
metaHandler,
numPendingWrites,
notifier,
hdfsFileInfo,
dfsFileInfo,
source,
storageType,
partitionDataWriterContext.getPartitionLocation.getFileName,
@ -520,21 +520,21 @@ class DfsTierWriter(
var partNumber: Int = 1
this.flusherBufferSize =
if (hdfsFileInfo.isS3()) {
if (dfsFileInfo.isS3()) {
conf.workerS3FlusherBufferSize
} else if (hdfsFileInfo.isOSS()) {
} else if (dfsFileInfo.isOSS()) {
conf.workerOssFlusherBufferSize
} else {
conf.workerHdfsFlusherBufferSize
}
try {
hadoopFs.create(hdfsFileInfo.getDfsPath, true).close()
if (hdfsFileInfo.isS3) {
hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
if (dfsFileInfo.isS3) {
val uri = hadoopFs.getUri
val bucketName = uri.getHost
val index = hdfsFileInfo.getFilePath.indexOf(bucketName)
val key = hdfsFileInfo.getFilePath.substring(index + bucketName.length + 1)
val index = dfsFileInfo.getFilePath.indexOf(bucketName)
val key = dfsFileInfo.getFilePath.substring(index + bucketName.length + 1)
this.s3MultipartUploadHandler = TierWriterHelper.getS3MultipartUploadHandler(
hadoopFs,
@ -544,7 +544,7 @@ class DfsTierWriter(
conf.s3MultiplePartUploadBaseDelay,
conf.s3MultiplePartUploadMaxBackoff)
s3MultipartUploadHandler.startUpload()
} else if (hdfsFileInfo.isOSS) {
} else if (dfsFileInfo.isOSS) {
val configuration = hadoopFs.getConf
val ossEndpoint = configuration.get("fs.oss.endpoint")
val ossAccessKey = configuration.get("fs.oss.accessKeyId")
@ -552,8 +552,8 @@ class DfsTierWriter(
val uri = hadoopFs.getUri
val bucketName = uri.getHost
val index = hdfsFileInfo.getFilePath.indexOf(bucketName)
val key = hdfsFileInfo.getFilePath.substring(index + bucketName.length + 1)
val index = dfsFileInfo.getFilePath.indexOf(bucketName)
val key = dfsFileInfo.getFilePath.substring(index + bucketName.length + 1)
this.ossMultipartUploadHandler = TierWriterHelper.getOssMultipartUploadHandler(
ossEndpoint,
@ -572,7 +572,7 @@ class DfsTierWriter(
case ex: InterruptedException =>
throw new RuntimeException(ex)
}
hadoopFs.create(hdfsFileInfo.getDfsPath, true).close()
hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
}
storageManager.registerDiskFilePartitionWriter(
@ -586,9 +586,9 @@ class DfsTierWriter(
override def genFlushTask(finalFlush: Boolean, keepBuffer: Boolean): FlushTask = {
notifier.numPendingFlushes.incrementAndGet()
if (hdfsFileInfo.isHdfs) {
new HdfsFlushTask(flushBuffer, hdfsFileInfo.getDfsPath(), notifier, true, source)
} else if (hdfsFileInfo.isOSS) {
if (dfsFileInfo.isHdfs) {
new HdfsFlushTask(flushBuffer, dfsFileInfo.getDfsPath(), notifier, true, source)
} else if (dfsFileInfo.isOSS) {
val flushTask = new OssFlushTask(
flushBuffer,
notifier,
@ -641,17 +641,19 @@ class DfsTierWriter(
}
override def closeStreams(): Unit = {
if (hadoopFs.exists(hdfsFileInfo.getDfsPeerWriterSuccessPath)) {
hadoopFs.delete(hdfsFileInfo.getDfsPath, false)
if (hadoopFs.exists(dfsFileInfo.getDfsPeerWriterSuccessPath)) {
hadoopFs.delete(dfsFileInfo.getDfsPath, false)
deleted = true
} else {
hadoopFs.create(hdfsFileInfo.getDfsWriterSuccessPath).close()
val indexOutputStream = hadoopFs.create(hdfsFileInfo.getDfsIndexPath)
indexOutputStream.writeInt(hdfsFileInfo.getReduceFileMeta.getChunkOffsets.size)
for (offset <- hdfsFileInfo.getReduceFileMeta.getChunkOffsets.asScala) {
indexOutputStream.writeLong(offset)
hadoopFs.create(dfsFileInfo.getDfsWriterSuccessPath).close()
if (dfsFileInfo.isReduceFileMeta) {
val indexOutputStream = hadoopFs.create(dfsFileInfo.getDfsIndexPath)
indexOutputStream.writeInt(dfsFileInfo.getReduceFileMeta.getChunkOffsets.size)
for (offset <- dfsFileInfo.getReduceFileMeta.getChunkOffsets.asScala) {
indexOutputStream.writeLong(offset)
}
indexOutputStream.close()
}
indexOutputStream.close()
}
if (s3MultipartUploadHandler != null) {
s3MultipartUploadHandler.complete()
@ -664,12 +666,12 @@ class DfsTierWriter(
}
override def notifyFileCommitted(): Unit =
storageManager.notifyFileInfoCommitted(shuffleKey, filename, hdfsFileInfo)
storageManager.notifyFileInfoCommitted(shuffleKey, filename, dfsFileInfo)
override def closeResource(): Unit = {}
override def cleanLocalOrDfsFiles(): Unit = {
hdfsFileInfo.deleteAllFiles(hadoopFs)
dfsFileInfo.deleteAllFiles(hadoopFs)
}
override def takeBufferInternal(): CompositeByteBuf = {

View File

@ -25,6 +25,8 @@ import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable
import org.apache.commons.lang3.StringUtils
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
@ -151,7 +153,9 @@ trait MiniClusterFeature extends Logging {
def createWorker(map: Map[String, String], storageDir: String): Worker = {
logInfo("start create worker for mini cluster")
val conf = new CelebornConf()
conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, storageDir)
if (StringUtils.isNotEmpty(storageDir)) {
conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, storageDir)
}
conf.set(CelebornConf.WORKER_DISK_MONITOR_ENABLED.key, "false")
conf.set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K")
conf.set(CelebornConf.WORKER_HTTP_PORT.key, s"${selectRandomPort()}")