[CELEBORN-1867][FLINK] Fix flink client memory leak of TransportResponseHandler#outstandingRpcs for handling addCredit and notifyRequiredSegment response
### What changes were proposed in this pull request? When I tested the performance of Flink with Celeborn in session mode, using both the shuffle-service plugin and the hybrid shuffle integration strategies, I noticed that the task heap kept growing even when no jobs were running. The issue arises because the Celeborn client sends addCredit and notifyRequiredSegment requests as RPCs that expect a response. Each request creates a callback that holds references to CelebornBufferStream, SingleInputGate, and StreamTask. These callbacks are stored in TransportResponseHandler#outstandingRpcs and are only removed when a response is received. However, the Worker did not send a response for these two RPC requests, so the callbacks were never removed, which led to a significant memory leak in the client. This PR makes the Worker reply to both requests with an empty RpcResponse. ### Why are the changes needed? Without the fix, the Flink client leaks memory. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Tested by rerunning TPC-DS in Flink session mode. Closes #3103 from codenohup/celeborn-1867. Authored-by: codenohup <huangxu.walker@gmail.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
This commit is contained in:
parent
27c6605c4a
commit
f09482108f
@ -32,7 +32,7 @@ import org.apache.celeborn.common.CelebornConf.MAX_CHUNKS_BEING_TRANSFERRED
|
||||
import org.apache.celeborn.common.exception.CelebornIOException
|
||||
import org.apache.celeborn.common.internal.Logging
|
||||
import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MapFileMeta, MemoryFileInfo, ReduceFileMeta}
|
||||
import org.apache.celeborn.common.network.buffer.{FileChunkBuffers, MemoryChunkBuffers, NioManagedBuffer}
|
||||
import org.apache.celeborn.common.network.buffer.{FileChunkBuffers, MemoryChunkBuffers, NettyManagedBuffer, NioManagedBuffer}
|
||||
import org.apache.celeborn.common.network.client.{RpcResponseCallback, TransportClient}
|
||||
import org.apache.celeborn.common.network.protocol._
|
||||
import org.apache.celeborn.common.network.server.BaseMessageHandler
|
||||
@ -103,7 +103,7 @@ class FetchHandler(
|
||||
case r: BufferStreamEnd =>
|
||||
handleEndStreamFromClient(client, r.getStreamId)
|
||||
case r: ReadAddCredit =>
|
||||
handleReadAddCredit(client, r.getCredit, r.getStreamId)
|
||||
handleReadAddCredit(client, r.getCredit, r.getStreamId, -1)
|
||||
case r: ChunkFetchRequest =>
|
||||
handleChunkFetchRequest(client, r.streamChunkSlice, r)
|
||||
case unknown: RequestMessage =>
|
||||
@ -170,13 +170,18 @@ class FetchHandler(
|
||||
bufferStreamEnd.getStreamId,
|
||||
bufferStreamEnd.getStreamType)
|
||||
case readAddCredit: PbReadAddCredit =>
|
||||
handleReadAddCredit(client, readAddCredit.getCredit, readAddCredit.getStreamId)
|
||||
handleReadAddCredit(
|
||||
client,
|
||||
readAddCredit.getCredit,
|
||||
readAddCredit.getStreamId,
|
||||
rpcRequest.requestId)
|
||||
case notifyRequiredSegment: PbNotifyRequiredSegment =>
|
||||
handleNotifyRequiredSegment(
|
||||
client,
|
||||
notifyRequiredSegment.getRequiredSegmentId,
|
||||
notifyRequiredSegment.getStreamId,
|
||||
notifyRequiredSegment.getSubPartitionId)
|
||||
notifyRequiredSegment.getSubPartitionId,
|
||||
rpcRequest.requestId)
|
||||
case chunkFetchRequest: PbChunkFetchRequest =>
|
||||
handleChunkFetchRequest(
|
||||
client,
|
||||
@ -480,13 +485,22 @@ class FetchHandler(
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Adds read credit to a credit-based stream and acknowledges the request.
 *
 * The shuffle key for `streamId` is looked up through `creditStreamManager`. When one is
 * found, the client's connection is recorded as active for that shuffle and the credit is
 * handed to the stream manager. When none is found, the credit is dropped.
 *
 * @param client    transport client the request arrived on; the acknowledgement is written
 *                  to its channel
 * @param credit    number of credits to add to the stream
 * @param streamId  identifier of the stream receiving the credit
 * @param requestId id of the RPC request to acknowledge, or `-1` when the request was not
 *                  sent as an RPC and therefore must not be answered
 */
def handleReadAddCredit(
    client: TransportClient,
    credit: Int,
    streamId: Long,
    requestId: Long): Unit = {
  val shuffleKey = creditStreamManager.getStreamShuffleKey(streamId)
  if (shuffleKey != null) {
    workerSource.recordAppActiveConnection(
      client,
      shuffleKey)
    creditStreamManager.addCredit(credit, streamId)
  }
  // Acknowledge every RPC-style request, including those for a stream whose shuffle key is
  // no longer known. The caller keeps a callback registered per outstanding RPC until a
  // response arrives, so skipping the reply here would leave that callback (and whatever it
  // references) alive on the client for as long as the connection stays open.
  if (requestId != -1) {
    client.getChannel.writeAndFlush(new RpcResponse(
      requestId,
      NettyManagedBuffer.EmptyBuffer))
  }
}
|
||||
|
||||
@ -494,7 +508,8 @@ class FetchHandler(
|
||||
client: TransportClient,
|
||||
requiredSegmentId: Int,
|
||||
streamId: Long,
|
||||
subPartitionId: Int): Unit = {
|
||||
subPartitionId: Int,
|
||||
requestId: Long): Unit = {
|
||||
// process NotifyRequiredSegment request here, the MapPartitionDataReader will send data if the segment buffer is available.
|
||||
logDebug(
|
||||
s"Handle NotifyRequiredSegment with streamId: $streamId, requiredSegmentId: $requiredSegmentId")
|
||||
@ -504,6 +519,9 @@ class FetchHandler(
|
||||
client,
|
||||
shuffleKey)
|
||||
creditStreamManager.notifyRequiredSegment(requiredSegmentId, streamId, subPartitionId)
|
||||
client.getChannel.writeAndFlush(new RpcResponse(
|
||||
requestId,
|
||||
NettyManagedBuffer.EmptyBuffer))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user