[KYUUBI #5438] Add common method to get session level config

### _Why are the changes needed?_

Currently, in the spark-engine module, some session-level configurations are ignored because retrieving session-level configurations in the Kyuubi Spark engine is complicated. As discussed in https://github.com/apache/kyuubi/pull/5410#discussion_r1360164253, if a unit test uses the withSessionConf method, the code under test must read the configuration from the right session. This PR adds a common method, `getSessionConf`, for that purpose.
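
As a rough illustration of the lookup order described above (session-level value first, engine-level value otherwise), here is a minimal, self-contained sketch. It does not use the real Kyuubi or Spark classes: `ConfigEntry` here is a simplified stand-in, the two `Map`s stand in for `spark.conf` and the engine's `KyuubiConf`, and the key `example.result.max.rows` is hypothetical.

```scala
// Simplified stand-in for a typed config entry (an assumption, not Kyuubi's class):
// a key, a String => T converter, and a default value.
final case class ConfigEntry[T](key: String, valueConverter: String => T, default: T)

object SessionConfSketch {
  // The session-level value wins. If the session does not set the key,
  // fall back to the engine-level conf, and finally to the entry's default.
  def getSessionConf[T](
      entry: ConfigEntry[T],
      sessionConf: Map[String, String],
      engineConf: Map[String, String]): T =
    sessionConf.get(entry.key)
      .map(entry.valueConverter)
      .getOrElse(engineConf.get(entry.key).map(entry.valueConverter).getOrElse(entry.default))

  def main(args: Array[String]): Unit = {
    // Hypothetical key, used only for this illustration.
    val maxRows = ConfigEntry[Int]("example.result.max.rows", _.toInt, 0)
    val engineConf = Map(maxRows.key -> "100")

    println(getSessionConf(maxRows, Map(maxRows.key -> "10"), engineConf)) // 10
    println(getSessionConf(maxRows, Map.empty, engineConf))                // 100
    println(getSessionConf(maxRows, Map.empty, Map.empty))                 // 0
  }
}
```

Routing the string through the entry's own converter is what lets call sites drop their hand-written `_.toInt` and `s.toBoolean` conversions.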

This PR is not finished yet: it needs to wait for https://github.com/apache/kyuubi/pull/5410 to be merged so that I can use the new change in the unit test.

closes #5438
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

No

Closes #5487 from davidyuan1223/5438_add_common_method_to_support_session_config.

Closes #5438

e1ded3654 [davidyuan] add more optional session level to get conf
84c4568d9 [davidyuan] add more optional session level to get conf
4d709023e [davidyuan] add more optional session level to get conf
96d7cde05 [davidyuan] Revert "add more optional session level to get conf"
940f8f878 [davidyuan] add more optional session level to get conf
15641e8ec [davidyuan] add more optional session level to get conf
d83893119 [davidyuan] Merge branch '5438_add_common_method_to_support_session_config' of https://github.com/davidyuan1223/kyuubi into 5438_add_common_method_to_support_session_config
2de96b5f8 [davidyuan] add common method to get session level config
3ec73adf8 [liangbowen] [KYUUBI #5522] [BATCH] Ignore main class for PySpark batch job submission
d8b808dbe [Cheng Pan] [KYUUBI #5523] [DOC] Update the Kyuubi supported components version
c7d15aed0 [Cheng Pan] [KYUUBI #5483] Release Spark TPC-H/DS Connectors with Scala 2.13
4a1db4206 [zwangsheng] [KYUUBI #5513][BATCH] Always redirect delete batch request to Kyuubi instance that owns batch session
b06e04485 [labbomb] [KYUUBI #5517] [UI] Initial implement the SQL Lab page
88bb6b4a8 [liangbowen] [KYUUBI #5486] Bump Kafka client version from 3.4.0 to 3.5.1
538a648cd [davidyuan] [KYUUBI #4186] Spark showProgress with JobInfo
682e5b5e3 [Xianxun Ye] [KYUUBI #5405] [FLINK] Support Flink 1.18
c71528ea3 [Cheng Pan] [KYUUBI #5484] Remove legacy Web UI
ee52b2a69 [Angerszhuuuu] [KYUUBI #5446][AUTHZ] Support Create/Drop/Show/Reresh index command for Hudi
6a5bb1026 [weixi] [KYUUBI #5380][UT] Create PySpark batch jobs tests for RESTful API
86f692d73 [Kent Yao] [KYUUBI #5512] [AuthZ] Remove the non-existent query specs in Deletes and Updates
dfdd7a3f4 [fwang12] [KYUUBI #5499][KYUUBI #2503] Catch any exception when closing idle session
b7b354485 [伟程] [KYUUBI #5212] Fix configuration errors causing by helm charts of prometheus services
d123a5a1e [liupeiyue] [KYUUBI #5282] Support configure Trino session conf in `kyuubi-default.conf`
075043754 [yangming] [KYUUBI #5294] [DOC] Update supported dialects for JDBC engine
9c75d8252 [zwangsheng] [KYUUBI #5435][INFRA][TEST] Improve Kyuubi On Kubernetes IT
1dc264add [Angerszhuuuu] [KYUUBI #5479][AUTHZ] Support Hudi CallProcedureHoodieCommand for stored procedures
bc3fcbb4d [Angerszhuuuu] [KYUUBI #5472] Permanent View should pass column when child plan no output
a67b8245a [Fantasy-Jay] [KYUUBI #5382][JDBC] Duplication cleanup improvement in JdbcDialect and schema helpers
c039e1b6f [Kent Yao] [KYUUBI #5497] [AuthZ] Simplify debug message for missing field/method in ReflectUtils
0c8be79ab [Angerszhuuuu] [KYUUBI #5475][FOLLOWUP] Authz check permanent view's subquery should check view's correct privilege
1293cf208 [Kent Yao] [KYUUBI #5500] Add Kyuubi Code Program to Doc
e2754fedd [Angerszhuuuu] [KYUUBI #5492][AUTHZ] saveAsTable create DataSource table miss db info
0c53d0079 [Angerszhuuuu] [KYUUBI #5447][FOLLOWUP] Remove unrelated debug prints in TableIdentifierTableExtractor
119c393fc [Angerszhuuuu] [KYUUBI #5447][AUTHZ] Support Hudi DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand
3af5ed13c [yikaifei] [KYUUBI #5427] [AUTHZ] Shade spark authz plugin
503c3f7fd [davidyuan] Merge remote-tracking branch 'origin/5438_add_common_method_to_support_session_config' into 5438_add_common_method_to_support_session_config
7a67ace08 [davidyuan] add common method to get session level config
3f4231735 [davidyuan] add common method to get session level config
bb5d5ce44 [davidyuan] add common method to get session level config
623200ff9 [davidyuan] Merge remote-tracking branch 'origin/5438_add_common_method_to_support_session_config' into 5438_add_common_method_to_support_session_config
8011959da [davidyuan] add common method to get session level config
605ef16bc [davidyuan] Merge remote-tracking branch 'origin/5438_add_common_method_to_support_session_config' into 5438_add_common_method_to_support_session_config
bb63ed87c [davidyuan] add common method to get session level config
d9cf248d4 [davidyuan] add common method to get session level config
c8647ef53 [davidyuan] add common method to get session level config
618c0f65e [david yuan] Merge branch 'apache:master' into 5438_add_common_method_to_support_session_config
c1024bded [david yuan] Merge branch 'apache:master' into 5438_add_common_method_to_support_session_config
32028f99f [davidyuan] add common method to get session level config
03e28874c [davidyuan] add common method to get session level config

Lead-authored-by: David Yuan <yuanfuyuan@mafengwo.com>
Co-authored-by: davidyuan <yuanfuyuan@mafengwo.com>
Co-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: Cheng Pan <chengpan@apache.org>
Co-authored-by: Kent Yao <yao@apache.org>
Co-authored-by: liangbowen <liangbowen@gf.com.cn>
Co-authored-by: david yuan <51512358+davidyuan1223@users.noreply.github.com>
Co-authored-by: zwangsheng <binjieyang@apache.org>
Co-authored-by: yangming <261635393@qq.com>
Co-authored-by: 伟程 <cheng1483x@gmail.com>
Co-authored-by: weixi <weixi62961@outlook.com>
Co-authored-by: fwang12 <fwang12@ebay.com>
Co-authored-by: Xianxun Ye <yesorno828423@gmail.com>
Co-authored-by: liupeiyue <liupeiyue@yy.com>
Co-authored-by: Fantasy-Jay <13631435453@163.com>
Co-authored-by: yikaifei <yikaifei@apache.org>
Co-authored-by: labbomb <739955946@qq.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
David Yuan 2023-11-08 20:49:29 +08:00 committed by Cheng Pan
parent 456b3c3f06
commit 9615db55ce
8 changed files with 50 additions and 51 deletions


@ -26,6 +26,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.util.kvstore.KVIndex
import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.ConfigEntry
import org.apache.kyuubi.util.SemanticVersion
object KyuubiSparkUtil extends Logging {
@ -98,4 +99,18 @@ object KyuubiSparkUtil extends Logging {
// Given that we are on the Spark SQL engine side, the [[org.apache.spark.SPARK_VERSION]] can be
// represented as the runtime version of the Spark SQL engine.
lazy val SPARK_ENGINE_RUNTIME_VERSION: SemanticVersion = SemanticVersion(SPARK_VERSION)
+/**
+ * Get session level config value
+ * @param configEntry configEntry
+ * @param spark sparkSession
+ * @tparam T any type
+ * @return session level config value, if spark not set this config,
+ *         default return kyuubi's config
+ */
+def getSessionConf[T](configEntry: ConfigEntry[T], spark: SparkSession): T = {
+  spark.conf.getOption(configEntry.key).map(configEntry.valueConverter).getOrElse {
+    SparkSQLEngine.kyuubiConf.get(configEntry)
+  }
+}
}


@ -295,10 +295,8 @@ object ExecutePython extends Logging {
}
def getSparkPythonExecFromArchive(spark: SparkSession, session: Session): Option[String] = {
-val pythonEnvArchive = spark.conf.getOption(ENGINE_SPARK_PYTHON_ENV_ARCHIVE.key)
-  .orElse(session.sessionManager.getConf.get(ENGINE_SPARK_PYTHON_ENV_ARCHIVE))
-val pythonEnvExecPath = spark.conf.getOption(ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH.key)
-  .getOrElse(session.sessionManager.getConf.get(ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH))
+val pythonEnvArchive = getSessionConf(ENGINE_SPARK_PYTHON_ENV_ARCHIVE, spark)
+val pythonEnvExecPath = getSessionConf(ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH, spark)
pythonEnvArchive.map {
archive =>
var uri = new URI(archive)
@ -311,8 +309,7 @@ object ExecutePython extends Logging {
}
def getSparkPythonHomeFromArchive(spark: SparkSession, session: Session): Option[String] = {
-val pythonHomeArchive = spark.conf.getOption(ENGINE_SPARK_PYTHON_HOME_ARCHIVE.key)
-  .orElse(session.sessionManager.getConf.get(ENGINE_SPARK_PYTHON_HOME_ARCHIVE))
+val pythonHomeArchive = getSessionConf(ENGINE_SPARK_PYTHON_HOME_ARCHIVE, spark)
pythonHomeArchive.map {
archive =>
var uri = new URI(archive)


@ -148,8 +148,7 @@ class ExecuteStatement(
s"__kyuubi_operation_result_arrow_timestampAsString__=$timestampAsString")
private def collectAsIterator(resultDF: DataFrame): FetchIterator[_] = {
-val resultMaxRows = spark.conf.getOption(OPERATION_RESULT_MAX_ROWS.key).map(_.toInt)
-  .getOrElse(session.sessionManager.getConf.get(OPERATION_RESULT_MAX_ROWS))
+val resultMaxRows: Int = getSessionConf(OPERATION_RESULT_MAX_ROWS, spark)
if (incrementalCollect) {
if (resultMaxRows > 0) {
warn(s"Ignore ${OPERATION_RESULT_MAX_ROWS.key} on incremental collect mode.")


@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.config.KyuubiConf.OPERATION_GET_TABLES_IGNORE_TABLE_PROPERTIES
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.getSessionConf
import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
import org.apache.kyuubi.operation.IterableFetchIterator
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
@ -34,10 +35,7 @@ class GetTables(
extends SparkOperation(session) {
protected val ignoreTableProperties =
-spark.conf.getOption(OPERATION_GET_TABLES_IGNORE_TABLE_PROPERTIES.key) match {
-  case Some(s) => s.toBoolean
-  case _ => session.sessionManager.getConf.get(OPERATION_GET_TABLES_IGNORE_TABLE_PROPERTIES)
-}
+getSessionConf(OPERATION_GET_TABLES_IGNORE_TABLE_PROPERTIES, spark)
override def statement: String = {
super.statement +


@ -27,6 +27,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf.{LINEAGE_PARSER_PLUGIN_PROVIDER, OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE}
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.getSessionConf
import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, LineageMode, OperationHandle, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, unknownModeError}
import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError, unknownStyleError}
@ -49,9 +50,7 @@ class PlanOnlyStatement(
.getOrElse(session.sessionManager.getConf.get(OPERATION_PLAN_ONLY_EXCLUDES))
}
-private val style = PlanOnlyStyle.fromString(spark.conf.get(
-  OPERATION_PLAN_ONLY_OUT_STYLE.key,
-  session.sessionManager.getConf.get(OPERATION_PLAN_ONLY_OUT_STYLE)))
+private val style = PlanOnlyStyle.fromString(getSessionConf(OPERATION_PLAN_ONLY_OUT_STYLE, spark))
spark.conf.set(OPERATION_PLAN_ONLY_OUT_STYLE.key, style.name)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
@ -74,7 +73,6 @@ class PlanOnlyStatement(
withLocalProperties {
SQLConf.withExistingConf(spark.sessionState.conf) {
val parsed = spark.sessionState.sqlParser.parsePlan(statement)
parsed match {
case cmd if planExcludes.contains(cmd.getClass.getSimpleName) =>
result = spark.sql(statement)


@ -31,7 +31,7 @@ import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE, SESSION_USER_SIGN_ENABLED}
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_KEY, KYUUBI_SESSION_USER_SIGN, KYUUBI_STATEMENT_ID_KEY}
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SCHEDULER_POOL_KEY
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SCHEDULER_POOL_KEY}
import org.apache.kyuubi.engine.spark.events.SparkOperationEvent
import org.apache.kyuubi.engine.spark.operation.SparkOperation.TIMEZONE_KEY
import org.apache.kyuubi.engine.spark.schema.{RowSet, SchemaHelper}
@ -63,11 +63,8 @@ abstract class SparkOperation(session: Session)
override def redactedStatement: String =
redact(spark.sessionState.conf.stringRedactionPattern, statement)
-protected val operationSparkListenerEnabled =
-  spark.conf.getOption(OPERATION_SPARK_LISTENER_ENABLED.key) match {
-    case Some(s) => s.toBoolean
-    case _ => session.sessionManager.getConf.get(OPERATION_SPARK_LISTENER_ENABLED)
-  }
+protected val operationSparkListenerEnabled: Boolean =
+  getSessionConf(OPERATION_SPARK_LISTENER_ENABLED, spark)
protected val operationListener: Option[SQLOperationListener] =
if (operationSparkListenerEnabled) {
@ -80,10 +77,7 @@ abstract class SparkOperation(session: Session)
operationListener.foreach(spark.sparkContext.addSparkListener(_))
}
-private val progressEnable = spark.conf.getOption(SESSION_PROGRESS_ENABLE.key) match {
-  case Some(s) => s.toBoolean
-  case _ => session.sessionManager.getConf.get(SESSION_PROGRESS_ENABLE)
-}
+private val progressEnable: Boolean = getSessionConf(SESSION_PROGRESS_ENABLE, spark)
protected def supportProgress: Boolean = false
@ -113,9 +107,7 @@ abstract class SparkOperation(session: Session)
protected val forceCancel =
session.sessionManager.getConf.get(KyuubiConf.OPERATION_FORCE_CANCEL)
-protected val schedulerPool =
-  spark.conf.getOption(KyuubiConf.OPERATION_SCHEDULER_POOL.key).orElse(
-    session.sessionManager.getConf.get(KyuubiConf.OPERATION_SCHEDULER_POOL))
+protected val schedulerPool = getSessionConf(KyuubiConf.OPERATION_SCHEDULER_POOL, spark)
protected val isSessionUserSignEnabled: Boolean = spark.sparkContext.getConf.getBoolean(
s"spark.${SESSION_USER_SIGN_ENABLED.key}",


@ -27,10 +27,9 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
import org.apache.kyuubi.Logging
-import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_SHOW_PROGRESS, ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT, ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL}
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SQL_EXECUTION_ID_KEY
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SQL_EXECUTION_ID_KEY}
import org.apache.kyuubi.engine.spark.operation.ExecuteStatement
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.operation.log.OperationLog
@ -50,15 +49,14 @@ class SQLOperationListener(
private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]()
private var executionId: Option[Long] = None
-private val conf: KyuubiConf = operation.getSession.sessionManager.getConf
private lazy val consoleProgressBar =
-if (conf.get(ENGINE_SPARK_SHOW_PROGRESS)) {
+if (getSessionConf(ENGINE_SPARK_SHOW_PROGRESS, spark)) {
Some(new SparkConsoleProgressBar(
operation,
activeJobs,
activeStages,
-conf.get(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL),
-conf.get(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT)))
+getSessionConf(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL, spark),
+getSessionConf(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT, spark)))
} else {
None
}


@ -29,9 +29,7 @@ import org.apache.kyuubi.operation.HiveJDBCTestHelper
class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
-override def withKyuubiConf: Map[String, String] = Map(
-  KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS.key -> "true",
-  KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL.key -> "200")
+override def withKyuubiConf: Map[String, String] = Map.empty
override protected def jdbcUrl: String = getJdbcUrl
@ -58,19 +56,23 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp
}
test("operation listener with progress job info") {
-val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);"
-withSessionHandle { (client, handle) =>
-  val req = new TExecuteStatementReq()
-  req.setSessionHandle(handle)
-  req.setStatement(sql)
-  val tExecuteStatementResp = client.ExecuteStatement(req)
-  val opHandle = tExecuteStatementResp.getOperationHandle
-  val fetchResultsReq = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1000)
-  fetchResultsReq.setFetchType(1.toShort)
-  eventually(timeout(90.seconds), interval(500.milliseconds)) {
-    val resultsResp = client.FetchResults(fetchResultsReq)
-    val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala
-    assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]")))
+withSessionConf(Map(
+  KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS.key -> "true",
+  KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL.key -> "200"))()() {
+  val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);"
+  withSessionHandle { (client, handle) =>
+    val req = new TExecuteStatementReq()
+    req.setSessionHandle(handle)
+    req.setStatement(sql)
+    val tExecuteStatementResp = client.ExecuteStatement(req)
+    val opHandle = tExecuteStatementResp.getOperationHandle
+    val fetchResultsReq = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1000)
+    fetchResultsReq.setFetchType(1.toShort)
+    eventually(timeout(90.seconds), interval(500.milliseconds)) {
+      val resultsResp = client.FetchResults(fetchResultsReq)
+      val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala
+      assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]")))
+    }
}
}
}
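
The `withSessionConf(Map(...))()() { ... }` call shape in the test above can look odd at first. A minimal sketch of how such a helper might be built is given below, assuming it is a curried method with three defaulted parameter lists followed by a by-name body. The helper here is hypothetical (`WithSessionConfSketch`, `activeSessionConf`, and the key `example.show.progress` are illustration-only names) and is not the Kyuubi test helper itself.

```scala
object WithSessionConfSketch {
  // Hypothetical mutable state; a real test helper might use these to build a JDBC URL.
  private var sessionConfs: Map[String, String] = Map.empty
  private var hiveConfs: Map[String, String] = Map.empty
  private var hiveVars: Map[String, String] = Map.empty

  def activeSessionConf(key: String): Option[String] = sessionConfs.get(key)

  // Three defaulted parameter lists, then the body. Each empty `()` at the
  // call site selects the default (empty) map for that list.
  def withSessionConf[T](
      newSessionConfs: Map[String, String] = Map.empty)(
      newHiveConfs: Map[String, String] = Map.empty)(
      newHiveVars: Map[String, String] = Map.empty)(body: => T): T = {
    val saved = (sessionConfs, hiveConfs, hiveVars)
    sessionConfs = newSessionConfs
    hiveConfs = newHiveConfs
    hiveVars = newHiveVars
    try body
    finally {
      // Restore the previous settings so other tests are not affected.
      sessionConfs = saved._1
      hiveConfs = saved._2
      hiveVars = saved._3
    }
  }

  def main(args: Array[String]): Unit = {
    val inside = withSessionConf(Map("example.show.progress" -> "true"))()() {
      activeSessionConf("example.show.progress")
    }
    println(inside)                                     // Some(true)
    println(activeSessionConf("example.show.progress")) // None
  }
}
```

Scoping the settings to the body is what allows the suite to drop them from `withKyuubiConf` and set them per session instead.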