[CELEBORN-83][FOLLOWUP] Fix various bugs when using HDFS as storage. (#1065)

This commit is contained in:
Ethan Feng 2022-12-15 15:20:29 +08:00 committed by GitHub
parent df5ed8ec09
commit 65cb36c002
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 177 additions and 78 deletions

View File

@ -121,6 +121,7 @@ celeborn.ha.master.node.3.ratis.port 9874
celeborn.ha.master.ratis.raft.server.storage.dir /mnt/disk1/rss_ratis/ celeborn.ha.master.ratis.raft.server.storage.dir /mnt/disk1/rss_ratis/
celeborn.metrics.enabled true celeborn.metrics.enabled true
# If you want to use HDFS as shuffle storage, make sure that the flush buffer size is at least 4MB.
celeborn.worker.flush.buffer.size 256k celeborn.worker.flush.buffer.size 256k
celeborn.worker.storage.dirs /mnt/disk1/,/mnt/disk2 celeborn.worker.storage.dirs /mnt/disk1/,/mnt/disk2
# If your hosts have disk raid or use lvm, set celeborn.worker.monitor.disk.enabled to false # If your hosts have disk raid or use lvm, set celeborn.worker.monitor.disk.enabled to false

View File

@ -24,6 +24,8 @@ import java.util.function.BooleanSupplier;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.client.read.RssInputStream; import org.apache.celeborn.client.read.RssInputStream;
import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.CelebornConf;
@ -39,6 +41,7 @@ public abstract class ShuffleClient {
private static volatile ShuffleClient _instance; private static volatile ShuffleClient _instance;
private static volatile boolean initialized = false; private static volatile boolean initialized = false;
private static volatile FileSystem hdfsFs; private static volatile FileSystem hdfsFs;
private static Logger logger = LoggerFactory.getLogger(ShuffleClient.class);
// for testing // for testing
public static void reset() { public static void reset() {
@ -102,6 +105,11 @@ public abstract class ShuffleClient {
synchronized (ShuffleClient.class) { synchronized (ShuffleClient.class) {
if (null == hdfsFs) { if (null == hdfsFs) {
Configuration hdfsConfiguration = new Configuration(); Configuration hdfsConfiguration = new Configuration();
// enable fs cache to avoid too many fs instances
hdfsConfiguration.set("fs.hdfs.impl.disable.cache", "false");
logger.info(
"Celeborn client will ignore cluster"
+ " settings about fs.hdfs.impl.disable.cache and set it to false");
try { try {
hdfsFs = FileSystem.get(hdfsConfiguration); hdfsFs = FileSystem.get(hdfsConfiguration);
} catch (IOException e) { } catch (IOException e) {

View File

@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
@ -53,9 +52,10 @@ public class DfsPartitionReader implements PartitionReader {
private final AtomicReference<IOException> exception = new AtomicReference<>(); private final AtomicReference<IOException> exception = new AtomicReference<>();
private volatile boolean closed = false; private volatile boolean closed = false;
private Thread fetchThread; private Thread fetchThread;
private final FSDataInputStream hdfsInputStream; private FSDataInputStream hdfsInputStream;
private int numChunks = 0; private int numChunks = 0;
private final AtomicInteger currentChunkIndex = new AtomicInteger(0); private int returnedChunks = 0;
private int currentChunkIndex = 0;
public DfsPartitionReader( public DfsPartitionReader(
CelebornConf conf, CelebornConf conf,
@ -98,29 +98,67 @@ public class DfsPartitionReader implements PartitionReader {
ShuffleClient.getHdfsFs(conf).open(new Path(location.getStorageInfo().getFilePath())); ShuffleClient.getHdfsFs(conf).open(new Path(location.getStorageInfo().getFilePath()));
chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location)); chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location));
} }
logger.debug(
"DFS {} index count:{} offsets:{}",
location.getStorageInfo().getFilePath(),
chunkOffsets.size(),
chunkOffsets);
if (chunkOffsets.size() > 1) { if (chunkOffsets.size() > 1) {
numChunks = chunkOffsets.size() - 1; numChunks = chunkOffsets.size() - 1;
fetchThread = fetchThread =
new Thread( new Thread(
() -> { () -> {
try { try {
while (!closed && currentChunkIndex.get() < numChunks) { while (!closed && currentChunkIndex < numChunks) {
while (results.size() >= fetchMaxReqsInFlight) { while (results.size() >= fetchMaxReqsInFlight) {
Thread.sleep(50); Thread.sleep(50);
} }
long offset = chunkOffsets.get(currentChunkIndex.get()); long offset = chunkOffsets.get(currentChunkIndex);
long length = chunkOffsets.get(currentChunkIndex.get() + 1) - offset; long length = chunkOffsets.get(currentChunkIndex + 1) - offset;
logger.debug("read {} offset {} length {}", currentChunkIndex, offset, length);
byte[] buffer = new byte[(int) length]; byte[] buffer = new byte[(int) length];
hdfsInputStream.readFully(offset, buffer); try {
results.add(Unpooled.wrappedBuffer(buffer)); hdfsInputStream.readFully(offset, buffer);
currentChunkIndex.incrementAndGet(); } catch (IOException e) {
logger.warn(
"read hdfs {} failed will retry, error detail {}",
location.getStorageInfo().getFilePath(),
e);
try {
hdfsInputStream.close();
hdfsInputStream =
ShuffleClient.getHdfsFs(conf)
.open(
new Path(
Utils.getSortedFilePath(
location.getStorageInfo().getFilePath())));
hdfsInputStream.readFully(offset, buffer);
} catch (IOException ex) {
logger.warn(
"retry read hdfs {} failed, error detail {} ",
location.getStorageInfo().getFilePath(),
e);
exception.set(ex);
break;
}
}
results.put(Unpooled.wrappedBuffer(buffer));
logger.debug("add index {} to results", currentChunkIndex++);
} }
} catch (IOException e) { } catch (Exception e) {
exception.set(e); logger.warn("Fetch thread is cancelled.", e);
} catch (InterruptedException e) {
// cancel a task for speculative, ignore this exception // cancel a task for speculative, ignore this exception
} }
}); logger.debug("fetch {} is done.", location.getStorageInfo().getFilePath());
},
"Dfs-fetch-thread" + location.getStorageInfo().getFilePath());
fetchThread.setUncaughtExceptionHandler(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.error("thread {} failed with exception {}", t, e);
}
});
fetchThread.start(); fetchThread.start();
logger.debug("Start dfs read on location {}", location); logger.debug("Start dfs read on location {}", location);
} }
@ -145,6 +183,7 @@ public class DfsPartitionReader implements PartitionReader {
throws IOException { throws IOException {
String indexPath = Utils.getIndexFilePath(location.getStorageInfo().getFilePath()); String indexPath = Utils.getIndexFilePath(location.getStorageInfo().getFilePath());
FSDataInputStream indexInputStream = ShuffleClient.getHdfsFs(conf).open(new Path(indexPath)); FSDataInputStream indexInputStream = ShuffleClient.getHdfsFs(conf).open(new Path(indexPath));
logger.debug("read sorted index {}", indexPath);
long indexSize = ShuffleClient.getHdfsFs(conf).getFileStatus(new Path(indexPath)).getLen(); long indexSize = ShuffleClient.getHdfsFs(conf).getFileStatus(new Path(indexPath)).getLen();
// Index size won't be large, so it's safe to do the conversion. // Index size won't be large, so it's safe to do the conversion.
byte[] indexBuffer = new byte[(int) indexSize]; byte[] indexBuffer = new byte[(int) indexSize];
@ -162,17 +201,18 @@ public class DfsPartitionReader implements PartitionReader {
@Override @Override
public boolean hasNext() { public boolean hasNext() {
return currentChunkIndex.get() < numChunks; logger.debug("check has next current index: {} chunks {}", returnedChunks, numChunks);
return returnedChunks < numChunks;
} }
@Override @Override
public ByteBuf next() throws IOException { public ByteBuf next() throws IOException {
checkException();
ByteBuf chunk = null; ByteBuf chunk = null;
try { try {
while (chunk == null) { while (chunk == null) {
checkException(); checkException();
chunk = results.poll(500, TimeUnit.MILLISECONDS); chunk = results.poll(500, TimeUnit.MILLISECONDS);
logger.debug("poll result with result size: {}", results.size());
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -180,6 +220,7 @@ public class DfsPartitionReader implements PartitionReader {
exception.set(ioe); exception.set(ioe);
throw ioe; throw ioe;
} }
returnedChunks++;
return chunk; return chunk;
} }
@ -193,7 +234,9 @@ public class DfsPartitionReader implements PartitionReader {
@Override @Override
public void close() { public void close() {
closed = true; closed = true;
fetchThread.interrupt(); if (fetchThread != null) {
fetchThread.interrupt();
}
try { try {
hdfsInputStream.close(); hdfsInputStream.close();
} catch (IOException e) { } catch (IOException e) {

View File

@ -21,6 +21,8 @@ coverage:
status: status:
project: project:
default: default:
target: 0% target: 10%
threshold: 0% patch:
base: auto default:
enabled: no
if_not_found: success

View File

@ -18,7 +18,6 @@
package org.apache.celeborn.common.meta; package org.apache.celeborn.common.meta;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -27,12 +26,15 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.protocol.PartitionType; import org.apache.celeborn.common.protocol.PartitionType;
import org.apache.celeborn.common.util.Utils; import org.apache.celeborn.common.util.Utils;
public class FileInfo { public class FileInfo {
private static Logger logger = LoggerFactory.getLogger(FileInfo.class);
private final String filePath; private final String filePath;
private final List<Long> chunkOffsets; private final List<Long> chunkOffsets;
private final UserIdentifier userIdentifier; private final UserIdentifier userIdentifier;
@ -127,12 +129,23 @@ public class FileInfo {
return userIdentifier; return userIdentifier;
} }
public void deleteAllFiles(FileSystem hdfsFs) throws IOException { public void deleteAllFiles(FileSystem hdfsFs) {
if (isHdfs()) { if (isHdfs()) {
hdfsFs.delete(getHdfsPath(), false); try {
hdfsFs.delete(getHdfsWriterSuccessPath(), false); hdfsFs.delete(getHdfsPath(), false);
hdfsFs.delete(getHdfsIndexPath(), false); hdfsFs.delete(getHdfsWriterSuccessPath(), false);
hdfsFs.delete(getHdfsSortedPath(), false); hdfsFs.delete(getHdfsIndexPath(), false);
hdfsFs.delete(getHdfsSortedPath(), false);
} catch (Exception e) {
// ignore delete exceptions because some other workers might be deleting the directory
logger.debug(
"delete hdfs file {},{},{},{} failed {}",
getHdfsPath(),
getHdfsWriterSuccessPath(),
getHdfsIndexPath(),
getHdfsSortedPath(),
e);
}
} else { } else {
getFile().delete(); getFile().delete();
new File(getIndexPath()).delete(); new File(getIndexPath()).delete();

View File

@ -33,9 +33,13 @@
<PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} %p [%t] %c{1}: %m%n%ex"/> <PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} %p [%t] %c{1}: %m%n%ex"/>
</Console> </Console>
</Appenders> </Appenders>
<Loggers> <Loggers>
<Root level="INFO"> <Root level="INFO">
<AppenderRef ref="stdout"/> <AppenderRef ref="stdout"/>
</Root> </Root>
<Logger name="org.apache.hadoop.hdfs" level="WARN" additivity="false">
<Appender-ref ref="stdout" level="WARN" />
</Logger>
</Loggers> </Loggers>
</Configuration> </Configuration>

View File

@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.CompositeByteBuf;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.roaringbitmap.RoaringBitmap; import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -53,10 +52,9 @@ public abstract class FileWriter implements DeviceObserver {
private static final long WAIT_INTERVAL_MS = 20; private static final long WAIT_INTERVAL_MS = 20;
protected final FileInfo fileInfo; protected final FileInfo fileInfo;
protected FileChannel channel; private FileChannel channel;
protected FSDataOutputStream stream; private volatile boolean closed;
protected volatile boolean closed; private volatile boolean destroyed;
protected volatile boolean destroyed;
protected final AtomicInteger numPendingWrites = new AtomicInteger(); protected final AtomicInteger numPendingWrites = new AtomicInteger();
protected long bytesFlushed; protected long bytesFlushed;
@ -105,7 +103,20 @@ public abstract class FileWriter implements DeviceObserver {
if (!fileInfo.isHdfs()) { if (!fileInfo.isHdfs()) {
channel = new FileOutputStream(fileInfo.getFilePath()).getChannel(); channel = new FileOutputStream(fileInfo.getFilePath()).getChannel();
} else { } else {
stream = StorageManager.hdfsFs().create(fileInfo.getHdfsPath(), true); // We open the stream and close immediately because HDFS output stream will
// create a DataStreamer that is a thread.
// If we reuse HDFS output stream, we will exhaust the memory soon.
try {
StorageManager.hadoopFs().create(fileInfo.getHdfsPath(), true).close();
} catch (IOException e) {
try {
// If creating the file fails, wait 10 ms and retry once
Thread.sleep(10);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
StorageManager.hadoopFs().create(fileInfo.getHdfsPath(), true).close();
}
} }
source = workerSource; source = workerSource;
logger.debug("FileWriter {} split threshold {} mode {}", this, splitThreshold, splitMode); logger.debug("FileWriter {} split threshold {} mode {}", this, splitThreshold, splitMode);
@ -138,8 +149,8 @@ public abstract class FileWriter implements DeviceObserver {
FlushTask task = null; FlushTask task = null;
if (channel != null) { if (channel != null) {
task = new LocalFlushTask(flushBuffer, channel, notifier); task = new LocalFlushTask(flushBuffer, channel, notifier);
} else if (stream != null) { } else if (fileInfo.isHdfs()) {
task = new HdfsFlushTask(flushBuffer, stream, notifier); task = new HdfsFlushTask(flushBuffer, fileInfo.getHdfsPath(), notifier);
} }
addTask(task); addTask(task);
flushBuffer = null; flushBuffer = null;
@ -248,8 +259,7 @@ public abstract class FileWriter implements DeviceObserver {
if (channel != null) { if (channel != null) {
channel.close(); channel.close();
} }
if (stream != null) { if (fileInfo.isHdfs()) {
stream.close();
streamClose.run(); streamClose.run();
} }
} catch (IOException e) { } catch (IOException e) {
@ -278,9 +288,6 @@ public abstract class FileWriter implements DeviceObserver {
if (channel != null) { if (channel != null) {
channel.close(); channel.close();
} }
if (stream != null) {
stream.close();
}
} catch (IOException e) { } catch (IOException e) {
logger.warn( logger.warn(
"Close channel failed for file {} caused by {}.", "Close channel failed for file {} caused by {}.",
@ -291,14 +298,12 @@ public abstract class FileWriter implements DeviceObserver {
if (!destroyed) { if (!destroyed) {
destroyed = true; destroyed = true;
try { fileInfo.deleteAllFiles(StorageManager.hadoopFs());
fileInfo.deleteAllFiles(StorageManager.hdfsFs());
} catch (Exception e) {
logger.warn("Exception when cleaning hdfs file {}", fileInfo.getFilePath());
}
// unregister from DeviceMonitor // unregister from DeviceMonitor
deviceMonitor.unregisterFileWriter(this); if (!fileInfo.isHdfs()) {
deviceMonitor.unregisterFileWriter(this);
}
destroyHook.run(); destroyHook.run();
} }
} }

View File

@ -75,7 +75,7 @@ public final class MapPartitionFileWriter extends FileWriter {
if (!fileInfo.isHdfs()) { if (!fileInfo.isHdfs()) {
channelIndex = new FileOutputStream(fileInfo.getIndexPath()).getChannel(); channelIndex = new FileOutputStream(fileInfo.getIndexPath()).getChannel();
} else { } else {
streamIndex = StorageManager.hdfsFs().create(fileInfo.getHdfsIndexPath(), true); streamIndex = StorageManager.hadoopFs().create(fileInfo.getHdfsIndexPath(), true);
} }
} }

View File

@ -342,7 +342,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
if (isHdfs) { if (isHdfs) {
// If the index file exists, it will be overwritten. // If the index file exists, it will be overwritten.
// So there is no need to check its existence. // So there is no need to check its existence.
hdfsIndexOutput = StorageManager.hdfsFs().create(new Path(indexFilePath)); hdfsIndexOutput = StorageManager.hadoopFs().create(new Path(indexFilePath));
} else { } else {
indexFileChannel = new FileOutputStream(indexFilePath).getChannel(); indexFileChannel = new FileOutputStream(indexFilePath).getChannel();
} }
@ -455,8 +455,9 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
int indexSize = 0; int indexSize = 0;
try { try {
if (isHdfs) { if (isHdfs) {
hdfsIndexStream = StorageManager.hdfsFs().open(new Path(indexFilePath)); hdfsIndexStream = StorageManager.hadoopFs().open(new Path(indexFilePath));
indexSize = (int) StorageManager.hdfsFs().getFileStatus(new Path(indexFilePath)).getLen(); indexSize =
(int) StorageManager.hadoopFs().getFileStatus(new Path(indexFilePath)).getLen();
} else { } else {
indexStream = new FileInputStream(indexFilePath); indexStream = new FileInputStream(indexFilePath);
File indexFile = new File(indexFilePath); File indexFile = new File(indexFilePath);
@ -520,11 +521,11 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
indexFile.delete(); indexFile.delete();
} }
} else { } else {
if (StorageManager.hdfsFs().exists(fileInfo.getHdfsSortedPath())) { if (StorageManager.hadoopFs().exists(fileInfo.getHdfsSortedPath())) {
StorageManager.hdfsFs().delete(fileInfo.getHdfsSortedPath(), false); StorageManager.hadoopFs().delete(fileInfo.getHdfsSortedPath(), false);
} }
if (StorageManager.hdfsFs().exists(fileInfo.getHdfsIndexPath())) { if (StorageManager.hadoopFs().exists(fileInfo.getHdfsIndexPath())) {
StorageManager.hdfsFs().delete(fileInfo.getHdfsIndexPath(), false); StorageManager.hadoopFs().delete(fileInfo.getHdfsIndexPath(), false);
} }
} }
} }
@ -612,8 +613,8 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
private void initializeFiles() throws IOException { private void initializeFiles() throws IOException {
if (isHdfs) { if (isHdfs) {
hdfsOriginInput = StorageManager.hdfsFs().open(new Path(originFilePath)); hdfsOriginInput = StorageManager.hadoopFs().open(new Path(originFilePath));
hdfsSortedOutput = StorageManager.hdfsFs().create(new Path(sortedFilePath)); hdfsSortedOutput = StorageManager.hadoopFs().create(new Path(sortedFilePath));
} else { } else {
originFileChannel = new FileInputStream(originFilePath).getChannel(); originFileChannel = new FileInputStream(originFilePath).getChannel();
sortedFileChannel = new FileOutputStream(sortedFilePath).getChannel(); sortedFileChannel = new FileOutputStream(sortedFilePath).getChannel();
@ -646,7 +647,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
private void deleteOriginFiles() throws IOException { private void deleteOriginFiles() throws IOException {
boolean deleteSuccess = false; boolean deleteSuccess = false;
if (isHdfs) { if (isHdfs) {
deleteSuccess = StorageManager.hdfsFs().delete(new Path(originFilePath), false); deleteSuccess = StorageManager.hadoopFs().delete(new Path(originFilePath), false);
} else { } else {
deleteSuccess = new File(originFilePath).delete(); deleteSuccess = new File(originFilePath).delete();
} }

View File

@ -94,13 +94,13 @@ public final class ReducePartitionFileWriter extends FileWriter {
} }
}, },
() -> { () -> {
if (StorageManager.hdfsFs().exists(fileInfo.getHdfsPeerWriterSuccessPath())) { if (StorageManager.hadoopFs().exists(fileInfo.getHdfsPeerWriterSuccessPath())) {
StorageManager.hdfsFs().delete(fileInfo.getHdfsPath(), false); StorageManager.hadoopFs().delete(fileInfo.getHdfsPath(), false);
deleted = true; deleted = true;
} else { } else {
StorageManager.hdfsFs().create(fileInfo.getHdfsWriterSuccessPath()).close(); StorageManager.hadoopFs().create(fileInfo.getHdfsWriterSuccessPath()).close();
FSDataOutputStream indexOutputStream = FSDataOutputStream indexOutputStream =
StorageManager.hdfsFs().create(fileInfo.getHdfsIndexPath()); StorageManager.hadoopFs().create(fileInfo.getHdfsIndexPath());
indexOutputStream.writeInt(fileInfo.getChunkOffsets().size()); indexOutputStream.writeInt(fileInfo.getChunkOffsets().size());
for (Long offset : fileInfo.getChunkOffsets()) { for (Long offset : fileInfo.getChunkOffsets()) {
indexOutputStream.writeLong(offset); indexOutputStream.writeLong(offset);

View File

@ -37,7 +37,7 @@ import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMod
import org.apache.celeborn.common.protocol.message.StatusCode import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.common.unsafe.Platform import org.apache.celeborn.common.unsafe.Platform
import org.apache.celeborn.common.util.PackedPartitionId import org.apache.celeborn.common.util.PackedPartitionId
import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, LocalFlusher, MapPartitionFileWriter, StorageManager} import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, HdfsFlusher, LocalFlusher, MapPartitionFileWriter, StorageManager}
class PushDataHandler extends BaseMessageHandler with Logging { class PushDataHandler extends BaseMessageHandler with Logging {
@ -224,9 +224,14 @@ class PushDataHandler extends BaseMessageHandler with Logging {
callback.onFailure(new Exception(message, exception)) callback.onFailure(new Exception(message, exception))
return return
} }
val diskFull = workerInfo.diskInfos val diskFull =
.get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint) if (fileWriter.flusher.isInstanceOf[LocalFlusher]) {
.actualUsableSpace < diskReserveSize workerInfo.diskInfos
.get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
.actualUsableSpace < diskReserveSize
} else {
false
}
if ((diskFull && fileWriter.getFileInfo.getFileLength > partitionSplitMinimumSize) || if ((diskFull && fileWriter.getFileInfo.getFileLength > partitionSplitMinimumSize) ||
(isMaster && fileWriter.getFileInfo.getFileLength > fileWriter.getSplitThreshold())) { (isMaster && fileWriter.getFileInfo.getFileLength > fileWriter.getSplitThreshold())) {
if (fileWriter.getSplitMode == PartitionSplitMode.SOFT) { if (fileWriter.getSplitMode == PartitionSplitMode.SOFT) {
@ -859,6 +864,9 @@ class PushDataHandler extends BaseMessageHandler with Logging {
} }
private def checkDiskFull(fileWriter: FileWriter): Boolean = { private def checkDiskFull(fileWriter: FileWriter): Boolean = {
if (fileWriter.flusher.isInstanceOf[HdfsFlusher]) {
return false
}
val diskFull = workerInfo.diskInfos val diskFull = workerInfo.diskInfos
.get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint) .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
.actualUsableSpace < diskReserveSize .actualUsableSpace < diskReserveSize

View File

@ -20,7 +20,7 @@ package org.apache.celeborn.service.deploy.worker.storage
import java.nio.channels.FileChannel import java.nio.channels.FileChannel
import io.netty.buffer.{ByteBufUtil, CompositeByteBuf} import io.netty.buffer.{ByteBufUtil, CompositeByteBuf}
import org.apache.hadoop.fs.FSDataOutputStream import org.apache.hadoop.fs.{FSDataOutputStream, Path}
abstract private[worker] class FlushTask( abstract private[worker] class FlushTask(
val buffer: CompositeByteBuf, val buffer: CompositeByteBuf,
@ -44,9 +44,11 @@ private[worker] class LocalFlushTask(
private[worker] class HdfsFlushTask( private[worker] class HdfsFlushTask(
buffer: CompositeByteBuf, buffer: CompositeByteBuf,
fsStream: FSDataOutputStream, val path: Path,
notifier: FlushNotifier) extends FlushTask(buffer, notifier) { notifier: FlushNotifier) extends FlushTask(buffer, notifier) {
override def flush(): Unit = { override def flush(): Unit = {
fsStream.write(ByteBufUtil.getBytes(buffer)) val hdfsStream = StorageManager.hadoopFs.append(path)
hdfsStream.write(ByteBufUtil.getBytes(buffer))
hdfsStream.close()
} }
} }

View File

@ -223,11 +223,11 @@ final private[worker] class HdfsFlusher(
hdfsFlusherThreads, hdfsFlusherThreads,
flushAvgTimeWindowSize, flushAvgTimeWindowSize,
avgFlushTimeSlidingWindowMinCount) with Logging { avgFlushTimeSlidingWindowMinCount) with Logging {
override def toString: String = s"HdfsFlusher@$flusherId" override def toString: String = s"HdfsFlusher@$flusherId"
override def processIOException(e: IOException, deviceErrorType: DiskStatus): Unit = { override def processIOException(e: IOException, deviceErrorType: DiskStatus): Unit = {
stopAndCleanFlusher() stopAndCleanFlusher()
logError(s"$this write failed, reason $deviceErrorType ,exception: $e") logError(s"$this write failed, reason $deviceErrorType ,exception: $e")
} }
} }

View File

@ -45,7 +45,7 @@ import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMod
import org.apache.celeborn.common.quota.ResourceConsumption import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.util.{PbSerDeUtils, ThreadUtils, Utils} import org.apache.celeborn.common.util.{PbSerDeUtils, ThreadUtils, Utils}
import org.apache.celeborn.service.deploy.worker._ import org.apache.celeborn.service.deploy.worker._
import org.apache.celeborn.service.deploy.worker.storage.StorageManager.hdfsFs import org.apache.celeborn.service.deploy.worker.storage.StorageManager.hadoopFs
final private[worker] class StorageManager(conf: CelebornConf, workerSource: AbstractSource) final private[worker] class StorageManager(conf: CelebornConf, workerSource: AbstractSource)
extends ShuffleRecoverHelper with DeviceObserver with Logging with MemoryPressureListener { extends ShuffleRecoverHelper with DeviceObserver with Logging with MemoryPressureListener {
@ -129,7 +129,10 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
val hdfsConfiguration = new Configuration val hdfsConfiguration = new Configuration
hdfsConfiguration.set("fs.defaultFS", hdfsDir) hdfsConfiguration.set("fs.defaultFS", hdfsDir)
hdfsConfiguration.set("dfs.replication", "2") hdfsConfiguration.set("dfs.replication", "2")
StorageManager.hdfsFs = FileSystem.get(hdfsConfiguration) hdfsConfiguration.set("fs.hdfs.impl.disable.cache", "false")
logInfo("Celeborn will ignore cluster settings" +
" about fs.hdfs.impl.disable.cache and set it to false")
StorageManager.hadoopFs = FileSystem.get(hdfsConfiguration)
Some(new HdfsFlusher( Some(new HdfsFlusher(
workerSource, workerSource,
conf.hdfsFlusherThreads, conf.hdfsFlusherThreads,
@ -270,7 +273,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
if (diskInfo != null && diskInfo.status.equals(DiskStatus.HEALTHY)) { if (diskInfo != null && diskInfo.status.equals(DiskStatus.HEALTHY)) {
diskInfo.dirs diskInfo.dirs
} else { } else {
logWarning(s"Disk unavailable for $suggestedMountPoint, return all healthy" + logDebug(s"Disk unavailable for $suggestedMountPoint, return all healthy" +
s" working dirs. diskInfo $diskInfo") s" working dirs. diskInfo $diskInfo")
healthyWorkingDirs() healthyWorkingDirs()
} }
@ -281,7 +284,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
if (dirs.isEmpty) { if (dirs.isEmpty) {
val shuffleDir = val shuffleDir =
new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId") new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId")
FileSystem.mkdirs(StorageManager.hdfsFs, shuffleDir, hdfsPermission) FileSystem.mkdirs(StorageManager.hadoopFs, shuffleDir, hdfsPermission)
val fileInfo = val fileInfo =
new FileInfo(new Path(shuffleDir, fileName).toString, userIdentifier, partitionType) new FileInfo(new Path(shuffleDir, fileName).toString, userIdentifier, partitionType)
val hdfsWriter = partitionType match { val hdfsWriter = partitionType match {
@ -420,7 +423,16 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
deleteDirectory(file, diskOperators.get(diskInfo.mountPoint)) deleteDirectory(file, diskOperators.get(diskInfo.mountPoint))
} }
} }
hdfsInfos.foreach(item => item._2.deleteAllFiles(StorageManager.hdfsFs)) hdfsInfos.foreach(item => item._2.deleteAllFiles(StorageManager.hadoopFs))
if (!hdfsInfos.isEmpty) {
try {
StorageManager.hadoopFs.delete(
new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId"),
true)
} catch {
case e: Exception => logWarning("Clean expired hdfs shuffle failed.", e)
}
}
} }
} }
@ -465,14 +477,14 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
} }
} }
if (hdfsFs != null) { if (hadoopFs != null) {
val hdfsWorkPath = new Path(hdfsDir, conf.workerWorkingDir) val hdfsWorkPath = new Path(hdfsDir, conf.workerWorkingDir)
if (hdfsFs.exists(hdfsWorkPath)) { if (hadoopFs.exists(hdfsWorkPath)) {
val iter = hdfsFs.listFiles(hdfsWorkPath, false) val iter = hadoopFs.listFiles(hdfsWorkPath, false)
while (iter.hasNext) { while (iter.hasNext) {
val fileStatus = iter.next() val fileStatus = iter.next()
if (fileStatus.getModificationTime < expireTime) { if (fileStatus.getModificationTime < expireTime) {
hdfsFs.delete(fileStatus.getPath, true) hadoopFs.delete(fileStatus.getPath, true)
} }
} }
} }
@ -528,7 +540,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
} }
} }
val hdfsCleaned = hdfsFs match { val hdfsCleaned = hadoopFs match {
case hdfs: FileSystem => case hdfs: FileSystem =>
val hdfsWorkPath = new Path(hdfsDir, conf.workerWorkingDir) val hdfsWorkPath = new Path(hdfsDir, conf.workerWorkingDir)
// hdfs path not exist when first time initialize // hdfs path not exist when first time initialize
@ -670,5 +682,5 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
} }
object StorageManager { object StorageManager {
var hdfsFs: FileSystem = _ var hadoopFs: FileSystem = _
} }