[FEATURE] Add a configuration to enable a map id filter mechanism. #662 (#663)

This commit is contained in:
Ethan Feng 2022-09-23 18:38:52 +08:00 committed by GitHub
parent 4a7a7d42b5
commit 30d4323cdb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 71 additions and 28 deletions

View File

@ -114,6 +114,7 @@ public abstract class RssInputStream extends InputStream {
private final int BATCH_HEADER_SIZE = 4 * 4;
private final byte[] sizeBuf = new byte[BATCH_HEADER_SIZE];
private LongAdder skipCount = new LongAdder();
private final boolean rangeReadFilter;
RssInputStreamImpl(
RssConf conf,
@ -133,6 +134,7 @@ public abstract class RssInputStream extends InputStream {
this.attemptNumber = attemptNumber;
this.startMapIndex = startMapIndex;
this.endMapIndex = endMapIndex;
this.rangeReadFilter = RssConf.rangeReadFilterEnabled(conf);
int headerLen = Decompressor.getCompressionHeaderLength(conf);
int blockSize = RssConf.pushDataBufferSize(conf) + headerLen;
@ -145,6 +147,9 @@ public abstract class RssInputStream extends InputStream {
}
private boolean skipLocation(int startMapIndex, int endMapIndex, PartitionLocation location) {
if (!rangeReadFilter) {
return false;
}
if (endMapIndex == Integer.MAX_VALUE) {
return false;
}

View File

@ -50,6 +50,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
private val splitThreshold = RssConf.partitionSplitThreshold(conf)
private val splitMode = RssConf.partitionSplitMode(conf)
private val partitionType = RssConf.partitionType(conf)
private val rangeReadFilter = RssConf.rangeReadFilterEnabled(conf)
private val unregisterShuffleTime = new ConcurrentHashMap[Int, Long]()
private val stageEndTimeout = RssConf.stageEndTimeout(conf)
@ -907,7 +908,8 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
slaveLocations,
splitThreshold,
splitMode,
partitionType))
partitionType,
rangeReadFilter))
if (res.status.equals(StatusCode.SUCCESS)) {
logDebug(s"Successfully allocated " +
s"partitions buffer for ${Utils.makeShuffleKey(applicationId, shuffleId)}" +

View File

@ -59,7 +59,7 @@ public class PartitionLocation implements Serializable {
private Mode mode;
private PartitionLocation peer;
private StorageInfo storageInfo = new StorageInfo();
private RoaringBitmap mapIdBitMap = new RoaringBitmap();
private RoaringBitmap mapIdBitMap = null;
public PartitionLocation(PartitionLocation loc) {
this.id = loc.id;

View File

@ -301,6 +301,7 @@ message PbReserveSlots {
int64 splitThreshold = 5;
int32 splitMode = 6;
int32 partitionType = 7;
bool rangeReadFilter = 8;
}
message PbReserveSlotsResponse {

View File

@ -972,6 +972,10 @@ object RssConf extends Logging {
conf.getInt("rss.worker.hdfs.flusher.thread.count", 4)
}
def rangeReadFilterEnabled(conf: RssConf): Boolean = {
conf.getBoolean("rss.range.read.filter.enabled", false)
}
// If we want to use multi-raft group we can
// add "rss.ha.service.ids" each for one raft group
val HA_SERVICE_ID_KEY = "rss.ha.service.id"

View File

@ -355,7 +355,8 @@ sealed trait Message extends Serializable {
slaveLocations,
splitThreshold,
splitMode,
partType) =>
partType,
rangeReadFilter) =>
val payload = PbReserveSlots.newBuilder()
.setApplicationId(applicationId)
.setShuffleId(shuffleId)
@ -366,6 +367,7 @@ sealed trait Message extends Serializable {
.setSplitThreshold(splitThreshold)
.setSplitMode(splitMode.getValue)
.setPartitionType(partType.getValue)
.setRangeReadFilter(rangeReadFilter)
.build().toByteArray
new TransportMessage(MessageType.RESERVE_SLOTS, payload)
@ -675,7 +677,8 @@ object ControlMessages extends Logging {
slaveLocations: util.List[PartitionLocation],
splitThreshold: Long,
splitMode: PartitionSplitMode,
partitionType: PartitionType)
partitionType: PartitionType,
rangeReadFilter: Boolean)
extends WorkerMessage
case class ReserveSlotsResponse(
@ -988,7 +991,8 @@ object ControlMessages extends Logging {
.map(PartitionLocation.fromPbPartitionLocation(_)).toList.asJava),
pbReserveSlots.getSplitThreshold,
Utils.toShuffleSplitMode(pbReserveSlots.getSplitMode),
Utils.toPartitionType(pbReserveSlots.getPartitionType))
Utils.toPartitionType(pbReserveSlots.getPartitionType),
pbReserveSlots.getRangeReadFilter)
case RESERVE_SLOTS_RESPONSE =>
val pbReserveSlotsResponse = PbReserveSlotsResponse.parseFrom(message.getPayload)

View File

@ -880,7 +880,7 @@ object Utils extends Logging {
}
def roaringBitmapToByteString(roaringBitMap: RoaringBitmap): ByteString = {
if (!roaringBitMap.isEmpty) {
if (roaringBitMap != null && !roaringBitMap.isEmpty) {
val buf = ByteBuffer.allocate(roaringBitMap.serializedSizeInBytes())
roaringBitMap.serialize(buf)
buf.rewind()
@ -891,13 +891,15 @@ object Utils extends Logging {
}
def byteStringToRoaringBitmap(bytes: ByteString): RoaringBitmap = {
val roaringBitmap = new RoaringBitmap()
if (!bytes.isEmpty) {
val roaringBitmap = new RoaringBitmap()
val buf = bytes.asReadOnlyByteBuffer()
buf.rewind()
roaringBitmap.deserialize(buf)
roaringBitmap
} else {
null
}
roaringBitmap
}
}

View File

@ -65,6 +65,7 @@ memory. Empirically, RSS worker off-heap memory should be set to `(numDirs * bu
| spark.rss.client.compression.codec | lz4 | The codec used to compress shuffle data. By default, RSS provides two codecs: `lz4` and `zstd`. |
| spark.rss.client.compression.zstd.level | 1 | Compression level for Zstd compression codec, its value should be an integer between -5 and 22. Increasing the compression level will result in better compression at the expense of more CPU and memory. |
| spark.rss.identity.provider | `com.aliyun.emr.rss.common.identity.DefaultIdentityProvider` | Identity provider class name. Default value use `DefaultIdentityProvider`, return `UserIdentifier` with default tenant id and username from `UsergroupInformation`. |
| spark.rss.range.read.filter.enabled | false | If a spark application have skewed partition, this value can set to true to improve performance. |
### RSS Master Configurations

View File

@ -76,10 +76,11 @@ public final class FileWriter implements DeviceObserver {
private long splitThreshold = 0;
private final PartitionSplitMode splitMode;
private final PartitionType partitionType;
private final boolean rangeReadFilter;
private Runnable destroyHook;
private boolean deleted = false;
private RoaringBitmap mapIdBitMap = new RoaringBitmap();
private RoaringBitmap mapIdBitMap = null;
@Override
public void notifyError(String mountPoint, DiskStatus diskStatus) {
@ -100,7 +101,8 @@ public final class FileWriter implements DeviceObserver {
DeviceMonitor deviceMonitor,
long splitThreshold,
PartitionSplitMode splitMode,
PartitionType partitionType)
PartitionType partitionType,
boolean rangeReadFilter)
throws IOException {
this.fileInfo = fileInfo;
this.flusher = flusher;
@ -113,6 +115,7 @@ public final class FileWriter implements DeviceObserver {
this.deviceMonitor = deviceMonitor;
this.splitMode = splitMode;
this.partitionType = partitionType;
this.rangeReadFilter = rangeReadFilter;
if (!fileInfo.isHdfs()) {
channel = new FileOutputStream(fileInfo.getFilePath()).getChannel();
} else {
@ -120,6 +123,9 @@ public final class FileWriter implements DeviceObserver {
}
source = workerSource;
logger.debug("FileWriter {} split threshold {} mode {}", this, splitThreshold, splitMode);
if (rangeReadFilter) {
this.mapIdBitMap = new RoaringBitmap();
}
takeBuffer();
}
@ -190,16 +196,21 @@ public final class FileWriter implements DeviceObserver {
return;
}
byte[] header = new byte[16];
data.markReaderIndex();
data.readBytes(header);
data.resetReaderIndex();
int mapId = Platform.getInt(header, Platform.BYTE_ARRAY_OFFSET);
int mapId = 0;
if (rangeReadFilter) {
byte[] header = new byte[16];
data.markReaderIndex();
data.readBytes(header);
data.resetReaderIndex();
mapId = Platform.getInt(header, Platform.BYTE_ARRAY_OFFSET);
}
final int numBytes = data.readableBytes();
MemoryTracker.instance().incrementDiskBuffer(numBytes);
synchronized (this) {
mapIdBitMap.add(mapId);
if (rangeReadFilter) {
mapIdBitMap.add(mapId);
}
if (flushBuffer.readableBytes() != 0
&& flushBuffer.readableBytes() + numBytes >= this.flushBufferSize) {
flush(false);

View File

@ -76,7 +76,8 @@ private[deploy] class Controller(
slaveLocations,
splitThreshold,
splitMode,
partitionType) =>
partitionType,
rangeReadFilter) =>
val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId)
workerSource.sample(WorkerSource.ReserveSlotsTime, shuffleKey) {
logDebug(s"Received ReserveSlots request, $shuffleKey, " +
@ -90,7 +91,8 @@ private[deploy] class Controller(
slaveLocations,
splitThreshold,
splitMode,
partitionType)
partitionType,
rangeReadFilter)
logDebug(s"ReserveSlots for $shuffleKey finished.")
}
@ -124,7 +126,8 @@ private[deploy] class Controller(
slaveLocations: jList[PartitionLocation],
splitThreshold: Long,
splitMode: PartitionSplitMode,
partitionType: PartitionType): Unit = {
partitionType: PartitionType,
rangeReadFileter: Boolean): Unit = {
val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId)
if (shutdown.get()) {
val msg = "Current worker is shutting down!"
@ -149,7 +152,8 @@ private[deploy] class Controller(
location,
splitThreshold,
splitMode,
partitionType)
partitionType,
rangeReadFileter)
masterPartitions.add(new WorkingPartition(location, writer))
}
} catch {
@ -174,7 +178,8 @@ private[deploy] class Controller(
location,
splitThreshold,
splitMode,
partitionType)
partitionType,
rangeReadFileter)
slavePartitions.add(new WorkingPartition(location, writer))
}
} catch {
@ -238,7 +243,9 @@ private[deploy] class Controller(
if (bytes > 0L) {
if (fileWriter.getStorageInfo != null) {
committedStorageInfos.put(uniqueId, fileWriter.getStorageInfo)
committedMapIdBitMap.put(uniqueId, fileWriter.getMapIdBitMap)
if (fileWriter.getMapIdBitMap != null) {
committedMapIdBitMap.put(uniqueId, fileWriter.getMapIdBitMap)
}
}
if (bytes >= minimumPartitionSizeForEstimation) {
partitionSizeList.add(bytes)

View File

@ -245,7 +245,8 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
location: PartitionLocation,
splitThreshold: Long,
splitMode: PartitionSplitMode,
partitionType: PartitionType): FileWriter = {
partitionType: PartitionType,
rangeReadFilter: Boolean): FileWriter = {
if (healthyWorkingDirs().size <= 0) {
throw new IOException("No available working dirs!")
}
@ -281,7 +282,8 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
deviceMonitor,
splitThreshold,
splitMode,
partitionType)
partitionType,
rangeReadFilter)
fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo)
hdfsWriters.synchronized {
hdfsWriters.add(hdfsWriter)
@ -309,7 +311,8 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
deviceMonitor,
splitThreshold,
splitMode,
partitionType)
partitionType,
rangeReadFilter)
deviceMonitor.registerFileWriter(fileWriter)
val list = workingDirWriters.computeIfAbsent(dir, workingDirWriterListFunc)
list.synchronized {

View File

@ -247,7 +247,8 @@ public class FileWriterSuiteJ {
DeviceMonitor$.MODULE$.EmptyMonitor(),
SPLIT_THRESHOLD,
splitMode,
partitionType);
partitionType,
false);
List<Future<?>> futures = new ArrayList<>();
ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, "FileWriter-UT-1");
@ -291,7 +292,8 @@ public class FileWriterSuiteJ {
DeviceMonitor$.MODULE$.EmptyMonitor(),
SPLIT_THRESHOLD,
splitMode,
partitionType);
partitionType,
false);
List<Future<?>> futures = new ArrayList<>();
ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, "FileWriter-UT-2");
@ -344,7 +346,8 @@ public class FileWriterSuiteJ {
DeviceMonitor$.MODULE$.EmptyMonitor(),
SPLIT_THRESHOLD,
splitMode,
partitionType);
partitionType,
false);
List<Future<?>> futures = new ArrayList<>();
ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, "FileWriter-UT-2");