diff --git a/docs/sql/functions.md b/docs/sql/functions.md index d2ab17632..449fb6a79 100644 --- a/docs/sql/functions.md +++ b/docs/sql/functions.md @@ -33,4 +33,5 @@ kyuubi_version | Return the version of Kyuubi Server | string | 1.3.0 engine_name | Return the spark application name for the associated query engine | string | 1.3.0 engine_id | Return the spark application id for the associated query engine | string | 1.4.0 system_user | Return the system user name for the associated query engine | string | 1.3.0 +session_user | Return the session username for the associated query engine | string | 1.4.0 diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index 800f0e81c..891a79fb2 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -141,6 +141,7 @@ class ExecuteStatement( private def withLocalProperties[T](f: => T): T = { try { spark.sparkContext.setJobGroup(statementId, statement, forceCancel) + spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, session.user) spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, statementId) schedulerPool match { case Some(pool) => @@ -151,6 +152,7 @@ class ExecuteStatement( f } finally { spark.sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL_KEY, null) + spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, null) spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, null) spark.sparkContext.clearJobGroup() } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KDFRegistry.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KDFRegistry.scala index 310d20056..db18a0e0e 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KDFRegistry.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KDFRegistry.scala @@ -19,12 +19,13 @@ package org.apache.kyuubi.engine.spark.udf import scala.collection.mutable.ArrayBuffer -import org.apache.spark.SparkEnv +import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.functions.udf -import org.apache.kyuubi.KYUUBI_VERSION +import org.apache.kyuubi.{KYUUBI_VERSION, Utils} +import org.apache.kyuubi.KyuubiSparkUtils.KYUUBI_SESSION_USER_KEY object KDFRegistry { @@ -59,11 +60,22 @@ object KDFRegistry { val system_user: KyuubiDefinedFunction = create( "system_user", - udf(() => System.getProperty("user.name")).asNonNullable(), + udf(() => Utils.currentUser).asNonNullable(), "Return the system user name for the associated query engine", "string", "1.3.0") + val session_user: KyuubiDefinedFunction = create( + "session_user", + udf { () => + Option(TaskContext.get()).map(_.getLocalProperty(KYUUBI_SESSION_USER_KEY)) + .getOrElse(throw new RuntimeException("Unable to get session_user")) + }, + "Return the session username for the associated query engine", + "string", + "1.4.0" + ) + def create( name: String, udf: UserDefinedFunction, diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSparkUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSparkUtils.scala index 964db8501..1e3db4920 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSparkUtils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSparkUtils.scala @@ -18,6 +18,7 @@ package org.apache.kyuubi object KyuubiSparkUtils { + final val KYUUBI_SESSION_USER_KEY = "kyuubi.session.user" final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id" final val SPARK_SCHEDULER_POOL_KEY = "spark.scheduler.pool" final val SPARK_SQL_EXECUTION_ID_KEY = "spark.sql.execution.id" diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTests.scala index 1b0e2d8c0..6104b8b46 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTests.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTests.scala @@ -382,19 +382,6 @@ trait JDBCTests extends BasicJDBCTests { } } - test("kyuubi defined function - system_user") { - withJdbcStatement() { statement => - val rs = statement.executeQuery("SELECT system_user()") - assert(rs.next()) - val user = rs.getString(1) - // skip minikube test since dockerfile use kyuubi as user which is not same with non-k8s env. - if (user == "kyuubi") { - } else { - assert(user == System.getProperty("user.name")) - } - } - } - test("KYUUBI #1059: Plan only operations") { val ddl = "create table t(a int) using parquet" val dql = "select * from t" diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationGroupSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationGroupSuite.scala index 652fe9ce9..209269b77 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationGroupSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationGroupSuite.scala @@ -58,4 +58,16 @@ class KyuubiOperationGroupSuite extends WithKyuubiServer with JDBCTests { assert(r1 === r2) assert(r1.startsWith(s"kyuubi_GROUP_testGG")) } + + + test("kyuubi defined function - system_user/session_user") { + withSessionConf(Map("hive.server2.proxy.user" -> "user1"))(Map.empty)(Map.empty) { + withJdbcStatement() { statement => + val res = statement.executeQuery("select system_user() as c1, session_user() as c2") + assert(res.next()) + assert(res.getString("c1") === "testGG") + assert(res.getString("c2") === "user1") + } + } + } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala index bdd41d260..42b126da2 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.operation import org.scalatest.time.SpanSugar._ -import org.apache.kyuubi.WithKyuubiServer +import org.apache.kyuubi.{Utils, WithKyuubiServer} import org.apache.kyuubi.config.KyuubiConf class KyuubiOperationPerUserSuite extends WithKyuubiServer with JDBCTests { @@ -30,6 +30,15 @@ class KyuubiOperationPerUserSuite extends WithKyuubiServer with JDBCTests { KyuubiConf().set(KyuubiConf.ENGINE_SHARE_LEVEL, "user") } + test("kyuubi defined function - system_user/session_user") { + withJdbcStatement() { statement => + val rs = statement.executeQuery("SELECT system_user(), session_user()") + assert(rs.next()) + assert(rs.getString(1) === Utils.currentUser) + assert(rs.getString(2) === Utils.currentUser) + } + } + test("ensure two connections in user mode share the same engine") { var r1: String = null var r2: String = null @@ -87,5 +96,4 @@ class KyuubiOperationPerUserSuite extends WithKyuubiServer with JDBCTests { assert(r1 === r2) } } - } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationYarnClusterSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationYarnClusterSuite.scala index f1eb93060..ee0e76647 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationYarnClusterSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationYarnClusterSuite.scala @@ -42,4 +42,12 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with JDBCTe assert(resultSet.getString("id").startsWith("application_")) } } + + test("session_user shall work on yarn") { + withJdbcStatement() { statement => + val resultSet = statement.executeQuery("SELECT SESSION_USER() as su") + assert(resultSet.next()) + assert(resultSet.getString("su") === user) + } + } }