[CELEBORN-1363] AbstractRemoteShuffleInputGateFactory supports celeborn.client.shuffle.compression.codec to configure compression codec

### What changes were proposed in this pull request?

`AbstractRemoteShuffleInputGateFactory` supports `celeborn.client.shuffle.compression.codec` to configure compression codec.

### Why are the changes needed?

`AbstractRemoteShuffleInputGateFactory` only supports LZ4 compression codec via hard code at present. `AbstractRemoteShuffleInputGateFactory` should support `celeborn.client.shuffle.compression.codec` to configure compression codec like ZSTD etc.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No.

Closes #2435 from SteNicholas/CELEBORN-1363.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
This commit is contained in:
SteNicholas 2024-04-01 11:32:44 +08:00 committed by mingji
parent 82022a9427
commit af5c5060f6
No known key found for this signature in database
GPG Key ID: 6392F71F37356FA0

View File

@ -44,9 +44,6 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
/** Number of max concurrent reading channels. */
protected final int numConcurrentReading;
/** Codec used for compression / decompression. */
protected static final String compressionCodec = "LZ4";
/** Network buffer size. */
protected final int networkBufferSize;
@ -94,7 +91,7 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
SupplierWithException<BufferPool, IOException> bufferPoolFactory =
createBufferPoolFactory(networkBufferPool, numBuffersPerGate, supportFloatingBuffers);
BufferDecompressor bufferDecompressor =
new BufferDecompressor(networkBufferSize, compressionCodec);
new BufferDecompressor(networkBufferSize, celebornConf.shuffleCompressionCodec().name());
return createInputGate(owningTaskName, gateIndex, igdd, bufferPoolFactory, bufferDecompressor);
}