[CELEBORN-1005][BUG] Clean Expired App Dirs will delete the running a…

### What changes were proposed in this pull request?
When working on reading shuffle data, the file was accidentally deleted

`2023-09-22 16:32:36,810 [storage-scheduler] INFO  org.apache.celeborn.service.deploy.worker.storage.StorageManager[51]: Delete expired app dir /data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1.
2023-09-22 16:32:36,810 [Disk-cleaner-/data8-6] DEBUG org.apache.celeborn.service.deploy.worker.storage.StorageManager[47]: Deleted expired shuffle file /data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1/32/924-0-0.
2023-09-22 16:32:53,304 [fetch-server-11-31] DEBUG org.apache.celeborn.service.deploy.worker.FetchHandler[47]: Received chunk fetch request application_1689848866482_12296544_1-32 924-0-0 0 2147483647 get file info FileInfo{file=/data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1/32/924-0-0, chunkOffsets=0,558, userIdentifier=`default`.`default`, partitionType=REDUCE}
java.io.FileNotFoundException: /data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1/32/924-0-0 (No such file or directory)`

Because when cleaning up the directories of expired apps, the file directory is created first and then added to the fileInfos collection. As a result, when getting the shuffleKeySet, the running apps do not yet exist, causing the files to be mistakenly deleted.

https://issues.apache.org/jira/browse/CELEBORN-1005
### Why are the changes needed?
bugfix

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #1937 from wilsonjie/CELEBORN-1005.

Lead-authored-by: sunjunjie <sunjunjie@zto.com>
Co-authored-by: junjie.sun <40379361+wilsonjie@users.noreply.github.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
This commit is contained in:
sunjunjie 2023-09-25 23:20:48 +08:00 committed by zky.zhoukeyong
parent 3a41db360b
commit aa9dfd0566

View File

@ -358,13 +358,14 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
if (dirs.isEmpty) {
val shuffleDir =
new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId")
FileSystem.mkdirs(StorageManager.hadoopFs, shuffleDir, hdfsPermission)
val fileInfo =
new FileInfo(
new Path(shuffleDir, fileName).toString,
userIdentifier,
partitionType,
partitionSplitEnabled)
fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo)
FileSystem.mkdirs(StorageManager.hadoopFs, shuffleDir, hdfsPermission)
val hdfsWriter = partitionType match {
case PartitionType.MAP => new MapPartitionFileWriter(
fileInfo,
@ -390,7 +391,6 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
hdfsWriter.setStorageManager(this)
hdfsWriter.setShuffleKey(shuffleKey)
}
fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo)
hdfsWriters.put(fileInfo.getFilePath, hdfsWriter)
return hdfsWriter
} else {
@ -399,6 +399,14 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
val shuffleDir = new File(dir, s"$appId/$shuffleId")
val file = new File(shuffleDir, fileName)
try {
val fileInfo =
new FileInfo(
file.getAbsolutePath,
userIdentifier,
partitionType,
partitionSplitEnabled)
fileInfo.setMountPoint(mountPoint)
fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo)
shuffleDir.mkdirs()
if (file.exists()) {
throw new FileAlreadyExistsException(
@ -410,13 +418,6 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
s"Create shuffle data file ${file.getAbsolutePath} failed!")
}
}
val fileInfo =
new FileInfo(
file.getAbsolutePath,
userIdentifier,
partitionType,
partitionSplitEnabled)
fileInfo.setMountPoint(mountPoint)
val fileWriter = partitionType match {
case PartitionType.MAP => new MapPartitionFileWriter(
fileInfo,
@ -445,7 +446,6 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
deviceMonitor.registerFileWriter(fileWriter)
val map = workingDirWriters.computeIfAbsent(dir, workingDirWriterListFunc)
map.put(fileInfo.getFilePath, fileWriter)
fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo)
location.getStorageInfo.setMountPoint(mountPoint)
logDebug(s"location $location set disk hint to ${location.getStorageInfo} ")
return fileWriter