Generate TransportMessages.proto into multiple files (#549)

This commit is contained in:
Cheng Pan 2022-09-06 00:26:24 +08:00 committed by GitHub
parent 4f24483cc9
commit cc4e32b146
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 113 additions and 122 deletions

View File

@ -19,19 +19,19 @@ package com.aliyun.emr.rss.common.network.protocol;
import java.io.Serializable;
import com.aliyun.emr.rss.common.protocol.TransportMessages;
import com.aliyun.emr.rss.common.protocol.MessageType;
public class TransportMessage implements Serializable {
private static final long serialVersionUID = -3259000920699629773L;
private final TransportMessages.MessageType type;
private final MessageType type;
private final byte[] payload;
public TransportMessage(TransportMessages.MessageType type, byte[] payload) {
public TransportMessage(MessageType type, byte[] payload) {
this.type = type;
this.payload = payload;
}
public TransportMessages.MessageType getType() {
public MessageType getType() {
return type;
}

View File

@ -20,7 +20,6 @@ package com.aliyun.emr.rss.common.protocol;
import java.io.Serializable;
import com.aliyun.emr.rss.common.meta.WorkerInfo;
import com.aliyun.emr.rss.common.protocol.TransportMessages.PbPartitionLocation;
public class PartitionLocation implements Serializable {
public enum Mode {
@ -330,7 +329,7 @@ public class PartitionLocation implements Serializable {
}
public static PbPartitionLocation toPbPartitionLocation(PartitionLocation location) {
PbPartitionLocation.Builder builder = TransportMessages.PbPartitionLocation.newBuilder();
PbPartitionLocation.Builder builder = PbPartitionLocation.newBuilder();
if (location.mode == Mode.Master) {
builder.setMode(PbPartitionLocation.Mode.Master);
} else {
@ -346,7 +345,7 @@ public class PartitionLocation implements Serializable {
builder.setStorageInfo(StorageInfo.toPb(location.storageInfo));
if (location.getPeer() != null) {
PbPartitionLocation.Builder peerBuilder = TransportMessages.PbPartitionLocation.newBuilder();
PbPartitionLocation.Builder peerBuilder = PbPartitionLocation.newBuilder();
if (location.getPeer().mode == Mode.Master) {
peerBuilder.setMode(PbPartitionLocation.Mode.Master);
} else {

View File

@ -21,8 +21,6 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import com.aliyun.emr.rss.common.protocol.TransportMessages.PbStorageInfo;
public class StorageInfo implements Serializable {
public static String UNKNOWN_DISK = "UNKNOWN_DISK";

View File

@ -25,29 +25,26 @@ import java.util.concurrent.ConcurrentHashMap;
import com.google.protobuf.InvalidProtocolBufferException;
import com.aliyun.emr.rss.common.meta.FileInfo;
import com.aliyun.emr.rss.common.protocol.TransportMessages;
import com.aliyun.emr.rss.common.protocol.*;
public class PBSerDeUtils {
public static Set<String> fromPbSortedShuffleFileSet(byte[] data)
throws InvalidProtocolBufferException {
TransportMessages.PbSortedShuffleFileSet pbSortedShuffleFileSet =
TransportMessages.PbSortedShuffleFileSet.parseFrom(data);
PbSortedShuffleFileSet pbSortedShuffleFileSet = PbSortedShuffleFileSet.parseFrom(data);
Set<String> files = ConcurrentHashMap.newKeySet();
files.addAll(pbSortedShuffleFileSet.getFilesList());
return files;
}
public static byte[] toPbSortedShuffleFileSet(Set<String> files) {
TransportMessages.PbSortedShuffleFileSet.Builder builder =
TransportMessages.PbSortedShuffleFileSet.newBuilder();
PbSortedShuffleFileSet.Builder builder = PbSortedShuffleFileSet.newBuilder();
builder.addAllFiles(files);
return builder.build().toByteArray();
}
public static ArrayList<Integer> fromPbStoreVersion(byte[] data)
throws InvalidProtocolBufferException {
TransportMessages.PbStoreVersion pbStoreVersion =
TransportMessages.PbStoreVersion.parseFrom(data);
PbStoreVersion pbStoreVersion = PbStoreVersion.parseFrom(data);
ArrayList<Integer> versions = new ArrayList<>();
versions.add(pbStoreVersion.getMajor());
versions.add(pbStoreVersion.getMinor());
@ -55,40 +52,37 @@ public class PBSerDeUtils {
}
public static byte[] toPbStoreVersion(int major, int minor) {
TransportMessages.PbStoreVersion.Builder builder =
TransportMessages.PbStoreVersion.newBuilder();
PbStoreVersion.Builder builder = PbStoreVersion.newBuilder();
builder.setMajor(major);
builder.setMinor(minor);
return builder.build().toByteArray();
}
public static FileInfo fromPbFileInfo(TransportMessages.PbFileInfo pbFileInfo)
public static FileInfo fromPbFileInfo(PbFileInfo pbFileInfo)
throws InvalidProtocolBufferException {
return new FileInfo(
pbFileInfo.getFilePath(), new ArrayList<>(pbFileInfo.getChunkOffsetsList()));
}
public static TransportMessages.PbFileInfo toPbFileInfo(FileInfo fileInfo) {
TransportMessages.PbFileInfo.Builder builder = TransportMessages.PbFileInfo.newBuilder();
public static PbFileInfo toPbFileInfo(FileInfo fileInfo) {
PbFileInfo.Builder builder = PbFileInfo.newBuilder();
builder.setFilePath(fileInfo.getFilePath()).addAllChunkOffsets(fileInfo.getChunkOffsets());
return builder.build();
}
public static ConcurrentHashMap<String, FileInfo> fromPbFileInfoMap(byte[] data)
throws InvalidProtocolBufferException {
TransportMessages.PBFileInfoMap pbFileInfoMap = TransportMessages.PBFileInfoMap.parseFrom(data);
PBFileInfoMap pbFileInfoMap = PBFileInfoMap.parseFrom(data);
ConcurrentHashMap<String, FileInfo> fileInfoMap = new ConcurrentHashMap<>();
for (Map.Entry<String, TransportMessages.PbFileInfo> entry :
pbFileInfoMap.getValuesMap().entrySet()) {
for (Map.Entry<String, PbFileInfo> entry : pbFileInfoMap.getValuesMap().entrySet()) {
fileInfoMap.put(entry.getKey(), fromPbFileInfo(entry.getValue()));
}
return fileInfoMap;
}
public static byte[] toPbFileInfoMap(ConcurrentHashMap<String, FileInfo> fileInfoMap) {
TransportMessages.PBFileInfoMap.Builder builder = TransportMessages.PBFileInfoMap.newBuilder();
ConcurrentHashMap<String, TransportMessages.PbFileInfo> pbFileInfoMap =
new ConcurrentHashMap<>();
PBFileInfoMap.Builder builder = PBFileInfoMap.newBuilder();
ConcurrentHashMap<String, PbFileInfo> pbFileInfoMap = new ConcurrentHashMap<>();
for (Map.Entry<String, FileInfo> entry : fileInfoMap.entrySet()) {
pbFileInfoMap.put(entry.getKey(), toPbFileInfo(entry.getValue()));
}

View File

@ -15,7 +15,7 @@
syntax = "proto3";
option java_package = "com.aliyun.emr.rss.common.protocol";
option java_outer_classname = "TransportMessages";
option java_multiple_files = true;
enum MessageType {
UNKNOWN_MESSAGE = 0;

View File

@ -23,7 +23,7 @@ import java.util.Objects
import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, mapAsScalaMapConverter}
import com.aliyun.emr.rss.common.internal.Logging
import com.aliyun.emr.rss.common.protocol.TransportMessages.{PbDiskInfo, PbWorkerInfo}
import com.aliyun.emr.rss.common.protocol.{PbDiskInfo, PbWorkerInfo}
import com.aliyun.emr.rss.common.rpc.RpcEndpointRef
import com.aliyun.emr.rss.common.rpc.netty.NettyRpcEndpointRef

View File

@ -25,23 +25,23 @@ import scala.collection.JavaConverters._
import com.aliyun.emr.rss.common.internal.Logging
import com.aliyun.emr.rss.common.meta.{DiskInfo, WorkerInfo}
import com.aliyun.emr.rss.common.network.protocol.TransportMessage
import com.aliyun.emr.rss.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, StorageInfo, TransportMessages}
import com.aliyun.emr.rss.common.protocol.TransportMessages._
import com.aliyun.emr.rss.common.protocol.TransportMessages.MessageType._
import com.aliyun.emr.rss.common.protocol._
import com.aliyun.emr.rss.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, StorageInfo}
import com.aliyun.emr.rss.common.protocol.MessageType._
import com.aliyun.emr.rss.common.util.Utils
sealed trait Message extends Serializable {
import com.aliyun.emr.rss.common.protocol.message.ControlMessages._
def toTransportMessage(): TransportMessage = {
def toTransportMessage: TransportMessage = {
this match {
case CheckForWorkerTimeOut =>
new TransportMessage(TransportMessages.MessageType.CHECK_FOR_WORKER_TIMEOUT, null)
new TransportMessage(MessageType.CHECK_FOR_WORKER_TIMEOUT, null)
case CheckForApplicationTimeOut =>
new TransportMessage(TransportMessages.MessageType.CHECK_FOR_APPLICATION_TIMEOUT, null)
new TransportMessage(MessageType.CHECK_FOR_APPLICATION_TIMEOUT, null)
case RemoveExpiredShuffle =>
new TransportMessage(TransportMessages.MessageType.REMOVE_EXPIRED_SHUFFLE, null)
new TransportMessage(MessageType.REMOVE_EXPIRED_SHUFFLE, null)
case RegisterWorker(host, rpcPort, pushPort, fetchPort, replicatePort, disks, requestId) =>
val pbDisks = disks.asScala
@ -52,7 +52,7 @@ sealed trait Message extends Serializable {
.setAvgFlushTime(item._2.avgFlushTime)
.setUsedSlots(item._2.activeSlots)
.build()).toMap.asJava
val payload = TransportMessages.PbRegisterWorker.newBuilder()
val payload = PbRegisterWorker.newBuilder()
.setHost(host)
.setRpcPort(rpcPort)
.setPushPort(pushPort)
@ -61,7 +61,7 @@ sealed trait Message extends Serializable {
.putAllDisks(pbDisks)
.setRequestId(requestId)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.REGISTER_WORKER, payload)
new TransportMessage(MessageType.REGISTER_WORKER, payload)
case HeartbeatFromWorker(
host,
@ -81,7 +81,7 @@ sealed trait Message extends Serializable {
.setUsedSlots(item._2.activeSlots)
.setStatus(item._2.status.getValue)
.build()).toMap.asJava
val payload = TransportMessages.PbHeartbeatFromWorker.newBuilder()
val payload = PbHeartbeatFromWorker.newBuilder()
.setHost(host)
.setRpcPort(rpcPort)
.setPushPort(pushPort)
@ -91,33 +91,33 @@ sealed trait Message extends Serializable {
.addAllShuffleKeys(shuffleKeys)
.setRequestId(requestId)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.HEARTBEAT_FROM_WORKER, payload)
new TransportMessage(MessageType.HEARTBEAT_FROM_WORKER, payload)
case HeartbeatResponse(expiredShuffleKeys, registered) =>
val payload = TransportMessages.PbHeartbeatResponse.newBuilder()
val payload = PbHeartbeatResponse.newBuilder()
.addAllExpiredShuffleKeys(expiredShuffleKeys)
.setRegistered(registered)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.HEARTBEAT_RESPONSE, payload)
new TransportMessage(MessageType.HEARTBEAT_RESPONSE, payload)
case RegisterShuffle(applicationId, shuffleId, numMappers, numPartitions) =>
val payload = TransportMessages.PbRegisterShuffle.newBuilder()
val payload = PbRegisterShuffle.newBuilder()
.setApplicationId(applicationId)
.setShuffleId(shuffleId)
.setNumMapppers(numMappers)
.setNumPartitions(numPartitions)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.REGISTER_SHUFFLE, payload)
new TransportMessage(MessageType.REGISTER_SHUFFLE, payload)
case RegisterShuffleResponse(status, partitionLocations) =>
val builder = TransportMessages.PbRegisterShuffleResponse.newBuilder()
val builder = PbRegisterShuffleResponse.newBuilder()
.setStatus(status.getValue)
if (!partitionLocations.isEmpty) {
builder.addAllPartitionLocations(partitionLocations.iterator().asScala
.map(PartitionLocation.toPbPartitionLocation).toList.asJava)
}
val payload = builder.build().toByteArray
new TransportMessage(TransportMessages.MessageType.REGISTER_SHUFFLE_RESPONSE, payload)
new TransportMessage(MessageType.REGISTER_SHUFFLE_RESPONSE, payload)
case RequestSlots(
applicationId,
@ -126,7 +126,7 @@ sealed trait Message extends Serializable {
hostname,
shouldReplicate,
requestId) =>
val payload = TransportMessages.PbRequestSlots.newBuilder()
val payload = PbRequestSlots.newBuilder()
.setApplicationId(applicationId)
.setShuffleId(shuffleId)
.addAllPartitionIdList(partitionIdList)
@ -134,34 +134,34 @@ sealed trait Message extends Serializable {
.setShouldReplicate(shouldReplicate)
.setRequestId(requestId)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.REQUEST_SLOTS, payload)
new TransportMessage(MessageType.REQUEST_SLOTS, payload)
case ReleaseSlots(applicationId, shuffleId, workerIds, slots, requestId) =>
val pbSlots = slots.asScala.map(slot =>
PbSlotInfo.newBuilder().putAllSlot(slot).build()).toList
val payload = TransportMessages.PbReleaseSlots.newBuilder()
val payload = PbReleaseSlots.newBuilder()
.setApplicationId(applicationId)
.setShuffleId(shuffleId)
.setRequestId(requestId)
.addAllWorkerIds(workerIds)
.addAllSlots(pbSlots.asJava)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.RELEASE_SLOTS, payload)
new TransportMessage(MessageType.RELEASE_SLOTS, payload)
case ReleaseSlotsResponse(status) =>
val payload = TransportMessages.PbReleaseSlotsResponse.newBuilder()
val payload = PbReleaseSlotsResponse.newBuilder()
.setStatus(status.getValue).build().toByteArray
new TransportMessage(TransportMessages.MessageType.RELEASE_SLOTS_RESPONSE, payload)
new TransportMessage(MessageType.RELEASE_SLOTS_RESPONSE, payload)
case RequestSlotsResponse(status, workerResource) =>
val builder = TransportMessages.PbRequestSlotsResponse.newBuilder()
val builder = PbRequestSlotsResponse.newBuilder()
.setStatus(status.getValue)
if (!workerResource.isEmpty) {
builder.putAllWorkerResource(
Utils.convertWorkerResourceToPbWorkerResource(workerResource))
}
val payload = builder.build().toByteArray
new TransportMessage(TransportMessages.MessageType.REQUEST_SLOTS_RESPONSE, payload)
new TransportMessage(MessageType.REQUEST_SLOTS_RESPONSE, payload)
case Revive(
applicationId,
@ -172,7 +172,7 @@ sealed trait Message extends Serializable {
epoch,
oldPartition,
cause) =>
val builder = TransportMessages.PbRevive.newBuilder()
val builder = PbRevive.newBuilder()
builder.setApplicationId(applicationId)
.setShuffleId(shuffleId)
.setMapId(mapId)
@ -184,41 +184,41 @@ sealed trait Message extends Serializable {
builder.setOldPartition(PartitionLocation.toPbPartitionLocation(oldPartition))
}
val payload = builder.build().toByteArray
new TransportMessage(TransportMessages.MessageType.REVIVE, payload)
new TransportMessage(MessageType.REVIVE, payload)
case ChangeLocationResponse(status, location) =>
val builder = TransportMessages.PbChangeLocationResponse.newBuilder()
val builder = PbChangeLocationResponse.newBuilder()
.setStatus(status.getValue)
if (location != null) {
builder.setLocation(PartitionLocation.toPbPartitionLocation(location))
}
val payload = builder.build().toByteArray
new TransportMessage(TransportMessages.MessageType.CHANGE_LOCATION_RESPONSE, payload)
new TransportMessage(MessageType.CHANGE_LOCATION_RESPONSE, payload)
case MapperEnd(applicationId, shuffleId, mapId, attemptId, numMappers) =>
val payload = TransportMessages.PbMapperEnd.newBuilder()
val payload = PbMapperEnd.newBuilder()
.setApplicationId(applicationId)
.setShuffleId(shuffleId)
.setMapId(mapId)
.setAttemptId(attemptId)
.setNumMappers(numMappers)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.MAPPER_END, payload)
new TransportMessage(MessageType.MAPPER_END, payload)
case MapperEndResponse(status) =>
val payload = TransportMessages.PbMapperEndResponse.newBuilder()
val payload = PbMapperEndResponse.newBuilder()
.setStatus(status.getValue)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.MAPPER_END_RESPONSE, payload)
new TransportMessage(MessageType.MAPPER_END_RESPONSE, payload)
case GetReducerFileGroup(applicationId, shuffleId) =>
val payload = TransportMessages.PbGetReducerFileGroup.newBuilder()
val payload = PbGetReducerFileGroup.newBuilder()
.setApplicationId(applicationId).setShuffleId(shuffleId)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.GET_REDUCER_FILE_GROUP, payload)
new TransportMessage(MessageType.GET_REDUCER_FILE_GROUP, payload)
case GetReducerFileGroupResponse(status, fileGroup, attempts) =>
val builder = TransportMessages.PbGetReducerFileGroupResponse
val builder = PbGetReducerFileGroupResponse
.newBuilder()
.setStatus(status.getValue)
builder.addAllFileGroup(
@ -230,10 +230,10 @@ sealed trait Message extends Serializable {
.asJava)
builder.addAllAttempts(attempts.map(new Integer(_)).toIterable.asJava)
val payload = builder.build().toByteArray
new TransportMessage(TransportMessages.MessageType.GET_REDUCER_FILE_GROUP_RESPONSE, payload)
new TransportMessage(MessageType.GET_REDUCER_FILE_GROUP_RESPONSE, payload)
case WorkerLost(host, rpcPort, pushPort, fetchPort, replicatePort, requestId) =>
val payload = TransportMessages.PbWorkerLost.newBuilder()
val payload = PbWorkerLost.newBuilder()
.setHost(host)
.setRpcPort(rpcPort)
.setPushPort(pushPort)
@ -241,104 +241,104 @@ sealed trait Message extends Serializable {
.setReplicatePort(replicatePort)
.setRequestId(requestId)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.WORKER_LOST, payload)
new TransportMessage(MessageType.WORKER_LOST, payload)
case WorkerLostResponse(success) =>
val payload = TransportMessages.PbWorkerLostResponse.newBuilder()
val payload = PbWorkerLostResponse.newBuilder()
.setSuccess(success)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.WORKER_LOST_RESPONSE, payload)
new TransportMessage(MessageType.WORKER_LOST_RESPONSE, payload)
case StageEnd(applicationId, shuffleId) =>
val payload = TransportMessages.PbStageEnd.newBuilder()
val payload = PbStageEnd.newBuilder()
.setApplicationId(applicationId)
.setShuffleId(shuffleId)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.STAGE_END, payload)
new TransportMessage(MessageType.STAGE_END, payload)
case StageEndResponse(status) =>
val payload = TransportMessages.PbStageEndResponse.newBuilder()
val payload = PbStageEndResponse.newBuilder()
.setStatus(status.getValue)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.STAGE_END_RESPONSE, payload)
new TransportMessage(MessageType.STAGE_END_RESPONSE, payload)
case UnregisterShuffle(appId, shuffleId, requestId) =>
val payload = TransportMessages.PbUnregisterShuffle.newBuilder()
val payload = PbUnregisterShuffle.newBuilder()
.setAppId(appId).setShuffleId(shuffleId).setRequestId(requestId)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.UNREGISTER_SHUFFLE, payload)
new TransportMessage(MessageType.UNREGISTER_SHUFFLE, payload)
case UnregisterShuffleResponse(status) =>
val payload = TransportMessages.PbUnregisterShuffleResponse.newBuilder()
val payload = PbUnregisterShuffleResponse.newBuilder()
.setStatus(status.getValue)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.UNREGISTER_SHUFFLE_RESPONSE, payload)
new TransportMessage(MessageType.UNREGISTER_SHUFFLE_RESPONSE, payload)
case ApplicationLost(appId, requestId) =>
val payload = TransportMessages.PbApplicationLost.newBuilder()
val payload = PbApplicationLost.newBuilder()
.setAppId(appId).setRequestId(requestId)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.APPLICATION_LOST, payload)
new TransportMessage(MessageType.APPLICATION_LOST, payload)
case ApplicationLostResponse(status) =>
val payload = TransportMessages.PbApplicationLostResponse.newBuilder()
val payload = PbApplicationLostResponse.newBuilder()
.setStatus(status.getValue).build().toByteArray
new TransportMessage(TransportMessages.MessageType.APPLICATION_LOST_RESPONSE, payload)
new TransportMessage(MessageType.APPLICATION_LOST_RESPONSE, payload)
case HeartBeatFromApplication(appId, totalWritten, fileCount, requestId) =>
val payload = TransportMessages.PbHeartBeatFromApplication.newBuilder()
val payload = PbHeartBeatFromApplication.newBuilder()
.setAppId(appId)
.setRequestId(requestId)
.setTotalWritten(totalWritten)
.setFileCount(fileCount)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.HEARTBEAT_FROM_APPLICATION, payload)
new TransportMessage(MessageType.HEARTBEAT_FROM_APPLICATION, payload)
case GetBlacklist(localBlacklist) =>
val payload = TransportMessages.PbGetBlacklist.newBuilder()
val payload = PbGetBlacklist.newBuilder()
.addAllLocalBlackList(localBlacklist.asScala.map(WorkerInfo.toPbWorkerInfo)
.toList.asJava)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.GET_BLACKLIST, payload)
new TransportMessage(MessageType.GET_BLACKLIST, payload)
case GetBlacklistResponse(statusCode, blacklist, unknownWorkers) =>
val builder = TransportMessages.PbGetBlacklistResponse.newBuilder()
val builder = PbGetBlacklistResponse.newBuilder()
.setStatus(statusCode.getValue)
builder.addAllBlacklist(blacklist.asScala.map(WorkerInfo.toPbWorkerInfo).toList.asJava)
builder.addAllUnknownWorkers(
unknownWorkers.asScala.map(WorkerInfo.toPbWorkerInfo).toList.asJava)
val payload = builder.build().toByteArray
new TransportMessage(TransportMessages.MessageType.GET_BLACKLIST_RESPONSE, payload)
new TransportMessage(MessageType.GET_BLACKLIST_RESPONSE, payload)
case CheckAlive =>
new TransportMessage(
TransportMessages.MessageType.CHECK_ALIVE,
MessageType.CHECK_ALIVE,
PbCheckAlive.newBuilder().build().toByteArray)
case CheckAliveResponse(isAlive) =>
val payload = TransportMessages.PbCheckAliveResponse.newBuilder()
val payload = PbCheckAliveResponse.newBuilder()
.setIsAlive(isAlive)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.CHECK_ALIVE_RESPONSE, payload)
new TransportMessage(MessageType.CHECK_ALIVE_RESPONSE, payload)
case ReportWorkerFailure(failed, requestId) =>
val payload = TransportMessages.PbReportWorkerFailure.newBuilder()
val payload = PbReportWorkerFailure.newBuilder()
.addAllFailed(failed.asScala.map(WorkerInfo.toPbWorkerInfo(_)).toList.asJava)
.setRequestId(requestId).build().toByteArray
new TransportMessage(TransportMessages.MessageType.REPORT_WORKER_FAILURE, payload)
new TransportMessage(MessageType.REPORT_WORKER_FAILURE, payload)
case RegisterWorkerResponse(success, message) =>
val payload = TransportMessages.PbRegisterWorkerResponse.newBuilder()
val payload = PbRegisterWorkerResponse.newBuilder()
.setSuccess(success)
.setMessage(message)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.REGISTER_WORKER_RESPONSE, payload)
new TransportMessage(MessageType.REGISTER_WORKER_RESPONSE, payload)
case ReregisterWorkerResponse(success) =>
val payload = TransportMessages.PbReregisterWorkerResponse.newBuilder()
val payload = PbReregisterWorkerResponse.newBuilder()
.setSuccess(success)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.REREGISTER_WORKER_RESPONSE, payload)
new TransportMessage(MessageType.REREGISTER_WORKER_RESPONSE, payload)
case ReserveSlots(
applicationId,
@ -348,7 +348,7 @@ sealed trait Message extends Serializable {
splitThreshold,
splitMode,
partType) =>
val payload = TransportMessages.PbReserveSlots.newBuilder()
val payload = PbReserveSlots.newBuilder()
.setApplicationId(applicationId)
.setShuffleId(shuffleId)
.addAllMasterLocations(masterLocations.asScala
@ -359,23 +359,23 @@ sealed trait Message extends Serializable {
.setSplitMode(splitMode.getValue)
.setPartitionType(partType.getValue)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.RESERVE_SLOTS, payload)
new TransportMessage(MessageType.RESERVE_SLOTS, payload)
case ReserveSlotsResponse(status, reason) =>
val payload = TransportMessages.PbReserveSlotsResponse.newBuilder()
val payload = PbReserveSlotsResponse.newBuilder()
.setStatus(status.getValue).setReason(reason)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.RESERVE_SLOTS_RESPONSE, payload)
new TransportMessage(MessageType.RESERVE_SLOTS_RESPONSE, payload)
case CommitFiles(applicationId, shuffleId, masterIds, slaveIds, mapAttempts) =>
val payload = TransportMessages.PbCommitFiles.newBuilder()
val payload = PbCommitFiles.newBuilder()
.setApplicationId(applicationId)
.setShuffleId(shuffleId)
.addAllMasterIds(masterIds)
.addAllSlaveIds(slaveIds)
.addAllMapAttempts(mapAttempts.map(new Integer(_)).toIterable.asJava)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.COMMIT_FILES, payload)
new TransportMessage(MessageType.COMMIT_FILES, payload)
case CommitFilesResponse(
status,
@ -387,7 +387,7 @@ sealed trait Message extends Serializable {
committedSlaveStorageInfos,
totalWritten,
fileCount) =>
val builder = TransportMessages.PbCommitFilesResponse.newBuilder()
val builder = PbCommitFilesResponse.newBuilder()
.setStatus(status.getValue)
builder.addAllCommittedMasterIds(committedMasterIds)
builder.addAllCommittedSlaveIds(committedSlaveIds)
@ -400,58 +400,58 @@ sealed trait Message extends Serializable {
builder.setTotalWritten(totalWritten)
builder.setFileCount(fileCount)
val payload = builder.build().toByteArray
new TransportMessage(TransportMessages.MessageType.COMMIT_FILES_RESPONSE, payload)
new TransportMessage(MessageType.COMMIT_FILES_RESPONSE, payload)
case Destroy(shuffleKey, masterLocations, slaveLocations) =>
val payload = TransportMessages.PbDestroy.newBuilder()
val payload = PbDestroy.newBuilder()
.setShuffleKey(shuffleKey)
.addAllMasterLocations(masterLocations)
.addAllSlaveLocation(slaveLocations)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.DESTROY, payload)
new TransportMessage(MessageType.DESTROY, payload)
case DestroyResponse(status, failedMasters, failedSlaves) =>
val builder = TransportMessages.PbDestroyResponse.newBuilder()
val builder = PbDestroyResponse.newBuilder()
.setStatus(status.getValue)
builder.addAllFailedMasters(failedMasters)
builder.addAllFailedSlaves(failedSlaves)
val payload = builder.build().toByteArray
new TransportMessage(TransportMessages.MessageType.DESTROY_RESPONSE, payload)
new TransportMessage(MessageType.DESTROY_RESPONSE, payload)
case SlaveLostResponse(status, slaveLocation) =>
val payload = TransportMessages.PbSlaveLostResponse.newBuilder()
val payload = PbSlaveLostResponse.newBuilder()
.setStatus(status.getValue)
.setSlaveLocation(PartitionLocation.toPbPartitionLocation(slaveLocation))
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.SLAVE_LOST_RESPONSE, payload)
new TransportMessage(MessageType.SLAVE_LOST_RESPONSE, payload)
case GetWorkerInfos =>
new TransportMessage(TransportMessages.MessageType.GET_WORKER_INFO, null)
new TransportMessage(MessageType.GET_WORKER_INFO, null)
case GetWorkerInfosResponse(status, workerInfos @ _*) =>
val payload = TransportMessages.PbGetWorkerInfosResponse.newBuilder()
val payload = PbGetWorkerInfosResponse.newBuilder()
.setStatus(status.getValue)
.addAllWorkerInfos(workerInfos.map(WorkerInfo.toPbWorkerInfo(_)).toList.asJava)
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.GET_WORKER_INFO_RESPONSE, payload)
new TransportMessage(MessageType.GET_WORKER_INFO_RESPONSE, payload)
case ThreadDump =>
new TransportMessage(TransportMessages.MessageType.THREAD_DUMP, null)
new TransportMessage(MessageType.THREAD_DUMP, null)
case ThreadDumpResponse(threadDump) =>
val payload = TransportMessages.PbThreadDumpResponse.newBuilder()
val payload = PbThreadDumpResponse.newBuilder()
.setThreadDump(threadDump).build().toByteArray
new TransportMessage(TransportMessages.MessageType.THREAD_DUMP_RESPONSE, payload)
new TransportMessage(MessageType.THREAD_DUMP_RESPONSE, payload)
case PartitionSplit(applicationId, shuffleId, partitionId, epoch, oldPartition) =>
val payload = TransportMessages.PbPartitionSplit.newBuilder()
val payload = PbPartitionSplit.newBuilder()
.setApplicationId(applicationId).setShuffleId(shuffleId).setPartitionId(partitionId)
.setEpoch(epoch).setOldPartition(PartitionLocation.toPbPartitionLocation(oldPartition))
.build().toByteArray
new TransportMessage(TransportMessages.MessageType.PARTITION_SPLIT, payload)
new TransportMessage(MessageType.PARTITION_SPLIT, payload)
case OneWayMessageResponse =>
new TransportMessage(TransportMessages.MessageType.ONE_WAY_MESSAGE_RESPONSE, null)
new TransportMessage(MessageType.ONE_WAY_MESSAGE_RESPONSE, null)
}
}
}

View File

@ -43,7 +43,7 @@ import com.aliyun.emr.rss.common.meta.{DiskStatus, WorkerInfo}
import com.aliyun.emr.rss.common.network.protocol.TransportMessage
import com.aliyun.emr.rss.common.network.util.{ConfigProvider, JavaUtils, TransportConf}
import com.aliyun.emr.rss.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType}
import com.aliyun.emr.rss.common.protocol.TransportMessages.PbWorkerResource
import com.aliyun.emr.rss.common.protocol.PbWorkerResource
import com.aliyun.emr.rss.common.protocol.message.{ControlMessages, Message, StatusCode}
import com.aliyun.emr.rss.common.protocol.message.ControlMessages.WorkerResource
@ -726,7 +726,7 @@ object Utils extends Logging {
def toTransportMessage(message: Any): Any = {
message match {
case transportMessage: Message =>
transportMessage.toTransportMessage()
transportMessage.toTransportMessage
case _ =>
message
}