From 3e266c0cf6247245ab7bcd3e1abb49cf4ece0bca Mon Sep 17 00:00:00 2001 From: caojiaqing Date: Tue, 1 Aug 2023 21:21:13 +0800 Subject: [PATCH] =?UTF-8?q?[CELEBORN-852]=20Adding=20new=20metrics=20to=20?= =?UTF-8?q?record=20the=20number=20of=20registered=20=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? Adding new metrics to record the number of registered connections ### Why are the changes needed? Monitor the number of active connections on worker nodes ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? no Closes #1773 from JQ-Cao/852. Authored-by: caojiaqing Signed-off-by: zky.zhoukeyong --- .../service/deploy/worker/FetchHandler.scala | 7 +++++++ .../service/deploy/worker/PushDataHandler.scala | 17 +++++++++++++++++ .../service/deploy/worker/WorkerSource.scala | 3 +++ 3 files changed, 27 insertions(+) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala index 9b4d08f8c..66e0ecf11 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala @@ -265,7 +265,14 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf) override def checkRegistered: Boolean = registered.get + /** Invoked when the channel associated with the given client is active. */ + override def channelActive(client: TransportClient): Unit = { + workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT) + super.channelActive(client) + } + override def channelInactive(client: TransportClient): Unit = { + workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT, -1) creditStreamManager.connectionTerminated(client.getChannel) logDebug(s"channel inactive ${client.getSocketAddress}") } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index f93d46d60..97deb7f6b 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -1100,6 +1100,23 @@ class PushDataHandler extends BaseMessageHandler with Logging { pushClientFactory.createClient(host, port, partitionId) } } + + /** + * Invoked when the channel associated with the given client is active. + */ + override def channelActive(client: TransportClient): Unit = { + workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT) + super.channelActive(client) + } + + /** + * Invoked when the channel associated with the given client is inactive. + * No further requests will come from this client. + */ + override def channelInactive(client: TransportClient): Unit = { + workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT, -1) + super.channelInactive(client) + } } object PushDataHandler { diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index 87c755517..edcebbb2e 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -36,6 +36,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, MetricsSyste addCounter(PUSH_DATA_HANDSHAKE_FAIL_COUNT) addCounter(REGION_START_FAIL_COUNT) addCounter(REGION_FINISH_FAIL_COUNT) + addCounter(ACTIVE_CONNECTION_COUNT) // add Timers addTimer(COMMIT_FILES_TIME) @@ -94,6 +95,8 @@ object WorkerSource { // slots val SLOTS_ALLOCATED = "SlotsAllocated" + val ACTIVE_CONNECTION_COUNT = "ActiveConnectionCount" + // memory val NETTY_MEMORY = "NettyMemory" val SORT_TIME = "SortTime"