From da7ac1721be17a3cc3f52f41dec3a281f6502874 Mon Sep 17 00:00:00 2001 From: AngersZhuuuu Date: Wed, 7 Sep 2022 21:33:18 +0800 Subject: [PATCH] [ISSUE-565][REFACTOR] Unify RPC name HeartbeatXxxxx (#566) --- README.md | 6 ++--- assets/diagram/rss.drawio | 2 +- .../rss/client/write/LifecycleManager.scala | 2 +- common/src/main/proto/TransportMessages.proto | 2 +- .../emr/rss/common/meta/WorkerInfo.scala | 2 +- .../rss/common/metrics/source/RPCSource.scala | 8 +++--- .../protocol/message/ControlMessages.scala | 26 +++++++++---------- .../haclient/RssHARetryClientSuiteJ.java | 8 +++--- .../clustermeta/AbstractMetaManager.java | 4 +-- .../master/clustermeta/IMetadataHandler.java | 2 +- .../clustermeta/SingleMasterMetaManager.java | 6 ++--- .../clustermeta/ha/HAMasterMetaManager.java | 10 +++---- .../master/clustermeta/ha/MetaHandler.java | 22 ++++++++-------- server-master/src/main/proto/Resource.proto | 8 +++--- .../rss/service/deploy/master/Master.scala | 12 ++++----- .../clustermeta/DefaultMetaSystemSuiteJ.java | 8 +++--- .../ha/RatisMasterStatusSystemSuiteJ.java | 8 +++--- 17 files changed, 68 insertions(+), 68 deletions(-) diff --git a/README.md b/README.md index 7adcf1687..cafdf2f44 100644 --- a/README.md +++ b/README.md @@ -143,7 +143,7 @@ FetchPort: 35689 TotalSlots: 4096 SlotsUsed: 0 SlotsAvailable: 4096 -LastHeartBeat: 0 +LastHeartbeat: 0 WorkerRef: NettyRpcEndpointRef(ess://WorkerEndpoint@172.16.159.100:40115) . 21/12/21 20:06:23,785 INFO [netty-rpc-connection-1] TransportClientFactory: Successfully created connection to /172.16.159.98:39151 after 1 ms (0 ms spent in bootstraps) @@ -155,7 +155,7 @@ FetchPort: 37455 TotalSlots: 4096 SlotsUsed: 0 SlotsAvailable: 4096 -LastHeartBeat: 0 +LastHeartbeat: 0 WorkerRef: NettyRpcEndpointRef(ess://WorkerEndpoint@172.16.159.98:39151) . 21/12/21 20:06:25,948 INFO [netty-rpc-connection-2] TransportClientFactory: Successfully created connection to /172.16.159.99:41955 after 1 ms (0 ms spent in bootstraps) @@ -167,7 +167,7 @@ FetchPort: 46865 TotalSlots: 4096 SlotsUsed: 0 SlotsAvailable: 4096 -LastHeartBeat: 0 +LastHeartbeat: 0 WorkerRef: NettyRpcEndpointRef(ess://WorkerEndpoint@172.16.159.99:41955) ``` diff --git a/assets/diagram/rss.drawio b/assets/diagram/rss.drawio index fb41c2e97..f3f467af4 100644 --- a/assets/diagram/rss.drawio +++ b/assets/diagram/rss.drawio @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file diff --git a/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala b/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala index 676f41354..a1006ebc0 100644 --- a/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala +++ b/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala @@ -137,7 +137,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit val tmpFileCount = fileCount.sumThenReset() logDebug(s"Send app heartbeat with $tmpTotalWritten $tmpFileCount") val appHeartbeat = - HeartBeatFromApplication(appId, tmpTotalWritten, tmpFileCount, ZERO_UUID) + HeartbeatFromApplication(appId, tmpTotalWritten, tmpFileCount, ZERO_UUID) rssHARetryClient.send(appHeartbeat) logDebug("Successfully send app heartbeat.") } catch { diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index 66c9dc3da..e088c4bb3 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -253,7 +253,7 @@ message PbApplicationLostResponse { int32 status = 1; } -message PbHeartBeatFromApplication { +message PbHeartbeatFromApplication { string appId = 1; int64 totalWritten = 2; int64 fileCount = 3 ; diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/meta/WorkerInfo.scala b/common/src/main/scala/com/aliyun/emr/rss/common/meta/WorkerInfo.scala index d7acb7b49..060fdc40e 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/meta/WorkerInfo.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/meta/WorkerInfo.scala @@ -221,7 +221,7 @@ class WorkerInfo( |FetchPort: $fetchPort |ReplicatePort: $replicatePort |SlotsUsed: $usedSlots() - |LastHeartBeat: $lastHeartbeat + |LastHeartbeat: $lastHeartbeat |Disks: $diskInfos |WorkerRef: $endpoint |""".stripMargin diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/metrics/source/RPCSource.scala b/common/src/main/scala/com/aliyun/emr/rss/common/metrics/source/RPCSource.scala index 9b1fea6e5..0e53ed24f 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/metrics/source/RPCSource.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/metrics/source/RPCSource.scala @@ -40,7 +40,7 @@ class RPCSource(rssConf: RssConf, role: String) extends AbstractSource(rssConf, addCounter(RPCChunkFetchRequestNum) // Master RPC - addCounter(RPCHeartBeatFromApplicationNum) + addCounter(RPCHeartbeatFromApplicationNum) addCounter(RPCHeartbeatFromWorkerNum) addCounter(RPCRegisterWorkerNum) addCounter(RPCRequestSlotsNum) @@ -71,8 +71,8 @@ class RPCSource(rssConf: RssConf, role: String) extends AbstractSource(rssConf, incCounter(RPCPushMergedDataSize, messageLen) case _: ChunkFetchRequest => incCounter(RPCChunkFetchRequestNum) - case _: HeartBeatFromApplication => - incCounter(RPCHeartBeatFromApplicationNum) + case _: HeartbeatFromApplication => + incCounter(RPCHeartbeatFromApplicationNum) case _: HeartbeatFromWorker => incCounter(RPCHeartbeatFromWorkerNum) case _: RegisterWorker => @@ -111,7 +111,7 @@ object RPCSource { val RPCChunkFetchRequestNum = "RPCChunkFetchRequestNum" // Master RPC - val RPCHeartBeatFromApplicationNum = "RPCHeartBeatFromApplicationNum" + val RPCHeartbeatFromApplicationNum = "RPCHeartbeatFromApplicationNum" val RPCHeartbeatFromWorkerNum = "RPCHeartbeatFromWorkerNum" val RPCRegisterWorkerNum = "RPCRegisterWorkerNum" val RPCRequestSlotsNum = "RPCRequestSlotsNum" diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala b/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala index d64a9efb3..ab6448c7b 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala @@ -285,8 +285,8 @@ sealed trait Message extends Serializable { .setStatus(status.getValue).build().toByteArray new TransportMessage(MessageType.APPLICATION_LOST_RESPONSE, payload) - case HeartBeatFromApplication(appId, totalWritten, fileCount, requestId) => - val payload = PbHeartBeatFromApplication.newBuilder() + case HeartbeatFromApplication(appId, totalWritten, fileCount, requestId) => + val payload = PbHeartbeatFromApplication.newBuilder() .setAppId(appId) .setRequestId(requestId) .setTotalWritten(totalWritten) @@ -626,7 +626,7 @@ object ControlMessages extends Logging { case class ApplicationLostResponse(status: StatusCode) extends MasterMessage - case class HeartBeatFromApplication( + case class HeartbeatFromApplication( appId: String, totalWritten: Long, fileCount: Long, @@ -770,12 +770,12 @@ object ControlMessages extends Logging { pbHeartbeatFromWorker.getRequestId) case HEARTBEAT_RESPONSE => - val pbHeartBeatResponse = PbHeartbeatResponse.parseFrom(message.getPayload) + val pbHeartbeatResponse = PbHeartbeatResponse.parseFrom(message.getPayload) val expiredShuffleKeys = new util.HashSet[String]() - if (pbHeartBeatResponse.getExpiredShuffleKeysCount > 0) { - expiredShuffleKeys.addAll(pbHeartBeatResponse.getExpiredShuffleKeysList) + if (pbHeartbeatResponse.getExpiredShuffleKeysCount > 0) { + expiredShuffleKeys.addAll(pbHeartbeatResponse.getExpiredShuffleKeysList) } - HeartbeatResponse(expiredShuffleKeys, pbHeartBeatResponse.getRegistered) + HeartbeatResponse(expiredShuffleKeys, pbHeartbeatResponse.getRegistered) case REGISTER_SHUFFLE => val pbRegisterShuffle = PbRegisterShuffle.parseFrom(message.getPayload) @@ -905,12 +905,12 @@ object ControlMessages extends Logging { ApplicationLostResponse(Utils.toStatusCode(pbApplicationLostResponse.getStatus)) case HEARTBEAT_FROM_APPLICATION => - val pbHeartBeatFromApplication = PbHeartBeatFromApplication.parseFrom(message.getPayload) - HeartBeatFromApplication( - pbHeartBeatFromApplication.getAppId, - pbHeartBeatFromApplication.getTotalWritten, - pbHeartBeatFromApplication.getFileCount, - pbHeartBeatFromApplication.getRequestId) + val pbHeartbeatFromApplication = PbHeartbeatFromApplication.parseFrom(message.getPayload) + HeartbeatFromApplication( + pbHeartbeatFromApplication.getAppId, + pbHeartbeatFromApplication.getTotalWritten, + pbHeartbeatFromApplication.getFileCount, + pbHeartbeatFromApplication.getRequestId) case GET_BLACKLIST => val pbGetBlacklist = PbGetBlacklist.parseFrom(message.getPayload) diff --git a/common/src/test/java/com/aliyun/emr/rss/common/haclient/RssHARetryClientSuiteJ.java b/common/src/test/java/com/aliyun/emr/rss/common/haclient/RssHARetryClientSuiteJ.java index 2501c0ef5..ca113b343 100644 --- a/common/src/test/java/com/aliyun/emr/rss/common/haclient/RssHARetryClientSuiteJ.java +++ b/common/src/test/java/com/aliyun/emr/rss/common/haclient/RssHARetryClientSuiteJ.java @@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory; import com.aliyun.emr.rss.common.RssConf; import com.aliyun.emr.rss.common.exception.RssException; -import com.aliyun.emr.rss.common.protocol.message.ControlMessages.HeartBeatFromApplication; +import com.aliyun.emr.rss.common.protocol.message.ControlMessages.HeartbeatFromApplication; import com.aliyun.emr.rss.common.protocol.message.ControlMessages.HeartbeatFromWorker; import com.aliyun.emr.rss.common.protocol.message.ControlMessages.HeartbeatResponse; import com.aliyun.emr.rss.common.protocol.message.ControlMessages.OneWayMessageResponse$; @@ -81,7 +81,7 @@ public class RssHARetryClientSuiteJ { prepareForRpcEnvWithoutHA(); RssHARetryClient client = new RssHARetryClient(rpcEnv, conf); - HeartBeatFromApplication message = Mockito.mock(HeartBeatFromApplication.class); + HeartbeatFromApplication message = Mockito.mock(HeartbeatFromApplication.class); try { client.send(message); @@ -108,7 +108,7 @@ public class RssHARetryClientSuiteJ { prepareForRpcEnvWithoutHA(); RssHARetryClient client = new RssHARetryClient(rpcEnv, conf); - HeartBeatFromApplication message = Mockito.mock(HeartBeatFromApplication.class); + HeartbeatFromApplication message = Mockito.mock(HeartbeatFromApplication.class); try { client.send(message); @@ -134,7 +134,7 @@ public class RssHARetryClientSuiteJ { }); RssHARetryClient client = new RssHARetryClient(rpcEnv, conf); - HeartBeatFromApplication message = Mockito.mock(HeartBeatFromApplication.class); + HeartbeatFromApplication message = Mockito.mock(HeartbeatFromApplication.class); try { client.send(message); diff --git a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/AbstractMetaManager.java b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/AbstractMetaManager.java index ee60b10bf..f89044413 100644 --- a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -117,7 +117,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler { registeredShuffle.remove(shuffleKey); } - public void updateAppHeartBeatMeta(String appId, long time, long totalWritten, long fileCount) { + public void updateAppHeartbeatMeta(String appId, long time, long totalWritten, long fileCount) { appHeartbeatTime.put(appId, time); partitionTotalWritten.add(totalWritten); partitionTotalFileCount.add(fileCount); @@ -155,7 +155,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler { blacklist.remove(worker); } - public void updateWorkerHeartBeatMeta( + public void updateWorkerHeartbeatMeta( String host, int rpcPort, int pushPort, diff --git a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/IMetadataHandler.java b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/IMetadataHandler.java index 7fad2fcc8..6498aa0bc 100644 --- a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/IMetadataHandler.java +++ b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/IMetadataHandler.java @@ -49,7 +49,7 @@ public interface IMetadataHandler { void handleWorkerRemove( String host, int rpcPort, int pushPort, int fetchPort, int replicatePort, String requestId); - void handleWorkerHeartBeat( + void handleWorkerHeartbeat( String host, int rpcPort, int pushPort, diff --git a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/SingleMasterMetaManager.java b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/SingleMasterMetaManager.java index 725f45274..0de64af99 100644 --- a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/SingleMasterMetaManager.java +++ b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/SingleMasterMetaManager.java @@ -64,7 +64,7 @@ public class SingleMasterMetaManager extends AbstractMetaManager { @Override public void handleAppHeartbeat( String appId, long totalWritten, long fileCount, long time, String requestId) { - updateAppHeartBeatMeta(appId, time, totalWritten, fileCount); + updateAppHeartbeatMeta(appId, time, totalWritten, fileCount); } @Override @@ -85,7 +85,7 @@ public class SingleMasterMetaManager extends AbstractMetaManager { } @Override - public void handleWorkerHeartBeat( + public void handleWorkerHeartbeat( String host, int rpcPort, int pushPort, @@ -94,7 +94,7 @@ public class SingleMasterMetaManager extends AbstractMetaManager { Map disks, long time, String requestId) { - updateWorkerHeartBeatMeta(host, rpcPort, pushPort, fetchPort, replicatePort, disks, time); + updateWorkerHeartbeatMeta(host, rpcPort, pushPort, fetchPort, replicatePort, disks, time); } @Override diff --git a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java index ef7b22b00..e6c3eb1c2 100644 --- a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java +++ b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java @@ -136,7 +136,7 @@ public class HAMasterMetaManager extends AbstractMetaManager { try { ratisServer.submitRequest( ResourceRequest.newBuilder() - .setCmdType(Type.AppHeartBeat) + .setCmdType(Type.AppHeartbeat) .setRequestId(requestId) .setAppHeartbeatRequest( ResourceProtos.AppHeartbeatRequest.newBuilder() @@ -210,7 +210,7 @@ public class HAMasterMetaManager extends AbstractMetaManager { } @Override - public void handleWorkerHeartBeat( + public void handleWorkerHeartbeat( String host, int rpcPort, int pushPort, @@ -222,10 +222,10 @@ public class HAMasterMetaManager extends AbstractMetaManager { try { ratisServer.submitRequest( ResourceRequest.newBuilder() - .setCmdType(Type.WorkerHeartBeat) + .setCmdType(Type.WorkerHeartbeat) .setRequestId(requestId) - .setWorkerHeartBeatRequest( - ResourceProtos.WorkerHeartBeatRequest.newBuilder() + .setWorkerHeartbeatRequest( + ResourceProtos.WorkerHeartbeatRequest.newBuilder() .setHost(host) .setRpcPort(rpcPort) .setPushPort(pushPort) diff --git a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/MetaHandler.java b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/MetaHandler.java index 1b698621c..e9eef683f 100644 --- a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/MetaHandler.java +++ b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/MetaHandler.java @@ -134,13 +134,13 @@ public class MetaHandler { metaSystem.updateUnregisterShuffleMeta(shuffleKey); break; - case AppHeartBeat: + case AppHeartbeat: appId = request.getAppHeartbeatRequest().getAppId(); LOG.debug("Handle app heartbeat for {}", appId); long time = request.getAppHeartbeatRequest().getTime(); long totalWritten = request.getAppHeartbeatRequest().getTotalWritten(); long fileCount = request.getAppHeartbeatRequest().getFileCount(); - metaSystem.updateAppHeartBeatMeta(appId, time, totalWritten, fileCount); + metaSystem.updateAppHeartbeatMeta(appId, time, totalWritten, fileCount); break; case AppLost: @@ -169,13 +169,13 @@ public class MetaHandler { metaSystem.updateWorkerRemoveMeta(host, rpcPort, pushPort, fetchPort, replicatePort); break; - case WorkerHeartBeat: - host = request.getWorkerHeartBeatRequest().getHost(); - rpcPort = request.getWorkerHeartBeatRequest().getRpcPort(); - pushPort = request.getWorkerHeartBeatRequest().getPushPort(); - fetchPort = request.getWorkerHeartBeatRequest().getFetchPort(); - disks = MetaUtil.fromPbDiskInfos(request.getWorkerHeartBeatRequest().getDisksMap()); - replicatePort = request.getWorkerHeartBeatRequest().getReplicatePort(); + case WorkerHeartbeat: + host = request.getWorkerHeartbeatRequest().getHost(); + rpcPort = request.getWorkerHeartbeatRequest().getRpcPort(); + pushPort = request.getWorkerHeartbeatRequest().getPushPort(); + fetchPort = request.getWorkerHeartbeatRequest().getFetchPort(); + disks = MetaUtil.fromPbDiskInfos(request.getWorkerHeartbeatRequest().getDisksMap()); + replicatePort = request.getWorkerHeartbeatRequest().getReplicatePort(); LOG.debug( "Handle worker heartbeat for {} {} {} {} {} {}", host, @@ -184,8 +184,8 @@ public class MetaHandler { fetchPort, replicatePort, disks); - time = request.getWorkerHeartBeatRequest().getTime(); - metaSystem.updateWorkerHeartBeatMeta( + time = request.getWorkerHeartbeatRequest().getTime(); + metaSystem.updateWorkerHeartbeatMeta( host, rpcPort, pushPort, fetchPort, replicatePort, disks, time); break; diff --git a/server-master/src/main/proto/Resource.proto b/server-master/src/main/proto/Resource.proto index 392aba09f..271d52b4d 100644 --- a/server-master/src/main/proto/Resource.proto +++ b/server-master/src/main/proto/Resource.proto @@ -24,10 +24,10 @@ enum Type { UnRegisterShuffle = 12; RequestSlots = 13; ReleaseSlots = 14; - AppHeartBeat = 15; + AppHeartbeat = 15; AppLost = 16; WorkerLost = 17; - WorkerHeartBeat = 18; + WorkerHeartbeat = 18; RegisterWorker = 19; ReportWorkerFailure = 20; UpdatePartitionSize = 21; @@ -45,7 +45,7 @@ message ResourceRequest { optional AppHeartbeatRequest appHeartbeatRequest = 13; optional AppLostRequest appLostRequest = 14; optional WorkerLostRequest workerLostRequest = 15; - optional WorkerHeartBeatRequest workerHeartBeatRequest = 16; + optional WorkerHeartbeatRequest workerHeartbeatRequest = 16; optional RegisterWorkerRequest registerWorkerRequest = 17; optional ReportWorkerFailureRequest reportWorkerFailureRequest = 18; optional WorkerRemoveRequest workerRemoveRequest = 19; @@ -106,7 +106,7 @@ message WorkerRemoveRequest { required int32 replicatePort = 5; } -message WorkerHeartBeatRequest { +message WorkerHeartbeatRequest { required string host = 1; required int32 rpcPort = 2; required int32 pushPort = 3; diff --git a/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/Master.scala b/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/Master.scala index e461a5d71..f881458ae 100644 --- a/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/Master.scala +++ b/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/Master.scala @@ -183,11 +183,11 @@ private[deploy] class Master( } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case HeartBeatFromApplication(appId, totalWritten, fileCount, requestId) => + case HeartbeatFromApplication(appId, totalWritten, fileCount, requestId) => logDebug(s"Received heartbeat from app $appId") executeWithLeaderChecker( context, - handleHeartBeatFromApplication(context, appId, totalWritten, fileCount, requestId)) + handleHeartbeatFromApplication(context, appId, totalWritten, fileCount, requestId)) case RegisterWorker(host, rpcPort, pushPort, fetchPort, replicatePort, disks, requestId) => logDebug(s"Received RegisterWorker request $requestId, $host:$pushPort:$replicatePort" + @@ -241,7 +241,7 @@ private[deploy] class Master( s" worker $host:$rpcPort:$pushPort:$fetchPort with $disks.") executeWithLeaderChecker( context, - handleHeartBeatFromWorker( + handleHeartbeatFromWorker( context, host, rpcPort, @@ -301,7 +301,7 @@ private[deploy] class Master( } } - private def handleHeartBeatFromWorker( + private def handleHeartbeatFromWorker( context: RpcCallContext, host: String, rpcPort: Int, @@ -317,7 +317,7 @@ private[deploy] class Master( logWarning(s"Received heartbeat from unknown worker " + s"$host:$rpcPort:$pushPort:$fetchPort:$replicatePort.") } else { - statusSystem.handleWorkerHeartBeat( + statusSystem.handleWorkerHeartbeat( host, rpcPort, pushPort, @@ -544,7 +544,7 @@ private[deploy] class Master( }) } - private def handleHeartBeatFromApplication( + private def handleHeartbeatFromApplication( context: RpcCallContext, appId: String, totalWritten: Long, diff --git a/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java b/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java index c5bb4b357..15b1d1431 100644 --- a/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java +++ b/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java @@ -298,7 +298,7 @@ public class DefaultMetaSystemSuiteJ { } @Test - public void testHandleWorkerHeartBeat() { + public void testHandleWorkerHeartbeat() { statusSystem.handleRegisterWorker( HOSTNAME1, RPCPORT1, @@ -324,7 +324,7 @@ public class DefaultMetaSystemSuiteJ { new HashMap<>(), getNewReqeustId()); - statusSystem.handleWorkerHeartBeat( + statusSystem.handleWorkerHeartbeat( HOSTNAME1, RPCPORT1, PUSHPORT1, @@ -336,7 +336,7 @@ public class DefaultMetaSystemSuiteJ { Assert.assertEquals(statusSystem.blacklist.size(), 1); - statusSystem.handleWorkerHeartBeat( + statusSystem.handleWorkerHeartbeat( HOSTNAME2, RPCPORT2, PUSHPORT2, @@ -348,7 +348,7 @@ public class DefaultMetaSystemSuiteJ { Assert.assertEquals(statusSystem.blacklist.size(), 2); - statusSystem.handleWorkerHeartBeat( + statusSystem.handleWorkerHeartbeat( HOSTNAME1, RPCPORT1, PUSHPORT1, FETCHPORT1, REPLICATEPORT3, disks1, 1, getNewReqeustId()); Assert.assertEquals(statusSystem.blacklist.size(), 2); diff --git a/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java b/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java index ffbc66f6c..6e1298d2e 100644 --- a/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java +++ b/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java @@ -500,7 +500,7 @@ public class RatisMasterStatusSystemSuiteJ { } @Test - public void testHandleWorkerHeartBeat() throws InterruptedException { + public void testHandleWorkerHeartbeat() throws InterruptedException { AbstractMetaManager statusSystem = pickLeaderStatusSystem(); Assert.assertNotNull(statusSystem); @@ -511,7 +511,7 @@ public class RatisMasterStatusSystemSuiteJ { statusSystem.handleRegisterWorker( HOSTNAME3, RPCPORT3, PUSHPORT3, FETCHPORT3, REPLICATEPORT3, disks3, getNewReqeustId()); - statusSystem.handleWorkerHeartBeat( + statusSystem.handleWorkerHeartbeat( HOSTNAME1, RPCPORT1, PUSHPORT1, @@ -526,7 +526,7 @@ public class RatisMasterStatusSystemSuiteJ { Assert.assertEquals(1, STATUSSYSTEM2.blacklist.size()); Assert.assertEquals(1, STATUSSYSTEM3.blacklist.size()); - statusSystem.handleWorkerHeartBeat( + statusSystem.handleWorkerHeartbeat( HOSTNAME2, RPCPORT2, PUSHPORT2, @@ -542,7 +542,7 @@ public class RatisMasterStatusSystemSuiteJ { Assert.assertEquals(2, STATUSSYSTEM2.blacklist.size()); Assert.assertEquals(2, STATUSSYSTEM3.blacklist.size()); - statusSystem.handleWorkerHeartBeat( + statusSystem.handleWorkerHeartbeat( HOSTNAME1, RPCPORT1, PUSHPORT1, FETCHPORT1, REPLICATEPORT1, disks1, 1, getNewReqeustId()); Thread.sleep(3000L);