diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java index 37491c997..35fbf324b 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java @@ -134,14 +134,6 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory { ShuffleDescriptor shuffleDescriptor, CelebornConf celebornConf, int numMappers) { - - // in flink1.14/1.15, just support LZ4 - if (!compressionCodec.equals(CompressionCodec.LZ4.name())) { - throw new IllegalStateException("Unknown CompressionMethod " + compressionCodec); - } - final BufferCompressor bufferCompressor = - new BufferCompressor(networkBufferSize, compressionCodec); - RemoteShuffleDescriptor rsd = (RemoteShuffleDescriptor) shuffleDescriptor; ResultPartition partition = createRemoteShuffleResultPartitionInternal( taskNameWithSubtaskAndId, @@ -153,8 +145,8 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory { bufferPoolFactories, celebornConf, numMappers, - bufferCompressor, - rsd); + getBufferCompressor(), + (RemoteShuffleDescriptor) shuffleDescriptor); LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, this); return partition; } @@ -195,4 +187,11 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory { int getNetworkBufferSize() { return networkBufferSize; } + + @VisibleForTesting + BufferCompressor getBufferCompressor() { + return CompressionCodec.NONE.name().equals(compressionCodec) + ? null + : new BufferCompressor(networkBufferSize, compressionCodec); + } } diff --git a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactorySuiteJ.java b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactorySuiteJ.java new file mode 100644 index 000000000..677a7aaba --- /dev/null +++ b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactorySuiteJ.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.plugin.flink; + +import static org.mockito.Mockito.mock; + +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.common.protocol.CompressionCodec; + +/** Tests for {@link RemoteShuffleResultPartitionFactory}. */ +public class RemoteShuffleResultPartitionFactorySuiteJ { + + @Test + public void testGetBufferCompressor() { + CelebornConf celebornConf = new CelebornConf(); + for (CompressionCodec compressionCodec : CompressionCodec.values()) { + RemoteShuffleResultPartitionFactory partitionFactory = + new RemoteShuffleResultPartitionFactory( + celebornConf.set( + CelebornConf.SHUFFLE_COMPRESSION_CODEC().key(), compressionCodec.name()), + mock(ResultPartitionManager.class), + mock(BufferPoolFactory.class), + 1); + if (CompressionCodec.NONE.equals(compressionCodec)) { + Assert.assertNull(partitionFactory.getBufferCompressor()); + } else if (CompressionCodec.LZ4.equals(compressionCodec)) { + Assert.assertNotNull(partitionFactory.getBufferCompressor()); + } else { + Assert.assertThrows( + IllegalConfigurationException.class, partitionFactory::getBufferCompressor); + } + } + } +} diff --git a/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactorySuiteJ.java b/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactorySuiteJ.java new file mode 100644 index 000000000..677a7aaba --- /dev/null +++ b/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactorySuiteJ.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.plugin.flink; + +import static org.mockito.Mockito.mock; + +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.common.protocol.CompressionCodec; + +/** Tests for {@link RemoteShuffleResultPartitionFactory}. */ +public class RemoteShuffleResultPartitionFactorySuiteJ { + + @Test + public void testGetBufferCompressor() { + CelebornConf celebornConf = new CelebornConf(); + for (CompressionCodec compressionCodec : CompressionCodec.values()) { + RemoteShuffleResultPartitionFactory partitionFactory = + new RemoteShuffleResultPartitionFactory( + celebornConf.set( + CelebornConf.SHUFFLE_COMPRESSION_CODEC().key(), compressionCodec.name()), + mock(ResultPartitionManager.class), + mock(BufferPoolFactory.class), + 1); + if (CompressionCodec.NONE.equals(compressionCodec)) { + Assert.assertNull(partitionFactory.getBufferCompressor()); + } else if (CompressionCodec.LZ4.equals(compressionCodec)) { + Assert.assertNotNull(partitionFactory.getBufferCompressor()); + } else { + Assert.assertThrows( + IllegalConfigurationException.class, partitionFactory::getBufferCompressor); + } + } + } +} diff --git a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactorySuiteJ.java b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactorySuiteJ.java new file mode 100644 index 000000000..548336db6 --- /dev/null +++ b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactorySuiteJ.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.plugin.flink; + +import static org.mockito.Mockito.mock; + +import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.common.protocol.CompressionCodec; + +/** Tests for {@link RemoteShuffleResultPartitionFactory}. */ +public class RemoteShuffleResultPartitionFactorySuiteJ { + + @Test + public void testGetBufferCompressor() { + CelebornConf celebornConf = new CelebornConf(); + for (CompressionCodec compressionCodec : CompressionCodec.values()) { + RemoteShuffleResultPartitionFactory partitionFactory = + new RemoteShuffleResultPartitionFactory( + celebornConf.set( + CelebornConf.SHUFFLE_COMPRESSION_CODEC().key(), compressionCodec.name()), + mock(ResultPartitionManager.class), + mock(BufferPoolFactory.class), + 1); + if (CompressionCodec.NONE.equals(compressionCodec)) { + Assert.assertNull(partitionFactory.getBufferCompressor()); + } else { + Assert.assertNotNull(partitionFactory.getBufferCompressor()); + } + } + } +} diff --git a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactorySuiteJ.java b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactorySuiteJ.java new file mode 100644 index 000000000..548336db6 --- /dev/null +++ b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactorySuiteJ.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.plugin.flink; + +import static org.mockito.Mockito.mock; + +import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.common.protocol.CompressionCodec; + +/** Tests for {@link RemoteShuffleResultPartitionFactory}. */ +public class RemoteShuffleResultPartitionFactorySuiteJ { + + @Test + public void testGetBufferCompressor() { + CelebornConf celebornConf = new CelebornConf(); + for (CompressionCodec compressionCodec : CompressionCodec.values()) { + RemoteShuffleResultPartitionFactory partitionFactory = + new RemoteShuffleResultPartitionFactory( + celebornConf.set( + CelebornConf.SHUFFLE_COMPRESSION_CODEC().key(), compressionCodec.name()), + mock(ResultPartitionManager.class), + mock(BufferPoolFactory.class), + 1); + if (CompressionCodec.NONE.equals(compressionCodec)) { + Assert.assertNull(partitionFactory.getBufferCompressor()); + } else { + Assert.assertNotNull(partitionFactory.getBufferCompressor()); + } + } + } +} diff --git a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactorySuiteJ.java b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactorySuiteJ.java new file mode 100644 index 000000000..548336db6 --- /dev/null +++ b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactorySuiteJ.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.plugin.flink; + +import static org.mockito.Mockito.mock; + +import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.common.protocol.CompressionCodec; + +/** Tests for {@link RemoteShuffleResultPartitionFactory}. */ +public class RemoteShuffleResultPartitionFactorySuiteJ { + + @Test + public void testGetBufferCompressor() { + CelebornConf celebornConf = new CelebornConf(); + for (CompressionCodec compressionCodec : CompressionCodec.values()) { + RemoteShuffleResultPartitionFactory partitionFactory = + new RemoteShuffleResultPartitionFactory( + celebornConf.set( + CelebornConf.SHUFFLE_COMPRESSION_CODEC().key(), compressionCodec.name()), + mock(ResultPartitionManager.class), + mock(BufferPoolFactory.class), + 1); + if (CompressionCodec.NONE.equals(compressionCodec)) { + Assert.assertNull(partitionFactory.getBufferCompressor()); + } else { + Assert.assertNotNull(partitionFactory.getBufferCompressor()); + } + } + } +} diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 90248088a..9ccbb207b 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -3658,7 +3658,9 @@ object CelebornConf extends Logging { .withAlternative("celeborn.shuffle.compression.codec") .withAlternative("remote-shuffle.job.compression.codec") .categories("client") - .doc("The codec used to compress shuffle data. By default, Celeborn provides three codecs: `lz4`, `zstd`, `none`.") + .doc("The codec used to compress shuffle data. By default, Celeborn provides three codecs: `lz4`, `zstd`, `none`. " + + "`none` means that shuffle compression is disabled. " + + "Since Flink version 1.17, zstd is supported for Flink shuffle client.") .version("0.3.0") .stringConf .transform(_.toUpperCase(Locale.ROOT)) diff --git a/docs/configuration/client.md b/docs/configuration/client.md index 48763e6d6..20aa71580 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -88,7 +88,7 @@ license: | | celeborn.client.shuffle.batchHandleCommitPartition.threads | 8 | false | Threads number for LifecycleManager to handle commit partition request in batch. | 0.3.0 | celeborn.shuffle.batchHandleCommitPartition.threads | | celeborn.client.shuffle.batchHandleReleasePartition.interval | 5s | false | Interval for LifecycleManager to schedule handling release partition requests in batch. | 0.3.0 | | | celeborn.client.shuffle.batchHandleReleasePartition.threads | 8 | false | Threads number for LifecycleManager to handle release partition request in batch. | 0.3.0 | | -| celeborn.client.shuffle.compression.codec | LZ4 | false | The codec used to compress shuffle data. By default, Celeborn provides three codecs: `lz4`, `zstd`, `none`. | 0.3.0 | celeborn.shuffle.compression.codec,remote-shuffle.job.compression.codec | +| celeborn.client.shuffle.compression.codec | LZ4 | false | The codec used to compress shuffle data. By default, Celeborn provides three codecs: `lz4`, `zstd`, `none`. `none` means that shuffle compression is disabled. Since Flink version 1.17, zstd is supported for Flink shuffle client. | 0.3.0 | celeborn.shuffle.compression.codec,remote-shuffle.job.compression.codec | | celeborn.client.shuffle.compression.zstd.level | 1 | false | Compression level for Zstd compression codec, its value should be an integer between -5 and 22. Increasing the compression level will result in better compression at the expense of more CPU and memory. | 0.3.0 | celeborn.shuffle.compression.zstd.level | | celeborn.client.shuffle.decompression.lz4.xxhash.instance | <undefined> | false | Decompression XXHash instance for Lz4. Available options: JNI, JAVASAFE, JAVAUNSAFE. | 0.3.2 | | | celeborn.client.shuffle.expired.checkInterval | 60s | false | Interval for client to check expired shuffles. | 0.3.0 | celeborn.shuffle.expired.checkInterval |