From 1fd8833756721291d62db5151a8b9ccb91ea2c66 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Thu, 29 Jun 2023 16:47:15 +0800 Subject: [PATCH] [CELEBORN-748] Rename RssHARetryClient to MasterClient ### What changes were proposed in this pull request? Rename RssHARetryClient to MasterClient ### Why are the changes needed? Code refactor ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. Closes #1661 from AngersZhuuuu/CELEBORN-748. Authored-by: Angerszhuuuu Signed-off-by: Cheng Pan --- .../celeborn/client/ShuffleClientImpl.java | 5 ++--- .../client/ApplicationHeartbeater.scala | 8 ++++---- .../celeborn/client/LifecycleManager.scala | 18 ++++++++--------- .../MasterClient.java} | 8 ++++---- .../MasterNotLeaderException.java | 2 +- .../MasterClientSuiteJ.java} | 20 +++++++++---------- .../master/clustermeta/ha/HAHelper.java | 2 +- .../clustermeta/ha/HAMasterMetaManager.java | 4 ++-- .../master/clustermeta/ha/HARaftServer.java | 4 ++-- .../service/deploy/master/Master.scala | 6 +++--- .../clustermeta/DefaultMetaSystemSuiteJ.java | 5 ++--- .../ha/RatisMasterStatusSystemSuiteJ.java | 5 ++--- .../service/deploy/worker/Worker.scala | 18 ++++++++--------- 13 files changed, 51 insertions(+), 54 deletions(-) rename common/src/main/java/org/apache/celeborn/common/{haclient/RssHARetryClient.java => client/MasterClient.java} (97%) rename common/src/main/java/org/apache/celeborn/common/{haclient => client}/MasterNotLeaderException.java (97%) rename common/src/test/java/org/apache/celeborn/common/{haclient/RssHARetryClientSuiteJ.java => client/MasterClientSuiteJ.java} (95%) diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index c7c07a1ad..9927ea64a 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -37,8 +37,8 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.client.compress.Compressor; import org.apache.celeborn.client.read.RssInputStream; import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.common.client.MasterClient; import org.apache.celeborn.common.exception.CelebornIOException; -import org.apache.celeborn.common.haclient.RssHARetryClient; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.network.TransportContext; import org.apache.celeborn.common.network.buffer.NettyManagedBuffer; @@ -1496,8 +1496,7 @@ public class ShuffleClientImpl extends ShuffleClient { if (isDriver) { try { driverRssMetaService.send( - UnregisterShuffle$.MODULE$.apply( - appUniqueId, shuffleId, RssHARetryClient.genRequestId())); + UnregisterShuffle$.MODULE$.apply(appUniqueId, shuffleId, MasterClient.genRequestId())); } catch (Exception e) { // If some exceptions need to be ignored, they shouldn't be logged as error-level, // otherwise it will mislead users. diff --git a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala index dd8a5d389..38dc2a47d 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import scala.concurrent.duration.DurationInt import org.apache.celeborn.common.CelebornConf -import org.apache.celeborn.common.haclient.RssHARetryClient +import org.apache.celeborn.common.client.MasterClient import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.protocol.message.ControlMessages.{HeartbeatFromApplication, HeartbeatFromApplicationResponse, ZERO_UUID} import org.apache.celeborn.common.protocol.message.StatusCode @@ -32,7 +32,7 @@ import org.apache.celeborn.common.util.{ThreadUtils, Utils} class ApplicationHeartbeater( appId: String, conf: CelebornConf, - rssHARetryClient: RssHARetryClient, + masterClient: MasterClient, shuffleMetrics: () => (Long, Long), workerStatusTracker: WorkerStatusTracker) extends Logging { @@ -47,7 +47,7 @@ class ApplicationHeartbeater( new Runnable { override def run(): Unit = { try { - require(rssHARetryClient != null, "When sending a heartbeat, client shouldn't be null.") + require(masterClient != null, "When sending a heartbeat, client shouldn't be null.") val (tmpTotalWritten, tmpTotalFileCount) = shuffleMetrics() logInfo("Send app heartbeat with " + s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count: $tmpTotalFileCount") @@ -82,7 +82,7 @@ class ApplicationHeartbeater( private def requestHeartbeat(message: HeartbeatFromApplication) : HeartbeatFromApplicationResponse = { try { - rssHARetryClient.askSync[HeartbeatFromApplicationResponse]( + masterClient.askSync[HeartbeatFromApplicationResponse]( message, classOf[HeartbeatFromApplicationResponse]) } catch { diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index 3e43d7b72..06e1a3de8 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -30,7 +30,7 @@ import com.google.common.annotations.VisibleForTesting import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, ShuffleFailedWorkers} import org.apache.celeborn.client.listener.WorkerStatusListener import org.apache.celeborn.common.CelebornConf -import org.apache.celeborn.common.haclient.RssHARetryClient +import org.apache.celeborn.common.client.MasterClient import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier} import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo, WorkerInfo} @@ -118,14 +118,14 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends logInfo(s"Starting LifecycleManager on ${rpcEnv.address}") - private val rssHARetryClient = new RssHARetryClient(rpcEnv, conf) + private val masterClient = new MasterClient(rpcEnv, conf) val commitManager = new CommitManager(appUniqueId, conf, this) val workerStatusTracker = new WorkerStatusTracker(conf, this) private val heartbeater = new ApplicationHeartbeater( appUniqueId, conf, - rssHARetryClient, + masterClient, () => commitManager.commitMetrics(), workerStatusTracker) private val changePartitionManager = new ChangePartitionManager(conf, this) @@ -168,7 +168,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends releasePartitionManager.stop() heartbeater.stop() - rssHARetryClient.close() + masterClient.close() if (rpcEnv != null) { rpcEnv.shutdown() rpcEnv.awaitTermination() @@ -1023,7 +1023,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends commitManager.removeExpiredShuffle(shuffleId) changePartitionManager.removeExpiredShuffle(shuffleId) val unregisterShuffleResponse = requestMasterUnregisterShuffle( - UnregisterShuffle(appUniqueId, shuffleId, RssHARetryClient.genRequestId())) + UnregisterShuffle(appUniqueId, shuffleId, MasterClient.genRequestId())) // if unregister shuffle not success, wait next turn if (StatusCode.SUCCESS == Utils.toStatusCode(unregisterShuffleResponse.getStatus)) { unregisterShuffleTime.remove(shuffleId) @@ -1055,7 +1055,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends private def requestMasterRequestSlots(message: RequestSlots): RequestSlotsResponse = { val shuffleKey = Utils.makeShuffleKey(message.applicationId, message.shuffleId) try { - rssHARetryClient.askSync[RequestSlotsResponse](message, classOf[RequestSlotsResponse]) + masterClient.askSync[RequestSlotsResponse](message, classOf[RequestSlotsResponse]) } catch { case e: Exception => logError(s"AskSync RegisterShuffle for $shuffleKey failed.", e) @@ -1095,7 +1095,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends private def requestMasterReleaseSlots(message: ReleaseSlots): ReleaseSlotsResponse = { try { - rssHARetryClient.askSync[ReleaseSlotsResponse](message, classOf[ReleaseSlotsResponse]) + masterClient.askSync[ReleaseSlotsResponse](message, classOf[ReleaseSlotsResponse]) } catch { case e: Exception => logError(s"AskSync ReleaseSlots for ${message.shuffleId} failed.", e) @@ -1106,7 +1106,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends private def requestMasterUnregisterShuffle(message: PbUnregisterShuffle) : PbUnregisterShuffleResponse = { try { - rssHARetryClient.askSync[PbUnregisterShuffleResponse]( + masterClient.askSync[PbUnregisterShuffleResponse]( message, classOf[PbUnregisterShuffleResponse]) } catch { @@ -1118,7 +1118,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends def checkQuota(): CheckQuotaResponse = { try { - rssHARetryClient.askSync[CheckQuotaResponse]( + masterClient.askSync[CheckQuotaResponse]( CheckQuota(userIdentifier), classOf[CheckQuotaResponse]) } catch { diff --git a/common/src/main/java/org/apache/celeborn/common/haclient/RssHARetryClient.java b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java similarity index 97% rename from common/src/main/java/org/apache/celeborn/common/haclient/RssHARetryClient.java rename to common/src/main/java/org/apache/celeborn/common/client/MasterClient.java index 360de2af0..364255241 100644 --- a/common/src/main/java/org/apache/celeborn/common/haclient/RssHARetryClient.java +++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.celeborn.common.haclient; +package org.apache.celeborn.common.client; import java.io.IOException; import java.util.UUID; @@ -48,8 +48,8 @@ import org.apache.celeborn.common.rpc.RpcEnv; import org.apache.celeborn.common.rpc.RpcTimeout; import org.apache.celeborn.common.util.ThreadUtils; -public class RssHARetryClient { - private static final Logger LOG = LoggerFactory.getLogger(RssHARetryClient.class); +public class MasterClient { + private static final Logger LOG = LoggerFactory.getLogger(MasterClient.class); private final RpcEnv rpcEnv; private final String[] masterEndpoints; @@ -60,7 +60,7 @@ public class RssHARetryClient { private final AtomicReference rpcEndpointRef; private final ExecutorService oneWayMessageSender; - public RssHARetryClient(RpcEnv rpcEnv, CelebornConf conf) { + public MasterClient(RpcEnv rpcEnv, CelebornConf conf) { this.rpcEnv = rpcEnv; this.masterEndpoints = conf.masterEndpoints(); this.maxRetries = Math.max(masterEndpoints.length, conf.masterClientMaxRetries()); diff --git a/common/src/main/java/org/apache/celeborn/common/haclient/MasterNotLeaderException.java b/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java similarity index 97% rename from common/src/main/java/org/apache/celeborn/common/haclient/MasterNotLeaderException.java rename to common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java index bbeeb1e95..10c3c24ea 100644 --- a/common/src/main/java/org/apache/celeborn/common/haclient/MasterNotLeaderException.java +++ b/common/src/main/java/org/apache/celeborn/common/client/MasterNotLeaderException.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.celeborn.common.haclient; +package org.apache.celeborn.common.client; import java.io.IOException; diff --git a/common/src/test/java/org/apache/celeborn/common/haclient/RssHARetryClientSuiteJ.java b/common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java similarity index 95% rename from common/src/test/java/org/apache/celeborn/common/haclient/RssHARetryClientSuiteJ.java rename to common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java index 165029b46..a902fd2a4 100644 --- a/common/src/test/java/org/apache/celeborn/common/haclient/RssHARetryClientSuiteJ.java +++ b/common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.celeborn.common.haclient; +package org.apache.celeborn.common.client; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -46,8 +46,8 @@ import org.apache.celeborn.common.rpc.RpcAddress; import org.apache.celeborn.common.rpc.RpcEndpointRef; import org.apache.celeborn.common.rpc.RpcEnv; -public class RssHARetryClientSuiteJ { - private static final Logger LOG = LoggerFactory.getLogger(RssHARetryClientSuiteJ.class); +public class MasterClientSuiteJ { + private static final Logger LOG = LoggerFactory.getLogger(MasterClientSuiteJ.class); private final String masterHost = "localhost"; private final int masterPort = 9097; @@ -79,7 +79,7 @@ public class RssHARetryClientSuiteJ { }); prepareForRpcEnvWithoutHA(); - RssHARetryClient client = new RssHARetryClient(rpcEnv, conf); + MasterClient client = new MasterClient(rpcEnv, conf); HeartbeatFromApplication message = Mockito.mock(HeartbeatFromApplication.class); try { @@ -106,7 +106,7 @@ public class RssHARetryClientSuiteJ { }); prepareForRpcEnvWithoutHA(); - RssHARetryClient client = new RssHARetryClient(rpcEnv, conf); + MasterClient client = new MasterClient(rpcEnv, conf); HeartbeatFromApplication message = Mockito.mock(HeartbeatFromApplication.class); try { @@ -132,7 +132,7 @@ public class RssHARetryClientSuiteJ { return Future$.MODULE$.successful(response); }); - RssHARetryClient client = new RssHARetryClient(rpcEnv, conf); + MasterClient client = new MasterClient(rpcEnv, conf); HeartbeatFromApplication message = Mockito.mock(HeartbeatFromApplication.class); try { @@ -152,7 +152,7 @@ public class RssHARetryClientSuiteJ { prepareForEndpointRefWithoutRetry(() -> Future$.MODULE$.successful(mockResponse)); prepareForRpcEnvWithoutHA(); - RssHARetryClient client = new RssHARetryClient(rpcEnv, conf); + MasterClient client = new MasterClient(rpcEnv, conf); HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class); HeartbeatFromWorkerResponse response = null; @@ -174,7 +174,7 @@ public class RssHARetryClientSuiteJ { prepareForEndpointRefWithRetry(numTries, () -> Future$.MODULE$.successful(mockResponse)); prepareForRpcEnvWithoutHA(); - RssHARetryClient client = new RssHARetryClient(rpcEnv, conf); + MasterClient client = new MasterClient(rpcEnv, conf); HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class); HeartbeatFromWorkerResponse response = null; @@ -195,7 +195,7 @@ public class RssHARetryClientSuiteJ { prepareForRpcEnvWithHA(() -> Future$.MODULE$.successful(mockResponse)); - RssHARetryClient client = new RssHARetryClient(rpcEnv, conf); + MasterClient client = new MasterClient(rpcEnv, conf); HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class); HeartbeatFromWorkerResponse response = null; @@ -254,7 +254,7 @@ public class RssHARetryClientSuiteJ { .when(rpcEnv) .setupEndpointRef(Mockito.any(RpcAddress.class), Mockito.anyString()); - RssHARetryClient client = new RssHARetryClient(rpcEnv, conf); + MasterClient client = new MasterClient(rpcEnv, conf); HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class); HeartbeatFromWorkerResponse response = null; diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java index 570c09d53..527b5d557 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java @@ -26,8 +26,8 @@ import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorageUtil; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.celeborn.common.client.MasterNotLeaderException; import org.apache.celeborn.common.exception.CelebornIOException; -import org.apache.celeborn.common.haclient.MasterNotLeaderException; import org.apache.celeborn.common.rpc.RpcCallContext; import org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager; import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos; diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java index ebf75b310..2a0cecf61 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java @@ -25,8 +25,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.common.client.MasterClient; import org.apache.celeborn.common.exception.CelebornRuntimeException; -import org.apache.celeborn.common.haclient.RssHARetryClient; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.meta.AppDiskUsageMetric; import org.apache.celeborn.common.meta.DiskInfo; @@ -318,7 +318,7 @@ public class HAMasterMetaManager extends AbstractMetaManager { ratisServer.submitRequest( ResourceRequest.newBuilder() .setCmdType(Type.UpdatePartitionSize) - .setRequestId(RssHARetryClient.genRequestId()) + .setRequestId(MasterClient.genRequestId()) .build()); } catch (CelebornRuntimeException e) { LOG.error("Handle update partition size failed!", e); diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java index cd89fca19..f20cfff16 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java @@ -51,8 +51,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.common.client.MasterClient; import org.apache.celeborn.common.exception.CelebornRuntimeException; -import org.apache.celeborn.common.haclient.RssHARetryClient; import org.apache.celeborn.common.util.ThreadUtils; import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos; import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceResponse; @@ -198,7 +198,7 @@ public class HARaftServer { public ResourceResponse submitRequest(ResourceProtos.ResourceRequest request) throws CelebornRuntimeException { String requestId = request.getRequestId(); - Tuple2 decoded = RssHARetryClient.decodeRequestId(requestId); + Tuple2 decoded = MasterClient.decodeRequestId(requestId); if (decoded == null) { throw new CelebornRuntimeException( "RequestId:" + requestId + " invalid, should be: uuid#callId."); diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 475229aa9..1c1c1908d 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -26,7 +26,7 @@ import scala.collection.JavaConverters._ import scala.util.Random import org.apache.celeborn.common.CelebornConf -import org.apache.celeborn.common.haclient.RssHARetryClient +import org.apache.celeborn.common.client.MasterClient import org.apache.celeborn.common.identity.UserIdentifier import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo} @@ -369,7 +369,7 @@ private[celeborn] class Master( worker.pushPort, worker.fetchPort, worker.replicatePort, - RssHARetryClient.genRequestId())) + MasterClient.genRequestId())) } ind += 1 } @@ -384,7 +384,7 @@ private[celeborn] class Master( statusSystem.appHeartbeatTime.keySet().asScala.foreach { key => if (statusSystem.appHeartbeatTime.get(key) < currentTime - appHeartbeatTimeoutMs) { logWarning(s"Application $key timeout, trigger applicationLost event.") - val requestId = RssHARetryClient.genRequestId() + val requestId = MasterClient.genRequestId() var res = self.askSync[ApplicationLostResponse](ApplicationLost(key, requestId)) var retry = 1 while (res.status != StatusCode.SUCCESS && retry <= 3) { diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java index 1b6df8205..6d0ce6e21 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java @@ -30,7 +30,7 @@ import org.junit.Before; import org.junit.Test; import org.apache.celeborn.common.CelebornConf; -import org.apache.celeborn.common.haclient.RssHARetryClient; +import org.apache.celeborn.common.client.MasterClient; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.WorkerInfo; @@ -105,8 +105,7 @@ public class DefaultMetaSystemSuiteJ { public void tearDown() throws Exception {} private String getNewReqeustId() { - return RssHARetryClient.encodeRequestId( - UUID.randomUUID().toString(), callerId.incrementAndGet()); + return MasterClient.encodeRequestId(UUID.randomUUID().toString(), callerId.incrementAndGet()); } @Test diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java index f2ab80996..85bb1db61 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java @@ -29,8 +29,8 @@ import org.junit.*; import org.mockito.Mockito; import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.common.client.MasterClient; import org.apache.celeborn.common.exception.CelebornRuntimeException; -import org.apache.celeborn.common.haclient.RssHARetryClient; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.WorkerInfo; @@ -195,8 +195,7 @@ public class RatisMasterStatusSystemSuiteJ { private static String SHUFFLEKEY1 = APPID1 + "-" + SHUFFLEID1; private String getNewReqeustId() { - return RssHARetryClient.encodeRequestId( - UUID.randomUUID().toString(), callerId.incrementAndGet()); + return MasterClient.encodeRequestId(UUID.randomUUID().toString(), callerId.incrementAndGet()); } public HAMasterMetaManager pickLeaderStatusSystem() { diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index f6be07d6e..550adb05a 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -30,8 +30,8 @@ import io.netty.util.HashedWheelTimer import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.CelebornConf._ +import org.apache.celeborn.common.client.MasterClient import org.apache.celeborn.common.exception.CelebornException -import org.apache.celeborn.common.haclient.RssHARetryClient import org.apache.celeborn.common.identity.UserIdentifier import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, WorkerPartitionLocationInfo} @@ -219,7 +219,7 @@ private[celeborn] class Worker( val shuffleCommitInfos = JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[Long, CommitInfo]]() - private val rssHARetryClient = new RssHARetryClient(rpcEnv, conf) + private val masterClient = new MasterClient(rpcEnv, conf) // (workerInfo -> last connect timeout timestamp) val unavailablePeers = JavaUtils.newConcurrentHashMap[WorkerInfo, Long]() @@ -285,7 +285,7 @@ private[celeborn] class Worker( val resourceConsumption = workerInfo.updateThenGetUserResourceConsumption( storageManager.userResourceConsumptionSnapshot().asJava) - val response = rssHARetryClient.askSync[HeartbeatFromWorkerResponse]( + val response = masterClient.askSync[HeartbeatFromWorkerResponse]( HeartbeatFromWorker( host, rpcPort, @@ -392,7 +392,7 @@ private[celeborn] class Worker( } memoryManager.close(); - rssHARetryClient.close() + masterClient.close() replicateServer.close() fetchServer.close() @@ -410,7 +410,7 @@ private[celeborn] class Worker( while (registerTimeout > 0) { val resp = try { - rssHARetryClient.askSync[PbRegisterWorkerResponse]( + masterClient.askSync[PbRegisterWorkerResponse]( RegisterWorker( host, rpcPort, @@ -422,7 +422,7 @@ private[celeborn] class Worker( workerInfo.diskInfos.asScala.toMap, workerInfo.updateThenGetUserResourceConsumption( storageManager.userResourceConsumptionSnapshot().asJava).asScala.toMap, - RssHARetryClient.genRequestId()), + MasterClient.genRequestId()), classOf[PbRegisterWorkerResponse]) } catch { case throwable: Throwable => @@ -532,18 +532,18 @@ private[celeborn] class Worker( // make master remove this worker from excluded list. try { if (gracefulShutdown) { - rssHARetryClient.askSync( + masterClient.askSync( ReportWorkerUnavailable(List(workerInfo).asJava), OneWayMessageResponse.getClass) } else { - rssHARetryClient.askSync[PbWorkerLostResponse]( + masterClient.askSync[PbWorkerLostResponse]( WorkerLost( host, rpcPort, pushPort, fetchPort, replicatePort, - RssHARetryClient.genRequestId()), + MasterClient.genRequestId()), classOf[PbWorkerLostResponse]) } } catch {