[CELEBORN-1190] Apply error prone patch and suppress some problems

### What changes were proposed in this pull request?
1. Fix violations of the MissingOverride, DefaultCharset, and UnnecessaryParentheses rules
2. Exclude generated sources from analysis; disable the FutureReturnValueIgnored, TypeParameterUnusedInFormals, and UnusedVariable checks

### Why are the changes needed?
```
./build/make-distribution.sh --release
```
Running this command currently produces a large number of Error Prone warnings.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Passed GitHub Actions (GA) CI.

Closes #2177 from cxzl25/error_prone_patch.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
This commit is contained in:
sychen 2023-12-20 20:54:18 +08:00 committed by Fu Chen
parent eba1efbb04
commit 7f653ce7d6
34 changed files with 56 additions and 20 deletions

View File

@ -113,6 +113,7 @@ public class RemoteBufferStreamReader extends CreditListener {
return isOpened;
}
@Override
public void notifyAvailableCredits(int numCredits) {
if (!closed) {
bufferStream.addCredit(

View File

@ -154,8 +154,7 @@ public class RemoteShuffleInputGateDelegation {
try {
String appUniqueId =
((RemoteShuffleDescriptor) (gateDescriptor.getShuffleDescriptors()[0]))
.getCelebornAppId();
((RemoteShuffleDescriptor) gateDescriptor.getShuffleDescriptors()[0]).getCelebornAppId();
this.shuffleClient =
FlinkShuffleClientImpl.get(
appUniqueId,

View File

@ -368,7 +368,7 @@ public class PartitionSortedBuffer implements SortBuffer {
}
private int getSegmentOffsetFromPointer(long value) {
return (int) (value);
return (int) value;
}
@Override

View File

@ -71,6 +71,7 @@ public class FlinkTransportClientFactory extends TransportClientFactory {
return null;
}
@Override
public TransportClient createClient(String remoteHost, int remotePort)
throws IOException, InterruptedException {
return createClient(

View File

@ -167,6 +167,7 @@ public class TransportFrameDecoderWithBufferSupplier extends ChannelInboundHandl
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object data) {
io.netty.buffer.ByteBuf nettyBuf = (io.netty.buffer.ByteBuf) data;
try {

View File

@ -557,6 +557,7 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
this.dataClientFactory = dataClientFactory;
}
@Override
@VisibleForTesting
public TransportClientFactory getDataClientFactory() {
return flinkTransportClientFactory;

View File

@ -36,6 +36,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
super(conf, networkBufferPool, networkBufferSize);
}
@Override
protected RemoteShuffleInputGate createInputGate(
String owningTaskName,
int gateIndex,

View File

@ -36,6 +36,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
super(conf, networkBufferPool, networkBufferSize);
}
@Override
protected RemoteShuffleInputGate createInputGate(
String owningTaskName,
int gateIndex,

View File

@ -37,6 +37,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
}
// For testing.
@Override
protected RemoteShuffleInputGate createInputGate(
String owningTaskName,
int gateIndex,

View File

@ -37,6 +37,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
}
// For testing.
@Override
protected RemoteShuffleInputGate createInputGate(
String owningTaskName,
int gateIndex,

View File

@ -135,7 +135,7 @@ public class MRAppMasterWithCeleborn extends MRAppMaster {
ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
if (applicationAttemptId != null) {
CallerContext.setCurrent(
(new CallerContext.Builder("mr_app_master_with_celeborn_" + applicationAttemptId))
new CallerContext.Builder("mr_app_master_with_celeborn_" + applicationAttemptId)
.build());
}

View File

@ -295,6 +295,7 @@ public class CelebornSortBasedPusher<K, V> extends OutputStream {
}
}
@Override
public void flush() {
logger.info("Sort based pusher called flush");
try {
@ -305,6 +306,7 @@ public class CelebornSortBasedPusher<K, V> extends OutputStream {
}
}
@Override
public void close() {
flush();
try {

View File

@ -268,6 +268,7 @@ public class SparkShuffleManager implements ShuffleManager {
}
}
@Override
public <K, C> ShuffleReader<K, C> getReader(
ShuffleHandle handle, int startPartition, int endPartition, TaskContext context) {
if (handle instanceof CelebornShuffleHandle) {

View File

@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.columnar;
import java.nio.charset.StandardCharsets;
import org.apache.spark.sql.execution.vectorized.Dictionary;
public class CelebornColumnDictionary implements Dictionary {
@ -58,6 +60,6 @@ public class CelebornColumnDictionary implements Dictionary {
@Override
public byte[] decodeToBinary(int id) {
return stringDictionary[id].getBytes();
return stringDictionary[id].getBytes(StandardCharsets.UTF_8);
}
}

View File

@ -407,6 +407,7 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
}
}
// Added in SPARK-32917, for Spark 3.2 and above
public long[] getPartitionLengths() {
throw new UnsupportedOperationException(
"Celeborn is not compatible with Spark push mode, please set spark.shuffle.push.enabled to false");

View File

@ -504,7 +504,7 @@ public class ShuffleClientImpl extends ShuffleClient {
ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
}
@VisibleForTesting
@Override
public PartitionLocation registerMapPartitionTask(
int shuffleId, int numMappers, int mapId, int attemptId, int partitionId) throws IOException {
logger.info(
@ -1277,6 +1277,7 @@ public class ShuffleClientImpl extends ShuffleClient {
false);
}
@Override
public void pushMergedData(int shuffleId, int mapId, int attemptId) throws IOException {
final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
PushState pushState = pushStates.get(mapKey);
@ -1791,8 +1792,8 @@ public class ShuffleClientImpl extends ShuffleClient {
private boolean connectFail(String message) {
return (message.startsWith("Connection from ") && message.endsWith(" closed"))
|| (message.equals("Connection reset by peer"))
|| (message.startsWith("Failed to send RPC "));
|| message.equals("Connection reset by peer")
|| message.startsWith("Failed to send RPC ");
}
@VisibleForTesting

View File

@ -134,10 +134,12 @@ public class WorkerPartitionReader implements PartitionReader {
ShuffleClient.incrementTotalReadCounter();
}
@Override
public boolean hasNext() {
return returnedChunks < streamHandler.getNumChunks();
}
@Override
public ByteBuf next() throws IOException, InterruptedException {
checkException();
if (chunkIndex < streamHandler.getNumChunks()) {
@ -160,6 +162,7 @@ public class WorkerPartitionReader implements PartitionReader {
return chunk;
}
@Override
public void close() {
synchronized (this) {
closed = true;

View File

@ -18,6 +18,7 @@
package org.apache.celeborn.client.security;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Optional;
import java.util.Properties;
@ -50,7 +51,7 @@ public class CryptoUtils {
CryptoRandomFactory.getCryptoRandom(new Properties()).nextBytes(iv);
} catch (GeneralSecurityException e) {
logger.warn("Failed to create crypto Initialization Vector", e);
iv = "1234567890123456".getBytes();
iv = "1234567890123456".getBytes(StandardCharsets.UTF_8);
}
long initialIVFinish = System.nanoTime();
long initialIVTime = TimeUnit.NANOSECONDS.toMillis(initialIVFinish - initialIVStart);

View File

@ -31,7 +31,7 @@ public class CodecSuiteJ {
@Test
public void testLz4Codec() {
int blockSize = (new CelebornConf()).clientPushBufferMaxSize();
int blockSize = new CelebornConf().clientPushBufferMaxSize();
Lz4Compressor lz4Compressor = new Lz4Compressor(blockSize);
byte[] data = RandomStringUtils.random(1024).getBytes(StandardCharsets.UTF_8);
int oriLength = data.length;
@ -49,7 +49,7 @@ public class CodecSuiteJ {
@Test
public void testZstdCodec() {
for (int level = -5; level <= 22; level++) {
int blockSize = (new CelebornConf()).clientPushBufferMaxSize();
int blockSize = new CelebornConf().clientPushBufferMaxSize();
ZstdCompressor zstdCompressor = new ZstdCompressor(blockSize, level);
byte[] data = RandomStringUtils.random(1024).getBytes(StandardCharsets.UTF_8);
int oriLength = data.length;

View File

@ -99,7 +99,7 @@ public final class PushData extends RequestMessage {
return requestId == o.requestId
&& mode == o.mode
&& shuffleKey.equals(o.shuffleKey)
&& partitionUniqueId.equals((o.partitionUniqueId))
&& partitionUniqueId.equals(o.partitionUniqueId)
&& super.equals(o);
}
return false;

View File

@ -95,7 +95,7 @@ public final class PushDataHandShake extends RequestMessage {
PushDataHandShake o = (PushDataHandShake) other;
return mode == o.mode
&& shuffleKey.equals(o.shuffleKey)
&& partitionUniqueId.equals((o.partitionUniqueId))
&& partitionUniqueId.equals(o.partitionUniqueId)
&& attemptId == o.attemptId
&& numPartitions == o.numPartitions
&& bufferSize == o.bufferSize

View File

@ -80,7 +80,7 @@ public final class RegionFinish extends RequestMessage {
RegionFinish o = (RegionFinish) other;
return mode == o.mode
&& shuffleKey.equals(o.shuffleKey)
&& partitionUniqueId.equals((o.partitionUniqueId))
&& partitionUniqueId.equals(o.partitionUniqueId)
&& attemptId == o.attemptId
&& super.equals(o);
}

View File

@ -98,7 +98,7 @@ public final class RegionStart extends RequestMessage {
RegionStart o = (RegionStart) other;
return mode == o.mode
&& shuffleKey.equals(o.shuffleKey)
&& partitionUniqueId.equals((o.partitionUniqueId))
&& partitionUniqueId.equals(o.partitionUniqueId)
&& attemptId == o.attemptId
&& currentRegionIndex == o.currentRegionIndex
&& isBroadcast == o.isBroadcast

View File

@ -53,6 +53,7 @@ public final class StreamChunkSlice implements Encodable {
return 20;
}
@Override
public void encode(ByteBuf buffer) {
buffer.writeLong(streamId);
buffer.writeInt(chunkIndex);

View File

@ -41,6 +41,7 @@ public class SecretRegistryImpl implements SecretRegistry {
secrets.remove(appId);
}
@Override
public boolean isRegistered(String appId) {
return secrets.containsKey(appId);
}

View File

@ -195,6 +195,7 @@ public class CelebornSaslSuiteJ {
}
}
@Override
public void close() {
if (client != null) {
client.close();

View File

@ -138,6 +138,7 @@ public class SingleMasterMetaManager extends AbstractMetaManager {
updateMetaByReportWorkerUnavailable(failedNodes);
}
@Override
public void handleUpdatePartitionSize() {
updatePartitionSize();
}

View File

@ -21,8 +21,10 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NO
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
@ -55,7 +57,7 @@ public class SlotsAllocatorRackAwareSuiteJ {
conf.set(CelebornConf.CLIENT_RESERVE_SLOTS_RACKAWARE_ENABLED().key(), "true");
File mapFile = File.createTempFile("testResolve1", ".txt");
FileWriter mapFileWriter = new FileWriter(mapFile);
Writer mapFileWriter = Files.newBufferedWriter(mapFile.toPath(), StandardCharsets.UTF_8);
mapFileWriter.write(
"host1 /default/rack1\nhost2 /default/rack1\nhost3 /default/rack1\n"
+ "host4 /default/rack2\nhost5 /default/rack2\nhost6 /default/rack2\n");
@ -82,6 +84,7 @@ public class SlotsAllocatorRackAwareSuiteJ {
Consumer<PartitionLocation> assertCustomer =
new Consumer<PartitionLocation>() {
@Override
public void accept(PartitionLocation location) {
Assert.assertNotEquals(
resolver.resolve(location.getHost()).getNetworkLocation(),
@ -118,6 +121,7 @@ public class SlotsAllocatorRackAwareSuiteJ {
Consumer<PartitionLocation> assertConsumer =
new Consumer<PartitionLocation>() {
@Override
public void accept(PartitionLocation location) {
Assert.assertEquals(
NetworkTopology.DEFAULT_RACK,
@ -141,6 +145,7 @@ public class SlotsAllocatorRackAwareSuiteJ {
workers.forEach(
new Consumer<WorkerInfo>() {
@Override
public void accept(WorkerInfo workerInfo) {
workerInfo.networkLocation_$eq(
resolver.resolve(workerInfo.host()).getNetworkLocation());
@ -416,7 +421,7 @@ public class SlotsAllocatorRackAwareSuiteJ {
int expected =
(int)
Math.ceil(
((double) (numPartitions)
((double) numPartitions
/ totalHosts
* (1
+ ((double) (maxHostsPerRack - secondMaxHostsPerRack + 1))

View File

@ -929,7 +929,7 @@
<fork>true</fork>
<compilerArgs>
<arg>-XDcompilePolicy=simple</arg>
<arg>-Xplugin:ErrorProne</arg>
<arg>-Xplugin:ErrorProne -XepExcludedPaths:.*/target/generated-sources/protobuf/.* -Xep:FutureReturnValueIgnored:OFF -Xep:TypeParameterUnusedInFormals:OFF -Xep:UnusedVariable:OFF</arg>
</compilerArgs>
<annotationProcessorPaths>
<path>

View File

@ -41,6 +41,7 @@ public class SystemConfig extends DynamicConfig {
return null;
}
@Override
public <T> T getValue(
String configKey,
ConfigEntry<Object> configEntry,

View File

@ -27,6 +27,7 @@ public interface DBIterator extends Iterator<Map.Entry<byte[], byte[]>>, Closeab
/** Position at the first entry in the source whose `key` is at target. */
void seek(byte[] key);
@Override
default void remove() {
throw new UnsupportedOperationException();
}

View File

@ -401,15 +401,18 @@ public abstract class FileWriter implements DeviceObserver {
}
}
@Override
public int hashCode() {
return fileInfo.getFilePath().hashCode();
}
@Override
public boolean equals(Object obj) {
return (obj instanceof FileWriter)
&& fileInfo.getFilePath().equals(((FileWriter) obj).fileInfo.getFilePath());
}
@Override
public String toString() {
return fileInfo.getFilePath();
}

View File

@ -91,6 +91,7 @@ public final class MapPartitionFileWriter extends FileWriter {
}
}
@Override
public void write(ByteBuf data) throws IOException {
data.markReaderIndex();
int partitionId = data.readInt();
@ -155,13 +156,14 @@ public final class MapPartitionFileWriter extends FileWriter {
deleted = true;
} else {
StorageManager.hadoopFs()
.create(new Path(Utils.getWriteSuccessFilePath((fileInfo.getIndexPath()))))
.create(new Path(Utils.getWriteSuccessFilePath(fileInfo.getIndexPath())))
.close();
}
}
});
}
@Override
public synchronized void destroy(IOException ioException) {
destroyIndex();
super.destroy(ioException);

View File

@ -62,6 +62,7 @@ public final class ReducePartitionFileWriter extends FileWriter {
this.nextBoundary = this.shuffleChunkSize;
}
@Override
protected void flush(boolean finalFlush) throws IOException {
super.flush(finalFlush);
maybeSetChunkOffsets(finalFlush);
@ -87,6 +88,7 @@ public final class ReducePartitionFileWriter extends FileWriter {
return fileInfo.getLastChunkOffset() == fileInfo.getFileLength();
}
@Override
public synchronized long close() throws IOException {
return super.close(
() -> {