From 594feab27901bb129b47fff95fc99e810862315a Mon Sep 17 00:00:00 2001 From: Ethan Feng Date: Tue, 2 Aug 2022 18:16:26 +0800 Subject: [PATCH] fix unexpected index update. (#295) --- .../service/deploy/worker/LocalStorageManager.scala | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala index e1f6bcb8a..89e410d7a 100644 --- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala +++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala @@ -56,7 +56,7 @@ private[worker] final class DiskFlusher( private val workingQueues = new Array[LinkedBlockingQueue[FlushTask]](threadCount) private val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf](queueCapacity) private val workers = new Array[Thread](threadCount) - private val nextWorkerIndex = new AtomicInteger() + private var nextWorkerIndex: Int = 0 @volatile private var lastBeginFlushTime: Long = -1 @@ -111,12 +111,9 @@ private[worker] final class DiskFlusher( deviceMonitor.registerDiskFlusher(this) } - def getWorkerIndex: Int = { - val nextIndex = nextWorkerIndex.getAndIncrement() - if (nextIndex > threadCount) { - nextWorkerIndex.set(0) - } - nextIndex % threadCount + def getWorkerIndex: Int = this.synchronized { + nextWorkerIndex = (nextWorkerIndex + 1) % threadCount + nextWorkerIndex } def takeBuffer(timeoutMs: Long): CompositeByteBuf = {