diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index e4e7b9026..690758b77 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -805,6 +805,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def workerReplicateThreads: Int = get(WORKER_REPLICATE_THREADS) def workerCommitThreads: Int = if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else get(WORKER_COMMIT_THREADS) + def workerCommitFilesWaitThreads: Int = get(WORKER_COMMIT_FILES_WAIT_THREADS) def workerCleanThreads: Int = get(WORKER_CLEAN_THREADS) def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT) def maxPartitionSizeToEstimate: Long = @@ -2907,6 +2908,14 @@ object CelebornConf extends Logging { .intConf .createWithDefault(32) + val WORKER_COMMIT_FILES_WAIT_THREADS: ConfigEntry[Int] = + buildConf("celeborn.worker.commitFiles.wait.threads") + .categories("worker") + .version("0.5.0") + .doc("Thread number of worker to wait for commit shuffle data files to finish.") + .intConf + .createWithDefault(32) + val WORKER_CLEAN_THREADS: ConfigEntry[Int] = buildConf("celeborn.worker.clean.threads") .categories("worker") diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index ee6756131..30d27f780 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -51,6 +51,7 @@ license: | | celeborn.worker.closeIdleConnections | false | false | Whether worker will close idle connections. | 0.2.0 | | | celeborn.worker.commitFiles.threads | 32 | false | Thread number of worker to commit shuffle data files asynchronously. It's recommended to set at least `128` when `HDFS` is enabled in `celeborn.storage.activeTypes`. 
| 0.3.0 | celeborn.worker.commit.threads | | celeborn.worker.commitFiles.timeout | 120s | false | Timeout for a Celeborn worker to commit files of a shuffle. It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 | celeborn.worker.shuffle.commit.timeout | +| celeborn.worker.commitFiles.wait.threads | 32 | false | Thread number of worker to wait for commit shuffle data files to finish. | 0.5.0 | | | celeborn.worker.congestionControl.check.interval | 10ms | false | Interval of worker checks congestion if celeborn.worker.congestionControl.enabled is true. | 0.3.2 | | | celeborn.worker.congestionControl.enabled | false | false | Whether to enable congestion control or not. | 0.3.0 | | | celeborn.worker.congestionControl.high.watermark | <undefined> | false | If the total bytes in disk buffer exceeds this configure, will start to congestusers whose produce rate is higher than the potential average consume rate. The congestion will stop if the produce rate is lower or equal to the average consume rate, or the total pending bytes lower than celeborn.worker.congestionControl.low.watermark | 0.3.0 | | diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala index 6d9a44df1..8deac0905 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala @@ -57,6 +57,7 @@ private[deploy] class Controller( var partitionLocationInfo: WorkerPartitionLocationInfo = _ var timer: HashedWheelTimer = _ var commitThreadPool: ThreadPoolExecutor = _ + var waitThreadPool: ThreadPoolExecutor = _ var asyncReplyPool: ScheduledExecutorService = _ val minPartitionSizeToEstimate = conf.minPartitionSizeToEstimate var shutdown: AtomicBoolean = _ @@ -72,6 +73,7 @@ private[deploy] class Controller( partitionLocationInfo 
= worker.partitionLocationInfo timer = worker.timer commitThreadPool = worker.commitThreadPool + waitThreadPool = worker.waitThreadPool asyncReplyPool = worker.asyncReplyPool shutdown = worker.shutdown } @@ -431,7 +433,8 @@ private[deploy] class Controller( return } else if (commitInfo.status == CommitInfo.COMMIT_INPROCESS) { logInfo(s"$shuffleKey CommitFiles inprogress, wait for finish") - commitThreadPool.submit(new Runnable { + // Wait on the dedicated waitThreadPool, not commitThreadPool, so that waiters and commit-files tasks do not block each other. + waitThreadPool.submit(new Runnable { override def run(): Unit = { waitForCommitFinish() } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index bdaab805c..bc815f6d1 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -325,6 +325,8 @@ private[celeborn] class Worker( ThreadUtils.newDaemonCachedThreadPool("worker-data-replicator", conf.workerReplicateThreads) val commitThreadPool: ThreadPoolExecutor = ThreadUtils.newDaemonCachedThreadPool("worker-files-committer", conf.workerCommitThreads) + val waitThreadPool: ThreadPoolExecutor = + ThreadUtils.newDaemonCachedThreadPool("worker-commit-waiter", conf.workerCommitFilesWaitThreads) val cleanThreadPool: ThreadPoolExecutor = ThreadUtils.newDaemonCachedThreadPool( "worker-expired-shuffle-cleaner", @@ -563,11 +565,13 @@ private[celeborn] class Worker( forwardMessageScheduler.shutdown() replicateThreadPool.shutdown() commitThreadPool.shutdown() + waitThreadPool.shutdown() asyncReplyPool.shutdown() } else { forwardMessageScheduler.shutdownNow() replicateThreadPool.shutdownNow() commitThreadPool.shutdownNow() + waitThreadPool.shutdownNow() asyncReplyPool.shutdownNow() } workerSource.appActiveConnections.clear()