[CELEBORN-281] Add metrics about buffer stream read buffer. (#1216)
This commit is contained in:
parent
89b4eab3b6
commit
1dcfdb0c8f
@ -67,7 +67,7 @@ public class MemoryManager {
|
||||
private boolean underPressure;
|
||||
private final AtomicBoolean trimInProcess = new AtomicBoolean(false);
|
||||
|
||||
// For read buffer
|
||||
// For buffer stream
|
||||
private final AtomicLong readBufferCounter = new AtomicLong(0);
|
||||
private long readBufferThreshold = 0;
|
||||
private final ReadBufferDispatcher readBufferDispatcher;
|
||||
@ -342,6 +342,10 @@ public class MemoryManager {
|
||||
return diskBufferCounter;
|
||||
}
|
||||
|
||||
public AtomicLong getReadBufferCounter() {
|
||||
return readBufferCounter;
|
||||
}
|
||||
|
||||
public long getPausePushDataCounter() {
|
||||
return pausePushDataCounter.sum();
|
||||
}
|
||||
|
||||
@ -239,6 +239,9 @@ private[celeborn] class Worker(
|
||||
workerSource.addGauge(
|
||||
WorkerSource.PausePushDataAndReplicateCount,
|
||||
_ => memoryTracker.getPausePushDataAndReplicateCounter)
|
||||
workerSource.addGauge(
|
||||
WorkerSource.BufferStreamReadBuffer,
|
||||
_ => memoryTracker.getReadBufferCounter.get())
|
||||
|
||||
private def heartBeatToMaster(): Unit = {
|
||||
val activeShuffleKeys = new JHashSet[String]()
|
||||
|
||||
@ -91,6 +91,7 @@ object WorkerSource {
|
||||
val DiskBuffer = "DiskBuffer"
|
||||
val PausePushDataCount = "PausePushData"
|
||||
val PausePushDataAndReplicateCount = "PausePushDataAndReplicate"
|
||||
val BufferStreamReadBuffer = "BufferStreamReadBuffer"
|
||||
|
||||
// local device
|
||||
val DeviceOSFreeCapacity = "DeviceOSFreeCapacity(B)"
|
||||
|
||||
Loading…
Reference in New Issue
Block a user