diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java index 683ec4708..50d569f73 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java @@ -27,28 +27,33 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.celeborn.common.protocol.message.ControlMessages.UserIdentifier; import org.apache.celeborn.common.util.Utils; public class FileInfo { private final String filePath; private final List chunkOffsets; + private final UserIdentifier userIdentifier; - public FileInfo(String filePath, List chunkOffsets) { + public FileInfo(String filePath, List chunkOffsets, UserIdentifier userIdentifier) { this.filePath = filePath; this.chunkOffsets = chunkOffsets; + this.userIdentifier = userIdentifier; } - public FileInfo(String filePath) { + public FileInfo(String filePath, UserIdentifier userIdentifier) { this.filePath = filePath; this.chunkOffsets = new ArrayList<>(); chunkOffsets.add(0L); + this.userIdentifier = userIdentifier; } @VisibleForTesting - public FileInfo(File file) { + public FileInfo(File file, UserIdentifier userIdentifier) { this.filePath = file.getAbsolutePath(); this.chunkOffsets = new ArrayList<>(); chunkOffsets.add(0L); + this.userIdentifier = userIdentifier; } public synchronized void addChunkOffset(long bytesFlushed) { @@ -107,6 +112,10 @@ public class FileInfo { return new Path(Utils.getWriteSuccessFilePath(Utils.getPeerPath(filePath))); } + public UserIdentifier getUserIdentifier() { + return userIdentifier; + } + public void deleteAllFiles(FileSystem hdfsFs) throws IOException { if (isHdfs()) { hdfsFs.delete(getHdfsPath(), false); @@ -135,6 +144,8 @@ public class FileInfo { + filePath + ", chunkOffsets=" + StringUtils.join(this.chunkOffsets, ",") + + ", userIdentifier=" + + userIdentifier.toString() + '}'; } } diff --git a/common/src/main/java/org/apache/celeborn/common/util/PbSerDeUtils.java b/common/src/main/java/org/apache/celeborn/common/util/PbSerDeUtils.java index 7dbbd2322..3bc56a8e0 100644 --- a/common/src/main/java/org/apache/celeborn/common/util/PbSerDeUtils.java +++ b/common/src/main/java/org/apache/celeborn/common/util/PbSerDeUtils.java @@ -27,7 +27,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.FileInfo; import org.apache.celeborn.common.protocol.*; -import org.apache.celeborn.common.protocol.message.ControlMessages.ResourceConsumption; +import org.apache.celeborn.common.protocol.message.ControlMessages.*; public class PbSerDeUtils { public static Set fromPbSortedShuffleFileSet(byte[] data) @@ -81,8 +81,13 @@ public class PbSerDeUtils { public static FileInfo fromPbFileInfo(PbFileInfo pbFileInfo) throws InvalidProtocolBufferException { + PbUserIdentifier pbUserIdentifier = pbFileInfo.getUserIdentifier(); + UserIdentifier userIdentifier = + new UserIdentifier(pbUserIdentifier.getTenantId(), pbUserIdentifier.getName()); return new FileInfo( - pbFileInfo.getFilePath(), new ArrayList<>(pbFileInfo.getChunkOffsetsList())); + pbFileInfo.getFilePath(), + new ArrayList<>(pbFileInfo.getChunkOffsetsList()), + userIdentifier); } public static PbFileInfo toPbFileInfo(FileInfo fileInfo) { @@ -111,19 +116,6 @@ public class PbSerDeUtils { return builder.build().toByteArray(); } - public static Set fromPbShuffleKeySet(byte[] data) throws InvalidProtocolBufferException { - PbShuffleKeySet pbShuffleKeySet = PbShuffleKeySet.parseFrom(data); - Set shuffleKeySet = ConcurrentHashMap.newKeySet(); - shuffleKeySet.addAll(pbShuffleKeySet.getShuffleKeysList()); - return shuffleKeySet; - } - - public static byte[] toPbShuffleKeySet(Set shuffleKeySet) { - PbShuffleKeySet.Builder builder = PbShuffleKeySet.newBuilder(); - builder.addAllShuffleKeys(shuffleKeySet); - return builder.build().toByteArray(); - } - public static ResourceConsumption fromPbResourceConsumption( PbResourceConsumption pbResourceConsumption) throws InvalidProtocolBufferException { return new ResourceConsumption( diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index 06079079c..f3cc002bd 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -397,6 +397,7 @@ message PbStoreVersion { message PbFileInfo { string filePath = 1; repeated int64 chunkOffsets = 2; + PbUserIdentifier userIdentifier = 3; } message PbFileInfoMap { @@ -414,7 +415,3 @@ message PbResourceConsumption { int64 hdfsBytesWritten = 3; int64 hdfsFileCount = 4; } - -message PbShuffleKeySet { - repeated string shuffleKeys = 1; -} diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java index b41bd88e0..df1147993 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java @@ -19,11 +19,8 @@ package org.apache.celeborn.service.deploy.worker; import java.nio.charset.StandardCharsets; -import org.apache.celeborn.common.protocol.message.ControlMessages.*; - public abstract class ShuffleRecoverHelper { protected String SHUFFLE_KEY_PREFIX = "SHUFFLE-KEY"; - protected String USER_IDENTIFIER_PREFIX = "USER-IDENTIFIER"; protected LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider.StoreVersion(1, 0); protected byte[] dbShuffleKey(String shuffleKey) { @@ -36,17 +33,4 @@ public abstract class ShuffleRecoverHelper { } return s.substring(SHUFFLE_KEY_PREFIX.length() + 1); } - - protected byte[] dbUserIdentifier(UserIdentifier userIdentifier) { - return (USER_IDENTIFIER_PREFIX + ";" + userIdentifier.toString()) - .getBytes(StandardCharsets.UTF_8); - } - - protected UserIdentifier parseDbUserIdentifier(String s) { - if (!s.startsWith(USER_IDENTIFIER_PREFIX)) { - throw new IllegalArgumentException( - "Expected a string starting with " + USER_IDENTIFIER_PREFIX); - } - return UserIdentifier$.MODULE$.apply(s.substring(USER_IDENTIFIER_PREFIX.length() + 1)); - } } diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java index 7b8d5a040..195e1330f 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java @@ -53,6 +53,7 @@ import org.apache.celeborn.common.RssConf; import org.apache.celeborn.common.meta.FileInfo; import org.apache.celeborn.common.metrics.source.AbstractSource; import org.apache.celeborn.common.network.server.MemoryTracker; +import org.apache.celeborn.common.protocol.message.ControlMessages.UserIdentifier; import org.apache.celeborn.common.unsafe.Platform; import org.apache.celeborn.common.util.PbSerDeUtils; import org.apache.celeborn.common.util.ShuffleBlockInfoUtils; @@ -168,6 +169,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { return fileInfo; } else { String fileId = shuffleKey + "-" + fileName; + UserIdentifier userIdentifier = fileInfo.getUserIdentifier(); Set sorted = sortedShuffleFiles.computeIfAbsent(shuffleKey, v -> ConcurrentHashMap.newKeySet()); @@ -180,7 +182,13 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { synchronized (sorting) { if (sorted.contains(fileId)) { return resolve( - shuffleKey, fileId, sortedFilePath, indexFilePath, startMapIndex, endMapIndex); + shuffleKey, + fileId, + userIdentifier, + sortedFilePath, + indexFilePath, + startMapIndex, + endMapIndex); } if (!sorting.contains(fileId)) { try { @@ -222,7 +230,14 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { } } - return resolve(shuffleKey, fileId, sortedFilePath, indexFilePath, startMapIndex, endMapIndex); + return resolve( + shuffleKey, + fileId, + userIdentifier, + sortedFilePath, + indexFilePath, + startMapIndex, + endMapIndex); } } @@ -436,6 +451,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { public FileInfo resolve( String shuffleKey, String fileId, + UserIdentifier userIdentifier, String sortedFilePath, String indexFilePath, int startMapIndex, @@ -481,7 +497,8 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { return new FileInfo( sortedFilePath, ShuffleBlockInfoUtils.getChunkOffsetsFromShuffleBlockInfos( - startMapIndex, endMapIndex, fetchChunkSize, indexMap)); + startMapIndex, endMapIndex, fetchChunkSize, indexMap), + userIdentifier); } class FileSorter { diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 6fe82a1e6..50941830f 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -50,8 +50,6 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract extends ShuffleRecoverHelper with DeviceObserver with Logging with MemoryTrackerListener { // mount point -> filewriter val workingDirWriters = new ConcurrentHashMap[File, util.ArrayList[FileWriter]]() - // userIdentifier -> shuffleKey - val userShuffleKeys = new ConcurrentHashMap[UserIdentifier, util.Set[String]]() val (deviceInfos, diskInfos) = { val workingDirInfos = @@ -169,9 +167,6 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract private val fileInfos = new ConcurrentHashMap[String, ConcurrentHashMap[String, FileInfo]]() private val RECOVERY_FILE_NAME = "recovery.ldb" - // sequences of store and recover are in reverse order - // store sequence : userShuffleKeys -> fileInfos - // recover sequence: fileInfos -> userShuffleKeys private var db: DB = null // ShuffleClient can fetch data from a restarted worker only // when the worker's fetching port is stable. @@ -180,7 +175,6 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract val recoverFile = new File(RssConf.workerRecoverPath(conf), RECOVERY_FILE_NAME) this.db = LevelDBProvider.initLevelDB(recoverFile, CURRENT_VERSION) reloadAndCleanFileInfos(this.db) - reloadAndCleanUserShuffleKeys(this.db) } catch { case e: Exception => logError("Init level DB failed:", e) @@ -215,6 +209,8 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract case exception: Exception => logError(s"Reload DB: ${shuffleKey} failed.", exception) } + } else { + return } } } @@ -232,41 +228,6 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract } } - private def reloadAndCleanUserShuffleKeys(db: DB): Unit = { - if (db != null) { - val itr = db.iterator - itr.seek(USER_IDENTIFIER_PREFIX.getBytes(StandardCharsets.UTF_8)) - while (itr.hasNext) { - val entry = itr.next - val key = new String(entry.getKey, StandardCharsets.UTF_8) - if (key.startsWith(USER_IDENTIFIER_PREFIX)) { - val userIdentifier = parseDbUserIdentifier(key) - try { - val shuffleKeys = PbSerDeUtils.fromPbShuffleKeySet(entry.getValue) - logDebug(s"Reload DB: ${userIdentifier}") - userShuffleKeys.put(userIdentifier, shuffleKeys) - db.delete(entry.getKey) - } catch { - case exception: Exception => - logError(s"Reload DB: ${userIdentifier} failed.", exception) - } - } - } - } - } - - def updateUserShuffleKeysInDB(): Unit = { - userShuffleKeys.asScala.foreach { case (userIdentifier, shuffleKeys) => - try { - db.put(dbUserIdentifier(userIdentifier), PbSerDeUtils.toPbShuffleKeySet(shuffleKeys)) - logDebug(s"Update UserShuffleInfos into DB: ${userIdentifier}") - } catch { - case exception: Exception => - logError(s"Update UserShuffleInfos into DB: ${userIdentifier} failed.", exception) - } - } - } - private def getNextIndex() = counter.getAndUpdate(counterOperator) private val newMapFunc = @@ -275,13 +236,6 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract new ConcurrentHashMap() } - private val newSetFunc = - new java.util.function.Function[UserIdentifier, util.Set[String]]() { - override def apply(key: UserIdentifier): util.Set[String] = { - ConcurrentHashMap.newKeySet[String]() - } - } - private val workingDirWriterListFunc = new java.util.function.Function[File, util.ArrayList[FileWriter]]() { override def apply(t: File): util.ArrayList[FileWriter] = new util.ArrayList[FileWriter]() @@ -323,7 +277,7 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract val shuffleDir = new Path(new Path(hdfsDir, RssConf.workingDirName(conf)), s"$appId/$shuffleId") FileSystem.mkdirs(StorageManager.hdfsFs, shuffleDir, hdfsPermission) - val fileInfo = new FileInfo(new Path(shuffleDir, fileName).toString) + val fileInfo = new FileInfo(new Path(shuffleDir, fileName).toString, userIdentifier) val hdfsWriter = new FileWriter( fileInfo, hdfsFlusher.get, @@ -339,7 +293,6 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract hdfsWriters.add(hdfsWriter) } hdfsWriter.registerDestroyHook(hdfsWriters) - userShuffleKeys.computeIfAbsent(userIdentifier, newSetFunc).add(shuffleKey) return hdfsWriter } else { val dir = dirs(getNextIndex() % dirs.size) @@ -353,7 +306,7 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract throw new RssException("create app shuffle data dir or file failed!" + s"${file.getAbsolutePath}") } - val fileInfo = new FileInfo(file.getAbsolutePath) + val fileInfo = new FileInfo(file.getAbsolutePath, userIdentifier) val fileWriter = new FileWriter( fileInfo, localFlushers.get(mountPoint), @@ -373,7 +326,6 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo) location.getStorageInfo.setMountPoint(mountPoint) logDebug(s"location $location set disk hint to ${location.getStorageInfo} ") - userShuffleKeys.computeIfAbsent(userIdentifier, newSetFunc).add(shuffleKey) return fileWriter } catch { case t: Throwable => @@ -554,7 +506,6 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract def close(): Unit = { if (db != null) { try { - updateUserShuffleKeysInDB() updateFileInfosInDB() db.close() } catch { @@ -627,26 +578,34 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract } def userResourceConsumptionSnapshot(): Map[UserIdentifier, ResourceConsumption] = { - userShuffleKeys.synchronized { - userShuffleKeys.asScala.map { case (userIdentifier, shuffleKeys) => - val resourceMetrics = shuffleKeys.asScala.map { shuffleKey => - val userFileInfos = fileInfos.get(shuffleKey).values().asScala.toSeq - val diskFileInfos = userFileInfos.filter(!_.isHdfs) - val hdfsFileInfos = userFileInfos.filter(_.isHdfs) - - val diskBytesWritten = diskFileInfos.map(_.getFileLength).sum - val diskFileCount = diskFileInfos.size - val hdfsBytesWritten = hdfsFileInfos.map(_.getFileLength).sum - val hdfsFileCount = hdfsFileInfos.size - (diskBytesWritten, diskFileCount, hdfsBytesWritten, hdfsFileCount) + fileInfos.synchronized { + // shuffleId -> (fileName -> fileInfo) + fileInfos + .asScala + .toList + .flatMap { case (_, fileInfoMaps) => + // userIdentifier -> fileInfo + fileInfoMaps.values().asScala.map { fileInfo => + (fileInfo.getUserIdentifier, fileInfo) + } + } + // userIdentifier -> List((userIdentifier, fileInfo)) + .groupBy(_._1) + .map { case (userIdentifier, userWithFileInfoList) => + // collect resource consumed by each user on this worker + val resourceConsumption = { + val userFileInfos = userWithFileInfoList.map(_._2) + val diskFileInfos = userFileInfos.filter(!_.isHdfs) + val hdfsFileInfos = userFileInfos.filter(_.isHdfs) + + val diskBytesWritten = diskFileInfos.map(_.getFileLength).sum + val diskFileCount = diskFileInfos.size + val hdfsBytesWritten = hdfsFileInfos.map(_.getFileLength).sum + val hdfsFileCount = hdfsFileInfos.size + ResourceConsumption(diskBytesWritten, diskFileCount, hdfsBytesWritten, hdfsFileCount) + } + (userIdentifier, resourceConsumption) } - val resourceConsumption = ResourceConsumption( - resourceMetrics.map(_._1).sum, - resourceMetrics.map(_._2).sum, - resourceMetrics.map(_._3).sum, - resourceMetrics.map(_._4).sum) - (userIdentifier, resourceConsumption) - }.toMap } } } diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java index d6db57945..4a46f66fa 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java @@ -74,6 +74,7 @@ import org.apache.celeborn.common.network.util.TransportConf; import org.apache.celeborn.common.protocol.PartitionSplitMode; import org.apache.celeborn.common.protocol.PartitionType; import org.apache.celeborn.common.protocol.StorageInfo; +import org.apache.celeborn.common.protocol.message.ControlMessages.UserIdentifier; import org.apache.celeborn.common.util.ThreadUtils; import org.apache.celeborn.common.util.Utils; import org.apache.celeborn.service.deploy.worker.FetchHandler; @@ -96,6 +97,7 @@ public class FileWriterSuiteJ { private static TransportClientFactory clientFactory; private static long streamId; private static int numChunks; + private final UserIdentifier userIdentifier = new UserIdentifier("mock-tenantId", "mock-name"); private static final TransportConf transConf = new TransportConf("shuffle", MapConfigProvider.EMPTY); @@ -240,7 +242,7 @@ public class FileWriterSuiteJ { File file = getTemporaryFile(); FileWriter fileWriter = new FileWriter( - new FileInfo(file), + new FileInfo(file, userIdentifier), localFlusher, source, RSS_CONF, @@ -285,7 +287,7 @@ public class FileWriterSuiteJ { File file = getTemporaryFile(); FileWriter fileWriter = new FileWriter( - new FileInfo(file), + new FileInfo(file, userIdentifier), localFlusher, source, RSS_CONF, @@ -336,7 +338,7 @@ public class FileWriterSuiteJ { public void testWriteAndChunkRead() throws Exception { final int threadsNum = 8; File file = getTemporaryFile(); - FileInfo fileInfo = new FileInfo(file); + FileInfo fileInfo = new FileInfo(file, userIdentifier); FileWriter fileWriter = new FileWriter( fileInfo, diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java index 3f54a8181..b18d88359 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.common.RssConf; import org.apache.celeborn.common.meta.FileInfo; import org.apache.celeborn.common.network.server.MemoryTracker; +import org.apache.celeborn.common.protocol.message.ControlMessages.UserIdentifier; import org.apache.celeborn.common.unsafe.Platform; import org.apache.celeborn.common.util.Utils; import org.apache.celeborn.service.deploy.worker.WorkerSource; @@ -53,6 +54,7 @@ public class PartitionFilesSorterSuiteJ { private long originFileLen; private FileWriter fileWriter; private long sortTimeout = 16 * 1000; + private UserIdentifier userIdentifier = new UserIdentifier("mock-tenantId", "mock-name"); public void prepare(boolean largefile) throws IOException { byte[] batchHeader = new byte[16]; @@ -60,7 +62,7 @@ public class PartitionFilesSorterSuiteJ { shuffleFile = File.createTempFile("RSS", "sort-suite"); originFileName = shuffleFile.getAbsolutePath(); - fileInfo = new FileInfo(shuffleFile); + fileInfo = new FileInfo(shuffleFile, userIdentifier); FileOutputStream fileOutputStream = new FileOutputStream(shuffleFile); FileChannel channel = fileOutputStream.getChannel(); Map batchIds = new HashMap<>();