[KYUUBI #4412][FOLLOWUP] Align the server/engine session handle for flink/hive/trino/jdbc engines

### _Why are the changes needed?_

#4412 follow up

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4422 from turboFei/align_session_id.

Closes #4412

319373296 [fwang12] save

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
This commit is contained in:
fwang12 2023-02-27 20:57:19 +08:00
parent 7a9eb969ff
commit 0a7c45b8ee
7 changed files with 65 additions and 35 deletions

View File

@ -21,6 +21,7 @@ import org.apache.flink.table.client.gateway.context.DefaultContext
import org.apache.flink.table.client.gateway.local.LocalExecutor
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager}
@ -43,14 +44,17 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
password: String,
ipAddress: String,
conf: Map[String, String]): Session = {
new FlinkSessionImpl(
protocol,
user,
password,
ipAddress,
conf,
this,
executor)
conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap(
getSessionOption).getOrElse {
new FlinkSessionImpl(
protocol,
user,
password,
ipAddress,
conf,
this,
executor)
}
}
override def closeSession(sessionHandle: SessionHandle): Unit = {

View File

@ -26,8 +26,9 @@ import org.apache.flink.table.client.gateway.local.LocalExecutor
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.session.{AbstractSession, SessionManager}
import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
class FlinkSessionImpl(
protocol: TProtocolVersion,
@ -39,6 +40,9 @@ class FlinkSessionImpl(
val executor: LocalExecutor)
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
override val handle: SessionHandle =
conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
lazy val sessionContext: SessionContext = {
FlinkEngineUtils.getSessionContext(executor, handle.identifier.toString)
}

View File

@ -28,6 +28,7 @@ import org.apache.hive.service.cli.session.{HiveSessionImplwithUGI => ImportedHi
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.hive.HiveSQLEngine
import org.apache.kyuubi.engine.hive.operation.HiveOperationManager
@ -72,33 +73,38 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess
password: String,
ipAddress: String,
conf: Map[String, String]): Session = {
val sessionHandle = SessionHandle()
val hive = {
val sessionWithUGI = new ImportedHiveSessionImpl(
new ImportedSessionHandle(sessionHandle.toTSessionHandle, protocol),
conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap(
getSessionOption).getOrElse {
val sessionHandle =
conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
val hive = {
val sessionWithUGI = new ImportedHiveSessionImpl(
new ImportedSessionHandle(sessionHandle.toTSessionHandle, protocol),
protocol,
user,
password,
HiveSQLEngine.hiveConf,
ipAddress,
null,
Seq(ipAddress).asJava)
val proxy = HiveSessionProxy.getProxy(sessionWithUGI, sessionWithUGI.getSessionUgi)
sessionWithUGI.setProxySession(proxy)
proxy
}
hive.setSessionManager(internalSessionManager)
hive.setOperationManager(internalSessionManager.getOperationManager)
operationLogRoot.foreach(dir => hive.setOperationLogSessionDir(new File(dir)))
new HiveSessionImpl(
protocol,
user,
password,
HiveSQLEngine.hiveConf,
ipAddress,
null,
Seq(ipAddress).asJava)
val proxy = HiveSessionProxy.getProxy(sessionWithUGI, sessionWithUGI.getSessionUgi)
sessionWithUGI.setProxySession(proxy)
proxy
conf,
this,
sessionHandle,
hive)
}
hive.setSessionManager(internalSessionManager)
hive.setOperationManager(internalSessionManager.getOperationManager)
operationLogRoot.foreach(dir => hive.setOperationLogSessionDir(new File(dir)))
new HiveSessionImpl(
protocol,
user,
password,
ipAddress,
conf,
this,
sessionHandle,
hive)
}
override def closeSession(sessionHandle: SessionHandle): Unit = {

View File

@ -23,8 +23,9 @@ import scala.util.{Failure, Success, Try}
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider
import org.apache.kyuubi.session.{AbstractSession, SessionManager}
import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
class JdbcSessionImpl(
protocol: TProtocolVersion,
@ -35,6 +36,9 @@ class JdbcSessionImpl(
sessionManager: SessionManager)
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
override val handle: SessionHandle =
conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
private[jdbc] var sessionConnection: Connection = _
private var databaseMetaData: DatabaseMetaData = _

View File

@ -20,6 +20,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.jdbc.JdbcSQLEngine
import org.apache.kyuubi.engine.jdbc.operation.JdbcOperationManager
@ -46,7 +47,10 @@ class JdbcSessionManager(name: String)
password: String,
ipAddress: String,
conf: Map[String, String]): Session = {
new JdbcSessionImpl(protocol, user, password, ipAddress, conf, this)
conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap(
getSessionOption).getOrElse {
new JdbcSessionImpl(protocol, user, password, ipAddress, conf, this)
}
}
override def closeSession(sessionHandle: SessionHandle): Unit = {

View File

@ -30,11 +30,12 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtoco
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.Utils.currentUser
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.trino.{TrinoConf, TrinoContext, TrinoStatement}
import org.apache.kyuubi.engine.trino.event.TrinoSessionEvent
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.operation.{Operation, OperationHandle}
import org.apache.kyuubi.session.{AbstractSession, SessionManager}
import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
class TrinoSessionImpl(
protocol: TProtocolVersion,
@ -45,6 +46,9 @@ class TrinoSessionImpl(
sessionManager: SessionManager)
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
override val handle: SessionHandle =
conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
var trinoContext: TrinoContext = _
private var clientSession: ClientSession = _
private var catalogName: String = null

View File

@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.trino.session
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.trino.TrinoSqlEngine
import org.apache.kyuubi.engine.trino.operation.TrinoOperationManager
@ -36,7 +37,10 @@ class TrinoSessionManager
password: String,
ipAddress: String,
conf: Map[String, String]): Session = {
new TrinoSessionImpl(protocol, user, password, ipAddress, conf, this)
conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap(
getSessionOption).getOrElse {
new TrinoSessionImpl(protocol, user, password, ipAddress, conf, this)
}
}
override def closeSession(sessionHandle: SessionHandle): Unit = {