diff --git a/src/main/scala/org/apache/spark/KyuubiConf.scala b/src/main/scala/org/apache/spark/KyuubiConf.scala index fb4dfb9b1..8ca173c47 100644 --- a/src/main/scala/org/apache/spark/KyuubiConf.scala +++ b/src/main/scala/org/apache/spark/KyuubiConf.scala @@ -25,6 +25,10 @@ import scala.collection.JavaConverters._ import org.apache.spark.internal.config.{ConfigBuilder, ConfigEntry} +/** + * Kyuubi server level configuration which will be set when at the very beginning of server start. + * + */ object KyuubiConf { private[this] val kyuubiConfEntries = new HashMap[String, ConfigEntry[_]]() @@ -43,51 +47,52 @@ object KyuubiConf { // High Availability by ZooKeeper // ///////////////////////////////////////////////////////////////////////////////////////////////// - val SUPPORT_DYNAMIC_SERVICE_DISCOVERY = - KyuubiConfigBuilder("spark.kyuubi.support.dynamic.service.discovery") - .doc("Whether KyuubiServer supports dynamic service discovery for its clients." + - " To support this, each instance of KyuubiServer currently uses ZooKeeper to" + - " register itself, when it is brought up. JDBC/ODBC clients should use the " + - "ZooKeeper ensemble: spark.kyuubi.zookeeper.quorum in their connection string.") + val HA_ENABLED: ConfigEntry[Boolean] = + KyuubiConfigBuilder("spark.kyuubi.ha.enabled") + .doc("Whether KyuubiServer supports dynamic service discovery for its clients." + + " To support this, each instance of KyuubiServer currently uses ZooKeeper to" + + " register itself, when it is brought up. 
JDBC/ODBC clients should use the " + + "ZooKeeper ensemble: spark.kyuubi.ha.zk.quorum in their connection string.") .booleanConf .createWithDefault(false) - val KYUUBI_ZOOKEEPER_QUORUM = KyuubiConfigBuilder("spark.kyuubi.zookeeper.quorum") - .doc("Comma separated list of ZooKeeper servers to talk to, when KyuubiServer supports" + + val HA_ZOOKEEPER_QUORUM: ConfigEntry[String] = + KyuubiConfigBuilder("spark.kyuubi.ha.zk.quorum") + .doc("Comma separated list of ZooKeeper servers to talk to, when KyuubiServer supports" + " service discovery via Zookeeper.") - .stringConf - .createWithDefault("") + .stringConf + .createWithDefault("") - val KYUUBI_ZOOKEEPER_NAMESPACE = - KyuubiConfigBuilder("spark.kyuubi.zookeeper.namespace") + val HA_ZOOKEEPER_NAMESPACE: ConfigEntry[String] = + KyuubiConfigBuilder("spark.kyuubi.ha.zk.namespace") .doc("The parent node in ZooKeeper used by KyuubiServer when supporting dynamic service" + " discovery.") .stringConf .createWithDefault("kyuubiserver") - val KYUUBI_ZOOKEEPER_CLIENT_PORT = - KyuubiConfigBuilder("spark.kyuubi.zookeeper.client.port") + val HA_ZOOKEEPER_CLIENT_PORT: ConfigEntry[String] = + KyuubiConfigBuilder("spark.kyuubi.ha.zk.client.port") .doc("The port of ZooKeeper servers to talk to. If the list of Zookeeper servers specified" + " in spark.kyuubi.zookeeper.quorum does not contain port numbers, this value is used") .stringConf .createWithDefault("2181") - val KYUUBI_ZOOKEEPER_SESSION_TIMEOUT = - KyuubiConfigBuilder("spark.kyuubi.zookeeper.session.timeout") - .doc("ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and" + + val HA_ZOOKEEPER_SESSION_TIMEOUT: ConfigEntry[Long] = + KyuubiConfigBuilder("spark.kyuubi.ha.zk.session.timeout") + .doc("ZooKeeper client's session timeout (in milliseconds). 
The client is disconnected, and" + " as a result, all locks released, if a heartbeat is not sent in the timeout.") - .timeConf(TimeUnit.MILLISECONDS) - .createWithDefault(TimeUnit.MINUTES.toMillis(20L)) + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(TimeUnit.MINUTES.toMillis(20L)) - val KYUUBI_ZOOKEEPER_CONNECTION_BASESLEEPTIME = - KyuubiConfigBuilder("spark.kyuubi.zookeeper.connection.basesleeptime") + val HA_ZOOKEEPER_CONNECTION_BASESLEEPTIME: ConfigEntry[Long] = + KyuubiConfigBuilder("spark.kyuubi.ha.zk.connection.basesleeptime") .doc("Initial amount of time (in milliseconds) to wait between retries when connecting to" + " the ZooKeeper server when using ExponentialBackoffRetry policy.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(TimeUnit.SECONDS.toMillis(1L)) - val KYUUBI_ZOOKEEPER_CONNECTION_MAX_RETRIES = - KyuubiConfigBuilder("spark.kyuubi.zookeeper.connection.max.retries") + val HA_ZOOKEEPER_CONNECTION_MAX_RETRIES: ConfigEntry[Int] = + KyuubiConfigBuilder("spark.kyuubi.ha.zk.connection.max.retries") .doc("max retry time connecting to the zk server") .intConf .createWithDefault(3) @@ -96,46 +101,47 @@ object KyuubiConf { // Operation Log // ///////////////////////////////////////////////////////////////////////////////////////////////// - val KYUUBI_LOGGING_OPERATION_ENABLED = + val LOGGING_OPERATION_ENABLED: ConfigEntry[Boolean] = KyuubiConfigBuilder("spark.kyuubi.logging.operation.enabled") .doc("When true, KyuubiServer will save operation logs and make them available for clients") .booleanConf .createWithDefault(true) - val KYUUBI_LOGGING_OPERATION_LOG_LOCATION = - KyuubiConfigBuilder("spark.kyuubi.logging.operation.log.location") + val LOGGING_OPERATION_LOG_DIR: ConfigEntry[String] = + KyuubiConfigBuilder("spark.kyuubi.logging.operation.log.dir") .doc("Top level directory where operation logs are stored if logging functionality is" + " enabled") .stringConf .createWithDefault( - s"${sys.env.getOrElse("SPARK_LOG_DIR", 
System.getProperty("java.io.tmpdir"))}" + s"${sys.env.getOrElse("SPARK_LOG_DIR", + sys.env.getOrElse("SPARK_HOME", System.getProperty("java.io.tmpdir")))}" + File.separator + "operation_logs") ///////////////////////////////////////////////////////////////////////////////////////////////// // Background Execution Thread Pool // ///////////////////////////////////////////////////////////////////////////////////////////////// - val KYUUBI_ASYNC_EXEC_THREADS = + val ASYNC_EXEC_THREADS: ConfigEntry[Int] = KyuubiConfigBuilder("spark.kyuubi.async.exec.threads") .doc("Number of threads in the async thread pool for KyuubiServer") .intConf .createWithDefault(100) - val KYUUBI_ASYNC_EXEC_WAIT_QUEUE_SIZE = + val ASYNC_EXEC_WAIT_QUEUE_SIZE: ConfigEntry[Int] = KyuubiConfigBuilder("spark.kyuubi.async.exec.wait.queue.size") .doc("Size of the wait queue for async thread pool in KyuubiServer. After hitting this" + " limit, the async thread pool will reject new requests.") .intConf .createWithDefault(100) - val KYUUBI_EXEC_KEEPALIVE_TIME = + val EXEC_KEEPALIVE_TIME: ConfigEntry[Long] = KyuubiConfigBuilder("spark.kyuubi.async.exec.keep.alive.time") .doc("Time that an idle KyuubiServer async thread (from the thread pool) will wait for" + " a new task to arrive before terminating") .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(TimeUnit.SECONDS.toMillis(10L)) - val KYUUBI_ASYNC_EXEC_SHUTDOWN_TIMEOUT = + val ASYNC_EXEC_SHUTDOWN_TIMEOUT: ConfigEntry[Long] = KyuubiConfigBuilder("spark.kyuubi.async.exec.shutdown.timeout") .doc("How long KyuubiServer shutdown will wait for async threads to terminate.") .timeConf(TimeUnit.MILLISECONDS) @@ -145,23 +151,23 @@ object KyuubiConf { // Session Idle Check // ///////////////////////////////////////////////////////////////////////////////////////////////// - val KYUUBI_SESSION_CHECK_INTERVAL = - KyuubiConfigBuilder("spark.kyuubi.session.check.interval") - .doc("The check interval for session/operation timeout, which can be disabled by setting" + 
- " to zero or negative value.") + val FRONTEND_SESSION_CHECK_INTERVAL: ConfigEntry[Long] = + KyuubiConfigBuilder("spark.kyuubi.frontend.session.check.interval") + .doc("The check interval for frontend session/operation timeout, which can be disabled by" + + " setting to zero or negative value.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(TimeUnit.HOURS.toMillis(6L)) - val KYUUBI_IDLE_SESSION_TIMEOUT = - KyuubiConfigBuilder("spark.kyuubi.idle.session.timeout") + val FRONTEND_IDLE_SESSION_TIMEOUT: ConfigEntry[Long] = + KyuubiConfigBuilder("spark.kyuubi.frontend.session.timeout") .doc("The check interval for session/operation timeout, which can be disabled by setting" + " to zero or negative value.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(TimeUnit.HOURS.toMillis(8L)) - val KYUUBI_IDLE_SESSION_CHECK_OPERATION = - KyuubiConfigBuilder("spark.kyuubi.idle.session.check.operation") + val FRONTEND_IDLE_SESSION_CHECK_OPERATION: ConfigEntry[Boolean] = + KyuubiConfigBuilder("spark.kyuubi.frontend.session.check.operation") .doc("Session will be considered to be idle only if there is no activity, and there is no" + " pending operation. 
This setting takes effect only if session idle timeout" + " (spark.kyuubi.idle.session.timeout) and checking (spark.kyuubi.session.check.interval)" + @@ -169,9 +175,9 @@ object KyuubiConf { .booleanConf .createWithDefault(true) - val KYUUBI_SPARK_SESSION_CHECK_INTERVAL = - KyuubiConfigBuilder("spark.kyuubi.session.clean.interval") - .doc("The check interval for SparkSession timeout") + val BACKEND_SESSION_CHECK_INTERVAL: ConfigEntry[Long] = + KyuubiConfigBuilder("spark.kyuubi.backend.session.check.interval") + .doc("The check interval for backend session a.k.a SparkSession timeout") .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(TimeUnit.MINUTES.toMillis(20L)) @@ -179,13 +185,26 @@ object KyuubiConf { // On Spark Session Init // ///////////////////////////////////////////////////////////////////////////////////////////////// - val KYUUBI_REPORT_TIMES_ON_START = - KyuubiConfigBuilder("spark.kyuubi.report.times.on.start") - .doc("How many times to check when another session with the same user is " + - "initializing SparkContext. Total Time will be times by `spark.yarn.report.interval`") + val BACKEND_SESSION_WAIT_OTHER_TIMES: ConfigEntry[Int] = + KyuubiConfigBuilder("spark.kyuubi.backend.session.wait.other.times") + .doc("How many times to check when another session with the same user is initializing " + + "SparkContext. Total Time will be times by " + + "`spark.kyuubi.backend.session.wait.other.interval`") .intConf .createWithDefault(60) + val BACKEND_SESSION_WAIT_OTHER_INTERVAL: ConfigEntry[Long] = + KyuubiConfigBuilder("spark.kyuubi.backend.session.wait.other.interval") + .doc("How long to wait between two checks when another session with the same user is initializing SparkContext") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(TimeUnit.SECONDS.toMillis(1L)) + + val BACKEND_SESSTION_INIT_TIMEOUT: ConfigEntry[Long] = + KyuubiConfigBuilder("spark.kyuubi.backend.session.init.timeout") + .doc("How long to wait for a backend session's SparkContext initialization before timing out") + .timeConf(TimeUnit.SECONDS) + .createWithDefault(TimeUnit.SECONDS.toSeconds(60L)) + /** * Return all the configuration definitions that have been defined in [[KyuubiConf]]. 
Each * definition contains key, defaultValue. diff --git a/src/main/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtils.scala b/src/main/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtils.scala index 5128ccb12..2a7de6ddf 100644 --- a/src/main/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtils.scala +++ b/src/main/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtils.scala @@ -50,22 +50,21 @@ object HighAvailabilityUtils extends Logging { private[this] var deregisteredWithZooKeeper = false def isSupportDynamicServiceDiscovery(conf: SparkConf): Boolean = { - conf.get(SUPPORT_DYNAMIC_SERVICE_DISCOVERY.key).toBoolean && - conf.get(KYUUBI_ZOOKEEPER_QUORUM.key).split(",").nonEmpty + conf.get(HA_ENABLED.key).toBoolean && conf.get(HA_ZOOKEEPER_QUORUM.key).split(",").nonEmpty } @throws[Exception] def addServerInstanceToZooKeeper(server: KyuubiServer): Unit = { val conf = server.getConf val zooKeeperEnsemble = getQuorumServers(conf) - val rootNamespace = conf.get(KYUUBI_ZOOKEEPER_NAMESPACE.key) + val rootNamespace = conf.get(HA_ZOOKEEPER_NAMESPACE.key) val instanceURI = getServerInstanceURI(server.feService) setUpZooKeeperAuth(conf) - val sessionTimeout = conf.getTimeAsMs(KYUUBI_ZOOKEEPER_SESSION_TIMEOUT.key).toInt - val baseSleepTime = conf.getTimeAsMs(KYUUBI_ZOOKEEPER_CONNECTION_BASESLEEPTIME.key).toInt - val maxRetries = conf.get(KYUUBI_ZOOKEEPER_CONNECTION_MAX_RETRIES.key).toInt + val sessionTimeout = conf.getTimeAsMs(HA_ZOOKEEPER_SESSION_TIMEOUT.key).toInt + val baseSleepTime = conf.getTimeAsMs(HA_ZOOKEEPER_CONNECTION_BASESLEEPTIME.key).toInt + val maxRetries = conf.get(HA_ZOOKEEPER_CONNECTION_MAX_RETRIES.key).toInt // Create a CuratorFramework instance to be used as the ZooKeeper client // Use the zooKeeperAclProvider to create appropriate ACLs zooKeeperClient = @@ -132,8 +131,8 @@ object HighAvailabilityUtils extends Logging { * @param conf SparkConf */ private def getQuorumServers(conf: SparkConf): String = { - val hosts = conf.get(KYUUBI_ZOOKEEPER_QUORUM.key, "").split(",") - val port 
= conf.get(KYUUBI_ZOOKEEPER_CLIENT_PORT.key, "2181") + val hosts = conf.get(HA_ZOOKEEPER_QUORUM.key, "").split(",") + val port = conf.get(HA_ZOOKEEPER_CLIENT_PORT.key, "2181") val quorum = new StringBuilder hosts.foreach { host => quorum.append(host) diff --git a/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala b/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala index 2c86d8697..030082b53 100644 --- a/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala +++ b/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala @@ -44,7 +44,7 @@ private[kyuubi] class OperationManager private(name: String) val userToOperationLog = new ConcurrentHashMap[String, OperationLog]() override def init(conf: SparkConf): Unit = synchronized { - if (conf.get(KYUUBI_LOGGING_OPERATION_ENABLED.key).toBoolean) { + if (conf.get(LOGGING_OPERATION_ENABLED.key).toBoolean) { initOperationLogCapture() } else { debug("Operation level logging is turned off") diff --git a/src/main/scala/yaooqinn/kyuubi/server/BackendService.scala b/src/main/scala/yaooqinn/kyuubi/server/BackendService.scala index 0584a1847..6be6ed8e6 100644 --- a/src/main/scala/yaooqinn/kyuubi/server/BackendService.scala +++ b/src/main/scala/yaooqinn/kyuubi/server/BackendService.scala @@ -56,7 +56,7 @@ private[server] class BackendService private(name: String) username: String, password: String, ipAddress: String, - configuration: JMap[String, String]): SessionHandle = { + configuration: Map[String, String]): SessionHandle = { val sessionHandle = sessionManager.openSession( protocol, username, password, ipAddress, configuration, withImpersonation = false) sessionHandle @@ -67,7 +67,7 @@ private[server] class BackendService private(name: String) username: String, password: String, ipAddress: String, - configuration: JMap[String, String], + configuration: Map[String, String], delegationToken: String): SessionHandle = { val sessionHandle = sessionManager.openSession( protocol, username, 
password, ipAddress, configuration, withImpersonation = true) diff --git a/src/main/scala/yaooqinn/kyuubi/server/FrontendService.scala b/src/main/scala/yaooqinn/kyuubi/server/FrontendService.scala index 0d3c1ef5c..25b6e4dd0 100644 --- a/src/main/scala/yaooqinn/kyuubi/server/FrontendService.scala +++ b/src/main/scala/yaooqinn/kyuubi/server/FrontendService.scala @@ -21,6 +21,7 @@ import java.net.{InetAddress, UnknownHostException} import java.util.{ArrayList => JList, HashMap => JHashMap, Map => JMap} import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit} +import scala.collection.JavaConverters._ import scala.util.{Failure, Try} import org.apache.hadoop.hive.conf.HiveConf @@ -243,10 +244,10 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe val sessionHandle = if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) && (userName != null)) { beService.openSessionWithImpersonation( - protocol, userName, req.getPassword, ipAddress, req.getConfiguration, null) + protocol, userName, req.getPassword, ipAddress, req.getConfiguration.asScala.toMap, null) } else { beService.openSession( - protocol, userName, req.getPassword, ipAddress, req.getConfiguration) + protocol, userName, req.getPassword, ipAddress, req.getConfiguration.asScala.toMap) } res.setServerProtocolVersion(protocol) sessionHandle diff --git a/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala b/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala index 9a1414017..2e3cb7b84 100644 --- a/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala +++ b/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala @@ -20,20 +20,20 @@ package yaooqinn.kyuubi.session import java.io.{File, IOException} import java.lang.reflect.UndeclaredThrowableException import java.security.PrivilegedExceptionAction -import java.util.{Map => JMap, UUID} +import java.util.UUID import java.util.concurrent.TimeUnit -import scala.collection.JavaConverters._ import 
scala.collection.mutable.{HashSet => MHSet} import scala.concurrent.{Await, Promise, TimeoutException} import scala.concurrent.duration.Duration +import scala.util.matching.Regex import org.apache.commons.io.FileUtils import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.security.UserGroupInformation import org.apache.hive.service.cli._ import org.apache.hive.service.cli.thrift.TProtocolVersion -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{KyuubiConf, SparkConf, SparkContext} import org.apache.spark.sql.SparkSession import org.apache.spark.ui.KyuubiServerTab import org.apache.spark.KyuubiConf._ @@ -89,28 +89,29 @@ private[kyuubi] class KyuubiSession( } } - private[this] def getOrCreateSparkSession(): Unit = synchronized { - val userName = sessionUGI.getShortUserName - var checkRound = math.max(conf.get(KYUUBI_REPORT_TIMES_ON_START.key).toInt, 15) - val interval = conf.getTimeAsMs(REPORT_INTERVAL, "1s") + private[this] def getOrCreateSparkSession(sessionConf: Map[String, String]): Unit = synchronized { + var checkRound = math.max(conf.get(BACKEND_SESSION_WAIT_OTHER_TIMES.key).toInt, 15) + val interval = conf.getTimeAsMs(BACKEND_SESSION_WAIT_OTHER_INTERVAL.key) // if user's sc is being constructed by another - while (sessionManager.isSCPartiallyConstructed(userName)) { + while (sessionManager.isSCPartiallyConstructed(getUserName)) { wait(interval) checkRound -= 1 if (checkRound <= 0) { - throw new HiveSQLException(s"A partially constructed SparkContext for [$userName] " + + throw new HiveSQLException(s"A partially constructed SparkContext for [$getUserName] " + s"has last more than ${checkRound * interval} seconds") } + info(s"A partially constructed SparkContext for [$getUserName], $checkRound times countdown.") } - sessionManager.getExistSparkSession(userName) match { + sessionManager.getSparkSession(getUserName) match { case Some((ss, times)) if !ss.sparkContext.isStopped => - info(s"SparkSession for [$userName] is reused " 
+ times.incrementAndGet() + "times") + info(s"SparkSession for [$getUserName] is reused " + times.incrementAndGet() + "times") _sparkSession = ss.newSession() + configureSparkSession(sessionConf) case _ => - sessionManager.setSCPartiallyConstructed(userName) + sessionManager.setSCPartiallyConstructed(getUserName) notifyAll() - createSparkSession() + createSparkSession(sessionConf) } } @@ -122,12 +123,12 @@ private[kyuubi] class KyuubiSession( }.start() } - private[this] def createSparkSession(): Unit = { - val user = sessionUGI.getShortUserName - info(s"------ Create new SparkSession for $user -------") - val appName = s"KyuubiSession[$user]:${UUID.randomUUID().toString}" + private[this] def createSparkSession(sessionConf: Map[String, String]): Unit = { + info(s"------ Create new SparkSession for $getUserName -------") + val appName = s"KyuubiSession[$getUserName]:${UUID.randomUUID().toString}" conf.setAppName(appName) - val totalWaitTime: Long = conf.getTimeAsSeconds(AM_MAX_WAIT_TIME, "60s") + configureSparkConf(sessionConf) + val totalWaitTime: Long = conf.getTimeAsSeconds(KyuubiConf.BACKEND_SESSTION_INIT_TIMEOUT.key) try { sessionUGI.doAs(new PrivilegedExceptionAction[Unit] { override def run(): Unit = { @@ -141,12 +142,12 @@ private[kyuubi] class KyuubiSession( } }) - sessionManager.setSparkSession(user, _sparkSession) + sessionManager.setSparkSession(getUserName, _sparkSession) // set sc fully constructed immediately - sessionManager.setSCFullyConstructed(user) - KyuubiServerMonitor.setListener(user, new KyuubiServerListener(conf)) - _sparkSession.sparkContext.addSparkListener(KyuubiServerMonitor.getListener(user)) - val uiTab = new KyuubiServerTab(user, _sparkSession.sparkContext) + sessionManager.setSCFullyConstructed(getUserName) + KyuubiServerMonitor.setListener(getUserName, new KyuubiServerListener(conf)) + _sparkSession.sparkContext.addSparkListener(KyuubiServerMonitor.getListener(getUserName)) + val uiTab = new KyuubiServerTab(getUserName, 
_sparkSession.sparkContext) KyuubiServerMonitor.addUITab(_sparkSession.sparkContext.sparkUser, uiTab) } catch { case ute: UndeclaredThrowableException => @@ -155,14 +156,14 @@ sessionUGI.doAs(new PrivilegedExceptionAction[Unit] { override def run(): Unit = HadoopUtils.killYarnAppByName(appName) }) - throw new HiveSQLException(s"Get SparkSession for [$user] failed: " + te, te) + throw new HiveSQLException(s"Get SparkSession for [$getUserName] failed: " + te, te) case _ => throw new HiveSQLException(ute.toString, ute.getCause) } case e: Exception => - throw new HiveSQLException(s"Get SparkSession for [$user] failed: " + e, e) + throw new HiveSQLException(s"Get SparkSession for [$getUserName] failed: " + e, e) } finally { - sessionManager.setSCFullyConstructed(user) + sessionManager.setSCFullyConstructed(getUserName) } } @@ -213,23 +214,41 @@ } } - private[this] def configureSession(sessionConfMap: JMap[String, String]): Unit = { - for (entry <- sessionConfMap.entrySet.asScala) { - val key = entry.getKey - if (key == "set:hivevar:mapred.job.queue.name") { - conf.set("spark.yarn.queue", entry.getValue) - } else if (key.startsWith("set:hivevar:")) { - val realKey = key.substring(12) - if (realKey.startsWith("spark.")) { - conf.set(realKey, entry.getValue) - } else { - conf.set("spark.hadoop." + realKey, entry.getValue) - } - } else if (key.startsWith("use:")) { - // deal with database later after sparkSession initialized - initialDatabase = "use " + entry.getValue - } else { + /** + * Setting configuration from connection strings before SparkContext init. 
+ * @param sessionConf configurations for user connection string + */ + private[this] def configureSparkConf(sessionConf: Map[String, String]): Unit = { + for ((key, value) <- sessionConf) { + key match { + case HIVE_VAR_PREFIX(DEPRECATED_QUEUE) => conf.set(QUEUE, value) + case HIVE_VAR_PREFIX(k) => + if (k.startsWith(SPARK_PREFIX)) { + conf.set(k, value) + } else { + conf.set(SPARK_HADOOP_PREFIX + k, value) + } + case "use:database" => initialDatabase = "use " + value + case _ => + } + } + } + /** + * Setting configuration from connection strings for existing SparkSession + * @param sessionConf configurations for user connection string + */ + private[this] def configureSparkSession(sessionConf: Map[String, String]): Unit = { + for ((key, value) <- sessionConf) { + key match { + case HIVE_VAR_PREFIX(k) => + if (k.startsWith(SPARK_PREFIX)) { + _sparkSession.conf.set(k, value) + } else { + _sparkSession.conf.set(SPARK_HADOOP_PREFIX + k, value) + } + case "use:database" => initialDatabase = "use " + value + case _ => } } } @@ -238,9 +257,8 @@ private[kyuubi] class KyuubiSession( def ugi: UserGroupInformation = this.sessionUGI - def open(sessionConfMap: JMap[String, String]): Unit = { - configureSession(sessionConfMap) - getOrCreateSparkSession() + def open(sessionConf: Map[String, String]): Unit = { + getOrCreateSparkSession(sessionConf) assert(_sparkSession != null) sessionUGI.doAs(new PrivilegedExceptionAction[Unit] { @@ -418,7 +436,13 @@ private[kyuubi] class KyuubiSession( } object KyuubiSession { - val SPARK_APP_ID = "spark.app.id" - val AM_MAX_WAIT_TIME = "spark.yarn.am.waitTime" - val REPORT_INTERVAL = "spark.yarn.report.interval" + val HIVE_VAR_PREFIX: Regex = """set:hivevar:([^=]+)""".r + val USE_DB: Regex = """use:([^=]+)""".r + + val SPARK_APP_ID: String = "spark.app.id" + val DEPRECATED_QUEUE = "mapred.job.queue.name" + val QUEUE = "spark.yarn.queue" + + val SPARK_PREFIX = "spark." + val SPARK_HADOOP_PREFIX = "spark.hadoop." 
} diff --git a/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala b/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala index 76b1634c3..9046b217e 100644 --- a/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala +++ b/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala @@ -26,7 +26,6 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.HashSet import org.apache.commons.io.FileUtils -import org.apache.hadoop.yarn.client.api.YarnClient import org.apache.hive.service.cli.{HiveSQLException, SessionHandle} import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup @@ -65,7 +64,7 @@ private[kyuubi] class SessionManager private( override def init(conf: SparkConf): Unit = synchronized { this.conf = conf // Create operation log root directory, if operation logging is enabled - if (conf.get(KYUUBI_LOGGING_OPERATION_ENABLED.key).toBoolean) { + if (conf.get(LOGGING_OPERATION_ENABLED.key).toBoolean) { initOperationLogRootDir() } createExecPool() @@ -74,11 +73,11 @@ private[kyuubi] class SessionManager private( } private[this] def createExecPool(): Unit = { - val poolSize = conf.get(KYUUBI_ASYNC_EXEC_THREADS.key).toInt + val poolSize = conf.get(ASYNC_EXEC_THREADS.key).toInt info("Background operation thread pool size: " + poolSize) - val poolQueueSize = conf.get(KYUUBI_ASYNC_EXEC_WAIT_QUEUE_SIZE.key).toInt + val poolQueueSize = conf.get(ASYNC_EXEC_WAIT_QUEUE_SIZE.key).toInt info("Background operation thread wait queue size: " + poolQueueSize) - val keepAliveTime = conf.getTimeAsSeconds(KYUUBI_EXEC_KEEPALIVE_TIME.key) + val keepAliveTime = conf.getTimeAsSeconds(EXEC_KEEPALIVE_TIME.key) info("Background operation thread keepalive time: " + keepAliveTime + " seconds") val threadPoolName = classOf[KyuubiServer].getSimpleName + "-Background-Pool" execPool = @@ -90,13 +89,13 @@ private[kyuubi] class SessionManager private( new 
LinkedBlockingQueue[Runnable](poolQueueSize), new ThreadFactoryWithGarbageCleanup(threadPoolName)) execPool.allowCoreThreadTimeOut(true) - checkInterval = conf.getTimeAsMs(KYUUBI_SESSION_CHECK_INTERVAL.key) - sessionTimeout = conf.getTimeAsMs(KYUUBI_IDLE_SESSION_TIMEOUT.key) - checkOperation = conf.get(KYUUBI_IDLE_SESSION_CHECK_OPERATION.key).toBoolean + checkInterval = conf.getTimeAsMs(FRONTEND_SESSION_CHECK_INTERVAL.key) + sessionTimeout = conf.getTimeAsMs(FRONTEND_IDLE_SESSION_TIMEOUT.key) + checkOperation = conf.get(FRONTEND_IDLE_SESSION_CHECK_OPERATION.key).toBoolean } private[this] def initOperationLogRootDir(): Unit = { - operationLogRootDir = new File(conf.get(KYUUBI_LOGGING_OPERATION_LOG_LOCATION.key)) + operationLogRootDir = new File(conf.get(LOGGING_OPERATION_LOG_DIR.key)) isOperationLogEnabled = true if (operationLogRootDir.exists && !operationLogRootDir.isDirectory) { info("The operation log root directory exists, but it is not a directory: " @@ -174,7 +173,7 @@ private[kyuubi] class SessionManager private( private[this] def startSparkSessionCleaner(): Unit = { // at least 10 min val interval = math.max( - conf.getTimeAsMs(KYUUBI_SPARK_SESSION_CHECK_INTERVAL.key), 10 * 60 * 1000L) + conf.getTimeAsMs(BACKEND_SESSION_CHECK_INTERVAL.key), 10 * 60 * 1000L) val sessionCleaner = new Runnable { override def run(): Unit = { sleepInterval(interval) @@ -185,6 +184,9 @@ private[kyuubi] class SessionManager private( removeSparkSession(user) KyuubiServerMonitor.detachUITab(user) session.stop() + if (conf.get("spark.master").startsWith("yarn")) { + System.setProperty("SPARK_YARN_MODE", "true") + } } case _ => } @@ -215,7 +217,7 @@ private[kyuubi] class SessionManager private( username: String, password: String, ipAddress: String, - sessionConf: JMap[String, String], + sessionConf: Map[String, String], withImpersonation: Boolean): SessionHandle = { val kyuubiSession = new KyuubiSession( @@ -278,12 +280,12 @@ private[kyuubi] class SessionManager private( shutdown = 
true if (execPool != null) { execPool.shutdown() - val timeout = conf.getTimeAsSeconds(KYUUBI_ASYNC_EXEC_SHUTDOWN_TIMEOUT.key) + val timeout = conf.getTimeAsSeconds(ASYNC_EXEC_SHUTDOWN_TIMEOUT.key) try { execPool.awaitTermination(timeout, TimeUnit.SECONDS) } catch { case e: InterruptedException => - warn("KYUUBI_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout + + warn("ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout + " seconds has been exceeded. RUNNING background operations will be shut down", e) } execPool = null @@ -309,7 +311,7 @@ private[kyuubi] class SessionManager private( def submitBackgroundOperation(r: Runnable): Future[_] = execPool.submit(r) - def getExistSparkSession(user: String): Option[(SparkSession, AtomicInteger)] = { + def getSparkSession(user: String): Option[(SparkSession, AtomicInteger)] = { Some(userToSparkSession.get(user)) }