From bd465aa7a6d8da07b14aaf1776e4e54ebf97820a Mon Sep 17 00:00:00 2001 From: hongzhaoyang <15316036153@163.com> Date: Wed, 30 Aug 2023 17:33:38 +0800 Subject: [PATCH] =?UTF-8?q?[CELEBORN-936]=20Shuffle=20master=20urls=20to?= =?UTF-8?q?=20avoid=20always=20connect=20first=20mast=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? Shuffle master urls to avoid always connect first master first time ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1866 from zy-jordan/CELEBORN-936. Authored-by: hongzhaoyang <15316036153@163.com> Signed-off-by: zky.zhoukeyong --- .../apache/celeborn/common/client/MasterClient.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java index 8ad3f267a..e63e3b87b 100644 --- a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java +++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java @@ -18,7 +18,7 @@ package org.apache.celeborn.common.client; import java.io.IOException; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -52,7 +52,7 @@ public class MasterClient { private static final Logger LOG = LoggerFactory.getLogger(MasterClient.class); private final RpcEnv rpcEnv; - private final String[] masterEndpoints; + private final List masterEndpoints; private final int maxRetries; private final RpcTimeout rpcTimeout; @@ -62,8 +62,9 @@ public class MasterClient { public MasterClient(RpcEnv rpcEnv, CelebornConf conf) { this.rpcEnv = rpcEnv; - this.masterEndpoints = conf.masterEndpoints(); - this.maxRetries = Math.max(masterEndpoints.length, conf.masterClientMaxRetries()); + this.masterEndpoints = Arrays.asList(conf.masterEndpoints()); + Collections.shuffle(this.masterEndpoints); + this.maxRetries = Math.max(masterEndpoints.size(), conf.masterClientMaxRetries()); this.rpcTimeout = conf.masterClientRpcAskTimeout(); this.rpcEndpointRef = new AtomicReference<>(); this.oneWayMessageSender = ThreadUtils.newDaemonSingleThreadExecutor("One-Way-Message-Sender"); @@ -224,9 +225,9 @@ public class MasterClient { if (endpointRef == null) { int index = currentIndex.get(); do { - RpcEndpointRef tempEndpointRef = setupEndpointRef(masterEndpoints[index]); + RpcEndpointRef tempEndpointRef = setupEndpointRef(masterEndpoints.get(index)); if (rpcEndpointRef.compareAndSet(null, tempEndpointRef)) { - index = (index + 1) % masterEndpoints.length; + index = (index + 1) % masterEndpoints.size(); } endpointRef = rpcEndpointRef.get(); } while (endpointRef == null && index != currentIndex.get());