celeborn/client-flink/flink-1.16
SteNicholas 68a1db1e3b [CELEBORN-2005][FOLLOWUP] Introduce ShuffleMetricGroup for numBytesIn, numBytesOut, numRecordsOut, numBytesInPerSecond, numBytesOutPerSecond, numRecordsOutPerSecond metrics
### What changes were proposed in this pull request?

Introduce `ShuffleMetricGroup` for `numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`, `numBytesOutPerSecond`, `numRecordsOutPerSecond` metrics.

Follow up #3272.

### Why are the changes needed?

The `numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`, `numBytesOutPerSecond`, and `numRecordsOutPerSecond` metrics should carry the shuffle id in their variables, which `ShuffleMetricGroup` is introduced to support. In addition, #3272 prints many identical warnings like the following, which should be avoided:

```
2025-05-28 10:48:54,433 WARN  [flink-akka.actor.default-dispatcher-18] org.apache.flink.metrics.MetricGroup                         [] - Name collision: Group already contains a Metric with the name 'numRecordsOut'. Metric will not be reported.[11.66.62.202, taskmanager, antc4flink3980005426-taskmanager-3-70, antc4flink3980005426, [vertex-2]HashJoin(joinType=[LeftOuterJoin], where=[(f0 = f00)], select=[f0, f1, f2, f3, f4, f5, f6, f00, f10, f20, f30, f40, f50, f60], build=[right]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[f0, f1, f2, f3, f4, f5, f6, f00, f10, f20, f30, f40, f50, f60]), 2, Shuffle, Remote, 1]
```

### Does this PR introduce _any_ user-facing change?

Introduce `celeborn.client.flink.metrics.scope.shuffle` config option to define the scope format string that is applied to all metrics scoped to a shuffle:

- Variables:
   - Shuffle: `<task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>, <shuffle_id>`.
- Metrics:

Scope | Metric | Description | Type
-- | -- | -- | --
Shuffle | numBytesIn | The total number of bytes this shuffle has read. | Counter
Shuffle | numBytesOut | The total number of bytes this shuffle has written. | Counter
Shuffle | numRecordsOut | The total number of records this shuffle has written. | Counter
Shuffle | numBytesInPerSecond | The number of bytes this shuffle reads per second. | Meter
Shuffle | numBytesOutPerSecond | The number of bytes this shuffle writes per second. | Meter
Shuffle | numRecordsOutPerSecond | The number of records this shuffle writes per second. | Meter
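
To illustrate why putting `<shuffle_id>` into the scope avoids the name collision shown above, here is a minimal, dependency-free sketch of scope-format substitution. It is not the `ShuffleMetricGroup` implementation: the class `ScopeFormatSketch`, its methods, and the format string are hypothetical, and the format string is not claimed to be the default of `celeborn.client.flink.metrics.scope.shuffle`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not part of Celeborn or Flink.
public class ScopeFormatSketch {

    // Replaces each "<name>" token in the scope format with its value.
    static String format(String scopeFormat, Map<String, String> variables) {
        String result = scopeFormat;
        for (Map.Entry<String, String> e : variables.entrySet()) {
            result = result.replace("<" + e.getKey() + ">", e.getValue());
        }
        return result;
    }

    // Builds a subset of the shuffle-scope variables listed above.
    static Map<String, String> shuffleVariables(String taskName, int subtaskIndex, int shuffleId) {
        Map<String, String> vars = new LinkedHashMap<>();
        vars.put("task_name", taskName);
        vars.put("subtask_index", String.valueOf(subtaskIndex));
        vars.put("shuffle_id", String.valueOf(shuffleId));
        return vars;
    }

    public static void main(String[] args) {
        // Assumed format string, chosen for the example.
        String scope = "<task_name>.<subtask_index>.shuffle.<shuffle_id>";

        // Two shuffles of the same subtask resolve to different identifiers,
        // so both can register a metric named numRecordsOut.
        System.out.println(format(scope, shuffleVariables("HashJoin", 2, 0)) + ".numRecordsOut");
        System.out.println(format(scope, shuffleVariables("HashJoin", 2, 1)) + ".numRecordsOut");
    }
}
```

Without the `<shuffle_id>` token, both calls would resolve to the same identifier, which corresponds to the situation the warning reports.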

### How was this patch tested?

Manual test.

![image](https://github.com/user-attachments/assets/a10c18ab-84f9-44f5-bb2d-e6b08e5bc64e)
![image](https://github.com/user-attachments/assets/0cb29c17-3388-4608-b7a4-ee7e3c9b43c1)

Closes #3296 from SteNicholas/CELEBORN-2005.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Weijie Guo <reswqa@163.com>
2025-05-30 14:54:28 +08:00
src [CELEBORN-2005][FOLLOWUP] Introduce ShuffleMetricGroup for numBytesIn, numBytesOut, numRecordsOut, numBytesInPerSecond, numBytesOutPerSecond, numRecordsOutPerSecond metrics 2025-05-30 14:54:28 +08:00
pom.xml [CELEBORN-1504] Support for Apache Flink 1.16 2024-07-15 10:44:16 +08:00