From 3b9ce670b99ccb64bc9a73227c0501122fdec752 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 11 Jan 2018 16:41:40 +0800 Subject: [PATCH] regist kyuubi conf to spark conf --- .../scala/org/apache/spark/KyuubiConf.scala | 63 +++++++++---------- .../kyuubi/ha/HighAvailabilityUtils.scala | 20 +++--- .../kyuubi/operation/OperationManager.scala | 2 +- .../yaooqinn/kyuubi/server/KyuubiServer.scala | 6 +- .../kyuubi/session/SessionManager.scala | 32 +++++----- 5 files changed, 59 insertions(+), 64 deletions(-) diff --git a/src/main/scala/org/apache/spark/KyuubiConf.scala b/src/main/scala/org/apache/spark/KyuubiConf.scala index ec348c2e9..007af3740 100644 --- a/src/main/scala/org/apache/spark/KyuubiConf.scala +++ b/src/main/scala/org/apache/spark/KyuubiConf.scala @@ -18,28 +18,30 @@ package org.apache.spark import java.io.File +import java.util.HashMap import java.util.concurrent.TimeUnit +import scala.collection.JavaConverters._ + import org.apache.spark.internal.config.{ConfigBuilder, ConfigEntry} object KyuubiConf { - private val kyuubiConfEntries = java.util.Collections.synchronizedMap( - new java.util.HashMap[String, ConfigEntry[_]]()) + private[this] val kyuubiConfEntries = new HashMap[String, ConfigEntry[_]]() - def register(entry: ConfigEntry[_]): Unit = kyuubiConfEntries.synchronized { + def register(entry: ConfigEntry[_]): Unit = { require(!kyuubiConfEntries.containsKey(entry.key), s"Duplicate SQLConfigEntry. ${entry.key} has been registered") kyuubiConfEntries.put(entry.key, entry) } - object KyuubiConfigBuilder { + private[this] object KyuubiConfigBuilder { def apply(key: String): ConfigBuilder = ConfigBuilder(key).onCreate(register) } - ////////////////////////////////////////////////////// - // ha // - ////////////////////////////////////////////////////// + ///////////////////////////////////////////////////////////////////////////////////////////////// + // High Availability by ZooKeeper // + ///////////////////////////////////////////////////////////////////////////////////////////////// val SUPPORT_DYNAMIC_SERVICE_DISCOVERY = KyuubiConfigBuilder("spark.kyuubi.support.dynamic.service.discovery") @@ -85,14 +87,14 @@ object KyuubiConf { .createWithDefault(TimeUnit.SECONDS.toMillis(1L)) val KYUUBI_ZOOKEEPER_CONNECTION_MAX_RETRIES = - KyuubiConfigBuilder("") - .doc("") + KyuubiConfigBuilder("spark.kyuubi.zookeeper.connection.max.retries") + .doc("max retry time connecting to the zk server") .intConf .createWithDefault(3) - ////////////////////////////////////////////////////// - // log // - ////////////////////////////////////////////////////// + ///////////////////////////////////////////////////////////////////////////////////////////////// + // Operation Log // + ///////////////////////////////////////////////////////////////////////////////////////////////// val KYUUBI_LOGGING_OPERATION_ENABLED = KyuubiConfigBuilder("spark.kyuubi.logging.operation.enabled") @@ -109,9 +111,9 @@ object KyuubiConf { s"${sys.env.getOrElse("SPARK_LOG_DIR", System.getProperty("java.io.tmpdir"))}" + File.separator + "operation_logs") - ////////////////////////////////////////////////////// - // background exec thread // - ////////////////////////////////////////////////////// + ///////////////////////////////////////////////////////////////////////////////////////////////// + // Background Execution Thread Pool // + ///////////////////////////////////////////////////////////////////////////////////////////////// val KYUUBI_ASYNC_EXEC_THREADS = KyuubiConfigBuilder("spark.kyuubi.async.exec.threads") @@ -139,9 +141,9 @@ object KyuubiConf { .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(TimeUnit.SECONDS.toMillis(10L)) - ////////////////////////////////////////////////////// - // checking idle session // - ////////////////////////////////////////////////////// + ///////////////////////////////////////////////////////////////////////////////////////////////// + // Session Idle Check // + ///////////////////////////////////////////////////////////////////////////////////////////////// val KYUUBI_SESSION_CHECK_INTERVAL = KyuubiConfigBuilder("spark.kyuubi.session.check.interval") @@ -173,20 +175,13 @@ object KyuubiConf { .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(TimeUnit.MINUTES.toMillis(20L)) - ////////////////////////////////////////////////////// - // security // - ////////////////////////////////////////////////////// - - val KYUUBI_ENABLE_DOAS = - KyuubiConfigBuilder("spark.kyuubi.enable.doAs") - .doc("enable proxying of running Spark") - .booleanConf - .createWithDefault(true) - - val KYUUBI_AUTHENTICATION = - KyuubiConfigBuilder("spark.kyuubi.authentication") - .doc("Client authentication types. NONE: no authentication check;" + - " KERBEROS: Kerberos/GSSAPI authentication") - .stringConf - .createWithDefault("NONE") + /** + * Return all the configuration definitions that have been defined in [[KyuubiConf]]. Each + * definition contains key, defaultValue. + */ + def getAllDefaults: Map[String, String] = { + kyuubiConfEntries.entrySet().asScala.map {kv => + (kv.getKey, kv.getValue.defaultValueString) + }.toMap + } } diff --git a/src/main/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtils.scala b/src/main/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtils.scala index ed60233ef..5128ccb12 100644 --- a/src/main/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtils.scala +++ b/src/main/scala/yaooqinn/kyuubi/ha/HighAvailabilityUtils.scala @@ -50,22 +50,22 @@ object HighAvailabilityUtils extends Logging { private[this] var deregisteredWithZooKeeper = false def isSupportDynamicServiceDiscovery(conf: SparkConf): Boolean = { - conf.getBoolean(SUPPORT_DYNAMIC_SERVICE_DISCOVERY.key, defaultValue = false) && - conf.get(KYUUBI_ZOOKEEPER_QUORUM.key, "").split(",").nonEmpty + conf.get(SUPPORT_DYNAMIC_SERVICE_DISCOVERY.key).toBoolean && + conf.get(KYUUBI_ZOOKEEPER_QUORUM.key).split(",").nonEmpty } @throws[Exception] - def addServerInstanceToZooKeeper(kyuubiServer: KyuubiServer): Unit = { - val conf = kyuubiServer.getConf + def addServerInstanceToZooKeeper(server: KyuubiServer): Unit = { + val conf = server.getConf val zooKeeperEnsemble = getQuorumServers(conf) - val rootNamespace = conf.get(KYUUBI_ZOOKEEPER_NAMESPACE.key, "kyuubiserver") - val instanceURI = getServerInstanceURI(kyuubiServer.feService) + val rootNamespace = conf.get(KYUUBI_ZOOKEEPER_NAMESPACE.key) + val instanceURI = getServerInstanceURI(server.feService) setUpZooKeeperAuth(conf) - val sessionTimeout = conf.getInt(KYUUBI_ZOOKEEPER_SESSION_TIMEOUT.key, 1200000) - val baseSleepTime = conf.getInt(KYUUBI_ZOOKEEPER_CONNECTION_BASESLEEPTIME.key, 1000) - val maxRetries = conf.getInt(KYUUBI_ZOOKEEPER_CONNECTION_MAX_RETRIES.key, 3) + val sessionTimeout = conf.getTimeAsMs(KYUUBI_ZOOKEEPER_SESSION_TIMEOUT.key).toInt + val baseSleepTime = conf.getTimeAsMs(KYUUBI_ZOOKEEPER_CONNECTION_BASESLEEPTIME.key).toInt + val maxRetries = conf.get(KYUUBI_ZOOKEEPER_CONNECTION_MAX_RETRIES.key).toInt // Create a CuratorFramework instance to be used as the ZooKeeper client // Use the zooKeeperAclProvider to create appropriate ACLs zooKeeperClient = @@ -111,7 +111,7 @@ object HighAvailabilityUtils extends Logging { setDeregisteredWithZooKeeper(false) znodePath = znode.getActualPath // Set a watch on the znode - if (zooKeeperClient.checkExists.usingWatcher(new DeRegisterWatcher(kyuubiServer)) + if (zooKeeperClient.checkExists.usingWatcher(new DeRegisterWatcher(server)) .forPath(znodePath) == null) { // No node exists, throw exception throw new Exception("Unable to create znode for this KyuubiServer instance on ZooKeeper.") diff --git a/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala b/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala index ecbdcd69d..2c86d8697 100644 --- a/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala +++ b/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala @@ -44,7 +44,7 @@ private[kyuubi] class OperationManager private(name: String) val userToOperationLog = new ConcurrentHashMap[String, OperationLog]() override def init(conf: SparkConf): Unit = synchronized { - if (conf.getBoolean(KYUUBI_LOGGING_OPERATION_ENABLED.key, defaultValue = true)) { + if (conf.get(KYUUBI_LOGGING_OPERATION_ENABLED.key).toBoolean) { initOperationLogCapture() } else { debug("Operation level logging is turned off") diff --git a/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala b/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala index f691daeb4..d298683a5 100644 --- a/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala +++ b/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala @@ -21,7 +21,7 @@ import java.io.File import java.util.concurrent.atomic.AtomicBoolean import org.apache.hadoop.hive.cli.OptionsProcessor -import org.apache.spark.{SparkConf, SparkUtils} +import org.apache.spark.{KyuubiConf, SparkConf, SparkUtils} import yaooqinn.kyuubi.Logging import yaooqinn.kyuubi.ha.HighAvailabilityUtils @@ -104,6 +104,8 @@ object KyuubiServer extends Logging { error("SET spark.driver.userClassPathFirst to true") System.exit(-1) } + // overwrite later for each SparkC + conf.set("spark.app.name", classOf[KyuubiServer].getSimpleName) // avoid max port retries reached conf.set("spark.ui.port", "0") conf.set("spark.driver.allowMultipleContexts", "true") @@ -119,5 +121,7 @@ object KyuubiServer extends Logging { // but for the later [[SparkContext]] must be set to client mode conf.set("spark.submit.deployMode", "client") + // Set missing Kyuubi configs to SparkConf + KyuubiConf.getAllDefaults.foreach(kv => conf.setIfMissing(kv._1, kv._2)) } } diff --git a/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala b/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala index d42efd628..0f68cb09a 100644 --- a/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala +++ b/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.SparkSession import yaooqinn.kyuubi.Logging import yaooqinn.kyuubi.operation.OperationManager +import yaooqinn.kyuubi.server.KyuubiServer import yaooqinn.kyuubi.service.CompositeService import yaooqinn.kyuubi.ui.KyuubiServerMonitor @@ -58,12 +59,12 @@ private[kyuubi] class SessionManager private( private[this] var shutdown: Boolean = false - def this() = this(getClass.getSimpleName) + def this() = this(classOf[SessionManager].getSimpleName) override def init(conf: SparkConf): Unit = synchronized { this.conf = conf // Create operation log root directory, if operation logging is enabled - if (conf.getBoolean(KYUUBI_LOGGING_OPERATION_ENABLED.key, defaultValue = true)) { + if (conf.get(KYUUBI_LOGGING_OPERATION_ENABLED.key).toBoolean) { initOperationLogRootDir() } createBackgroundOperationPool() @@ -72,33 +73,29 @@ private[kyuubi] class SessionManager private( } private[this] def createBackgroundOperationPool(): Unit = { - val poolSize = conf.getInt(KYUUBI_ASYNC_EXEC_THREADS.key, 100) + val poolSize = conf.get(KYUUBI_ASYNC_EXEC_THREADS.key).toInt info("Background operation thread pool size: " + poolSize) - val poolQueueSize = conf.getInt(KYUUBI_ASYNC_EXEC_WAIT_QUEUE_SIZE.key, 100) + val poolQueueSize = conf.get(KYUUBI_ASYNC_EXEC_WAIT_QUEUE_SIZE.key).toInt info("Background operation thread wait queue size: " + poolQueueSize) - val keepAliveTime = conf.getTimeAsMs(KYUUBI_EXEC_KEEPALIVE_TIME.key, "10s") + val keepAliveTime = conf.getTimeAsSeconds(KYUUBI_EXEC_KEEPALIVE_TIME.key) info("Background operation thread keepalive time: " + keepAliveTime + " seconds") - val threadPoolName = "SparkThriftServer-Background-Pool" + val threadPoolName = classOf[KyuubiServer].getSimpleName + "-Background-Pool" backgroundOperationPool = new ThreadPoolExecutor( poolSize, poolSize, keepAliveTime, - TimeUnit.MILLISECONDS, + TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable](poolQueueSize), new ThreadFactoryWithGarbageCleanup(threadPoolName)) backgroundOperationPool.allowCoreThreadTimeOut(true) - checkInterval = conf.getTimeAsMs(KYUUBI_SESSION_CHECK_INTERVAL.key, "6h") - sessionTimeout = conf.getTimeAsMs(KYUUBI_IDLE_SESSION_TIMEOUT.key, "8h") - checkOperation = conf.getBoolean(KYUUBI_IDLE_SESSION_CHECK_OPERATION.key, - defaultValue = true) + checkInterval = conf.getTimeAsMs(KYUUBI_SESSION_CHECK_INTERVAL.key) + sessionTimeout = conf.getTimeAsMs(KYUUBI_IDLE_SESSION_TIMEOUT.key) + checkOperation = conf.get(KYUUBI_IDLE_SESSION_CHECK_OPERATION.key).toBoolean } private[this] def initOperationLogRootDir(): Unit = { - operationLogRootDir = - new File(conf.get(KYUUBI_LOGGING_OPERATION_LOG_LOCATION.key, - s"${sys.env.getOrElse("SPARK_LOG_DIR", System.getProperty("java.io.tmpdir"))}" - + File.separator + "operation_logs")) + operationLogRootDir = new File(conf.get(KYUUBI_LOGGING_OPERATION_LOG_LOCATION.key)) isOperationLogEnabled = true if (operationLogRootDir.exists && !operationLogRootDir.isDirectory) { info("The operation log root directory exists, but it is not a directory: " @@ -176,8 +173,7 @@ private[kyuubi] class SessionManager private( private[this] def startSparkSessionCleaner(): Unit = { // at least 10 min val interval = math.max( - conf.getTimeAsMs(KYUUBI_SPARK_SESSION_CHECK_INTERVAL.key, "20min"), - 10 * 60 * 1000L) + conf.getTimeAsMs(KYUUBI_SPARK_SESSION_CHECK_INTERVAL.key), 10 * 60 * 1000L) val sessionCleaner = new Runnable { override def run(): Unit = { sleepInterval(interval) @@ -281,7 +277,7 @@ private[kyuubi] class SessionManager private( shutdown = true if (backgroundOperationPool != null) { backgroundOperationPool.shutdown() - val timeout = conf.getTimeAsSeconds(KYUUBI_ASYNC_EXEC_SHUTDOWN_TIMEOUT.key, "10s") + val timeout = conf.getTimeAsSeconds(KYUUBI_ASYNC_EXEC_SHUTDOWN_TIMEOUT.key) try { backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS) } catch {