[ISSUE-484][FEATURE] Add Worker related RPC metrics (#488)

This commit is contained in:
AngersZhuuuu 2022-09-01 16:47:31 +08:00 committed by GitHub
parent 56e46763d9
commit 87f529da35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 156 additions and 37 deletions

View File

@ -85,6 +85,17 @@ Here is an example of grafana dashboard importing.
| DiskBuffer | worker | Disk buffers are part of netty used memory, means data need to write to disk but haven't been written to disk. |
| PausePushData | worker | PausePushData means the number of times the worker stopped receiving data from clients. |
| PausePushDataAndReplicate | worker | PausePushDataAndReplicate means the number of times the worker stopped receiving data from clients and other workers. |
| RPCReserveSlotsNum | worker | The count of the RPC `ReserveSlots` received by the worker. |
| RPCReserveSlotsSize | worker | The size of the RPC `ReserveSlots`'s body received by the worker. |
| RPCPushDataNum | worker | The count of the RPC `PushData` received by the worker. |
| RPCPushDataSize | worker | The size of the RPC `PushData`'s body received by the worker. |
| RPCPushMergedDataNum | worker | The count of the RPC `PushMergedData` received by the worker. |
| RPCPushMergedDataSize | worker | The size of the RPC `PushMergedData`'s body received by the worker. |
| RPCCommitFilesNum | worker | The count of the RPC `CommitFiles` received by the worker. |
| RPCCommitFilesSize | worker | The size of the RPC `CommitFiles`'s body received by the worker. |
| RPCDestroyNum | worker | The count of the RPC `Destroy` received by the worker. |
| RPCDestroySize | worker | The size of the RPC `Destroy`'s body received by the worker. |
| RPCChunkFetchRequestNum | worker | The count of the RPC `ChunkFetchRequest` received by the worker. |
## Implementation

View File

@ -759,7 +759,7 @@ public class ShuffleClientImpl extends ShuffleClient {
dataClientFactory.createClient(host, port);
client.pushMergedData(mergedData, wrappedCallback);
} catch (Exception e) {
logger.warn("PushMergeData failed", e);
logger.warn("PushMergedData failed", e);
wrappedCallback.onFailure(
new Exception(getPushDataFailCause(e.getMessage()).toString(), e));
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.aliyun.emr.rss.common.metrics.source
import com.aliyun.emr.rss.common.RssConf
import com.aliyun.emr.rss.common.internal.Logging
import com.aliyun.emr.rss.common.metrics.MetricsSystem
import com.aliyun.emr.rss.common.network.protocol.{ChunkFetchRequest, PushData, PushMergedData}
import com.aliyun.emr.rss.common.protocol.message.ControlMessages.{CommitFiles, Destroy, ReserveSlots}
/**
 * Metrics source that tracks the count and body size of worker-side RPCs
 * (`ReserveSlots`, `CommitFiles`, `Destroy`, `PushData`, `PushMergedData`,
 * `ChunkFetchRequest`). Counters are registered eagerly so every metric is
 * exported even before the first message of that type arrives.
 */
class RPCSource(rssConf: RssConf)
  extends AbstractSource(rssConf, MetricsSystem.ROLE_WOKRER) with Logging {
  override val sourceName = "rpc"

  import RPCSource._

  // Register all RPC counters up front.
  Seq(
    RPCReserveSlotsNum, RPCReserveSlotsSize,
    RPCCommitFilesNum, RPCCommitFilesSize,
    RPCDestroyNum, RPCDestroySize,
    RPCPushDataNum, RPCPushDataSize,
    RPCPushMergedDataNum, RPCPushMergedDataSize,
    RPCChunkFetchRequestNum).foreach(addCounter)

  // Bump the message-count counter and add `messageLen` to the size counter.
  private def recordMessage(numMetric: String, sizeMetric: String, messageLen: Long): Unit = {
    incCounter(numMetric)
    incCounter(sizeMetric, messageLen)
  }

  /**
   * Update the RPC metrics for one incoming message.
   *
   * @param message    the decoded RPC message; unrecognized types are ignored.
   * @param messageLen body length in bytes, folded into the `*Size` counter.
   */
  def updateMessageMetrics(message: Any, messageLen: Long): Unit = {
    message match {
      case _: ReserveSlots =>
        recordMessage(RPCReserveSlotsNum, RPCReserveSlotsSize, messageLen)
      case _: CommitFiles =>
        recordMessage(RPCCommitFilesNum, RPCCommitFilesSize, messageLen)
      case _: Destroy =>
        recordMessage(RPCDestroyNum, RPCDestroySize, messageLen)
      case _: PushData =>
        recordMessage(RPCPushDataNum, RPCPushDataSize, messageLen)
      case _: PushMergedData =>
        recordMessage(RPCPushMergedDataNum, RPCPushMergedDataSize, messageLen)
      case _: ChunkFetchRequest =>
        // Only the count is tracked for chunk fetches; no size counter exists.
        incCounter(RPCChunkFetchRequestNum)
      case _ => // Intentionally ignore message types we do not track.
    }
  }
}
object RPCSource {
  // Metric name constants exposed by RPCSource. Each tracked RPC has a
  // `...Num` (message count) counter; all except ChunkFetchRequest also
  // have a `...Size` (body bytes) counter. These names are the public
  // metric identifiers — renaming them would break dashboards/alerts.
  // RPC
  val RPCReserveSlotsNum = "RPCReserveSlotsNum"
  val RPCReserveSlotsSize = "RPCReserveSlotsSize"
  val RPCCommitFilesNum = "RPCCommitFilesNum"
  val RPCCommitFilesSize = "RPCCommitFilesSize"
  val RPCDestroyNum = "RPCDestroyNum"
  val RPCDestroySize = "RPCDestroySize"
  val RPCPushDataNum = "RPCPushDataNum"
  val RPCPushDataSize = "RPCPushDataSize"
  val RPCPushMergedDataNum = "RPCPushMergedDataNum"
  val RPCPushMergedDataSize = "RPCPushMergedDataSize"
  // ChunkFetchRequest has a count only — no corresponding size metric.
  val RPCChunkFetchRequestNum = "RPCChunkFetchRequestNum"
}

View File

@ -18,11 +18,11 @@
package com.aliyun.emr.rss.common.rpc
import java.io.File
import java.nio.channels.ReadableByteChannel
import scala.concurrent.Future
import com.aliyun.emr.rss.common.RssConf
import com.aliyun.emr.rss.common.metrics.source.RPCSource
import com.aliyun.emr.rss.common.rpc.netty.NettyRpcEnvFactory
import com.aliyun.emr.rss.common.util.RpcUtils
@ -41,12 +41,12 @@ object RpcEnv {
}
def create(
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
conf: RssConf,
numUsableCores: Int): RpcEnv = {
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
conf: RssConf,
numUsableCores: Int): RpcEnv = {
val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port,
numUsableCores)
new NettyRpcEnvFactory().create(config)
@ -81,7 +81,10 @@ abstract class RpcEnv(conf: RssConf) {
* Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not
* guarantee thread-safety.
*/
def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
def setupEndpoint(
name: String,
endpoint: RpcEndpoint,
source: Option[RPCSource] = None): RpcEndpointRef
/**
* Retrieve the [[RpcEndpointRef]] represented by `uri` asynchronously.
@ -173,9 +176,9 @@ private[rss] trait RpcEnvFileServer {
}
private[rss] case class RpcEnvConfig(
conf: RssConf,
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
numUsableCores: Int)
conf: RssConf,
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
numUsableCores: Int)

View File

@ -33,6 +33,7 @@ import com.google.common.base.Throwables
import com.aliyun.emr.rss.common.RssConf
import com.aliyun.emr.rss.common.internal.Logging
import com.aliyun.emr.rss.common.metrics.source.RPCSource
import com.aliyun.emr.rss.common.network.TransportContext
import com.aliyun.emr.rss.common.network.buffer.NioManagedBuffer
import com.aliyun.emr.rss.common.network.client._
@ -58,6 +59,7 @@ class NettyRpcEnv(
private var worker: RpcEndpoint = null
var source: Option[RPCSource] = None
private val transportContext = new TransportContext(transportConf,
new NettyRpcHandler(dispatcher, this))
@ -104,10 +106,14 @@ class NettyRpcEnv(
if (server != null) RpcAddress(host, server.getPort()) else null
}
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
override def setupEndpoint(
name: String,
endpoint: RpcEndpoint,
abstractSource: Option[RPCSource] = None): RpcEndpointRef = {
if (name == RpcNameConstants.WORKER_EP) {
worker = endpoint
}
source = abstractSource
dispatcher.registerRpcEndpoint(name, endpoint)
}
@ -572,8 +578,10 @@ private[rss] class NettyRpcHandler(
private def internalReceive(client: TransportClient, message: ByteBuffer): RequestMessage = {
val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
assert(addr != null)
val messageLen = message.remaining()
val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
val requestMessage = RequestMessage(nettyEnv, client, message)
nettyEnv.source.foreach(_.updateMessageMetrics(requestMessage.content, messageLen))
if (requestMessage.senderAddress == null) {
// Create a new message with the socket address of the client as the sender.
new RequestMessage(clientAddr, requestMessage.receiver, requestMessage.content)

View File

@ -30,6 +30,7 @@ import io.netty.util.concurrent.{Future, GenericFutureListener}
import com.aliyun.emr.rss.common.exception.RssException
import com.aliyun.emr.rss.common.internal.Logging
import com.aliyun.emr.rss.common.meta.{FileInfo, FileManagedBuffers}
import com.aliyun.emr.rss.common.metrics.source.RPCSource
import com.aliyun.emr.rss.common.network.buffer.NioManagedBuffer
import com.aliyun.emr.rss.common.network.client.TransportClient
import com.aliyun.emr.rss.common.network.protocol._
@ -42,13 +43,15 @@ import com.aliyun.emr.rss.service.deploy.worker.storage.{PartitionFilesSorter, S
class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logging {
var streamManager = new OneForOneStreamManager()
var source: WorkerSource = _
var workerSource: WorkerSource = _
var rpcSource: RPCSource = _
var storageManager: StorageManager = _
var partitionsSorter: PartitionFilesSorter = _
var registered: AtomicBoolean = _
def init(worker: Worker): Unit = {
this.source = worker.workerSource
this.workerSource = worker.workerSource
this.rpcSource = worker.rpcSource
this.storageManager = worker.storageManager
this.partitionsSorter = worker.partitionsSorter
this.registered = worker.registered
@ -72,6 +75,7 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg
override def receive(client: TransportClient, msg: RequestMessage): Unit = {
msg match {
case r: ChunkFetchRequest =>
rpcSource.updateMessageMetrics(r, 0)
handleChunkFetchRequest(client, r)
case r: RpcRequest =>
handleOpenStream(client, r)
@ -86,7 +90,7 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg
val startMapIndex = openBlocks.startMapIndex
val endMapIndex = openBlocks.endMapIndex
// metrics start
source.startTimer(WorkerSource.OpenStreamTime, shuffleKey)
workerSource.startTimer(WorkerSource.OpenStreamTime, shuffleKey)
val fileInfo = openStream(shuffleKey, fileName, startMapIndex, endMapIndex)
if (fileInfo != null) {
@ -114,18 +118,18 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg
Throwables.getStackTraceAsString(new RssException("Chunk offsets meta exception ", e))))
} finally {
// metrics end
source.stopTimer(WorkerSource.OpenStreamTime, shuffleKey)
workerSource.stopTimer(WorkerSource.OpenStreamTime, shuffleKey)
request.body().release()
}
} else {
source.stopTimer(WorkerSource.OpenStreamTime, shuffleKey)
workerSource.stopTimer(WorkerSource.OpenStreamTime, shuffleKey)
client.getChannel.writeAndFlush(new RpcFailure(request.requestId,
Throwables.getStackTraceAsString(new FileNotFoundException)))
}
}
def handleChunkFetchRequest(client: TransportClient, req: ChunkFetchRequest): Unit = {
source.startTimer(WorkerSource.FetchChunkTime, req.toString)
workerSource.startTimer(WorkerSource.FetchChunkTime, req.toString)
logTrace(s"Received req from ${NettyUtils.getRemoteAddress(client.getChannel)}" +
s" to fetch block ${req.streamChunkSlice}")
@ -133,7 +137,7 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg
if (chunksBeingTransferred >= conf.maxChunksBeingTransferred) {
logError(s"The number of chunks being transferred $chunksBeingTransferred" +
s"is above ${conf.maxChunksBeingTransferred()}.")
source.stopTimer(WorkerSource.FetchChunkTime, req.toString)
workerSource.stopTimer(WorkerSource.FetchChunkTime, req.toString)
} else {
try {
val buf = streamManager.getChunk(req.streamChunkSlice.streamId,
@ -143,7 +147,7 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg
.addListener(new GenericFutureListener[Future[_ >: Void]] {
override def operationComplete(future: Future[_ >: Void]): Unit = {
streamManager.chunkSent(req.streamChunkSlice.streamId)
source.stopTimer(WorkerSource.FetchChunkTime, req.toString)
workerSource.stopTimer(WorkerSource.FetchChunkTime, req.toString)
}
})
} catch {
@ -152,7 +156,7 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg
s" ${NettyUtils.getRemoteAddress(client.getChannel)}"), e)
client.getChannel.writeAndFlush(new ChunkFetchFailure(req.streamChunkSlice,
Throwables.getStackTraceAsString(e)))
source.stopTimer(WorkerSource.FetchChunkTime, req.toString)
workerSource.stopTimer(WorkerSource.FetchChunkTime, req.toString)
}
}
}

View File

@ -28,6 +28,7 @@ import com.aliyun.emr.rss.common.RssConf
import com.aliyun.emr.rss.common.exception.AlreadyClosedException
import com.aliyun.emr.rss.common.internal.Logging
import com.aliyun.emr.rss.common.meta.{PartitionLocationInfo, WorkerInfo}
import com.aliyun.emr.rss.common.metrics.source.RPCSource
import com.aliyun.emr.rss.common.network.buffer.{NettyManagedBuffer, NioManagedBuffer}
import com.aliyun.emr.rss.common.network.client.{RpcResponseCallback, TransportClient, TransportClientFactory}
import com.aliyun.emr.rss.common.network.protocol.{PushData, PushMergedData, RequestMessage, RpcFailure, RpcResponse}
@ -40,6 +41,7 @@ import com.aliyun.emr.rss.service.deploy.worker.storage.{FileWriter, LocalFlushe
class PushDataHandler extends BaseMessageHandler with Logging {
var workerSource: WorkerSource = _
var rpcSource: RPCSource = _
var partitionLocationInfo: PartitionLocationInfo = _
var shuffleMapperAttempts: ConcurrentHashMap[String, Array[Int]] = _
var replicateThreadPool: ThreadPoolExecutor = _
@ -52,6 +54,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
def init(worker: Worker): Unit = {
workerSource = worker.workerSource
rpcSource = worker.rpcSource
partitionLocationInfo = worker.partitionLocationInfo
shuffleMapperAttempts = worker.shuffleMapperAttempts
replicateThreadPool = worker.replicateThreadPool
@ -69,6 +72,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
msg match {
case pushData: PushData =>
try {
rpcSource.updateMessageMetrics(pushData, pushData.body().size())
handlePushData(pushData, new RpcResponseCallback {
override def onSuccess(response: ByteBuffer): Unit = {
client.getChannel.writeAndFlush(new RpcResponse(
@ -91,6 +95,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
}
case pushMergedData: PushMergedData =>
try {
rpcSource.updateMessageMetrics(pushMergedData, pushMergedData.body().size())
handlePushMergedData(pushMergedData, new RpcResponseCallback {
override def onSuccess(response: ByteBuffer): Unit = {
client.getChannel.writeAndFlush(new RpcResponse(
@ -258,8 +263,8 @@ class PushDataHandler extends BaseMessageHandler with Logging {
}
def handlePushMergedData(
pushMergedData: PushMergedData,
callback: RpcResponseCallback): Unit = {
pushMergedData: PushMergedData,
callback: RpcResponseCallback): Unit = {
val shuffleKey = pushMergedData.shuffleKey
val mode = PartitionLocation.getMode(pushMergedData.mode)
val batchOffsets = pushMergedData.batchOffsets

View File

@ -32,7 +32,7 @@ import com.aliyun.emr.rss.common.haclient.RssHARetryClient
import com.aliyun.emr.rss.common.internal.Logging
import com.aliyun.emr.rss.common.meta.{DiskInfo, PartitionLocationInfo, WorkerInfo}
import com.aliyun.emr.rss.common.metrics.MetricsSystem
import com.aliyun.emr.rss.common.metrics.source.{JVMCPUSource, JVMSource}
import com.aliyun.emr.rss.common.metrics.source.{JVMCPUSource, JVMSource, RPCSource}
import com.aliyun.emr.rss.common.network.TransportContext
import com.aliyun.emr.rss.common.network.server.{ChannelsLimiter, MemoryTracker}
import com.aliyun.emr.rss.common.protocol.{RpcNameConstants, TransportModuleConstants}
@ -69,13 +69,12 @@ private[deploy] class Worker(
"If enable graceful shutdown, the worker should use stable server port.")
val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, WorkerSource.ServletPath)
val workerSource = {
val source = new WorkerSource(conf)
metricsSystem.registerSource(source)
metricsSystem.registerSource(new JVMSource(conf, MetricsSystem.ROLE_WOKRER))
metricsSystem.registerSource(new JVMCPUSource(conf, MetricsSystem.ROLE_WOKRER))
source
}
val rpcSource = new RPCSource(conf)
val workerSource = new WorkerSource(conf)
metricsSystem.registerSource(workerSource)
metricsSystem.registerSource(rpcSource)
metricsSystem.registerSource(new JVMSource(conf, MetricsSystem.ROLE_WOKRER))
metricsSystem.registerSource(new JVMCPUSource(conf, MetricsSystem.ROLE_WOKRER))
val storageManager = new StorageManager(conf, workerSource)
@ -91,7 +90,7 @@ private[deploy] class Worker(
val partitionsSorter = new PartitionFilesSorter(memoryTracker, conf, workerSource)
var controller = new Controller(rpcEnv, conf, metricsSystem)
rpcEnv.setupEndpoint(RpcNameConstants.WORKER_EP, controller)
rpcEnv.setupEndpoint(RpcNameConstants.WORKER_EP, controller, Some(rpcSource))
val pushDataHandler = new PushDataHandler()
val (pushServer, pushClientFactory) = {

View File

@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory;
import com.aliyun.emr.rss.common.RssConf;
import com.aliyun.emr.rss.common.meta.FileInfo;
import com.aliyun.emr.rss.common.metrics.source.RPCSource;
import com.aliyun.emr.rss.common.network.TransportContext;
import com.aliyun.emr.rss.common.network.buffer.ManagedBuffer;
import com.aliyun.emr.rss.common.network.client.ChunkReceivedCallback;
@ -140,10 +141,15 @@ public class FileWriterSuiteJ {
}
@Override
public WorkerSource source() {
public WorkerSource workerSource() {
return source;
}
@Override
public RPCSource rpcSource() {
return new RPCSource(RSS_CONF);
}
@Override
public boolean checkRegistered() {
return true;