[KYUUBI #1586] Add time metric on each KyuubiBackendService method
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> Add time metric on each` KyuubiBackendService` method can help us inspect Kyuubi Server running status inside. It can Indirect reflecting our RPC call time when using Kyuubi. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [x] Add screenshots for manual tests if appropriate  - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1588 from zhenjiaguo/add_be_method_metric. Closes #1586 a001eb96 [zhenjiaguo] Add time metric on each KyuubiBackendService method Authored-by: zhenjiaguo <zhenjia_guo@163.com> Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
parent
01364e589f
commit
f3dc1fdecd
@ -40,4 +40,22 @@ object MetricsConstants {
|
||||
final val OPERATION_FAIL: String = OPERATION + "failed"
|
||||
final val OPERATION_TOTAL: String = OPERATION + "total"
|
||||
|
||||
final private val BACKEND_SERVICE = KYUUBI + "backend_service."
|
||||
final val OPEN_SESSION_MS = BACKEND_SERVICE + "open_session_ms"
|
||||
final val CLOSE_SESSION_MS = BACKEND_SERVICE + "close_session_ms"
|
||||
final val GET_INFO_MS = BACKEND_SERVICE + "get_info_ms"
|
||||
final val EXECUTE_STATEMENT_MS = BACKEND_SERVICE + "execute_statement_ms"
|
||||
final val GET_TYPE_INFO_MS = BACKEND_SERVICE + "get_type_info_ms"
|
||||
final val GET_CATALOGS_MS = BACKEND_SERVICE + "get_catalogs_ms"
|
||||
final val GET_SCHEMAS_MS = BACKEND_SERVICE + "get_schemas_ms"
|
||||
final val GET_TABLES_MS = BACKEND_SERVICE + "get_tables_ms"
|
||||
final val GET_TABLE_TYPES_MS = BACKEND_SERVICE + "get_table_types_ms"
|
||||
final val GET_COLUMNS_MS = BACKEND_SERVICE + "get_columns_ms"
|
||||
final val GET_FUNCTIONS_MS = BACKEND_SERVICE + "get_functions_ms"
|
||||
final val GET_OPERATION_STATUS_MS = BACKEND_SERVICE + "get_operation_status_ms"
|
||||
final val CANCEL_OPERATION_MS = BACKEND_SERVICE + "cancel_operation_ms"
|
||||
final val CLOSE_OPERATION_MS = BACKEND_SERVICE + "close_operation_ms"
|
||||
final val GET_RESULT_SET_METADATA_MS = BACKEND_SERVICE + "get_result_set_metadata_ms"
|
||||
final val FETCH_RESULTS_MS = BACKEND_SERVICE + "fetch_results_ms"
|
||||
|
||||
}
|
||||
|
||||
@ -42,6 +42,11 @@ class MetricsSystem extends CompositeService("MetricsSystem") {
|
||||
counter.dec(1L)
|
||||
}
|
||||
|
||||
def updateHistogram(key: String, value: Long): Unit = {
|
||||
val histogram = registry.histogram(key)
|
||||
histogram.update(value)
|
||||
}
|
||||
|
||||
def registerGauge[T](name: String, value: => T, default: T): Unit = {
|
||||
registry.register(
|
||||
MetricRegistry.name(name),
|
||||
|
||||
@ -0,0 +1,169 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.server
|
||||
|
||||
import org.apache.hive.service.rpc.thrift._
|
||||
|
||||
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
|
||||
import org.apache.kyuubi.operation.{OperationHandle, OperationStatus}
|
||||
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
|
||||
import org.apache.kyuubi.service.BackendService
|
||||
import org.apache.kyuubi.session.SessionHandle
|
||||
|
||||
trait BackendServiceTimeMetric extends BackendService {
|
||||
|
||||
@throws[Exception]
|
||||
private def timeMetric[T](name: String)(f: => T): T = {
|
||||
val startTime = System.currentTimeMillis()
|
||||
try {
|
||||
f
|
||||
} finally {
|
||||
MetricsSystem.tracing(
|
||||
_.updateHistogram(name, System.currentTimeMillis() - startTime))
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def openSession(
|
||||
protocol: TProtocolVersion,
|
||||
user: String,
|
||||
password: String,
|
||||
ipAddr: String,
|
||||
configs: Map[String, String]): SessionHandle = {
|
||||
timeMetric(MetricsConstants.OPEN_SESSION_MS) {
|
||||
super.openSession(protocol, user, password, ipAddr, configs)
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def closeSession(sessionHandle: SessionHandle): Unit = {
|
||||
timeMetric(MetricsConstants.CLOSE_SESSION_MS) {
|
||||
super.closeSession(sessionHandle)
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def getInfo(
|
||||
sessionHandle: SessionHandle,
|
||||
infoType: TGetInfoType): TGetInfoValue = {
|
||||
timeMetric(MetricsConstants.GET_INFO_MS) {
|
||||
super.getInfo(sessionHandle, infoType)
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def executeStatement(
|
||||
sessionHandle: SessionHandle,
|
||||
statement: String,
|
||||
runAsync: Boolean,
|
||||
queryTimeout: Long): OperationHandle = {
|
||||
timeMetric(MetricsConstants.EXECUTE_STATEMENT_MS) {
|
||||
super.executeStatement(sessionHandle, statement, runAsync, queryTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def getTypeInfo(sessionHandle: SessionHandle): OperationHandle = {
|
||||
timeMetric(MetricsConstants.GET_TYPE_INFO_MS) {
|
||||
super.getTypeInfo(sessionHandle)
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def getCatalogs(sessionHandle: SessionHandle): OperationHandle = {
|
||||
timeMetric(MetricsConstants.GET_CATALOGS_MS) {
|
||||
super.getCatalogs(sessionHandle)
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def getSchemas(
|
||||
sessionHandle: SessionHandle,
|
||||
catalogName: String,
|
||||
schemaName: String): OperationHandle = {
|
||||
timeMetric(MetricsConstants.GET_SCHEMAS_MS) {
|
||||
super.getSchemas(sessionHandle, catalogName, schemaName)
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def getTables(
|
||||
sessionHandle: SessionHandle,
|
||||
catalogName: String,
|
||||
schemaName: String,
|
||||
tableName: String,
|
||||
tableTypes: java.util.List[String]): OperationHandle = {
|
||||
timeMetric(MetricsConstants.GET_TABLES_MS) {
|
||||
super.getTables(sessionHandle, catalogName, schemaName, tableName, tableTypes)
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def getTableTypes(sessionHandle: SessionHandle): OperationHandle = {
|
||||
timeMetric(MetricsConstants.GET_TABLE_TYPES_MS) {
|
||||
super.getTableTypes(sessionHandle)
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def getColumns(
|
||||
sessionHandle: SessionHandle,
|
||||
catalogName: String,
|
||||
schemaName: String,
|
||||
tableName: String,
|
||||
columnName: String): OperationHandle = {
|
||||
timeMetric(MetricsConstants.GET_COLUMNS_MS) {
|
||||
super.getColumns(sessionHandle, catalogName, schemaName, tableName, columnName)
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def getFunctions(
|
||||
sessionHandle: SessionHandle,
|
||||
catalogName: String,
|
||||
schemaName: String,
|
||||
functionName: String): OperationHandle = {
|
||||
timeMetric(MetricsConstants.GET_FUNCTIONS_MS) {
|
||||
super.getFunctions(sessionHandle, catalogName, schemaName, functionName)
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def getOperationStatus(operationHandle: OperationHandle): OperationStatus = {
|
||||
timeMetric(MetricsConstants.GET_OPERATION_STATUS_MS) {
|
||||
super.getOperationStatus(operationHandle)
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def cancelOperation(operationHandle: OperationHandle): Unit = {
|
||||
timeMetric(MetricsConstants.CANCEL_OPERATION_MS) {
|
||||
super.cancelOperation(operationHandle)
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def closeOperation(operationHandle: OperationHandle): Unit = {
|
||||
timeMetric(MetricsConstants.CLOSE_OPERATION_MS) {
|
||||
super.closeOperation(operationHandle)
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def getResultSetMetadata(operationHandle: OperationHandle): TTableSchema = {
|
||||
timeMetric(MetricsConstants.GET_RESULT_SET_METADATA_MS) {
|
||||
super.getResultSetMetadata(operationHandle)
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def fetchResults(
|
||||
operationHandle: OperationHandle,
|
||||
orientation: FetchOrientation,
|
||||
maxRows: Int,
|
||||
fetchLog: Boolean): TRowSet = {
|
||||
timeMetric(MetricsConstants.FETCH_RESULTS_MS) {
|
||||
super.fetchResults(operationHandle, orientation, maxRows, fetchLog)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -135,7 +135,8 @@ class KyuubiServer(name: String) extends Serverable(name) {
|
||||
|
||||
def this() = this(classOf[KyuubiServer].getSimpleName)
|
||||
|
||||
override val backendService: AbstractBackendService = new KyuubiBackendService()
|
||||
override val backendService: AbstractBackendService =
|
||||
new KyuubiBackendService() with BackendServiceTimeMetric
|
||||
|
||||
override lazy val frontendServices: Seq[AbstractFrontendService] =
|
||||
conf.get(FRONTEND_PROTOCOLS).map(FrontendProtocols.withName).map {
|
||||
|
||||
@ -0,0 +1,73 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.server
|
||||
|
||||
import java.nio.file.{Path, Paths}
|
||||
import java.time.Duration
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
|
||||
import org.apache.kyuubi.{Utils, WithKyuubiServer}
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.metrics.MetricsConf
|
||||
import org.apache.kyuubi.operation.HiveJDBCTestHelper
|
||||
|
||||
class BackendServiceTimeMetricSuite extends WithKyuubiServer with HiveJDBCTestHelper {
|
||||
|
||||
override protected def jdbcUrl: String = getJdbcUrl
|
||||
|
||||
val reportPath: Path = Utils.createTempDir()
|
||||
override protected val conf: KyuubiConf = {
|
||||
KyuubiConf()
|
||||
.set(MetricsConf.METRICS_REPORTERS, Seq("JSON"))
|
||||
.set(MetricsConf.METRICS_JSON_LOCATION, reportPath.toString)
|
||||
.set(MetricsConf.METRICS_JSON_INTERVAL, Duration.ofMillis(100).toMillis)
|
||||
}
|
||||
|
||||
test("backend service method time metric test") {
|
||||
val objMapper = new ObjectMapper()
|
||||
|
||||
withJdbcStatement() { statement =>
|
||||
statement.execute("show databases")
|
||||
Thread.sleep(Duration.ofMillis(111).toMillis)
|
||||
|
||||
val res1 = objMapper.readTree(Paths.get(reportPath.toString, "report.json").toFile)
|
||||
assert(res1.has("histograms"))
|
||||
val histograms1 = res1.get("histograms")
|
||||
assert(
|
||||
histograms1.get("kyuubi.backend_service.execute_statement_ms").get("count").asInt() == 1)
|
||||
assert(
|
||||
histograms1.get("kyuubi.backend_service.execute_statement_ms").get("mean").asDouble() > 0)
|
||||
|
||||
statement.execute("show tables")
|
||||
Thread.sleep(Duration.ofMillis(111).toMillis)
|
||||
|
||||
val res2 = objMapper.readTree(Paths.get(reportPath.toString, "report.json").toFile)
|
||||
val histograms2 = res2.get("histograms")
|
||||
assert(
|
||||
histograms2.get("kyuubi.backend_service.open_session_ms").get("count").asInt() == 1)
|
||||
assert(
|
||||
histograms2.get("kyuubi.backend_service.open_session_ms").get("min").asInt() > 0)
|
||||
val execStatementNode2 = histograms2.get("kyuubi.backend_service.execute_statement_ms")
|
||||
assert(execStatementNode2.get("count").asInt() == 2)
|
||||
assert(
|
||||
execStatementNode2.get("max").asDouble() >= execStatementNode2.get("mean").asDouble() &&
|
||||
execStatementNode2.get("mean").asDouble() >= execStatementNode2.get("min").asDouble())
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user