From 0a7c45b8eec4998bb8ff35e65e2b805ea7370bf1 Mon Sep 17 00:00:00 2001 From: fwang12 Date: Mon, 27 Feb 2023 20:57:19 +0800 Subject: [PATCH] [KYUUBI #4412][FOLLOWUP] Align the server/engine session handle for flink/hive/trino/jdbc engines ### _Why are the changes needed?_ #4412 follow up ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request Closes #4422 from turboFei/align_session_id. Closes #4412 319373296 [fwang12] save Authored-by: fwang12 Signed-off-by: fwang12 --- .../session/FlinkSQLSessionManager.scala | 20 +++++--- .../flink/session/FlinkSessionImpl.scala | 6 ++- .../hive/session/HiveSessionManager.scala | 50 +++++++++++-------- .../engine/jdbc/session/JdbcSessionImpl.scala | 6 ++- .../jdbc/session/JdbcSessionManager.scala | 6 ++- .../trino/session/TrinoSessionImpl.scala | 6 ++- .../trino/session/TrinoSessionManager.scala | 6 ++- 7 files changed, 65 insertions(+), 35 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala index 8a3fc7446..07971e39f 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala @@ -21,6 +21,7 @@ import org.apache.flink.table.client.gateway.context.DefaultContext import org.apache.flink.table.client.gateway.local.LocalExecutor import org.apache.hive.service.rpc.thrift.TProtocolVersion +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager} @@ -43,14 +44,17 @@ class FlinkSQLSessionManager(engineContext: DefaultContext) password: String, ipAddress: String, conf: Map[String, String]): Session = { - new FlinkSessionImpl( - protocol, - user, - password, - ipAddress, - conf, - this, - executor) + conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap( + getSessionOption).getOrElse { + new FlinkSessionImpl( + protocol, + user, + password, + ipAddress, + conf, + this, + executor) + } } override def closeSession(sessionHandle: SessionHandle): Unit = { diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala index 03d9ce42e..75087b48c 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala @@ -26,8 +26,9 @@ import org.apache.flink.table.client.gateway.local.LocalExecutor import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion} import org.apache.kyuubi.KyuubiSQLException +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY import org.apache.kyuubi.engine.flink.FlinkEngineUtils -import org.apache.kyuubi.session.{AbstractSession, SessionManager} +import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager} class FlinkSessionImpl( protocol: TProtocolVersion, @@ -39,6 +40,9 @@ class FlinkSessionImpl( val executor: LocalExecutor) extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) { + override val handle: SessionHandle = + conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle()) + lazy val sessionContext: SessionContext = { FlinkEngineUtils.getSessionContext(executor, handle.identifier.toString) } diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala index dc807429c..d09912770 100644 --- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala +++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala @@ -28,6 +28,7 @@ import org.apache.hive.service.cli.session.{HiveSessionImplwithUGI => ImportedHi import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.hive.HiveSQLEngine import org.apache.kyuubi.engine.hive.operation.HiveOperationManager @@ -72,33 +73,38 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess password: String, ipAddress: String, conf: Map[String, String]): Session = { - val sessionHandle = SessionHandle() - val hive = { - val sessionWithUGI = new ImportedHiveSessionImpl( - new ImportedSessionHandle(sessionHandle.toTSessionHandle, protocol), + conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap( + getSessionOption).getOrElse { + val sessionHandle = + conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle()) + val hive = { + val sessionWithUGI = new ImportedHiveSessionImpl( + new ImportedSessionHandle(sessionHandle.toTSessionHandle, protocol), + protocol, + user, + password, + HiveSQLEngine.hiveConf, + ipAddress, + null, + Seq(ipAddress).asJava) + val proxy = HiveSessionProxy.getProxy(sessionWithUGI, sessionWithUGI.getSessionUgi) + sessionWithUGI.setProxySession(proxy) + proxy + } + hive.setSessionManager(internalSessionManager) + hive.setOperationManager(internalSessionManager.getOperationManager) + operationLogRoot.foreach(dir => hive.setOperationLogSessionDir(new File(dir))) + new HiveSessionImpl( protocol, user, password, - HiveSQLEngine.hiveConf, ipAddress, - null, - Seq(ipAddress).asJava) - val proxy = HiveSessionProxy.getProxy(sessionWithUGI, sessionWithUGI.getSessionUgi) - sessionWithUGI.setProxySession(proxy) - proxy + conf, + this, + sessionHandle, + hive) } - hive.setSessionManager(internalSessionManager) - hive.setOperationManager(internalSessionManager.getOperationManager) - operationLogRoot.foreach(dir => hive.setOperationLogSessionDir(new File(dir))) - new HiveSessionImpl( - protocol, - user, - password, - ipAddress, - conf, - this, - sessionHandle, - hive) + } override def closeSession(sessionHandle: SessionHandle): Unit = { diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionImpl.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionImpl.scala index 63fb2dd07..f8cd40412 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionImpl.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionImpl.scala @@ -23,8 +23,9 @@ import scala.util.{Failure, Success, Try} import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion} import org.apache.kyuubi.KyuubiSQLException +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider -import org.apache.kyuubi.session.{AbstractSession, SessionManager} +import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager} class JdbcSessionImpl( protocol: TProtocolVersion, @@ -35,6 +36,9 @@ class JdbcSessionImpl( sessionManager: SessionManager) extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) { + override val handle: SessionHandle = + conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle()) + private[jdbc] var sessionConnection: Connection = _ private var databaseMetaData: DatabaseMetaData = _ diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionManager.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionManager.scala index db8f60c3c..09958e050 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionManager.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionManager.scala @@ -20,6 +20,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.jdbc.JdbcSQLEngine import org.apache.kyuubi.engine.jdbc.operation.JdbcOperationManager @@ -46,7 +47,10 @@ class JdbcSessionManager(name: String) password: String, ipAddress: String, conf: Map[String, String]): Session = { - new JdbcSessionImpl(protocol, user, password, ipAddress, conf, this) + conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap( + getSessionOption).getOrElse { + new JdbcSessionImpl(protocol, user, password, ipAddress, conf, this) + } } override def closeSession(sessionHandle: SessionHandle): Unit = { diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala index a19d74d58..81f973b1b 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala @@ -30,11 +30,12 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtoco import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.Utils.currentUser import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys} +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY import org.apache.kyuubi.engine.trino.{TrinoConf, TrinoContext, TrinoStatement} import org.apache.kyuubi.engine.trino.event.TrinoSessionEvent import org.apache.kyuubi.events.EventBus import org.apache.kyuubi.operation.{Operation, OperationHandle} -import org.apache.kyuubi.session.{AbstractSession, SessionManager} +import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager} class TrinoSessionImpl( protocol: TProtocolVersion, @@ -45,6 +46,9 @@ class TrinoSessionImpl( sessionManager: SessionManager) extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) { + override val handle: SessionHandle = + conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle()) + var trinoContext: TrinoContext = _ private var clientSession: ClientSession = _ private var catalogName: String = null diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala index 6d56d5c05..e18b8f758 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala @@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.trino.session import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.trino.TrinoSqlEngine import org.apache.kyuubi.engine.trino.operation.TrinoOperationManager @@ -36,7 +37,10 @@ class TrinoSessionManager password: String, ipAddress: String, conf: Map[String, String]): Session = { - new TrinoSessionImpl(protocol, user, password, ipAddress, conf, this) + conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap( + getSessionOption).getOrElse { + new TrinoSessionImpl(protocol, user, password, ipAddress, conf, this) + } } override def closeSession(sessionHandle: SessionHandle): Unit = {