[ISSUE-565][REFACTOR] Unify RPC name HeartbeatXxxxx (#566)

This commit is contained in:
AngersZhuuuu 2022-09-07 21:33:18 +08:00 committed by GitHub
parent 9152906bbd
commit da7ac1721b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 68 additions and 68 deletions

View File

@ -143,7 +143,7 @@ FetchPort: 35689
TotalSlots: 4096
SlotsUsed: 0
SlotsAvailable: 4096
LastHeartBeat: 0
LastHeartbeat: 0
WorkerRef: NettyRpcEndpointRef(ess://WorkerEndpoint@172.16.159.100:40115)
.
21/12/21 20:06:23,785 INFO [netty-rpc-connection-1] TransportClientFactory: Successfully created connection to /172.16.159.98:39151 after 1 ms (0 ms spent in bootstraps)
@ -155,7 +155,7 @@ FetchPort: 37455
TotalSlots: 4096
SlotsUsed: 0
SlotsAvailable: 4096
LastHeartBeat: 0
LastHeartbeat: 0
WorkerRef: NettyRpcEndpointRef(ess://WorkerEndpoint@172.16.159.98:39151)
.
21/12/21 20:06:25,948 INFO [netty-rpc-connection-2] TransportClientFactory: Successfully created connection to /172.16.159.99:41955 after 1 ms (0 ms spent in bootstraps)
@ -167,7 +167,7 @@ FetchPort: 46865
TotalSlots: 4096
SlotsUsed: 0
SlotsAvailable: 4096
LastHeartBeat: 0
LastHeartbeat: 0
WorkerRef: NettyRpcEndpointRef(ess://WorkerEndpoint@172.16.159.99:41955)
```

File diff suppressed because one or more lines are too long

View File

@ -137,7 +137,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
val tmpFileCount = fileCount.sumThenReset()
logDebug(s"Send app heartbeat with $tmpTotalWritten $tmpFileCount")
val appHeartbeat =
HeartBeatFromApplication(appId, tmpTotalWritten, tmpFileCount, ZERO_UUID)
HeartbeatFromApplication(appId, tmpTotalWritten, tmpFileCount, ZERO_UUID)
rssHARetryClient.send(appHeartbeat)
logDebug("Successfully send app heartbeat.")
} catch {

View File

@ -253,7 +253,7 @@ message PbApplicationLostResponse {
int32 status = 1;
}
message PbHeartBeatFromApplication {
message PbHeartbeatFromApplication {
string appId = 1;
int64 totalWritten = 2;
int64 fileCount = 3 ;

View File

@ -221,7 +221,7 @@ class WorkerInfo(
|FetchPort: $fetchPort
|ReplicatePort: $replicatePort
|SlotsUsed: $usedSlots()
|LastHeartBeat: $lastHeartbeat
|LastHeartbeat: $lastHeartbeat
|Disks: $diskInfos
|WorkerRef: $endpoint
|""".stripMargin

View File

@ -40,7 +40,7 @@ class RPCSource(rssConf: RssConf, role: String) extends AbstractSource(rssConf,
addCounter(RPCChunkFetchRequestNum)
// Master RPC
addCounter(RPCHeartBeatFromApplicationNum)
addCounter(RPCHeartbeatFromApplicationNum)
addCounter(RPCHeartbeatFromWorkerNum)
addCounter(RPCRegisterWorkerNum)
addCounter(RPCRequestSlotsNum)
@ -71,8 +71,8 @@ class RPCSource(rssConf: RssConf, role: String) extends AbstractSource(rssConf,
incCounter(RPCPushMergedDataSize, messageLen)
case _: ChunkFetchRequest =>
incCounter(RPCChunkFetchRequestNum)
case _: HeartBeatFromApplication =>
incCounter(RPCHeartBeatFromApplicationNum)
case _: HeartbeatFromApplication =>
incCounter(RPCHeartbeatFromApplicationNum)
case _: HeartbeatFromWorker =>
incCounter(RPCHeartbeatFromWorkerNum)
case _: RegisterWorker =>
@ -111,7 +111,7 @@ object RPCSource {
val RPCChunkFetchRequestNum = "RPCChunkFetchRequestNum"
// Master RPC
val RPCHeartBeatFromApplicationNum = "RPCHeartBeatFromApplicationNum"
val RPCHeartbeatFromApplicationNum = "RPCHeartbeatFromApplicationNum"
val RPCHeartbeatFromWorkerNum = "RPCHeartbeatFromWorkerNum"
val RPCRegisterWorkerNum = "RPCRegisterWorkerNum"
val RPCRequestSlotsNum = "RPCRequestSlotsNum"

View File

@ -285,8 +285,8 @@ sealed trait Message extends Serializable {
.setStatus(status.getValue).build().toByteArray
new TransportMessage(MessageType.APPLICATION_LOST_RESPONSE, payload)
case HeartBeatFromApplication(appId, totalWritten, fileCount, requestId) =>
val payload = PbHeartBeatFromApplication.newBuilder()
case HeartbeatFromApplication(appId, totalWritten, fileCount, requestId) =>
val payload = PbHeartbeatFromApplication.newBuilder()
.setAppId(appId)
.setRequestId(requestId)
.setTotalWritten(totalWritten)
@ -626,7 +626,7 @@ object ControlMessages extends Logging {
case class ApplicationLostResponse(status: StatusCode) extends MasterMessage
case class HeartBeatFromApplication(
case class HeartbeatFromApplication(
appId: String,
totalWritten: Long,
fileCount: Long,
@ -770,12 +770,12 @@ object ControlMessages extends Logging {
pbHeartbeatFromWorker.getRequestId)
case HEARTBEAT_RESPONSE =>
val pbHeartBeatResponse = PbHeartbeatResponse.parseFrom(message.getPayload)
val pbHeartbeatResponse = PbHeartbeatResponse.parseFrom(message.getPayload)
val expiredShuffleKeys = new util.HashSet[String]()
if (pbHeartBeatResponse.getExpiredShuffleKeysCount > 0) {
expiredShuffleKeys.addAll(pbHeartBeatResponse.getExpiredShuffleKeysList)
if (pbHeartbeatResponse.getExpiredShuffleKeysCount > 0) {
expiredShuffleKeys.addAll(pbHeartbeatResponse.getExpiredShuffleKeysList)
}
HeartbeatResponse(expiredShuffleKeys, pbHeartBeatResponse.getRegistered)
HeartbeatResponse(expiredShuffleKeys, pbHeartbeatResponse.getRegistered)
case REGISTER_SHUFFLE =>
val pbRegisterShuffle = PbRegisterShuffle.parseFrom(message.getPayload)
@ -905,12 +905,12 @@ object ControlMessages extends Logging {
ApplicationLostResponse(Utils.toStatusCode(pbApplicationLostResponse.getStatus))
case HEARTBEAT_FROM_APPLICATION =>
val pbHeartBeatFromApplication = PbHeartBeatFromApplication.parseFrom(message.getPayload)
HeartBeatFromApplication(
pbHeartBeatFromApplication.getAppId,
pbHeartBeatFromApplication.getTotalWritten,
pbHeartBeatFromApplication.getFileCount,
pbHeartBeatFromApplication.getRequestId)
val pbHeartbeatFromApplication = PbHeartbeatFromApplication.parseFrom(message.getPayload)
HeartbeatFromApplication(
pbHeartbeatFromApplication.getAppId,
pbHeartbeatFromApplication.getTotalWritten,
pbHeartbeatFromApplication.getFileCount,
pbHeartbeatFromApplication.getRequestId)
case GET_BLACKLIST =>
val pbGetBlacklist = PbGetBlacklist.parseFrom(message.getPayload)

View File

@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
import com.aliyun.emr.rss.common.RssConf;
import com.aliyun.emr.rss.common.exception.RssException;
import com.aliyun.emr.rss.common.protocol.message.ControlMessages.HeartBeatFromApplication;
import com.aliyun.emr.rss.common.protocol.message.ControlMessages.HeartbeatFromApplication;
import com.aliyun.emr.rss.common.protocol.message.ControlMessages.HeartbeatFromWorker;
import com.aliyun.emr.rss.common.protocol.message.ControlMessages.HeartbeatResponse;
import com.aliyun.emr.rss.common.protocol.message.ControlMessages.OneWayMessageResponse$;
@ -81,7 +81,7 @@ public class RssHARetryClientSuiteJ {
prepareForRpcEnvWithoutHA();
RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
HeartBeatFromApplication message = Mockito.mock(HeartBeatFromApplication.class);
HeartbeatFromApplication message = Mockito.mock(HeartbeatFromApplication.class);
try {
client.send(message);
@ -108,7 +108,7 @@ public class RssHARetryClientSuiteJ {
prepareForRpcEnvWithoutHA();
RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
HeartBeatFromApplication message = Mockito.mock(HeartBeatFromApplication.class);
HeartbeatFromApplication message = Mockito.mock(HeartbeatFromApplication.class);
try {
client.send(message);
@ -134,7 +134,7 @@ public class RssHARetryClientSuiteJ {
});
RssHARetryClient client = new RssHARetryClient(rpcEnv, conf);
HeartBeatFromApplication message = Mockito.mock(HeartBeatFromApplication.class);
HeartbeatFromApplication message = Mockito.mock(HeartbeatFromApplication.class);
try {
client.send(message);

View File

@ -117,7 +117,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
registeredShuffle.remove(shuffleKey);
}
public void updateAppHeartBeatMeta(String appId, long time, long totalWritten, long fileCount) {
public void updateAppHeartbeatMeta(String appId, long time, long totalWritten, long fileCount) {
appHeartbeatTime.put(appId, time);
partitionTotalWritten.add(totalWritten);
partitionTotalFileCount.add(fileCount);
@ -155,7 +155,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
blacklist.remove(worker);
}
public void updateWorkerHeartBeatMeta(
public void updateWorkerHeartbeatMeta(
String host,
int rpcPort,
int pushPort,

View File

@ -49,7 +49,7 @@ public interface IMetadataHandler {
void handleWorkerRemove(
String host, int rpcPort, int pushPort, int fetchPort, int replicatePort, String requestId);
void handleWorkerHeartBeat(
void handleWorkerHeartbeat(
String host,
int rpcPort,
int pushPort,

View File

@ -64,7 +64,7 @@ public class SingleMasterMetaManager extends AbstractMetaManager {
@Override
public void handleAppHeartbeat(
String appId, long totalWritten, long fileCount, long time, String requestId) {
updateAppHeartBeatMeta(appId, time, totalWritten, fileCount);
updateAppHeartbeatMeta(appId, time, totalWritten, fileCount);
}
@Override
@ -85,7 +85,7 @@ public class SingleMasterMetaManager extends AbstractMetaManager {
}
@Override
public void handleWorkerHeartBeat(
public void handleWorkerHeartbeat(
String host,
int rpcPort,
int pushPort,
@ -94,7 +94,7 @@ public class SingleMasterMetaManager extends AbstractMetaManager {
Map<String, DiskInfo> disks,
long time,
String requestId) {
updateWorkerHeartBeatMeta(host, rpcPort, pushPort, fetchPort, replicatePort, disks, time);
updateWorkerHeartbeatMeta(host, rpcPort, pushPort, fetchPort, replicatePort, disks, time);
}
@Override

View File

@ -136,7 +136,7 @@ public class HAMasterMetaManager extends AbstractMetaManager {
try {
ratisServer.submitRequest(
ResourceRequest.newBuilder()
.setCmdType(Type.AppHeartBeat)
.setCmdType(Type.AppHeartbeat)
.setRequestId(requestId)
.setAppHeartbeatRequest(
ResourceProtos.AppHeartbeatRequest.newBuilder()
@ -210,7 +210,7 @@ public class HAMasterMetaManager extends AbstractMetaManager {
}
@Override
public void handleWorkerHeartBeat(
public void handleWorkerHeartbeat(
String host,
int rpcPort,
int pushPort,
@ -222,10 +222,10 @@ public class HAMasterMetaManager extends AbstractMetaManager {
try {
ratisServer.submitRequest(
ResourceRequest.newBuilder()
.setCmdType(Type.WorkerHeartBeat)
.setCmdType(Type.WorkerHeartbeat)
.setRequestId(requestId)
.setWorkerHeartBeatRequest(
ResourceProtos.WorkerHeartBeatRequest.newBuilder()
.setWorkerHeartbeatRequest(
ResourceProtos.WorkerHeartbeatRequest.newBuilder()
.setHost(host)
.setRpcPort(rpcPort)
.setPushPort(pushPort)

View File

@ -134,13 +134,13 @@ public class MetaHandler {
metaSystem.updateUnregisterShuffleMeta(shuffleKey);
break;
case AppHeartBeat:
case AppHeartbeat:
appId = request.getAppHeartbeatRequest().getAppId();
LOG.debug("Handle app heartbeat for {}", appId);
long time = request.getAppHeartbeatRequest().getTime();
long totalWritten = request.getAppHeartbeatRequest().getTotalWritten();
long fileCount = request.getAppHeartbeatRequest().getFileCount();
metaSystem.updateAppHeartBeatMeta(appId, time, totalWritten, fileCount);
metaSystem.updateAppHeartbeatMeta(appId, time, totalWritten, fileCount);
break;
case AppLost:
@ -169,13 +169,13 @@ public class MetaHandler {
metaSystem.updateWorkerRemoveMeta(host, rpcPort, pushPort, fetchPort, replicatePort);
break;
case WorkerHeartBeat:
host = request.getWorkerHeartBeatRequest().getHost();
rpcPort = request.getWorkerHeartBeatRequest().getRpcPort();
pushPort = request.getWorkerHeartBeatRequest().getPushPort();
fetchPort = request.getWorkerHeartBeatRequest().getFetchPort();
disks = MetaUtil.fromPbDiskInfos(request.getWorkerHeartBeatRequest().getDisksMap());
replicatePort = request.getWorkerHeartBeatRequest().getReplicatePort();
case WorkerHeartbeat:
host = request.getWorkerHeartbeatRequest().getHost();
rpcPort = request.getWorkerHeartbeatRequest().getRpcPort();
pushPort = request.getWorkerHeartbeatRequest().getPushPort();
fetchPort = request.getWorkerHeartbeatRequest().getFetchPort();
disks = MetaUtil.fromPbDiskInfos(request.getWorkerHeartbeatRequest().getDisksMap());
replicatePort = request.getWorkerHeartbeatRequest().getReplicatePort();
LOG.debug(
"Handle worker heartbeat for {} {} {} {} {} {}",
host,
@ -184,8 +184,8 @@ public class MetaHandler {
fetchPort,
replicatePort,
disks);
time = request.getWorkerHeartBeatRequest().getTime();
metaSystem.updateWorkerHeartBeatMeta(
time = request.getWorkerHeartbeatRequest().getTime();
metaSystem.updateWorkerHeartbeatMeta(
host, rpcPort, pushPort, fetchPort, replicatePort, disks, time);
break;

View File

@ -24,10 +24,10 @@ enum Type {
UnRegisterShuffle = 12;
RequestSlots = 13;
ReleaseSlots = 14;
AppHeartBeat = 15;
AppHeartbeat = 15;
AppLost = 16;
WorkerLost = 17;
WorkerHeartBeat = 18;
WorkerHeartbeat = 18;
RegisterWorker = 19;
ReportWorkerFailure = 20;
UpdatePartitionSize = 21;
@ -45,7 +45,7 @@ message ResourceRequest {
optional AppHeartbeatRequest appHeartbeatRequest = 13;
optional AppLostRequest appLostRequest = 14;
optional WorkerLostRequest workerLostRequest = 15;
optional WorkerHeartBeatRequest workerHeartBeatRequest = 16;
optional WorkerHeartbeatRequest workerHeartbeatRequest = 16;
optional RegisterWorkerRequest registerWorkerRequest = 17;
optional ReportWorkerFailureRequest reportWorkerFailureRequest = 18;
optional WorkerRemoveRequest workerRemoveRequest = 19;
@ -106,7 +106,7 @@ message WorkerRemoveRequest {
required int32 replicatePort = 5;
}
message WorkerHeartBeatRequest {
message WorkerHeartbeatRequest {
required string host = 1;
required int32 rpcPort = 2;
required int32 pushPort = 3;

View File

@ -183,11 +183,11 @@ private[deploy] class Master(
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case HeartBeatFromApplication(appId, totalWritten, fileCount, requestId) =>
case HeartbeatFromApplication(appId, totalWritten, fileCount, requestId) =>
logDebug(s"Received heartbeat from app $appId")
executeWithLeaderChecker(
context,
handleHeartBeatFromApplication(context, appId, totalWritten, fileCount, requestId))
handleHeartbeatFromApplication(context, appId, totalWritten, fileCount, requestId))
case RegisterWorker(host, rpcPort, pushPort, fetchPort, replicatePort, disks, requestId) =>
logDebug(s"Received RegisterWorker request $requestId, $host:$pushPort:$replicatePort" +
@ -241,7 +241,7 @@ private[deploy] class Master(
s" worker $host:$rpcPort:$pushPort:$fetchPort with $disks.")
executeWithLeaderChecker(
context,
handleHeartBeatFromWorker(
handleHeartbeatFromWorker(
context,
host,
rpcPort,
@ -301,7 +301,7 @@ private[deploy] class Master(
}
}
private def handleHeartBeatFromWorker(
private def handleHeartbeatFromWorker(
context: RpcCallContext,
host: String,
rpcPort: Int,
@ -317,7 +317,7 @@ private[deploy] class Master(
logWarning(s"Received heartbeat from unknown worker " +
s"$host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
} else {
statusSystem.handleWorkerHeartBeat(
statusSystem.handleWorkerHeartbeat(
host,
rpcPort,
pushPort,
@ -544,7 +544,7 @@ private[deploy] class Master(
})
}
private def handleHeartBeatFromApplication(
private def handleHeartbeatFromApplication(
context: RpcCallContext,
appId: String,
totalWritten: Long,

View File

@ -298,7 +298,7 @@ public class DefaultMetaSystemSuiteJ {
}
@Test
public void testHandleWorkerHeartBeat() {
public void testHandleWorkerHeartbeat() {
statusSystem.handleRegisterWorker(
HOSTNAME1,
RPCPORT1,
@ -324,7 +324,7 @@ public class DefaultMetaSystemSuiteJ {
new HashMap<>(),
getNewReqeustId());
statusSystem.handleWorkerHeartBeat(
statusSystem.handleWorkerHeartbeat(
HOSTNAME1,
RPCPORT1,
PUSHPORT1,
@ -336,7 +336,7 @@ public class DefaultMetaSystemSuiteJ {
Assert.assertEquals(statusSystem.blacklist.size(), 1);
statusSystem.handleWorkerHeartBeat(
statusSystem.handleWorkerHeartbeat(
HOSTNAME2,
RPCPORT2,
PUSHPORT2,
@ -348,7 +348,7 @@ public class DefaultMetaSystemSuiteJ {
Assert.assertEquals(statusSystem.blacklist.size(), 2);
statusSystem.handleWorkerHeartBeat(
statusSystem.handleWorkerHeartbeat(
HOSTNAME1, RPCPORT1, PUSHPORT1, FETCHPORT1, REPLICATEPORT3, disks1, 1, getNewReqeustId());
Assert.assertEquals(statusSystem.blacklist.size(), 2);

View File

@ -500,7 +500,7 @@ public class RatisMasterStatusSystemSuiteJ {
}
@Test
public void testHandleWorkerHeartBeat() throws InterruptedException {
public void testHandleWorkerHeartbeat() throws InterruptedException {
AbstractMetaManager statusSystem = pickLeaderStatusSystem();
Assert.assertNotNull(statusSystem);
@ -511,7 +511,7 @@ public class RatisMasterStatusSystemSuiteJ {
statusSystem.handleRegisterWorker(
HOSTNAME3, RPCPORT3, PUSHPORT3, FETCHPORT3, REPLICATEPORT3, disks3, getNewReqeustId());
statusSystem.handleWorkerHeartBeat(
statusSystem.handleWorkerHeartbeat(
HOSTNAME1,
RPCPORT1,
PUSHPORT1,
@ -526,7 +526,7 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(1, STATUSSYSTEM2.blacklist.size());
Assert.assertEquals(1, STATUSSYSTEM3.blacklist.size());
statusSystem.handleWorkerHeartBeat(
statusSystem.handleWorkerHeartbeat(
HOSTNAME2,
RPCPORT2,
PUSHPORT2,
@ -542,7 +542,7 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(2, STATUSSYSTEM2.blacklist.size());
Assert.assertEquals(2, STATUSSYSTEM3.blacklist.size());
statusSystem.handleWorkerHeartBeat(
statusSystem.handleWorkerHeartbeat(
HOSTNAME1, RPCPORT1, PUSHPORT1, FETCHPORT1, REPLICATEPORT1, disks1, 1, getNewReqeustId());
Thread.sleep(3000L);