formatted kyuubi configurations

This commit is contained in:
Kent Yao 2018-01-18 11:55:37 +08:00
parent 7f52205752
commit 1e40804c9a
7 changed files with 166 additions and 121 deletions

View File

@ -25,6 +25,10 @@ import scala.collection.JavaConverters._
import org.apache.spark.internal.config.{ConfigBuilder, ConfigEntry}
/**
* Kyuubi server level configuration which will be set at the very beginning of server start.
*
*/
object KyuubiConf {
private[this] val kyuubiConfEntries = new HashMap[String, ConfigEntry[_]]()
@ -43,51 +47,52 @@ object KyuubiConf {
// High Availability by ZooKeeper //
/////////////////////////////////////////////////////////////////////////////////////////////////
val SUPPORT_DYNAMIC_SERVICE_DISCOVERY =
KyuubiConfigBuilder("spark.kyuubi.support.dynamic.service.discovery")
.doc("Whether KyuubiServer supports dynamic service discovery for its clients." +
" To support this, each instance of KyuubiServer currently uses ZooKeeper to" +
" register itself, when it is brought up. JDBC/ODBC clients should use the " +
"ZooKeeper ensemble: spark.kyuubi.zookeeper.quorum in their connection string.")
val HA_ENABLED: ConfigEntry[Boolean] =
KyuubiConfigBuilder("spark.kyuubi.ha.enabled")
.doc("Whether KyuubiServer supports dynamic service discovery for its clients." +
" To support this, each instance of KyuubiServer currently uses ZooKeeper to" +
" register itself, when it is brought up. JDBC/ODBC clients should use the " +
"ZooKeeper ensemble: spark.kyuubi.ha.zk.quorum in their connection string.")
.booleanConf
.createWithDefault(false)
val KYUUBI_ZOOKEEPER_QUORUM = KyuubiConfigBuilder("spark.kyuubi.zookeeper.quorum")
.doc("Comma separated list of ZooKeeper servers to talk to, when KyuubiServer supports" +
val HA_ZOOKEEPER_QUORUM: ConfigEntry[String] =
KyuubiConfigBuilder("spark.kyuubi.ha.zk.quorum")
.doc("Comma separated list of ZooKeeper servers to talk to, when KyuubiServer supports" +
" service discovery via Zookeeper.")
.stringConf
.createWithDefault("")
.stringConf
.createWithDefault("")
val KYUUBI_ZOOKEEPER_NAMESPACE =
KyuubiConfigBuilder("spark.kyuubi.zookeeper.namespace")
val HA_ZOOKEEPER_NAMESPACE: ConfigEntry[String] =
KyuubiConfigBuilder("spark.kyuubi.ha.zk.namespace")
.doc("The parent node in ZooKeeper used by KyuubiServer when supporting dynamic service" +
" discovery.")
.stringConf
.createWithDefault("kyuubiserver")
val KYUUBI_ZOOKEEPER_CLIENT_PORT =
KyuubiConfigBuilder("spark.kyuubi.zookeeper.client.port")
val HA_ZOOKEEPER_CLIENT_PORT: ConfigEntry[String] =
KyuubiConfigBuilder("spark.kyuubi.ha.zk.client.port")
.doc("The port of ZooKeeper servers to talk to. If the list of Zookeeper servers specified" +
" in spark.kyuubi.zookeeper.quorum does not contain port numbers, this value is used")
.stringConf
.createWithDefault("2181")
val KYUUBI_ZOOKEEPER_SESSION_TIMEOUT =
KyuubiConfigBuilder("spark.kyuubi.zookeeper.session.timeout")
.doc("ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and" +
val HA_ZOOKEEPER_SESSION_TIMEOUT: ConfigEntry[Long] =
KyuubiConfigBuilder("spark.kyuubi.ha.zk.session.timeout")
.doc("ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and" +
" as a result, all locks released, if a heartbeat is not sent in the timeout.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.MINUTES.toMillis(20L))
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.MINUTES.toMillis(20L))
val KYUUBI_ZOOKEEPER_CONNECTION_BASESLEEPTIME =
KyuubiConfigBuilder("spark.kyuubi.zookeeper.connection.basesleeptime")
val HA_ZOOKEEPER_CONNECTION_BASESLEEPTIME: ConfigEntry[Long] =
KyuubiConfigBuilder("spark.kyuubi.ha.zk.connection.basesleeptime")
.doc("Initial amount of time (in milliseconds) to wait between retries when connecting to" +
" the ZooKeeper server when using ExponentialBackoffRetry policy.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.SECONDS.toMillis(1L))
val KYUUBI_ZOOKEEPER_CONNECTION_MAX_RETRIES =
KyuubiConfigBuilder("spark.kyuubi.zookeeper.connection.max.retries")
val HA_ZOOKEEPER_CONNECTION_MAX_RETRIES: ConfigEntry[Int] =
KyuubiConfigBuilder("spark.kyuubi.ha.zk.connection.max.retries")
.doc("max retry time connecting to the zk server")
.intConf
.createWithDefault(3)
@ -96,46 +101,47 @@ object KyuubiConf {
// Operation Log //
/////////////////////////////////////////////////////////////////////////////////////////////////
val KYUUBI_LOGGING_OPERATION_ENABLED =
val LOGGING_OPERATION_ENABLED: ConfigEntry[Boolean] =
KyuubiConfigBuilder("spark.kyuubi.logging.operation.enabled")
.doc("When true, KyuubiServer will save operation logs and make them available for clients")
.booleanConf
.createWithDefault(true)
val KYUUBI_LOGGING_OPERATION_LOG_LOCATION =
KyuubiConfigBuilder("spark.kyuubi.logging.operation.log.location")
val LOGGING_OPERATION_LOG_DIR: ConfigEntry[String] =
KyuubiConfigBuilder("spark.kyuubi.logging.operation.log.dir")
.doc("Top level directory where operation logs are stored if logging functionality is" +
" enabled")
.stringConf
.createWithDefault(
s"${sys.env.getOrElse("SPARK_LOG_DIR", System.getProperty("java.io.tmpdir"))}"
s"${sys.env.getOrElse("SPARK_LOG_DIR",
sys.env.getOrElse("SPARK_HOME", System.getProperty("java.io.tmpdir")))}"
+ File.separator + "operation_logs")
/////////////////////////////////////////////////////////////////////////////////////////////////
// Background Execution Thread Pool //
/////////////////////////////////////////////////////////////////////////////////////////////////
val KYUUBI_ASYNC_EXEC_THREADS =
val ASYNC_EXEC_THREADS: ConfigEntry[Int] =
KyuubiConfigBuilder("spark.kyuubi.async.exec.threads")
.doc("Number of threads in the async thread pool for KyuubiServer")
.intConf
.createWithDefault(100)
val KYUUBI_ASYNC_EXEC_WAIT_QUEUE_SIZE =
val ASYNC_EXEC_WAIT_QUEUE_SIZE: ConfigEntry[Int] =
KyuubiConfigBuilder("spark.kyuubi.async.exec.wait.queue.size")
.doc("Size of the wait queue for async thread pool in KyuubiServer. After hitting this" +
" limit, the async thread pool will reject new requests.")
.intConf
.createWithDefault(100)
val KYUUBI_EXEC_KEEPALIVE_TIME =
val EXEC_KEEPALIVE_TIME: ConfigEntry[Long] =
KyuubiConfigBuilder("spark.kyuubi.async.exec.keep.alive.time")
.doc("Time that an idle KyuubiServer async thread (from the thread pool) will wait for" +
" a new task to arrive before terminating")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.SECONDS.toMillis(10L))
val KYUUBI_ASYNC_EXEC_SHUTDOWN_TIMEOUT =
val ASYNC_EXEC_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
KyuubiConfigBuilder("spark.kyuubi.async.exec.shutdown.timeout")
.doc("How long KyuubiServer shutdown will wait for async threads to terminate.")
.timeConf(TimeUnit.MILLISECONDS)
@ -145,23 +151,23 @@ object KyuubiConf {
// Session Idle Check //
/////////////////////////////////////////////////////////////////////////////////////////////////
val KYUUBI_SESSION_CHECK_INTERVAL =
KyuubiConfigBuilder("spark.kyuubi.session.check.interval")
.doc("The check interval for session/operation timeout, which can be disabled by setting" +
" to zero or negative value.")
val FRONTEND_SESSION_CHECK_INTERVAL: ConfigEntry[Long] =
KyuubiConfigBuilder("spark.kyuubi.frontend.session.check.interval")
.doc("The check interval for frontend session/operation timeout, which can be disabled by" +
" setting to zero or negative value.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.HOURS.toMillis(6L))
val KYUUBI_IDLE_SESSION_TIMEOUT =
KyuubiConfigBuilder("spark.kyuubi.idle.session.timeout")
val FRONTEND_IDLE_SESSION_TIMEOUT: ConfigEntry[Long] =
KyuubiConfigBuilder("spark.kyuubi.frontend.session.timeout")
.doc("The check interval for session/operation timeout, which can be disabled by setting" +
" to zero or negative value.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.HOURS.toMillis(8L))
val KYUUBI_IDLE_SESSION_CHECK_OPERATION =
KyuubiConfigBuilder("spark.kyuubi.idle.session.check.operation")
val FRONTEND_IDLE_SESSION_CHECK_OPERATION: ConfigEntry[Boolean] =
KyuubiConfigBuilder("spark.kyuubi.frontend.session.check.operation")
.doc("Session will be considered to be idle only if there is no activity, and there is no" +
" pending operation. This setting takes effect only if session idle timeout" +
" (spark.kyuubi.idle.session.timeout) and checking (spark.kyuubi.session.check.interval)" +
@ -169,9 +175,9 @@ object KyuubiConf {
.booleanConf
.createWithDefault(true)
val KYUUBI_SPARK_SESSION_CHECK_INTERVAL =
KyuubiConfigBuilder("spark.kyuubi.session.clean.interval")
.doc("The check interval for SparkSession timeout")
val BACKEND_SESSION_CHECK_INTERVAL: ConfigEntry[Long] =
KyuubiConfigBuilder("spark.kyuubi.backend.session.check.interval")
.doc("The check interval for backend session a.k.a SparkSession timeout")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.MINUTES.toMillis(20L))
@ -179,13 +185,26 @@ object KyuubiConf {
// On Spark Session Init //
/////////////////////////////////////////////////////////////////////////////////////////////////
val KYUUBI_REPORT_TIMES_ON_START =
KyuubiConfigBuilder("spark.kyuubi.report.times.on.start")
.doc("How many times to check when another session with the same user is " +
"initializing SparkContext. Total Time will be times by `spark.yarn.report.interval`")
val BACKEND_SESSION_WAIT_OTHER_TIMES: ConfigEntry[Int] =
KyuubiConfigBuilder("spark.kyuubi.backend.session.wait.other.times")
.doc("How many times to check when another session with the same user is initializing " +
"SparkContext. Total Time will be times by " +
"`spark.kyuubi.backend.session.wait.other.interval`")
.intConf
.createWithDefault(60)
val BACKEND_SESSION_WAIT_OTHER_INTERVAL: ConfigEntry[Long] =
KyuubiConfigBuilder("spark.kyuubi.backend.session.wait.other.interval")
.doc("")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.SECONDS.toMillis(1L))
val BACKEND_SESSTION_INIT_TIMEOUT =
KyuubiConfigBuilder("spark.kyuubi.backend.session.init.timeout")
.doc("")
.timeConf(TimeUnit.SECONDS)
.createWithDefault(TimeUnit.SECONDS.toSeconds(60L))
/**
* Return all the configuration definitions that have been defined in [[KyuubiConf]]. Each
* definition contains key, defaultValue.

View File

@ -50,22 +50,21 @@ object HighAvailabilityUtils extends Logging {
private[this] var deregisteredWithZooKeeper = false
def isSupportDynamicServiceDiscovery(conf: SparkConf): Boolean = {
conf.get(SUPPORT_DYNAMIC_SERVICE_DISCOVERY.key).toBoolean &&
conf.get(KYUUBI_ZOOKEEPER_QUORUM.key).split(",").nonEmpty
conf.get(HA_ENABLED.key).toBoolean && conf.get(HA_ZOOKEEPER_QUORUM.key).split(",").nonEmpty
}
@throws[Exception]
def addServerInstanceToZooKeeper(server: KyuubiServer): Unit = {
val conf = server.getConf
val zooKeeperEnsemble = getQuorumServers(conf)
val rootNamespace = conf.get(KYUUBI_ZOOKEEPER_NAMESPACE.key)
val rootNamespace = conf.get(HA_ZOOKEEPER_NAMESPACE.key)
val instanceURI = getServerInstanceURI(server.feService)
setUpZooKeeperAuth(conf)
val sessionTimeout = conf.getTimeAsMs(KYUUBI_ZOOKEEPER_SESSION_TIMEOUT.key).toInt
val baseSleepTime = conf.getTimeAsMs(KYUUBI_ZOOKEEPER_CONNECTION_BASESLEEPTIME.key).toInt
val maxRetries = conf.get(KYUUBI_ZOOKEEPER_CONNECTION_MAX_RETRIES.key).toInt
val sessionTimeout = conf.getTimeAsMs(HA_ZOOKEEPER_SESSION_TIMEOUT.key).toInt
val baseSleepTime = conf.getTimeAsMs(HA_ZOOKEEPER_CONNECTION_BASESLEEPTIME.key).toInt
val maxRetries = conf.get(HA_ZOOKEEPER_CONNECTION_MAX_RETRIES.key).toInt
// Create a CuratorFramework instance to be used as the ZooKeeper client
// Use the zooKeeperAclProvider to create appropriate ACLs
zooKeeperClient =
@ -132,8 +131,8 @@ object HighAvailabilityUtils extends Logging {
* @param conf SparkConf
*/
private def getQuorumServers(conf: SparkConf): String = {
val hosts = conf.get(KYUUBI_ZOOKEEPER_QUORUM.key, "").split(",")
val port = conf.get(KYUUBI_ZOOKEEPER_CLIENT_PORT.key, "2181")
val hosts = conf.get(HA_ZOOKEEPER_QUORUM.key, "").split(",")
val port = conf.get(HA_ZOOKEEPER_CLIENT_PORT.key, "2181")
val quorum = new StringBuilder
hosts.foreach { host =>
quorum.append(host)

View File

@ -44,7 +44,7 @@ private[kyuubi] class OperationManager private(name: String)
val userToOperationLog = new ConcurrentHashMap[String, OperationLog]()
override def init(conf: SparkConf): Unit = synchronized {
if (conf.get(KYUUBI_LOGGING_OPERATION_ENABLED.key).toBoolean) {
if (conf.get(LOGGING_OPERATION_ENABLED.key).toBoolean) {
initOperationLogCapture()
} else {
debug("Operation level logging is turned off")

View File

@ -56,7 +56,7 @@ private[server] class BackendService private(name: String)
username: String,
password: String,
ipAddress: String,
configuration: JMap[String, String]): SessionHandle = {
configuration: Map[String, String]): SessionHandle = {
val sessionHandle = sessionManager.openSession(
protocol, username, password, ipAddress, configuration, withImpersonation = false)
sessionHandle
@ -67,7 +67,7 @@ private[server] class BackendService private(name: String)
username: String,
password: String,
ipAddress: String,
configuration: JMap[String, String],
configuration: Map[String, String],
delegationToken: String): SessionHandle = {
val sessionHandle = sessionManager.openSession(
protocol, username, password, ipAddress, configuration, withImpersonation = true)

View File

@ -21,6 +21,7 @@ import java.net.{InetAddress, UnknownHostException}
import java.util.{ArrayList => JList, HashMap => JHashMap, Map => JMap}
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}
import scala.collection.JavaConverters._
import scala.util.{Failure, Try}
import org.apache.hadoop.hive.conf.HiveConf
@ -243,10 +244,10 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
val sessionHandle =
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) && (userName != null)) {
beService.openSessionWithImpersonation(
protocol, userName, req.getPassword, ipAddress, req.getConfiguration, null)
protocol, userName, req.getPassword, ipAddress, req.getConfiguration.asScala.toMap, null)
} else {
beService.openSession(
protocol, userName, req.getPassword, ipAddress, req.getConfiguration)
protocol, userName, req.getPassword, ipAddress, req.getConfiguration.asScala.toMap)
}
res.setServerProtocolVersion(protocol)
sessionHandle

View File

@ -20,20 +20,20 @@ package yaooqinn.kyuubi.session
import java.io.{File, IOException}
import java.lang.reflect.UndeclaredThrowableException
import java.security.PrivilegedExceptionAction
import java.util.{Map => JMap, UUID}
import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import scala.collection.mutable.{HashSet => MHSet}
import scala.concurrent.{Await, Promise, TimeoutException}
import scala.concurrent.duration.Duration
import scala.util.matching.Regex
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{KyuubiConf, SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ui.KyuubiServerTab
import org.apache.spark.KyuubiConf._
@ -89,28 +89,29 @@ private[kyuubi] class KyuubiSession(
}
}
private[this] def getOrCreateSparkSession(): Unit = synchronized {
val userName = sessionUGI.getShortUserName
var checkRound = math.max(conf.get(KYUUBI_REPORT_TIMES_ON_START.key).toInt, 15)
val interval = conf.getTimeAsMs(REPORT_INTERVAL, "1s")
private[this] def getOrCreateSparkSession(sessionConf: Map[String, String]): Unit = synchronized {
var checkRound = math.max(conf.get(BACKEND_SESSION_WAIT_OTHER_TIMES.key).toInt, 15)
val interval = conf.getTimeAsMs(BACKEND_SESSION_WAIT_OTHER_INTERVAL.key)
// if user's sc is being constructed by another
while (sessionManager.isSCPartiallyConstructed(userName)) {
while (sessionManager.isSCPartiallyConstructed(getUserName)) {
wait(interval)
checkRound -= 1
if (checkRound <= 0) {
throw new HiveSQLException(s"A partially constructed SparkContext for [$userName] " +
throw new HiveSQLException(s"A partially constructed SparkContext for [$getUserName] " +
s"has last more than ${checkRound * interval} seconds")
}
info(s"A partially constructed SparkContext for [$getUserName], $checkRound times countdown.")
}
sessionManager.getExistSparkSession(userName) match {
sessionManager.getSparkSession(getUserName) match {
case Some((ss, times)) if !ss.sparkContext.isStopped =>
info(s"SparkSession for [$userName] is reused " + times.incrementAndGet() + "times")
info(s"SparkSession for [$getUserName] is reused " + times.incrementAndGet() + "times")
_sparkSession = ss.newSession()
configureSparkSession(sessionConf)
case _ =>
sessionManager.setSCPartiallyConstructed(userName)
sessionManager.setSCPartiallyConstructed(getUserName)
notifyAll()
createSparkSession()
createSparkSession(sessionConf)
}
}
@ -122,12 +123,12 @@ private[kyuubi] class KyuubiSession(
}.start()
}
private[this] def createSparkSession(): Unit = {
val user = sessionUGI.getShortUserName
info(s"------ Create new SparkSession for $user -------")
val appName = s"KyuubiSession[$user]:${UUID.randomUUID().toString}"
private[this] def createSparkSession(sessionConf: Map[String, String]): Unit = {
info(s"------ Create new SparkSession for $getUserName -------")
val appName = s"KyuubiSession[$getUserName]:${UUID.randomUUID().toString}"
conf.setAppName(appName)
val totalWaitTime: Long = conf.getTimeAsSeconds(AM_MAX_WAIT_TIME, "60s")
configureSparkConf(sessionConf)
val totalWaitTime: Long = conf.getTimeAsSeconds(KyuubiConf.BACKEND_SESSTION_INIT_TIMEOUT.key)
try {
sessionUGI.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = {
@ -141,12 +142,12 @@ private[kyuubi] class KyuubiSession(
}
})
sessionManager.setSparkSession(user, _sparkSession)
sessionManager.setSparkSession(getUserName, _sparkSession)
// set sc fully constructed immediately
sessionManager.setSCFullyConstructed(user)
KyuubiServerMonitor.setListener(user, new KyuubiServerListener(conf))
_sparkSession.sparkContext.addSparkListener(KyuubiServerMonitor.getListener(user))
val uiTab = new KyuubiServerTab(user, _sparkSession.sparkContext)
sessionManager.setSCFullyConstructed(getUserName)
KyuubiServerMonitor.setListener(getUserName, new KyuubiServerListener(conf))
_sparkSession.sparkContext.addSparkListener(KyuubiServerMonitor.getListener(getUserName))
val uiTab = new KyuubiServerTab(getUserName, _sparkSession.sparkContext)
KyuubiServerMonitor.addUITab(_sparkSession.sparkContext.sparkUser, uiTab)
} catch {
case ute: UndeclaredThrowableException =>
@ -155,14 +156,14 @@ private[kyuubi] class KyuubiSession(
sessionUGI.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = HadoopUtils.killYarnAppByName(appName)
})
throw new HiveSQLException(s"Get SparkSession for [$user] failed: " + te, te)
throw new HiveSQLException(s"Get SparkSession for [$getUserName] failed: " + te, te)
case _ =>
throw new HiveSQLException(ute.toString, ute.getCause)
}
case e: Exception =>
throw new HiveSQLException(s"Get SparkSession for [$user] failed: " + e, e)
throw new HiveSQLException(s"Get SparkSession for [$getUserName] failed: " + e, e)
} finally {
sessionManager.setSCFullyConstructed(user)
sessionManager.setSCFullyConstructed(getUserName)
}
}
@ -213,23 +214,41 @@ private[kyuubi] class KyuubiSession(
}
}
private[this] def configureSession(sessionConfMap: JMap[String, String]): Unit = {
for (entry <- sessionConfMap.entrySet.asScala) {
val key = entry.getKey
if (key == "set:hivevar:mapred.job.queue.name") {
conf.set("spark.yarn.queue", entry.getValue)
} else if (key.startsWith("set:hivevar:")) {
val realKey = key.substring(12)
if (realKey.startsWith("spark.")) {
conf.set(realKey, entry.getValue)
} else {
conf.set("spark.hadoop." + realKey, entry.getValue)
}
} else if (key.startsWith("use:")) {
// deal with database later after sparkSession initialized
initialDatabase = "use " + entry.getValue
} else {
/**
* Setting configuration from connection strings before SparkConext init.
* @param sessionConf configurations for user connection string
*/
private[this] def configureSparkConf(sessionConf: Map[String, String]): Unit = {
for ((key, value) <- sessionConf) {
key match {
case HIVE_VAR_PREFIX(DEPRECATED_QUEUE) => conf.set(QUEUE, value)
case HIVE_VAR_PREFIX(k) =>
if (k.startsWith(SPARK_PREFIX)) {
conf.set(k, value)
} else {
conf.set(SPARK_HADOOP_PREFIX + k, value)
}
case "use:database" => initialDatabase = "use " + value
case _ =>
}
}
}
/**
* Setting configuration from connection strings for existing SparkSession
* @param sessionConf configurations for user connection string
*/
private[this] def configureSparkSession(sessionConf: Map[String, String]): Unit = {
for ((key, value) <- sessionConf) {
key match {
case HIVE_VAR_PREFIX(k) =>
if (k.startsWith(SPARK_PREFIX)) {
_sparkSession.conf.set(k, value)
} else {
_sparkSession.conf.set(SPARK_HADOOP_PREFIX + k, value)
}
case "use:database" => initialDatabase = "use " + value
case _ =>
}
}
}
@ -238,9 +257,8 @@ private[kyuubi] class KyuubiSession(
def ugi: UserGroupInformation = this.sessionUGI
def open(sessionConfMap: JMap[String, String]): Unit = {
configureSession(sessionConfMap)
getOrCreateSparkSession()
def open(sessionConf: Map[String, String]): Unit = {
getOrCreateSparkSession(sessionConf)
assert(_sparkSession != null)
sessionUGI.doAs(new PrivilegedExceptionAction[Unit] {
@ -418,7 +436,13 @@ private[kyuubi] class KyuubiSession(
}
object KyuubiSession {
val SPARK_APP_ID = "spark.app.id"
val AM_MAX_WAIT_TIME = "spark.yarn.am.waitTime"
val REPORT_INTERVAL = "spark.yarn.report.interval"
val HIVE_VAR_PREFIX: Regex = """set:hivevar:([^=]+)""".r
val USE_DB: Regex = """use:([^=]+)""".r
val SPARK_APP_ID: String = "spark.app.id"
val DEPRECATED_QUEUE = "mapred.job.queue.name"
val QUEUE = "spark.yarn.queue"
val SPARK_PREFIX = "spark."
val SPARK_HADOOP_PREFIX = "spark.hadoop."
}

View File

@ -26,7 +26,6 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.HashSet
import org.apache.commons.io.FileUtils
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hive.service.cli.{HiveSQLException, SessionHandle}
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup
@ -65,7 +64,7 @@ private[kyuubi] class SessionManager private(
override def init(conf: SparkConf): Unit = synchronized {
this.conf = conf
// Create operation log root directory, if operation logging is enabled
if (conf.get(KYUUBI_LOGGING_OPERATION_ENABLED.key).toBoolean) {
if (conf.get(LOGGING_OPERATION_ENABLED.key).toBoolean) {
initOperationLogRootDir()
}
createExecPool()
@ -74,11 +73,11 @@ private[kyuubi] class SessionManager private(
}
private[this] def createExecPool(): Unit = {
val poolSize = conf.get(KYUUBI_ASYNC_EXEC_THREADS.key).toInt
val poolSize = conf.get(ASYNC_EXEC_THREADS.key).toInt
info("Background operation thread pool size: " + poolSize)
val poolQueueSize = conf.get(KYUUBI_ASYNC_EXEC_WAIT_QUEUE_SIZE.key).toInt
val poolQueueSize = conf.get(ASYNC_EXEC_WAIT_QUEUE_SIZE.key).toInt
info("Background operation thread wait queue size: " + poolQueueSize)
val keepAliveTime = conf.getTimeAsSeconds(KYUUBI_EXEC_KEEPALIVE_TIME.key)
val keepAliveTime = conf.getTimeAsSeconds(EXEC_KEEPALIVE_TIME.key)
info("Background operation thread keepalive time: " + keepAliveTime + " seconds")
val threadPoolName = classOf[KyuubiServer].getSimpleName + "-Background-Pool"
execPool =
@ -90,13 +89,13 @@ private[kyuubi] class SessionManager private(
new LinkedBlockingQueue[Runnable](poolQueueSize),
new ThreadFactoryWithGarbageCleanup(threadPoolName))
execPool.allowCoreThreadTimeOut(true)
checkInterval = conf.getTimeAsMs(KYUUBI_SESSION_CHECK_INTERVAL.key)
sessionTimeout = conf.getTimeAsMs(KYUUBI_IDLE_SESSION_TIMEOUT.key)
checkOperation = conf.get(KYUUBI_IDLE_SESSION_CHECK_OPERATION.key).toBoolean
checkInterval = conf.getTimeAsMs(FRONTEND_SESSION_CHECK_INTERVAL.key)
sessionTimeout = conf.getTimeAsMs(FRONTEND_IDLE_SESSION_TIMEOUT.key)
checkOperation = conf.get(FRONTEND_IDLE_SESSION_CHECK_OPERATION.key).toBoolean
}
private[this] def initOperationLogRootDir(): Unit = {
operationLogRootDir = new File(conf.get(KYUUBI_LOGGING_OPERATION_LOG_LOCATION.key))
operationLogRootDir = new File(conf.get(LOGGING_OPERATION_LOG_DIR.key))
isOperationLogEnabled = true
if (operationLogRootDir.exists && !operationLogRootDir.isDirectory) {
info("The operation log root directory exists, but it is not a directory: "
@ -174,7 +173,7 @@ private[kyuubi] class SessionManager private(
private[this] def startSparkSessionCleaner(): Unit = {
// at least 10 min
val interval = math.max(
conf.getTimeAsMs(KYUUBI_SPARK_SESSION_CHECK_INTERVAL.key), 10 * 60 * 1000L)
conf.getTimeAsMs(BACKEND_SESSION_CHECK_INTERVAL.key), 10 * 60 * 1000L)
val sessionCleaner = new Runnable {
override def run(): Unit = {
sleepInterval(interval)
@ -185,6 +184,9 @@ private[kyuubi] class SessionManager private(
removeSparkSession(user)
KyuubiServerMonitor.detachUITab(user)
session.stop()
if (conf.get("spark.master").startsWith("yarn")) {
System.setProperty("SPARK_YARN_MODE", "true")
}
}
case _ =>
}
@ -215,7 +217,7 @@ private[kyuubi] class SessionManager private(
username: String,
password: String,
ipAddress: String,
sessionConf: JMap[String, String],
sessionConf: Map[String, String],
withImpersonation: Boolean): SessionHandle = {
val kyuubiSession =
new KyuubiSession(
@ -278,12 +280,12 @@ private[kyuubi] class SessionManager private(
shutdown = true
if (execPool != null) {
execPool.shutdown()
val timeout = conf.getTimeAsSeconds(KYUUBI_ASYNC_EXEC_SHUTDOWN_TIMEOUT.key)
val timeout = conf.getTimeAsSeconds(ASYNC_EXEC_SHUTDOWN_TIMEOUT.key)
try {
execPool.awaitTermination(timeout, TimeUnit.SECONDS)
} catch {
case e: InterruptedException =>
warn("KYUUBI_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout +
warn("ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout +
" seconds has been exceeded. RUNNING background operations will be shut down", e)
}
execPool = null
@ -309,7 +311,7 @@ private[kyuubi] class SessionManager private(
def submitBackgroundOperation(r: Runnable): Future[_] = execPool.submit(r)
def getExistSparkSession(user: String): Option[(SparkSession, AtomicInteger)] = {
def getSparkSession(user: String): Option[(SparkSession, AtomicInteger)] = {
Some(userToSparkSession.get(user))
}