diff --git a/.travis.yml b/.travis.yml
index 329618c20..116fda877 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -20,6 +20,9 @@ install:
script:
- ./build/mvn package -q -Dmaven.javadoc.skip=true -B -V
+ - ./build/mvn package -Pspark-2.2 -q -Dmaven.javadoc.skip=true -B -V
+ - ./build/mvn package -Pspark-2.3 -q -Dmaven.javadoc.skip=true -B -V
+
language: java
after_success:
diff --git a/pom.xml b/pom.xml
index 1cc64e16a..52f4185f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,6 @@
3.3.9org.apache.spark2.1.2
- 2.1provided2.7.3provided
@@ -327,7 +326,6 @@
-
org.apache.maven.pluginsmaven-antrun-plugin
@@ -448,18 +446,6 @@
-
- add-overwrite-spark-source
- generate-sources
-
- add-source
-
-
-
- src/extra/v${spark.branch}/scala
-
-
-
@@ -504,6 +490,7 @@
**/org/apache/hive/service/cli/thrift/*.class**/org/apache/spark/launcher/*.class**/org/apache/spark/deploy/*.class
+ **/org/apache/spark/SparkEnv.class
@@ -527,19 +514,10 @@
-
- spark-2.0
-
- 2.0.2
- 2.0
-
-
-
spark-2.12.1.2
- 2.1
@@ -547,7 +525,6 @@
spark-2.22.2.1
- 2.2
@@ -555,7 +532,6 @@
spark-2.32.3.0
- 2.33.0.3
diff --git a/src/extra/v2.1/scala/org/apache/spark/SparkEnv.scala b/src/extra/v2.1/scala/org/apache/spark/SparkEnv.scala
deleted file mode 100644
index b4cfe4448..000000000
--- a/src/extra/v2.1/scala/org/apache/spark/SparkEnv.scala
+++ /dev/null
@@ -1,463 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io.File
-import java.net.Socket
-import java.util.concurrent.ConcurrentHashMap
-
-import scala.collection.mutable
-import scala.util.Properties
-
-import com.google.common.collect.MapMaker
-import org.apache.hadoop.security.UserGroupInformation
-
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.api.python.PythonWorkerFactory
-import org.apache.spark.broadcast.BroadcastManager
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
-import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
-import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.network.netty.NettyBlockTransferService
-import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
-import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
-import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint
-import org.apache.spark.security.CryptoStreamUtils
-import org.apache.spark.serializer.{JavaSerializer, Serializer, SerializerManager}
-import org.apache.spark.shuffle.ShuffleManager
-import org.apache.spark.storage._
-import org.apache.spark.util.{RpcUtils, Utils}
-
-/**
- * :: DeveloperApi ::
- * Holds all the runtime environment objects for a running Spark instance (either master or worker),
- * including the serializer, RpcEnv, block manager, map output tracker, etc. Currently
- * Spark code finds the SparkEnv through a global variable, so all the threads can access the same
- * SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
- *
- * NOTE: This is not intended for external use. This is exposed for Shark and may be made private
- * in a future release.
- */
-@DeveloperApi
-class SparkEnv (
- val executorId: String,
- private[spark] val rpcEnv: RpcEnv,
- val serializer: Serializer,
- val closureSerializer: Serializer,
- val serializerManager: SerializerManager,
- val mapOutputTracker: MapOutputTracker,
- val shuffleManager: ShuffleManager,
- val broadcastManager: BroadcastManager,
- val blockManager: BlockManager,
- val securityManager: SecurityManager,
- val metricsSystem: MetricsSystem,
- val memoryManager: MemoryManager,
- val outputCommitCoordinator: OutputCommitCoordinator,
- val conf: SparkConf) extends Logging {
-
- private[spark] var isStopped = false
- private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
-
- // A general, soft-reference map for metadata needed during HadoopRDD split computation
- // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
- private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
-
- private[spark] var driverTmpDir: Option[String] = None
-
- private[spark] def stop() {
-
- if (!isStopped) {
- isStopped = true
- pythonWorkers.values.foreach(_.stop())
- mapOutputTracker.stop()
- shuffleManager.stop()
- broadcastManager.stop()
- blockManager.stop()
- blockManager.master.stop()
- metricsSystem.stop()
- outputCommitCoordinator.stop()
- rpcEnv.shutdown()
- rpcEnv.awaitTermination()
-
- // If we only stop sc, but the driver process still run as a services then we need to delete
- // the tmp dir, if not, it will create too many tmp dirs.
- // We only need to delete the tmp dir create by driver
- driverTmpDir match {
- case Some(path) =>
- try {
- Utils.deleteRecursively(new File(path))
- } catch {
- case e: Exception =>
- logWarning(s"Exception while deleting Spark temp dir: $path", e)
- }
- case None => // We just need to delete tmp dir created by driver, so do nothing on executor
- }
- }
- }
-
- private[spark]
- def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
- synchronized {
- val key = (pythonExec, envVars)
- pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
- }
- }
-
- private[spark]
- def destroyPythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
- synchronized {
- val key = (pythonExec, envVars)
- pythonWorkers.get(key).foreach(_.stopWorker(worker))
- }
- }
-
- private[spark]
- def releasePythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
- synchronized {
- val key = (pythonExec, envVars)
- pythonWorkers.get(key).foreach(_.releaseWorker(worker))
- }
- }
-}
-
-object SparkEnv extends Logging {
- logInfo("Loaded Kyuubi Supplied SparkEnv Class...")
- private val env = new ConcurrentHashMap[String, SparkEnv]()
-
- private[spark] val driverSystemName = "sparkDriver"
- private[spark] val executorSystemName = "sparkExecutor"
-
- private[this] def user = UserGroupInformation.getCurrentUser.getShortUserName
-
- def set(e: SparkEnv) {
- if (e == null) {
- logDebug(s"Kyuubi: Removing SparkEnv for $user")
- env.remove(user)
- } else {
- logDebug(s"Kyuubi: Registering SparkEnv for $user")
- env.put(user, e)
- }
- }
-
- /**
- * Returns the SparkEnv.
- */
- def get: SparkEnv = {
- logDebug(s"Kyuubi: Get SparkEnv for $user")
- env.get(user)
- }
-
- /**
- * Create a SparkEnv for the driver.
- */
- private[spark] def createDriverEnv(
- conf: SparkConf,
- isLocal: Boolean,
- listenerBus: LiveListenerBus,
- numCores: Int,
- mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
- assert(conf.contains(DRIVER_HOST_ADDRESS),
- s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
- assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
- val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
- val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
- val port = conf.get("spark.driver.port").toInt
- val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
- Some(CryptoStreamUtils.createKey(conf))
- } else {
- None
- }
- create(
- conf,
- SparkContext.DRIVER_IDENTIFIER,
- bindAddress,
- advertiseAddress,
- port,
- isLocal,
- numCores,
- ioEncryptionKey,
- listenerBus = listenerBus,
- mockOutputCommitCoordinator = mockOutputCommitCoordinator
- )
- }
-
- /**
- * Create a SparkEnv for an executor.
- * In coarse-grained mode, the executor provides an RpcEnv that is already instantiated.
- */
- private[spark] def createExecutorEnv(
- conf: SparkConf,
- executorId: String,
- hostname: String,
- port: Int,
- numCores: Int,
- ioEncryptionKey: Option[Array[Byte]],
- isLocal: Boolean): SparkEnv = {
- val env = create(
- conf,
- executorId,
- hostname,
- hostname,
- port,
- isLocal,
- numCores,
- ioEncryptionKey
- )
- SparkEnv.set(env)
- env
- }
-
- /**
- * Helper method to create a SparkEnv for a driver or an executor.
- */
- private def create(
- conf: SparkConf,
- executorId: String,
- bindAddress: String,
- advertiseAddress: String,
- port: Int,
- isLocal: Boolean,
- numUsableCores: Int,
- ioEncryptionKey: Option[Array[Byte]],
- listenerBus: LiveListenerBus = null,
- mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
-
- val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
-
- // Listener bus is only used on the driver
- if (isDriver) {
- assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
- }
-
- val securityManager = new SecurityManager(conf, ioEncryptionKey)
- ioEncryptionKey.foreach { _ =>
- if (!securityManager.isSaslEncryptionEnabled()) {
- logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
- "wire.")
- }
- }
-
- val systemName = if (isDriver) driverSystemName else executorSystemName
- val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
- securityManager, clientMode = !isDriver)
-
- // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
- // In the non-driver case, the RPC env's address may be null since it may not be listening
- // for incoming connections.
- if (isDriver) {
- conf.set("spark.driver.port", rpcEnv.address.port.toString)
- } else if (rpcEnv.address != null) {
- conf.set("spark.executor.port", rpcEnv.address.port.toString)
- logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}")
- }
-
- // Create an instance of the class with the given name, possibly initializing it with our conf
- def instantiateClass[T](className: String): T = {
- val cls = Utils.classForName(className)
- // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
- // SparkConf, then one taking no arguments
- try {
- cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
- .newInstance(conf, new java.lang.Boolean(isDriver))
- .asInstanceOf[T]
- } catch {
- case _: NoSuchMethodException =>
- try {
- cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
- } catch {
- case _: NoSuchMethodException =>
- cls.getConstructor().newInstance().asInstanceOf[T]
- }
- }
- }
-
- // Create an instance of the class named by the given SparkConf property, or defaultClassName
- // if the property is not set, possibly initializing it with our conf
- def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
- instantiateClass[T](conf.get(propertyName, defaultClassName))
- }
-
- val serializer = instantiateClassFromConf[Serializer](
- "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
- logDebug(s"Using serializer: ${serializer.getClass}")
-
- val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
-
- val closureSerializer = new JavaSerializer(conf)
-
- def registerOrLookupEndpoint(
- name: String, endpointCreator: => RpcEndpoint):
- RpcEndpointRef = {
- if (isDriver) {
- logInfo("Registering " + name)
- rpcEnv.setupEndpoint(name, endpointCreator)
- } else {
- RpcUtils.makeDriverRef(name, conf, rpcEnv)
- }
- }
-
- val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
-
- val mapOutputTracker = if (isDriver) {
- new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
- } else {
- new MapOutputTrackerWorker(conf)
- }
-
- // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
- // requires the MapOutputTracker itself
- mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
- new MapOutputTrackerMasterEndpoint(
- rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
-
- // Let the user specify short names for shuffle managers
- val shortShuffleMgrNames = Map(
- "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
- "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
- val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
- val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
- val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
-
- val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
- val memoryManager: MemoryManager =
- if (useLegacyMemoryManager) {
- new StaticMemoryManager(conf, numUsableCores)
- } else {
- UnifiedMemoryManager(conf, numUsableCores)
- }
-
- val blockManagerPort = if (isDriver) {
- conf.get(DRIVER_BLOCK_MANAGER_PORT)
- } else {
- conf.get(BLOCK_MANAGER_PORT)
- }
-
- val blockTransferService =
- new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
- blockManagerPort, numUsableCores)
-
- val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
- BlockManagerMaster.DRIVER_ENDPOINT_NAME,
- new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
- conf, isDriver)
-
- // NB: blockManager is not valid until initialize() is called later.
- val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
- serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
- blockTransferService, securityManager, numUsableCores)
-
- val metricsSystem = if (isDriver) {
- // Don't start metrics system right now for Driver.
- // We need to wait for the task scheduler to give us an app ID.
- // Then we can start the metrics system.
- MetricsSystem.createMetricsSystem("driver", conf, securityManager)
- } else {
- // We need to set the executor ID before the MetricsSystem is created because sources and
- // sinks specified in the metrics configuration file will want to incorporate this executor's
- // ID into the metrics they report.
- conf.set("spark.executor.id", executorId)
- val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
- ms.start()
- ms
- }
-
- val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
- new OutputCommitCoordinator(conf, isDriver)
- }
- val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
- new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
- outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
-
- val envInstance = new SparkEnv(
- executorId,
- rpcEnv,
- serializer,
- closureSerializer,
- serializerManager,
- mapOutputTracker,
- shuffleManager,
- broadcastManager,
- blockManager,
- securityManager,
- metricsSystem,
- memoryManager,
- outputCommitCoordinator,
- conf)
-
- // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
- // called, and we only need to do it for driver. Because driver may run as a service, and if we
- // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
- if (isDriver) {
- val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
- envInstance.driverTmpDir = Some(sparkFilesDir)
- }
-
- envInstance
- }
-
- /**
- * Return a map representation of jvm information, Spark properties, system properties, and
- * class paths. Map keys define the category, and map values represent the corresponding
- * attributes as a sequence of KV pairs. This is used mainly for SparkListenerEnvironmentUpdate.
- */
- private[spark]
- def environmentDetails(
- conf: SparkConf,
- schedulingMode: String,
- addedJars: Seq[String],
- addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {
-
- import Properties._
- val jvmInformation = Seq(
- ("Java Version", s"$javaVersion ($javaVendor)"),
- ("Java Home", javaHome),
- ("Scala Version", versionString)
- ).sorted
-
- // Spark properties
- // This includes the scheduling mode whether or not it is configured (used by SparkUI)
- val schedulerMode =
- if (!conf.contains("spark.scheduler.mode")) {
- Seq(("spark.scheduler.mode", schedulingMode))
- } else {
- Seq[(String, String)]()
- }
- val sparkProperties = (conf.getAll ++ schedulerMode).sorted
-
- // System properties that are not java classpaths
- val systemProperties = Utils.getSystemProperties.toSeq
- val otherProperties = systemProperties.filter { case (k, _) =>
- k != "java.class.path" && !k.startsWith("spark.")
- }.sorted
-
- // Class paths including all added jars and files
- val classPathEntries = javaClassPath
- .split(File.pathSeparator)
- .filterNot(_.isEmpty)
- .map((_, "System Classpath"))
- val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
- val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted
-
- Map[String, Seq[(String, String)]](
- "JVM Information" -> jvmInformation,
- "Spark Properties" -> sparkProperties,
- "System Properties" -> otherProperties,
- "Classpath Entries" -> classPaths)
- }
-}
diff --git a/src/extra/v2.2/scala/org/apache/spark/SparkEnv.scala b/src/extra/v2.2/scala/org/apache/spark/SparkEnv.scala
deleted file mode 100644
index 38f91660c..000000000
--- a/src/extra/v2.2/scala/org/apache/spark/SparkEnv.scala
+++ /dev/null
@@ -1,465 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io.File
-import java.net.Socket
-import java.util.concurrent.ConcurrentHashMap
-import java.util.Locale
-
-import scala.collection.mutable
-import scala.util.Properties
-
-import com.google.common.collect.MapMaker
-import org.apache.hadoop.security.UserGroupInformation
-
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.api.python.PythonWorkerFactory
-import org.apache.spark.broadcast.BroadcastManager
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
-import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
-import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.network.netty.NettyBlockTransferService
-import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
-import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
-import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint
-import org.apache.spark.security.CryptoStreamUtils
-import org.apache.spark.serializer.{JavaSerializer, Serializer, SerializerManager}
-import org.apache.spark.shuffle.ShuffleManager
-import org.apache.spark.storage._
-import org.apache.spark.util.{RpcUtils, Utils}
-
-/**
- * :: DeveloperApi ::
- * Holds all the runtime environment objects for a running Spark instance (either master or worker),
- * including the serializer, RpcEnv, block manager, map output tracker, etc. Currently
- * Spark code finds the SparkEnv through a global variable, so all the threads can access the same
- * SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
- *
- * NOTE: This is not intended for external use. This is exposed for Shark and may be made private
- * in a future release.
- */
-@DeveloperApi
-class SparkEnv (
- val executorId: String,
- private[spark] val rpcEnv: RpcEnv,
- val serializer: Serializer,
- val closureSerializer: Serializer,
- val serializerManager: SerializerManager,
- val mapOutputTracker: MapOutputTracker,
- val shuffleManager: ShuffleManager,
- val broadcastManager: BroadcastManager,
- val blockManager: BlockManager,
- val securityManager: SecurityManager,
- val metricsSystem: MetricsSystem,
- val memoryManager: MemoryManager,
- val outputCommitCoordinator: OutputCommitCoordinator,
- val conf: SparkConf) extends Logging {
-
- private[spark] var isStopped = false
- private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
-
- // A general, soft-reference map for metadata needed during HadoopRDD split computation
- // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
- private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
-
- private[spark] var driverTmpDir: Option[String] = None
-
- private[spark] def stop() {
-
- if (!isStopped) {
- isStopped = true
- pythonWorkers.values.foreach(_.stop())
- mapOutputTracker.stop()
- shuffleManager.stop()
- broadcastManager.stop()
- blockManager.stop()
- blockManager.master.stop()
- metricsSystem.stop()
- outputCommitCoordinator.stop()
- rpcEnv.shutdown()
- rpcEnv.awaitTermination()
-
- // If we only stop sc, but the driver process still run as a services then we need to delete
- // the tmp dir, if not, it will create too many tmp dirs.
- // We only need to delete the tmp dir create by driver
- driverTmpDir match {
- case Some(path) =>
- try {
- Utils.deleteRecursively(new File(path))
- } catch {
- case e: Exception =>
- logWarning(s"Exception while deleting Spark temp dir: $path", e)
- }
- case None => // We just need to delete tmp dir created by driver, so do nothing on executor
- }
- }
- }
-
- private[spark]
- def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
- synchronized {
- val key = (pythonExec, envVars)
- pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
- }
- }
-
- private[spark]
- def destroyPythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
- synchronized {
- val key = (pythonExec, envVars)
- pythonWorkers.get(key).foreach(_.stopWorker(worker))
- }
- }
-
- private[spark]
- def releasePythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
- synchronized {
- val key = (pythonExec, envVars)
- pythonWorkers.get(key).foreach(_.releaseWorker(worker))
- }
- }
-}
-
-object SparkEnv extends Logging {
- logInfo("Loaded Kyuubi Supplied SparkEnv Class...")
- private val env = new ConcurrentHashMap[String, SparkEnv]()
-
- private[spark] val driverSystemName = "sparkDriver"
- private[spark] val executorSystemName = "sparkExecutor"
-
- private[this] def user = UserGroupInformation.getCurrentUser.getShortUserName
-
- def set(e: SparkEnv) {
- if (e == null) {
- logDebug(s"Kyuubi: Removing SparkEnv for $user")
- env.remove(user)
- } else {
- logDebug(s"Kyuubi: Registering SparkEnv for $user")
- env.put(user, e)
- }
- }
-
- /**
- * Returns the SparkEnv.
- */
- def get: SparkEnv = {
- logDebug(s"Kyuubi: Get SparkEnv for $user")
- env.get(user)
- }
-
- /**
- * Create a SparkEnv for the driver.
- */
- private[spark] def createDriverEnv(
- conf: SparkConf,
- isLocal: Boolean,
- listenerBus: LiveListenerBus,
- numCores: Int,
- mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
- assert(conf.contains(DRIVER_HOST_ADDRESS),
- s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
- assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
- val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
- val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
- val port = conf.get("spark.driver.port").toInt
- val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
- Some(CryptoStreamUtils.createKey(conf))
- } else {
- None
- }
- create(
- conf,
- SparkContext.DRIVER_IDENTIFIER,
- bindAddress,
- advertiseAddress,
- port,
- isLocal,
- numCores,
- ioEncryptionKey,
- listenerBus = listenerBus,
- mockOutputCommitCoordinator = mockOutputCommitCoordinator
- )
- }
-
- /**
- * Create a SparkEnv for an executor.
- * In coarse-grained mode, the executor provides an RpcEnv that is already instantiated.
- */
- private[spark] def createExecutorEnv(
- conf: SparkConf,
- executorId: String,
- hostname: String,
- port: Int,
- numCores: Int,
- ioEncryptionKey: Option[Array[Byte]],
- isLocal: Boolean): SparkEnv = {
- val env = create(
- conf,
- executorId,
- hostname,
- hostname,
- port,
- isLocal,
- numCores,
- ioEncryptionKey
- )
- SparkEnv.set(env)
- env
- }
-
- /**
- * Helper method to create a SparkEnv for a driver or an executor.
- */
- private def create(
- conf: SparkConf,
- executorId: String,
- bindAddress: String,
- advertiseAddress: String,
- port: Int,
- isLocal: Boolean,
- numUsableCores: Int,
- ioEncryptionKey: Option[Array[Byte]],
- listenerBus: LiveListenerBus = null,
- mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
-
- val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
-
- // Listener bus is only used on the driver
- if (isDriver) {
- assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
- }
-
- val securityManager = new SecurityManager(conf, ioEncryptionKey)
- ioEncryptionKey.foreach { _ =>
- if (!securityManager.isEncryptionEnabled()) {
- logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
- "wire.")
- }
- }
-
- val systemName = if (isDriver) driverSystemName else executorSystemName
- val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
- securityManager, clientMode = !isDriver)
-
- // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
- // In the non-driver case, the RPC env's address may be null since it may not be listening
- // for incoming connections.
- if (isDriver) {
- conf.set("spark.driver.port", rpcEnv.address.port.toString)
- } else if (rpcEnv.address != null) {
- conf.set("spark.executor.port", rpcEnv.address.port.toString)
- logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}")
- }
-
- // Create an instance of the class with the given name, possibly initializing it with our conf
- def instantiateClass[T](className: String): T = {
- val cls = Utils.classForName(className)
- // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
- // SparkConf, then one taking no arguments
- try {
- cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
- .newInstance(conf, new java.lang.Boolean(isDriver))
- .asInstanceOf[T]
- } catch {
- case _: NoSuchMethodException =>
- try {
- cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
- } catch {
- case _: NoSuchMethodException =>
- cls.getConstructor().newInstance().asInstanceOf[T]
- }
- }
- }
-
- // Create an instance of the class named by the given SparkConf property, or defaultClassName
- // if the property is not set, possibly initializing it with our conf
- def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
- instantiateClass[T](conf.get(propertyName, defaultClassName))
- }
-
- val serializer = instantiateClassFromConf[Serializer](
- "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
- logDebug(s"Using serializer: ${serializer.getClass}")
-
- val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
-
- val closureSerializer = new JavaSerializer(conf)
-
- def registerOrLookupEndpoint(
- name: String, endpointCreator: => RpcEndpoint):
- RpcEndpointRef = {
- if (isDriver) {
- logInfo("Registering " + name)
- rpcEnv.setupEndpoint(name, endpointCreator)
- } else {
- RpcUtils.makeDriverRef(name, conf, rpcEnv)
- }
- }
-
- val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
-
- val mapOutputTracker = if (isDriver) {
- new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
- } else {
- new MapOutputTrackerWorker(conf)
- }
-
- // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
- // requires the MapOutputTracker itself
- mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
- new MapOutputTrackerMasterEndpoint(
- rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
-
- // Let the user specify short names for shuffle managers
- val shortShuffleMgrNames = Map(
- "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
- "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
- val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
- val shuffleMgrClass =
- shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
- val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
-
- val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
- val memoryManager: MemoryManager =
- if (useLegacyMemoryManager) {
- new StaticMemoryManager(conf, numUsableCores)
- } else {
- UnifiedMemoryManager(conf, numUsableCores)
- }
-
- val blockManagerPort = if (isDriver) {
- conf.get(DRIVER_BLOCK_MANAGER_PORT)
- } else {
- conf.get(BLOCK_MANAGER_PORT)
- }
-
- val blockTransferService =
- new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
- blockManagerPort, numUsableCores)
-
- val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
- BlockManagerMaster.DRIVER_ENDPOINT_NAME,
- new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
- conf, isDriver)
-
- // NB: blockManager is not valid until initialize() is called later.
- val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
- serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
- blockTransferService, securityManager, numUsableCores)
-
- val metricsSystem = if (isDriver) {
- // Don't start metrics system right now for Driver.
- // We need to wait for the task scheduler to give us an app ID.
- // Then we can start the metrics system.
- MetricsSystem.createMetricsSystem("driver", conf, securityManager)
- } else {
- // We need to set the executor ID before the MetricsSystem is created because sources and
- // sinks specified in the metrics configuration file will want to incorporate this executor's
- // ID into the metrics they report.
- conf.set("spark.executor.id", executorId)
- val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
- ms.start()
- ms
- }
-
- val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
- new OutputCommitCoordinator(conf, isDriver)
- }
- val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
- new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
- outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
-
- val envInstance = new SparkEnv(
- executorId,
- rpcEnv,
- serializer,
- closureSerializer,
- serializerManager,
- mapOutputTracker,
- shuffleManager,
- broadcastManager,
- blockManager,
- securityManager,
- metricsSystem,
- memoryManager,
- outputCommitCoordinator,
- conf)
-
- // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
- // called, and we only need to do it for driver. Because driver may run as a service, and if we
- // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
- if (isDriver) {
- val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
- envInstance.driverTmpDir = Some(sparkFilesDir)
- }
-
- envInstance
- }
-
- /**
- * Return a map representation of jvm information, Spark properties, system properties, and
- * class paths. Map keys define the category, and map values represent the corresponding
- * attributes as a sequence of KV pairs. This is used mainly for SparkListenerEnvironmentUpdate.
- */
- private[spark]
- def environmentDetails(
- conf: SparkConf,
- schedulingMode: String,
- addedJars: Seq[String],
- addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {
-
- import Properties._
- val jvmInformation = Seq(
- ("Java Version", s"$javaVersion ($javaVendor)"),
- ("Java Home", javaHome),
- ("Scala Version", versionString)
- ).sorted
-
- // Spark properties
- // This includes the scheduling mode whether or not it is configured (used by SparkUI)
- val schedulerMode =
- if (!conf.contains("spark.scheduler.mode")) {
- Seq(("spark.scheduler.mode", schedulingMode))
- } else {
- Seq[(String, String)]()
- }
- val sparkProperties = (conf.getAll ++ schedulerMode).sorted
-
- // System properties that are not java classpaths
- val systemProperties = Utils.getSystemProperties.toSeq
- val otherProperties = systemProperties.filter { case (k, _) =>
- k != "java.class.path" && !k.startsWith("spark.")
- }.sorted
-
- // Class paths including all added jars and files
- val classPathEntries = javaClassPath
- .split(File.pathSeparator)
- .filterNot(_.isEmpty)
- .map((_, "System Classpath"))
- val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
- val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted
-
- Map[String, Seq[(String, String)]](
- "JVM Information" -> jvmInformation,
- "Spark Properties" -> sparkProperties,
- "System Properties" -> otherProperties,
- "Classpath Entries" -> classPaths)
- }
-}
diff --git a/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala b/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala
index b1de3196b..b2e29324e 100644
--- a/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala
+++ b/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala
@@ -111,4 +111,15 @@ object KyuubiSparkUtil extends Logging {
def minorVersion(sparkVersion: String): Int = VersionUtils.minorVersion(sparkVersion)
+ /**
+ * Check whether the runtime version of Spark is >= the specified one.
+ */
+ def isSparkVersionOrLater(version: String): Boolean = {
+ val tMajor = majorVersion(SPARK_VERSION)
+ val tMinor = minorVersion(SPARK_VERSION)
+ val sMajor = majorVersion(version)
+ val sMinor = minorVersion(version)
+ tMajor > sMajor || (tMajor == sMajor && tMinor >= sMinor)
+ }
+
}
diff --git a/src/extra/v2.3/scala/org/apache/spark/SparkEnv.scala b/src/main/scala/org/apache/spark/SparkEnv.scala
similarity index 91%
rename from src/extra/v2.3/scala/org/apache/spark/SparkEnv.scala
rename to src/main/scala/org/apache/spark/SparkEnv.scala
index 99fc6293b..7a1a9f64d 100644
--- a/src/extra/v2.3/scala/org/apache/spark/SparkEnv.scala
+++ b/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -19,15 +19,14 @@ package org.apache.spark
import java.io.File
import java.net.Socket
-import java.util.concurrent.ConcurrentHashMap
import java.util.Locale
+import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable
import scala.util.Properties
import com.google.common.collect.MapMaker
import org.apache.hadoop.security.UserGroupInformation
-
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
@@ -45,6 +44,8 @@ import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage._
import org.apache.spark.util.{RpcUtils, Utils}
+import yaooqinn.kyuubi.utils.ReflectUtils
+
/**
* :: DeveloperApi ::
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
@@ -201,6 +202,34 @@ object SparkEnv extends Logging {
/**
* Create a SparkEnv for an executor.
* In coarse-grained mode, the executor provides an RpcEnv that is already instantiated.
+ * @since 2.2
+ */
+ private[spark] def createExecutorEnv(
+ conf: SparkConf,
+ executorId: String,
+ hostname: String,
+ port: Int,
+ numCores: Int,
+ ioEncryptionKey: Option[Array[Byte]],
+ isLocal: Boolean): SparkEnv = {
+ val env = create(
+ conf,
+ executorId,
+ hostname,
+ hostname,
+ Some(port),
+ isLocal,
+ numCores,
+ ioEncryptionKey
+ )
+ SparkEnv.set(env)
+ env
+ }
+
+ /**
+ * Create a SparkEnv for an executor.
+ * In coarse-grained mode, the executor provides an RpcEnv that is already instantiated.
+ * @since 2.3
*/
private[spark] def createExecutorEnv(
conf: SparkConf,
@@ -246,20 +275,30 @@ object SparkEnv extends Logging {
}
val securityManager = new SecurityManager(conf, ioEncryptionKey)
- if (isDriver) {
- securityManager.initializeAuth()
- }
-
- ioEncryptionKey.foreach { _ =>
- if (!securityManager.isEncryptionEnabled()) {
- logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
- "wire.")
- }
- }
val systemName = if (isDriver) driverSystemName else executorSystemName
- val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
- securityManager, numUsableCores, !isDriver)
+
+ val rpcEnv = {
+ val p = port.getOrElse(-1).asInstanceOf[Integer]
+
+ val isNotDriver = (!isDriver).asInstanceOf[java.lang.Boolean]
+ if (KyuubiSparkUtil.isSparkVersionOrLater("2.3")) {
+ ReflectUtils.invokeStaticMethod(
+ classOf[RpcEnv],
+ "create",
+ Seq(classOf[String], classOf[String], classOf[String], classOf[Int],
+ classOf[SparkConf], classOf[SecurityManager], classOf[Int], classOf[Boolean]),
+ Seq(systemName, bindAddress, advertiseAddress, p, conf,
+ securityManager, numUsableCores.asInstanceOf[Integer], isNotDriver))
+ } else {
+ ReflectUtils.invokeStaticMethod(
+ classOf[RpcEnv],
+ "create",
+ Seq(classOf[String], classOf[String], classOf[String], classOf[Int],
+ classOf[SparkConf], classOf[SecurityManager], classOf[Boolean]),
+ Seq(systemName, bindAddress, advertiseAddress, p, conf, securityManager, isNotDriver))
+ }
+ }.asInstanceOf[RpcEnv]
// Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
if (isDriver) {
diff --git a/src/main/scala/org/apache/spark/deploy/KyuubiSubmit.scala b/src/main/scala/org/apache/spark/deploy/KyuubiSubmit.scala
index 1ca83097a..dcb87ccc9 100644
--- a/src/main/scala/org/apache/spark/deploy/KyuubiSubmit.scala
+++ b/src/main/scala/org/apache/spark/deploy/KyuubiSubmit.scala
@@ -135,28 +135,24 @@ object KyuubiSubmit {
// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"
- Seq[OptionAssigner](
- OptionAssigner(args.master, YARN, CLIENT, sysProp = "spark.master"),
- OptionAssigner(args.deployMode, YARN, CLIENT, sysProp = "spark.submit.deployMode"),
- OptionAssigner(args.name, YARN, CLIENT, sysProp = "spark.app.name"),
- OptionAssigner(args.driverMemory, YARN, CLIENT, sysProp = "spark.driver.memory"),
- OptionAssigner(args.driverExtraClassPath, YARN, CLIENT,
- sysProp = "spark.driver.extraClassPath"),
- OptionAssigner(args.driverExtraJavaOptions, YARN, CLIENT,
- sysProp = "spark.driver.extraJavaOptions"),
- OptionAssigner(args.driverExtraLibraryPath, YARN, CLIENT,
- sysProp = "spark.driver.extraLibraryPath"),
- OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
- OptionAssigner(args.numExecutors, YARN, CLIENT,
- sysProp = "spark.executor.instances"),
- OptionAssigner(args.jars, YARN, CLIENT, sysProp = "spark.yarn.dist.jars"),
- OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
- OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
- OptionAssigner(args.principal, YARN, CLIENT, sysProp = "spark.yarn.principal"),
- OptionAssigner(args.keytab, YARN, CLIENT, sysProp = "spark.yarn.keytab"),
- OptionAssigner(args.executorCores, YARN, CLIENT, sysProp = "spark.executor.cores"),
- OptionAssigner(args.executorMemory, YARN, CLIENT, sysProp = "spark.executor.memory")
- ).filter(_.value != null).foreach(opt => sysProps.put(opt.sysProp, opt.value))
+ Seq(
+ "spark.master" -> args.master,
+ "spark.submit.deployMode" -> args.deployMode,
+ "spark.app.name" -> args.name,
+ "spark.driver.memory" -> args.driverMemory,
+ "spark.driver.extraClassPath" -> args.driverExtraClassPath,
+ "spark.driver.extraJavaOptions" -> args.driverExtraJavaOptions,
+ "spark.driver.extraLibraryPath" -> args.driverExtraLibraryPath,
+ "spark.yarn.queue" -> args.queue,
+ "spark.executor.instances" -> args.numExecutors,
+ "spark.yarn.dist.jars" -> args.jars,
+ "spark.yarn.dist.files" -> args.files,
+ "spark.yarn.dist.archives" -> args.archives,
+ "spark.yarn.principal" -> args.principal,
+ "spark.yarn.keytab" -> args.keytab,
+ "spark.executor.cores" -> args.executorCores,
+ "spark.executor.memory" -> args.executorMemory
+ ).filter(_._2 != null).foreach(o => sysProps.put(o._1, o._2))
childClasspath += args.primaryResource
if (args.jars != null) { childClasspath ++= args.jars.split(",") }
diff --git a/src/main/scala/org/apache/spark/launcher/KyuubiSubmitCommandBuilder.scala b/src/main/scala/org/apache/spark/launcher/KyuubiSubmitCommandBuilder.scala
index a1d7a8c35..0e13e494c 100644
--- a/src/main/scala/org/apache/spark/launcher/KyuubiSubmitCommandBuilder.scala
+++ b/src/main/scala/org/apache/spark/launcher/KyuubiSubmitCommandBuilder.scala
@@ -20,6 +20,8 @@ package org.apache.spark.launcher
import java.io.File
import java.util.{List => JList, Map => JMap}
+import scala.collection.JavaConverters._
+
import org.apache.spark.launcher.CommandBuilderUtils._
class KyuubiSubmitCommandBuilder(args: JList[String]) extends SparkSubmitCommandBuilder(args) {
@@ -55,9 +57,29 @@ class KyuubiSubmitCommandBuilder(args: JList[String]) extends SparkSubmitCommand
cmd.add("-Xmx" + memory)
addOptionString(cmd, driverExtraJavaOptions)
mergeEnvPathList(env, getLibPathEnvName, config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH))
- addPermGenSizeOpt(cmd)
+ addPermSize(cmd)
cmd.add("org.apache.spark.deploy.KyuubiSubmit")
cmd.addAll(buildSparkSubmitArgs)
cmd
}
+
+ /**
+ * Adds the default perm gen size option for Spark if the VM requires it and the user hasn't
+ * set it.
+ */
+ private def addPermSize(cmd: JList[String]): Unit = {
+ // Don't set MaxPermSize for IBM Java, or Oracle Java 8 and later.
+ if (getJavaVendor eq JavaVendor.IBM) {
+ return
+ }
+ if (javaMajorVersion(System.getProperty("java.version")) > 7) {
+ return
+ }
+ cmd.asScala.foreach { arg =>
+ if (arg.contains("-XX:MaxPermSize=")) {
+ return
+ }
+ }
+ cmd.add("-XX:MaxPermSize=256m")
+ }
}