handle client ip properly

This commit is contained in:
Kent Yao 2020-09-24 11:59:55 +08:00
parent f8b8c6c172
commit ff7a1221e0
5 changed files with 36 additions and 5 deletions

View File

@ -32,7 +32,6 @@ object KyuubiSparkUtil {
| deploy mode: ${spark.sparkContext.deployMode}
| version: ${spark.sparkContext.version}
| Start time: ${Instant.ofEpochMilli(spark.sparkContext.startTime)}
| User: ${spark.sparkContext.sparkUser}
|""".stripMargin
| User: ${spark.sparkContext.sparkUser}""".stripMargin
}
}

View File

@ -45,6 +45,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
password: String,
ipAddress: String,
conf: Map[String, String]): SessionHandle = {
info(s"Opening session for $user@$ipAddress")
val sessionImpl = new SparkSessionImpl(protocol, user, password, ipAddress, conf, this)
val handle = sessionImpl.handle
try {
@ -68,4 +69,3 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
}
}
}

View File

@ -62,7 +62,7 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
Hive.closeCurrent()
}
protected def jdbcUrl: String = s"jdbc:hive2://$connectionUrl/"
protected def jdbcUrl: String = s"jdbc:hive2://$connectionUrl/;"
protected def withMultipleConnectionJdbcStatement(
@ -81,8 +81,12 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
statements.head.execute(s"DROP TABLE IF EXISTS $name")
}
}
info("Closing statements")
statements.foreach(_.close())
info("Closed statements")
connections.foreach(_.close())
info("Closing connections")
}
}
@ -97,8 +101,11 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
dbNames.foreach { name =>
statements.head.execute(s"DROP DATABASE IF EXISTS $name")
}
info("Closing statements")
statements.foreach(_.close())
info("Closed statements")
connections.foreach(_.close())
info("Closing connections")
}
}

View File

@ -172,6 +172,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
}
override def OpenSession(req: TOpenSessionReq): TOpenSessionResp = {
debug(req.toString)
info("Client protocol version: " + req.getClient_protocol)
val resp = new TOpenSessionResp
try {
@ -189,9 +190,12 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
}
override def CloseSession(req: TCloseSessionReq): TCloseSessionResp = {
debug(req.toString)
val handle = SessionHandle(req.getSessionHandle)
info(s"Received request of closing $handle")
val resp = new TCloseSessionResp
try {
be.closeSession(SessionHandle(req.getSessionHandle))
be.closeSession(handle)
resp.setStatus(OK_STATUS)
Option(CURRENT_SERVER_CONTEXT.get()).foreach(_.setSessionHandle(null))
} catch {
@ -199,10 +203,12 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
warn("Error closing session: ", e)
resp.setStatus(KyuubiSQLException.toTStatus(e))
}
info(s"Finished closing $handle")
resp
}
override def GetInfo(req: TGetInfoReq): TGetInfoResp = {
debug(req.toString)
val resp = new TGetInfoResp
try {
val infoValue = be.getInfo(SessionHandle(req.getSessionHandle), req.getInfoType)
@ -217,6 +223,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
}
override def ExecuteStatement(req: TExecuteStatementReq): TExecuteStatementResp = {
debug(req.toString)
val resp = new TExecuteStatementResp
try {
val sessionHandle = SessionHandle(req.getSessionHandle)
@ -240,6 +247,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
}
override def GetTypeInfo(req: TGetTypeInfoReq): TGetTypeInfoResp = {
debug(req.toString)
val resp = new TGetTypeInfoResp
try {
val operationHandle = be.getTypeInfo(SessionHandle(req.getSessionHandle))
@ -254,6 +262,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
}
override def GetCatalogs(req: TGetCatalogsReq): TGetCatalogsResp = {
debug(req.toString)
val resp = new TGetCatalogsResp
try {
val opHandle = be.getCatalogs(SessionHandle(req.getSessionHandle))
@ -268,6 +277,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
}
override def GetSchemas(req: TGetSchemasReq): TGetSchemasResp = {
debug(req.toString)
val resp = new TGetSchemasResp
try {
val opHandle = be.getSchemas(
@ -283,6 +293,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
}
override def GetTables(req: TGetTablesReq): TGetTablesResp = {
debug(req.toString)
val resp = new TGetTablesResp
try {
val sessionHandle = SessionHandle(req.getSessionHandle)
@ -302,6 +313,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
}
override def GetTableTypes(req: TGetTableTypesReq): TGetTableTypesResp = {
debug(req.toString)
val resp = new TGetTableTypesResp
try {
val opHandle = be.getTableTypes(SessionHandle(req.getSessionHandle))
@ -316,6 +328,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
}
override def GetColumns(req: TGetColumnsReq): TGetColumnsResp = {
debug(req.toString)
val resp = new TGetColumnsResp
try {
val sessionHandle = SessionHandle(req.getSessionHandle)
@ -335,6 +348,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
}
override def GetFunctions(req: TGetFunctionsReq): TGetFunctionsResp = {
debug(req.toString)
val resp = new TGetFunctionsResp
try {
val sessionHandle = SessionHandle(req.getSessionHandle)
@ -353,6 +367,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
}
override def GetPrimaryKeys(req: TGetPrimaryKeysReq): TGetPrimaryKeysResp = {
debug(req.toString)
val resp = new TGetPrimaryKeysResp
val errStatus = KyuubiSQLException("Feature is not available").toTStatus
resp.setStatus(errStatus)
@ -360,6 +375,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
}
override def GetCrossReference(req: TGetCrossReferenceReq): TGetCrossReferenceResp = {
debug(req.toString)
val resp = new TGetCrossReferenceResp
val errStatus = KyuubiSQLException("Feature is not available").toTStatus
resp.setStatus(errStatus)
@ -367,6 +383,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
}
override def GetOperationStatus(req: TGetOperationStatusReq): TGetOperationStatusResp = {
debug(req.toString)
val resp = new TGetOperationStatusResp
try {
val operationHandle = OperationHandle(req.getOperationHandle)
@ -390,6 +407,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
}
override def CancelOperation(req: TCancelOperationReq): TCancelOperationResp = {
debug(req.toString)
val resp = new TCancelOperationResp
try {
be.cancelOperation(OperationHandle(req.getOperationHandle))
@ -403,6 +421,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
}
override def CloseOperation(req: TCloseOperationReq): TCloseOperationResp = {
debug(req.toString)
val resp = new TCloseOperationResp
try {
be.closeOperation(OperationHandle(req.getOperationHandle))
@ -416,6 +435,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
}
override def GetResultSetMetadata(req: TGetResultSetMetadataReq): TGetResultSetMetadataResp = {
debug(req.toString)
val resp = new TGetResultSetMetadataResp
try {
val schema = be.getResultSetMetadata(OperationHandle(req.getOperationHandle))
@ -430,6 +450,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
}
override def FetchResults(req: TFetchResultsReq): TFetchResultsResp = {
debug(req.toString)
val resp = new TFetchResultsResp
try {
val operationHandle = OperationHandle(req.getOperationHandle)
@ -455,18 +476,21 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
}
override def GetDelegationToken(req: TGetDelegationTokenReq): TGetDelegationTokenResp = {
debug(req.toString)
val resp = new TGetDelegationTokenResp()
resp.setStatus(notSupportTokenErrorStatus)
resp
}
override def CancelDelegationToken(req: TCancelDelegationTokenReq): TCancelDelegationTokenResp = {
debug(req.toString)
val resp = new TCancelDelegationTokenResp
resp.setStatus(notSupportTokenErrorStatus)
resp
}
override def RenewDelegationToken(req: TRenewDelegationTokenReq): TRenewDelegationTokenResp = {
debug(req.toString)
val resp = new TRenewDelegationTokenResp
resp.setStatus(notSupportTokenErrorStatus)
resp

View File

@ -83,6 +83,7 @@ class KyuubiAuthenticationFactory(conf: KyuubiConf) {
def getIpAddress: Option[String] = {
saslServer.map(_.getRemoteAddress).map(_.getHostAddress)
.orElse(Option(TSetIpAddressProcessor.getUserIpAddress))
}
}
object KyuubiAuthenticationFactory {