[CELEBORN-1447] Support configuring thread number of worker to wait for commit shuffle data files to finish
### What changes were proposed in this pull request? Introduce `celeborn.worker.commitFiles.wait.threads` to support configuring the number of worker threads that wait for shuffle data file commits to finish. ### Why are the changes needed? At present, `celeborn.worker.commitFiles.threads` configures the number of worker threads that commit shuffle data files asynchronously, and that same pool is also used to wait for in-progress commits to finish. The number of threads that wait for commits to finish should be configurable separately, which avoids the situation where every thread in the commit thread pool is waiting for commit files and no thread is left to commit shuffle data files. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? GA. Closes #2539 from SteNicholas/CELEBORN-1447. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
This commit is contained in:
parent
e5f09ce4e0
commit
450dac8245
@ -805,6 +805,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
|
||||
def workerReplicateThreads: Int = get(WORKER_REPLICATE_THREADS)
|
||||
def workerCommitThreads: Int =
|
||||
if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else get(WORKER_COMMIT_THREADS)
|
||||
def workerCommitFilesWaitThreads: Int = get(WORKER_COMMIT_FILES_WAIT_THREADS)
|
||||
def workerCleanThreads: Int = get(WORKER_CLEAN_THREADS)
|
||||
def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
|
||||
def maxPartitionSizeToEstimate: Long =
|
||||
@ -2907,6 +2908,14 @@ object CelebornConf extends Logging {
|
||||
.intConf
|
||||
.createWithDefault(32)
|
||||
|
||||
val WORKER_COMMIT_FILES_WAIT_THREADS: ConfigEntry[Int] =
|
||||
buildConf("celeborn.worker.commitFiles.wait.threads")
|
||||
.categories("worker")
|
||||
.version("0.5.0")
|
||||
.doc("Thread number of worker to wait for commit shuffle data files to finish.")
|
||||
.intConf
|
||||
.createWithDefault(32)
|
||||
|
||||
val WORKER_CLEAN_THREADS: ConfigEntry[Int] =
|
||||
buildConf("celeborn.worker.clean.threads")
|
||||
.categories("worker")
|
||||
|
||||
@ -51,6 +51,7 @@ license: |
|
||||
| celeborn.worker.closeIdleConnections | false | false | Whether worker will close idle connections. | 0.2.0 | |
|
||||
| celeborn.worker.commitFiles.threads | 32 | false | Thread number of worker to commit shuffle data files asynchronously. It's recommended to set at least `128` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 | celeborn.worker.commit.threads |
|
||||
| celeborn.worker.commitFiles.timeout | 120s | false | Timeout for a Celeborn worker to commit files of a shuffle. It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 | celeborn.worker.shuffle.commit.timeout |
|
||||
| celeborn.worker.commitFiles.wait.threads | 32 | false | Thread number of worker to wait for commit shuffle data files to finish. | 0.5.0 | |
|
||||
| celeborn.worker.congestionControl.check.interval | 10ms | false | Interval of worker checks congestion if celeborn.worker.congestionControl.enabled is true. | 0.3.2 | |
|
||||
| celeborn.worker.congestionControl.enabled | false | false | Whether to enable congestion control or not. | 0.3.0 | |
|
||||
| celeborn.worker.congestionControl.high.watermark | <undefined> | false | If the total bytes in disk buffer exceeds this configure, will start to congest users whose produce rate is higher than the potential average consume rate. The congestion will stop if the produce rate is lower or equal to the average consume rate, or the total pending bytes lower than celeborn.worker.congestionControl.low.watermark | 0.3.0 | |
|
||||
|
||||
@ -57,6 +57,7 @@ private[deploy] class Controller(
|
||||
var partitionLocationInfo: WorkerPartitionLocationInfo = _
|
||||
var timer: HashedWheelTimer = _
|
||||
var commitThreadPool: ThreadPoolExecutor = _
|
||||
var waitThreadPool: ThreadPoolExecutor = _
|
||||
var asyncReplyPool: ScheduledExecutorService = _
|
||||
val minPartitionSizeToEstimate = conf.minPartitionSizeToEstimate
|
||||
var shutdown: AtomicBoolean = _
|
||||
@ -72,6 +73,7 @@ private[deploy] class Controller(
|
||||
partitionLocationInfo = worker.partitionLocationInfo
|
||||
timer = worker.timer
|
||||
commitThreadPool = worker.commitThreadPool
|
||||
waitThreadPool = worker.waitThreadPool
|
||||
asyncReplyPool = worker.asyncReplyPool
|
||||
shutdown = worker.shutdown
|
||||
}
|
||||
@ -431,7 +433,8 @@ private[deploy] class Controller(
|
||||
return
|
||||
} else if (commitInfo.status == CommitInfo.COMMIT_INPROCESS) {
|
||||
logInfo(s"$shuffleKey CommitFiles inprogress, wait for finish")
|
||||
commitThreadPool.submit(new Runnable {
|
||||
// Do not use commitThreadPool here: its threads may be blocked committing files.
|
||||
waitThreadPool.submit(new Runnable {
|
||||
override def run(): Unit = {
|
||||
waitForCommitFinish()
|
||||
}
|
||||
|
||||
@ -325,6 +325,8 @@ private[celeborn] class Worker(
|
||||
ThreadUtils.newDaemonCachedThreadPool("worker-data-replicator", conf.workerReplicateThreads)
|
||||
val commitThreadPool: ThreadPoolExecutor =
|
||||
ThreadUtils.newDaemonCachedThreadPool("worker-files-committer", conf.workerCommitThreads)
|
||||
val waitThreadPool: ThreadPoolExecutor =
|
||||
ThreadUtils.newDaemonCachedThreadPool("worker-commit-waiter", conf.workerCommitFilesWaitThreads)
|
||||
val cleanThreadPool: ThreadPoolExecutor =
|
||||
ThreadUtils.newDaemonCachedThreadPool(
|
||||
"worker-expired-shuffle-cleaner",
|
||||
@ -563,11 +565,13 @@ private[celeborn] class Worker(
|
||||
forwardMessageScheduler.shutdown()
|
||||
replicateThreadPool.shutdown()
|
||||
commitThreadPool.shutdown()
|
||||
waitThreadPool.shutdown();
|
||||
asyncReplyPool.shutdown()
|
||||
} else {
|
||||
forwardMessageScheduler.shutdownNow()
|
||||
replicateThreadPool.shutdownNow()
|
||||
commitThreadPool.shutdownNow()
|
||||
waitThreadPool.shutdownNow();
|
||||
asyncReplyPool.shutdownNow()
|
||||
}
|
||||
workerSource.appActiveConnections.clear()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user