From aa8ff3c17ca99cfca5603a93d7190cfc3d28cfa4 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Tue, 4 Jun 2024 10:48:52 +0800 Subject: [PATCH] [CELEBORN-1182][FOLLOWUP] Fix WorkerSource record application active connection for application dimension ActiveConnectionCount metric ### What changes were proposed in this pull request? Fix `WorkerSource` record application active connection for application dimension `ActiveConnectionCount` metric. Follow up #2167. ### Why are the changes needed? Application dimension `ActiveConnectionCount` metric does not have value because the check of recording application active connection is wrong for `WorkerSource`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? GA. ``` celebornceleborn-worker-1:/data/service/celeborn$ curl http://celeborn-worker-1:9096/metrics|grep application|grep ActiveConnectionCount % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 53673 0 53673 0 0 2710k 0 --:--:-- --:--:-- --:--:-- 2758k metrics_ActiveConnectionCount_Value{applicationId="application_1692685933461_14488489",hostName="celeborn-worker-1",role="Worker"} 68 1717418613619 metrics_ActiveConnectionCount_Value{applicationId="application_1692685933461_14488489",hostName="celeborn-worker-1",role="Worker"} 68 1717418613619 ``` Closes #2542 from SteNicholas/CELEBORN-1182. Authored-by: SteNicholas Signed-off-by: mingji --- .../apache/celeborn/service/deploy/worker/WorkerSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index 726f7acec..e34488bea 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -98,7 +98,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, MetricsSyste def recordAppActiveConnection(client: TransportClient, shuffleKey: String): Unit = { val applicationIds = appActiveConnections.get(client.getChannel.id().asLongText()) val applicationId = Utils.splitShuffleKey(shuffleKey)._1 - if (CollectionUtils.isNotEmpty(applicationIds) && !applicationIds.contains(applicationId)) { + if (applicationIds != null && !applicationIds.contains(applicationId)) { applicationIds.add(applicationId) addGauge(ACTIVE_CONNECTION_COUNT, Map(applicationLabel -> applicationId)) { () => appActiveConnections.asScala.count { case (_, applicationIds) =>