fix #70 runtime support running on different versions of spark
This commit is contained in:
parent
dd09f5694a
commit
8d612732fb
@ -20,6 +20,9 @@ install:
|
||||
|
||||
script:
|
||||
- ./build/mvn package -q -Dmaven.javadoc.skip=true -B -V
|
||||
- ./build/mvn package -Pspark-2.2 -q -Dmaven.javadoc.skip=true -B -V
|
||||
- ./build/mvn package -Pspark-2.3 -q -Dmaven.javadoc.skip=true -B -V
|
||||
|
||||
|
||||
language: java
|
||||
after_success:
|
||||
|
||||
26
pom.xml
26
pom.xml
@ -53,7 +53,6 @@
|
||||
<maven.version>3.3.9</maven.version>
|
||||
<spark.group>org.apache.spark</spark.group>
|
||||
<spark.version>2.1.2</spark.version>
|
||||
<spark.branch>2.1</spark.branch>
|
||||
<spark.scope>provided</spark.scope>
|
||||
<hadoop.version>2.7.3</hadoop.version>
|
||||
<hadoop.deps.scope>provided</hadoop.deps.scope>
|
||||
@ -327,7 +326,6 @@
|
||||
</resource>
|
||||
</resources>
|
||||
<plugins>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-antrun-plugin</artifactId>
|
||||
@ -448,18 +446,6 @@
|
||||
</sources>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>add-overwrite-spark-source</id>
|
||||
<phase>generate-sources</phase>
|
||||
<goals>
|
||||
<goal>add-source</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<sources>
|
||||
<source>src/extra/v${spark.branch}/scala</source>
|
||||
</sources>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
@ -504,6 +490,7 @@
|
||||
<exclude>**/org/apache/hive/service/cli/thrift/*.class</exclude>
|
||||
<exclude>**/org/apache/spark/launcher/*.class</exclude>
|
||||
<exclude>**/org/apache/spark/deploy/*.class</exclude>
|
||||
<exclude>**/org/apache/spark/SparkEnv.class</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
<executions>
|
||||
@ -527,19 +514,10 @@
|
||||
</build>
|
||||
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>spark-2.0</id>
|
||||
<properties>
|
||||
<spark.version>2.0.2</spark.version>
|
||||
<spark.branch>2.0</spark.branch>
|
||||
</properties>
|
||||
</profile>
|
||||
|
||||
<profile>
|
||||
<id>spark-2.1</id>
|
||||
<properties>
|
||||
<spark.version>2.1.2</spark.version>
|
||||
<spark.branch>2.1</spark.branch>
|
||||
</properties>
|
||||
</profile>
|
||||
|
||||
@ -547,7 +525,6 @@
|
||||
<id>spark-2.2</id>
|
||||
<properties>
|
||||
<spark.version>2.2.1</spark.version>
|
||||
<spark.branch>2.2</spark.branch>
|
||||
</properties>
|
||||
</profile>
|
||||
|
||||
@ -555,7 +532,6 @@
|
||||
<id>spark-2.3</id>
|
||||
<properties>
|
||||
<spark.version>2.3.0</spark.version>
|
||||
<spark.branch>2.3</spark.branch>
|
||||
<scalatest.version>3.0.3</scalatest.version>
|
||||
</properties>
|
||||
</profile>
|
||||
|
||||
@ -1,463 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark
|
||||
|
||||
import java.io.File
|
||||
import java.net.Socket
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
import scala.collection.mutable
|
||||
import scala.util.Properties
|
||||
|
||||
import com.google.common.collect.MapMaker
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
|
||||
import org.apache.spark.annotation.DeveloperApi
|
||||
import org.apache.spark.api.python.PythonWorkerFactory
|
||||
import org.apache.spark.broadcast.BroadcastManager
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
|
||||
import org.apache.spark.metrics.MetricsSystem
|
||||
import org.apache.spark.network.netty.NettyBlockTransferService
|
||||
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
|
||||
import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
|
||||
import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint
|
||||
import org.apache.spark.security.CryptoStreamUtils
|
||||
import org.apache.spark.serializer.{JavaSerializer, Serializer, SerializerManager}
|
||||
import org.apache.spark.shuffle.ShuffleManager
|
||||
import org.apache.spark.storage._
|
||||
import org.apache.spark.util.{RpcUtils, Utils}
|
||||
|
||||
/**
|
||||
* :: DeveloperApi ::
|
||||
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
|
||||
* including the serializer, RpcEnv, block manager, map output tracker, etc. Currently
|
||||
* Spark code finds the SparkEnv through a global variable, so all the threads can access the same
|
||||
* SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
|
||||
*
|
||||
* NOTE: This is not intended for external use. This is exposed for Shark and may be made private
|
||||
* in a future release.
|
||||
*/
|
||||
@DeveloperApi
|
||||
class SparkEnv (
|
||||
val executorId: String,
|
||||
private[spark] val rpcEnv: RpcEnv,
|
||||
val serializer: Serializer,
|
||||
val closureSerializer: Serializer,
|
||||
val serializerManager: SerializerManager,
|
||||
val mapOutputTracker: MapOutputTracker,
|
||||
val shuffleManager: ShuffleManager,
|
||||
val broadcastManager: BroadcastManager,
|
||||
val blockManager: BlockManager,
|
||||
val securityManager: SecurityManager,
|
||||
val metricsSystem: MetricsSystem,
|
||||
val memoryManager: MemoryManager,
|
||||
val outputCommitCoordinator: OutputCommitCoordinator,
|
||||
val conf: SparkConf) extends Logging {
|
||||
|
||||
private[spark] var isStopped = false
|
||||
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
|
||||
|
||||
// A general, soft-reference map for metadata needed during HadoopRDD split computation
|
||||
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
|
||||
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
|
||||
|
||||
private[spark] var driverTmpDir: Option[String] = None
|
||||
|
||||
private[spark] def stop() {
|
||||
|
||||
if (!isStopped) {
|
||||
isStopped = true
|
||||
pythonWorkers.values.foreach(_.stop())
|
||||
mapOutputTracker.stop()
|
||||
shuffleManager.stop()
|
||||
broadcastManager.stop()
|
||||
blockManager.stop()
|
||||
blockManager.master.stop()
|
||||
metricsSystem.stop()
|
||||
outputCommitCoordinator.stop()
|
||||
rpcEnv.shutdown()
|
||||
rpcEnv.awaitTermination()
|
||||
|
||||
// If we only stop sc, but the driver process still run as a services then we need to delete
|
||||
// the tmp dir, if not, it will create too many tmp dirs.
|
||||
// We only need to delete the tmp dir create by driver
|
||||
driverTmpDir match {
|
||||
case Some(path) =>
|
||||
try {
|
||||
Utils.deleteRecursively(new File(path))
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
logWarning(s"Exception while deleting Spark temp dir: $path", e)
|
||||
}
|
||||
case None => // We just need to delete tmp dir created by driver, so do nothing on executor
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private[spark]
|
||||
def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
|
||||
synchronized {
|
||||
val key = (pythonExec, envVars)
|
||||
pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
|
||||
}
|
||||
}
|
||||
|
||||
private[spark]
|
||||
def destroyPythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
|
||||
synchronized {
|
||||
val key = (pythonExec, envVars)
|
||||
pythonWorkers.get(key).foreach(_.stopWorker(worker))
|
||||
}
|
||||
}
|
||||
|
||||
private[spark]
|
||||
def releasePythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
|
||||
synchronized {
|
||||
val key = (pythonExec, envVars)
|
||||
pythonWorkers.get(key).foreach(_.releaseWorker(worker))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object SparkEnv extends Logging {
|
||||
logInfo("Loaded Kyuubi Supplied SparkEnv Class...")
|
||||
private val env = new ConcurrentHashMap[String, SparkEnv]()
|
||||
|
||||
private[spark] val driverSystemName = "sparkDriver"
|
||||
private[spark] val executorSystemName = "sparkExecutor"
|
||||
|
||||
private[this] def user = UserGroupInformation.getCurrentUser.getShortUserName
|
||||
|
||||
def set(e: SparkEnv) {
|
||||
if (e == null) {
|
||||
logDebug(s"Kyuubi: Removing SparkEnv for $user")
|
||||
env.remove(user)
|
||||
} else {
|
||||
logDebug(s"Kyuubi: Registering SparkEnv for $user")
|
||||
env.put(user, e)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the SparkEnv.
|
||||
*/
|
||||
def get: SparkEnv = {
|
||||
logDebug(s"Kyuubi: Get SparkEnv for $user")
|
||||
env.get(user)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a SparkEnv for the driver.
|
||||
*/
|
||||
private[spark] def createDriverEnv(
|
||||
conf: SparkConf,
|
||||
isLocal: Boolean,
|
||||
listenerBus: LiveListenerBus,
|
||||
numCores: Int,
|
||||
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
|
||||
assert(conf.contains(DRIVER_HOST_ADDRESS),
|
||||
s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
|
||||
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
|
||||
val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
|
||||
val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
|
||||
val port = conf.get("spark.driver.port").toInt
|
||||
val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
|
||||
Some(CryptoStreamUtils.createKey(conf))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
create(
|
||||
conf,
|
||||
SparkContext.DRIVER_IDENTIFIER,
|
||||
bindAddress,
|
||||
advertiseAddress,
|
||||
port,
|
||||
isLocal,
|
||||
numCores,
|
||||
ioEncryptionKey,
|
||||
listenerBus = listenerBus,
|
||||
mockOutputCommitCoordinator = mockOutputCommitCoordinator
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a SparkEnv for an executor.
|
||||
* In coarse-grained mode, the executor provides an RpcEnv that is already instantiated.
|
||||
*/
|
||||
private[spark] def createExecutorEnv(
|
||||
conf: SparkConf,
|
||||
executorId: String,
|
||||
hostname: String,
|
||||
port: Int,
|
||||
numCores: Int,
|
||||
ioEncryptionKey: Option[Array[Byte]],
|
||||
isLocal: Boolean): SparkEnv = {
|
||||
val env = create(
|
||||
conf,
|
||||
executorId,
|
||||
hostname,
|
||||
hostname,
|
||||
port,
|
||||
isLocal,
|
||||
numCores,
|
||||
ioEncryptionKey
|
||||
)
|
||||
SparkEnv.set(env)
|
||||
env
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to create a SparkEnv for a driver or an executor.
|
||||
*/
|
||||
private def create(
|
||||
conf: SparkConf,
|
||||
executorId: String,
|
||||
bindAddress: String,
|
||||
advertiseAddress: String,
|
||||
port: Int,
|
||||
isLocal: Boolean,
|
||||
numUsableCores: Int,
|
||||
ioEncryptionKey: Option[Array[Byte]],
|
||||
listenerBus: LiveListenerBus = null,
|
||||
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
|
||||
|
||||
val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
|
||||
|
||||
// Listener bus is only used on the driver
|
||||
if (isDriver) {
|
||||
assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
|
||||
}
|
||||
|
||||
val securityManager = new SecurityManager(conf, ioEncryptionKey)
|
||||
ioEncryptionKey.foreach { _ =>
|
||||
if (!securityManager.isSaslEncryptionEnabled()) {
|
||||
logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
|
||||
"wire.")
|
||||
}
|
||||
}
|
||||
|
||||
val systemName = if (isDriver) driverSystemName else executorSystemName
|
||||
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
|
||||
securityManager, clientMode = !isDriver)
|
||||
|
||||
// Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
|
||||
// In the non-driver case, the RPC env's address may be null since it may not be listening
|
||||
// for incoming connections.
|
||||
if (isDriver) {
|
||||
conf.set("spark.driver.port", rpcEnv.address.port.toString)
|
||||
} else if (rpcEnv.address != null) {
|
||||
conf.set("spark.executor.port", rpcEnv.address.port.toString)
|
||||
logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}")
|
||||
}
|
||||
|
||||
// Create an instance of the class with the given name, possibly initializing it with our conf
|
||||
def instantiateClass[T](className: String): T = {
|
||||
val cls = Utils.classForName(className)
|
||||
// Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
|
||||
// SparkConf, then one taking no arguments
|
||||
try {
|
||||
cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
|
||||
.newInstance(conf, new java.lang.Boolean(isDriver))
|
||||
.asInstanceOf[T]
|
||||
} catch {
|
||||
case _: NoSuchMethodException =>
|
||||
try {
|
||||
cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
|
||||
} catch {
|
||||
case _: NoSuchMethodException =>
|
||||
cls.getConstructor().newInstance().asInstanceOf[T]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create an instance of the class named by the given SparkConf property, or defaultClassName
|
||||
// if the property is not set, possibly initializing it with our conf
|
||||
def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
|
||||
instantiateClass[T](conf.get(propertyName, defaultClassName))
|
||||
}
|
||||
|
||||
val serializer = instantiateClassFromConf[Serializer](
|
||||
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
|
||||
logDebug(s"Using serializer: ${serializer.getClass}")
|
||||
|
||||
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
|
||||
|
||||
val closureSerializer = new JavaSerializer(conf)
|
||||
|
||||
def registerOrLookupEndpoint(
|
||||
name: String, endpointCreator: => RpcEndpoint):
|
||||
RpcEndpointRef = {
|
||||
if (isDriver) {
|
||||
logInfo("Registering " + name)
|
||||
rpcEnv.setupEndpoint(name, endpointCreator)
|
||||
} else {
|
||||
RpcUtils.makeDriverRef(name, conf, rpcEnv)
|
||||
}
|
||||
}
|
||||
|
||||
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
|
||||
|
||||
val mapOutputTracker = if (isDriver) {
|
||||
new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
|
||||
} else {
|
||||
new MapOutputTrackerWorker(conf)
|
||||
}
|
||||
|
||||
// Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
|
||||
// requires the MapOutputTracker itself
|
||||
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
|
||||
new MapOutputTrackerMasterEndpoint(
|
||||
rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
|
||||
|
||||
// Let the user specify short names for shuffle managers
|
||||
val shortShuffleMgrNames = Map(
|
||||
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
|
||||
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
|
||||
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
|
||||
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
|
||||
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
|
||||
|
||||
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
|
||||
val memoryManager: MemoryManager =
|
||||
if (useLegacyMemoryManager) {
|
||||
new StaticMemoryManager(conf, numUsableCores)
|
||||
} else {
|
||||
UnifiedMemoryManager(conf, numUsableCores)
|
||||
}
|
||||
|
||||
val blockManagerPort = if (isDriver) {
|
||||
conf.get(DRIVER_BLOCK_MANAGER_PORT)
|
||||
} else {
|
||||
conf.get(BLOCK_MANAGER_PORT)
|
||||
}
|
||||
|
||||
val blockTransferService =
|
||||
new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
|
||||
blockManagerPort, numUsableCores)
|
||||
|
||||
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
|
||||
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
|
||||
new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
|
||||
conf, isDriver)
|
||||
|
||||
// NB: blockManager is not valid until initialize() is called later.
|
||||
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
|
||||
serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
|
||||
blockTransferService, securityManager, numUsableCores)
|
||||
|
||||
val metricsSystem = if (isDriver) {
|
||||
// Don't start metrics system right now for Driver.
|
||||
// We need to wait for the task scheduler to give us an app ID.
|
||||
// Then we can start the metrics system.
|
||||
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
|
||||
} else {
|
||||
// We need to set the executor ID before the MetricsSystem is created because sources and
|
||||
// sinks specified in the metrics configuration file will want to incorporate this executor's
|
||||
// ID into the metrics they report.
|
||||
conf.set("spark.executor.id", executorId)
|
||||
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
|
||||
ms.start()
|
||||
ms
|
||||
}
|
||||
|
||||
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
|
||||
new OutputCommitCoordinator(conf, isDriver)
|
||||
}
|
||||
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
|
||||
new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
|
||||
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
|
||||
|
||||
val envInstance = new SparkEnv(
|
||||
executorId,
|
||||
rpcEnv,
|
||||
serializer,
|
||||
closureSerializer,
|
||||
serializerManager,
|
||||
mapOutputTracker,
|
||||
shuffleManager,
|
||||
broadcastManager,
|
||||
blockManager,
|
||||
securityManager,
|
||||
metricsSystem,
|
||||
memoryManager,
|
||||
outputCommitCoordinator,
|
||||
conf)
|
||||
|
||||
// Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
|
||||
// called, and we only need to do it for driver. Because driver may run as a service, and if we
|
||||
// don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
|
||||
if (isDriver) {
|
||||
val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
|
||||
envInstance.driverTmpDir = Some(sparkFilesDir)
|
||||
}
|
||||
|
||||
envInstance
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a map representation of jvm information, Spark properties, system properties, and
|
||||
* class paths. Map keys define the category, and map values represent the corresponding
|
||||
* attributes as a sequence of KV pairs. This is used mainly for SparkListenerEnvironmentUpdate.
|
||||
*/
|
||||
private[spark]
|
||||
def environmentDetails(
|
||||
conf: SparkConf,
|
||||
schedulingMode: String,
|
||||
addedJars: Seq[String],
|
||||
addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {
|
||||
|
||||
import Properties._
|
||||
val jvmInformation = Seq(
|
||||
("Java Version", s"$javaVersion ($javaVendor)"),
|
||||
("Java Home", javaHome),
|
||||
("Scala Version", versionString)
|
||||
).sorted
|
||||
|
||||
// Spark properties
|
||||
// This includes the scheduling mode whether or not it is configured (used by SparkUI)
|
||||
val schedulerMode =
|
||||
if (!conf.contains("spark.scheduler.mode")) {
|
||||
Seq(("spark.scheduler.mode", schedulingMode))
|
||||
} else {
|
||||
Seq[(String, String)]()
|
||||
}
|
||||
val sparkProperties = (conf.getAll ++ schedulerMode).sorted
|
||||
|
||||
// System properties that are not java classpaths
|
||||
val systemProperties = Utils.getSystemProperties.toSeq
|
||||
val otherProperties = systemProperties.filter { case (k, _) =>
|
||||
k != "java.class.path" && !k.startsWith("spark.")
|
||||
}.sorted
|
||||
|
||||
// Class paths including all added jars and files
|
||||
val classPathEntries = javaClassPath
|
||||
.split(File.pathSeparator)
|
||||
.filterNot(_.isEmpty)
|
||||
.map((_, "System Classpath"))
|
||||
val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
|
||||
val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted
|
||||
|
||||
Map[String, Seq[(String, String)]](
|
||||
"JVM Information" -> jvmInformation,
|
||||
"Spark Properties" -> sparkProperties,
|
||||
"System Properties" -> otherProperties,
|
||||
"Classpath Entries" -> classPaths)
|
||||
}
|
||||
}
|
||||
@ -1,465 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark
|
||||
|
||||
import java.io.File
|
||||
import java.net.Socket
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.Locale
|
||||
|
||||
import scala.collection.mutable
|
||||
import scala.util.Properties
|
||||
|
||||
import com.google.common.collect.MapMaker
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
|
||||
import org.apache.spark.annotation.DeveloperApi
|
||||
import org.apache.spark.api.python.PythonWorkerFactory
|
||||
import org.apache.spark.broadcast.BroadcastManager
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
|
||||
import org.apache.spark.metrics.MetricsSystem
|
||||
import org.apache.spark.network.netty.NettyBlockTransferService
|
||||
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
|
||||
import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
|
||||
import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint
|
||||
import org.apache.spark.security.CryptoStreamUtils
|
||||
import org.apache.spark.serializer.{JavaSerializer, Serializer, SerializerManager}
|
||||
import org.apache.spark.shuffle.ShuffleManager
|
||||
import org.apache.spark.storage._
|
||||
import org.apache.spark.util.{RpcUtils, Utils}
|
||||
|
||||
/**
|
||||
* :: DeveloperApi ::
|
||||
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
|
||||
* including the serializer, RpcEnv, block manager, map output tracker, etc. Currently
|
||||
* Spark code finds the SparkEnv through a global variable, so all the threads can access the same
|
||||
* SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
|
||||
*
|
||||
* NOTE: This is not intended for external use. This is exposed for Shark and may be made private
|
||||
* in a future release.
|
||||
*/
|
||||
@DeveloperApi
|
||||
class SparkEnv (
|
||||
val executorId: String,
|
||||
private[spark] val rpcEnv: RpcEnv,
|
||||
val serializer: Serializer,
|
||||
val closureSerializer: Serializer,
|
||||
val serializerManager: SerializerManager,
|
||||
val mapOutputTracker: MapOutputTracker,
|
||||
val shuffleManager: ShuffleManager,
|
||||
val broadcastManager: BroadcastManager,
|
||||
val blockManager: BlockManager,
|
||||
val securityManager: SecurityManager,
|
||||
val metricsSystem: MetricsSystem,
|
||||
val memoryManager: MemoryManager,
|
||||
val outputCommitCoordinator: OutputCommitCoordinator,
|
||||
val conf: SparkConf) extends Logging {
|
||||
|
||||
private[spark] var isStopped = false
|
||||
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
|
||||
|
||||
// A general, soft-reference map for metadata needed during HadoopRDD split computation
|
||||
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
|
||||
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
|
||||
|
||||
private[spark] var driverTmpDir: Option[String] = None
|
||||
|
||||
private[spark] def stop() {
|
||||
|
||||
if (!isStopped) {
|
||||
isStopped = true
|
||||
pythonWorkers.values.foreach(_.stop())
|
||||
mapOutputTracker.stop()
|
||||
shuffleManager.stop()
|
||||
broadcastManager.stop()
|
||||
blockManager.stop()
|
||||
blockManager.master.stop()
|
||||
metricsSystem.stop()
|
||||
outputCommitCoordinator.stop()
|
||||
rpcEnv.shutdown()
|
||||
rpcEnv.awaitTermination()
|
||||
|
||||
// If we only stop sc, but the driver process still run as a services then we need to delete
|
||||
// the tmp dir, if not, it will create too many tmp dirs.
|
||||
// We only need to delete the tmp dir create by driver
|
||||
driverTmpDir match {
|
||||
case Some(path) =>
|
||||
try {
|
||||
Utils.deleteRecursively(new File(path))
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
logWarning(s"Exception while deleting Spark temp dir: $path", e)
|
||||
}
|
||||
case None => // We just need to delete tmp dir created by driver, so do nothing on executor
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private[spark]
|
||||
def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
|
||||
synchronized {
|
||||
val key = (pythonExec, envVars)
|
||||
pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
|
||||
}
|
||||
}
|
||||
|
||||
private[spark]
|
||||
def destroyPythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
|
||||
synchronized {
|
||||
val key = (pythonExec, envVars)
|
||||
pythonWorkers.get(key).foreach(_.stopWorker(worker))
|
||||
}
|
||||
}
|
||||
|
||||
private[spark]
|
||||
def releasePythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
|
||||
synchronized {
|
||||
val key = (pythonExec, envVars)
|
||||
pythonWorkers.get(key).foreach(_.releaseWorker(worker))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object SparkEnv extends Logging {
|
||||
logInfo("Loaded Kyuubi Supplied SparkEnv Class...")
|
||||
private val env = new ConcurrentHashMap[String, SparkEnv]()
|
||||
|
||||
private[spark] val driverSystemName = "sparkDriver"
|
||||
private[spark] val executorSystemName = "sparkExecutor"
|
||||
|
||||
private[this] def user = UserGroupInformation.getCurrentUser.getShortUserName
|
||||
|
||||
def set(e: SparkEnv) {
|
||||
if (e == null) {
|
||||
logDebug(s"Kyuubi: Removing SparkEnv for $user")
|
||||
env.remove(user)
|
||||
} else {
|
||||
logDebug(s"Kyuubi: Registering SparkEnv for $user")
|
||||
env.put(user, e)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the SparkEnv.
|
||||
*/
|
||||
def get: SparkEnv = {
|
||||
logDebug(s"Kyuubi: Get SparkEnv for $user")
|
||||
env.get(user)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a SparkEnv for the driver.
|
||||
*/
|
||||
private[spark] def createDriverEnv(
|
||||
conf: SparkConf,
|
||||
isLocal: Boolean,
|
||||
listenerBus: LiveListenerBus,
|
||||
numCores: Int,
|
||||
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
|
||||
assert(conf.contains(DRIVER_HOST_ADDRESS),
|
||||
s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
|
||||
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
|
||||
val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
|
||||
val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
|
||||
val port = conf.get("spark.driver.port").toInt
|
||||
val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
|
||||
Some(CryptoStreamUtils.createKey(conf))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
create(
|
||||
conf,
|
||||
SparkContext.DRIVER_IDENTIFIER,
|
||||
bindAddress,
|
||||
advertiseAddress,
|
||||
port,
|
||||
isLocal,
|
||||
numCores,
|
||||
ioEncryptionKey,
|
||||
listenerBus = listenerBus,
|
||||
mockOutputCommitCoordinator = mockOutputCommitCoordinator
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a SparkEnv for an executor.
|
||||
* In coarse-grained mode, the executor provides an RpcEnv that is already instantiated.
|
||||
*/
|
||||
private[spark] def createExecutorEnv(
|
||||
conf: SparkConf,
|
||||
executorId: String,
|
||||
hostname: String,
|
||||
port: Int,
|
||||
numCores: Int,
|
||||
ioEncryptionKey: Option[Array[Byte]],
|
||||
isLocal: Boolean): SparkEnv = {
|
||||
val env = create(
|
||||
conf,
|
||||
executorId,
|
||||
hostname,
|
||||
hostname,
|
||||
port,
|
||||
isLocal,
|
||||
numCores,
|
||||
ioEncryptionKey
|
||||
)
|
||||
SparkEnv.set(env)
|
||||
env
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to create a SparkEnv for a driver or an executor.
|
||||
*/
|
||||
private def create(
|
||||
conf: SparkConf,
|
||||
executorId: String,
|
||||
bindAddress: String,
|
||||
advertiseAddress: String,
|
||||
port: Int,
|
||||
isLocal: Boolean,
|
||||
numUsableCores: Int,
|
||||
ioEncryptionKey: Option[Array[Byte]],
|
||||
listenerBus: LiveListenerBus = null,
|
||||
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
|
||||
|
||||
val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
|
||||
|
||||
// Listener bus is only used on the driver
|
||||
if (isDriver) {
|
||||
assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
|
||||
}
|
||||
|
||||
val securityManager = new SecurityManager(conf, ioEncryptionKey)
|
||||
ioEncryptionKey.foreach { _ =>
|
||||
if (!securityManager.isEncryptionEnabled()) {
|
||||
logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
|
||||
"wire.")
|
||||
}
|
||||
}
|
||||
|
||||
val systemName = if (isDriver) driverSystemName else executorSystemName
|
||||
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
|
||||
securityManager, clientMode = !isDriver)
|
||||
|
||||
// Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
|
||||
// In the non-driver case, the RPC env's address may be null since it may not be listening
|
||||
// for incoming connections.
|
||||
if (isDriver) {
|
||||
conf.set("spark.driver.port", rpcEnv.address.port.toString)
|
||||
} else if (rpcEnv.address != null) {
|
||||
conf.set("spark.executor.port", rpcEnv.address.port.toString)
|
||||
logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}")
|
||||
}
|
||||
|
||||
// Create an instance of the class with the given name, possibly initializing it with our conf
|
||||
def instantiateClass[T](className: String): T = {
|
||||
val cls = Utils.classForName(className)
|
||||
// Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
|
||||
// SparkConf, then one taking no arguments
|
||||
try {
|
||||
cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
|
||||
.newInstance(conf, new java.lang.Boolean(isDriver))
|
||||
.asInstanceOf[T]
|
||||
} catch {
|
||||
case _: NoSuchMethodException =>
|
||||
try {
|
||||
cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
|
||||
} catch {
|
||||
case _: NoSuchMethodException =>
|
||||
cls.getConstructor().newInstance().asInstanceOf[T]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create an instance of the class named by the given SparkConf property, or defaultClassName
|
||||
// if the property is not set, possibly initializing it with our conf
|
||||
def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
|
||||
instantiateClass[T](conf.get(propertyName, defaultClassName))
|
||||
}
|
||||
|
||||
val serializer = instantiateClassFromConf[Serializer](
|
||||
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
|
||||
logDebug(s"Using serializer: ${serializer.getClass}")
|
||||
|
||||
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
|
||||
|
||||
val closureSerializer = new JavaSerializer(conf)
|
||||
|
||||
def registerOrLookupEndpoint(
|
||||
name: String, endpointCreator: => RpcEndpoint):
|
||||
RpcEndpointRef = {
|
||||
if (isDriver) {
|
||||
logInfo("Registering " + name)
|
||||
rpcEnv.setupEndpoint(name, endpointCreator)
|
||||
} else {
|
||||
RpcUtils.makeDriverRef(name, conf, rpcEnv)
|
||||
}
|
||||
}
|
||||
|
||||
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
|
||||
|
||||
val mapOutputTracker = if (isDriver) {
|
||||
new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
|
||||
} else {
|
||||
new MapOutputTrackerWorker(conf)
|
||||
}
|
||||
|
||||
// Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
|
||||
// requires the MapOutputTracker itself
|
||||
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
|
||||
new MapOutputTrackerMasterEndpoint(
|
||||
rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
|
||||
|
||||
// Let the user specify short names for shuffle managers
|
||||
val shortShuffleMgrNames = Map(
|
||||
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
|
||||
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
|
||||
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
|
||||
val shuffleMgrClass =
|
||||
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
|
||||
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
|
||||
|
||||
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
|
||||
val memoryManager: MemoryManager =
|
||||
if (useLegacyMemoryManager) {
|
||||
new StaticMemoryManager(conf, numUsableCores)
|
||||
} else {
|
||||
UnifiedMemoryManager(conf, numUsableCores)
|
||||
}
|
||||
|
||||
val blockManagerPort = if (isDriver) {
|
||||
conf.get(DRIVER_BLOCK_MANAGER_PORT)
|
||||
} else {
|
||||
conf.get(BLOCK_MANAGER_PORT)
|
||||
}
|
||||
|
||||
val blockTransferService =
|
||||
new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
|
||||
blockManagerPort, numUsableCores)
|
||||
|
||||
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
|
||||
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
|
||||
new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
|
||||
conf, isDriver)
|
||||
|
||||
// NB: blockManager is not valid until initialize() is called later.
|
||||
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
|
||||
serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
|
||||
blockTransferService, securityManager, numUsableCores)
|
||||
|
||||
val metricsSystem = if (isDriver) {
|
||||
// Don't start metrics system right now for Driver.
|
||||
// We need to wait for the task scheduler to give us an app ID.
|
||||
// Then we can start the metrics system.
|
||||
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
|
||||
} else {
|
||||
// We need to set the executor ID before the MetricsSystem is created because sources and
|
||||
// sinks specified in the metrics configuration file will want to incorporate this executor's
|
||||
// ID into the metrics they report.
|
||||
conf.set("spark.executor.id", executorId)
|
||||
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
|
||||
ms.start()
|
||||
ms
|
||||
}
|
||||
|
||||
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
|
||||
new OutputCommitCoordinator(conf, isDriver)
|
||||
}
|
||||
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
|
||||
new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
|
||||
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
|
||||
|
||||
val envInstance = new SparkEnv(
|
||||
executorId,
|
||||
rpcEnv,
|
||||
serializer,
|
||||
closureSerializer,
|
||||
serializerManager,
|
||||
mapOutputTracker,
|
||||
shuffleManager,
|
||||
broadcastManager,
|
||||
blockManager,
|
||||
securityManager,
|
||||
metricsSystem,
|
||||
memoryManager,
|
||||
outputCommitCoordinator,
|
||||
conf)
|
||||
|
||||
// Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
|
||||
// called, and we only need to do it for driver. Because driver may run as a service, and if we
|
||||
// don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
|
||||
if (isDriver) {
|
||||
val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
|
||||
envInstance.driverTmpDir = Some(sparkFilesDir)
|
||||
}
|
||||
|
||||
envInstance
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a map representation of jvm information, Spark properties, system properties, and
|
||||
* class paths. Map keys define the category, and map values represent the corresponding
|
||||
* attributes as a sequence of KV pairs. This is used mainly for SparkListenerEnvironmentUpdate.
|
||||
*/
|
||||
private[spark]
|
||||
def environmentDetails(
|
||||
conf: SparkConf,
|
||||
schedulingMode: String,
|
||||
addedJars: Seq[String],
|
||||
addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {
|
||||
|
||||
import Properties._
|
||||
val jvmInformation = Seq(
|
||||
("Java Version", s"$javaVersion ($javaVendor)"),
|
||||
("Java Home", javaHome),
|
||||
("Scala Version", versionString)
|
||||
).sorted
|
||||
|
||||
// Spark properties
|
||||
// This includes the scheduling mode whether or not it is configured (used by SparkUI)
|
||||
val schedulerMode =
|
||||
if (!conf.contains("spark.scheduler.mode")) {
|
||||
Seq(("spark.scheduler.mode", schedulingMode))
|
||||
} else {
|
||||
Seq[(String, String)]()
|
||||
}
|
||||
val sparkProperties = (conf.getAll ++ schedulerMode).sorted
|
||||
|
||||
// System properties that are not java classpaths
|
||||
val systemProperties = Utils.getSystemProperties.toSeq
|
||||
val otherProperties = systemProperties.filter { case (k, _) =>
|
||||
k != "java.class.path" && !k.startsWith("spark.")
|
||||
}.sorted
|
||||
|
||||
// Class paths including all added jars and files
|
||||
val classPathEntries = javaClassPath
|
||||
.split(File.pathSeparator)
|
||||
.filterNot(_.isEmpty)
|
||||
.map((_, "System Classpath"))
|
||||
val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
|
||||
val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted
|
||||
|
||||
Map[String, Seq[(String, String)]](
|
||||
"JVM Information" -> jvmInformation,
|
||||
"Spark Properties" -> sparkProperties,
|
||||
"System Properties" -> otherProperties,
|
||||
"Classpath Entries" -> classPaths)
|
||||
}
|
||||
}
|
||||
@ -111,4 +111,15 @@ object KyuubiSparkUtil extends Logging {
|
||||
|
||||
def minorVersion(sparkVersion: String): Int = VersionUtils.minorVersion(sparkVersion)
|
||||
|
||||
/**
|
||||
* Check whether the runtime version of Spark is >= the specified one.
|
||||
*/
|
||||
def isSparkVersionOrLater(version: String): Boolean = {
|
||||
val tMajor = majorVersion(SPARK_VERSION)
|
||||
val tMinor = minorVersion(SPARK_VERSION)
|
||||
val sMajor = majorVersion(version)
|
||||
val sMinor = minorVersion(version)
|
||||
tMajor > sMajor || (tMajor == sMajor && tMinor >= sMinor)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -19,15 +19,14 @@ package org.apache.spark
|
||||
|
||||
import java.io.File
|
||||
import java.net.Socket
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.Locale
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
import scala.collection.mutable
|
||||
import scala.util.Properties
|
||||
|
||||
import com.google.common.collect.MapMaker
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
|
||||
import org.apache.spark.annotation.DeveloperApi
|
||||
import org.apache.spark.api.python.PythonWorkerFactory
|
||||
import org.apache.spark.broadcast.BroadcastManager
|
||||
@ -45,6 +44,8 @@ import org.apache.spark.shuffle.ShuffleManager
|
||||
import org.apache.spark.storage._
|
||||
import org.apache.spark.util.{RpcUtils, Utils}
|
||||
|
||||
import yaooqinn.kyuubi.utils.ReflectUtils
|
||||
|
||||
/**
|
||||
* :: DeveloperApi ::
|
||||
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
|
||||
@ -201,6 +202,34 @@ object SparkEnv extends Logging {
|
||||
/**
|
||||
* Create a SparkEnv for an executor.
|
||||
* In coarse-grained mode, the executor provides an RpcEnv that is already instantiated.
|
||||
* @since 2.2
|
||||
*/
|
||||
private[spark] def createExecutorEnv(
|
||||
conf: SparkConf,
|
||||
executorId: String,
|
||||
hostname: String,
|
||||
port: Int,
|
||||
numCores: Int,
|
||||
ioEncryptionKey: Option[Array[Byte]],
|
||||
isLocal: Boolean): SparkEnv = {
|
||||
val env = create(
|
||||
conf,
|
||||
executorId,
|
||||
hostname,
|
||||
hostname,
|
||||
Some(port),
|
||||
isLocal,
|
||||
numCores,
|
||||
ioEncryptionKey
|
||||
)
|
||||
SparkEnv.set(env)
|
||||
env
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a SparkEnv for an executor.
|
||||
* In coarse-grained mode, the executor provides an RpcEnv that is already instantiated.
|
||||
* @since 2.3
|
||||
*/
|
||||
private[spark] def createExecutorEnv(
|
||||
conf: SparkConf,
|
||||
@ -246,20 +275,30 @@ object SparkEnv extends Logging {
|
||||
}
|
||||
|
||||
val securityManager = new SecurityManager(conf, ioEncryptionKey)
|
||||
if (isDriver) {
|
||||
securityManager.initializeAuth()
|
||||
}
|
||||
|
||||
ioEncryptionKey.foreach { _ =>
|
||||
if (!securityManager.isEncryptionEnabled()) {
|
||||
logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
|
||||
"wire.")
|
||||
}
|
||||
}
|
||||
|
||||
val systemName = if (isDriver) driverSystemName else executorSystemName
|
||||
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
|
||||
securityManager, numUsableCores, !isDriver)
|
||||
|
||||
val rpcEnv = {
|
||||
val p = port.getOrElse(-1).asInstanceOf[Integer]
|
||||
|
||||
val isNotDriver = (!isDriver).asInstanceOf[java.lang.Boolean]
|
||||
if (KyuubiSparkUtil.isSparkVersionOrLater("2.3")) {
|
||||
ReflectUtils.invokeStaticMethod(
|
||||
classOf[RpcEnv],
|
||||
"create",
|
||||
Seq(classOf[String], classOf[String], classOf[String], classOf[Int],
|
||||
classOf[SparkConf], classOf[SecurityManager], classOf[Int], classOf[Boolean]),
|
||||
Seq(systemName, bindAddress, advertiseAddress, p, conf,
|
||||
securityManager, numUsableCores.asInstanceOf[Integer], isNotDriver))
|
||||
} else {
|
||||
ReflectUtils.invokeStaticMethod(
|
||||
classOf[RpcEnv],
|
||||
"create",
|
||||
Seq(classOf[String], classOf[String], classOf[String], classOf[Int],
|
||||
classOf[SparkConf], classOf[SecurityManager], classOf[Boolean]),
|
||||
Seq(systemName, bindAddress, advertiseAddress, p, conf, securityManager, isNotDriver))
|
||||
}
|
||||
}.asInstanceOf[RpcEnv]
|
||||
|
||||
// Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
|
||||
if (isDriver) {
|
||||
@ -135,28 +135,24 @@ object KyuubiSubmit {
|
||||
// Special flag to avoid deprecation warnings at the client
|
||||
sysProps("SPARK_SUBMIT") = "true"
|
||||
|
||||
Seq[OptionAssigner](
|
||||
OptionAssigner(args.master, YARN, CLIENT, sysProp = "spark.master"),
|
||||
OptionAssigner(args.deployMode, YARN, CLIENT, sysProp = "spark.submit.deployMode"),
|
||||
OptionAssigner(args.name, YARN, CLIENT, sysProp = "spark.app.name"),
|
||||
OptionAssigner(args.driverMemory, YARN, CLIENT, sysProp = "spark.driver.memory"),
|
||||
OptionAssigner(args.driverExtraClassPath, YARN, CLIENT,
|
||||
sysProp = "spark.driver.extraClassPath"),
|
||||
OptionAssigner(args.driverExtraJavaOptions, YARN, CLIENT,
|
||||
sysProp = "spark.driver.extraJavaOptions"),
|
||||
OptionAssigner(args.driverExtraLibraryPath, YARN, CLIENT,
|
||||
sysProp = "spark.driver.extraLibraryPath"),
|
||||
OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
|
||||
OptionAssigner(args.numExecutors, YARN, CLIENT,
|
||||
sysProp = "spark.executor.instances"),
|
||||
OptionAssigner(args.jars, YARN, CLIENT, sysProp = "spark.yarn.dist.jars"),
|
||||
OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
|
||||
OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
|
||||
OptionAssigner(args.principal, YARN, CLIENT, sysProp = "spark.yarn.principal"),
|
||||
OptionAssigner(args.keytab, YARN, CLIENT, sysProp = "spark.yarn.keytab"),
|
||||
OptionAssigner(args.executorCores, YARN, CLIENT, sysProp = "spark.executor.cores"),
|
||||
OptionAssigner(args.executorMemory, YARN, CLIENT, sysProp = "spark.executor.memory")
|
||||
).filter(_.value != null).foreach(opt => sysProps.put(opt.sysProp, opt.value))
|
||||
Seq(
|
||||
"spark.master" -> args.master,
|
||||
"spark.submit.deployMode" -> args.deployMode,
|
||||
"spark.app.name" -> args.name,
|
||||
"spark.driver.memory" -> args.driverMemory,
|
||||
"spark.driver.extraClassPath" -> args.driverExtraClassPath,
|
||||
"spark.driver.extraJavaOptions" -> args.driverExtraJavaOptions,
|
||||
"spark.driver.extraLibraryPath" -> args.driverExtraLibraryPath,
|
||||
"spark.yarn.queue" -> args.queue,
|
||||
"spark.executor.instances" -> args.numExecutors,
|
||||
"spark.yarn.dist.jars" -> args.jars,
|
||||
"spark.yarn.dist.files" -> args.files,
|
||||
"spark.yarn.dist.archives" -> args.archives,
|
||||
"spark.yarn.principal" -> args.principal,
|
||||
"spark.yarn.keytab" -> args.keytab,
|
||||
"spark.executor.cores" -> args.executorCores,
|
||||
"spark.executor.memory" -> args.executorMemory
|
||||
).filter(_._2 != null).foreach(o => sysProps.put(o._1, o._2))
|
||||
|
||||
childClasspath += args.primaryResource
|
||||
if (args.jars != null) { childClasspath ++= args.jars.split(",") }
|
||||
|
||||
@ -20,6 +20,8 @@ package org.apache.spark.launcher
|
||||
import java.io.File
|
||||
import java.util.{List => JList, Map => JMap}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.spark.launcher.CommandBuilderUtils._
|
||||
|
||||
class KyuubiSubmitCommandBuilder(args: JList[String]) extends SparkSubmitCommandBuilder(args) {
|
||||
@ -55,9 +57,29 @@ class KyuubiSubmitCommandBuilder(args: JList[String]) extends SparkSubmitCommand
|
||||
cmd.add("-Xmx" + memory)
|
||||
addOptionString(cmd, driverExtraJavaOptions)
|
||||
mergeEnvPathList(env, getLibPathEnvName, config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH))
|
||||
addPermGenSizeOpt(cmd)
|
||||
addPermSize(cmd)
|
||||
cmd.add("org.apache.spark.deploy.KyuubiSubmit")
|
||||
cmd.addAll(buildSparkSubmitArgs)
|
||||
cmd
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the default perm gen size option for Spark if the VM requires it and the user hasn't
|
||||
* set it.
|
||||
*/
|
||||
private def addPermSize(cmd: JList[String]): Unit = {
|
||||
// Don't set MaxPermSize for IBM Java, or Oracle Java 8 and later.
|
||||
if (getJavaVendor eq JavaVendor.IBM) {
|
||||
return
|
||||
}
|
||||
if (javaMajorVersion(System.getProperty("java.version")) > 7) {
|
||||
return
|
||||
}
|
||||
cmd.asScala.foreach { arg =>
|
||||
if (arg.contains("-XX:MaxPermSize=")) {
|
||||
return
|
||||
}
|
||||
}
|
||||
cmd.add("-XX:MaxPermSize=256m")
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user