From 91077791741dfeec8d672395bba3183496ebb20b Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Fri, 19 Jan 2024 09:46:27 +0800 Subject: [PATCH] [CELEBORN-1225][FOLLOWUP] Worker should build replicate factory to get client for sending replicate data ### What changes were proposed in this pull request? `PushDataHandler` should build replicate factory to get client for sending replicate data instead of push client factory. Meanwhile, timeout checker of `TransportResponseHandler` should run with `replicate` module instead of `push`. Follow up #2232. ### Why are the changes needed? `PushDataHandler` uses push client factory to create client for replicating, which should use replicate factory, otherwise replicate module configuration does not take effect for replicating of worker server. Meanwhile, timeout checker of `TransportResponseHandler` runs with `push` module, which does not work well with replicate client for worker. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? GA and cluster. Closes #2241 from SteNicholas/CELEBORN-1225. Authored-by: SteNicholas Signed-off-by: mingji --- .../common/network/client/TransportResponseHandler.java | 9 ++++++--- .../scala/org/apache/celeborn/common/CelebornConf.scala | 8 ++++++-- docs/configuration/network.md | 4 ++-- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java index ddace8d87..48193549d 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java +++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java @@ -86,7 +86,8 @@ public class TransportResponseHandler extends MessageHandler { if (TransportModuleConstants.DATA_MODULE.equals(module)) { checkPushTimeout = true; checkFetchTimeout = true; - } else if (TransportModuleConstants.PUSH_MODULE.equals(module)) { + } else if (TransportModuleConstants.PUSH_MODULE.equals(module) + || TransportModuleConstants.REPLICATE_MODULE.equals(module)) { checkPushTimeout = true; } synchronized (TransportResponseHandler.class) { @@ -137,11 +138,13 @@ public class TransportResponseHandler extends MessageHandler { if (info.channelFuture != null) { info.channelFuture.cancel(true); } + String module = conf.getModuleName(); // When module name equals to DATA_MODULE, mean shuffle client push data, else means // do data replication. - if (TransportModuleConstants.DATA_MODULE.equals(conf.getModuleName())) { + if (TransportModuleConstants.DATA_MODULE.equals(module)) { info.callback.onFailure(new CelebornIOException(StatusCode.PUSH_DATA_TIMEOUT_PRIMARY)); - } else if (TransportModuleConstants.PUSH_MODULE.equals(conf.getModuleName())) { + } else if (TransportModuleConstants.PUSH_MODULE.equals(module) + || TransportModuleConstants.REPLICATE_MODULE.equals(module)) { info.callback.onFailure(new CelebornIOException(StatusCode.PUSH_DATA_TIMEOUT_REPLICA)); } info.channelFuture = null; diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 7b2cfe473..9c94fa2a0 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1643,7 +1643,9 @@ object CelebornConf extends Logging { s"If setting to `${TransportModuleConstants.DATA_MODULE}`, " + s"it works for shuffle client push data. " + s"If setting to `${TransportModuleConstants.PUSH_MODULE}`, " + - s"it works for Flink shuffle client push data.") + s"it works for Flink shuffle client push data. " + + s"If setting to `${TransportModuleConstants.REPLICATE_MODULE}`, " + + s"it works for replicate client of worker replicating data to peer worker.") .version("0.3.0") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("5s") @@ -1655,7 +1657,9 @@ object CelebornConf extends Logging { s"If setting to `${TransportModuleConstants.DATA_MODULE}`, " + s"it works for shuffle client push data. " + s"If setting to `${TransportModuleConstants.PUSH_MODULE}`, " + - s"it works for Flink shuffle client push data.") + s"it works for Flink shuffle client push data. " + + s"If setting to `${TransportModuleConstants.REPLICATE_MODULE}`, " + + s"it works for replicate client of worker replicating data to peer worker.") .version("0.3.0") .intConf .createWithDefault(4) diff --git a/docs/configuration/network.md b/docs/configuration/network.md index 71b0bffdb..2d6dabdc8 100644 --- a/docs/configuration/network.md +++ b/docs/configuration/network.md @@ -37,8 +37,8 @@ license: | | celeborn.<module>.io.saslTimeout | 30s | Timeout for a single round trip of auth message exchange, in milliseconds. | 0.5.0 | | celeborn.<module>.io.sendBuffer | 0b | Send buffer size (SO_SNDBUF). If setting to `rpc`, it works for shuffle client, master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | 0.2.0 | | celeborn.<module>.io.serverThreads | 0 | Number of threads used in the server thread pool. Default to 0, which is 2x#cores. If setting to `rpc`, it works for master or worker. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | | -| celeborn.<module>.push.timeoutCheck.interval | 5s | Interval for checking push data timeout. If setting to `data`, it works for shuffle client push data. If setting to `push`, it works for Flink shuffle client push data. | 0.3.0 | -| celeborn.<module>.push.timeoutCheck.threads | 4 | Threads num for checking push data timeout. If setting to `data`, it works for shuffle client push data. If setting to `push`, it works for Flink shuffle client push data. | 0.3.0 | +| celeborn.<module>.push.timeoutCheck.interval | 5s | Interval for checking push data timeout. If setting to `data`, it works for shuffle client push data. If setting to `push`, it works for Flink shuffle client push data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. | 0.3.0 | +| celeborn.<module>.push.timeoutCheck.threads | 4 | Threads num for checking push data timeout. If setting to `data`, it works for shuffle client push data. If setting to `push`, it works for Flink shuffle client push data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. | 0.3.0 | | celeborn.<role>.rpc.dispatcher.threads | <value of celeborn.rpc.dispatcher.threads> | Threads number of message dispatcher event loop for roles | | | celeborn.io.maxDefaultNettyThreads | 64 | Max default netty threads | 0.3.2 | | celeborn.network.bind.preferIpAddress | true | When `ture`, prefer to use IP address, otherwise FQDN. This configuration only takes effects when the bind hostname is not set explicitly, in such case, Celeborn will find the first non-loopback address to bind. | 0.3.0 |