diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 2eee6c46f..b6a98c135 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -2474,7 +2474,7 @@ object CelebornConf extends Logging { "Otherwise, LifecycleManager will process release partition request immediately") .version("0.3.0") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val BATCH_HANDLE_RELEASE_PARTITION_THREADS: ConfigEntry[Int] = buildConf("celeborn.shuffle.batchHandleReleasePartition.threads") diff --git a/docs/configuration/client.md b/docs/configuration/client.md index fc9ceac4c..c645b25fe 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -61,7 +61,7 @@ license: | | celeborn.shuffle.batchHandleCommitPartition.enabled | false | When true, LifecycleManager will handle commit partition request in batch. Otherwise, LifecycleManager won't commit partition before stage end | 0.2.0 | | celeborn.shuffle.batchHandleCommitPartition.interval | 5s | Interval for LifecycleManager to schedule handling commit partition requests in batch. | 0.2.0 | | celeborn.shuffle.batchHandleCommitPartition.threads | 8 | Threads number for LifecycleManager to handle commit partition request in batch. | 0.2.0 | -| celeborn.shuffle.batchHandleReleasePartition.enabled | false | When true, LifecycleManager will handle release partition request in batch. Otherwise, LifecycleManager will process release partition request immediately | 0.3.0 | +| celeborn.shuffle.batchHandleReleasePartition.enabled | true | When true, LifecycleManager will handle release partition request in batch. 
Otherwise, LifecycleManager will process release partition request immediately | 0.3.0 | | celeborn.shuffle.batchHandleReleasePartition.interval | 5s | Interval for LifecycleManager to schedule handling release partition requests in batch. | 0.3.0 | | celeborn.shuffle.batchHandleReleasePartition.threads | 8 | Threads number for LifecycleManager to handle release partition request in batch. | 0.3.0 | | celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 | diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileChannelUtils.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileChannelUtils.java new file mode 100644 index 000000000..3f08c78d9 --- /dev/null +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileChannelUtils.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Static helpers for opening {@link FileChannel}s on local files by path.
 *
 * <p>The channels are opened through {@link FileChannel#open} rather than through
 * {@code FileInputStream}/{@code FileOutputStream}.
 */
public class FileChannelUtils {

  /**
   * Opens a channel for writing to {@code filePath}, creating the file if it does not exist and
   * truncating it to zero length if it does.
   *
   * <p>Truncation keeps the semantics of {@code new FileOutputStream(filePath).getChannel()}:
   * without it, a leftover file that is longer than the newly written content would keep its
   * stale trailing bytes.
   *
   * @param filePath path of the file to write
   * @return an open, writable channel positioned at the start of the (now empty) file
   * @throws IOException if the file cannot be created or opened for writing
   */
  public static FileChannel createWritableFileChannel(String filePath) throws IOException {
    return FileChannel.open(
        Paths.get(filePath),
        StandardOpenOption.CREATE,
        StandardOpenOption.WRITE,
        StandardOpenOption.TRUNCATE_EXISTING);
  }

  /**
   * Opens a read-only channel on an existing file.
   *
   * @param filePath path of the file to read
   * @return an open, readable channel positioned at the start of the file
   * @throws IOException if the file does not exist or cannot be opened for reading
   */
  public static FileChannel openReadableFileChannel(String filePath) throws IOException {
    return FileChannel.open(Paths.get(filePath), StandardOpenOption.READ);
  }
}
open the stream and close immediately because HDFS output stream will // create a DataStreamer that is a thread. @@ -251,13 +250,11 @@ public abstract class FileWriter implements DeviceObserver { waitOnNoPending(numPendingWrites); closed = true; - synchronized (this) { - if (flushBuffer.readableBytes() > 0) { - flush(true); - } - tryClose.run(); + if (flushBuffer.readableBytes() > 0) { + flush(true); } + tryClose.run(); waitOnNoPending(notifier.numPendingFlushes); } finally { returnBuffer(); diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartition.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartition.java index d63a8013a..e02e2a362 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartition.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartition.java @@ -17,7 +17,6 @@ package org.apache.celeborn.service.deploy.worker.storage; -import java.io.FileInputStream; import java.io.IOException; import java.nio.channels.FileChannel; import java.util.HashMap; @@ -101,8 +100,8 @@ class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener { logger.warn("StorageFetcherPool thread:{}:{}", t1, t2); }) .build())); - this.dataFileChanel = new FileInputStream(fileInfo.getFile()).getChannel(); - this.indexChannel = new FileInputStream(fileInfo.getIndexPath()).getChannel(); + this.dataFileChanel = FileChannelUtils.openReadableFileChannel(fileInfo.getFilePath()); + this.indexChannel = FileChannelUtils.openReadableFileChannel(fileInfo.getIndexPath()); this.indexSize = indexChannel.size(); MemoryManager.instance().addReadBufferTargetChangeListener(this); diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java index 64982557d..848b5f32b 100644 --- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java @@ -17,7 +17,6 @@ package org.apache.celeborn.service.deploy.worker.storage; -import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -51,7 +50,6 @@ public final class MapPartitionFileWriter extends FileWriter { private int currentSubpartition; private long totalBytes; private long regionStartingOffset; - private long numDataRegions; private FileChannel indexChannel; public MapPartitionFileWriter( @@ -75,7 +73,7 @@ public final class MapPartitionFileWriter extends FileWriter { PartitionType.MAP, rangeReadFilter); if (!fileInfo.isHdfs()) { - indexChannel = new FileOutputStream(fileInfo.getIndexPath()).getChannel(); + indexChannel = FileChannelUtils.createWritableFileChannel(fileInfo.getIndexPath()); } else { try { StorageManager.hadoopFs().create(fileInfo.getHdfsIndexPath(), true).close(); @@ -232,7 +230,6 @@ public final class MapPartitionFileWriter extends FileWriter { flushIndex(); } - ++numDataRegions; regionStartingOffset = totalBytes; Arrays.fill(numSubpartitionBytes, 0); } @@ -257,7 +254,6 @@ public final class MapPartitionFileWriter extends FileWriter { indexBuffer.flip(); notifier.checkException(); try { - notifier.numPendingFlushes.incrementAndGet(); if (indexBuffer.hasRemaining()) { // mappartition synchronously writes file index if (indexChannel != null) { @@ -273,11 +269,10 @@ public final class MapPartitionFileWriter extends FileWriter { } indexBuffer.clear(); } finally { - logger.info( + logger.debug( "flushIndex end:{}, cost:{}", fileInfo.getIndexPath(), System.currentTimeMillis() - startTime); - notifier.numPendingFlushes.decrementAndGet(); } } } diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java index 0d41dcb70..66b1d31c1 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java @@ -18,8 +18,6 @@ package org.apache.celeborn.service.deploy.worker.storage; import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; @@ -340,7 +338,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { // So there is no need to check its existence. hdfsIndexOutput = StorageManager.hadoopFs().create(new Path(indexFilePath)); } else { - indexFileChannel = new FileOutputStream(indexFilePath).getChannel(); + indexFileChannel = FileChannelUtils.createWritableFileChannel(indexFilePath); } int indexSize = 0; @@ -444,7 +442,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { && cachedIndexMaps.get(shuffleKey).containsKey(fileId)) { indexMap = cachedIndexMaps.get(shuffleKey).get(fileId); } else { - FileInputStream indexStream = null; + FileChannel indexChannel = null; FSDataInputStream hdfsIndexStream = null; boolean isHdfs = Utils.isHdfsPath(indexFilePath); int indexSize = 0; @@ -454,7 +452,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { indexSize = (int) StorageManager.hadoopFs().getFileStatus(new Path(indexFilePath)).getLen(); } else { - indexStream = new FileInputStream(indexFilePath); + indexChannel = FileChannelUtils.openReadableFileChannel(indexFilePath); File indexFile = new File(indexFilePath); indexSize = (int) indexFile.length(); } @@ -462,7 +460,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { if (isHdfs) { readStreamFully(hdfsIndexStream, indexBuf, indexFilePath); } else { - readChannelFully(indexStream.getChannel(), indexBuf, 
indexFilePath); + readChannelFully(indexChannel, indexBuf, indexFilePath); } indexBuf.rewind(); indexMap = ShuffleBlockInfoUtils.parseShuffleBlockInfosFromByteBuffer(indexBuf); @@ -473,7 +471,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { logger.error("Read sorted shuffle file index " + indexFilePath + " error, detail: ", e); throw new IOException("Read sorted shuffle file index failed.", e); } finally { - IOUtils.closeQuietly(indexStream, null); + IOUtils.closeQuietly(indexChannel, null); IOUtils.closeQuietly(hdfsIndexStream, null); } } @@ -603,8 +601,8 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { hdfsOriginInput = StorageManager.hadoopFs().open(new Path(originFilePath)); hdfsSortedOutput = StorageManager.hadoopFs().create(new Path(sortedFilePath)); } else { - originFileChannel = new FileInputStream(originFilePath).getChannel(); - sortedFileChannel = new FileOutputStream(sortedFilePath).getChannel(); + originFileChannel = FileChannelUtils.openReadableFileChannel(originFilePath); + sortedFileChannel = FileChannelUtils.createWritableFileChannel(sortedFilePath); } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index 1e84a899c..755d5700f 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -830,7 +830,9 @@ class PushDataHandler extends BaseMessageHandler with Logging { callback: RpcResponseCallback): Unit = { val isMaster = PartitionLocation.getMode(mode) == PartitionLocation.Mode.MASTER val messageType = message.`type`() - log.info(s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, partitionUniqueId:$partitionUniqueId") + log.debug( + s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, 
shuffleKey:$shuffleKey, " + + s"partitionUniqueId:$partitionUniqueId") val (workerSourceMaster, workerSourceSlave) = messageType match { case Type.PUSH_DATA_HAND_SHAKE =>