From d03efcbdb32cb8acac19108beb6e73f7f9794aea Mon Sep 17 00:00:00 2001 From: Xianming Lei Date: Thu, 15 May 2025 01:52:14 -0700 Subject: [PATCH] [CELEBORN-1999] OpenStreamTime should use requestId to record cost time ### What changes were proposed in this pull request? The OpenStreamTime timer is now keyed by a per-request id (shuffleKey, client channel id and RPC request id joined with "-") instead of by shuffleKey alone. ### Why are the changes needed? The OpenStreamTime metric is wrong when it is keyed by shuffleKey, because multiple OpenStream requests for the same shuffleKey can be in progress during the same time period and therefore share one timer key. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs. Closes #3258 from leixm/CELEBORN-1999. Authored-by: Xianming Lei Signed-off-by: Wang, Fei --- .../scala/org/apache/celeborn/common/util/Utils.scala | 6 ++++++ .../celeborn/service/deploy/worker/FetchHandler.scala | 8 ++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index 54811357e..e825abce3 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -1261,4 +1261,10 @@ object Utils extends Logging { connectException || rpcTimeout || fetchChunkTimeout } + def makeOpenStreamRequestId( + shuffleKey: String, + clientChannelId: String, + rpcRequestId: Long): String = { + s"$shuffleKey-$clientChannelId-$rpcRequestId" + } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala index 568e0d193..a943de58a 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala @@ -355,7 +355,11 @@ class FetchHandler( callback: RpcResponseCallback): Unit = { checkAuth(client, 
Utils.splitShuffleKey(shuffleKey)._1) workerSource.recordAppActiveConnection(client, shuffleKey) - workerSource.startTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey) + val requestId = Utils.makeOpenStreamRequestId( + shuffleKey, + client.getChannel.id().toString, + rpcRequestId) + workerSource.startTimer(WorkerSource.OPEN_STREAM_TIME, requestId) try { val fileInfo = getRawFileInfo(shuffleKey, fileName) fileInfo.getFileMeta match { @@ -400,7 +404,7 @@ class FetchHandler( workerSource.incCounter(WorkerSource.OPEN_STREAM_FAIL_COUNT) handleRpcIOException(client, rpcRequestId, shuffleKey, fileName, e, callback) } finally { - workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey) + workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, requestId) } }