[ISSUE-516][FEATURE] Worker should clean remaining directory when start before registering to Master (#540)

This commit is contained in:
nafiy 2022-09-06 23:37:47 +08:00 committed by GitHub
parent 8da0dfa948
commit 644471debb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 90 additions and 16 deletions

View File

@ -211,11 +211,14 @@ rss.replicateserver.port
| `rss.ha.port` | 9872 | int | |
| `rss.ha.storage.dir` | `/tmp/ratis` | String | |
| `rss.device.monitor.enabled` | true | boolean | Whether to enable device monitor |
| `rss.device.monitor.checklist` | `readwrite,diskusage` | String | Select what the device needs to detect ; The options include iohang, readOrWrite and diskUsage. |
| `rss.device.monitor.checklist` | `readwrite,diskusage` | String | Select what the device needs to detect ; The options include iohang, readOrWrite and diskUsage. |
| `rss.disk.check.interval` | 15s | String | How frequency DeviceMonitor checks IO hang |
| `rss.slow.flush.interval` | 10s | String | Threshold that determines slow flush |
| `rss.sys.block.dir` | "/sys/block" | String | Directory to read device stat and inflight |
| `rss.create.file.writer.retry.count` | 3 | Int | Worker create FileWriter retry count |
| `rss.create.file.writer.retry.count` | 3 | int | Worker create FileWriter retry count |
| `rss.worker.checkFileCleanRetryTimes` | 3 | int | The number of retries for a worker to check if the working directory is cleaned up before registering with the master. |
| `rss.worker.checkFileCleanTimeoutMs` | 1000ms | String | The wait time per retry for a worker to check if the working directory is cleaned up before registering with the master. |
| `rss.worker.diskFlusherShutdownTimeoutMs` | 3000ms | String | The timeout to wait for diskOperators to execute remaining jobs before being shutdown immediately. |
| `rss.worker.status.check.timeout` | 10s | String | Worker device check timeout |
| `rss.worker.offheap.memory.critical.ratio` | 0.9 | float | Worker direct memory usage critical level ratio |
| `rss.worker.memory.check.interval` | 10 | int | Timeunit is millisecond |

View File

@ -26,7 +26,10 @@ import com.aliyun.emr.rss.client.read.RssInputStream;
import com.aliyun.emr.rss.common.RssConf;
import com.aliyun.emr.rss.common.rpc.RpcEndpointRef;
/** ShuffleClient有可能是进程单例 具体的PartitionLocation应该隐藏在实现里面 */
/**
* ShuffleClient may be a process singleton, the specific PartitionLocation should be hidden in the
* implementation
*/
public abstract class ShuffleClient implements Cloneable {
private static volatile ShuffleClient _instance;
private static volatile boolean initFinished = false;

View File

@ -778,6 +778,14 @@ object RssConf extends Logging {
conf.getTimeAsSeconds("rss.worker.status.check.timeout", "30s")
}
def checkFileCleanRetryTimes(conf: RssConf): Int = {
conf.getInt("rss.worker.checkFileCleanRetryTimes", 3)
}
def checkFileCleanTimeoutMs(conf: RssConf): Long = {
conf.getTimeAsMs("rss.worker.checkFileCleanTimeoutMs", "1000ms")
}
/**
* Add non empty and non null suffix to a key.
*/

View File

@ -181,6 +181,15 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
this.fileInfosDb = null
}
}
cleanupExpiredAppDirs(System.currentTimeMillis(), RssConf.workerGracefulShutdown(conf))
if (!checkIfWorkingDirCleaned) {
logWarning(
"Worker still has residual files in the working directory before registering with Master, " +
"please refer to the configuration document to increase rss.worker.checkFileCleanRetryTimes or " +
"rss.worker.checkFileCleanTimeoutMs .")
} else {
logInfo("Successfully remove all files under working directory.")
}
private def reloadAndCleanFileInfos(db: DB): Unit = {
if (db != null) {
@ -380,25 +389,35 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
30,
TimeUnit.MINUTES)
private def cleanupExpiredAppDirs(expireTime: Long): Unit = {
private def cleanupExpiredAppDirs(expireTime: Long, isGracefulShutdown: Boolean = false): Unit = {
val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1)
disksSnapshot().filter(_.status != DiskStatus.IoHang).foreach { case diskInfo =>
diskInfo.dirs.foreach { case workingDir =>
workingDir.listFiles().foreach { case appDir =>
if (appDir.lastModified() < expireTime) {
val threadPool = diskOperators.get(diskInfo.mountPoint)
deleteDirectory(appDir, threadPool)
logInfo(s"Delete expired app dir $appDir.")
diskInfo.dirs.foreach {
case workingDir if workingDir.exists() =>
workingDir.listFiles().foreach { case appDir =>
// Don't delete shuffleKey's data recovered from levelDB when restart with graceful shutdown
if (!(isGracefulShutdown && appIds.contains(appDir.getName))) {
if (appDir.lastModified() < expireTime) {
val threadPool = diskOperators.get(diskInfo.mountPoint)
deleteDirectory(appDir, threadPool)
logInfo(s"Delete expired app dir $appDir.")
}
}
}
}
// workingDir not exist when initializing worker on new disk
case _ => // do nothing
}
}
if (hdfsFs != null) {
val iter = hdfsFs.listFiles(new Path(hdfsDir, RssConf.workingDirName(conf)), false)
while (iter.hasNext) {
val fileStatus = iter.next()
if (fileStatus.getModificationTime < expireTime) {
StorageManager.hdfsFs.delete(fileStatus.getPath, true)
val hdfsWorkPath = new Path(hdfsDir, RssConf.workingDirName(conf))
if (hdfsFs.exists(hdfsWorkPath)) {
val iter = hdfsFs.listFiles(hdfsWorkPath, false)
while (iter.hasNext) {
val fileStatus = iter.next()
if (fileStatus.getModificationTime < expireTime) {
hdfsFs.delete(fileStatus.getPath, true)
}
}
}
}
@ -437,6 +456,47 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
}
}
private def checkIfWorkingDirCleaned: Boolean = {
var retryTimes = 0
val awaitTimeout = RssConf.checkFileCleanTimeoutMs(conf)
val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1)
while (retryTimes < RssConf.checkFileCleanRetryTimes(conf)) {
val localCleaned = !disksSnapshot().filter(_.status != DiskStatus.IoHang).exists {
case diskInfo => diskInfo.dirs.exists {
case workingDir if workingDir.exists() =>
// Don't check appDirs that store information in the fileInfos
workingDir.listFiles().exists(appDir => !appIds.contains(appDir.getName))
case _ =>
false
}
}
val hdfsCleaned = hdfsFs match {
case hdfs: FileSystem =>
val hdfsWorkPath = new Path(hdfsDir, RssConf.workingDirName(conf))
// hdfs path not exist when first time initialize
if (hdfs.exists(hdfsWorkPath)) {
!hdfs.listFiles(hdfsWorkPath, false).hasNext
} else {
true
}
case _ =>
true
}
if (localCleaned && hdfsCleaned) {
return true
}
retryTimes += 1
if (retryTimes < RssConf.checkFileCleanRetryTimes(conf)) {
logInfo(s"Working directory's files have not been cleaned up completely, " +
s"will start ${retryTimes + 1}th attempt after ${awaitTimeout} milliseconds.")
}
Thread.sleep(awaitTimeout)
}
false
}
def close(): Unit = {
if (fileInfosDb != null) {
try {