[KYUUBI #1696] Add fetch logs and results rows rate

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
Add fetch results and logs rows rate metric.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1707 from zhenjiaguo/add_fetch_logs_and_result.

Closes #1696

3b7d1fca [zhenjiaguo] Merge branch 'add_fetch_logs_and_result' of github.com:zhenjiaguo/kyuubi into add_fetch_logs_and_result
653e3b00 [zhenjiaguo] comment
e93d595a [zhenjiaguo] rerun action
6db7a768 [zhenjiaguo] add rowset not null judgment
eff57dec [zhenjiaguo] change fetch name
5543b92a [zhenjiaguo] add fetch logs and results rate
b99f5a69 [zhenjiaguo] comment
3703e350 [zhenjiaguo] rerun action
082f41a6 [zhenjiaguo] add rowset not null judgment
1ca562bc [zhenjiaguo] change fetch name
c6fef427 [zhenjiaguo] add fetch logs and results rate

Authored-by: zhenjiaguo <zhenjiaguo@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
zhenjiaguo 2022-01-17 10:12:18 +08:00 committed by Kent Yao
parent 4f2696d300
commit fb0c3a9794
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
5 changed files with 57 additions and 16 deletions

View File

@@ -41,6 +41,8 @@ object MetricsConstants {
final val OPERATION_TOTAL: String = OPERATION + "total"
final private val BACKEND_SERVICE = KYUUBI + "backend_service."
final val BS_FETCH_LOG_ROWS_RATE = BACKEND_SERVICE + "fetch_log_rows_rate"
final val BS_FETCH_RESULT_ROWS_RATE = BACKEND_SERVICE + "fetch_result_rows_rate"
final val BS_OPEN_SESSION = BACKEND_SERVICE + "open_session"
final val BS_CLOSE_SESSION = BACKEND_SERVICE + "close_session"
final val BS_GET_INFO = BACKEND_SERVICE + "get_info"

View File

@@ -53,6 +53,11 @@ class MetricsSystem extends CompositeService("MetricsSystem") {
timer.update(duration, unit)
}
def markMeter(key: String, value: Long = 1): Unit = {
val meter = registry.meter(key)
meter.mark(value)
}
def registerGauge[T](name: String, value: => T, default: T): Unit = {
registry.register(
MetricRegistry.name(name),
@@ -105,5 +110,4 @@ object MetricsSystem {
tracing(_.updateTimer(name, System.nanoTime() - startTime, TimeUnit.NANOSECONDS))
}
}
}

View File

@@ -25,7 +25,7 @@ import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.service.BackendService
import org.apache.kyuubi.session.SessionHandle
trait BackendServiceTimeMetric extends BackendService {
trait BackendServiceMetric extends BackendService {
abstract override def openSession(
protocol: TProtocolVersion,
@@ -152,7 +152,28 @@ trait BackendServiceTimeMetric extends BackendService {
maxRows: Int,
fetchLog: Boolean): TRowSet = {
MetricsSystem.timerTracing(MetricsConstants.BS_FETCH_RESULTS) {
super.fetchResults(operationHandle, orientation, maxRows, fetchLog)
val rowSet = super.fetchResults(operationHandle, orientation, maxRows, fetchLog)
val rowsSize =
if (rowSet.getColumnsSize > 0) {
rowSet.getColumns.get(0).getFieldValue match {
case t: TStringColumn => t.getValues.size()
case t: TDoubleColumn => t.getValues.size()
case t: TI64Column => t.getValues.size()
case t: TI32Column => t.getValues.size()
case t: TI16Column => t.getValues.size()
case t: TBoolColumn => t.getValues.size()
case t: TByteColumn => t.getValues.size()
case t: TBinaryColumn => t.getValues.size()
case _ => 0
}
} else rowSet.getRowsSize
MetricsSystem.tracing(_.markMeter(
if (fetchLog) MetricsConstants.BS_FETCH_LOG_ROWS_RATE
else MetricsConstants.BS_FETCH_RESULT_ROWS_RATE,
rowsSize))
rowSet
}
}

View File

@@ -136,7 +136,7 @@ class KyuubiServer(name: String) extends Serverable(name) {
def this() = this(classOf[KyuubiServer].getSimpleName)
override val backendService: AbstractBackendService =
new KyuubiBackendService() with BackendServiceTimeMetric
new KyuubiBackendService() with BackendServiceMetric
override lazy val frontendServices: Seq[AbstractFrontendService] =
conf.get(FRONTEND_PROTOCOLS).map(FrontendProtocols.withName).map {

View File

@@ -27,7 +27,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.metrics.{MetricsConf, MetricsConstants}
import org.apache.kyuubi.operation.HiveJDBCTestHelper
class BackendServiceTimeMetricSuite extends WithKyuubiServer with HiveJDBCTestHelper {
class BackendServiceMetricSuite extends WithKyuubiServer with HiveJDBCTestHelper {
override protected def jdbcUrl: String = getJdbcUrl
@@ -39,35 +39,49 @@ class BackendServiceTimeMetricSuite extends WithKyuubiServer with HiveJDBCTestHe
.set(MetricsConf.METRICS_JSON_INTERVAL, Duration.ofMillis(100).toMillis)
}
test("backend service method time metric test") {
test("backend service metric test") {
val objMapper = new ObjectMapper()
withJdbcStatement() { statement =>
statement.execute("show databases")
statement.executeQuery("CREATE TABLE stu_test(id int, name string) USING parquet")
statement.execute("insert into stu_test values(1, 'a'), (2, 'b'), (3, 'c')")
Thread.sleep(Duration.ofMillis(111).toMillis)
val res1 = objMapper.readTree(Paths.get(reportPath.toString, "report.json").toFile)
assert(res1.has("timers"))
val histograms1 = res1.get("timers")
val timer1 = res1.get("timers")
assert(
histograms1.get(MetricsConstants.BS_EXECUTE_STATEMENT).get("count").asInt() == 1)
timer1.get(MetricsConstants.BS_EXECUTE_STATEMENT).get("count").asInt() == 2)
assert(
histograms1.get(MetricsConstants.BS_EXECUTE_STATEMENT).get("mean").asDouble() > 0)
timer1.get(MetricsConstants.BS_EXECUTE_STATEMENT).get("mean").asDouble() > 0)
statement.execute("show tables")
assert(res1.has("meters"))
val meters1 = res1.get("meters")
val logRows1 = meters1.get(MetricsConstants.BS_FETCH_LOG_ROWS_RATE).get("count").asInt()
assert(logRows1 > 0)
statement.execute("select * from stu_test limit 2")
statement.getResultSet.next()
Thread.sleep(Duration.ofMillis(111).toMillis)
val res2 = objMapper.readTree(Paths.get(reportPath.toString, "report.json").toFile)
val histograms2 = res2.get("timers")
val timer2 = res2.get("timers")
assert(
histograms2.get(MetricsConstants.BS_OPEN_SESSION).get("count").asInt() == 1)
timer2.get(MetricsConstants.BS_OPEN_SESSION).get("count").asInt() == 1)
assert(
histograms2.get(MetricsConstants.BS_OPEN_SESSION).get("min").asInt() > 0)
val execStatementNode2 = histograms2.get(MetricsConstants.BS_EXECUTE_STATEMENT)
assert(execStatementNode2.get("count").asInt() == 2)
timer2.get(MetricsConstants.BS_OPEN_SESSION).get("min").asInt() > 0)
val execStatementNode2 = timer2.get(MetricsConstants.BS_EXECUTE_STATEMENT)
assert(execStatementNode2.get("count").asInt() == 3)
assert(
execStatementNode2.get("max").asDouble() >= execStatementNode2.get("mean").asDouble() &&
execStatementNode2.get("mean").asDouble() >= execStatementNode2.get("min").asDouble())
val meters2 =
objMapper.readTree(Paths.get(reportPath.toString, "report.json").toFile).get("meters")
assert(meters2.get(MetricsConstants.BS_FETCH_RESULT_ROWS_RATE).get("count").asInt() == 2)
assert(meters2.get(MetricsConstants.BS_FETCH_LOG_ROWS_RATE).get("count").asInt() >= logRows1)
statement.executeQuery("DROP TABLE stu_test")
}
}
}