[KYUUBI #3046][Metrics] Add meter metrics for recording the rate of the operation state for each kyuubi operation
### _Why are the changes needed?_ Close #3046 Expose the metrics likes: ``` kyuubi.operation.state.BatchJobSubmission.pending kyuubi.operation.state.BatchJobSubmission.running kyuubi.operation.state.BatchJobSubmission.finished kyuubi.operation.state.BatchJobSubmission.error kyuubi.operation.state.BatchJobSubmission.canceled ``` So that the kyuubi service admin can know that how many BatchJobSubmission operations are pending, running and so on. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #3063 from turboFei/batch_metrics. Closes #3046 89ee21db [Fei Wang] remove statement overwrite 25ae41e9 [Fei Wang] update docs 7287527c [Fei Wang] use meter a2dde891 [Fei Wang] save a0c6eade [Fei Wang] remove another bug fix c250a089 [Fei Wang] add docs 87f27e89 [Fei Wang] comments 6bdc4ca9 [Fei Wang] fix flaky test b000525d [Fei Wang] fix flaky test f748093e [Fei Wang] expose more metrics Authored-by: Fei Wang <fwang12@ebay.com> Signed-off-by: Fei Wang <fwang12@ebay.com>
This commit is contained in:
parent
99934591f0
commit
4bb06542a2
@ -79,6 +79,7 @@ Metrics Prefix | Metrics Suffix | Type | Since | Description
|
||||
`kyuubi.backend_service.fetch_result_rows_rate` | | meter | 1.5.0 |<div style='width: 150pt;word-wrap: break-word;white-space: normal'> kyuubi backend service `fetchResults` method that fetch result rows rate </div>
|
||||
`kyuubi.backend_service.get_primary_keys` | | meter | 1.6.0 |<div style='width: 150pt;word-wrap: break-word;white-space: normal'> kyuubi backend service `get_primary_keys` method execution time and rate </div>
|
||||
`kyuubi.backend_service.get_cross_reference` | | meter | 1.6.0 |<div style='width: 150pt;word-wrap: break-word;white-space: normal'> kyuubi backend service `get_cross_reference` method execution time and rate </div>
|
||||
`kyuubi.operation.state` | `${operationType}`<br/>`.${state}` | meter | 1.6.0 |<div style='width: 150pt;word-wrap: break-word;white-space: normal'> The `${operationType}` with a particular `${state}` rate, e.g. `BatchJobSubmission.pending`, `BatchJobSubmission.finished`. Note that, the terminal states are cumulative, but the intermediate ones are not. </div>
|
||||
|
||||
Before v1.5.0, if you use these metrics:
|
||||
- `kyuubi.statement.total`
|
||||
|
||||
@ -117,4 +117,8 @@ object MetricsSystem {
|
||||
def counterValue(name: String): Option[Long] = {
|
||||
maybeSystem.map(_.registry.counter(name).getCount)
|
||||
}
|
||||
|
||||
def meterValue(name: String): Option[Long] = {
|
||||
maybeSystem.map(_.registry.meter(name).getCount)
|
||||
}
|
||||
}
|
||||
|
||||
@ -66,8 +66,6 @@ class BatchJobSubmission(
|
||||
extends KyuubiOperation(session) {
|
||||
import BatchJobSubmission._
|
||||
|
||||
override def statement: String = "BATCH_JOB_SUBMISSION"
|
||||
|
||||
override def shouldRunAsync: Boolean = true
|
||||
|
||||
private val _operationLog = OperationLog.createOperationLog(session, getHandle)
|
||||
@ -306,9 +304,7 @@ class BatchJobSubmission(
|
||||
case e: IOException => error(e.getMessage, e)
|
||||
}
|
||||
|
||||
MetricsSystem.tracing(_.decCount(
|
||||
MetricRegistry.name(OPERATION_OPEN, statement.toLowerCase(Locale.getDefault))))
|
||||
|
||||
MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opType)))
|
||||
// fast fail
|
||||
if (isTerminalState(state)) {
|
||||
killMessage = (false, s"batch $batchId is already terminal so can not kill it.")
|
||||
|
||||
@ -38,6 +38,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
|
||||
MetricsSystem.tracing { ms =>
|
||||
ms.incCount(MetricRegistry.name(OPERATION_OPEN, opType))
|
||||
ms.incCount(MetricRegistry.name(OPERATION_TOTAL, opType))
|
||||
ms.markMeter(MetricRegistry.name(OPERATION_STATE, opType, state.toString.toLowerCase))
|
||||
ms.incCount(MetricRegistry.name(OPERATION_TOTAL))
|
||||
ms.markMeter(MetricRegistry.name(OPERATION_STATE, state.toString.toLowerCase))
|
||||
}
|
||||
@ -156,8 +157,11 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
|
||||
override def shouldRunAsync: Boolean = false
|
||||
|
||||
override def setState(newState: OperationState): Unit = {
|
||||
MetricsSystem.tracing { ms =>
|
||||
ms.markMeter(MetricRegistry.name(OPERATION_STATE, opType, state.toString.toLowerCase), -1)
|
||||
ms.markMeter(MetricRegistry.name(OPERATION_STATE, opType, newState.toString.toLowerCase))
|
||||
ms.markMeter(MetricRegistry.name(OPERATION_STATE, newState.toString.toLowerCase))
|
||||
}
|
||||
super.setState(newState)
|
||||
MetricsSystem.tracing(
|
||||
_.markMeter(MetricRegistry.name(OPERATION_STATE, newState.toString.toLowerCase)))
|
||||
}
|
||||
}
|
||||
|
||||
@ -34,7 +34,9 @@ import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.KyuubiConf._
|
||||
import org.apache.kyuubi.engine.ApplicationOperation.{APP_ERROR_KEY, APP_ID_KEY, APP_NAME_KEY, APP_STATE_KEY, APP_URL_KEY}
|
||||
import org.apache.kyuubi.engine.spark.{SparkBatchProcessBuilder, SparkProcessBuilder}
|
||||
import org.apache.kyuubi.operation.OperationState
|
||||
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
|
||||
import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState}
|
||||
import org.apache.kyuubi.operation.OperationState.OperationState
|
||||
import org.apache.kyuubi.server.KyuubiRestFrontendService
|
||||
import org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER
|
||||
import org.apache.kyuubi.server.metadata.api.Metadata
|
||||
@ -592,4 +594,50 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
|
||||
val batchSession = sessionManager.getBatchSessionImpl(SessionHandle.fromUUID(batch.getId))
|
||||
assert(batchSession.ipAddress === realClientIp)
|
||||
}
|
||||
|
||||
test("expose the metrics with operation type and current state") {
|
||||
assert(getBatchJobSubmissionStateCounter(OperationState.INITIALIZED) === 0)
|
||||
assert(getBatchJobSubmissionStateCounter(OperationState.PENDING) === 0)
|
||||
assert(getBatchJobSubmissionStateCounter(OperationState.RUNNING) === 0)
|
||||
val originalCanceledCounter = getBatchJobSubmissionStateCounter(OperationState.CANCELED)
|
||||
|
||||
val appName = "spark-batch-submission"
|
||||
val requestObj = new BatchRequest(
|
||||
"spark",
|
||||
sparkProcessBuilder.mainResource.get,
|
||||
sparkProcessBuilder.mainClass,
|
||||
appName,
|
||||
Map(
|
||||
"spark.master" -> "local",
|
||||
s"spark.${ENGINE_SPARK_MAX_LIFETIME.key}" -> "5000",
|
||||
s"spark.${ENGINE_CHECK_INTERVAL.key}" -> "1000").asJava,
|
||||
Seq.empty[String].asJava)
|
||||
|
||||
val response = webTarget.path("api/v1/batches")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
val batch = response.readEntity(classOf[Batch])
|
||||
|
||||
assert(getBatchJobSubmissionStateCounter(OperationState.INITIALIZED) +
|
||||
getBatchJobSubmissionStateCounter(OperationState.PENDING) +
|
||||
getBatchJobSubmissionStateCounter(OperationState.RUNNING) === 1)
|
||||
|
||||
val deleteResp = webTarget.path(s"api/v1/batches/${batch.getId}")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.delete()
|
||||
assert(200 == deleteResp.getStatus)
|
||||
|
||||
assert(getBatchJobSubmissionStateCounter(OperationState.INITIALIZED) === 0)
|
||||
assert(getBatchJobSubmissionStateCounter(OperationState.PENDING) === 0)
|
||||
assert(getBatchJobSubmissionStateCounter(OperationState.RUNNING) === 0)
|
||||
assert(
|
||||
getBatchJobSubmissionStateCounter(OperationState.CANCELED) - originalCanceledCounter === 1)
|
||||
}
|
||||
|
||||
private def getBatchJobSubmissionStateCounter(state: OperationState): Long = {
|
||||
val opType = classOf[BatchJobSubmission].getSimpleName
|
||||
val counterName = s"${MetricsConstants.OPERATION_STATE}.$opType.${state.toString.toLowerCase}"
|
||||
MetricsSystem.meterValue(counterName).getOrElse(0L)
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user