[CELEBORN-1444] Introduce worker decommission metrics and corresponding REST API

### What changes were proposed in this pull request?

Introduce worker decommission metrics and corresponding REST API.

### Why are the changes needed?

In a production environment, nodes are sometimes decommissioned automatically by our operations scripts due to hardware or environmental issues. When that happens, we need to distinguish gracefully shut-down nodes from decommissioned nodes.

If we distinguish between shutdown-worker and decommission-worker metrics, we can achieve better operations and maintenance.

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

- `DefaultMetaSystemSuiteJ#testHandleReportWorkerDecommission`
- `RatisMasterStatusSystemSuiteJ#testHandleReportWorkerDecommission`
- `ApiMasterResourceSuite#decommissionWorkers`
- `ApiWorkerResourceSuite#isDecommissioning`

Closes #2535 from leixm/issue_1444.

Lead-authored-by: Xianming Lei <jerrylei@apache.org>
Co-authored-by: Xianming Lei <31424839+leixm@users.noreply.github.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
This commit is contained in:
Xianming Lei 2024-06-08 11:10:31 +08:00 committed by zky.zhoukeyong
parent 7188e845f7
commit 999510b265
24 changed files with 572 additions and 128 deletions

View File

@ -90,6 +90,7 @@ Here is an example of Grafana dashboard importing.
| LostWorkerCount | master | The count of workers in lost list. |
| ExcludedWorkerCount | master | The count of workers in excluded list. |
| ShutdownWorkerCount | master | The count of workers in shutdown list. |
| DecommissionWorkerCount | master | The count of workers in decommission list. |
| IsActiveMaster | master | Whether the current master is active. |
| PartitionSize | master | The estimated partition size of last 20 flush window whose length is 15 seconds by defaults. |
| OfferSlotsTime | master | The time of offer slots. |
@ -147,6 +148,7 @@ Here is an example of Grafana dashboard importing.
| PotentialConsumeSpeed | worker | This value means speed of potential consumption for congestion control. |
| UserProduceSpeed | worker | This value means speed of user production for congestion control. |
| WorkerConsumeSpeed | worker | This value means speed of worker consumption for congestion control. |
| IsDecommissioningWorker | worker | 1 means worker decommissioning, 0 means not decommissioning. |
| jvm_gc_count | JVM | The GC count of each garbage collector. |
| jvm_gc_time | JVM | The GC cost time of each garbage collector. |
| jvm_memory_heap_init | JVM | The amount of heap init memory. |

View File

@ -1357,6 +1357,96 @@
],
"title": "metrics_ShutdownWorkerCount_Value",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "The count of workers in decommission list.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 34
},
"id": 189,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "metrics_DecommissionWorkerCount_Value",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
"title": "metrics_DecommissionWorkerCount_Value",
"type": "timeseries"
}
],
"title": "Master",
@ -1383,8 +1473,6 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -1432,7 +1520,7 @@
"h": 10,
"w": 24,
"x": 0,
"y": 85
"y": 3
},
"id": 84,
"options": {
@ -1472,98 +1560,6 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 290
},
"id": 48,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "metrics_ActiveSlotsCount_Value",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
"title": "metrics_ActiveSlotsCount_Value",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -1612,7 +1608,7 @@
"h": 9,
"w": 12,
"x": 0,
"y": 95
"y": 13
},
"id": 60,
"options": {
@ -1652,8 +1648,6 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -1702,7 +1696,7 @@
"h": 9,
"w": 12,
"x": 12,
"y": 95
"y": 13
},
"id": 62,
"options": {
@ -1742,8 +1736,6 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -1791,7 +1783,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 104
"y": 22
},
"id": 90,
"options": {
@ -1832,8 +1824,6 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -1881,7 +1871,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 104
"y": 22
},
"id": 92,
"options": {
@ -1922,8 +1912,6 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -1972,7 +1960,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 112
"y": 30
},
"id": 182,
"options": {
@ -2013,8 +2001,6 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -2063,7 +2049,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 112
"y": 30
},
"id": 184,
"options": {
@ -2105,8 +2091,6 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -2155,7 +2139,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 120
"y": 38
},
"id": 181,
"options": {
@ -2199,8 +2183,6 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -2248,7 +2230,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 120
"y": 38
},
"id": 183,
"options": {
@ -2291,8 +2273,6 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -2340,7 +2320,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 128
"y": 46
},
"id": 179,
"options": {
@ -2369,6 +2349,182 @@
],
"title": "metrics_ActiveConnectionCount_Count",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 46
},
"id": 190,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "metrics_IsDecommissioningWorker_Value",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
"title": "metrics_IsDecommissioningWorker_Value",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 54
},
"id": 48,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "metrics_ActiveSlotsCount_Value",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
"title": "metrics_ActiveSlotsCount_Value",
"type": "timeseries"
}
],
"title": "Worker",

View File

@ -102,6 +102,7 @@ enum MessageType {
APPLICATION_META_REQUEST = 79;
BATCH_OPEN_STREAM = 80;
BATCH_OPEN_STREAM_RESPONSE = 81;
REPORT_WORKER_DECOMMISSION = 82;
}
enum StreamType {
@ -625,6 +626,7 @@ message PbSnapshotMetaInfo {
repeated PbWorkerInfo manuallyExcludedWorkers = 14;
map<string, PbWorkerEventInfo> workerEventInfos = 15;
map<string, PbApplicationMeta> applicationMetas = 16;
repeated PbWorkerInfo decommissionWorkers = 17;
}
message PbOpenStream {
@ -789,4 +791,9 @@ message PbPackedPartitionLocationsPair {
message PbPackedWorkerResource {
PbPackedPartitionLocationsPair locationPairs = 1;
string networkLocation = 2;
}
}
// Sent by a worker to the master to report workers entering decommission.
message PbReportWorkerDecommission {
  // Workers being decommissioned (typically the reporting worker itself).
  repeated PbWorkerInfo workers = 1;
  // Request id used by the master request-tracking machinery.
  string requestId = 2;
}

View File

@ -383,6 +383,10 @@ object ControlMessages extends Logging {
unavailable: util.List[WorkerInfo],
override var requestId: String = ZERO_UUID) extends MasterRequestMessage
// Control message asking the master to record the given workers as decommissioned.
// NOTE(review): the field is named `unavailable` (copied from ReportWorkerUnavailable)
// while the protobuf/serializer side calls it `workers` — consider aligning the names.
case class ReportWorkerDecommission(
    unavailable: util.List[WorkerInfo],
    override var requestId: String = ZERO_UUID) extends MasterRequestMessage
object CheckWorkersAvailable {
def apply(): PbCheckWorkersAvailable = {
PbCheckWorkersAvailable.newBuilder().build()
@ -777,6 +781,14 @@ object ControlMessages extends Logging {
.setRequestId(requestId).build().toByteArray
new TransportMessage(MessageType.REPORT_WORKER_FAILURE, payload)
case ReportWorkerDecommission(workers, requestId) =>
val payload = PbReportWorkerDecommission.newBuilder()
.addAllWorkers(workers.asScala.map { workerInfo =>
PbSerDeUtils.toPbWorkerInfo(workerInfo, true)
}.toList.asJava)
.setRequestId(requestId).build().toByteArray
new TransportMessage(MessageType.REPORT_WORKER_DECOMMISSION, payload)
case pb: PbRemoveWorkersUnavailableInfo =>
new TransportMessage(MessageType.REMOVE_WORKERS_UNAVAILABLE_INFO, pb.toByteArray)
@ -1136,6 +1148,13 @@ object ControlMessages extends Logging {
.asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
pbReportWorkerUnavailable.getRequestId)
case REPORT_WORKER_DECOMMISSION_VALUE =>
val pbReportWorkerDecommission = PbReportWorkerDecommission.parseFrom(message.getPayload)
ReportWorkerDecommission(
new util.ArrayList[WorkerInfo](pbReportWorkerDecommission.getWorkersList
.asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
pbReportWorkerDecommission.getRequestId)
case REMOVE_WORKERS_UNAVAILABLE_INFO_VALUE =>
PbRemoveWorkersUnavailableInfo.parseFrom(message.getPayload)

View File

@ -421,7 +421,8 @@ object PbSerDeUtils {
lostWorkers: ConcurrentHashMap[WorkerInfo, java.lang.Long],
shutdownWorkers: java.util.Set[WorkerInfo],
workerEventInfos: ConcurrentHashMap[WorkerInfo, WorkerEventInfo],
applicationMetas: ConcurrentHashMap[String, ApplicationMeta]): PbSnapshotMetaInfo = {
applicationMetas: ConcurrentHashMap[String, ApplicationMeta],
decommissionWorkers: java.util.Set[WorkerInfo]): PbSnapshotMetaInfo = {
val builder = PbSnapshotMetaInfo.newBuilder()
.setEstimatedPartitionSize(estimatedPartitionSize)
.addAllRegisteredShuffle(registeredShuffle)
@ -446,6 +447,8 @@ object PbSerDeUtils {
case (worker, workerEventInfo) =>
(worker.toUniqueId(), PbSerDeUtils.toPbWorkerEventInfo(workerEventInfo))
}.asJava)
.addAllDecommissionWorkers(decommissionWorkers.asScala.map(toPbWorkerInfo(_, true)).asJava)
if (currentAppDiskUsageMetricsSnapshot != null) {
builder.setCurrentAppDiskUsageMetricsSnapshot(
toPbAppDiskUsageSnapshot(currentAppDiskUsageMetricsSnapshot))

View File

@ -103,6 +103,7 @@ These metrics are exposed by Celeborn master.
- LostWorkerCount
- ExcludedWorkerCount
- ShutdownWorkerCount
- DecommissionWorkerCount
- IsActiveMaster
- PartitionSize
- The size of estimated shuffle partition.
@ -235,6 +236,7 @@ These metrics are exposed by Celeborn worker.
- PotentialConsumeSpeed
- UserProduceSpeed
- WorkerConsumeSpeed
- IsDecommissioningWorker
- push_server_usedHeapMemory
- push_server_usedDirectMemory
- push_server_numAllocations
@ -369,6 +371,7 @@ API path listed as below:
| /metrics/prometheus | GET | | List the metrics data in prometheus format of the master. The url path is defined by configure `celeborn.metrics.prometheus.path`. |
| /shuffles | GET | | List all running shuffle keys of the service. It will return all running shuffle's key of the cluster. |
| /shutdownWorkers | GET | | List all shutdown workers of the master. |
| /decommissionWorkers | GET | | List all decommission workers of the master. |
| /threadDump | GET | | List the current thread dump of the master. |
| /workerEventInfo | GET | | List all worker event information of the master. |
| /workerInfo | GET | | List worker information of the service. It will list all registered workers' information. |
@ -384,6 +387,7 @@ API path listed as below:
| /help | GET | | List the available API providers of the worker. |
| /isRegistered | GET | | Show if the worker is registered to the master success. |
| /isShutdown | GET | | Show if the worker is during the process of shutdown. |
| /isDecommissioning | GET | | Show if the worker is during the process of decommission. |
| /listDynamicConfigs | GET | level=${LEVEL} tenant=${TENANT} name=${NAME} | List the dynamic configs of the worker. The parameter level specifies the config level of dynamic configs. The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. The parameter name specifies the user name of TENANT_USER level. Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level. |
| /listPartitionLocationInfo | GET | | List all the living PartitionLocation information in that worker. |
| /listTopDiskUsedApps | GET | | List the top disk usage application ids. It only return application ids running in that worker. |

View File

@ -74,6 +74,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
public final Set<WorkerInfo> excludedWorkers = ConcurrentHashMap.newKeySet();
public final Set<WorkerInfo> manuallyExcludedWorkers = ConcurrentHashMap.newKeySet();
public final Set<WorkerInfo> shutdownWorkers = ConcurrentHashMap.newKeySet();
public final Set<WorkerInfo> decommissionWorkers = ConcurrentHashMap.newKeySet();
public final Set<WorkerInfo> workerLostEvents = ConcurrentHashMap.newKeySet();
protected RpcEnv rpcEnv;
@ -162,6 +163,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
lostWorkers.remove(workerInfo);
shutdownWorkers.remove(workerInfo);
workerEventInfos.remove(workerInfo);
decommissionWorkers.remove(workerInfo);
}
}
}
@ -256,6 +258,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
lostWorkers.remove(workerInfo);
excludedWorkers.remove(workerInfo);
workerEventInfos.remove(workerInfo);
decommissionWorkers.remove(workerInfo);
}
}
@ -283,7 +286,8 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
lostWorkers,
shutdownWorkers,
workerEventInfos,
applicationMetas)
applicationMetas,
decommissionWorkers)
.toByteArray();
Files.write(file.toPath(), snapshotBytes);
}
@ -366,6 +370,11 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
.map(PbSerDeUtils::fromPbWorkerInfo)
.collect(Collectors.toSet()));
decommissionWorkers.addAll(
snapshotMetaInfo.getDecommissionWorkersList().stream()
.map(PbSerDeUtils::fromPbWorkerInfo)
.collect(Collectors.toSet()));
partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten());
partitionTotalFileCount.add(snapshotMetaInfo.getPartitionTotalFileCount());
appDiskUsageMetric.restoreFromSnapshot(
@ -403,6 +412,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
appHeartbeatTime.clear();
excludedWorkers.clear();
shutdownWorkers.clear();
decommissionWorkers.clear();
manuallyExcludedWorkers.clear();
workerLostEvents.clear();
partitionTotalWritten.reset();
@ -436,6 +446,12 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
}
}
/**
 * Marks the given workers as decommissioned by adding them to {@code decommissionWorkers}.
 * Synchronized on the registered-worker list so the update is consistent with other
 * membership mutations that lock the same monitor.
 *
 * @param workers workers reported as decommissioning
 */
public void updateMetaByReportWorkerDecommission(List<WorkerInfo> workers) {
  synchronized (this.workers) {
    decommissionWorkers.addAll(workers);
  }
}
public void updatePartitionSize() {
long oldEstimatedPartitionSize = estimatedPartitionSize;
long tmpTotalWritten = partitionTotalWritten.sumThenReset();

View File

@ -86,4 +86,6 @@ public interface IMetadataHandler {
void handleUpdatePartitionSize();
void handleApplicationMeta(ApplicationMeta applicationMeta);
void handleReportWorkerDecommission(List<WorkerInfo> workers, String requestId);
}

View File

@ -171,4 +171,9 @@ public class SingleMasterMetaManager extends AbstractMetaManager {
public void handleApplicationMeta(ApplicationMeta applicationMeta) {
updateApplicationMeta(applicationMeta);
}
/**
 * Single-master mode: apply the decommission report directly to local meta.
 * The requestId is unused here because there is no consensus layer to route it through.
 */
@Override
public void handleReportWorkerDecommission(List<WorkerInfo> workers, String requestId) {
  updateMetaByReportWorkerDecommission(workers);
}
}

View File

@ -365,6 +365,26 @@ public class HAMasterMetaManager extends AbstractMetaManager {
}
}
/**
 * HA mode: replicate the decommission report through the Ratis consensus log so every
 * master replica applies it to its meta state (via MetaHandler's ReportWorkerDecommission case).
 *
 * @param workers   workers reported as decommissioning
 * @param requestId caller-supplied request id; NOTE(review): a fresh id is generated via
 *                  MasterClient.genRequestId() instead of using this parameter — confirm intended.
 * @throws CelebornRuntimeException if submitting the request to Ratis fails
 */
@Override
public void handleReportWorkerDecommission(List<WorkerInfo> workers, String requestId) {
  try {
    // Convert WorkerInfo to the protobuf address form carried in the Ratis log entry.
    List<ResourceProtos.WorkerAddress> addrs =
        workers.stream().map(MetaUtil::infoToAddr).collect(Collectors.toList());
    ratisServer.submitRequest(
        ResourceRequest.newBuilder()
            .setCmdType(Type.ReportWorkerDecommission)
            .setRequestId(MasterClient.genRequestId())
            .setReportWorkerDecommissionRequest(
                ResourceProtos.ReportWorkerDecommissionRequest.newBuilder()
                    .addAllWorkers(addrs)
                    .build())
            .build());
  } catch (CelebornRuntimeException e) {
    // Log for operators, then rethrow so the caller observes the failure.
    LOG.error("Handle worker decommission failed!", e);
    throw e;
  }
}
@Override
public void handleUpdatePartitionSize() {
try {

View File

@ -277,6 +277,14 @@ public class MetaHandler {
metaSystem.updateApplicationMeta(new ApplicationMeta(appId, secret));
break;
case ReportWorkerDecommission:
List<ResourceProtos.WorkerAddress> decommissionList =
request.getReportWorkerDecommissionRequest().getWorkersList();
List<WorkerInfo> decommissionWorkers =
decommissionList.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
metaSystem.updateMetaByReportWorkerDecommission(decommissionWorkers);
break;
default:
throw new IOException("Can not parse this command!" + request);
}

View File

@ -39,6 +39,7 @@ enum Type {
WorkerExclude = 24;
WorkerEvent = 25;
ApplicationMeta = 26;
ReportWorkerDecommission = 27;
}
enum WorkerEventType {
@ -71,6 +72,7 @@ message ResourceRequest {
optional WorkerExcludeRequest workerExcludeRequest = 21;
optional WorkerEventRequest workerEventRequest = 22;
optional ApplicationMetaRequest applicationMetaRequest = 23;
optional ReportWorkerDecommissionRequest reportWorkerDecommissionRequest = 24;
}
message DiskInfo {
@ -180,6 +182,10 @@ message ReportWorkerUnavailableRequest {
repeated WorkerAddress unavailable = 1;
}
// Ratis log entry payload: addresses of workers to mark as decommissioned.
message ReportWorkerDecommissionRequest {
  repeated WorkerAddress workers = 1;
}
message RemoveWorkersUnavailableInfoRequest {
repeated WorkerAddress unavailable = 1;
}

View File

@ -258,6 +258,10 @@ private[celeborn] class Master(
masterSource.addGauge(MasterSource.IS_ACTIVE_MASTER) { () => isMasterActive }
masterSource.addGauge(MasterSource.DECOMMISSION_WORKER_COUNT) { () =>
statusSystem.decommissionWorkers.size()
}
private val threadsStarted: AtomicBoolean = new AtomicBoolean(false)
rpcEnv.setupEndpoint(RpcNameConstants.MASTER_EP, this)
// Visible for testing
@ -512,6 +516,11 @@ private[celeborn] class Master(
context,
handleReportNodeUnavailable(context, failedWorkers, requestId))
case ReportWorkerDecommission(workers: util.List[WorkerInfo], requestId: String) =>
executeWithLeaderChecker(
context,
handleWorkerDecommission(context, workers, requestId))
case pb: PbWorkerExclude =>
val workersToAdd = new util.ArrayList[WorkerInfo](pb.getWorkersToAddList
.asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava)
@ -959,6 +968,16 @@ private[celeborn] class Master(
context.reply(OneWayMessageResponse)
}
/**
 * Records the reported workers in the master's decommission list and acks the sender.
 *
 * Bug fix: the log line claimed to print the current decommission workers but actually
 * printed `statusSystem.excludedWorkers`, and the concatenation was missing a space
 * ("workers${...}"). It now prints the decommission set with correct spacing.
 */
private def handleWorkerDecommission(
    context: RpcCallContext,
    workers: util.List[WorkerInfo],
    requestId: String): Unit = {
  logInfo(s"Receive ReportWorkerDecommission $workers, current decommission workers " +
    s"${statusSystem.decommissionWorkers}")
  statusSystem.handleReportWorkerDecommission(workers, requestId)
  context.reply(OneWayMessageResponse)
}
def handleApplicationLost(context: RpcCallContext, appId: String, requestId: String): Unit = {
nonEagerHandler.submit(new Runnable {
override def run(): Unit = {
@ -1023,7 +1042,8 @@ private[celeborn] class Master(
new util.ArrayList(
(statusSystem.excludedWorkers.asScala ++ statusSystem.manuallyExcludedWorkers.asScala).asJava),
needCheckedWorkerList,
new util.ArrayList[WorkerInfo](statusSystem.shutdownWorkers)))
new util.ArrayList[WorkerInfo](
(statusSystem.shutdownWorkers.asScala ++ statusSystem.decommissionWorkers.asScala).asJava)))
} else {
context.reply(OneWayMessageResponse)
}
@ -1215,6 +1235,15 @@ private[celeborn] class Master(
sb.toString()
}
/** Renders the master's decommission worker list as plain text, one unique id per line. */
override def getDecommissionWorkers: String = {
  val header = "===================== Decommission Workers in Master ======================\n"
  val body = statusSystem.decommissionWorkers.asScala
    .map(worker => s"${worker.toUniqueId()}\n")
    .mkString
  header + body
}
override def getExcludedWorkers: String = {
val sb = new StringBuilder
sb.append("===================== Excluded Workers in Master ======================\n")

View File

@ -40,6 +40,8 @@ object MasterSource {
val SHUTDOWN_WORKER_COUNT = "ShutdownWorkerCount"
val DECOMMISSION_WORKER_COUNT = "DecommissionWorkerCount"
val REGISTERED_SHUFFLE_COUNT = "RegisteredShuffleCount"
val RUNNING_APPLICATION_COUNT = "RunningApplicationCount"

View File

@ -65,6 +65,15 @@ class ApiMasterResource extends ApiRequestContext {
@GET
def shutdownWorkers: String = httpService.getShutdownWorkers
// GET /decommissionWorkers — plain-text dump of workers currently in the
// master's decommission list (delegates to HttpService.getDecommissionWorkers).
@Path("/decommissionWorkers")
@ApiResponse(
  responseCode = "200",
  content = Array(new Content(
    mediaType = MediaType.TEXT_PLAIN)),
  description = "List all decommission workers of the master.")
@GET
def decommissionWorkers: String = httpService.getDecommissionWorkers
@Path("/hostnames")
@ApiResponse(
responseCode = "200",

View File

@ -764,4 +764,55 @@ public class DefaultMetaSystemSuiteJ {
assertEquals(appSecret2, statusSystem.applicationMetas.get(appId2).secret());
assertEquals(2, statusSystem.applicationMetas.size());
}
/**
 * Registers three workers, reports the first one as decommissioning, and verifies it is
 * tracked in {@code decommissionWorkers} without being added to the excluded set.
 */
@Test
public void testHandleReportWorkerDecommission() {
  // Register three distinct workers so the meta system has known members.
  statusSystem.handleRegisterWorker(
      HOSTNAME1,
      RPCPORT1,
      PUSHPORT1,
      FETCHPORT1,
      REPLICATEPORT1,
      INTERNALPORT1,
      NETWORK_LOCATION1,
      disks1,
      userResourceConsumption1,
      getNewReqeustId());
  statusSystem.handleRegisterWorker(
      HOSTNAME2,
      RPCPORT2,
      PUSHPORT2,
      FETCHPORT2,
      REPLICATEPORT2,
      INTERNALPORT2,
      NETWORK_LOCATION2,
      disks2,
      userResourceConsumption2,
      getNewReqeustId());
  statusSystem.handleRegisterWorker(
      HOSTNAME3,
      RPCPORT3,
      PUSHPORT3,
      FETCHPORT3,
      REPLICATEPORT3,
      INTERNALPORT3,
      NETWORK_LOCATION3,
      disks3,
      userResourceConsumption3,
      getNewReqeustId());
  // Report only worker 1 as decommissioning.
  List<WorkerInfo> workers = new ArrayList<>();
  workers.add(
      new WorkerInfo(
          HOSTNAME1,
          RPCPORT1,
          PUSHPORT1,
          FETCHPORT1,
          REPLICATEPORT1,
          INTERNALPORT1,
          disks1,
          userResourceConsumption1));
  statusSystem.handleReportWorkerDecommission(workers, getNewReqeustId());
  // Exactly one worker is decommissioned; decommission must not imply exclusion.
  assertEquals(1, statusSystem.decommissionWorkers.size());
  assertTrue(statusSystem.excludedWorkers.isEmpty());
}
}

View File

@ -1278,4 +1278,65 @@ public class RatisMasterStatusSystemSuiteJ {
}
}
}
/**
 * HA variant: registers three workers on the leader, reports one as decommissioning, and
 * verifies the decommission entry is replicated to all three Ratis replicas while the
 * excluded set stays empty on each.
 */
@Test
public void testHandleReportWorkerDecommission() throws InterruptedException {
  AbstractMetaManager statusSystem = pickLeaderStatusSystem();
  Assert.assertNotNull(statusSystem);
  // Register three distinct workers on the leader.
  statusSystem.handleRegisterWorker(
      HOSTNAME1,
      RPCPORT1,
      PUSHPORT1,
      FETCHPORT1,
      REPLICATEPORT1,
      INTERNALPORT1,
      NETWORK_LOCATION1,
      disks1,
      userResourceConsumption1,
      getNewReqeustId());
  statusSystem.handleRegisterWorker(
      HOSTNAME2,
      RPCPORT2,
      PUSHPORT2,
      FETCHPORT2,
      REPLICATEPORT2,
      INTERNALPORT2,
      NETWORK_LOCATION2,
      disks2,
      userResourceConsumption2,
      getNewReqeustId());
  statusSystem.handleRegisterWorker(
      HOSTNAME3,
      RPCPORT3,
      PUSHPORT3,
      FETCHPORT3,
      REPLICATEPORT3,
      INTERNALPORT3,
      NETWORK_LOCATION3,
      disks3,
      userResourceConsumption3,
      getNewReqeustId());
  // Report only worker 1 as decommissioning.
  List<WorkerInfo> workers = new ArrayList<>();
  workers.add(
      new WorkerInfo(
          HOSTNAME1,
          RPCPORT1,
          PUSHPORT1,
          FETCHPORT1,
          REPLICATEPORT1,
          INTERNALPORT1,
          disks1,
          userResourceConsumption1));
  statusSystem.handleReportWorkerDecommission(workers, getNewReqeustId());
  // Give Ratis time to apply the log entry on all replicas.
  // NOTE(review): a fixed sleep can be flaky under load; polling with a deadline
  // would be more robust.
  Thread.sleep(3000L);
  Assert.assertEquals(1, STATUSSYSTEM1.decommissionWorkers.size());
  Assert.assertEquals(1, STATUSSYSTEM2.decommissionWorkers.size());
  Assert.assertEquals(1, STATUSSYSTEM3.decommissionWorkers.size());
  Assert.assertEquals(0, STATUSSYSTEM1.excludedWorkers.size());
  Assert.assertEquals(0, STATUSSYSTEM2.excludedWorkers.size());
  Assert.assertEquals(0, STATUSSYSTEM3.excludedWorkers.size());
}
}

View File

@ -114,4 +114,9 @@ class ApiMasterResourceSuite extends ApiBaseResourceSuite {
.post(Entity.entity(form, MediaType.APPLICATION_FORM_URLENCODED_TYPE))
assert(200 == response.getStatus)
}
// The /decommissionWorkers endpoint should respond 200 even when the list is empty.
test("decommissionWorkers") {
  val response = webTarget.path("decommissionWorkers").request(MediaType.TEXT_PLAIN).get()
  assert(200 == response.getStatus)
}
}

View File

@ -159,6 +159,8 @@ abstract class HttpService extends Service with Logging {
def getShutdownWorkers: String = throw new UnsupportedOperationException()
// Plain-text dump of the decommission worker list; master-side services override this.
def getDecommissionWorkers: String = throw new UnsupportedOperationException()
def getExcludedWorkers: String = throw new UnsupportedOperationException()
def getHostnameList: String = throw new UnsupportedOperationException()
@ -174,6 +176,8 @@ abstract class HttpService extends Service with Logging {
def isRegistered: String = throw new UnsupportedOperationException()
// Whether this service is decommissioning, as plain text; worker-side services override this.
def isDecommissioning: String = throw new UnsupportedOperationException()
def exit(exitType: String): String = throw new UnsupportedOperationException()
def handleWorkerEvent(workerEventType: String, workers: String): String =

View File

@ -422,6 +422,14 @@ private[celeborn] class Worker(
workerSource.addGauge(WorkerSource.ACTIVE_SLOTS_COUNT) { () =>
workerInfo.usedSlots()
}
// Gauge IsDecommissioningWorker: 1 while this worker is decommissioning
// (shutdown flag set AND state is InDecommission or InDecommissionThenIdle), else 0.
workerSource.addGauge(WorkerSource.IS_DECOMMISSIONING_WORKER) { () =>
  if (shutdown.get() && (workerStatusManager.currentWorkerStatus.getState == State.InDecommission ||
    workerStatusManager.currentWorkerStatus.getState == State.InDecommissionThenIdle)) {
    1
  } else {
    0
  }
}
private def highWorkload: Boolean = {
(memoryManager.currentServingState, conf.workerActiveConnectionMax) match {
@ -784,6 +792,16 @@ private[celeborn] class Worker(
sb.toString()
}
/** Plain-text report of whether this worker is currently decommissioning. */
override def isDecommissioning: String = {
  val state = workerStatusManager.currentWorkerStatus.getState
  val decommissioning =
    shutdown.get() && (state == State.InDecommission || state == State.InDecommissionThenIdle)
  val sb = new StringBuilder
  sb.append("========================= Worker Decommission ==========================\n")
  sb.append(decommissioning).append("\n")
  sb.toString()
}
override def isRegistered: String = {
val sb = new StringBuilder
sb.append("========================= Worker Registered ==========================\n")
@ -876,10 +894,10 @@ private[celeborn] class Worker(
workerStatusManager.transitionState(State.Exit)
}
def sendWorkerUnavailableToMaster(): Unit = {
def sendWorkerDecommissionToMaster(): Unit = {
try {
masterClient.askSync(
ReportWorkerUnavailable(List(workerInfo).asJava),
ReportWorkerDecommission(List(workerInfo).asJava),
OneWayMessageResponse.getClass)
} catch {
case e: Throwable =>
@ -893,7 +911,7 @@ private[celeborn] class Worker(
def decommissionWorker(): Unit = {
logInfo("Worker start to decommission")
workerStatusManager.transitionState(State.InDecommission)
sendWorkerUnavailableToMaster()
sendWorkerDecommissionToMaster()
shutdown.set(true)
val interval = conf.workerDecommissionCheckInterval
val timeout = conf.workerDecommissionForceExitTimeout

View File

@ -210,4 +210,7 @@ object WorkerSource {
// active shuffle
val ACTIVE_SHUFFLE_SIZE = "ActiveShuffleSize"
val ACTIVE_SHUFFLE_FILE_COUNT = "ActiveShuffleFileCount"
// decommission
val IS_DECOMMISSIONING_WORKER = "IsDecommissioningWorker"
}

View File

@ -140,7 +140,7 @@ private[celeborn] class WorkerStatusManager(conf: CelebornConf) extends Logging
// Transitions this worker into decommission-then-idle: sets the shutdown flag,
// moves to InDecommissionThenIdle, reports the decommission to the master, then
// checks whether the status can advance. Synchronized to serialize state transitions.
private def decommissionWorkerThenIdle(): Unit = this.synchronized {
  shutdown.set(true)
  transitionState(State.InDecommissionThenIdle)
  worker.sendWorkerDecommissionToMaster()
  checkIfNeedTransitionStatus()
}

View File

@ -56,6 +56,15 @@ class ApiWorkerResource extends ApiRequestContext {
@GET
def isShutdown: String = httpService.isShutdown
// GET /isDecommissioning — plain-text boolean report of whether this worker is
// in the process of decommissioning (delegates to HttpService.isDecommissioning).
@Path("/isDecommissioning")
@ApiResponse(
  responseCode = "200",
  content = Array(new Content(
    mediaType = MediaType.TEXT_PLAIN)),
  description = "Show if the worker is during the process of decommission.")
@GET
def isDecommissioning: String = httpService.isDecommissioning
@Path("/isRegistered")
@ApiResponse(
responseCode = "200",

View File

@ -61,4 +61,9 @@ class ApiWorkerResourceSuite extends ApiBaseResourceSuite with MiniClusterFeatur
val response = webTarget.path("isRegistered").request(MediaType.TEXT_PLAIN).get()
assert(200 == response.getStatus)
}
// The /isDecommissioning endpoint should respond 200 regardless of worker state.
test("isDecommissioning") {
  val response = webTarget.path("isDecommissioning").request(MediaType.TEXT_PLAIN).get()
  assert(200 == response.getStatus)
}
}