diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index aa9b72a1f..64538f1fc 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -594,20 +594,21 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs TimeUnit.MINUTES) private def cleanupExpiredAppDirs(): Unit = { + val diskInfoAndAppDirs = disksSnapshot() + .filter(_.status != DiskStatus.IO_HANG) + .map { case diskInfo => + (diskInfo, diskInfo.dirs.filter(_.exists).flatMap(_.listFiles())) + } val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1) - disksSnapshot().filter(_.status != DiskStatus.IO_HANG).foreach { diskInfo => - diskInfo.dirs.foreach { - case workingDir if workingDir.exists() => - workingDir.listFiles().foreach { appDir => - // Don't delete shuffleKey's data that exist correct shuffle file info. - if (!appIds.contains(appDir.getName)) { - val threadPool = diskOperators.get(diskInfo.mountPoint) - deleteDirectory(appDir, threadPool) - logInfo(s"Delete expired app dir $appDir.") - } - } - // workingDir not exist when initializing worker on new disk - case _ => // do nothing + + diskInfoAndAppDirs.foreach { case (diskInfo, appDirs) => + appDirs.foreach { appDir => + // Don't delete shuffleKey's data that exist correct shuffle file info. + if (!appIds.contains(appDir.getName)) { + val threadPool = diskOperators.get(diskInfo.mountPoint) + deleteDirectory(appDir, threadPool) + logInfo(s"Delete expired app dir $appDir.") + } } } }