From bb8401e4014bca1e6e6caaab393d8cc736bce33f Mon Sep 17 00:00:00 2001 From: Ethan Feng Date: Mon, 13 Mar 2023 11:15:00 +0800 Subject: [PATCH] [CELEBORN-403][FLINK] Add metrics about buffer dispatcher request queue length. (#1329) --- .../network/server/memory/MemoryManager.java | 4 ++++ .../server/memory/ReadBufferDispatcher.java | 4 ++++ .../service/deploy/worker/Worker.scala | 21 +++++++++++-------- .../service/deploy/worker/WorkerSource.scala | 1 + 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/memory/MemoryManager.java b/common/src/main/java/org/apache/celeborn/common/network/server/memory/MemoryManager.java index dd0273dc4..3bb9cdaef 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/server/memory/MemoryManager.java +++ b/common/src/main/java/org/apache/celeborn/common/network/server/memory/MemoryManager.java @@ -370,6 +370,10 @@ public class MemoryManager { return pausePushDataAndReplicateCounter.sum(); } + public int dispatchRequestsLength() { + return readBufferDispatcher.requestsLength(); + } + enum MemoryManagerStat { resumeAll, pausePushDataAndReplicate, diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java b/common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java index e3a198755..37f834640 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java +++ b/common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java @@ -99,4 +99,8 @@ public class ReadBufferDispatcher extends Thread { } } } + + public int requestsLength() { + return requests.size(); + } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 8d375d2f0..d34f4762b 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -90,7 +90,7 @@ private[celeborn] class Worker( val storageManager = new StorageManager(conf, workerSource) - val memoryTracker = MemoryManager.initialize( + val memoryManager = MemoryManager.initialize( conf.workerDirectMemoryRatioToPauseReceive, conf.workerDirectMemoryRatioToPauseReplicate, conf.workerDirectMemoryRatioToResume, @@ -99,9 +99,9 @@ private[celeborn] class Worker( conf.workerDirectMemoryRatioForShuffleStorage, conf.workerDirectMemoryPressureCheckIntervalMs, conf.workerDirectMemoryReportIntervalSecond) - memoryTracker.registerMemoryListener(storageManager) + memoryManager.registerMemoryListener(storageManager) - val partitionsSorter = new PartitionFilesSorter(memoryTracker, conf, workerSource) + val partitionsSorter = new PartitionFilesSorter(memoryManager, conf, workerSource) if (conf.workerCongestionControlEnabled) { if (conf.workerCongestionControlLowWatermark.isEmpty || conf.workerCongestionControlHighWatermark.isEmpty) { @@ -228,19 +228,22 @@ private[celeborn] class Worker( WorkerSource.RegisteredShuffleCount, _ => workerInfo.getShuffleKeySet.size()) workerSource.addGauge(WorkerSource.SlotsAllocated, _ => workerInfo.allocationsInLastHour()) - workerSource.addGauge(WorkerSource.SortMemory, _ => memoryTracker.getSortMemoryCounter.get()) + workerSource.addGauge(WorkerSource.SortMemory, _ => memoryManager.getSortMemoryCounter.get()) workerSource.addGauge(WorkerSource.SortingFiles, _ => partitionsSorter.getSortingCount) workerSource.addGauge(WorkerSource.SortedFiles, _ => partitionsSorter.getSortedCount) workerSource.addGauge(WorkerSource.SortedFileSize, _ => partitionsSorter.getSortedSize) - workerSource.addGauge(WorkerSource.DiskBuffer, _ => memoryTracker.getDiskBufferCounter.get()) - workerSource.addGauge(WorkerSource.NettyMemory, _ => memoryTracker.getNettyMemoryCounter.get()) - workerSource.addGauge(WorkerSource.PausePushDataCount, _ => memoryTracker.getPausePushDataCounter) + workerSource.addGauge(WorkerSource.DiskBuffer, _ => memoryManager.getDiskBufferCounter.get()) + workerSource.addGauge(WorkerSource.NettyMemory, _ => memoryManager.getNettyMemoryCounter.get()) + workerSource.addGauge(WorkerSource.PausePushDataCount, _ => memoryManager.getPausePushDataCounter) workerSource.addGauge( WorkerSource.PausePushDataAndReplicateCount, - _ => memoryTracker.getPausePushDataAndReplicateCounter) + _ => memoryManager.getPausePushDataAndReplicateCounter) workerSource.addGauge( WorkerSource.BufferStreamReadBuffer, - _ => memoryTracker.getReadBufferCounter.get()) + _ => memoryManager.getReadBufferCounter.get()) + workerSource.addGauge( + WorkerSource.readBufferDispatcherRequestsLength, + _ => memoryManager.dispatchRequestsLength) private def heartBeatToMaster(): Unit = { val activeShuffleKeys = new JHashSet[String]() diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index d3c83f780..ff8e4d570 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -99,6 +99,7 @@ object WorkerSource { val PausePushDataCount = "PausePushData" val PausePushDataAndReplicateCount = "PausePushDataAndReplicate" val BufferStreamReadBuffer = "BufferStreamReadBuffer" + val readBufferDispatcherRequestsLength = "ReadBufferDispatcherRequestsLength" // local device val DeviceOSFreeCapacity = "DeviceOSFreeCapacity(B)"