[CELEBORN-553] Improve IO (#1458)

This commit is contained in:
Shuang 2023-04-25 21:14:06 +08:00 committed by GitHub
parent 01d8d1079c
commit 0b2e4877bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 56 additions and 30 deletions

View File

@ -2474,7 +2474,7 @@ object CelebornConf extends Logging {
"Otherwise, LifecycleManager will process release partition request immediately")
.version("0.3.0")
.booleanConf
.createWithDefault(false)
.createWithDefault(true)
val BATCH_HANDLE_RELEASE_PARTITION_THREADS: ConfigEntry[Int] =
buildConf("celeborn.shuffle.batchHandleReleasePartition.threads")

View File

@ -61,7 +61,7 @@ license: |
| celeborn.shuffle.batchHandleCommitPartition.enabled | false | When true, LifecycleManager will handle commit partition request in batch. Otherwise, LifecycleManager won't commit partition before stage end | 0.2.0 |
| celeborn.shuffle.batchHandleCommitPartition.interval | 5s | Interval for LifecycleManager to schedule handling commit partition requests in batch. | 0.2.0 |
| celeborn.shuffle.batchHandleCommitPartition.threads | 8 | Threads number for LifecycleManager to handle commit partition request in batch. | 0.2.0 |
| celeborn.shuffle.batchHandleReleasePartition.enabled | false | When true, LifecycleManager will handle release partition request in batch. Otherwise, LifecycleManager will process release partition request immediately | 0.3.0 |
| celeborn.shuffle.batchHandleReleasePartition.enabled | true | When true, LifecycleManager will handle release partition request in batch. Otherwise, LifecycleManager will process release partition request immediately | 0.3.0 |
| celeborn.shuffle.batchHandleReleasePartition.interval | 5s | Interval for LifecycleManager to schedule handling release partition requests in batch. | 0.3.0 |
| celeborn.shuffle.batchHandleReleasePartition.threads | 8 | Threads number for LifecycleManager to handle release partition request in batch. | 0.3.0 |
| celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 |

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.service.deploy.worker.storage;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
/**
 * Static helpers for opening {@link FileChannel}s on local disk files, used by the
 * worker storage layer for shuffle data/index files.
 */
public class FileChannelUtils {

  // Utility class: not meant to be instantiated.
  private FileChannelUtils() {}

  /**
   * Opens a writable channel on {@code filePath}, creating the file if it does not exist.
   *
   * <p>TRUNCATE_EXISTING matches the semantics of the {@code new FileOutputStream(path)}
   * calls this helper replaces: an existing file is truncated to zero length instead of
   * being overwritten in place, which would otherwise leave stale trailing bytes.
   *
   * @param filePath path of the file to write
   * @return a channel opened for writing
   * @throws IOException if the file cannot be created or opened
   */
  public static FileChannel createWritableFileChannel(String filePath) throws IOException {
    return FileChannel.open(
        Paths.get(filePath),
        StandardOpenOption.CREATE,
        StandardOpenOption.WRITE,
        StandardOpenOption.TRUNCATE_EXISTING);
  }

  /**
   * Opens a read-only channel on an existing file.
   *
   * @param filePath path of the file to read
   * @return a channel opened for reading
   * @throws IOException if the file does not exist or cannot be opened
   */
  public static FileChannel openReadableFileChannel(String filePath) throws IOException {
    return FileChannel.open(Paths.get(filePath), StandardOpenOption.READ);
  }
}

View File

@ -18,7 +18,6 @@
package org.apache.celeborn.service.deploy.worker.storage;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.Optional;
@ -50,7 +49,7 @@ import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
*/
public abstract class FileWriter implements DeviceObserver {
private static final Logger logger = LoggerFactory.getLogger(FileWriter.class);
private static final long WAIT_INTERVAL_MS = 20;
private static final long WAIT_INTERVAL_MS = 5;
protected final FileInfo fileInfo;
private FileChannel channel;
@ -101,7 +100,7 @@ public abstract class FileWriter implements DeviceObserver {
this.partitionType = partitionType;
this.rangeReadFilter = rangeReadFilter;
if (!fileInfo.isHdfs()) {
channel = new FileOutputStream(fileInfo.getFilePath()).getChannel();
channel = FileChannelUtils.createWritableFileChannel(fileInfo.getFilePath());
} else {
// We open the stream and close immediately because HDFS output stream will
// create a DataStreamer that is a thread.
@ -251,13 +250,11 @@ public abstract class FileWriter implements DeviceObserver {
waitOnNoPending(numPendingWrites);
closed = true;
synchronized (this) {
if (flushBuffer.readableBytes() > 0) {
flush(true);
}
tryClose.run();
if (flushBuffer.readableBytes() > 0) {
flush(true);
}
tryClose.run();
waitOnNoPending(notifier.numPendingFlushes);
} finally {
returnBuffer();

View File

@ -17,7 +17,6 @@
package org.apache.celeborn.service.deploy.worker.storage;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.HashMap;
@ -101,8 +100,8 @@ class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
})
.build()));
this.dataFileChanel = new FileInputStream(fileInfo.getFile()).getChannel();
this.indexChannel = new FileInputStream(fileInfo.getIndexPath()).getChannel();
this.dataFileChanel = FileChannelUtils.openReadableFileChannel(fileInfo.getFilePath());
this.indexChannel = FileChannelUtils.openReadableFileChannel(fileInfo.getIndexPath());
this.indexSize = indexChannel.size();
MemoryManager.instance().addReadBufferTargetChangeListener(this);

View File

@ -17,7 +17,6 @@
package org.apache.celeborn.service.deploy.worker.storage;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@ -51,7 +50,6 @@ public final class MapPartitionFileWriter extends FileWriter {
private int currentSubpartition;
private long totalBytes;
private long regionStartingOffset;
private long numDataRegions;
private FileChannel indexChannel;
public MapPartitionFileWriter(
@ -75,7 +73,7 @@ public final class MapPartitionFileWriter extends FileWriter {
PartitionType.MAP,
rangeReadFilter);
if (!fileInfo.isHdfs()) {
indexChannel = new FileOutputStream(fileInfo.getIndexPath()).getChannel();
indexChannel = FileChannelUtils.createWritableFileChannel(fileInfo.getIndexPath());
} else {
try {
StorageManager.hadoopFs().create(fileInfo.getHdfsIndexPath(), true).close();
@ -232,7 +230,6 @@ public final class MapPartitionFileWriter extends FileWriter {
flushIndex();
}
++numDataRegions;
regionStartingOffset = totalBytes;
Arrays.fill(numSubpartitionBytes, 0);
}
@ -257,7 +254,6 @@ public final class MapPartitionFileWriter extends FileWriter {
indexBuffer.flip();
notifier.checkException();
try {
notifier.numPendingFlushes.incrementAndGet();
if (indexBuffer.hasRemaining()) {
// mappartition synchronously writes file index
if (indexChannel != null) {
@ -273,11 +269,10 @@ public final class MapPartitionFileWriter extends FileWriter {
}
indexBuffer.clear();
} finally {
logger.info(
logger.debug(
"flushIndex end:{}, cost:{}",
fileInfo.getIndexPath(),
System.currentTimeMillis() - startTime);
notifier.numPendingFlushes.decrementAndGet();
}
}
}

View File

@ -18,8 +18,6 @@
package org.apache.celeborn.service.deploy.worker.storage;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
@ -340,7 +338,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
// So there is no need to check its existence.
hdfsIndexOutput = StorageManager.hadoopFs().create(new Path(indexFilePath));
} else {
indexFileChannel = new FileOutputStream(indexFilePath).getChannel();
indexFileChannel = FileChannelUtils.createWritableFileChannel(indexFilePath);
}
int indexSize = 0;
@ -444,7 +442,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
&& cachedIndexMaps.get(shuffleKey).containsKey(fileId)) {
indexMap = cachedIndexMaps.get(shuffleKey).get(fileId);
} else {
FileInputStream indexStream = null;
FileChannel indexChannel = null;
FSDataInputStream hdfsIndexStream = null;
boolean isHdfs = Utils.isHdfsPath(indexFilePath);
int indexSize = 0;
@ -454,7 +452,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
indexSize =
(int) StorageManager.hadoopFs().getFileStatus(new Path(indexFilePath)).getLen();
} else {
indexStream = new FileInputStream(indexFilePath);
indexChannel = FileChannelUtils.openReadableFileChannel(indexFilePath);
File indexFile = new File(indexFilePath);
indexSize = (int) indexFile.length();
}
@ -462,7 +460,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
if (isHdfs) {
readStreamFully(hdfsIndexStream, indexBuf, indexFilePath);
} else {
readChannelFully(indexStream.getChannel(), indexBuf, indexFilePath);
readChannelFully(indexChannel, indexBuf, indexFilePath);
}
indexBuf.rewind();
indexMap = ShuffleBlockInfoUtils.parseShuffleBlockInfosFromByteBuffer(indexBuf);
@ -473,7 +471,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
logger.error("Read sorted shuffle file index " + indexFilePath + " error, detail: ", e);
throw new IOException("Read sorted shuffle file index failed.", e);
} finally {
IOUtils.closeQuietly(indexStream, null);
IOUtils.closeQuietly(indexChannel, null);
IOUtils.closeQuietly(hdfsIndexStream, null);
}
}
@ -603,8 +601,8 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
hdfsOriginInput = StorageManager.hadoopFs().open(new Path(originFilePath));
hdfsSortedOutput = StorageManager.hadoopFs().create(new Path(sortedFilePath));
} else {
originFileChannel = new FileInputStream(originFilePath).getChannel();
sortedFileChannel = new FileOutputStream(sortedFilePath).getChannel();
originFileChannel = FileChannelUtils.openReadableFileChannel(originFilePath);
sortedFileChannel = FileChannelUtils.createWritableFileChannel(sortedFilePath);
}
}

View File

@ -830,7 +830,9 @@ class PushDataHandler extends BaseMessageHandler with Logging {
callback: RpcResponseCallback): Unit = {
val isMaster = PartitionLocation.getMode(mode) == PartitionLocation.Mode.MASTER
val messageType = message.`type`()
log.info(s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, partitionUniqueId:$partitionUniqueId")
log.debug(
s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, " +
s"partitionUniqueId:$partitionUniqueId")
val (workerSourceMaster, workerSourceSlave) =
messageType match {
case Type.PUSH_DATA_HAND_SHAKE =>