fix #147 User Latest Logout information should be cleaned after backend session is stopped by session cache manager
This commit is contained in:
parent
8979fc5e73
commit
bc48b02666
@ -17,6 +17,8 @@
|
||||
|
||||
package yaooqinn.kyuubi.spark
|
||||
|
||||
import java.text.SimpleDateFormat
|
||||
import java.util.Date
|
||||
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
@ -40,11 +42,11 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam
|
||||
new ThreadFactoryBuilder()
|
||||
.setDaemon(true).setNameFormat(getClass.getSimpleName + "-%d").build())
|
||||
|
||||
private[this] val userToSession = new ConcurrentHashMap[String, (SparkSession, AtomicInteger)]
|
||||
private[this] val userLatestLogout = new ConcurrentHashMap[String, Long]
|
||||
private[this] var idleTimeout: Long = _
|
||||
private val userToSession = new ConcurrentHashMap[String, (SparkSession, AtomicInteger)]
|
||||
private val userLatestLogout = new ConcurrentHashMap[String, Long]
|
||||
private var idleTimeout: Long = _
|
||||
|
||||
private[this] val sessionCleaner = new Runnable {
|
||||
private val sessionCleaner = new Runnable {
|
||||
override def run(): Unit = {
|
||||
userToSession.asScala.foreach {
|
||||
case (user, (session, _)) if session.sparkContext.isStopped =>
|
||||
@ -66,7 +68,12 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam
|
||||
}
|
||||
}
|
||||
|
||||
private[this] def removeSparkSession(user: String): Unit = {
|
||||
private def removeSparkSession(user: String): Unit = {
|
||||
Option(userLatestLogout.remove(user)) match {
|
||||
case Some(t) => info("User [" + user + "] last time logout at " +
|
||||
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(t)))
|
||||
case _ =>
|
||||
}
|
||||
userToSession.remove(user)
|
||||
KyuubiServerMonitor.detachUITab(user)
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user