diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index d25a83298..c6ba510ad 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -283,8 +283,8 @@ message PbCheckQuotaResponse { bool available = 1; } -message PbReportWorkerFailure { - repeated PbWorkerInfo failed = 1; +message PbReportWorkerUnavailable { + repeated PbWorkerInfo unavailable = 1; string requestId = 2; } diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/RPCSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/RPCSource.scala index 09e2b8497..0f0fb8a6f 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/RPCSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/RPCSource.scala @@ -49,8 +49,8 @@ class RPCSource(conf: CelebornConf, role: String) extends AbstractSource(conf, r addCounter(RPCReleaseSlotsSize) addCounter(RPCUnregisterShuffleNum) addCounter(RPCGetBlacklistNum) - addCounter(RPCReportWorkerFailureNum) - addCounter(RPCReportWorkerFailureSize) + addCounter(RPCReportWorkerUnavailableNum) + addCounter(RPCReportWorkerUnavailableSize) addCounter(RPCCheckQuotaNum) def updateMessageMetrics(message: Any, messageLen: Long): Unit = { @@ -87,9 +87,9 @@ class RPCSource(conf: CelebornConf, role: String) extends AbstractSource(conf, r incCounter(RPCUnregisterShuffleNum) case _: GetBlacklist => incCounter(RPCGetBlacklistNum) - case _: ReportWorkerFailure => - incCounter(RPCReportWorkerFailureNum) - incCounter(RPCReportWorkerFailureSize, messageLen) + case _: ReportWorkerUnavailable => + incCounter(RPCReportWorkerUnavailableNum) + incCounter(RPCReportWorkerUnavailableSize, messageLen) case CheckQuota => incCounter(RPCCheckQuotaNum) case _ => // Do nothing @@ -120,7 +120,7 @@ object RPCSource { val RPCReleaseSlotsSize = "RPCReleaseSlotsSize" val RPCUnregisterShuffleNum = 
"RPCUnregisterShuffleNum" val RPCGetBlacklistNum = "RPCGetBlacklistNum" - val RPCReportWorkerFailureNum = "RPCReportWorkerFailureNum" - val RPCReportWorkerFailureSize = "RPCReportWorkerFailureSize" + val RPCReportWorkerUnavailableNum = "RPCReportWorkerUnavailableNum" + val RPCReportWorkerUnavailableSize = "RPCReportWorkerUnavailableSize" val RPCCheckQuotaNum = "RPCCheckQuotaNum" } diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index c959264d8..a41631eae 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -309,8 +309,8 @@ object ControlMessages extends Logging { case class CheckQuotaResponse(isAvailable: Boolean) extends Message - case class ReportWorkerFailure( - failed: util.List[WorkerInfo], + case class ReportWorkerUnavailable( + unavailable: util.List[WorkerInfo], override var requestId: String = ZERO_UUID) extends MasterRequestMessage /** @@ -610,9 +610,9 @@ object ControlMessages extends Logging { .build().toByteArray new TransportMessage(MessageType.CHECK_QUOTA_RESPONSE, payload) - case ReportWorkerFailure(failed, requestId) => - val payload = PbReportWorkerFailure.newBuilder() - .addAllFailed(failed.asScala.map(PbSerDeUtils.toPbWorkerInfo(_)).toList.asJava) + case ReportWorkerUnavailable(failed, requestId) => + val payload = PbReportWorkerUnavailable.newBuilder() + .addAllUnavailable(failed.asScala.map(PbSerDeUtils.toPbWorkerInfo(_)).toList.asJava) .setRequestId(requestId).build().toByteArray new TransportMessage(MessageType.REPORT_WORKER_FAILURE, payload) @@ -905,11 +905,11 @@ object ControlMessages extends Logging { CheckQuotaResponse(pbCheckAvailableResponse.getAvailable) case REPORT_WORKER_FAILURE => - val pbReportWorkerFailure = 
PbReportWorkerFailure.parseFrom(message.getPayload) - ReportWorkerFailure( - new util.ArrayList[WorkerInfo](pbReportWorkerFailure.getFailedList + val pbReportWorkerUnavailable = PbReportWorkerUnavailable.parseFrom(message.getPayload) + ReportWorkerUnavailable( + new util.ArrayList[WorkerInfo](pbReportWorkerUnavailable.getUnavailableList .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava), - pbReportWorkerFailure.getRequestId) + pbReportWorkerUnavailable.getRequestId) case REGISTER_WORKER_RESPONSE => PbRegisterWorkerResponse.parseFrom(message.getPayload) diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index 2ea3ec34c..4f612fea9 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -351,7 +351,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler { } } - public void updateBlacklistByReportWorkerFailure(List failedWorkers) { + public void updateBlacklistByReportWorkerUnavailable(List failedWorkers) { synchronized (this.workers) { failedWorkers.retainAll(this.workers); this.blacklist.addAll(failedWorkers); diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java index 1921ab989..c09f3e9c0 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java @@ -72,7 +72,7 @@ public interface IMetadataHandler { Map userResourceConsumption, String requestId); - void handleReportWorkerFailure(List failedNodes, String 
requestId); + void handleReportWorkerUnavailable(List failedNodes, String requestId); void handleUpdatePartitionSize(); } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java index ae0e399c7..497e81834 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java @@ -116,8 +116,8 @@ public class SingleMasterMetaManager extends AbstractMetaManager { } @Override - public void handleReportWorkerFailure(List failedNodes, String requestId) { - updateBlacklistByReportWorkerFailure(failedNodes); + public void handleReportWorkerUnavailable(List failedNodes, String requestId) { + updateBlacklistByReportWorkerUnavailable(failedNodes); } public void handleUpdatePartitionSize() { diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java index affe026f0..5af627b1e 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java @@ -278,17 +278,17 @@ public class HAMasterMetaManager extends AbstractMetaManager { } @Override - public void handleReportWorkerFailure(List failedNodes, String requestId) { + public void handleReportWorkerUnavailable(List failedNodes, String requestId) { try { List addrs = failedNodes.stream().map(MetaUtil::infoToAddr).collect(Collectors.toList()); ratisServer.submitRequest( ResourceRequest.newBuilder() - .setCmdType(Type.ReportWorkerFailure) + .setCmdType(Type.ReportWorkerUnavailable) 
.setRequestId(requestId) - .setReportWorkerFailureRequest( - ResourceProtos.ReportWorkerFailureRequest.newBuilder() - .addAllFailedWorker(addrs) + .setReportWorkerUnavailableRequest( + ResourceProtos.ReportWorkerUnavailableRequest.newBuilder() + .addAllUnavailable(addrs) .build()) .build()); } catch (ServiceException e) { diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java index ddaf6bc31..4ae4a3e2e 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java @@ -226,12 +226,12 @@ public class MetaHandler { host, rpcPort, pushPort, fetchPort, replicatePort, disks, userResourceConsumption); break; - case ReportWorkerFailure: + case ReportWorkerUnavailable: List failedAddress = - request.getReportWorkerFailureRequest().getFailedWorkerList(); + request.getReportWorkerUnavailableRequest().getUnavailableList(); List failedWorkers = failedAddress.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList()); - metaSystem.updateBlacklistByReportWorkerFailure(failedWorkers); + metaSystem.updateBlacklistByReportWorkerUnavailable(failedWorkers); break; case UpdatePartitionSize: diff --git a/master/src/main/proto/Resource.proto b/master/src/main/proto/Resource.proto index 4b4bdb1b2..880317612 100644 --- a/master/src/main/proto/Resource.proto +++ b/master/src/main/proto/Resource.proto @@ -29,7 +29,7 @@ enum Type { WorkerLost = 17; WorkerHeartbeat = 18; RegisterWorker = 19; - ReportWorkerFailure = 20; + ReportWorkerUnavailable = 20; UpdatePartitionSize = 21; WorkerRemove = 22; } @@ -47,7 +47,7 @@ message ResourceRequest { optional WorkerLostRequest workerLostRequest = 15; optional WorkerHeartbeatRequest workerHeartbeatRequest = 16; optional RegisterWorkerRequest 
registerWorkerRequest = 17; - optional ReportWorkerFailureRequest reportWorkerFailureRequest = 18; + optional ReportWorkerUnavailableRequest reportWorkerUnavailableRequest = 18; optional WorkerRemoveRequest workerRemoveRequest = 19; } @@ -127,8 +127,8 @@ message RegisterWorkerRequest { map userResourceConsumption = 7; } -message ReportWorkerFailureRequest { - repeated WorkerAddress failedWorker = 1; +message ReportWorkerUnavailableRequest { + repeated WorkerAddress unavailable = 1; } message WorkerAddress { diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index b58dff559..7de2fe53f 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -299,8 +299,10 @@ private[celeborn] class Master( case GetWorkerInfos => executeWithLeaderChecker(context, handleGetWorkerInfos(context)) - case ReportWorkerFailure(failedWorkers: util.List[WorkerInfo], requestId: String) => - executeWithLeaderChecker(context, handleReportNodeFailure(context, failedWorkers, requestId)) + case ReportWorkerUnavailable(failedWorkers: util.List[WorkerInfo], requestId: String) => + executeWithLeaderChecker( + context, + handleReportNodeUnavailable(context, failedWorkers, requestId)) case CheckQuota(userIdentifier) => executeWithLeaderChecker(context, handleCheckQuota(userIdentifier, context)) @@ -583,13 +585,13 @@ private[celeborn] class Master( context.reply(GetWorkerInfosResponse(StatusCode.SUCCESS, workersSnapShot.asScala: _*)) } - private def handleReportNodeFailure( + private def handleReportNodeUnavailable( context: RpcCallContext, failedWorkers: util.List[WorkerInfo], requestId: String): Unit = { logInfo(s"Receive ReportNodeFailure $failedWorkers, current blacklist" + s"${statusSystem.blacklist}") - statusSystem.handleReportWorkerFailure(failedWorkers, 
requestId) + statusSystem.handleReportWorkerUnavailable(failedWorkers, requestId) context.reply(OneWayMessageResponse) } diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java index 08966fed1..c37ce34f3 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java @@ -604,7 +604,7 @@ public class DefaultMetaSystemSuiteJ { List failedWorkers = new ArrayList<>(); failedWorkers.add(workerInfo1); - statusSystem.handleReportWorkerFailure(failedWorkers, getNewReqeustId()); + statusSystem.handleReportWorkerUnavailable(failedWorkers, getNewReqeustId()); assert 1 == statusSystem.blacklist.size(); } diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java index d17813370..c55f04679 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java @@ -837,7 +837,7 @@ public class RatisMasterStatusSystemSuiteJ { List failedWorkers = new ArrayList<>(); failedWorkers.add(workerInfo1); - statusSystem.handleReportWorkerFailure(failedWorkers, getNewReqeustId()); + statusSystem.handleReportWorkerUnavailable(failedWorkers, getNewReqeustId()); Thread.sleep(3000L); Assert.assertEquals(1, STATUSSYSTEM1.blacklist.size()); Assert.assertEquals(1, STATUSSYSTEM2.blacklist.size()); diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index ee07b4cc4..40a771d75 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -221,8 +221,20 @@ private[celeborn] class Worker( val shuffleKeys = new JHashSet[String] shuffleKeys.addAll(partitionLocationInfo.shuffleKeySet) shuffleKeys.addAll(storageManager.shuffleKeySet()) - storageManager.updateDiskInfos() - val diskInfos = storageManager.disksSnapshot() + // During shutdown, send an empty diskInfo list to mark this worker as unavailable, + // and keep master from removing this worker from its blacklist. + val diskInfos = + if (shutdown.get()) { + Seq.empty[DiskInfo] + } else { + storageManager.updateDiskInfos() + workerInfo.updateThenGetDiskInfos( + storageManager.disksSnapshot().map { disk => disk.mountPoint -> disk }.toMap.asJava, + conf.initialEstimatedPartitionSize).values().asScala.toSeq + } + val resourceConsumption = workerInfo.updateThenGetUserResourceConsumption( + storageManager.userResourceConsumptionSnapshot().asJava) + val response = rssHARetryClient.askSync[HeartbeatResponse]( HeartbeatFromWorker( host, @@ -230,11 +242,8 @@ private[celeborn] class Worker( pushPort, fetchPort, replicatePort, - workerInfo.updateThenGetDiskInfos( - diskInfos.map { disk => disk.mountPoint -> disk }.toMap.asJava, - conf.initialEstimatedPartitionSize).values().asScala.toSeq, - workerInfo.updateThenGetUserResourceConsumption( - storageManager.userResourceConsumptionSnapshot().asJava), + diskInfos, + resourceConsumption, shuffleKeys), classOf[HeartbeatResponse]) if (response.registered) { @@ -427,6 +436,10 @@ private[celeborn] class Worker( logInfo("Shutdown hook called.") shutdown.set(true) if (gracefulShutdown) { + // During graceful shutdown, to avoid allocating slots on this worker, + // add this worker to master's blacklist. 
When the worker restarts, registering it will + // make master remove this worker from the blacklist. + rssHARetryClient.send(ReportWorkerUnavailable(List(workerInfo).asJava)) val interval = conf.checkSlotsFinishedInterval val timeout = conf.checkSlotsFinishedTimeoutMs var waitTimes = 0