diff --git a/common/src/main/java/com/aliyun/emr/rss/common/network/protocol/TransportMessage.java b/common/src/main/java/com/aliyun/emr/rss/common/network/protocol/TransportMessage.java index e54ebfaaa..3f318d238 100644 --- a/common/src/main/java/com/aliyun/emr/rss/common/network/protocol/TransportMessage.java +++ b/common/src/main/java/com/aliyun/emr/rss/common/network/protocol/TransportMessage.java @@ -19,19 +19,19 @@ package com.aliyun.emr.rss.common.network.protocol; import java.io.Serializable; -import com.aliyun.emr.rss.common.protocol.TransportMessages; +import com.aliyun.emr.rss.common.protocol.MessageType; public class TransportMessage implements Serializable { private static final long serialVersionUID = -3259000920699629773L; - private final TransportMessages.MessageType type; + private final MessageType type; private final byte[] payload; - public TransportMessage(TransportMessages.MessageType type, byte[] payload) { + public TransportMessage(MessageType type, byte[] payload) { this.type = type; this.payload = payload; } - public TransportMessages.MessageType getType() { + public MessageType getType() { return type; } diff --git a/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java b/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java index 653d16154..def5615ac 100644 --- a/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java +++ b/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java @@ -20,7 +20,6 @@ package com.aliyun.emr.rss.common.protocol; import java.io.Serializable; import com.aliyun.emr.rss.common.meta.WorkerInfo; -import com.aliyun.emr.rss.common.protocol.TransportMessages.PbPartitionLocation; public class PartitionLocation implements Serializable { public enum Mode { @@ -330,7 +329,7 @@ public class PartitionLocation implements Serializable { } public static PbPartitionLocation toPbPartitionLocation(PartitionLocation location) { - PbPartitionLocation.Builder builder = TransportMessages.PbPartitionLocation.newBuilder(); + PbPartitionLocation.Builder builder = PbPartitionLocation.newBuilder(); if (location.mode == Mode.Master) { builder.setMode(PbPartitionLocation.Mode.Master); } else { @@ -346,7 +345,7 @@ public class PartitionLocation implements Serializable { builder.setStorageInfo(StorageInfo.toPb(location.storageInfo)); if (location.getPeer() != null) { - PbPartitionLocation.Builder peerBuilder = TransportMessages.PbPartitionLocation.newBuilder(); + PbPartitionLocation.Builder peerBuilder = PbPartitionLocation.newBuilder(); if (location.getPeer().mode == Mode.Master) { peerBuilder.setMode(PbPartitionLocation.Mode.Master); } else { diff --git a/common/src/main/java/com/aliyun/emr/rss/common/protocol/StorageInfo.java b/common/src/main/java/com/aliyun/emr/rss/common/protocol/StorageInfo.java index c922054a4..b5eeac616 100644 --- a/common/src/main/java/com/aliyun/emr/rss/common/protocol/StorageInfo.java +++ b/common/src/main/java/com/aliyun/emr/rss/common/protocol/StorageInfo.java @@ -21,8 +21,6 @@ import java.io.Serializable; import java.util.HashMap; import java.util.Map; -import com.aliyun.emr.rss.common.protocol.TransportMessages.PbStorageInfo; - public class StorageInfo implements Serializable { public static String UNKNOWN_DISK = "UNKNOWN_DISK"; diff --git a/common/src/main/java/com/aliyun/emr/rss/common/util/PBSerDeUtils.java b/common/src/main/java/com/aliyun/emr/rss/common/util/PBSerDeUtils.java index 4a4346773..c79b2365d 100644 --- a/common/src/main/java/com/aliyun/emr/rss/common/util/PBSerDeUtils.java +++ b/common/src/main/java/com/aliyun/emr/rss/common/util/PBSerDeUtils.java @@ -25,29 +25,26 @@ import java.util.concurrent.ConcurrentHashMap; import com.google.protobuf.InvalidProtocolBufferException; import com.aliyun.emr.rss.common.meta.FileInfo; -import com.aliyun.emr.rss.common.protocol.TransportMessages; +import com.aliyun.emr.rss.common.protocol.*; public class PBSerDeUtils { public static Set fromPbSortedShuffleFileSet(byte[] data) throws InvalidProtocolBufferException { - TransportMessages.PbSortedShuffleFileSet pbSortedShuffleFileSet = - TransportMessages.PbSortedShuffleFileSet.parseFrom(data); + PbSortedShuffleFileSet pbSortedShuffleFileSet = PbSortedShuffleFileSet.parseFrom(data); Set files = ConcurrentHashMap.newKeySet(); files.addAll(pbSortedShuffleFileSet.getFilesList()); return files; } public static byte[] toPbSortedShuffleFileSet(Set files) { - TransportMessages.PbSortedShuffleFileSet.Builder builder = - TransportMessages.PbSortedShuffleFileSet.newBuilder(); + PbSortedShuffleFileSet.Builder builder = PbSortedShuffleFileSet.newBuilder(); builder.addAllFiles(files); return builder.build().toByteArray(); } public static ArrayList fromPbStoreVersion(byte[] data) throws InvalidProtocolBufferException { - TransportMessages.PbStoreVersion pbStoreVersion = - TransportMessages.PbStoreVersion.parseFrom(data); + PbStoreVersion pbStoreVersion = PbStoreVersion.parseFrom(data); ArrayList versions = new ArrayList<>(); versions.add(pbStoreVersion.getMajor()); versions.add(pbStoreVersion.getMinor()); @@ -55,40 +52,37 @@ public class PBSerDeUtils { } public static byte[] toPbStoreVersion(int major, int minor) { - TransportMessages.PbStoreVersion.Builder builder = - TransportMessages.PbStoreVersion.newBuilder(); + PbStoreVersion.Builder builder = PbStoreVersion.newBuilder(); builder.setMajor(major); builder.setMinor(minor); return builder.build().toByteArray(); } - public static FileInfo fromPbFileInfo(TransportMessages.PbFileInfo pbFileInfo) + public static FileInfo fromPbFileInfo(PbFileInfo pbFileInfo) throws InvalidProtocolBufferException { return new FileInfo( pbFileInfo.getFilePath(), new ArrayList<>(pbFileInfo.getChunkOffsetsList())); } - public static TransportMessages.PbFileInfo toPbFileInfo(FileInfo fileInfo) { - TransportMessages.PbFileInfo.Builder builder = TransportMessages.PbFileInfo.newBuilder(); + public static PbFileInfo toPbFileInfo(FileInfo fileInfo) { + PbFileInfo.Builder builder = PbFileInfo.newBuilder(); builder.setFilePath(fileInfo.getFilePath()).addAllChunkOffsets(fileInfo.getChunkOffsets()); return builder.build(); } public static ConcurrentHashMap fromPbFileInfoMap(byte[] data) throws InvalidProtocolBufferException { - TransportMessages.PBFileInfoMap pbFileInfoMap = TransportMessages.PBFileInfoMap.parseFrom(data); + PBFileInfoMap pbFileInfoMap = PBFileInfoMap.parseFrom(data); ConcurrentHashMap fileInfoMap = new ConcurrentHashMap<>(); - for (Map.Entry entry : - pbFileInfoMap.getValuesMap().entrySet()) { + for (Map.Entry entry : pbFileInfoMap.getValuesMap().entrySet()) { fileInfoMap.put(entry.getKey(), fromPbFileInfo(entry.getValue())); } return fileInfoMap; } public static byte[] toPbFileInfoMap(ConcurrentHashMap fileInfoMap) { - TransportMessages.PBFileInfoMap.Builder builder = TransportMessages.PBFileInfoMap.newBuilder(); - ConcurrentHashMap pbFileInfoMap = - new ConcurrentHashMap<>(); + PBFileInfoMap.Builder builder = PBFileInfoMap.newBuilder(); + ConcurrentHashMap pbFileInfoMap = new ConcurrentHashMap<>(); for (Map.Entry entry : fileInfoMap.entrySet()) { pbFileInfoMap.put(entry.getKey(), toPbFileInfo(entry.getValue())); } diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index 6bbd62456..66c9dc3da 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -15,7 +15,7 @@ syntax = "proto3"; option java_package = "com.aliyun.emr.rss.common.protocol"; -option java_outer_classname = "TransportMessages"; +option java_multiple_files = true; enum MessageType { UNKNOWN_MESSAGE = 0; diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/meta/WorkerInfo.scala b/common/src/main/scala/com/aliyun/emr/rss/common/meta/WorkerInfo.scala index 7e75eb76d..d7acb7b49 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/meta/WorkerInfo.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/meta/WorkerInfo.scala @@ -23,7 +23,7 @@ import java.util.Objects import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, mapAsScalaMapConverter} import com.aliyun.emr.rss.common.internal.Logging -import com.aliyun.emr.rss.common.protocol.TransportMessages.{PbDiskInfo, PbWorkerInfo} +import com.aliyun.emr.rss.common.protocol.{PbDiskInfo, PbWorkerInfo} import com.aliyun.emr.rss.common.rpc.RpcEndpointRef import com.aliyun.emr.rss.common.rpc.netty.NettyRpcEndpointRef diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala b/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala index 26217d8d6..d64a9efb3 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala @@ -25,23 +25,23 @@ import scala.collection.JavaConverters._ import com.aliyun.emr.rss.common.internal.Logging import com.aliyun.emr.rss.common.meta.{DiskInfo, WorkerInfo} import com.aliyun.emr.rss.common.network.protocol.TransportMessage -import com.aliyun.emr.rss.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, StorageInfo, TransportMessages} -import com.aliyun.emr.rss.common.protocol.TransportMessages._ -import com.aliyun.emr.rss.common.protocol.TransportMessages.MessageType._ +import com.aliyun.emr.rss.common.protocol._ +import com.aliyun.emr.rss.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, StorageInfo} +import com.aliyun.emr.rss.common.protocol.MessageType._ import com.aliyun.emr.rss.common.util.Utils sealed trait Message extends Serializable { import com.aliyun.emr.rss.common.protocol.message.ControlMessages._ - def toTransportMessage(): TransportMessage = { + def toTransportMessage: TransportMessage = { this match { case CheckForWorkerTimeOut => - new TransportMessage(TransportMessages.MessageType.CHECK_FOR_WORKER_TIMEOUT, null) + new TransportMessage(MessageType.CHECK_FOR_WORKER_TIMEOUT, null) case CheckForApplicationTimeOut => - new TransportMessage(TransportMessages.MessageType.CHECK_FOR_APPLICATION_TIMEOUT, null) + new TransportMessage(MessageType.CHECK_FOR_APPLICATION_TIMEOUT, null) case RemoveExpiredShuffle => - new TransportMessage(TransportMessages.MessageType.REMOVE_EXPIRED_SHUFFLE, null) + new TransportMessage(MessageType.REMOVE_EXPIRED_SHUFFLE, null) case RegisterWorker(host, rpcPort, pushPort, fetchPort, replicatePort, disks, requestId) => val pbDisks = disks.asScala @@ -52,7 +52,7 @@ sealed trait Message extends Serializable { .setAvgFlushTime(item._2.avgFlushTime) .setUsedSlots(item._2.activeSlots) .build()).toMap.asJava - val payload = TransportMessages.PbRegisterWorker.newBuilder() + val payload = PbRegisterWorker.newBuilder() .setHost(host) .setRpcPort(rpcPort) .setPushPort(pushPort) @@ -61,7 +61,7 @@ sealed trait Message extends Serializable { .putAllDisks(pbDisks) .setRequestId(requestId) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.REGISTER_WORKER, payload) + new TransportMessage(MessageType.REGISTER_WORKER, payload) case HeartbeatFromWorker( host, @@ -81,7 +81,7 @@ sealed trait Message extends Serializable { .setUsedSlots(item._2.activeSlots) .setStatus(item._2.status.getValue) .build()).toMap.asJava - val payload = TransportMessages.PbHeartbeatFromWorker.newBuilder() + val payload = PbHeartbeatFromWorker.newBuilder() .setHost(host) .setRpcPort(rpcPort) .setPushPort(pushPort) @@ -91,33 +91,33 @@ sealed trait Message extends Serializable { .addAllShuffleKeys(shuffleKeys) .setRequestId(requestId) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.HEARTBEAT_FROM_WORKER, payload) + new TransportMessage(MessageType.HEARTBEAT_FROM_WORKER, payload) case HeartbeatResponse(expiredShuffleKeys, registered) => - val payload = TransportMessages.PbHeartbeatResponse.newBuilder() + val payload = PbHeartbeatResponse.newBuilder() .addAllExpiredShuffleKeys(expiredShuffleKeys) .setRegistered(registered) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.HEARTBEAT_RESPONSE, payload) + new TransportMessage(MessageType.HEARTBEAT_RESPONSE, payload) case RegisterShuffle(applicationId, shuffleId, numMappers, numPartitions) => - val payload = TransportMessages.PbRegisterShuffle.newBuilder() + val payload = PbRegisterShuffle.newBuilder() .setApplicationId(applicationId) .setShuffleId(shuffleId) .setNumMapppers(numMappers) .setNumPartitions(numPartitions) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.REGISTER_SHUFFLE, payload) + new TransportMessage(MessageType.REGISTER_SHUFFLE, payload) case RegisterShuffleResponse(status, partitionLocations) => - val builder = TransportMessages.PbRegisterShuffleResponse.newBuilder() + val builder = PbRegisterShuffleResponse.newBuilder() .setStatus(status.getValue) if (!partitionLocations.isEmpty) { builder.addAllPartitionLocations(partitionLocations.iterator().asScala .map(PartitionLocation.toPbPartitionLocation).toList.asJava) } val payload = builder.build().toByteArray - new TransportMessage(TransportMessages.MessageType.REGISTER_SHUFFLE_RESPONSE, payload) + new TransportMessage(MessageType.REGISTER_SHUFFLE_RESPONSE, payload) case RequestSlots( applicationId, @@ -126,7 +126,7 @@ sealed trait Message extends Serializable { hostname, shouldReplicate, requestId) => - val payload = TransportMessages.PbRequestSlots.newBuilder() + val payload = PbRequestSlots.newBuilder() .setApplicationId(applicationId) .setShuffleId(shuffleId) .addAllPartitionIdList(partitionIdList) @@ -134,34 +134,34 @@ sealed trait Message extends Serializable { .setShouldReplicate(shouldReplicate) .setRequestId(requestId) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.REQUEST_SLOTS, payload) + new TransportMessage(MessageType.REQUEST_SLOTS, payload) case ReleaseSlots(applicationId, shuffleId, workerIds, slots, requestId) => val pbSlots = slots.asScala.map(slot => PbSlotInfo.newBuilder().putAllSlot(slot).build()).toList - val payload = TransportMessages.PbReleaseSlots.newBuilder() + val payload = PbReleaseSlots.newBuilder() .setApplicationId(applicationId) .setShuffleId(shuffleId) .setRequestId(requestId) .addAllWorkerIds(workerIds) .addAllSlots(pbSlots.asJava) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.RELEASE_SLOTS, payload) + new TransportMessage(MessageType.RELEASE_SLOTS, payload) case ReleaseSlotsResponse(status) => - val payload = TransportMessages.PbReleaseSlotsResponse.newBuilder() + val payload = PbReleaseSlotsResponse.newBuilder() .setStatus(status.getValue).build().toByteArray - new TransportMessage(TransportMessages.MessageType.RELEASE_SLOTS_RESPONSE, payload) + new TransportMessage(MessageType.RELEASE_SLOTS_RESPONSE, payload) case RequestSlotsResponse(status, workerResource) => - val builder = TransportMessages.PbRequestSlotsResponse.newBuilder() + val builder = PbRequestSlotsResponse.newBuilder() .setStatus(status.getValue) if (!workerResource.isEmpty) { builder.putAllWorkerResource( Utils.convertWorkerResourceToPbWorkerResource(workerResource)) } val payload = builder.build().toByteArray - new TransportMessage(TransportMessages.MessageType.REQUEST_SLOTS_RESPONSE, payload) + new TransportMessage(MessageType.REQUEST_SLOTS_RESPONSE, payload) case Revive( applicationId, @@ -172,7 +172,7 @@ sealed trait Message extends Serializable { epoch, oldPartition, cause) => - val builder = TransportMessages.PbRevive.newBuilder() + val builder = PbRevive.newBuilder() builder.setApplicationId(applicationId) .setShuffleId(shuffleId) .setMapId(mapId) @@ -184,41 +184,41 @@ sealed trait Message extends Serializable { builder.setOldPartition(PartitionLocation.toPbPartitionLocation(oldPartition)) } val payload = builder.build().toByteArray - new TransportMessage(TransportMessages.MessageType.REVIVE, payload) + new TransportMessage(MessageType.REVIVE, payload) case ChangeLocationResponse(status, location) => - val builder = TransportMessages.PbChangeLocationResponse.newBuilder() + val builder = PbChangeLocationResponse.newBuilder() .setStatus(status.getValue) if (location != null) { builder.setLocation(PartitionLocation.toPbPartitionLocation(location)) } val payload = builder.build().toByteArray - new TransportMessage(TransportMessages.MessageType.CHANGE_LOCATION_RESPONSE, payload) + new TransportMessage(MessageType.CHANGE_LOCATION_RESPONSE, payload) case MapperEnd(applicationId, shuffleId, mapId, attemptId, numMappers) => - val payload = TransportMessages.PbMapperEnd.newBuilder() + val payload = PbMapperEnd.newBuilder() .setApplicationId(applicationId) .setShuffleId(shuffleId) .setMapId(mapId) .setAttemptId(attemptId) .setNumMappers(numMappers) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.MAPPER_END, payload) + new TransportMessage(MessageType.MAPPER_END, payload) case MapperEndResponse(status) => - val payload = TransportMessages.PbMapperEndResponse.newBuilder() + val payload = PbMapperEndResponse.newBuilder() .setStatus(status.getValue) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.MAPPER_END_RESPONSE, payload) + new TransportMessage(MessageType.MAPPER_END_RESPONSE, payload) case GetReducerFileGroup(applicationId, shuffleId) => - val payload = TransportMessages.PbGetReducerFileGroup.newBuilder() + val payload = PbGetReducerFileGroup.newBuilder() .setApplicationId(applicationId).setShuffleId(shuffleId) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.GET_REDUCER_FILE_GROUP, payload) + new TransportMessage(MessageType.GET_REDUCER_FILE_GROUP, payload) case GetReducerFileGroupResponse(status, fileGroup, attempts) => - val builder = TransportMessages.PbGetReducerFileGroupResponse + val builder = PbGetReducerFileGroupResponse .newBuilder() .setStatus(status.getValue) builder.addAllFileGroup( @@ -230,10 +230,10 @@ sealed trait Message extends Serializable { .asJava) builder.addAllAttempts(attempts.map(new Integer(_)).toIterable.asJava) val payload = builder.build().toByteArray - new TransportMessage(TransportMessages.MessageType.GET_REDUCER_FILE_GROUP_RESPONSE, payload) + new TransportMessage(MessageType.GET_REDUCER_FILE_GROUP_RESPONSE, payload) case WorkerLost(host, rpcPort, pushPort, fetchPort, replicatePort, requestId) => - val payload = TransportMessages.PbWorkerLost.newBuilder() + val payload = PbWorkerLost.newBuilder() .setHost(host) .setRpcPort(rpcPort) .setPushPort(pushPort) @@ -241,104 +241,104 @@ sealed trait Message extends Serializable { .setReplicatePort(replicatePort) .setRequestId(requestId) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.WORKER_LOST, payload) + new TransportMessage(MessageType.WORKER_LOST, payload) case WorkerLostResponse(success) => - val payload = TransportMessages.PbWorkerLostResponse.newBuilder() + val payload = PbWorkerLostResponse.newBuilder() .setSuccess(success) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.WORKER_LOST_RESPONSE, payload) + new TransportMessage(MessageType.WORKER_LOST_RESPONSE, payload) case StageEnd(applicationId, shuffleId) => - val payload = TransportMessages.PbStageEnd.newBuilder() + val payload = PbStageEnd.newBuilder() .setApplicationId(applicationId) .setShuffleId(shuffleId) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.STAGE_END, payload) + new TransportMessage(MessageType.STAGE_END, payload) case StageEndResponse(status) => - val payload = TransportMessages.PbStageEndResponse.newBuilder() + val payload = PbStageEndResponse.newBuilder() .setStatus(status.getValue) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.STAGE_END_RESPONSE, payload) + new TransportMessage(MessageType.STAGE_END_RESPONSE, payload) case UnregisterShuffle(appId, shuffleId, requestId) => - val payload = TransportMessages.PbUnregisterShuffle.newBuilder() + val payload = PbUnregisterShuffle.newBuilder() .setAppId(appId).setShuffleId(shuffleId).setRequestId(requestId) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.UNREGISTER_SHUFFLE, payload) + new TransportMessage(MessageType.UNREGISTER_SHUFFLE, payload) case UnregisterShuffleResponse(status) => - val payload = TransportMessages.PbUnregisterShuffleResponse.newBuilder() + val payload = PbUnregisterShuffleResponse.newBuilder() .setStatus(status.getValue) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.UNREGISTER_SHUFFLE_RESPONSE, payload) + new TransportMessage(MessageType.UNREGISTER_SHUFFLE_RESPONSE, payload) case ApplicationLost(appId, requestId) => - val payload = TransportMessages.PbApplicationLost.newBuilder() + val payload = PbApplicationLost.newBuilder() .setAppId(appId).setRequestId(requestId) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.APPLICATION_LOST, payload) + new TransportMessage(MessageType.APPLICATION_LOST, payload) case ApplicationLostResponse(status) => - val payload = TransportMessages.PbApplicationLostResponse.newBuilder() + val payload = PbApplicationLostResponse.newBuilder() .setStatus(status.getValue).build().toByteArray - new TransportMessage(TransportMessages.MessageType.APPLICATION_LOST_RESPONSE, payload) + new TransportMessage(MessageType.APPLICATION_LOST_RESPONSE, payload) case HeartBeatFromApplication(appId, totalWritten, fileCount, requestId) => - val payload = TransportMessages.PbHeartBeatFromApplication.newBuilder() + val payload = PbHeartBeatFromApplication.newBuilder() .setAppId(appId) .setRequestId(requestId) .setTotalWritten(totalWritten) .setFileCount(fileCount) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.HEARTBEAT_FROM_APPLICATION, payload) + new TransportMessage(MessageType.HEARTBEAT_FROM_APPLICATION, payload) case GetBlacklist(localBlacklist) => - val payload = TransportMessages.PbGetBlacklist.newBuilder() + val payload = PbGetBlacklist.newBuilder() .addAllLocalBlackList(localBlacklist.asScala.map(WorkerInfo.toPbWorkerInfo) .toList.asJava) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.GET_BLACKLIST, payload) + new TransportMessage(MessageType.GET_BLACKLIST, payload) case GetBlacklistResponse(statusCode, blacklist, unknownWorkers) => - val builder = TransportMessages.PbGetBlacklistResponse.newBuilder() + val builder = PbGetBlacklistResponse.newBuilder() .setStatus(statusCode.getValue) builder.addAllBlacklist(blacklist.asScala.map(WorkerInfo.toPbWorkerInfo).toList.asJava) builder.addAllUnknownWorkers( unknownWorkers.asScala.map(WorkerInfo.toPbWorkerInfo).toList.asJava) val payload = builder.build().toByteArray - new TransportMessage(TransportMessages.MessageType.GET_BLACKLIST_RESPONSE, payload) + new TransportMessage(MessageType.GET_BLACKLIST_RESPONSE, payload) case CheckAlive => new TransportMessage( - TransportMessages.MessageType.CHECK_ALIVE, + MessageType.CHECK_ALIVE, PbCheckAlive.newBuilder().build().toByteArray) case CheckAliveResponse(isAlive) => - val payload = TransportMessages.PbCheckAliveResponse.newBuilder() + val payload = PbCheckAliveResponse.newBuilder() .setIsAlive(isAlive) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.CHECK_ALIVE_RESPONSE, payload) + new TransportMessage(MessageType.CHECK_ALIVE_RESPONSE, payload) case ReportWorkerFailure(failed, requestId) => - val payload = TransportMessages.PbReportWorkerFailure.newBuilder() + val payload = PbReportWorkerFailure.newBuilder() .addAllFailed(failed.asScala.map(WorkerInfo.toPbWorkerInfo(_)).toList.asJava) .setRequestId(requestId).build().toByteArray - new TransportMessage(TransportMessages.MessageType.REPORT_WORKER_FAILURE, payload) + new TransportMessage(MessageType.REPORT_WORKER_FAILURE, payload) case RegisterWorkerResponse(success, message) => - val payload = TransportMessages.PbRegisterWorkerResponse.newBuilder() + val payload = PbRegisterWorkerResponse.newBuilder() .setSuccess(success) .setMessage(message) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.REGISTER_WORKER_RESPONSE, payload) + new TransportMessage(MessageType.REGISTER_WORKER_RESPONSE, payload) case ReregisterWorkerResponse(success) => - val payload = TransportMessages.PbReregisterWorkerResponse.newBuilder() + val payload = PbReregisterWorkerResponse.newBuilder() .setSuccess(success) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.REREGISTER_WORKER_RESPONSE, payload) + new TransportMessage(MessageType.REREGISTER_WORKER_RESPONSE, payload) case ReserveSlots( applicationId, @@ -348,7 +348,7 @@ sealed trait Message extends Serializable { splitThreshold, splitMode, partType) => - val payload = TransportMessages.PbReserveSlots.newBuilder() + val payload = PbReserveSlots.newBuilder() .setApplicationId(applicationId) .setShuffleId(shuffleId) .addAllMasterLocations(masterLocations.asScala @@ -359,23 +359,23 @@ sealed trait Message extends Serializable { .setSplitMode(splitMode.getValue) .setPartitionType(partType.getValue) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.RESERVE_SLOTS, payload) + new TransportMessage(MessageType.RESERVE_SLOTS, payload) case ReserveSlotsResponse(status, reason) => - val payload = TransportMessages.PbReserveSlotsResponse.newBuilder() + val payload = PbReserveSlotsResponse.newBuilder() .setStatus(status.getValue).setReason(reason) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.RESERVE_SLOTS_RESPONSE, payload) + new TransportMessage(MessageType.RESERVE_SLOTS_RESPONSE, payload) case CommitFiles(applicationId, shuffleId, masterIds, slaveIds, mapAttempts) => - val payload = TransportMessages.PbCommitFiles.newBuilder() + val payload = PbCommitFiles.newBuilder() .setApplicationId(applicationId) .setShuffleId(shuffleId) .addAllMasterIds(masterIds) .addAllSlaveIds(slaveIds) .addAllMapAttempts(mapAttempts.map(new Integer(_)).toIterable.asJava) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.COMMIT_FILES, payload) + new TransportMessage(MessageType.COMMIT_FILES, payload) case CommitFilesResponse( status, @@ -387,7 +387,7 @@ sealed trait Message extends Serializable { committedSlaveStorageInfos, totalWritten, fileCount) => - val builder = TransportMessages.PbCommitFilesResponse.newBuilder() + val builder = PbCommitFilesResponse.newBuilder() .setStatus(status.getValue) builder.addAllCommittedMasterIds(committedMasterIds) builder.addAllCommittedSlaveIds(committedSlaveIds) @@ -400,58 +400,58 @@ sealed trait Message extends Serializable { builder.setTotalWritten(totalWritten) builder.setFileCount(fileCount) val payload = builder.build().toByteArray - new TransportMessage(TransportMessages.MessageType.COMMIT_FILES_RESPONSE, payload) + new TransportMessage(MessageType.COMMIT_FILES_RESPONSE, payload) case Destroy(shuffleKey, masterLocations, slaveLocations) => - val payload = TransportMessages.PbDestroy.newBuilder() + val payload = PbDestroy.newBuilder() .setShuffleKey(shuffleKey) .addAllMasterLocations(masterLocations) .addAllSlaveLocation(slaveLocations) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.DESTROY, payload) + new TransportMessage(MessageType.DESTROY, payload) case DestroyResponse(status, failedMasters, failedSlaves) => - val builder = TransportMessages.PbDestroyResponse.newBuilder() + val builder = PbDestroyResponse.newBuilder() .setStatus(status.getValue) builder.addAllFailedMasters(failedMasters) builder.addAllFailedSlaves(failedSlaves) val payload = builder.build().toByteArray - new TransportMessage(TransportMessages.MessageType.DESTROY_RESPONSE, payload) + new TransportMessage(MessageType.DESTROY_RESPONSE, payload) case SlaveLostResponse(status, slaveLocation) => - val payload = TransportMessages.PbSlaveLostResponse.newBuilder() + val payload = PbSlaveLostResponse.newBuilder() .setStatus(status.getValue) .setSlaveLocation(PartitionLocation.toPbPartitionLocation(slaveLocation)) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.SLAVE_LOST_RESPONSE, payload) + new TransportMessage(MessageType.SLAVE_LOST_RESPONSE, payload) case GetWorkerInfos => - new TransportMessage(TransportMessages.MessageType.GET_WORKER_INFO, null) + new TransportMessage(MessageType.GET_WORKER_INFO, null) case GetWorkerInfosResponse(status, workerInfos @ _*) => - val payload = TransportMessages.PbGetWorkerInfosResponse.newBuilder() + val payload = PbGetWorkerInfosResponse.newBuilder() .setStatus(status.getValue) .addAllWorkerInfos(workerInfos.map(WorkerInfo.toPbWorkerInfo(_)).toList.asJava) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.GET_WORKER_INFO_RESPONSE, payload) + new TransportMessage(MessageType.GET_WORKER_INFO_RESPONSE, payload) case ThreadDump => - new TransportMessage(TransportMessages.MessageType.THREAD_DUMP, null) + new TransportMessage(MessageType.THREAD_DUMP, null) case ThreadDumpResponse(threadDump) => - val payload = TransportMessages.PbThreadDumpResponse.newBuilder() + val payload = PbThreadDumpResponse.newBuilder() .setThreadDump(threadDump).build().toByteArray - new TransportMessage(TransportMessages.MessageType.THREAD_DUMP_RESPONSE, payload) + new TransportMessage(MessageType.THREAD_DUMP_RESPONSE, payload) case PartitionSplit(applicationId, shuffleId, partitionId, epoch, oldPartition) => - val payload = TransportMessages.PbPartitionSplit.newBuilder() + val payload = PbPartitionSplit.newBuilder() .setApplicationId(applicationId).setShuffleId(shuffleId).setPartitionId(partitionId) .setEpoch(epoch).setOldPartition(PartitionLocation.toPbPartitionLocation(oldPartition)) .build().toByteArray - new TransportMessage(TransportMessages.MessageType.PARTITION_SPLIT, payload) + new TransportMessage(MessageType.PARTITION_SPLIT, payload) case OneWayMessageResponse => - new TransportMessage(TransportMessages.MessageType.ONE_WAY_MESSAGE_RESPONSE, null) + new TransportMessage(MessageType.ONE_WAY_MESSAGE_RESPONSE, null) } } } diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/util/Utils.scala b/common/src/main/scala/com/aliyun/emr/rss/common/util/Utils.scala index 9b33086ca..2182a93b0 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/util/Utils.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/util/Utils.scala @@ -43,7 +43,7 @@ import com.aliyun.emr.rss.common.meta.{DiskStatus, WorkerInfo} import com.aliyun.emr.rss.common.network.protocol.TransportMessage import com.aliyun.emr.rss.common.network.util.{ConfigProvider, JavaUtils, TransportConf} import com.aliyun.emr.rss.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType} -import com.aliyun.emr.rss.common.protocol.TransportMessages.PbWorkerResource +import com.aliyun.emr.rss.common.protocol.PbWorkerResource import com.aliyun.emr.rss.common.protocol.message.{ControlMessages, Message, StatusCode} import com.aliyun.emr.rss.common.protocol.message.ControlMessages.WorkerResource @@ -726,7 +726,7 @@ object Utils extends Logging { def toTransportMessage(message: Any): Any = { message match { case transportMessage: Message => - transportMessage.toTransportMessage() + transportMessage.toTransportMessage case _ => message }