[CELEBORN-597][FLINK] Support flink floating buffer for input gate and output gate. (#1503)

This commit is contained in:
Ethan Feng 2023-05-24 23:15:57 +08:00 committed by GitHub
parent ef8e556202
commit 4ee7d9eba8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 33 additions and 6 deletions

View File

@ -62,6 +62,8 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
/** Sum of buffers. */
protected final int numBuffersPerGate;
protected boolean supportFloatingBuffers;
protected CelebornConf celebornConf;
public AbstractRemoteShuffleInputGateFactory(
@ -83,6 +85,9 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
}
this.numBuffersPerGate = Utils.checkedDownCast(configuredMemorySize / networkBufferSize);
this.supportFloatingBuffers =
FlinkUtils.stringValueAsBoolean(
flinkConf, PluginConf.SUPPORT_FLOATING_BUFFER_PER_INPUT_GATE);
if (numBuffersPerGate < MIN_BUFFERS_PER_GATE) {
throw new IllegalArgumentException(
String.format(
@ -107,7 +112,7 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
numConcurrentReading);
SupplierWithException<BufferPool, IOException> bufferPoolFactory =
createBufferPoolFactory(networkBufferPool, numBuffersPerGate);
createBufferPoolFactory(networkBufferPool, numBuffersPerGate, supportFloatingBuffers);
BufferDecompressor bufferDecompressor =
new BufferDecompressor(networkBufferSize, compressionCodec);
@ -122,7 +127,11 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
BufferDecompressor bufferDecompressor);
private SupplierWithException<BufferPool, IOException> createBufferPoolFactory(
BufferPoolFactory bufferPoolFactory, int numBuffers) {
return () -> bufferPoolFactory.createBufferPool(numBuffers, numBuffers);
BufferPoolFactory bufferPoolFactory, int numBuffers, boolean supportFloatingBuffers) {
if (supportFloatingBuffers) {
return () -> bufferPoolFactory.createBufferPool(1, numBuffers);
} else {
return () -> bufferPoolFactory.createBufferPool(numBuffers, numBuffers);
}
}
}

View File

@ -65,6 +65,8 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory {
*/
protected final int numBuffersPerPartition;
protected boolean supportFloatingBuffers;
protected String compressionCodec;
public AbstractRemoteShuffleResultPartitionFactory(
@ -86,6 +88,9 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory {
}
this.numBuffersPerPartition = Utils.checkedDownCast(configuredMemorySize / networkBufferSize);
this.supportFloatingBuffers =
FlinkUtils.stringValueAsBoolean(
flinkConf, PluginConf.SUPPORT_FLOATING_BUFFER_PER_OUTPUT_GATE);
if (numBuffersPerPartition < MIN_BUFFERS_PER_PARTITION) {
throw new IllegalArgumentException(
String.format(
@ -184,9 +189,14 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory {
int numForOutputGate = numBuffersPerPartition - numForResultPartition;
List<SupplierWithException<BufferPool, IOException>> factories = new ArrayList<>();
factories.add(
() -> bufferPoolFactory.createBufferPool(numForResultPartition, numForResultPartition));
factories.add(() -> bufferPoolFactory.createBufferPool(numForOutputGate, numForOutputGate));
if (supportFloatingBuffers) {
factories.add(() -> bufferPoolFactory.createBufferPool(2, numForResultPartition));
factories.add(() -> bufferPoolFactory.createBufferPool(2, numForOutputGate));
} else {
factories.add(
() -> bufferPoolFactory.createBufferPool(numForResultPartition, numForResultPartition));
factories.add(() -> bufferPoolFactory.createBufferPool(numForOutputGate, numForOutputGate));
}
return factories;
}

View File

@ -28,7 +28,11 @@ public enum PluginConf {
"remote-shuffle.job.concurrent-readings-per-gate", "", String.valueOf(Integer.MAX_VALUE)),
MEMORY_PER_RESULT_PARTITION("remote-shuffle.job.memory-per-partition", "", "64m"),
MEMORY_PER_INPUT_GATE("remote-shuffle.job.memory-per-gate", "", "32m"),
SUPPORT_FLOATING_BUFFER_PER_INPUT_GATE(
"remote-shuffle.job.support-floating-buffer-per-input-gate", "", "true"),
ENABLE_DATA_COMPRESSION("remote-shuffle.job.enable-data-compression", "", "true"),
SUPPORT_FLOATING_BUFFER_PER_OUTPUT_GATE(
"remote-shuffle.job.support-floating-buffer-per-output-gate", "", "true"),
REMOTE_SHUFFLE_COMPRESSION_CODEC(
"remote-shuffle.job.compression.codec",
CelebornConf.SHUFFLE_COMPRESSION_CODEC().key(),

View File

@ -58,4 +58,8 @@ public class FlinkUtils {
public static long byteStringValueAsBytes(Configuration flinkConf, PluginConf pluginConf) {
return Utils.byteStringAsBytes(PluginConf.getValue(flinkConf, pluginConf));
}
public static boolean stringValueAsBoolean(Configuration flinkConf, PluginConf pluginConf) {
return Boolean.valueOf(PluginConf.getValue(flinkConf, pluginConf));
}
}