[CELEBORN-785] Add worker side partition hard split threshold

### What changes were proposed in this pull request?
Add a configuration `celeborn.worker.shuffle.partitionSplit.max`  to ensure that, in soft mode, individual partition files are limited to a size smaller than the configured value

### Why are the changes needed?

In soft mode, there may be situations where individual partition files are exceptionally large, which can result in excessively long sort times in skewed scenarios.

### Does this PR introduce _any_ user-facing change?
`celeborn.worker.shuffle.partitionSplit.max` defalut value 2g

### How was this patch tested?
none

Closes #1701 from JQ-Cao/785.

Authored-by: caojiaqing <caojiaqing@bilibili.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
This commit is contained in:
caojiaqing 2023-07-11 14:14:41 +08:00 committed by zky.zhoukeyong
parent 7a47fae230
commit d64e0091f1
3 changed files with 14 additions and 1 deletions

View File

@ -871,6 +871,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
}
def partitionSplitMinimumSize: Long = get(WORKER_PARTITION_SPLIT_MIN_SIZE)
def partitionSplitMaximumSize: Long = get(WORKER_PARTITION_SPLIT_MAX_SIZE)
def hdfsDir: String = {
get(HDFS_DIR).map {
@ -1931,6 +1932,14 @@ object CelebornConf extends Logging {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("1m")
val WORKER_PARTITION_SPLIT_MAX_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.shuffle.partitionSplit.max")
.categories("worker")
.doc("Specify the maximum partition size for splitting, and ensure that individual partition files are always smaller than this limit.")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("2g")
val WORKER_STORAGE_DIRS: OptionalConfigEntry[Seq[String]] =
buildConf("celeborn.worker.storage.dirs")
.categories("worker")

View File

@ -85,6 +85,7 @@ license: |
| celeborn.worker.replicate.threads | 64 | Thread number of worker to replicate shuffle data. | 0.2.0 |
| celeborn.worker.rpc.port | 0 | Server port for Worker to receive RPC request. | 0.2.0 |
| celeborn.worker.shuffle.partitionSplit.enabled | true | enable the partition split on worker side | 0.3.0 |
| celeborn.worker.shuffle.partitionSplit.max | 2g | Specify the maximum partition size for splitting, and ensure that individual partition files are always smaller than this limit. | 0.3.0 |
| celeborn.worker.shuffle.partitionSplit.min | 1m | Min size for a partition to split | 0.3.0 |
| celeborn.worker.sortPartition.reservedMemoryPerPartition | 1mb | Reserved memory when sorting a shuffle file off-heap. | 0.3.0 |
| celeborn.worker.sortPartition.threads | &lt;undefined&gt; | PartitionSorter's thread counts. It's recommended to set at least `64` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 |

View File

@ -54,6 +54,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
private var workerInfo: WorkerInfo = _
private var diskReserveSize: Long = _
private var partitionSplitMinimumSize: Long = _
private var partitionSplitMaximumSize: Long = _
private var shutdown: AtomicBoolean = _
private var storageManager: StorageManager = _
private var workerPartitionSplitEnabled: Boolean = _
@ -75,6 +76,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
workerInfo = worker.workerInfo
diskReserveSize = worker.conf.workerDiskReserveSize
partitionSplitMinimumSize = worker.conf.partitionSplitMinimumSize
partitionSplitMaximumSize = worker.conf.partitionSplitMaximumSize
storageManager = worker.storageManager
shutdown = worker.shutdown
workerPartitionSplitEnabled = worker.conf.workerPartitionSplitEnabled
@ -1080,7 +1082,8 @@ class PushDataHandler extends BaseMessageHandler with Logging {
val diskFull = checkDiskFull(fileWriter)
if (workerPartitionSplitEnabled && ((diskFull && fileWriter.getFileInfo.getFileLength > partitionSplitMinimumSize) ||
(isPrimary && fileWriter.getFileInfo.getFileLength > fileWriter.getSplitThreshold()))) {
if (softSplit != null && fileWriter.getSplitMode == PartitionSplitMode.SOFT) {
if (softSplit != null && fileWriter.getSplitMode == PartitionSplitMode.SOFT &&
(fileWriter.getFileInfo.getFileLength < partitionSplitMaximumSize)) {
softSplit.set(true)
} else {
callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))