From bb12958874b844bc3356c26f1e570ea3ff08a93b Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Mon, 10 Sep 2018 08:49:31 -0700 Subject: [PATCH] Fix compile for Spark 2.4 SNAPSHOT and only catch NonFatal (#164) * only catch non-fatal exceptions * remove afterBenchmark for MLlib * fix compile * use Apache snapshot releases --- build.sbt | 4 +- .../databricks/spark/sql/perf/Benchmark.scala | 7 ++- .../spark/sql/perf/Benchmarkable.scala | 3 +- .../mllib/MLPipelineStageBenchmarkable.scala | 12 +---- .../org/apache/spark/ml/ModelBuilderSSP.scala | 53 ++++++++++++++++--- .../spark/sql/perf/mllib/MLLibSuite.scala | 3 +- 6 files changed, 58 insertions(+), 24 deletions(-) diff --git a/build.sbt b/build.sbt index 52d4bdd..654f155 100644 --- a/build.sbt +++ b/build.sbt @@ -14,7 +14,7 @@ sparkPackageName := "databricks/spark-sql-perf" // All Spark Packages need a license licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0")) -sparkVersion := "2.3.0" +sparkVersion := "2.4.0-SNAPSHOT" sparkComponents ++= Seq("sql", "hive", "mllib") @@ -44,6 +44,8 @@ libraryDependencies += "org.yaml" % "snakeyaml" % "1.17" libraryDependencies += "com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2" +resolvers += "Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots" + fork := true // Your username to login to Databricks Cloud diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala index a4bbcbd..f2ce663 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -22,6 +22,7 @@ import scala.concurrent._ import scala.concurrent.duration._ import scala.language.implicitConversions import scala.util.{Success, Try, Failure => SFailure} +import scala.util.control.NonFatal import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, DataFrame, SQLContext} @@ -435,7 +436,9 @@ object Benchmark { .format("json") .save(resultPath) } catch { - case e: Throwable => logMessage(s"Failed to write data: $e") + case NonFatal(e) => + logMessage(s"Failed to write data: $e") + throw e } logCollection() @@ -448,7 +451,7 @@ object Benchmark { val location = cpu.collectLogs(sqlContext, fs, timestamp) logMessage(s"cpu results recorded to $location") } catch { - case e: Throwable => + case NonFatal(e) => logMessage(s"Error collecting logs: $e") throw e } diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala index 4c53b3a..62b12ea 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala @@ -22,6 +22,7 @@ import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging} import scala.concurrent.duration._ import scala.collection.mutable.ArrayBuffer +import scala.util.control.NonFatal import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkEnv, SparkContext} @@ -74,7 +75,7 @@ trait Benchmarkable extends Logging { try { result = doBenchmark(includeBreakdown, description, messages) } catch { - case e: Throwable => + case NonFatal(e) => logger.info(s"$that: failure in runBenchmark: $e") println(s"$that: failure in runBenchmark: $e") result = BenchmarkResult( diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala index 2807aaf..2ccd78d 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala @@ -1,6 +1,7 @@ package com.databricks.spark.sql.perf.mllib import scala.collection.mutable.ArrayBuffer +import scala.util.control.NonFatal import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging} @@ -37,21 +38,12 @@ class MLPipelineStageBenchmarkable( trainingData.cache() trainingData.count() } catch { - case e: Throwable => + case NonFatal(e) => println(s"$this error in beforeBenchmark: ${e.getStackTraceString}") throw e } } - override protected[mllib] def afterBenchmark(sc: SparkContext): Unit = { - // Best-effort clean up of weakly referenced RDDs, shuffles, and broadcasts - // Remove any leftover blocks that still exist - sc.getExecutorStorageStatus - .flatMap { status => status.blocks.map { case (bid, _) => bid } } - .foreach { bid => SparkEnv.get.blockManager.master.removeBlock(bid) } - super.afterBenchmark(sc) - } - override protected def doBenchmark( includeBreakdown: Boolean, description: String, diff --git a/src/main/scala/org/apache/spark/ml/ModelBuilderSSP.scala b/src/main/scala/org/apache/spark/ml/ModelBuilderSSP.scala index 511b645..39cebc0 100644 --- a/src/main/scala/org/apache/spark/ml/ModelBuilderSSP.scala +++ b/src/main/scala/org/apache/spark/ml/ModelBuilderSSP.scala @@ -45,6 +45,7 @@ object ModelBuilderSSP { s" but was given $numClasses") val rootNode = TreeBuilder.randomBalancedDecisionTree(depth = depth, labelType = numClasses, featureArity = featureArity, seed = seed) + .asInstanceOf[ClassificationNode] new DecisionTreeClassificationModel(rootNode, numFeatures = featureArity.length, numClasses = numClasses) } @@ -55,6 +56,7 @@ object ModelBuilderSSP { seed: Long): DecisionTreeRegressionModel = { val rootNode = TreeBuilder.randomBalancedDecisionTree(depth = depth, labelType = 0, featureArity = featureArity, seed = seed) + .asInstanceOf[RegressionNode] new DecisionTreeRegressionModel(rootNode, numFeatures = featureArity.length) } @@ -165,12 +167,45 @@ object TreeBuilder { ImpurityCalculator.getCalculator("gini", Array.fill[Double](labelType)(0.0)) } - randomBalancedDecisionTreeHelper(depth, featureArity, impurityCalculator, + randomBalancedDecisionTreeHelper(isRegression, depth, featureArity, impurityCalculator, labelGenerator, Set.empty, rng) } + private def createLeafNode( + isRegression: Boolean, + prediction: Double, + impurity: Double, + impurityStats: ImpurityCalculator): LeafNode = { + if (isRegression) { + new RegressionLeafNode(prediction, impurity, impurityStats) + } else { + new ClassificationLeafNode(prediction, impurity, impurityStats) + } + } + + private def createInternalNode( + isRegression: Boolean, + prediction: Double, + impurity: Double, + gain: Double, + leftChild: Node, + rightChild: Node, + split: Split, + impurityStats: ImpurityCalculator): InternalNode = { + if (isRegression) { + new RegressionInternalNode(prediction, impurity, gain, + leftChild.asInstanceOf[RegressionNode], rightChild.asInstanceOf[RegressionNode], + split, impurityStats) + } else { + new ClassificationInternalNode(prediction, impurity, gain, + leftChild.asInstanceOf[ClassificationNode], rightChild.asInstanceOf[ClassificationNode], + split, impurityStats) + } + } + /** * Create an internal node. Either create the leaf nodes beneath it, or recurse as needed. + * @param isRegression Whether the tree is a regressor or not (classifier) * @param subtreeDepth Depth of subtree to build. Depth 0 means this is a leaf node. * @param featureArity Indicates feature type. Value 0 indicates continuous feature. * Other values >= 2 indicate a categorical feature, @@ -182,6 +217,7 @@ object TreeBuilder { * @return */ private def randomBalancedDecisionTreeHelper( + isRegression: Boolean, subtreeDepth: Int, featureArity: Array[Int], impurityCalculator: ImpurityCalculator, @@ -191,7 +227,7 @@ object TreeBuilder { if (subtreeDepth == 0) { // This case only happens for a depth 0 tree. - return new LeafNode(prediction = 0.0, impurity = 0.0, impurityStats = impurityCalculator) + createLeafNode(isRegression, prediction = 0.0, impurity = 0.0, impurityCalculator) } val numFeatures = featureArity.length @@ -221,19 +257,20 @@ object TreeBuilder { val (leftChild: Node, rightChild: Node) = if (subtreeDepth == 1) { // Add leaf nodes. Assign these jointly so they make different predictions. val predictions = labelGenerator.nextValue() - val leftChild = new LeafNode(prediction = predictions._1, impurity = 0.0, + val leftChild = createLeafNode(isRegression, prediction = predictions._1, impurity = 0.0, impurityStats = impurityCalculator) - val rightChild = new LeafNode(prediction = predictions._2, impurity = 0.0, + val rightChild = createLeafNode(isRegression, prediction = predictions._2, impurity = 0.0, impurityStats = impurityCalculator) (leftChild, rightChild) } else { - val leftChild = randomBalancedDecisionTreeHelper(subtreeDepth - 1, featureArity, + val leftChild = randomBalancedDecisionTreeHelper(isRegression, subtreeDepth - 1, featureArity, impurityCalculator, labelGenerator, usedFeatures + feature, rng) - val rightChild = randomBalancedDecisionTreeHelper(subtreeDepth - 1, featureArity, + val rightChild = randomBalancedDecisionTreeHelper(isRegression, subtreeDepth - 1, featureArity, impurityCalculator, labelGenerator, usedFeatures + feature, rng) (leftChild, rightChild) } - new InternalNode(prediction = 0.0, impurity = 0.0, gain = 0.0, leftChild = leftChild, - rightChild = rightChild, split = split, impurityStats = impurityCalculator) + createInternalNode(isRegression, prediction = 0.0, impurity = 0.0, gain = 0.0, + leftChild = leftChild, rightChild = rightChild, split = split, + impurityStats = impurityCalculator) } } diff --git a/src/test/scala/com/databricks/spark/sql/perf/mllib/MLLibSuite.scala b/src/test/scala/com/databricks/spark/sql/perf/mllib/MLLibSuite.scala index c622155..7e72f5e 100644 --- a/src/test/scala/com/databricks/spark/sql/perf/mllib/MLLibSuite.scala +++ b/src/test/scala/com/databricks/spark/sql/perf/mllib/MLLibSuite.scala @@ -41,11 +41,10 @@ class MLLibSuite extends FunSuite with BeforeAndAfterAll { } } - test("test before & after benchmark methods for pipeline benchmarks.") { + test("test before benchmark methods for pipeline benchmarks.") { val benchmarks = MLLib.getBenchmarks(MLLib.getConf(yamlConfig = MLLib.smallConfig)) benchmarks.foreach { b => b.beforeBenchmark() - b.afterBenchmark(sparkSession.sparkContext) } } }