verify resource setting and add some comments

This commit is contained in:
Kent Yao 2018-07-26 16:36:16 +08:00
parent 94ff3f65f3
commit 248259d84f
9 changed files with 139 additions and 79 deletions

View File

@ -25,7 +25,7 @@ echo_build_properties() {
echo kyuubi_version=$1
echo spark_version=$2
echo user="$USER"
echo jar=kyuubi-$1.jar
echo jar=kyuubi-server-$1.jar
echo revision=$(git rev-parse HEAD)
echo branch=$(git rev-parse --abbrev-ref HEAD)
echo date=$(date -u +%Y-%m-%dT%H:%M:%SZ)

View File

@ -166,6 +166,20 @@
<goal>run</goal>
</goals>
</execution>
<execution>
<id>fake-jar-test</id>
<phase>generate-test-resources</phase>
<configuration>
<target>
<exec executable="touch">
<arg value="${project.build.testOutputDirectory}/kyuubi-server-${version}.jar"/>
</exec>
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
@ -288,7 +302,7 @@
<argLine>-Xmx3g -Xss4m -XX:ReservedCodeCacheSize=512m</argLine>
<stderr/>
<environmentVariables>
<KYUUBI_JAR>${project.build.testOutputDirectory}/kyuubi-fake.jar</KYUUBI_JAR>
<KYUUBI_JAR>${project.build.testOutputDirectory}/kyuubi-server-${version}.jar</KYUUBI_JAR>
</environmentVariables>
<systemProperties>
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>

View File

@ -54,6 +54,6 @@ object KyuubiDistributedCacheManager {
link: String,
statCache: Map[URI, FileStatus]): Unit = {
cacheManager.addResource(fs, conf, destPath,
localResources, resourceType, link, statCache, true)
localResources, resourceType, link, statCache, appMasterOnly = true)
}
}

View File

@ -39,6 +39,9 @@ import yaooqinn.kyuubi.ha.HighAvailabilityUtils
import yaooqinn.kyuubi.server.KyuubiServer
import yaooqinn.kyuubi.service.CompositeService
/**
* The ApplicationMaster which runs Kyuubi Server inside it.
*/
class KyuubiAppMaster private(name: String) extends CompositeService(name)
with Logging {
@ -206,4 +209,4 @@ object KyuubiAppMaster extends Logging {
System.exit(-1)
}
}
}
}

View File

@ -42,15 +42,15 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.YarnException
import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{KyuubiSparkUtil, SparkConf}
import org.apache.spark.deploy.yarn.KyuubiDistributedCacheManager
import yaooqinn.kyuubi.{Logging, _}
import yaooqinn.kyuubi.service.ServiceException
/**
* A Yarn Client for submiting Kyuubi On Yarn,
* A Yarn Client for submitting Kyuubi towards Yarn
* @param conf SparkConf
*/
private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
@ -62,8 +62,8 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
yarnClient.init(hadoopConf)
yarnClient.start()
private[this] val memory = conf.getSizeAsMb(KyuubiSparkUtil.DRIVER_MEM, "1024m").toInt
private[this] val memoryOverhead =
private[this] var memory = conf.getSizeAsMb(KyuubiSparkUtil.DRIVER_MEM, "1024m").toInt
private[this] var memoryOverhead =
conf.getSizeAsMb(KyuubiSparkUtil.DRIVER_MEM_OVERHEAD, (memory * 0.1).toInt + "m").toInt
private[this] val cores = conf.getInt(KyuubiSparkUtil.DRIVER_CORES, 1)
private[this] val principal = conf.get(KyuubiSparkUtil.PRINCIPAL, "")
@ -80,10 +80,21 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
private[this] var appId: ApplicationId = _
private[this] var appStagingDir: Path = _
@throws[IOException]
@throws[YarnException]
def submit(): Unit = {
try {
val app = yarnClient.createApplication()
val appRes = app.getNewApplicationResponse
val maxMemory = appRes.getMaximumResourceCapability.getMemory
if (memory + memoryOverhead > maxMemory) {
warn(s"Requesting AM memory(${memory}m + ${memoryOverhead}m) exceeds cluster's limit")
memory = (maxMemory * 0.9).toInt
memoryOverhead = (maxMemory * 0.1).toInt
warn(s"Adjusting Kyuubi container to ${memory}B heap and ${memoryOverhead}m overhead.")
} else {
info(s"Will allocate Kyuubi container with ${memory}m heap and ${memoryOverhead}m overhead")
}
appId = appRes.getApplicationId
appStagingDir = new Path(stagingHome, buildPath(KYUUBI_STAGING, appId.toString))
val containerContext = createContainerLaunchContext()
@ -126,6 +137,12 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
}
}
/**
* Describe container information and create the context for launching Application Master
*
* @return ContainerLaunchContext which contains all information for NodeManager to launch
* Application Master
*/
private[this] def createContainerLaunchContext(): ContainerLaunchContext = {
val launchEnv = setupLaunchEnv()
val localResources = prepareLocalResources()
@ -188,16 +205,18 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
appContext.setResource(capability)
appContext
}
/**
* Set up the environment for launching our AM container runs Kyuubi Server.
* Set up the environment for launching the ApplicationMaster which wraps Kyuubi Server.
*/
private[this] def setupLaunchEnv(): ENV = {
private[this] def setupLaunchEnv(): EnvMap = {
info("Setting up the launch environment for our Kyuubi Server container")
val env = new ENV
val env = new EnvMap
populateClasspath(hadoopConf, env)
env("KYUUBI_YARN_STAGING_DIR") = appStagingDir.toString
// Point to Localized Spark jars
env("KYUUBI_YARN_SPARK_JARS") = buildPath(Environment.PWD.$$(), SPARK_LIB_DIR, "*")
val amEnvPrefix = "spark.yarn.appMasterEnv."
val amEnvPrefix = "spark.kyuubi.yarn.appMasterEnv."
conf.getAll
.filter(_._1.startsWith(amEnvPrefix))
.map { case (k, v) => (k.stripPrefix(amEnvPrefix), v) }
@ -205,35 +224,32 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
env
}
private[this] def prepareLocalResources(): RESOURCES = {
@throws[IOException]
private[this] def prepareLocalResources(): HashMap[String, LocalResource] = {
info("Preparing resources for our AM container to start Kyuubi Server")
val fs = appStagingDir.getFileSystem(hadoopConf)
val tokenRenewer = Master.getMasterPrincipal(hadoopConf)
debug("Delegation token renewer is: " + tokenRenewer)
if (tokenRenewer == null || tokenRenewer.length() == 0) {
val errorMessage = "Can't get Master Kerberos principal for use as renewer."
error(errorMessage)
throw new ServiceException(errorMessage)
error("Can't get any Master Kerberos principal for use as renewer.")
} else {
fs.addDelegationTokens(tokenRenewer, creds)
}
fs.addDelegationTokens(tokenRenewer, creds)
val localResources = new RESOURCES
val localResources = new HashMap[String, LocalResource]
FileSystem.mkdirs(fs, appStagingDir, new FsPermission(STAGING_DIR_PERMISSION))
val symlinkCache: Map[URI, Path] = HashMap[URI, Path]()
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
/**
* Distribute a file to the cluster.
* Copy the non-local file to HDFS (if not already there) and add it to the Application Master's
* local resources
*
* If the file's path is a "local:" URI, it's actually not distributed. Other files are copied
* to HDFS (if not already there) and added to the application's distributed cache.
*
* @param path URI of the file to distribute.
* @param resType Type of resource being distributed.
* @param destName Name of the file in the distributed cache.
* @param path the canonical path of the file.
* @param resType the type of resource being distributed.
* @param destName the name of the file in the distributed cache.
* @param targetDir Subdirectory where to place the file.
*/
def distribute(
def upload(
path: String,
resType: LocalResourceType = LocalResourceType.FILE,
destName: Option[String] = None,
@ -251,12 +267,13 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
}
}
// Upload keytab if exists
if (loginFromKeytab) {
distribute(keytabOrigin, destName = Option(keytabForAM))
upload(keytabOrigin, destName = Option(keytabForAM))
}
// Add KYUUBI jar to the cache
distribute(kyuubiJar)
upload(kyuubiJar)
// Add Spark to the cache
val jarsDir = new File(KyuubiSparkUtil.SPARK_JARS_DIR)
@ -277,7 +294,7 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
jarsStream.close()
}
distribute(jarsArchive.toURI.getPath, resType = LocalResourceType.ARCHIVE,
upload(jarsArchive.toURI.getPath, resType = LocalResourceType.ARCHIVE,
destName = Some(SPARK_LIB_DIR))
jarsArchive.delete()
@ -334,15 +351,7 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
try {
confStream.setLevel(0)
// Upload $SPARK_CONF_DIR/log4j.properties file to the distributed cache to make sure that
// the executors will use the latest configurations instead of the default values. This is
// required when user changes log4j.properties directly to set the log configurations. If
// configuration file is provided through --files then executors will be taking configurations
// from --files instead of $SPARK_CONF_DIR/log4j.properties.
// Also upload metrics.properties to distributed cache if exists in classpath.
// If user specify this file using --files then executors will use the one
// from --files instead.
// 1. log4j and metrics
for { prop <- Seq("log4j.properties", "metrics.properties")
url <- Option(KyuubiSparkUtil.getContextOrSparkClassLoader.getResource(prop))
if url.getProtocol == "file" } {
@ -352,8 +361,7 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
confStream.closeEntry()
}
// Save the Hadoop config files under a separate directory in the archive. This directory
// is appended to the classpath so that the cluster-provided configuration takes precedence.
// 2. hadoop/hive and other xml configurations
confStream.putNextEntry(new ZipEntry(s"$HADOOP_CONF_DIR/"))
confStream.closeEntry()
hadoopConfFiles.foreach { case (name, file) =>
@ -364,6 +372,7 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
}
}
// 3. spark conf
val props = new Properties()
conf.getAll.foreach { case (k, v) =>
props.setProperty(k, v)
@ -386,7 +395,7 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
* Copy the given file to a remote file system (e.g. HDFS) if needed.
* The file is only copied if the source and destination file systems are different or the source
* scheme is "file". This is used for preparing resources for launching the ApplicationMaster
* container. Exposed for testing.
* container.
*/
private[this] def copyFileToRemote(destDir: Path, srcPath: Path,
cache: Map[URI, Path], destName: Option[String] = None): Path = {
@ -410,7 +419,14 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
object KyuubiYarnClient {
private val kyuubiJar = System.getenv("KYUUBI_JAR")
/**
* Load kyuubi server jar, choose the specified kyuubi jar first, try the default compiled one
* if not found
*/
private val kyuubiJar =
Option(System.getenv("KYUUBI_JAR"))
.getOrElse(KyuubiSparkUtil.getContextOrSparkClassLoader
.getResource(KYUUBI_JAR_NAME).getPath)
/**
* Given a local URI, resolve it and return a qualified local path that corresponds to the URI.
@ -439,7 +455,7 @@ object KyuubiYarnClient {
* Add a path variable to the given environment map.
* If the map already contains this key, append the value to the existing value instead.
*/
def addPathToEnvironment(env: ENV, key: String, value: String): Unit = {
def addPathToEnvironment(env: EnvMap, key: String, value: String): Unit = {
val newValue =
if (env.contains(key)) {
env(key) + ApplicationConstants.CLASS_PATH_SEPARATOR + value
@ -453,10 +469,10 @@ object KyuubiYarnClient {
* Add the given path to the classpath entry of the given environment map.
* If the classpath is already set, this appends the new path to the existing classpath.
*/
def addClasspathEntry(path: String, env: ENV): Unit =
def addClasspathEntry(path: String, env: EnvMap): Unit =
addPathToEnvironment(env, Environment.CLASSPATH.name, path)
def populateClasspath(conf: Configuration, env: ENV): Unit = {
def populateClasspath(conf: Configuration, env: EnvMap): Unit = {
addClasspathEntry(Environment.PWD.$$(), env)
addClasspathEntry(buildPath(Environment.PWD.$$(), SPARK_CONF_DIR), env)
addClasspathEntry(buildPath(Environment.PWD.$$(), new File(kyuubiJar).getName), env)
@ -469,7 +485,7 @@ object KyuubiYarnClient {
* Populate the classpath entry in the given environment map with any application
* classpath specified through the Hadoop and Yarn configurations.
*/
private[yarn] def populateHadoopClasspath(conf: Configuration, env: ENV): Unit = {
private[yarn] def populateHadoopClasspath(conf: Configuration, env: EnvMap): Unit = {
val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf)
classPathElementsToAdd.foreach { c =>
addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim)

View File

@ -20,12 +20,10 @@ package yaooqinn.kyuubi
import scala.collection.mutable.HashMap
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.yarn.api.records.LocalResource
package object yarn {
type ENV = HashMap[String, String]
type RESOURCES = HashMap[String, LocalResource]
type EnvMap = HashMap[String, String]
val KYUUBI_YARN_APP_NAME = "KYUUBI SERVER"
val KYUUBI_YARN_APP_TYPE = "KYUUBI"

View File

@ -23,7 +23,7 @@
</logger>
<logger name="org.apache.zookeeper.ClientCnxn" additivity="false">
<level value="ERROR" />
<appender-ref ref="Console" />
<appender-ref ref="console" />
</logger>
<root>
<level value="WARN" />

View File

@ -17,6 +17,7 @@
package yaooqinn.kyuubi.yarn
import java.net.URI
import scala.collection.mutable
@ -26,7 +27,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationSubmissionContext}
import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationSubmissionContext, Resource}
import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite}
@ -34,7 +35,7 @@ import org.mockito.Mockito.{doNothing, when}
import org.scalatest.Matchers
import org.scalatest.mock.MockitoSugar
import yaooqinn.kyuubi.service.ServiceException
import yaooqinn.kyuubi.KYUUBI_JAR_NAME
import yaooqinn.kyuubi.utils.ReflectUtils
class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSugar {
@ -90,7 +91,7 @@ class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSuga
val cp = classpath(env)
cp should contain("{{PWD}}")
cp should contain(Environment.PWD.$$() + Path.SEPARATOR + SPARK_CONF_DIR)
cp should contain(KyuubiYarnClient.buildPath(Environment.PWD.$$(), "kyuubi-fake.jar"))
cp should contain(KyuubiYarnClient.buildPath(Environment.PWD.$$(), KYUUBI_JAR_NAME))
cp should contain(KyuubiYarnClient.buildPath(Environment.PWD.$$(), SPARK_LIB_DIR, "*"))
cp should contain(yarnDefCP.head)
cp should contain(mrDefCP.head)
@ -124,7 +125,6 @@ class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSuga
val uri3 = KyuubiSparkUtil.resolveURI("hdfs://1")
val path3 = KyuubiYarnClient.getQualifiedLocalPath(uri3, conf)
path3.toUri should be(uri3)
}
test("kyuubi yarn client init") {
@ -140,30 +140,47 @@ class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSuga
assert(ReflectUtils.getFieldValue(client2, "loginFromKeytab").asInstanceOf[Boolean])
}
/**
* Run these tests with Maven, not from an IDE: they load the fake kyuubi-server jar as a test resource.
*/
test("submit application") {
val conf = new SparkConf()
val client = new KyuubiYarnClient(conf)
val yarnClient = mock[YarnClient]
ReflectUtils.setFieldValue(client, "yarnClient", yarnClient)
doNothing().when(yarnClient).start()
val app = mock[YarnClientApplication]
when(yarnClient.createApplication()).thenReturn(app)
intercept[NullPointerException](client.submit()) // app res = null
val appRes = mock[GetNewApplicationResponse]
when(app.getNewApplicationResponse).thenReturn(appRes)
intercept[NullPointerException](client.submit()) // appid = null
val appId = mock[ApplicationId]
when(appRes.getApplicationId).thenReturn(appId)
when(appId.toString).thenReturn("appId1")
val jarName = "kyuubi-fake.jar"
val kyuubiJar = this.getClass.getClassLoader.getResource(jarName).getPath
ReflectUtils.setFieldValue(KyuubiSparkUtil, "SPARK_JARS_DIR", kyuubiJar.stripSuffix(jarName))
val appContext = mock[ApplicationSubmissionContext]
when(app.getApplicationSubmissionContext).thenReturn(appContext)
intercept[ServiceException](client.submit())
// Verifies that submit() shrinks the client's memory and memoryOverhead fields when the
// cluster's reported maximum container memory is smaller than what the client asks for.
test("submit with exceeded memory") {
withClient { (c, kc) =>
// Stub the YARN call chain: createApplication -> getNewApplicationResponse
val app = mock[YarnClientApplication]
when(c.createApplication()).thenReturn(app)
val appRes = mock[GetNewApplicationResponse]
when(app.getNewApplicationResponse).thenReturn(appRes)
// Report a cluster maximum of 10, far below the memory + overhead of a default SparkConf
val resource = mock[Resource]
when(appRes.getMaximumResourceCapability).thenReturn(resource)
when(resource.getMemory).thenReturn(10)
val appId = mock[ApplicationId]
when(appRes.getApplicationId).thenReturn(appId)
when(appId.toString).thenReturn("appId1")
val appContext = mock[ApplicationSubmissionContext]
when(app.getApplicationSubmissionContext).thenReturn(appContext)
kc.submit()
// Expect the 90% / 10% split of the maximum: (10 * 0.9).toInt = 9 and (10 * 0.1).toInt = 1.
// NOTE(review): the long field names look like the compiler-expanded names of the
// private[this] vars, read here through reflection -- confirm against ReflectUtils.
ReflectUtils.getFieldValue(kc, "yaooqinn$kyuubi$yarn$KyuubiYarnClient$$memory") should be(9)
ReflectUtils.getFieldValue(kc,
"yaooqinn$kyuubi$yarn$KyuubiYarnClient$$memoryOverhead") should be(1)
}
}
// Verifies that submit() leaves the client's memory and memoryOverhead fields untouched when
// the cluster's reported maximum container memory is large enough.
test("submit with suitable memory") {
withClient { (c, kc) =>
// Stub the YARN call chain: createApplication -> getNewApplicationResponse
val app = mock[YarnClientApplication]
when(c.createApplication()).thenReturn(app)
val appRes = mock[GetNewApplicationResponse]
when(app.getNewApplicationResponse).thenReturn(appRes)
// Report a cluster maximum well above the default request, so no adjustment should happen
val resource = mock[Resource]
when(appRes.getMaximumResourceCapability).thenReturn(resource)
when(resource.getMemory).thenReturn(2048 *1024)
val appId = mock[ApplicationId]
when(appRes.getApplicationId).thenReturn(appId)
when(appId.toString).thenReturn("appId1")
val appContext = mock[ApplicationSubmissionContext]
when(app.getApplicationSubmissionContext).thenReturn(appContext)
kc.submit()
// Defaults stay in place: 1024 for memory and (1024 * 0.1).toInt = 102 for the overhead
ReflectUtils.getFieldValue(kc,
"yaooqinn$kyuubi$yarn$KyuubiYarnClient$$memory") should be(1024)
ReflectUtils.getFieldValue(kc,
"yaooqinn$kyuubi$yarn$KyuubiYarnClient$$memoryOverhead") should be(102)
}
}
def withConf(map: Map[String, String] = Map.empty)(testCode: Configuration => Any): Unit = {
@ -172,6 +189,18 @@ class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSuga
testCode(conf)
}
/**
 * Test fixture: builds a KyuubiYarnClient whose yarnClient field is replaced by a Mockito mock,
 * then hands both objects to the test body.
 *
 * @param f test body receiving the mocked YarnClient and the KyuubiYarnClient under test
 */
def withClient(f: (YarnClient, KyuubiYarnClient) => Any): Unit = {
val conf = new SparkConf()
val client = new KyuubiYarnClient(conf)
val yarnClient = mock[YarnClient]
// Swap the client's yarnClient field for the mock via reflection
ReflectUtils.setFieldValue(client, "yarnClient", yarnClient)
doNothing().when(yarnClient).start()
// Locate the test jar on the classpath and point SPARK_JARS_DIR at the directory holding it
val kyuubiJar = this.getClass.getClassLoader.getResource(KYUUBI_JAR_NAME).getPath
ReflectUtils.setFieldValue(KyuubiSparkUtil,
"SPARK_JARS_DIR", kyuubiJar.stripSuffix(KYUUBI_JAR_NAME))
f(yarnClient, client)
}
def classpath(env: mutable.HashMap[String, String]): Array[String] =
env(Environment.CLASSPATH.name).split(":|;|<CPS>")
}