diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 53aa6d2f7..d0ea3afa7 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -871,6 +871,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se } def partitionSplitMinimumSize: Long = get(WORKER_PARTITION_SPLIT_MIN_SIZE) + def partitionSplitMaximumSize: Long = get(WORKER_PARTITION_SPLIT_MAX_SIZE) def hdfsDir: String = { get(HDFS_DIR).map { @@ -1931,6 +1932,14 @@ object CelebornConf extends Logging { .bytesConf(ByteUnit.BYTE) .createWithDefaultString("1m") + val WORKER_PARTITION_SPLIT_MAX_SIZE: ConfigEntry[Long] = + buildConf("celeborn.worker.shuffle.partitionSplit.max") + .categories("worker") + .doc("Specify the maximum partition size for splitting, and ensure that individual partition files are always smaller than this limit.") + .version("0.3.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefaultString("2g") + val WORKER_STORAGE_DIRS: OptionalConfigEntry[Seq[String]] = buildConf("celeborn.worker.storage.dirs") .categories("worker") diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 0a1372786..372f91cf4 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -85,6 +85,7 @@ license: | | celeborn.worker.replicate.threads | 64 | Thread number of worker to replicate shuffle data. | 0.2.0 | | celeborn.worker.rpc.port | 0 | Server port for Worker to receive RPC request. | 0.2.0 | | celeborn.worker.shuffle.partitionSplit.enabled | true | enable the partition split on worker side | 0.3.0 | +| celeborn.worker.shuffle.partitionSplit.max | 2g | Specify the maximum partition size for splitting, and ensure that individual partition files are always smaller than this limit. | 0.3.0 | | celeborn.worker.shuffle.partitionSplit.min | 1m | Min size for a partition to split | 0.3.0 | | celeborn.worker.sortPartition.reservedMemoryPerPartition | 1mb | Reserved memory when sorting a shuffle file off-heap. | 0.3.0 | | celeborn.worker.sortPartition.threads | <undefined> | PartitionSorter's thread counts. It's recommended to set at least `64` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 | diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index cddb17032..fe125cd41 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -54,6 +54,7 @@ class PushDataHandler extends BaseMessageHandler with Logging { private var workerInfo: WorkerInfo = _ private var diskReserveSize: Long = _ private var partitionSplitMinimumSize: Long = _ + private var partitionSplitMaximumSize: Long = _ private var shutdown: AtomicBoolean = _ private var storageManager: StorageManager = _ private var workerPartitionSplitEnabled: Boolean = _ @@ -75,6 +76,7 @@ class PushDataHandler extends BaseMessageHandler with Logging { workerInfo = worker.workerInfo diskReserveSize = worker.conf.workerDiskReserveSize partitionSplitMinimumSize = worker.conf.partitionSplitMinimumSize + partitionSplitMaximumSize = worker.conf.partitionSplitMaximumSize storageManager = worker.storageManager shutdown = worker.shutdown workerPartitionSplitEnabled = worker.conf.workerPartitionSplitEnabled @@ -1080,7 +1082,8 @@ class PushDataHandler extends BaseMessageHandler with Logging { val diskFull = checkDiskFull(fileWriter) if (workerPartitionSplitEnabled && ((diskFull && fileWriter.getFileInfo.getFileLength > partitionSplitMinimumSize) || (isPrimary && fileWriter.getFileInfo.getFileLength > fileWriter.getSplitThreshold()))) { - if (softSplit != null && fileWriter.getSplitMode == PartitionSplitMode.SOFT) { + if (softSplit != null && fileWriter.getSplitMode == PartitionSplitMode.SOFT && + (fileWriter.getFileInfo.getFileLength < partitionSplitMaximumSize)) { softSplit.set(true) } else { callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))