diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala index 07dd99e10..555c6a399 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala @@ -26,6 +26,7 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter import scala.xml.{Node, Unparsed} import org.apache.commons.text.StringEscapeUtils +import org.apache.spark.kyuubi.ui.TableSourceUtil._ import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage} import org.apache.spark.ui.UIUtils._ @@ -143,107 +144,6 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") { content } - private class StatementStatsPagedTable( - request: HttpServletRequest, - parent: EngineTab, - data: Seq[SparkStatementEvent], - subPath: String, - basePath: String, - sqlStatsTableTag: String) extends PagedTable[SparkStatementEvent] { - - private val (sortColumn, desc, pageSize) = - getRequestTableParameters(request, sqlStatsTableTag, "Create Time") - - private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) - - private val parameterPath = - s"$basePath/$subPath/?${getRequestParameterOtherTable(request, sqlStatsTableTag)}" - - override val dataSource = new StatementStatsTableDataSource(data, pageSize, sortColumn, desc) - - override def tableId: String = sqlStatsTableTag - - override def tableCssClass: String = - "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited" - - override def pageLink(page: Int): String = { - parameterPath + - s"&$pageNumberFormField=$page" + - s"&$sqlStatsTableTag.sort=$encodedSortColumn" + - s"&$sqlStatsTableTag.desc=$desc" + - s"&$pageSizeFormField=$pageSize" + - s"#$sqlStatsTableTag" - } - - override def pageSizeFormField: String = s"$sqlStatsTableTag.pageSize" - - override def pageNumberFormField: String = s"$sqlStatsTableTag.page" - - override def goButtonFormPath: String = - s"$parameterPath&$sqlStatsTableTag.sort=$encodedSortColumn" + - s"&$sqlStatsTableTag.desc=$desc#$sqlStatsTableTag" - - override def headers: Seq[Node] = { - val sqlTableHeadersAndTooltips: Seq[(String, Boolean, Option[String])] = - Seq( - ("User", true, None), - ("Statement ID", true, None), - ("Create Time", true, None), - ("Finish Time", true, None), - ("Duration", true, None), - ("Statement", true, None), - ("State", true, None), - ("Query Execution", true, None)) - - headerStatRow(sqlTableHeadersAndTooltips, desc, pageSize, sortColumn, parameterPath, - sqlStatsTableTag, sqlStatsTableTag) - } - - override def row(sparkStatementEvent: SparkStatementEvent): Seq[Node] = { - - - {sparkStatementEvent.username} - - - {sparkStatementEvent.statementId} - - - {formatDate(sparkStatementEvent.createTime)} - - - {if (sparkStatementEvent.endTime > 0) formatDate(sparkStatementEvent.endTime)} - - - {formatDurationVerbose(sparkStatementEvent.duration)} - - - - {sparkStatementEvent.statement} - - - - {sparkStatementEvent.state} - - {errorMessageCell(sparkStatementEvent.queryExecution)} - - } - - private def errorMessageCell(errorMessage: String): Seq[Node] = { - val isMultiline = errorMessage.indexOf('\n') >= 0 - val errorSummary = StringEscapeUtils.escapeHtml4( - if (isMultiline) { - errorMessage.substring(0, errorMessage.indexOf('\n')) - } else { - errorMessage - }) - val details = detailsUINode(isMultiline, errorMessage) - - {errorSummary}{details} - - } - - } - /** Generate stats of sessions for the engine */ private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = { val numSessions = store.getSessionList.size @@ -362,92 +262,107 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") { } } - /** - * Returns parameter of this table. - */ - def getRequestTableParameters( - request: HttpServletRequest, - tableTag: String, - defaultSortColumn: String): (String, Boolean, Int) = { - val parameterSortColumn = request.getParameter(s"$tableTag.sort") - val parameterSortDesc = request.getParameter(s"$tableTag.desc") - val parameterPageSize = request.getParameter(s"$tableTag.pageSize") - val sortColumn = Option(parameterSortColumn).map { sortColumn => - UIUtils.decodeURLParameter(sortColumn) - }.getOrElse(defaultSortColumn) - val desc = Option(parameterSortDesc).map(_.toBoolean).getOrElse( - sortColumn == defaultSortColumn - ) - val pageSize = Option(parameterPageSize).map(_.toInt).getOrElse(100) +} - (sortColumn, desc, pageSize) +private class StatementStatsPagedTable( + request: HttpServletRequest, + parent: EngineTab, + data: Seq[SparkStatementEvent], + subPath: String, + basePath: String, + sqlStatsTableTag: String) extends PagedTable[SparkStatementEvent] { + + private val (sortColumn, desc, pageSize) = + getRequestTableParameters(request, sqlStatsTableTag, "Create Time") + + private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) + + private val parameterPath = + s"$basePath/$subPath/?${getRequestParameterOtherTable(request, sqlStatsTableTag)}" + + override val dataSource = new StatementStatsTableDataSource(data, pageSize, sortColumn, desc) + + override def tableId: String = sqlStatsTableTag + + override def tableCssClass: String = + "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited" + + override def pageLink(page: Int): String = { + parameterPath + + s"&$pageNumberFormField=$page" + + s"&$sqlStatsTableTag.sort=$encodedSortColumn" + + s"&$sqlStatsTableTag.desc=$desc" + + s"&$pageSizeFormField=$pageSize" + + s"#$sqlStatsTableTag" } - /** - * Returns parameters of other tables in the page. - */ - def getRequestParameterOtherTable(request: HttpServletRequest, tableTag: String): String = { - request.getParameterMap.asScala - .filterNot(_._1.startsWith(tableTag)) - .map(parameter => parameter._1 + "=" + parameter._2(0)) - .mkString("&") + override def pageSizeFormField: String = s"$sqlStatsTableTag.pageSize" + + override def pageNumberFormField: String = s"$sqlStatsTableTag.page" + + override def goButtonFormPath: String = + s"$parameterPath&$sqlStatsTableTag.sort=$encodedSortColumn" + + s"&$sqlStatsTableTag.desc=$desc#$sqlStatsTableTag" + + override def headers: Seq[Node] = { + val sqlTableHeadersAndTooltips: Seq[(String, Boolean, Option[String])] = + Seq( + ("User", true, None), + ("Statement ID", true, None), + ("Create Time", true, None), + ("Finish Time", true, None), + ("Duration", true, None), + ("Statement", true, None), + ("State", true, None), + ("Query Execution", true, None)) + + headerStatRow(sqlTableHeadersAndTooltips, desc, pageSize, sortColumn, parameterPath, + sqlStatsTableTag, sqlStatsTableTag) } - def headerStatRow( - headerInfo: Seq[(String, Boolean, Option[String])], - desc: Boolean, - pageSize: Int, - sortColumn: String, - parameterPath: String, - tableTag: String, - headerId: String): Seq[Node] = { - val row: Seq[Node] = { - headerInfo.map { case (header, sortable, tooltip) => - if (header == sortColumn) { - val headerLink = Unparsed( - parameterPath + - s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" + - s"&$tableTag.desc=${!desc}" + - s"&$tableTag.pageSize=$pageSize" + - s"#$headerId") - val arrow = if (desc) "▾" else "▴" // UP or DOWN - - - - - {header} {Unparsed(arrow)} - - - - } else { - if (sortable) { - val headerLink = Unparsed( - parameterPath + - s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" + - s"&$tableTag.pageSize=$pageSize" + - s"#$headerId") - - - - - {header} - - - - } else { - - - {header} - - - } - } - } - } - - {row} - + override def row(sparkStatementEvent: SparkStatementEvent): Seq[Node] = { + + + {sparkStatementEvent.username} + + + {sparkStatementEvent.statementId} + + + {formatDate(sparkStatementEvent.createTime)} + + + {if (sparkStatementEvent.endTime > 0) formatDate(sparkStatementEvent.endTime)} + + + {formatDurationVerbose(sparkStatementEvent.duration)} + + + + {sparkStatementEvent.statement} + + + + {sparkStatementEvent.state} + + {errorMessageCell(sparkStatementEvent.queryExecution)} + } + + private def errorMessageCell(errorMessage: String): Seq[Node] = { + val isMultiline = errorMessage.indexOf('\n') >= 0 + val errorSummary = StringEscapeUtils.escapeHtml4( + if (isMultiline) { + errorMessage.substring(0, errorMessage.indexOf('\n')) + } else { + errorMessage + }) + val details = detailsUINode(isMultiline, errorMessage) + + {errorSummary}{details} + + } + } private class SessionStatsTableDataSource( @@ -520,3 +435,93 @@ private class StatementStatsTableDataSource( } } } + +private object TableSourceUtil { + + /** + * Returns parameter of this table. + */ + def getRequestTableParameters( + request: HttpServletRequest, + tableTag: String, + defaultSortColumn: String): (String, Boolean, Int) = { + val parameterSortColumn = request.getParameter(s"$tableTag.sort") + val parameterSortDesc = request.getParameter(s"$tableTag.desc") + val parameterPageSize = request.getParameter(s"$tableTag.pageSize") + val sortColumn = Option(parameterSortColumn).map { sortColumn => + UIUtils.decodeURLParameter(sortColumn) + }.getOrElse(defaultSortColumn) + val desc = Option(parameterSortDesc).map(_.toBoolean).getOrElse( + sortColumn == defaultSortColumn + ) + val pageSize = Option(parameterPageSize).map(_.toInt).getOrElse(100) + + (sortColumn, desc, pageSize) + } + + /** + * Returns parameters of other tables in the page. + */ + def getRequestParameterOtherTable(request: HttpServletRequest, tableTag: String): String = { + request.getParameterMap.asScala + .filterNot(_._1.startsWith(tableTag)) + .map(parameter => parameter._1 + "=" + parameter._2(0)) + .mkString("&") + } + + def headerStatRow( + headerInfo: Seq[(String, Boolean, Option[String])], + desc: Boolean, + pageSize: Int, + sortColumn: String, + parameterPath: String, + tableTag: String, + headerId: String): Seq[Node] = { + val row: Seq[Node] = { + headerInfo.map { case (header, sortable, tooltip) => + if (header == sortColumn) { + val headerLink = Unparsed( + parameterPath + + s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" + + s"&$tableTag.desc=${!desc}" + + s"&$tableTag.pageSize=$pageSize" + + s"#$headerId") + val arrow = if (desc) "▾" else "▴" // UP or DOWN + + + + + {header} {Unparsed(arrow)} + + + + } else { + if (sortable) { + val headerLink = Unparsed( + parameterPath + + s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" + + s"&$tableTag.pageSize=$pageSize" + + s"#$headerId") + + + + + {header} + + + + } else { + + + {header} + + + } + } + } + } + + {row} + + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineSessionPage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineSessionPage.scala new file mode 100644 index 000000000..f2eaf992a --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineSessionPage.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kyuubi.ui + +import java.util.Date +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.spark.internal.Logging +import org.apache.spark.ui._ +import org.apache.spark.ui.UIUtils._ +import org.apache.spark.util.Utils + +/** Page for Spark Web UI that shows statistics of jobs running in the engine server */ +case class EngineSessionPage(parent: EngineTab) + extends WebUIPage("session") with Logging { + val store = parent.store + + /** Render the page */ + def render(request: HttpServletRequest): Seq[Node] = { + val parameterId = request.getParameter("id") + require(parameterId != null && parameterId.nonEmpty, "Missing id parameter") + + val content = store.synchronized { // make sure all parts in this page are consistent + val sessionStat = store.getSession(parameterId).getOrElse(null) + require(sessionStat != null, "Invalid sessionID[" + parameterId + "]") + + generateBasicStats() ++ +
++ +

+ User {sessionStat.username}, + IP {sessionStat.ip}, + Session created at {formatDate(sessionStat.startTime)}, + Total run {sessionStat.totalOperations} SQL +

++ + generateSQLStatsTable(request, sessionStat.sessionId) + } + UIUtils.headerSparkPage(request, parent.name + " Session", content, parent) + } + + /** Generate basic stats of the engine server */ + private def generateBasicStats(): Seq[Node] = { + val timeSinceStart = System.currentTimeMillis() - parent.engine.getStartTime + + } + + /** Generate stats of batch statements of the engine server */ + private def generateSQLStatsTable(request: HttpServletRequest, sessionID: String): Seq[Node] = { + val executionList = store.getStatementList + .filter(_.sessionId == sessionID) + val numStatement = executionList.size + val table = if (numStatement > 0) { + + val sqlTableTag = "sqlsessionstat" + + val sqlTablePage = + Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1) + + try { + Some(new StatementStatsPagedTable( + request, + parent, + executionList, + "kyuubi/session", + UIUtils.prependBaseUri(request, parent.basePath), + sqlTableTag + ).table(sqlTablePage)) + } catch { + case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) => + Some(
+

Error while rendering job table:

+
+              {Utils.exceptionString(e)}
+            
+
) + } + } else { + None + } + val content = + +

+ + SQL Statistics +

+
++ +
+ {table.getOrElse("No statistics have been generated yet.")} +
+ + content + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineTab.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineTab.scala index 26a3aa1cb..5ccbe68d2 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineTab.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineTab.scala @@ -39,6 +39,7 @@ case class EngineTab(engine: SparkSQLEngine, store: EngineEventsStore) engine.spark.sparkContext.ui.foreach { ui => this.attachPage(EnginePage(this)) + this.attachPage(EngineSessionPage(this)) ui.attachTab(this) Utils.addShutdownHook(() => ui.detachTab(this)) }