Revert "[CELEBORN-1376] Push data failed should always release request body"
This reverts commit b65b5433dc.
<!--
Thanks for sending a pull request! Here are some tips for you:
- Make sure the PR title start w/ a JIRA ticket, e.g. '[CELEBORN-XXXX] Your PR title ...'.
- Be sure to keep the PR description updated to reflect all changes.
- Please write your PR title to summarize what this PR proposes.
- If possible, provide a concise example to reproduce the issue for a faster review.
-->
### What changes were proposed in this pull request?
Revert [CELEBORN-1376](https://github.com/apache/celeborn/pull/2449)
This pr will introduce reference count error when replica enable and workers randomly terminate
### Why are the changes needed?
When data replication is enabled and workers are randomly terminated there will be IllegalReferenceCountException `refCnt: 0, decrement: 1` which will fail the task.

### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
cluster testing.
Closes #2992 from zaynt4606/clbr1376.
Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
This commit is contained in:
parent
f7b036d4c7
commit
c316fdbdfb
@ -111,7 +111,7 @@ public class FlinkShuffleClientImplSuiteJ {
|
||||
@Test
|
||||
public void testPushDataByteBufFail() throws IOException {
|
||||
ByteBuf byteBuf = Unpooled.wrappedBuffer(TEST_BUF1);
|
||||
when(client.pushData(any(), anyLong(), any(), any(), any()))
|
||||
when(client.pushData(any(), anyLong(), any(), any()))
|
||||
.thenAnswer(
|
||||
t -> {
|
||||
RpcResponseCallback rpcResponseCallback = t.getArgument(1, RpcResponseCallback.class);
|
||||
|
||||
@ -216,23 +216,6 @@ public class TransportClient implements Closeable {
|
||||
long pushDataTimeout,
|
||||
RpcResponseCallback callback,
|
||||
Runnable rpcSendoutCallback) {
|
||||
Runnable rpcFailureCallback =
|
||||
() -> {
|
||||
try {
|
||||
pushData.body().release();
|
||||
} catch (Throwable e) {
|
||||
logger.error("Error release buffer for PUSH_DATA request {}", pushData.requestId, e);
|
||||
}
|
||||
};
|
||||
return pushData(pushData, pushDataTimeout, callback, rpcSendoutCallback, rpcFailureCallback);
|
||||
}
|
||||
|
||||
public ChannelFuture pushData(
|
||||
PushData pushData,
|
||||
long pushDataTimeout,
|
||||
RpcResponseCallback callback,
|
||||
Runnable rpcSendoutCallback,
|
||||
Runnable rpcFailureCallback) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Pushing data to {}", NettyUtils.getRemoteAddress(channel));
|
||||
}
|
||||
@ -242,8 +225,7 @@ public class TransportClient implements Closeable {
|
||||
PushRequestInfo info = new PushRequestInfo(dueTime, callback);
|
||||
handler.addPushRequest(requestId, info);
|
||||
pushData.requestId = requestId;
|
||||
PushChannelListener listener =
|
||||
new PushChannelListener(requestId, rpcSendoutCallback, rpcFailureCallback);
|
||||
PushChannelListener listener = new PushChannelListener(requestId, rpcSendoutCallback);
|
||||
ChannelFuture channelFuture = channel.writeAndFlush(pushData).addListener(listener);
|
||||
info.setChannelFuture(channelFuture);
|
||||
return channelFuture;
|
||||
@ -251,26 +233,6 @@ public class TransportClient implements Closeable {
|
||||
|
||||
public ChannelFuture pushMergedData(
|
||||
PushMergedData pushMergedData, long pushDataTimeout, RpcResponseCallback callback) {
|
||||
Runnable rpcFailureCallback =
|
||||
() -> {
|
||||
try {
|
||||
pushMergedData.body().release();
|
||||
} catch (Throwable e) {
|
||||
logger.error(
|
||||
"Error release buffer for PUSH_MERGED_DATA request {}",
|
||||
pushMergedData.requestId,
|
||||
e);
|
||||
}
|
||||
};
|
||||
return pushMergedData(pushMergedData, pushDataTimeout, callback, null, rpcFailureCallback);
|
||||
}
|
||||
|
||||
public ChannelFuture pushMergedData(
|
||||
PushMergedData pushMergedData,
|
||||
long pushDataTimeout,
|
||||
RpcResponseCallback callback,
|
||||
Runnable rpcSendoutCallback,
|
||||
Runnable rpcFailureCallback) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Pushing merged data to {}", NettyUtils.getRemoteAddress(channel));
|
||||
}
|
||||
@ -281,8 +243,7 @@ public class TransportClient implements Closeable {
|
||||
handler.addPushRequest(requestId, info);
|
||||
pushMergedData.requestId = requestId;
|
||||
|
||||
PushChannelListener listener =
|
||||
new PushChannelListener(requestId, rpcSendoutCallback, rpcFailureCallback);
|
||||
PushChannelListener listener = new PushChannelListener(requestId);
|
||||
ChannelFuture channelFuture = channel.writeAndFlush(pushMergedData).addListener(listener);
|
||||
info.setChannelFuture(channelFuture);
|
||||
return channelFuture;
|
||||
@ -498,18 +459,14 @@ public class TransportClient implements Closeable {
|
||||
final long pushRequestId;
|
||||
Runnable rpcSendOutCallback;
|
||||
|
||||
Runnable rpcFailureCallback;
|
||||
|
||||
PushChannelListener(long pushRequestId) {
|
||||
this(pushRequestId, null, null);
|
||||
this(pushRequestId, null);
|
||||
}
|
||||
|
||||
PushChannelListener(
|
||||
long pushRequestId, Runnable rpcSendOutCallback, Runnable rpcFailureCallback) {
|
||||
PushChannelListener(long pushRequestId, Runnable rpcSendOutCallback) {
|
||||
super("PUSH " + pushRequestId);
|
||||
this.pushRequestId = pushRequestId;
|
||||
this.rpcSendOutCallback = rpcSendOutCallback;
|
||||
this.rpcFailureCallback = rpcFailureCallback;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -523,9 +480,6 @@ public class TransportClient implements Closeable {
|
||||
@Override
|
||||
protected void handleFailure(String errorMsg, Throwable cause) {
|
||||
handler.handlePushFailure(pushRequestId, errorMsg, cause);
|
||||
if (rpcFailureCallback != null) {
|
||||
rpcFailureCallback.run();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user