diff --git a/docs/monitor/metrics.md b/docs/monitor/metrics.md index a8bacab09..26c6f98ee 100644 --- a/docs/monitor/metrics.md +++ b/docs/monitor/metrics.md @@ -79,6 +79,7 @@ Metrics Prefix | Metrics Suffix | Type | Since | Description `kyuubi.backend_service.fetch_result_rows_rate` | | meter | 1.5.0 |
kyuubi backend service `fetchResults` method that fetch result rows rate
`kyuubi.backend_service.get_primary_keys` | | meter | 1.6.0 |
kyuubi backend service `get_primary_keys` method execution time and rate
`kyuubi.backend_service.get_cross_reference` | | meter | 1.6.0 |
kyuubi backend service `get_cross_reference` method execution time and rate
+`kyuubi.operation.state` | `${operationType}`
`.${state}` | meter | 1.6.0 |
The rate of `${operationType}` operations in a particular `${state}`, e.g. `BatchJobSubmission.pending`, `BatchJobSubmission.finished`. Note that the terminal states are cumulative, but the intermediate ones are not.
Before v1.5.0, if you use these metrics: - `kyuubi.statement.total` diff --git a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala index 41ef03710..2507eb773 100644 --- a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala +++ b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala @@ -117,4 +117,8 @@ object MetricsSystem { def counterValue(name: String): Option[Long] = { maybeSystem.map(_.registry.counter(name).getCount) } + + def meterValue(name: String): Option[Long] = { + maybeSystem.map(_.registry.meter(name).getCount) + } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index 097e0bc19..4f9ba3053 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -66,8 +66,6 @@ class BatchJobSubmission( extends KyuubiOperation(session) { import BatchJobSubmission._ - override def statement: String = "BATCH_JOB_SUBMISSION" - override def shouldRunAsync: Boolean = true private val _operationLog = OperationLog.createOperationLog(session, getHandle) @@ -306,9 +304,7 @@ class BatchJobSubmission( case e: IOException => error(e.getMessage, e) } - MetricsSystem.tracing(_.decCount( - MetricRegistry.name(OPERATION_OPEN, statement.toLowerCase(Locale.getDefault)))) - + MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opType))) // fast fail if (isTerminalState(state)) { killMessage = (false, s"batch $batchId is already terminal so can not kill it.") diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala index 9f13980c1..2d28c767e 100644 --- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala @@ -38,6 +38,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi MetricsSystem.tracing { ms => ms.incCount(MetricRegistry.name(OPERATION_OPEN, opType)) ms.incCount(MetricRegistry.name(OPERATION_TOTAL, opType)) + ms.markMeter(MetricRegistry.name(OPERATION_STATE, opType, state.toString.toLowerCase)) ms.incCount(MetricRegistry.name(OPERATION_TOTAL)) ms.markMeter(MetricRegistry.name(OPERATION_STATE, state.toString.toLowerCase)) } @@ -156,8 +157,11 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi override def shouldRunAsync: Boolean = false override def setState(newState: OperationState): Unit = { + MetricsSystem.tracing { ms => + ms.markMeter(MetricRegistry.name(OPERATION_STATE, opType, state.toString.toLowerCase), -1) + ms.markMeter(MetricRegistry.name(OPERATION_STATE, opType, newState.toString.toLowerCase)) + ms.markMeter(MetricRegistry.name(OPERATION_STATE, newState.toString.toLowerCase)) + } super.setState(newState) - MetricsSystem.tracing( - _.markMeter(MetricRegistry.name(OPERATION_STATE, newState.toString.toLowerCase))) } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala index 7fdd7a7e2..48d4e8429 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala @@ -34,7 +34,9 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.engine.ApplicationOperation.{APP_ERROR_KEY, APP_ID_KEY, APP_NAME_KEY, APP_STATE_KEY, APP_URL_KEY} import org.apache.kyuubi.engine.spark.{SparkBatchProcessBuilder, 
SparkProcessBuilder} -import org.apache.kyuubi.operation.OperationState +import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem} +import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState} +import org.apache.kyuubi.operation.OperationState.OperationState import org.apache.kyuubi.server.KyuubiRestFrontendService import org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER import org.apache.kyuubi.server.metadata.api.Metadata @@ -592,4 +594,50 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { val batchSession = sessionManager.getBatchSessionImpl(SessionHandle.fromUUID(batch.getId)) assert(batchSession.ipAddress === realClientIp) } + + test("expose the metrics with operation type and current state") { + assert(getBatchJobSubmissionStateCounter(OperationState.INITIALIZED) === 0) + assert(getBatchJobSubmissionStateCounter(OperationState.PENDING) === 0) + assert(getBatchJobSubmissionStateCounter(OperationState.RUNNING) === 0) + val originalCanceledCounter = getBatchJobSubmissionStateCounter(OperationState.CANCELED) + + val appName = "spark-batch-submission" + val requestObj = new BatchRequest( + "spark", + sparkProcessBuilder.mainResource.get, + sparkProcessBuilder.mainClass, + appName, + Map( + "spark.master" -> "local", + s"spark.${ENGINE_SPARK_MAX_LIFETIME.key}" -> "5000", + s"spark.${ENGINE_CHECK_INTERVAL.key}" -> "1000").asJava, + Seq.empty[String].asJava) + + val response = webTarget.path("api/v1/batches") + .request(MediaType.APPLICATION_JSON_TYPE) + .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE)) + assert(200 == response.getStatus) + val batch = response.readEntity(classOf[Batch]) + + assert(getBatchJobSubmissionStateCounter(OperationState.INITIALIZED) + + getBatchJobSubmissionStateCounter(OperationState.PENDING) + + getBatchJobSubmissionStateCounter(OperationState.RUNNING) === 1) + + val deleteResp = webTarget.path(s"api/v1/batches/${batch.getId}") + 
.request(MediaType.APPLICATION_JSON_TYPE) + .delete() + assert(200 == deleteResp.getStatus) + + assert(getBatchJobSubmissionStateCounter(OperationState.INITIALIZED) === 0) + assert(getBatchJobSubmissionStateCounter(OperationState.PENDING) === 0) + assert(getBatchJobSubmissionStateCounter(OperationState.RUNNING) === 0) + assert( + getBatchJobSubmissionStateCounter(OperationState.CANCELED) - originalCanceledCounter === 1) + } + + private def getBatchJobSubmissionStateCounter(state: OperationState): Long = { + val opType = classOf[BatchJobSubmission].getSimpleName + val counterName = s"${MetricsConstants.OPERATION_STATE}.$opType.${state.toString.toLowerCase}" + MetricsSystem.meterValue(counterName).getOrElse(0L) + } }