[CELEBORN-748] Rename RssHARetryClient to MasterClient

### What changes were proposed in this pull request?

Rename RssHARetryClient to MasterClient

### Why are the changes needed?

Code refactor

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass GA.

Closes #1661 from AngersZhuuuu/CELEBORN-748.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Angerszhuuuu 2023-06-29 16:47:15 +08:00 committed by Cheng Pan
parent 11569689be
commit 1fd8833756
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
13 changed files with 51 additions and 54 deletions

View File

@ -37,8 +37,8 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.client.compress.Compressor;
import org.apache.celeborn.client.read.RssInputStream;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.client.MasterClient;
import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.haclient.RssHARetryClient;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.network.TransportContext;
import org.apache.celeborn.common.network.buffer.NettyManagedBuffer;
@ -1496,8 +1496,7 @@ public class ShuffleClientImpl extends ShuffleClient {
if (isDriver) {
try {
driverRssMetaService.send(
UnregisterShuffle$.MODULE$.apply(
appUniqueId, shuffleId, RssHARetryClient.genRequestId()));
UnregisterShuffle$.MODULE$.apply(appUniqueId, shuffleId, MasterClient.genRequestId()));
} catch (Exception e) {
// If some exceptions need to be ignored, they shouldn't be logged as error-level,
// otherwise it will mislead users.

View File

@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
import scala.concurrent.duration.DurationInt
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.haclient.RssHARetryClient
import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.protocol.message.ControlMessages.{HeartbeatFromApplication, HeartbeatFromApplicationResponse, ZERO_UUID}
import org.apache.celeborn.common.protocol.message.StatusCode
@ -32,7 +32,7 @@ import org.apache.celeborn.common.util.{ThreadUtils, Utils}
class ApplicationHeartbeater(
appId: String,
conf: CelebornConf,
rssHARetryClient: RssHARetryClient,
masterClient: MasterClient,
shuffleMetrics: () => (Long, Long),
workerStatusTracker: WorkerStatusTracker) extends Logging {
@ -47,7 +47,7 @@ class ApplicationHeartbeater(
new Runnable {
override def run(): Unit = {
try {
require(rssHARetryClient != null, "When sending a heartbeat, client shouldn't be null.")
require(masterClient != null, "When sending a heartbeat, client shouldn't be null.")
val (tmpTotalWritten, tmpTotalFileCount) = shuffleMetrics()
logInfo("Send app heartbeat with " +
s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count: $tmpTotalFileCount")
@ -82,7 +82,7 @@ class ApplicationHeartbeater(
private def requestHeartbeat(message: HeartbeatFromApplication)
: HeartbeatFromApplicationResponse = {
try {
rssHARetryClient.askSync[HeartbeatFromApplicationResponse](
masterClient.askSync[HeartbeatFromApplicationResponse](
message,
classOf[HeartbeatFromApplicationResponse])
} catch {

View File

@ -30,7 +30,7 @@ import com.google.common.annotations.VisibleForTesting
import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, ShuffleFailedWorkers}
import org.apache.celeborn.client.listener.WorkerStatusListener
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.haclient.RssHARetryClient
import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo, WorkerInfo}
@ -118,14 +118,14 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
logInfo(s"Starting LifecycleManager on ${rpcEnv.address}")
private val rssHARetryClient = new RssHARetryClient(rpcEnv, conf)
private val masterClient = new MasterClient(rpcEnv, conf)
val commitManager = new CommitManager(appUniqueId, conf, this)
val workerStatusTracker = new WorkerStatusTracker(conf, this)
private val heartbeater =
new ApplicationHeartbeater(
appUniqueId,
conf,
rssHARetryClient,
masterClient,
() => commitManager.commitMetrics(),
workerStatusTracker)
private val changePartitionManager = new ChangePartitionManager(conf, this)
@ -168,7 +168,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
releasePartitionManager.stop()
heartbeater.stop()
rssHARetryClient.close()
masterClient.close()
if (rpcEnv != null) {
rpcEnv.shutdown()
rpcEnv.awaitTermination()
@ -1023,7 +1023,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
commitManager.removeExpiredShuffle(shuffleId)
changePartitionManager.removeExpiredShuffle(shuffleId)
val unregisterShuffleResponse = requestMasterUnregisterShuffle(
UnregisterShuffle(appUniqueId, shuffleId, RssHARetryClient.genRequestId()))
UnregisterShuffle(appUniqueId, shuffleId, MasterClient.genRequestId()))
// if unregister shuffle not success, wait next turn
if (StatusCode.SUCCESS == Utils.toStatusCode(unregisterShuffleResponse.getStatus)) {
unregisterShuffleTime.remove(shuffleId)
@ -1055,7 +1055,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
private def requestMasterRequestSlots(message: RequestSlots): RequestSlotsResponse = {
val shuffleKey = Utils.makeShuffleKey(message.applicationId, message.shuffleId)
try {
rssHARetryClient.askSync[RequestSlotsResponse](message, classOf[RequestSlotsResponse])
masterClient.askSync[RequestSlotsResponse](message, classOf[RequestSlotsResponse])
} catch {
case e: Exception =>
logError(s"AskSync RegisterShuffle for $shuffleKey failed.", e)
@ -1095,7 +1095,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
private def requestMasterReleaseSlots(message: ReleaseSlots): ReleaseSlotsResponse = {
try {
rssHARetryClient.askSync[ReleaseSlotsResponse](message, classOf[ReleaseSlotsResponse])
masterClient.askSync[ReleaseSlotsResponse](message, classOf[ReleaseSlotsResponse])
} catch {
case e: Exception =>
logError(s"AskSync ReleaseSlots for ${message.shuffleId} failed.", e)
@ -1106,7 +1106,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
private def requestMasterUnregisterShuffle(message: PbUnregisterShuffle)
: PbUnregisterShuffleResponse = {
try {
rssHARetryClient.askSync[PbUnregisterShuffleResponse](
masterClient.askSync[PbUnregisterShuffleResponse](
message,
classOf[PbUnregisterShuffleResponse])
} catch {
@ -1118,7 +1118,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
def checkQuota(): CheckQuotaResponse = {
try {
rssHARetryClient.askSync[CheckQuotaResponse](
masterClient.askSync[CheckQuotaResponse](
CheckQuota(userIdentifier),
classOf[CheckQuotaResponse])
} catch {

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.celeborn.common.haclient;
package org.apache.celeborn.common.client;
import java.io.IOException;
import java.util.UUID;
@ -48,8 +48,8 @@ import org.apache.celeborn.common.rpc.RpcEnv;
import org.apache.celeborn.common.rpc.RpcTimeout;
import org.apache.celeborn.common.util.ThreadUtils;
public class RssHARetryClient {
private static final Logger LOG = LoggerFactory.getLogger(RssHARetryClient.class);
public class MasterClient {
private static final Logger LOG = LoggerFactory.getLogger(MasterClient.class);
private final RpcEnv rpcEnv;
private final String[] masterEndpoints;
@ -60,7 +60,7 @@ public class RssHARetryClient {
private final AtomicReference<RpcEndpointRef> rpcEndpointRef;
private final ExecutorService oneWayMessageSender;
public RssHARetryClient(RpcEnv rpcEnv, CelebornConf conf) {
public MasterClient(RpcEnv rpcEnv, CelebornConf conf) {
this.rpcEnv = rpcEnv;
this.masterEndpoints = conf.masterEndpoints();
this.maxRetries = Math.max(masterEndpoints.length, conf.masterClientMaxRetries());

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.celeborn.common.haclient;
package org.apache.celeborn.common.client;
import java.io.IOException;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.celeborn.common.haclient;
package org.apache.celeborn.common.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -46,8 +46,8 @@ import org.apache.celeborn.common.rpc.RpcAddress;
import org.apache.celeborn.common.rpc.RpcEndpointRef;
import org.apache.celeborn.common.rpc.RpcEnv;
public class RssHARetryClientSuiteJ {
private static final Logger LOG = LoggerFactory.getLogger(RssHARetryClientSuiteJ.class);
public class MasterClientSuiteJ {
private static final Logger LOG = LoggerFactory.getLogger(MasterClientSuiteJ.class);
private final String masterHost = "localhost";
private final int masterPort = 9097;
@ -79,7 +79,7 @@ public class RssHARetryClientSuiteJ {
});
prepareForRpcEnvWithoutHA();
RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
MasterClient client = new MasterClient(rpcEnv, conf);
HeartbeatFromApplication message = Mockito.mock(HeartbeatFromApplication.class);
try {
@ -106,7 +106,7 @@ public class RssHARetryClientSuiteJ {
});
prepareForRpcEnvWithoutHA();
RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
MasterClient client = new MasterClient(rpcEnv, conf);
HeartbeatFromApplication message = Mockito.mock(HeartbeatFromApplication.class);
try {
@ -132,7 +132,7 @@ public class RssHARetryClientSuiteJ {
return Future$.MODULE$.successful(response);
});
RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
MasterClient client = new MasterClient(rpcEnv, conf);
HeartbeatFromApplication message = Mockito.mock(HeartbeatFromApplication.class);
try {
@ -152,7 +152,7 @@ public class RssHARetryClientSuiteJ {
prepareForEndpointRefWithoutRetry(() -> Future$.MODULE$.successful(mockResponse));
prepareForRpcEnvWithoutHA();
RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
MasterClient client = new MasterClient(rpcEnv, conf);
HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
HeartbeatFromWorkerResponse response = null;
@ -174,7 +174,7 @@ public class RssHARetryClientSuiteJ {
prepareForEndpointRefWithRetry(numTries, () -> Future$.MODULE$.successful(mockResponse));
prepareForRpcEnvWithoutHA();
RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
MasterClient client = new MasterClient(rpcEnv, conf);
HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
HeartbeatFromWorkerResponse response = null;
@ -195,7 +195,7 @@ public class RssHARetryClientSuiteJ {
prepareForRpcEnvWithHA(() -> Future$.MODULE$.successful(mockResponse));
RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
MasterClient client = new MasterClient(rpcEnv, conf);
HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
HeartbeatFromWorkerResponse response = null;
@ -254,7 +254,7 @@ public class RssHARetryClientSuiteJ {
.when(rpcEnv)
.setupEndpointRef(Mockito.any(RpcAddress.class), Mockito.anyString());
RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
MasterClient client = new MasterClient(rpcEnv, conf);
HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
HeartbeatFromWorkerResponse response = null;

View File

@ -26,8 +26,8 @@ import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorageUtil;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.celeborn.common.client.MasterNotLeaderException;
import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.haclient.MasterNotLeaderException;
import org.apache.celeborn.common.rpc.RpcCallContext;
import org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;

View File

@ -25,8 +25,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.client.MasterClient;
import org.apache.celeborn.common.exception.CelebornRuntimeException;
import org.apache.celeborn.common.haclient.RssHARetryClient;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.AppDiskUsageMetric;
import org.apache.celeborn.common.meta.DiskInfo;
@ -318,7 +318,7 @@ public class HAMasterMetaManager extends AbstractMetaManager {
ratisServer.submitRequest(
ResourceRequest.newBuilder()
.setCmdType(Type.UpdatePartitionSize)
.setRequestId(RssHARetryClient.genRequestId())
.setRequestId(MasterClient.genRequestId())
.build());
} catch (CelebornRuntimeException e) {
LOG.error("Handle update partition size failed!", e);

View File

@ -51,8 +51,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.client.MasterClient;
import org.apache.celeborn.common.exception.CelebornRuntimeException;
import org.apache.celeborn.common.haclient.RssHARetryClient;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceResponse;
@ -198,7 +198,7 @@ public class HARaftServer {
public ResourceResponse submitRequest(ResourceProtos.ResourceRequest request)
throws CelebornRuntimeException {
String requestId = request.getRequestId();
Tuple2<String, Long> decoded = RssHARetryClient.decodeRequestId(requestId);
Tuple2<String, Long> decoded = MasterClient.decodeRequestId(requestId);
if (decoded == null) {
throw new CelebornRuntimeException(
"RequestId:" + requestId + " invalid, should be: uuid#callId.");

View File

@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
import scala.util.Random
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.haclient.RssHARetryClient
import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo}
@ -369,7 +369,7 @@ private[celeborn] class Master(
worker.pushPort,
worker.fetchPort,
worker.replicatePort,
RssHARetryClient.genRequestId()))
MasterClient.genRequestId()))
}
ind += 1
}
@ -384,7 +384,7 @@ private[celeborn] class Master(
statusSystem.appHeartbeatTime.keySet().asScala.foreach { key =>
if (statusSystem.appHeartbeatTime.get(key) < currentTime - appHeartbeatTimeoutMs) {
logWarning(s"Application $key timeout, trigger applicationLost event.")
val requestId = RssHARetryClient.genRequestId()
val requestId = MasterClient.genRequestId()
var res = self.askSync[ApplicationLostResponse](ApplicationLost(key, requestId))
var retry = 1
while (res.status != StatusCode.SUCCESS && retry <= 3) {

View File

@ -30,7 +30,7 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.haclient.RssHARetryClient;
import org.apache.celeborn.common.client.MasterClient;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
@ -105,8 +105,7 @@ public class DefaultMetaSystemSuiteJ {
public void tearDown() throws Exception {}
private String getNewReqeustId() {
return RssHARetryClient.encodeRequestId(
UUID.randomUUID().toString(), callerId.incrementAndGet());
return MasterClient.encodeRequestId(UUID.randomUUID().toString(), callerId.incrementAndGet());
}
@Test

View File

@ -29,8 +29,8 @@ import org.junit.*;
import org.mockito.Mockito;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.client.MasterClient;
import org.apache.celeborn.common.exception.CelebornRuntimeException;
import org.apache.celeborn.common.haclient.RssHARetryClient;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
@ -195,8 +195,7 @@ public class RatisMasterStatusSystemSuiteJ {
private static String SHUFFLEKEY1 = APPID1 + "-" + SHUFFLEID1;
private String getNewReqeustId() {
return RssHARetryClient.encodeRequestId(
UUID.randomUUID().toString(), callerId.incrementAndGet());
return MasterClient.encodeRequestId(UUID.randomUUID().toString(), callerId.incrementAndGet());
}
public HAMasterMetaManager pickLeaderStatusSystem() {

View File

@ -30,8 +30,8 @@ import io.netty.util.HashedWheelTimer
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.CelebornConf._
import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.exception.CelebornException
import org.apache.celeborn.common.haclient.RssHARetryClient
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, WorkerPartitionLocationInfo}
@ -219,7 +219,7 @@ private[celeborn] class Worker(
val shuffleCommitInfos =
JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[Long, CommitInfo]]()
private val rssHARetryClient = new RssHARetryClient(rpcEnv, conf)
private val masterClient = new MasterClient(rpcEnv, conf)
// (workerInfo -> last connect timeout timestamp)
val unavailablePeers = JavaUtils.newConcurrentHashMap[WorkerInfo, Long]()
@ -285,7 +285,7 @@ private[celeborn] class Worker(
val resourceConsumption = workerInfo.updateThenGetUserResourceConsumption(
storageManager.userResourceConsumptionSnapshot().asJava)
val response = rssHARetryClient.askSync[HeartbeatFromWorkerResponse](
val response = masterClient.askSync[HeartbeatFromWorkerResponse](
HeartbeatFromWorker(
host,
rpcPort,
@ -392,7 +392,7 @@ private[celeborn] class Worker(
}
memoryManager.close();
rssHARetryClient.close()
masterClient.close()
replicateServer.close()
fetchServer.close()
@ -410,7 +410,7 @@ private[celeborn] class Worker(
while (registerTimeout > 0) {
val resp =
try {
rssHARetryClient.askSync[PbRegisterWorkerResponse](
masterClient.askSync[PbRegisterWorkerResponse](
RegisterWorker(
host,
rpcPort,
@ -422,7 +422,7 @@ private[celeborn] class Worker(
workerInfo.diskInfos.asScala.toMap,
workerInfo.updateThenGetUserResourceConsumption(
storageManager.userResourceConsumptionSnapshot().asJava).asScala.toMap,
RssHARetryClient.genRequestId()),
MasterClient.genRequestId()),
classOf[PbRegisterWorkerResponse])
} catch {
case throwable: Throwable =>
@ -532,18 +532,18 @@ private[celeborn] class Worker(
// make master remove this worker from excluded list.
try {
if (gracefulShutdown) {
rssHARetryClient.askSync(
masterClient.askSync(
ReportWorkerUnavailable(List(workerInfo).asJava),
OneWayMessageResponse.getClass)
} else {
rssHARetryClient.askSync[PbWorkerLostResponse](
masterClient.askSync[PbWorkerLostResponse](
WorkerLost(
host,
rpcPort,
pushPort,
fetchPort,
replicatePort,
RssHARetryClient.genRequestId()),
MasterClient.genRequestId()),
classOf[PbWorkerLostResponse])
}
} catch {