[CELEBORN-1357] AbstractRemoteShuffleResultPartitionFactory should remove the check of shuffle compression codec

### What changes were proposed in this pull request?

`AbstractRemoteShuffleResultPartitionFactory` removes the check of shuffle compression codec.

### Why are the changes needed?

At present, `AbstractRemoteShuffleResultPartitionFactory` checks that the shuffle compression codec is LZ4, which matches Flink 1.14 and 1.15. However, since Flink 1.17, ZSTD has also been supported. Therefore, `AbstractRemoteShuffleResultPartitionFactory` should remove this check for Flink 1.17 and above; the codec is instead validated by the constructor of `BufferCompressor`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

- `RemoteShuffleResultPartitionFactorySuiteJ`

Closes #2414 from SteNicholas/CELEBORN-1357.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
This commit is contained in:
SteNicholas 2024-03-25 15:44:45 +08:00 committed by Shuang
parent 9497d557e6
commit e29f013e3a
8 changed files with 276 additions and 12 deletions

View File

@ -134,14 +134,6 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory {
ShuffleDescriptor shuffleDescriptor,
CelebornConf celebornConf,
int numMappers) {
// in flink1.14/1.15, just support LZ4
if (!compressionCodec.equals(CompressionCodec.LZ4.name())) {
throw new IllegalStateException("Unknown CompressionMethod " + compressionCodec);
}
final BufferCompressor bufferCompressor =
new BufferCompressor(networkBufferSize, compressionCodec);
RemoteShuffleDescriptor rsd = (RemoteShuffleDescriptor) shuffleDescriptor;
ResultPartition partition =
createRemoteShuffleResultPartitionInternal(
taskNameWithSubtaskAndId,
@ -153,8 +145,8 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory {
bufferPoolFactories,
celebornConf,
numMappers,
bufferCompressor,
rsd);
getBufferCompressor(),
(RemoteShuffleDescriptor) shuffleDescriptor);
LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, this);
return partition;
}
@ -195,4 +187,11 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory {
/** Returns the configured network buffer size, i.e. the size used when building a {@link BufferCompressor}. */
int getNetworkBufferSize() {
return networkBufferSize;
}
@VisibleForTesting
BufferCompressor getBufferCompressor() {
  // With compression disabled (codec NONE) no compressor is needed at all.
  if (CompressionCodec.NONE.name().equals(compressionCodec)) {
    return null;
  }
  // Codec validity is enforced by the BufferCompressor constructor itself,
  // so no explicit codec check is performed here.
  return new BufferCompressor(networkBufferSize, compressionCodec);
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.plugin.flink;
import static org.mockito.Mockito.mock;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.junit.Assert;
import org.junit.Test;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.CompressionCodec;
/** Unit tests for {@link RemoteShuffleResultPartitionFactory}. */
public class RemoteShuffleResultPartitionFactorySuiteJ {

  @Test
  public void testGetBufferCompressor() {
    CelebornConf conf = new CelebornConf();
    for (CompressionCodec codec : CompressionCodec.values()) {
      conf.set(CelebornConf.SHUFFLE_COMPRESSION_CODEC().key(), codec.name());
      RemoteShuffleResultPartitionFactory factory =
          new RemoteShuffleResultPartitionFactory(
              conf, mock(ResultPartitionManager.class), mock(BufferPoolFactory.class), 1);
      switch (codec) {
        case NONE:
          // Compression disabled: no compressor is created.
          Assert.assertNull(factory.getBufferCompressor());
          break;
        case LZ4:
          // LZ4 is the codec accepted by this Flink version's BufferCompressor.
          Assert.assertNotNull(factory.getBufferCompressor());
          break;
        default:
          // Any other codec is rejected by the BufferCompressor constructor.
          Assert.assertThrows(
              IllegalConfigurationException.class, factory::getBufferCompressor);
      }
    }
  }
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.plugin.flink;
import static org.mockito.Mockito.mock;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.junit.Assert;
import org.junit.Test;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.CompressionCodec;
/** Unit tests for {@link RemoteShuffleResultPartitionFactory}. */
public class RemoteShuffleResultPartitionFactorySuiteJ {

  @Test
  public void testGetBufferCompressor() {
    CelebornConf conf = new CelebornConf();
    for (CompressionCodec codec : CompressionCodec.values()) {
      conf.set(CelebornConf.SHUFFLE_COMPRESSION_CODEC().key(), codec.name());
      RemoteShuffleResultPartitionFactory factory =
          new RemoteShuffleResultPartitionFactory(
              conf, mock(ResultPartitionManager.class), mock(BufferPoolFactory.class), 1);
      switch (codec) {
        case NONE:
          // Compression disabled: no compressor is created.
          Assert.assertNull(factory.getBufferCompressor());
          break;
        case LZ4:
          // LZ4 is the codec accepted by this Flink version's BufferCompressor.
          Assert.assertNotNull(factory.getBufferCompressor());
          break;
        default:
          // Any other codec is rejected by the BufferCompressor constructor.
          Assert.assertThrows(
              IllegalConfigurationException.class, factory::getBufferCompressor);
      }
    }
  }
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.plugin.flink;
import static org.mockito.Mockito.mock;
import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.junit.Assert;
import org.junit.Test;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.CompressionCodec;
/** Unit tests for {@link RemoteShuffleResultPartitionFactory}. */
public class RemoteShuffleResultPartitionFactorySuiteJ {

  @Test
  public void testGetBufferCompressor() {
    CelebornConf conf = new CelebornConf();
    for (CompressionCodec codec : CompressionCodec.values()) {
      conf.set(CelebornConf.SHUFFLE_COMPRESSION_CODEC().key(), codec.name());
      RemoteShuffleResultPartitionFactory factory =
          new RemoteShuffleResultPartitionFactory(
              conf, mock(ResultPartitionManager.class), mock(BufferPoolFactory.class), 1);
      // Enum constants are singletons, so identity comparison equals equals().
      if (codec == CompressionCodec.NONE) {
        // Compression disabled: no compressor is created.
        Assert.assertNull(factory.getBufferCompressor());
      } else {
        // Every other codec yields a compressor on this Flink version (>= 1.17).
        Assert.assertNotNull(factory.getBufferCompressor());
      }
    }
  }
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.plugin.flink;
import static org.mockito.Mockito.mock;
import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.junit.Assert;
import org.junit.Test;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.CompressionCodec;
/** Unit tests for {@link RemoteShuffleResultPartitionFactory}. */
public class RemoteShuffleResultPartitionFactorySuiteJ {

  @Test
  public void testGetBufferCompressor() {
    CelebornConf conf = new CelebornConf();
    for (CompressionCodec codec : CompressionCodec.values()) {
      conf.set(CelebornConf.SHUFFLE_COMPRESSION_CODEC().key(), codec.name());
      RemoteShuffleResultPartitionFactory factory =
          new RemoteShuffleResultPartitionFactory(
              conf, mock(ResultPartitionManager.class), mock(BufferPoolFactory.class), 1);
      // Enum constants are singletons, so identity comparison equals equals().
      if (codec == CompressionCodec.NONE) {
        // Compression disabled: no compressor is created.
        Assert.assertNull(factory.getBufferCompressor());
      } else {
        // Every other codec yields a compressor on this Flink version (>= 1.17).
        Assert.assertNotNull(factory.getBufferCompressor());
      }
    }
  }
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.plugin.flink;
import static org.mockito.Mockito.mock;
import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.junit.Assert;
import org.junit.Test;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.CompressionCodec;
/** Unit tests for {@link RemoteShuffleResultPartitionFactory}. */
public class RemoteShuffleResultPartitionFactorySuiteJ {

  @Test
  public void testGetBufferCompressor() {
    CelebornConf conf = new CelebornConf();
    for (CompressionCodec codec : CompressionCodec.values()) {
      conf.set(CelebornConf.SHUFFLE_COMPRESSION_CODEC().key(), codec.name());
      RemoteShuffleResultPartitionFactory factory =
          new RemoteShuffleResultPartitionFactory(
              conf, mock(ResultPartitionManager.class), mock(BufferPoolFactory.class), 1);
      // Enum constants are singletons, so identity comparison equals equals().
      if (codec == CompressionCodec.NONE) {
        // Compression disabled: no compressor is created.
        Assert.assertNull(factory.getBufferCompressor());
      } else {
        // Every other codec yields a compressor on this Flink version (>= 1.17).
        Assert.assertNotNull(factory.getBufferCompressor());
      }
    }
  }
}

View File

@ -3658,7 +3658,9 @@ object CelebornConf extends Logging {
.withAlternative("celeborn.shuffle.compression.codec")
.withAlternative("remote-shuffle.job.compression.codec")
.categories("client")
.doc("The codec used to compress shuffle data. By default, Celeborn provides three codecs: `lz4`, `zstd`, `none`.")
.doc("The codec used to compress shuffle data. By default, Celeborn provides three codecs: `lz4`, `zstd`, `none`. " +
"`none` means that shuffle compression is disabled. " +
"Since Flink version 1.17, zstd is supported for Flink shuffle client.")
.version("0.3.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))

View File

@ -88,7 +88,7 @@ license: |
| celeborn.client.shuffle.batchHandleCommitPartition.threads | 8 | false | Threads number for LifecycleManager to handle commit partition request in batch. | 0.3.0 | celeborn.shuffle.batchHandleCommitPartition.threads |
| celeborn.client.shuffle.batchHandleReleasePartition.interval | 5s | false | Interval for LifecycleManager to schedule handling release partition requests in batch. | 0.3.0 | |
| celeborn.client.shuffle.batchHandleReleasePartition.threads | 8 | false | Threads number for LifecycleManager to handle release partition request in batch. | 0.3.0 | |
| celeborn.client.shuffle.compression.codec | LZ4 | false | The codec used to compress shuffle data. By default, Celeborn provides three codecs: `lz4`, `zstd`, `none`. | 0.3.0 | celeborn.shuffle.compression.codec,remote-shuffle.job.compression.codec |
| celeborn.client.shuffle.compression.codec | LZ4 | false | The codec used to compress shuffle data. By default, Celeborn provides three codecs: `lz4`, `zstd`, `none`. `none` means that shuffle compression is disabled. Since Flink version 1.17, zstd is supported for Flink shuffle client. | 0.3.0 | celeborn.shuffle.compression.codec,remote-shuffle.job.compression.codec |
| celeborn.client.shuffle.compression.zstd.level | 1 | false | Compression level for Zstd compression codec, its value should be an integer between -5 and 22. Increasing the compression level will result in better compression at the expense of more CPU and memory. | 0.3.0 | celeborn.shuffle.compression.zstd.level |
| celeborn.client.shuffle.decompression.lz4.xxhash.instance | &lt;undefined&gt; | false | Decompression XXHash instance for Lz4. Available options: JNI, JAVASAFE, JAVAUNSAFE. | 0.3.2 | |
| celeborn.client.shuffle.expired.checkInterval | 60s | false | Interval for client to check expired shuffles. | 0.3.0 | celeborn.shuffle.expired.checkInterval |