[CELEBORN-1376] Push data failed should always release request body

### What changes were proposed in this pull request?
Worker netty not release
<img width="1729" alt="截屏2024-04-07 17 26 40" src="https://github.com/apache/celeborn/assets/46485123/5774f735-570b-448e-ab94-4c78661717f5">

Many push failed
<img width="767" alt="截屏2024-04-07 17 27 46" src="https://github.com/apache/celeborn/assets/46485123/41866bd0-d634-4dbf-8518-b474c8d1faad">

1. For spark shuffle client, enable it release push data body when rpc failure
2. For flink client, since it use wrapped bytbuf, we need release push data body when rpc failure and release origin body when rpc completed.
3. For worker replicate, we should enable it release push data body when rpc failure.

### Why are the changes needed?
Avoid worker netty memory leak

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #2449 from AngersZhuuuu/CELEBORN-1376.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
This commit is contained in:
Angerszhuuuu 2024-04-10 19:42:14 +08:00 committed by mingji
parent f25972d003
commit b65b5433dc
No known key found for this signature in database
GPG Key ID: 6392F71F37356FA0
2 changed files with 51 additions and 5 deletions

View File

@ -111,7 +111,7 @@ public class FlinkShuffleClientImplSuiteJ {
@Test
public void testPushDataByteBufFail() throws IOException {
ByteBuf byteBuf = Unpooled.wrappedBuffer(TEST_BUF1);
when(client.pushData(any(), anyLong(), any(), any()))
when(client.pushData(any(), anyLong(), any(), any(), any()))
.thenAnswer(
t -> {
RpcResponseCallback rpcResponseCallback = t.getArgument(1, RpcResponseCallback.class);

View File

@ -216,6 +216,23 @@ public class TransportClient implements Closeable {
long pushDataTimeout,
RpcResponseCallback callback,
Runnable rpcSendoutCallback) {
Runnable rpcFailureCallback =
() -> {
try {
pushData.body().release();
} catch (Throwable e) {
logger.error("Error release buffer for PUSH_DATA request {}", pushData.requestId, e);
}
};
return pushData(pushData, pushDataTimeout, callback, rpcSendoutCallback, rpcFailureCallback);
}
public ChannelFuture pushData(
PushData pushData,
long pushDataTimeout,
RpcResponseCallback callback,
Runnable rpcSendoutCallback,
Runnable rpcFailureCallback) {
if (logger.isTraceEnabled()) {
logger.trace("Pushing data to {}", NettyUtils.getRemoteAddress(channel));
}
@ -225,7 +242,8 @@ public class TransportClient implements Closeable {
PushRequestInfo info = new PushRequestInfo(dueTime, callback);
handler.addPushRequest(requestId, info);
pushData.requestId = requestId;
PushChannelListener listener = new PushChannelListener(requestId, rpcSendoutCallback);
PushChannelListener listener =
new PushChannelListener(requestId, rpcSendoutCallback, rpcFailureCallback);
ChannelFuture channelFuture = channel.writeAndFlush(pushData).addListener(listener);
info.setChannelFuture(channelFuture);
return channelFuture;
@ -233,6 +251,26 @@ public class TransportClient implements Closeable {
public ChannelFuture pushMergedData(
PushMergedData pushMergedData, long pushDataTimeout, RpcResponseCallback callback) {
Runnable rpcFailureCallback =
() -> {
try {
pushMergedData.body().release();
} catch (Throwable e) {
logger.error(
"Error release buffer for PUSH_MERGED_DATA request {}",
pushMergedData.requestId,
e);
}
};
return pushMergedData(pushMergedData, pushDataTimeout, callback, null, rpcFailureCallback);
}
public ChannelFuture pushMergedData(
PushMergedData pushMergedData,
long pushDataTimeout,
RpcResponseCallback callback,
Runnable rpcSendoutCallback,
Runnable rpcFailureCallback) {
if (logger.isTraceEnabled()) {
logger.trace("Pushing merged data to {}", NettyUtils.getRemoteAddress(channel));
}
@ -243,7 +281,8 @@ public class TransportClient implements Closeable {
handler.addPushRequest(requestId, info);
pushMergedData.requestId = requestId;
PushChannelListener listener = new PushChannelListener(requestId);
PushChannelListener listener =
new PushChannelListener(requestId, rpcSendoutCallback, rpcFailureCallback);
ChannelFuture channelFuture = channel.writeAndFlush(pushMergedData).addListener(listener);
info.setChannelFuture(channelFuture);
return channelFuture;
@ -417,14 +456,18 @@ public class TransportClient implements Closeable {
final long pushRequestId;
Runnable rpcSendOutCallback;
Runnable rpcFailureCallback;
PushChannelListener(long pushRequestId) {
this(pushRequestId, null);
this(pushRequestId, null, null);
}
PushChannelListener(long pushRequestId, Runnable rpcSendOutCallback) {
PushChannelListener(
long pushRequestId, Runnable rpcSendOutCallback, Runnable rpcFailureCallback) {
super("PUSH " + pushRequestId);
this.pushRequestId = pushRequestId;
this.rpcSendOutCallback = rpcSendOutCallback;
this.rpcFailureCallback = rpcFailureCallback;
}
@Override
@ -438,6 +481,9 @@ public class TransportClient implements Closeable {
@Override
protected void handleFailure(String errorMsg, Throwable cause) {
handler.handlePushFailure(pushRequestId, errorMsg, cause);
if (rpcFailureCallback != null) {
rpcFailureCallback.run();
}
}
}
}