diff --git a/CONFIGURATION_GUIDE.md b/CONFIGURATION_GUIDE.md index 19e273999..0b348c5e8 100644 --- a/CONFIGURATION_GUIDE.md +++ b/CONFIGURATION_GUIDE.md @@ -211,11 +211,14 @@ rss.replicateserver.port | `rss.ha.port` | 9872 | int | | | `rss.ha.storage.dir` | `/tmp/ratis` | String | | | `rss.device.monitor.enabled` | true | boolean | Whether to enable device monitor | -| `rss.device.monitor.checklist` | `readwrite,diskusage` | String | Select what the device needs to detect ; The options include iohang, readOrWrite and diskUsage. | +| `rss.device.monitor.checklist` | `readwrite,diskusage` | String | Select what the device needs to detect ; The options include iohang, readOrWrite and diskUsage. | | `rss.disk.check.interval` | 15s | String | How frequency DeviceMonitor checks IO hang | | `rss.slow.flush.interval` | 10s | String | Threshold that determines slow flush | | `rss.sys.block.dir` | "/sys/block" | String | Directory to read device stat and inflight | -| `rss.create.file.writer.retry.count` | 3 | Int | Worker create FileWriter retry count | +| `rss.create.file.writer.retry.count` | 3 | int | Worker create FileWriter retry count | +| `rss.worker.checkFileCleanRetryTimes` | 3 | int | The number of retries for a worker to check if the working directory is cleaned up before registering with the master. | +| `rss.worker.checkFileCleanTimeoutMs` | 1000ms | String | The wait time per retry for a worker to check if the working directory is cleaned up before registering with the master. | +| `rss.worker.diskFlusherShutdownTimeoutMs` | 3000ms | String | The timeout to wait for diskOperators to execute remaining jobs before being shutdown immediately. | | `rss.worker.status.check.timeout` | 10s | String | Worker device check timeout | | `rss.worker.offheap.memory.critical.ratio` | 0.9 | float | Worker direct memory usage critical level ratio | | `rss.worker.memory.check.interval` | 10 | int | Timeunit is millisecond | diff --git a/client/src/main/java/com/aliyun/emr/rss/client/ShuffleClient.java b/client/src/main/java/com/aliyun/emr/rss/client/ShuffleClient.java index 086a8b354..ee7d8c770 100644 --- a/client/src/main/java/com/aliyun/emr/rss/client/ShuffleClient.java +++ b/client/src/main/java/com/aliyun/emr/rss/client/ShuffleClient.java @@ -26,7 +26,10 @@ import com.aliyun.emr.rss.client.read.RssInputStream; import com.aliyun.emr.rss.common.RssConf; import com.aliyun.emr.rss.common.rpc.RpcEndpointRef; -/** ShuffleClient有可能是进程单例 具体的PartitionLocation应该隐藏在实现里面 */ +/** + * ShuffleClient may be a process singleton, the specific PartitionLocation should be hidden in the + * implementation + */ public abstract class ShuffleClient implements Cloneable { private static volatile ShuffleClient _instance; private static volatile boolean initFinished = false; diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala index d356e8edf..6114b794d 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala @@ -778,6 +778,14 @@ object RssConf extends Logging { conf.getTimeAsSeconds("rss.worker.status.check.timeout", "30s") } + def checkFileCleanRetryTimes(conf: RssConf): Int = { + conf.getInt("rss.worker.checkFileCleanRetryTimes", 3) + } + + def checkFileCleanTimeoutMs(conf: RssConf): Long = { + conf.getTimeAsMs("rss.worker.checkFileCleanTimeoutMs", "1000ms") + } + /** * Add non empty and non null suffix to a key. */ diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/storage/StorageManager.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/storage/StorageManager.scala index be1cbd76e..2d3b3997c 100644 --- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/storage/StorageManager.scala +++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/storage/StorageManager.scala @@ -181,6 +181,15 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract this.fileInfosDb = null } } + cleanupExpiredAppDirs(System.currentTimeMillis(), RssConf.workerGracefulShutdown(conf)) + if (!checkIfWorkingDirCleaned) { + logWarning( + "Worker still has residual files in the working directory before registering with Master, " + + "please refer to the configuration document to increase rss.worker.checkFileCleanRetryTimes or " + + "rss.worker.checkFileCleanTimeoutMs .") + } else { + logInfo("Successfully remove all files under working directory.") + } private def reloadAndCleanFileInfos(db: DB): Unit = { if (db != null) { @@ -380,25 +389,35 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract 30, TimeUnit.MINUTES) - private def cleanupExpiredAppDirs(expireTime: Long): Unit = { + private def cleanupExpiredAppDirs(expireTime: Long, isGracefulShutdown: Boolean = false): Unit = { + val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1) disksSnapshot().filter(_.status != DiskStatus.IoHang).foreach { case diskInfo => - diskInfo.dirs.foreach { case workingDir => - workingDir.listFiles().foreach { case appDir => - if (appDir.lastModified() < expireTime) { - val threadPool = diskOperators.get(diskInfo.mountPoint) - deleteDirectory(appDir, threadPool) - logInfo(s"Delete expired app dir $appDir.") + diskInfo.dirs.foreach { + case workingDir if workingDir.exists() => + workingDir.listFiles().foreach { case appDir => + // Don't delete shuffleKey's data recovered from levelDB when restart with graceful shutdown + if (!(isGracefulShutdown && appIds.contains(appDir.getName))) { + if (appDir.lastModified() < expireTime) { + val threadPool = diskOperators.get(diskInfo.mountPoint) + deleteDirectory(appDir, threadPool) + logInfo(s"Delete expired app dir $appDir.") + } + } } - } + // workingDir not exist when initializing worker on new disk + case _ => // do nothing } } if (hdfsFs != null) { - val iter = hdfsFs.listFiles(new Path(hdfsDir, RssConf.workingDirName(conf)), false) - while (iter.hasNext) { - val fileStatus = iter.next() - if (fileStatus.getModificationTime < expireTime) { - StorageManager.hdfsFs.delete(fileStatus.getPath, true) + val hdfsWorkPath = new Path(hdfsDir, RssConf.workingDirName(conf)) + if (hdfsFs.exists(hdfsWorkPath)) { + val iter = hdfsFs.listFiles(hdfsWorkPath, false) + while (iter.hasNext) { + val fileStatus = iter.next() + if (fileStatus.getModificationTime < expireTime) { + hdfsFs.delete(fileStatus.getPath, true) + } } } } @@ -437,6 +456,47 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract } } + private def checkIfWorkingDirCleaned: Boolean = { + var retryTimes = 0 + val awaitTimeout = RssConf.checkFileCleanTimeoutMs(conf) + val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1) + while (retryTimes < RssConf.checkFileCleanRetryTimes(conf)) { + val localCleaned = !disksSnapshot().filter(_.status != DiskStatus.IoHang).exists { + case diskInfo => diskInfo.dirs.exists { + case workingDir if workingDir.exists() => + // Don't check appDirs that store information in the fileInfos + workingDir.listFiles().exists(appDir => !appIds.contains(appDir.getName)) + case _ => + false + } + } + + val hdfsCleaned = hdfsFs match { + case hdfs: FileSystem => + val hdfsWorkPath = new Path(hdfsDir, RssConf.workingDirName(conf)) + // hdfs path not exist when first time initialize + if (hdfs.exists(hdfsWorkPath)) { + !hdfs.listFiles(hdfsWorkPath, false).hasNext + } else { + true + } + case _ => + true + } + + if (localCleaned && hdfsCleaned) { + return true + } + retryTimes += 1 + if (retryTimes < RssConf.checkFileCleanRetryTimes(conf)) { + logInfo(s"Working directory's files have not been cleaned up completely, " + + s"will start ${retryTimes + 1}th attempt after ${awaitTimeout} milliseconds.") + } + Thread.sleep(awaitTimeout) + } + false + } + def close(): Unit = { if (fileInfosDb != null) { try {