Revert "[CELEBORN-1376] Push data failed should always release request body"

This reverts commit b65b5433dc.

<!--
Thanks for sending a pull request!  Here are some tips for you:
  - Make sure the PR title starts with a JIRA ticket, e.g. '[CELEBORN-XXXX] Your PR title ...'.
  - Be sure to keep the PR description updated to reflect all changes.
  - Please write your PR title to summarize what this PR proposes.
  - If possible, provide a concise example to reproduce the issue for a faster review.
-->

### What changes were proposed in this pull request?
Revert [CELEBORN-1376](https://github.com/apache/celeborn/pull/2449).
That PR introduced a reference-count error that occurs when replication is enabled and workers are randomly terminated.

### Why are the changes needed?
When data replication is enabled and workers are randomly terminated, an IllegalReferenceCountException (`refCnt: 0, decrement: 1`) is thrown, which fails the task.
![image](https://github.com/user-attachments/assets/bb4965e4-5fa2-44ad-bb88-36bd475a6b5f)

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Cluster testing.

Closes #2992 from zaynt4606/clbr1376.

Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
This commit is contained in:
zhengtao 2024-12-13 11:20:11 +08:00 committed by Shuang
parent f7b036d4c7
commit c316fdbdfb
2 changed files with 5 additions and 51 deletions

View File

@ -111,7 +111,7 @@ public class FlinkShuffleClientImplSuiteJ {
@Test
public void testPushDataByteBufFail() throws IOException {
ByteBuf byteBuf = Unpooled.wrappedBuffer(TEST_BUF1);
when(client.pushData(any(), anyLong(), any(), any(), any()))
when(client.pushData(any(), anyLong(), any(), any()))
.thenAnswer(
t -> {
RpcResponseCallback rpcResponseCallback = t.getArgument(1, RpcResponseCallback.class);

View File

@ -216,23 +216,6 @@ public class TransportClient implements Closeable {
long pushDataTimeout,
RpcResponseCallback callback,
Runnable rpcSendoutCallback) {
Runnable rpcFailureCallback =
() -> {
try {
pushData.body().release();
} catch (Throwable e) {
logger.error("Error release buffer for PUSH_DATA request {}", pushData.requestId, e);
}
};
return pushData(pushData, pushDataTimeout, callback, rpcSendoutCallback, rpcFailureCallback);
}
public ChannelFuture pushData(
PushData pushData,
long pushDataTimeout,
RpcResponseCallback callback,
Runnable rpcSendoutCallback,
Runnable rpcFailureCallback) {
if (logger.isTraceEnabled()) {
logger.trace("Pushing data to {}", NettyUtils.getRemoteAddress(channel));
}
@ -242,8 +225,7 @@ public class TransportClient implements Closeable {
PushRequestInfo info = new PushRequestInfo(dueTime, callback);
handler.addPushRequest(requestId, info);
pushData.requestId = requestId;
PushChannelListener listener =
new PushChannelListener(requestId, rpcSendoutCallback, rpcFailureCallback);
PushChannelListener listener = new PushChannelListener(requestId, rpcSendoutCallback);
ChannelFuture channelFuture = channel.writeAndFlush(pushData).addListener(listener);
info.setChannelFuture(channelFuture);
return channelFuture;
@ -251,26 +233,6 @@ public class TransportClient implements Closeable {
public ChannelFuture pushMergedData(
PushMergedData pushMergedData, long pushDataTimeout, RpcResponseCallback callback) {
Runnable rpcFailureCallback =
() -> {
try {
pushMergedData.body().release();
} catch (Throwable e) {
logger.error(
"Error release buffer for PUSH_MERGED_DATA request {}",
pushMergedData.requestId,
e);
}
};
return pushMergedData(pushMergedData, pushDataTimeout, callback, null, rpcFailureCallback);
}
public ChannelFuture pushMergedData(
PushMergedData pushMergedData,
long pushDataTimeout,
RpcResponseCallback callback,
Runnable rpcSendoutCallback,
Runnable rpcFailureCallback) {
if (logger.isTraceEnabled()) {
logger.trace("Pushing merged data to {}", NettyUtils.getRemoteAddress(channel));
}
@ -281,8 +243,7 @@ public class TransportClient implements Closeable {
handler.addPushRequest(requestId, info);
pushMergedData.requestId = requestId;
PushChannelListener listener =
new PushChannelListener(requestId, rpcSendoutCallback, rpcFailureCallback);
PushChannelListener listener = new PushChannelListener(requestId);
ChannelFuture channelFuture = channel.writeAndFlush(pushMergedData).addListener(listener);
info.setChannelFuture(channelFuture);
return channelFuture;
@ -498,18 +459,14 @@ public class TransportClient implements Closeable {
final long pushRequestId;
Runnable rpcSendOutCallback;
Runnable rpcFailureCallback;
PushChannelListener(long pushRequestId) {
this(pushRequestId, null, null);
this(pushRequestId, null);
}
PushChannelListener(
long pushRequestId, Runnable rpcSendOutCallback, Runnable rpcFailureCallback) {
PushChannelListener(long pushRequestId, Runnable rpcSendOutCallback) {
super("PUSH " + pushRequestId);
this.pushRequestId = pushRequestId;
this.rpcSendOutCallback = rpcSendOutCallback;
this.rpcFailureCallback = rpcFailureCallback;
}
@Override
@ -523,9 +480,6 @@ public class TransportClient implements Closeable {
@Override
protected void handleFailure(String errorMsg, Throwable cause) {
handler.handlePushFailure(pushRequestId, errorMsg, cause);
if (rpcFailureCallback != null) {
rpcFailureCallback.run();
}
}
}
}