[ISSUE-901][BUG] During worker graceful shutdown, worker should report itself as unavailable and avoid master allocate slots on it. (#905)

This commit is contained in:
Angerszhuuuu 2022-11-02 16:09:58 +08:00 committed by GitHub
parent f1694f3d20
commit ea4ed10e5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 62 additions and 47 deletions

View File

@ -283,8 +283,8 @@ message PbCheckQuotaResponse {
bool available = 1;
}
message PbReportWorkerFailure {
repeated PbWorkerInfo failed = 1;
message PbReportWorkerUnavailable {
repeated PbWorkerInfo unavailable = 1;
string requestId = 2;
}

View File

@ -49,8 +49,8 @@ class RPCSource(conf: CelebornConf, role: String) extends AbstractSource(conf, r
addCounter(RPCReleaseSlotsSize)
addCounter(RPCUnregisterShuffleNum)
addCounter(RPCGetBlacklistNum)
addCounter(RPCReportWorkerFailureNum)
addCounter(RPCReportWorkerFailureSize)
addCounter(RPCReportWorkerUnavailableNum)
addCounter(RPCReportWorkerUnavailableSize)
addCounter(RPCCheckQuotaNum)
def updateMessageMetrics(message: Any, messageLen: Long): Unit = {
@ -87,9 +87,9 @@ class RPCSource(conf: CelebornConf, role: String) extends AbstractSource(conf, r
incCounter(RPCUnregisterShuffleNum)
case _: GetBlacklist =>
incCounter(RPCGetBlacklistNum)
case _: ReportWorkerFailure =>
incCounter(RPCReportWorkerFailureNum)
incCounter(RPCReportWorkerFailureSize, messageLen)
case _: ReportWorkerUnavailable =>
incCounter(RPCReportWorkerUnavailableNum)
incCounter(RPCReportWorkerUnavailableSize, messageLen)
case CheckQuota =>
incCounter(RPCCheckQuotaNum)
case _ => // Do nothing
@ -120,7 +120,7 @@ object RPCSource {
val RPCReleaseSlotsSize = "RPCReleaseSlotsSize"
val RPCUnregisterShuffleNum = "RPCUnregisterShuffleNum"
val RPCGetBlacklistNum = "RPCGetBlacklistNum"
val RPCReportWorkerFailureNum = "RPCReportWorkerFailureNum"
val RPCReportWorkerFailureSize = "RPCReportWorkerFailureSize"
val RPCReportWorkerUnavailableNum = "RPCReportWorkerUnavailableNum"
val RPCReportWorkerUnavailableSize = "RPCReportWorkerUnavailableSize"
val RPCCheckQuotaNum = "RPCCheckQuotaNum"
}

View File

@ -309,8 +309,8 @@ object ControlMessages extends Logging {
case class CheckQuotaResponse(isAvailable: Boolean) extends Message
case class ReportWorkerFailure(
failed: util.List[WorkerInfo],
case class ReportWorkerUnavailable(
unavailable: util.List[WorkerInfo],
override var requestId: String = ZERO_UUID) extends MasterRequestMessage
/**
@ -610,9 +610,9 @@ object ControlMessages extends Logging {
.build().toByteArray
new TransportMessage(MessageType.CHECK_QUOTA_RESPONSE, payload)
case ReportWorkerFailure(failed, requestId) =>
val payload = PbReportWorkerFailure.newBuilder()
.addAllFailed(failed.asScala.map(PbSerDeUtils.toPbWorkerInfo(_)).toList.asJava)
case ReportWorkerUnavailable(failed, requestId) =>
val payload = PbReportWorkerUnavailable.newBuilder()
.addAllUnavailable(failed.asScala.map(PbSerDeUtils.toPbWorkerInfo(_)).toList.asJava)
.setRequestId(requestId).build().toByteArray
new TransportMessage(MessageType.REPORT_WORKER_FAILURE, payload)
@ -905,11 +905,11 @@ object ControlMessages extends Logging {
CheckQuotaResponse(pbCheckAvailableResponse.getAvailable)
case REPORT_WORKER_FAILURE =>
val pbReportWorkerFailure = PbReportWorkerFailure.parseFrom(message.getPayload)
ReportWorkerFailure(
new util.ArrayList[WorkerInfo](pbReportWorkerFailure.getFailedList
val pbReportWorkerUnavailable = PbReportWorkerUnavailable.parseFrom(message.getPayload)
ReportWorkerUnavailable(
new util.ArrayList[WorkerInfo](pbReportWorkerUnavailable.getUnavailableList
.asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
pbReportWorkerFailure.getRequestId)
pbReportWorkerUnavailable.getRequestId)
case REGISTER_WORKER_RESPONSE =>
PbRegisterWorkerResponse.parseFrom(message.getPayload)

View File

@ -351,7 +351,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
}
}
public void updateBlacklistByReportWorkerFailure(List<WorkerInfo> failedWorkers) {
public void updateBlacklistByReportWorkerUnavailable(List<WorkerInfo> failedWorkers) {
synchronized (this.workers) {
failedWorkers.retainAll(this.workers);
this.blacklist.addAll(failedWorkers);

View File

@ -72,7 +72,7 @@ public interface IMetadataHandler {
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
String requestId);
void handleReportWorkerFailure(List<WorkerInfo> failedNodes, String requestId);
void handleReportWorkerUnavailable(List<WorkerInfo> failedNodes, String requestId);
void handleUpdatePartitionSize();
}

View File

@ -116,8 +116,8 @@ public class SingleMasterMetaManager extends AbstractMetaManager {
}
@Override
public void handleReportWorkerFailure(List<WorkerInfo> failedNodes, String requestId) {
updateBlacklistByReportWorkerFailure(failedNodes);
public void handleReportWorkerUnavailable(List<WorkerInfo> failedNodes, String requestId) {
updateBlacklistByReportWorkerUnavailable(failedNodes);
}
public void handleUpdatePartitionSize() {

View File

@ -278,17 +278,17 @@ public class HAMasterMetaManager extends AbstractMetaManager {
}
@Override
public void handleReportWorkerFailure(List<WorkerInfo> failedNodes, String requestId) {
public void handleReportWorkerUnavailable(List<WorkerInfo> failedNodes, String requestId) {
try {
List<ResourceProtos.WorkerAddress> addrs =
failedNodes.stream().map(MetaUtil::infoToAddr).collect(Collectors.toList());
ratisServer.submitRequest(
ResourceRequest.newBuilder()
.setCmdType(Type.ReportWorkerFailure)
.setCmdType(Type.ReportWorkerUnavailable)
.setRequestId(requestId)
.setReportWorkerFailureRequest(
ResourceProtos.ReportWorkerFailureRequest.newBuilder()
.addAllFailedWorker(addrs)
.setReportWorkerUnavailableRequest(
ResourceProtos.ReportWorkerUnavailableRequest.newBuilder()
.addAllUnavailable(addrs)
.build())
.build());
} catch (ServiceException e) {

View File

@ -226,12 +226,12 @@ public class MetaHandler {
host, rpcPort, pushPort, fetchPort, replicatePort, disks, userResourceConsumption);
break;
case ReportWorkerFailure:
case ReportWorkerUnavailable:
List<ResourceProtos.WorkerAddress> failedAddress =
request.getReportWorkerFailureRequest().getFailedWorkerList();
request.getReportWorkerUnavailableRequest().getUnavailableList();
List<WorkerInfo> failedWorkers =
failedAddress.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
metaSystem.updateBlacklistByReportWorkerFailure(failedWorkers);
metaSystem.updateBlacklistByReportWorkerUnavailable(failedWorkers);
break;
case UpdatePartitionSize:

View File

@ -29,7 +29,7 @@ enum Type {
WorkerLost = 17;
WorkerHeartbeat = 18;
RegisterWorker = 19;
ReportWorkerFailure = 20;
ReportWorkerUnavailable = 20;
UpdatePartitionSize = 21;
WorkerRemove = 22;
}
@ -47,7 +47,7 @@ message ResourceRequest {
optional WorkerLostRequest workerLostRequest = 15;
optional WorkerHeartbeatRequest workerHeartbeatRequest = 16;
optional RegisterWorkerRequest registerWorkerRequest = 17;
optional ReportWorkerFailureRequest reportWorkerFailureRequest = 18;
optional ReportWorkerUnavailableRequest reportWorkerUnavailableRequest = 18;
optional WorkerRemoveRequest workerRemoveRequest = 19;
}
@ -127,8 +127,8 @@ message RegisterWorkerRequest {
map<string, ResourceConsumption> userResourceConsumption = 7;
}
message ReportWorkerFailureRequest {
repeated WorkerAddress failedWorker = 1;
message ReportWorkerUnavailableRequest {
repeated WorkerAddress unavailable = 1;
}
message WorkerAddress {

View File

@ -299,8 +299,10 @@ private[celeborn] class Master(
case GetWorkerInfos =>
executeWithLeaderChecker(context, handleGetWorkerInfos(context))
case ReportWorkerFailure(failedWorkers: util.List[WorkerInfo], requestId: String) =>
executeWithLeaderChecker(context, handleReportNodeFailure(context, failedWorkers, requestId))
case ReportWorkerUnavailable(failedWorkers: util.List[WorkerInfo], requestId: String) =>
executeWithLeaderChecker(
context,
handleReportNodeUnavailable(context, failedWorkers, requestId))
case CheckQuota(userIdentifier) =>
executeWithLeaderChecker(context, handleCheckQuota(userIdentifier, context))
@ -583,13 +585,13 @@ private[celeborn] class Master(
context.reply(GetWorkerInfosResponse(StatusCode.SUCCESS, workersSnapShot.asScala: _*))
}
private def handleReportNodeFailure(
private def handleReportNodeUnavailable(
context: RpcCallContext,
failedWorkers: util.List[WorkerInfo],
requestId: String): Unit = {
logInfo(s"Receive ReportNodeFailure $failedWorkers, current blacklist" +
s"${statusSystem.blacklist}")
statusSystem.handleReportWorkerFailure(failedWorkers, requestId)
statusSystem.handleReportWorkerUnavailable(failedWorkers, requestId)
context.reply(OneWayMessageResponse)
}

View File

@ -604,7 +604,7 @@ public class DefaultMetaSystemSuiteJ {
List<WorkerInfo> failedWorkers = new ArrayList<>();
failedWorkers.add(workerInfo1);
statusSystem.handleReportWorkerFailure(failedWorkers, getNewReqeustId());
statusSystem.handleReportWorkerUnavailable(failedWorkers, getNewReqeustId());
assert 1 == statusSystem.blacklist.size();
}

View File

@ -837,7 +837,7 @@ public class RatisMasterStatusSystemSuiteJ {
List<WorkerInfo> failedWorkers = new ArrayList<>();
failedWorkers.add(workerInfo1);
statusSystem.handleReportWorkerFailure(failedWorkers, getNewReqeustId());
statusSystem.handleReportWorkerUnavailable(failedWorkers, getNewReqeustId());
Thread.sleep(3000L);
Assert.assertEquals(1, STATUSSYSTEM1.blacklist.size());
Assert.assertEquals(1, STATUSSYSTEM2.blacklist.size());

View File

@ -221,8 +221,20 @@ private[celeborn] class Worker(
val shuffleKeys = new JHashSet[String]
shuffleKeys.addAll(partitionLocationInfo.shuffleKeySet)
shuffleKeys.addAll(storageManager.shuffleKeySet())
storageManager.updateDiskInfos()
val diskInfos = storageManager.disksSnapshot()
// During shutdown, return an empty diskInfo list to mark this worker as unavailable,
// and avoid remove this from master's blacklist.
val diskInfos =
if (shutdown.get()) {
Seq.empty[DiskInfo]
} else {
storageManager.updateDiskInfos()
workerInfo.updateThenGetDiskInfos(
storageManager.disksSnapshot().map { disk => disk.mountPoint -> disk }.toMap.asJava,
conf.initialEstimatedPartitionSize).values().asScala.toSeq
}
val resourceConsumption = workerInfo.updateThenGetUserResourceConsumption(
storageManager.userResourceConsumptionSnapshot().asJava)
val response = rssHARetryClient.askSync[HeartbeatResponse](
HeartbeatFromWorker(
host,
@ -230,11 +242,8 @@ private[celeborn] class Worker(
pushPort,
fetchPort,
replicatePort,
workerInfo.updateThenGetDiskInfos(
diskInfos.map { disk => disk.mountPoint -> disk }.toMap.asJava,
conf.initialEstimatedPartitionSize).values().asScala.toSeq,
workerInfo.updateThenGetUserResourceConsumption(
storageManager.userResourceConsumptionSnapshot().asJava),
diskInfos,
resourceConsumption,
shuffleKeys),
classOf[HeartbeatResponse])
if (response.registered) {
@ -427,6 +436,10 @@ private[celeborn] class Worker(
logInfo("Shutdown hook called.")
shutdown.set(true)
if (gracefulShutdown) {
// During graceful shutdown, to avoid allocate slots in this worker,
// add this worker to master's blacklist. When restart, register worker will
// make master remove this worker from blacklist.
rssHARetryClient.send(ReportWorkerUnavailable(List(workerInfo).asJava))
val interval = conf.checkSlotsFinishedInterval
val timeout = conf.checkSlotsFinishedTimeoutMs
var waitTimes = 0