[CELEBORN-1727] Correct the calculation of worker diskInfo actualUsableSpace

### What changes were proposed in this pull request?
Correct the calculation of worker diskInfo actualUsableSpace.
Make the expression of the function to get the reserve size clearer. (`getMinimumUsableSize` -> `getActualReserveSize`).
Let deviceMonitor startCheck after the first `storageManager.updateDiskInfos()` to avoid disks from being misidentified as HIGH_DISK_USAGE.
Fix PushDataHandler#checkDiskFull judge.

### Why are the changes needed?
Make sure worker disk reserve work correctly.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Cluster test and UT.

Closes #2931 from onebox-li/fix-disk-usablespace.

Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
This commit is contained in:
onebox-li 2024-11-21 16:17:46 +08:00 committed by mingji
parent bb96a5f31f
commit 351173bacd
7 changed files with 91 additions and 76 deletions

View File

@ -27,28 +27,28 @@ import org.apache.celeborn.common.meta.DiskInfo
object DiskUtils {
/**
* Gets the minimum usable size for each disk, which size is the max space between the reserved space
* and the space calculate via reserved ratio.
* Get the actual size that the disk needs to reserve. It will take the larger value between
* the configured fixed reserved size and the size calculated by the reserved ratio.
*
* @param diskInfo The reserved disk info.
* @param diskReserveSize The reserved space for each disk.
* @param diskReserveRatio The reserved ratio for each disk.
* @return the minimum usable space.
* @param diskInfo The disk info being calculated.
* @param diskReserveSize The configured fixed reserved size space for the disk.
* @param diskReserveRatio The configured reserved ratio for the disk.
* @return the actual size (in bytes) that the disk needs to reserve.
*/
def getMinimumUsableSize(
def getActualReserveSize(
diskInfo: DiskInfo,
diskReserveSize: Long,
diskReserveRatio: Option[Double]): Long = {
var minimumUsableSize = diskReserveSize
var actualReserveSize = diskReserveSize
if (diskReserveRatio.isDefined) {
try {
val totalSpace = Files.getFileStore(Paths.get(diskInfo.mountPoint)).getTotalSpace
minimumUsableSize =
BigDecimal(totalSpace * diskReserveRatio.get).longValue.max(minimumUsableSize)
actualReserveSize =
BigDecimal(totalSpace * diskReserveRatio.get).longValue.max(actualReserveSize)
} catch {
case _: Exception => // Do nothing
}
}
minimumUsableSize
actualReserveSize
}
}

View File

@ -60,7 +60,6 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
val combineResult = combine(sparkSession)
val groupByResult = groupBy(sparkSession)
val repartitionResult = repartition(sparkSession)
val sqlResult = runsql(sparkSession)
sparkSession.stop()
val sparkSessionEnableCeleborn = SparkSession.builder()
@ -69,16 +68,14 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
val celebornCombineResult = combine(sparkSessionEnableCeleborn)
val celebornGroupByResult = groupBy(sparkSessionEnableCeleborn)
val celebornRepartitionResult = repartition(sparkSessionEnableCeleborn)
val celebornSqlResult = runsql(sparkSessionEnableCeleborn)
assert(combineResult.equals(celebornCombineResult))
assert(groupByResult.equals(celebornGroupByResult))
assert(repartitionResult.equals(celebornRepartitionResult))
assert(combineResult.equals(celebornCombineResult))
assert(sqlResult.equals(celebornSqlResult))
// shuffle key not expired, diskInfo.actualUsableSpace <= 0, no space
workers.foreach { worker =>
worker.storageManager.updateDiskInfos()
worker.storageManager.disksSnapshot().foreach { diskInfo =>
assert(diskInfo.actualUsableSpace <= 0)
}

View File

@ -21,7 +21,6 @@ import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray}
import scala.collection.JavaConverters._
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}
@ -42,7 +41,7 @@ import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMod
import org.apache.celeborn.common.protocol.PbPartitionLocation.Mode
import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.common.unsafe.Platform
import org.apache.celeborn.common.util.{DiskUtils, ExceptionUtils, Utils}
import org.apache.celeborn.common.util.{ExceptionUtils, Utils}
import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
import org.apache.celeborn.service.deploy.worker.storage.{HdfsFlusher, LocalFlusher, MapPartitionDataWriter, PartitionDataWriter, S3Flusher, StorageManager}
import org.apache.celeborn.service.deploy.worker.storage.segment.SegmentMapPartitionFileWriter
@ -58,9 +57,6 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
private var replicateClientFactory: TransportClientFactory = _
private var registered: Option[AtomicBoolean] = None
private var workerInfo: WorkerInfo = _
private var diskReserveSize: Long = _
private var diskReserveRatio: Option[Double] = _
private var diskUsableSizes: Map[String, Long] = _
private var partitionSplitMinimumSize: Long = _
private var partitionSplitMaximumSize: Long = _
private var shutdown: AtomicBoolean = _
@ -80,11 +76,6 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
unavailablePeers = worker.unavailablePeers
replicateClientFactory = worker.replicateClientFactory
workerInfo = worker.workerInfo
diskReserveSize = worker.conf.workerDiskReserveSize
diskReserveRatio = worker.conf.workerDiskReserveRatio
diskUsableSizes = workerInfo.diskInfos.asScala.map { case (mountPoint, diskInfo) =>
(mountPoint, DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize, diskReserveRatio))
}.toMap
partitionSplitMinimumSize = worker.conf.partitionSplitMinimumSize
partitionSplitMaximumSize = worker.conf.partitionSplitMaximumSize
storageManager = worker.storageManager
@ -95,8 +86,6 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
testPushPrimaryDataTimeout = worker.conf.testPushPrimaryDataTimeout
testPushReplicaDataTimeout = worker.conf.testPushReplicaDataTimeout
registered = Some(worker.registered)
logInfo(
s"diskReserveSize ${Utils.bytesToString(diskReserveSize)}, diskReserveRatio ${diskReserveRatio.orNull}")
}
override def receive(
@ -1230,8 +1219,8 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
}
val mountPoint = fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint
val diskInfo = workerInfo.diskInfos.get(mountPoint)
val diskFull = diskInfo.status.equals(
DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace < diskUsableSizes(mountPoint)
val diskFull =
diskInfo.status.equals(DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace <= 0
diskFull
}

View File

@ -274,6 +274,7 @@ private[celeborn] class Worker(
assert(replicatePort > 0, "worker replica bind port should be positive")
storageManager.updateDiskInfos()
storageManager.startDeviceMonitor()
// WorkerInfo's diskInfos is a reference to storageManager.diskInfos
val diskInfos = JavaUtils.newConcurrentHashMap[String, DiskInfo]()

View File

@ -254,18 +254,18 @@ object DeviceMonitor extends Logging {
tryWithTimeoutAndCallback({
val usage = getDiskUsageInfos(diskInfo)
// assume no single device capacity exceeds 1EB in this era
val minimumUsableSize = DiskUtils.getMinimumUsableSize(
val actualReserveSize = DiskUtils.getActualReserveSize(
diskInfo,
conf.workerDiskReserveSize,
conf.workerDiskReserveRatio)
val highDiskUsage =
usage.freeSpace < minimumUsableSize || diskInfo.actualUsableSpace <= 0
usage.freeSpace < actualReserveSize || diskInfo.actualUsableSpace <= 0
if (highDiskUsage) {
logWarning(s"${diskInfo.mountPoint} usage is above threshold." +
s" Disk usage(Report by OS):{total:${Utils.bytesToString(usage.totalSpace)}," +
s" free:${Utils.bytesToString(usage.freeSpace)}, used_percent:${usage.usedPercent}} " +
s"usage(Report by Celeborn):{" +
s"total:${Utils.bytesToString(diskInfo.configuredUsableSpace)}" +
s" Disk usage(Report by OS): {total:${Utils.bytesToString(usage.totalSpace)}," +
s" free:${Utils.bytesToString(usage.freeSpace)}, used_percent:${usage.usedPercent}}," +
s" usage(Report by Celeborn): {" +
s" total:${Utils.bytesToString(diskInfo.configuredUsableSpace)}," +
s" free:${Utils.bytesToString(diskInfo.actualUsableSpace)} }")
}
highDiskUsage

View File

@ -160,8 +160,6 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
(flushers, totalThread)
}
deviceMonitor.startCheck()
val hdfsDir = conf.hdfsDir
val s3Dir = conf.s3Dir
val hdfsPermission = new FsPermission("755")
@ -890,15 +888,17 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
try {
val (fileSystemReportedUsableSpace, fileSystemReportedTotalSpace) =
getFileSystemReportedSpace(diskInfo.mountPoint)
val actualReserveSize =
DiskUtils.getActualReserveSize(diskInfo, diskReserveSize, diskReserveRatio)
val workingDirUsableSpace =
Math.min(diskInfo.configuredUsableSpace - totalUsage, fileSystemReportedUsableSpace)
val minimumReserveSize =
DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize, diskReserveRatio)
val usableSpace = Math.max(workingDirUsableSpace - minimumReserveSize, 0)
Math.min(
diskInfo.configuredUsableSpace - totalUsage,
fileSystemReportedUsableSpace - actualReserveSize)
val usableSpace = Math.max(workingDirUsableSpace, 0)
logDebug(
s"Update diskInfo:${diskInfo.mountPoint} workingDirUsableSpace:$workingDirUsableSpace fileMeta:$fileSystemReportedUsableSpace" +
s"conf:${diskInfo.configuredUsableSpace} totalUsage:$totalUsage totalSpace:$fileSystemReportedTotalSpace" +
s"minimumReserveSize:$minimumReserveSize usableSpace:$usableSpace")
s"Update diskInfo:${diskInfo.mountPoint} workingDirUsableSpace:$workingDirUsableSpace fileMeta:$fileSystemReportedUsableSpace " +
s"configuredUsableSpace:${diskInfo.configuredUsableSpace} totalUsage:$totalUsage totalSpace:$fileSystemReportedTotalSpace " +
s"actualReserveSize:$actualReserveSize usableSpace:$usableSpace")
diskInfo.setUsableSpace(usableSpace)
diskInfo.setTotalSpace(fileSystemReportedTotalSpace)
} catch {
@ -1170,6 +1170,11 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
}
throw exception
}
def startDeviceMonitor(): Unit = {
deviceMonitor.startCheck()
}
}
object StorageManager {

View File

@ -23,9 +23,9 @@ import org.mockito.stubbing.Stubber
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.CelebornConf.{WORKER_GRACEFUL_SHUTDOWN_ENABLED, WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH}
import org.apache.celeborn.common.CelebornConf.{WORKER_DISK_RESERVE_SIZE, WORKER_GRACEFUL_SHUTDOWN_ENABLED, WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH}
import org.apache.celeborn.common.meta.DiskInfo
import org.apache.celeborn.common.util.DiskUtils
import org.apache.celeborn.common.util.Utils
import org.apache.celeborn.service.deploy.worker.WorkerSource
trait MockitoHelper extends MockitoSugar {
@ -36,8 +36,6 @@ trait MockitoHelper extends MockitoSugar {
class StorageManagerSuite extends CelebornFunSuite with MockitoHelper {
val conf = new CelebornConf()
test("[CELEBORN-926] saveAllCommittedFileInfosToDB cause IllegalMonitorStateException") {
val conf = new CelebornConf().set(WORKER_GRACEFUL_SHUTDOWN_ENABLED, true)
.set(WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH, "/tmp/recover")
@ -47,42 +45,67 @@ class StorageManagerSuite extends CelebornFunSuite with MockitoHelper {
}
test("updateDiskInfosWithDiskReserveSize") {
// reserve size set to 5g
val conf = new CelebornConf().set(WORKER_DISK_RESERVE_SIZE, Utils.byteStringAsBytes("5g"))
val storageManager = new StorageManager(conf, new WorkerSource(conf))
val spyStorageManager = spy(storageManager)
val disks = prepareDisks()
val diskSetSpace = (80 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L)
doReturn(disks).when(spyStorageManager).disksSnapshot()
doReturn(diskSetSpace).when(spyStorageManager).getFileSystemReportedSpace(any)
val diskInfo = new DiskInfo("/mnt/disk1", List.empty, null, conf)
diskInfo.setUsableSpace(-1L)
var diskSetSpace = (0L, 0L)
doReturn(List(diskInfo)).when(spyStorageManager).disksSnapshot()
doAnswer(diskSetSpace).when(spyStorageManager).getFileSystemReportedSpace(any)
// disk usable 80g, total 80g, worker config 8EB
diskSetSpace = (80 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L)
diskInfo.configuredUsableSpace = Long.MaxValue
spyStorageManager.updateDiskInfos()
for (disk <- disks) {
val minimumReserveSize =
DiskUtils.getMinimumUsableSize(
disk,
conf.workerDiskReserveSize,
conf.workerDiskReserveRatio)
assert(disk.actualUsableSpace == diskSetSpace._1 - minimumReserveSize)
}
}
assert(diskInfo.actualUsableSpace == 75 * 1024 * 1024 * 1024L)
def prepareDisks(): List[DiskInfo] = {
val diskSetSpaces = Array(
90L * 1024 * 1024 * 1024,
95L * 1024 * 1024 * 1024,
100L * 1024 * 1024 * 1024)
// disk usable 80g, total 80g, worker config 50g
diskInfo.configuredUsableSpace = 50 * 1024 * 1024 * 1024L
diskInfo.setUsableSpace(-1L)
spyStorageManager.updateDiskInfos()
assert(diskInfo.actualUsableSpace == 50 * 1024 * 1024 * 1024L)
val diskInfo1 = new DiskInfo("/mnt/disk1", List.empty, null, conf)
diskInfo1.configuredUsableSpace = (Long.MaxValue)
diskInfo1.setUsableSpace(diskSetSpaces(0))
// disk usable 10g, total 80g, worker config 20g
diskSetSpace = (10 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L)
diskInfo.configuredUsableSpace = 20 * 1024 * 1024 * 1024L
diskInfo.setUsableSpace(-1L)
spyStorageManager.updateDiskInfos()
assert(diskInfo.actualUsableSpace == 5 * 1024 * 1024 * 1024L)
val diskInfo2 = new DiskInfo("/mnt/disk2", List.empty, null, conf)
diskInfo2.configuredUsableSpace = (Long.MaxValue)
diskInfo2.setUsableSpace(diskSetSpaces(1))
// disk usable 10g, total 80g, worker config 5g
diskInfo.configuredUsableSpace = 5 * 1024 * 1024 * 1024L
diskInfo.setUsableSpace(-1L)
spyStorageManager.updateDiskInfos()
assert(diskInfo.actualUsableSpace == 5 * 1024 * 1024 * 1024L)
val diskInfo3 = new DiskInfo("/mnt/disk3", List.empty, null, conf)
diskInfo3.configuredUsableSpace = (Long.MaxValue)
diskInfo3.setUsableSpace(diskSetSpaces(2))
// disk usable 5g, total 80g, worker config 20g
diskSetSpace = (5 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L)
diskInfo.configuredUsableSpace = 20 * 1024 * 1024 * 1024L
diskInfo.setUsableSpace(-1L)
spyStorageManager.updateDiskInfos()
assert(diskInfo.actualUsableSpace == 0L)
List(diskInfo1, diskInfo2, diskInfo3)
// disk usable 5g, total 80g, worker config 5g
diskInfo.configuredUsableSpace = 5 * 1024 * 1024 * 1024L
diskInfo.setUsableSpace(-1L)
spyStorageManager.updateDiskInfos()
assert(diskInfo.actualUsableSpace == 0L)
// disk usable 1g, total 80g, worker config 20g
diskSetSpace = (1 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L)
diskInfo.configuredUsableSpace = 20 * 1024 * 1024 * 1024L
diskInfo.setUsableSpace(-1L)
spyStorageManager.updateDiskInfos()
assert(diskInfo.actualUsableSpace == 0L)
// disk usable 1g, total 80g, worker config 5g
diskInfo.configuredUsableSpace = 5 * 1024 * 1024 * 1024L
diskInfo.setUsableSpace(-1L)
spyStorageManager.updateDiskInfos()
assert(diskInfo.actualUsableSpace == 0L)
}
}