fix unexpected index update. (#295)

This commit is contained in:
Ethan Feng 2022-08-02 18:16:26 +08:00 committed by GitHub
parent e57ad27887
commit 594feab279
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -56,7 +56,7 @@ private[worker] final class DiskFlusher(
private val workingQueues = new Array[LinkedBlockingQueue[FlushTask]](threadCount)
private val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf](queueCapacity)
private val workers = new Array[Thread](threadCount)
private val nextWorkerIndex = new AtomicInteger()
private var nextWorkerIndex: Int = 0
@volatile
private var lastBeginFlushTime: Long = -1
@ -111,12 +111,9 @@ private[worker] final class DiskFlusher(
deviceMonitor.registerDiskFlusher(this)
}
def getWorkerIndex: Int = {
val nextIndex = nextWorkerIndex.getAndIncrement()
if (nextIndex > threadCount) {
nextWorkerIndex.set(0)
}
nextIndex % threadCount
def getWorkerIndex: Int = this.synchronized {
nextWorkerIndex = (nextWorkerIndex + 1) % threadCount
nextWorkerIndex
}
def takeBuffer(timeoutMs: Long): CompositeByteBuf = {