[CELEBORN-403][FLINK] Add metrics about buffer dispatcher request queue length. (#1329)

This commit is contained in:
Ethan Feng 2023-03-13 11:15:00 +08:00 committed by GitHub
parent c6eced69fc
commit bb8401e401
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 21 additions and 9 deletions

View File

@ -370,6 +370,10 @@ public class MemoryManager {
return pausePushDataAndReplicateCounter.sum();
}
public int dispatchRequestsLength() {
return readBufferDispatcher.requestsLength();
}
enum MemoryManagerStat {
resumeAll,
pausePushDataAndReplicate,

View File

@ -99,4 +99,8 @@ public class ReadBufferDispatcher extends Thread {
}
}
}
public int requestsLength() {
return requests.size();
}
}

View File

@ -90,7 +90,7 @@ private[celeborn] class Worker(
val storageManager = new StorageManager(conf, workerSource)
val memoryTracker = MemoryManager.initialize(
val memoryManager = MemoryManager.initialize(
conf.workerDirectMemoryRatioToPauseReceive,
conf.workerDirectMemoryRatioToPauseReplicate,
conf.workerDirectMemoryRatioToResume,
@ -99,9 +99,9 @@ private[celeborn] class Worker(
conf.workerDirectMemoryRatioForShuffleStorage,
conf.workerDirectMemoryPressureCheckIntervalMs,
conf.workerDirectMemoryReportIntervalSecond)
memoryTracker.registerMemoryListener(storageManager)
memoryManager.registerMemoryListener(storageManager)
val partitionsSorter = new PartitionFilesSorter(memoryTracker, conf, workerSource)
val partitionsSorter = new PartitionFilesSorter(memoryManager, conf, workerSource)
if (conf.workerCongestionControlEnabled) {
if (conf.workerCongestionControlLowWatermark.isEmpty || conf.workerCongestionControlHighWatermark.isEmpty) {
@ -228,19 +228,22 @@ private[celeborn] class Worker(
WorkerSource.RegisteredShuffleCount,
_ => workerInfo.getShuffleKeySet.size())
workerSource.addGauge(WorkerSource.SlotsAllocated, _ => workerInfo.allocationsInLastHour())
workerSource.addGauge(WorkerSource.SortMemory, _ => memoryTracker.getSortMemoryCounter.get())
workerSource.addGauge(WorkerSource.SortMemory, _ => memoryManager.getSortMemoryCounter.get())
workerSource.addGauge(WorkerSource.SortingFiles, _ => partitionsSorter.getSortingCount)
workerSource.addGauge(WorkerSource.SortedFiles, _ => partitionsSorter.getSortedCount)
workerSource.addGauge(WorkerSource.SortedFileSize, _ => partitionsSorter.getSortedSize)
workerSource.addGauge(WorkerSource.DiskBuffer, _ => memoryTracker.getDiskBufferCounter.get())
workerSource.addGauge(WorkerSource.NettyMemory, _ => memoryTracker.getNettyMemoryCounter.get())
workerSource.addGauge(WorkerSource.PausePushDataCount, _ => memoryTracker.getPausePushDataCounter)
workerSource.addGauge(WorkerSource.DiskBuffer, _ => memoryManager.getDiskBufferCounter.get())
workerSource.addGauge(WorkerSource.NettyMemory, _ => memoryManager.getNettyMemoryCounter.get())
workerSource.addGauge(WorkerSource.PausePushDataCount, _ => memoryManager.getPausePushDataCounter)
workerSource.addGauge(
WorkerSource.PausePushDataAndReplicateCount,
_ => memoryTracker.getPausePushDataAndReplicateCounter)
_ => memoryManager.getPausePushDataAndReplicateCounter)
workerSource.addGauge(
WorkerSource.BufferStreamReadBuffer,
_ => memoryTracker.getReadBufferCounter.get())
_ => memoryManager.getReadBufferCounter.get())
workerSource.addGauge(
WorkerSource.readBufferDispatcherRequestsLength,
_ => memoryManager.dispatchRequestsLength)
private def heartBeatToMaster(): Unit = {
val activeShuffleKeys = new JHashSet[String]()

View File

@ -99,6 +99,7 @@ object WorkerSource {
val PausePushDataCount = "PausePushData"
val PausePushDataAndReplicateCount = "PausePushDataAndReplicate"
val BufferStreamReadBuffer = "BufferStreamReadBuffer"
val readBufferDispatcherRequestsLength = "ReadBufferDispatcherRequestsLength"
// local device
val DeviceOSFreeCapacity = "DeviceOSFreeCapacity(B)"