diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/memory/MemoryManager.java b/common/src/main/java/org/apache/celeborn/common/network/server/memory/MemoryManager.java index 2dd8c04c1..dd0273dc4 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/server/memory/MemoryManager.java +++ b/common/src/main/java/org/apache/celeborn/common/network/server/memory/MemoryManager.java @@ -67,7 +67,7 @@ public class MemoryManager { private boolean underPressure; private final AtomicBoolean trimInProcess = new AtomicBoolean(false); - // For read buffer + // For buffer stream private final AtomicLong readBufferCounter = new AtomicLong(0); private long readBufferThreshold = 0; private final ReadBufferDispatcher readBufferDispatcher; @@ -342,6 +342,10 @@ public class MemoryManager { return diskBufferCounter; } + public AtomicLong getReadBufferCounter() { + return readBufferCounter; + } + public long getPausePushDataCounter() { return pausePushDataCounter.sum(); } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 800cec31d..7617d1c6d 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -239,6 +239,9 @@ private[celeborn] class Worker( workerSource.addGauge( WorkerSource.PausePushDataAndReplicateCount, _ => memoryTracker.getPausePushDataAndReplicateCounter) + workerSource.addGauge( + WorkerSource.BufferStreamReadBuffer, + _ => memoryTracker.getReadBufferCounter.get()) private def heartBeatToMaster(): Unit = { val activeShuffleKeys = new JHashSet[String]() diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index df6625429..ea9b293d7 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -91,6 +91,7 @@ object WorkerSource { val DiskBuffer = "DiskBuffer" val PausePushDataCount = "PausePushData" val PausePushDataAndReplicateCount = "PausePushDataAndReplicate" + val BufferStreamReadBuffer = "BufferStreamReadBuffer" // local device val DeviceOSFreeCapacity = "DeviceOSFreeCapacity(B)"