[KYUUBI #4996] Support to refresh kubernetes configs dynamically
### _Why are the changes needed?_ This is a followup of #4843 To support load kubernetes conf during runtime, so that we can support more kuberntes contexts without restarting the kyuubi server. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request Closes #4996 from turboFei/refresh_kubernetes_conf. Closes #4996 807fb92e4 [fwang12] comments d42d25af7 [fwang12] from conf 809a7d3df [fwang12] refresh 22743f9e5 [fwang12] save dedebbe71 [fwang12] api d6f58cfc7 [fwang12] refresh kubernetes config Authored-by: fwang12 <fwang12@ebay.com> Signed-off-by: fwang12 <fwang12@ebay.com>
This commit is contained in:
parent
c2e27304fe
commit
1dd9db7492
@ -451,6 +451,12 @@ Refresh the Hadoop configurations of the Kyuubi server.
|
||||
|
||||
Refresh the [user defaults configs](../../deployment/settings.html#user-defaults) with key in format in the form of `___{username}___.{config key}` from default property file.
|
||||
|
||||
### POST /admin/refresh/kubernetes_conf
|
||||
|
||||
Refresh the kubernetes configs with key prefixed with `kyuubi.kubernetes` from default property file.
|
||||
|
||||
It is helpful if you need to support multiple kubernetes contexts and namespaces, see [KYUUBI #4843](https://github.com/apache/kyuubi/issues/4843).
|
||||
|
||||
### DELETE /admin/engine
|
||||
|
||||
Delete the specified engine.
|
||||
|
||||
@ -21,7 +21,7 @@ import org.apache.kyuubi.KyuubiException
|
||||
import org.apache.kyuubi.client.AdminRestApi
|
||||
import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
|
||||
import org.apache.kyuubi.ctl.cmd.AdminCtlCommand
|
||||
import org.apache.kyuubi.ctl.cmd.refresh.RefreshConfigCommandConfigType.{HADOOP_CONF, UNLIMITED_USERS, USER_DEFAULTS_CONF}
|
||||
import org.apache.kyuubi.ctl.cmd.refresh.RefreshConfigCommandConfigType.{HADOOP_CONF, KUBERNETES_CONF, UNLIMITED_USERS, USER_DEFAULTS_CONF}
|
||||
import org.apache.kyuubi.ctl.opt.CliConfig
|
||||
import org.apache.kyuubi.ctl.util.{Tabulator, Validator}
|
||||
|
||||
@ -36,6 +36,7 @@ class RefreshConfigCommand(cliConfig: CliConfig) extends AdminCtlCommand[String]
|
||||
normalizedCliConfig.adminConfigOpts.configType match {
|
||||
case HADOOP_CONF => adminRestApi.refreshHadoopConf()
|
||||
case USER_DEFAULTS_CONF => adminRestApi.refreshUserDefaultsConf()
|
||||
case KUBERNETES_CONF => adminRestApi.refreshKubernetesConf()
|
||||
case UNLIMITED_USERS => adminRestApi.refreshUnlimitedUsers()
|
||||
case configType => throw new KyuubiException(s"Invalid config type:$configType")
|
||||
}
|
||||
@ -49,5 +50,6 @@ class RefreshConfigCommand(cliConfig: CliConfig) extends AdminCtlCommand[String]
|
||||
object RefreshConfigCommandConfigType {
|
||||
final val HADOOP_CONF = "hadoopConf"
|
||||
final val USER_DEFAULTS_CONF = "userDefaultsConf"
|
||||
final val KUBERNETES_CONF = "kubernetesConf"
|
||||
final val UNLIMITED_USERS = "unlimitedUsers"
|
||||
}
|
||||
|
||||
@ -108,6 +108,6 @@ object AdminCommandLine extends CommonCommandLine {
|
||||
.optional()
|
||||
.action((v, c) => c.copy(adminConfigOpts = c.adminConfigOpts.copy(configType = v)))
|
||||
.text("The valid config type can be one of the following: " +
|
||||
s"$HADOOP_CONF, $USER_DEFAULTS_CONF, $UNLIMITED_USERS."))
|
||||
s"$HADOOP_CONF, $USER_DEFAULTS_CONF, $KUBERNETES_CONF, $UNLIMITED_USERS."))
|
||||
}
|
||||
}
|
||||
|
||||
@ -83,6 +83,15 @@ class AdminControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExi
|
||||
assert(opArgs3.cliConfig.resource === ControlObject.CONFIG)
|
||||
assert(opArgs3.cliConfig.adminConfigOpts.configType === UNLIMITED_USERS)
|
||||
|
||||
args = Array(
|
||||
"refresh",
|
||||
"config",
|
||||
"kubernetesConf")
|
||||
val opArgs4 = new AdminControlCliArguments(args)
|
||||
assert(opArgs4.cliConfig.action === ControlAction.REFRESH)
|
||||
assert(opArgs4.cliConfig.resource === ControlObject.CONFIG)
|
||||
assert(opArgs4.cliConfig.adminConfigOpts.configType === KUBERNETES_CONF)
|
||||
|
||||
args = Array(
|
||||
"refresh",
|
||||
"config",
|
||||
@ -165,7 +174,7 @@ class AdminControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExi
|
||||
| Refresh the resource.
|
||||
|Command: refresh config [<configType>]
|
||||
| Refresh the config with specified type.
|
||||
| <configType> The valid config type can be one of the following: $HADOOP_CONF, $USER_DEFAULTS_CONF, $UNLIMITED_USERS.
|
||||
| <configType> The valid config type can be one of the following: $HADOOP_CONF, $USER_DEFAULTS_CONF, $KUBERNETES_CONF, $UNLIMITED_USERS.
|
||||
|
|
||||
| -h, --help Show help message and exit.""".stripMargin
|
||||
// scalastyle:on
|
||||
|
||||
@ -47,6 +47,11 @@ public class AdminRestApi {
|
||||
return this.getClient().post(path, null, client.getAuthHeader());
|
||||
}
|
||||
|
||||
public String refreshKubernetesConf() {
|
||||
String path = String.format("%s/%s", API_BASE_PATH, "refresh/kubernetes_conf");
|
||||
return this.getClient().post(path, null, client.getAuthHeader());
|
||||
}
|
||||
|
||||
public String refreshUnlimitedUsers() {
|
||||
String path = String.format("%s/%s", API_BASE_PATH, "refresh/unlimited_users");
|
||||
return this.getClient().post(path, null, client.getAuthHeader());
|
||||
|
||||
@ -40,12 +40,14 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
|
||||
private val enginePodInformers: ConcurrentHashMap[KubernetesInfo, SharedIndexInformer[Pod]] =
|
||||
new ConcurrentHashMap[KubernetesInfo, SharedIndexInformer[Pod]]
|
||||
|
||||
private var allowedContexts: Seq[String] = Seq.empty
|
||||
private var allowedNamespaces: Seq[String] = Seq.empty
|
||||
|
||||
private var submitTimeout: Long = _
|
||||
private var kyuubiConf: KyuubiConf = _
|
||||
|
||||
private def allowedContexts: Seq[String] =
|
||||
kyuubiConf.get(KyuubiConf.KUBERNETES_CONTEXT_ALLOW_LIST)
|
||||
private def allowedNamespaces: Seq[String] =
|
||||
kyuubiConf.get(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST)
|
||||
|
||||
// key is kyuubi_unique_key
|
||||
private val appInfoStore: ConcurrentHashMap[String, ApplicationInfo] =
|
||||
new ConcurrentHashMap[String, ApplicationInfo]
|
||||
@ -90,8 +92,6 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
|
||||
kyuubiConf = conf
|
||||
info("Start initializing Kubernetes application operation.")
|
||||
submitTimeout = conf.get(KyuubiConf.ENGINE_KUBERNETES_SUBMIT_TIMEOUT)
|
||||
allowedContexts = conf.get(KyuubiConf.KUBERNETES_CONTEXT_ALLOW_LIST)
|
||||
allowedNamespaces = conf.get(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST)
|
||||
// Defer cleaning terminated application information
|
||||
val retainPeriod = conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD)
|
||||
cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder()
|
||||
|
||||
@ -25,7 +25,7 @@ import org.apache.hadoop.security.UserGroupInformation
|
||||
|
||||
import org.apache.kyuubi._
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_PROTOCOLS, FrontendProtocols}
|
||||
import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_PROTOCOLS, FrontendProtocols, KYUUBI_KUBERNETES_CONF_PREFIX}
|
||||
import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols._
|
||||
import org.apache.kyuubi.events.{EventBus, KyuubiServerInfoEvent, ServerEventHandlerRegister}
|
||||
import org.apache.kyuubi.ha.HighAvailabilityConf._
|
||||
@ -111,14 +111,29 @@ object KyuubiServer extends Logging {
|
||||
private[kyuubi] def refreshUserDefaultsConf(): Unit = kyuubiServer.conf.synchronized {
|
||||
val existedUserDefaults = kyuubiServer.conf.getAllUserDefaults
|
||||
val refreshedUserDefaults = KyuubiConf().loadFileDefaults().getAllUserDefaults
|
||||
refreshConfig("user defaults", existedUserDefaults, refreshedUserDefaults)
|
||||
}
|
||||
|
||||
private[kyuubi] def refreshKubernetesConf(): Unit = kyuubiServer.conf.synchronized {
|
||||
val existedKubernetesConf =
|
||||
kyuubiServer.conf.getAll.filter(_._1.startsWith(KYUUBI_KUBERNETES_CONF_PREFIX))
|
||||
val refreshedKubernetesConf =
|
||||
KyuubiConf().loadFileDefaults().getAll.filter(_._1.startsWith(KYUUBI_KUBERNETES_CONF_PREFIX))
|
||||
refreshConfig("kubernetes", existedKubernetesConf, refreshedKubernetesConf)
|
||||
}
|
||||
|
||||
private def refreshConfig(
|
||||
configDomain: String,
|
||||
existing: Map[String, String],
|
||||
refreshed: Map[String, String]): Unit = {
|
||||
var (unsetCount, updatedCount, addedCount) = (0, 0, 0)
|
||||
for ((k, _) <- existedUserDefaults if !refreshedUserDefaults.contains(k)) {
|
||||
for ((k, _) <- existing if !refreshed.contains(k)) {
|
||||
kyuubiServer.conf.unset(k)
|
||||
unsetCount = unsetCount + 1
|
||||
}
|
||||
for ((k, v) <- refreshedUserDefaults) {
|
||||
if (existedUserDefaults.contains(k)) {
|
||||
if (!StringUtils.equals(existedUserDefaults.get(k).orNull, v)) {
|
||||
for ((k, v) <- refreshed) {
|
||||
if (existing.contains(k)) {
|
||||
if (!StringUtils.equals(existing.get(k).orNull, v)) {
|
||||
updatedCount = updatedCount + 1
|
||||
}
|
||||
} else {
|
||||
@ -126,7 +141,7 @@ object KyuubiServer extends Logging {
|
||||
}
|
||||
kyuubiServer.conf.set(k, v)
|
||||
}
|
||||
info(s"Refreshed user defaults configs with changes of " +
|
||||
info(s"Refreshed $configDomain configs with changes of " +
|
||||
s"unset: $unsetCount, updated: $updatedCount, added: $addedCount")
|
||||
}
|
||||
|
||||
|
||||
@ -87,6 +87,25 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
|
||||
Response.ok(s"Refresh the user defaults conf successfully.").build()
|
||||
}
|
||||
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
content = Array(new Content(mediaType = MediaType.APPLICATION_JSON)),
|
||||
description = "refresh the kubernetes configs")
|
||||
@POST
|
||||
@Path("refresh/kubernetes_conf")
|
||||
def refreshKubernetesConf(): Response = {
|
||||
val userName = fe.getSessionUser(Map.empty[String, String])
|
||||
val ipAddress = fe.getIpAddress
|
||||
info(s"Receive refresh kubernetes conf request from $userName/$ipAddress")
|
||||
if (!isAdministrator(userName)) {
|
||||
throw new NotAllowedException(
|
||||
s"$userName is not allowed to refresh the kubernetes conf")
|
||||
}
|
||||
info(s"Reloading kubernetes conf")
|
||||
KyuubiServer.refreshKubernetesConf()
|
||||
Response.ok(s"Refresh the kubernetes conf successfully.").build()
|
||||
}
|
||||
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
content = Array(new Content(mediaType = MediaType.APPLICATION_JSON)),
|
||||
|
||||
Loading…
Reference in New Issue
Block a user