diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index edf8b833f..d872301a0 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -228,6 +228,9 @@ Key | Default | Meaning | Type | Since
kyuubi.engine.ui.retainedSessions|
200
|The number of SQL client sessions kept in the Kyuubi Query Engine web UI.
|int
|1.4.0
kyuubi.engine.ui.retainedStatements|200
|The number of statements kept in the Kyuubi Query Engine web UI.
|int
|1.4.0
kyuubi.engine.ui.stop.enabled|true
|When true, allows Kyuubi engine to be killed from the Spark Web UI.
|boolean
|1.3.0
+kyuubi.engine.user.isolated.spark.session|true
|When set to false, if the engine is running in a group or server share level, all the JDBC/ODBC connections from the same user will share an isolated Spark session, including the temporary views, function registries, SQL configuration and the current database. Note that this has no effect if the share level is connection or user.
|boolean
|1.6.0
+kyuubi.engine.user.isolated.spark.session.idle.interval|PT1M
|The interval to check whether the user isolated spark session has timed out.
|duration
|1.6.0
+kyuubi.engine.user.isolated.spark.session.idle.timeout|PT6H
|If kyuubi.engine.user.isolated.spark.session is false, we will release the spark session if its corresponding user is inactive after this configured timeout.
|duration
|1.6.0
### Frontend
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala
index 3a708e082..85ff91d32 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala
@@ -25,9 +25,9 @@ import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.kvstore.KVIndex
-import org.apache.kyuubi.Utils
+import org.apache.kyuubi.{Logging, Utils}
-object KyuubiSparkUtil {
+object KyuubiSparkUtil extends Logging {
type KVIndexParam = KVIndex @getter
@@ -36,6 +36,21 @@ object KyuubiSparkUtil {
def globalSparkContext: SparkContext = SparkSession.active.sparkContext
+ def initializeSparkSession(spark: SparkSession, initializationSQLs: Seq[String]): Unit = {
+ initializationSQLs.foreach { sql =>
+ spark.sparkContext.setJobGroup(
+ "initialization sql queries",
+ sql,
+ interruptOnCancel = true)
+ debug(s"Execute initialization sql: $sql")
+ try {
+ spark.sql(sql).isEmpty
+ } finally {
+ spark.sparkContext.clearJobGroup()
+ }
+ }
+ }
+
def engineId: String = globalSparkContext.applicationId
lazy val diagnostics: String = {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 151733f8b..d6a7eb797 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -187,16 +187,9 @@ object SparkSQLEngine extends Logging {
def createSpark(): SparkSession = {
val session = SparkSession.builder.config(_sparkConf).getOrCreate
- (kyuubiConf.get(ENGINE_INITIALIZE_SQL) ++ kyuubiConf.get(ENGINE_SESSION_INITIALIZE_SQL))
- .foreach { sqlStr =>
- session.sparkContext.setJobGroup(
- "engine_initializing_queries",
- sqlStr,
- interruptOnCancel = true)
- debug(s"Execute session initializing sql: $sqlStr")
- session.sql(sqlStr).isEmpty
- session.sparkContext.clearJobGroup()
- }
+ KyuubiSparkUtil.initializeSparkSession(
+ session,
+ kyuubiConf.get(ENGINE_INITIALIZE_SQL) ++ kyuubiConf.get(ENGINE_SESSION_INITIALIZE_SQL))
session
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index 4e11f9bfd..5b07f2f57 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.engine.spark.session
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.spark.sql.SparkSession
@@ -24,9 +26,11 @@ import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.ShareLevel
-import org.apache.kyuubi.engine.spark.SparkSQLEngine
+import org.apache.kyuubi.engine.ShareLevel._
+import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.session._
+import org.apache.kyuubi.util.ThreadUtils
/**
* A [[SessionManager]] constructed with [[SparkSession]] which give it the ability to talk with
@@ -50,6 +54,87 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
val operationManager = new SparkSQLOperationManager()
private lazy val singleSparkSession = conf.get(ENGINE_SINGLE_SPARK_SESSION)
+ private lazy val shareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL))
+
+ private lazy val userIsolatedSparkSession = conf.get(ENGINE_USER_ISOLATED_SPARK_SESSION)
+ private lazy val userIsolatedIdleInterval =
+ conf.get(ENGINE_USER_ISOLATED_SPARK_SESSION_IDLE_INTERVAL)
+ private lazy val userIsolatedIdleTimeout =
+ conf.get(ENGINE_USER_ISOLATED_SPARK_SESSION_IDLE_TIMEOUT)
+ private val userIsolatedCacheLock = new Object
+ private lazy val userIsolatedCache = new java.util.HashMap[String, SparkSession]()
+ private lazy val userIsolatedCacheCount =
+ new java.util.HashMap[String, (Integer, java.lang.Long)]()
+ private var userIsolatedSparkSessionThread: Option[ScheduledExecutorService] = None
+
+ private def startUserIsolatedCacheChecker(): Unit = {
+ if (!userIsolatedSparkSession) {
+ userIsolatedSparkSessionThread =
+ Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor("user-isolated-cache-checker"))
+ userIsolatedSparkSessionThread.foreach {
+ _.scheduleWithFixedDelay(
+ () => {
+ userIsolatedCacheLock.synchronized {
+ val iter = userIsolatedCacheCount.entrySet().iterator()
+ while (iter.hasNext) {
+ val kv = iter.next()
+ if (kv.getValue._1 == 0 &&
+ kv.getValue._2 + userIsolatedIdleTimeout < System.currentTimeMillis()) {
+ userIsolatedCache.remove(kv.getKey)
+ iter.remove()
+ }
+ }
+ }
+ },
+ userIsolatedIdleInterval,
+ userIsolatedIdleInterval,
+ TimeUnit.MILLISECONDS)
+ }
+ }
+ }
+
+ override def start(): Unit = {
+ startUserIsolatedCacheChecker()
+ super.start()
+ }
+
+ override def stop(): Unit = {
+ super.stop()
+ userIsolatedSparkSessionThread.foreach(_.shutdown())
+ }
+
+ private def getOrNewSparkSession(user: String): SparkSession = {
+ if (singleSparkSession) {
+ spark
+ } else {
+ shareLevel match {
+ // it's unnecessary to create a new spark session in connection share level
+ // since the session is only one
+ case CONNECTION => spark
+ case USER => newSparkSession(spark)
+ case GROUP | SERVER if userIsolatedSparkSession => newSparkSession(spark)
+ case GROUP | SERVER =>
+ userIsolatedCacheLock.synchronized {
+ if (userIsolatedCache.containsKey(user)) {
+ val (count, _) = userIsolatedCacheCount.get(user)
+ userIsolatedCacheCount.put(user, (count + 1, System.currentTimeMillis()))
+ userIsolatedCache.get(user)
+ } else {
+ userIsolatedCacheCount.put(user, (1, System.currentTimeMillis()))
+ val newSession = newSparkSession(spark)
+ userIsolatedCache.put(user, newSession)
+ newSession
+ }
+ }
+ }
+ }
+ }
+
+ private def newSparkSession(rootSparkSession: SparkSession): SparkSession = {
+ val newSparkSession = rootSparkSession.newSession()
+ KyuubiSparkUtil.initializeSparkSession(newSparkSession, conf.get(ENGINE_SESSION_INITIALIZE_SQL))
+ newSparkSession
+ }
override protected def createSession(
protocol: TProtocolVersion,
@@ -60,21 +145,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
val clientIp = conf.getOrElse(CLIENT_IP_KEY, ipAddress)
val sparkSession =
try {
- if (singleSparkSession) {
- spark
- } else {
- val ss = spark.newSession()
- this.conf.get(ENGINE_SESSION_INITIALIZE_SQL).foreach { sqlStr =>
- ss.sparkContext.setJobGroup(
- "engine_initializing_queries",
- sqlStr,
- interruptOnCancel = true)
- debug(s"Execute session initializing sql: $sqlStr")
- ss.sql(sqlStr).isEmpty
- ss.sparkContext.clearJobGroup()
- }
- ss
- }
+ getOrNewSparkSession(user)
} catch {
case e: Exception => throw KyuubiSQLException(e)
}
@@ -91,8 +162,19 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
+ if (!userIsolatedSparkSession) {
+ val session = getSession(sessionHandle)
+ if (session != null) {
+ userIsolatedCacheLock.synchronized {
+ if (userIsolatedCacheCount.containsKey(session.user)) {
+ val (count, _) = userIsolatedCacheCount.get(session.user)
+ userIsolatedCacheCount.put(session.user, (count - 1, System.currentTimeMillis()))
+ }
+ }
+ }
+ }
super.closeSession(sessionHandle)
- if (conf.get(ENGINE_SHARE_LEVEL) == ShareLevel.CONNECTION.toString) {
+ if (shareLevel == ShareLevel.CONNECTION) {
info("Session stopped due to shared level is Connection.")
stopSession()
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/UserIsolatedSessionSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/UserIsolatedSessionSuite.scala
new file mode 100644
index 000000000..9d31e180f
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/UserIsolatedSessionSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.session
+
+import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsReq, TOpenSessionReq}
+
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
+class UserIsolatedSessionSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
+
+ override def withKyuubiConf: Map[String, String] = {
+ Map(
+ ENGINE_SHARE_LEVEL.key -> "GROUP",
+ ENGINE_USER_ISOLATED_SPARK_SESSION.key -> "false",
+ ENGINE_USER_ISOLATED_SPARK_SESSION_IDLE_INTERVAL.key -> "100",
+ ENGINE_USER_ISOLATED_SPARK_SESSION_IDLE_TIMEOUT.key -> "5000")
+ }
+
+ override protected def jdbcUrl: String =
+ s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;#spark.ui.enabled=false"
+
+ private def executeSetStatement(user: String, statement: String): String = {
+ withThriftClient(Some(user)) { client =>
+ val req = new TOpenSessionReq()
+ req.setUsername(user)
+ req.setPassword("anonymous")
+ val tOpenSessionResp = client.OpenSession(req)
+ val tExecuteStatementReq = new TExecuteStatementReq()
+ tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
+ tExecuteStatementReq.setStatement(statement)
+ tExecuteStatementReq.setRunAsync(false)
+ val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)
+
+ val operationHandle = tExecuteStatementResp.getOperationHandle
+ val tFetchResultsReq = new TFetchResultsReq()
+ tFetchResultsReq.setOperationHandle(operationHandle)
+ tFetchResultsReq.setFetchType(0)
+ tFetchResultsReq.setMaxRows(1)
+ val tFetchResultsResp = client.FetchResults(tFetchResultsReq)
+ tFetchResultsResp.getResults.getColumns.get(1).getStringVal.getValues.get(0)
+ }
+ }
+
+ test("isolated user spark session") {
+ executeSetStatement("user1", "set a=1")
+ assert(executeSetStatement("user1", "set a") == "1")
+ assert(executeSetStatement("user1", "set a") == "1")
+ assert(executeSetStatement("user2", "set a") == "")
+ executeSetStatement("user2", "set a=2")
+ assert(executeSetStatement("user1", "set a") == "1")
+ assert(executeSetStatement("user2", "set a") == "2")
+
+ Thread.sleep(6000)
+ assert(executeSetStatement("user1", "set a") == "")
+ assert(executeSetStatement("user2", "set a") == "")
+ }
+}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 7cb48b0a9..bcf415fa3 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1143,6 +1143,31 @@ object KyuubiConf {
.booleanConf
.createWithDefault(false)
+ val ENGINE_USER_ISOLATED_SPARK_SESSION: ConfigEntry[Boolean] =
+ buildConf("kyuubi.engine.user.isolated.spark.session")
+    .doc("When set to false, if the engine is running in a group or server share level, " +
+      "all the JDBC/ODBC connections from the same user will share an isolated Spark session, " +
+      "including the temporary views, function registries, SQL configuration and the current " +
+      "database. Note that this has no effect if the share level is connection or user.")
+ .version("1.6.0")
+ .booleanConf
+ .createWithDefault(true)
+
+ val ENGINE_USER_ISOLATED_SPARK_SESSION_IDLE_TIMEOUT: ConfigEntry[Long] =
+ buildConf("kyuubi.engine.user.isolated.spark.session.idle.timeout")
+ .doc(s"If ${ENGINE_USER_ISOLATED_SPARK_SESSION.key} is false, we will release the " +
+ s"spark session if its corresponding user is inactive after this configured timeout.")
+ .version("1.6.0")
+ .timeConf
+ .createWithDefault(Duration.ofHours(6).toMillis)
+
+ val ENGINE_USER_ISOLATED_SPARK_SESSION_IDLE_INTERVAL: ConfigEntry[Long] =
+ buildConf("kyuubi.engine.user.isolated.spark.session.idle.interval")
+    .doc("The interval to check whether the user isolated spark session has timed out.")
+ .version("1.6.0")
+ .timeConf
+ .createWithDefault(Duration.ofMinutes(1).toMillis)
+
val SERVER_EVENT_JSON_LOG_PATH: ConfigEntry[String] =
buildConf("kyuubi.backend.server.event.json.log.path")
.doc("The location of server events go for the builtin JSON logger")
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
index fda700e03..13876cbe5 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
@@ -77,7 +77,13 @@ trait HiveJDBCTestHelper extends JDBCTestHelper {
}
def withThriftClient[T](f: TCLIService.Iface => T): T = {
- TClientTestUtils.withThriftClient(jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head)(f)
+ withThriftClient()(f)
+ }
+
+ def withThriftClient[T](user: Option[String] = None)(f: TCLIService.Iface => T): T = {
+ TClientTestUtils.withThriftClient(
+ jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head,
+ user)(f)
}
def withSessionHandle[T](f: (TCLIService.Iface, TSessionHandle) => T): T = {
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/TClientTestUtils.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/TClientTestUtils.scala
index 439d6c670..298e41b02 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/TClientTestUtils.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/TClientTestUtils.scala
@@ -30,10 +30,13 @@ import org.apache.kyuubi.service.authentication.PlainSASLHelper
object TClientTestUtils extends Logging {
- def withThriftClient[T](url: String)(f: Iface => T): T = {
+ def withThriftClient[T](url: String, user: Option[String] = None)(f: Iface => T): T = {
val hostport = url.split(':')
val socket = new TSocket(hostport.head, hostport.last.toInt)
- val transport = PlainSASLHelper.getPlainTransport(Utils.currentUser, "anonymous", socket)
+ val transport = PlainSASLHelper.getPlainTransport(
+ user.getOrElse(Utils.currentUser),
+ "anonymous",
+ socket)
val protocol = new TBinaryProtocol(transport)
val client = new TCLIService.Client(protocol)
transport.open()