diff --git a/assets/grafana/celeborn-dashboard.json b/assets/grafana/celeborn-dashboard.json index 9b07db6c6..553bd44b0 100644 --- a/assets/grafana/celeborn-dashboard.json +++ b/assets/grafana/celeborn-dashboard.json @@ -2653,6 +2653,97 @@ ], "title": "metrics_FlushWorkingQueueSize_Value", "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 54 + }, + "id": 195, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "builder", + "expr": "metrics_UnreleasedShuffleCount_Value", + "instant": false, + "range": true, + "refId": "A" + } + ], + "title": "metrics_UnreleasedShuffleCount_Value", + "type": "timeseries" } ], "title": "Worker", diff --git a/docs/monitoring.md b/docs/monitoring.md index 5568b8f76..e097abdab 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -233,6 +233,7 @@ These metrics are exposed by Celeborn worker. 
| UserProduceSpeed | The speed of user production for congestion control. | | WorkerConsumeSpeed | The speed of worker consumption for congestion control. | | IsDecommissioningWorker | 1 means worker decommissioning, 0 means not decommissioning. | + | UnreleasedShuffleCount | The count of unreleased shuffles while the worker is decommissioning, 0 otherwise. | | MemoryStorageFileCount | The count of files in Memory Storage of a worker. | | MemoryFileStorageSize | The total amount of memory used by Memory Storage. | | EvictedFileCount | The count of files evicted from Memory Storage to Disk | diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 52c1f4b9a..e3cbd8642 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -435,6 +435,15 @@ private[celeborn] class Worker( 0 } } + // Unreleased shuffle count when worker is decommissioning + workerSource.addGauge(WorkerSource.UNRELEASED_SHUFFLE_COUNT) { () => + if (shutdown.get() && (workerStatusManager.currentWorkerStatus.getState == State.InDecommission || + workerStatusManager.currentWorkerStatus.getState == State.InDecommissionThenIdle)) { + storageManager.shuffleKeySet().size + } else { + 0 + } + } workerSource.addGauge(WorkerSource.CLEAN_TASK_QUEUE_SIZE) { () => cleanTaskQueue.size() } @@ -952,11 +961,13 @@ private[celeborn] class Worker( Thread.sleep(interval) waitTimes += 1 } - if (storageManager.shuffleKeySet().isEmpty) { + + val unreleasedShuffleKeys = storageManager.shuffleKeySet() + if (unreleasedShuffleKeys.isEmpty) { logInfo(s"Waiting for all shuffle expired cost ${waitTime}ms.") } else { logWarning(s"Waiting for all shuffle expired cost ${waitTime}ms, " + - s"unreleased shuffle: \n${storageManager.shuffleKeySet().asScala.mkString("[", ", ", "]")}") + s"unreleased shuffle: 
\n${unreleasedShuffleKeys.asScala.mkString("[", ", ", "]")}") } workerStatusManager.transitionState(State.Exit) } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index b2777c564..15096fadc 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -222,6 +222,7 @@ object WorkerSource { // decommission val IS_DECOMMISSIONING_WORKER = "IsDecommissioningWorker" + val UNRELEASED_SHUFFLE_COUNT = "UnreleasedShuffleCount" // clean val CLEAN_TASK_QUEUE_SIZE = "CleanTaskQueueSize"