[CELEBORN-852] Adding new metrics to record the number of registered …

### What changes were proposed in this pull request?
Adding new metrics to record the number of registered connections

### Why are the changes needed?
Monitor the number of active connections on worker nodes

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
no

Closes #1773 from JQ-Cao/852.

Authored-by: caojiaqing <caojiaqing@bilibili.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
This commit is contained in:
caojiaqing 2023-08-01 21:21:13 +08:00 committed by zky.zhoukeyong
parent 5e6a23fd88
commit 3e266c0cf6
3 changed files with 27 additions and 0 deletions

View File

@ -265,7 +265,14 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
override def checkRegistered: Boolean = registered.get
/** Invoked when the channel associated with the given client is active. */
override def channelActive(client: TransportClient): Unit = {
workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT)
super.channelActive(client)
}
override def channelInactive(client: TransportClient): Unit = {
workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT, -1)
creditStreamManager.connectionTerminated(client.getChannel)
logDebug(s"channel inactive ${client.getSocketAddress}")
}

View File

@ -1100,6 +1100,23 @@ class PushDataHandler extends BaseMessageHandler with Logging {
pushClientFactory.createClient(host, port, partitionId)
}
}
/**
* Invoked when the channel associated with the given client is active.
*/
override def channelActive(client: TransportClient): Unit = {
workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT)
super.channelActive(client)
}
/**
* Invoked when the channel associated with the given client is inactive.
* No further requests will come from this client.
*/
override def channelInactive(client: TransportClient): Unit = {
workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT, -1)
super.channelInactive(client)
}
}
object PushDataHandler {

View File

@ -36,6 +36,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, MetricsSyste
addCounter(PUSH_DATA_HANDSHAKE_FAIL_COUNT)
addCounter(REGION_START_FAIL_COUNT)
addCounter(REGION_FINISH_FAIL_COUNT)
addCounter(ACTIVE_CONNECTION_COUNT)
// add Timers
addTimer(COMMIT_FILES_TIME)
@ -94,6 +95,8 @@ object WorkerSource {
// slots
val SLOTS_ALLOCATED = "SlotsAllocated"
val ACTIVE_CONNECTION_COUNT = "ActiveConnectionCount"
// memory
val NETTY_MEMORY = "NettyMemory"
val SORT_TIME = "SortTime"