[CELEBORN-1968] Publish metric for unreleased partition location count when worker was gracefully shutdown

### What changes were proposed in this pull request?

Adding a worker metrics for publish unreleased partition location count when worker was gracefully shutdown.

<img width="742" alt="Screenshot 2025-04-16 at 1 19 18 AM" src="https://github.com/user-attachments/assets/159f744a-cd76-45a2-9387-930f27dd72be" />

### Why are the changes needed?

Similar to https://github.com/apache/celeborn/pull/2711, Currently celeborn don't publish the count of unreleased partition location when worker is gracefully exit. This can be useful for monitoring and configuring the gracefulShutdownTimeout.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
NA

Closes #3213 from s0nskar/unrelease_partition_location.

Lead-authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Sanskar Modi 2025-05-12 04:34:44 -07:00 committed by Wang, Fei
parent 3896249b92
commit 9ba54b39e2
4 changed files with 109 additions and 0 deletions

View File

@ -3236,6 +3236,102 @@
],
"title": "metrics_ PartitionFileSizeBytes_Mean",
"type": "timeseries"
},
{
"datasource": {
"default": false,
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 70
},
"id": 238,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "builder",
"expr": "metrics_UnreleasedPartitionLocationCount_Value{role=\"Worker\", instance=~\"${instance}\"}",
"instant": false,
"range": true,
"refId": "A"
}
],
"title": "metrics_UnreleasedPartitionLocationCount_Value",
"type": "timeseries"
}
],
"title": "Worker",

View File

@ -240,6 +240,7 @@ These metrics are exposed by Celeborn worker.
| WorkerConsumeSpeed | The speed of worker consumption for congestion control. |
| IsDecommissioningWorker | 1 means worker decommissioning, 0 means not decommissioning. |
| UnreleasedShuffleCount | Unreleased shuffle count when worker is decommissioning. |
| UnreleasedPartitionLocationCount | Unreleased partition location counit when worker is shutting down. |
| MemoryStorageFileCount | The count of files in Memory Storage of a worker. |
| MemoryFileStorageSize | The total amount of memory used by Memory Storage. |
| EvictedFileCount | The count of files evicted from Memory Storage to Disk |

View File

@ -455,6 +455,15 @@ private[celeborn] class Worker(
0
}
}
// Unreleased partition location count when worker is restarting
workerSource.addGauge(WorkerSource.UNRELEASED_PARTITION_LOCATION_COUNT) { () =>
if (shutdown.get()) {
partitionLocationInfo.primaryPartitionLocations.size() +
partitionLocationInfo.replicaPartitionLocations.size()
} else {
0
}
}
workerSource.addGauge(WorkerSource.CLEAN_TASK_QUEUE_SIZE) { () =>
cleanTaskQueue.size()
}

View File

@ -231,6 +231,9 @@ object WorkerSource {
val IS_DECOMMISSIONING_WORKER = "IsDecommissioningWorker"
val UNRELEASED_SHUFFLE_COUNT = "UnreleasedShuffleCount"
// graceful
val UNRELEASED_PARTITION_LOCATION_COUNT = "UnreleasedPartitionLocationCount"
// clean
val CLEAN_TASK_QUEUE_SIZE = "CleanTaskQueueSize"
val CLEAN_EXPIRED_SHUFFLE_KEYS_TIME = "CleanExpiredShuffleKeysTime"