Cleanup some code (#943)

This commit is contained in:
leesf 2022-11-11 13:58:39 +08:00 committed by GitHub
parent 6f043f8ae9
commit 0b8376e2c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 26 additions and 40 deletions

View File

@ -682,8 +682,8 @@ public class ShuffleClientImpl extends ShuffleClient {
// add batch data
logger.debug("Merge batch {}.", nextBatchId);
String addressPair = genAddressPair(loc);
boolean shoudPush = pushState.addBatchData(addressPair, loc, nextBatchId, body);
if (shoudPush) {
boolean shouldPush = pushState.addBatchData(addressPair, loc, nextBatchId, body);
if (shouldPush) {
limitMaxInFlight(mapKey, pushState, maxInFlight);
DataBatches dataBatches = pushState.takeDataBatches(addressPair);
doPushMergedData(
@ -705,6 +705,7 @@ public class ShuffleClientImpl extends ShuffleClient {
int shuffleId, int partitionId, String applicationId, PartitionLocation loc) {
Set<Integer> splittingSet =
splitting.computeIfAbsent(shuffleId, integer -> ConcurrentHashMap.newKeySet());
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (splittingSet) {
if (splittingSet.contains(partitionId)) {
logger.debug(

View File

@ -22,12 +22,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import scala.reflect.ClassTag$;
@ -53,14 +49,14 @@ import org.apache.celeborn.common.rpc.RpcEndpointRef;
public class ShuffleClientSuiteJ {
private ShuffleClientImpl shuffleClient;
private RpcEndpointRef endpointRef = mock(RpcEndpointRef.class);
private TransportClientFactory clientFactory = mock(TransportClientFactory.class);
private TransportClient client = mock(TransportClient.class);
private final RpcEndpointRef endpointRef = mock(RpcEndpointRef.class);
private final TransportClientFactory clientFactory = mock(TransportClientFactory.class);
private final TransportClient client = mock(TransportClient.class);
private static String TEST_APPLICATION_ID = "testapp1";
private static int TEST_SHUFFLE_ID = 1;
private static int TEST_ATTEMPT_ID = 0;
private static int TEST_REDUCRE_ID = 0;
private static final String TEST_APPLICATION_ID = "testapp1";
private static final int TEST_SHUFFLE_ID = 1;
private static final int TEST_ATTEMPT_ID = 0;
private static final int TEST_REDUCRE_ID = 0;
private static final int MASTER_RPC_PORT = 1234;
private static final int MASTER_PUSH_PORT = 1235;
@ -91,7 +87,7 @@ public class ShuffleClientSuiteJ {
SLAVE_REPLICATE_PORT,
PartitionLocation.Mode.SLAVE);
private static byte[] TEST_BUF1 = "hello world".getBytes(StandardCharsets.UTF_8);
private static final byte[] TEST_BUF1 = "hello world".getBytes(StandardCharsets.UTF_8);
private final int BATCH_HEADER_SIZE = 4 * 4;
@Test
@ -178,18 +174,6 @@ public class ShuffleClientSuiteJ {
}
}
private synchronized String getLocalHost() {
InetAddress ia = null;
if (ia == null) {
try {
ia = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
return null;
}
}
return ia.getHostName();
}
private CelebornConf setupEnv(CompressionCodec codec) throws IOException, InterruptedException {
CelebornConf conf = new CelebornConf();
conf.set("celeborn.shuffle.compression.codec", codec.name());
@ -221,8 +205,9 @@ public class ShuffleClientSuiteJ {
return null;
}
@SafeVarargs
@Override
public ChannelFuture addListeners(
public final ChannelFuture addListeners(
GenericFutureListener<? extends Future<? super Void>>... listeners) {
return null;
}
@ -233,14 +218,15 @@ public class ShuffleClientSuiteJ {
return null;
}
@SafeVarargs
@Override
public ChannelFuture removeListeners(
public final ChannelFuture removeListeners(
GenericFutureListener<? extends Future<? super Void>>... listeners) {
return null;
}
@Override
public ChannelFuture sync() throws InterruptedException {
public ChannelFuture sync() {
return null;
}
@ -250,7 +236,7 @@ public class ShuffleClientSuiteJ {
}
@Override
public ChannelFuture await() throws InterruptedException {
public ChannelFuture await() {
return null;
}
@ -280,12 +266,12 @@ public class ShuffleClientSuiteJ {
}
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
public boolean await(long timeout, TimeUnit unit) {
return true;
}
@Override
public boolean await(long timeoutMillis) throws InterruptedException {
public boolean await(long timeoutMillis) {
return true;
}
@ -320,13 +306,13 @@ public class ShuffleClientSuiteJ {
}
@Override
public Void get() throws InterruptedException, ExecutionException {
public Void get() {
return null;
}
@SuppressWarnings("NullableProblems")
@Override
public Void get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
public Void get(long timeout, TimeUnit unit) {
return null;
}
};

View File

@ -22,7 +22,6 @@ import java.nio.charset.StandardCharsets
import org.apache.commons.lang3.RandomStringUtils
import org.junit.Assert
import org.junit.runners.JUnit4
import org.apache.celeborn.client.{LifecycleManager, ShuffleClientImpl}
import org.apache.celeborn.common.CelebornConf
@ -51,24 +50,24 @@ trait ReadWriteTestBase extends Logging {
val LENGTH1 = DATA1.length
val dataSize1 = shuffleClient.pushData(APP, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 1, 1)
logInfo(s"push data data size ${dataSize1}")
logInfo(s"push data data size $dataSize1")
val STR2 = RandomStringUtils.random(32 * 1024)
val DATA2 = STR2.getBytes(StandardCharsets.UTF_8)
val OFFSET2 = 0
val LENGTH2 = DATA2.length
val dataSize2 = shuffleClient.pushData(APP, 1, 0, 0, 0, DATA2, OFFSET2, LENGTH2, 1, 1)
logInfo(s"push data data size ${dataSize2}")
logInfo("push data data size " + dataSize2)
val STR3 = RandomStringUtils.random(32 * 1024)
val DATA3 = STR3.getBytes(StandardCharsets.UTF_8)
val LENGTH3 = DATA3.length
val dataSize3 = shuffleClient.mergeData(APP, 1, 0, 0, 0, DATA3, 0, LENGTH3, 1, 1);
shuffleClient.mergeData(APP, 1, 0, 0, 0, DATA3, 0, LENGTH3, 1, 1)
val STR4 = RandomStringUtils.random(16 * 1024)
val DATA4 = STR4.getBytes(StandardCharsets.UTF_8)
val LENGTH4 = DATA4.length
val dataSize4 = shuffleClient.mergeData(APP, 1, 0, 0, 0, DATA4, 0, LENGTH4, 1, 1);
shuffleClient.mergeData(APP, 1, 0, 0, 0, DATA4, 0, LENGTH4, 1, 1)
shuffleClient.pushMergedData(APP, 1, 0, 0)
Thread.sleep(1000)