diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index e6bfd9372..4b1137919 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -53,8 +53,8 @@ enum MessageType { DESTROY = 32; DESTROY_RESPONSE = 33; // SLAVE_LOST_RESPONSE = 34; - GET_WORKER_INFO = 35; - GET_WORKER_INFO_RESPONSE = 36; + // GET_WORKER_INFO = 35; + // GET_WORKER_INFO_RESPONSE = 36; // THREAD_DUMP = 37; // THREAD_DUMP_RESPONSE = 38; REMOVE_EXPIRED_SHUFFLE = 39; @@ -381,11 +381,6 @@ message PbDestroyWorkerSlotsResponse { repeated string failedSlaves = 3; } -message PbGetWorkerInfosResponse { - int32 status = 1; - repeated PbWorkerInfo workerInfos = 2; -} - message PbCheckForWorkerTimeout { } diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index ca2cc52ad..43034aada 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -418,9 +418,6 @@ object ControlMessages extends Logging { * common * ========================================== */ - case object GetWorkerInfos extends Message - - case class GetWorkerInfosResponse(status: StatusCode, workerInfos: WorkerInfo*) extends Message // TODO change message type to GeneratedMessageV3 def toTransportMessage(message: Any): TransportMessage = message match { @@ -775,19 +772,6 @@ object ControlMessages extends Logging { val payload = builder.build().toByteArray new TransportMessage(MessageType.DESTROY_RESPONSE, payload) - case GetWorkerInfos => - new TransportMessage(MessageType.GET_WORKER_INFO, null) - - case GetWorkerInfosResponse(status, workerInfos @ _*) => - val payload = PbGetWorkerInfosResponse.newBuilder() - .setStatus(status.getValue) - .addAllWorkerInfos(workerInfos.map { workerInfo => - PbSerDeUtils.toPbWorkerInfo(workerInfo, false) - } - .toList.asJava) - .build().toByteArray - new TransportMessage(MessageType.GET_WORKER_INFO_RESPONSE, payload) - case pb: PbPartitionSplit => new TransportMessage(MessageType.PARTITION_SPLIT, pb.toByteArray) @@ -1071,16 +1055,6 @@ object ControlMessages extends Logging { pbDestroyResponse.getFailedMastersList, pbDestroyResponse.getFailedSlavesList) - case GET_WORKER_INFO => - GetWorkerInfos - - case GET_WORKER_INFO_RESPONSE => - val pbGetWorkerInfoResponse = PbGetWorkerInfosResponse.parseFrom(message.getPayload) - GetWorkerInfosResponse( - Utils.toStatusCode(pbGetWorkerInfoResponse.getStatus), - pbGetWorkerInfoResponse.getWorkerInfosList.asScala - .map(PbSerDeUtils.fromPbWorkerInfo).toList: _*) - case REMOVE_EXPIRED_SHUFFLE => RemoveExpiredShuffle diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index ee1dc1488..eda5d6022 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -329,9 +329,6 @@ private[celeborn] class Master( estimatedAppDiskUsage, requestId)) - case GetWorkerInfos => - executeWithLeaderChecker(context, handleGetWorkerInfos(context)) - case ReportWorkerUnavailable(failedWorkers: util.List[WorkerInfo], requestId: String) => executeWithLeaderChecker( context, @@ -639,10 +636,6 @@ private[celeborn] class Master( msg.localExcludedWorkers)) } - private def handleGetWorkerInfos(context: RpcCallContext): Unit = { - context.reply(GetWorkerInfosResponse(StatusCode.SUCCESS, workersSnapShot.asScala: _*)) - } - private def handleReportNodeUnavailable( context: RpcCallContext, failedWorkers: util.List[WorkerInfo], diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala index d1c5984a5..ef4048a7a 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala @@ -121,9 +121,6 @@ private[deploy] class Controller( logDebug(s"Done processed CommitFiles request with shuffleKey $shuffleKey, in " + s"$commitFilesTimeMs ms.") - case GetWorkerInfos => - handleGetWorkerInfos(context) - case DestroyWorkerSlots(shuffleKey, masterLocations, slaveLocations) => handleDestroy(context, shuffleKey, masterLocations, slaveLocations) } @@ -662,10 +659,4 @@ private[deploy] class Controller( failedSlaves)) } } - - private def handleGetWorkerInfos(context: RpcCallContext): Unit = { - val list = new jArrayList[WorkerInfo]() - list.add(workerInfo) - context.reply(GetWorkerInfosResponse(StatusCode.SUCCESS, list.asScala.toList: _*)) - } }