diff --git a/common/src/main/scala/org/apache/celeborn/common/util/DiskUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/DiskUtils.scala index 2e361a985..b4972b0f0 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/DiskUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/DiskUtils.scala @@ -27,28 +27,28 @@ import org.apache.celeborn.common.meta.DiskInfo object DiskUtils { /** - * Gets the minimum usable size for each disk, which size is the max space between the reserved space - * and the space calculate via reserved ratio. + * Get the actual size that the disk needs to reserve. It will take the larger value between + * the configured fixed reserved size and the size calculated by the reserved ratio. * - * @param diskInfo The reserved disk info. - * @param diskReserveSize The reserved space for each disk. - * @param diskReserveRatio The reserved ratio for each disk. - * @return the minimum usable space. + * @param diskInfo The disk info being calculated. + * @param diskReserveSize The configured fixed reserved size space for the disk. + * @param diskReserveRatio The configured reserved ratio for the disk. + * @return the actual size (in bytes) that the disk needs to reserve. */ - def getMinimumUsableSize( + def getActualReserveSize( diskInfo: DiskInfo, diskReserveSize: Long, diskReserveRatio: Option[Double]): Long = { - var minimumUsableSize = diskReserveSize + var actualReserveSize = diskReserveSize if (diskReserveRatio.isDefined) { try { val totalSpace = Files.getFileStore(Paths.get(diskInfo.mountPoint)).getTotalSpace - minimumUsableSize = - BigDecimal(totalSpace * diskReserveRatio.get).longValue.max(minimumUsableSize) + actualReserveSize = + BigDecimal(totalSpace * diskReserveRatio.get).longValue.max(actualReserveSize) } catch { case _: Exception => // Do nothing } } - minimumUsableSize + actualReserveSize } } diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala index 5b461013d..eafabbbe6 100644 --- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala +++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala @@ -60,7 +60,6 @@ class CelebornHashCheckDiskSuite extends SparkTestBase { val combineResult = combine(sparkSession) val groupByResult = groupBy(sparkSession) val repartitionResult = repartition(sparkSession) - val sqlResult = runsql(sparkSession) sparkSession.stop() val sparkSessionEnableCeleborn = SparkSession.builder() @@ -69,16 +68,14 @@ class CelebornHashCheckDiskSuite extends SparkTestBase { val celebornCombineResult = combine(sparkSessionEnableCeleborn) val celebornGroupByResult = groupBy(sparkSessionEnableCeleborn) val celebornRepartitionResult = repartition(sparkSessionEnableCeleborn) - val celebornSqlResult = runsql(sparkSessionEnableCeleborn) assert(combineResult.equals(celebornCombineResult)) assert(groupByResult.equals(celebornGroupByResult)) assert(repartitionResult.equals(celebornRepartitionResult)) - assert(combineResult.equals(celebornCombineResult)) - assert(sqlResult.equals(celebornSqlResult)) // shuffle key not expired, diskInfo.actualUsableSpace <= 0, no space workers.foreach { worker => + worker.storageManager.updateDiskInfos() worker.storageManager.disksSnapshot().foreach { diskInfo => assert(diskInfo.actualUsableSpace <= 0) } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index b6ddff261..af6d4cc66 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -21,7 +21,6 @@ import java.nio.ByteBuffer import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor} import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray} -import scala.collection.JavaConverters._ import scala.concurrent.{Await, Promise} import scala.concurrent.duration.Duration import scala.util.{Failure, Success, Try} @@ -42,7 +41,7 @@ import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMod import org.apache.celeborn.common.protocol.PbPartitionLocation.Mode import org.apache.celeborn.common.protocol.message.StatusCode import org.apache.celeborn.common.unsafe.Platform -import org.apache.celeborn.common.util.{DiskUtils, ExceptionUtils, Utils} +import org.apache.celeborn.common.util.{ExceptionUtils, Utils} import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController import org.apache.celeborn.service.deploy.worker.storage.{HdfsFlusher, LocalFlusher, MapPartitionDataWriter, PartitionDataWriter, S3Flusher, StorageManager} import org.apache.celeborn.service.deploy.worker.storage.segment.SegmentMapPartitionFileWriter @@ -58,9 +57,6 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler private var replicateClientFactory: TransportClientFactory = _ private var registered: Option[AtomicBoolean] = None private var workerInfo: WorkerInfo = _ - private var diskReserveSize: Long = _ - private var diskReserveRatio: Option[Double] = _ - private var diskUsableSizes: Map[String, Long] = _ private var partitionSplitMinimumSize: Long = _ private var partitionSplitMaximumSize: Long = _ private var shutdown: AtomicBoolean = _ @@ -80,11 +76,6 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler unavailablePeers = worker.unavailablePeers replicateClientFactory = worker.replicateClientFactory workerInfo = worker.workerInfo - diskReserveSize = worker.conf.workerDiskReserveSize - diskReserveRatio = worker.conf.workerDiskReserveRatio - diskUsableSizes = workerInfo.diskInfos.asScala.map { case (mountPoint, diskInfo) => - (mountPoint, DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize, diskReserveRatio)) - }.toMap partitionSplitMinimumSize = worker.conf.partitionSplitMinimumSize partitionSplitMaximumSize = worker.conf.partitionSplitMaximumSize storageManager = worker.storageManager @@ -95,8 +86,6 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler testPushPrimaryDataTimeout = worker.conf.testPushPrimaryDataTimeout testPushReplicaDataTimeout = worker.conf.testPushReplicaDataTimeout registered = Some(worker.registered) - logInfo( - s"diskReserveSize ${Utils.bytesToString(diskReserveSize)}, diskReserveRatio ${diskReserveRatio.orNull}") } override def receive( @@ -1230,8 +1219,8 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler } val mountPoint = fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint val diskInfo = workerInfo.diskInfos.get(mountPoint) - val diskFull = diskInfo.status.equals( - DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace < diskUsableSizes(mountPoint) + val diskFull = + diskInfo.status.equals(DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace <= 0 diskFull } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 0be341912..15f89555a 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -274,6 +274,7 @@ private[celeborn] class Worker( assert(replicatePort > 0, "worker replica bind port should be positive") storageManager.updateDiskInfos() + storageManager.startDeviceMonitor() // WorkerInfo's diskInfos is a reference to storageManager.diskInfos val diskInfos = JavaUtils.newConcurrentHashMap[String, DiskInfo]() diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala index 8243f6522..77cd6d44f 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala @@ -254,18 +254,18 @@ object DeviceMonitor extends Logging { tryWithTimeoutAndCallback({ val usage = getDiskUsageInfos(diskInfo) // assume no single device capacity exceeds 1EB in this era - val minimumUsableSize = DiskUtils.getMinimumUsableSize( + val actualReserveSize = DiskUtils.getActualReserveSize( diskInfo, conf.workerDiskReserveSize, conf.workerDiskReserveRatio) val highDiskUsage = - usage.freeSpace < minimumUsableSize || diskInfo.actualUsableSpace <= 0 + usage.freeSpace < actualReserveSize || diskInfo.actualUsableSpace <= 0 if (highDiskUsage) { logWarning(s"${diskInfo.mountPoint} usage is above threshold." + - s" Disk usage(Report by OS):{total:${Utils.bytesToString(usage.totalSpace)}," + - s" free:${Utils.bytesToString(usage.freeSpace)}, used_percent:${usage.usedPercent}} " + - s"usage(Report by Celeborn):{" + - s"total:${Utils.bytesToString(diskInfo.configuredUsableSpace)}" + + s" Disk usage(Report by OS): {total:${Utils.bytesToString(usage.totalSpace)}," + + s" free:${Utils.bytesToString(usage.freeSpace)}, used_percent:${usage.usedPercent}}," + + s" usage(Report by Celeborn): {" + + s" total:${Utils.bytesToString(diskInfo.configuredUsableSpace)}," + s" free:${Utils.bytesToString(diskInfo.actualUsableSpace)} }") } highDiskUsage diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 517bcb971..87d6de179 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -160,8 +160,6 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs (flushers, totalThread) } - deviceMonitor.startCheck() - val hdfsDir = conf.hdfsDir val s3Dir = conf.s3Dir val hdfsPermission = new FsPermission("755") @@ -890,15 +888,17 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs try { val (fileSystemReportedUsableSpace, fileSystemReportedTotalSpace) = getFileSystemReportedSpace(diskInfo.mountPoint) + val actualReserveSize = + DiskUtils.getActualReserveSize(diskInfo, diskReserveSize, diskReserveRatio) val workingDirUsableSpace = - Math.min(diskInfo.configuredUsableSpace - totalUsage, fileSystemReportedUsableSpace) - val minimumReserveSize = - DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize, diskReserveRatio) - val usableSpace = Math.max(workingDirUsableSpace - minimumReserveSize, 0) + Math.min( + diskInfo.configuredUsableSpace - totalUsage, + fileSystemReportedUsableSpace - actualReserveSize) + val usableSpace = Math.max(workingDirUsableSpace, 0) logDebug( - s"Update diskInfo:${diskInfo.mountPoint} workingDirUsableSpace:$workingDirUsableSpace fileMeta:$fileSystemReportedUsableSpace" + - s"conf:${diskInfo.configuredUsableSpace} totalUsage:$totalUsage totalSpace:$fileSystemReportedTotalSpace" + - s"minimumReserveSize:$minimumReserveSize usableSpace:$usableSpace") + s"Update diskInfo:${diskInfo.mountPoint} workingDirUsableSpace:$workingDirUsableSpace fileMeta:$fileSystemReportedUsableSpace " + + s"configuredUsableSpace:${diskInfo.configuredUsableSpace} totalUsage:$totalUsage totalSpace:$fileSystemReportedTotalSpace " + + s"actualReserveSize:$actualReserveSize usableSpace:$usableSpace") diskInfo.setUsableSpace(usableSpace) diskInfo.setTotalSpace(fileSystemReportedTotalSpace) } catch { @@ -1170,6 +1170,11 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs } throw exception } + + def startDeviceMonitor(): Unit = { + deviceMonitor.startCheck() + } + } object StorageManager { diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala index b85697552..d393df083 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala @@ -23,9 +23,9 @@ import org.mockito.stubbing.Stubber import org.apache.celeborn.CelebornFunSuite import org.apache.celeborn.common.CelebornConf -import org.apache.celeborn.common.CelebornConf.{WORKER_GRACEFUL_SHUTDOWN_ENABLED, WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH} +import org.apache.celeborn.common.CelebornConf.{WORKER_DISK_RESERVE_SIZE, WORKER_GRACEFUL_SHUTDOWN_ENABLED, WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH} import org.apache.celeborn.common.meta.DiskInfo -import org.apache.celeborn.common.util.DiskUtils +import org.apache.celeborn.common.util.Utils import org.apache.celeborn.service.deploy.worker.WorkerSource trait MockitoHelper extends MockitoSugar { @@ -36,8 +36,6 @@ trait MockitoHelper extends MockitoSugar { class StorageManagerSuite extends CelebornFunSuite with MockitoHelper { - val conf = new CelebornConf() - test("[CELEBORN-926] saveAllCommittedFileInfosToDB cause IllegalMonitorStateException") { val conf = new CelebornConf().set(WORKER_GRACEFUL_SHUTDOWN_ENABLED, true) .set(WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH, "/tmp/recover") @@ -47,42 +45,67 @@ class StorageManagerSuite extends CelebornFunSuite with MockitoHelper { } test("updateDiskInfosWithDiskReserveSize") { + // reserve size set to 5g + val conf = new CelebornConf().set(WORKER_DISK_RESERVE_SIZE, Utils.byteStringAsBytes("5g")) val storageManager = new StorageManager(conf, new WorkerSource(conf)) val spyStorageManager = spy(storageManager) - val disks = prepareDisks() - val diskSetSpace = (80 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L) - doReturn(disks).when(spyStorageManager).disksSnapshot() - doReturn(diskSetSpace).when(spyStorageManager).getFileSystemReportedSpace(any) + val diskInfo = new DiskInfo("/mnt/disk1", List.empty, null, conf) + diskInfo.setUsableSpace(-1L) + + var diskSetSpace = (0L, 0L) + doReturn(List(diskInfo)).when(spyStorageManager).disksSnapshot() + doAnswer(diskSetSpace).when(spyStorageManager).getFileSystemReportedSpace(any) + + // disk usable 80g, total 80g, worker config 8EB + diskSetSpace = (80 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L) + diskInfo.configuredUsableSpace = Long.MaxValue spyStorageManager.updateDiskInfos() - for (disk <- disks) { - val minimumReserveSize = - DiskUtils.getMinimumUsableSize( - disk, - conf.workerDiskReserveSize, - conf.workerDiskReserveRatio) - assert(disk.actualUsableSpace == diskSetSpace._1 - minimumReserveSize) - } - } + assert(diskInfo.actualUsableSpace == 75 * 1024 * 1024 * 1024L) - def prepareDisks(): List[DiskInfo] = { - val diskSetSpaces = Array( - 90L * 1024 * 1024 * 1024, - 95L * 1024 * 1024 * 1024, - 100L * 1024 * 1024 * 1024) + // disk usable 80g, total 80g, worker config 50g + diskInfo.configuredUsableSpace = 50 * 1024 * 1024 * 1024L + diskInfo.setUsableSpace(-1L) + spyStorageManager.updateDiskInfos() + assert(diskInfo.actualUsableSpace == 50 * 1024 * 1024 * 1024L) - val diskInfo1 = new DiskInfo("/mnt/disk1", List.empty, null, conf) - diskInfo1.configuredUsableSpace = (Long.MaxValue) - diskInfo1.setUsableSpace(diskSetSpaces(0)) + // disk usable 10g, total 80g, worker config 20g + diskSetSpace = (10 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L) + diskInfo.configuredUsableSpace = 20 * 1024 * 1024 * 1024L + diskInfo.setUsableSpace(-1L) + spyStorageManager.updateDiskInfos() + assert(diskInfo.actualUsableSpace == 5 * 1024 * 1024 * 1024L) - val diskInfo2 = new DiskInfo("/mnt/disk2", List.empty, null, conf) - diskInfo2.configuredUsableSpace = (Long.MaxValue) - diskInfo2.setUsableSpace(diskSetSpaces(1)) + // disk usable 10g, total 80g, worker config 5g + diskInfo.configuredUsableSpace = 5 * 1024 * 1024 * 1024L + diskInfo.setUsableSpace(-1L) + spyStorageManager.updateDiskInfos() + assert(diskInfo.actualUsableSpace == 5 * 1024 * 1024 * 1024L) - val diskInfo3 = new DiskInfo("/mnt/disk3", List.empty, null, conf) - diskInfo3.configuredUsableSpace = (Long.MaxValue) - diskInfo3.setUsableSpace(diskSetSpaces(2)) + // disk usable 5g, total 80g, worker config 20g + diskSetSpace = (5 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L) + diskInfo.configuredUsableSpace = 20 * 1024 * 1024 * 1024L + diskInfo.setUsableSpace(-1L) + spyStorageManager.updateDiskInfos() + assert(diskInfo.actualUsableSpace == 0L) - List(diskInfo1, diskInfo2, diskInfo3) + // disk usable 5g, total 80g, worker config 5g + diskInfo.configuredUsableSpace = 5 * 1024 * 1024 * 1024L + diskInfo.setUsableSpace(-1L) + spyStorageManager.updateDiskInfos() + assert(diskInfo.actualUsableSpace == 0L) + + // disk usable 1g, total 80g, worker config 20g + diskSetSpace = (1 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L) + diskInfo.configuredUsableSpace = 20 * 1024 * 1024 * 1024L + diskInfo.setUsableSpace(-1L) + spyStorageManager.updateDiskInfos() + assert(diskInfo.actualUsableSpace == 0L) + + // disk usable 1g, total 80g, worker config 5g + diskInfo.configuredUsableSpace = 5 * 1024 * 1024 * 1024L + diskInfo.setUsableSpace(-1L) + spyStorageManager.updateDiskInfos() + assert(diskInfo.actualUsableSpace == 0L) } }