[CELEBORN-1491] introduce flusher working queue size metric

### What changes were proposed in this pull request?
Add metrics about flusher working queue size.

### Why are the changes needed?
To show whether flush tasks are accumulating in the flusher working queues.

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
GA.

Closes #2598 from FMX/b1491.

Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
This commit is contained in:
mingji 2024-07-05 09:55:02 +08:00 committed by Shuang
parent c90a1647af
commit cb6e2202ae
3 changed files with 272 additions and 503 deletions

File diff suppressed because it is too large. [Load Diff]

View File

@@ -168,6 +168,7 @@ object WorkerSource {
val TAKE_BUFFER_TIME = "TakeBufferTime"
val FLUSH_DATA_TIME = "FlushDataTime"
val COMMIT_FILES_TIME = "CommitFilesTime"
val FLUSH_WORKING_QUEUE_SIZE = "FlushWorkingQueueSize"
// slots
val SLOTS_ALLOCATED = "SlotsAllocated"

View File

@@ -33,6 +33,7 @@ import org.apache.celeborn.common.metrics.source.{AbstractSource, ThreadPoolSour
import org.apache.celeborn.common.protocol.StorageInfo
import org.apache.celeborn.common.util.{ThreadUtils, Utils}
import org.apache.celeborn.service.deploy.worker.WorkerSource
import org.apache.celeborn.service.deploy.worker.WorkerSource.FLUSH_WORKING_QUEUE_SIZE
import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
@@ -41,7 +42,8 @@ abstract private[worker] class Flusher(
val threadCount: Int,
val allocator: PooledByteBufAllocator,
val maxComponents: Int,
flushTimeMetric: TimeWindow) extends Logging {
flushTimeMetric: TimeWindow,
mountPoint: String) extends Logging {
protected lazy val flusherId: Int = System.identityHashCode(this)
protected val workingQueues = new Array[LinkedBlockingQueue[FlushTask]](threadCount)
protected val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf]()
@@ -95,6 +97,10 @@ abstract private[worker] class Flusher(
}
}
})
workerSource.addGauge(FLUSH_WORKING_QUEUE_SIZE, Map("mountpoint" -> s"$mountPoint-$index")) {
() =>
workingQueues(index).size()
}
}
ThreadPoolSource.registerSource(s"$this", workers)
}
@@ -147,7 +153,8 @@ private[worker] class LocalFlusher(
threadCount,
allocator,
maxComponents,
timeWindow)
timeWindow,
mountPoint)
with DeviceObserver with Logging {
deviceMonitor.registerFlusher(this)
@@ -182,7 +189,8 @@ final private[worker] class HdfsFlusher(
hdfsFlusherThreads,
allocator,
maxComponents,
null) with Logging {
null,
"HDFS") with Logging {
override def processIOException(e: IOException, deviceErrorType: DiskStatus): Unit = {
logError(s"$this write failed, reason $deviceErrorType ,exception: $e")