Fix compile for Spark 2.4 SNAPSHOT and only catch NonFatal (#164)

* only catch non-fatal exceptions

* remove afterBenchmark for MLlib

* fix compile

* use Apache snapshot releases
This commit is contained in:
Xiangrui Meng 2018-09-10 08:49:31 -07:00 committed by GitHub
parent 0ab6bf606b
commit bb12958874
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 58 additions and 24 deletions

View File

@@ -14,7 +14,7 @@ sparkPackageName := "databricks/spark-sql-perf"
// All Spark Packages need a license
licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0"))
sparkVersion := "2.3.0"
sparkVersion := "2.4.0-SNAPSHOT"
sparkComponents ++= Seq("sql", "hive", "mllib")
@@ -44,6 +44,8 @@ libraryDependencies += "org.yaml" % "snakeyaml" % "1.17"
libraryDependencies += "com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2"
resolvers += "Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots"
fork := true
// Your username to login to Databricks Cloud

View File

@@ -22,6 +22,7 @@ import scala.concurrent._
import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.util.{Success, Try, Failure => SFailure}
import scala.util.control.NonFatal
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, DataFrame, SQLContext}
@@ -435,7 +436,9 @@ object Benchmark {
.format("json")
.save(resultPath)
} catch {
case e: Throwable => logMessage(s"Failed to write data: $e")
case NonFatal(e) =>
logMessage(s"Failed to write data: $e")
throw e
}
logCollection()
@@ -448,7 +451,7 @@
val location = cpu.collectLogs(sqlContext, fs, timestamp)
logMessage(s"cpu results recorded to $location")
} catch {
case e: Throwable =>
case NonFatal(e) =>
logMessage(s"Error collecting logs: $e")
throw e
}

View File

@@ -22,6 +22,7 @@ import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging}
import scala.concurrent.duration._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkEnv, SparkContext}
@@ -74,7 +75,7 @@ trait Benchmarkable extends Logging {
try {
result = doBenchmark(includeBreakdown, description, messages)
} catch {
case e: Throwable =>
case NonFatal(e) =>
logger.info(s"$that: failure in runBenchmark: $e")
println(s"$that: failure in runBenchmark: $e")
result = BenchmarkResult(

View File

@@ -1,6 +1,7 @@
package com.databricks.spark.sql.perf.mllib
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging}
@@ -37,21 +38,12 @@ class MLPipelineStageBenchmarkable(
trainingData.cache()
trainingData.count()
} catch {
case e: Throwable =>
case NonFatal(e) =>
println(s"$this error in beforeBenchmark: ${e.getStackTraceString}")
throw e
}
}
/** Post-benchmark hook: best-effort cleanup of executor block-manager state.
 *
 * NOTE(review): this override is the code being REMOVED by this commit —
 * `sc.getExecutorStorageStatus` is a pre-2.4 SparkContext API, so this method
 * no longer compiles against Spark 2.4.0-SNAPSHOT (presumably why it was dropped;
 * confirm against the Spark 2.4 migration notes).
 *
 * @param sc the SparkContext used to run the benchmark
 */
override protected[mllib] def afterBenchmark(sc: SparkContext): Unit = {
// Best-effort clean up of weakly referenced RDDs, shuffles, and broadcasts
// Remove any leftover blocks that still exist
// Collect the ids of every block still registered on any executor...
sc.getExecutorStorageStatus
.flatMap { status => status.blocks.map { case (bid, _) => bid } }
// ...and ask the block manager master to drop each of them.
.foreach { bid => SparkEnv.get.blockManager.master.removeBlock(bid) }
// Delegate to the base-class hook so default cleanup still runs.
super.afterBenchmark(sc)
}
override protected def doBenchmark(
includeBreakdown: Boolean,
description: String,

View File

@@ -45,6 +45,7 @@ object ModelBuilderSSP {
s" but was given $numClasses")
val rootNode = TreeBuilder.randomBalancedDecisionTree(depth = depth, labelType = numClasses,
featureArity = featureArity, seed = seed)
.asInstanceOf[ClassificationNode]
new DecisionTreeClassificationModel(rootNode, numFeatures = featureArity.length,
numClasses = numClasses)
}
@@ -55,6 +56,7 @@
seed: Long): DecisionTreeRegressionModel = {
val rootNode = TreeBuilder.randomBalancedDecisionTree(depth = depth, labelType = 0,
featureArity = featureArity, seed = seed)
.asInstanceOf[RegressionNode]
new DecisionTreeRegressionModel(rootNode, numFeatures = featureArity.length)
}
@@ -165,12 +167,45 @@ object TreeBuilder {
ImpurityCalculator.getCalculator("gini", Array.fill[Double](labelType)(0.0))
}
randomBalancedDecisionTreeHelper(depth, featureArity, impurityCalculator,
randomBalancedDecisionTreeHelper(isRegression, depth, featureArity, impurityCalculator,
labelGenerator, Set.empty, rng)
}
/**
 * Constructs a leaf node of the concrete subtype matching the tree flavor.
 *
 * Spark 2.4 split `LeafNode` into regression/classification subclasses, so the
 * builder must pick the right one explicitly.
 *
 * @param isRegression true for a regression tree, false for a classification tree
 * @param prediction value predicted at this leaf
 * @param impurity impurity recorded for this leaf
 * @param impurityStats sufficient statistics backing the impurity value
 * @return a [[RegressionLeafNode]] or [[ClassificationLeafNode]] accordingly
 */
private def createLeafNode(
isRegression: Boolean,
prediction: Double,
impurity: Double,
impurityStats: ImpurityCalculator): LeafNode =
if (isRegression) new RegressionLeafNode(prediction, impurity, impurityStats)
else new ClassificationLeafNode(prediction, impurity, impurityStats)
/**
 * Constructs an internal node of the concrete subtype matching the tree flavor.
 *
 * The children are downcast to the corresponding node subtype; callers are
 * expected to have built them via [[createLeafNode]]/this method with the same
 * `isRegression` flag, so the casts are safe by construction.
 *
 * @param isRegression true for a regression tree, false for a classification tree
 * @param prediction value predicted at this node
 * @param impurity impurity recorded for this node
 * @param gain information gain of the split
 * @param leftChild left subtree (must match the tree flavor)
 * @param rightChild right subtree (must match the tree flavor)
 * @param split split applied at this node
 * @param impurityStats sufficient statistics backing the impurity value
 * @return a [[RegressionInternalNode]] or [[ClassificationInternalNode]] accordingly
 */
private def createInternalNode(
isRegression: Boolean,
prediction: Double,
impurity: Double,
gain: Double,
leftChild: Node,
rightChild: Node,
split: Split,
impurityStats: ImpurityCalculator): InternalNode = {
if (isRegression) {
val left = leftChild.asInstanceOf[RegressionNode]
val right = rightChild.asInstanceOf[RegressionNode]
new RegressionInternalNode(prediction, impurity, gain, left, right, split, impurityStats)
} else {
val left = leftChild.asInstanceOf[ClassificationNode]
val right = rightChild.asInstanceOf[ClassificationNode]
new ClassificationInternalNode(prediction, impurity, gain, left, right, split, impurityStats)
}
}
/**
* Create an internal node. Either create the leaf nodes beneath it, or recurse as needed.
* @param isRegression Whether the tree is a regressor or not (classifier)
* @param subtreeDepth Depth of subtree to build. Depth 0 means this is a leaf node.
* @param featureArity Indicates feature type. Value 0 indicates continuous feature.
* Other values >= 2 indicate a categorical feature,
@@ -182,6 +217,7 @@
* @return
*/
private def randomBalancedDecisionTreeHelper(
isRegression: Boolean,
subtreeDepth: Int,
featureArity: Array[Int],
impurityCalculator: ImpurityCalculator,
@@ -191,7 +227,7 @@
if (subtreeDepth == 0) {
// This case only happens for a depth 0 tree.
return new LeafNode(prediction = 0.0, impurity = 0.0, impurityStats = impurityCalculator)
createLeafNode(isRegression, prediction = 0.0, impurity = 0.0, impurityCalculator)
}
val numFeatures = featureArity.length
@@ -221,19 +257,20 @@
val (leftChild: Node, rightChild: Node) = if (subtreeDepth == 1) {
// Add leaf nodes. Assign these jointly so they make different predictions.
val predictions = labelGenerator.nextValue()
val leftChild = new LeafNode(prediction = predictions._1, impurity = 0.0,
val leftChild = createLeafNode(isRegression, prediction = predictions._1, impurity = 0.0,
impurityStats = impurityCalculator)
val rightChild = new LeafNode(prediction = predictions._2, impurity = 0.0,
val rightChild = createLeafNode(isRegression, prediction = predictions._2, impurity = 0.0,
impurityStats = impurityCalculator)
(leftChild, rightChild)
} else {
val leftChild = randomBalancedDecisionTreeHelper(subtreeDepth - 1, featureArity,
val leftChild = randomBalancedDecisionTreeHelper(isRegression, subtreeDepth - 1, featureArity,
impurityCalculator, labelGenerator, usedFeatures + feature, rng)
val rightChild = randomBalancedDecisionTreeHelper(subtreeDepth - 1, featureArity,
val rightChild = randomBalancedDecisionTreeHelper(isRegression, subtreeDepth - 1, featureArity,
impurityCalculator, labelGenerator, usedFeatures + feature, rng)
(leftChild, rightChild)
}
new InternalNode(prediction = 0.0, impurity = 0.0, gain = 0.0, leftChild = leftChild,
rightChild = rightChild, split = split, impurityStats = impurityCalculator)
createInternalNode(isRegression, prediction = 0.0, impurity = 0.0, gain = 0.0,
leftChild = leftChild, rightChild = rightChild, split = split,
impurityStats = impurityCalculator)
}
}

View File

@@ -41,11 +41,10 @@ class MLLibSuite extends FunSuite with BeforeAndAfterAll {
}
}
test("test before & after benchmark methods for pipeline benchmarks.") {
test("test before benchmark methods for pipeline benchmarks.") {
val benchmarks = MLLib.getBenchmarks(MLLib.getConf(yamlConfig = MLLib.smallConfig))
benchmarks.foreach { b =>
b.beforeBenchmark()
b.afterBenchmark(sparkSession.sparkContext)
}
}
}