[KYUUBI #1528] Record the kyuubi server ip address in event log
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> Record the kyuubi server ip address in event log. For details: #1528 ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [X] Add screenshots for manual tests if appropriate - [X] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1530 from wForget/KYUUBI-1528. Closes #1528 6bc18571 [Wang Zhen] [KYUUBI-1528] Record the kyuubi server ip address in event log, set serverIpAddress in SessionEvent.apply 5f0307d1 [Wang Zhen] [KYUUBI-1528] Record the kyuubi server ip address in event log Authored-by: Wang Zhen <wangzhen07@qiyi.com> Signed-off-by: ulysses-you <ulyssesyou@apache.org>
This commit is contained in:
parent
c30d580686
commit
c363311e39
@ -22,7 +22,7 @@ import org.apache.spark.sql.types.StructType
|
||||
|
||||
import org.apache.kyuubi.Utils
|
||||
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
|
||||
import org.apache.kyuubi.session.Session
|
||||
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
|
||||
|
||||
/**
|
||||
* Event Tracking for user sessions
|
||||
@ -31,6 +31,7 @@ import org.apache.kyuubi.session.Session
|
||||
* @param startTime Start time
|
||||
* @param endTime End time
|
||||
* @param ip Client IP address
|
||||
* @param serverIp Kyuubi Server IP address
|
||||
* @param totalOperations how many queries and meta calls
|
||||
*/
|
||||
case class SessionEvent(
|
||||
@ -38,6 +39,7 @@ case class SessionEvent(
|
||||
engineId: String,
|
||||
username: String,
|
||||
ip: String,
|
||||
serverIp: String,
|
||||
startTime: Long,
|
||||
var endTime: Long = -1L,
|
||||
var totalOperations: Int = 0) extends KyuubiSparkEvent {
|
||||
@ -56,12 +58,13 @@ case class SessionEvent(
|
||||
}
|
||||
|
||||
object SessionEvent {
|
||||
def apply(session: Session): SessionEvent = {
|
||||
def apply(session: SparkSessionImpl): SessionEvent = {
|
||||
new SessionEvent(
|
||||
session.handle.identifier.toString,
|
||||
KyuubiSparkUtil.engineId,
|
||||
session.user,
|
||||
session.ipAddress,
|
||||
session.serverIpAddress,
|
||||
session.createTime)
|
||||
}
|
||||
}
|
||||
|
||||
@ -79,6 +79,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
|
||||
protocol,
|
||||
user,
|
||||
password,
|
||||
ipAddress,
|
||||
clientIp,
|
||||
conf,
|
||||
this,
|
||||
|
||||
@ -30,6 +30,7 @@ class SparkSessionImpl(
|
||||
protocol: TProtocolVersion,
|
||||
user: String,
|
||||
password: String,
|
||||
serverIpAddress: String,
|
||||
ipAddress: String,
|
||||
conf: Map[String, String],
|
||||
sessionManager: SessionManager,
|
||||
@ -47,6 +48,8 @@ class SparkSessionImpl(
|
||||
|
||||
private val sessionEvent = SessionEvent(this)
|
||||
|
||||
def serverIpAddress(): String = serverIpAddress
|
||||
|
||||
override def open(): Unit = {
|
||||
normalizedConf.foreach {
|
||||
case ("use:database", database) => spark.catalog.setCurrentDatabase(database)
|
||||
|
||||
@ -240,6 +240,7 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") {
|
||||
Seq(
|
||||
("User", true, None),
|
||||
("Client IP", true, None),
|
||||
("Server IP", true, None),
|
||||
("Session ID", true, None),
|
||||
("Start Time", true, None),
|
||||
("Finish Time", true, None),
|
||||
@ -264,6 +265,7 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") {
|
||||
<tr>
|
||||
<td> {session.username} </td>
|
||||
<td> {session.ip} </td>
|
||||
<td> {session.serverIp} </td>
|
||||
<td> <a href={sessionLink}> {session.sessionId} </a> </td>
|
||||
<td> {formatDate(session.startTime)} </td>
|
||||
<td> {if (session.endTime > 0) formatDate(session.endTime)} </td>
|
||||
@ -402,6 +404,7 @@ private class SessionStatsTableDataSource(
|
||||
val ordering: Ordering[SessionEvent] = sortColumn match {
|
||||
case "User" => Ordering.by(_.username)
|
||||
case "Client IP" => Ordering.by(_.ip)
|
||||
case "Server IP" => Ordering.by(_.serverIp)
|
||||
case "Session ID" => Ordering.by(_.sessionId)
|
||||
case "Start Time" => Ordering.by(_.startTime)
|
||||
case "Finish Time" => Ordering.by(_.endTime)
|
||||
|
||||
@ -45,6 +45,7 @@ case class EngineSessionPage(parent: EngineTab)
|
||||
<h4>
|
||||
User {sessionStat.username},
|
||||
IP {sessionStat.ip},
|
||||
Server {sessionStat.serverIp},
|
||||
Session created at {formatDate(sessionStat.startTime)},
|
||||
Total run {sessionStat.totalOperations} SQL
|
||||
</h4> ++
|
||||
|
||||
@ -26,9 +26,9 @@ class EngineEventsStoreSuite extends KyuubiFunSuite {
|
||||
test("ensure that the sessions are stored in order") {
|
||||
val store = new EngineEventsStore(KyuubiConf())
|
||||
|
||||
val s1 = SessionEvent("a", "ea", "test1", "1.1.1.1", 1L)
|
||||
val s2 = SessionEvent("c", "ea", "test2", "1.1.1.1", 3L)
|
||||
val s3 = SessionEvent("b", "ea", "test3", "1.1.1.1", 2L)
|
||||
val s1 = SessionEvent("a", "ea", "test1", "1.1.1.1", "1.1.1.2", 1L)
|
||||
val s2 = SessionEvent("c", "ea", "test2", "1.1.1.1", "1.1.1.2", 3L)
|
||||
val s3 = SessionEvent("b", "ea", "test3", "1.1.1.1", "1.1.1.2", 2L)
|
||||
|
||||
store.saveSession(s1)
|
||||
store.saveSession(s2)
|
||||
@ -45,7 +45,7 @@ class EngineEventsStoreSuite extends KyuubiFunSuite {
|
||||
|
||||
val store = new EngineEventsStore(conf)
|
||||
for (i <- 1 to 5) {
|
||||
val s = SessionEvent(s"b$i", "ea", s"test$i", "1.1.1.1", 2L)
|
||||
val s = SessionEvent(s"b$i", "ea", s"test$i", "1.1.1.1", "1.1.1.2", 2L)
|
||||
store.saveSession(s)
|
||||
}
|
||||
|
||||
@ -58,10 +58,10 @@ class EngineEventsStoreSuite extends KyuubiFunSuite {
|
||||
|
||||
val store = new EngineEventsStore(conf)
|
||||
|
||||
store.saveSession(SessionEvent("s1", "ea", "test1", "1.1.1.1", 1L, -1L))
|
||||
store.saveSession(SessionEvent("s2", "ea", "test1", "1.1.1.1", 2L, -1L))
|
||||
store.saveSession(SessionEvent("s3", "ea", "test1", "1.1.1.1", 3L, 1L))
|
||||
store.saveSession(SessionEvent("s4", "ea", "test1", "1.1.1.1", 4L, -1L))
|
||||
store.saveSession(SessionEvent("s1", "ea", "test1", "1.1.1.1", "1.1.1.2", 1L, -1L))
|
||||
store.saveSession(SessionEvent("s2", "ea", "test1", "1.1.1.1", "1.1.1.2", 2L, -1L))
|
||||
store.saveSession(SessionEvent("s3", "ea", "test1", "1.1.1.1", "1.1.1.2", 3L, 1L))
|
||||
store.saveSession(SessionEvent("s4", "ea", "test1", "1.1.1.1", "1.1.1.2", 4L, -1L))
|
||||
|
||||
assert(store.getSessionList.size == 3)
|
||||
assert(store.getSessionList(2).sessionId == "s4")
|
||||
@ -69,7 +69,7 @@ class EngineEventsStoreSuite extends KyuubiFunSuite {
|
||||
|
||||
test("test check session after update session") {
|
||||
val store = new EngineEventsStore(KyuubiConf())
|
||||
val s = SessionEvent("abc", "ea", "test3", "1.1.1.1", 2L)
|
||||
val s = SessionEvent("abc", "ea", "test3", "1.1.1.1", "1.1.1.2", 2L)
|
||||
store.saveSession(s)
|
||||
|
||||
val finishTimestamp: Long = 456L
|
||||
|
||||
Loading…
Reference in New Issue
Block a user