diff --git a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala index adb0cc563..a2c9afd0f 100644 --- a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala +++ b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala @@ -41,6 +41,8 @@ object MetricsConstants { final val OPERATION_TOTAL: String = OPERATION + "total" final private val BACKEND_SERVICE = KYUUBI + "backend_service." + final val BS_FETCH_LOG_ROWS_RATE = BACKEND_SERVICE + "fetch_log_rows_rate" + final val BS_FETCH_RESULT_ROWS_RATE = BACKEND_SERVICE + "fetch_result_rows_rate" final val BS_OPEN_SESSION = BACKEND_SERVICE + "open_session" final val BS_CLOSE_SESSION = BACKEND_SERVICE + "close_session" final val BS_GET_INFO = BACKEND_SERVICE + "get_info" diff --git a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala index ded1e748c..389dba08c 100644 --- a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala +++ b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala @@ -53,6 +53,11 @@ class MetricsSystem extends CompositeService("MetricsSystem") { timer.update(duration, unit) } + def markMeter(key: String, value: Long = 1): Unit = { + val meter = registry.meter(key) + meter.mark(value) + } + def registerGauge[T](name: String, value: => T, default: T): Unit = { registry.register( MetricRegistry.name(name), @@ -105,5 +110,4 @@ object MetricsSystem { tracing(_.updateTimer(name, System.nanoTime() - startTime, TimeUnit.NANOSECONDS)) } } - } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceTimeMetric.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala similarity index 85% rename from kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceTimeMetric.scala rename to kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala index dd26c3d39..aa4f183ff 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceTimeMetric.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala @@ -25,7 +25,7 @@ import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation import org.apache.kyuubi.service.BackendService import org.apache.kyuubi.session.SessionHandle -trait BackendServiceTimeMetric extends BackendService { +trait BackendServiceMetric extends BackendService { abstract override def openSession( protocol: TProtocolVersion, @@ -152,7 +152,28 @@ trait BackendServiceTimeMetric extends BackendService { maxRows: Int, fetchLog: Boolean): TRowSet = { MetricsSystem.timerTracing(MetricsConstants.BS_FETCH_RESULTS) { - super.fetchResults(operationHandle, orientation, maxRows, fetchLog) + val rowSet = super.fetchResults(operationHandle, orientation, maxRows, fetchLog) + val rowsSize = + if (rowSet.getColumnsSize > 0) { + rowSet.getColumns.get(0).getFieldValue match { + case t: TStringColumn => t.getValues.size() + case t: TDoubleColumn => t.getValues.size() + case t: TI64Column => t.getValues.size() + case t: TI32Column => t.getValues.size() + case t: TI16Column => t.getValues.size() + case t: TBoolColumn => t.getValues.size() + case t: TByteColumn => t.getValues.size() + case t: TBinaryColumn => t.getValues.size() + case _ => 0 + } + } else rowSet.getRowsSize + + MetricsSystem.tracing(_.markMeter( + if (fetchLog) MetricsConstants.BS_FETCH_LOG_ROWS_RATE + else MetricsConstants.BS_FETCH_RESULT_ROWS_RATE, + rowsSize)) + + rowSet } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala index 3a9ec0b2a..2cb4ead69 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala @@ -136,7 +136,7 @@ class KyuubiServer(name: String) extends Serverable(name) { def this() = this(classOf[KyuubiServer].getSimpleName) override val backendService: AbstractBackendService = - new KyuubiBackendService() with BackendServiceTimeMetric + new KyuubiBackendService() with BackendServiceMetric override lazy val frontendServices: Seq[AbstractFrontendService] = conf.get(FRONTEND_PROTOCOLS).map(FrontendProtocols.withName).map { diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/BackendServiceTimeMetricSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/BackendServiceMetricSuite.scala similarity index 59% rename from kyuubi-server/src/test/scala/org/apache/kyuubi/server/BackendServiceTimeMetricSuite.scala rename to kyuubi-server/src/test/scala/org/apache/kyuubi/server/BackendServiceMetricSuite.scala index 0e1313353..5917aa7f7 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/BackendServiceTimeMetricSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/BackendServiceMetricSuite.scala @@ -27,7 +27,7 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.metrics.{MetricsConf, MetricsConstants} import org.apache.kyuubi.operation.HiveJDBCTestHelper -class BackendServiceTimeMetricSuite extends WithKyuubiServer with HiveJDBCTestHelper { +class BackendServiceMetricSuite extends WithKyuubiServer with HiveJDBCTestHelper { override protected def jdbcUrl: String = getJdbcUrl @@ -39,35 +39,49 @@ class BackendServiceTimeMetricSuite extends WithKyuubiServer with HiveJDBCTestHe .set(MetricsConf.METRICS_JSON_INTERVAL, Duration.ofMillis(100).toMillis) } - test("backend service method time metric test") { + test("backend service metric test") { val objMapper = new ObjectMapper() withJdbcStatement() { statement => - statement.execute("show databases") + statement.executeQuery("CREATE TABLE stu_test(id int, name string) USING parquet") + statement.execute("insert into stu_test values(1, 'a'), (2, 'b'), (3, 'c')") Thread.sleep(Duration.ofMillis(111).toMillis) val res1 = objMapper.readTree(Paths.get(reportPath.toString, "report.json").toFile) assert(res1.has("timers")) - val histograms1 = res1.get("timers") + val timer1 = res1.get("timers") assert( - histograms1.get(MetricsConstants.BS_EXECUTE_STATEMENT).get("count").asInt() == 1) + timer1.get(MetricsConstants.BS_EXECUTE_STATEMENT).get("count").asInt() == 2) assert( - histograms1.get(MetricsConstants.BS_EXECUTE_STATEMENT).get("mean").asDouble() > 0) + timer1.get(MetricsConstants.BS_EXECUTE_STATEMENT).get("mean").asDouble() > 0) - statement.execute("show tables") + assert(res1.has("meters")) + val meters1 = res1.get("meters") + val logRows1 = meters1.get(MetricsConstants.BS_FETCH_LOG_ROWS_RATE).get("count").asInt() + assert(logRows1 > 0) + + statement.execute("select * from stu_test limit 2") + statement.getResultSet.next() Thread.sleep(Duration.ofMillis(111).toMillis) val res2 = objMapper.readTree(Paths.get(reportPath.toString, "report.json").toFile) - val histograms2 = res2.get("timers") + val timer2 = res2.get("timers") assert( - histograms2.get(MetricsConstants.BS_OPEN_SESSION).get("count").asInt() == 1) + timer2.get(MetricsConstants.BS_OPEN_SESSION).get("count").asInt() == 1) assert( - histograms2.get(MetricsConstants.BS_OPEN_SESSION).get("min").asInt() > 0) - val execStatementNode2 = histograms2.get(MetricsConstants.BS_EXECUTE_STATEMENT) - assert(execStatementNode2.get("count").asInt() == 2) + timer2.get(MetricsConstants.BS_OPEN_SESSION).get("min").asInt() > 0) + val execStatementNode2 = timer2.get(MetricsConstants.BS_EXECUTE_STATEMENT) + assert(execStatementNode2.get("count").asInt() == 3) assert( execStatementNode2.get("max").asDouble() >= execStatementNode2.get("mean").asDouble() && execStatementNode2.get("mean").asDouble() >= execStatementNode2.get("min").asDouble()) + + val meters2 = + objMapper.readTree(Paths.get(reportPath.toString, "report.json").toFile).get("meters") + assert(meters2.get(MetricsConstants.BS_FETCH_RESULT_ROWS_RATE).get("count").asInt() == 2) + assert(meters2.get(MetricsConstants.BS_FETCH_LOG_ROWS_RATE).get("count").asInt() >= logRows1) + + statement.executeQuery("DROP TABLE stu_test") } } }