[CELEBORN-881][BUG] StorageManager clean up thread may delete new app directories

### What changes were proposed in this pull request?

Worker throw FileNotFoundException while fetch chunk:
```
java.io.FileNotFoundException: /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/871-0-0 (No such file or directory
```
before commit shuffle files, files are deleted in storage-scheduler thread
```
2023-09-07 19:38:16,506 [INFO] [dispatcher-event-loop-44] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Create file /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/986-0-0 success
2023-09-07 19:38:16,506 [INFO] [dispatcher-event-loop-44] - org.apache.celeborn.service.deploy.worker.Controller -Logging.scala(51) -Reserved 29 primary location and 0 replica location for application_1693206141914_540726_1-1
2023-09-07 19:38:16,537 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
2023-09-07 19:38:16,580 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
2023-09-07 19:38:16,629 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
2023-09-07 19:38:16,661 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
2023-09-07 19:38:16,681 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
2023-09-07 19:38:17,355 [INFO] [dispatcher-event-loop-12] - org.apache.celeborn.service.deploy.worker.Controller -Logging.scala(51) -Start commitFiles for application_1693206141914_540726_1-1
2023-09-07 19:38:17,362 [INFO] [async-reply] - org.apache.celeborn.service.deploy.worker.Controller -Logging.scala(51) -CommitFiles for application_1693206141914_540726_1-1 success with 29 committed primary partitions, 0 empty primary partitions, 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions, 0 failed replica partitions.
java.io.FileNotFoundException: /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/976-0-0 (No such file or directory)
java.io.FileNotFoundException: /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/482-0-0 (No such file or directory)
java.io.FileNotFoundException: /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/658-0-0 (No such file or directory)
```
it may have concurrent problem in this method.
``` scala
private def cleanupExpiredAppDirs(): Unit = {
  val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1)
  disksSnapshot().filter(_.status != DiskStatus.IO_HANG).foreach { diskInfo =>
    diskInfo.dirs.foreach {
      case workingDir if workingDir.exists() =>
        workingDir.listFiles().foreach { appDir =>
          // Don't delete shuffleKey's data that exist correct shuffle file info.
          if (!appIds.contains(appDir.getName)) {
            val threadPool = diskOperators.get(diskInfo.mountPoint)
            deleteDirectory(appDir, threadPool)
            logInfo(s"Delete expired app dir $appDir.")
          }
        }
      // workingDir not exist when initializing worker on new disk
      case _ => // do nothing
    }
  }
}
```
We should find all app directories first, then get the active shuffle keys.

https://issues.apache.org/jira/browse/CELEBORN-881

### Why are the changes needed?
Bugfix.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Passes GA and manual test.

Closes #1889 from zy-jordan/CELEBORN-881.

Lead-authored-by: hongzhaoyang <15316036153@163.com>
Co-authored-by: Keyong Zhou <waitinfuture@gmail.com>
Co-authored-by: Keyong Zhou <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
This commit is contained in:
hongzhaoyang 2023-09-08 20:41:04 +08:00 committed by zky.zhoukeyong
parent fe2ce00176
commit a77a8eb8fd

View File

@ -594,20 +594,21 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
TimeUnit.MINUTES)
private def cleanupExpiredAppDirs(): Unit = {
val diskInfoAndAppDirs = disksSnapshot()
.filter(_.status != DiskStatus.IO_HANG)
.map { case diskInfo =>
(diskInfo, diskInfo.dirs.filter(_.exists).flatMap(_.listFiles()))
}
val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1)
disksSnapshot().filter(_.status != DiskStatus.IO_HANG).foreach { diskInfo =>
diskInfo.dirs.foreach {
case workingDir if workingDir.exists() =>
workingDir.listFiles().foreach { appDir =>
// Don't delete shuffleKey's data that exist correct shuffle file info.
if (!appIds.contains(appDir.getName)) {
val threadPool = diskOperators.get(diskInfo.mountPoint)
deleteDirectory(appDir, threadPool)
logInfo(s"Delete expired app dir $appDir.")
}
}
// workingDir not exist when initializing worker on new disk
case _ => // do nothing
diskInfoAndAppDirs.foreach { case (diskInfo, appDirs) =>
appDirs.foreach { appDir =>
// Don't delete shuffleKey's data that exist correct shuffle file info.
if (!appIds.contains(appDir.getName)) {
val threadPool = diskOperators.get(diskInfo.mountPoint)
deleteDirectory(appDir, threadPool)
logInfo(s"Delete expired app dir $appDir.")
}
}
}
}