From f09482108fe91f986b9cfceebd4b050379bf6f64 Mon Sep 17 00:00:00 2001 From: codenohup Date: Fri, 21 Feb 2025 13:50:54 +0800 Subject: [PATCH] [CELEBORN-1867][FLINK] Fix flink client memory leak of TransportResponseHandler#outstandingRpcs for handling addCredit and notifyRequiredSegment response ### What changes were proposed in this pull request? When I tested the performance of Flink with Celeborn in session mode, using both the shuffle-service plugin and hybrid shuffle integration strategies, I noticed that the task heap continuously grew even when no jobs were running. The issue arises because the Celeborn client sends addCredit or notifyRequiredSegment requests, expecting a response. This creates a callback and maintains a reference to CelebornBufferStream, SingleInputGate, and StreamTask. These callbacks are stored in TransportResponseHandler#outstandingRpcs and are cleared upon receiving a response. However, the Worker does not send a response for these two RPC requests, which leads to a significant memory leak in the client. ### Why are the changes needed? It may cause a Flink client memory leak. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Tested by rerunning TPC-DS in Flink session mode. Closes #3103 from codenohup/celeborn-1867. 
Authored-by: codenohup Signed-off-by: Shuang --- .../service/deploy/worker/FetchHandler.scala | 30 +++++++++++++++---- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala index 3c99639eb..c58c535fc 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala @@ -32,7 +32,7 @@ import org.apache.celeborn.common.CelebornConf.MAX_CHUNKS_BEING_TRANSFERRED import org.apache.celeborn.common.exception.CelebornIOException import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MapFileMeta, MemoryFileInfo, ReduceFileMeta} -import org.apache.celeborn.common.network.buffer.{FileChunkBuffers, MemoryChunkBuffers, NioManagedBuffer} +import org.apache.celeborn.common.network.buffer.{FileChunkBuffers, MemoryChunkBuffers, NettyManagedBuffer, NioManagedBuffer} import org.apache.celeborn.common.network.client.{RpcResponseCallback, TransportClient} import org.apache.celeborn.common.network.protocol._ import org.apache.celeborn.common.network.server.BaseMessageHandler @@ -103,7 +103,7 @@ class FetchHandler( case r: BufferStreamEnd => handleEndStreamFromClient(client, r.getStreamId) case r: ReadAddCredit => - handleReadAddCredit(client, r.getCredit, r.getStreamId) + handleReadAddCredit(client, r.getCredit, r.getStreamId, -1) case r: ChunkFetchRequest => handleChunkFetchRequest(client, r.streamChunkSlice, r) case unknown: RequestMessage => @@ -170,13 +170,18 @@ class FetchHandler( bufferStreamEnd.getStreamId, bufferStreamEnd.getStreamType) case readAddCredit: PbReadAddCredit => - handleReadAddCredit(client, readAddCredit.getCredit, readAddCredit.getStreamId) + handleReadAddCredit( + client, + readAddCredit.getCredit, + 
readAddCredit.getStreamId, + rpcRequest.requestId) case notifyRequiredSegment: PbNotifyRequiredSegment => handleNotifyRequiredSegment( client, notifyRequiredSegment.getRequiredSegmentId, notifyRequiredSegment.getStreamId, - notifyRequiredSegment.getSubPartitionId) + notifyRequiredSegment.getSubPartitionId, + rpcRequest.requestId) case chunkFetchRequest: PbChunkFetchRequest => handleChunkFetchRequest( client, @@ -480,13 +485,22 @@ class FetchHandler( } } - def handleReadAddCredit(client: TransportClient, credit: Int, streamId: Long): Unit = { + def handleReadAddCredit( + client: TransportClient, + credit: Int, + streamId: Long, + requestId: Long): Unit = { val shuffleKey = creditStreamManager.getStreamShuffleKey(streamId) if (shuffleKey != null) { workerSource.recordAppActiveConnection( client, shuffleKey) creditStreamManager.addCredit(credit, streamId) + if (requestId != -1) { + client.getChannel.writeAndFlush(new RpcResponse( + requestId, + NettyManagedBuffer.EmptyBuffer)) + } } } @@ -494,7 +508,8 @@ class FetchHandler( client: TransportClient, requiredSegmentId: Int, streamId: Long, - subPartitionId: Int): Unit = { + subPartitionId: Int, + requestId: Long): Unit = { // process NotifyRequiredSegment request here, the MapPartitionDataReader will send data if the segment buffer is available. logDebug( s"Handle NotifyRequiredSegment with streamId: $streamId, requiredSegmentId: $requiredSegmentId") @@ -504,6 +519,9 @@ class FetchHandler( client, shuffleKey) creditStreamManager.notifyRequiredSegment(requiredSegmentId, streamId, subPartitionId) + client.getChannel.writeAndFlush(new RpcResponse( + requestId, + NettyManagedBuffer.EmptyBuffer)) } }