diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala index cb8702df3..091149d21 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.SparkSession import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.config.KyuubiConf._ +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.ShareLevel._ import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine} @@ -135,21 +136,23 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession) password: String, ipAddress: String, conf: Map[String, String]): Session = { - val sparkSession = - try { - getOrNewSparkSession(user) - } catch { - case e: Exception => throw KyuubiSQLException(e) - } + getSessionOption(SessionHandle.fromUUID(conf(KYUUBI_SESSION_HANDLE_KEY))).getOrElse { + val sparkSession = + try { + getOrNewSparkSession(user) + } catch { + case e: Exception => throw KyuubiSQLException(e) + } - new SparkSessionImpl( - protocol, - user, - password, - ipAddress, - conf, - this, - sparkSession) + new SparkSessionImpl( + protocol, + user, + password, + ipAddress, + conf, + this, + sparkSession) + } } override def closeSession(sessionHandle: SessionHandle): Unit = { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala index 5bf1ec084..30155c8f2 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala @@ -21,13 +21,14 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtoco import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.kyuubi.KyuubiSQLException +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY import org.apache.kyuubi.engine.spark.events.SessionEvent import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim import org.apache.kyuubi.engine.spark.udf.KDFRegistry import org.apache.kyuubi.events.EventBus import org.apache.kyuubi.operation.{Operation, OperationHandle} -import org.apache.kyuubi.session.{AbstractSession, SessionManager} +import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager} class SparkSessionImpl( protocol: TProtocolVersion, @@ -39,6 +40,8 @@ class SparkSessionImpl( val spark: SparkSession) extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) { + override val handle: SessionHandle = SessionHandle.fromUUID(conf(KYUUBI_SESSION_HANDLE_KEY)) + private def setModifiableConfig(key: String, value: String): Unit = { try { spark.conf.set(key, value) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala index dfbdd9449..85874d3b9 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala @@ -33,6 +33,7 @@ object KyuubiReservedKeys { final val KYUUBI_ENGINE_URL = "kyuubi.engine.url" final val KYUUBI_ENGINE_SUBMIT_TIME_KEY = "kyuubi.engine.submit.time" final val KYUUBI_ENGINE_CREDENTIALS_KEY = "kyuubi.engine.credentials" + final val KYUUBI_SESSION_HANDLE_KEY = "kyuubi.session.handle" final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_GUID = "kyuubi.session.engine.launch.handle.guid" final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_SECRET = diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala index babe73745..dbca8ca32 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala @@ -53,6 +53,9 @@ class KyuubiSyncThriftClient private ( private val lock = new ReentrantLock() + // Visible for testing. + private[kyuubi] def remoteSessionHandle: TSessionHandle = _remoteSessionHandle + @volatile private var _aliveProbeSessionHandle: TSessionHandle = _ @volatile private var remoteEngineBroken: Boolean = false private val engineAliveProbeClient = engineAliveProbeProtocol.map(new TCLIService.Client(_)) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index ce94b0275..e4203b301 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -27,7 +27,7 @@ import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.client.KyuubiSyncThriftClient import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ -import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KEY, KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_SIGN} +import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KEY, KYUUBI_SESSION_HANDLE_KEY, KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_SIGN} import org.apache.kyuubi.engine.{EngineRef, KyuubiApplicationManager} import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent} import org.apache.kyuubi.ha.client.DiscoveryClientProvider._ @@ -121,11 +121,12 @@ class KyuubiSessionImpl( private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit = handleSessionException { withDiscoveryClient(sessionConf) { discoveryClient => - var openEngineSessionConf = optimizedConf + var openEngineSessionConf = + optimizedConf ++ Map(KYUUBI_SESSION_HANDLE_KEY -> handle.identifier.toString) if (engineCredentials.nonEmpty) { sessionConf.set(KYUUBI_ENGINE_CREDENTIALS_KEY, engineCredentials) openEngineSessionConf = - optimizedConf ++ Map(KYUUBI_ENGINE_CREDENTIALS_KEY -> engineCredentials) + openEngineSessionConf ++ Map(KYUUBI_ENGINE_CREDENTIALS_KEY -> engineCredentials) } if (sessionConf.get(SESSION_USER_SIGN_ENABLED)) { diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala index 2ece2379a..26fc89ba2 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala @@ -364,4 +364,13 @@ class KyuubiOperationPerUserSuite val snapshot = MetricsSystem.histogramSnapshot(metric).get assert(snapshot.getMax > 0 && snapshot.getMedian > 0) } + + test("align the server session handle and engine session handle for Spark engine") { + withJdbcStatement() { _ => + val session = + server.backendService.sessionManager.allSessions().head.asInstanceOf[KyuubiSessionImpl] + val engineSessionHandle = SessionHandle.apply(session.client.remoteSessionHandle) + assert(session.handle === engineSessionHandle) + } + } }