[KYUUBI #1987] Support preserve user context in group/server share level

### _Why are the changes needed?_

close #1987

Note that this PR only applies to the Spark engine.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #2435 from ulysses-you/kyuubi-1987.

Closes #1987

85826a14 [ulysses-you] style
9ddcc0c9 [ulysses-you] shutdown
a568054e [ulysses-you] release
d1339056 [ulysses-you] address comment
8e958ff2 [ulysses-you] docs
dd9b4422 [ulysses-you] Support preserve user context in group share level

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
This commit is contained in:
ulysses-you 2022-04-25 18:02:32 +08:00 committed by ulysses-you
parent 9fd62a2143
commit 7cede6fd19
No known key found for this signature in database
GPG Key ID: 4C500BC62D576766
8 changed files with 233 additions and 32 deletions

View File

@ -228,6 +228,9 @@ Key | Default | Meaning | Type | Since
<code>kyuubi.engine.ui.retainedSessions</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>200</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The number of SQL client sessions kept in the Kyuubi Query Engine web UI.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
<code>kyuubi.engine.ui.retainedStatements</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>200</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The number of statements kept in the Kyuubi Query Engine web UI.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
<code>kyuubi.engine.ui.stop.enabled</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When true, allows Kyuubi engine to be killed from the Spark Web UI.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.3.0</div>
<code>kyuubi.engine.user.isolated.spark.session</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When set to false, if the engine is running in a group or server share level, all the JDBC/ODBC connections will be isolated against the user. Including: the temporary views, function registries, SQL configuration and the current database. Note that, it does not affect if the share level is connection or user.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.6.0</div>
<code>kyuubi.engine.user.isolated.spark.session.idle.interval</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The interval to check if the user isolated spark session is timeout.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.6.0</div>
<code>kyuubi.engine.user.isolated.spark.session.idle.timeout</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT6H</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>If kyuubi.engine.user.isolated.spark.session is false, we will release the spark session if its corresponding user is inactive after this configured timeout.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.6.0</div>
### Frontend

View File

@ -25,9 +25,9 @@ import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.kvstore.KVIndex
import org.apache.kyuubi.Utils
import org.apache.kyuubi.{Logging, Utils}
object KyuubiSparkUtil {
object KyuubiSparkUtil extends Logging {
type KVIndexParam = KVIndex @getter
@ -36,6 +36,21 @@ object KyuubiSparkUtil {
def globalSparkContext: SparkContext = SparkSession.active.sparkContext
// Runs each bootstrap SQL statement against the given SparkSession, tagging the
// resulting Spark jobs with a job group so they are identifiable (and
// interruptible) from the Spark UI.
def initializeSparkSession(spark: SparkSession, initializationSQLs: Seq[String]): Unit = {
initializationSQLs.foreach { sql =>
spark.sparkContext.setJobGroup(
"initialization sql queries",
sql,
interruptOnCancel = true)
debug(s"Execute initialization sql: $sql")
try {
// isEmpty forces eager execution of the statement; the result is discarded
spark.sql(sql).isEmpty
} finally {
// always clear the job group, even if the statement throws
spark.sparkContext.clearJobGroup()
}
}
}
def engineId: String = globalSparkContext.applicationId
lazy val diagnostics: String = {

View File

@ -187,16 +187,9 @@ object SparkSQLEngine extends Logging {
def createSpark(): SparkSession = {
val session = SparkSession.builder.config(_sparkConf).getOrCreate
(kyuubiConf.get(ENGINE_INITIALIZE_SQL) ++ kyuubiConf.get(ENGINE_SESSION_INITIALIZE_SQL))
.foreach { sqlStr =>
session.sparkContext.setJobGroup(
"engine_initializing_queries",
sqlStr,
interruptOnCancel = true)
debug(s"Execute session initializing sql: $sqlStr")
session.sql(sqlStr).isEmpty
session.sparkContext.clearJobGroup()
}
KyuubiSparkUtil.initializeSparkSession(
session,
kyuubiConf.get(ENGINE_INITIALIZE_SQL) ++ kyuubiConf.get(ENGINE_SESSION_INITIALIZE_SQL))
session
}

View File

@ -17,6 +17,8 @@
package org.apache.kyuubi.engine.spark.session
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.spark.sql.SparkSession
@ -24,9 +26,11 @@ import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.spark.SparkSQLEngine
import org.apache.kyuubi.engine.ShareLevel._
import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.session._
import org.apache.kyuubi.util.ThreadUtils
/**
* A [[SessionManager]] constructed with [[SparkSession]] which give it the ability to talk with
@ -50,6 +54,87 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
val operationManager = new SparkSQLOperationManager()
private lazy val singleSparkSession = conf.get(ENGINE_SINGLE_SPARK_SESSION)
private lazy val shareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL))
// When true (default), every connection in GROUP/SERVER share level gets its own
// isolated SparkSession; when false, one SparkSession is cached and shared per user.
private lazy val userIsolatedSparkSession = conf.get(ENGINE_USER_ISOLATED_SPARK_SESSION)
// How often the background checker scans for expired per-user sessions.
private lazy val userIsolatedIdleInterval =
conf.get(ENGINE_USER_ISOLATED_SPARK_SESSION_IDLE_INTERVAL)
// How long a user's cached session may stay unreferenced before it is released.
private lazy val userIsolatedIdleTimeout =
conf.get(ENGINE_USER_ISOLATED_SPARK_SESSION_IDLE_TIMEOUT)
// Guards both userIsolatedCache and userIsolatedCacheCount below.
private val userIsolatedCacheLock = new Object
// user -> the SparkSession shared by that user's connections (GROUP/SERVER, isolation off)
private lazy val userIsolatedCache = new java.util.HashMap[String, SparkSession]()
// user -> (open connection count, last-access timestamp in epoch millis)
private lazy val userIsolatedCacheCount =
new java.util.HashMap[String, (Integer, java.lang.Long)]()
// Scheduler running the idle-session eviction task; only created when isolation is off.
private var userIsolatedSparkSessionThread: Option[ScheduledExecutorService] = None
// Starts a daemon scheduler that periodically evicts per-user cached SparkSessions
// that have no open connections and have been idle past the configured timeout.
// Only needed when user isolation is disabled (sessions shared per user).
private def startUserIsolatedCacheChecker(): Unit = {
if (!userIsolatedSparkSession) {
userIsolatedSparkSessionThread =
Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor("user-isolated-cache-checker"))
userIsolatedSparkSessionThread.foreach {
_.scheduleWithFixedDelay(
() => {
userIsolatedCacheLock.synchronized {
val iter = userIsolatedCacheCount.entrySet().iterator()
while (iter.hasNext) {
val kv = iter.next()
// evict only when no connection references the session (count == 0)
// and the last access is older than the idle timeout
if (kv.getValue._1 == 0 &&
kv.getValue._2 + userIsolatedIdleTimeout < System.currentTimeMillis()) {
userIsolatedCache.remove(kv.getKey)
iter.remove()
}
}
}
},
userIsolatedIdleInterval,
userIsolatedIdleInterval,
TimeUnit.MILLISECONDS)
}
}
}
override def start(): Unit = {
// launch the idle-session eviction thread before the manager accepts sessions
startUserIsolatedCacheChecker()
super.start()
}
override def stop(): Unit = {
super.stop()
// shut down the eviction scheduler after the session manager has stopped
userIsolatedSparkSessionThread.foreach(_.shutdown())
}
// Returns the SparkSession backing a new connection for `user`:
// - single-session mode: always the root session
// - CONNECTION level: the root session (engine serves exactly one connection)
// - USER level, or isolated GROUP/SERVER: a fresh child session per connection
// - non-isolated GROUP/SERVER: one cached, reference-counted session per user
private def getOrNewSparkSession(user: String): SparkSession = {
if (singleSparkSession) {
spark
} else {
shareLevel match {
// it's unnecessary to create a new spark session in connection share level
// since the session is only one
case CONNECTION => spark
case USER => newSparkSession(spark)
case GROUP | SERVER if userIsolatedSparkSession => newSparkSession(spark)
case GROUP | SERVER =>
userIsolatedCacheLock.synchronized {
if (userIsolatedCache.containsKey(user)) {
// bump the connection count and refresh the last-access timestamp
val (count, _) = userIsolatedCacheCount.get(user)
userIsolatedCacheCount.put(user, (count + 1, System.currentTimeMillis()))
userIsolatedCache.get(user)
} else {
// first connection for this user: create and cache a new session
userIsolatedCacheCount.put(user, (1, System.currentTimeMillis()))
val newSession = newSparkSession(spark)
userIsolatedCache.put(user, newSession)
newSession
}
}
}
}
}
// Forks a child SparkSession off the root one and runs the configured
// session-initialization SQL on it before handing it out.
private def newSparkSession(rootSparkSession: SparkSession): SparkSession = {
val newSparkSession = rootSparkSession.newSession()
KyuubiSparkUtil.initializeSparkSession(newSparkSession, conf.get(ENGINE_SESSION_INITIALIZE_SQL))
newSparkSession
}
override protected def createSession(
protocol: TProtocolVersion,
@ -60,21 +145,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
val clientIp = conf.getOrElse(CLIENT_IP_KEY, ipAddress)
val sparkSession =
try {
if (singleSparkSession) {
spark
} else {
val ss = spark.newSession()
this.conf.get(ENGINE_SESSION_INITIALIZE_SQL).foreach { sqlStr =>
ss.sparkContext.setJobGroup(
"engine_initializing_queries",
sqlStr,
interruptOnCancel = true)
debug(s"Execute session initializing sql: $sqlStr")
ss.sql(sqlStr).isEmpty
ss.sparkContext.clearJobGroup()
}
ss
}
getOrNewSparkSession(user)
} catch {
case e: Exception => throw KyuubiSQLException(e)
}
@ -91,8 +162,19 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
if (!userIsolatedSparkSession) {
val session = getSession(sessionHandle)
if (session != null) {
userIsolatedCacheLock.synchronized {
if (userIsolatedCacheCount.containsKey(session.user)) {
val (count, _) = userIsolatedCacheCount.get(session.user)
userIsolatedCacheCount.put(session.user, (count - 1, System.currentTimeMillis()))
}
}
}
}
super.closeSession(sessionHandle)
if (conf.get(ENGINE_SHARE_LEVEL) == ShareLevel.CONNECTION.toString) {
if (shareLevel == ShareLevel.CONNECTION) {
info("Session stopped due to shared level is Connection.")
stopSession()
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.session
import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsReq, TOpenSessionReq}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
import org.apache.kyuubi.operation.HiveJDBCTestHelper
/**
 * Verifies non-isolated (per-user shared) session behavior in GROUP share level:
 * connections of the same user share one SparkSession (SQL conf set by one
 * connection is visible to the next), different users are kept apart, and idle
 * per-user sessions are released after the configured timeout.
 */
class UserIsolatedSessionSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
override def withKyuubiConf: Map[String, String] = {
Map(
ENGINE_SHARE_LEVEL.key -> "GROUP",
ENGINE_USER_ISOLATED_SPARK_SESSION.key -> "false",
ENGINE_USER_ISOLATED_SPARK_SESSION_IDLE_INTERVAL.key -> "100",
ENGINE_USER_ISOLATED_SPARK_SESSION_IDLE_TIMEOUT.key -> "5000")
}
override protected def jdbcUrl: String =
s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;#spark.ui.enabled=false"
// Opens a fresh connection as `user`, runs a single SET statement, and returns
// the first value of the result's second column (the SET value).
private def executeSetStatement(user: String, statement: String): String = {
withThriftClient(Some(user)) { client =>
val req = new TOpenSessionReq()
req.setUsername(user)
req.setPassword("anonymous")
val tOpenSessionResp = client.OpenSession(req)
val tExecuteStatementReq = new TExecuteStatementReq()
tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
tExecuteStatementReq.setStatement(statement)
tExecuteStatementReq.setRunAsync(false)
val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)
val operationHandle = tExecuteStatementResp.getOperationHandle
val tFetchResultsReq = new TFetchResultsReq()
tFetchResultsReq.setOperationHandle(operationHandle)
tFetchResultsReq.setFetchType(0)
tFetchResultsReq.setMaxRows(1)
val tFetchResultsResp = client.FetchResults(tFetchResultsReq)
tFetchResultsResp.getResults.getColumns.get(1).getStringVal.getValues.get(0)
}
}
test("isolated user spark session") {
// conf set by user1 survives across that user's connections...
executeSetStatement("user1", "set a=1")
assert(executeSetStatement("user1", "set a") == "1")
assert(executeSetStatement("user1", "set a") == "1")
// ...but is invisible to a different user
assert(executeSetStatement("user2", "set a") == "<undefined>")
executeSetStatement("user2", "set a=2")
assert(executeSetStatement("user1", "set a") == "1")
assert(executeSetStatement("user2", "set a") == "2")
// sleep past the 5000ms idle timeout (checked every 100ms) so both cached
// sessions are evicted; the conf values must then be gone
Thread.sleep(6000)
assert(executeSetStatement("user1", "set a") == "<undefined>")
assert(executeSetStatement("user2", "set a") == "<undefined>")
}
}

View File

@ -1143,6 +1143,31 @@ object KyuubiConf {
.booleanConf
.createWithDefault(false)
// Whether GROUP/SERVER share level gives every connection an isolated
// SparkSession (true, default) or shares one session per user (false).
val ENGINE_USER_ISOLATED_SPARK_SESSION: ConfigEntry[Boolean] =
buildConf("kyuubi.engine.user.isolated.spark.session")
.doc("When set to false, if the engine is running in a group or server share level, " +
"all the JDBC/ODBC connections will be isolated against the user. Including: " +
"the temporary views, function registries, SQL configuration and the current database. " +
"Note that, it does not affect if the share level is connection or user.")
.version("1.6.0")
.booleanConf
.createWithDefault(true)
// How long a per-user cached session may stay unreferenced before release
// (only meaningful when ENGINE_USER_ISOLATED_SPARK_SESSION is false).
val ENGINE_USER_ISOLATED_SPARK_SESSION_IDLE_TIMEOUT: ConfigEntry[Long] =
buildConf("kyuubi.engine.user.isolated.spark.session.idle.timeout")
.doc(s"If ${ENGINE_USER_ISOLATED_SPARK_SESSION.key} is false, we will release the " +
s"spark session if its corresponding user is inactive after this configured timeout.")
.version("1.6.0")
.timeConf
.createWithDefault(Duration.ofHours(6).toMillis)
// The cadence at which the idle-session checker scans the per-user session cache.
// Fix: dropped the `s` string-interpolator prefix — the doc literal contains no
// interpolation, so `s"..."` was a needless interpolator (common lint warning).
val ENGINE_USER_ISOLATED_SPARK_SESSION_IDLE_INTERVAL: ConfigEntry[Long] =
buildConf("kyuubi.engine.user.isolated.spark.session.idle.interval")
.doc("The interval to check if the user isolated spark session is timeout.")
.version("1.6.0")
.timeConf
.createWithDefault(Duration.ofMinutes(1).toMillis)
val SERVER_EVENT_JSON_LOG_PATH: ConfigEntry[String] =
buildConf("kyuubi.backend.server.event.json.log.path")
.doc("The location of server events go for the builtin JSON logger")

View File

@ -77,7 +77,13 @@ trait HiveJDBCTestHelper extends JDBCTestHelper {
}
def withThriftClient[T](f: TCLIService.Iface => T): T = {
TClientTestUtils.withThriftClient(jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head)(f)
withThriftClient()(f)
}
def withThriftClient[T](user: Option[String] = None)(f: TCLIService.Iface => T): T = {
TClientTestUtils.withThriftClient(
jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head,
user)(f)
}
def withSessionHandle[T](f: (TCLIService.Iface, TSessionHandle) => T): T = {

View File

@ -30,10 +30,13 @@ import org.apache.kyuubi.service.authentication.PlainSASLHelper
object TClientTestUtils extends Logging {
def withThriftClient[T](url: String)(f: Iface => T): T = {
def withThriftClient[T](url: String, user: Option[String] = None)(f: Iface => T): T = {
val hostport = url.split(':')
val socket = new TSocket(hostport.head, hostport.last.toInt)
val transport = PlainSASLHelper.getPlainTransport(Utils.currentUser, "anonymous", socket)
val transport = PlainSASLHelper.getPlainTransport(
user.getOrElse(Utils.currentUser),
"anonymous",
socket)
val protocol = new TBinaryProtocol(transport)
val client = new TCLIService.Client(protocol)
transport.open()