Register Kyuubi conf to Spark conf

This commit is contained in:
Kent Yao 2018-01-11 16:41:40 +08:00
parent 5b75e65bc4
commit 3b9ce670b9
5 changed files with 59 additions and 64 deletions

View File

@ -18,28 +18,30 @@
package org.apache.spark
import java.io.File
import java.util.HashMap
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import org.apache.spark.internal.config.{ConfigBuilder, ConfigEntry}
object KyuubiConf {
private val kyuubiConfEntries = java.util.Collections.synchronizedMap(
new java.util.HashMap[String, ConfigEntry[_]]())
private[this] val kyuubiConfEntries = new HashMap[String, ConfigEntry[_]]()
def register(entry: ConfigEntry[_]): Unit = kyuubiConfEntries.synchronized {
def register(entry: ConfigEntry[_]): Unit = {
require(!kyuubiConfEntries.containsKey(entry.key),
s"Duplicate SQLConfigEntry. ${entry.key} has been registered")
kyuubiConfEntries.put(entry.key, entry)
}
object KyuubiConfigBuilder {
private[this] object KyuubiConfigBuilder {
def apply(key: String): ConfigBuilder = ConfigBuilder(key).onCreate(register)
}
//////////////////////////////////////////////////////
// ha //
//////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////
// High Availability by ZooKeeper //
/////////////////////////////////////////////////////////////////////////////////////////////////
val SUPPORT_DYNAMIC_SERVICE_DISCOVERY =
KyuubiConfigBuilder("spark.kyuubi.support.dynamic.service.discovery")
@ -85,14 +87,14 @@ object KyuubiConf {
.createWithDefault(TimeUnit.SECONDS.toMillis(1L))
val KYUUBI_ZOOKEEPER_CONNECTION_MAX_RETRIES =
KyuubiConfigBuilder("")
.doc("")
KyuubiConfigBuilder("spark.kyuubi.zookeeper.connection.max.retries")
.doc("max retry time connecting to the zk server")
.intConf
.createWithDefault(3)
//////////////////////////////////////////////////////
// log //
//////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////
// Operation Log //
/////////////////////////////////////////////////////////////////////////////////////////////////
val KYUUBI_LOGGING_OPERATION_ENABLED =
KyuubiConfigBuilder("spark.kyuubi.logging.operation.enabled")
@ -109,9 +111,9 @@ object KyuubiConf {
s"${sys.env.getOrElse("SPARK_LOG_DIR", System.getProperty("java.io.tmpdir"))}"
+ File.separator + "operation_logs")
//////////////////////////////////////////////////////
// background exec thread //
//////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////
// Background Execution Thread Pool //
/////////////////////////////////////////////////////////////////////////////////////////////////
val KYUUBI_ASYNC_EXEC_THREADS =
KyuubiConfigBuilder("spark.kyuubi.async.exec.threads")
@ -139,9 +141,9 @@ object KyuubiConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.SECONDS.toMillis(10L))
//////////////////////////////////////////////////////
// checking idle session //
//////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////
// Session Idle Check //
/////////////////////////////////////////////////////////////////////////////////////////////////
val KYUUBI_SESSION_CHECK_INTERVAL =
KyuubiConfigBuilder("spark.kyuubi.session.check.interval")
@ -173,20 +175,13 @@ object KyuubiConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.MINUTES.toMillis(20L))
//////////////////////////////////////////////////////
// security //
//////////////////////////////////////////////////////
val KYUUBI_ENABLE_DOAS =
KyuubiConfigBuilder("spark.kyuubi.enable.doAs")
.doc("enable proxying of running Spark")
.booleanConf
.createWithDefault(true)
val KYUUBI_AUTHENTICATION =
KyuubiConfigBuilder("spark.kyuubi.authentication")
.doc("Client authentication types. NONE: no authentication check;" +
" KERBEROS: Kerberos/GSSAPI authentication")
.stringConf
.createWithDefault("NONE")
/**
* Return all the configuration definitions that have been defined in [[KyuubiConf]]. Each
* definition contains key, defaultValue.
*/
def getAllDefaults: Map[String, String] = {
kyuubiConfEntries.entrySet().asScala.map {kv =>
(kv.getKey, kv.getValue.defaultValueString)
}.toMap
}
}

View File

@ -50,22 +50,22 @@ object HighAvailabilityUtils extends Logging {
private[this] var deregisteredWithZooKeeper = false
def isSupportDynamicServiceDiscovery(conf: SparkConf): Boolean = {
conf.getBoolean(SUPPORT_DYNAMIC_SERVICE_DISCOVERY.key, defaultValue = false) &&
conf.get(KYUUBI_ZOOKEEPER_QUORUM.key, "").split(",").nonEmpty
conf.get(SUPPORT_DYNAMIC_SERVICE_DISCOVERY.key).toBoolean &&
conf.get(KYUUBI_ZOOKEEPER_QUORUM.key).split(",").nonEmpty
}
@throws[Exception]
def addServerInstanceToZooKeeper(kyuubiServer: KyuubiServer): Unit = {
val conf = kyuubiServer.getConf
def addServerInstanceToZooKeeper(server: KyuubiServer): Unit = {
val conf = server.getConf
val zooKeeperEnsemble = getQuorumServers(conf)
val rootNamespace = conf.get(KYUUBI_ZOOKEEPER_NAMESPACE.key, "kyuubiserver")
val instanceURI = getServerInstanceURI(kyuubiServer.feService)
val rootNamespace = conf.get(KYUUBI_ZOOKEEPER_NAMESPACE.key)
val instanceURI = getServerInstanceURI(server.feService)
setUpZooKeeperAuth(conf)
val sessionTimeout = conf.getInt(KYUUBI_ZOOKEEPER_SESSION_TIMEOUT.key, 1200000)
val baseSleepTime = conf.getInt(KYUUBI_ZOOKEEPER_CONNECTION_BASESLEEPTIME.key, 1000)
val maxRetries = conf.getInt(KYUUBI_ZOOKEEPER_CONNECTION_MAX_RETRIES.key, 3)
val sessionTimeout = conf.getTimeAsMs(KYUUBI_ZOOKEEPER_SESSION_TIMEOUT.key).toInt
val baseSleepTime = conf.getTimeAsMs(KYUUBI_ZOOKEEPER_CONNECTION_BASESLEEPTIME.key).toInt
val maxRetries = conf.get(KYUUBI_ZOOKEEPER_CONNECTION_MAX_RETRIES.key).toInt
// Create a CuratorFramework instance to be used as the ZooKeeper client
// Use the zooKeeperAclProvider to create appropriate ACLs
zooKeeperClient =
@ -111,7 +111,7 @@ object HighAvailabilityUtils extends Logging {
setDeregisteredWithZooKeeper(false)
znodePath = znode.getActualPath
// Set a watch on the znode
if (zooKeeperClient.checkExists.usingWatcher(new DeRegisterWatcher(kyuubiServer))
if (zooKeeperClient.checkExists.usingWatcher(new DeRegisterWatcher(server))
.forPath(znodePath) == null) {
// No node exists, throw exception
throw new Exception("Unable to create znode for this KyuubiServer instance on ZooKeeper.")

View File

@ -44,7 +44,7 @@ private[kyuubi] class OperationManager private(name: String)
val userToOperationLog = new ConcurrentHashMap[String, OperationLog]()
override def init(conf: SparkConf): Unit = synchronized {
if (conf.getBoolean(KYUUBI_LOGGING_OPERATION_ENABLED.key, defaultValue = true)) {
if (conf.get(KYUUBI_LOGGING_OPERATION_ENABLED.key).toBoolean) {
initOperationLogCapture()
} else {
debug("Operation level logging is turned off")

View File

@ -21,7 +21,7 @@ import java.io.File
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.hadoop.hive.cli.OptionsProcessor
import org.apache.spark.{SparkConf, SparkUtils}
import org.apache.spark.{KyuubiConf, SparkConf, SparkUtils}
import yaooqinn.kyuubi.Logging
import yaooqinn.kyuubi.ha.HighAvailabilityUtils
@ -104,6 +104,8 @@ object KyuubiServer extends Logging {
error("SET spark.driver.userClassPathFirst to true")
System.exit(-1)
}
// overwritten later for each SparkContext
conf.set("spark.app.name", classOf[KyuubiServer].getSimpleName)
// avoid max port retries reached
conf.set("spark.ui.port", "0")
conf.set("spark.driver.allowMultipleContexts", "true")
@ -119,5 +121,7 @@ object KyuubiServer extends Logging {
// but for the later [[SparkContext]] must be set to client mode
conf.set("spark.submit.deployMode", "client")
// Set missing Kyuubi configs to SparkConf
KyuubiConf.getAllDefaults.foreach(kv => conf.setIfMissing(kv._1, kv._2))
}
}

View File

@ -35,6 +35,7 @@ import org.apache.spark.sql.SparkSession
import yaooqinn.kyuubi.Logging
import yaooqinn.kyuubi.operation.OperationManager
import yaooqinn.kyuubi.server.KyuubiServer
import yaooqinn.kyuubi.service.CompositeService
import yaooqinn.kyuubi.ui.KyuubiServerMonitor
@ -58,12 +59,12 @@ private[kyuubi] class SessionManager private(
private[this] var shutdown: Boolean = false
def this() = this(getClass.getSimpleName)
def this() = this(classOf[SessionManager].getSimpleName)
override def init(conf: SparkConf): Unit = synchronized {
this.conf = conf
// Create operation log root directory, if operation logging is enabled
if (conf.getBoolean(KYUUBI_LOGGING_OPERATION_ENABLED.key, defaultValue = true)) {
if (conf.get(KYUUBI_LOGGING_OPERATION_ENABLED.key).toBoolean) {
initOperationLogRootDir()
}
createBackgroundOperationPool()
@ -72,33 +73,29 @@ private[kyuubi] class SessionManager private(
}
private[this] def createBackgroundOperationPool(): Unit = {
val poolSize = conf.getInt(KYUUBI_ASYNC_EXEC_THREADS.key, 100)
val poolSize = conf.get(KYUUBI_ASYNC_EXEC_THREADS.key).toInt
info("Background operation thread pool size: " + poolSize)
val poolQueueSize = conf.getInt(KYUUBI_ASYNC_EXEC_WAIT_QUEUE_SIZE.key, 100)
val poolQueueSize = conf.get(KYUUBI_ASYNC_EXEC_WAIT_QUEUE_SIZE.key).toInt
info("Background operation thread wait queue size: " + poolQueueSize)
val keepAliveTime = conf.getTimeAsMs(KYUUBI_EXEC_KEEPALIVE_TIME.key, "10s")
val keepAliveTime = conf.getTimeAsSeconds(KYUUBI_EXEC_KEEPALIVE_TIME.key)
info("Background operation thread keepalive time: " + keepAliveTime + " seconds")
val threadPoolName = "SparkThriftServer-Background-Pool"
val threadPoolName = classOf[KyuubiServer].getSimpleName + "-Background-Pool"
backgroundOperationPool =
new ThreadPoolExecutor(
poolSize,
poolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
TimeUnit.SECONDS,
new LinkedBlockingQueue[Runnable](poolQueueSize),
new ThreadFactoryWithGarbageCleanup(threadPoolName))
backgroundOperationPool.allowCoreThreadTimeOut(true)
checkInterval = conf.getTimeAsMs(KYUUBI_SESSION_CHECK_INTERVAL.key, "6h")
sessionTimeout = conf.getTimeAsMs(KYUUBI_IDLE_SESSION_TIMEOUT.key, "8h")
checkOperation = conf.getBoolean(KYUUBI_IDLE_SESSION_CHECK_OPERATION.key,
defaultValue = true)
checkInterval = conf.getTimeAsMs(KYUUBI_SESSION_CHECK_INTERVAL.key)
sessionTimeout = conf.getTimeAsMs(KYUUBI_IDLE_SESSION_TIMEOUT.key)
checkOperation = conf.get(KYUUBI_IDLE_SESSION_CHECK_OPERATION.key).toBoolean
}
private[this] def initOperationLogRootDir(): Unit = {
operationLogRootDir =
new File(conf.get(KYUUBI_LOGGING_OPERATION_LOG_LOCATION.key,
s"${sys.env.getOrElse("SPARK_LOG_DIR", System.getProperty("java.io.tmpdir"))}"
+ File.separator + "operation_logs"))
operationLogRootDir = new File(conf.get(KYUUBI_LOGGING_OPERATION_LOG_LOCATION.key))
isOperationLogEnabled = true
if (operationLogRootDir.exists && !operationLogRootDir.isDirectory) {
info("The operation log root directory exists, but it is not a directory: "
@ -176,8 +173,7 @@ private[kyuubi] class SessionManager private(
private[this] def startSparkSessionCleaner(): Unit = {
// at least 10 min
val interval = math.max(
conf.getTimeAsMs(KYUUBI_SPARK_SESSION_CHECK_INTERVAL.key, "20min"),
10 * 60 * 1000L)
conf.getTimeAsMs(KYUUBI_SPARK_SESSION_CHECK_INTERVAL.key), 10 * 60 * 1000L)
val sessionCleaner = new Runnable {
override def run(): Unit = {
sleepInterval(interval)
@ -281,7 +277,7 @@ private[kyuubi] class SessionManager private(
shutdown = true
if (backgroundOperationPool != null) {
backgroundOperationPool.shutdown()
val timeout = conf.getTimeAsSeconds(KYUUBI_ASYNC_EXEC_SHUTDOWN_TIMEOUT.key, "10s")
val timeout = conf.getTimeAsSeconds(KYUUBI_ASYNC_EXEC_SHUTDOWN_TIMEOUT.key)
try {
backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS)
} catch {