From 9ba54b39e2f55382fc51e03ea897353859957cef Mon Sep 17 00:00:00 2001 From: Sanskar Modi Date: Mon, 12 May 2025 04:34:44 -0700 Subject: [PATCH] [CELEBORN-1968] Publish metric for unreleased partition location count when worker was gracefully shutdown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? Add a worker metric that publishes the unreleased partition location count when the worker is gracefully shut down. Screenshot 2025-04-16 at 1 19 18 AM ### Why are the changes needed? Similar to https://github.com/apache/celeborn/pull/2711: currently Celeborn doesn't publish the count of unreleased partition locations when a worker exits gracefully. This can be useful for monitoring and configuring the gracefulShutdownTimeout. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? NA Closes #3213 from s0nskar/unrelease_partition_location. Lead-authored-by: Sanskar Modi Co-authored-by: Wang, Fei Signed-off-by: Wang, Fei --- assets/grafana/celeborn-dashboard.json | 96 +++++++++++++++++++ docs/monitoring.md | 1 + .../service/deploy/worker/Worker.scala | 9 ++ .../service/deploy/worker/WorkerSource.scala | 3 + 4 files changed, 109 insertions(+) diff --git a/assets/grafana/celeborn-dashboard.json b/assets/grafana/celeborn-dashboard.json index 2962deadf..8e98e6402 100644 --- a/assets/grafana/celeborn-dashboard.json +++ b/assets/grafana/celeborn-dashboard.json @@ -3236,6 +3236,102 @@ ], "title": "metrics_ PartitionFileSizeBytes_Mean", "type": "timeseries" + }, + { + "datasource": { + "default": false, + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, 
+ "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 70 + }, + "id": 238, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "builder", + "expr": "metrics_UnreleasedPartitionLocationCount_Value{role=\"Worker\", instance=~\"${instance}\"}", + "instant": false, + "range": true, + "refId": "A" + } + ], + "title": "metrics_UnreleasedPartitionLocationCount_Value", + "type": "timeseries" } ], "title": "Worker", diff --git a/docs/monitoring.md b/docs/monitoring.md index 377d00aa1..95cefcf92 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -240,6 +240,7 @@ These metrics are exposed by Celeborn worker. | WorkerConsumeSpeed | The speed of worker consumption for congestion control. | | IsDecommissioningWorker | 1 means worker decommissioning, 0 means not decommissioning. | | UnreleasedShuffleCount | Unreleased shuffle count when worker is decommissioning. | + | UnreleasedPartitionLocationCount | Unreleased partition location count when worker is shutting down. | | MemoryStorageFileCount | The count of files in Memory Storage of a worker. | | MemoryFileStorageSize | The total amount of memory used by Memory Storage. 
| | EvictedFileCount | The count of files evicted from Memory Storage to Disk | diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index edfb21d21..502740cd7 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -455,6 +455,15 @@ private[celeborn] class Worker( 0 } } + // Unreleased partition location count when worker is shutting down + workerSource.addGauge(WorkerSource.UNRELEASED_PARTITION_LOCATION_COUNT) { () => + if (shutdown.get()) { + partitionLocationInfo.primaryPartitionLocations.size() + + partitionLocationInfo.replicaPartitionLocations.size() + } else { + 0 + } + } workerSource.addGauge(WorkerSource.CLEAN_TASK_QUEUE_SIZE) { () => cleanTaskQueue.size() } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index b8e72dd80..c4a82225c 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -231,6 +231,9 @@ object WorkerSource { val IS_DECOMMISSIONING_WORKER = "IsDecommissioningWorker" val UNRELEASED_SHUFFLE_COUNT = "UnreleasedShuffleCount" + // graceful shutdown + val UNRELEASED_PARTITION_LOCATION_COUNT = "UnreleasedPartitionLocationCount" + // clean val CLEAN_TASK_QUEUE_SIZE = "CleanTaskQueueSize" val CLEAN_EXPIRED_SHUFFLE_KEYS_TIME = "CleanExpiredShuffleKeysTime"