From 308eed28c92263d7f96cce073c0c9916da5cb1bf Mon Sep 17 00:00:00 2001 From: Shuang Date: Thu, 23 May 2024 16:06:11 +0800 Subject: [PATCH] [CELEBORN-1427] Add Capacity metrics for Celeborn ### What changes were proposed in this pull request? As title ### Why are the changes needed? The Celeborn cluster does not currently provide metrics for 'TotalCapacity' and 'TotalFreeCapacity ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA Closes #2521 from RexXiong/CELEBORN-1427. Authored-by: Shuang Signed-off-by: SteNicholas --- METRICS.md | 4 +- assets/grafana/celeborn-dashboard.json | 206 ++++++++++++++++-- common/src/main/proto/TransportMessages.proto | 1 + .../celeborn/common/meta/DeviceInfo.scala | 8 + .../celeborn/common/meta/WorkerInfo.scala | 9 + .../celeborn/common/util/PbSerDeUtils.scala | 2 + .../common/meta/WorkerInfoSuite.scala | 19 +- .../common/util/PbSerDeUtilsTest.scala | 5 + docs/monitoring.md | 2 + .../service/deploy/master/Master.scala | 9 + .../service/deploy/master/MasterSource.scala | 4 + .../deploy/worker/storage/DeviceMonitor.scala | 2 +- .../worker/storage/StorageManager.scala | 1 + 13 files changed, 250 insertions(+), 22 deletions(-) diff --git a/METRICS.md b/METRICS.md index ec40d638b..09532d088 100644 --- a/METRICS.md +++ b/METRICS.md @@ -84,6 +84,8 @@ Here is an example of Grafana dashboard importing. | diskBytesWritten | master and worker | The amount of disk files consumption by each user. | | hdfsFileCount | master and worker | The count of hdfs files consumption by each user. | | hdfsBytesWritten | master and worker | The amount of hdfs files consumption by each user. | +| DeviceCelebornFreeBytes | master and worker | This value means actual usable space of Celeborn for device. | +| DeviceCelebornTotalBytes | master and worker | This value means total space of Celeborn for device. | | WorkerCount | master | The count of active workers. | | LostWorkerCount | master | The count of workers in lost list. | | ExcludedWorkerCount | master | The count of workers in excluded list. | @@ -142,8 +144,6 @@ Here is an example of Grafana dashboard importing. | ActiveMapPartitionCount | worker | This value means count of active map partition reading streams. | | DeviceOSFreeBytes | worker | This value means actual usable space of OS for device monitor. | | DeviceOSTotalBytes | worker | This value means total usable space of OS for device monitor. | -| DeviceCelebornFreeBytes | worker | This value means actual usable space of Celeborn for device monitor. | -| DeviceCelebornTotalBytes | worker | This value means configured usable space of Celeborn for device monitor. | | PotentialConsumeSpeed | worker | This value means speed of potential consumption for congestion control. | | UserProduceSpeed | worker | This value means speed of user production for congestion control. | | WorkerConsumeSpeed | worker | This value means speed of worker consumption for congestion control. | diff --git a/assets/grafana/celeborn-dashboard.json b/assets/grafana/celeborn-dashboard.json index a8fe453dd..f7b4de9cf 100644 --- a/assets/grafana/celeborn-dashboard.json +++ b/assets/grafana/celeborn-dashboard.json @@ -249,6 +249,186 @@ "title": "metrics_RegisteredShuffleCount_Value", "type": "timeseries" }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "description": "Celeborn total device capacity.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 0, + "y": 10 + }, + "id": 185, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "metrics_DeviceCelebornTotalBytes_Value{role=\"Master\"}", + "legendFormat": "${baseLegend}", + "refId": "A" + } + ], + "title": "metrics_DeviceCelebornTotalBytes_Value", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "description": "Celeborn total device free capacity.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 12, + "y": 10 + }, + "id": 186, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "metrics_DeviceCelebornFreeBytes_Value{role=\"Master\"}", + "legendFormat": "${baseLegend}", + "refId": "A" + } + ], + "title": "metrics_DeviceCelebornFreeBytes_Value", + "type": "timeseries" + }, { "datasource": { "type": "prometheus", @@ -310,7 +490,7 @@ "h": 9, "w": 12, "x": 0, - "y": 10 + "y": 19 }, "id": 95, "options": { @@ -349,7 +529,7 @@ "h": 1, "w": 24, "x": 0, - "y": 19 + "y": 1 }, "id": 119, "panels": [ @@ -1188,7 +1368,7 @@ "h": 1, "w": 24, "x": 0, - "y": 60 + "y": 2 }, "id": 28, "panels": [ @@ -2200,7 +2380,7 @@ "h": 1, "w": 24, "x": 0, - "y": 109 + "y": 3 }, "id": 134, "panels": [ @@ -3393,7 +3573,7 @@ "h": 1, "w": 24, "x": 0, - "y": 166 + "y": 4 }, "id": 12, "panels": [ @@ -4222,7 +4402,7 @@ "h": 1, "w": 24, "x": 0, - "y": 207 + "y": 5 }, "id": 10, "panels": [ @@ -4776,7 +4956,7 @@ "h": 1, "w": 24, "x": 0, - "y": 232 + "y": 6 }, "id": 8, "panels": [ @@ -5798,7 +5978,7 @@ "h": 1, "w": 24, "x": 0, - "y": 281 + "y": 7 }, "id": 50, "panels": [ @@ -6354,7 +6534,7 @@ "h": 1, "w": 24, "x": 0, - "y": 306 + "y": 8 }, "id": 157, "panels": [ @@ -6647,7 +6827,7 @@ "h": 1, "w": 24, "x": 0, - "y": 323 + "y": 9 }, "id": 137, "panels": [ @@ -8038,7 +8218,7 @@ "h": 1, "w": 24, "x": 0, - "y": 388 + "y": 10 }, "id": 110, "panels": [ @@ -8234,7 +8414,7 @@ "h": 1, "w": 24, "x": 0, - "y": 397 + "y": 11 }, "id": 123, "panels": [ @@ -8712,7 +8892,7 @@ "h": 1, "w": 24, "x": 0, - "y": 422 + "y": 12 }, "id": 172, "panels": [ diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index 7c457bdf7..95b1769d1 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -158,6 +158,7 @@ message PbDiskInfo { int32 status = 5; int64 avgFetchTime = 6; int32 storageType = 7; + int64 totalSpace = 8; } message PbWorkerInfo { diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala index b0438bd1b..0fc2f4984 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala @@ -82,6 +82,7 @@ class DiskInfo( var status: DiskStatus = DiskStatus.HEALTHY var threadCount = 1 var configuredUsableSpace = 0L + var totalSpace = 0L var storageType: StorageInfo.Type = StorageInfo.Type.SSD var maxSlots: Long = 0 lazy val shuffleAllocations = new util.HashMap[String, Integer]() @@ -101,6 +102,11 @@ class DiskInfo( this } + def setTotalSpace(totalSpace: Long): this.type = this.synchronized { + this.totalSpace = totalSpace + this + } + def updateFlushTime(): Unit = { avgFlushTime = flushTimeMetrics.getAverage() } @@ -175,6 +181,7 @@ class DiskInfo( s" shuffleAllocations: ${nonEmptyShuffles.toMap}," + s" mountPoint: $mountPoint," + s" usableSpace: ${Utils.bytesToString(actualUsableSpace)}," + + s" totalSpace: ${Utils.bytesToString(totalSpace)}," + s" avgFlushTime: ${Utils.nanoDurationToString(avgFlushTime)}," + s" avgFetchTime: ${Utils.nanoDurationToString(avgFetchTime)}," + s" activeSlots: $activeSlots," + @@ -286,6 +293,7 @@ object DeviceInfo { conf) val (_, maxUsableSpace, threadCount, storageType) = dirs(0) diskInfo.configuredUsableSpace = maxUsableSpace + diskInfo.totalSpace = maxUsableSpace diskInfo.threadCount = threadCount diskInfo.storageType = storageType deviceInfo.addDiskInfo(diskInfo) diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala index 3a1446be3..a0eedc3f9 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala @@ -188,6 +188,14 @@ class WorkerInfo( diskInfos.asScala.map(_._2.availableSlots()).sum } + def totalSpace(): Long = this.synchronized { + diskInfos.asScala.map(_._2.totalSpace).sum + } + + def totalActualUsableSpace(): Long = this.synchronized { + diskInfos.asScala.map(_._2.actualUsableSpace).sum + } + def updateThenGetDiskInfos( newDiskInfos: java.util.Map[String, DiskInfo], estimatedPartitionSize: Option[Long] = None): util.Map[String, DiskInfo] = this.synchronized { @@ -197,6 +205,7 @@ class WorkerInfo( val curDisk = diskInfos.get(mountPoint) if (curDisk != null) { curDisk.actualUsableSpace = newDisk.actualUsableSpace + curDisk.totalSpace = newDisk.totalSpace // Update master's diskinfo activeslots to worker's value curDisk.activeSlots = newDisk.activeSlots curDisk.avgFlushTime = newDisk.avgFlushTime diff --git a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala index f2ef85c48..cffe76857 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala @@ -72,6 +72,7 @@ object PbSerDeUtils { pbDiskInfo.getAvgFetchTime, pbDiskInfo.getUsedSlots) .setStatus(Utils.toDiskStatus(pbDiskInfo.getStatus)) + .setTotalSpace(pbDiskInfo.getTotalSpace) diskInfo.setStorageType(StorageInfo.typesMap.get(pbDiskInfo.getStorageType)) diskInfo } @@ -85,6 +86,7 @@ object PbSerDeUtils { .setUsedSlots(diskInfo.activeSlots) .setStatus(diskInfo.status.getValue) .setStorageType(diskInfo.storageType.getValue) + .setTotalSpace(diskInfo.totalSpace) .build def fromPbFileInfo(pbFileInfo: PbFileInfo): DiskFileInfo = diff --git a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala index 2dc817c9d..401177e63 100644 --- a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala @@ -246,9 +246,16 @@ class WorkerInfoSuite extends CelebornFunSuite { null) val disks = new util.HashMap[String, DiskInfo]() - disks.put("disk1", new DiskInfo("disk1", Int.MaxValue, 1, 1, 10)) - disks.put("disk2", new DiskInfo("disk2", Int.MaxValue, 2, 2, 20)) - disks.put("disk3", new DiskInfo("disk3", Int.MaxValue, 3, 3, 30)) + val diskInfo1 = new DiskInfo("disk1", Int.MaxValue, 1, 1, 10) + val diskInfo2 = new DiskInfo("disk2", Int.MaxValue, 2, 2, 20) + val diskInfo3 = new DiskInfo("disk3", Int.MaxValue, 3, 3, 30) + diskInfo1.setTotalSpace(Int.MaxValue) + diskInfo2.setTotalSpace(Int.MaxValue) + diskInfo3.setTotalSpace(Int.MaxValue) + + disks.put("disk1", diskInfo1) + disks.put("disk2", diskInfo2) + disks.put("disk3", diskInfo3) val userResourceConsumption = JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption]() userResourceConsumption.put( @@ -341,9 +348,9 @@ class WorkerInfoSuite extends CelebornFunSuite { |SlotsUsed: 60 |LastHeartbeat: 0 |Disks: $placeholder - | DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB, avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30, storageType: SSD) status: HEALTHY dirs $placeholder - | DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB, avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10, storageType: SSD) status: HEALTHY dirs $placeholder - | DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB, avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20, storageType: SSD) status: HEALTHY dirs $placeholder + | DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB, totalSpace: 2048.0 MiB, avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30, storageType: SSD) status: HEALTHY dirs $placeholder + | DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB, totalSpace: 2048.0 MiB, avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10, storageType: SSD) status: HEALTHY dirs $placeholder + | DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB, totalSpace: 2048.0 MiB, avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20, storageType: SSD) status: HEALTHY dirs $placeholder |UserResourceConsumption: $placeholder | UserIdentifier: `tenant1`.`name1`, ResourceConsumption: ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1, hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1, subResourceConsumptions: (application_1697697127390_2171854 -> ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1, hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1, subResourceConsumptions: empty))) |WorkerRef: null diff --git a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala index 5166c7965..840b9896d 100644 --- a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala +++ b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala @@ -50,6 +50,9 @@ class PbSerDeUtilsTest extends CelebornFunSuite { val device = new DeviceInfo("device-a") val diskInfo1 = new DiskInfo("/mnt/disk/0", 1000, 1000, 1000, 1000, files, device) val diskInfo2 = new DiskInfo("/mnt/disk/1", 2000, 2000, 2000, 2000, files, device) + diskInfo1.setTotalSpace(100000000) + diskInfo1.setTotalSpace(200000000) + val diskInfos = new util.HashMap[String, DiskInfo]() diskInfos.put("disk1", diskInfo1) diskInfos.put("disk2", diskInfo2) @@ -170,6 +173,8 @@ class PbSerDeUtilsTest extends CelebornFunSuite { assert(restoredDiskInfo.avgFlushTime.equals(diskInfo1.avgFlushTime)) assert(restoredDiskInfo.avgFetchTime.equals(diskInfo1.avgFetchTime)) assert(restoredDiskInfo.activeSlots.equals(diskInfo1.activeSlots)) + assert(restoredDiskInfo.totalSpace.equals(diskInfo1.totalSpace)) + assert(restoredDiskInfo.dirs.equals(List.empty)) assert(restoredDiskInfo.deviceInfo == null) } diff --git a/docs/monitoring.md b/docs/monitoring.md index 098a8f910..e9a1f3f56 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -92,6 +92,8 @@ These metrics are exposed by Celeborn master. - namespace=master - RegisteredShuffleCount + - DeviceCelebornFreeBytes + - DeviceCelebornTotalBytes - RunningApplicationCount - ActiveShuffleSize - The active shuffle size of workers. diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index b7f51b00b..9fb5d594b 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -247,6 +247,15 @@ private[celeborn] class Master( }).sum() }).sum() } + + masterSource.addGauge(MasterSource.DEVICE_CELEBORN_TOTAL_CAPACITY) { () => + statusSystem.workers.asScala.map(_.totalSpace()).sum + } + + masterSource.addGauge(MasterSource.DEVICE_CELEBORN_FREE_CAPACITY) { () => + statusSystem.workers.asScala.map(_.totalActualUsableSpace()).sum + } + masterSource.addGauge(MasterSource.IS_ACTIVE_MASTER) { () => isMasterActive } private val threadsStarted: AtomicBoolean = new AtomicBoolean(false) diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala index b0b3e008e..94ef037be 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala @@ -53,4 +53,8 @@ object MasterSource { val ACTIVE_SHUFFLE_FILE_COUNT = "ActiveShuffleFileCount" val OFFER_SLOTS_TIME = "OfferSlotsTime" + + // Capacity + val DEVICE_CELEBORN_FREE_CAPACITY = "DeviceCelebornFreeBytes" + val DEVICE_CELEBORN_TOTAL_CAPACITY = "DeviceCelebornTotalBytes" } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala index 7efa568b1..ae3304308 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala @@ -94,7 +94,7 @@ class LocalDeviceMonitor( usage.freeSpace } workerSource.addGauge(WorkerSource.DEVICE_CELEBORN_TOTAL_CAPACITY, deviceLabel) { () => - diskInfos.map(_.configuredUsableSpace).sum + diskInfos.map(_.totalSpace).sum } workerSource.addGauge(WorkerSource.DEVICE_CELEBORN_FREE_CAPACITY, deviceLabel) { () => diskInfos.map(_.actualUsableSpace).sum diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 194e13378..82e0f2ef1 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -726,6 +726,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs Math.min(diskInfo.configuredUsableSpace - totalUsage, fileSystemReportedUsableSpace) logDebug(s"updateDiskInfos workingDirUsableSpace:$workingDirUsableSpace filemeta:$fileSystemReportedUsableSpace conf:${diskInfo.configuredUsableSpace} totalUsage:$totalUsage") diskInfo.setUsableSpace(workingDirUsableSpace) + diskInfo.setTotalSpace(totalUsage + workingDirUsableSpace) diskInfo.updateFlushTime() diskInfo.updateFetchTime() }