diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala index 9b4d08f8c..66e0ecf11 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala @@ -265,7 +265,14 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf) override def checkRegistered: Boolean = registered.get + /** Invoked when the client's channel becomes active; counts it in ACTIVE_CONNECTION_COUNT. */ + override def channelActive(client: TransportClient): Unit = { + workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT) + super.channelActive(client) + } + override def channelInactive(client: TransportClient): Unit = { + workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT, -1) creditStreamManager.connectionTerminated(client.getChannel) logDebug(s"channel inactive ${client.getSocketAddress}") } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index f93d46d60..97deb7f6b 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -1100,6 +1100,23 @@ class PushDataHandler extends BaseMessageHandler with Logging { pushClientFactory.createClient(host, port, partitionId) } } + + /** + * Invoked when the client's channel becomes active; counts it in ACTIVE_CONNECTION_COUNT. + */ + override def channelActive(client: TransportClient): Unit = { + workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT) + super.channelActive(client) + } + + /** + * Invoked when the client's channel goes inactive; applies -1 to ACTIVE_CONNECTION_COUNT. + * No further requests will come from this client. 
+ */ + override def channelInactive(client: TransportClient): Unit = { + workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT, -1) + super.channelInactive(client) + } } object PushDataHandler { diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index 87c755517..edcebbb2e 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -36,6 +36,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, MetricsSyste addCounter(PUSH_DATA_HANDSHAKE_FAIL_COUNT) addCounter(REGION_START_FAIL_COUNT) addCounter(REGION_FINISH_FAIL_COUNT) + addCounter(ACTIVE_CONNECTION_COUNT) // add Timers addTimer(COMMIT_FILES_TIME) @@ -94,6 +95,8 @@ object WorkerSource { // slots val SLOTS_ALLOCATED = "SlotsAllocated" + val ACTIVE_CONNECTION_COUNT = "ActiveConnectionCount" + // memory val NETTY_MEMORY = "NettyMemory" val SORT_TIME = "SortTime"