From a77a8eb8fddb331aaaf6001356cd1eaab42ff4df Mon Sep 17 00:00:00 2001 From: hongzhaoyang <15316036153@163.com> Date: Fri, 8 Sep 2023 20:41:04 +0800 Subject: [PATCH] [CELEBORN-881][BUG] StorageManager clean up thread may delete new app directories ### What changes were proposed in this pull request? The worker throws a FileNotFoundException while fetching a chunk: ``` java.io.FileNotFoundException: /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/871-0-0 (No such file or directory) ``` Before the shuffle files are committed, they are deleted by the storage-scheduler thread: ``` 2023-09-07 19:38:16,506 [INFO] [dispatcher-event-loop-44] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Create file /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/986-0-0 success 2023-09-07 19:38:16,506 [INFO] [dispatcher-event-loop-44] - org.apache.celeborn.service.deploy.worker.Controller -Logging.scala(51) -Reserved 29 primary location and 0 replica location for application_1693206141914_540726_1-1 2023-09-07 19:38:16,537 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1. 2023-09-07 19:38:16,580 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1. 2023-09-07 19:38:16,629 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1. 
2023-09-07 19:38:16,661 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1. 2023-09-07 19:38:16,681 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1. 2023-09-07 19:38:17,355 [INFO] [dispatcher-event-loop-12] - org.apache.celeborn.service.deploy.worker.Controller -Logging.scala(51) -Start commitFiles for application_1693206141914_540726_1-1 2023-09-07 19:38:17,362 [INFO] [async-reply] - org.apache.celeborn.service.deploy.worker.Controller -Logging.scala(51) -CommitFiles for application_1693206141914_540726_1-1 success with 29 committed primary partitions, 0 empty primary partitions, 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions, 0 failed replica partitions. java.io.FileNotFoundException: /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/976-0-0 (No such file or directory) java.io.FileNotFoundException: /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/482-0-0 (No such file or directory) java.io.FileNotFoundException: /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/658-0-0 (No such file or directory) ``` There may be a concurrency problem in this method: the active shuffle keys are snapshotted before the app directories are listed, so a directory created in between is treated as expired. ``` scala private def cleanupExpiredAppDirs(): Unit = { val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1) disksSnapshot().filter(_.status != DiskStatus.IO_HANG).foreach { diskInfo => diskInfo.dirs.foreach { case workingDir if workingDir.exists() => workingDir.listFiles().foreach { appDir => // Don't delete shuffleKey's data that exist correct shuffle file info. 
if (!appIds.contains(appDir.getName)) { val threadPool = diskOperators.get(diskInfo.mountPoint) deleteDirectory(appDir, threadPool) logInfo(s"Delete expired app dir $appDir.") } } // workingDir not exist when initializing worker on new disk case _ => // do nothing } } } ``` We should find all app directories first, then get the active shuffle keys. https://issues.apache.org/jira/browse/CELEBORN-881 ### Why are the changes needed? Bugfix. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Passes GA and manual test. Closes #1889 from zy-jordan/CELEBORN-881. Lead-authored-by: hongzhaoyang <15316036153@163.com> Co-authored-by: Keyong Zhou Co-authored-by: Keyong Zhou Signed-off-by: zky.zhoukeyong --- .../worker/storage/StorageManager.scala | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index aa9b72a1f..64538f1fc 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -594,20 +594,21 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs TimeUnit.MINUTES) private def cleanupExpiredAppDirs(): Unit = { + val diskInfoAndAppDirs = disksSnapshot() + .filter(_.status != DiskStatus.IO_HANG) + .map { case diskInfo => + (diskInfo, diskInfo.dirs.filter(_.exists).flatMap(_.listFiles())) + } val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1) - disksSnapshot().filter(_.status != DiskStatus.IO_HANG).foreach { diskInfo => - diskInfo.dirs.foreach { - case workingDir if workingDir.exists() => - workingDir.listFiles().foreach { appDir => - // Don't delete shuffleKey's data that exist correct shuffle file info. 
- if (!appIds.contains(appDir.getName)) { - val threadPool = diskOperators.get(diskInfo.mountPoint) - deleteDirectory(appDir, threadPool) - logInfo(s"Delete expired app dir $appDir.") - } - } - // workingDir not exist when initializing worker on new disk - case _ => // do nothing + + diskInfoAndAppDirs.foreach { case (diskInfo, appDirs) => + appDirs.foreach { appDir => + // Don't delete shuffleKey's data that exist correct shuffle file info. + if (!appIds.contains(appDir.getName)) { + val threadPool = diskOperators.get(diskInfo.mountPoint) + deleteDirectory(appDir, threadPool) + logInfo(s"Delete expired app dir $appDir.") + } } } }