From 450dac82454ecc92b984b32b75818665fee2a032 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Mon, 3 Jun 2024 17:47:01 +0800 Subject: [PATCH] [CELEBORN-1447] Support configuring thread number of worker to wait for commit shuffle data files to finish ### What changes were proposed in this pull request? Introduce `celeborn.worker.commitFiles.wait.threads` to support configuring thread number of worker to wait for commit shuffle data files to finish. ### Why are the changes needed? `celeborn.worker.commitFiles.threads` supports the configuration that is the thread number of worker to commit shuffle data files asynchronously including waiting for commit files to finish at present. It should support to configure thread number of waiting for commit shuffle data files to finish which avoids the situation where the commit thread pool is waiting for commit files and no thread could commit shuffle data files. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? GA. Closes #2539 from SteNicholas/CELEBORN-1447. Authored-by: SteNicholas Signed-off-by: mingji --- .../scala/org/apache/celeborn/common/CelebornConf.scala | 9 +++++++++ docs/configuration/worker.md | 1 + .../celeborn/service/deploy/worker/Controller.scala | 5 ++++- .../apache/celeborn/service/deploy/worker/Worker.scala | 4 ++++ 4 files changed, 18 insertions(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index e4e7b9026..690758b77 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -805,6 +805,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def workerReplicateThreads: Int = get(WORKER_REPLICATE_THREADS) def workerCommitThreads: Int = if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else get(WORKER_COMMIT_THREADS) + def workerCommitFilesWaitThreads: Int = get(WORKER_COMMIT_FILES_WAIT_THREADS) def workerCleanThreads: Int = get(WORKER_CLEAN_THREADS) def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT) def maxPartitionSizeToEstimate: Long = @@ -2907,6 +2908,14 @@ object CelebornConf extends Logging { .intConf .createWithDefault(32) + val WORKER_COMMIT_FILES_WAIT_THREADS: ConfigEntry[Int] = + buildConf("celeborn.worker.commitFiles.wait.threads") + .categories("worker") + .version("0.5.0") + .doc("Thread number of worker to wait for commit shuffle data files to finish.") + .intConf + .createWithDefault(32) + val WORKER_CLEAN_THREADS: ConfigEntry[Int] = buildConf("celeborn.worker.clean.threads") .categories("worker") diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index ee6756131..30d27f780 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -51,6 +51,7 @@ license: | | celeborn.worker.closeIdleConnections | false | false | Whether worker will close idle connections. | 0.2.0 | | | celeborn.worker.commitFiles.threads | 32 | false | Thread number of worker to commit shuffle data files asynchronously. It's recommended to set at least `128` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 | celeborn.worker.commit.threads | | celeborn.worker.commitFiles.timeout | 120s | false | Timeout for a Celeborn worker to commit files of a shuffle. It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 | celeborn.worker.shuffle.commit.timeout | +| celeborn.worker.commitFiles.wait.threads | 32 | false | Thread number of worker to wait for commit shuffle data files to finish. | 0.5.0 | | | celeborn.worker.congestionControl.check.interval | 10ms | false | Interval of worker checks congestion if celeborn.worker.congestionControl.enabled is true. | 0.3.2 | | | celeborn.worker.congestionControl.enabled | false | false | Whether to enable congestion control or not. | 0.3.0 | | | celeborn.worker.congestionControl.high.watermark | <undefined> | false | If the total bytes in disk buffer exceeds this configure, will start to congestusers whose produce rate is higher than the potential average consume rate. The congestion will stop if the produce rate is lower or equal to the average consume rate, or the total pending bytes lower than celeborn.worker.congestionControl.low.watermark | 0.3.0 | | diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala index 6d9a44df1..8deac0905 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala @@ -57,6 +57,7 @@ private[deploy] class Controller( var partitionLocationInfo: WorkerPartitionLocationInfo = _ var timer: HashedWheelTimer = _ var commitThreadPool: ThreadPoolExecutor = _ + var waitThreadPool: ThreadPoolExecutor = _ var asyncReplyPool: ScheduledExecutorService = _ val minPartitionSizeToEstimate = conf.minPartitionSizeToEstimate var shutdown: AtomicBoolean = _ @@ -72,6 +73,7 @@ private[deploy] class Controller( partitionLocationInfo = worker.partitionLocationInfo timer = worker.timer commitThreadPool = worker.commitThreadPool + waitThreadPool = worker.waitThreadPool asyncReplyPool = worker.asyncReplyPool shutdown = worker.shutdown } @@ -431,7 +433,8 @@ private[deploy] class Controller( return } else if (commitInfo.status == CommitInfo.COMMIT_INPROCESS) { logInfo(s"$shuffleKey CommitFiles inprogress, wait for finish") - commitThreadPool.submit(new Runnable { + // should not use commitThreadPool in case of block by commit files. + waitThreadPool.submit(new Runnable { override def run(): Unit = { waitForCommitFinish() } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index bdaab805c..bc815f6d1 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -325,6 +325,8 @@ private[celeborn] class Worker( ThreadUtils.newDaemonCachedThreadPool("worker-data-replicator", conf.workerReplicateThreads) val commitThreadPool: ThreadPoolExecutor = ThreadUtils.newDaemonCachedThreadPool("worker-files-committer", conf.workerCommitThreads) + val waitThreadPool: ThreadPoolExecutor = + ThreadUtils.newDaemonCachedThreadPool("worker-commit-waiter", conf.workerCommitFilesWaitThreads) val cleanThreadPool: ThreadPoolExecutor = ThreadUtils.newDaemonCachedThreadPool( "worker-expired-shuffle-cleaner", @@ -563,11 +565,13 @@ private[celeborn] class Worker( forwardMessageScheduler.shutdown() replicateThreadPool.shutdown() commitThreadPool.shutdown() + waitThreadPool.shutdown(); asyncReplyPool.shutdown() } else { forwardMessageScheduler.shutdownNow() replicateThreadPool.shutdownNow() commitThreadPool.shutdownNow() + waitThreadPool.shutdownNow(); asyncReplyPool.shutdownNow() } workerSource.appActiveConnections.clear()