[KYUUBI #1613] Record the engine's applicationId in KyuubiSessionEvent
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> Record the engine's applicationId in KyuubiSessionEvent. #1613 ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [X] Add screenshots for manual tests if appropriate  - [X] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1622 from wForget/KYUUBI-1613. Closes #1613 24e001f6 [Wang Zhen] comment 29453fc3 [Wang Zhen] [KYUUBI-1613] Record the engine's applicationId in KyuubiSessionEvent Authored-by: Wang Zhen <wangzhen07@qiyi.com> Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
parent
54a1b88245
commit
fbccba90f4
@ -20,12 +20,13 @@ package org.apache.kyuubi.engine.spark
|
||||
import org.apache.hadoop.io.Text
|
||||
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
|
||||
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
|
||||
import org.apache.hive.service.rpc.thrift.{TRenewDelegationTokenReq, TRenewDelegationTokenResp}
|
||||
import org.apache.hive.service.rpc.thrift.{TOpenSessionReq, TOpenSessionResp, TRenewDelegationTokenReq, TRenewDelegationTokenResp}
|
||||
import org.apache.spark.kyuubi.SparkContextHelper
|
||||
|
||||
import org.apache.kyuubi.KyuubiSQLException
|
||||
import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, ServiceDiscovery}
|
||||
import org.apache.kyuubi.service.{Serverable, Service, ThriftBinaryFrontendService}
|
||||
import org.apache.kyuubi.service.ThriftBinaryFrontendService.{CURRENT_SERVER_CONTEXT, OK_STATUS}
|
||||
import org.apache.kyuubi.util.KyuubiHadoopUtils
|
||||
|
||||
class SparkThriftBinaryFrontendService(
|
||||
@ -63,6 +64,27 @@ class SparkThriftBinaryFrontendService(
|
||||
resp
|
||||
}
|
||||
|
||||
override def OpenSession(req: TOpenSessionReq): TOpenSessionResp = {
|
||||
debug(req.toString)
|
||||
info("Client protocol version: " + req.getClient_protocol)
|
||||
val resp = new TOpenSessionResp
|
||||
try {
|
||||
val respConfiguration = new java.util.HashMap[String, String]()
|
||||
respConfiguration.put("kyuubi.engine.id", sc.applicationId)
|
||||
|
||||
val sessionHandle = getSessionHandle(req, resp)
|
||||
resp.setSessionHandle(sessionHandle.toTSessionHandle)
|
||||
resp.setConfiguration(respConfiguration)
|
||||
resp.setStatus(OK_STATUS)
|
||||
Option(CURRENT_SERVER_CONTEXT.get()).foreach(_.setSessionHandle(sessionHandle))
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
error("Error opening session: ", e)
|
||||
resp.setStatus(KyuubiSQLException.toTStatus(e, verbose = true))
|
||||
}
|
||||
resp
|
||||
}
|
||||
|
||||
private def addHiveToken(
|
||||
newTokens: Map[Text, Token[_ <: TokenIdentifier]],
|
||||
oldCreds: Credentials,
|
||||
|
||||
@ -38,6 +38,7 @@ class KyuubiSyncThriftClient private (protocol: TProtocol)
|
||||
extends TCLIService.Client(protocol) with Logging {
|
||||
|
||||
@volatile private var _remoteSessionHandle: TSessionHandle = _
|
||||
@volatile private var _engineId: Option[String] = _
|
||||
|
||||
private val lock = new ReentrantLock()
|
||||
|
||||
@ -54,6 +55,8 @@ class KyuubiSyncThriftClient private (protocol: TProtocol)
|
||||
} finally lock.unlock()
|
||||
}
|
||||
|
||||
def engineId: Option[String] = _engineId
|
||||
|
||||
/**
|
||||
* Return the engine SessionHandle for kyuubi session so that we can get the same session id
|
||||
*/
|
||||
@ -69,6 +72,9 @@ class KyuubiSyncThriftClient private (protocol: TProtocol)
|
||||
val resp = withLockAcquired(OpenSession(req))
|
||||
ThriftUtils.verifyTStatus(resp.getStatus)
|
||||
_remoteSessionHandle = resp.getSessionHandle
|
||||
_engineId = Option(resp.getConfiguration)
|
||||
.filter(_.containsKey("kyuubi.engine.id"))
|
||||
.map(_.get("kyuubi.engine.id"))
|
||||
SessionHandle(_remoteSessionHandle, protocol)
|
||||
}
|
||||
|
||||
|
||||
@ -36,6 +36,7 @@ import org.apache.kyuubi.session.AbstractSession
|
||||
* @param openedTime session opened time
|
||||
* @param endTime session end time
|
||||
* @param totalOperations how many queries and meta calls
|
||||
* @param engineId engine id. For engine on yarn, it is applicationId.
|
||||
*/
|
||||
case class KyuubiSessionEvent(
|
||||
sessionName: String,
|
||||
@ -46,6 +47,7 @@ case class KyuubiSessionEvent(
|
||||
startTime: Long,
|
||||
var sessionId: String = "",
|
||||
var remoteSessionId: String = "",
|
||||
var engineId: String = "",
|
||||
var clientVersion: Int = -1,
|
||||
var openedTime: Long = -1L,
|
||||
var endTime: Long = -1L,
|
||||
|
||||
@ -86,6 +86,7 @@ class KyuubiSessionImpl(
|
||||
sessionEvent.sessionId = handle.identifier.toString
|
||||
sessionEvent.remoteSessionId = _engineSessionHandle.identifier.toString
|
||||
sessionEvent.clientVersion = handle.protocol.getValue
|
||||
_client.engineId.foreach(e => sessionEvent.engineId = e)
|
||||
EventLoggingService.onEvent(sessionEvent)
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user