diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 20252e911..b1f850f8d 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -358,13 +358,14 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs if (dirs.isEmpty) { val shuffleDir = new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId") - FileSystem.mkdirs(StorageManager.hadoopFs, shuffleDir, hdfsPermission) val fileInfo = new FileInfo( new Path(shuffleDir, fileName).toString, userIdentifier, partitionType, partitionSplitEnabled) + fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo) + FileSystem.mkdirs(StorageManager.hadoopFs, shuffleDir, hdfsPermission) val hdfsWriter = partitionType match { case PartitionType.MAP => new MapPartitionFileWriter( fileInfo, @@ -390,7 +391,6 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs hdfsWriter.setStorageManager(this) hdfsWriter.setShuffleKey(shuffleKey) } - fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo) hdfsWriters.put(fileInfo.getFilePath, hdfsWriter) return hdfsWriter } else { @@ -399,6 +399,14 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val shuffleDir = new File(dir, s"$appId/$shuffleId") val file = new File(shuffleDir, fileName) try { + val fileInfo = + new FileInfo( + file.getAbsolutePath, + userIdentifier, + partitionType, + partitionSplitEnabled) + fileInfo.setMountPoint(mountPoint) + fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo) shuffleDir.mkdirs() if (file.exists()) { throw new FileAlreadyExistsException( @@ -410,13 +418,6 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs s"Create shuffle data file ${file.getAbsolutePath} failed!") } } - val fileInfo = - new FileInfo( - file.getAbsolutePath, - userIdentifier, - partitionType, - partitionSplitEnabled) - fileInfo.setMountPoint(mountPoint) val fileWriter = partitionType match { case PartitionType.MAP => new MapPartitionFileWriter( fileInfo, @@ -445,7 +446,6 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs deviceMonitor.registerFileWriter(fileWriter) val map = workingDirWriters.computeIfAbsent(dir, workingDirWriterListFunc) map.put(fileInfo.getFilePath, fileWriter) - fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo) location.getStorageInfo.setMountPoint(mountPoint) logDebug(s"location $location set disk hint to ${location.getStorageInfo} ") return fileWriter