diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 35a072c10..bf547c5ff 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1781,6 +1781,25 @@ object KyuubiConf { .timeConf .createWithDefault(Duration.ofSeconds(20).toMillis) + val BATCH_INTERNAL_REST_CLIENT_REQUEST_MAX_ATTEMPTS: ConfigEntry[Int] = + buildConf("kyuubi.batch.internal.rest.client.request.max.attempts") + .internal + .doc("The internal rest client max attempts number for batch request redirection across" + + " Kyuubi instances.") + .version("1.10.0") + .intConf + .createWithDefault(3) + + val BATCH_INTERNAL_REST_CLIENT_REQUEST_ATTEMPT_WAIT: ConfigEntry[Long] = + buildConf("kyuubi.batch.internal.rest.client.request.attempt.wait") + .internal + .doc( + "The internal rest client wait time between attempts for batch request redirection " + + "across Kyuubi instances.") + .version("1.10.0") + .timeConf + .createWithDefault(Duration.ofSeconds(3).toMillis) + val BATCH_CHECK_INTERVAL: ConfigEntry[Long] = buildConf("kyuubi.batch.check.interval") .internal diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index 3c2f02c2f..aed806714 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -58,6 +58,10 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { fe.getConf.get(BATCH_INTERNAL_REST_CLIENT_SOCKET_TIMEOUT).toInt private lazy val internalConnectTimeout = fe.getConf.get(BATCH_INTERNAL_REST_CLIENT_CONNECT_TIMEOUT).toInt + private lazy val internalRequestMaxAttempts = + fe.getConf.get(BATCH_INTERNAL_REST_CLIENT_REQUEST_MAX_ATTEMPTS) + private lazy val internalRequestAttemptWait = + fe.getConf.get(BATCH_INTERNAL_REST_CLIENT_REQUEST_ATTEMPT_WAIT).toInt private lazy val internalSecurityEnabled = fe.getConf.get(ENGINE_SECURITY_ENABLED) @@ -74,7 +78,9 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { kyuubiInstance, internalSocketTimeout, internalConnectTimeout, - internalSecurityEnabled)) + internalSecurityEnabled, + internalRequestMaxAttempts, + internalRequestAttemptWait)) } private def sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager] diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/InternalRestClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/InternalRestClient.scala index 59d14dacd..011e0dc4c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/InternalRestClient.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/InternalRestClient.scala @@ -32,12 +32,17 @@ import org.apache.kyuubi.service.authentication.InternalSecurityAccessor * @param kyuubiInstance the kyuubi instance host:port. * @param socketTimeout the socket timeout for http client. * @param connectTimeout the connect timeout for http client. + * @param securityEnabled if enable secure access. + * @param requestMaxAttempts the request max attempts for http client. + * @param requestAttemptWait the request attempt wait for http client. */ class InternalRestClient( kyuubiInstance: String, socketTimeout: Int, connectTimeout: Int, - securityEnabled: Boolean) { + securityEnabled: Boolean, + requestMaxAttempts: Int, + requestAttemptWait: Int) { if (securityEnabled) { require( InternalSecurityAccessor.get() != null, @@ -69,6 +74,8 @@ class InternalRestClient( .apiVersion(KyuubiRestClient.ApiVersion.V1) .socketTimeout(socketTimeout) .connectionTimeout(connectTimeout) + .maxAttempts(requestMaxAttempts) + .attemptWaitTime(requestAttemptWait) if (securityEnabled) { builder.authHeaderGenerator(InternalRestClient.internalAuthHeaderGenerator) }