From 4ee7d9eba82891a09f1b314c2ae7f63c141c19d1 Mon Sep 17 00:00:00 2001 From: Ethan Feng Date: Wed, 24 May 2023 23:15:57 +0800 Subject: [PATCH] [CELEBORN-597][FLINK] Support flink floating buffer for input gate and output gate. (#1503) --- .../AbstractRemoteShuffleInputGateFactory.java | 15 ++++++++++++--- ...tractRemoteShuffleResultPartitionFactory.java | 16 +++++++++++++--- .../celeborn/plugin/flink/config/PluginConf.java | 4 ++++ .../celeborn/plugin/flink/utils/FlinkUtils.java | 4 ++++ 4 files changed, 33 insertions(+), 6 deletions(-) diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java index c6d53b7a1..597bd8994 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java @@ -62,6 +62,8 @@ public abstract class AbstractRemoteShuffleInputGateFactory { /** Sum of buffers. */ protected final int numBuffersPerGate; + protected boolean supportFloatingBuffers; + protected CelebornConf celebornConf; public AbstractRemoteShuffleInputGateFactory( @@ -83,6 +85,9 @@ public abstract class AbstractRemoteShuffleInputGateFactory { } this.numBuffersPerGate = Utils.checkedDownCast(configuredMemorySize / networkBufferSize); + this.supportFloatingBuffers = + FlinkUtils.stringValueAsBoolean( + flinkConf, PluginConf.SUPPORT_FLOATING_BUFFER_PER_INPUT_GATE); if (numBuffersPerGate < MIN_BUFFERS_PER_GATE) { throw new IllegalArgumentException( String.format( @@ -107,7 +112,7 @@ public abstract class AbstractRemoteShuffleInputGateFactory { numConcurrentReading); SupplierWithException bufferPoolFactory = - createBufferPoolFactory(networkBufferPool, numBuffersPerGate); + createBufferPoolFactory(networkBufferPool, numBuffersPerGate, supportFloatingBuffers); BufferDecompressor bufferDecompressor = new BufferDecompressor(networkBufferSize, compressionCodec); @@ -122,7 +127,11 @@ public abstract class AbstractRemoteShuffleInputGateFactory { BufferDecompressor bufferDecompressor); private SupplierWithException createBufferPoolFactory( - BufferPoolFactory bufferPoolFactory, int numBuffers) { - return () -> bufferPoolFactory.createBufferPool(numBuffers, numBuffers); + BufferPoolFactory bufferPoolFactory, int numBuffers, boolean supportFloatingBuffers) { + if (supportFloatingBuffers) { + return () -> bufferPoolFactory.createBufferPool(1, numBuffers); + } else { + return () -> bufferPoolFactory.createBufferPool(numBuffers, numBuffers); + } } } diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java index 4b2f65a25..80e3c5dbe 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java @@ -65,6 +65,8 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory { */ protected final int numBuffersPerPartition; + protected boolean supportFloatingBuffers; + protected String compressionCodec; public AbstractRemoteShuffleResultPartitionFactory( @@ -86,6 +88,9 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory { } this.numBuffersPerPartition = Utils.checkedDownCast(configuredMemorySize / networkBufferSize); + this.supportFloatingBuffers = + FlinkUtils.stringValueAsBoolean( + flinkConf, PluginConf.SUPPORT_FLOATING_BUFFER_PER_OUTPUT_GATE); if (numBuffersPerPartition < MIN_BUFFERS_PER_PARTITION) { throw new IllegalArgumentException( String.format( @@ -184,9 +189,14 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory { int numForOutputGate = numBuffersPerPartition - numForResultPartition; List> factories = new ArrayList<>(); - factories.add( - () -> bufferPoolFactory.createBufferPool(numForResultPartition, numForResultPartition)); - factories.add(() -> bufferPoolFactory.createBufferPool(numForOutputGate, numForOutputGate)); + if (supportFloatingBuffers) { + factories.add(() -> bufferPoolFactory.createBufferPool(2, numForResultPartition)); + factories.add(() -> bufferPoolFactory.createBufferPool(2, numForOutputGate)); + } else { + factories.add( + () -> bufferPoolFactory.createBufferPool(numForResultPartition, numForResultPartition)); + factories.add(() -> bufferPoolFactory.createBufferPool(numForOutputGate, numForOutputGate)); + } return factories; } diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/config/PluginConf.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/config/PluginConf.java index f293de6c0..ddc0b5060 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/config/PluginConf.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/config/PluginConf.java @@ -28,7 +28,11 @@ public enum PluginConf { "remote-shuffle.job.concurrent-readings-per-gate", "", String.valueOf(Integer.MAX_VALUE)), MEMORY_PER_RESULT_PARTITION("remote-shuffle.job.memory-per-partition", "", "64m"), MEMORY_PER_INPUT_GATE("remote-shuffle.job.memory-per-gate", "", "32m"), + SUPPORT_FLOATING_BUFFER_PER_INPUT_GATE( + "remote-shuffle.job.support-floating-buffer-per-input-gate", "", "true"), ENABLE_DATA_COMPRESSION("remote-shuffle.job.enable-data-compression", "", "true"), + SUPPORT_FLOATING_BUFFER_PER_OUTPUT_GATE( + "remote-shuffle.job.support-floating-buffer-per-output-gate", "", "true"), REMOTE_SHUFFLE_COMPRESSION_CODEC( "remote-shuffle.job.compression.codec", CelebornConf.SHUFFLE_COMPRESSION_CODEC().key(), diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java index 823355da2..740c21ec3 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java @@ -58,4 +58,8 @@ public class FlinkUtils { public static long byteStringValueAsBytes(Configuration flinkConf, PluginConf pluginConf) { return Utils.byteStringAsBytes(PluginConf.getValue(flinkConf, pluginConf)); } + + public static boolean stringValueAsBoolean(Configuration flinkConf, PluginConf pluginConf) { + return Boolean.valueOf(PluginConf.getValue(flinkConf, pluginConf)); + } }