From 8fc7adee970bb81c4f449c5bf8a1be886a511ab3 Mon Sep 17 00:00:00 2001 From: fwang12 Date: Sun, 19 Feb 2023 01:02:00 +0800 Subject: [PATCH] [KYUUBI #4360][FOLLOWUP] Get valid unlimited users from existing limiters instead of conf ### _Why are the changes needed?_ It is more accurate to get the unlimited users from existing limiters. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request Closes #4363 from turboFei/refresh_unlimit. Closes #4360 a880b413 [fwang12] refactor 6da9f9bd [fwang12] get Authored-by: fwang12 Signed-off-by: Cheng Pan --- .../scala/org/apache/kyuubi/server/KyuubiServer.scala | 10 ++++------ .../apache/kyuubi/session/KyuubiSessionManager.scala | 11 ++++++++--- .../org/apache/kyuubi/session/SessionLimiter.scala | 6 +++++- 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala index df163bd1e..a27e18bbf 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala @@ -131,12 +131,10 @@ object KyuubiServer extends Logging { } private[kyuubi] def refreshUnlimitedUsers(): Unit = synchronized { - val existingUnlimitedUsers = - kyuubiServer.conf.get(KyuubiConf.SERVER_LIMIT_CONNECTIONS_USER_UNLIMITED_LIST).toSet - val refreshedUnlimitedUsers = KyuubiConf().loadFileDefaults().get( - KyuubiConf.SERVER_LIMIT_CONNECTIONS_USER_UNLIMITED_LIST).toSet - kyuubiServer.backendService.sessionManager.asInstanceOf[KyuubiSessionManager] - .refreshUnlimitedUsers(refreshedUnlimitedUsers) + val sessionMgr = kyuubiServer.backendService.sessionManager.asInstanceOf[KyuubiSessionManager] + val existingUnlimitedUsers = sessionMgr.getUnlimitedUsers() + sessionMgr.refreshUnlimitedUsers(KyuubiConf().loadFileDefaults()) + val refreshedUnlimitedUsers = sessionMgr.getUnlimitedUsers() info(s"Refreshed unlimited users from $existingUnlimitedUsers to $refreshedUnlimitedUsers") } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index 038c02564..73248cd56 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -300,9 +300,14 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { userUnlimitedList) } - private[kyuubi] def refreshUnlimitedUsers(userUnlimitedList: Set[String]): Unit = { - limiter.foreach(SessionLimiter.resetUnlimitedUsers(_, userUnlimitedList)) - batchLimiter.foreach(SessionLimiter.resetUnlimitedUsers(_, userUnlimitedList)) + private[kyuubi] def getUnlimitedUsers(): Set[String] = { + limiter.orElse(batchLimiter).map(SessionLimiter.getUnlimitedUsers).getOrElse(Set.empty) + } + + private[kyuubi] def refreshUnlimitedUsers(conf: KyuubiConf): Unit = { + val unlimitedUsers = conf.get(SERVER_LIMIT_CONNECTIONS_USER_UNLIMITED_LIST).toSet + limiter.foreach(SessionLimiter.resetUnlimitedUsers(_, unlimitedUsers)) + batchLimiter.foreach(SessionLimiter.resetUnlimitedUsers(_, unlimitedUsers)) } private def applySessionLimiter( diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala index 6cf739c39..96ca36df1 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala @@ -138,10 +138,14 @@ object SessionLimiter { unlimitedUsers) } - def resetUnlimitedUsers(limiter: SessionLimiter, unlimitedUsers: Set[String]): Unit = { + def resetUnlimitedUsers(limiter: SessionLimiter, unlimitedUsers: Set[String]): Unit = limiter match { case l: SessionLimiterWithUnlimitedUsersImpl => l.setUnlimitedUsers(unlimitedUsers) case _ => } + + def getUnlimitedUsers(limiter: SessionLimiter): Set[String] = limiter match { + case l: SessionLimiterWithUnlimitedUsersImpl => l.unlimitedUsers + case _ => Set.empty } }