From bbd3bb4814a7505d6b7499ab1029997958564705 Mon Sep 17 00:00:00 2001 From: Xianming Lei Date: Wed, 11 Jun 2025 10:54:24 -0700 Subject: [PATCH] [CELEBORN-2033] updateProduceBytes should be called even if updateProduceBytes throws exception MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? updateProduceBytes should be called even if updateProduceBytes throws exception ### Why are the changes needed? To make UserProduceSpeed ​​metrics more accurate ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs. Closes #3322 from leixm/CELEBORN-2033. Authored-by: Xianming Lei Signed-off-by: Wang, Fei --- .../celeborn/service/deploy/worker/storage/TierWriter.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala index 278b6f945..00dd90bd5 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala @@ -412,6 +412,8 @@ class LocalTierWriter( override def writeInternal(buf: ByteBuf): Unit = { val numBytes = buf.readableBytes() + if (userCongestionControlContext != null) + userCongestionControlContext.updateProduceBytes(numBytes) val flushBufferReadableBytes = flushBuffer.readableBytes if (flushBufferReadableBytes != 0 && flushBufferReadableBytes + numBytes >= flusherBufferSize) { flush(false) @@ -422,13 +424,9 @@ class LocalTierWriter( } catch { case oom: OutOfMemoryError => MemoryManager.instance.incrementDiskBuffer(numBytes) - if (userCongestionControlContext != null) - userCongestionControlContext.updateProduceBytes(numBytes) throw oom } MemoryManager.instance.incrementDiskBuffer(numBytes) - if (userCongestionControlContext != null) - userCongestionControlContext.updateProduceBytes(numBytes) } override def evict(file: TierWriterBase): Unit = ???