diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index 4ec1d35ee..9cecb1403 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -682,8 +682,8 @@ public class ShuffleClientImpl extends ShuffleClient { // add batch data logger.debug("Merge batch {}.", nextBatchId); String addressPair = genAddressPair(loc); - boolean shoudPush = pushState.addBatchData(addressPair, loc, nextBatchId, body); - if (shoudPush) { + boolean shouldPush = pushState.addBatchData(addressPair, loc, nextBatchId, body); + if (shouldPush) { limitMaxInFlight(mapKey, pushState, maxInFlight); DataBatches dataBatches = pushState.takeDataBatches(addressPair); doPushMergedData( @@ -705,6 +705,7 @@ public class ShuffleClientImpl extends ShuffleClient { int shuffleId, int partitionId, String applicationId, PartitionLocation loc) { Set splittingSet = splitting.computeIfAbsent(shuffleId, integer -> ConcurrentHashMap.newKeySet()); + //noinspection SynchronizationOnLocalVariableOrMethodParameter synchronized (splittingSet) { if (splittingSet.contains(partitionId)) { logger.debug( diff --git a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java index ece87c8ae..e2be945d5 100644 --- a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java +++ b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java @@ -22,12 +22,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.nio.charset.StandardCharsets; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import scala.reflect.ClassTag$; @@ -53,14 +49,14 @@ import org.apache.celeborn.common.rpc.RpcEndpointRef; public class ShuffleClientSuiteJ { private ShuffleClientImpl shuffleClient; - private RpcEndpointRef endpointRef = mock(RpcEndpointRef.class); - private TransportClientFactory clientFactory = mock(TransportClientFactory.class); - private TransportClient client = mock(TransportClient.class); + private final RpcEndpointRef endpointRef = mock(RpcEndpointRef.class); + private final TransportClientFactory clientFactory = mock(TransportClientFactory.class); + private final TransportClient client = mock(TransportClient.class); - private static String TEST_APPLICATION_ID = "testapp1"; - private static int TEST_SHUFFLE_ID = 1; - private static int TEST_ATTEMPT_ID = 0; - private static int TEST_REDUCRE_ID = 0; + private static final String TEST_APPLICATION_ID = "testapp1"; + private static final int TEST_SHUFFLE_ID = 1; + private static final int TEST_ATTEMPT_ID = 0; + private static final int TEST_REDUCRE_ID = 0; private static final int MASTER_RPC_PORT = 1234; private static final int MASTER_PUSH_PORT = 1235; @@ -91,7 +87,7 @@ public class ShuffleClientSuiteJ { SLAVE_REPLICATE_PORT, PartitionLocation.Mode.SLAVE); - private static byte[] TEST_BUF1 = "hello world".getBytes(StandardCharsets.UTF_8); + private static final byte[] TEST_BUF1 = "hello world".getBytes(StandardCharsets.UTF_8); private final int BATCH_HEADER_SIZE = 4 * 4; @Test @@ -178,18 +174,6 @@ public class ShuffleClientSuiteJ { } } - private synchronized String getLocalHost() { - InetAddress ia = null; - if (ia == null) { - try { - ia = InetAddress.getLocalHost(); - } catch (UnknownHostException e) { - return null; - } - } - return ia.getHostName(); - } - private CelebornConf setupEnv(CompressionCodec codec) throws IOException, InterruptedException { CelebornConf conf = new CelebornConf(); conf.set("celeborn.shuffle.compression.codec", codec.name()); @@ -221,8 +205,9 @@ public class ShuffleClientSuiteJ { return null; } + @SafeVarargs @Override - public ChannelFuture addListeners( + public final ChannelFuture addListeners( GenericFutureListener>... listeners) { return null; } @@ -233,14 +218,15 @@ public class ShuffleClientSuiteJ { return null; } + @SafeVarargs @Override - public ChannelFuture removeListeners( + public final ChannelFuture removeListeners( GenericFutureListener>... listeners) { return null; } @Override - public ChannelFuture sync() throws InterruptedException { + public ChannelFuture sync() { return null; } @@ -250,7 +236,7 @@ public class ShuffleClientSuiteJ { } @Override - public ChannelFuture await() throws InterruptedException { + public ChannelFuture await() { return null; } @@ -280,12 +266,12 @@ public class ShuffleClientSuiteJ { } @Override - public boolean await(long timeout, TimeUnit unit) throws InterruptedException { + public boolean await(long timeout, TimeUnit unit) { return true; } @Override - public boolean await(long timeoutMillis) throws InterruptedException { + public boolean await(long timeoutMillis) { return true; } @@ -320,13 +306,13 @@ public class ShuffleClientSuiteJ { } @Override - public Void get() throws InterruptedException, ExecutionException { + public Void get() { return null; } + @SuppressWarnings("NullableProblems") @Override - public Void get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { + public Void get(long timeout, TimeUnit unit) { return null; } }; diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala index 802e0d0ce..b2e3fc6c0 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala @@ -22,7 +22,6 @@ import java.nio.charset.StandardCharsets import org.apache.commons.lang3.RandomStringUtils import org.junit.Assert -import org.junit.runners.JUnit4 import org.apache.celeborn.client.{LifecycleManager, ShuffleClientImpl} import org.apache.celeborn.common.CelebornConf @@ -51,24 +50,24 @@ trait ReadWriteTestBase extends Logging { val LENGTH1 = DATA1.length val dataSize1 = shuffleClient.pushData(APP, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 1, 1) - logInfo(s"push data data size ${dataSize1}") + logInfo(s"push data data size $dataSize1") val STR2 = RandomStringUtils.random(32 * 1024) val DATA2 = STR2.getBytes(StandardCharsets.UTF_8) val OFFSET2 = 0 val LENGTH2 = DATA2.length val dataSize2 = shuffleClient.pushData(APP, 1, 0, 0, 0, DATA2, OFFSET2, LENGTH2, 1, 1) - logInfo(s"push data data size ${dataSize2}") + logInfo("push data data size " + dataSize2) val STR3 = RandomStringUtils.random(32 * 1024) val DATA3 = STR3.getBytes(StandardCharsets.UTF_8) val LENGTH3 = DATA3.length - val dataSize3 = shuffleClient.mergeData(APP, 1, 0, 0, 0, DATA3, 0, LENGTH3, 1, 1); + shuffleClient.mergeData(APP, 1, 0, 0, 0, DATA3, 0, LENGTH3, 1, 1) val STR4 = RandomStringUtils.random(16 * 1024) val DATA4 = STR4.getBytes(StandardCharsets.UTF_8) val LENGTH4 = DATA4.length - val dataSize4 = shuffleClient.mergeData(APP, 1, 0, 0, 0, DATA4, 0, LENGTH4, 1, 1); + shuffleClient.mergeData(APP, 1, 0, 0, 0, DATA4, 0, LENGTH4, 1, 1) shuffleClient.pushMergedData(APP, 1, 0, 0) Thread.sleep(1000)