diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 977a878b4..237302aa5 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -178,7 +178,7 @@ kyuubi\.engine
\.deregister\.exception
\.messages|
PT30M
|Time to live(TTL) for exceptions pattern specified in kyuubi.engine.deregister.exception.classes and kyuubi.engine.deregister.exception.messages to deregister engines. Once the total error count hits the kyuubi.engine.deregister.job.max.failures within the TTL, an engine will deregister itself and wait for self-terminated. Otherwise, we suppose that the engine has recovered from temporary failures.
|duration
|1.2.0
kyuubi\.engine
\.deregister\.job\.max
\.failures|4
|Number of failures of job before deregistering the engine.
|int
|1.2.0
kyuubi\.engine\.event
\.json\.log\.path|file:/tmp/kyuubi/events
|The location of all the engine events go for the builtin JSON logger.
- Local Path: start with 'file:'
- HDFS Path: start with 'hdfs:'
|string
|1.3.0
-kyuubi\.engine\.event
\.loggers||A comma separated list of engine history loggers, where engine/session/operation etc events go.
- SPARK: the events will be written to the spark history events
- JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
- JDBC: to be done
- CUSTOM: to be done.
|seq
|1.3.0
+kyuubi\.engine\.event
\.loggers|SPARK
|A comma separated list of engine history loggers, where engine/session/operation etc events go. We use spark logger by default.
- SPARK: the events will be written to the spark listener bus.
- JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
- JDBC: to be done
- CUSTOM: to be done.
|seq
|1.3.0
kyuubi\.engine
\.initialize\.sql|SHOW DATABASES
|SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.
|seq
|1.2.0
kyuubi\.engine
\.operation\.log\.dir
\.root|engine_operation_logs
|Root directory for query operation log at engine-side.
|string
|1.4.0
kyuubi\.engine\.pool
\.size|-1
|The size of engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold).
|int
|1.4.0
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index d9941ba88..223e54a9a 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -38,13 +38,15 @@ import org.apache.kyuubi.ha.client.RetryPolicies
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.SignalRegister
-case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine") {
+case class SparkSQLEngine(
+ spark: SparkSession,
+ store: EngineEventsStore) extends Serverable("SparkSQLEngine") {
override val backendService = new SparkSQLBackendService(spark)
override val frontendServices = Seq(new SparkThriftBinaryFrontendService(this))
override def initialize(conf: KyuubiConf): Unit = {
- val listener = new SparkSQLEngineListener(this, new EngineEventsStore(conf))
+ val listener = new SparkSQLEngineListener(this, store)
spark.sparkContext.addSparkListener(listener)
super.initialize(conf)
}
@@ -109,7 +111,8 @@ object SparkSQLEngine extends Logging {
}
def startEngine(spark: SparkSession): Unit = {
- currentEngine = Some(new SparkSQLEngine(spark))
+ val store = new EngineEventsStore(kyuubiConf)
+ currentEngine = Some(new SparkSQLEngine(spark, store))
currentEngine.foreach { engine =>
// start event logging ahead so that we can capture all statuses
val eventLogging = new EventLoggingService(spark.sparkContext)
@@ -131,7 +134,7 @@ object SparkSQLEngine extends Logging {
}
try {
engine.start()
- EngineTab(engine)
+ EngineTab(engine, store)
val event = EngineEvent(engine)
info(event)
EventLoggingService.onEvent(event)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
index 8b746ff99..2939adc7d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
@@ -45,6 +45,14 @@ case class SessionEvent(
override def schema: StructType = Encoders.product[SessionEvent].schema
override lazy val partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(startTime)) :: Nil
+
+ def duration: Long = {
+ if (endTime == -1L) {
+ System.currentTimeMillis - startTime
+ } else {
+ endTime - startTime
+ }
+ }
}
object SessionEvent {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala
index f835ef7e5..1c80dd71c 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala
@@ -17,15 +17,23 @@
package org.apache.spark.kyuubi.ui
+import java.net.URLEncoder
+import java.nio.charset.StandardCharsets.UTF_8
import java.util.Date
import javax.servlet.http.HttpServletRequest
-import scala.xml.Node
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+import scala.xml.{Node, Unparsed}
-import org.apache.spark.ui.{UIUtils, WebUIPage}
-import org.apache.spark.ui.UIUtils.formatDurationVerbose
+import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage}
+import org.apache.spark.ui.UIUtils._
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.engine.spark.events.SessionEvent
case class EnginePage(parent: EngineTab) extends WebUIPage("") {
+ private val store = parent.store
+
override def render(request: HttpServletRequest): Seq[Node] = {
val content =
generateBasicStats() ++
@@ -36,7 +44,8 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") {
{parent.engine.backendService.sessionManager.getOpenSessionCount} session(s) are online,
running {parent.engine.backendService.sessionManager.operationManager.getOperationCount}
operations
-
+ ++
+ generateSessionStatsTable(request)
UIUtils.headerSparkPage(request, parent.name, content, parent)
}
@@ -83,4 +92,244 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") {
Seq.empty
}
}
+
+ /** Generate stats of sessions for the engine */
+ private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = {
+ val numSessions = store.getSessionList.size
+ val table = if (numSessions > 0) {
+
+ val sessionTableTag = "sessionstat"
+
+ val sessionTablePage =
+ Option(request.getParameter(s"$sessionTableTag.page")).map(_.toInt).getOrElse(1)
+
+ try {
+ Some(new SessionStatsPagedTable(
+ request,
+ parent,
+ store.getSessionList,
+ "kyuubi",
+ UIUtils.prependBaseUri(request, parent.basePath),
+ sessionTableTag
+ ).table(sessionTablePage))
+ } catch {
+ case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+ Some(
+
Error while rendering job table:
+
+ {Utils.stringifyException(e)}
+
+
)
+ }
+ } else {
+ None
+ }
+
+ val content =
+
+
+ ++
+
+ {table.getOrElse("No statistics have been generated yet.")}
+
+
+ content
+ }
+
+ private class SessionStatsPagedTable(
+ request: HttpServletRequest,
+ parent: EngineTab,
+ data: Seq[SessionEvent],
+ subPath: String,
+ basePath: String,
+ sessionStatsTableTag: String) extends PagedTable[SessionEvent] {
+
+ private val (sortColumn, desc, pageSize) =
+ getRequestTableParameters(request, sessionStatsTableTag, "Start Time")
+
+ private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
+
+ private val parameterPath =
+ s"$basePath/$subPath/?${getRequestParameterOtherTable(request, sessionStatsTableTag)}"
+
+ override val dataSource = new SessionStatsTableDataSource(data, pageSize, sortColumn, desc)
+
+ override def tableId: String = sessionStatsTableTag
+
+ override def tableCssClass: String =
+ "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
+
+ override def pageLink(page: Int): String = {
+ parameterPath +
+ s"&$pageNumberFormField=$page" +
+ s"&$sessionStatsTableTag.sort=$encodedSortColumn" +
+ s"&$sessionStatsTableTag.desc=$desc" +
+ s"&$pageSizeFormField=$pageSize" +
+ s"#$sessionStatsTableTag"
+ }
+
+ override def pageSizeFormField: String = s"$sessionStatsTableTag.pageSize"
+
+ override def pageNumberFormField: String = s"$sessionStatsTableTag.page"
+
+ override def goButtonFormPath: String =
+ s"$parameterPath&$sessionStatsTableTag.sort=$encodedSortColumn" +
+ s"&$sessionStatsTableTag.desc=$desc#$sessionStatsTableTag"
+
+ override def headers: Seq[Node] = {
+ val sessionTableHeadersAndTooltips: Seq[(String, Boolean, Option[String])] =
+ Seq(
+ ("User", true, None),
+ ("Client IP", true, None),
+ ("Session ID", true, None),
+ ("Start Time", true, None),
+ ("Finish Time", true, None),
+ ("Duration", true, None),
+ ("Total Statements", true, None))
+
+ headerStatRow(sessionTableHeadersAndTooltips, desc, pageSize, sortColumn,
+ parameterPath, sessionStatsTableTag, sessionStatsTableTag)
+ }
+
+ override def row(session: SessionEvent): Seq[Node] = {
+ val sessionLink = "%s/%s/session/?id=%s".format(
+ UIUtils.prependBaseUri(request, parent.basePath), parent.prefix, session.sessionId)
+
+ | {session.username} |
+ {session.ip} |
+ {session.sessionId} |
+ {formatDate(session.startTime)} |
+ {if (session.endTime > 0) formatDate(session.endTime)} |
+ {formatDurationVerbose(session.duration)} |
+ {session.totalOperations} |
+
+ }
+ }
+
+ /**
+ * Returns parameter of this table.
+ */
+ def getRequestTableParameters(
+ request: HttpServletRequest,
+ tableTag: String,
+ defaultSortColumn: String): (String, Boolean, Int) = {
+ val parameterSortColumn = request.getParameter(s"$tableTag.sort")
+ val parameterSortDesc = request.getParameter(s"$tableTag.desc")
+ val parameterPageSize = request.getParameter(s"$tableTag.pageSize")
+ val sortColumn = Option(parameterSortColumn).map { sortColumn =>
+ UIUtils.decodeURLParameter(sortColumn)
+ }.getOrElse(defaultSortColumn)
+ val desc = Option(parameterSortDesc).map(_.toBoolean).getOrElse(
+ sortColumn == defaultSortColumn
+ )
+ val pageSize = Option(parameterPageSize).map(_.toInt).getOrElse(100)
+
+ (sortColumn, desc, pageSize)
+ }
+
+ /**
+ * Returns parameters of other tables in the page.
+ */
+ def getRequestParameterOtherTable(request: HttpServletRequest, tableTag: String): String = {
+ request.getParameterMap.asScala
+ .filterNot(_._1.startsWith(tableTag))
+ .map(parameter => parameter._1 + "=" + parameter._2(0))
+ .mkString("&")
+ }
+
+ def headerStatRow(
+ headerInfo: Seq[(String, Boolean, Option[String])],
+ desc: Boolean,
+ pageSize: Int,
+ sortColumn: String,
+ parameterPath: String,
+ tableTag: String,
+ headerId: String): Seq[Node] = {
+ val row: Seq[Node] = {
+ headerInfo.map { case (header, sortable, tooltip) =>
+ if (header == sortColumn) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
+ s"&$tableTag.desc=${!desc}" +
+ s"&$tableTag.pageSize=$pageSize" +
+ s"#$headerId")
+ val arrow = if (desc) "▾" else "▴" // UP or DOWN
+
+
+
+
+ {header} {Unparsed(arrow)}
+
+
+ |
+ } else {
+ if (sortable) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
+ s"&$tableTag.pageSize=$pageSize" +
+ s"#$headerId")
+
+
+
+
+ {header}
+
+
+ |
+ } else {
+
+
+ {header}
+
+ |
+ }
+ }
+ }
+ }
+
+ {row}
+
+ }
+}
+
+private class SessionStatsTableDataSource(
+ info: Seq[SessionEvent],
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean) extends PagedDataSource[SessionEvent](pageSize) {
+
+ // Sorting SessionEvent data
+ private val data = info.sorted(ordering(sortColumn, desc))
+
+ override def dataSize: Int = data.size
+
+ override def sliceData(from: Int, to: Int): Seq[SessionEvent] = data.slice(from, to)
+
+ /**
+ * Return Ordering according to sortColumn and desc.
+ */
+ private def ordering(sortColumn: String, desc: Boolean): Ordering[SessionEvent] = {
+ val ordering: Ordering[SessionEvent] = sortColumn match {
+ case "User" => Ordering.by(_.username)
+ case "Client IP" => Ordering.by(_.ip)
+ case "Session ID" => Ordering.by(_.sessionId)
+ case "Start Time" => Ordering by (_.startTime)
+ case "Finish Time" => Ordering.by(_.endTime)
+ case "Duration" => Ordering.by(_.duration)
+ case "Total Statements" => Ordering.by(_.totalOperations)
+ case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
+ }
+ if (desc) {
+ ordering.reverse
+ } else {
+ ordering
+ }
+ }
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineTab.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineTab.scala
index a18358407..26a3aa1cb 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineTab.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineTab.scala
@@ -25,12 +25,13 @@ import scala.util.control.NonFatal
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.SparkSQLEngine
+import org.apache.kyuubi.engine.spark.events.EngineEventsStore
import org.apache.kyuubi.service.ServiceState
/**
* Note that [[SparkUITab]] is private for Spark
*/
-case class EngineTab(engine: SparkSQLEngine)
+case class EngineTab(engine: SparkSQLEngine, store: EngineEventsStore)
extends SparkUITab(engine.spark.sparkContext.ui.orNull, "kyuubi") with Logging {
override val name: String = "Kyuubi Query Engine"
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala
index 3598b3d61..9f759a757 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala
@@ -64,5 +64,41 @@ class EngineTabSuite extends WithSparkSQLEngine with JDBCTestUtils {
}
}
+ test("session stats for engine tab") {
+ assert(spark.sparkContext.ui.nonEmpty)
+ val client = HttpClients.createDefault()
+ val req = new HttpGet(spark.sparkContext.uiWebUrl.get + "/kyuubi/")
+ val response = client.execute(req)
+ assert(response.getStatusLine.getStatusCode === 200)
+ val resp = EntityUtils.toString(response.getEntity)
+ assert(resp.contains("0 session(s) are online,"))
+ withJdbcStatement() { statement =>
+ statement.execute(
+ """
+ |SELECT
+ | l.id % 100 k,
+ | sum(l.id) sum,
+ | count(l.id) cnt,
+ | avg(l.id) avg,
+ | min(l.id) min,
+ | max(l.id) max
+ |from range(0, 100000L, 1, 100) l
+ | left join range(0, 100000L, 2, 100) r ON l.id = r.id
+ |GROUP BY 1""".stripMargin)
+ val response = client.execute(req)
+ assert(response.getStatusLine.getStatusCode === 200)
+ val resp = EntityUtils.toString(response.getEntity)
+
+ // check session section
+ assert(resp.contains("Session Statistics"))
+
+ // check session stats table id
+ assert(resp.contains("sessionstat"))
+
+ // check session stats table title
+ assert(resp.contains("Total Statements"))
+ }
+ }
+
override protected def jdbcUrl: String = getJdbcUrl
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index ba071ff0b..29d53c652 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -880,8 +880,8 @@ object KyuubiConf {
val ENGINE_EVENT_LOGGERS: ConfigEntry[Seq[String]] =
buildConf("engine.event.loggers")
.doc("A comma separated list of engine history loggers, where engine/session/operation etc" +
- " events go." +
- " - SPARK: the events will be written to the spark history events
" +
+ " events go. We use spark logger by default." +
+ " - SPARK: the events will be written to the spark listener bus.
" +
s" - JSON: the events will be written to the location of" +
s" ${ENGINE_EVENT_JSON_LOG_PATH.key}
" +
s" - JDBC: to be done
" +
@@ -892,7 +892,7 @@ object KyuubiConf {
.toSequence()
.checkValue(_.toSet.subsetOf(Set("SPARK", "JSON", "JDBC", "CUSTOM")),
"Unsupported event loggers")
- .createWithDefault(Nil)
+ .createWithDefault(Seq("SPARK"))
val ENGINE_UI_STOP_ENABLED: ConfigEntry[Boolean] =
buildConf("engine.ui.stop.enabled")