[CELEBORN-2033] updateProduceBytes should be called even if updateProduceBytes throws exception
### What changes were proposed in this pull request? updateProduceBytes should be called even if updateProduceBytes throws exception ### Why are the changes needed? To make UserProduceSpeed metrics more accurate ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs. Closes #3322 from leixm/CELEBORN-2033. Authored-by: Xianming Lei <xianming.lei@shopee.com> Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
parent
edeeb4b30a
commit
bbd3bb4814
@ -412,6 +412,8 @@ class LocalTierWriter(
|
||||
|
||||
override def writeInternal(buf: ByteBuf): Unit = {
|
||||
val numBytes = buf.readableBytes()
|
||||
if (userCongestionControlContext != null)
|
||||
userCongestionControlContext.updateProduceBytes(numBytes)
|
||||
val flushBufferReadableBytes = flushBuffer.readableBytes
|
||||
if (flushBufferReadableBytes != 0 && flushBufferReadableBytes + numBytes >= flusherBufferSize) {
|
||||
flush(false)
|
||||
@ -422,13 +424,9 @@ class LocalTierWriter(
|
||||
} catch {
|
||||
case oom: OutOfMemoryError =>
|
||||
MemoryManager.instance.incrementDiskBuffer(numBytes)
|
||||
if (userCongestionControlContext != null)
|
||||
userCongestionControlContext.updateProduceBytes(numBytes)
|
||||
throw oom
|
||||
}
|
||||
MemoryManager.instance.incrementDiskBuffer(numBytes)
|
||||
if (userCongestionControlContext != null)
|
||||
userCongestionControlContext.updateProduceBytes(numBytes)
|
||||
}
|
||||
|
||||
override def evict(file: TierWriterBase): Unit = ???
|
||||
|
||||
Loading…
Reference in New Issue
Block a user