From badd5d516e8df95bab99a41c65e426ad3705cc68 Mon Sep 17 00:00:00 2001 From: timothy65535 Date: Tue, 12 Oct 2021 13:44:04 +0800 Subject: [PATCH] [KYUUBI #1159] Add Session stats on Kyuubi Query Engine Page ### _Why are the changes needed?_ For more detail, please go to https://github.com/apache/incubator-kyuubi/issues/981 ![image](https://user-images.githubusercontent.com/86483005/135767720-0807c5dd-13ac-4812-a61a-38e0c8861d0c.png) ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1187 from timothy65535/ky-1159. Closes #1159 52daf139 [timothy65535] update conf md a9b50843 [timothy65535] improve patch 36676f2a [timothy65535] improve patch 892a6333 [timothy65535] [KYUUBI #1159] Add Session stats on Kyuubi Query Engine Page Authored-by: timothy65535 Signed-off-by: Kent Yao --- docs/deployment/settings.md | 2 +- .../kyuubi/engine/spark/SparkSQLEngine.scala | 11 +- .../engine/spark/events/SessionEvent.scala | 8 + .../apache/spark/kyuubi/ui/EnginePage.scala | 257 +++++++++++++++++- .../apache/spark/kyuubi/ui/EngineTab.scala | 3 +- .../spark/kyuubi/ui/EngineTabSuite.scala | 36 +++ .../org/apache/kyuubi/config/KyuubiConf.scala | 6 +- 7 files changed, 310 insertions(+), 13 deletions(-) diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index 977a878b4..237302aa5 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -178,7 +178,7 @@ kyuubi\.engine
\.deregister\.exception
\.messages|
PT30M
|
Time to live(TTL) for exceptions pattern specified in kyuubi.engine.deregister.exception.classes and kyuubi.engine.deregister.exception.messages to deregister engines. Once the total error count hits the kyuubi.engine.deregister.job.max.failures within the TTL, an engine will deregister itself and wait for self-terminated. Otherwise, we suppose that the engine has recovered from temporary failures.
|
duration
|
1.2.0
kyuubi\.engine
\.deregister\.job\.max
\.failures|
4
|
Number of failures of job before deregistering the engine.
|
int
|
1.2.0
kyuubi\.engine\.event
\.json\.log\.path|
file:/tmp/kyuubi/events
|
The location of all the engine events go for the builtin JSON logger.
  • Local Path: start with 'file:'
  • HDFS Path: start with 'hdfs:'
|
string
|
1.3.0
-kyuubi\.engine\.event
\.loggers|
|
A comma separated list of engine history loggers, where engine/session/operation etc events go.
  • SPARK: the events will be written to the spark history events
  • JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
  • JDBC: to be done
  • CUSTOM: to be done.
|
seq
|
1.3.0
+kyuubi\.engine\.event
\.loggers|
SPARK
|
A comma separated list of engine history loggers, where engine/session/operation etc events go. We use spark logger by default.
  • SPARK: the events will be written to the spark listener bus.
  • JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
  • JDBC: to be done
  • CUSTOM: to be done.
|
seq
|
1.3.0
kyuubi\.engine
\.initialize\.sql|
SHOW DATABASES
|
SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.
|
seq
|
1.2.0
kyuubi\.engine
\.operation\.log\.dir
\.root|
engine_operation_logs
|
Root directory for query operation log at engine-side.
|
string
|
1.4.0
kyuubi\.engine\.pool
\.size|
-1
|
The size of engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold).
|
int
|
1.4.0
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index d9941ba88..223e54a9a 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -38,13 +38,15 @@ import org.apache.kyuubi.ha.client.RetryPolicies import org.apache.kyuubi.service.Serverable import org.apache.kyuubi.util.SignalRegister -case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine") { +case class SparkSQLEngine( + spark: SparkSession, + store: EngineEventsStore) extends Serverable("SparkSQLEngine") { override val backendService = new SparkSQLBackendService(spark) override val frontendServices = Seq(new SparkThriftBinaryFrontendService(this)) override def initialize(conf: KyuubiConf): Unit = { - val listener = new SparkSQLEngineListener(this, new EngineEventsStore(conf)) + val listener = new SparkSQLEngineListener(this, store) spark.sparkContext.addSparkListener(listener) super.initialize(conf) } @@ -109,7 +111,8 @@ object SparkSQLEngine extends Logging { } def startEngine(spark: SparkSession): Unit = { - currentEngine = Some(new SparkSQLEngine(spark)) + val store = new EngineEventsStore(kyuubiConf) + currentEngine = Some(new SparkSQLEngine(spark, store)) currentEngine.foreach { engine => // start event logging ahead so that we can capture all statuses val eventLogging = new EventLoggingService(spark.sparkContext) @@ -131,7 +134,7 @@ object SparkSQLEngine extends Logging { } try { engine.start() - EngineTab(engine) + EngineTab(engine, store) val event = EngineEvent(engine) info(event) EventLoggingService.onEvent(event) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala index 8b746ff99..2939adc7d 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala @@ -45,6 +45,14 @@ case class SessionEvent( override def schema: StructType = Encoders.product[SessionEvent].schema override lazy val partitions: Seq[(String, String)] = ("day", Utils.getDateFromTimestamp(startTime)) :: Nil + + def duration: Long = { + if (endTime == -1L) { + System.currentTimeMillis - startTime + } else { + endTime - startTime + } + } } object SessionEvent { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala index f835ef7e5..1c80dd71c 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala @@ -17,15 +17,23 @@ package org.apache.spark.kyuubi.ui +import java.net.URLEncoder +import java.nio.charset.StandardCharsets.UTF_8 import java.util.Date import javax.servlet.http.HttpServletRequest -import scala.xml.Node +import scala.collection.JavaConverters.mapAsScalaMapConverter +import scala.xml.{Node, Unparsed} -import org.apache.spark.ui.{UIUtils, WebUIPage} -import org.apache.spark.ui.UIUtils.formatDurationVerbose +import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage} +import org.apache.spark.ui.UIUtils._ + +import org.apache.kyuubi.Utils +import org.apache.kyuubi.engine.spark.events.SessionEvent case class EnginePage(parent: EngineTab) extends WebUIPage("") { + private val store = parent.store + override def render(request: HttpServletRequest): Seq[Node] = { val content = generateBasicStats() ++ @@ -36,7 +44,8 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") { {parent.engine.backendService.sessionManager.getOpenSessionCount} session(s) are online, running {parent.engine.backendService.sessionManager.operationManager.getOperationCount} operations - + ++ + generateSessionStatsTable(request) UIUtils.headerSparkPage(request, parent.name, content, parent) } @@ -83,4 +92,244 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") { Seq.empty } } + + /** Generate stats of sessions for the engine */ + private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = { + val numSessions = store.getSessionList.size + val table = if (numSessions > 0) { + + val sessionTableTag = "sessionstat" + + val sessionTablePage = + Option(request.getParameter(s"$sessionTableTag.page")).map(_.toInt).getOrElse(1) + + try { + Some(new SessionStatsPagedTable( + request, + parent, + store.getSessionList, + "kyuubi", + UIUtils.prependBaseUri(request, parent.basePath), + sessionTableTag + ).table(sessionTablePage)) + } catch { + case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) => + Some(
+

Error while rendering job table:

+
+              {Utils.stringifyException(e)}
+            
+
) + } + } else { + None + } + + val content = + +

+ + Session Statistics ({numSessions}) +

+
++ +
+ {table.getOrElse("No statistics have been generated yet.")} +
+ + content + } + + private class SessionStatsPagedTable( + request: HttpServletRequest, + parent: EngineTab, + data: Seq[SessionEvent], + subPath: String, + basePath: String, + sessionStatsTableTag: String) extends PagedTable[SessionEvent] { + + private val (sortColumn, desc, pageSize) = + getRequestTableParameters(request, sessionStatsTableTag, "Start Time") + + private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) + + private val parameterPath = + s"$basePath/$subPath/?${getRequestParameterOtherTable(request, sessionStatsTableTag)}" + + override val dataSource = new SessionStatsTableDataSource(data, pageSize, sortColumn, desc) + + override def tableId: String = sessionStatsTableTag + + override def tableCssClass: String = + "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited" + + override def pageLink(page: Int): String = { + parameterPath + + s"&$pageNumberFormField=$page" + + s"&$sessionStatsTableTag.sort=$encodedSortColumn" + + s"&$sessionStatsTableTag.desc=$desc" + + s"&$pageSizeFormField=$pageSize" + + s"#$sessionStatsTableTag" + } + + override def pageSizeFormField: String = s"$sessionStatsTableTag.pageSize" + + override def pageNumberFormField: String = s"$sessionStatsTableTag.page" + + override def goButtonFormPath: String = + s"$parameterPath&$sessionStatsTableTag.sort=$encodedSortColumn" + + s"&$sessionStatsTableTag.desc=$desc#$sessionStatsTableTag" + + override def headers: Seq[Node] = { + val sessionTableHeadersAndTooltips: Seq[(String, Boolean, Option[String])] = + Seq( + ("User", true, None), + ("Client IP", true, None), + ("Session ID", true, None), + ("Start Time", true, None), + ("Finish Time", true, None), + ("Duration", true, None), + ("Total Statements", true, None)) + + headerStatRow(sessionTableHeadersAndTooltips, desc, pageSize, sortColumn, + parameterPath, sessionStatsTableTag, sessionStatsTableTag) + } + + override def row(session: SessionEvent): Seq[Node] = { + val sessionLink = "%s/%s/session/?id=%s".format( + UIUtils.prependBaseUri(request, parent.basePath), parent.prefix, session.sessionId) + + {session.username} + {session.ip} + {session.sessionId} + {formatDate(session.startTime)} + {if (session.endTime > 0) formatDate(session.endTime)} + {formatDurationVerbose(session.duration)} + {session.totalOperations} + + } + } + + /** + * Returns parameter of this table. + */ + def getRequestTableParameters( + request: HttpServletRequest, + tableTag: String, + defaultSortColumn: String): (String, Boolean, Int) = { + val parameterSortColumn = request.getParameter(s"$tableTag.sort") + val parameterSortDesc = request.getParameter(s"$tableTag.desc") + val parameterPageSize = request.getParameter(s"$tableTag.pageSize") + val sortColumn = Option(parameterSortColumn).map { sortColumn => + UIUtils.decodeURLParameter(sortColumn) + }.getOrElse(defaultSortColumn) + val desc = Option(parameterSortDesc).map(_.toBoolean).getOrElse( + sortColumn == defaultSortColumn + ) + val pageSize = Option(parameterPageSize).map(_.toInt).getOrElse(100) + + (sortColumn, desc, pageSize) + } + + /** + * Returns parameters of other tables in the page. + */ + def getRequestParameterOtherTable(request: HttpServletRequest, tableTag: String): String = { + request.getParameterMap.asScala + .filterNot(_._1.startsWith(tableTag)) + .map(parameter => parameter._1 + "=" + parameter._2(0)) + .mkString("&") + } + + def headerStatRow( + headerInfo: Seq[(String, Boolean, Option[String])], + desc: Boolean, + pageSize: Int, + sortColumn: String, + parameterPath: String, + tableTag: String, + headerId: String): Seq[Node] = { + val row: Seq[Node] = { + headerInfo.map { case (header, sortable, tooltip) => + if (header == sortColumn) { + val headerLink = Unparsed( + parameterPath + + s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" + + s"&$tableTag.desc=${!desc}" + + s"&$tableTag.pageSize=$pageSize" + + s"#$headerId") + val arrow = if (desc) "▾" else "▴" // UP or DOWN + + + + + {header} {Unparsed(arrow)} + + + + } else { + if (sortable) { + val headerLink = Unparsed( + parameterPath + + s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" + + s"&$tableTag.pageSize=$pageSize" + + s"#$headerId") + + + + + {header} + + + + } else { + + + {header} + + + } + } + } + } + + {row} + + } +} + +private class SessionStatsTableDataSource( + info: Seq[SessionEvent], + pageSize: Int, + sortColumn: String, + desc: Boolean) extends PagedDataSource[SessionEvent](pageSize) { + + // Sorting SessionEvent data + private val data = info.sorted(ordering(sortColumn, desc)) + + override def dataSize: Int = data.size + + override def sliceData(from: Int, to: Int): Seq[SessionEvent] = data.slice(from, to) + + /** + * Return Ordering according to sortColumn and desc. + */ + private def ordering(sortColumn: String, desc: Boolean): Ordering[SessionEvent] = { + val ordering: Ordering[SessionEvent] = sortColumn match { + case "User" => Ordering.by(_.username) + case "Client IP" => Ordering.by(_.ip) + case "Session ID" => Ordering.by(_.sessionId) + case "Start Time" => Ordering by (_.startTime) + case "Finish Time" => Ordering.by(_.endTime) + case "Duration" => Ordering.by(_.duration) + case "Total Statements" => Ordering.by(_.totalOperations) + case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") + } + if (desc) { + ordering.reverse + } else { + ordering + } + } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineTab.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineTab.scala index a18358407..26a3aa1cb 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineTab.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineTab.scala @@ -25,12 +25,13 @@ import scala.util.control.NonFatal import org.apache.kyuubi.{Logging, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.spark.SparkSQLEngine +import org.apache.kyuubi.engine.spark.events.EngineEventsStore import org.apache.kyuubi.service.ServiceState /** * Note that [[SparkUITab]] is private for Spark */ -case class EngineTab(engine: SparkSQLEngine) +case class EngineTab(engine: SparkSQLEngine, store: EngineEventsStore) extends SparkUITab(engine.spark.sparkContext.ui.orNull, "kyuubi") with Logging { override val name: String = "Kyuubi Query Engine" diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala index 3598b3d61..9f759a757 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala @@ -64,5 +64,41 @@ class EngineTabSuite extends WithSparkSQLEngine with JDBCTestUtils { } } + test("session stats for engine tab") { + assert(spark.sparkContext.ui.nonEmpty) + val client = HttpClients.createDefault() + val req = new HttpGet(spark.sparkContext.uiWebUrl.get + "/kyuubi/") + val response = client.execute(req) + assert(response.getStatusLine.getStatusCode === 200) + val resp = EntityUtils.toString(response.getEntity) + assert(resp.contains("0 session(s) are online,")) + withJdbcStatement() { statement => + statement.execute( + """ + |SELECT + | l.id % 100 k, + | sum(l.id) sum, + | count(l.id) cnt, + | avg(l.id) avg, + | min(l.id) min, + | max(l.id) max + |from range(0, 100000L, 1, 100) l + | left join range(0, 100000L, 2, 100) r ON l.id = r.id + |GROUP BY 1""".stripMargin) + val response = client.execute(req) + assert(response.getStatusLine.getStatusCode === 200) + val resp = EntityUtils.toString(response.getEntity) + + // check session section + assert(resp.contains("Session Statistics")) + + // check session stats table id + assert(resp.contains("sessionstat")) + + // check session stats table title + assert(resp.contains("Total Statements")) + } + } + override protected def jdbcUrl: String = getJdbcUrl } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index ba071ff0b..29d53c652 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -880,8 +880,8 @@ object KyuubiConf { val ENGINE_EVENT_LOGGERS: ConfigEntry[Seq[String]] = buildConf("engine.event.loggers") .doc("A comma separated list of engine history loggers, where engine/session/operation etc" + - " events go.
    " + - "
  • SPARK: the events will be written to the spark history events
  • " + + " events go. We use spark logger by default.
      " + + "
    • SPARK: the events will be written to the spark listener bus.
    • " + s"
    • JSON: the events will be written to the location of" + s" ${ENGINE_EVENT_JSON_LOG_PATH.key}
    • " + s"
    • JDBC: to be done
    • " + @@ -892,7 +892,7 @@ object KyuubiConf { .toSequence() .checkValue(_.toSet.subsetOf(Set("SPARK", "JSON", "JDBC", "CUSTOM")), "Unsupported event loggers") - .createWithDefault(Nil) + .createWithDefault(Seq("SPARK")) val ENGINE_UI_STOP_ENABLED: ConfigEntry[Boolean] = buildConf("engine.ui.stop.enabled")