[KYUUBI #7043] Support to construct the batch info from metadata directly

### Why are the changes needed?

Add an option to allow construct the batch info from metadata directly instead of redirecting the requests to reduce the RPC latency.

### How was this patch tested?

Minor change and Existing GA.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #7043 from turboFei/support_no_redirect.

Closes #7043

7f7a2fb80 [Wang, Fei] comments
bb0e324a1 [Wang, Fei] save

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Wang, Fei 2025-04-24 22:42:26 -07:00
parent 75891d1a92
commit 29b6076319
3 changed files with 14 additions and 1 deletions

View File

@ -83,6 +83,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.batch.application.starvation.timeout | PT3M | Threshold above which to warn batch application may be starved. | duration | 1.7.0 |
| kyuubi.batch.conf.ignore.list || A comma-separated list of ignored keys for batch conf. If the batch conf contains any of them, the key and the corresponding value will be removed silently during batch job submission. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering. You can also pre-define some config for batch job submission with the prefix: kyuubi.batchConf.[batchType]. For example, you can pre-define `spark.master` for the Spark batch job with key `kyuubi.batchConf.spark.spark.master`. | set | 1.6.0 |
| kyuubi.batch.extra.resource.file.max.size | 0 | The maximum size in bytes of each uploaded extra resource file when creating batch. 0 or negative value means no limit. | long | 1.10.0 |
| kyuubi.batch.info.internal.redirect | true | When set to true, the batch info is retrieved by forwarding the request to the Kyuubi instance that submitted this job via internal RPC. If false, the batch info is constructed directly from the metadata store. It is recommended to set this to false to reduce the RPC latency for multiple kyuubi instances HA mode. | boolean | 1.11.0 |
| kyuubi.batch.resource.file.max.size | 0 | The maximum size in bytes of the uploaded resource file when creating batch. 0 or negative value means no limit. | long | 1.10.0 |
| kyuubi.batch.session.idle.timeout | PT6H | Batch session idle timeout, it will be closed when it's not accessed for this duration | duration | 1.6.2 |

View File

@ -1951,6 +1951,16 @@ object KyuubiConf {
.stringConf
.createWithDefault("1")
val BATCH_INFO_INTERNAL_REDIRECT = buildConf("kyuubi.batch.info.internal.redirect")
.serverOnly
.doc("When set to true, the batch info is retrieved by forwarding the request to the " +
"Kyuubi instance that submitted this job via internal RPC. If false, the batch info is " +
"constructed directly from the metadata store. It is recommended to set this to false to " +
"reduce the RPC latency for multiple kyuubi instances HA mode.")
.version("1.11.0")
.booleanConf
.createWithDefault(true)
val SERVER_EXEC_POOL_SIZE: ConfigEntry[Int] =
buildConf("kyuubi.backend.server.exec.pool.size")
.doc("Number of threads in the operation execution thread pool of Kyuubi server")

View File

@ -68,6 +68,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
private lazy val resourceFileMaxSize = fe.getConf.get(BATCH_RESOURCE_FILE_MAX_SIZE)
private lazy val extraResourceFileMaxSize = fe.getConf.get(BATCH_EXTRA_RESOURCE_FILE_MAX_SIZE)
private lazy val metadataSearchWindow = fe.getConf.get(METADATA_SEARCH_WINDOW)
private lazy val batchInfoInternalRedirect = fe.getConf.get(BATCH_INFO_INTERNAL_REDIRECT)
private def batchV2Enabled(reqConf: Map[String, String]): Boolean = {
fe.getConf.get(BATCH_SUBMITTER_ENABLED) &&
@ -355,7 +356,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
val isApplicationTerminated = (StringUtils.isNotBlank(metadata.engineState)
&& ApplicationState.isTerminated(ApplicationState.withName(metadata.engineState)))
if (batchV2Enabled(metadata.requestConf) ||
if (!batchInfoInternalRedirect ||
batchV2Enabled(metadata.requestConf) ||
isOperationTerminated ||
isApplicationTerminated ||
metadata.kyuubiInstance == fe.connectionUrl) {