[CELEBORN-562][REFACTOR] Rename Destroy and DestroyResponse to make it more clear (#1467)

This commit is contained in:
Angerszhuuuu 2023-04-27 12:31:32 +08:00 committed by GitHub
parent 64a4f7274c
commit be84e8ba0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 21 additions and 19 deletions

View File

@ -977,7 +977,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
slotsToDestroy.asScala,
"DestroySlot",
parallelism) { case (workerInfo, (masterLocations, slaveLocations)) =>
val destroy = Destroy(
val destroy = DestroyWorkerSlots(
shuffleKey,
masterLocations.asScala.map(_.getUniqueId).asJava,
slaveLocations.asScala.map(_.getUniqueId).asJava)
@ -987,7 +987,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
s"will retry request destroy.")
res = requestDestroy(
workerInfo.endpoint,
Destroy(shuffleKey, res.failedMasters, res.failedSlaves))
DestroyWorkerSlots(shuffleKey, res.failedMasters, res.failedSlaves))
}
}
}
@ -1060,7 +1060,9 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
}
}
private def requestDestroy(endpoint: RpcEndpointRef, message: Destroy): DestroyResponse = {
private def requestDestroy(
endpoint: RpcEndpointRef,
message: DestroyWorkerSlots): DestroyResponse = {
try {
endpoint.askSync[DestroyResponse](message)
} catch {

View File

@ -50,8 +50,8 @@ enum MessageType {
RESERVE_SLOTS_RESPONSE = 29;
COMMIT_FILES = 30;
COMMIT_FILES_RESPONSE = 31;
DESTROY = 32;
DESTROY_RESPONSE = 33;
DESTROY_WORKER_SLOTS = 32;
DESTROY_WORKER_SLOTS_RESPONSE = 33;
SLAVE_LOST_RESPONSE = 34;
GET_WORKER_INFO = 35;
GET_WORKER_INFO_RESPONSE = 36;
@ -366,13 +366,13 @@ message PbCommitFilesResponse {
int32 fileCount = 9;
}
message PbDestroy {
message PbDestroyWorkerSlots {
string shuffleKey = 1;
repeated string masterLocations = 2;
repeated string slaveLocation = 3;
}
message PbDestroyResponse {
message PbDestroyWorkerSlotsResponse {
int32 status = 1;
repeated string failedMasters = 2;
repeated string failedSlaves = 3;

View File

@ -397,7 +397,7 @@ object ControlMessages extends Logging {
totalWritten: Long = 0,
fileCount: Int = 0) extends WorkerMessage
case class Destroy(
case class DestroyWorkerSlots(
shuffleKey: String,
masterLocations: util.List[String],
slaveLocations: util.List[String])
@ -762,21 +762,21 @@ object ControlMessages extends Logging {
val payload = builder.build().toByteArray
new TransportMessage(MessageType.COMMIT_FILES_RESPONSE, payload)
case Destroy(shuffleKey, masterLocations, slaveLocations) =>
val payload = PbDestroy.newBuilder()
case DestroyWorkerSlots(shuffleKey, masterLocations, slaveLocations) =>
val payload = PbDestroyWorkerSlots.newBuilder()
.setShuffleKey(shuffleKey)
.addAllMasterLocations(masterLocations)
.addAllSlaveLocation(slaveLocations)
.build().toByteArray
new TransportMessage(MessageType.DESTROY, payload)
new TransportMessage(MessageType.DESTROY_WORKER_SLOTS, payload)
case DestroyResponse(status, failedMasters, failedSlaves) =>
val builder = PbDestroyResponse.newBuilder()
val builder = PbDestroyWorkerSlotsResponse.newBuilder()
.setStatus(status.getValue)
builder.addAllFailedMasters(failedMasters)
builder.addAllFailedSlaves(failedSlaves)
val payload = builder.build().toByteArray
new TransportMessage(MessageType.DESTROY_RESPONSE, payload)
new TransportMessage(MessageType.DESTROY_WORKER_SLOTS_RESPONSE, payload)
case SlaveLostResponse(status, slaveLocation) =>
val payload = PbSlaveLostResponse.newBuilder()
@ -1078,15 +1078,15 @@ object ControlMessages extends Logging {
pbCommitFilesResponse.getTotalWritten,
pbCommitFilesResponse.getFileCount)
case DESTROY =>
val pbDestroy = PbDestroy.parseFrom(message.getPayload)
Destroy(
case DESTROY_WORKER_SLOTS =>
val pbDestroy = PbDestroyWorkerSlots.parseFrom(message.getPayload)
DestroyWorkerSlots(
pbDestroy.getShuffleKey,
pbDestroy.getMasterLocationsList,
pbDestroy.getSlaveLocationList)
case DESTROY_RESPONSE =>
val pbDestroyResponse = PbDestroyResponse.parseFrom(message.getPayload)
case DESTROY_WORKER_SLOTS_RESPONSE =>
val pbDestroyResponse = PbDestroyWorkerSlotsResponse.parseFrom(message.getPayload)
DestroyResponse(
Utils.toStatusCode(pbDestroyResponse.getStatus),
pbDestroyResponse.getFailedMastersList,

View File

@ -126,7 +126,7 @@ private[deploy] class Controller(
case ThreadDump =>
handleThreadDump(context)
case Destroy(shuffleKey, masterLocations, slaveLocations) =>
case DestroyWorkerSlots(shuffleKey, masterLocations, slaveLocations) =>
handleDestroy(context, shuffleKey, masterLocations, slaveLocations)
}