From 29b6076319efe295644fdf960605e845d20b8fa8 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Thu, 24 Apr 2025 22:42:26 -0700 Subject: [PATCH] [KYUUBI #7043] Support to construct the batch info from metadata directly ### Why are the changes needed? Add an option to allow construct the batch info from metadata directly instead of redirecting the requests to reduce the RPC latency. ### How was this patch tested? Minor change and Existing GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #7043 from turboFei/support_no_redirect. Closes #7043 7f7a2fb80 [Wang, Fei] comments bb0e324a1 [Wang, Fei] save Authored-by: Wang, Fei Signed-off-by: Wang, Fei --- docs/configuration/settings.md | 1 + .../scala/org/apache/kyuubi/config/KyuubiConf.scala | 10 ++++++++++ .../apache/kyuubi/server/api/v1/BatchesResource.scala | 4 +++- 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index ab3d47ebf..1e4f4b3b8 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -83,6 +83,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.batch.application.starvation.timeout | PT3M | Threshold above which to warn batch application may be starved. | duration | 1.7.0 | | kyuubi.batch.conf.ignore.list || A comma-separated list of ignored keys for batch conf. If the batch conf contains any of them, the key and the corresponding value will be removed silently during batch job submission. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering. You can also pre-define some config for batch job submission with the prefix: kyuubi.batchConf.[batchType]. For example, you can pre-define `spark.master` for the Spark batch job with key `kyuubi.batchConf.spark.spark.master`. | set | 1.6.0 | | kyuubi.batch.extra.resource.file.max.size | 0 | The maximum size in bytes of each uploaded extra resource file when creating batch. 0 or negative value means no limit. | long | 1.10.0 | +| kyuubi.batch.info.internal.redirect | true | When set to true, the batch info is retrieved by forwarding the request to the Kyuubi instance that submitted this job via internal RPC. If false, the batch info is constructed directly from the metadata store. It is recommended to set this to false to reduce the RPC latency for multiple kyuubi instances HA mode. | boolean | 1.11.0 | | kyuubi.batch.resource.file.max.size | 0 | The maximum size in bytes of the uploaded resource file when creating batch. 0 or negative value means no limit. | long | 1.10.0 | | kyuubi.batch.session.idle.timeout | PT6H | Batch session idle timeout, it will be closed when it's not accessed for this duration | duration | 1.6.2 | diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index c1604054d..faa23a9ae 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1951,6 +1951,16 @@ object KyuubiConf { .stringConf .createWithDefault("1") + val BATCH_INFO_INTERNAL_REDIRECT = buildConf("kyuubi.batch.info.internal.redirect") + .serverOnly + .doc("When set to true, the batch info is retrieved by forwarding the request to the " + + "Kyuubi instance that submitted this job via internal RPC. If false, the batch info is " + + "constructed directly from the metadata store. It is recommended to set this to false to " + + "reduce the RPC latency for multiple kyuubi instances HA mode.") + .version("1.11.0") + .booleanConf + .createWithDefault(true) + val SERVER_EXEC_POOL_SIZE: ConfigEntry[Int] = buildConf("kyuubi.backend.server.exec.pool.size") .doc("Number of threads in the operation execution thread pool of Kyuubi server") diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index 499110cf6..71ef564da 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -68,6 +68,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { private lazy val resourceFileMaxSize = fe.getConf.get(BATCH_RESOURCE_FILE_MAX_SIZE) private lazy val extraResourceFileMaxSize = fe.getConf.get(BATCH_EXTRA_RESOURCE_FILE_MAX_SIZE) private lazy val metadataSearchWindow = fe.getConf.get(METADATA_SEARCH_WINDOW) + private lazy val batchInfoInternalRedirect = fe.getConf.get(BATCH_INFO_INTERNAL_REDIRECT) private def batchV2Enabled(reqConf: Map[String, String]): Boolean = { fe.getConf.get(BATCH_SUBMITTER_ENABLED) && @@ -355,7 +356,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { val isApplicationTerminated = (StringUtils.isNotBlank(metadata.engineState) && ApplicationState.isTerminated(ApplicationState.withName(metadata.engineState))) - if (batchV2Enabled(metadata.requestConf) || + if (!batchInfoInternalRedirect || + batchV2Enabled(metadata.requestConf) || isOperationTerminated || isApplicationTerminated || metadata.kyuubiInstance == fe.connectionUrl) {