[KYUUBI-116][Experimental] Support long cache spark session in kerberized cluster (#117)
* fixes @116 Support long caching SparkSession/SparkContext for secured hadoop cluster * handle sub classes of coarse grained scheduler backend * fix ut for spark 2.2 * updating doc
This commit is contained in:
parent
6352c6aeb3
commit
72e664fced
@ -30,71 +30,74 @@ $ bin/start-kyuubi.sh \
|
||||
|
||||
Name|Default|Description
|
||||
---|---|---
|
||||
spark.kyuubi.ha.enabled|false|Whether KyuubiServer supports dynamic service discovery for its clients. To support this, each instance of KyuubiServer currently uses ZooKeeper to register itself, when it is brought up. JDBC/ODBC clients should use the ZooKeeper ensemble: spark.kyuubi.ha.zk.quorum in their connection string.
|
||||
spark.kyuubi.ha.mode|load-balance|High availability mode, one is load-balance which is used by default, another is failover as master-slave mode.
|
||||
spark.kyuubi.ha.zk.quorum|none|Comma separated list of ZooKeeper servers to talk to, when KyuubiServer supports service discovery via Zookeeper.
|
||||
spark.kyuubi.ha.zk.namespace|kyuubiserver|The parent node in ZooKeeper used by KyuubiServer when supporting dynamic service discovery.
|
||||
spark.kyuubi.ha.zk.client.port|2181|The port of ZooKeeper servers to talk to. If the list of Zookeeper servers specified in spark.kyuubi.zookeeper.quorum does not contain port numbers, this value is used.
|
||||
spark.kyuubi.ha.zk.session.timeout|1,200,000|ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and as a result, all locks released, if a heartbeat is not sent in the timeout.
|
||||
spark.kyuubi.ha.zk.connection.basesleeptime|1,000|Initial amount of time (in milliseconds) to wait between retries when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy.
|
||||
spark.kyuubi.ha.zk.connection.max.retries|3|Max retry times for connecting to the zk server
|
||||
spark.kyuubi.<br />ha.enabled|false|Whether KyuubiServer supports dynamic service discovery for its clients. To support this, each instance of KyuubiServer currently uses ZooKeeper to register itself, when it is brought up. JDBC/ODBC clients should use the ZooKeeper ensemble: spark.kyuubi.ha.zk.quorum in their connection string.
|
||||
spark.kyuubi.<br />ha.mode|load-balance|High availability mode, one is load-balance which is used by default, another is failover as master-slave mode.
|
||||
spark.kyuubi.<br />ha.zk.quorum|none|Comma separated list of ZooKeeper servers to talk to, when KyuubiServer supports service discovery via Zookeeper.
|
||||
spark.kyuubi.<br />ha.zk.namespace|kyuubiserver|The parent node in ZooKeeper used by KyuubiServer when supporting dynamic service discovery.
|
||||
spark.kyuubi.<br />ha.zk.client.port|2181|The port of ZooKeeper servers to talk to. If the list of Zookeeper servers specified in spark.kyuubi.zookeeper.quorum does not contain port numbers, this value is used.
|
||||
spark.kyuubi.<br />ha.zk.session.timeout|1,200,000|ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and as a result, all locks released, if a heartbeat is not sent in the timeout.
|
||||
spark.kyuubi.<br />ha.zk.connection.basesleeptime|1,000|Initial amount of time (in milliseconds) to wait between retries when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy.
|
||||
spark.kyuubi.<br />ha.zk.connection.max.retries|3|Max retry times for connecting to the zk server
|
||||
|
||||
#### Operation Log
|
||||
|
||||
Name|Default|Description
|
||||
---|---|---
|
||||
spark.kyuubi.logging.operation.enabled|true|When true, Kyuubi Server will save operation logs and make them available for clients
|
||||
spark.kyuubi.logging.operation.log.dir|`KYUUBI_LOG_DIR` -> `java.io.tmpdir`/operation_logs|Top level directory where operation logs are stored if logging functionality is enabled
|
||||
spark.kyuubi.<br />logging.operation.enabled|true|When true, Kyuubi Server will save operation logs and make them available for clients
|
||||
spark.kyuubi.<br />logging.operation.log.dir|`KYUUBI_LOG_DIR` -> `java.io.tmpdir`/operation_logs|Top level directory where operation logs are stored if logging functionality is enabled
|
||||
|
||||
#### Frontend Service options
|
||||
|
||||
Name|Default|Description
|
||||
---|---|---
|
||||
spark.kyuubi.frontend.bind.host | localhost | Bind host on which to run the Kyuubi Frontend service.
|
||||
spark.kyuubi.frontend.bind.port| 10009 | Port number of Kyuubi Frontend service. set 0 will get a random available one
|
||||
spark.kyuubi.frontend.min.worker.threads| 50 | Minimum number of Thrift worker threads.
|
||||
spark.kyuubi.frontend.max.worker.threads| 500 | Maximum number of Thrift worker threads
|
||||
spark.kyuubi.frontend.worker.keepalive.time | 60s| Keepalive time (in seconds) for an idle worker thread. When the number of workers exceeds min workers, excessive threads are killed after this time interval.
|
||||
spark.kyuubi.authentication | NONE | Client authentication types. NONE: no authentication check; NOSASL: no authentication check KERBEROS: Kerberos/GSSAPI authentication.
|
||||
spark.kyuubi.frontend.allow.user.substitution | true | Allow alternate user to be specified as part of Kyuubi open connection request.
|
||||
spark.kyuubi.frontend.enable.doAs | true | Set true to have Kyuubi execute SQL operations as the user making the calls to it.
|
||||
spark.kyuubi.frontend.max.message.size | 104857600 | Maximum message size in bytes a Kyuubi server will accept.
|
||||
spark.kyuubi.<br />frontend.bind.host | localhost | Bind host on which to run the Kyuubi Frontend service.
|
||||
spark.kyuubi.<br />frontend.bind.port| 10009 | Port number of Kyuubi Frontend service. set 0 will get a random available one
|
||||
spark.kyuubi.<br />frontend.min.worker.threads| 50 | Minimum number of Thrift worker threads.
|
||||
spark.kyuubi.<br />frontend.max.worker.threads| 500 | Maximum number of Thrift worker threads
|
||||
spark.kyuubi.<br />frontend.worker.keepalive.time | 60s| Keepalive time (in seconds) for an idle worker thread. When the number of workers exceeds min workers, excessive threads are killed after this time interval.
|
||||
spark.kyuubi.<br />authentication | NONE | Client authentication types. NONE: no authentication check; NOSASL: no authentication check KERBEROS: Kerberos/GSSAPI authentication.
|
||||
spark.kyuubi.<br />frontend.allow.user.substitution | true | Allow alternate user to be specified as part of Kyuubi open connection request.
|
||||
spark.kyuubi.<br />frontend.enable.doAs | true | Set true to have Kyuubi execute SQL operations as the user making the calls to it.
|
||||
spark.kyuubi.<br />frontend.max.message.size | 104857600 | Maximum message size in bytes a Kyuubi server will accept.
|
||||
|
||||
#### Background Execution Thread Pool
|
||||
|
||||
Name|Default|Description
|
||||
---|---|---
|
||||
spark.kyuubi.async.exec.threads|100|Number of threads in the async thread pool for KyuubiServer.
|
||||
spark.kyuubi.async.exec.wait.queue.size|100|Size of the wait queue for async thread pool in KyuubiServer. After hitting this limit, the async thread pool will reject new requests.
|
||||
spark.kyuubi.async.exec.keep.alive.time|10,000|Time (in milliseconds) that an idle KyuubiServer async thread (from the thread pool) will wait for a new task to arrive before terminating.
|
||||
spark.kyuubi.async.exec.shutdown.timeout|10,000|How long KyuubiServer shutdown will wait for async threads to terminate.
|
||||
spark.kyuubi.<br />async.exec.threads|100|Number of threads in the async thread pool for KyuubiServer.
|
||||
spark.kyuubi.<br />async.exec.wait.queue.size|100|Size of the wait queue for async thread pool in KyuubiServer. After hitting this limit, the async thread pool will reject new requests.
|
||||
spark.kyuubi.<br />async.exec.keep.alive.time|10,000|Time (in milliseconds) that an idle KyuubiServer async thread (from the thread pool) will wait for a new task to arrive before terminating.
|
||||
spark.kyuubi.<br />async.exec.shutdown.timeout|10,000|How long KyuubiServer shutdown will wait for async threads to terminate.
|
||||
|
||||
#### Kyuubi Session
|
||||
|
||||
Name|Default|Description
|
||||
---|---|---
|
||||
spark.kyuubi.frontend.session.check.interval|6h|The check interval for frontend session/operation timeout, which can be disabled by setting to zero or negative value.
|
||||
spark.kyuubi.frontend.session.timeout|8h|The check interval for session/operation timeout, which can be disabled by setting to zero or negative value.
|
||||
spark.kyuubi.frontend.session.check.operation| true |Session will be considered to be idle only if there is no activity, and there is no pending operation. This setting takes effect only if session idle timeout `spark.kyuubi.frontend.session.timeout` and checking `spark.kyuubi.frontend.session.check.interval` are enabled.
|
||||
spark.kyuubi.<br />frontend.session.check.interval|6h|The check interval for frontend session/operation timeout, which can be disabled by setting to zero or negative value.
|
||||
spark.kyuubi.<br />frontend.session.timeout|8h|The check interval for session/operation timeout, which can be disabled by setting to zero or negative value.
|
||||
spark.kyuubi.<br />frontend.session.check.operation| true |Session will be considered to be idle only if there is no activity, and there is no pending operation. This setting takes effect only if session idle timeout `spark.kyuubi.frontend.session.timeout` and checking `spark.kyuubi.frontend.session.check.interval` are enabled.
|
||||
|
||||
#### Spark Session
|
||||
|
||||
Name|Default|Description
|
||||
---|---|---
|
||||
spark.kyuubi.backend.session.wait.other.times | 60 | How many times to check when another session with the same user is initializing SparkContext. Total Time will be times by `spark.kyuubi.backend.session.wait.other.interval`.
|
||||
spark.kyuubi.backend.session.wait.other.interval|1s|The interval for checking whether other thread with the same user has completed SparkContext instantiation.
|
||||
spark.kyuubi.backend.session.init.timeout|60s|How long we suggest the server to give up instantiating SparkContext.
|
||||
spark.kyuubi.backend.session.check.interval|5min|The check interval for backend session a.k.a SparkSession timeout.
|
||||
spark.kyuubi.backend.session.idle.timeout|30min|SparkSession timeout.
|
||||
spark.kyuubi.<br />backend.session.wait.other.times | 60 | How many times to check when another session with the same user is initializing SparkContext. Total Time will be times by `spark.kyuubi.backend.session.wait.other.interval`.
|
||||
spark.kyuubi.<br />backend.session.wait.other.interval|1s|The interval for checking whether other thread with the same user has completed SparkContext instantiation.
|
||||
spark.kyuubi.<br />backend.session.init.timeout|60s|How long we suggest the server to give up instantiating SparkContext.
|
||||
spark.kyuubi.<br />backend.session.check.interval|5min|The check interval for backend session a.k.a SparkSession timeout.
|
||||
spark.kyuubi.<br />backend.session.idle.timeout|30min|SparkSession timeout.
|
||||
spark.kyuubi.<br />backend.session.local.dir|KYUUBI_HOME/local|Default value to set `spark.local.dir`. For YARN mode, this only affect the Kyuubi server side settings according to the rule of Spark treating `spark.local.dir`.
|
||||
spark.kyuubi.<br />backend.session.long.cache|${UserGroupInformation.isSecurityEnabled}|Whether to update the tokens of Spark's executor to support long caching SparkSessions iff this is true && `spark.kyuubi.backend.token.update.class` is loadable. This is used towards kerberized hadoop clusters in case of `spark.kyuubi.backend.session.idle.timeout` is set longer than token expiration time limit or SparkSession never idles.
|
||||
spark.kyuubi.<br />backend.session.token.update.class|org.apache.spark.<br />scheduler.cluster.<br />CoarseGrainedClusterMessages$<br />UpdateDelegationTokens|`CoarseGrainedClusterMessages` for token update message from the driver of Spark to executors, it is loadable only by higher version Spark release(2.3 and later)
|
||||
|
||||
|
||||
#### Operation
|
||||
|
||||
Name|Default|Description
|
||||
---|---|---
|
||||
spark.kyuubi.operation.idle.timeout|6h|Operation will be closed when it's not accessed for this duration of time.
|
||||
spark.kyuubi.operation.incremental.collect|false|Whether to use incremental result collection from Spark executor side to Kyuubi server side.
|
||||
spark.kyuubi.operation.result.limit|-1|In non-incremental result collection mode, set this to a positive value to limit the size of result collected to driver side.
|
||||
spark.kyuubi.<br />operation.idle.timeout|6h|Operation will be closed when it's not accessed for this duration of time.
|
||||
spark.kyuubi.<br />operation.incremental.collect|false|Whether to use incremental result collection from Spark executor side to Kyuubi server side.
|
||||
spark.kyuubi.<br />operation.result.limit|-1|In non-incremental result collection mode, set this to a positive value to limit the size of result collected to driver side.
|
||||
|
||||
---
|
||||
|
||||
|
||||
@ -18,12 +18,13 @@
|
||||
package org.apache.spark
|
||||
|
||||
import java.io.File
|
||||
import java.util.HashMap
|
||||
import java.util.{HashMap => JMap}
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.language.implicitConversions
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
import org.apache.spark.internal.config.{ConfigBuilder, ConfigEntry}
|
||||
|
||||
/**
|
||||
@ -32,7 +33,7 @@ import org.apache.spark.internal.config.{ConfigBuilder, ConfigEntry}
|
||||
*/
|
||||
object KyuubiConf {
|
||||
|
||||
private[this] val kyuubiConfEntries = new HashMap[String, ConfigEntry[_]]()
|
||||
private val kyuubiConfEntries = new JMap[String, ConfigEntry[_]]()
|
||||
|
||||
def register(entry: ConfigEntry[_]): Unit = {
|
||||
require(!kyuubiConfEntries.containsKey(entry.key),
|
||||
@ -40,7 +41,7 @@ object KyuubiConf {
|
||||
kyuubiConfEntries.put(entry.key, entry)
|
||||
}
|
||||
|
||||
private[this] object KyuubiConfigBuilder {
|
||||
private object KyuubiConfigBuilder {
|
||||
def apply(key: String): ConfigBuilder = ConfigBuilder(key).onCreate(register)
|
||||
}
|
||||
|
||||
@ -265,7 +266,7 @@ object KyuubiConf {
|
||||
.timeConf(TimeUnit.MILLISECONDS)
|
||||
.createWithDefault(TimeUnit.SECONDS.toMillis(1L))
|
||||
|
||||
val BACKEND_SESSTION_INIT_TIMEOUT: ConfigEntry[Long] =
|
||||
val BACKEND_SESSION_INIT_TIMEOUT: ConfigEntry[Long] =
|
||||
KyuubiConfigBuilder("spark.kyuubi.backend.session.init.timeout")
|
||||
.doc("How long we suggest the server to give up instantiating SparkContext")
|
||||
.timeConf(TimeUnit.SECONDS)
|
||||
@ -285,12 +286,31 @@ object KyuubiConf {
|
||||
|
||||
val BACKEND_SESSION_LOCAL_DIR: ConfigEntry[String] =
|
||||
KyuubiConfigBuilder("spark.kyuubi.backend.session.local.dir")
|
||||
.doc("Default value to set spark.local.dir")
|
||||
.doc("Default value to set `spark.local.dir`, for YARN mode, this only affect the Kyuubi" +
|
||||
" server side settings according to the rule of Spark treating `spark.local.dir`")
|
||||
.stringConf
|
||||
.createWithDefault(
|
||||
s"${sys.env.getOrElse("KYUUBI_HOME", System.getProperty("java.io.tmpdir"))}"
|
||||
+ File.separator + "local")
|
||||
|
||||
val BACKEND_SESSION_LONG_CACHE: ConfigEntry[Boolean] =
|
||||
KyuubiConfigBuilder("spark.kyuubi.backend.session.long.cache")
|
||||
.doc("Whether to update the tokens of Spark's executor to support long caching" +
|
||||
" SparkSessions iff this is true && `spark.kyuubi.backend.token.update.class` is" +
|
||||
" loadable. This is used towards kerberized hadoop clusters in case of" +
|
||||
" `spark.kyuubi.backend.session.idle.timeout` is set longer than token expiration time" +
|
||||
" limit or SparkSession never idles.")
|
||||
.booleanConf
|
||||
.createWithDefault(UserGroupInformation.isSecurityEnabled)
|
||||
|
||||
val BACKEND_SESSION_TOKEN_UPDATE_CLASS: ConfigEntry[String] =
|
||||
KyuubiConfigBuilder("spark.kyuubi.backend.session.token.update.class")
|
||||
.doc("`CoarseGrainedClusterMessages` for token update message from the driver of Spark to" +
|
||||
" executors, it is loadable only by higher version Spark release(2.3 and later)")
|
||||
.stringConf
|
||||
.createWithDefault(
|
||||
"org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages$UpdateDelegationTokens")
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Authentication //
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@ -23,6 +23,7 @@ import java.net.URI
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.Map
|
||||
import scala.util.Try
|
||||
import scala.util.matching.Regex
|
||||
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
@ -255,6 +256,11 @@ object KyuubiSparkUtil extends Logging {
|
||||
loader
|
||||
}
|
||||
|
||||
/** Determines whether the provided class is loadable in the current thread. */
|
||||
def classIsLoadable(clazz: String): Boolean = {
|
||||
Try { Class.forName(clazz, false, getContextOrSparkClassLoader) }.isSuccess
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate proper configurations before server starts
|
||||
* @param conf the default [[SparkConf]]
|
||||
|
||||
@ -0,0 +1,71 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.scheduler.cluster
|
||||
|
||||
import java.io.{ByteArrayOutputStream, DataOutputStream}
|
||||
|
||||
import scala.collection.mutable
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
import org.apache.spark.KyuubiConf._
|
||||
import org.apache.spark.SparkContext
|
||||
|
||||
import yaooqinn.kyuubi.utils.ReflectUtils._
|
||||
|
||||
/**
|
||||
* Tool for methods used for Kyuubi to talking to Spark Executors
|
||||
*/
|
||||
object KyuubiSparkExecutorUtils {
|
||||
|
||||
/**
|
||||
* Populate the tokens contained in the current KyuubiSession's ugi to the all the alive
|
||||
* executors associated with its own SparkContext.
|
||||
*
|
||||
* @param sc The SparkContext with its runtime environment which contains all the executors,
|
||||
* associated with the current KyuubiSession and UserGroupInformation.
|
||||
* @param user the UserGroupInformation associated with the current KyuubiSession
|
||||
*/
|
||||
def populateTokens(sc: SparkContext, user: UserGroupInformation): Unit = {
|
||||
val schedulerBackend = sc.schedulerBackend
|
||||
schedulerBackend match {
|
||||
case backend: CoarseGrainedSchedulerBackend =>
|
||||
try {
|
||||
val byteStream = new ByteArrayOutputStream
|
||||
val dataStream = new DataOutputStream(byteStream)
|
||||
user.getCredentials.writeTokenStorageToStream(dataStream)
|
||||
val tokens = byteStream.toByteArray
|
||||
val executorField =
|
||||
classOf[CoarseGrainedSchedulerBackend].getName.replace('.', '$') + "$$executorDataMap"
|
||||
val executors = backend match {
|
||||
case _: YarnClientSchedulerBackend | _: YarnClusterSchedulerBackend |
|
||||
_: StandaloneSchedulerBackend =>
|
||||
getAncestorField(backend, 2, executorField)
|
||||
case _ => getFieldValue(backend, executorField)
|
||||
}
|
||||
val msg = newInstance(sc.conf.get(BACKEND_SESSION_TOKEN_UPDATE_CLASS),
|
||||
Seq(classOf[Array[Byte]]), Seq(tokens))
|
||||
executors.asInstanceOf[mutable.HashMap[String, ExecutorData]]
|
||||
.values.foreach(_.executorEndpoint.send(msg))
|
||||
} catch {
|
||||
case NonFatal(e) => warn(s"Failed to populate secured tokens to executors", e)
|
||||
}
|
||||
case _ =>
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.session.OperationLog
|
||||
import org.apache.hive.service.cli.thrift.TProtocolVersion
|
||||
import org.apache.spark.KyuubiConf._
|
||||
import org.apache.spark.KyuubiSparkUtil
|
||||
import org.apache.spark.scheduler.cluster.KyuubiSparkExecutorUtils
|
||||
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils}
|
||||
import org.apache.spark.sql.catalyst.catalog.FunctionResource
|
||||
import org.apache.spark.sql.catalyst.parser.ParseException
|
||||
@ -54,7 +55,8 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
private val opHandle: OperationHandle =
|
||||
new OperationHandle(EXECUTE_STATEMENT, session.getProtocolVersion)
|
||||
|
||||
private val conf = session.sparkSession.conf
|
||||
private val sparkSession = session.sparkSession
|
||||
private val conf = sparkSession.conf
|
||||
|
||||
private val operationTimeout =
|
||||
KyuubiSparkUtil.timeStringAsMs(conf.get(OPERATION_IDLE_TIMEOUT))
|
||||
@ -213,7 +215,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
debug(s"CLOSING $statementId")
|
||||
cleanup(CLOSED)
|
||||
cleanupOperationLog()
|
||||
session.sparkSession.sparkContext.clearJobGroup()
|
||||
sparkSession.sparkContext.clearJobGroup()
|
||||
}
|
||||
|
||||
def cancel(): Unit = {
|
||||
@ -312,7 +314,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
val destFileName = src.getName
|
||||
val destFile =
|
||||
new File(session.getResourcesSessionDir, destFileName).getCanonicalPath
|
||||
val fs = src.getFileSystem(session.sparkSession.sparkContext.hadoopConfiguration)
|
||||
val fs = src.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
|
||||
fs.copyToLocalFile(src, new Path(destFile))
|
||||
FileUtil.chmod(destFile, "ugo+rx", true)
|
||||
AddJarCommand(destFile).run(session.sparkSession)
|
||||
@ -327,7 +329,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
info(s"Running query '$statement' with $statementId")
|
||||
setState(RUNNING)
|
||||
|
||||
val classLoader = SparkSQLUtils.getUserJarClassLoader(session.sparkSession)
|
||||
val classLoader = SparkSQLUtils.getUserJarClassLoader(sparkSession)
|
||||
Thread.currentThread().setContextClassLoader(classLoader)
|
||||
|
||||
KyuubiServerMonitor.getListener(session.getUserName).foreach {
|
||||
@ -338,10 +340,10 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
statementId,
|
||||
session.getUserName)
|
||||
}
|
||||
session.sparkSession.sparkContext.setJobGroup(statementId, statement)
|
||||
KyuubiSparkUtil.setActiveSparkContext(session.sparkSession.sparkContext)
|
||||
sparkSession.sparkContext.setJobGroup(statementId, statement)
|
||||
KyuubiSparkUtil.setActiveSparkContext(sparkSession.sparkContext)
|
||||
|
||||
val parsedPlan = SparkSQLUtils.parsePlan(session.sparkSession, statement)
|
||||
val parsedPlan = SparkSQLUtils.parsePlan(sparkSession, statement)
|
||||
parsedPlan match {
|
||||
case c if c.nodeName == "CreateFunctionCommand" =>
|
||||
val resources =
|
||||
@ -354,10 +356,15 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
localizeAndAndResource(path)
|
||||
case _ =>
|
||||
}
|
||||
result = SparkSQLUtils.toDataFrame(session.sparkSession, parsedPlan)
|
||||
result = SparkSQLUtils.toDataFrame(sparkSession, parsedPlan)
|
||||
KyuubiServerMonitor.getListener(session.getUserName).foreach {
|
||||
_.onStatementParsed(statementId, result.queryExecution.toString())
|
||||
}
|
||||
|
||||
if (conf.get(BACKEND_SESSION_LONG_CACHE).toBoolean &&
|
||||
KyuubiSparkUtil.classIsLoadable(conf.get(BACKEND_SESSION_TOKEN_UPDATE_CLASS))) {
|
||||
KyuubiSparkExecutorUtils.populateTokens(sparkSession.sparkContext, session.ugi)
|
||||
}
|
||||
debug(result.queryExecution.toString())
|
||||
iter = if (incrementalCollect) {
|
||||
info("Executing query in incremental collection mode")
|
||||
@ -402,7 +409,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
}
|
||||
} finally {
|
||||
if (statementId != null) {
|
||||
session.sparkSession.sparkContext.cancelJobGroup(statementId)
|
||||
sparkSession.sparkContext.cancelJobGroup(statementId)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -429,7 +436,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
backgroundHandle.cancel(true)
|
||||
}
|
||||
if (statementId != null) {
|
||||
session.sparkSession.sparkContext.cancelJobGroup(statementId)
|
||||
sparkSession.sparkContext.cancelJobGroup(statementId)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -158,7 +158,7 @@ class SparkSessionWithUGI(
|
||||
"kyuubi", userName, conf.get(FRONTEND_BIND_HOST), conf.get(FRONTEND_BIND_PORT)).mkString("|")
|
||||
conf.setAppName(appName)
|
||||
configureSparkConf(sessionConf)
|
||||
val totalWaitTime: Long = conf.getTimeAsSeconds(BACKEND_SESSTION_INIT_TIMEOUT)
|
||||
val totalWaitTime: Long = conf.getTimeAsSeconds(BACKEND_SESSION_INIT_TIMEOUT)
|
||||
try {
|
||||
KyuubiHadoopUtil.doAs(user) {
|
||||
newContext.start()
|
||||
|
||||
@ -18,7 +18,11 @@
|
||||
<appender-ref ref="console" />
|
||||
</logger>
|
||||
<logger name="yaooqinn.kyuubi.service" additivity="false">
|
||||
<level value="INFO" />
|
||||
<level value="ERROR" />
|
||||
<appender-ref ref="console" />
|
||||
</logger>
|
||||
<logger name="org.apache.spark" additivity="false">
|
||||
<level value="ERROR" />
|
||||
<appender-ref ref="console" />
|
||||
</logger>
|
||||
<logger name="org.apache.zookeeper" additivity="false">
|
||||
@ -30,7 +34,7 @@
|
||||
<appender-ref ref="console" />
|
||||
</logger>
|
||||
<root>
|
||||
<level value="OFF" />
|
||||
<level value="WARN" />
|
||||
<appender-ref ref="console" />
|
||||
</root>
|
||||
</log4j:configuration>
|
||||
|
||||
@ -0,0 +1,117 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.scheduler.cluster
|
||||
|
||||
import scala.collection.mutable
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkFunSuite}
|
||||
import org.apache.spark.KyuubiConf._
|
||||
import org.apache.spark.scheduler.TaskSchedulerImpl
|
||||
import org.apache.spark.scheduler.local.LocalSchedulerBackend
|
||||
import org.scalatest.BeforeAndAfterEach
|
||||
|
||||
import yaooqinn.kyuubi.utils.ReflectUtils
|
||||
|
||||
class KyuubiSparkExecutorUtilsSuite
|
||||
extends SparkFunSuite with BeforeAndAfterEach {
|
||||
import KyuubiSparkUtil._
|
||||
|
||||
val conf: SparkConf = new SparkConf(true)
|
||||
.setAppName(this.getClass.getSimpleName)
|
||||
.setMaster("local")
|
||||
KyuubiSparkUtil.setupCommonConfig(conf)
|
||||
|
||||
var sc: SparkContext = _
|
||||
|
||||
override def beforeEach(): Unit = {
|
||||
sc = new SparkContext(conf)
|
||||
super.beforeEach()
|
||||
}
|
||||
|
||||
override def afterEach(): Unit = {
|
||||
if (sc != null) {
|
||||
sc.stop()
|
||||
}
|
||||
super.afterEach()
|
||||
}
|
||||
|
||||
test("populate tokens for non CoarseGrainedSchedulerBackend") {
|
||||
val taskSchedulerImpl = new TaskSchedulerImpl(sc)
|
||||
val backend = new LocalSchedulerBackend(conf, taskSchedulerImpl, 1)
|
||||
ReflectUtils.setFieldValue(sc, "_schedulerBackend", backend)
|
||||
val user = UserGroupInformation.getCurrentUser
|
||||
KyuubiSparkExecutorUtils.populateTokens(sc, user)
|
||||
}
|
||||
|
||||
test("populate tokens for CoarseGrainedSchedulerBackend") {
|
||||
val taskSchedulerImpl = new TaskSchedulerImpl(sc)
|
||||
val backend = new CoarseGrainedSchedulerBackend(taskSchedulerImpl, sc.env.rpcEnv)
|
||||
ReflectUtils.setFieldValue(sc, "_schedulerBackend", backend)
|
||||
val user = UserGroupInformation.getCurrentUser
|
||||
KyuubiSparkExecutorUtils.populateTokens(sc, user)
|
||||
}
|
||||
|
||||
test("populate tokens for YarnClientSchedulerBackend") {
|
||||
val taskSchedulerImpl = new TaskSchedulerImpl(sc)
|
||||
val backend = new YarnClientSchedulerBackend(taskSchedulerImpl, sc)
|
||||
ReflectUtils.setFieldValue(sc, "_schedulerBackend", backend)
|
||||
val user = UserGroupInformation.getCurrentUser
|
||||
KyuubiSparkExecutorUtils.populateTokens(sc, user)
|
||||
}
|
||||
|
||||
test("populate tokens for YarnClusterSchedulerBackend") {
|
||||
val taskSchedulerImpl = new TaskSchedulerImpl(sc)
|
||||
val backend = new YarnClusterSchedulerBackend(taskSchedulerImpl, sc)
|
||||
ReflectUtils.setFieldValue(sc, "_schedulerBackend", backend)
|
||||
val user = UserGroupInformation.getCurrentUser
|
||||
KyuubiSparkExecutorUtils.populateTokens(sc, user)
|
||||
}
|
||||
|
||||
test("populate tokens for StandaloneSchedulerBackend") {
|
||||
val taskSchedulerImpl = new TaskSchedulerImpl(sc)
|
||||
val backend = new StandaloneSchedulerBackend(taskSchedulerImpl, sc, null)
|
||||
ReflectUtils.setFieldValue(sc, "_schedulerBackend", backend)
|
||||
val user = UserGroupInformation.getCurrentUser
|
||||
KyuubiSparkExecutorUtils.populateTokens(sc, user)
|
||||
}
|
||||
|
||||
test("get executor data map") {
|
||||
val taskSchedulerImpl = new TaskSchedulerImpl(sc)
|
||||
val backend = new CoarseGrainedSchedulerBackend(taskSchedulerImpl, sc.env.rpcEnv)
|
||||
val executorDataMap = ReflectUtils.getFieldValue(backend,
|
||||
"org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$$executorDataMap")
|
||||
assert(executorDataMap.asInstanceOf[mutable.HashMap[String, ExecutorData]].values.isEmpty)
|
||||
sc.stop()
|
||||
}
|
||||
|
||||
test("create update token class via reflection") {
|
||||
val className = conf.get(BACKEND_SESSION_TOKEN_UPDATE_CLASS)
|
||||
assert(classIsLoadable(className) ===
|
||||
(majorVersion(SPARK_VERSION) == 2 && minorVersion(SPARK_VERSION) >= 3))
|
||||
|
||||
if (classIsLoadable(className)) {
|
||||
val tokens1 = Array(0.toByte)
|
||||
val tokens2 = Array(1, 2, 3, 4).map(_.toByte)
|
||||
val msg1 = ReflectUtils.newInstance(className, Seq(classOf[Array[Byte]]), Seq(tokens1))
|
||||
assert(ReflectUtils.getFieldValue(msg1, "tokens") === tokens1)
|
||||
val msg2 = ReflectUtils.newInstance(className, Seq(classOf[Array[Byte]]), Seq(tokens2))
|
||||
assert(ReflectUtils.getFieldValue(msg2, "tokens") === tokens2)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -63,7 +63,7 @@ class SparkSessionWithUGISuite extends SparkFunSuite {
|
||||
test("test init failed with sc init failing") {
|
||||
assert(!spark.sparkContext.isStopped)
|
||||
val confClone = conf.clone().remove(KyuubiSparkUtil.MULTIPLE_CONTEXTS)
|
||||
.set(KyuubiConf.BACKEND_SESSTION_INIT_TIMEOUT.key, "3")
|
||||
.set(KyuubiConf.BACKEND_SESSION_INIT_TIMEOUT.key, "3")
|
||||
val userName1 = "test1"
|
||||
val ru = UserGroupInformation.createRemoteUser(userName1)
|
||||
val sparkSessionWithUGI = new SparkSessionWithUGI(ru, confClone, cache)
|
||||
@ -146,7 +146,7 @@ class SparkSessionWithUGISuite extends SparkFunSuite {
|
||||
test("test init failed with time out exception") {
|
||||
// point to an non-exist cluster manager
|
||||
val confClone = conf.clone().setMaster("spark://localhost:7077")
|
||||
.set(KyuubiConf.BACKEND_SESSTION_INIT_TIMEOUT.key, "3")
|
||||
.set(KyuubiConf.BACKEND_SESSION_INIT_TIMEOUT.key, "3")
|
||||
val userName1 = "test"
|
||||
val ru = UserGroupInformation.createRemoteUser(userName1)
|
||||
val sparkSessionWithUGI = new SparkSessionWithUGI(ru, confClone, cache)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user