[KYUUBI #1284] Add Kyuubi Query Sesstion Page

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

![image](https://user-images.githubusercontent.com/86483005/138647034-25c304b8-4342-4022-9f87-3f8386a94eca.png)

![image](https://user-images.githubusercontent.com/86483005/138647103-12e7165b-8a25-4bb1-b565-efc453a49608.png)

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1285 from timothy65535/1284.

Closes #1284

13bffaaf [timothy65535] [KYUUBI #1284] Add Kyuubi Query Sesstion Page

Authored-by: timothy65535 <timothy65535@163.com>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
timothy65535 2021-10-26 17:15:33 +08:00 committed by Kent Yao
parent 7b3442a709
commit 14dfd14a93
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
3 changed files with 311 additions and 181 deletions

View File

@ -26,6 +26,7 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.xml.{Node, Unparsed}
import org.apache.commons.text.StringEscapeUtils
import org.apache.spark.kyuubi.ui.TableSourceUtil._
import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage}
import org.apache.spark.ui.UIUtils._
@ -143,107 +144,6 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") {
content
}
private class StatementStatsPagedTable(
request: HttpServletRequest,
parent: EngineTab,
data: Seq[SparkStatementEvent],
subPath: String,
basePath: String,
sqlStatsTableTag: String) extends PagedTable[SparkStatementEvent] {
private val (sortColumn, desc, pageSize) =
getRequestTableParameters(request, sqlStatsTableTag, "Create Time")
private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
private val parameterPath =
s"$basePath/$subPath/?${getRequestParameterOtherTable(request, sqlStatsTableTag)}"
override val dataSource = new StatementStatsTableDataSource(data, pageSize, sortColumn, desc)
override def tableId: String = sqlStatsTableTag
override def tableCssClass: String =
"table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
override def pageLink(page: Int): String = {
parameterPath +
s"&$pageNumberFormField=$page" +
s"&$sqlStatsTableTag.sort=$encodedSortColumn" +
s"&$sqlStatsTableTag.desc=$desc" +
s"&$pageSizeFormField=$pageSize" +
s"#$sqlStatsTableTag"
}
override def pageSizeFormField: String = s"$sqlStatsTableTag.pageSize"
override def pageNumberFormField: String = s"$sqlStatsTableTag.page"
override def goButtonFormPath: String =
s"$parameterPath&$sqlStatsTableTag.sort=$encodedSortColumn" +
s"&$sqlStatsTableTag.desc=$desc#$sqlStatsTableTag"
override def headers: Seq[Node] = {
val sqlTableHeadersAndTooltips: Seq[(String, Boolean, Option[String])] =
Seq(
("User", true, None),
("Statement ID", true, None),
("Create Time", true, None),
("Finish Time", true, None),
("Duration", true, None),
("Statement", true, None),
("State", true, None),
("Query Execution", true, None))
headerStatRow(sqlTableHeadersAndTooltips, desc, pageSize, sortColumn, parameterPath,
sqlStatsTableTag, sqlStatsTableTag)
}
override def row(sparkStatementEvent: SparkStatementEvent): Seq[Node] = {
<tr>
<td>
{sparkStatementEvent.username}
</td>
<td>
{sparkStatementEvent.statementId}
</td>
<td >
{formatDate(sparkStatementEvent.createTime)}
</td>
<td>
{if (sparkStatementEvent.endTime > 0) formatDate(sparkStatementEvent.endTime)}
</td>
<td >
{formatDurationVerbose(sparkStatementEvent.duration)}
</td>
<td>
<span class="description-input">
{sparkStatementEvent.statement}
</span>
</td>
<td>
{sparkStatementEvent.state}
</td>
{errorMessageCell(sparkStatementEvent.queryExecution)}
</tr>
}
private def errorMessageCell(errorMessage: String): Seq[Node] = {
val isMultiline = errorMessage.indexOf('\n') >= 0
val errorSummary = StringEscapeUtils.escapeHtml4(
if (isMultiline) {
errorMessage.substring(0, errorMessage.indexOf('\n'))
} else {
errorMessage
})
val details = detailsUINode(isMultiline, errorMessage)
<td>
{errorSummary}{details}
</td>
}
}
/** Generate stats of sessions for the engine */
private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = {
val numSessions = store.getSessionList.size
@ -362,92 +262,107 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") {
}
}
/**
* Returns parameter of this table.
*/
def getRequestTableParameters(
request: HttpServletRequest,
tableTag: String,
defaultSortColumn: String): (String, Boolean, Int) = {
val parameterSortColumn = request.getParameter(s"$tableTag.sort")
val parameterSortDesc = request.getParameter(s"$tableTag.desc")
val parameterPageSize = request.getParameter(s"$tableTag.pageSize")
val sortColumn = Option(parameterSortColumn).map { sortColumn =>
UIUtils.decodeURLParameter(sortColumn)
}.getOrElse(defaultSortColumn)
val desc = Option(parameterSortDesc).map(_.toBoolean).getOrElse(
sortColumn == defaultSortColumn
)
val pageSize = Option(parameterPageSize).map(_.toInt).getOrElse(100)
}
(sortColumn, desc, pageSize)
private class StatementStatsPagedTable(
request: HttpServletRequest,
parent: EngineTab,
data: Seq[SparkStatementEvent],
subPath: String,
basePath: String,
sqlStatsTableTag: String) extends PagedTable[SparkStatementEvent] {
private val (sortColumn, desc, pageSize) =
getRequestTableParameters(request, sqlStatsTableTag, "Create Time")
private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
private val parameterPath =
s"$basePath/$subPath/?${getRequestParameterOtherTable(request, sqlStatsTableTag)}"
override val dataSource = new StatementStatsTableDataSource(data, pageSize, sortColumn, desc)
override def tableId: String = sqlStatsTableTag
override def tableCssClass: String =
"table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
override def pageLink(page: Int): String = {
parameterPath +
s"&$pageNumberFormField=$page" +
s"&$sqlStatsTableTag.sort=$encodedSortColumn" +
s"&$sqlStatsTableTag.desc=$desc" +
s"&$pageSizeFormField=$pageSize" +
s"#$sqlStatsTableTag"
}
/**
* Returns parameters of other tables in the page.
*/
def getRequestParameterOtherTable(request: HttpServletRequest, tableTag: String): String = {
request.getParameterMap.asScala
.filterNot(_._1.startsWith(tableTag))
.map(parameter => parameter._1 + "=" + parameter._2(0))
.mkString("&")
override def pageSizeFormField: String = s"$sqlStatsTableTag.pageSize"
override def pageNumberFormField: String = s"$sqlStatsTableTag.page"
override def goButtonFormPath: String =
s"$parameterPath&$sqlStatsTableTag.sort=$encodedSortColumn" +
s"&$sqlStatsTableTag.desc=$desc#$sqlStatsTableTag"
override def headers: Seq[Node] = {
val sqlTableHeadersAndTooltips: Seq[(String, Boolean, Option[String])] =
Seq(
("User", true, None),
("Statement ID", true, None),
("Create Time", true, None),
("Finish Time", true, None),
("Duration", true, None),
("Statement", true, None),
("State", true, None),
("Query Execution", true, None))
headerStatRow(sqlTableHeadersAndTooltips, desc, pageSize, sortColumn, parameterPath,
sqlStatsTableTag, sqlStatsTableTag)
}
def headerStatRow(
headerInfo: Seq[(String, Boolean, Option[String])],
desc: Boolean,
pageSize: Int,
sortColumn: String,
parameterPath: String,
tableTag: String,
headerId: String): Seq[Node] = {
val row: Seq[Node] = {
headerInfo.map { case (header, sortable, tooltip) =>
if (header == sortColumn) {
val headerLink = Unparsed(
parameterPath +
s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
s"&$tableTag.desc=${!desc}" +
s"&$tableTag.pageSize=$pageSize" +
s"#$headerId")
val arrow = if (desc) "&#x25BE;" else "&#x25B4;" // UP or DOWN
<th>
<a href={headerLink}>
<span data-toggle="tooltip" data-placement="top" title={tooltip.getOrElse("")}>
{header}&nbsp;{Unparsed(arrow)}
</span>
</a>
</th>
} else {
if (sortable) {
val headerLink = Unparsed(
parameterPath +
s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
s"&$tableTag.pageSize=$pageSize" +
s"#$headerId")
<th>
<a href={headerLink}>
<span data-toggle="tooltip" data-placement="top" title={tooltip.getOrElse("")}>
{header}
</span>
</a>
</th>
} else {
<th>
<span data-toggle="tooltip" data-placement="top" title={tooltip.getOrElse("")}>
{header}
</span>
</th>
}
}
}
}
<thead>
<tr>{row}</tr>
</thead>
override def row(sparkStatementEvent: SparkStatementEvent): Seq[Node] = {
<tr>
<td>
{sparkStatementEvent.username}
</td>
<td>
{sparkStatementEvent.statementId}
</td>
<td >
{formatDate(sparkStatementEvent.createTime)}
</td>
<td>
{if (sparkStatementEvent.endTime > 0) formatDate(sparkStatementEvent.endTime)}
</td>
<td >
{formatDurationVerbose(sparkStatementEvent.duration)}
</td>
<td>
<span class="description-input">
{sparkStatementEvent.statement}
</span>
</td>
<td>
{sparkStatementEvent.state}
</td>
{errorMessageCell(sparkStatementEvent.queryExecution)}
</tr>
}
private def errorMessageCell(errorMessage: String): Seq[Node] = {
val isMultiline = errorMessage.indexOf('\n') >= 0
val errorSummary = StringEscapeUtils.escapeHtml4(
if (isMultiline) {
errorMessage.substring(0, errorMessage.indexOf('\n'))
} else {
errorMessage
})
val details = detailsUINode(isMultiline, errorMessage)
<td>
{errorSummary}{details}
</td>
}
}
private class SessionStatsTableDataSource(
@ -520,3 +435,93 @@ private class StatementStatsTableDataSource(
}
}
}
private object TableSourceUtil {
/**
* Returns parameter of this table.
*/
def getRequestTableParameters(
request: HttpServletRequest,
tableTag: String,
defaultSortColumn: String): (String, Boolean, Int) = {
val parameterSortColumn = request.getParameter(s"$tableTag.sort")
val parameterSortDesc = request.getParameter(s"$tableTag.desc")
val parameterPageSize = request.getParameter(s"$tableTag.pageSize")
val sortColumn = Option(parameterSortColumn).map { sortColumn =>
UIUtils.decodeURLParameter(sortColumn)
}.getOrElse(defaultSortColumn)
val desc = Option(parameterSortDesc).map(_.toBoolean).getOrElse(
sortColumn == defaultSortColumn
)
val pageSize = Option(parameterPageSize).map(_.toInt).getOrElse(100)
(sortColumn, desc, pageSize)
}
/**
* Returns parameters of other tables in the page.
*/
def getRequestParameterOtherTable(request: HttpServletRequest, tableTag: String): String = {
request.getParameterMap.asScala
.filterNot(_._1.startsWith(tableTag))
.map(parameter => parameter._1 + "=" + parameter._2(0))
.mkString("&")
}
def headerStatRow(
headerInfo: Seq[(String, Boolean, Option[String])],
desc: Boolean,
pageSize: Int,
sortColumn: String,
parameterPath: String,
tableTag: String,
headerId: String): Seq[Node] = {
val row: Seq[Node] = {
headerInfo.map { case (header, sortable, tooltip) =>
if (header == sortColumn) {
val headerLink = Unparsed(
parameterPath +
s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
s"&$tableTag.desc=${!desc}" +
s"&$tableTag.pageSize=$pageSize" +
s"#$headerId")
val arrow = if (desc) "&#x25BE;" else "&#x25B4;" // UP or DOWN
<th>
<a href={headerLink}>
<span data-toggle="tooltip" data-placement="top" title={tooltip.getOrElse("")}>
{header}&nbsp;{Unparsed(arrow)}
</span>
</a>
</th>
} else {
if (sortable) {
val headerLink = Unparsed(
parameterPath +
s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
s"&$tableTag.pageSize=$pageSize" +
s"#$headerId")
<th>
<a href={headerLink}>
<span data-toggle="tooltip" data-placement="top" title={tooltip.getOrElse("")}>
{header}
</span>
</a>
</th>
} else {
<th>
<span data-toggle="tooltip" data-placement="top" title={tooltip.getOrElse("")}>
{header}
</span>
</th>
}
}
}
}
<thead>
<tr>{row}</tr>
</thead>
}
}

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.kyuubi.ui
import java.util.Date
import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.internal.Logging
import org.apache.spark.ui._
import org.apache.spark.ui.UIUtils._
import org.apache.spark.util.Utils
/** Page for Spark Web UI that shows statistics of jobs running in the engine server */
case class EngineSessionPage(parent: EngineTab)
extends WebUIPage("session") with Logging {
val store = parent.store
/** Render the page */
def render(request: HttpServletRequest): Seq[Node] = {
val parameterId = request.getParameter("id")
require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
val content = store.synchronized { // make sure all parts in this page are consistent
val sessionStat = store.getSession(parameterId).getOrElse(null)
require(sessionStat != null, "Invalid sessionID[" + parameterId + "]")
generateBasicStats() ++
<br/> ++
<h4>
User {sessionStat.username},
IP {sessionStat.ip},
Session created at {formatDate(sessionStat.startTime)},
Total run {sessionStat.totalOperations} SQL
</h4> ++
generateSQLStatsTable(request, sessionStat.sessionId)
}
UIUtils.headerSparkPage(request, parent.name + " Session", content, parent)
}
/** Generate basic stats of the engine server */
private def generateBasicStats(): Seq[Node] = {
val timeSinceStart = System.currentTimeMillis() - parent.engine.getStartTime
<ul class ="list-unstyled">
<li>
<strong>Started at: </strong>
{new Date(parent.engine.getStartTime)}
</li>
<li>
<strong>Latest Logout at: </strong>
{new Date(parent.engine.backendService.sessionManager.latestLogoutTime)}
</li>
<li>
<strong>Time since start: </strong>
{formatDurationVerbose(timeSinceStart)}
</li>
</ul>
}
/** Generate stats of batch statements of the engine server */
private def generateSQLStatsTable(request: HttpServletRequest, sessionID: String): Seq[Node] = {
val executionList = store.getStatementList
.filter(_.sessionId == sessionID)
val numStatement = executionList.size
val table = if (numStatement > 0) {
val sqlTableTag = "sqlsessionstat"
val sqlTablePage =
Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1)
try {
Some(new StatementStatsPagedTable(
request,
parent,
executionList,
"kyuubi/session",
UIUtils.prependBaseUri(request, parent.basePath),
sqlTableTag
).table(sqlTablePage))
} catch {
case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
Some(<div class="alert alert-error">
<p>Error while rendering job table:</p>
<pre>
{Utils.exceptionString(e)}
</pre>
</div>)
}
} else {
None
}
val content =
<span id="sqlsessionstat" class="collapse-aggregated-sqlsessionstat collapse-table"
onClick="collapseTable('collapse-aggregated-sqlsessionstat',
'aggregated-sqlsessionstat')">
<h4>
<span class="collapse-table-arrow arrow-open"></span>
<a>SQL Statistics</a>
</h4>
</span> ++
<div class="aggregated-sqlsessionstat collapsible-table">
{table.getOrElse("No statistics have been generated yet.")}
</div>
content
}
}

View File

@ -39,6 +39,7 @@ case class EngineTab(engine: SparkSQLEngine, store: EngineEventsStore)
engine.spark.sparkContext.ui.foreach { ui =>
this.attachPage(EnginePage(this))
this.attachPage(EngineSessionPage(this))
ui.attachTab(this)
Utils.addShutdownHook(() => ui.detachTab(this))
}