[CELEBORN-110][REFACTOR] Notify critical error after collecting a certain number of non-critical error (#1055)
This commit is contained in:
parent
dc66369973
commit
c931663e5f
@ -724,6 +724,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
|
||||
/**
 * Reads the WORKER_DISK_MONITOR_CHECKLIST entry as a sequence of strings.
 * NOTE(review): presumably the list of disk checks to run (iohang / readwrite / diskusage) —
 * confirm against the entry definition, which is not visible here.
 */
def diskMonitorCheckList: Seq[String] = get(WORKER_DISK_MONITOR_CHECKLIST)
|
||||
/**
 * Reads the WORKER_DISK_MONITOR_CHECK_INTERVAL entry as a Long.
 * NOTE(review): the time unit is set by the entry definition, which is not visible here — confirm.
 */
def diskMonitorCheckInterval: Long = get(WORKER_DISK_MONITOR_CHECK_INTERVAL)
|
||||
/** Reads the WORKER_DISK_MONITOR_SYS_BLOCK_DIR entry (a string conf whose default is "/sys/block"). */
def diskMonitorSysBlockDir: String = get(WORKER_DISK_MONITOR_SYS_BLOCK_DIR)
|
||||
/**
 * Reads the WORKER_DISK_MONITOR_NOTIFY_ERROR_THRESHOLD entry
 * (`celeborn.worker.monitor.disk.notifyError.threshold`, default 64).
 */
def diskMonitorNotifyErrorThreshold: Int = get(WORKER_DISK_MONITOR_NOTIFY_ERROR_THRESHOLD)
|
||||
/**
 * Reads the WORKER_DISK_MONITOR_NOTIFY_ERROR_EXPIRE_TIMEOUT entry
 * (`celeborn.worker.monitor.disk.notifyError.expireTimeout`), a time conf declared in
 * milliseconds with the default string "10m".
 */
def diskMonitorNotifyErrorExpireTimeout: Long =
|
||||
get(WORKER_DISK_MONITOR_NOTIFY_ERROR_EXPIRE_TIMEOUT)
|
||||
/** Reads the WORKER_WRITER_CREATE_MAX_ATTEMPTS entry (`celeborn.worker.writer.create.maxAttempts`). */
def createWriterMaxAttempts: Int = get(WORKER_WRITER_CREATE_MAX_ATTEMPTS)
|
||||
/** Reads the WORKER_STORAGE_BASE_DIR_PREFIX entry as a string. */
def workerStorageBaseDirPrefix: String = get(WORKER_STORAGE_BASE_DIR_PREFIX)
|
||||
/** Reads the WORKER_STORAGE_BASE_DIR_COUNT entry as an Int. */
def workerStorageBaseDirNumber: Int = get(WORKER_STORAGE_BASE_DIR_COUNT)
|
||||
@ -2445,6 +2448,24 @@ object CelebornConf extends Logging {
|
||||
.stringConf
|
||||
.createWithDefault("/sys/block")
|
||||
|
||||
// Worker-category integer setting (since 0.3.0, default 64).
// The device check loop in this change compares the count of unexpired non-critical errors
// against this value with a strict `>`, so CRITICAL_ERROR is only notified once the count
// goes above the threshold, not when it merely reaches it.
val WORKER_DISK_MONITOR_NOTIFY_ERROR_THRESHOLD: ConfigEntry[Int] =
|
||||
buildConf("celeborn.worker.monitor.disk.notifyError.threshold")
|
||||
.categories("worker")
|
||||
.version("0.3.0")
|
||||
.doc("Device monitor will only notify critical error once the accumulated valid non-critical error number " +
|
||||
"exceeding this threshold.")
|
||||
.intConf
|
||||
.createWithDefault(64)
|
||||
|
||||
// Worker-category time setting (since 0.3.0), declared in milliseconds with default "10m".
// The device check loop in this change removes recorded non-critical error timestamps whose
// age (now - time) is greater than this value before summing the remaining errors.
val WORKER_DISK_MONITOR_NOTIFY_ERROR_EXPIRE_TIMEOUT: ConfigEntry[Long] =
|
||||
buildConf("celeborn.worker.monitor.disk.notifyError.expireTimeout")
|
||||
.categories("worker")
|
||||
.version("0.3.0")
|
||||
.doc("The expire timeout of non-critical device error. Only notify critical error when the number of non-critical " +
|
||||
"errors for a period of time exceeds threshold.")
|
||||
.timeConf(TimeUnit.MILLISECONDS)
|
||||
.createWithDefaultString("10m")
|
||||
|
||||
val WORKER_WRITER_CREATE_MAX_ATTEMPTS: ConfigEntry[Int] =
|
||||
buildConf("celeborn.worker.writer.create.maxAttempts")
|
||||
.withAlternative("rss.create.file.writer.retry.count")
|
||||
|
||||
@ -62,6 +62,8 @@ license: |
|
||||
| celeborn.worker.monitor.disk.checkInterval | 60s | Intervals between device monitor to check disk. | 0.2.0 |
|
||||
| celeborn.worker.monitor.disk.checklist | readwrite,diskusage | Monitor type for disk, available items are: iohang, readwrite and diskusage. | 0.2.0 |
|
||||
| celeborn.worker.monitor.disk.enabled | true | When true, worker will monitor device and report to master. | 0.2.0 |
|
||||
| celeborn.worker.monitor.disk.notifyError.expireTimeout | 10m | The expire timeout of non-critical device error. Only notify critical error when the number of non-critical errors for a period of time exceeds threshold. | 0.3.0 |
|
||||
| celeborn.worker.monitor.disk.notifyError.threshold | 64 | Device monitor will only notify critical error once the accumulated number of valid non-critical errors exceeds this threshold. | 0.3.0 |
|
||||
| celeborn.worker.monitor.disk.sys.block.dir | /sys/block | The directory where linux file block information is stored. | 0.2.0 |
|
||||
| celeborn.worker.noneEmptyDirExpireDuration | 1d | If a non-empty application shuffle data dir has not been operated on during this duration, the application will be marked as expired. | 0.2.0 |
|
||||
| celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | Max ratio of partition sorter's memory for sorting, when reserved memory is higher than max partition sorter memory, partition sorter will stop sorting. | 0.2.0 |
|
||||
|
||||
@ -42,7 +42,6 @@ trait DeviceMonitor {
|
||||
/** Default no-op; the trait's base implementation does nothing with the flusher. */
def registerFlusher(flusher: LocalFlusher): Unit = {}
|
||||
/** Default no-op; the trait's base implementation does nothing with the flusher. */
def unregisterFlusher(flusher: LocalFlusher): Unit = {}
|
||||
/**
 * Default no-op hook for reporting a non-critical error on a mount point.
 *
 * @param mountPoint mount point the error is reported for
 * @param e the I/O exception being reported
 * @param diskStatus disk status associated with the report
 */
def reportNonCriticalError(mountPoint: String, e: IOException, diskStatus: DiskStatus): Unit = {}
|
||||
/**
 * Default no-op hook for reporting a device error on a mount point.
 * NOTE(review): the hunk header (7 lines before, 6 after) suggests this commit removes this
 * method from the trait — confirm before relying on it.
 */
def reportDeviceError(mountPoint: String, e: IOException, diskStatus: DiskStatus): Unit = {}
|
||||
/** Default no-op; the trait's base implementation releases nothing. */
def close() {}
|
||||
}
|
||||
|
||||
@ -66,6 +65,10 @@ class LocalDeviceMonitor(
|
||||
val statFile = new File(s"$sysBlockDir/${deviceInfo.name}/stat")
|
||||
val inFlightFile = new File(s"$sysBlockDir/${deviceInfo.name}/inflight")
|
||||
|
||||
val nonCriticalErrors = new ConcurrentHashMap[DiskStatus, util.Set[Long]]()
|
||||
val notifyErrorThreshold = conf.diskMonitorNotifyErrorThreshold
|
||||
val notifyErrorExpireTimeout = conf.diskMonitorNotifyErrorExpireTimeout
|
||||
|
||||
var lastReadComplete: Long = -1
|
||||
var lastWriteComplete: Long = -1
|
||||
var lastReadInflight: Long = -1
|
||||
@ -96,6 +99,13 @@ class LocalDeviceMonitor(
|
||||
|
||||
def notifyObserversOnNonCriticalError(mountPoints: List[String], diskStatus: DiskStatus): Unit =
|
||||
this.synchronized {
|
||||
val nonCriticalErrorSetFunc = new util.function.Function[DiskStatus, util.Set[Long]] {
|
||||
override def apply(t: DiskStatus): util.Set[Long] = {
|
||||
ConcurrentHashMap.newKeySet[Long]()
|
||||
}
|
||||
}
|
||||
nonCriticalErrors.computeIfAbsent(diskStatus, nonCriticalErrorSetFunc).add(
|
||||
System.currentTimeMillis())
|
||||
mountPoints.foreach { case mountPoint =>
|
||||
diskInfos.get(mountPoint).setStatus(diskStatus)
|
||||
}
|
||||
@ -225,26 +235,45 @@ class LocalDeviceMonitor(
|
||||
try {
|
||||
observedDevices.values().asScala.foreach(device => {
|
||||
val mountPoints = device.diskInfos.keySet.asScala.toList
|
||||
|
||||
if (checkIoHang && device.ioHang()) {
|
||||
logger.error(s"Encounter device io hang error!" +
|
||||
s"${device.deviceInfo.name}, notify observers")
|
||||
device.notifyObserversOnNonCriticalError(mountPoints, DiskStatus.IO_HANG)
|
||||
// tolerate time accuracy for better performance
|
||||
val now = System.currentTimeMillis()
|
||||
for (concurrentSet <- device.nonCriticalErrors.values().asScala) {
|
||||
for (time <- concurrentSet.asScala) {
|
||||
if (now - time > device.notifyErrorExpireTimeout) {
|
||||
concurrentSet.remove(time)
|
||||
}
|
||||
}
|
||||
}
|
||||
val nonCriticalErrorSum = device.nonCriticalErrors.values().asScala.map(_.size).sum
|
||||
if (nonCriticalErrorSum > device.notifyErrorThreshold) {
|
||||
logger.error(s"Device ${device.deviceInfo.name} has accumulated $nonCriticalErrorSum non-critical " +
|
||||
s"error within the past ${Utils.msDurationToString(device.notifyErrorExpireTimeout)} , its sum has " +
|
||||
s"exceed the threshold (${device.notifyErrorThreshold}), device monitor will notify error to " +
|
||||
s"observed device.")
|
||||
val mountPoints = device.diskInfos.values().asScala.map(_.mountPoint).toList
|
||||
device.notifyObserversOnError(mountPoints, DiskStatus.CRITICAL_ERROR)
|
||||
} else {
|
||||
device.diskInfos.values().asScala.foreach { case diskInfo =>
|
||||
if (checkDiskUsage && DeviceMonitor.highDiskUsage(conf, diskInfo)) {
|
||||
logger.error(s"${diskInfo.mountPoint} high_disk_usage error, notify observers")
|
||||
device.notifyObserversOnHighDiskUsage(diskInfo.mountPoint)
|
||||
} else if (checkReadWrite &&
|
||||
DeviceMonitor.readWriteError(conf, diskInfo.dirs.head)) {
|
||||
logger.error(s"${diskInfo.mountPoint} read-write error, notify observers")
|
||||
// We think that if one dir in device has read-write problem, if possible all
|
||||
// dirs in this device have the problem
|
||||
device.notifyObserversOnNonCriticalError(
|
||||
List(diskInfo.mountPoint),
|
||||
DiskStatus.READ_OR_WRITE_FAILURE)
|
||||
} else {
|
||||
device.notifyObserversOnHealthy(diskInfo.mountPoint)
|
||||
if (checkIoHang && device.ioHang()) {
|
||||
logger.error(s"Encounter device io hang error!" +
|
||||
s"${device.deviceInfo.name}, notify observers")
|
||||
device.notifyObserversOnNonCriticalError(mountPoints, DiskStatus.IO_HANG)
|
||||
} else {
|
||||
device.diskInfos.values().asScala.foreach { case diskInfo =>
|
||||
if (checkDiskUsage && DeviceMonitor.highDiskUsage(conf, diskInfo)) {
|
||||
logger.error(
|
||||
s"${diskInfo.mountPoint} high_disk_usage error, notify observers")
|
||||
device.notifyObserversOnHighDiskUsage(diskInfo.mountPoint)
|
||||
} else if (checkReadWrite &&
|
||||
DeviceMonitor.readWriteError(conf, diskInfo.dirs.head)) {
|
||||
logger.error(s"${diskInfo.mountPoint} read-write error, notify observers")
|
||||
// We think that if one dir in device has read-write problem, if possible all
|
||||
// dirs in this device have the problem
|
||||
device.notifyObserversOnNonCriticalError(
|
||||
List(diskInfo.mountPoint),
|
||||
DiskStatus.READ_OR_WRITE_FAILURE)
|
||||
} else if (nonCriticalErrorSum <= device.notifyErrorThreshold * 0.5) {
|
||||
device.notifyObserversOnHealthy(diskInfo.mountPoint)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -278,17 +307,6 @@ class LocalDeviceMonitor(
|
||||
observedDevices.get(diskInfos.get(flusher.mountPoint).deviceInfo).removeObserver(flusher)
|
||||
}
|
||||
|
||||
/**
 * Logs the reported exception and, when the mount point is known to `diskInfos`, forwards
 * the given status to the observers of the device that owns it via `notifyObserversOnError`.
 * Reports for unknown mount points are only logged.
 *
 * @param mountPoint mount point the error was reported for
 * @param e the reported exception (interpolated into the log message)
 * @param diskStatus status passed on to the observers
 */
override def reportDeviceError(
|
||||
mountPoint: String,
|
||||
e: IOException,
|
||||
diskStatus: DiskStatus): Unit = {
|
||||
logger.error(s"Receive report exception, disk $mountPoint, $e")
|
||||
// Guard: only mount points registered in diskInfos are mapped to an observed device.
if (diskInfos.containsKey(mountPoint)) {
|
||||
// NOTE(review): assumes observedDevices has an entry for this deviceInfo; a missing
// entry would make get(...) return null here — confirm against registration code.
observedDevices.get(diskInfos.get(mountPoint).deviceInfo)
|
||||
.notifyObserversOnError(List(mountPoint), diskStatus)
|
||||
}
|
||||
}
|
||||
|
||||
override def reportNonCriticalError(
|
||||
mountPoint: String,
|
||||
e: IOException,
|
||||
@ -332,7 +350,8 @@ object DeviceMonitor {
|
||||
|
||||
/**
|
||||
* check if the disk is high usage
|
||||
* @param conf conf
|
||||
*
|
||||
* @param conf conf
|
||||
* @param diskInfo diskInfo
|
||||
* @return true if high disk usage
|
||||
*/
|
||||
@ -362,7 +381,8 @@ object DeviceMonitor {
|
||||
|
||||
/**
|
||||
* check if the data dir has read-write problem
|
||||
* @param conf conf
|
||||
*
|
||||
* @param conf conf
|
||||
* @param dataDir one of shuffle data dirs in mount disk
|
||||
* @return true if disk has read-write problem
|
||||
*/
|
||||
|
||||
@ -143,8 +143,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
|
||||
}
|
||||
|
||||
override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit = this.synchronized {
|
||||
if (diskStatus == DiskStatus.IO_HANG) {
|
||||
logInfo("IoHang, remove disk operator.")
|
||||
if (diskStatus == DiskStatus.CRITICAL_ERROR) {
|
||||
logInfo(s"Disk ${mountPoint} faces critical error, will remove its disk operator.")
|
||||
val operator = diskOperators.remove(mountPoint)
|
||||
if (operator != null) {
|
||||
operator.shutdown()
|
||||
|
||||
@ -214,19 +214,19 @@ class DeviceMonitorSuite extends AnyFunSuite {
|
||||
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
|
||||
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4))
|
||||
|
||||
when(fw2.notifyError("vda", DiskStatus.IO_HANG))
|
||||
when(fw2.notifyError("vda", DiskStatus.CRITICAL_ERROR))
|
||||
.thenAnswer((a: String, b: List[File]) => {
|
||||
deviceMonitor.unregisterFileWriter(fw2)
|
||||
})
|
||||
when(fw4.notifyError("vdb", DiskStatus.IO_HANG))
|
||||
when(fw4.notifyError("vdb", DiskStatus.CRITICAL_ERROR))
|
||||
.thenAnswer((a: String, b: List[File]) => {
|
||||
deviceMonitor.unregisterFileWriter(fw4)
|
||||
})
|
||||
when(df2.notifyError("vda", DiskStatus.IO_HANG))
|
||||
when(df2.notifyError("vda", DiskStatus.CRITICAL_ERROR))
|
||||
.thenAnswer((a: String, b: List[File]) => {
|
||||
df2.stopFlag.set(true)
|
||||
})
|
||||
when(df4.notifyError("vdb", DiskStatus.IO_HANG))
|
||||
when(df4.notifyError("vdb", DiskStatus.CRITICAL_ERROR))
|
||||
.thenAnswer((a: String, b: List[File]) => {
|
||||
df4.stopFlag.set(true)
|
||||
})
|
||||
@ -252,35 +252,6 @@ class DeviceMonitorSuite extends AnyFunSuite {
|
||||
deviceMonitor.registerFileWriter(fw4)
|
||||
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 4)
|
||||
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 4)
|
||||
val dirs = new jArrayList[File]()
|
||||
dirs.add(null)
|
||||
when(fw1.notifyError(any(), any()))
|
||||
.thenAnswer((_: Any) => {
|
||||
deviceMonitor.unregisterFileWriter(fw1)
|
||||
})
|
||||
when(fw2.notifyError(any(), any()))
|
||||
.thenAnswer((_: Any) => {
|
||||
deviceMonitor.unregisterFileWriter(fw2)
|
||||
})
|
||||
deviceMonitor.reportDeviceError("/mnt/disk1", null, DiskStatus.IO_HANG)
|
||||
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 2)
|
||||
assert(
|
||||
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
|
||||
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2))
|
||||
|
||||
when(fw3.notifyError(any(), any()))
|
||||
.thenAnswer((_: Any) => {
|
||||
deviceMonitor.unregisterFileWriter(fw3)
|
||||
})
|
||||
when(fw4.notifyError(any(), any()))
|
||||
.thenAnswer((_: Any) => {
|
||||
deviceMonitor.unregisterFileWriter(fw4)
|
||||
})
|
||||
deviceMonitor.reportDeviceError("/mnt/disk2", null, DiskStatus.IO_HANG)
|
||||
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 2)
|
||||
assert(
|
||||
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
|
||||
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user