[CELEBORN-1283] TransportClientFactory avoid contention and get or create clientPools quickly

### What changes were proposed in this pull request?

`TransportClientFactory` avoid contention and get or create clientPools quickly.

### Why are the changes needed?

Avoid contention for getting or creating clientPools, and clean up the code.

Backport: [[SPARK-38555][NETWORK][SHUFFLE] Avoid contention and get or create clientPools quickly in the TransportClientFactory](https://github.com/apache/spark/pull/35860)

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No.

Closes #2322 from SteNicholas/CELEBORN-1283.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
This commit is contained in:
SteNicholas 2024-02-23 15:30:24 +08:00 committed by mingji
parent b9bdea3c72
commit ff0cf15770
No known key found for this signature in database
GPG Key ID: 6392F71F37356FA0

View File

@ -136,13 +136,9 @@ public class TransportClientFactory implements Closeable {
InetSocketAddress.createUnresolved(remoteHost, remotePort);
// Create the ClientPool if we don't have it yet.
ClientPool clientPool = connectionPool.get(unresolvedAddress);
if (clientPool == null) {
connectionPool.computeIfAbsent(
unresolvedAddress, key -> new ClientPool(numConnectionsPerPeer));
clientPool = connectionPool.get(unresolvedAddress);
}
ClientPool clientPool =
connectionPool.computeIfAbsent(
unresolvedAddress, key -> new ClientPool(numConnectionsPerPeer));
int clientIndex =
partitionId < 0 ? rand.nextInt(numConnectionsPerPeer) : partitionId % numConnectionsPerPeer;
TransportClient cachedClient = clientPool.clients[clientIndex];