[ISSUE-750][Refactor] Add UserIdentifier as a field of file info (#759)

This commit is contained in:
nafiy 2022-10-13 23:15:44 +08:00 committed by GitHub
parent 5829bda21a
commit 373b4a744a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 81 additions and 117 deletions

View File

@ -27,28 +27,33 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.celeborn.common.protocol.message.ControlMessages.UserIdentifier;
import org.apache.celeborn.common.util.Utils;
public class FileInfo {
private final String filePath;
private final List<Long> chunkOffsets;
private final UserIdentifier userIdentifier;
public FileInfo(String filePath, List<Long> chunkOffsets) {
public FileInfo(String filePath, List<Long> chunkOffsets, UserIdentifier userIdentifier) {
this.filePath = filePath;
this.chunkOffsets = chunkOffsets;
this.userIdentifier = userIdentifier;
}
public FileInfo(String filePath) {
public FileInfo(String filePath, UserIdentifier userIdentifier) {
this.filePath = filePath;
this.chunkOffsets = new ArrayList<>();
chunkOffsets.add(0L);
this.userIdentifier = userIdentifier;
}
@VisibleForTesting
public FileInfo(File file) {
public FileInfo(File file, UserIdentifier userIdentifier) {
this.filePath = file.getAbsolutePath();
this.chunkOffsets = new ArrayList<>();
chunkOffsets.add(0L);
this.userIdentifier = userIdentifier;
}
public synchronized void addChunkOffset(long bytesFlushed) {
@ -107,6 +112,10 @@ public class FileInfo {
return new Path(Utils.getWriteSuccessFilePath(Utils.getPeerPath(filePath)));
}
public UserIdentifier getUserIdentifier() {
return userIdentifier;
}
public void deleteAllFiles(FileSystem hdfsFs) throws IOException {
if (isHdfs()) {
hdfsFs.delete(getHdfsPath(), false);
@ -135,6 +144,8 @@ public class FileInfo {
+ filePath
+ ", chunkOffsets="
+ StringUtils.join(this.chunkOffsets, ",")
+ ", userIdentifier="
+ userIdentifier.toString()
+ '}';
}
}

View File

@ -27,7 +27,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.FileInfo;
import org.apache.celeborn.common.protocol.*;
import org.apache.celeborn.common.protocol.message.ControlMessages.ResourceConsumption;
import org.apache.celeborn.common.protocol.message.ControlMessages.*;
public class PbSerDeUtils {
public static Set<String> fromPbSortedShuffleFileSet(byte[] data)
@ -81,8 +81,13 @@ public class PbSerDeUtils {
public static FileInfo fromPbFileInfo(PbFileInfo pbFileInfo)
throws InvalidProtocolBufferException {
PbUserIdentifier pbUserIdentifier = pbFileInfo.getUserIdentifier();
UserIdentifier userIdentifier =
new UserIdentifier(pbUserIdentifier.getTenantId(), pbUserIdentifier.getName());
return new FileInfo(
pbFileInfo.getFilePath(), new ArrayList<>(pbFileInfo.getChunkOffsetsList()));
pbFileInfo.getFilePath(),
new ArrayList<>(pbFileInfo.getChunkOffsetsList()),
userIdentifier);
}
public static PbFileInfo toPbFileInfo(FileInfo fileInfo) {
@ -111,19 +116,6 @@ public class PbSerDeUtils {
return builder.build().toByteArray();
}
public static Set<String> fromPbShuffleKeySet(byte[] data) throws InvalidProtocolBufferException {
PbShuffleKeySet pbShuffleKeySet = PbShuffleKeySet.parseFrom(data);
Set<String> shuffleKeySet = ConcurrentHashMap.newKeySet();
shuffleKeySet.addAll(pbShuffleKeySet.getShuffleKeysList());
return shuffleKeySet;
}
public static byte[] toPbShuffleKeySet(Set<String> shuffleKeySet) {
PbShuffleKeySet.Builder builder = PbShuffleKeySet.newBuilder();
builder.addAllShuffleKeys(shuffleKeySet);
return builder.build().toByteArray();
}
public static ResourceConsumption fromPbResourceConsumption(
PbResourceConsumption pbResourceConsumption) throws InvalidProtocolBufferException {
return new ResourceConsumption(

View File

@ -397,6 +397,7 @@ message PbStoreVersion {
message PbFileInfo {
string filePath = 1;
repeated int64 chunkOffsets = 2;
PbUserIdentifier userIdentifier = 3;
}
message PbFileInfoMap {
@ -414,7 +415,3 @@ message PbResourceConsumption {
int64 hdfsBytesWritten = 3;
int64 hdfsFileCount = 4;
}
message PbShuffleKeySet {
repeated string shuffleKeys = 1;
}

View File

@ -19,11 +19,8 @@ package org.apache.celeborn.service.deploy.worker;
import java.nio.charset.StandardCharsets;
import org.apache.celeborn.common.protocol.message.ControlMessages.*;
public abstract class ShuffleRecoverHelper {
protected String SHUFFLE_KEY_PREFIX = "SHUFFLE-KEY";
protected String USER_IDENTIFIER_PREFIX = "USER-IDENTIFIER";
protected LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider.StoreVersion(1, 0);
protected byte[] dbShuffleKey(String shuffleKey) {
@ -36,17 +33,4 @@ public abstract class ShuffleRecoverHelper {
}
return s.substring(SHUFFLE_KEY_PREFIX.length() + 1);
}
protected byte[] dbUserIdentifier(UserIdentifier userIdentifier) {
return (USER_IDENTIFIER_PREFIX + ";" + userIdentifier.toString())
.getBytes(StandardCharsets.UTF_8);
}
protected UserIdentifier parseDbUserIdentifier(String s) {
if (!s.startsWith(USER_IDENTIFIER_PREFIX)) {
throw new IllegalArgumentException(
"Expected a string starting with " + USER_IDENTIFIER_PREFIX);
}
return UserIdentifier$.MODULE$.apply(s.substring(USER_IDENTIFIER_PREFIX.length() + 1));
}
}

View File

@ -53,6 +53,7 @@ import org.apache.celeborn.common.RssConf;
import org.apache.celeborn.common.meta.FileInfo;
import org.apache.celeborn.common.metrics.source.AbstractSource;
import org.apache.celeborn.common.network.server.MemoryTracker;
import org.apache.celeborn.common.protocol.message.ControlMessages.UserIdentifier;
import org.apache.celeborn.common.unsafe.Platform;
import org.apache.celeborn.common.util.PbSerDeUtils;
import org.apache.celeborn.common.util.ShuffleBlockInfoUtils;
@ -168,6 +169,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
return fileInfo;
} else {
String fileId = shuffleKey + "-" + fileName;
UserIdentifier userIdentifier = fileInfo.getUserIdentifier();
Set<String> sorted =
sortedShuffleFiles.computeIfAbsent(shuffleKey, v -> ConcurrentHashMap.newKeySet());
@ -180,7 +182,13 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
synchronized (sorting) {
if (sorted.contains(fileId)) {
return resolve(
shuffleKey, fileId, sortedFilePath, indexFilePath, startMapIndex, endMapIndex);
shuffleKey,
fileId,
userIdentifier,
sortedFilePath,
indexFilePath,
startMapIndex,
endMapIndex);
}
if (!sorting.contains(fileId)) {
try {
@ -222,7 +230,14 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
}
}
return resolve(shuffleKey, fileId, sortedFilePath, indexFilePath, startMapIndex, endMapIndex);
return resolve(
shuffleKey,
fileId,
userIdentifier,
sortedFilePath,
indexFilePath,
startMapIndex,
endMapIndex);
}
}
@ -436,6 +451,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
public FileInfo resolve(
String shuffleKey,
String fileId,
UserIdentifier userIdentifier,
String sortedFilePath,
String indexFilePath,
int startMapIndex,
@ -481,7 +497,8 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
return new FileInfo(
sortedFilePath,
ShuffleBlockInfoUtils.getChunkOffsetsFromShuffleBlockInfos(
startMapIndex, endMapIndex, fetchChunkSize, indexMap));
startMapIndex, endMapIndex, fetchChunkSize, indexMap),
userIdentifier);
}
class FileSorter {

View File

@ -50,8 +50,6 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
extends ShuffleRecoverHelper with DeviceObserver with Logging with MemoryTrackerListener {
// mount point -> filewriter
val workingDirWriters = new ConcurrentHashMap[File, util.ArrayList[FileWriter]]()
// userIdentifier -> shuffleKey
val userShuffleKeys = new ConcurrentHashMap[UserIdentifier, util.Set[String]]()
val (deviceInfos, diskInfos) = {
val workingDirInfos =
@ -169,9 +167,6 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
private val fileInfos =
new ConcurrentHashMap[String, ConcurrentHashMap[String, FileInfo]]()
private val RECOVERY_FILE_NAME = "recovery.ldb"
// sequences of store and recover are in reverse order
// store sequence : userShuffleKeys -> fileInfos
// recover sequence: fileInfos -> userShuffleKeys
private var db: DB = null
// ShuffleClient can fetch data from a restarted worker only
// when the worker's fetching port is stable.
@ -180,7 +175,6 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
val recoverFile = new File(RssConf.workerRecoverPath(conf), RECOVERY_FILE_NAME)
this.db = LevelDBProvider.initLevelDB(recoverFile, CURRENT_VERSION)
reloadAndCleanFileInfos(this.db)
reloadAndCleanUserShuffleKeys(this.db)
} catch {
case e: Exception =>
logError("Init level DB failed:", e)
@ -215,6 +209,8 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
case exception: Exception =>
logError(s"Reload DB: ${shuffleKey} failed.", exception)
}
} else {
return
}
}
}
@ -232,41 +228,6 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
}
}
private def reloadAndCleanUserShuffleKeys(db: DB): Unit = {
if (db != null) {
val itr = db.iterator
itr.seek(USER_IDENTIFIER_PREFIX.getBytes(StandardCharsets.UTF_8))
while (itr.hasNext) {
val entry = itr.next
val key = new String(entry.getKey, StandardCharsets.UTF_8)
if (key.startsWith(USER_IDENTIFIER_PREFIX)) {
val userIdentifier = parseDbUserIdentifier(key)
try {
val shuffleKeys = PbSerDeUtils.fromPbShuffleKeySet(entry.getValue)
logDebug(s"Reload DB: ${userIdentifier}")
userShuffleKeys.put(userIdentifier, shuffleKeys)
db.delete(entry.getKey)
} catch {
case exception: Exception =>
logError(s"Reload DB: ${userIdentifier} failed.", exception)
}
}
}
}
}
def updateUserShuffleKeysInDB(): Unit = {
userShuffleKeys.asScala.foreach { case (userIdentifier, shuffleKeys) =>
try {
db.put(dbUserIdentifier(userIdentifier), PbSerDeUtils.toPbShuffleKeySet(shuffleKeys))
logDebug(s"Update UserShuffleInfos into DB: ${userIdentifier}")
} catch {
case exception: Exception =>
logError(s"Update UserShuffleInfos into DB: ${userIdentifier} failed.", exception)
}
}
}
private def getNextIndex() = counter.getAndUpdate(counterOperator)
private val newMapFunc =
@ -275,13 +236,6 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
new ConcurrentHashMap()
}
private val newSetFunc =
new java.util.function.Function[UserIdentifier, util.Set[String]]() {
override def apply(key: UserIdentifier): util.Set[String] = {
ConcurrentHashMap.newKeySet[String]()
}
}
private val workingDirWriterListFunc =
new java.util.function.Function[File, util.ArrayList[FileWriter]]() {
override def apply(t: File): util.ArrayList[FileWriter] = new util.ArrayList[FileWriter]()
@ -323,7 +277,7 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
val shuffleDir =
new Path(new Path(hdfsDir, RssConf.workingDirName(conf)), s"$appId/$shuffleId")
FileSystem.mkdirs(StorageManager.hdfsFs, shuffleDir, hdfsPermission)
val fileInfo = new FileInfo(new Path(shuffleDir, fileName).toString)
val fileInfo = new FileInfo(new Path(shuffleDir, fileName).toString, userIdentifier)
val hdfsWriter = new FileWriter(
fileInfo,
hdfsFlusher.get,
@ -339,7 +293,6 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
hdfsWriters.add(hdfsWriter)
}
hdfsWriter.registerDestroyHook(hdfsWriters)
userShuffleKeys.computeIfAbsent(userIdentifier, newSetFunc).add(shuffleKey)
return hdfsWriter
} else {
val dir = dirs(getNextIndex() % dirs.size)
@ -353,7 +306,7 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
throw new RssException("create app shuffle data dir or file failed!" +
s"${file.getAbsolutePath}")
}
val fileInfo = new FileInfo(file.getAbsolutePath)
val fileInfo = new FileInfo(file.getAbsolutePath, userIdentifier)
val fileWriter = new FileWriter(
fileInfo,
localFlushers.get(mountPoint),
@ -373,7 +326,6 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo)
location.getStorageInfo.setMountPoint(mountPoint)
logDebug(s"location $location set disk hint to ${location.getStorageInfo} ")
userShuffleKeys.computeIfAbsent(userIdentifier, newSetFunc).add(shuffleKey)
return fileWriter
} catch {
case t: Throwable =>
@ -554,7 +506,6 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
def close(): Unit = {
if (db != null) {
try {
updateUserShuffleKeysInDB()
updateFileInfosInDB()
db.close()
} catch {
@ -627,26 +578,34 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
}
def userResourceConsumptionSnapshot(): Map[UserIdentifier, ResourceConsumption] = {
userShuffleKeys.synchronized {
userShuffleKeys.asScala.map { case (userIdentifier, shuffleKeys) =>
val resourceMetrics = shuffleKeys.asScala.map { shuffleKey =>
val userFileInfos = fileInfos.get(shuffleKey).values().asScala.toSeq
val diskFileInfos = userFileInfos.filter(!_.isHdfs)
val hdfsFileInfos = userFileInfos.filter(_.isHdfs)
val diskBytesWritten = diskFileInfos.map(_.getFileLength).sum
val diskFileCount = diskFileInfos.size
val hdfsBytesWritten = hdfsFileInfos.map(_.getFileLength).sum
val hdfsFileCount = hdfsFileInfos.size
(diskBytesWritten, diskFileCount, hdfsBytesWritten, hdfsFileCount)
fileInfos.synchronized {
// shuffleId -> (fileName -> fileInfo)
fileInfos
.asScala
.toList
.flatMap { case (_, fileInfoMaps) =>
// userIdentifier -> fileInfo
fileInfoMaps.values().asScala.map { fileInfo =>
(fileInfo.getUserIdentifier, fileInfo)
}
}
// userIdentifier -> List((userIdentifier, fileInfo))
.groupBy(_._1)
.map { case (userIdentifier, userWithFileInfoList) =>
// collect resource consumed by each user on this worker
val resourceConsumption = {
val userFileInfos = userWithFileInfoList.map(_._2)
val diskFileInfos = userFileInfos.filter(!_.isHdfs)
val hdfsFileInfos = userFileInfos.filter(_.isHdfs)
val diskBytesWritten = diskFileInfos.map(_.getFileLength).sum
val diskFileCount = diskFileInfos.size
val hdfsBytesWritten = hdfsFileInfos.map(_.getFileLength).sum
val hdfsFileCount = hdfsFileInfos.size
ResourceConsumption(diskBytesWritten, diskFileCount, hdfsBytesWritten, hdfsFileCount)
}
(userIdentifier, resourceConsumption)
}
val resourceConsumption = ResourceConsumption(
resourceMetrics.map(_._1).sum,
resourceMetrics.map(_._2).sum,
resourceMetrics.map(_._3).sum,
resourceMetrics.map(_._4).sum)
(userIdentifier, resourceConsumption)
}.toMap
}
}
}

View File

@ -74,6 +74,7 @@ import org.apache.celeborn.common.network.util.TransportConf;
import org.apache.celeborn.common.protocol.PartitionSplitMode;
import org.apache.celeborn.common.protocol.PartitionType;
import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.common.protocol.message.ControlMessages.UserIdentifier;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.service.deploy.worker.FetchHandler;
@ -96,6 +97,7 @@ public class FileWriterSuiteJ {
private static TransportClientFactory clientFactory;
private static long streamId;
private static int numChunks;
private final UserIdentifier userIdentifier = new UserIdentifier("mock-tenantId", "mock-name");
private static final TransportConf transConf =
new TransportConf("shuffle", MapConfigProvider.EMPTY);
@ -240,7 +242,7 @@ public class FileWriterSuiteJ {
File file = getTemporaryFile();
FileWriter fileWriter =
new FileWriter(
new FileInfo(file),
new FileInfo(file, userIdentifier),
localFlusher,
source,
RSS_CONF,
@ -285,7 +287,7 @@ public class FileWriterSuiteJ {
File file = getTemporaryFile();
FileWriter fileWriter =
new FileWriter(
new FileInfo(file),
new FileInfo(file, userIdentifier),
localFlusher,
source,
RSS_CONF,
@ -336,7 +338,7 @@ public class FileWriterSuiteJ {
public void testWriteAndChunkRead() throws Exception {
final int threadsNum = 8;
File file = getTemporaryFile();
FileInfo fileInfo = new FileInfo(file);
FileInfo fileInfo = new FileInfo(file, userIdentifier);
FileWriter fileWriter =
new FileWriter(
fileInfo,

View File

@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.RssConf;
import org.apache.celeborn.common.meta.FileInfo;
import org.apache.celeborn.common.network.server.MemoryTracker;
import org.apache.celeborn.common.protocol.message.ControlMessages.UserIdentifier;
import org.apache.celeborn.common.unsafe.Platform;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.service.deploy.worker.WorkerSource;
@ -53,6 +54,7 @@ public class PartitionFilesSorterSuiteJ {
private long originFileLen;
private FileWriter fileWriter;
private long sortTimeout = 16 * 1000;
private UserIdentifier userIdentifier = new UserIdentifier("mock-tenantId", "mock-name");
public void prepare(boolean largefile) throws IOException {
byte[] batchHeader = new byte[16];
@ -60,7 +62,7 @@ public class PartitionFilesSorterSuiteJ {
shuffleFile = File.createTempFile("RSS", "sort-suite");
originFileName = shuffleFile.getAbsolutePath();
fileInfo = new FileInfo(shuffleFile);
fileInfo = new FileInfo(shuffleFile, userIdentifier);
FileOutputStream fileOutputStream = new FileOutputStream(shuffleFile);
FileChannel channel = fileOutputStream.getChannel();
Map<Integer, Integer> batchIds = new HashMap<>();