diff --git a/docs/client/rest/rest_api.md b/docs/client/rest/rest_api.md index fbff59f05..bf4917e88 100644 --- a/docs/client/rest/rest_api.md +++ b/docs/client/rest/rest_api.md @@ -451,6 +451,12 @@ Refresh the Hadoop configurations of the Kyuubi server. Refresh the [user defaults configs](../../deployment/settings.html#user-defaults) with key in format in the form of `___{username}___.{config key}` from default property file. +### POST /admin/refresh/kubernetes_conf + +Refresh the kubernetes configs with key prefixed with `kyuubi.kubernetes` from default property file. + +It is helpful if you need to support multiple kubernetes contexts and namespaces, see [KYUUBI #4843](https://github.com/apache/kyuubi/issues/4843). + ### DELETE /admin/engine Delete the specified engine. diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/refresh/RefreshConfigCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/refresh/RefreshConfigCommand.scala index 69aa0c3d0..571e7eef3 100644 --- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/refresh/RefreshConfigCommand.scala +++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/refresh/RefreshConfigCommand.scala @@ -21,7 +21,7 @@ import org.apache.kyuubi.KyuubiException import org.apache.kyuubi.client.AdminRestApi import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient import org.apache.kyuubi.ctl.cmd.AdminCtlCommand -import org.apache.kyuubi.ctl.cmd.refresh.RefreshConfigCommandConfigType.{HADOOP_CONF, UNLIMITED_USERS, USER_DEFAULTS_CONF} +import org.apache.kyuubi.ctl.cmd.refresh.RefreshConfigCommandConfigType.{HADOOP_CONF, KUBERNETES_CONF, UNLIMITED_USERS, USER_DEFAULTS_CONF} import org.apache.kyuubi.ctl.opt.CliConfig import org.apache.kyuubi.ctl.util.{Tabulator, Validator} @@ -36,6 +36,7 @@ class RefreshConfigCommand(cliConfig: CliConfig) extends AdminCtlCommand[String] normalizedCliConfig.adminConfigOpts.configType match { case HADOOP_CONF => adminRestApi.refreshHadoopConf() case USER_DEFAULTS_CONF => adminRestApi.refreshUserDefaultsConf() + case KUBERNETES_CONF => adminRestApi.refreshKubernetesConf() case UNLIMITED_USERS => adminRestApi.refreshUnlimitedUsers() case configType => throw new KyuubiException(s"Invalid config type:$configType") } @@ -49,5 +50,6 @@ class RefreshConfigCommand(cliConfig: CliConfig) extends AdminCtlCommand[String] object RefreshConfigCommandConfigType { final val HADOOP_CONF = "hadoopConf" final val USER_DEFAULTS_CONF = "userDefaultsConf" + final val KUBERNETES_CONF = "kubernetesConf" final val UNLIMITED_USERS = "unlimitedUsers" } diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/AdminCommandLine.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/AdminCommandLine.scala index 71e4068e5..588f3ea37 100644 --- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/AdminCommandLine.scala +++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/AdminCommandLine.scala @@ -108,6 +108,6 @@ object AdminCommandLine extends CommonCommandLine { .optional() .action((v, c) => c.copy(adminConfigOpts = c.adminConfigOpts.copy(configType = v))) .text("The valid config type can be one of the following: " + - s"$HADOOP_CONF, $USER_DEFAULTS_CONF, $UNLIMITED_USERS.")) + s"$HADOOP_CONF, $USER_DEFAULTS_CONF, $KUBERNETES_CONF, $UNLIMITED_USERS.")) } } diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/AdminControlCliArgumentsSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/AdminControlCliArgumentsSuite.scala index fdadd011b..72b6fd3e8 100644 --- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/AdminControlCliArgumentsSuite.scala +++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/AdminControlCliArgumentsSuite.scala @@ -83,6 +83,15 @@ class AdminControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExi assert(opArgs3.cliConfig.resource === ControlObject.CONFIG) assert(opArgs3.cliConfig.adminConfigOpts.configType === UNLIMITED_USERS) + args = Array( + "refresh", + "config", + "kubernetesConf") + val opArgs4 = new AdminControlCliArguments(args) + assert(opArgs4.cliConfig.action === ControlAction.REFRESH) + assert(opArgs4.cliConfig.resource === ControlObject.CONFIG) + assert(opArgs4.cliConfig.adminConfigOpts.configType === KUBERNETES_CONF) + args = Array( "refresh", "config", @@ -165,7 +174,7 @@ class AdminControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExi | Refresh the resource. |Command: refresh config [] | Refresh the config with specified type. - | The valid config type can be one of the following: $HADOOP_CONF, $USER_DEFAULTS_CONF, $UNLIMITED_USERS. + | The valid config type can be one of the following: $HADOOP_CONF, $USER_DEFAULTS_CONF, $KUBERNETES_CONF, $UNLIMITED_USERS. | | -h, --help Show help message and exit.""".stripMargin // scalastyle:on diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java index a983827c8..8287e7368 100644 --- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java @@ -47,6 +47,11 @@ public class AdminRestApi { return this.getClient().post(path, null, client.getAuthHeader()); } + public String refreshKubernetesConf() { + String path = String.format("%s/%s", API_BASE_PATH, "refresh/kubernetes_conf"); + return this.getClient().post(path, null, client.getAuthHeader()); + } + public String refreshUnlimitedUsers() { String path = String.format("%s/%s", API_BASE_PATH, "refresh/unlimited_users"); return this.getClient().post(path, null, client.getAuthHeader()); diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index d6dfba2fe..984273051 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -40,12 +40,14 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { private val enginePodInformers: ConcurrentHashMap[KubernetesInfo, SharedIndexInformer[Pod]] = new ConcurrentHashMap[KubernetesInfo, SharedIndexInformer[Pod]] - private var allowedContexts: Seq[String] = Seq.empty - private var allowedNamespaces: Seq[String] = Seq.empty - private var submitTimeout: Long = _ private var kyuubiConf: KyuubiConf = _ + private def allowedContexts: Seq[String] = + kyuubiConf.get(KyuubiConf.KUBERNETES_CONTEXT_ALLOW_LIST) + private def allowedNamespaces: Seq[String] = + kyuubiConf.get(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST) + // key is kyuubi_unique_key private val appInfoStore: ConcurrentHashMap[String, ApplicationInfo] = new ConcurrentHashMap[String, ApplicationInfo] @@ -90,8 +92,6 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { kyuubiConf = conf info("Start initializing Kubernetes application operation.") submitTimeout = conf.get(KyuubiConf.ENGINE_KUBERNETES_SUBMIT_TIMEOUT) - allowedContexts = conf.get(KyuubiConf.KUBERNETES_CONTEXT_ALLOW_LIST) - allowedNamespaces = conf.get(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST) // Defer cleaning terminated application information val retainPeriod = conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD) cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder() diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala index 491b11e90..12120ea55 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.kyuubi._ import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_PROTOCOLS, FrontendProtocols} +import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_PROTOCOLS, FrontendProtocols, KYUUBI_KUBERNETES_CONF_PREFIX} import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols._ import org.apache.kyuubi.events.{EventBus, KyuubiServerInfoEvent, ServerEventHandlerRegister} import org.apache.kyuubi.ha.HighAvailabilityConf._ @@ -111,14 +111,29 @@ object KyuubiServer extends Logging { private[kyuubi] def refreshUserDefaultsConf(): Unit = kyuubiServer.conf.synchronized { val existedUserDefaults = kyuubiServer.conf.getAllUserDefaults val refreshedUserDefaults = KyuubiConf().loadFileDefaults().getAllUserDefaults + refreshConfig("user defaults", existedUserDefaults, refreshedUserDefaults) + } + + private[kyuubi] def refreshKubernetesConf(): Unit = kyuubiServer.conf.synchronized { + val existedKubernetesConf = + kyuubiServer.conf.getAll.filter(_._1.startsWith(KYUUBI_KUBERNETES_CONF_PREFIX)) + val refreshedKubernetesConf = + KyuubiConf().loadFileDefaults().getAll.filter(_._1.startsWith(KYUUBI_KUBERNETES_CONF_PREFIX)) + refreshConfig("kubernetes", existedKubernetesConf, refreshedKubernetesConf) + } + + private def refreshConfig( + configDomain: String, + existing: Map[String, String], + refreshed: Map[String, String]): Unit = { var (unsetCount, updatedCount, addedCount) = (0, 0, 0) - for ((k, _) <- existedUserDefaults if !refreshedUserDefaults.contains(k)) { + for ((k, _) <- existing if !refreshed.contains(k)) { kyuubiServer.conf.unset(k) unsetCount = unsetCount + 1 } - for ((k, v) <- refreshedUserDefaults) { - if (existedUserDefaults.contains(k)) { - if (!StringUtils.equals(existedUserDefaults.get(k).orNull, v)) { + for ((k, v) <- refreshed) { + if (existing.contains(k)) { + if (!StringUtils.equals(existing.get(k).orNull, v)) { updatedCount = updatedCount + 1 } } else { @@ -126,7 +141,7 @@ object KyuubiServer extends Logging { } kyuubiServer.conf.set(k, v) } - info(s"Refreshed user defaults configs with changes of " + + info(s"Refreshed $configDomain configs with changes of " + s"unset: $unsetCount, updated: $updatedCount, added: $addedCount") } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala index 735efa71b..a0e136e5e 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala @@ -87,6 +87,25 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { Response.ok(s"Refresh the user defaults conf successfully.").build() } + @ApiResponse( + responseCode = "200", + content = Array(new Content(mediaType = MediaType.APPLICATION_JSON)), + description = "refresh the kubernetes configs") + @POST + @Path("refresh/kubernetes_conf") + def refreshKubernetesConf(): Response = { + val userName = fe.getSessionUser(Map.empty[String, String]) + val ipAddress = fe.getIpAddress + info(s"Receive refresh kubernetes conf request from $userName/$ipAddress") + if (!isAdministrator(userName)) { + throw new NotAllowedException( + s"$userName is not allowed to refresh the kubernetes conf") + } + info(s"Reloading kubernetes conf") + KyuubiServer.refreshKubernetesConf() + Response.ok(s"Refresh the kubernetes conf successfully.").build() + } + @ApiResponse( responseCode = "200", content = Array(new Content(mediaType = MediaType.APPLICATION_JSON)),