[CELEBORN-1459] Introduce CleanTaskQueueSize and CleanExpiredShuffleKeysTime to record situation of cleaning up expired shuffle keys

### What changes were proposed in this pull request?

Introduce the `CleanTaskQueueSize` and `CleanExpiredShuffleKeysTime` metrics to record the progress of cleaning up expired shuffle keys.

### Why are the changes needed?

In the production environment, the task queue for cleaning up shuffle data of expired shuffle keys can build up a backlog. Introducing `CleanTaskQueueSize` and `CleanExpiredShuffleKeysTime` makes it possible to record the progress of cleaning up expired shuffle keys.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

[Celeborn Grafana Dashboard](https://stenicholas.grafana.net/public-dashboards/4b5a0b79a35e4ddbb18ddccfe2ec06d7)

Closes #2557 from SteNicholas/CELEBORN-1459.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
This commit is contained in:
SteNicholas 2024-06-18 16:31:57 +08:00 committed by mingji
parent 565e39ac1b
commit c7b1b8d61e
No known key found for this signature in database
GPG Key ID: 6392F71F37356FA0
5 changed files with 303 additions and 24 deletions

View File

@ -143,6 +143,8 @@ Here is an example of Grafana dashboard importing.
| ReadBufferAllocatedCount | worker | This value means count of allocated read buffer. |
| ActiveCreditStreamCount | worker | This value means active count of stream for map partition reading streams. |
| ActiveMapPartitionCount | worker | This value means count of active map partition reading streams. |
| CleanTaskQueueSize | worker | This value means count of tasks for cleaning up expired shuffle keys. |
| CleanExpiredShuffleKeysTime | worker | This value means the time for a worker to clean up shuffle data of expired shuffle keys. |
| DeviceOSFreeBytes | worker | This value means actual usable space of OS for device monitor. |
| DeviceOSTotalBytes | worker | This value means total usable space of OS for device monitor. |
| PotentialConsumeSpeed | worker | This value means speed of potential consumption for congestion control. |

View File

@ -2536,6 +2536,270 @@
],
"title": "metrics_ActiveSlotsCount_Value",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 54
},
"id": 49,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "metrics_CleanTaskQueueSize_Value",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
"title": "metrics_CleanTaskQueueSize_Value",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "ms"
},
"overrides": []
},
"gridPos": {
"h": 9,
"w": 12,
"x": 0,
"y": 62
},
"id": 51,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "metrics_CleanExpiredShuffleKeysTime_Mean",
"legendFormat": "${baseLegend}",
"refId": "A"
}
],
"title": "metrics_CleanExpiredShuffleKeysTime_Mean",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "ms"
},
"overrides": []
},
"gridPos": {
"h": 9,
"w": 12,
"x": 12,
"y": 62
},
"id": 52,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "metrics_CleanExpiredShuffleKeysTime_Max",
"legendFormat": "${baseLegend}",
"refId": "A"
}
],
"title": "metrics_CleanExpiredShuffleKeysTime_Max",
"type": "timeseries"
}
],
"title": "Worker",
@ -2547,7 +2811,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 43
"y": 71
},
"id": 134,
"panels": [
@ -2612,7 +2876,7 @@
"h": 9,
"w": 12,
"x": 0,
"y": 137
"y": 72
},
"id": 68,
"options": {
@ -2702,7 +2966,7 @@
"h": 9,
"w": 12,
"x": 12,
"y": 137
"y": 72
},
"id": 70,
"options": {
@ -2792,7 +3056,7 @@
"h": 9,
"w": 12,
"x": 0,
"y": 146
"y": 81
},
"id": 72,
"options": {
@ -2882,7 +3146,7 @@
"h": 9,
"w": 12,
"x": 12,
"y": 146
"y": 81
},
"id": 74,
"options": {
@ -2971,7 +3235,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 155
"y": 90
},
"id": 83,
"options": {
@ -3062,7 +3326,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 155
"y": 90
},
"id": 76,
"options": {
@ -3153,7 +3417,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 163
"y": 98
},
"id": 128,
"options": {
@ -3244,7 +3508,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 163
"y": 98
},
"id": 129,
"options": {
@ -3335,7 +3599,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 171
"y": 106
},
"id": 130,
"options": {
@ -3426,7 +3690,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 171
"y": 106
},
"id": 132,
"options": {
@ -3608,7 +3872,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 179
"y": 114
},
"id": 133,
"options": {
@ -3699,7 +3963,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 158
"y": 122
},
"id": 79,
"options": {
@ -3740,7 +4004,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 44
"y": 72
},
"id": 12,
"panels": [
@ -4569,7 +4833,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 45
"y": 73
},
"id": 10,
"panels": [
@ -5123,7 +5387,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 46
"y": 74
},
"id": 8,
"panels": [
@ -6522,7 +6786,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 47
"y": 75
},
"id": 50,
"panels": [
@ -7078,7 +7342,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 48
"y": 76
},
"id": 157,
"panels": [
@ -7371,7 +7635,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 49
"y": 77
},
"id": 137,
"panels": [
@ -8762,7 +9026,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 50
"y": 78
},
"id": 110,
"panels": [
@ -8958,7 +9222,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 51
"y": 79
},
"id": 123,
"panels": [
@ -9436,7 +9700,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 52
"y": 80
},
"id": 172,
"panels": [

View File

@ -229,6 +229,9 @@ These metrics are exposed by Celeborn worker.
- ActiveCreditStreamCount
- Active stream count for map partition reading streams.
- ActiveMapPartitionCount
- CleanTaskQueueSize
- CleanExpiredShuffleKeysTime
- The time for a worker to clean up shuffle data of expired shuffle keys.
- DeviceOSFreeBytes
- DeviceOSTotalBytes
- DeviceCelebornFreeBytes

View File

@ -20,7 +20,7 @@ package org.apache.celeborn.service.deploy.worker
import java.io.File
import java.lang.{Long => JLong}
import java.util
import java.util.{HashMap => JHashMap, HashSet => JHashSet, Locale, Map => JMap}
import java.util.{HashMap => JHashMap, HashSet => JHashSet, Locale, Map => JMap, UUID}
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray}
@ -728,7 +728,11 @@ private[celeborn] class Worker(
threadPool.execute(new Runnable {
override def run(): Unit = {
removeAppActiveConnection(expiredApplicationIds)
storageManager.cleanupExpiredShuffleKey(expiredShuffleKeys)
workerSource.sample(
WorkerSource.CLEAN_EXPIRED_SHUFFLE_KEYS_TIME,
s"cleanExpiredShuffleKeys-${UUID.randomUUID()}") {
storageManager.cleanupExpiredShuffleKey(expiredShuffleKeys)
}
}
})
}

View File

@ -78,6 +78,8 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, MetricsSyste
addTimer(TAKE_BUFFER_TIME)
addTimer(SORT_TIME)
addTimer(CLEAN_EXPIRED_SHUFFLE_KEYS_TIME)
def getCounterCount(metricsName: String): Long = {
val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, Map.empty)
namedCounters.get(metricNameWithLabel).counter.getCount
@ -211,4 +213,8 @@ object WorkerSource {
// decommission
val IS_DECOMMISSIONING_WORKER = "IsDecommissioningWorker"
// clean
val CLEAN_TASK_QUEUE_SIZE = "CleanTaskQueueSize"
val CLEAN_EXPIRED_SHUFFLE_KEYS_TIME = "CleanExpiredShuffleKeysTime"
}