[KYUUBI #4412] Align the server session handle and engine session handle for Spark engine

### _Why are the changes needed?_

Align the server session handle and engine session handle for Spark engine.

It make it easy to recovery the engine session in any kyuubi instance easy.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4412 from turboFei/server_engine_handle_align.

Closes #4412

a20e0f155 [fwang12] fix
9d590e38b [fwang12] fix
94267e583 [fwang12] save
7012c2bef [fwang12] align

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
This commit is contained in:
fwang12 2023-02-25 14:04:38 +08:00
parent 4277710174
commit c0241052ae
6 changed files with 38 additions and 18 deletions

View File

@ -24,6 +24,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel._
import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
@ -135,21 +136,23 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
password: String,
ipAddress: String,
conf: Map[String, String]): Session = {
val sparkSession =
try {
getOrNewSparkSession(user)
} catch {
case e: Exception => throw KyuubiSQLException(e)
}
getSessionOption(SessionHandle.fromUUID(conf(KYUUBI_SESSION_HANDLE_KEY))).getOrElse {
val sparkSession =
try {
getOrNewSparkSession(user)
} catch {
case e: Exception => throw KyuubiSQLException(e)
}
new SparkSessionImpl(
protocol,
user,
password,
ipAddress,
conf,
this,
sparkSession)
new SparkSessionImpl(
protocol,
user,
password,
ipAddress,
conf,
this,
sparkSession)
}
}
override def closeSession(sessionHandle: SessionHandle): Unit = {

View File

@ -21,13 +21,14 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtoco
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.spark.events.SessionEvent
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
import org.apache.kyuubi.engine.spark.udf.KDFRegistry
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.operation.{Operation, OperationHandle}
import org.apache.kyuubi.session.{AbstractSession, SessionManager}
import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
class SparkSessionImpl(
protocol: TProtocolVersion,
@ -39,6 +40,8 @@ class SparkSessionImpl(
val spark: SparkSession)
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
override val handle: SessionHandle = SessionHandle.fromUUID(conf(KYUUBI_SESSION_HANDLE_KEY))
private def setModifiableConfig(key: String, value: String): Unit = {
try {
spark.conf.set(key, value)

View File

@ -33,6 +33,7 @@ object KyuubiReservedKeys {
final val KYUUBI_ENGINE_URL = "kyuubi.engine.url"
final val KYUUBI_ENGINE_SUBMIT_TIME_KEY = "kyuubi.engine.submit.time"
final val KYUUBI_ENGINE_CREDENTIALS_KEY = "kyuubi.engine.credentials"
final val KYUUBI_SESSION_HANDLE_KEY = "kyuubi.session.handle"
final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_GUID =
"kyuubi.session.engine.launch.handle.guid"
final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_SECRET =

View File

@ -53,6 +53,9 @@ class KyuubiSyncThriftClient private (
private val lock = new ReentrantLock()
// Visible for testing.
private[kyuubi] def remoteSessionHandle: TSessionHandle = _remoteSessionHandle
@volatile private var _aliveProbeSessionHandle: TSessionHandle = _
@volatile private var remoteEngineBroken: Boolean = false
private val engineAliveProbeClient = engineAliveProbeProtocol.map(new TCLIService.Client(_))

View File

@ -27,7 +27,7 @@ import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.client.KyuubiSyncThriftClient
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KEY, KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_SIGN}
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KEY, KYUUBI_SESSION_HANDLE_KEY, KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_SIGN}
import org.apache.kyuubi.engine.{EngineRef, KyuubiApplicationManager}
import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider._
@ -121,11 +121,12 @@ class KyuubiSessionImpl(
private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit =
handleSessionException {
withDiscoveryClient(sessionConf) { discoveryClient =>
var openEngineSessionConf = optimizedConf
var openEngineSessionConf =
optimizedConf ++ Map(KYUUBI_SESSION_HANDLE_KEY -> handle.identifier.toString)
if (engineCredentials.nonEmpty) {
sessionConf.set(KYUUBI_ENGINE_CREDENTIALS_KEY, engineCredentials)
openEngineSessionConf =
optimizedConf ++ Map(KYUUBI_ENGINE_CREDENTIALS_KEY -> engineCredentials)
openEngineSessionConf ++ Map(KYUUBI_ENGINE_CREDENTIALS_KEY -> engineCredentials)
}
if (sessionConf.get(SESSION_USER_SIGN_ENABLED)) {

View File

@ -364,4 +364,13 @@ class KyuubiOperationPerUserSuite
val snapshot = MetricsSystem.histogramSnapshot(metric).get
assert(snapshot.getMax > 0 && snapshot.getMedian > 0)
}
test("align the server session handle and engine session handle for Spark engine") {
withJdbcStatement() { _ =>
val session =
server.backendService.sessionManager.allSessions().head.asInstanceOf[KyuubiSessionImpl]
val engineSessionHandle = SessionHandle.apply(session.client.remoteSessionHandle)
assert(session.handle === engineSessionHandle)
}
}
}