Skip to content

Commit

Permalink
[CELEBORN-1283] TransportClientFactory avoid contention and get or cr…
Browse files Browse the repository at this point in the history
…eate clientPools quickly

### What changes were proposed in this pull request?

`TransportClientFactory` avoid contention and get or create clientPools quickly.

### Why are the changes needed?

Avoid contention for getting or creating clientPools, and clean up the code.

Backport: [[SPARK-38555][NETWORK][SHUFFLE] Avoid contention and get or create clientPools quickly in the TransportClientFactory](apache/spark#35860)

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No.

Closes #2322 from SteNicholas/CELEBORN-1283.

Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit ff0cf15)
Signed-off-by: mingji <[email protected]>
  • Loading branch information
SteNicholas authored and FMX committed Feb 23, 2024
1 parent 5680d3e commit bba7384
Showing 1 changed file with 3 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,9 @@ public TransportClient createClient(
InetSocketAddress.createUnresolved(remoteHost, remotePort);

// Create the ClientPool if we don't have it yet.
ClientPool clientPool = connectionPool.get(unresolvedAddress);
if (clientPool == null) {
connectionPool.computeIfAbsent(
unresolvedAddress, key -> new ClientPool(numConnectionsPerPeer));
clientPool = connectionPool.get(unresolvedAddress);
}

ClientPool clientPool =
connectionPool.computeIfAbsent(
unresolvedAddress, key -> new ClientPool(numConnectionsPerPeer));
int clientIndex =
partitionId < 0 ? rand.nextInt(numConnectionsPerPeer) : partitionId % numConnectionsPerPeer;
TransportClient cachedClient = clientPool.clients[clientIndex];
Expand Down

0 comments on commit bba7384

Please sign in to comment.