[CELEBORN-1225][FOLLOWUP] Worker should build replicate factory to get client for sending replicate data
### What changes were proposed in this pull request?

`PushDataHandler` should build replicate factory to get client for sending replicate data instead of push client factory. Meanwhile, timeout checker of `TransportResponseHandler` should run with `replicate` module instead of `push`.

Follow up #2232.

### Why are the changes needed?

`PushDataHandler` uses push client factory to create client for replicating, which should use replicate factory, otherwise replicate module configuration does not take effect for replicating of worker server. Meanwhile, timeout checker of `TransportResponseHandler` runs with `push` module, which does not work well with replicate client for worker.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

GA and cluster.

Closes #2241 from SteNicholas/CELEBORN-1225.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
parent b90fb1fdb2
commit 9107779174
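The module-dependent behaviour this commit changes in `TransportResponseHandler` can be summarised in a small standalone sketch. This is not the Celeborn class itself: `ModuleTimeoutSketch` and its three methods are hypothetical names introduced for illustration, the module strings are the values rendered in the docs table of this commit (`data`, `push`, `replicate`), and the status codes are returned as plain strings rather than as `StatusCode` values.

```java
import java.util.Optional;

public class ModuleTimeoutSketch {
    // Module names as rendered in the generated docs table of this commit.
    static final String DATA_MODULE = "data";
    static final String PUSH_MODULE = "push";
    static final String REPLICATE_MODULE = "replicate";

    // Hypothetical helper: which modules enable the push timeout checker
    // after this change (data, push, and now replicate).
    static boolean checksPushTimeout(String module) {
        return DATA_MODULE.equals(module)
            || PUSH_MODULE.equals(module)
            || REPLICATE_MODULE.equals(module);
    }

    // Hypothetical helper: only the data module also checks fetch timeouts.
    static boolean checksFetchTimeout(String module) {
        return DATA_MODULE.equals(module);
    }

    // Hypothetical helper: name of the status reported to the callback when
    // a push times out; empty for modules the handler does not branch on.
    static Optional<String> timeoutStatus(String module) {
        if (DATA_MODULE.equals(module)) {
            return Optional.of("PUSH_DATA_TIMEOUT_PRIMARY");
        } else if (PUSH_MODULE.equals(module) || REPLICATE_MODULE.equals(module)) {
            return Optional.of("PUSH_DATA_TIMEOUT_REPLICA");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        for (String module : new String[] {"data", "push", "replicate", "fetch"}) {
            System.out.println(module
                + " pushCheck=" + checksPushTimeout(module)
                + " fetchCheck=" + checksFetchTimeout(module)
                + " status=" + timeoutStatus(module).orElse("none"));
        }
    }
}
```

Before this change only the `data` and `push` branches existed, so a client built from a `replicate` factory would have fallen through both conditions.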
@@ -86,7 +86,8 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
     if (TransportModuleConstants.DATA_MODULE.equals(module)) {
       checkPushTimeout = true;
       checkFetchTimeout = true;
-    } else if (TransportModuleConstants.PUSH_MODULE.equals(module)) {
+    } else if (TransportModuleConstants.PUSH_MODULE.equals(module)
+        || TransportModuleConstants.REPLICATE_MODULE.equals(module)) {
       checkPushTimeout = true;
     }
     synchronized (TransportResponseHandler.class) {
@@ -137,11 +138,13 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
           if (info.channelFuture != null) {
             info.channelFuture.cancel(true);
           }
+          String module = conf.getModuleName();
           // When module name equals to DATA_MODULE, mean shuffle client push data, else means
           // do data replication.
-          if (TransportModuleConstants.DATA_MODULE.equals(conf.getModuleName())) {
+          if (TransportModuleConstants.DATA_MODULE.equals(module)) {
             info.callback.onFailure(new CelebornIOException(StatusCode.PUSH_DATA_TIMEOUT_PRIMARY));
-          } else if (TransportModuleConstants.PUSH_MODULE.equals(conf.getModuleName())) {
+          } else if (TransportModuleConstants.PUSH_MODULE.equals(module)
+              || TransportModuleConstants.REPLICATE_MODULE.equals(module)) {
             info.callback.onFailure(new CelebornIOException(StatusCode.PUSH_DATA_TIMEOUT_REPLICA));
           }
           info.channelFuture = null;

@@ -1643,7 +1643,9 @@ object CelebornConf extends Logging {
         s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
         s"it works for shuffle client push data. " +
         s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
-        s"it works for Flink shuffle client push data.")
+        s"it works for Flink shuffle client push data. " +
+        s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
+        s"it works for replicate client of worker replicating data to peer worker.")
       .version("0.3.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("5s")
@@ -1655,7 +1657,9 @@ object CelebornConf extends Logging {
         s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
         s"it works for shuffle client push data. " +
         s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
-        s"it works for Flink shuffle client push data.")
+        s"it works for Flink shuffle client push data. " +
+        s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
+        s"it works for replicate client of worker replicating data to peer worker.")
       .version("0.3.0")
       .intConf
       .createWithDefault(4)

@@ -37,8 +37,8 @@ license: |
 | celeborn.<module>.io.saslTimeout | 30s | Timeout for a single round trip of auth message exchange, in milliseconds. | 0.5.0 |
 | celeborn.<module>.io.sendBuffer | 0b | Send buffer size (SO_SNDBUF). If setting <module> to `rpc`, it works for shuffle client, master or worker. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `push`, it works for worker receiving push data. If setting <module> to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting <module> to `fetch`, it works for worker fetch server. | 0.2.0 |
 | celeborn.<module>.io.serverThreads | 0 | Number of threads used in the server thread pool. Default to 0, which is 2x#cores. If setting <module> to `rpc`, it works for master or worker. If setting <module> to `push`, it works for worker receiving push data. If setting <module> to `replicate`, it works for replicate server of worker replicating data to peer worker. If setting <module> to `fetch`, it works for worker fetch server. | |
-| celeborn.<module>.push.timeoutCheck.interval | 5s | Interval for checking push data timeout. If setting <module> to `data`, it works for shuffle client push data. If setting <module> to `push`, it works for Flink shuffle client push data. | 0.3.0 |
-| celeborn.<module>.push.timeoutCheck.threads | 4 | Threads num for checking push data timeout. If setting <module> to `data`, it works for shuffle client push data. If setting <module> to `push`, it works for Flink shuffle client push data. | 0.3.0 |
+| celeborn.<module>.push.timeoutCheck.interval | 5s | Interval for checking push data timeout. If setting <module> to `data`, it works for shuffle client push data. If setting <module> to `push`, it works for Flink shuffle client push data. If setting <module> to `replicate`, it works for replicate client of worker replicating data to peer worker. | 0.3.0 |
+| celeborn.<module>.push.timeoutCheck.threads | 4 | Threads num for checking push data timeout. If setting <module> to `data`, it works for shuffle client push data. If setting <module> to `push`, it works for Flink shuffle client push data. If setting <module> to `replicate`, it works for replicate client of worker replicating data to peer worker. | 0.3.0 |
 | celeborn.<role>.rpc.dispatcher.threads | <value of celeborn.rpc.dispatcher.threads> | Threads number of message dispatcher event loop for roles | |
 | celeborn.io.maxDefaultNettyThreads | 64 | Max default netty threads | 0.3.2 |
 | celeborn.network.bind.preferIpAddress | true | When `ture`, prefer to use IP address, otherwise FQDN. This configuration only takes effects when the bind hostname is not set explicitly, in such case, Celeborn will find the first non-loopback address to bind. | 0.3.0 |
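As a rough illustration of the two documented keys with `<module>` set to `replicate`, the sketch below resolves them from a plain `java.util.Properties` object. It only assumes the key pattern and the defaults (`5s`, `4`) shown in the table above; `ReplicateTimeoutConfSketch` and its helpers are hypothetical, and Celeborn's real lookup goes through `CelebornConf` rather than `Properties`.

```java
import java.util.Properties;

public class ReplicateTimeoutConfSketch {
    // Key patterns copied from the docs table; <module> is substituted in.
    static String intervalKey(String module) {
        return "celeborn." + module + ".push.timeoutCheck.interval";
    }

    static String threadsKey(String module) {
        return "celeborn." + module + ".push.timeoutCheck.threads";
    }

    // Hypothetical lookup: falls back to the documented default of 5s.
    static String interval(Properties conf, String module) {
        return conf.getProperty(intervalKey(module), "5s");
    }

    // Hypothetical lookup: falls back to the documented default of 4.
    static int threads(Properties conf, String module) {
        return Integer.parseInt(conf.getProperty(threadsKey(module), "4"));
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        // Override only the replicate module's check interval.
        conf.setProperty("celeborn.replicate.push.timeoutCheck.interval", "10s");

        System.out.println(interval(conf, "replicate")); // overridden value
        System.out.println(threads(conf, "replicate"));  // documented default
        System.out.println(interval(conf, "push"));      // documented default
    }
}
```

Per the commit message, settings under the `replicate` module only take effect for worker-to-worker replication once the worker builds its client from the replicate factory.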