From 961144fdbd3231e173d8ef3ec2e78fa6db1eda78 Mon Sep 17 00:00:00 2001 From: Sanskar Modi Date: Tue, 8 Oct 2024 17:02:25 +0800 Subject: [PATCH] [CELEBORN-1582] Publish metric for unreleased shuffle count when worker was decommissioned MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? Adding a worker metrics for publish unreleased shuffle count when worker was decommissioned. Screenshot 2024-09-16 at 11 12 33 AM ### Why are the changes needed? Currently celeborn don't publish the count of unreleased shuffle key which gets lost when a worker is decommissioned. This can be useful for monitoring and configuring the `forceExitTimeout`. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? NA Closes #2711 from s0nskar/unrelease_shuffle_metric. Authored-by: Sanskar Modi Signed-off-by: mingji --- assets/grafana/celeborn-dashboard.json | 91 +++++++++++++++++++ docs/monitoring.md | 1 + .../service/deploy/worker/Worker.scala | 15 ++- .../service/deploy/worker/WorkerSource.scala | 1 + 4 files changed, 106 insertions(+), 2 deletions(-) diff --git a/assets/grafana/celeborn-dashboard.json b/assets/grafana/celeborn-dashboard.json index 9b07db6c6..553bd44b0 100644 --- a/assets/grafana/celeborn-dashboard.json +++ b/assets/grafana/celeborn-dashboard.json @@ -2653,6 +2653,97 @@ ], "title": "metrics_FlushWorkingQueueSize_Value", "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 54 + }, + "id": 195, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "builder", + "expr": "metrics_UnreleasedShuffleCount_Value", + "instant": false, + "range": true, + "refId": "A" + } + ], + "title": "metrics_UnreleasedShuffleCount_Value", + "type": "timeseries" } ], "title": "Worker", diff --git a/docs/monitoring.md b/docs/monitoring.md index 5568b8f76..e097abdab 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -233,6 +233,7 @@ These metrics are exposed by Celeborn worker. | UserProduceSpeed | The speed of user production for congestion control. | | WorkerConsumeSpeed | The speed of worker consumption for congestion control. | | IsDecommissioningWorker | 1 means worker decommissioning, 0 means not decommissioning. | + | UnreleasedShuffleCount | Unreleased shuffle count when worker is decommissioning. | | MemoryStorageFileCount | The count of files in Memory Storage of a worker. | | MemoryFileStorageSize | The total amount of memory used by Memory Storage. | | EvictedFileCount | The count of files evicted from Memory Storage to Disk | diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 52c1f4b9a..e3cbd8642 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -435,6 +435,15 @@ private[celeborn] class Worker( 0 } } + // Unreleased shuffle count when worker is decommissioning + workerSource.addGauge(WorkerSource.UNRELEASED_SHUFFLE_COUNT) { () => + if (shutdown.get() && (workerStatusManager.currentWorkerStatus.getState == State.InDecommission || + workerStatusManager.currentWorkerStatus.getState == State.InDecommissionThenIdle)) { + storageManager.shuffleKeySet().size + } else { + 0 + } + } workerSource.addGauge(WorkerSource.CLEAN_TASK_QUEUE_SIZE) { () => cleanTaskQueue.size() } @@ -952,11 +961,13 @@ private[celeborn] class Worker( Thread.sleep(interval) waitTimes += 1 } - if (storageManager.shuffleKeySet().isEmpty) { + + val unreleasedShuffleKeys = storageManager.shuffleKeySet() + if (unreleasedShuffleKeys.isEmpty) { logInfo(s"Waiting for all shuffle expired cost ${waitTime}ms.") } else { logWarning(s"Waiting for all shuffle expired cost ${waitTime}ms, " + - s"unreleased shuffle: \n${storageManager.shuffleKeySet().asScala.mkString("[", ", ", "]")}") + s"unreleased shuffle: \n${unreleasedShuffleKeys.asScala.mkString("[", ", ", "]")}") } workerStatusManager.transitionState(State.Exit) } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index b2777c564..15096fadc 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -222,6 +222,7 @@ object WorkerSource { // decommission val IS_DECOMMISSIONING_WORKER = "IsDecommissioningWorker" + val UNRELEASED_SHUFFLE_COUNT = "UnreleasedShuffleCount" // clean val CLEAN_TASK_QUEUE_SIZE = "CleanTaskQueueSize"