diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteBufferStreamReader.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteBufferStreamReader.java index 51dadf9f1..640ea8a25 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteBufferStreamReader.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteBufferStreamReader.java @@ -113,6 +113,7 @@ public class RemoteBufferStreamReader extends CreditListener { return isOpened; } + @Override public void notifyAvailableCredits(int numCredits) { if (!closed) { bufferStream.addCredit( diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java index 4427b8673..e0499dc83 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java @@ -154,8 +154,7 @@ public class RemoteShuffleInputGateDelegation { try { String appUniqueId = - ((RemoteShuffleDescriptor) (gateDescriptor.getShuffleDescriptors()[0])) - .getCelebornAppId(); + ((RemoteShuffleDescriptor) gateDescriptor.getShuffleDescriptors()[0]).getCelebornAppId(); this.shuffleClient = FlinkShuffleClientImpl.get( appUniqueId, diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java index 0c2a0a2bb..88b5338d6 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java @@ -368,7 +368,7 @@ public class PartitionSortedBuffer implements SortBuffer { } private int getSegmentOffsetFromPointer(long value) { - return (int) (value); + return (int) value; } @Override diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java index 68e418a54..b3cfc09ea 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java @@ -71,6 +71,7 @@ public class FlinkTransportClientFactory extends TransportClientFactory { return null; } + @Override public TransportClient createClient(String remoteHost, int remotePort) throws IOException, InterruptedException { return createClient( diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java index 84ec43aa5..9140b6b23 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java @@ -167,6 +167,7 @@ public class TransportFrameDecoderWithBufferSupplier extends ChannelInboundHandl } } + @Override public void channelRead(ChannelHandlerContext ctx, Object data) { io.netty.buffer.ByteBuf nettyBuf = (io.netty.buffer.ByteBuf) data; try { diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java index c7ed33b33..5e2974547 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java @@ -557,6 +557,7 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl { this.dataClientFactory = dataClientFactory; } + @Override @VisibleForTesting public TransportClientFactory getDataClientFactory() { return flinkTransportClientFactory; diff --git a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java index 7e53e2585..f5e2b2938 100644 --- a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java +++ b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java @@ -36,6 +36,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat super(conf, networkBufferPool, networkBufferSize); } + @Override protected RemoteShuffleInputGate createInputGate( String owningTaskName, int gateIndex, diff --git a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java index a8cf5e645..de0ddab7c 100644 --- a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java +++ b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java @@ -36,6 +36,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat super(conf, networkBufferPool, networkBufferSize); } + @Override protected RemoteShuffleInputGate createInputGate( String owningTaskName, int gateIndex, diff --git a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java index d13613023..b2ac441ae 100644 --- a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java +++ b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java @@ -37,6 +37,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat } // For testing. + @Override protected RemoteShuffleInputGate createInputGate( String owningTaskName, int gateIndex, diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java index d13613023..b2ac441ae 100644 --- a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java +++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java @@ -37,6 +37,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat } // For testing. + @Override protected RemoteShuffleInputGate createInputGate( String owningTaskName, int gateIndex, diff --git a/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java b/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java index f6d06acdd..43958e680 100644 --- a/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java +++ b/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java @@ -135,7 +135,7 @@ public class MRAppMasterWithCeleborn extends MRAppMaster { ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId(); if (applicationAttemptId != null) { CallerContext.setCurrent( - (new CallerContext.Builder("mr_app_master_with_celeborn_" + applicationAttemptId)) + new CallerContext.Builder("mr_app_master_with_celeborn_" + applicationAttemptId) .build()); } diff --git a/client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java b/client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java index 7aa3857bd..4692a58cf 100644 --- a/client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java +++ b/client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java @@ -295,6 +295,7 @@ public class CelebornSortBasedPusher extends OutputStream { } } + @Override public void flush() { logger.info("Sort based pusher called flush"); try { @@ -305,6 +306,7 @@ public class CelebornSortBasedPusher extends OutputStream { } } + @Override public void close() { flush(); try { diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java index 35939dd5c..5b68db0f8 100644 --- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java +++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java @@ -268,6 +268,7 @@ public class SparkShuffleManager implements ShuffleManager { } } + @Override public ShuffleReader getReader( ShuffleHandle handle, int startPartition, int endPartition, TaskContext context) { if (handle instanceof CelebornShuffleHandle) { diff --git a/client-spark/spark-3-columnar-shuffle/src/main/java/org/apache/spark/sql/execution/columnar/CelebornColumnDictionary.java b/client-spark/spark-3-columnar-shuffle/src/main/java/org/apache/spark/sql/execution/columnar/CelebornColumnDictionary.java index 6354bac41..c264b257b 100644 --- a/client-spark/spark-3-columnar-shuffle/src/main/java/org/apache/spark/sql/execution/columnar/CelebornColumnDictionary.java +++ b/client-spark/spark-3-columnar-shuffle/src/main/java/org/apache/spark/sql/execution/columnar/CelebornColumnDictionary.java @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.columnar; +import java.nio.charset.StandardCharsets; + import org.apache.spark.sql.execution.vectorized.Dictionary; public class CelebornColumnDictionary implements Dictionary { @@ -58,6 +60,6 @@ public class CelebornColumnDictionary implements Dictionary { @Override public byte[] decodeToBinary(int id) { - return stringDictionary[id].getBytes(); + return stringDictionary[id].getBytes(StandardCharsets.UTF_8); } } diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java index 7b30101f7..a7c494c03 100644 --- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java +++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java @@ -407,6 +407,7 @@ public class HashBasedShuffleWriter extends ShuffleWriter { } } + // Added in SPARK-32917, for Spark 3.2 and above public long[] getPartitionLengths() { throw new UnsupportedOperationException( "Celeborn is not compatible with Spark push mode, please set spark.shuffle.push.enabled to false"); diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index d83e4283a..bc60c9fd1 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -504,7 +504,7 @@ public class ShuffleClientImpl extends ShuffleClient { ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class))); } - @VisibleForTesting + @Override public PartitionLocation registerMapPartitionTask( int shuffleId, int numMappers, int mapId, int attemptId, int partitionId) throws IOException { logger.info( @@ -1277,6 +1277,7 @@ public class ShuffleClientImpl extends ShuffleClient { false); } + @Override public void pushMergedData(int shuffleId, int mapId, int attemptId) throws IOException { final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId); PushState pushState = pushStates.get(mapKey); @@ -1791,8 +1792,8 @@ public class ShuffleClientImpl extends ShuffleClient { private boolean connectFail(String message) { return (message.startsWith("Connection from ") && message.endsWith(" closed")) - || (message.equals("Connection reset by peer")) - || (message.startsWith("Failed to send RPC ")); + || message.equals("Connection reset by peer") + || message.startsWith("Failed to send RPC "); } @VisibleForTesting diff --git a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java index 080643814..a00f02476 100644 --- a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java +++ b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java @@ -134,10 +134,12 @@ public class WorkerPartitionReader implements PartitionReader { ShuffleClient.incrementTotalReadCounter(); } + @Override public boolean hasNext() { return returnedChunks < streamHandler.getNumChunks(); } + @Override public ByteBuf next() throws IOException, InterruptedException { checkException(); if (chunkIndex < streamHandler.getNumChunks()) { @@ -160,6 +162,7 @@ public class WorkerPartitionReader implements PartitionReader { return chunk; } + @Override public void close() { synchronized (this) { closed = true; diff --git a/client/src/main/java/org/apache/celeborn/client/security/CryptoUtils.java b/client/src/main/java/org/apache/celeborn/client/security/CryptoUtils.java index 5ce4672d6..95bebd919 100644 --- a/client/src/main/java/org/apache/celeborn/client/security/CryptoUtils.java +++ b/client/src/main/java/org/apache/celeborn/client/security/CryptoUtils.java @@ -18,6 +18,7 @@ package org.apache.celeborn.client.security; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.security.GeneralSecurityException; import java.util.Optional; import java.util.Properties; @@ -50,7 +51,7 @@ public class CryptoUtils { CryptoRandomFactory.getCryptoRandom(new Properties()).nextBytes(iv); } catch (GeneralSecurityException e) { logger.warn("Failed to create crypto Initialization Vector", e); - iv = "1234567890123456".getBytes(); + iv = "1234567890123456".getBytes(StandardCharsets.UTF_8); } long initialIVFinish = System.nanoTime(); long initialIVTime = TimeUnit.NANOSECONDS.toMillis(initialIVFinish - initialIVStart); diff --git a/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java b/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java index 53ba4ccd4..07ba142e4 100644 --- a/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java +++ b/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java @@ -31,7 +31,7 @@ public class CodecSuiteJ { @Test public void testLz4Codec() { - int blockSize = (new CelebornConf()).clientPushBufferMaxSize(); + int blockSize = new CelebornConf().clientPushBufferMaxSize(); Lz4Compressor lz4Compressor = new Lz4Compressor(blockSize); byte[] data = RandomStringUtils.random(1024).getBytes(StandardCharsets.UTF_8); int oriLength = data.length; @@ -49,7 +49,7 @@ public class CodecSuiteJ { @Test public void testZstdCodec() { for (int level = -5; level <= 22; level++) { - int blockSize = (new CelebornConf()).clientPushBufferMaxSize(); + int blockSize = new CelebornConf().clientPushBufferMaxSize(); ZstdCompressor zstdCompressor = new ZstdCompressor(blockSize, level); byte[] data = RandomStringUtils.random(1024).getBytes(StandardCharsets.UTF_8); int oriLength = data.length; diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/PushData.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/PushData.java index f8765d5a2..cf94b44c8 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/PushData.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/PushData.java @@ -99,7 +99,7 @@ public final class PushData extends RequestMessage { return requestId == o.requestId && mode == o.mode && shuffleKey.equals(o.shuffleKey) - && partitionUniqueId.equals((o.partitionUniqueId)) + && partitionUniqueId.equals(o.partitionUniqueId) && super.equals(o); } return false; diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/PushDataHandShake.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/PushDataHandShake.java index 1f6745d72..d0e3bda17 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/PushDataHandShake.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/PushDataHandShake.java @@ -95,7 +95,7 @@ public final class PushDataHandShake extends RequestMessage { PushDataHandShake o = (PushDataHandShake) other; return mode == o.mode && shuffleKey.equals(o.shuffleKey) - && partitionUniqueId.equals((o.partitionUniqueId)) + && partitionUniqueId.equals(o.partitionUniqueId) && attemptId == o.attemptId && numPartitions == o.numPartitions && bufferSize == o.bufferSize diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionFinish.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionFinish.java index 0d64bdcdd..34e1e66df 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionFinish.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionFinish.java @@ -80,7 +80,7 @@ public final class RegionFinish extends RequestMessage { RegionFinish o = (RegionFinish) other; return mode == o.mode && shuffleKey.equals(o.shuffleKey) - && partitionUniqueId.equals((o.partitionUniqueId)) + && partitionUniqueId.equals(o.partitionUniqueId) && attemptId == o.attemptId && super.equals(o); } diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionStart.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionStart.java index f12c4a010..918a38fe4 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionStart.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionStart.java @@ -98,7 +98,7 @@ public final class RegionStart extends RequestMessage { RegionStart o = (RegionStart) other; return mode == o.mode && shuffleKey.equals(o.shuffleKey) - && partitionUniqueId.equals((o.partitionUniqueId)) + && partitionUniqueId.equals(o.partitionUniqueId) && attemptId == o.attemptId && currentRegionIndex == o.currentRegionIndex && isBroadcast == o.isBroadcast diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/StreamChunkSlice.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/StreamChunkSlice.java index b0129a42b..abbbb2f93 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/StreamChunkSlice.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/StreamChunkSlice.java @@ -53,6 +53,7 @@ public final class StreamChunkSlice implements Encodable { return 20; } + @Override public void encode(ByteBuf buffer) { buffer.writeLong(streamId); buffer.writeInt(chunkIndex); diff --git a/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java b/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java index 402fc3a27..92b33bc47 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java +++ b/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java @@ -41,6 +41,7 @@ public class SecretRegistryImpl implements SecretRegistry { secrets.remove(appId); } + @Override public boolean isRegistered(String appId) { return secrets.containsKey(appId); } diff --git a/common/src/test/java/org/apache/celeborn/common/network/sasl/CelebornSaslSuiteJ.java b/common/src/test/java/org/apache/celeborn/common/network/sasl/CelebornSaslSuiteJ.java index 582988e1d..10ed9851e 100644 --- a/common/src/test/java/org/apache/celeborn/common/network/sasl/CelebornSaslSuiteJ.java +++ b/common/src/test/java/org/apache/celeborn/common/network/sasl/CelebornSaslSuiteJ.java @@ -195,6 +195,7 @@ public class CelebornSaslSuiteJ { } } + @Override public void close() { if (client != null) { client.close(); diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java index c901341c5..2ddd1ec06 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java @@ -138,6 +138,7 @@ public class SingleMasterMetaManager extends AbstractMetaManager { updateMetaByReportWorkerUnavailable(failedNodes); } + @Override public void handleUpdatePartitionSize() { updatePartitionSize(); } diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java index 6862b5a11..7140cc6c6 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java @@ -21,8 +21,10 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NO import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY; import java.io.File; -import java.io.FileWriter; import java.io.IOException; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.*; import java.util.function.Consumer; import java.util.function.Function; @@ -55,7 +57,7 @@ public class SlotsAllocatorRackAwareSuiteJ { conf.set(CelebornConf.CLIENT_RESERVE_SLOTS_RACKAWARE_ENABLED().key(), "true"); File mapFile = File.createTempFile("testResolve1", ".txt"); - FileWriter mapFileWriter = new FileWriter(mapFile); + Writer mapFileWriter = Files.newBufferedWriter(mapFile.toPath(), StandardCharsets.UTF_8); mapFileWriter.write( "host1 /default/rack1\nhost2 /default/rack1\nhost3 /default/rack1\n" + "host4 /default/rack2\nhost5 /default/rack2\nhost6 /default/rack2\n"); @@ -82,6 +84,7 @@ public class SlotsAllocatorRackAwareSuiteJ { Consumer assertCustomer = new Consumer() { + @Override public void accept(PartitionLocation location) { Assert.assertNotEquals( resolver.resolve(location.getHost()).getNetworkLocation(), @@ -118,6 +121,7 @@ public class SlotsAllocatorRackAwareSuiteJ { Consumer assertConsumer = new Consumer() { + @Override public void accept(PartitionLocation location) { Assert.assertEquals( NetworkTopology.DEFAULT_RACK, @@ -141,6 +145,7 @@ public class SlotsAllocatorRackAwareSuiteJ { workers.forEach( new Consumer() { + @Override public void accept(WorkerInfo workerInfo) { workerInfo.networkLocation_$eq( resolver.resolve(workerInfo.host()).getNetworkLocation()); @@ -416,7 +421,7 @@ public class SlotsAllocatorRackAwareSuiteJ { int expected = (int) Math.ceil( - ((double) (numPartitions) + ((double) numPartitions / totalHosts * (1 + ((double) (maxHostsPerRack - secondMaxHostsPerRack + 1)) diff --git a/pom.xml b/pom.xml index aa4ed20bb..8ad8013c1 100644 --- a/pom.xml +++ b/pom.xml @@ -929,7 +929,7 @@ true -XDcompilePolicy=simple - -Xplugin:ErrorProne + -Xplugin:ErrorProne -XepExcludedPaths:.*/target/generated-sources/protobuf/.* -Xep:FutureReturnValueIgnored:OFF -Xep:TypeParameterUnusedInFormals:OFF -Xep:UnusedVariable:OFF diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java index 812414150..82b5714b8 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java @@ -41,6 +41,7 @@ public class SystemConfig extends DynamicConfig { return null; } + @Override public T getValue( String configKey, ConfigEntry configEntry, diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBIterator.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBIterator.java index 351b773df..15864fa54 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBIterator.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBIterator.java @@ -27,6 +27,7 @@ public interface DBIterator extends Iterator>, Closeab /** Position at the first entry in the source whose `key` is at target. */ void seek(byte[] key); + @Override default void remove() { throw new UnsupportedOperationException(); } diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java index ca38a49c4..3cba81cba 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java @@ -401,15 +401,18 @@ public abstract class FileWriter implements DeviceObserver { } } + @Override public int hashCode() { return fileInfo.getFilePath().hashCode(); } + @Override public boolean equals(Object obj) { return (obj instanceof FileWriter) && fileInfo.getFilePath().equals(((FileWriter) obj).fileInfo.getFilePath()); } + @Override public String toString() { return fileInfo.getFilePath(); } diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java index 2b306f025..42e21c926 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java @@ -91,6 +91,7 @@ public final class MapPartitionFileWriter extends FileWriter { } } + @Override public void write(ByteBuf data) throws IOException { data.markReaderIndex(); int partitionId = data.readInt(); @@ -155,13 +156,14 @@ public final class MapPartitionFileWriter extends FileWriter { deleted = true; } else { StorageManager.hadoopFs() - .create(new Path(Utils.getWriteSuccessFilePath((fileInfo.getIndexPath())))) + .create(new Path(Utils.getWriteSuccessFilePath(fileInfo.getIndexPath()))) .close(); } } }); } + @Override public synchronized void destroy(IOException ioException) { destroyIndex(); super.destroy(ioException); diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java index d9b89849b..768a69492 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java @@ -62,6 +62,7 @@ public final class ReducePartitionFileWriter extends FileWriter { this.nextBoundary = this.shuffleChunkSize; } + @Override protected void flush(boolean finalFlush) throws IOException { super.flush(finalFlush); maybeSetChunkOffsets(finalFlush); @@ -87,6 +88,7 @@ public final class ReducePartitionFileWriter extends FileWriter { return fileInfo.getLastChunkOffset() == fileInfo.getFileLength(); } + @Override public synchronized long close() throws IOException { return super.close( () -> {