diff --git a/README.md b/README.md index 64dff6f..e265c46 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![Build Status](https://travis-ci.org/databricks/spark-sql-perf.svg)](https://travis-ci.org/databricks/spark-sql-perf) -This is a performance testing framework for [Spark SQL](https://spark.apache.org/sql/) in [Apache Spark](https://spark.apache.org/) 1.6+. +This is a performance testing framework for [Spark SQL](https://spark.apache.org/sql/) in [Apache Spark](https://spark.apache.org/) 2.2+. **Note: This README is still under development. Please also check our source code for more information.** @@ -26,6 +26,11 @@ Usage: spark-sql-perf [options] $ bin/run --benchmark DatasetPerformance ``` +### MLlib tests + +To run MLlib tests, run `bin/run-ml yamlfile`, where `yamlfile` is the path to a YAML configuration +file describing tests to run and their parameters. + # TPC-DS ## How to use it diff --git a/bin/run-ml b/bin/run-ml new file mode 100755 index 0000000..4819ab5 --- /dev/null +++ b/bin/run-ml @@ -0,0 +1,8 @@ +#!/bin/bash + +# runs spark-sql-perf ML tests from the current directory +# accepts a single command-line argument; the path to a YAML configuration file +# specifying the tests to be run & their params + +ARGS="runMLBenchmark $@" +build/sbt "$ARGS" diff --git a/build.sbt b/build.sbt index cd292a5..825dd03 100644 --- a/build.sbt +++ b/build.sbt @@ -14,7 +14,7 @@ sparkPackageName := "databricks/spark-sql-perf" // All Spark Packages need a license licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0")) -sparkVersion := "2.0.1" +sparkVersion := "2.2.0" sparkComponents ++= Seq("sql", "hive", "mllib") @@ -72,6 +72,19 @@ runBenchmark := { streams.value.log) } + +val runMLBenchmark = inputKey[Unit]("runs an ML benchmark") + +runMLBenchmark := { + import complete.DefaultParsers._ + val args = spaceDelimited("[args]").parsed + val scalaRun = (runner in run).value + val classpath = (fullClasspath in Compile).value + 
scalaRun.run("com.databricks.spark.sql.perf.mllib.MLLib", classpath.map(_.data), args, + streams.value.log) +} + + import ReleaseTransformations._ /** Push to the team directory instead of the user's homedir for releases. */ diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala index 4ff08ea..a4bbcbd 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -334,7 +334,7 @@ object Benchmark { .flatMap { query => try { query.newDataFrame().queryExecution.logical.collect { - case UnresolvedRelation(t, _) => t.table + case UnresolvedRelation(t) => t.table } } catch { // ignore the queries that can't be parsed diff --git a/src/main/scala/com/databricks/spark/sql/perf/Query.scala b/src/main/scala/com/databricks/spark/sql/perf/Query.scala index 7a8f75e..b3b7255 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Query.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Query.scala @@ -16,9 +16,9 @@ package com.databricks.spark.sql.perf -import scala.language.implicitConversions import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import scala.language.implicitConversions import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -54,7 +54,7 @@ class Query( } lazy val tablesInvolved = buildDataFrame.queryExecution.logical collect { - case UnresolvedRelation(tableIdentifier, _) => { + case UnresolvedRelation(tableIdentifier) => { // We are ignoring the database name. 
tableIdentifier.table } @@ -85,14 +85,14 @@ class Query( val breakdownResults = if (includeBreakdown) { val depth = queryExecution.executedPlan.collect { case p: SparkPlan => p }.size - val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan(i))) + val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan.p(i))) val indexMap = physicalOperators.map { case (index, op) => (op, index) }.toMap val timeMap = new mutable.HashMap[Int, Double] physicalOperators.reverse.map { case (index, node) => messages += s"Breakdown: ${node.simpleString}" - val newNode = buildDataFrame.queryExecution.executedPlan(index) + val newNode = buildDataFrame.queryExecution.executedPlan.p(index) val executionTime = measureTimeMs { newNode.execute().foreach((row: Any) => Unit) } @@ -100,7 +100,6 @@ class Query( val childIndexes = node.children.map(indexMap) val childTime = childIndexes.map(timeMap).sum - messages += s"Breakdown time: $executionTime (+${executionTime - childTime})" BreakdownResult( diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala index 858f911..8b15f1a 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala @@ -3,7 +3,7 @@ package com.databricks.spark.sql.perf.mllib import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging} import org.apache.spark.ml.attribute.{NominalAttribute, NumericAttribute} -import org.apache.spark.ml.{Estimator, Transformer} +import org.apache.spark.ml.{Estimator, PipelineStage, Transformer} import org.apache.spark.ml.evaluation.Evaluator import org.apache.spark.sql._ import org.apache.spark.sql.functions._ @@ -27,9 +27,9 @@ trait BenchmarkAlgorithm extends Logging { def testDataSet(ctx: MLBenchContext): DataFrame /** - * Create an [[Estimator]] with params set from the given 
[[MLBenchContext]]. + * Create an [[Estimator]] or [[Transformer]] with params set from the given [[MLBenchContext]]. */ - def getEstimator(ctx: MLBenchContext): Estimator[_] + def getPipelineStage(ctx: MLBenchContext): PipelineStage /** * The unnormalized score of the training procedure on a dataset. The normalization is diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLBenchmarks.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLBenchmarks.scala index 632940e..b25453b 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLBenchmarks.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLBenchmarks.scala @@ -30,8 +30,8 @@ object MLBenchmarks { val context = SparkContext.getOrCreate() val sqlContext: SQLContext = SQLContext.getOrCreate(context) - def benchmarkObjects: Seq[MLTransformerBenchmarkable] = benchmarks.map { mlb => - new MLTransformerBenchmarkable(mlb.params, mlb.benchmark, sqlContext) + def benchmarkObjects: Seq[MLPipelineStageBenchmarkable] = benchmarks.map { mlb => + new MLPipelineStageBenchmarkable(mlb.params, mlb.benchmark, sqlContext) } } diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala index 81df78f..eceef96 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala @@ -2,12 +2,12 @@ package com.databricks.spark.sql.perf.mllib import scala.language.implicitConversions -import com.databricks.spark.sql.perf._ - import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging} -import org.apache.spark.SparkContext import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.{SparkConf, SparkContext} + +import com.databricks.spark.sql.perf._ class MLLib(@transient sqlContext: SQLContext) @@ -34,6 +34,16 @@ object MLLib extends Logging { e.getCurrentResults() } + /** + * Entry point for running ML tests. 
Expects a single command-line argument: the path to + * a YAML config file specifying which ML tests to run and their parameters. + * @param args command line args + */ + def main(args: Array[String]): Unit = { + val configFile = args(0) + run(yamlFile = configFile) + } + /** * Runs all the experiments and blocks on completion * @@ -46,13 +56,15 @@ object MLLib extends Logging { require(yamlConfig != null) YamlConfig.readString(yamlConfig) } - val sc = SparkContext.getOrCreate() + + val sparkConf = new SparkConf().setAppName("MLlib QA").setMaster("local[2]") + val sc = SparkContext.getOrCreate(sparkConf) sc.setLogLevel("INFO") val b = new com.databricks.spark.sql.perf.mllib.MLLib() val sqlContext = com.databricks.spark.sql.perf.mllib.MLBenchmarks.sqlContext val benchmarksDescriptions = conf.runnableBenchmarks val benchmarks = benchmarksDescriptions.map { mlb => - new MLTransformerBenchmarkable(mlb.params, mlb.benchmark, sqlContext) + new MLPipelineStageBenchmarkable(mlb.params, mlb.benchmark, sqlContext) } println(s"${benchmarks.size} benchmarks identified:") val str = benchmarks.map(_.prettyPrint).mkString("\n") diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLTransformerBenchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala similarity index 77% rename from src/main/scala/com/databricks/spark/sql/perf/mllib/MLTransformerBenchmarkable.scala rename to src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala index 4b0d100..ffef088 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLTransformerBenchmarkable.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala @@ -1,20 +1,21 @@ package com.databricks.spark.sql.perf.mllib -import com.databricks.spark.sql.perf._ -import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging} - -import org.apache.spark.sql._ import scala.collection.mutable.ArrayBuffer -import 
org.apache.spark.ml.Transformer +import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging} -class MLTransformerBenchmarkable( +import org.apache.spark.ml.{Estimator, Transformer} +import org.apache.spark.sql._ + +import com.databricks.spark.sql.perf._ + +class MLPipelineStageBenchmarkable( params: MLParams, test: BenchmarkAlgorithm, sqlContext: SQLContext) extends Benchmarkable with Serializable with Logging { - import MLTransformerBenchmarkable._ + import MLPipelineStageBenchmarkable._ private var testData: DataFrame = null private var trainingData: DataFrame = null @@ -48,17 +49,27 @@ class MLTransformerBenchmarkable( try { val (trainingTime, model: Transformer) = measureTime { logger.info(s"$this: train: trainingSet=${trainingData.schema}") - val estimator = test.getEstimator(param) - estimator.fit(trainingData) + test.getPipelineStage(param) match { + case est: Estimator[_] => est.fit(trainingData) + case transformer: Transformer => + transformer.transform(trainingData) + transformer + case other: Any => throw new UnsupportedOperationException("Algorithm to benchmark must" + + s" be an estimator or transformer, found ${other.getClass} instead.") + } } logger.info(s"model: $model") - val (_, scoreTraining) = measureTime { + val (scoreTrainTime, scoreTraining) = measureTime { test.score(param, trainingData, model) } val (scoreTestTime, scoreTest) = measureTime { test.score(param, testData, model) } + logger.info(s"$this doBenchmark: Trained model in ${trainingTime.toMillis / 1000.0}" + + s" s, Scored training dataset in ${scoreTrainTime.toMillis / 1000.0} s," + + s" test dataset in ${scoreTestTime.toMillis / 1000.0} s") + val ml = MLResult( trainingTime = Some(trainingTime.toMillis), @@ -96,7 +107,7 @@ class MLTransformerBenchmarkable( } -object MLTransformerBenchmarkable { +object MLPipelineStageBenchmarkable { private def pprint(p: AnyRef): Seq[String] = { val m = getCCParams(p) m.flatMap { diff --git 
a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/DecisionTreeClassification.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/DecisionTreeClassification.scala index 47cf4c4..31747da 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/DecisionTreeClassification.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/DecisionTreeClassification.scala @@ -1,6 +1,6 @@ package com.databricks.spark.sql.perf.mllib.classification -import org.apache.spark.ml.{Estimator, ModelBuilder, Transformer, TreeUtils} +import org.apache.spark.ml._ import org.apache.spark.ml.classification.DecisionTreeClassifier import org.apache.spark.ml.evaluation.{Evaluator, MulticlassClassificationEvaluator} import org.apache.spark.sql.DataFrame @@ -34,7 +34,7 @@ abstract class TreeOrForestClassification extends BenchmarkAlgorithm object DecisionTreeClassification extends TreeOrForestClassification { - override def getEstimator(ctx: MLBenchContext): Estimator[_] = { + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { import ctx.params._ new DecisionTreeClassifier() .setMaxDepth(depth) diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/GBTClassification.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/GBTClassification.scala index 547a050..80d3b3c 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/GBTClassification.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/GBTClassification.scala @@ -1,12 +1,12 @@ package com.databricks.spark.sql.perf.mllib.classification -import org.apache.spark.ml.{Estimator, ModelBuilder, Transformer, TreeUtils} +import org.apache.spark.ml.{ModelBuilder, PipelineStage, Transformer, TreeUtils} import org.apache.spark.ml.classification.GBTClassifier import org.apache.spark.ml.evaluation.{Evaluator, MulticlassClassificationEvaluator} import 
org.apache.spark.sql._ -import com.databricks.spark.sql.perf.mllib._ import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import com.databricks.spark.sql.perf.mllib._ import com.databricks.spark.sql.perf.mllib.data.DataGenerator @@ -31,7 +31,7 @@ object GBTClassification extends BenchmarkAlgorithm ctx.seed()) } - override def getEstimator(ctx: MLBenchContext): Estimator[_] = { + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { import ctx.params._ // TODO: subsamplingRate, featureSubsetStrategy // TODO: cacheNodeIds, checkpoint? diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/LogisticRegression.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/LogisticRegression.scala index edb3a68..d726b7d 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/LogisticRegression.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/LogisticRegression.scala @@ -3,9 +3,8 @@ package com.databricks.spark.sql.perf.mllib.classification import com.databricks.spark.sql.perf.mllib.OptionImplicits._ import com.databricks.spark.sql.perf.mllib._ import com.databricks.spark.sql.perf.mllib.data.DataGenerator - import org.apache.spark.ml.evaluation.{Evaluator, MulticlassClassificationEvaluator} -import org.apache.spark.ml.{Estimator, ModelBuilder, Transformer} +import org.apache.spark.ml.{Estimator, ModelBuilder, PipelineStage, Transformer} import org.apache.spark.ml import org.apache.spark.ml.linalg.Vectors @@ -32,7 +31,7 @@ object LogisticRegression extends BenchmarkAlgorithm ModelBuilder.newLogisticRegressionModel(coefficients, intercept) } - override def getEstimator(ctx: MLBenchContext): Estimator[_] = { + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { import ctx.params._ new ml.classification.LogisticRegression() .setTol(tol) diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/NaiveBayes.scala 
b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/NaiveBayes.scala new file mode 100644 index 0000000..c866635 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/NaiveBayes.scala @@ -0,0 +1,65 @@ +package com.databricks.spark.sql.perf.mllib.classification + +import org.apache.spark.ml +import org.apache.spark.ml.{ModelBuilder, PipelineStage, Transformer} +import org.apache.spark.ml.evaluation.{Evaluator, MulticlassClassificationEvaluator} +import org.apache.spark.ml.linalg.{DenseMatrix, Vectors} + +import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import com.databricks.spark.sql.perf.mllib._ +import com.databricks.spark.sql.perf.mllib.data.DataGenerator + +/** Object containing methods used in performance tests for (multinomial) NaiveBayesModels */ +object NaiveBayes extends BenchmarkAlgorithm + with TestFromTraining with TrainingSetFromTransformer with ScoringWithEvaluator { + + override protected def initialData(ctx: MLBenchContext) = { + import ctx.params._ + val rng = ctx.newGenerator() + // Max possible arity of a feature in generated training/test data for NaiveBayes models + val maxFeatureArity = 20 + // All features for Naive Bayes must be categorical, i.e. 
have arity >= 2 + val featureArity = 0.until(numFeatures).map(_ => 2 + rng.nextInt(maxFeatureArity - 2)).toArray + DataGenerator.generateMixedFeatures( + ctx.sqlContext, + numExamples, + ctx.seed(), + numPartitions, + featureArity) + } + + override protected def trueModel(ctx: MLBenchContext): Transformer = { + import ctx.params._ + val rng = ctx.newGenerator() + // pi = log of class priors, whose dimension is C (number of classes) + // theta = log of class conditional probabilities, whose dimension is C (number of classes) + // by D (number of features) + val unnormalizedProbs = 0.until(numClasses).map(_ => rng.nextDouble() + 1e-5).toArray + val logProbSum = math.log(unnormalizedProbs.sum) + val piArray = unnormalizedProbs.map(prob => math.log(prob) - logProbSum) + + // For class i, set the class-conditional probability of feature i to 0.7, and split up the + // remaining probability mass across the other features + val currClassProb = 0.7 + val thetaArray = Array.tabulate(numClasses) { i: Int => + val baseProbMass = (1 - currClassProb) / (numFeatures - 1) + val probs = Array.fill[Double](numFeatures)(baseProbMass) + probs(i) = currClassProb + probs + }.map(_.map(math.log)) + + // Initialize new Naive Bayes model + val pi = Vectors.dense(piArray) + val theta = new DenseMatrix(numClasses, numFeatures, thetaArray.flatten, true) + ModelBuilder.newNaiveBayesModel(pi, theta) + } + + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { + import ctx.params._ + new ml.classification.NaiveBayes() + .setSmoothing(naiveBayesSmoothing) + } + + override protected def evaluator(ctx: MLBenchContext): Evaluator = + new MulticlassClassificationEvaluator() +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/RandomForestClassification.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/RandomForestClassification.scala index 3aff023..f67668c 100644 --- 
a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/RandomForestClassification.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/RandomForestClassification.scala @@ -1,6 +1,6 @@ package com.databricks.spark.sql.perf.mllib.classification -import org.apache.spark.ml.Estimator +import org.apache.spark.ml.{Estimator, PipelineStage} import org.apache.spark.ml.classification.RandomForestClassifier import com.databricks.spark.sql.perf.mllib._ @@ -9,7 +9,7 @@ import com.databricks.spark.sql.perf.mllib.OptionImplicits._ object RandomForestClassification extends TreeOrForestClassification { - override def getEstimator(ctx: MLBenchContext): Estimator[_] = { + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { import ctx.params._ // TODO: subsamplingRate, featureSubsetStrategy // TODO: cacheNodeIds, checkpoint? diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/KMeans.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/KMeans.scala index 4f1a19b..5dae199 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/KMeans.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/KMeans.scala @@ -1,7 +1,7 @@ package com.databricks.spark.sql.perf.mllib.clustering import org.apache.spark.ml -import org.apache.spark.ml.Estimator +import org.apache.spark.ml.{Estimator, PipelineStage} import org.apache.spark.sql._ import com.databricks.spark.sql.perf.mllib.OptionImplicits._ @@ -17,7 +17,7 @@ object KMeans extends BenchmarkAlgorithm with TestFromTraining { numPartitions, numFeatures) } - override def getEstimator(ctx: MLBenchContext): Estimator[_] = { + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { import ctx.params._ new ml.clustering.KMeans() .setK(k) diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/LDA.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/LDA.scala index 
a6daf4b..e4a836a 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/LDA.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/LDA.scala @@ -4,7 +4,7 @@ import scala.collection.mutable.{HashMap => MHashMap} import org.apache.commons.math3.random.Well19937c -import org.apache.spark.ml.Estimator +import org.apache.spark.ml.{Estimator, PipelineStage} import org.apache.spark.ml import org.apache.spark.rdd.RDD import org.apache.spark.sql._ @@ -44,7 +44,7 @@ object LDA extends BenchmarkAlgorithm with TestFromTraining { ctx.sqlContext.createDataFrame(data).toDF("docIndex", "features") } - override def getEstimator(ctx: MLBenchContext): Estimator[_] = { + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { import ctx.params._ new ml.clustering.LDA() .setK(k) diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/Bucketizer.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/Bucketizer.scala new file mode 100644 index 0000000..789aba9 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/Bucketizer.scala @@ -0,0 +1,42 @@ +package com.databricks.spark.sql.perf.mllib.feature + +import scala.util.Random + +import org.apache.spark.ml +import org.apache.spark.ml.linalg.Vector +import org.apache.spark.ml.PipelineStage +import org.apache.spark.sql._ + +import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import com.databricks.spark.sql.perf.mllib.data.DataGenerator +import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext, TestFromTraining} + +/** Object for testing Bucketizer performance */ +object Bucketizer extends BenchmarkAlgorithm with TestFromTraining with UnaryTransformer { + + override def trainingDataSet(ctx: MLBenchContext): DataFrame = { + import ctx.params._ + import ctx.sqlContext.implicits._ + val rng = ctx.newGenerator() + // For a bucketizer, training data consists of a single column of random doubles + 
DataGenerator.generateContinuousFeatures(ctx.sqlContext, + numExamples, ctx.seed(), numPartitions, numFeatures = 1).rdd.map { case Row(vec: Vector) => + vec(0) // extract the single generated double value for each row + }.toDF(inputCol) + } + + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { + import ctx.params._ + val rng = ctx.newGenerator() + // Generate an array of (finite) splitting points in [-1, 1) for the Bucketizer + val splitPoints = 0.until(bucketizerNumBuckets - 1).map { _ => + 2 * rng.nextDouble() - 1 + }.sorted.toArray + // Final array of splits contains +/- infinity + val splits = Array(Double.NegativeInfinity) ++ splitPoints ++ Array(Double.PositiveInfinity) + new ml.feature.Bucketizer() + .setSplits(splits) + .setInputCol(inputCol) + } + +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/UnaryTransformer.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/UnaryTransformer.scala new file mode 100644 index 0000000..8209bec --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/UnaryTransformer.scala @@ -0,0 +1,6 @@ +package com.databricks.spark.sql.perf.mllib.feature + +/** Trait defining common state/methods for featurizers taking a single input col */ +private[feature] trait UnaryTransformer { + private[feature] val inputCol = "inputCol" +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/recommendation/ALS.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/recommendation/ALS.scala index f6dcb7b..9c21947 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/recommendation/ALS.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/recommendation/ALS.scala @@ -1,13 +1,13 @@ package com.databricks.spark.sql.perf.mllib.recommendation import org.apache.spark.ml -import org.apache.spark.ml.evaluation.{RegressionEvaluator, Evaluator} -import org.apache.spark.ml.{Transformer, Estimator} +import 
org.apache.spark.ml.evaluation.{Evaluator, RegressionEvaluator} +import org.apache.spark.ml.PipelineStage import org.apache.spark.sql._ import com.databricks.spark.sql.perf.mllib.OptionImplicits._ import com.databricks.spark.sql.perf.mllib.data.DataGenerator -import com.databricks.spark.sql.perf.mllib.{ScoringWithEvaluator, BenchmarkAlgorithm, MLBenchContext} +import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext, ScoringWithEvaluator} object ALS extends BenchmarkAlgorithm with ScoringWithEvaluator { @@ -37,7 +37,7 @@ object ALS extends BenchmarkAlgorithm with ScoringWithEvaluator { ctx.seed())._2 } - override def getEstimator(ctx: MLBenchContext): Estimator[_] = { + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { import ctx.params._ new ml.recommendation.ALS() .setSeed(ctx.seed()) diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/GLMRegression.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/GLMRegression.scala index aea75fd..8e6644d 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/GLMRegression.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/GLMRegression.scala @@ -3,7 +3,7 @@ package com.databricks.spark.sql.perf.mllib.regression import org.apache.spark.ml.evaluation.{Evaluator, RegressionEvaluator} import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.regression.GeneralizedLinearRegression -import org.apache.spark.ml.{Estimator, ModelBuilder, Transformer} +import org.apache.spark.ml.{ModelBuilder, PipelineStage, Transformer} import com.databricks.spark.sql.perf.mllib.OptionImplicits._ import com.databricks.spark.sql.perf.mllib._ @@ -36,7 +36,7 @@ object GLMRegression extends BenchmarkAlgorithm with TestFromTraining with m } - override def getEstimator(ctx: MLBenchContext): Estimator[_] = { + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { import ctx.params._ new 
GeneralizedLinearRegression() .setLink(link) diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/LinearRegression.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/LinearRegression.scala index 8acbb51..be3d467 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/LinearRegression.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/LinearRegression.scala @@ -3,7 +3,7 @@ package com.databricks.spark.sql.perf.mllib.regression import org.apache.spark.ml import org.apache.spark.ml.evaluation.{Evaluator, RegressionEvaluator} import org.apache.spark.ml.linalg.Vectors -import org.apache.spark.ml.{Estimator, ModelBuilder, Transformer} +import org.apache.spark.ml.{ModelBuilder, PipelineStage, Transformer} import com.databricks.spark.sql.perf.mllib.OptionImplicits._ import com.databricks.spark.sql.perf.mllib._ @@ -32,7 +32,7 @@ object LinearRegression extends BenchmarkAlgorithm with TestFromTraining with ModelBuilder.newLinearRegressionModel(coefficients, intercept) } - override def getEstimator(ctx: MLBenchContext): Estimator[_] = { + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { import ctx.params._ new ml.regression.LinearRegression() .setSolver("l-bfgs") diff --git a/src/main/scala/com/databricks/spark/sql/perf/results.scala b/src/main/scala/com/databricks/spark/sql/perf/results.scala index f133ef6..5ad1762 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/results.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/results.scala @@ -113,6 +113,7 @@ case class MLParams( numTestExamples: Option[Long] = None, numPartitions: Option[Int] = None, // *** Specialized and sorted by name *** + bucketizerNumBuckets: Option[Int] = None, depth: Option[Int] = None, elasticNetParam: Option[Double] = None, family: Option[String] = None, @@ -121,6 +122,7 @@ case class MLParams( ldaNumVocabulary: Option[Int] = None, link: Option[String] = None, maxIter: Option[Int] = 
None, + naiveBayesSmoothing: Option[Double] = None, numClasses: Option[Int] = None, numFeatures: Option[Int] = None, numItems: Option[Int] = None, diff --git a/src/main/scala/configs/mllib-small.yaml b/src/main/scala/configs/mllib-small.yaml index 3e574b4..dcde90c 100644 --- a/src/main/scala/configs/mllib-small.yaml +++ b/src/main/scala/configs/mllib-small.yaml @@ -80,3 +80,13 @@ benchmarks: regParam: 0.1 rank: 10 maxIter: 6 + - name: feature.Bucketizer + params: + numExamples: 100 + bucketizerNumBuckets: 10 + - name: classification.NaiveBayes + params: + numExamples: 100 + naiveBayesSmoothing: 1.0 + numClasses: 10 + numFeatures: [10] diff --git a/src/main/scala/org/apache/spark/ml/ModelBuilder.scala b/src/main/scala/org/apache/spark/ml/ModelBuilder.scala index ccddb57..b45b68a 100644 --- a/src/main/scala/org/apache/spark/ml/ModelBuilder.scala +++ b/src/main/scala/org/apache/spark/ml/ModelBuilder.scala @@ -1,8 +1,8 @@ package org.apache.spark.ml -import org.apache.spark.ml.classification.{DecisionTreeClassificationModel, LogisticRegressionModel} -import org.apache.spark.ml.linalg.Vector -import org.apache.spark.ml.regression.{LinearRegressionModel, GeneralizedLinearRegressionModel, DecisionTreeRegressionModel} +import org.apache.spark.ml.classification.{DecisionTreeClassificationModel, LogisticRegressionModel, NaiveBayesModel} +import org.apache.spark.ml.linalg.{Matrix, Vector} +import org.apache.spark.ml.regression.{DecisionTreeRegressionModel, GeneralizedLinearRegressionModel, LinearRegressionModel} import org.apache.spark.ml.tree._ import org.apache.spark.mllib.random.RandomDataGenerator import org.apache.spark.mllib.tree.impurity.ImpurityCalculator @@ -51,6 +51,11 @@ object ModelBuilder { featureArity = featureArity, seed = seed) new DecisionTreeRegressionModel(rootNode, numFeatures = featureArity.length) } + + def newNaiveBayesModel(pi: Vector, theta: Matrix): NaiveBayesModel = { + val model = new NaiveBayesModel("naivebayes-uid", pi, theta) + 
model.set(model.modelType, "multinomial") + } } /**