diff --git a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala index 9a73ccc2e..2fcb1023b 100644 --- a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala +++ b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala @@ -40,4 +40,22 @@ object MetricsConstants { final val OPERATION_FAIL: String = OPERATION + "failed" final val OPERATION_TOTAL: String = OPERATION + "total" + final private val BACKEND_SERVICE = KYUUBI + "backend_service." + final val OPEN_SESSION_MS = BACKEND_SERVICE + "open_session_ms" + final val CLOSE_SESSION_MS = BACKEND_SERVICE + "close_session_ms" + final val GET_INFO_MS = BACKEND_SERVICE + "get_info_ms" + final val EXECUTE_STATEMENT_MS = BACKEND_SERVICE + "execute_statement_ms" + final val GET_TYPE_INFO_MS = BACKEND_SERVICE + "get_type_info_ms" + final val GET_CATALOGS_MS = BACKEND_SERVICE + "get_catalogs_ms" + final val GET_SCHEMAS_MS = BACKEND_SERVICE + "get_schemas_ms" + final val GET_TABLES_MS = BACKEND_SERVICE + "get_tables_ms" + final val GET_TABLE_TYPES_MS = BACKEND_SERVICE + "get_table_types_ms" + final val GET_COLUMNS_MS = BACKEND_SERVICE + "get_columns_ms" + final val GET_FUNCTIONS_MS = BACKEND_SERVICE + "get_functions_ms" + final val GET_OPERATION_STATUS_MS = BACKEND_SERVICE + "get_operation_status_ms" + final val CANCEL_OPERATION_MS = BACKEND_SERVICE + "cancel_operation_ms" + final val CLOSE_OPERATION_MS = BACKEND_SERVICE + "close_operation_ms" + final val GET_RESULT_SET_METADATA_MS = BACKEND_SERVICE + "get_result_set_metadata_ms" + final val FETCH_RESULTS_MS = BACKEND_SERVICE + "fetch_results_ms" + } diff --git a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala index a6200a3fc..ec7a14281 100644 --- a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala +++ b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala @@ -42,6 +42,11 @@ class MetricsSystem extends CompositeService("MetricsSystem") { counter.dec(1L) } + def updateHistogram(key: String, value: Long): Unit = { + val histogram = registry.histogram(key) + histogram.update(value) + } + def registerGauge[T](name: String, value: => T, default: T): Unit = { registry.register( MetricRegistry.name(name), diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceTimeMetric.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceTimeMetric.scala new file mode 100644 index 000000000..76c9394c4 --- /dev/null +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceTimeMetric.scala @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.server + +import org.apache.hive.service.rpc.thrift._ + +import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem} +import org.apache.kyuubi.operation.{OperationHandle, OperationStatus} +import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation +import org.apache.kyuubi.service.BackendService +import org.apache.kyuubi.session.SessionHandle + +trait BackendServiceTimeMetric extends BackendService { + + @throws[Exception] + private def timeMetric[T](name: String)(f: => T): T = { + val startTime = System.currentTimeMillis() + try { + f + } finally { + MetricsSystem.tracing( + _.updateHistogram(name, System.currentTimeMillis() - startTime)) + } + } + + abstract override def openSession( + protocol: TProtocolVersion, + user: String, + password: String, + ipAddr: String, + configs: Map[String, String]): SessionHandle = { + timeMetric(MetricsConstants.OPEN_SESSION_MS) { + super.openSession(protocol, user, password, ipAddr, configs) + } + } + + abstract override def closeSession(sessionHandle: SessionHandle): Unit = { + timeMetric(MetricsConstants.CLOSE_SESSION_MS) { + super.closeSession(sessionHandle) + } + } + + abstract override def getInfo( + sessionHandle: SessionHandle, + infoType: TGetInfoType): TGetInfoValue = { + timeMetric(MetricsConstants.GET_INFO_MS) { + super.getInfo(sessionHandle, infoType) + } + } + + abstract override def executeStatement( + sessionHandle: SessionHandle, + statement: String, + runAsync: Boolean, + queryTimeout: Long): OperationHandle = { + timeMetric(MetricsConstants.EXECUTE_STATEMENT_MS) { + super.executeStatement(sessionHandle, statement, runAsync, queryTimeout) + } + } + + abstract override def getTypeInfo(sessionHandle: SessionHandle): OperationHandle = { + timeMetric(MetricsConstants.GET_TYPE_INFO_MS) { + super.getTypeInfo(sessionHandle) + } + } + + abstract override def getCatalogs(sessionHandle: SessionHandle): OperationHandle = { + timeMetric(MetricsConstants.GET_CATALOGS_MS) { + super.getCatalogs(sessionHandle) + } + } + + abstract override def getSchemas( + sessionHandle: SessionHandle, + catalogName: String, + schemaName: String): OperationHandle = { + timeMetric(MetricsConstants.GET_SCHEMAS_MS) { + super.getSchemas(sessionHandle, catalogName, schemaName) + } + } + + abstract override def getTables( + sessionHandle: SessionHandle, + catalogName: String, + schemaName: String, + tableName: String, + tableTypes: java.util.List[String]): OperationHandle = { + timeMetric(MetricsConstants.GET_TABLES_MS) { + super.getTables(sessionHandle, catalogName, schemaName, tableName, tableTypes) + } + } + + abstract override def getTableTypes(sessionHandle: SessionHandle): OperationHandle = { + timeMetric(MetricsConstants.GET_TABLE_TYPES_MS) { + super.getTableTypes(sessionHandle) + } + } + + abstract override def getColumns( + sessionHandle: SessionHandle, + catalogName: String, + schemaName: String, + tableName: String, + columnName: String): OperationHandle = { + timeMetric(MetricsConstants.GET_COLUMNS_MS) { + super.getColumns(sessionHandle, catalogName, schemaName, tableName, columnName) + } + } + + abstract override def getFunctions( + sessionHandle: SessionHandle, + catalogName: String, + schemaName: String, + functionName: String): OperationHandle = { + timeMetric(MetricsConstants.GET_FUNCTIONS_MS) { + super.getFunctions(sessionHandle, catalogName, schemaName, functionName) + } + } + + abstract override def getOperationStatus(operationHandle: OperationHandle): OperationStatus = { + timeMetric(MetricsConstants.GET_OPERATION_STATUS_MS) { + super.getOperationStatus(operationHandle) + } + } + + abstract override def cancelOperation(operationHandle: OperationHandle): Unit = { + timeMetric(MetricsConstants.CANCEL_OPERATION_MS) { + super.cancelOperation(operationHandle) + } + } + + abstract override def closeOperation(operationHandle: OperationHandle): Unit = { + timeMetric(MetricsConstants.CLOSE_OPERATION_MS) { + super.closeOperation(operationHandle) + } + } + + abstract override def getResultSetMetadata(operationHandle: OperationHandle): TTableSchema = { + timeMetric(MetricsConstants.GET_RESULT_SET_METADATA_MS) { + super.getResultSetMetadata(operationHandle) + } + } + + abstract override def fetchResults( + operationHandle: OperationHandle, + orientation: FetchOrientation, + maxRows: Int, + fetchLog: Boolean): TRowSet = { + timeMetric(MetricsConstants.FETCH_RESULTS_MS) { + super.fetchResults(operationHandle, orientation, maxRows, fetchLog) + } + } + +} diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala index 3196f69ef..e6e6f42b6 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala @@ -135,7 +135,8 @@ class KyuubiServer(name: String) extends Serverable(name) { def this() = this(classOf[KyuubiServer].getSimpleName) - override val backendService: AbstractBackendService = new KyuubiBackendService() + override val backendService: AbstractBackendService = + new KyuubiBackendService() with BackendServiceTimeMetric override lazy val frontendServices: Seq[AbstractFrontendService] = conf.get(FRONTEND_PROTOCOLS).map(FrontendProtocols.withName).map { diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/BackendServiceTimeMetricSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/BackendServiceTimeMetricSuite.scala new file mode 100644 index 000000000..d25c26635 --- /dev/null +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/BackendServiceTimeMetricSuite.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.server + +import java.nio.file.{Path, Paths} +import java.time.Duration + +import com.fasterxml.jackson.databind.ObjectMapper + +import org.apache.kyuubi.{Utils, WithKyuubiServer} +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.metrics.MetricsConf +import org.apache.kyuubi.operation.HiveJDBCTestHelper + +class BackendServiceTimeMetricSuite extends WithKyuubiServer with HiveJDBCTestHelper { + + override protected def jdbcUrl: String = getJdbcUrl + + val reportPath: Path = Utils.createTempDir() + override protected val conf: KyuubiConf = { + KyuubiConf() + .set(MetricsConf.METRICS_REPORTERS, Seq("JSON")) + .set(MetricsConf.METRICS_JSON_LOCATION, reportPath.toString) + .set(MetricsConf.METRICS_JSON_INTERVAL, Duration.ofMillis(100).toMillis) + } + + test("backend service method time metric test") { + val objMapper = new ObjectMapper() + + withJdbcStatement() { statement => + statement.execute("show databases") + Thread.sleep(Duration.ofMillis(111).toMillis) + + val res1 = objMapper.readTree(Paths.get(reportPath.toString, "report.json").toFile) + assert(res1.has("histograms")) + val histograms1 = res1.get("histograms") + assert( + histograms1.get("kyuubi.backend_service.execute_statement_ms").get("count").asInt() == 1) + assert( + histograms1.get("kyuubi.backend_service.execute_statement_ms").get("mean").asDouble() > 0) + + statement.execute("show tables") + Thread.sleep(Duration.ofMillis(111).toMillis) + + val res2 = objMapper.readTree(Paths.get(reportPath.toString, "report.json").toFile) + val histograms2 = res2.get("histograms") + assert( + histograms2.get("kyuubi.backend_service.open_session_ms").get("count").asInt() == 1) + assert( + histograms2.get("kyuubi.backend_service.open_session_ms").get("min").asInt() > 0) + val execStatementNode2 = histograms2.get("kyuubi.backend_service.execute_statement_ms") + assert(execStatementNode2.get("count").asInt() == 2) + assert( + execStatementNode2.get("max").asDouble() >= execStatementNode2.get("mean").asDouble() && + execStatementNode2.get("mean").asDouble() >= execStatementNode2.get("min").asDouble()) + } + } +}