diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala index 8a3fc7446..07971e39f 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala @@ -21,6 +21,7 @@ import org.apache.flink.table.client.gateway.context.DefaultContext import org.apache.flink.table.client.gateway.local.LocalExecutor import org.apache.hive.service.rpc.thrift.TProtocolVersion +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager} @@ -43,14 +44,17 @@ class FlinkSQLSessionManager(engineContext: DefaultContext) password: String, ipAddress: String, conf: Map[String, String]): Session = { - new FlinkSessionImpl( - protocol, - user, - password, - ipAddress, - conf, - this, - executor) + conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap( + getSessionOption).getOrElse { + new FlinkSessionImpl( + protocol, + user, + password, + ipAddress, + conf, + this, + executor) + } } override def closeSession(sessionHandle: SessionHandle): Unit = { diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala index 03d9ce42e..75087b48c 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala @@ -26,8 +26,9 @@ import org.apache.flink.table.client.gateway.local.LocalExecutor import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion} import org.apache.kyuubi.KyuubiSQLException +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY import org.apache.kyuubi.engine.flink.FlinkEngineUtils -import org.apache.kyuubi.session.{AbstractSession, SessionManager} +import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager} class FlinkSessionImpl( protocol: TProtocolVersion, @@ -39,6 +40,9 @@ class FlinkSessionImpl( val executor: LocalExecutor) extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) { + override val handle: SessionHandle = + conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle()) + lazy val sessionContext: SessionContext = { FlinkEngineUtils.getSessionContext(executor, handle.identifier.toString) } diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala index dc807429c..d09912770 100644 --- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala +++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala @@ -28,6 +28,7 @@ import org.apache.hive.service.cli.session.{HiveSessionImplwithUGI => ImportedHi import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.hive.HiveSQLEngine import org.apache.kyuubi.engine.hive.operation.HiveOperationManager @@ -72,33 +73,38 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess password: String, ipAddress: String, conf: Map[String, String]): Session = { - val sessionHandle = SessionHandle() - val hive = { - val sessionWithUGI = new ImportedHiveSessionImpl( - new ImportedSessionHandle(sessionHandle.toTSessionHandle, protocol), + conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap( + getSessionOption).getOrElse { + val sessionHandle = + conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle()) + val hive = { + val sessionWithUGI = new ImportedHiveSessionImpl( + new ImportedSessionHandle(sessionHandle.toTSessionHandle, protocol), + protocol, + user, + password, + HiveSQLEngine.hiveConf, + ipAddress, + null, + Seq(ipAddress).asJava) + val proxy = HiveSessionProxy.getProxy(sessionWithUGI, sessionWithUGI.getSessionUgi) + sessionWithUGI.setProxySession(proxy) + proxy + } + hive.setSessionManager(internalSessionManager) + hive.setOperationManager(internalSessionManager.getOperationManager) + operationLogRoot.foreach(dir => hive.setOperationLogSessionDir(new File(dir))) + new HiveSessionImpl( protocol, user, password, - HiveSQLEngine.hiveConf, ipAddress, - null, - Seq(ipAddress).asJava) - val proxy = HiveSessionProxy.getProxy(sessionWithUGI, sessionWithUGI.getSessionUgi) - sessionWithUGI.setProxySession(proxy) - proxy + conf, + this, + sessionHandle, + hive) } - hive.setSessionManager(internalSessionManager) - hive.setOperationManager(internalSessionManager.getOperationManager) - operationLogRoot.foreach(dir => hive.setOperationLogSessionDir(new File(dir))) - new HiveSessionImpl( - protocol, - user, - password, - ipAddress, - conf, - this, - sessionHandle, - hive) + } override def closeSession(sessionHandle: SessionHandle): Unit = { diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionImpl.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionImpl.scala index 63fb2dd07..f8cd40412 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionImpl.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionImpl.scala @@ -23,8 +23,9 @@ import scala.util.{Failure, Success, Try} import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion} import org.apache.kyuubi.KyuubiSQLException +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider -import org.apache.kyuubi.session.{AbstractSession, SessionManager} +import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager} class JdbcSessionImpl( protocol: TProtocolVersion, @@ -35,6 +36,9 @@ class JdbcSessionImpl( sessionManager: SessionManager) extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) { + override val handle: SessionHandle = + conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle()) + private[jdbc] var sessionConnection: Connection = _ private var databaseMetaData: DatabaseMetaData = _ diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionManager.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionManager.scala index db8f60c3c..09958e050 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionManager.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionManager.scala @@ -20,6 +20,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.jdbc.JdbcSQLEngine import org.apache.kyuubi.engine.jdbc.operation.JdbcOperationManager @@ -46,7 +47,10 @@ class JdbcSessionManager(name: String) password: String, ipAddress: String, conf: Map[String, String]): Session = { - new JdbcSessionImpl(protocol, user, password, ipAddress, conf, this) + conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap( + getSessionOption).getOrElse { + new JdbcSessionImpl(protocol, user, password, ipAddress, conf, this) + } } override def closeSession(sessionHandle: SessionHandle): Unit = { diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala index a19d74d58..81f973b1b 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala @@ -30,11 +30,12 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtoco import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.Utils.currentUser import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys} +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY import org.apache.kyuubi.engine.trino.{TrinoConf, TrinoContext, TrinoStatement} import org.apache.kyuubi.engine.trino.event.TrinoSessionEvent import org.apache.kyuubi.events.EventBus import org.apache.kyuubi.operation.{Operation, OperationHandle} -import org.apache.kyuubi.session.{AbstractSession, SessionManager} +import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager} class TrinoSessionImpl( protocol: TProtocolVersion, @@ -45,6 +46,9 @@ class TrinoSessionImpl( sessionManager: SessionManager) extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) { + override val handle: SessionHandle = + conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle()) + var trinoContext: TrinoContext = _ private var clientSession: ClientSession = _ private var catalogName: String = null diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala index 6d56d5c05..e18b8f758 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala @@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.trino.session import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.trino.TrinoSqlEngine import org.apache.kyuubi.engine.trino.operation.TrinoOperationManager @@ -36,7 +37,10 @@ class TrinoSessionManager password: String, ipAddress: String, conf: Map[String, String]): Session = { - new TrinoSessionImpl(protocol, user, password, ipAddress, conf, this) + conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap( + getSessionOption).getOrElse { + new TrinoSessionImpl(protocol, user, password, ipAddress, conf, this) + } } override def closeSession(sessionHandle: SessionHandle): Unit = {