[CELEBORN-1999] OpenStreamTime should use requestId to record cost time

### What changes were proposed in this pull request?
OpenStreamTime should use requestId to record cost time instead of shuffleKey

### Why are the changes needed?
OpenStreamTime is wrong because there will be multiple OpenStream requests for the same shuffleKey in the same time period.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UTs.

Closes #3258 from leixm/CELEBORN-1999.

Authored-by: Xianming Lei <xianming.lei@shopee.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Xianming Lei 2025-05-15 01:52:14 -07:00 committed by Wang, Fei
parent 8e66ac833a
commit d03efcbdb3
2 changed files with 12 additions and 2 deletions

View File

@ -1261,4 +1261,10 @@ object Utils extends Logging {
connectException || rpcTimeout || fetchChunkTimeout
}
def makeOpenStreamRequestId(
shuffleKey: String,
clientChannelId: String,
rpcRequestId: Long): String = {
s"$shuffleKey-$clientChannelId-$rpcRequestId"
}
}

View File

@ -355,7 +355,11 @@ class FetchHandler(
callback: RpcResponseCallback): Unit = {
checkAuth(client, Utils.splitShuffleKey(shuffleKey)._1)
workerSource.recordAppActiveConnection(client, shuffleKey)
workerSource.startTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
val requestId = Utils.makeOpenStreamRequestId(
shuffleKey,
client.getChannel.id().toString,
rpcRequestId)
workerSource.startTimer(WorkerSource.OPEN_STREAM_TIME, requestId)
try {
val fileInfo = getRawFileInfo(shuffleKey, fileName)
fileInfo.getFileMeta match {
@ -400,7 +404,7 @@ class FetchHandler(
workerSource.incCounter(WorkerSource.OPEN_STREAM_FAIL_COUNT)
handleRpcIOException(client, rpcRequestId, shuffleKey, fileName, e, callback)
} finally {
workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, requestId)
}
}