[CELEBORN-2024] Publish commit files fail count metrics

<!--
Thanks for sending a pull request!  Here are some tips for you:
  - Make sure the PR title starts with a JIRA ticket, e.g. '[CELEBORN-XXXX] Your PR title ...'.
  - Be sure to keep the PR description updated to reflect all changes.
  - Please write your PR title to summarize what this PR proposes.
  - If possible, provide a concise example to reproduce the issue for a faster review.
-->

### What changes were proposed in this pull request?
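Added a `CommitFilesFailCount` worker metric that counts failed commit files requests, together with a Grafana dashboard panel and a documentation entry for it.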

### Why are the changes needed?
To monitor and tune the configurations around the commit files workflow.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested on a local setup (see the screenshot below).

<img width="739" alt="Screenshot 2025-06-04 at 10 51 06 AM" src="https://github.com/user-attachments/assets/d6256028-d8b7-4a81-90b1-3dcbf61adeba" />

Closes #3307 from s0nskar/commit_metric.

Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Sanskar Modi 2025-06-17 11:52:45 -07:00 committed by Wang, Fei
parent a0a4260013
commit 2a2c6e4687
4 changed files with 97 additions and 0 deletions

View File

@ -7002,6 +7002,95 @@
],
"title": "metrics_CommitFilesTime_Max",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 71
},
"id": 242,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "metrics_CommitFilesFailCount_Count{role=\"Worker\", instance=~\"${instance}\"}",
"legendFormat": "${baseLegend}",
"refId": "A"
}
],
"title": "metrics_CommitFilesFailCount_Count",
"type": "timeseries"
}
],
"title": "FlushDataRelatives",

View File

@ -214,6 +214,7 @@ These metrics are exposed by Celeborn worker.
| TakeBufferTime | The time for a worker to take out a buffer from a disk flusher. |
| FlushDataTime | The time for a worker to write a buffer which is 256KB by default to storage. |
| CommitFilesTime | The time for a worker to flush buffers and close files related to specified shuffle. |
| CommitFilesFailCount | The count of commit files requests that failed in the current worker. |
| SlotsAllocated | Slots allocated in last hour. |
| ActiveSlotsCount | The number of slots currently being used in a worker. |
| ReserveSlotsTime | ReserveSlots means acquire a disk buffer and record partition location. |

View File

@ -645,6 +645,8 @@ private[deploy] class Controller(
commitInfo.status = CommitInfo.COMMIT_FINISHED
}
workerSource.incCounter(WorkerSource.COMMIT_FILES_FAIL_COUNT)
} else {
// finish, cancel timeout job first.
timeout.cancel()
@ -784,6 +786,8 @@ private[deploy] class Controller(
commitInfo.response = replyResponse
context.reply(replyResponse)
epochIterator.remove()
workerSource.incCounter(WorkerSource.COMMIT_FILES_FAIL_COUNT)
}
}
}

View File

@ -61,6 +61,8 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, Role.WORKER)
addCounter(SLOTS_ALLOCATED)
addCounter(REGISTER_WITH_MASTER_FAIL_COUNT)
addCounter(COMMIT_FILES_FAIL_COUNT)
// add timers
addTimer(COMMIT_FILES_TIME)
addTimer(RESERVE_SLOTS_TIME)
@ -191,6 +193,7 @@ object WorkerSource {
val TAKE_BUFFER_TIME = "TakeBufferTime"
val FLUSH_DATA_TIME = "FlushDataTime"
val COMMIT_FILES_TIME = "CommitFilesTime"
val COMMIT_FILES_FAIL_COUNT = "CommitFilesFailCount"
val FLUSH_WORKING_QUEUE_SIZE = "FlushWorkingQueueSize"
// slots