[KYUUBI #1159] Add Session stats on Kyuubi Query Engine Page

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

For more detail, please go to https://github.com/apache/incubator-kyuubi/issues/981

![image](https://user-images.githubusercontent.com/86483005/135767720-0807c5dd-13ac-4812-a61a-38e0c8861d0c.png)

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1187 from timothy65535/ky-1159.

Closes #1159

52daf139 [timothy65535] update conf md
a9b50843 [timothy65535] improve patch
36676f2a [timothy65535] improve patch
892a6333 [timothy65535] [KYUUBI #1159] Add Session stats on Kyuubi Query Engine Page

Authored-by: timothy65535 <timothy65535@163.com>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
timothy65535 2021-10-12 13:44:04 +08:00 committed by Kent Yao
parent 7a77b7da40
commit badd5d516e
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
7 changed files with 310 additions and 13 deletions

View File

@ -178,7 +178,7 @@ kyuubi\.engine<br>\.deregister\.exception<br>\.messages|<div style='width: 65pt;
kyuubi\.engine<br>\.deregister\.exception<br>\.ttl|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT30M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Time to live(TTL) for exceptions pattern specified in kyuubi.engine.deregister.exception.classes and kyuubi.engine.deregister.exception.messages to deregister engines. Once the total error count hits the kyuubi.engine.deregister.job.max.failures within the TTL, an engine will deregister itself and wait for self-terminated. Otherwise, we suppose that the engine has recovered from temporary failures.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.engine<br>\.deregister\.job\.max<br>\.failures|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>4</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Number of failures of job before deregistering the engine.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.engine\.event<br>\.json\.log\.path|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>file:/tmp/kyuubi/events</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The location of all the engine events go for the builtin JSON logger.<ul><li>Local Path: start with 'file:'</li><li>HDFS Path: start with 'hdfs:'</li></ul></div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.3.0</div>
kyuubi\.engine\.event<br>\.loggers|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of engine history loggers, where engine/session/operation etc events go.<ul> <li>SPARK: the events will be written to the spark history events</li> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: to be done.</li></ul></div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.3.0</div>
kyuubi\.engine\.event<br>\.loggers|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SPARK</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of engine history loggers, where engine/session/operation etc events go. We use spark logger by default.<ul> <li>SPARK: the events will be written to the spark listener bus.</li> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: to be done.</li></ul></div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.3.0</div>
kyuubi\.engine<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SHOW DATABASES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.engine<br>\.operation\.log\.dir<br>\.root|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>engine_operation_logs</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Root directory for query operation log at engine-side.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.engine\.pool<br>\.size|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>-1</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The size of engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold).</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>

View File

@ -38,13 +38,15 @@ import org.apache.kyuubi.ha.client.RetryPolicies
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.SignalRegister
case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine") {
case class SparkSQLEngine(
spark: SparkSession,
store: EngineEventsStore) extends Serverable("SparkSQLEngine") {
override val backendService = new SparkSQLBackendService(spark)
override val frontendServices = Seq(new SparkThriftBinaryFrontendService(this))
override def initialize(conf: KyuubiConf): Unit = {
val listener = new SparkSQLEngineListener(this, new EngineEventsStore(conf))
val listener = new SparkSQLEngineListener(this, store)
spark.sparkContext.addSparkListener(listener)
super.initialize(conf)
}
@ -109,7 +111,8 @@ object SparkSQLEngine extends Logging {
}
def startEngine(spark: SparkSession): Unit = {
currentEngine = Some(new SparkSQLEngine(spark))
val store = new EngineEventsStore(kyuubiConf)
currentEngine = Some(new SparkSQLEngine(spark, store))
currentEngine.foreach { engine =>
// start event logging ahead so that we can capture all statuses
val eventLogging = new EventLoggingService(spark.sparkContext)
@ -131,7 +134,7 @@ object SparkSQLEngine extends Logging {
}
try {
engine.start()
EngineTab(engine)
EngineTab(engine, store)
val event = EngineEvent(engine)
info(event)
EventLoggingService.onEvent(event)

View File

@ -45,6 +45,14 @@ case class SessionEvent(
override def schema: StructType = Encoders.product[SessionEvent].schema
override lazy val partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(startTime)) :: Nil
/**
 * Elapsed time of this session in milliseconds.
 *
 * While `endTime` still holds the `-1L` placeholder, the elapsed time is measured
 * against the current wall clock; otherwise it is `endTime - startTime`.
 */
def duration: Long = {
  val finishedAt = if (endTime == -1L) System.currentTimeMillis else endTime
  finishedAt - startTime
}
}
object SessionEvent {

View File

@ -17,15 +17,23 @@
package org.apache.spark.kyuubi.ui
import java.net.URLEncoder
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Date
import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.xml.{Node, Unparsed}
import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.ui.UIUtils.formatDurationVerbose
import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage}
import org.apache.spark.ui.UIUtils._
import org.apache.kyuubi.Utils
import org.apache.kyuubi.engine.spark.events.SessionEvent
case class EnginePage(parent: EngineTab) extends WebUIPage("") {
private val store = parent.store
override def render(request: HttpServletRequest): Seq[Node] = {
val content =
generateBasicStats() ++
@ -36,7 +44,8 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") {
{parent.engine.backendService.sessionManager.getOpenSessionCount} session(s) are online,
running {parent.engine.backendService.sessionManager.operationManager.getOperationCount}
operations
</h4>
</h4> ++
generateSessionStatsTable(request)
UIUtils.headerSparkPage(request, parent.name, content, parent)
}
@ -83,4 +92,244 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") {
Seq.empty
}
}
/**
 * Generate stats of sessions for the engine.
 *
 * Renders a collapsible "Session Statistics" section. When the store holds at least one
 * session, the section contains a paged, sortable table; otherwise a placeholder message.
 * Invalid table parameters in the request (page number, page size, sort column, ...) are
 * rendered as an error box instead of failing the whole page.
 *
 * @param request the HTTP request carrying the optional `sessionstat.*` table parameters
 * @return the nodes of the section header followed by the table container
 */
private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = {
  // Read the session list once so that the count in the title and the table rows agree.
  val sessions = store.getSessionList
  val numSessions = sessions.size
  val table = if (numSessions > 0) {
    val sessionTableTag = "sessionstat"
    try {
      // Parsed inside the try: a non-numeric page value raises NumberFormatException, which is
      // an IllegalArgumentException and is therefore reported in the error box below.
      val sessionTablePage =
        Option(request.getParameter(s"$sessionTableTag.page")).map(_.toInt).getOrElse(1)
      Some(new SessionStatsPagedTable(
        request,
        parent,
        sessions,
        "kyuubi",
        UIUtils.prependBaseUri(request, parent.basePath),
        sessionTableTag
      ).table(sessionTablePage))
    } catch {
      case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
        Some(<div class="alert alert-error">
          <p>Error while rendering session table:</p>
          <pre>
            {Utils.stringifyException(e)}
          </pre>
        </div>)
    }
  } else {
    None
  }
  val content =
    <span id="sessionstat" class="collapse-aggregated-sessionstat collapse-table"
          onClick="collapseTable('collapse-aggregated-sessionstat',
          'aggregated-sessionstat')">
      <h4>
        <span class="collapse-table-arrow arrow-open"></span>
        <a>Session Statistics ({numSessions})</a>
      </h4>
    </span> ++
    <div class="aggregated-sessionstat collapsible-table">
      {table.getOrElse("No statistics have been generated yet.")}
    </div>
  content
}
/**
 * Paged, sortable HTML table with one row per [[SessionEvent]].
 *
 * @param request the HTTP request; sort column, sort direction and page size are read from it
 * @param parent the engine tab, used to build the per-session links
 * @param data the session events to display
 * @param subPath path segment appended to `basePath` when building table links
 * @param basePath base URL of the tab
 * @param sessionStatsTableTag prefix of this table's request parameters, also used as table id
 */
private class SessionStatsPagedTable(
request: HttpServletRequest,
parent: EngineTab,
data: Seq[SessionEvent],
subPath: String,
basePath: String,
sessionStatsTableTag: String) extends PagedTable[SessionEvent] {
// Table parameters from the request; "Start Time" is the sort column when none is given.
private val (sortColumn, desc, pageSize) =
getRequestTableParameters(request, sessionStatsTableTag, "Start Time")
private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
// Page URL that carries over the request parameters not belonging to this table.
private val parameterPath =
s"$basePath/$subPath/?${getRequestParameterOtherTable(request, sessionStatsTableTag)}"
override val dataSource = new SessionStatsTableDataSource(data, pageSize, sortColumn, desc)
override def tableId: String = sessionStatsTableTag
override def tableCssClass: String =
"table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
// Link to the given page that keeps the current sort column, direction and page size, and
// jumps to this table's anchor.
override def pageLink(page: Int): String = {
parameterPath +
s"&$pageNumberFormField=$page" +
s"&$sessionStatsTableTag.sort=$encodedSortColumn" +
s"&$sessionStatsTableTag.desc=$desc" +
s"&$pageSizeFormField=$pageSize" +
s"#$sessionStatsTableTag"
}
override def pageSizeFormField: String = s"$sessionStatsTableTag.pageSize"
override def pageNumberFormField: String = s"$sessionStatsTableTag.page"
// Same as pageLink but without page number and page size.
// NOTE(review): presumably those two are supplied by the paging form itself - confirm
// against Spark's PagedTable.
override def goButtonFormPath: String =
s"$parameterPath&$sessionStatsTableTag.sort=$encodedSortColumn" +
s"&$sessionStatsTableTag.desc=$desc#$sessionStatsTableTag"
// Header row: every column is sortable and none has a tooltip.
override def headers: Seq[Node] = {
val sessionTableHeadersAndTooltips: Seq[(String, Boolean, Option[String])] =
Seq(
("User", true, None),
("Client IP", true, None),
("Session ID", true, None),
("Start Time", true, None),
("Finish Time", true, None),
("Duration", true, None),
("Total Statements", true, None))
headerStatRow(sessionTableHeadersAndTooltips, desc, pageSize, sortColumn,
parameterPath, sessionStatsTableTag, sessionStatsTableTag)
}
// One table row per session; the session id cell links to "<base>/<prefix>/session/?id=<id>".
override def row(session: SessionEvent): Seq[Node] = {
val sessionLink = "%s/%s/session/?id=%s".format(
UIUtils.prependBaseUri(request, parent.basePath), parent.prefix, session.sessionId)
// The finish time is only formatted when endTime > 0; the `if` has no else branch.
// NOTE(review): presumably the resulting Unit renders as an empty cell - confirm.
<tr>
<td> {session.username} </td>
<td> {session.ip} </td>
<td> <a href={sessionLink}> {session.sessionId} </a> </td>
<td> {formatDate(session.startTime)} </td>
<td> {if (session.endTime > 0) formatDate(session.endTime)} </td>
<td> {formatDurationVerbose(session.duration)} </td>
<td> {session.totalOperations} </td>
</tr>
}
}
/**
 * Reads the sort column, sort direction and page size of the table identified by
 * `tableTag` from the request.
 *
 * A missing sort column falls back to `defaultSortColumn`, a missing direction is
 * descending only when sorting by the default column, and a missing page size is 100.
 *
 * @return a `(sortColumn, desc, pageSize)` triple
 */
def getRequestTableParameters(
    request: HttpServletRequest,
    tableTag: String,
    defaultSortColumn: String): (String, Boolean, Int) = {
  // Looks up "<tableTag>.<suffix>", turning an absent parameter into None.
  def tableParam(suffix: String): Option[String] =
    Option(request.getParameter(s"$tableTag.$suffix"))

  val sortColumn = tableParam("sort").fold(defaultSortColumn)(UIUtils.decodeURLParameter)
  val desc = tableParam("desc").fold(sortColumn == defaultSortColumn)(_.toBoolean)
  val pageSize = tableParam("pageSize").fold(100)(_.toInt)
  (sortColumn, desc, pageSize)
}
/**
 * Returns parameters of other tables in the page, joined as a `name=value&...` query string.
 *
 * Parameters whose name starts with `tableTag` are dropped. Names and values are URL-encoded
 * so that characters such as `&`, `=`, `#` or spaces in a value cannot break the links built
 * from the result.
 *
 * @param request the HTTP request whose parameters are carried over
 * @param tableTag parameter-name prefix of the table whose own parameters are excluded
 */
def getRequestParameterOtherTable(request: HttpServletRequest, tableTag: String): String = {
  request.getParameterMap.asScala
    .filterNot { case (name, _) => name.startsWith(tableTag) }
    .map { case (name, values) =>
      // Only the first value of a parameter is kept; an empty value array yields "".
      val value = values.headOption.getOrElse("")
      s"${URLEncoder.encode(name, UTF_8.name())}=${URLEncoder.encode(value, UTF_8.name())}"
    }
    .mkString("&")
}
/**
 * Builds the `<thead>` of a sortable table.
 *
 * @param headerInfo one `(header text, sortable, optional tooltip)` triple per column
 * @param desc whether the current sort direction is descending
 * @param pageSize current page size, carried over into the sort links
 * @param sortColumn header text of the column the table is currently sorted by
 * @param parameterPath URL prefix the sort parameters are appended to
 * @param tableTag prefix of the table's request parameters
 * @param headerId anchor appended to the sort links
 */
def headerStatRow(
headerInfo: Seq[(String, Boolean, Option[String])],
desc: Boolean,
pageSize: Int,
sortColumn: String,
parameterPath: String,
tableTag: String,
headerId: String): Seq[Node] = {
val row: Seq[Node] = {
headerInfo.map { case (header, sortable, tooltip) =>
if (header == sortColumn) {
// Current sort column: the link flips the direction and an arrow shows the current one.
val headerLink = Unparsed(
parameterPath +
s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
s"&$tableTag.desc=${!desc}" +
s"&$tableTag.pageSize=$pageSize" +
s"#$headerId")
val arrow = if (desc) "&#x25BE;" else "&#x25B4;" // down-pointing when descending, else up
<th>
<a href={headerLink}>
<span data-toggle="tooltip" data-placement="top" title={tooltip.getOrElse("")}>
{header}&nbsp;{Unparsed(arrow)}
</span>
</a>
</th>
} else {
if (sortable) {
// Other sortable column: the link selects it and sends no explicit direction.
val headerLink = Unparsed(
parameterPath +
s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
s"&$tableTag.pageSize=$pageSize" +
s"#$headerId")
<th>
<a href={headerLink}>
<span data-toggle="tooltip" data-placement="top" title={tooltip.getOrElse("")}>
{header}
</span>
</a>
</th>
} else {
// Non-sortable column: plain header text without a link.
<th>
<span data-toggle="tooltip" data-placement="top" title={tooltip.getOrElse("")}>
{header}
</span>
</th>
}
}
}
}
<thead>
<tr>{row}</tr>
</thead>
}
}
/**
 * Data source behind the session statistics table: holds the session events sorted by the
 * requested column and hands out slices of them.
 *
 * @param info the session events to page through
 * @param pageSize number of rows per page
 * @param sortColumn header text of the column to sort by
 * @param desc whether to sort in descending order
 */
private class SessionStatsTableDataSource(
    info: Seq[SessionEvent],
    pageSize: Int,
    sortColumn: String,
    desc: Boolean) extends PagedDataSource[SessionEvent](pageSize) {

  // Sorted once at construction; paging only slices this sequence.
  private val data = info.sorted(ordering(sortColumn, desc))

  override def dataSize: Int = data.size

  override def sliceData(from: Int, to: Int): Seq[SessionEvent] = data.slice(from, to)

  /**
   * Picks the ordering for the given column header, reversed when `desc` is set.
   * An unrecognized header raises an IllegalArgumentException.
   */
  private def ordering(sortColumn: String, desc: Boolean): Ordering[SessionEvent] = {
    val ascending: Ordering[SessionEvent] = sortColumn match {
      case "User" => Ordering.by(_.username)
      case "Client IP" => Ordering.by(_.ip)
      case "Session ID" => Ordering.by(_.sessionId)
      case "Start Time" => Ordering.by(_.startTime)
      case "Finish Time" => Ordering.by(_.endTime)
      case "Duration" => Ordering.by(_.duration)
      case "Total Statements" => Ordering.by(_.totalOperations)
      case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
    }
    if (desc) ascending.reverse else ascending
  }
}

View File

@ -25,12 +25,13 @@ import scala.util.control.NonFatal
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.SparkSQLEngine
import org.apache.kyuubi.engine.spark.events.EngineEventsStore
import org.apache.kyuubi.service.ServiceState
/**
* Note that [[SparkUITab]] is private for Spark
*/
case class EngineTab(engine: SparkSQLEngine)
case class EngineTab(engine: SparkSQLEngine, store: EngineEventsStore)
extends SparkUITab(engine.spark.sparkContext.ui.orNull, "kyuubi") with Logging {
override val name: String = "Kyuubi Query Engine"

View File

@ -64,5 +64,41 @@ class EngineTabSuite extends WithSparkSQLEngine with JDBCTestUtils {
}
}
// Checks that the engine page reports no open session at first and, once a statement has
// run over JDBC, shows the session statistics section with its table.
test("session stats for engine tab") {
  assert(spark.sparkContext.ui.nonEmpty)
  val client = HttpClients.createDefault()
  val req = new HttpGet(spark.sparkContext.uiWebUrl.get + "/kyuubi/")

  // Fetches the engine page, asserts a 200 status and returns the body. The response is
  // closed even when an assertion fails, so it does not hold on to its connection.
  def fetchEnginePage(): String = {
    val response = client.execute(req)
    try {
      assert(response.getStatusLine.getStatusCode === 200)
      EntityUtils.toString(response.getEntity)
    } finally {
      response.close()
    }
  }

  try {
    assert(fetchEnginePage().contains("0 session(s) are online,"))

    withJdbcStatement() { statement =>
      statement.execute(
        """
          |SELECT
          |  l.id % 100 k,
          |  sum(l.id) sum,
          |  count(l.id) cnt,
          |  avg(l.id) avg,
          |  min(l.id) min,
          |  max(l.id) max
          |from range(0, 100000L, 1, 100) l
          |  left join range(0, 100000L, 2, 100) r ON l.id = r.id
          |GROUP BY 1""".stripMargin)
      val page = fetchEnginePage()

      // check session section
      assert(page.contains("Session Statistics"))

      // check session stats table id
      assert(page.contains("sessionstat"))

      // check session stats table title
      assert(page.contains("Total Statements"))
    }
  } finally {
    client.close()
  }
}
override protected def jdbcUrl: String = getJdbcUrl
}

View File

@ -880,8 +880,8 @@ object KyuubiConf {
val ENGINE_EVENT_LOGGERS: ConfigEntry[Seq[String]] =
buildConf("engine.event.loggers")
.doc("A comma separated list of engine history loggers, where engine/session/operation etc" +
" events go.<ul>" +
" <li>SPARK: the events will be written to the spark history events</li>" +
" events go. We use spark logger by default.<ul>" +
" <li>SPARK: the events will be written to the spark listener bus.</li>" +
s" <li>JSON: the events will be written to the location of" +
s" ${ENGINE_EVENT_JSON_LOG_PATH.key}</li>" +
s" <li>JDBC: to be done</li>" +
@ -892,7 +892,7 @@ object KyuubiConf {
.toSequence()
.checkValue(_.toSet.subsetOf(Set("SPARK", "JSON", "JDBC", "CUSTOM")),
"Unsupported event loggers")
.createWithDefault(Nil)
.createWithDefault(Seq("SPARK"))
val ENGINE_UI_STOP_ENABLED: ConfigEntry[Boolean] =
buildConf("engine.ui.stop.enabled")