diff --git a/METRICS.md b/METRICS.md index 675f728eb..5e3d732ff 100644 --- a/METRICS.md +++ b/METRICS.md @@ -85,6 +85,17 @@ Here is an example of grafana dashboard importing. | DiskBuffer | worker | Disk buffers are part of netty used memory, means data need to write to disk but haven't been written to disk. | | PausePushData | worker | PausePushData means the count of worker stopped receiving data from client. | | PausePushDataAndReplicate | worker | PausePushDataAndReplicate means the count of worker stopped receiving data from client and other workers. | +| RPCReserveSlotsNum | worker | The count of the RPC `ReserveSlots` received by the worker. | +| RPCReserveSlotsSize | worker | The size of the RPC `ReserveSlots`'s body received by the worker. | +| RPCPushDataNum | worker | The count of the RPC `PushData` received by the worker. | +| RPCPushDataSize | worker | The size of the RPC `PushData`'s body received by the worker. | +| RPCPushMergedDataNum | worker | The count of the RPC `PushMergedData` received by the worker. | +| RPCPushMergedDataSize | worker | The size of the RPC `PushMergedData`'s body received by the worker. | +| RPCCommitFilesNum | worker | The count of the RPC `CommitFiles` received by the worker. | +| RPCCommitFilesSize | worker | The size of the RPC `CommitFiles`'s body received by the worker. | +| RPCDestroyNum | worker | The count of the RPC `Destroy` received by the worker. | +| RPCDestroySize | worker | The size of the RPC `Destroy`'s body received by the worker. | +| RPCChunkFetchRequestNum | worker | The count of the RPC `ChunkFetchRequest` received by the worker. 
| ## Implementation diff --git a/client/src/main/java/com/aliyun/emr/rss/client/ShuffleClientImpl.java b/client/src/main/java/com/aliyun/emr/rss/client/ShuffleClientImpl.java index 913385223..ca5d2c1f4 100644 --- a/client/src/main/java/com/aliyun/emr/rss/client/ShuffleClientImpl.java +++ b/client/src/main/java/com/aliyun/emr/rss/client/ShuffleClientImpl.java @@ -759,7 +759,7 @@ public class ShuffleClientImpl extends ShuffleClient { dataClientFactory.createClient(host, port); client.pushMergedData(mergedData, wrappedCallback); } catch (Exception e) { - logger.warn("PushMergeData failed", e); + logger.warn("PushMergedData failed", e); wrappedCallback.onFailure( new Exception(getPushDataFailCause(e.getMessage()).toString(), e)); } diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/metrics/source/RPCSource.scala b/common/src/main/scala/com/aliyun/emr/rss/common/metrics/source/RPCSource.scala new file mode 100644 index 000000000..aab2c5510 --- /dev/null +++ b/common/src/main/scala/com/aliyun/emr/rss/common/metrics/source/RPCSource.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.aliyun.emr.rss.common.metrics.source + +import com.aliyun.emr.rss.common.RssConf +import com.aliyun.emr.rss.common.internal.Logging +import com.aliyun.emr.rss.common.metrics.MetricsSystem +import com.aliyun.emr.rss.common.network.protocol.{ChunkFetchRequest, PushData, PushMergedData} +import com.aliyun.emr.rss.common.protocol.message.ControlMessages.{CommitFiles, Destroy, ReserveSlots} + +class RPCSource(rssConf: RssConf) + extends AbstractSource(rssConf, MetricsSystem.ROLE_WOKRER) with Logging { + override val sourceName = "rpc" + + import RPCSource._ + + // RPC + addCounter(RPCReserveSlotsNum) + addCounter(RPCReserveSlotsSize) + addCounter(RPCCommitFilesNum) + addCounter(RPCCommitFilesSize) + addCounter(RPCDestroyNum) + addCounter(RPCDestroySize) + addCounter(RPCPushDataNum) + addCounter(RPCPushDataSize) + addCounter(RPCPushMergedDataNum) + addCounter(RPCPushMergedDataSize) + addCounter(RPCChunkFetchRequestNum) + + + def updateMessageMetrics(message: Any, messageLen: Long): Unit = { + message match { + case _: ReserveSlots => + incCounter(RPCReserveSlotsNum) + incCounter(RPCReserveSlotsSize, messageLen) + case _: CommitFiles => + incCounter(RPCCommitFilesNum) + incCounter(RPCCommitFilesSize, messageLen) + case _: Destroy => + incCounter(RPCDestroyNum) + incCounter(RPCDestroySize, messageLen) + case _: PushData => + incCounter(RPCPushDataNum) + incCounter(RPCPushDataSize, messageLen) + case _: PushMergedData => + incCounter(RPCPushMergedDataNum) + incCounter(RPCPushMergedDataSize, messageLen) + case _: ChunkFetchRequest => + incCounter(RPCChunkFetchRequestNum) + case _ => // Do nothing + } + } +} + +object RPCSource { + // RPC + val RPCReserveSlotsNum = "RPCReserveSlotsNum" + val RPCReserveSlotsSize = "RPCReserveSlotsSize" + val RPCCommitFilesNum = "RPCCommitFilesNum" + val RPCCommitFilesSize = "RPCCommitFilesSize" + val RPCDestroyNum = "RPCDestroyNum" + val RPCDestroySize = "RPCDestroySize" + val RPCPushDataNum = "RPCPushDataNum" + val 
RPCPushDataSize = "RPCPushDataSize" + val RPCPushMergedDataNum = "RPCPushMergedDataNum" + val RPCPushMergedDataSize = "RPCPushMergedDataSize" + val RPCChunkFetchRequestNum = "RPCChunkFetchRequestNum" +} diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/rpc/RpcEnv.scala b/common/src/main/scala/com/aliyun/emr/rss/common/rpc/RpcEnv.scala index 3bf0a770b..32bcbc04f 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/rpc/RpcEnv.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/rpc/RpcEnv.scala @@ -18,11 +18,11 @@ package com.aliyun.emr.rss.common.rpc import java.io.File -import java.nio.channels.ReadableByteChannel import scala.concurrent.Future import com.aliyun.emr.rss.common.RssConf +import com.aliyun.emr.rss.common.metrics.source.RPCSource import com.aliyun.emr.rss.common.rpc.netty.NettyRpcEnvFactory import com.aliyun.emr.rss.common.util.RpcUtils @@ -41,12 +41,12 @@ object RpcEnv { } def create( - name: String, - bindAddress: String, - advertiseAddress: String, - port: Int, - conf: RssConf, - numUsableCores: Int): RpcEnv = { + name: String, + bindAddress: String, + advertiseAddress: String, + port: Int, + conf: RssConf, + numUsableCores: Int): RpcEnv = { val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, numUsableCores) new NettyRpcEnvFactory().create(config) @@ -81,7 +81,10 @@ abstract class RpcEnv(conf: RssConf) { * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not * guarantee thread-safety. */ - def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + def setupEndpoint( + name: String, + endpoint: RpcEndpoint, + source: Option[RPCSource] = None): RpcEndpointRef /** * Retrieve the [[RpcEndpointRef]] represented by `uri` asynchronously. 
@@ -173,9 +176,9 @@ private[rss] trait RpcEnvFileServer { } private[rss] case class RpcEnvConfig( - conf: RssConf, - name: String, - bindAddress: String, - advertiseAddress: String, - port: Int, - numUsableCores: Int) + conf: RssConf, + name: String, + bindAddress: String, + advertiseAddress: String, + port: Int, + numUsableCores: Int) diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/rpc/netty/NettyRpcEnv.scala b/common/src/main/scala/com/aliyun/emr/rss/common/rpc/netty/NettyRpcEnv.scala index afa9cbd28..5ffaddf31 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/rpc/netty/NettyRpcEnv.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/rpc/netty/NettyRpcEnv.scala @@ -33,6 +33,7 @@ import com.google.common.base.Throwables import com.aliyun.emr.rss.common.RssConf import com.aliyun.emr.rss.common.internal.Logging +import com.aliyun.emr.rss.common.metrics.source.RPCSource import com.aliyun.emr.rss.common.network.TransportContext import com.aliyun.emr.rss.common.network.buffer.NioManagedBuffer import com.aliyun.emr.rss.common.network.client._ @@ -58,6 +59,7 @@ class NettyRpcEnv( private var worker: RpcEndpoint = null + var source: Option[RPCSource] = None private val transportContext = new TransportContext(transportConf, new NettyRpcHandler(dispatcher, this)) @@ -104,10 +106,14 @@ class NettyRpcEnv( if (server != null) RpcAddress(host, server.getPort()) else null } - override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { + override def setupEndpoint( + name: String, + endpoint: RpcEndpoint, + abstractSource: Option[RPCSource] = None): RpcEndpointRef = { if (name == RpcNameConstants.WORKER_EP) { worker = endpoint } + source = abstractSource dispatcher.registerRpcEndpoint(name, endpoint) } @@ -572,8 +578,10 @@ private[rss] class NettyRpcHandler( private def internalReceive(client: TransportClient, message: ByteBuffer): RequestMessage = { val addr = 
client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress] assert(addr != null) + val messageLen = message.remaining() val clientAddr = RpcAddress(addr.getHostString, addr.getPort) val requestMessage = RequestMessage(nettyEnv, client, message) + nettyEnv.source.foreach(_.updateMessageMetrics(requestMessage.content, messageLen)) if (requestMessage.senderAddress == null) { // Create a new message with the socket address of the client as the sender. new RequestMessage(clientAddr, requestMessage.receiver, requestMessage.content) diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/FetchHandler.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/FetchHandler.scala index ca022e878..dec4e265a 100644 --- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/FetchHandler.scala +++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/FetchHandler.scala @@ -30,6 +30,7 @@ import io.netty.util.concurrent.{Future, GenericFutureListener} import com.aliyun.emr.rss.common.exception.RssException import com.aliyun.emr.rss.common.internal.Logging import com.aliyun.emr.rss.common.meta.{FileInfo, FileManagedBuffers} +import com.aliyun.emr.rss.common.metrics.source.RPCSource import com.aliyun.emr.rss.common.network.buffer.NioManagedBuffer import com.aliyun.emr.rss.common.network.client.TransportClient import com.aliyun.emr.rss.common.network.protocol._ @@ -42,13 +43,15 @@ import com.aliyun.emr.rss.service.deploy.worker.storage.{PartitionFilesSorter, S class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logging { var streamManager = new OneForOneStreamManager() - var source: WorkerSource = _ + var workerSource: WorkerSource = _ + var rpcSource: RPCSource = _ var storageManager: StorageManager = _ var partitionsSorter: PartitionFilesSorter = _ var registered: AtomicBoolean = _ def init(worker: Worker): Unit = { - this.source = worker.workerSource + this.workerSource 
= worker.workerSource + this.rpcSource = worker.rpcSource this.storageManager = worker.storageManager this.partitionsSorter = worker.partitionsSorter this.registered = worker.registered @@ -72,6 +75,7 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg override def receive(client: TransportClient, msg: RequestMessage): Unit = { msg match { case r: ChunkFetchRequest => + rpcSource.updateMessageMetrics(r, 0) handleChunkFetchRequest(client, r) case r: RpcRequest => handleOpenStream(client, r) @@ -86,7 +90,7 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg val startMapIndex = openBlocks.startMapIndex val endMapIndex = openBlocks.endMapIndex // metrics start - source.startTimer(WorkerSource.OpenStreamTime, shuffleKey) + workerSource.startTimer(WorkerSource.OpenStreamTime, shuffleKey) val fileInfo = openStream(shuffleKey, fileName, startMapIndex, endMapIndex) if (fileInfo != null) { @@ -114,18 +118,18 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg Throwables.getStackTraceAsString(new RssException("Chunk offsets meta exception ", e)))) } finally { // metrics end - source.stopTimer(WorkerSource.OpenStreamTime, shuffleKey) + workerSource.stopTimer(WorkerSource.OpenStreamTime, shuffleKey) request.body().release() } } else { - source.stopTimer(WorkerSource.OpenStreamTime, shuffleKey) + workerSource.stopTimer(WorkerSource.OpenStreamTime, shuffleKey) client.getChannel.writeAndFlush(new RpcFailure(request.requestId, Throwables.getStackTraceAsString(new FileNotFoundException))) } } def handleChunkFetchRequest(client: TransportClient, req: ChunkFetchRequest): Unit = { - source.startTimer(WorkerSource.FetchChunkTime, req.toString) + workerSource.startTimer(WorkerSource.FetchChunkTime, req.toString) logTrace(s"Received req from ${NettyUtils.getRemoteAddress(client.getChannel)}" + s" to fetch block ${req.streamChunkSlice}") @@ -133,7 +137,7 @@ class FetchHandler(val conf: 
TransportConf) extends BaseMessageHandler with Logg if (chunksBeingTransferred >= conf.maxChunksBeingTransferred) { logError(s"The number of chunks being transferred $chunksBeingTransferred" + s"is above ${conf.maxChunksBeingTransferred()}.") - source.stopTimer(WorkerSource.FetchChunkTime, req.toString) + workerSource.stopTimer(WorkerSource.FetchChunkTime, req.toString) } else { try { val buf = streamManager.getChunk(req.streamChunkSlice.streamId, @@ -143,7 +147,7 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg .addListener(new GenericFutureListener[Future[_ >: Void]] { override def operationComplete(future: Future[_ >: Void]): Unit = { streamManager.chunkSent(req.streamChunkSlice.streamId) - source.stopTimer(WorkerSource.FetchChunkTime, req.toString) + workerSource.stopTimer(WorkerSource.FetchChunkTime, req.toString) } }) } catch { @@ -152,7 +156,7 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg s" ${NettyUtils.getRemoteAddress(client.getChannel)}"), e) client.getChannel.writeAndFlush(new ChunkFetchFailure(req.streamChunkSlice, Throwables.getStackTraceAsString(e))) - source.stopTimer(WorkerSource.FetchChunkTime, req.toString) + workerSource.stopTimer(WorkerSource.FetchChunkTime, req.toString) } } } diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/PushDataHandler.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/PushDataHandler.scala index 5b61a8629..c3093f382 100644 --- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/PushDataHandler.scala +++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/PushDataHandler.scala @@ -28,6 +28,7 @@ import com.aliyun.emr.rss.common.RssConf import com.aliyun.emr.rss.common.exception.AlreadyClosedException import com.aliyun.emr.rss.common.internal.Logging import com.aliyun.emr.rss.common.meta.{PartitionLocationInfo, WorkerInfo} +import 
com.aliyun.emr.rss.common.metrics.source.RPCSource import com.aliyun.emr.rss.common.network.buffer.{NettyManagedBuffer, NioManagedBuffer} import com.aliyun.emr.rss.common.network.client.{RpcResponseCallback, TransportClient, TransportClientFactory} import com.aliyun.emr.rss.common.network.protocol.{PushData, PushMergedData, RequestMessage, RpcFailure, RpcResponse} @@ -40,6 +41,7 @@ import com.aliyun.emr.rss.service.deploy.worker.storage.{FileWriter, LocalFlushe class PushDataHandler extends BaseMessageHandler with Logging { var workerSource: WorkerSource = _ + var rpcSource: RPCSource = _ var partitionLocationInfo: PartitionLocationInfo = _ var shuffleMapperAttempts: ConcurrentHashMap[String, Array[Int]] = _ var replicateThreadPool: ThreadPoolExecutor = _ @@ -52,6 +54,7 @@ class PushDataHandler extends BaseMessageHandler with Logging { def init(worker: Worker): Unit = { workerSource = worker.workerSource + rpcSource = worker.rpcSource partitionLocationInfo = worker.partitionLocationInfo shuffleMapperAttempts = worker.shuffleMapperAttempts replicateThreadPool = worker.replicateThreadPool @@ -69,6 +72,7 @@ class PushDataHandler extends BaseMessageHandler with Logging { msg match { case pushData: PushData => try { + rpcSource.updateMessageMetrics(pushData, pushData.body().size()) handlePushData(pushData, new RpcResponseCallback { override def onSuccess(response: ByteBuffer): Unit = { client.getChannel.writeAndFlush(new RpcResponse( @@ -91,6 +95,7 @@ class PushDataHandler extends BaseMessageHandler with Logging { } case pushMergedData: PushMergedData => try { + rpcSource.updateMessageMetrics(pushMergedData, pushMergedData.body().size()) handlePushMergedData(pushMergedData, new RpcResponseCallback { override def onSuccess(response: ByteBuffer): Unit = { client.getChannel.writeAndFlush(new RpcResponse( @@ -258,8 +263,8 @@ class PushDataHandler extends BaseMessageHandler with Logging { } def handlePushMergedData( - pushMergedData: PushMergedData, - callback: 
RpcResponseCallback): Unit = { + pushMergedData: PushMergedData, + callback: RpcResponseCallback): Unit = { val shuffleKey = pushMergedData.shuffleKey val mode = PartitionLocation.getMode(pushMergedData.mode) val batchOffsets = pushMergedData.batchOffsets diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Worker.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Worker.scala index 0ba40d52c..b70cb24b0 100644 --- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Worker.scala +++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Worker.scala @@ -32,7 +32,7 @@ import com.aliyun.emr.rss.common.haclient.RssHARetryClient import com.aliyun.emr.rss.common.internal.Logging import com.aliyun.emr.rss.common.meta.{DiskInfo, PartitionLocationInfo, WorkerInfo} import com.aliyun.emr.rss.common.metrics.MetricsSystem -import com.aliyun.emr.rss.common.metrics.source.{JVMCPUSource, JVMSource} +import com.aliyun.emr.rss.common.metrics.source.{JVMCPUSource, JVMSource, RPCSource} import com.aliyun.emr.rss.common.network.TransportContext import com.aliyun.emr.rss.common.network.server.{ChannelsLimiter, MemoryTracker} import com.aliyun.emr.rss.common.protocol.{RpcNameConstants, TransportModuleConstants} @@ -69,13 +69,12 @@ private[deploy] class Worker( "If enable graceful shutdown, the worker should use stable server port.") val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, WorkerSource.ServletPath) - val workerSource = { - val source = new WorkerSource(conf) - metricsSystem.registerSource(source) - metricsSystem.registerSource(new JVMSource(conf, MetricsSystem.ROLE_WOKRER)) - metricsSystem.registerSource(new JVMCPUSource(conf, MetricsSystem.ROLE_WOKRER)) - source - } + val rpcSource = new RPCSource(conf) + val workerSource = new WorkerSource(conf) + metricsSystem.registerSource(workerSource) + metricsSystem.registerSource(rpcSource) + metricsSystem.registerSource(new 
JVMSource(conf, MetricsSystem.ROLE_WOKRER)) + metricsSystem.registerSource(new JVMCPUSource(conf, MetricsSystem.ROLE_WOKRER)) val storageManager = new StorageManager(conf, workerSource) @@ -91,7 +90,7 @@ private[deploy] class Worker( val partitionsSorter = new PartitionFilesSorter(memoryTracker, conf, workerSource) var controller = new Controller(rpcEnv, conf, metricsSystem) - rpcEnv.setupEndpoint(RpcNameConstants.WORKER_EP, controller) + rpcEnv.setupEndpoint(RpcNameConstants.WORKER_EP, controller, Some(rpcSource)) val pushDataHandler = new PushDataHandler() val (pushServer, pushClientFactory) = { diff --git a/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriterSuiteJ.java b/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriterSuiteJ.java index 9997f59be..5aff06999 100644 --- a/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriterSuiteJ.java +++ b/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/storage/FileWriterSuiteJ.java @@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory; import com.aliyun.emr.rss.common.RssConf; import com.aliyun.emr.rss.common.meta.FileInfo; +import com.aliyun.emr.rss.common.metrics.source.RPCSource; import com.aliyun.emr.rss.common.network.TransportContext; import com.aliyun.emr.rss.common.network.buffer.ManagedBuffer; import com.aliyun.emr.rss.common.network.client.ChunkReceivedCallback; @@ -140,10 +141,15 @@ public class FileWriterSuiteJ { } @Override - public WorkerSource source() { + public WorkerSource workerSource() { return source; } + @Override + public RPCSource rpcSource() { + return new RPCSource(RSS_CONF); + } + @Override public boolean checkRegistered() { return true;