From 999510b2652befe1600c81e0c13424dcb29124c0 Mon Sep 17 00:00:00 2001 From: Xianming Lei Date: Sat, 8 Jun 2024 11:10:31 +0800 Subject: [PATCH] [CELEBORN-1444] Introduce worker decommission metrics and corresponding REST API ### What changes were proposed in this pull request? Introduce worker decommission metrics and corresponding REST API. ### Why are the changes needed? In a production environment, due to certain hardware or environmental reasons, our script will automatically decommission the node. At this time, we need to distinguish between graceful shutdown nodes and decommissioned nodes. If we distinguish shutdown worker and decommission worker metrics, we can achieve better operation and maintenance. ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? - `DefaultMetaSystemSuiteJ#testHandleReportWorkerDecommission` - `RatisMasterStatusSystemSuiteJ#testHandleReportWorkerDecommission` - `ApiMasterResourceSuite#decommissionWorkers` - `ApiWorkerResourceSuite#isDecommissioning` Closes #2535 from leixm/issue_1444. Lead-authored-by: Xianming Lei Co-authored-by: Xianming Lei <31424839+leixm@users.noreply.github.com> Signed-off-by: zky.zhoukeyong --- METRICS.md | 2 + assets/grafana/celeborn-dashboard.json | 396 ++++++++++++------ common/src/main/proto/TransportMessages.proto | 9 +- .../protocol/message/ControlMessages.scala | 19 + .../celeborn/common/util/PbSerDeUtils.scala | 5 +- docs/monitoring.md | 4 + .../clustermeta/AbstractMetaManager.java | 18 +- .../master/clustermeta/IMetadataHandler.java | 2 + .../clustermeta/SingleMasterMetaManager.java | 5 + .../clustermeta/ha/HAMasterMetaManager.java | 20 + .../master/clustermeta/ha/MetaHandler.java | 8 + master/src/main/proto/Resource.proto | 6 + .../service/deploy/master/Master.scala | 31 +- .../service/deploy/master/MasterSource.scala | 2 + .../master/http/api/ApiMasterResource.scala | 9 + .../clustermeta/DefaultMetaSystemSuiteJ.java | 51 +++ .../ha/RatisMasterStatusSystemSuiteJ.java | 61 +++ .../http/api/ApiMasterResourceSuite.scala | 5 + .../celeborn/server/common/HttpService.scala | 4 + .../service/deploy/worker/Worker.scala | 24 +- .../service/deploy/worker/WorkerSource.scala | 3 + .../deploy/worker/WorkerStatusManager.scala | 2 +- .../worker/http/api/ApiWorkerResource.scala | 9 + .../http/api/ApiWorkerResourceSuite.scala | 5 + 24 files changed, 572 insertions(+), 128 deletions(-) diff --git a/METRICS.md b/METRICS.md index f6fed40e9..6c9dc05db 100644 --- a/METRICS.md +++ b/METRICS.md @@ -90,6 +90,7 @@ Here is an example of Grafana dashboard importing. | LostWorkerCount | master | The count of workers in lost list. | | ExcludedWorkerCount | master | The count of workers in excluded list. | | ShutdownWorkerCount | master | The count of workers in shutdown list. | +| DecommissionWorkerCount | master | The count of workers in decommission list. | | IsActiveMaster | master | Whether the current master is active. | | PartitionSize | master | The estimated partition size of last 20 flush window whose length is 15 seconds by defaults. | | OfferSlotsTime | master | The time of offer slots. | @@ -147,6 +148,7 @@ Here is an example of Grafana dashboard importing. | PotentialConsumeSpeed | worker | This value means speed of potential consumption for congestion control. | | UserProduceSpeed | worker | This value means speed of user production for congestion control. | | WorkerConsumeSpeed | worker | This value means speed of worker consumption for congestion control. | +| IsDecommissioningWorker | worker | 1 means worker decommissioning, 0 means not decommissioning. | | jvm_gc_count | JVM | The GC count of each garbage collector. | | jvm_gc_time | JVM | The GC cost time of each garbage collector. | | jvm_memory_heap_init | JVM | The amount of heap init memory. | diff --git a/assets/grafana/celeborn-dashboard.json b/assets/grafana/celeborn-dashboard.json index f914886cd..74e83b540 100644 --- a/assets/grafana/celeborn-dashboard.json +++ b/assets/grafana/celeborn-dashboard.json @@ -1357,6 +1357,96 @@ ], "title": "metrics_ShutdownWorkerCount_Value", "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "description": "The count of workers in decommission list.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 34 + }, + "id": 189, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "metrics_DecommissionWorkerCount_Value", + "legendFormat": "${baseLegend}", + "range": true, + "refId": "A" + } + ], + "title": "metrics_DecommissionWorkerCount_Value", + "type": "timeseries" } ], "title": "Master", @@ -1383,8 +1473,6 @@ "mode": "palette-classic" }, "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -1432,7 +1520,7 @@ "h": 10, "w": 24, "x": 0, - "y": 85 + "y": 3 }, "id": 84, "options": { @@ -1472,98 +1560,6 @@ "mode": "palette-classic" }, "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green" - }, - { - "color": "red", - "value": 80 - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 290 - }, - "id": 48, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "tooltip": { - "mode": "single", - "sort": "none" - } - }, - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" - }, - "expr": "metrics_ActiveSlotsCount_Value", - "legendFormat": "${baseLegend}", - "range": true, - "refId": "A" - } - ], - "title": "metrics_ActiveSlotsCount_Value", - "type": "timeseries" - }, - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -1612,7 +1608,7 @@ "h": 9, "w": 12, "x": 0, - "y": 95 + "y": 13 }, "id": 60, "options": { @@ -1652,8 +1648,6 @@ "mode": "palette-classic" }, "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -1702,7 +1696,7 @@ "h": 9, "w": 12, "x": 12, - "y": 95 + "y": 13 }, "id": 62, "options": { @@ -1742,8 +1736,6 @@ "mode": "palette-classic" }, "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -1791,7 +1783,7 @@ "h": 8, "w": 12, "x": 0, - "y": 104 + "y": 22 }, "id": 90, "options": { @@ -1832,8 +1824,6 @@ "mode": "palette-classic" }, "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -1881,7 +1871,7 @@ "h": 8, "w": 12, "x": 12, - "y": 104 + "y": 22 }, "id": 92, "options": { @@ -1922,8 +1912,6 @@ "mode": "palette-classic" }, "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -1972,7 +1960,7 @@ "h": 8, "w": 12, "x": 0, - "y": 112 + "y": 30 }, "id": 182, "options": { @@ -2013,8 +2001,6 @@ "mode": "palette-classic" }, "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -2063,7 +2049,7 @@ "h": 8, "w": 12, "x": 12, - "y": 112 + "y": 30 }, "id": 184, "options": { @@ -2105,8 +2091,6 @@ "mode": "palette-classic" }, "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -2155,7 +2139,7 @@ "h": 8, "w": 12, "x": 0, - "y": 120 + "y": 38 }, "id": 181, "options": { @@ -2199,8 +2183,6 @@ "mode": "palette-classic" }, "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -2248,7 +2230,7 @@ "h": 8, "w": 12, "x": 12, - "y": 120 + "y": 38 }, "id": 183, "options": { @@ -2291,8 +2273,6 @@ "mode": "palette-classic" }, "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -2340,7 +2320,7 @@ "h": 8, "w": 12, "x": 0, - "y": 128 + "y": 46 }, "id": 179, "options": { @@ -2369,6 +2349,182 @@ ], "title": "metrics_ActiveConnectionCount_Count", "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 46 + }, + "id": 190, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "metrics_IsDecommissioningWorker_Value", + "legendFormat": "${baseLegend}", + "range": true, + "refId": "A" + } + ], + "title": "metrics_IsDecommissioningWorker_Value", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 54 + }, + "id": 48, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "metrics_ActiveSlotsCount_Value", + "legendFormat": "${baseLegend}", + "range": true, + "refId": "A" + } + ], + "title": "metrics_ActiveSlotsCount_Value", + "type": "timeseries" } ], "title": "Worker", diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index 95b1769d1..f8de50486 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -102,6 +102,7 @@ enum MessageType { APPLICATION_META_REQUEST = 79; BATCH_OPEN_STREAM = 80; BATCH_OPEN_STREAM_RESPONSE = 81; + REPORT_WORKER_DECOMMISSION = 82; } enum StreamType { @@ -625,6 +626,7 @@ message PbSnapshotMetaInfo { repeated PbWorkerInfo manuallyExcludedWorkers = 14; map workerEventInfos = 15; map applicationMetas = 16; + repeated PbWorkerInfo decommissionWorkers = 17; } message PbOpenStream { @@ -789,4 +791,9 @@ message PbPackedPartitionLocationsPair { message PbPackedWorkerResource { PbPackedPartitionLocationsPair locationPairs = 1; string networkLocation = 2; -} \ No newline at end of file +} + +message PbReportWorkerDecommission { + repeated PbWorkerInfo workers = 1; + string requestId = 2; +} diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 9491ba29d..6b169a6dc 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -383,6 +383,10 @@ object ControlMessages extends Logging { unavailable: util.List[WorkerInfo], override var requestId: String = ZERO_UUID) extends MasterRequestMessage + case class ReportWorkerDecommission( + unavailable: util.List[WorkerInfo], + override var requestId: String = ZERO_UUID) extends MasterRequestMessage + object CheckWorkersAvailable { def apply(): PbCheckWorkersAvailable = { PbCheckWorkersAvailable.newBuilder().build() @@ -777,6 +781,14 @@ object ControlMessages extends Logging { .setRequestId(requestId).build().toByteArray new TransportMessage(MessageType.REPORT_WORKER_FAILURE, payload) + case ReportWorkerDecommission(workers, requestId) => + val payload = PbReportWorkerDecommission.newBuilder() + .addAllWorkers(workers.asScala.map { workerInfo => + PbSerDeUtils.toPbWorkerInfo(workerInfo, true) + }.toList.asJava) + .setRequestId(requestId).build().toByteArray + new TransportMessage(MessageType.REPORT_WORKER_DECOMMISSION, payload) + case pb: PbRemoveWorkersUnavailableInfo => new TransportMessage(MessageType.REMOVE_WORKERS_UNAVAILABLE_INFO, pb.toByteArray) @@ -1136,6 +1148,13 @@ object ControlMessages extends Logging { .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava), pbReportWorkerUnavailable.getRequestId) + case REPORT_WORKER_DECOMMISSION_VALUE => + val pbReportWorkerDecommission = PbReportWorkerDecommission.parseFrom(message.getPayload) + ReportWorkerDecommission( + new util.ArrayList[WorkerInfo](pbReportWorkerDecommission.getWorkersList + .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava), + pbReportWorkerDecommission.getRequestId) + case REMOVE_WORKERS_UNAVAILABLE_INFO_VALUE => PbRemoveWorkersUnavailableInfo.parseFrom(message.getPayload) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala index cffe76857..2274e5780 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala @@ -421,7 +421,8 @@ object PbSerDeUtils { lostWorkers: ConcurrentHashMap[WorkerInfo, java.lang.Long], shutdownWorkers: java.util.Set[WorkerInfo], workerEventInfos: ConcurrentHashMap[WorkerInfo, WorkerEventInfo], - applicationMetas: ConcurrentHashMap[String, ApplicationMeta]): PbSnapshotMetaInfo = { + applicationMetas: ConcurrentHashMap[String, ApplicationMeta], + decommissionWorkers: java.util.Set[WorkerInfo]): PbSnapshotMetaInfo = { val builder = PbSnapshotMetaInfo.newBuilder() .setEstimatedPartitionSize(estimatedPartitionSize) .addAllRegisteredShuffle(registeredShuffle) @@ -446,6 +447,8 @@ object PbSerDeUtils { case (worker, workerEventInfo) => (worker.toUniqueId(), PbSerDeUtils.toPbWorkerEventInfo(workerEventInfo)) }.asJava) + .addAllDecommissionWorkers(decommissionWorkers.asScala.map(toPbWorkerInfo(_, true)).asJava) + if (currentAppDiskUsageMetricsSnapshot != null) { builder.setCurrentAppDiskUsageMetricsSnapshot( toPbAppDiskUsageSnapshot(currentAppDiskUsageMetricsSnapshot)) diff --git a/docs/monitoring.md b/docs/monitoring.md index cc3badbb4..b4eba8686 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -103,6 +103,7 @@ These metrics are exposed by Celeborn master. - LostWorkerCount - ExcludedWorkerCount - ShutdownWorkerCount + - DecommissionWorkerCount - IsActiveMaster - PartitionSize - The size of estimated shuffle partition. @@ -235,6 +236,7 @@ These metrics are exposed by Celeborn worker. - PotentialConsumeSpeed - UserProduceSpeed - WorkerConsumeSpeed + - IsDecommissioningWorker - push_server_usedHeapMemory - push_server_usedDirectMemory - push_server_numAllocations @@ -369,6 +371,7 @@ API path listed as below: | /metrics/prometheus | GET | | List the metrics data in prometheus format of the master. The url path is defined by configure `celeborn.metrics.prometheus.path`. | | /shuffles | GET | | List all running shuffle keys of the service. It will return all running shuffle's key of the cluster. | | /shutdownWorkers | GET | | List all shutdown workers of the master. | +| /decommissionWorkers | GET | | List all decommission workers of the master. | | /threadDump | GET | | List the current thread dump of the master. | | /workerEventInfo | GET | | List all worker event information of the master. | | /workerInfo | GET | | List worker information of the service. It will list all registered workers' information. | @@ -384,6 +387,7 @@ API path listed as below: | /help | GET | | List the available API providers of the worker. | | /isRegistered | GET | | Show if the worker is registered to the master success. | | /isShutdown | GET | | Show if the worker is during the process of shutdown. | +| /isDecommissioning | GET | | Show if the worker is during the process of decommission. | | /listDynamicConfigs | GET | level=${LEVEL} tenant=${TENANT} name=${NAME} | List the dynamic configs of the worker. The parameter level specifies the config level of dynamic configs. The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. The parameter name specifies the user name of TENANT_USER level. Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level. | | /listPartitionLocationInfo | GET | | List all the living PartitionLocation information in that worker. | | /listTopDiskUsedApps | GET | | List the top disk usage application ids. It only return application ids running in that worker. | diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index b9c1c2c67..f3ab34b67 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -74,6 +74,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler { public final Set excludedWorkers = ConcurrentHashMap.newKeySet(); public final Set manuallyExcludedWorkers = ConcurrentHashMap.newKeySet(); public final Set shutdownWorkers = ConcurrentHashMap.newKeySet(); + public final Set decommissionWorkers = ConcurrentHashMap.newKeySet(); public final Set workerLostEvents = ConcurrentHashMap.newKeySet(); protected RpcEnv rpcEnv; @@ -162,6 +163,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler { lostWorkers.remove(workerInfo); shutdownWorkers.remove(workerInfo); workerEventInfos.remove(workerInfo); + decommissionWorkers.remove(workerInfo); } } } @@ -256,6 +258,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler { lostWorkers.remove(workerInfo); excludedWorkers.remove(workerInfo); workerEventInfos.remove(workerInfo); + decommissionWorkers.remove(workerInfo); } } @@ -283,7 +286,8 @@ public abstract class AbstractMetaManager implements IMetadataHandler { lostWorkers, shutdownWorkers, workerEventInfos, - applicationMetas) + applicationMetas, + decommissionWorkers) .toByteArray(); Files.write(file.toPath(), snapshotBytes); } @@ -366,6 +370,11 @@ public abstract class AbstractMetaManager implements IMetadataHandler { .map(PbSerDeUtils::fromPbWorkerInfo) .collect(Collectors.toSet())); + decommissionWorkers.addAll( + snapshotMetaInfo.getDecommissionWorkersList().stream() + .map(PbSerDeUtils::fromPbWorkerInfo) + .collect(Collectors.toSet())); + partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten()); partitionTotalFileCount.add(snapshotMetaInfo.getPartitionTotalFileCount()); appDiskUsageMetric.restoreFromSnapshot( @@ -403,6 +412,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler { appHeartbeatTime.clear(); excludedWorkers.clear(); shutdownWorkers.clear(); + decommissionWorkers.clear(); manuallyExcludedWorkers.clear(); workerLostEvents.clear(); partitionTotalWritten.reset(); @@ -436,6 +446,12 @@ public abstract class AbstractMetaManager implements IMetadataHandler { } } + public void updateMetaByReportWorkerDecommission(List workers) { + synchronized (this.workers) { + decommissionWorkers.addAll(workers); + } + } + public void updatePartitionSize() { long oldEstimatedPartitionSize = estimatedPartitionSize; long tmpTotalWritten = partitionTotalWritten.sumThenReset(); diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java index 95257adc2..2c3a596f1 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java @@ -86,4 +86,6 @@ public interface IMetadataHandler { void handleUpdatePartitionSize(); void handleApplicationMeta(ApplicationMeta applicationMeta); + + void handleReportWorkerDecommission(List workers, String requestId); } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java index 8c2c73e53..4cc547612 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java @@ -171,4 +171,9 @@ public class SingleMasterMetaManager extends AbstractMetaManager { public void handleApplicationMeta(ApplicationMeta applicationMeta) { updateApplicationMeta(applicationMeta); } + + @Override + public void handleReportWorkerDecommission(List workers, String requestId) { + updateMetaByReportWorkerDecommission(workers); + } } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java index 00eb00b60..15183d8ba 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java @@ -365,6 +365,26 @@ public class HAMasterMetaManager extends AbstractMetaManager { } } + @Override + public void handleReportWorkerDecommission(List workers, String requestId) { + try { + List addrs = + workers.stream().map(MetaUtil::infoToAddr).collect(Collectors.toList()); + ratisServer.submitRequest( + ResourceRequest.newBuilder() + .setCmdType(Type.ReportWorkerDecommission) + .setRequestId(MasterClient.genRequestId()) + .setReportWorkerDecommissionRequest( + ResourceProtos.ReportWorkerDecommissionRequest.newBuilder() + .addAllWorkers(addrs) + .build()) + .build()); + } catch (CelebornRuntimeException e) { + LOG.error("Handle worker decommission failed!", e); + throw e; + } + } + @Override public void handleUpdatePartitionSize() { try { diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java index aa5a97827..77d68c598 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java @@ -277,6 +277,14 @@ public class MetaHandler { metaSystem.updateApplicationMeta(new ApplicationMeta(appId, secret)); break; + case ReportWorkerDecommission: + List decommissionList = + request.getReportWorkerDecommissionRequest().getWorkersList(); + List decommissionWorkers = + decommissionList.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList()); + metaSystem.updateMetaByReportWorkerDecommission(decommissionWorkers); + break; + default: throw new IOException("Can not parse this command!" + request); } diff --git a/master/src/main/proto/Resource.proto b/master/src/main/proto/Resource.proto index 660a30cb9..4a944748f 100644 --- a/master/src/main/proto/Resource.proto +++ b/master/src/main/proto/Resource.proto @@ -39,6 +39,7 @@ enum Type { WorkerExclude = 24; WorkerEvent = 25; ApplicationMeta = 26; + ReportWorkerDecommission = 27; } enum WorkerEventType { @@ -71,6 +72,7 @@ message ResourceRequest { optional WorkerExcludeRequest workerExcludeRequest = 21; optional WorkerEventRequest workerEventRequest = 22; optional ApplicationMetaRequest applicationMetaRequest = 23; + optional ReportWorkerDecommissionRequest reportWorkerDecommissionRequest = 24; } message DiskInfo { @@ -180,6 +182,10 @@ message ReportWorkerUnavailableRequest { repeated WorkerAddress unavailable = 1; } +message ReportWorkerDecommissionRequest { + repeated WorkerAddress workers = 1; +} + message RemoveWorkersUnavailableInfoRequest { repeated WorkerAddress unavailable = 1; } diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index b7eb192e8..704c6df6d 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -258,6 +258,10 @@ private[celeborn] class Master( masterSource.addGauge(MasterSource.IS_ACTIVE_MASTER) { () => isMasterActive } + masterSource.addGauge(MasterSource.DECOMMISSION_WORKER_COUNT) { () => + statusSystem.decommissionWorkers.size() + } + private val threadsStarted: AtomicBoolean = new AtomicBoolean(false) rpcEnv.setupEndpoint(RpcNameConstants.MASTER_EP, this) // Visible for testing @@ -512,6 +516,11 @@ private[celeborn] class Master( context, handleReportNodeUnavailable(context, failedWorkers, requestId)) + case ReportWorkerDecommission(workers: util.List[WorkerInfo], requestId: String) => + executeWithLeaderChecker( + context, + handleWorkerDecommission(context, workers, requestId)) + case pb: PbWorkerExclude => val workersToAdd = new util.ArrayList[WorkerInfo](pb.getWorkersToAddList .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava) @@ -959,6 +968,16 @@ private[celeborn] class Master( context.reply(OneWayMessageResponse) } + private def handleWorkerDecommission( + context: RpcCallContext, + workers: util.List[WorkerInfo], + requestId: String): Unit = { + logInfo(s"Receive ReportWorkerDecommission $workers, current decommission workers" + + s"${statusSystem.excludedWorkers}") + statusSystem.handleReportWorkerDecommission(workers, requestId) + context.reply(OneWayMessageResponse) + } + def handleApplicationLost(context: RpcCallContext, appId: String, requestId: String): Unit = { nonEagerHandler.submit(new Runnable { override def run(): Unit = { @@ -1023,7 +1042,8 @@ private[celeborn] class Master( new util.ArrayList( (statusSystem.excludedWorkers.asScala ++ statusSystem.manuallyExcludedWorkers.asScala).asJava), needCheckedWorkerList, - new util.ArrayList[WorkerInfo](statusSystem.shutdownWorkers))) + new util.ArrayList[WorkerInfo]( + (statusSystem.shutdownWorkers.asScala ++ statusSystem.decommissionWorkers.asScala).asJava))) } else { context.reply(OneWayMessageResponse) } @@ -1215,6 +1235,15 @@ private[celeborn] class Master( sb.toString() } + override def getDecommissionWorkers: String = { + val sb = new StringBuilder + sb.append("===================== Decommission Workers in Master ======================\n") + statusSystem.decommissionWorkers.asScala.foreach { worker => + sb.append(s"${worker.toUniqueId()}\n") + } + sb.toString() + } + override def getExcludedWorkers: String = { val sb = new StringBuilder sb.append("===================== Excluded Workers in Master ======================\n") diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala index 94ef037be..71404ced0 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala @@ -40,6 +40,8 @@ object MasterSource { val SHUTDOWN_WORKER_COUNT = "ShutdownWorkerCount" + val DECOMMISSION_WORKER_COUNT = "DecommissionWorkerCount" + val REGISTERED_SHUFFLE_COUNT = "RegisteredShuffleCount" val RUNNING_APPLICATION_COUNT = "RunningApplicationCount" diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala index 876e38716..935255f4e 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala @@ -65,6 +65,15 @@ class ApiMasterResource extends ApiRequestContext { @GET def shutdownWorkers: String = httpService.getShutdownWorkers + @Path("/decommissionWorkers") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.TEXT_PLAIN)), + description = "List all decommission workers of the master.") + @GET + def decommissionWorkers: String = httpService.getDecommissionWorkers + @Path("/hostnames") @ApiResponse( responseCode = "200", diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java index 64fae0b9b..5964ba700 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java @@ -764,4 +764,55 @@ public class DefaultMetaSystemSuiteJ { assertEquals(appSecret2, statusSystem.applicationMetas.get(appId2).secret()); assertEquals(2, statusSystem.applicationMetas.size()); } + + @Test + public void testHandleReportWorkerDecommission() { + statusSystem.handleRegisterWorker( + HOSTNAME1, + RPCPORT1, + PUSHPORT1, + FETCHPORT1, + REPLICATEPORT1, + INTERNALPORT1, + NETWORK_LOCATION1, + disks1, + userResourceConsumption1, + getNewReqeustId()); + statusSystem.handleRegisterWorker( + HOSTNAME2, + RPCPORT2, + PUSHPORT2, + FETCHPORT2, + REPLICATEPORT2, + INTERNALPORT2, + NETWORK_LOCATION2, + disks2, + userResourceConsumption2, + getNewReqeustId()); + statusSystem.handleRegisterWorker( + HOSTNAME3, + RPCPORT3, + PUSHPORT3, + FETCHPORT3, + REPLICATEPORT3, + INTERNALPORT3, + NETWORK_LOCATION3, + disks3, + userResourceConsumption3, + getNewReqeustId()); + List workers = new ArrayList<>(); + workers.add( + new WorkerInfo( + HOSTNAME1, + RPCPORT1, + PUSHPORT1, + FETCHPORT1, + REPLICATEPORT1, + INTERNALPORT1, + disks1, + userResourceConsumption1)); + statusSystem.handleReportWorkerDecommission(workers, getNewReqeustId()); + assertEquals(1, statusSystem.decommissionWorkers.size()); + assertTrue(statusSystem.excludedWorkers.isEmpty()); + } } diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java index 340fb8e27..dcf48ced6 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java @@ -1278,4 +1278,65 @@ public class RatisMasterStatusSystemSuiteJ { } } } + + @Test + public void testHandleReportWorkerDecommission() throws InterruptedException { + AbstractMetaManager statusSystem = pickLeaderStatusSystem(); + Assert.assertNotNull(statusSystem); + + statusSystem.handleRegisterWorker( + HOSTNAME1, + RPCPORT1, + PUSHPORT1, + FETCHPORT1, + REPLICATEPORT1, + INTERNALPORT1, + NETWORK_LOCATION1, + disks1, + userResourceConsumption1, + getNewReqeustId()); + statusSystem.handleRegisterWorker( + HOSTNAME2, + RPCPORT2, + PUSHPORT2, + FETCHPORT2, + REPLICATEPORT2, + INTERNALPORT2, + NETWORK_LOCATION2, + disks2, + userResourceConsumption2, + getNewReqeustId()); + statusSystem.handleRegisterWorker( + HOSTNAME3, + RPCPORT3, + PUSHPORT3, + FETCHPORT3, + REPLICATEPORT3, + INTERNALPORT3, + NETWORK_LOCATION3, + disks3, + userResourceConsumption3, + getNewReqeustId()); + + List workers = new ArrayList<>(); + workers.add( + new WorkerInfo( + HOSTNAME1, + RPCPORT1, + PUSHPORT1, + FETCHPORT1, + REPLICATEPORT1, + INTERNALPORT1, + disks1, + userResourceConsumption1)); + + statusSystem.handleReportWorkerDecommission(workers, getNewReqeustId()); + Thread.sleep(3000L); + Assert.assertEquals(1, STATUSSYSTEM1.decommissionWorkers.size()); + Assert.assertEquals(1, STATUSSYSTEM2.decommissionWorkers.size()); + Assert.assertEquals(1, STATUSSYSTEM3.decommissionWorkers.size()); + Assert.assertEquals(0, STATUSSYSTEM1.excludedWorkers.size()); + Assert.assertEquals(0, STATUSSYSTEM2.excludedWorkers.size()); + Assert.assertEquals(0, STATUSSYSTEM3.excludedWorkers.size()); + } } diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala index 1fc92793c..40bde4a05 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala @@ -114,4 +114,9 @@ class ApiMasterResourceSuite extends ApiBaseResourceSuite { .post(Entity.entity(form, MediaType.APPLICATION_FORM_URLENCODED_TYPE)) assert(200 == response.getStatus) } + + test("decommissionWorkers") { + val response = webTarget.path("decommissionWorkers").request(MediaType.TEXT_PLAIN).get() + assert(200 == response.getStatus) + } } diff --git a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala index a41c18466..6779d1249 100644 --- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala +++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala @@ -159,6 +159,8 @@ abstract class HttpService extends Service with Logging { def getShutdownWorkers: String = throw new UnsupportedOperationException() + def getDecommissionWorkers: String = throw new UnsupportedOperationException() + def getExcludedWorkers: String = throw new UnsupportedOperationException() def getHostnameList: String = throw new UnsupportedOperationException() @@ -174,6 +176,8 @@ abstract class HttpService extends Service with Logging { def isRegistered: String = throw new UnsupportedOperationException() + def isDecommissioning: String = throw new UnsupportedOperationException() + def exit(exitType: String): String = throw new UnsupportedOperationException() def handleWorkerEvent(workerEventType: String, workers: String): String = diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index bc815f6d1..3edcd8cfd 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -422,6 +422,14 @@ private[celeborn] class Worker( workerSource.addGauge(WorkerSource.ACTIVE_SLOTS_COUNT) { () => workerInfo.usedSlots() } + workerSource.addGauge(WorkerSource.IS_DECOMMISSIONING_WORKER) { () => + if (shutdown.get() && (workerStatusManager.currentWorkerStatus.getState == State.InDecommission || + workerStatusManager.currentWorkerStatus.getState == State.InDecommissionThenIdle)) { + 1 + } else { + 0 + } + } private def highWorkload: Boolean = { (memoryManager.currentServingState, conf.workerActiveConnectionMax) match { @@ -784,6 +792,16 @@ private[celeborn] class Worker( sb.toString() } + override def isDecommissioning: String = { + val sb = new StringBuilder + sb.append("========================= Worker Decommission ==========================\n") + sb.append( + shutdown.get() && (workerStatusManager.currentWorkerStatus.getState == State.InDecommission || + workerStatusManager.currentWorkerStatus.getState == State.InDecommissionThenIdle)) + .append("\n") + sb.toString() + } + override def isRegistered: String = { val sb = new StringBuilder sb.append("========================= Worker Registered ==========================\n") @@ -876,10 +894,10 @@ private[celeborn] class Worker( workerStatusManager.transitionState(State.Exit) } - def sendWorkerUnavailableToMaster(): Unit = { + def sendWorkerDecommissionToMaster(): Unit = { try { masterClient.askSync( - ReportWorkerUnavailable(List(workerInfo).asJava), + ReportWorkerDecommission(List(workerInfo).asJava), OneWayMessageResponse.getClass) } catch { case e: Throwable => @@ -893,7 +911,7 @@ private[celeborn] class Worker( def decommissionWorker(): Unit = { logInfo("Worker start to decommission") workerStatusManager.transitionState(State.InDecommission) - sendWorkerUnavailableToMaster() + sendWorkerDecommissionToMaster() shutdown.set(true) val interval = conf.workerDecommissionCheckInterval val timeout = conf.workerDecommissionForceExitTimeout diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index ac5e54cab..09e9cd551 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -210,4 +210,7 @@ object WorkerSource { // active shuffle val ACTIVE_SHUFFLE_SIZE = "ActiveShuffleSize" val ACTIVE_SHUFFLE_FILE_COUNT = "ActiveShuffleFileCount" + + // decommission + val IS_DECOMMISSIONING_WORKER = "IsDecommissioningWorker" } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala index d6648c2fa..61002b06f 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala @@ -140,7 +140,7 @@ private[celeborn] class WorkerStatusManager(conf: CelebornConf) extends Logging private def decommissionWorkerThenIdle(): Unit = this.synchronized { shutdown.set(true) transitionState(State.InDecommissionThenIdle) - worker.sendWorkerUnavailableToMaster() + worker.sendWorkerDecommissionToMaster() checkIfNeedTransitionStatus() } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResource.scala index 0d0b1febd..7dff9348c 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResource.scala @@ -56,6 +56,15 @@ class ApiWorkerResource extends ApiRequestContext { @GET def isShutdown: String = httpService.isShutdown + @Path("/isDecommissioning") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.TEXT_PLAIN)), + description = "Show if the worker is during the process of decommission.") + @GET + def isDecommissioning: String = httpService.isDecommissioning + @Path("/isRegistered") @ApiResponse( responseCode = "200", diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala index d950d0674..ccf5697d4 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala @@ -61,4 +61,9 @@ class ApiWorkerResourceSuite extends ApiBaseResourceSuite with MiniClusterFeatur val response = webTarget.path("isRegistered").request(MediaType.TEXT_PLAIN).get() assert(200 == response.getStatus) } + + test("isDecommissioning") { + val response = webTarget.path("isDecommissioning").request(MediaType.TEXT_PLAIN).get() + assert(200 == response.getStatus) + } }