[CELEBORN-1182][FOLLOWUP] Fix WorkerSource record application active connection for application dimension ActiveConnectionCount metric

### What changes were proposed in this pull request?

Fix `WorkerSource` record application active connection for application dimension `ActiveConnectionCount` metric.

Follow up #2167.

### Why are the changes needed?

Application dimension `ActiveConnectionCount` metric does not have value because the check of recording application active connection is wrong for `WorkerSource`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

GA.

```
celebornceleborn-worker-1:/data/service/celeborn$ curl http://celeborn-worker-1:9096/metrics|grep application|grep ActiveConnectionCount
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 53673    0 53673    0     0  2710k      0 --:--:-- --:--:-- --:--:-- 2758k
metrics_ActiveConnectionCount_Value{applicationId="application_1692685933461_14488489",hostName="celeborn-worker-1",role="Worker"} 68 1717418613619
metrics_ActiveConnectionCount_Value{applicationId="application_1692685933461_14488489",hostName="celeborn-worker-1",role="Worker"} 68 1717418613619
```

Closes #2542 from SteNicholas/CELEBORN-1182.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
This commit is contained in:
SteNicholas 2024-06-04 10:48:52 +08:00 committed by mingji
parent dc2826a614
commit aa8ff3c17c
No known key found for this signature in database
GPG Key ID: 6392F71F37356FA0

View File

@ -98,7 +98,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, MetricsSyste
def recordAppActiveConnection(client: TransportClient, shuffleKey: String): Unit = {
val applicationIds = appActiveConnections.get(client.getChannel.id().asLongText())
val applicationId = Utils.splitShuffleKey(shuffleKey)._1
if (CollectionUtils.isNotEmpty(applicationIds) && !applicationIds.contains(applicationId)) {
if (applicationIds != null && !applicationIds.contains(applicationId)) {
applicationIds.add(applicationId)
addGauge(ACTIVE_CONNECTION_COUNT, Map(applicationLabel -> applicationId)) { () =>
appActiveConnections.asScala.count { case (_, applicationIds) =>