[KYUUBI #1188] Add Statement stats on Kyuubi Query Engine Page

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

Add Statement stats on Kyuubi Query Engine Page

![image](https://user-images.githubusercontent.com/86483005/137263757-398a378f-e0b8-498f-b482-60eef650c0ff.png)

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1234 from timothy65535/ky-1188.

Closes #1188

6e0ac523 [timothy65535] trigger rebuild
2b2b2a03 [timothy65535] update patch
f0431359 [timothy65535] [KYUUBI #1188] Add Statement stats on Kyuubi Query Engine Page

Authored-by: timothy65535 <timothy65535@163.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
This commit is contained in:
timothy65535 2021-10-22 14:52:35 +08:00 committed by ulysses-you
parent 589b23f578
commit d6cefb1b9c
No known key found for this signature in database
GPG Key ID: 4C500BC62D576766
9 changed files with 356 additions and 12 deletions

View File

@ -189,6 +189,7 @@ kyuubi\.engine\.share<br>\.level\.sub\.domain|<div style='width: 65pt;word-wrap:
kyuubi\.engine\.share<br>\.level\.subdomain|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Allow end-users to create a subdomain for the share level of an engine. A subdomain is a case-insensitive string value that must be a valid zookeeper sub path. For example, for `USER` share level, an end-user can share a certain engine within a subdomain, not for all of its clients. End-users are free to create multiple engines in the `USER` share level</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.engine\.single<br>\.spark\.session|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>false</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.3.0</div>
kyuubi\.engine\.ui<br>\.retainedSessions|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>200</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The number of SQL client sessions kept in the Kyuubi Query Engine web UI.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.engine\.ui<br>\.retainedStatements|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>200</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The number of statements kept in the Kyuubi Query Engine web UI.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.engine\.ui\.stop<br>\.enabled|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When true, allows Kyuubi engine to be killed from the Spark Web UI.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.3.0</div>

View File

@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_UI_SESSION_LIMIT
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_UI_SESSION_LIMIT, ENGINE_UI_STATEMENT_LIMIT}
/**
* A memory store that tracks the number of statements and sessions. It provides:
@ -32,7 +32,6 @@ import org.apache.kyuubi.config.KyuubiConf.ENGINE_UI_SESSION_LIMIT
* 1). remove the finished events first.
* 2). remove the active events if still reach the threshold.
*
* // TODO KYUUBI #983 this store will be used in the third task.
*/
class EngineEventsStore(conf: KyuubiConf) {
@ -41,6 +40,11 @@ class EngineEventsStore(conf: KyuubiConf) {
*/
private val retainedSessions: Int = conf.get(ENGINE_UI_SESSION_LIMIT)
/**
* The number of statements kept in the Kyuubi Query Engine web UI.
*/
private val retainedStatements: Int = conf.get(ENGINE_UI_STATEMENT_LIMIT)
/**
* store all session events.
*/
@ -74,7 +78,7 @@ class EngineEventsStore(conf: KyuubiConf) {
val reverseSeq = sessions.values().asScala.toSeq.sortBy(_.startTime).reverse
// remove finished sessions first.
for (event <- reverseSeq if event.endTime != 0L && countToDelete > 0) {
for (event <- reverseSeq if event.endTime != -1L && countToDelete > 0) {
sessions.remove(event.sessionId)
countToDelete -= 1
}
@ -85,5 +89,51 @@ class EngineEventsStore(conf: KyuubiConf) {
countToDelete -= 1
}
}
/**
 * Store of all statement events, keyed by statement id.
 */
val statements = new ConcurrentHashMap[String, SparkStatementEvent]

/**
 * Get all statement events, ordered by create time (ascending).
 */
def getStatementList: Seq[SparkStatementEvent] = {
  val allEvents = statements.values().asScala.toSeq
  allEvents.sortBy(_.createTime)
}
/**
 * Look up a single statement event by its id; None when no such
 * statement is stored.
 */
def getStatement(statementId: String): Option[SparkStatementEvent] = {
  statements.get(statementId) match {
    case null => None
    case event => Some(event)
  }
}
/**
 * Save a statement event keyed by its statement id, then evict old
 * events if the retained-statements threshold is exceeded.
 */
def saveStatement(statementEvent: SparkStatementEvent): Unit = {
statements.put(statementEvent.statementId, statementEvent)
checkStatementCapacity()
}
/**
 * Evict statement events once the store exceeds `retainedStatements`.
 * Finished statements (endTime != -1) are dropped first; active ones are
 * dropped only if the store is still over the threshold.
 */
private def checkStatementCapacity(): Unit = {
  var excess = statements.size - retainedStatements
  if (excess > 0) {
    // Newest-first ordering, mirroring the session eviction logic above.
    val candidates = statements.values().asScala.toSeq.sortBy(_.createTime).reverse
    // First pass: drop statements that have already finished.
    for (event <- candidates if excess > 0 && event.endTime != -1L) {
      statements.remove(event.statementId)
      excess -= 1
    }
    // Second pass: drop active statements if still over capacity.
    // Removing an id evicted in the first pass is a harmless no-op.
    for (event <- candidates if excess > 0) {
      statements.remove(event.statementId)
      excess -= 1
    }
  }
}
}

View File

@ -34,6 +34,7 @@ import org.apache.kyuubi.Utils
* @param exception: caught exception if have
*/
case class SparkStatementEvent(
username: String,
statementId: String,
statement: String,
appId: String,
@ -41,10 +42,19 @@ case class SparkStatementEvent(
createTime: Long,
var state: String,
var stateTime: Long,
var endTime: Long = -1L,
var queryExecution: String = "",
var exception: String = "") extends KyuubiSparkEvent {
override def schema: StructType = Encoders.product[SparkStatementEvent].schema
override def partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(createTime)) :: Nil
/**
 * Elapsed time of the statement in milliseconds: wall-clock time so far
 * for a still-running statement (endTime == -1), otherwise the final
 * end-to-create span.
 */
def duration: Long = {
  val effectiveEnd = if (endTime == -1L) System.currentTimeMillis else endTime
  effectiveEnd - createTime
}
}

View File

@ -62,7 +62,7 @@ class ExecuteStatement(
private val operationListener: SQLOperationListener = new SQLOperationListener(this, spark)
val statementEvent: SparkStatementEvent = SparkStatementEvent(
statementId, statement, spark.sparkContext.applicationId,
session.user, statementId, statement, spark.sparkContext.applicationId,
session.handle.identifier.toString, lastAccessTime, state.toString, lastAccessTime)
EventLoggingService.onEvent(statementEvent)
@ -178,6 +178,9 @@ class ExecuteStatement(
super.setState(newState)
statementEvent.state = newState.toString
statementEvent.stateTime = lastAccessTime
if (newState == OperationState.ERROR || newState == OperationState.FINISHED) {
statementEvent.endTime = System.currentTimeMillis()
}
EventLoggingService.onEvent(statementEvent)
}

View File

@ -30,7 +30,7 @@ import org.apache.kyuubi.KyuubiSparkUtils.KYUUBI_STATEMENT_ID_KEY
import org.apache.kyuubi.Logging
import org.apache.kyuubi.Utils.stringifyException
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.spark.events.{EngineEventsStore, SessionEvent}
import org.apache.kyuubi.engine.spark.events.{EngineEventsStore, SessionEvent, SparkStatementEvent}
import org.apache.kyuubi.service.{Serverable, ServiceState}
/**
@ -117,12 +117,17 @@ class SparkSQLEngineListener(
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case e: SessionEvent => updateSession(e)
case e: SessionEvent => updateSessionStore(e)
case e: SparkStatementEvent => updateStatementStore(e)
case _ => // Ignore
}
}
private def updateSession(event: SessionEvent): Unit = {
// Persist the session event into the in-memory EngineEventsStore so it
// can be rendered on the engine UI page.
private def updateSessionStore(event: SessionEvent): Unit = {
store.saveSession(event)
}
// Persist the statement event into the in-memory EngineEventsStore so it
// can be rendered on the engine UI page.
private def updateStatementStore(event: SparkStatementEvent): Unit = {
store.saveStatement(event)
}
}

View File

@ -25,11 +25,13 @@ import javax.servlet.http.HttpServletRequest
import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.xml.{Node, Unparsed}
import org.apache.commons.text.StringEscapeUtils
import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage}
import org.apache.spark.ui.UIUtils._
import org.apache.kyuubi.Utils
import org.apache.kyuubi.engine.spark.events.SessionEvent
import org.apache.kyuubi.engine.spark.events.SparkStatementEvent
case class EnginePage(parent: EngineTab) extends WebUIPage("") {
private val store = parent.store
@ -45,7 +47,8 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") {
running {parent.engine.backendService.sessionManager.operationManager.getOperationCount}
operations
</h4> ++
generateSessionStatsTable(request)
generateSessionStatsTable(request) ++
generateStatementStatsTable(request)
UIUtils.headerSparkPage(request, parent.name, content, parent)
}
@ -93,6 +96,154 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") {
}
}
/**
 * Generate the statement statistics section of the engine page: a
 * collapsible "SQL Statistics" header plus a paged table of all stored
 * statement events, or a placeholder when no statements exist yet.
 */
private def generateStatementStatsTable(request: HttpServletRequest): Seq[Node] = {
val numStatement = store.getStatementList.size
val table = if (numStatement > 0) {
val sqlTableTag = "sqlstat"
// Requested page number for the paged table; defaults to the first page.
val sqlTablePage =
Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1)
try {
Some(new StatementStatsPagedTable(
request,
parent,
store.getStatementList,
"kyuubi",
UIUtils.prependBaseUri(request, parent.basePath),
sqlTableTag).table(sqlTablePage))
} catch {
// Bad paging parameters: render the failure inline rather than
// breaking the whole page. (Caption fixed: this is the statement
// table, not the job table.)
case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
Some(<div class="alert alert-error">
<p>Error while rendering statement table:</p>
<pre>
{Utils.stringifyException(e)}
</pre>
</div>)
}
} else {
None
}
val content =
<span id="sqlstat" class="collapse-aggregated-sqlstat collapse-table"
onClick="collapseTable('collapse-aggregated-sqlstat',
'aggregated-sqlstat')">
<h4>
<span class="collapse-table-arrow arrow-open"></span>
<a>SQL Statistics ({numStatement})</a>
</h4>
</span> ++
<div class="aggregated-sqlstat collapsible-table">
{table.getOrElse("No statistics have been generated yet.")}
</div>
content
}
/**
 * Paged table rendering statement statistics on the engine UI page.
 * Sorting, paging and URL state are driven by request parameters scoped
 * under `sqlStatsTableTag` (Spark PagedTable convention).
 */
private class StatementStatsPagedTable(
request: HttpServletRequest,
parent: EngineTab,
data: Seq[SparkStatementEvent],
subPath: String,
basePath: String,
sqlStatsTableTag: String) extends PagedTable[SparkStatementEvent] {
// Sort column, direction and page size from the request; defaults to
// sorting by "Create Time".
private val (sortColumn, desc, pageSize) =
getRequestTableParameters(request, sqlStatsTableTag, "Create Time")
// URL-encode the sort column so it can be embedded in page links.
private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
// Base URL carrying through the parameters of any other tables on the page.
private val parameterPath =
s"$basePath/$subPath/?${getRequestParameterOtherTable(request, sqlStatsTableTag)}"
override val dataSource = new StatementStatsTableDataSource(data, pageSize, sortColumn, desc)
override def tableId: String = sqlStatsTableTag
override def tableCssClass: String =
"table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
// Link to a given page, preserving the current sort and page size.
override def pageLink(page: Int): String = {
parameterPath +
s"&$pageNumberFormField=$page" +
s"&$sqlStatsTableTag.sort=$encodedSortColumn" +
s"&$sqlStatsTableTag.desc=$desc" +
s"&$pageSizeFormField=$pageSize" +
s"#$sqlStatsTableTag"
}
override def pageSizeFormField: String = s"$sqlStatsTableTag.pageSize"
override def pageNumberFormField: String = s"$sqlStatsTableTag.page"
// Form target of the "Go" button, preserving the current sort state.
override def goButtonFormPath: String =
s"$parameterPath&$sqlStatsTableTag.sort=$encodedSortColumn" +
s"&$sqlStatsTableTag.desc=$desc#$sqlStatsTableTag"
// Column headers: (title, sortable, tooltip). All columns are sortable.
override def headers: Seq[Node] = {
val sqlTableHeadersAndTooltips: Seq[(String, Boolean, Option[String])] =
Seq(
("User", true, None),
("Statement ID", true, None),
("Create Time", true, None),
("Finish Time", true, None),
("Duration", true, None),
("Statement", true, None),
("State", true, None),
("Query Execution", true, None))
headerStatRow(sqlTableHeadersAndTooltips, desc, pageSize, sortColumn, parameterPath,
sqlStatsTableTag, sqlStatsTableTag)
}
// Render one table row per statement event.
override def row(sparkStatementEvent: SparkStatementEvent): Seq[Node] = {
<tr>
<td>
{sparkStatementEvent.username}
</td>
<td>
{sparkStatementEvent.statementId}
</td>
<td >
{formatDate(sparkStatementEvent.createTime)}
</td>
<td>
{if (sparkStatementEvent.endTime > 0) formatDate(sparkStatementEvent.endTime)}
</td>
<td >
{formatDurationVerbose(sparkStatementEvent.duration)}
</td>
<td>
<span class="description-input">
{sparkStatementEvent.statement}
</span>
</td>
<td>
{sparkStatementEvent.state}
</td>
{errorMessageCell(sparkStatementEvent.queryExecution)}
</tr>
}
// Render a cell showing the first line of a (possibly multi-line) text,
// with the remainder behind an expandable details node.
// NOTE(review): despite the name, this is fed queryExecution above, not
// an error message — confirm naming against upstream.
private def errorMessageCell(errorMessage: String): Seq[Node] = {
val isMultiline = errorMessage.indexOf('\n') >= 0
val errorSummary = StringEscapeUtils.escapeHtml4(
if (isMultiline) {
errorMessage.substring(0, errorMessage.indexOf('\n'))
} else {
errorMessage
})
val details = detailsUINode(isMultiline, errorMessage)
<td>
{errorSummary}{details}
</td>
}
}
/** Generate stats of sessions for the engine */
private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = {
val numSessions = store.getSessionList.size
@ -333,3 +484,39 @@ private class SessionStatsTableDataSource(
}
}
}
/**
 * Data source backing the statement stats paged table: sorts the
 * statement events by the requested column and serves page slices.
 */
private class StatementStatsTableDataSource(
    info: Seq[SparkStatementEvent],
    pageSize: Int,
    sortColumn: String,
    desc: Boolean) extends PagedDataSource[SparkStatementEvent](pageSize) {

  // Sorting SparkStatementEvent data (comment fixed: this table holds
  // statement events, not session events).
  private val data = info.sorted(ordering(sortColumn, desc))

  override def dataSize: Int = data.size

  override def sliceData(from: Int, to: Int): Seq[SparkStatementEvent] = data.slice(from, to)

  /**
   * Return Ordering according to sortColumn and desc.
   *
   * @throws IllegalArgumentException if sortColumn is not a known column.
   */
  private def ordering(sortColumn: String, desc: Boolean): Ordering[SparkStatementEvent] = {
    val ordering: Ordering[SparkStatementEvent] = sortColumn match {
      case "User" => Ordering.by(_.username)
      case "Statement ID" => Ordering.by(_.statementId)
      // Consistency: use Ordering.by like every other arm (was infix
      // `Ordering by (_.createTime)`).
      case "Create Time" => Ordering.by(_.createTime)
      case "Finish Time" => Ordering.by(_.endTime)
      case "Duration" => Ordering.by(_.duration)
      case "Statement" => Ordering.by(_.statement)
      case "State" => Ordering.by(_.state)
      case "Query Execution" => Ordering.by(_.queryExecution)
      case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
    }
    if (desc) {
      ordering.reverse
    } else {
      ordering
    }
  }
}

View File

@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.events
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_UI_SESSION_LIMIT
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_UI_SESSION_LIMIT, ENGINE_UI_STATEMENT_LIMIT}
class EngineEventsStoreSuite extends KyuubiFunSuite {
@ -58,10 +58,10 @@ class EngineEventsStoreSuite extends KyuubiFunSuite {
val store = new EngineEventsStore(conf)
store.saveSession(SessionEvent("s1", "ea", "test1", "1.1.1.1", 1L, 0L))
store.saveSession(SessionEvent("s2", "ea", "test1", "1.1.1.1", 2L, 0L))
store.saveSession(SessionEvent("s1", "ea", "test1", "1.1.1.1", 1L, -1L))
store.saveSession(SessionEvent("s2", "ea", "test1", "1.1.1.1", 2L, -1L))
store.saveSession(SessionEvent("s3", "ea", "test1", "1.1.1.1", 3L, 1L))
store.saveSession(SessionEvent("s4", "ea", "test1", "1.1.1.1", 4L, 0L))
store.saveSession(SessionEvent("s4", "ea", "test1", "1.1.1.1", 4L, -1L))
assert(store.getSessionList.size == 3)
assert(store.getSessionList(2).sessionId == "s4")
@ -79,4 +79,48 @@ class EngineEventsStoreSuite extends KyuubiFunSuite {
assert(store.getSession("abc").get.endTime == finishTimestamp)
}
test("ensure that the statements are stored in order") {
  val store = new EngineEventsStore(KyuubiConf())
  // Three running statements with ascending create times.
  val events = Seq(
    SparkStatementEvent("a", "ea1", "select 1", "app1", "sid1", 1L, "RUNNING", 2L),
    SparkStatementEvent("c", "ea2", "select 2", "app2", "sid1", 2L, "RUNNING", 2L),
    SparkStatementEvent("b", "ea3", "select 3", "app3", "sid1", 3L, "RUNNING", 2L))
  events.foreach(store.saveStatement)

  val stored = store.getStatementList
  assert(stored.size == 3)
  // getStatementList orders by createTime, not insertion or username.
  assert(stored.head.statementId == "ea1")
  assert(stored.last.statementId == "ea3")
}
test("test drop statements when reach the threshold ") {
  val conf = KyuubiConf()
  conf.set(ENGINE_UI_STATEMENT_LIMIT, 3)
  val store = new EngineEventsStore(conf)

  // Insert two more statements than the store is allowed to retain.
  (1 to 5).foreach { i =>
    store.saveStatement(
      SparkStatementEvent("a", s"ea1$i", "select 1", "app1", "sid1", 1L, "RUNNING", 2L))
  }
  assert(store.getStatementList.size == 3)
}
test("test drop statements when reach the threshold, and try to keep active events.") {
  val conf = KyuubiConf()
  conf.set(ENGINE_UI_STATEMENT_LIMIT, 3)
  val store = new EngineEventsStore(conf)

  // Three active statements (default endTime = -1) and one finished (s3).
  val active1 = SparkStatementEvent("a", "s1", "select 1", "a1", "si1", 1L, "RUNNING", -1L)
  val active2 = SparkStatementEvent("a", "s2", "select 1", "a2", "si1", 2L, "RUNNING", -1L)
  val finished = SparkStatementEvent("a", "s3", "1", "a3", "si1", 3L, "ERROR", 3L, 3L)
  val active3 = SparkStatementEvent("a", "s4", "select 1", "a4", "si1", 4L, "RUNNING", -1L)
  Seq(active1, active2, finished, active3).foreach(store.saveStatement)

  // The finished statement is evicted first; the newest active one survives.
  assert(store.getStatementList.size == 3)
  assert(store.getStatementList(2).statementId == "s4")
}
}

View File

@ -100,5 +100,41 @@ class EngineTabSuite extends WithSparkSQLEngine with JDBCTestUtils {
}
}
// End-to-end check that the statement stats table is rendered on the
// engine's "kyuubi" web UI page after a statement has been executed.
test("statement stats for engine tab") {
assert(spark.sparkContext.ui.nonEmpty)
val client = HttpClients.createDefault()
val req = new HttpGet(spark.sparkContext.uiWebUrl.get + "/kyuubi/")
val response = client.execute(req)
assert(response.getStatusLine.getStatusCode === 200)
val resp = EntityUtils.toString(response.getEntity)
// Before any session is opened, the page reports zero online sessions.
assert(resp.contains("0 session(s) are online,"))
withJdbcStatement() { statement =>
statement.execute(
"""
|SELECT
|  l.id % 100 k,
|  sum(l.id) sum,
|  count(l.id) cnt,
|  avg(l.id) avg,
|  min(l.id) min,
|  max(l.id) max
|from range(0, 100000L, 1, 100) l
|  left join range(0, 100000L, 2, 100) r ON l.id = r.id
|GROUP BY 1""".stripMargin)
val response = client.execute(req)
assert(response.getStatusLine.getStatusCode === 200)
val resp = EntityUtils.toString(response.getEntity)
// check statement stats section title
assert(resp.contains("SQL Statistics"))
// check sql stats table id
assert(resp.contains("sqlstat"))
// check sql stats table title
assert(resp.contains("Query Execution"))
}
}
override protected def jdbcUrl: String = getJdbcUrl
}

View File

@ -902,6 +902,14 @@ object KyuubiConf {
.checkValue(_ > 0, "retained sessions must be positive.")
.createWithDefault(200)
// Caps how many statement events EngineEventsStore retains for the
// engine UI; older events are evicted once this limit is exceeded.
val ENGINE_UI_STATEMENT_LIMIT: ConfigEntry[Int] =
buildConf("engine.ui.retainedStatements")
.doc("The number of statements kept in the Kyuubi Query Engine web UI.")
.version("1.4.0")
.intConf
.checkValue(_ > 0, "retained statements must be positive.")
.createWithDefault(200)
val ENGINE_OPERATION_LOG_DIR_ROOT: ConfigEntry[String] =
buildConf("engine.operation.log.dir.root")
.doc("Root directory for query operation log at engine-side.")