[CELEBORN-1427] Add Capacity metrics for Celeborn

### What changes were proposed in this pull request?
As title

### Why are the changes needed?
The Celeborn cluster does not currently provide metrics for 'TotalCapacity' and 'TotalFreeCapacity

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA

Closes #2521 from RexXiong/CELEBORN-1427.

Authored-by: Shuang <lvshuang.xjs@alibaba-inc.com>
Signed-off-by: SteNicholas <programgeek@163.com>
This commit is contained in:
Shuang 2024-05-23 16:06:11 +08:00 committed by SteNicholas
parent ba499704d1
commit 308eed28c9
13 changed files with 250 additions and 22 deletions

View File

@ -84,6 +84,8 @@ Here is an example of Grafana dashboard importing.
| diskBytesWritten | master and worker | The amount of disk files consumption by each user. |
| hdfsFileCount | master and worker | The count of hdfs files consumption by each user. |
| hdfsBytesWritten | master and worker | The amount of hdfs files consumption by each user. |
| DeviceCelebornFreeBytes | master and worker | This value means actual usable space of Celeborn for device. |
| DeviceCelebornTotalBytes | master and worker | This value means total space of Celeborn for device. |
| WorkerCount | master | The count of active workers. |
| LostWorkerCount | master | The count of workers in lost list. |
| ExcludedWorkerCount | master | The count of workers in excluded list. |
@ -142,8 +144,6 @@ Here is an example of Grafana dashboard importing.
| ActiveMapPartitionCount | worker | This value means count of active map partition reading streams. |
| DeviceOSFreeBytes | worker | This value means actual usable space of OS for device monitor. |
| DeviceOSTotalBytes | worker | This value means total usable space of OS for device monitor. |
| DeviceCelebornFreeBytes | worker | This value means actual usable space of Celeborn for device monitor. |
| DeviceCelebornTotalBytes | worker | This value means configured usable space of Celeborn for device monitor. |
| PotentialConsumeSpeed | worker | This value means speed of potential consumption for congestion control. |
| UserProduceSpeed | worker | This value means speed of user production for congestion control. |
| WorkerConsumeSpeed | worker | This value means speed of worker consumption for congestion control. |

View File

@ -249,6 +249,186 @@
"title": "metrics_RegisteredShuffleCount_Value",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Celeborn total device capacity.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "bytes"
},
"overrides": []
},
"gridPos": {
"h": 9,
"w": 12,
"x": 0,
"y": 10
},
"id": 185,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "metrics_DeviceCelebornTotalBytes_Value{role=\"Master\"}",
"legendFormat": "${baseLegend}",
"refId": "A"
}
],
"title": "metrics_DeviceCelebornTotalBytes_Value",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Celeborn total device free capacity.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "bytes"
},
"overrides": []
},
"gridPos": {
"h": 9,
"w": 12,
"x": 12,
"y": 10
},
"id": 186,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "metrics_DeviceCelebornFreeBytes_Value{role=\"Master\"}",
"legendFormat": "${baseLegend}",
"refId": "A"
}
],
"title": "metrics_DeviceCelebornFreeBytes_Value",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
@ -310,7 +490,7 @@
"h": 9,
"w": 12,
"x": 0,
"y": 10
"y": 19
},
"id": 95,
"options": {
@ -349,7 +529,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 19
"y": 1
},
"id": 119,
"panels": [
@ -1188,7 +1368,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 60
"y": 2
},
"id": 28,
"panels": [
@ -2200,7 +2380,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 109
"y": 3
},
"id": 134,
"panels": [
@ -3393,7 +3573,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 166
"y": 4
},
"id": 12,
"panels": [
@ -4222,7 +4402,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 207
"y": 5
},
"id": 10,
"panels": [
@ -4776,7 +4956,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 232
"y": 6
},
"id": 8,
"panels": [
@ -5798,7 +5978,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 281
"y": 7
},
"id": 50,
"panels": [
@ -6354,7 +6534,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 306
"y": 8
},
"id": 157,
"panels": [
@ -6647,7 +6827,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 323
"y": 9
},
"id": 137,
"panels": [
@ -8038,7 +8218,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 388
"y": 10
},
"id": 110,
"panels": [
@ -8234,7 +8414,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 397
"y": 11
},
"id": 123,
"panels": [
@ -8712,7 +8892,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 422
"y": 12
},
"id": 172,
"panels": [

View File

@ -158,6 +158,7 @@ message PbDiskInfo {
int32 status = 5;
int64 avgFetchTime = 6;
int32 storageType = 7;
int64 totalSpace = 8;
}
message PbWorkerInfo {

View File

@ -82,6 +82,7 @@ class DiskInfo(
var status: DiskStatus = DiskStatus.HEALTHY
var threadCount = 1
var configuredUsableSpace = 0L
var totalSpace = 0L
var storageType: StorageInfo.Type = StorageInfo.Type.SSD
var maxSlots: Long = 0
lazy val shuffleAllocations = new util.HashMap[String, Integer]()
@ -101,6 +102,11 @@ class DiskInfo(
this
}
def setTotalSpace(totalSpace: Long): this.type = this.synchronized {
this.totalSpace = totalSpace
this
}
def updateFlushTime(): Unit = {
avgFlushTime = flushTimeMetrics.getAverage()
}
@ -175,6 +181,7 @@ class DiskInfo(
s" shuffleAllocations: ${nonEmptyShuffles.toMap}," +
s" mountPoint: $mountPoint," +
s" usableSpace: ${Utils.bytesToString(actualUsableSpace)}," +
s" totalSpace: ${Utils.bytesToString(totalSpace)}," +
s" avgFlushTime: ${Utils.nanoDurationToString(avgFlushTime)}," +
s" avgFetchTime: ${Utils.nanoDurationToString(avgFetchTime)}," +
s" activeSlots: $activeSlots," +
@ -286,6 +293,7 @@ object DeviceInfo {
conf)
val (_, maxUsableSpace, threadCount, storageType) = dirs(0)
diskInfo.configuredUsableSpace = maxUsableSpace
diskInfo.totalSpace = maxUsableSpace
diskInfo.threadCount = threadCount
diskInfo.storageType = storageType
deviceInfo.addDiskInfo(diskInfo)

View File

@ -188,6 +188,14 @@ class WorkerInfo(
diskInfos.asScala.map(_._2.availableSlots()).sum
}
def totalSpace(): Long = this.synchronized {
diskInfos.asScala.map(_._2.totalSpace).sum
}
def totalActualUsableSpace(): Long = this.synchronized {
diskInfos.asScala.map(_._2.actualUsableSpace).sum
}
def updateThenGetDiskInfos(
newDiskInfos: java.util.Map[String, DiskInfo],
estimatedPartitionSize: Option[Long] = None): util.Map[String, DiskInfo] = this.synchronized {
@ -197,6 +205,7 @@ class WorkerInfo(
val curDisk = diskInfos.get(mountPoint)
if (curDisk != null) {
curDisk.actualUsableSpace = newDisk.actualUsableSpace
curDisk.totalSpace = newDisk.totalSpace
// Update master's diskinfo activeslots to worker's value
curDisk.activeSlots = newDisk.activeSlots
curDisk.avgFlushTime = newDisk.avgFlushTime

View File

@ -72,6 +72,7 @@ object PbSerDeUtils {
pbDiskInfo.getAvgFetchTime,
pbDiskInfo.getUsedSlots)
.setStatus(Utils.toDiskStatus(pbDiskInfo.getStatus))
.setTotalSpace(pbDiskInfo.getTotalSpace)
diskInfo.setStorageType(StorageInfo.typesMap.get(pbDiskInfo.getStorageType))
diskInfo
}
@ -85,6 +86,7 @@ object PbSerDeUtils {
.setUsedSlots(diskInfo.activeSlots)
.setStatus(diskInfo.status.getValue)
.setStorageType(diskInfo.storageType.getValue)
.setTotalSpace(diskInfo.totalSpace)
.build
def fromPbFileInfo(pbFileInfo: PbFileInfo): DiskFileInfo =

View File

@ -246,9 +246,16 @@ class WorkerInfoSuite extends CelebornFunSuite {
null)
val disks = new util.HashMap[String, DiskInfo]()
disks.put("disk1", new DiskInfo("disk1", Int.MaxValue, 1, 1, 10))
disks.put("disk2", new DiskInfo("disk2", Int.MaxValue, 2, 2, 20))
disks.put("disk3", new DiskInfo("disk3", Int.MaxValue, 3, 3, 30))
val diskInfo1 = new DiskInfo("disk1", Int.MaxValue, 1, 1, 10)
val diskInfo2 = new DiskInfo("disk2", Int.MaxValue, 2, 2, 20)
val diskInfo3 = new DiskInfo("disk3", Int.MaxValue, 3, 3, 30)
diskInfo1.setTotalSpace(Int.MaxValue)
diskInfo2.setTotalSpace(Int.MaxValue)
diskInfo3.setTotalSpace(Int.MaxValue)
disks.put("disk1", diskInfo1)
disks.put("disk2", diskInfo2)
disks.put("disk3", diskInfo3)
val userResourceConsumption =
JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption]()
userResourceConsumption.put(
@ -341,9 +348,9 @@ class WorkerInfoSuite extends CelebornFunSuite {
|SlotsUsed: 60
|LastHeartbeat: 0
|Disks: $placeholder
| DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB, avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30, storageType: SSD) status: HEALTHY dirs $placeholder
| DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB, avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10, storageType: SSD) status: HEALTHY dirs $placeholder
| DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB, avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20, storageType: SSD) status: HEALTHY dirs $placeholder
| DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB, totalSpace: 2048.0 MiB, avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30, storageType: SSD) status: HEALTHY dirs $placeholder
| DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB, totalSpace: 2048.0 MiB, avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10, storageType: SSD) status: HEALTHY dirs $placeholder
| DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB, totalSpace: 2048.0 MiB, avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20, storageType: SSD) status: HEALTHY dirs $placeholder
|UserResourceConsumption: $placeholder
| UserIdentifier: `tenant1`.`name1`, ResourceConsumption: ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1, hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1, subResourceConsumptions: (application_1697697127390_2171854 -> ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1, hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1, subResourceConsumptions: empty)))
|WorkerRef: null

View File

@ -50,6 +50,9 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
val device = new DeviceInfo("device-a")
val diskInfo1 = new DiskInfo("/mnt/disk/0", 1000, 1000, 1000, 1000, files, device)
val diskInfo2 = new DiskInfo("/mnt/disk/1", 2000, 2000, 2000, 2000, files, device)
diskInfo1.setTotalSpace(100000000)
diskInfo1.setTotalSpace(200000000)
val diskInfos = new util.HashMap[String, DiskInfo]()
diskInfos.put("disk1", diskInfo1)
diskInfos.put("disk2", diskInfo2)
@ -170,6 +173,8 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
assert(restoredDiskInfo.avgFlushTime.equals(diskInfo1.avgFlushTime))
assert(restoredDiskInfo.avgFetchTime.equals(diskInfo1.avgFetchTime))
assert(restoredDiskInfo.activeSlots.equals(diskInfo1.activeSlots))
assert(restoredDiskInfo.totalSpace.equals(diskInfo1.totalSpace))
assert(restoredDiskInfo.dirs.equals(List.empty))
assert(restoredDiskInfo.deviceInfo == null)
}

View File

@ -92,6 +92,8 @@ These metrics are exposed by Celeborn master.
- namespace=master
- RegisteredShuffleCount
- DeviceCelebornFreeBytes
- DeviceCelebornTotalBytes
- RunningApplicationCount
- ActiveShuffleSize
- The active shuffle size of workers.

View File

@ -247,6 +247,15 @@ private[celeborn] class Master(
}).sum()
}).sum()
}
masterSource.addGauge(MasterSource.DEVICE_CELEBORN_TOTAL_CAPACITY) { () =>
statusSystem.workers.asScala.map(_.totalSpace()).sum
}
masterSource.addGauge(MasterSource.DEVICE_CELEBORN_FREE_CAPACITY) { () =>
statusSystem.workers.asScala.map(_.totalActualUsableSpace()).sum
}
masterSource.addGauge(MasterSource.IS_ACTIVE_MASTER) { () => isMasterActive }
private val threadsStarted: AtomicBoolean = new AtomicBoolean(false)

View File

@ -53,4 +53,8 @@ object MasterSource {
val ACTIVE_SHUFFLE_FILE_COUNT = "ActiveShuffleFileCount"
val OFFER_SLOTS_TIME = "OfferSlotsTime"
// Capacity
val DEVICE_CELEBORN_FREE_CAPACITY = "DeviceCelebornFreeBytes"
val DEVICE_CELEBORN_TOTAL_CAPACITY = "DeviceCelebornTotalBytes"
}

View File

@ -94,7 +94,7 @@ class LocalDeviceMonitor(
usage.freeSpace
}
workerSource.addGauge(WorkerSource.DEVICE_CELEBORN_TOTAL_CAPACITY, deviceLabel) { () =>
diskInfos.map(_.configuredUsableSpace).sum
diskInfos.map(_.totalSpace).sum
}
workerSource.addGauge(WorkerSource.DEVICE_CELEBORN_FREE_CAPACITY, deviceLabel) { () =>
diskInfos.map(_.actualUsableSpace).sum

View File

@ -726,6 +726,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
Math.min(diskInfo.configuredUsableSpace - totalUsage, fileSystemReportedUsableSpace)
logDebug(s"updateDiskInfos workingDirUsableSpace:$workingDirUsableSpace filemeta:$fileSystemReportedUsableSpace conf:${diskInfo.configuredUsableSpace} totalUsage:$totalUsage")
diskInfo.setUsableSpace(workingDirUsableSpace)
diskInfo.setTotalSpace(totalUsage + workingDirUsableSpace)
diskInfo.updateFlushTime()
diskInfo.updateFetchTime()
}