[KYUUBI #660] Add UDF session_user

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

Now that the GROUP share level is fully supported, the session user and the system user can be different, so we need a `session_user` UDF that returns the session user.

This change also fixes `system_user` so that it always returns the `sparkUser`.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1274 from yaooqinn/660.

Closes #660

c8ed1f77 [Kent Yao] dead code
20c18640 [Kent Yao] nit
4ab1476c [Kent Yao] nit
081f32f2 [Kent Yao] nit
e332ca78 [Kent Yao] Merge branch 'master' into 660
135b058f [Kent Yao] ci
646ddeec [Kent Yao] [KYUUBI #660] Add UDF session_user

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
Kent Yao 2021-10-26 17:12:50 +08:00
parent 275d0bd4eb
commit 7b3442a709
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
8 changed files with 49 additions and 18 deletions

View File

@ -33,4 +33,5 @@ kyuubi_version | Return the version of Kyuubi Server | string | 1.3.0
engine_name | Return the spark application name for the associated query engine | string | 1.3.0
engine_id | Return the spark application id for the associated query engine | string | 1.4.0
system_user | Return the system user name for the associated query engine | string | 1.3.0
session_user | Return the session username for the associated query engine | string | 1.4.0

View File

@ -141,6 +141,7 @@ class ExecuteStatement(
private def withLocalProperties[T](f: => T): T = {
try {
spark.sparkContext.setJobGroup(statementId, statement, forceCancel)
spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, session.user)
spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, statementId)
schedulerPool match {
case Some(pool) =>
@ -151,6 +152,7 @@ class ExecuteStatement(
f
} finally {
spark.sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL_KEY, null)
spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, null)
spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, null)
spark.sparkContext.clearJobGroup()
}

View File

@ -19,12 +19,13 @@ package org.apache.kyuubi.engine.spark.udf
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkEnv
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
import org.apache.kyuubi.KYUUBI_VERSION
import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
import org.apache.kyuubi.KyuubiSparkUtils.KYUUBI_SESSION_USER_KEY
object KDFRegistry {
@ -59,11 +60,22 @@ object KDFRegistry {
val system_user: KyuubiDefinedFunction = create(
"system_user",
udf(() => System.getProperty("user.name")).asNonNullable(),
udf(() => Utils.currentUser).asNonNullable(),
"Return the system user name for the associated query engine",
"string",
"1.3.0")
/**
 * Kyuubi defined function `session_user`.
 *
 * Looks up the value stored under `KYUUBI_SESSION_USER_KEY` in the local
 * properties of the currently running task and returns it as a string.
 *
 * The UDF throws a `RuntimeException` when there is no active `TaskContext`
 * or when the local property has not been set, instead of returning null.
 */
val session_user: KyuubiDefinedFunction = create(
  "session_user",
  udf { () =>
    // Both lookups may produce null: `TaskContext.get()` when no task is
    // active, and `getLocalProperty` when the key is missing. Each one is
    // wrapped in `Option` so a missing value reaches the error branch below
    // rather than leaking out as `Some(null)`.
    val sessionUser = for {
      ctx <- Option(TaskContext.get())
      user <- Option(ctx.getLocalProperty(KYUUBI_SESSION_USER_KEY))
    } yield user
    sessionUser.getOrElse(throw new RuntimeException("Unable to get session_user"))
  },
  "Return the session username for the associated query engine",
  "string",
  "1.4.0"
)
def create(
name: String,
udf: UserDefinedFunction,

View File

@ -18,6 +18,7 @@
package org.apache.kyuubi
object KyuubiSparkUtils {
final val KYUUBI_SESSION_USER_KEY = "kyuubi.session.user"
final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
final val SPARK_SCHEDULER_POOL_KEY = "spark.scheduler.pool"
final val SPARK_SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"

View File

@ -382,19 +382,6 @@ trait JDBCTests extends BasicJDBCTests {
}
}
// Checks that system_user() matches the JVM "user.name" system property.
test("kyuubi defined function - system_user") {
  withJdbcStatement() { stmt =>
    val resultSet = stmt.executeQuery("SELECT system_user()")
    assert(resultSet.next())
    val reportedUser = resultSet.getString(1)
    // The minikube Dockerfile runs as the "kyuubi" user, which differs from
    // the user in non-k8s environments, so the comparison is skipped there.
    if (reportedUser != "kyuubi") {
      assert(reportedUser == System.getProperty("user.name"))
    }
  }
}
test("KYUUBI #1059: Plan only operations") {
val ddl = "create table t(a int) using parquet"
val dql = "select * from t"

View File

@ -58,4 +58,16 @@ class KyuubiOperationGroupSuite extends WithKyuubiServer with JDBCTests {
assert(r1 === r2)
assert(r1.startsWith(s"kyuubi_GROUP_testGG"))
}
// Connects with a proxy user and checks that system_user() and session_user()
// report different identities.
test("kyuubi defined function - system_user/session_user") {
  val proxyUserConf = Map("hive.server2.proxy.user" -> "user1")
  withSessionConf(proxyUserConf)(Map.empty)(Map.empty) {
    withJdbcStatement() { stmt =>
      val row = stmt.executeQuery("select system_user() as c1, session_user() as c2")
      assert(row.next())
      // c1 is the system user of the engine, c2 is the connecting session user.
      val systemUser = row.getString("c1")
      assert(systemUser === "testGG")
      val sessionUser = row.getString("c2")
      assert(sessionUser === "user1")
    }
  }
}
}

View File

@ -19,7 +19,7 @@ package org.apache.kyuubi.operation
import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.WithKyuubiServer
import org.apache.kyuubi.{Utils, WithKyuubiServer}
import org.apache.kyuubi.config.KyuubiConf
class KyuubiOperationPerUserSuite extends WithKyuubiServer with JDBCTests {
@ -30,6 +30,15 @@ class KyuubiOperationPerUserSuite extends WithKyuubiServer with JDBCTests {
KyuubiConf().set(KyuubiConf.ENGINE_SHARE_LEVEL, "user")
}
// With the "user" share level configured for this suite, both UDFs are
// expected to return the current user.
test("kyuubi defined function - system_user/session_user") {
  withJdbcStatement() { stmt =>
    val resultSet = stmt.executeQuery("SELECT system_user(), session_user()")
    assert(resultSet.next())
    // Column 1 is system_user(), column 2 is session_user().
    Seq(1, 2).foreach { columnIndex =>
      assert(resultSet.getString(columnIndex) === Utils.currentUser)
    }
  }
}
test("ensure two connections in user mode share the same engine") {
var r1: String = null
var r2: String = null
@ -87,5 +96,4 @@ class KyuubiOperationPerUserSuite extends WithKyuubiServer with JDBCTests {
assert(r1 === r2)
}
}
}

View File

@ -42,4 +42,12 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with JDBCTe
assert(resultSet.getString("id").startsWith("application_"))
}
}
// Checks that SESSION_USER() returns the suite's `user` value.
test("session_user shall work on yarn") {
  withJdbcStatement() { stmt =>
    val rows = stmt.executeQuery("SELECT SESSION_USER() as su")
    assert(rows.next())
    val sessionUser = rows.getString("su")
    assert(sessionUser === user)
  }
}
}