diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index 7fcc98539..a82ea48f8 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -977,7 +977,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin slotsToDestroy.asScala, "DestroySlot", parallelism) { case (workerInfo, (masterLocations, slaveLocations)) => - val destroy = Destroy( + val destroy = DestroyWorkerSlots( shuffleKey, masterLocations.asScala.map(_.getUniqueId).asJava, slaveLocations.asScala.map(_.getUniqueId).asJava) @@ -987,7 +987,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin s"will retry request destroy.") res = requestDestroy( workerInfo.endpoint, - Destroy(shuffleKey, res.failedMasters, res.failedSlaves)) + DestroyWorkerSlots(shuffleKey, res.failedMasters, res.failedSlaves)) } } } @@ -1060,7 +1060,9 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin } } - private def requestDestroy(endpoint: RpcEndpointRef, message: Destroy): DestroyResponse = { + private def requestDestroy( + endpoint: RpcEndpointRef, + message: DestroyWorkerSlots): DestroyResponse = { try { endpoint.askSync[DestroyResponse](message) } catch { diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index c404fec83..2663d08db 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -50,8 +50,8 @@ enum MessageType { RESERVE_SLOTS_RESPONSE = 29; COMMIT_FILES = 30; COMMIT_FILES_RESPONSE = 31; - DESTROY = 32; - DESTROY_RESPONSE = 33; + DESTROY_WORKER_SLOTS = 32; + DESTROY_WORKER_SLOTS_RESPONSE = 33; SLAVE_LOST_RESPONSE = 34; GET_WORKER_INFO = 35; GET_WORKER_INFO_RESPONSE = 36; @@ -366,13 +366,13 @@ message PbCommitFilesResponse { int32 fileCount = 9; } -message PbDestroy { +message PbDestroyWorkerSlots { string shuffleKey = 1; repeated string masterLocations = 2; repeated string slaveLocation = 3; } -message PbDestroyResponse { +message PbDestroyWorkerSlotsResponse { int32 status = 1; repeated string failedMasters = 2; repeated string failedSlaves = 3; diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 8a0f965ca..e64cec03f 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -397,7 +397,7 @@ object ControlMessages extends Logging { totalWritten: Long = 0, fileCount: Int = 0) extends WorkerMessage - case class Destroy( + case class DestroyWorkerSlots( shuffleKey: String, masterLocations: util.List[String], slaveLocations: util.List[String]) @@ -762,21 +762,21 @@ object ControlMessages extends Logging { val payload = builder.build().toByteArray new TransportMessage(MessageType.COMMIT_FILES_RESPONSE, payload) - case Destroy(shuffleKey, masterLocations, slaveLocations) => - val payload = PbDestroy.newBuilder() + case DestroyWorkerSlots(shuffleKey, masterLocations, slaveLocations) => + val payload = PbDestroyWorkerSlots.newBuilder() .setShuffleKey(shuffleKey) .addAllMasterLocations(masterLocations) .addAllSlaveLocation(slaveLocations) .build().toByteArray - new TransportMessage(MessageType.DESTROY, payload) + new TransportMessage(MessageType.DESTROY_WORKER_SLOTS, payload) case DestroyResponse(status, failedMasters, failedSlaves) => - val builder = PbDestroyResponse.newBuilder() + val builder = PbDestroyWorkerSlotsResponse.newBuilder() .setStatus(status.getValue) builder.addAllFailedMasters(failedMasters) builder.addAllFailedSlaves(failedSlaves) val payload = builder.build().toByteArray - new TransportMessage(MessageType.DESTROY_RESPONSE, payload) + new TransportMessage(MessageType.DESTROY_WORKER_SLOTS_RESPONSE, payload) case SlaveLostResponse(status, slaveLocation) => val payload = PbSlaveLostResponse.newBuilder() @@ -1078,15 +1078,15 @@ object ControlMessages extends Logging { pbCommitFilesResponse.getTotalWritten, pbCommitFilesResponse.getFileCount) - case DESTROY => - val pbDestroy = PbDestroy.parseFrom(message.getPayload) - Destroy( + case DESTROY_WORKER_SLOTS => + val pbDestroy = PbDestroyWorkerSlots.parseFrom(message.getPayload) + DestroyWorkerSlots( pbDestroy.getShuffleKey, pbDestroy.getMasterLocationsList, pbDestroy.getSlaveLocationList) - case DESTROY_RESPONSE => - val pbDestroyResponse = PbDestroyResponse.parseFrom(message.getPayload) + case DESTROY_WORKER_SLOTS_RESPONSE => + val pbDestroyResponse = PbDestroyWorkerSlotsResponse.parseFrom(message.getPayload) DestroyResponse( Utils.toStatusCode(pbDestroyResponse.getStatus), pbDestroyResponse.getFailedMastersList, diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala index 7d65f3403..4bb8ae689 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala @@ -126,7 +126,7 @@ private[deploy] class Controller( case ThreadDump => handleThreadDump(context) - case Destroy(shuffleKey, masterLocations, slaveLocations) => + case DestroyWorkerSlots(shuffleKey, masterLocations, slaveLocations) => handleDestroy(context, shuffleKey, masterLocations, slaveLocations) }