From c931663e5fed3713c1d193670d1e518e4f13559e Mon Sep 17 00:00:00 2001 From: nafiy <30563796+nafiyAix@users.noreply.github.com> Date: Fri, 16 Dec 2022 15:47:36 +0800 Subject: [PATCH] [CELEBORN-110][REFACTOR] Notify critical error after collecting a certain number of non-critical error (#1055) --- .../apache/celeborn/common/CelebornConf.scala | 21 +++++ docs/configuration/worker.md | 2 + .../deploy/worker/storage/DeviceMonitor.scala | 86 ++++++++++++------- .../worker/storage/StorageManager.scala | 4 +- .../worker/storage/DeviceMonitorSuite.scala | 37 +------- 5 files changed, 82 insertions(+), 68 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index ec837625e..f5d87b869 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -724,6 +724,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def diskMonitorCheckList: Seq[String] = get(WORKER_DISK_MONITOR_CHECKLIST) def diskMonitorCheckInterval: Long = get(WORKER_DISK_MONITOR_CHECK_INTERVAL) def diskMonitorSysBlockDir: String = get(WORKER_DISK_MONITOR_SYS_BLOCK_DIR) + def diskMonitorNotifyErrorThreshold: Int = get(WORKER_DISK_MONITOR_NOTIFY_ERROR_THRESHOLD) + def diskMonitorNotifyErrorExpireTimeout: Long = + get(WORKER_DISK_MONITOR_NOTIFY_ERROR_EXPIRE_TIMEOUT) def createWriterMaxAttempts: Int = get(WORKER_WRITER_CREATE_MAX_ATTEMPTS) def workerStorageBaseDirPrefix: String = get(WORKER_STORAGE_BASE_DIR_PREFIX) def workerStorageBaseDirNumber: Int = get(WORKER_STORAGE_BASE_DIR_COUNT) @@ -2445,6 +2448,24 @@ object CelebornConf extends Logging { .stringConf .createWithDefault("/sys/block") + val WORKER_DISK_MONITOR_NOTIFY_ERROR_THRESHOLD: ConfigEntry[Int] = + buildConf("celeborn.worker.monitor.disk.notifyError.threshold") + .categories("worker") + .version("0.3.0") + .doc("Device monitor will only notify critical error once the accumulated valid non-critical error number " + + "exceeding this threshold.") + .intConf + .createWithDefault(64) + + val WORKER_DISK_MONITOR_NOTIFY_ERROR_EXPIRE_TIMEOUT: ConfigEntry[Long] = + buildConf("celeborn.worker.monitor.disk.notifyError.expireTimeout") + .categories("worker") + .version("0.3.0") + .doc("The expire timeout of non-critical device error. Only notify critical error when the number of non-critical " + + "errors for a period of time exceeds threshold.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("10m") + val WORKER_WRITER_CREATE_MAX_ATTEMPTS: ConfigEntry[Int] = buildConf("celeborn.worker.writer.create.maxAttempts") .withAlternative("rss.create.file.writer.retry.count") diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index bd7d7d91d..0ae29ef9f 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -62,6 +62,8 @@ license: | | celeborn.worker.monitor.disk.checkInterval | 60s | Intervals between device monitor to check disk. | 0.2.0 | | celeborn.worker.monitor.disk.checklist | readwrite,diskusage | Monitor type for disk, available items are: iohang, readwrite and diskusage. | 0.2.0 | | celeborn.worker.monitor.disk.enabled | true | When true, worker will monitor device and report to master. | 0.2.0 | +| celeborn.worker.monitor.disk.notifyError.expireTimeout | 10m | The expire timeout of non-critical device error. Only notify critical error when the number of non-critical errors for a period of time exceeds threshold. | 0.3.0 | +| celeborn.worker.monitor.disk.notifyError.threshold | 64 | Device monitor will only notify critical error once the accumulated valid non-critical error number exceeding this threshold. | 0.3.0 | | celeborn.worker.monitor.disk.sys.block.dir | /sys/block | The directory where linux file block information is stored. | 0.2.0 | | celeborn.worker.noneEmptyDirExpireDuration | 1d | If a non-empty application shuffle data dir have not been operated during le duration time, will mark this application as expired. | 0.2.0 | | celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | Max ratio of partition sorter's memory for sorting, when reserved memory is higher than max partition sorter memory, partition sorter will stop sorting. | 0.2.0 | diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala index 630804ee1..ebc8ad6a7 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala @@ -42,7 +42,6 @@ trait DeviceMonitor { def registerFlusher(flusher: LocalFlusher): Unit = {} def unregisterFlusher(flusher: LocalFlusher): Unit = {} def reportNonCriticalError(mountPoint: String, e: IOException, diskStatus: DiskStatus): Unit = {} - def reportDeviceError(mountPoint: String, e: IOException, diskStatus: DiskStatus): Unit = {} def close() {} } @@ -66,6 +65,10 @@ class LocalDeviceMonitor( val statFile = new File(s"$sysBlockDir/${deviceInfo.name}/stat") val inFlightFile = new File(s"$sysBlockDir/${deviceInfo.name}/inflight") + val nonCriticalErrors = new ConcurrentHashMap[DiskStatus, util.Set[Long]]() + val notifyErrorThreshold = conf.diskMonitorNotifyErrorThreshold + val notifyErrorExpireTimeout = conf.diskMonitorNotifyErrorExpireTimeout + var lastReadComplete: Long = -1 var lastWriteComplete: Long = -1 var lastReadInflight: Long = -1 @@ -96,6 +99,13 @@ class LocalDeviceMonitor( def notifyObserversOnNonCriticalError(mountPoints: List[String], diskStatus: DiskStatus): Unit = this.synchronized { + val nonCriticalErrorSetFunc = new util.function.Function[DiskStatus, util.Set[Long]] { + override def apply(t: DiskStatus): util.Set[Long] = { + ConcurrentHashMap.newKeySet[Long]() + } + } + nonCriticalErrors.computeIfAbsent(diskStatus, nonCriticalErrorSetFunc).add( + System.currentTimeMillis()) mountPoints.foreach { case mountPoint => diskInfos.get(mountPoint).setStatus(diskStatus) } @@ -225,26 +235,45 @@ class LocalDeviceMonitor( try { observedDevices.values().asScala.foreach(device => { val mountPoints = device.diskInfos.keySet.asScala.toList - - if (checkIoHang && device.ioHang()) { - logger.error(s"Encounter device io hang error!" + - s"${device.deviceInfo.name}, notify observers") - device.notifyObserversOnNonCriticalError(mountPoints, DiskStatus.IO_HANG) + // tolerate time accuracy for better performance + val now = System.currentTimeMillis() + for (concurrentSet <- device.nonCriticalErrors.values().asScala) { + for (time <- concurrentSet.asScala) { + if (now - time > device.notifyErrorExpireTimeout) { + concurrentSet.remove(time) + } + } + } + val nonCriticalErrorSum = device.nonCriticalErrors.values().asScala.map(_.size).sum + if (nonCriticalErrorSum > device.notifyErrorThreshold) { + logger.error(s"Device ${device.deviceInfo.name} has accumulated $nonCriticalErrorSum non-critical " + + s"error within the past ${Utils.msDurationToString(device.notifyErrorExpireTimeout)} , its sum has " + + s"exceed the threshold (${device.notifyErrorThreshold}), device monitor will notify error to " + + s"observed device.") + val mountPoints = device.diskInfos.values().asScala.map(_.mountPoint).toList + device.notifyObserversOnError(mountPoints, DiskStatus.CRITICAL_ERROR) } else { - device.diskInfos.values().asScala.foreach { case diskInfo => - if (checkDiskUsage && DeviceMonitor.highDiskUsage(conf, diskInfo)) { - logger.error(s"${diskInfo.mountPoint} high_disk_usage error, notify observers") - device.notifyObserversOnHighDiskUsage(diskInfo.mountPoint) - } else if (checkReadWrite && - DeviceMonitor.readWriteError(conf, diskInfo.dirs.head)) { - logger.error(s"${diskInfo.mountPoint} read-write error, notify observers") - // We think that if one dir in device has read-write problem, if possible all - // dirs in this device have the problem - device.notifyObserversOnNonCriticalError( - List(diskInfo.mountPoint), - DiskStatus.READ_OR_WRITE_FAILURE) - } else { - device.notifyObserversOnHealthy(diskInfo.mountPoint) + if (checkIoHang && device.ioHang()) { + logger.error(s"Encounter device io hang error!" + + s"${device.deviceInfo.name}, notify observers") + device.notifyObserversOnNonCriticalError(mountPoints, DiskStatus.IO_HANG) + } else { + device.diskInfos.values().asScala.foreach { case diskInfo => + if (checkDiskUsage && DeviceMonitor.highDiskUsage(conf, diskInfo)) { + logger.error( + s"${diskInfo.mountPoint} high_disk_usage error, notify observers") + device.notifyObserversOnHighDiskUsage(diskInfo.mountPoint) + } else if (checkReadWrite && + DeviceMonitor.readWriteError(conf, diskInfo.dirs.head)) { + logger.error(s"${diskInfo.mountPoint} read-write error, notify observers") + // We think that if one dir in device has read-write problem, if possible all + // dirs in this device have the problem + device.notifyObserversOnNonCriticalError( + List(diskInfo.mountPoint), + DiskStatus.READ_OR_WRITE_FAILURE) + } else if (nonCriticalErrorSum <= device.notifyErrorThreshold * 0.5) { + device.notifyObserversOnHealthy(diskInfo.mountPoint) + } } } } @@ -278,17 +307,6 @@ class LocalDeviceMonitor( observedDevices.get(diskInfos.get(flusher.mountPoint).deviceInfo).removeObserver(flusher) } - override def reportDeviceError( - mountPoint: String, - e: IOException, - diskStatus: DiskStatus): Unit = { - logger.error(s"Receive report exception, disk $mountPoint, $e") - if (diskInfos.containsKey(mountPoint)) { - observedDevices.get(diskInfos.get(mountPoint).deviceInfo) - .notifyObserversOnError(List(mountPoint), diskStatus) - } - } - override def reportNonCriticalError( mountPoint: String, e: IOException, @@ -332,7 +350,8 @@ object DeviceMonitor { /** * check if the disk is high usage - * @param conf conf + * + * @param conf conf * @param diskInfo diskInfo * @return true if high disk usage */ @@ -362,7 +381,8 @@ object DeviceMonitor { /** * check if the data dir has read-write problem - * @param conf conf + * + * @param conf conf * @param dataDir one of shuffle data dirs in mount disk * @return true if disk has read-write problem */ diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 54d2138af..493075199 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -143,8 +143,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs } override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit = this.synchronized { - if (diskStatus == DiskStatus.IO_HANG) { - logInfo("IoHang, remove disk operator.") + if (diskStatus == DiskStatus.CRITICAL_ERROR) { + logInfo(s"Disk ${mountPoint} faces critical error, will remove its disk operator.") val operator = diskOperators.remove(mountPoint) if (operator != null) { operator.shutdown() diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala index de05c64fc..ca76c7bf0 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala @@ -214,19 +214,19 @@ class DeviceMonitorSuite extends AnyFunSuite { deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager)) assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4)) - when(fw2.notifyError("vda", DiskStatus.IO_HANG)) + when(fw2.notifyError("vda", DiskStatus.CRITICAL_ERROR)) .thenAnswer((a: String, b: List[File]) => { deviceMonitor.unregisterFileWriter(fw2) }) - when(fw4.notifyError("vdb", DiskStatus.IO_HANG)) + when(fw4.notifyError("vdb", DiskStatus.CRITICAL_ERROR)) .thenAnswer((a: String, b: List[File]) => { deviceMonitor.unregisterFileWriter(fw4) }) - when(df2.notifyError("vda", DiskStatus.IO_HANG)) + when(df2.notifyError("vda", DiskStatus.CRITICAL_ERROR)) .thenAnswer((a: String, b: List[File]) => { df2.stopFlag.set(true) }) - when(df4.notifyError("vdb", DiskStatus.IO_HANG)) + when(df4.notifyError("vdb", DiskStatus.CRITICAL_ERROR)) .thenAnswer((a: String, b: List[File]) => { df4.stopFlag.set(true) }) @@ -252,35 +252,6 @@ class DeviceMonitorSuite extends AnyFunSuite { deviceMonitor.registerFileWriter(fw4) assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 4) assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 4) - val dirs = new jArrayList[File]() - dirs.add(null) - when(fw1.notifyError(any(), any())) - .thenAnswer((_: Any) => { - deviceMonitor.unregisterFileWriter(fw1) - }) - when(fw2.notifyError(any(), any())) - .thenAnswer((_: Any) => { - deviceMonitor.unregisterFileWriter(fw2) - }) - deviceMonitor.reportDeviceError("/mnt/disk1", null, DiskStatus.IO_HANG) - assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 2) - assert( - deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager)) - assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2)) - - when(fw3.notifyError(any(), any())) - .thenAnswer((_: Any) => { - deviceMonitor.unregisterFileWriter(fw3) - }) - when(fw4.notifyError(any(), any())) - .thenAnswer((_: Any) => { - deviceMonitor.unregisterFileWriter(fw4) - }) - deviceMonitor.reportDeviceError("/mnt/disk2", null, DiskStatus.IO_HANG) - assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 2) - assert( - deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager)) - assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4)) } }