From ff7a1221e0945cd7d8aea56c8cac85e426b60289 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 24 Sep 2020 11:59:55 +0800 Subject: [PATCH] handle client ip properly --- .../kyuubi/engine/spark/KyuubiSparkUtil.scala | 3 +-- .../session/SparkSQLSessionManager.scala | 2 +- .../engine/spark/WithSparkSQLEngine.scala | 9 ++++++- .../kyuubi/service/FrontendService.scala | 26 ++++++++++++++++++- .../KyuubiAuthenticationFactory.scala | 1 + 5 files changed, 36 insertions(+), 5 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala index e76e2f7d4..2aff3b94b 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala @@ -32,7 +32,6 @@ object KyuubiSparkUtil { | deploy mode: ${spark.sparkContext.deployMode} | version: ${spark.sparkContext.version} | Start time: ${Instant.ofEpochMilli(spark.sparkContext.startTime)} - | User: ${spark.sparkContext.sparkUser} - |""".stripMargin + | User: ${spark.sparkContext.sparkUser}""".stripMargin } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala index 42f0c5253..293230d9a 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala @@ -45,6 +45,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession) password: String, ipAddress: String, conf: Map[String, String]): SessionHandle = { + info(s"Opening session for $user@$ipAddress") val sessionImpl = new SparkSessionImpl(protocol, user, password, ipAddress, conf, this) val handle = sessionImpl.handle try { @@ -68,4 +69,3 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession) } } } - diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala index 296c3a7fc..2d0f31c10 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala @@ -62,7 +62,7 @@ trait WithSparkSQLEngine extends KyuubiFunSuite { Hive.closeCurrent() } - protected def jdbcUrl: String = s"jdbc:hive2://$connectionUrl/" + protected def jdbcUrl: String = s"jdbc:hive2://$connectionUrl/;" protected def withMultipleConnectionJdbcStatement( @@ -81,8 +81,12 @@ trait WithSparkSQLEngine extends KyuubiFunSuite { statements.head.execute(s"DROP TABLE IF EXISTS $name") } } + info("Closing statements") statements.foreach(_.close()) + info("Closed statements") connections.foreach(_.close()) + info("Closing connections") + } } @@ -97,8 +101,11 @@ trait WithSparkSQLEngine extends KyuubiFunSuite { dbNames.foreach { name => statements.head.execute(s"DROP DATABASE IF EXISTS $name") } + info("Closing statements") statements.foreach(_.close()) + info("Closed statements") connections.foreach(_.close()) + info("Closing connections") } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/FrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/FrontendService.scala index 1a6b40b9b..fee0230b1 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/FrontendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/FrontendService.scala @@ -172,6 +172,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } override def OpenSession(req: TOpenSessionReq): TOpenSessionResp = { + debug(req.toString) info("Client protocol version: " + req.getClient_protocol) val resp = new TOpenSessionResp try { @@ -189,9 +190,12 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } override def CloseSession(req: TCloseSessionReq): TCloseSessionResp = { + debug(req.toString) + val handle = SessionHandle(req.getSessionHandle) + info(s"Received request of closing $handle") val resp = new TCloseSessionResp try { - be.closeSession(SessionHandle(req.getSessionHandle)) + be.closeSession(handle) resp.setStatus(OK_STATUS) Option(CURRENT_SERVER_CONTEXT.get()).foreach(_.setSessionHandle(null)) } catch { @@ -199,10 +203,12 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab warn("Error closing session: ", e) resp.setStatus(KyuubiSQLException.toTStatus(e)) } + info(s"Finished closing $handle") resp } override def GetInfo(req: TGetInfoReq): TGetInfoResp = { + debug(req.toString) val resp = new TGetInfoResp try { val infoValue = be.getInfo(SessionHandle(req.getSessionHandle), req.getInfoType) @@ -217,6 +223,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } override def ExecuteStatement(req: TExecuteStatementReq): TExecuteStatementResp = { + debug(req.toString) val resp = new TExecuteStatementResp try { val sessionHandle = SessionHandle(req.getSessionHandle) @@ -240,6 +247,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } override def GetTypeInfo(req: TGetTypeInfoReq): TGetTypeInfoResp = { + debug(req.toString) val resp = new TGetTypeInfoResp try { val operationHandle = be.getTypeInfo(SessionHandle(req.getSessionHandle)) @@ -254,6 +262,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } override def GetCatalogs(req: TGetCatalogsReq): TGetCatalogsResp = { + debug(req.toString) val resp = new TGetCatalogsResp try { val opHandle = be.getCatalogs(SessionHandle(req.getSessionHandle)) @@ -268,6 +277,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } override def GetSchemas(req: TGetSchemasReq): TGetSchemasResp = { + debug(req.toString) val resp = new TGetSchemasResp try { val opHandle = be.getSchemas( @@ -283,6 +293,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } override def GetTables(req: TGetTablesReq): TGetTablesResp = { + debug(req.toString) val resp = new TGetTablesResp try { val sessionHandle = SessionHandle(req.getSessionHandle) @@ -302,6 +313,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } override def GetTableTypes(req: TGetTableTypesReq): TGetTableTypesResp = { + debug(req.toString) val resp = new TGetTableTypesResp try { val opHandle = be.getTableTypes(SessionHandle(req.getSessionHandle)) @@ -316,6 +328,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } override def GetColumns(req: TGetColumnsReq): TGetColumnsResp = { + debug(req.toString) val resp = new TGetColumnsResp try { val sessionHandle = SessionHandle(req.getSessionHandle) @@ -335,6 +348,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } override def GetFunctions(req: TGetFunctionsReq): TGetFunctionsResp = { + debug(req.toString) val resp = new TGetFunctionsResp try { val sessionHandle = SessionHandle(req.getSessionHandle) @@ -353,6 +367,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } override def GetPrimaryKeys(req: TGetPrimaryKeysReq): TGetPrimaryKeysResp = { + debug(req.toString) val resp = new TGetPrimaryKeysResp val errStatus = KyuubiSQLException("Feature is not available").toTStatus resp.setStatus(errStatus) @@ -360,6 +375,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } override def GetCrossReference(req: TGetCrossReferenceReq): TGetCrossReferenceResp = { + debug(req.toString) val resp = new TGetCrossReferenceResp val errStatus = KyuubiSQLException("Feature is not available").toTStatus resp.setStatus(errStatus) @@ -367,6 +383,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } override def GetOperationStatus(req: TGetOperationStatusReq): TGetOperationStatusResp = { + debug(req.toString) val resp = new TGetOperationStatusResp try { val operationHandle = OperationHandle(req.getOperationHandle) @@ -390,6 +407,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } override def CancelOperation(req: TCancelOperationReq): TCancelOperationResp = { + debug(req.toString) val resp = new TCancelOperationResp try { be.cancelOperation(OperationHandle(req.getOperationHandle)) @@ -403,6 +421,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } override def CloseOperation(req: TCloseOperationReq): TCloseOperationResp = { + debug(req.toString) val resp = new TCloseOperationResp try { be.closeOperation(OperationHandle(req.getOperationHandle)) @@ -416,6 +435,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } override def GetResultSetMetadata(req: TGetResultSetMetadataReq): TGetResultSetMetadataResp = { + debug(req.toString) val resp = new TGetResultSetMetadataResp try { val schema = be.getResultSetMetadata(OperationHandle(req.getOperationHandle)) @@ -430,6 +450,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } override def FetchResults(req: TFetchResultsReq): TFetchResultsResp = { + debug(req.toString) val resp = new TFetchResultsResp try { val operationHandle = OperationHandle(req.getOperationHandle) @@ -455,18 +476,21 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } override def GetDelegationToken(req: TGetDelegationTokenReq): TGetDelegationTokenResp = { + debug(req.toString) val resp = new TGetDelegationTokenResp() resp.setStatus(notSupportTokenErrorStatus) resp } override def CancelDelegationToken(req: TCancelDelegationTokenReq): TCancelDelegationTokenResp = { + debug(req.toString) val resp = new TCancelDelegationTokenResp resp.setStatus(notSupportTokenErrorStatus) resp } override def RenewDelegationToken(req: TRenewDelegationTokenReq): TRenewDelegationTokenResp = { + debug(req.toString) val resp = new TRenewDelegationTokenResp resp.setStatus(notSupportTokenErrorStatus) resp diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactory.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactory.scala index e39530633..776f50ac1 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactory.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactory.scala @@ -83,6 +83,7 @@ class KyuubiAuthenticationFactory(conf: KyuubiConf) { def getIpAddress: Option[String] = { saslServer.map(_.getRemoteAddress).map(_.getHostAddress) + .orElse(Option(TSetIpAddressProcessor.getUserIpAddress)) } } object KyuubiAuthenticationFactory {