diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index 54811357e..e825abce3 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -1261,4 +1261,10 @@ object Utils extends Logging { connectException || rpcTimeout || fetchChunkTimeout } + def makeOpenStreamRequestId( + shuffleKey: String, + clientChannelId: String, + rpcRequestId: Long): String = { + s"$shuffleKey-$clientChannelId-$rpcRequestId" + } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala index 568e0d193..a943de58a 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala @@ -355,7 +355,11 @@ class FetchHandler( callback: RpcResponseCallback): Unit = { checkAuth(client, Utils.splitShuffleKey(shuffleKey)._1) workerSource.recordAppActiveConnection(client, shuffleKey) - workerSource.startTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey) + val requestId = Utils.makeOpenStreamRequestId( + shuffleKey, + client.getChannel.id().toString, + rpcRequestId) + workerSource.startTimer(WorkerSource.OPEN_STREAM_TIME, requestId) try { val fileInfo = getRawFileInfo(shuffleKey, fileName) fileInfo.getFileMeta match { @@ -400,7 +404,7 @@ class FetchHandler( workerSource.incCounter(WorkerSource.OPEN_STREAM_FAIL_COUNT) handleRpcIOException(client, rpcRequestId, shuffleKey, fileName, e, callback) } finally { - workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey) + workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, requestId) } }