diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 429ad5ec7..00c153940 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -647,6 +647,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else get(WORKER_COMMIT_THREADS) def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT) def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE) + def partitionSorterLazyRemovalOfOriginalFilesEnabled: Boolean = + get(PARTITION_SORTER_LAZY_REMOVAL_OF_ORIGINAL_FILES_ENABLED) def partitionSorterSortPartitionTimeout: Long = get(PARTITION_SORTER_SORT_TIMEOUT) def partitionSorterReservedMemoryPerPartition: Long = get(WORKER_PARTITION_SORTER_PER_PARTITION_RESERVED_MEMORY) @@ -2209,6 +2211,21 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("120s") + val PARTITION_SORTER_LAZY_REMOVAL_OF_ORIGINAL_FILES_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled") + .categories("worker") + .doc("When set to false, the PartitionSorter immediately removes the original file once " + + "its partition has been successfully sorted. It is important to note that this behavior " + + "may result in a potential issue with the ReusedExchange operation when it triggers both " + + "non-range and range fetch requests simultaneously. see CELEBORN-980 for more details." + + "When set to true, the PartitionSorter will retain the original unsorted file. However, " + + "it's essential to be aware that enabling this option may lead to an increase in storage " + + "space usage during the range fetch phase, as both the original and sorted files will be " + + "retained until the shuffle is finished.") + .version("0.3.2") + .booleanConf + .createWithDefault(true) + val PARTITION_SORTER_SORT_TIMEOUT: ConfigEntry[Long] = buildConf("celeborn.worker.sortPartition.timeout") .withAlternative("celeborn.worker.partitionSorter.sort.timeout") diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 3833726d5..8c0111523 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -93,6 +93,7 @@ license: | | celeborn.worker.shuffle.partitionSplit.enabled | true | enable the partition split on worker side | 0.3.0 | | celeborn.worker.shuffle.partitionSplit.max | 2g | Specify the maximum partition size for splitting, and ensure that individual partition files are always smaller than this limit. | 0.3.0 | | celeborn.worker.shuffle.partitionSplit.min | 1m | Min size for a partition to split | 0.3.0 | +| celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled | true | When set to false, the PartitionSorter immediately removes the original file once its partition has been successfully sorted. It is important to note that this behavior may result in a potential issue with the ReusedExchange operation when it triggers both non-range and range fetch requests simultaneously. see CELEBORN-980 for more details.When set to true, the PartitionSorter will retain the original unsorted file. However, it's essential to be aware that enabling this option may lead to an increase in storage space usage during the range fetch phase, as both the original and sorted files will be retained until the shuffle is finished. | 0.3.2 | | celeborn.worker.sortPartition.reservedMemoryPerPartition | 1mb | Reserved memory when sorting a shuffle file off-heap. | 0.3.0 | | celeborn.worker.sortPartition.threads | <undefined> | PartitionSorter's thread counts. It's recommended to set at least `64` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 | | celeborn.worker.sortPartition.timeout | 220s | Timeout for a shuffle file to sort. | 0.3.0 | diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java index c729c8402..ba6b76b56 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java @@ -77,6 +77,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { private final AtomicInteger sortedFileCount = new AtomicInteger(); private final AtomicLong sortedFilesSize = new AtomicLong(); + protected final boolean lazyRemovalOfOriginalFilesEnabled; protected final long sortTimeout; protected final long shuffleChunkSize; protected final long reservedMemoryPerPartition; @@ -92,6 +93,8 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { public PartitionFilesSorter( MemoryManager memoryManager, CelebornConf conf, AbstractSource source) { + this.lazyRemovalOfOriginalFilesEnabled = + conf.partitionSorterLazyRemovalOfOriginalFilesEnabled(); this.sortTimeout = conf.partitionSorterSortPartitionTimeout(); this.shuffleChunkSize = conf.shuffleChunkSize(); this.reservedMemoryPerPartition = conf.partitionSorterReservedMemoryPerPartition(); @@ -593,7 +596,9 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { writeIndex(sortedBlockInfoMap, indexFilePath, isHdfs); updateSortedShuffleFiles(shuffleKey, fileId, originFileLen); - deleteOriginFiles(); + if (!lazyRemovalOfOriginalFilesEnabled) { + deleteOriginFiles(); + } logger.debug("sort complete for {} {}", shuffleKey, originFilePath); } catch (Exception e) { logger.error(