[CELEBORN-1582] Publish metric for unreleased shuffle count when worker was decommissioned

### What changes were proposed in this pull request?

Adding a worker metrics for publish unreleased shuffle count when worker was decommissioned.

<img width="885" alt="Screenshot 2024-09-16 at 11 12 33 AM" src="https://github.com/user-attachments/assets/c81f36c1-cbed-44fe-814b-88f3ff29875d">

### Why are the changes needed?

Currently celeborn don't publish the count of unreleased shuffle key which gets lost when a worker is decommissioned. This can be useful for monitoring and configuring the `forceExitTimeout`.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?
NA

Closes #2711 from s0nskar/unrelease_shuffle_metric.

Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
This commit is contained in:
Sanskar Modi 2024-10-08 17:02:25 +08:00 committed by mingji
parent 6629be858b
commit 961144fdbd
4 changed files with 106 additions and 2 deletions

View File

@ -2653,6 +2653,97 @@
],
"title": "metrics_FlushWorkingQueueSize_Value",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 54
},
"id": 195,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "builder",
"expr": "metrics_UnreleasedShuffleCount_Value",
"instant": false,
"range": true,
"refId": "A"
}
],
"title": "metrics_UnreleasedShuffleCount_Value",
"type": "timeseries"
}
],
"title": "Worker",

View File

@ -233,6 +233,7 @@ These metrics are exposed by Celeborn worker.
| UserProduceSpeed | The speed of user production for congestion control. |
| WorkerConsumeSpeed | The speed of worker consumption for congestion control. |
| IsDecommissioningWorker | 1 means worker decommissioning, 0 means not decommissioning. |
| UnreleasedShuffleCount | Unreleased shuffle count when worker is decommissioning. |
| MemoryStorageFileCount | The count of files in Memory Storage of a worker. |
| MemoryFileStorageSize | The total amount of memory used by Memory Storage. |
| EvictedFileCount | The count of files evicted from Memory Storage to Disk |

View File

@ -435,6 +435,15 @@ private[celeborn] class Worker(
0
}
}
// Unreleased shuffle count when worker is decommissioning
workerSource.addGauge(WorkerSource.UNRELEASED_SHUFFLE_COUNT) { () =>
if (shutdown.get() && (workerStatusManager.currentWorkerStatus.getState == State.InDecommission ||
workerStatusManager.currentWorkerStatus.getState == State.InDecommissionThenIdle)) {
storageManager.shuffleKeySet().size
} else {
0
}
}
workerSource.addGauge(WorkerSource.CLEAN_TASK_QUEUE_SIZE) { () =>
cleanTaskQueue.size()
}
@ -952,11 +961,13 @@ private[celeborn] class Worker(
Thread.sleep(interval)
waitTimes += 1
}
if (storageManager.shuffleKeySet().isEmpty) {
val unreleasedShuffleKeys = storageManager.shuffleKeySet()
if (unreleasedShuffleKeys.isEmpty) {
logInfo(s"Waiting for all shuffle expired cost ${waitTime}ms.")
} else {
logWarning(s"Waiting for all shuffle expired cost ${waitTime}ms, " +
s"unreleased shuffle: \n${storageManager.shuffleKeySet().asScala.mkString("[", ", ", "]")}")
s"unreleased shuffle: \n${unreleasedShuffleKeys.asScala.mkString("[", ", ", "]")}")
}
workerStatusManager.transitionState(State.Exit)
}

View File

@ -222,6 +222,7 @@ object WorkerSource {
// decommission
val IS_DECOMMISSIONING_WORKER = "IsDecommissioningWorker"
val UNRELEASED_SHUFFLE_COUNT = "UnreleasedShuffleCount"
// clean
val CLEAN_TASK_QUEUE_SIZE = "CleanTaskQueueSize"