From 30d4323cdbb47b2c7fdf994bf25ec8a09d4e249d Mon Sep 17 00:00:00 2001 From: Ethan Feng Date: Fri, 23 Sep 2022 18:38:52 +0800 Subject: [PATCH] [FEATURE] Add a configuration to enable a map id filter mechanism. #662 (#663) --- .../emr/rss/client/read/RssInputStream.java | 5 ++++ .../rss/client/write/LifecycleManager.scala | 4 ++- .../common/protocol/PartitionLocation.java | 2 +- common/src/main/proto/TransportMessages.proto | 1 + .../com/aliyun/emr/rss/common/RssConf.scala | 4 +++ .../protocol/message/ControlMessages.scala | 10 ++++--- .../aliyun/emr/rss/common/util/Utils.scala | 8 +++--- docs/configuration.md | 1 + .../deploy/worker/storage/FileWriter.java | 27 +++++++++++++------ .../service/deploy/worker/Controller.scala | 19 ++++++++----- .../worker/storage/StorageManager.scala | 9 ++++--- .../worker/storage/FileWriterSuiteJ.java | 9 ++++--- 12 files changed, 71 insertions(+), 28 deletions(-) diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java index 9b644354f..0000642b1 100644 --- a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java +++ b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java @@ -114,6 +114,7 @@ public abstract class RssInputStream extends InputStream { private final int BATCH_HEADER_SIZE = 4 * 4; private final byte[] sizeBuf = new byte[BATCH_HEADER_SIZE]; private LongAdder skipCount = new LongAdder(); + private final boolean rangeReadFilter; RssInputStreamImpl( RssConf conf, @@ -133,6 +134,7 @@ public abstract class RssInputStream extends InputStream { this.attemptNumber = attemptNumber; this.startMapIndex = startMapIndex; this.endMapIndex = endMapIndex; + this.rangeReadFilter = RssConf.rangeReadFilterEnabled(conf); int headerLen = Decompressor.getCompressionHeaderLength(conf); int blockSize = RssConf.pushDataBufferSize(conf) + headerLen; @@ -145,6 +147,9 @@ public abstract class RssInputStream extends 
InputStream { } private boolean skipLocation(int startMapIndex, int endMapIndex, PartitionLocation location) { + if (!rangeReadFilter) { + return false; + } if (endMapIndex == Integer.MAX_VALUE) { return false; } diff --git a/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala b/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala index ebf5b6919..3a59994b2 100644 --- a/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala +++ b/client/src/main/scala/com/aliyun/emr/rss/client/write/LifecycleManager.scala @@ -50,6 +50,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit private val splitThreshold = RssConf.partitionSplitThreshold(conf) private val splitMode = RssConf.partitionSplitMode(conf) private val partitionType = RssConf.partitionType(conf) + private val rangeReadFilter = RssConf.rangeReadFilterEnabled(conf) private val unregisterShuffleTime = new ConcurrentHashMap[Int, Long]() private val stageEndTimeout = RssConf.stageEndTimeout(conf) @@ -907,7 +908,8 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit slaveLocations, splitThreshold, splitMode, - partitionType)) + partitionType, + rangeReadFilter)) if (res.status.equals(StatusCode.SUCCESS)) { logDebug(s"Successfully allocated " + s"partitions buffer for ${Utils.makeShuffleKey(applicationId, shuffleId)}" + diff --git a/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java b/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java index 68fad3d6b..e22bcbefa 100644 --- a/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java +++ b/common/src/main/java/com/aliyun/emr/rss/common/protocol/PartitionLocation.java @@ -59,7 +59,7 @@ public class PartitionLocation implements Serializable { private Mode mode; private PartitionLocation peer; private StorageInfo storageInfo = new StorageInfo(); - private RoaringBitmap 
mapIdBitMap = new RoaringBitmap(); + private RoaringBitmap mapIdBitMap = null; public PartitionLocation(PartitionLocation loc) { this.id = loc.id; diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index 5e5da530b..237cc93c4 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -301,6 +301,7 @@ message PbReserveSlots { int64 splitThreshold = 5; int32 splitMode = 6; int32 partitionType = 7; + bool rangeReadFilter = 8; } message PbReserveSlotsResponse { diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala index 19eec1054..d8474d093 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala @@ -972,6 +972,10 @@ object RssConf extends Logging { conf.getInt("rss.worker.hdfs.flusher.thread.count", 4) } + def rangeReadFilterEnabled(conf: RssConf): Boolean = { + conf.getBoolean("rss.range.read.filter.enabled", false) + } + // If we want to use multi-raft group we can // add "rss.ha.service.ids" each for one raft group val HA_SERVICE_ID_KEY = "rss.ha.service.id" diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala b/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala index 5abb1fc24..d06ee084b 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala @@ -355,7 +355,8 @@ sealed trait Message extends Serializable { slaveLocations, splitThreshold, splitMode, - partType) => + partType, + rangeReadFilter) => val payload = PbReserveSlots.newBuilder() .setApplicationId(applicationId) .setShuffleId(shuffleId) @@ -366,6 +367,7 @@ sealed trait Message extends Serializable { 
.setSplitThreshold(splitThreshold) .setSplitMode(splitMode.getValue) .setPartitionType(partType.getValue) + .setRangeReadFilter(rangeReadFilter) .build().toByteArray new TransportMessage(MessageType.RESERVE_SLOTS, payload) @@ -675,7 +677,8 @@ object ControlMessages extends Logging { slaveLocations: util.List[PartitionLocation], splitThreshold: Long, splitMode: PartitionSplitMode, - partitionType: PartitionType) + partitionType: PartitionType, + rangeReadFilter: Boolean) extends WorkerMessage case class ReserveSlotsResponse( @@ -988,7 +991,8 @@ object ControlMessages extends Logging { .map(PartitionLocation.fromPbPartitionLocation(_)).toList.asJava), pbReserveSlots.getSplitThreshold, Utils.toShuffleSplitMode(pbReserveSlots.getSplitMode), - Utils.toPartitionType(pbReserveSlots.getPartitionType)) + Utils.toPartitionType(pbReserveSlots.getPartitionType), + pbReserveSlots.getRangeReadFilter) case RESERVE_SLOTS_RESPONSE => val pbReserveSlotsResponse = PbReserveSlotsResponse.parseFrom(message.getPayload) diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/util/Utils.scala b/common/src/main/scala/com/aliyun/emr/rss/common/util/Utils.scala index db59f819c..9eab798d9 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/util/Utils.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/util/Utils.scala @@ -880,7 +880,7 @@ object Utils extends Logging { } def roaringBitmapToByteString(roaringBitMap: RoaringBitmap): ByteString = { - if (!roaringBitMap.isEmpty) { + if (roaringBitMap != null && !roaringBitMap.isEmpty) { val buf = ByteBuffer.allocate(roaringBitMap.serializedSizeInBytes()) roaringBitMap.serialize(buf) buf.rewind() @@ -891,13 +891,15 @@ object Utils extends Logging { } def byteStringToRoaringBitmap(bytes: ByteString): RoaringBitmap = { - val roaringBitmap = new RoaringBitmap() if (!bytes.isEmpty) { + val roaringBitmap = new RoaringBitmap() val buf = bytes.asReadOnlyByteBuffer() buf.rewind() roaringBitmap.deserialize(buf) + roaringBitmap + } else 
{ null } - roaringBitmap } } diff --git a/docs/configuration.md b/docs/configuration.md index dc90645b1..26cea0c75 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -65,6 +65,7 @@ memory. Empirically, RSS worker off-heap memory should be set to `(numDirs * bu | spark.rss.client.compression.codec | lz4 | The codec used to compress shuffle data. By default, RSS provides two codecs: `lz4` and `zstd`. | | spark.rss.client.compression.zstd.level | 1 | Compression level for Zstd compression codec, its value should be an integer between -5 and 22. Increasing the compression level will result in better compression at the expense of more CPU and memory. | | spark.rss.identity.provider | `com.aliyun.emr.rss.common.identity.DefaultIdentityProvider` | Identity provider class name. Default value use `DefaultIdentityProvider`, return `UserIdentifier` with default tenant id and username from `UsergroupInformation`. | +| spark.rss.range.read.filter.enabled | false | If a Spark application has skewed partitions, this value can be set to true to improve performance. 
| ### RSS Master Configurations diff --git a/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriter.java b/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriter.java index efc772340..a93c3b468 100644 --- a/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriter.java +++ b/server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriter.java @@ -76,10 +76,11 @@ public final class FileWriter implements DeviceObserver { private long splitThreshold = 0; private final PartitionSplitMode splitMode; private final PartitionType partitionType; + private final boolean rangeReadFilter; private Runnable destroyHook; private boolean deleted = false; - private RoaringBitmap mapIdBitMap = new RoaringBitmap(); + private RoaringBitmap mapIdBitMap = null; @Override public void notifyError(String mountPoint, DiskStatus diskStatus) { @@ -100,7 +101,8 @@ public final class FileWriter implements DeviceObserver { DeviceMonitor deviceMonitor, long splitThreshold, PartitionSplitMode splitMode, - PartitionType partitionType) + PartitionType partitionType, + boolean rangeReadFilter) throws IOException { this.fileInfo = fileInfo; this.flusher = flusher; @@ -113,6 +115,7 @@ public final class FileWriter implements DeviceObserver { this.deviceMonitor = deviceMonitor; this.splitMode = splitMode; this.partitionType = partitionType; + this.rangeReadFilter = rangeReadFilter; if (!fileInfo.isHdfs()) { channel = new FileOutputStream(fileInfo.getFilePath()).getChannel(); } else { @@ -120,6 +123,9 @@ public final class FileWriter implements DeviceObserver { } source = workerSource; logger.debug("FileWriter {} split threshold {} mode {}", this, splitThreshold, splitMode); + if (rangeReadFilter) { + this.mapIdBitMap = new RoaringBitmap(); + } takeBuffer(); } @@ -190,16 +196,21 @@ public final class FileWriter implements DeviceObserver { return; } - byte[] header = new byte[16]; - 
data.markReaderIndex(); - data.readBytes(header); - data.resetReaderIndex(); - int mapId = Platform.getInt(header, Platform.BYTE_ARRAY_OFFSET); + int mapId = 0; + if (rangeReadFilter) { + byte[] header = new byte[16]; + data.markReaderIndex(); + data.readBytes(header); + data.resetReaderIndex(); + mapId = Platform.getInt(header, Platform.BYTE_ARRAY_OFFSET); + } final int numBytes = data.readableBytes(); MemoryTracker.instance().incrementDiskBuffer(numBytes); synchronized (this) { - mapIdBitMap.add(mapId); + if (rangeReadFilter) { + mapIdBitMap.add(mapId); + } if (flushBuffer.readableBytes() != 0 && flushBuffer.readableBytes() + numBytes >= this.flushBufferSize) { flush(false); diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Controller.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Controller.scala index 28a6c52c8..550dfac1f 100644 --- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Controller.scala +++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Controller.scala @@ -76,7 +76,8 @@ private[deploy] class Controller( slaveLocations, splitThreshold, splitMode, - partitionType) => + partitionType, + rangeReadFilter) => val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId) workerSource.sample(WorkerSource.ReserveSlotsTime, shuffleKey) { logDebug(s"Received ReserveSlots request, $shuffleKey, " + @@ -90,7 +91,8 @@ private[deploy] class Controller( slaveLocations, splitThreshold, splitMode, - partitionType) + partitionType, + rangeReadFilter) logDebug(s"ReserveSlots for $shuffleKey finished.") } @@ -124,7 +126,8 @@ private[deploy] class Controller( slaveLocations: jList[PartitionLocation], splitThreshold: Long, splitMode: PartitionSplitMode, - partitionType: PartitionType): Unit = { + partitionType: PartitionType, + rangeReadFileter: Boolean): Unit = { val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId) if (shutdown.get()) { val msg = 
"Current worker is shutting down!" @@ -149,7 +152,8 @@ private[deploy] class Controller( location, splitThreshold, splitMode, - partitionType) + partitionType, + rangeReadFileter) masterPartitions.add(new WorkingPartition(location, writer)) } } catch { @@ -174,7 +178,8 @@ private[deploy] class Controller( location, splitThreshold, splitMode, - partitionType) + partitionType, + rangeReadFileter) slavePartitions.add(new WorkingPartition(location, writer)) } } catch { @@ -238,7 +243,9 @@ private[deploy] class Controller( if (bytes > 0L) { if (fileWriter.getStorageInfo != null) { committedStorageInfos.put(uniqueId, fileWriter.getStorageInfo) - committedMapIdBitMap.put(uniqueId, fileWriter.getMapIdBitMap) + if (fileWriter.getMapIdBitMap != null) { + committedMapIdBitMap.put(uniqueId, fileWriter.getMapIdBitMap) + } } if (bytes >= minimumPartitionSizeForEstimation) { partitionSizeList.add(bytes) diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/storage/StorageManager.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/storage/StorageManager.scala index 3b7f31836..0055efd74 100644 --- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/storage/StorageManager.scala +++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/storage/StorageManager.scala @@ -245,7 +245,8 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract location: PartitionLocation, splitThreshold: Long, splitMode: PartitionSplitMode, - partitionType: PartitionType): FileWriter = { + partitionType: PartitionType, + rangeReadFilter: Boolean): FileWriter = { if (healthyWorkingDirs().size <= 0) { throw new IOException("No available working dirs!") } @@ -281,7 +282,8 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract deviceMonitor, splitThreshold, splitMode, - partitionType) + partitionType, + rangeReadFilter) fileInfos.computeIfAbsent(shuffleKey, 
newMapFunc).put(fileName, fileInfo) hdfsWriters.synchronized { hdfsWriters.add(hdfsWriter) @@ -309,7 +311,8 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract deviceMonitor, splitThreshold, splitMode, - partitionType) + partitionType, + rangeReadFilter) deviceMonitor.registerFileWriter(fileWriter) val list = workingDirWriters.computeIfAbsent(dir, workingDirWriterListFunc) list.synchronized { diff --git a/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriterSuiteJ.java b/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriterSuiteJ.java index 040ee8a3e..4f8f3102f 100644 --- a/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriterSuiteJ.java +++ b/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriterSuiteJ.java @@ -247,7 +247,8 @@ public class FileWriterSuiteJ { DeviceMonitor$.MODULE$.EmptyMonitor(), SPLIT_THRESHOLD, splitMode, - partitionType); + partitionType, + false); List> futures = new ArrayList<>(); ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, "FileWriter-UT-1"); @@ -291,7 +292,8 @@ public class FileWriterSuiteJ { DeviceMonitor$.MODULE$.EmptyMonitor(), SPLIT_THRESHOLD, splitMode, - partitionType); + partitionType, + false); List> futures = new ArrayList<>(); ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, "FileWriter-UT-2"); @@ -344,7 +346,8 @@ public class FileWriterSuiteJ { DeviceMonitor$.MODULE$.EmptyMonitor(), SPLIT_THRESHOLD, splitMode, - partitionType); + partitionType, + false); List> futures = new ArrayList<>(); ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, "FileWriter-UT-2");