diff --git a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
index c7161483b..ab798eca3 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
@@ -164,4 +164,8 @@ public class DiskFileInfo extends FileInfo {
public boolean isDFS() {
return Utils.isS3Path(filePath) || Utils.isOssPath(filePath) || Utils.isHdfsPath(filePath);
}
+
+ public StorageInfo.Type getStorageType() {
+ return storageType;
+ }
}
diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
index e81b72936..e8511f1bf 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
@@ -108,4 +108,8 @@ public abstract class FileInfo {
}
public abstract String getFilePath();
+
+ public boolean isReduceFileMeta() {
+ return isReduceFileMeta;
+ }
}
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 3fd090e77..aa719cff1 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -40,6 +40,7 @@ import com.google.protobuf.{ByteString, GeneratedMessageV3}
import io.netty.channel.unix.Errors.NativeIoException
import org.apache.commons.lang3.SystemUtils
import org.apache.commons.lang3.time.FastDateFormat
+import org.apache.hadoop.fs.FSDataInputStream
import org.roaringbitmap.RoaringBitmap
import org.apache.celeborn.common.CelebornConf
@@ -1175,12 +1176,11 @@ object Utils extends Logging {
}
@throws[IOException]
- def checkFileIntegrity(fileChannel: FileChannel, length: Int): Unit = {
- val remainingBytes = fileChannel.size - fileChannel.position
+ def checkFileIntegrity(remainingBytes: Long, length: Int, filePath: String): Unit = {
if (remainingBytes < length) {
logError(
- s"File remaining bytes not not enough, remaining: ${remainingBytes}, wanted: ${length}.")
- throw new RuntimeException(s"File is corrupted ${fileChannel}")
+ s"File remaining bytes not not enough, remaining: $remainingBytes, wanted: $length.")
+ throw new RuntimeException(s"File $filePath is corrupted")
}
}
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index cdce5973b..760aaf8fe 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -268,6 +268,8 @@ object Dependencies {
ExclusionRule("org.apache.httpcomponents", "httpclient"),
ExclusionRule("org.slf4j", "slf4j-log4j12")
)
+ val hadoopAuth = "org.apache.hadoop" % "hadoop-auth" % hadoopVersion
+ val hadoopHdfs = "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion
val picocli = "info.picocli" % "picocli" % picocliVersion
@@ -1284,7 +1286,11 @@ trait FlinkClientProjects {
"org.apache.flink" % "flink-runtime" % flinkVersion % "test",
flinkStreamingDependency,
flinkClientsDependency,
- flinkRuntimeWebDependency
+ flinkRuntimeWebDependency,
+ Dependencies.hadoopCommon % "test",
+ Dependencies.hadoopAuth % "test",
+ Dependencies.hadoopHdfs % "test->test;compile->compile",
+ Dependencies.jerseyServer % "test",
) ++ commonUnitTestDependencies,
(Test / envVars) += ("FLINK_VERSION", flinkVersion)
)
diff --git a/tests/flink-it/pom.xml b/tests/flink-it/pom.xml
index 8664fbe94..b4ef70ea5 100644
--- a/tests/flink-it/pom.xml
+++ b/tests/flink-it/pom.xml
@@ -91,5 +91,36 @@
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
diff --git a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
index cc75a73f4..fa8860efc 100644
--- a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
+++ b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
@@ -18,6 +18,7 @@
package org.apache.celeborn.tests.flink
import java.io.File
+import java.nio.file.Files
import scala.collection.JavaConverters._
@@ -26,13 +27,17 @@ import org.apache.flink.configuration.{Configuration, ExecutionOptions}
import org.apache.flink.runtime.jobgraph.JobType
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode
+import org.apache.flink.util.OperatingSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hdfs.MiniDFSCluster
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.CelebornConf.{AUTH_ENABLED, INTERNAL_PORT_ENABLED}
+import org.apache.celeborn.common.CelebornConf.{ACTIVE_STORAGE_TYPES, AUTH_ENABLED, HDFS_DIR, INTERNAL_PORT_ENABLED, WORKER_STORAGE_CREATE_FILE_POLICY}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.protocol.FallbackPolicy
+import org.apache.celeborn.rest.v1.model.PartitionLocationData.StorageEnum
import org.apache.celeborn.service.deploy.MiniClusterFeature
import org.apache.celeborn.service.deploy.worker.Worker
@@ -132,3 +137,41 @@ class WordCountTestWithAuthentication extends WordCountTestBase {
override protected def getWorkerConf: Map[String, String] = authConfig
override protected def getClientConf: Map[String, String] = Map(AUTH_ENABLED.key -> "true")
}
+
+class WordCountTestWithHDFS extends WordCountTestBase {
+
+ private var basePath: Path = _
+ private var hdfsCluster: MiniDFSCluster = _
+
+ override protected def getMasterConf: Map[String, String] = Map()
+ override protected def getWorkerConf: Map[String, String] = hdfsConfig
+ override protected def getClientConf: Map[String, String] = hdfsConfig
+
+ override def createWorker(map: Map[String, String]): Worker = {
+ super.createWorker(map, null)
+ }
+
+ override def beforeAll(): Unit = {
+ assume(!OperatingSystem.isWindows)
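+ // Stand up an embedded HDFS (MiniDFSCluster) rooted in a temp dir that is removed on JVM exit.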
+ val hdConf = new org.apache.hadoop.conf.Configuration()
+ val tmpDir = Files.createTempDirectory("celeborn-")
+ tmpDir.toFile.deleteOnExit()
+ hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpDir.toString)
+ hdfsCluster = new MiniDFSCluster.Builder(hdConf).build
+ basePath = new Path(hdfsCluster.getFileSystem.getUri.toString + "/test")
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ if (hdfsCluster != null) {
+ hdfsCluster.getFileSystem.delete(basePath, true)
+ hdfsCluster.shutdown()
+ }
+ }
+
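+ // Route shuffle data to HDFS: enable the HDFS storage type, create files on HDFS, and use the mini cluster path as the root dir.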
+ private def hdfsConfig = Map(
+ ACTIVE_STORAGE_TYPES.key -> StorageEnum.HDFS.getValue,
+ WORKER_STORAGE_CREATE_FILE_POLICY.key -> StorageEnum.HDFS.getValue,
+ HDFS_DIR.key -> basePath.toString)
+}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java
new file mode 100644
index 000000000..c461e8366
--- /dev/null
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.celeborn.common.meta.DiskFileInfo;
+import org.apache.celeborn.common.protocol.StorageInfo;
+import org.apache.celeborn.common.util.Utils;
+
+public class DfsPartitionDataReader extends PartitionDataReader {
+
+ private final FSDataInputStream dataInputStream;
+ private final FSDataInputStream indexInputStream;
+
+ public DfsPartitionDataReader(
+ DiskFileInfo fileInfo, ByteBuffer headerBuffer, ByteBuffer indexBuffer) throws IOException {
+ super(fileInfo, headerBuffer, indexBuffer);
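+ // Resolve the Hadoop FileSystem that matches this file's storage backend (HDFS, S3 or OSS).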
+ FileSystem fileSystem =
+ StorageManager.hadoopFs()
+ .get(
+ fileInfo.isHdfs()
+ ? StorageInfo.Type.HDFS
+ : fileInfo.isS3() ? StorageInfo.Type.S3 : StorageInfo.Type.OSS);
+ this.dataInputStream = fileSystem.open(fileInfo.getDfsPath());
+ this.indexInputStream = fileSystem.open(fileInfo.getDfsIndexPath());
+ this.dataFileSize = fileSystem.getFileStatus(fileInfo.getDfsPath()).getLen();
+ this.indexFileSize = fileSystem.getFileStatus(fileInfo.getDfsIndexPath()).getLen();
+ }
+
+ @Override
+ public void readIndexBuffer(long targetPosition) throws IOException {
+ indexInputStream.seek(targetPosition);
+ readHeaderOrIndexBuffer(
+ indexInputStream,
+ indexBuffer,
+ indexFileSize,
+ indexBuffer.capacity(),
+ fileInfo.getIndexPath());
+ }
+
+ @Override
+ public void position(long targetPosition) throws IOException {
+ dataInputStream.seek(targetPosition);
+ }
+
+ @Override
+ public void readHeaderBuffer(int headerSize) throws IOException {
+ readHeaderOrIndexBuffer(
+ dataInputStream, headerBuffer, dataFileSize, headerSize, fileInfo.getFilePath());
+ }
+
+ @Override
+ public void readBufferIntoReadBuffer(ByteBuf buf, long fileSize, int length, String filePath)
+ throws IOException {
+ Utils.checkFileIntegrity(fileSize - dataInputStream.getPos(), length, filePath);
+ ByteBuffer tmpBuffer = ByteBuffer.allocate(length);
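+ // checkFileIntegrity above guarantees at least 'length' readable bytes, so this loop terminates without hitting EOF.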
+ while (tmpBuffer.hasRemaining()) {
+ dataInputStream.read(tmpBuffer);
+ }
+ tmpBuffer.flip();
+ buf.writeBytes(tmpBuffer);
+ }
+
+ @Override
+ public long position() throws IOException {
+ return dataInputStream.getPos();
+ }
+
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(dataInputStream);
+ IOUtils.closeQuietly(indexInputStream);
+ }
+
+ private void readHeaderOrIndexBuffer(
+ FSDataInputStream inputStream, ByteBuffer buffer, long fileSize, int length, String filePath)
+ throws IOException {
+ Utils.checkFileIntegrity(fileSize - inputStream.getPos(), length, filePath);
+ buffer.clear();
+ buffer.limit(length);
+ while (buffer.hasRemaining()) {
+ inputStream.read(buffer);
+ }
+ buffer.flip();
+ }
+}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java
new file mode 100644
index 000000000..c5c47aedf
--- /dev/null
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.io.IOUtils;
+
+import org.apache.celeborn.common.meta.DiskFileInfo;
+import org.apache.celeborn.common.util.FileChannelUtils;
+import org.apache.celeborn.common.util.Utils;
+
+public class LocalPartitionDataReader extends PartitionDataReader {
+
+ private final FileChannel dataFileChannel;
+ private final FileChannel indexFileChannel;
+
+ public LocalPartitionDataReader(
+ DiskFileInfo fileInfo, ByteBuffer headerBuffer, ByteBuffer indexBuffer) throws IOException {
+ super(fileInfo, headerBuffer, indexBuffer);
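+ // Local files are read through NIO FileChannels; data and index sizes are captured once at open time.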
+ this.dataFileChannel = FileChannelUtils.openReadableFileChannel(fileInfo.getFilePath());
+ this.indexFileChannel = FileChannelUtils.openReadableFileChannel(fileInfo.getIndexPath());
+ this.dataFileSize = dataFileChannel.size();
+ this.indexFileSize = indexFileChannel.size();
+ }
+
+ @Override
+ public void readIndexBuffer(long targetPosition) throws IOException {
+ indexFileChannel.position(targetPosition);
+ readHeaderOrIndexBuffer(
+ indexFileChannel,
+ indexBuffer,
+ indexFileSize,
+ indexBuffer.capacity(),
+ fileInfo.getIndexPath());
+ }
+
+ @Override
+ public void position(long targetPosition) throws IOException {
+ dataFileChannel.position(targetPosition);
+ }
+
+ @Override
+ public void readHeaderBuffer(int headerSize) throws IOException {
+ readHeaderOrIndexBuffer(
+ dataFileChannel, headerBuffer, dataFileSize, headerSize, fileInfo.getFilePath());
+ }
+
+ @Override
+ public void readBufferIntoReadBuffer(ByteBuf buf, long fileSize, int length, String filePath)
+ throws IOException {
+ Utils.checkFileIntegrity(fileSize - dataFileChannel.position(), length, filePath);
+ ByteBuffer tmpBuffer = ByteBuffer.allocate(length);
+ while (tmpBuffer.hasRemaining()) {
+ dataFileChannel.read(tmpBuffer);
+ }
+ tmpBuffer.flip();
+ buf.writeBytes(tmpBuffer);
+ }
+
+ @Override
+ public long position() throws IOException {
+ return dataFileChannel.position();
+ }
+
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(dataFileChannel);
+ IOUtils.closeQuietly(indexFileChannel);
+ }
+
+ private void readHeaderOrIndexBuffer(
+ FileChannel channel, ByteBuffer buffer, long fileSize, int length, String filePath)
+ throws IOException {
+ Utils.checkFileIntegrity(fileSize - channel.position(), length, filePath);
+ buffer.clear();
+ buffer.limit(length);
+ while (buffer.hasRemaining()) {
+ channel.read(buffer);
+ }
+ buffer.flip();
+ }
+}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
index d88370edf..66d3be67b 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
@@ -18,7 +18,6 @@
package org.apache.celeborn.service.deploy.worker.storage;
import java.io.IOException;
-import java.nio.channels.FileChannel;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -28,13 +27,11 @@ import java.util.function.Consumer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
-import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.meta.DiskFileInfo;
import org.apache.celeborn.common.meta.MapFileMeta;
-import org.apache.celeborn.common.util.FileChannelUtils;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.service.deploy.worker.memory.BufferQueue;
@@ -49,9 +46,6 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis
protected final ExecutorService readExecutor;
protected final ConcurrentHashMap<Long, MapPartitionDataReader> readers =
JavaUtils.newConcurrentHashMap();
- private FileChannel dataFileChanel;
- private FileChannel indexChannel;
- private long indexSize;
private volatile boolean isReleased = false;
private final BufferQueue bufferQueue = new BufferQueue();
private AtomicBoolean bufferQueueInitialized = new AtomicBoolean(false);
@@ -95,9 +89,6 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis
threadsPerMountPoint,
String.format("worker-map-partition-%s-reader", mapFileMeta.getMountPoint()),
false));
- this.dataFileChanel = FileChannelUtils.openReadableFileChannel(diskFileInfo.getFilePath());
- this.indexChannel = FileChannelUtils.openReadableFileChannel(diskFileInfo.getIndexPath());
- this.indexSize = indexChannel.size();
MemoryManager.instance().addReadBufferTargetChangeListener(this);
}
@@ -174,7 +165,7 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis
}
protected void openReader(MapPartitionDataReader reader) throws IOException {
- reader.open(dataFileChanel, indexChannel, indexSize);
+ reader.open();
}
public synchronized void readBuffers() {
@@ -252,8 +243,8 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis
bufferQueue.release();
isReleased = true;
- IOUtils.closeQuietly(dataFileChanel);
- IOUtils.closeQuietly(indexChannel);
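+ // The underlying channels/streams are now owned by each reader's PartitionDataReader, so close the readers instead.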
+ readers.values().forEach(MapPartitionDataReader::close);
+ readers.clear();
MemoryManager.instance().removeReadBufferTargetChangeListener(this);
}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
index 8f8a2c353..a0a5ce506 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
@@ -20,7 +20,6 @@ package org.apache.celeborn.service.deploy.worker.storage;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
-import java.nio.channels.FileChannel;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
@@ -95,13 +94,6 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader> {
- protected synchronized int readBuffer(
- String filename, FileChannel channel, ByteBuffer header, ByteBuf buffer, int headerSize)
- throws IOException {
- readHeaderOrIndexBuffer(channel, header, headerSize);
- int bufferLength = header.getInt(12);
- if (bufferLength <= 0 || bufferLength > buffer.capacity()) {
- logger.error("Incorrect buffer header, buffer length: {}.", bufferLength);
- throw new FileCorruptedException("File " + filename + " is corrupted");
- }
- buffer.writeBytes(header);
- readBufferIntoReadBuffer(channel, buffer, bufferLength);
- return bufferLength + headerSize;
- }
-
protected void updateConsumingOffset() throws IOException {
while (currentPartitionRemainingBytes == 0
&& (currentDataRegion < numRegions - 1 || numRemainingPartitions > 0)) {
@@ -331,10 +288,10 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader> {
if (dataConsumingOffset < 0
- || dataConsumingOffset + currentPartitionRemainingBytes > dataFileChannelSize
+ || dataConsumingOffset + currentPartitionRemainingBytes
+ > partitionDataReader.getDataFileSize()
|| currentPartitionRemainingBytes < 0) {
throw new FileCorruptedException("File " + fileInfo.getFilePath() + " is corrupted");
}
@@ -360,17 +318,9 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader> {
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java
new file mode 100644
--- /dev/null
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java
@@ -0,0 +1,83 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.celeborn.service.deploy.worker.storage;
+
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+
+ import io.netty.buffer.ByteBuf;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import org.apache.celeborn.common.exception.FileCorruptedException;
+ import org.apache.celeborn.common.meta.DiskFileInfo;
+
+ public abstract class PartitionDataReader {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(PartitionDataReader.class);
+
+ protected final DiskFileInfo fileInfo;
+ protected final ByteBuffer headerBuffer;
+ protected final ByteBuffer indexBuffer;
+
+ protected long dataFileSize;
+ protected long indexFileSize;
+
+ public PartitionDataReader(
+ DiskFileInfo fileInfo, ByteBuffer headerBuffer, ByteBuffer indexBuffer) {
+ this.fileInfo = fileInfo;
+ this.headerBuffer = headerBuffer;
+ this.indexBuffer = indexBuffer;
+ }
+
+ public abstract void readIndexBuffer(long targetPosition) throws IOException;
+
+ public abstract void position(long targetPosition) throws IOException;
+
+ public abstract void readHeaderBuffer(int headerSize) throws IOException;
+
+ public abstract void readBufferIntoReadBuffer(ByteBuf buf, long fileSize, int length, String filePath)
+ throws IOException;
+
+ public abstract long position() throws IOException;
+
+ public abstract void close();
+
+ public int readBuffer(ByteBuf buffer, int headerSize) throws IOException {
+ readHeaderBuffer(headerSize);
+ // the header is mapId(4) + attemptId(4) + batchId(4) + compressed length(4),
+ // so the buffer length lives at offset 12
+ int bufferLength = headerBuffer.getInt(12);
+ if (bufferLength <= 0 || bufferLength > buffer.capacity()) {
+ LOG.error("Incorrect buffer header, buffer length: {}.", bufferLength);
+ throw new FileCorruptedException(
+ String.format("File %s is corrupted", fileInfo.getFilePath()));
+ }
+ buffer.writeBytes(headerBuffer);
+ readBufferIntoReadBuffer(buffer, dataFileSize, bufferLength, fileInfo.getFilePath());
+ return bufferLength + headerSize;
+ }
+
+ public long getDataFileSize() {
+ return dataFileSize;
+ }
+
+ public long getIndexFileSize() {
+ return indexFileSize;
+ }
+}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala
index 0eeaa5e70..902d720ee 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala
@@ -29,8 +29,8 @@ import org.apache.hadoop.fs.FileSystem
import org.roaringbitmap.RoaringBitmap
import org.slf4j.{Logger, LoggerFactory}
-import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MapFileMeta, MemoryFileInfo, ReduceFileMeta}
-import org.apache.celeborn.common.protocol.{PbPushDataHandShake, PbRegionFinish, PbRegionStart, PbSegmentStart}
+import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MapFileMeta, ReduceFileMeta}
+import org.apache.celeborn.common.protocol.{PbPushDataHandShake, PbRegionFinish, PbRegionStart, PbSegmentStart, StorageInfo}
import org.apache.celeborn.common.unsafe.Platform
import org.apache.celeborn.common.util.FileChannelUtils
@@ -94,7 +94,7 @@ trait PartitionMetaHandler {
class MapPartitionMetaHandler(
diskFileInfo: DiskFileInfo,
notifier: FlushNotifier) extends PartitionMetaHandler {
- lazy val hadoopFs: FileSystem = StorageManager.hadoopFs.get()
+ lazy val hadoopFs: FileSystem = StorageManager.hadoopFs.get(diskFileInfo.getStorageType)
val logger: Logger = LoggerFactory.getLogger(classOf[MapPartitionMetaHandler])
val fileMeta: MapFileMeta = diskFileInfo.getFileMeta.asInstanceOf[MapFileMeta]
var numSubpartitions = 0
@@ -286,28 +286,7 @@ class MapPartitionMetaHandler(
}
override def afterClose(): Unit = {
- // TODO: force flush the index file channel in scenarios which the upstream task writes and
- // downstream task reads simultaneously, such as flink hybrid shuffle
- if (indexBuffer != null) {
- logger.debug(s"flushIndex start:${diskFileInfo.getIndexPath}")
- val startTime = System.currentTimeMillis
- indexBuffer.flip
- notifier.checkException()
- try {
- if (indexBuffer.hasRemaining) {
- // map partition synchronously writes file index
- if (indexChannel != null) while (indexBuffer.hasRemaining) indexChannel.write(indexBuffer)
- else if (diskFileInfo.isDFS) {
- val dfsStream = hadoopFs.append(diskFileInfo.getDfsIndexPath)
- dfsStream.write(indexBuffer.array)
- dfsStream.close()
- }
- }
- indexBuffer.clear
- } finally logger.debug(
- s"flushIndex end:${diskFileInfo.getIndexPath}, " +
- s"cost:${System.currentTimeMillis - startTime}")
- }
+ flushIndex()
}
override def beforeWrite(bytes: ByteBuf): Unit = {
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 3070e4b94..b71110947 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -43,7 +43,7 @@ import org.apache.celeborn.common.metrics.source.{AbstractSource, ThreadPoolSour
import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, StorageInfo}
import org.apache.celeborn.common.quota.ResourceConsumption
-import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, DiskUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, CollectionUtils, DiskUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
import org.apache.celeborn.service.deploy.worker._
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.MemoryPressureListener
@@ -819,7 +819,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
}
}
}
- if (null != diskOperators) {
+ if (CollectionUtils.isNotEmpty(diskOperators)) {
if (exitKind != CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
cleanupExpiredShuffleKey(shuffleKeySet(), false)
}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index 7c3f1b966..fa0934efb 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -494,7 +494,7 @@ class DfsTierWriter(
notifier: FlushNotifier,
flusher: Flusher,
source: AbstractSource,
- hdfsFileInfo: DiskFileInfo,
+ dfsFileInfo: DiskFileInfo,
storageType: StorageInfo.Type,
partitionDataWriterContext: PartitionDataWriterContext,
storageManager: StorageManager)
@@ -503,7 +503,7 @@ class DfsTierWriter(
metaHandler,
numPendingWrites,
notifier,
- hdfsFileInfo,
+ dfsFileInfo,
source,
storageType,
partitionDataWriterContext.getPartitionLocation.getFileName,
@@ -520,21 +520,21 @@ class DfsTierWriter(
var partNumber: Int = 1
this.flusherBufferSize =
- if (hdfsFileInfo.isS3()) {
+ if (dfsFileInfo.isS3()) {
conf.workerS3FlusherBufferSize
- } else if (hdfsFileInfo.isOSS()) {
+ } else if (dfsFileInfo.isOSS()) {
conf.workerOssFlusherBufferSize
} else {
conf.workerHdfsFlusherBufferSize
}
try {
- hadoopFs.create(hdfsFileInfo.getDfsPath, true).close()
- if (hdfsFileInfo.isS3) {
+ hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
+ if (dfsFileInfo.isS3) {
val uri = hadoopFs.getUri
val bucketName = uri.getHost
- val index = hdfsFileInfo.getFilePath.indexOf(bucketName)
- val key = hdfsFileInfo.getFilePath.substring(index + bucketName.length + 1)
+ val index = dfsFileInfo.getFilePath.indexOf(bucketName)
+ val key = dfsFileInfo.getFilePath.substring(index + bucketName.length + 1)
this.s3MultipartUploadHandler = TierWriterHelper.getS3MultipartUploadHandler(
hadoopFs,
@@ -544,7 +544,7 @@ class DfsTierWriter(
conf.s3MultiplePartUploadBaseDelay,
conf.s3MultiplePartUploadMaxBackoff)
s3MultipartUploadHandler.startUpload()
- } else if (hdfsFileInfo.isOSS) {
+ } else if (dfsFileInfo.isOSS) {
val configuration = hadoopFs.getConf
val ossEndpoint = configuration.get("fs.oss.endpoint")
val ossAccessKey = configuration.get("fs.oss.accessKeyId")
@@ -552,8 +552,8 @@ class DfsTierWriter(
val uri = hadoopFs.getUri
val bucketName = uri.getHost
- val index = hdfsFileInfo.getFilePath.indexOf(bucketName)
- val key = hdfsFileInfo.getFilePath.substring(index + bucketName.length + 1)
+ val index = dfsFileInfo.getFilePath.indexOf(bucketName)
+ val key = dfsFileInfo.getFilePath.substring(index + bucketName.length + 1)
this.ossMultipartUploadHandler = TierWriterHelper.getOssMultipartUploadHandler(
ossEndpoint,
@@ -572,7 +572,7 @@ class DfsTierWriter(
case ex: InterruptedException =>
throw new RuntimeException(ex)
}
- hadoopFs.create(hdfsFileInfo.getDfsPath, true).close()
+ hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
}
storageManager.registerDiskFilePartitionWriter(
@@ -586,9 +586,9 @@ class DfsTierWriter(
override def genFlushTask(finalFlush: Boolean, keepBuffer: Boolean): FlushTask = {
notifier.numPendingFlushes.incrementAndGet()
- if (hdfsFileInfo.isHdfs) {
- new HdfsFlushTask(flushBuffer, hdfsFileInfo.getDfsPath(), notifier, true, source)
- } else if (hdfsFileInfo.isOSS) {
+ if (dfsFileInfo.isHdfs) {
+ new HdfsFlushTask(flushBuffer, dfsFileInfo.getDfsPath(), notifier, true, source)
+ } else if (dfsFileInfo.isOSS) {
val flushTask = new OssFlushTask(
flushBuffer,
notifier,
@@ -641,17 +641,19 @@ class DfsTierWriter(
}
override def closeStreams(): Unit = {
- if (hadoopFs.exists(hdfsFileInfo.getDfsPeerWriterSuccessPath)) {
- hadoopFs.delete(hdfsFileInfo.getDfsPath, false)
+ if (hadoopFs.exists(dfsFileInfo.getDfsPeerWriterSuccessPath)) {
+ hadoopFs.delete(dfsFileInfo.getDfsPath, false)
deleted = true
} else {
- hadoopFs.create(hdfsFileInfo.getDfsWriterSuccessPath).close()
- val indexOutputStream = hadoopFs.create(hdfsFileInfo.getDfsIndexPath)
- indexOutputStream.writeInt(hdfsFileInfo.getReduceFileMeta.getChunkOffsets.size)
- for (offset <- hdfsFileInfo.getReduceFileMeta.getChunkOffsets.asScala) {
- indexOutputStream.writeLong(offset)
+ hadoopFs.create(dfsFileInfo.getDfsWriterSuccessPath).close()
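+ // Only reduce partition files carry chunk offsets; map partition index files are flushed by the meta handler.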
+ if (dfsFileInfo.isReduceFileMeta) {
+ val indexOutputStream = hadoopFs.create(dfsFileInfo.getDfsIndexPath)
+ indexOutputStream.writeInt(dfsFileInfo.getReduceFileMeta.getChunkOffsets.size)
+ for (offset <- dfsFileInfo.getReduceFileMeta.getChunkOffsets.asScala) {
+ indexOutputStream.writeLong(offset)
+ }
+ indexOutputStream.close()
}
- indexOutputStream.close()
}
if (s3MultipartUploadHandler != null) {
s3MultipartUploadHandler.complete()
@@ -664,12 +666,12 @@ class DfsTierWriter(
}
override def notifyFileCommitted(): Unit =
- storageManager.notifyFileInfoCommitted(shuffleKey, filename, hdfsFileInfo)
+ storageManager.notifyFileInfoCommitted(shuffleKey, filename, dfsFileInfo)
override def closeResource(): Unit = {}
override def cleanLocalOrDfsFiles(): Unit = {
- hdfsFileInfo.deleteAllFiles(hadoopFs)
+ dfsFileInfo.deleteAllFiles(hadoopFs)
}
override def takeBufferInternal(): CompositeByteBuf = {
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
index 8c3a47a07..95d69fc12 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
@@ -25,6 +25,8 @@ import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable
+import org.apache.commons.lang3.StringUtils
+
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
@@ -151,7 +153,9 @@ trait MiniClusterFeature extends Logging {
def createWorker(map: Map[String, String], storageDir: String): Worker = {
logInfo("start create worker for mini cluster")
val conf = new CelebornConf()
- conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, storageDir)
+ if (StringUtils.isNotEmpty(storageDir)) {
+ conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, storageDir)
+ }
conf.set(CelebornConf.WORKER_DISK_MONITOR_ENABLED.key, "false")
conf.set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K")
conf.set(CelebornConf.WORKER_HTTP_PORT.key, s"${selectRandomPort()}")