From aa9dfd05667275b8c2a3089ff3dcf15389348f8b Mon Sep 17 00:00:00 2001 From: sunjunjie Date: Mon, 25 Sep 2023 23:20:48 +0800 Subject: [PATCH] =?UTF-8?q?[CELEBORN-1005][BUG]=20Clean=20Expired=20App=20?= =?UTF-8?q?Dirs=20will=20delete=20the=20running=20a=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? While shuffle data of a running application was being read, its shuffle file was mistakenly deleted by the expired-app-dir cleanup: `2023-09-22 16:32:36,810 [storage-scheduler] INFO org.apache.celeborn.service.deploy.worker.storage.StorageManager[51]: Delete expired app dir /data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1. 2023-09-22 16:32:36,810 [Disk-cleaner-/data8-6] DEBUG org.apache.celeborn.service.deploy.worker.storage.StorageManager[47]: Deleted expired shuffle file /data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1/32/924-0-0. 2023-09-22 16:32:53,304 [fetch-server-11-31] DEBUG org.apache.celeborn.service.deploy.worker.FetchHandler[47]: Received chunk fetch request application_1689848866482_12296544_1-32 924-0-0 0 2147483647 get file info FileInfo{file=/data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1/32/924-0-0, chunkOffsets=0,558, userIdentifier=`default`.`default`, partitionType=REDUCE} java.io.FileNotFoundException: /data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1/32/924-0-0 (No such file or directory)` This happens because, when a writer is created, the shuffle directory is created first and the file is only added to the fileInfos collection afterwards. If the cleanup of expired app directories runs in between, the running app is not yet present in the shuffleKeySet it obtains, so the app's directory is treated as expired and its files are mistakenly deleted. This patch adds the FileInfo to the fileInfos collection before the shuffle directory is created. https://issues.apache.org/jira/browse/CELEBORN-1005 ### Why are the changes needed? Bug fix: shuffle files of running applications must not be deleted by the expired-app-dir cleanup. ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1937 from wilsonjie/CELEBORN-1005. 
Lead-authored-by: sunjunjie Co-authored-by: junjie.sun <40379361+wilsonjie@users.noreply.github.com> Signed-off-by: zky.zhoukeyong --- .../worker/storage/StorageManager.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 20252e911..b1f850f8d 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -358,13 +358,14 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs if (dirs.isEmpty) { val shuffleDir = new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId") - FileSystem.mkdirs(StorageManager.hadoopFs, shuffleDir, hdfsPermission) val fileInfo = new FileInfo( new Path(shuffleDir, fileName).toString, userIdentifier, partitionType, partitionSplitEnabled) + fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo) + FileSystem.mkdirs(StorageManager.hadoopFs, shuffleDir, hdfsPermission) val hdfsWriter = partitionType match { case PartitionType.MAP => new MapPartitionFileWriter( fileInfo, @@ -390,7 +391,6 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs hdfsWriter.setStorageManager(this) hdfsWriter.setShuffleKey(shuffleKey) } - fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo) hdfsWriters.put(fileInfo.getFilePath, hdfsWriter) return hdfsWriter } else { @@ -399,6 +399,14 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val shuffleDir = new File(dir, s"$appId/$shuffleId") val file = new File(shuffleDir, fileName) try { + val fileInfo = + new FileInfo( + file.getAbsolutePath, + userIdentifier, + partitionType, + partitionSplitEnabled) + 
fileInfo.setMountPoint(mountPoint) + fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo) shuffleDir.mkdirs() if (file.exists()) { throw new FileAlreadyExistsException( @@ -410,13 +418,6 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs s"Create shuffle data file ${file.getAbsolutePath} failed!") } } - val fileInfo = - new FileInfo( - file.getAbsolutePath, - userIdentifier, - partitionType, - partitionSplitEnabled) - fileInfo.setMountPoint(mountPoint) val fileWriter = partitionType match { case PartitionType.MAP => new MapPartitionFileWriter( fileInfo, @@ -445,7 +446,6 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs deviceMonitor.registerFileWriter(fileWriter) val map = workingDirWriters.computeIfAbsent(dir, workingDirWriterListFunc) map.put(fileInfo.getFilePath, fileWriter) - fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo) location.getStorageInfo.setMountPoint(mountPoint) logDebug(s"location $location set disk hint to ${location.getStorageInfo} ") return fileWriter