From c363311e3938163f9e0b02c3b58525765ebc893a Mon Sep 17 00:00:00 2001 From: Wang Zhen Date: Thu, 9 Dec 2021 15:31:14 +0800 Subject: [PATCH] [KYUUBI #1528] Record the kyuubi server ip address in event log ### _Why are the changes needed?_ Record the kyuubi server ip address in event log. For details: #1528 ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [X] Add screenshots for manual tests if appropriate - [X] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1530 from wForget/KYUUBI-1528. Closes #1528 6bc18571 [Wang Zhen] [KYUUBI-1528] Record the kyuubi server ip address in event log, set serverIpAddress in SessionEvent.apply 5f0307d1 [Wang Zhen] [KYUUBI-1528] Record the kyuubi server ip address in event log Authored-by: Wang Zhen Signed-off-by: ulysses-you --- .../engine/spark/events/SessionEvent.scala | 7 +++++-- .../spark/session/SparkSQLSessionManager.scala | 1 + .../spark/session/SparkSessionImpl.scala | 3 +++ .../scala/org/apache/spark/ui/EnginePage.scala | 3 +++ .../apache/spark/ui/EngineSessionPage.scala | 1 + .../spark/events/EngineEventsStoreSuite.scala | 18 +++++++++--------- 6 files changed, 22 insertions(+), 11 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala index 2939adc7d..ce21c3936 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.types.StructType import org.apache.kyuubi.Utils import org.apache.kyuubi.engine.spark.KyuubiSparkUtil -import org.apache.kyuubi.session.Session +import org.apache.kyuubi.engine.spark.session.SparkSessionImpl /** * Event Tracking for user sessions @@ -31,6 +31,7 @@ import org.apache.kyuubi.session.Session * @param startTime Start time * @param endTime End time * @param ip Client IP address + * @param serverIp Kyuubi Server IP address * @param totalOperations how many queries and meta calls */ case class SessionEvent( @@ -38,6 +39,7 @@ case class SessionEvent( engineId: String, username: String, ip: String, + serverIp: String, startTime: Long, var endTime: Long = -1L, var totalOperations: Int = 0) extends KyuubiSparkEvent { @@ -56,12 +58,13 @@ case class SessionEvent( } object SessionEvent { - def apply(session: Session): SessionEvent = { + def apply(session: SparkSessionImpl): SessionEvent = { new SessionEvent( session.handle.identifier.toString, KyuubiSparkUtil.engineId, session.user, session.ipAddress, + session.serverIpAddress, session.createTime) } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala index 693bbc60c..4a8769b9e 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala @@ -79,6 +79,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession) protocol, user, password, + ipAddress, clientIp, conf, this, diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala index 6cc7cd947..98ca7d190 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala @@ -30,6 +30,7 @@ class SparkSessionImpl( protocol: TProtocolVersion, user: String, password: String, + serverIpAddress: String, ipAddress: String, conf: Map[String, String], sessionManager: SessionManager, @@ -47,6 +48,8 @@ class SparkSessionImpl( private val sessionEvent = SessionEvent(this) + def serverIpAddress(): String = serverIpAddress + override def open(): Unit = { normalizedConf.foreach { case ("use:database", database) => spark.catalog.setCurrentDatabase(database) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala index 8f5b79a79..32c7d6b35 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala @@ -240,6 +240,7 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") { Seq( ("User", true, None), ("Client IP", true, None), + ("Server IP", true, None), ("Session ID", true, None), ("Start Time", true, None), ("Finish Time", true, None), @@ -264,6 +265,7 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") { {session.username} {session.ip} + {session.serverIp} {session.sessionId} {formatDate(session.startTime)} {if (session.endTime > 0) formatDate(session.endTime)} @@ -402,6 +404,7 @@ private class SessionStatsTableDataSource( val ordering: Ordering[SessionEvent] = sortColumn match { case "User" => Ordering.by(_.username) case "Client IP" => Ordering.by(_.ip) + case "Server IP" => Ordering.by(_.serverIp) case "Session ID" => Ordering.by(_.sessionId) case "Start Time" => Ordering.by(_.startTime) case "Finish Time" => Ordering.by(_.endTime) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala index 8d3e828be..5d214596f 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala @@ -45,6 +45,7 @@ case class EngineSessionPage(parent: EngineTab)

User {sessionStat.username}, IP {sessionStat.ip}, + Server {sessionStat.serverIp}, Session created at {formatDate(sessionStat.startTime)}, Total run {sessionStat.totalOperations} SQL

++ diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala index 9166cd8fa..666d006b1 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala @@ -26,9 +26,9 @@ class EngineEventsStoreSuite extends KyuubiFunSuite { test("ensure that the sessions are stored in order") { val store = new EngineEventsStore(KyuubiConf()) - val s1 = SessionEvent("a", "ea", "test1", "1.1.1.1", 1L) - val s2 = SessionEvent("c", "ea", "test2", "1.1.1.1", 3L) - val s3 = SessionEvent("b", "ea", "test3", "1.1.1.1", 2L) + val s1 = SessionEvent("a", "ea", "test1", "1.1.1.1", "1.1.1.2", 1L) + val s2 = SessionEvent("c", "ea", "test2", "1.1.1.1", "1.1.1.2", 3L) + val s3 = SessionEvent("b", "ea", "test3", "1.1.1.1", "1.1.1.2", 2L) store.saveSession(s1) store.saveSession(s2) @@ -45,7 +45,7 @@ class EngineEventsStoreSuite extends KyuubiFunSuite { val store = new EngineEventsStore(conf) for (i <- 1 to 5) { - val s = SessionEvent(s"b$i", "ea", s"test$i", "1.1.1.1", 2L) + val s = SessionEvent(s"b$i", "ea", s"test$i", "1.1.1.1", "1.1.1.2", 2L) store.saveSession(s) } @@ -58,10 +58,10 @@ class EngineEventsStoreSuite extends KyuubiFunSuite { val store = new EngineEventsStore(conf) - store.saveSession(SessionEvent("s1", "ea", "test1", "1.1.1.1", 1L, -1L)) - store.saveSession(SessionEvent("s2", "ea", "test1", "1.1.1.1", 2L, -1L)) - store.saveSession(SessionEvent("s3", "ea", "test1", "1.1.1.1", 3L, 1L)) - store.saveSession(SessionEvent("s4", "ea", "test1", "1.1.1.1", 4L, -1L)) + store.saveSession(SessionEvent("s1", "ea", "test1", "1.1.1.1", "1.1.1.2", 1L, -1L)) + store.saveSession(SessionEvent("s2", "ea", "test1", "1.1.1.1", "1.1.1.2", 2L, -1L)) + store.saveSession(SessionEvent("s3", "ea", "test1", "1.1.1.1", "1.1.1.2", 3L, 1L)) + store.saveSession(SessionEvent("s4", "ea", "test1", "1.1.1.1", "1.1.1.2", 4L, -1L)) assert(store.getSessionList.size == 3) assert(store.getSessionList(2).sessionId == "s4") @@ -69,7 +69,7 @@ class EngineEventsStoreSuite extends KyuubiFunSuite { test("test check session after update session") { val store = new EngineEventsStore(KyuubiConf()) - val s = SessionEvent("abc", "ea", "test3", "1.1.1.1", 2L) + val s = SessionEvent("abc", "ea", "test3", "1.1.1.1", "1.1.1.2", 2L) store.saveSession(s) val finishTimestamp: Long = 456L