[CELEBORN-936] Shuffle master URLs to avoid always connecting to the first master first

### What changes were proposed in this pull request?
Shuffle the configured master URLs so that a client does not always connect to the first master on its first attempt.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #1866 from zy-jordan/CELEBORN-936.

Authored-by: hongzhaoyang <15316036153@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
This commit is contained in:
hongzhaoyang 2023-08-30 17:33:38 +08:00 committed by zky.zhoukeyong
parent 92777c3ff2
commit bd465aa7a6

View File

@ -18,7 +18,7 @@
package org.apache.celeborn.common.client;
import java.io.IOException;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -52,7 +52,7 @@ public class MasterClient {
private static final Logger LOG = LoggerFactory.getLogger(MasterClient.class);
private final RpcEnv rpcEnv;
private final String[] masterEndpoints;
private final List<String> masterEndpoints;
private final int maxRetries;
private final RpcTimeout rpcTimeout;
@ -62,8 +62,9 @@ public class MasterClient {
public MasterClient(RpcEnv rpcEnv, CelebornConf conf) {
this.rpcEnv = rpcEnv;
this.masterEndpoints = conf.masterEndpoints();
this.maxRetries = Math.max(masterEndpoints.length, conf.masterClientMaxRetries());
this.masterEndpoints = Arrays.asList(conf.masterEndpoints());
Collections.shuffle(this.masterEndpoints);
this.maxRetries = Math.max(masterEndpoints.size(), conf.masterClientMaxRetries());
this.rpcTimeout = conf.masterClientRpcAskTimeout();
this.rpcEndpointRef = new AtomicReference<>();
this.oneWayMessageSender = ThreadUtils.newDaemonSingleThreadExecutor("One-Way-Message-Sender");
@ -224,9 +225,9 @@ public class MasterClient {
if (endpointRef == null) {
int index = currentIndex.get();
do {
RpcEndpointRef tempEndpointRef = setupEndpointRef(masterEndpoints[index]);
RpcEndpointRef tempEndpointRef = setupEndpointRef(masterEndpoints.get(index));
if (rpcEndpointRef.compareAndSet(null, tempEndpointRef)) {
index = (index + 1) % masterEndpoints.length;
index = (index + 1) % masterEndpoints.size();
}
endpointRef = rpcEndpointRef.get();
} while (endpointRef == null && index != currentIndex.get());