diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala index ad714e5..52415b4 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala @@ -2,8 +2,8 @@ package com.databricks.spark.sql.perf.mllib import com.typesafe.scalalogging.slf4j.Logging -import org.apache.spark.ml.Transformer -import org.apache.spark.ml.evaluation.{MulticlassClassificationEvaluator, Evaluator} +import org.apache.spark.ml.{Estimator, Transformer} +import org.apache.spark.ml.evaluation.Evaluator import org.apache.spark.sql._ import org.apache.spark.sql.functions._ @@ -25,10 +25,10 @@ trait BenchmarkAlgorithm extends Logging { def testDataSet(ctx: MLBenchContext): DataFrame - @throws[Exception]("if training fails") - def train( - ctx: MLBenchContext, - trainingSet: DataFrame): Transformer + /** + * Create an [[Estimator]] with params set from the given [[MLBenchContext]]. + */ + def getEstimator(ctx: MLBenchContext): Estimator[_] /** * The unnormalized score of the training procedure on a dataset. The normalization is diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLTransformerBenchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLTransformerBenchmarkable.scala index 8794c07..61886fe 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLTransformerBenchmarkable.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLTransformerBenchmarkable.scala @@ -2,10 +2,12 @@ package com.databricks.spark.sql.perf.mllib import com.databricks.spark.sql.perf._ import com.typesafe.scalalogging.slf4j.Logging -import org.apache.spark.sql._ +import org.apache.spark.sql._ import scala.collection.mutable.ArrayBuffer +import org.apache.spark.ml.Transformer + class MLTransformerBenchmarkable( params: MLParams, test: BenchmarkAlgorithm, @@ -44,7 +46,12 @@ class MLTransformerBenchmarkable( description: String, messages: ArrayBuffer[String]): BenchmarkResult = { try { - val (trainingTime, model) = measureTime(test.train(param, trainingData)) + val (trainingTime, model: Transformer) = measureTime { + logger.info(s"$this: train: trainingSet=${trainingData.schema}") + val estimator = test.getEstimator(param) + estimator.fit(trainingData) + //test.train(param, trainingData) + } logger.info(s"model: $model") val (_, scoreTraining) = measureTime { test.score(param, trainingData, model) diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/DecisionTreeClassification.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/DecisionTreeClassification.scala index 4eebd74..2543284 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/DecisionTreeClassification.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/DecisionTreeClassification.scala @@ -1,45 +1,46 @@ package com.databricks.spark.sql.perf.mllib.classification -import org.apache.spark.ml.ModelBuilder -import org.apache.spark.ml.Transformer +import org.apache.spark.ml.{Estimator, ModelBuilder, Transformer} import org.apache.spark.ml.classification.DecisionTreeClassifier -import org.apache.spark.ml.evaluation.{MulticlassClassificationEvaluator, Evaluator} -import org.apache.spark.sql.DataFrame +import org.apache.spark.ml.evaluation.{Evaluator, MulticlassClassificationEvaluator} import com.databricks.spark.sql.perf.mllib.OptionImplicits._ import com.databricks.spark.sql.perf.mllib._ import com.databricks.spark.sql.perf.mllib.data.DataGenerator -object DecisionTreeClassification extends BenchmarkAlgorithm +abstract class TreeOrForestClassification extends BenchmarkAlgorithm with TestFromTraining with TrainingSetFromTransformer with ScoringWithEvaluator { + def getFeatureArity(ctx: MLBenchContext): Array[Int] = { + val numFeatures = ctx.params.numFeatures + val fourthFeatures = numFeatures / 4 + Array.fill[Int](fourthFeatures)(2) ++ // low-arity categorical + Array.fill[Int](fourthFeatures)(20) ++ // high-arity categorical + Array.fill[Int](numFeatures - 2 * fourthFeatures)(0) // continuous + } + override protected def initialData(ctx: MLBenchContext) = { import ctx.params._ - DataGenerator.generateFeatures(ctx.sqlContext, numExamples, ctx.seed(), numPartitions, - numFeatures) + DataGenerator.generateMixedFeatures(ctx.sqlContext, numExamples, ctx.seed(), numPartitions, + getFeatureArity(ctx)) } override protected def trueModel(ctx: MLBenchContext): Transformer = { - //val rng = ctx.newGenerator() - val numFeatures = ctx.params.numFeatures.get - val fourthFeatures = numFeatures / 4 - val featureArity: Array[Int] = - Array.fill[Int](fourthFeatures)(2) ++ // low-arity categorical - Array.fill[Int](fourthFeatures)(20) ++ // high-arity categorical - Array.fill[Int](numFeatures - 2 * fourthFeatures)(0) // continuous - ModelBuilder.newDecisionTreeClassificationModel(ctx.params.depth.get, ctx.params.numClasses.get, - featureArity, ctx.seed()) - } - - override def train(ctx: MLBenchContext, trainingSet: DataFrame): Transformer = { - logger.info(s"$this: train: trainingSet=${trainingSet.schema}") - import ctx.params._ - new DecisionTreeClassifier() - .setMaxDepth(depth.get) - .fit(trainingSet) + ModelBuilder.newDecisionTreeClassificationModel(ctx.params.depth, ctx.params.numClasses, + getFeatureArity(ctx), ctx.seed()) } override protected def evaluator(ctx: MLBenchContext): Evaluator = new MulticlassClassificationEvaluator() } + +object DecisionTreeClassification extends TreeOrForestClassification { + + override def getEstimator(ctx: MLBenchContext): Estimator[_] = { + import ctx.params._ + new DecisionTreeClassifier() + .setMaxDepth(depth) + .setSeed(ctx.seed()) + } +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/GBTClassification.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/GBTClassification.scala new file mode 100644 index 0000000..3e62587 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/GBTClassification.scala @@ -0,0 +1,48 @@ +package com.databricks.spark.sql.perf.mllib.classification + +import org.apache.spark.ml.{Estimator, ModelBuilder, Transformer} +import org.apache.spark.ml.classification.GBTClassifier +import org.apache.spark.ml.evaluation.{Evaluator, MulticlassClassificationEvaluator} + +import com.databricks.spark.sql.perf.mllib._ +import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import com.databricks.spark.sql.perf.mllib.data.DataGenerator + + +object GBTClassification extends BenchmarkAlgorithm + with TestFromTraining with TrainingSetFromTransformer with ScoringWithEvaluator { + + def getFeatureArity(ctx: MLBenchContext): Array[Int] = { + val numFeatures = ctx.params.numFeatures + val fourthFeatures = numFeatures / 4 + Array.fill[Int](fourthFeatures)(2) ++ // low-arity categorical + Array.fill[Int](fourthFeatures)(20) ++ // high-arity categorical + Array.fill[Int](numFeatures - 2 * fourthFeatures)(0) // continuous + } + + override protected def initialData(ctx: MLBenchContext) = { + import ctx.params._ + DataGenerator.generateMixedFeatures(ctx.sqlContext, numExamples, ctx.seed(), numPartitions, + getFeatureArity(ctx)) + } + + override protected def trueModel(ctx: MLBenchContext): Transformer = { + // We add +1 to the depth to make it more likely that many iterations of boosting are needed + // to model the true tree. + ModelBuilder.newDecisionTreeClassificationModel(ctx.params.depth + 1, ctx.params.numClasses, + getFeatureArity(ctx), ctx.seed()) + } + + override def getEstimator(ctx: MLBenchContext): Estimator[_] = { + import ctx.params._ + // TODO: subsamplingRate, featureSubsetStrategy + // TODO: cacheNodeIds, checkpoint? + new GBTClassifier() + .setMaxDepth(depth) + .setMaxIter(maxIter) + .setSeed(ctx.seed()) + } + + override protected def evaluator(ctx: MLBenchContext): Evaluator = + new MulticlassClassificationEvaluator() +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/LogisticRegression.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/LogisticRegression.scala index 284905d..edb3a68 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/LogisticRegression.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/LogisticRegression.scala @@ -3,19 +3,19 @@ package com.databricks.spark.sql.perf.mllib.classification import com.databricks.spark.sql.perf.mllib.OptionImplicits._ import com.databricks.spark.sql.perf.mllib._ import com.databricks.spark.sql.perf.mllib.data.DataGenerator -import org.apache.spark.ml.evaluation.{MulticlassClassificationEvaluator, Evaluator} -import org.apache.spark.ml.{Transformer, ModelBuilder} +import org.apache.spark.ml.evaluation.{Evaluator, MulticlassClassificationEvaluator} +import org.apache.spark.ml.{Estimator, ModelBuilder, Transformer} import org.apache.spark.ml import org.apache.spark.ml.linalg.Vectors -import org.apache.spark.sql.DataFrame + object LogisticRegression extends BenchmarkAlgorithm with TestFromTraining with TrainingSetFromTransformer with ScoringWithEvaluator { override protected def initialData(ctx: MLBenchContext) = { import ctx.params._ - DataGenerator.generateFeatures( + DataGenerator.generateContinuousFeatures( ctx.sqlContext, numExamples, ctx.seed(), @@ -32,15 +32,12 @@ object LogisticRegression extends BenchmarkAlgorithm ModelBuilder.newLogisticRegressionModel(coefficients, intercept) } - override def train(ctx: MLBenchContext, - trainingSet: DataFrame): Transformer = { - logger.info(s"$this: train: trainingSet=${trainingSet.schema}") + override def getEstimator(ctx: MLBenchContext): Estimator[_] = { import ctx.params._ - val lr = new ml.classification.LogisticRegression() + new ml.classification.LogisticRegression() .setTol(tol) .setMaxIter(maxIter) .setRegParam(regParam) - lr.fit(trainingSet) } override protected def evaluator(ctx: MLBenchContext): Evaluator = diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/RandomForestClassification.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/RandomForestClassification.scala new file mode 100644 index 0000000..b4352fe --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/RandomForestClassification.scala @@ -0,0 +1,20 @@ +package com.databricks.spark.sql.perf.mllib.classification + +import org.apache.spark.ml.Estimator +import org.apache.spark.ml.classification.RandomForestClassifier + +import com.databricks.spark.sql.perf.mllib._ + + +object RandomForestClassification extends TreeOrForestClassification { + + override def getEstimator(ctx: MLBenchContext): Estimator[_] = { + import ctx.params._ + // TODO: subsamplingRate, featureSubsetStrategy + // TODO: cacheNodeIds, checkpoint? + new RandomForestClassifier() + .setMaxDepth(depth.get) + .setNumTrees(maxIter.get) + .setSeed(ctx.seed()) + } +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/LDA.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/LDA.scala index 3fe5688..dbccf3f 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/LDA.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/LDA.scala @@ -1,13 +1,14 @@ package com.databricks.spark.sql.perf.mllib.clustering -import com.databricks.spark.sql.perf.mllib.{MLBenchContext, TestFromTraining, BenchmarkAlgorithm} +import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext, TestFromTraining} import com.databricks.spark.sql.perf.mllib.OptionImplicits._ import org.apache.commons.math3.random.Well19937c -import org.apache.spark.ml.Transformer + +import org.apache.spark.ml.Estimator import org.apache.spark.ml import org.apache.spark.rdd.RDD import org.apache.spark.sql._ -import org.apache.spark.ml.linalg.{Vectors, Vector} +import org.apache.spark.ml.linalg.{Vector, Vectors} import scala.collection.mutable.{HashMap => MHashMap} object LDA extends BenchmarkAlgorithm with TestFromTraining { @@ -40,15 +41,13 @@ object LDA extends BenchmarkAlgorithm with TestFromTraining { ctx.sqlContext.createDataFrame(data).toDF("docIndex", "features") } - override def train(ctx: MLBenchContext, - trainingSet: DataFrame): Transformer = { + override def getEstimator(ctx: MLBenchContext): Estimator[_] = { import ctx.params._ new ml.clustering.LDA() - .setK(k) - .setSeed(randomSeed.toLong) - .setMaxIter(maxIter) - .setOptimizer(optimizer) - .fit(trainingSet) + .setK(k) + .setSeed(randomSeed.toLong) + .setMaxIter(maxIter) + .setOptimizer(optimizer) } // TODO(?) add a scoring method here. diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/data/data_generation.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/data/data_generation.scala index ca36704..d34a828 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/data/data_generation.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/data/data_generation.scala @@ -3,82 +3,65 @@ package com.databricks.spark.sql.perf.mllib.data import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.linalg.Vector import org.apache.spark.mllib.random._ -import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD import org.apache.spark.sql.{SQLContext, DataFrame} + object DataGenerator { - def generateFeatures( + def generateContinuousFeatures( sql: SQLContext, numExamples: Long, seed: Long, numPartitions: Int, numFeatures: Int): DataFrame = { - val categoricalArities = Array.empty[Int] + val featureArity = Array.fill[Int](numFeatures)(0) val rdd: RDD[Vector] = RandomRDDs.randomRDD(sql.sparkContext, - new FeaturesGenerator(categoricalArities, numFeatures), - numExamples, numPartitions, seed) + new FeaturesGenerator(featureArity), numExamples, numPartitions, seed) sql.createDataFrame(rdd.map(Tuple1.apply)).toDF("features") } -} -class BinaryLabeledDataGenerator( - private val numFeatures: Int, - private val threshold: Double) extends RandomDataGenerator[LabeledPoint] { - - private val rng = new java.util.Random() - - override def nextValue(): LabeledPoint = { - val y = if (rng.nextDouble() < threshold) 0.0 else 1.0 - val x = Array.fill[Double](numFeatures) { - if (rng.nextDouble() < threshold) 0.0 else 1.0 - } - ??? -// LabeledPoint(y, Vectors.dense(x)) + def generateMixedFeatures( + sql: SQLContext, + numExamples: Long, + seed: Long, + numPartitions: Int, + featureArity: Array[Int]): DataFrame = { + val rdd: RDD[Vector] = RandomRDDs.randomRDD(sql.sparkContext, + new FeaturesGenerator(featureArity), numExamples, numPartitions, seed) + sql.createDataFrame(rdd.map(Tuple1.apply)).toDF("features") } - - override def setSeed(seed: Long) { - rng.setSeed(seed) - } - - override def copy(): BinaryLabeledDataGenerator = - new BinaryLabeledDataGenerator(numFeatures, threshold) - } /** * Generator for a feature vector which can include a mix of categorical and continuous features. - * @param categoricalArities Specifies the number of categories for each categorical feature. - * @param numContinuous Number of continuous features. Feature values are in range [0,1]. + * @param featureArity Length numFeatures, where 0 indicates continuous feature and > 0 + * indicates a categorical feature of that arity. */ -class FeaturesGenerator(val categoricalArities: Array[Int], val numContinuous: Int) +class FeaturesGenerator(val featureArity: Array[Int]) extends RandomDataGenerator[Vector] { - categoricalArities.foreach { arity => - require(arity >= 2, s"FeaturesGenerator given categorical arity = $arity, " + - s"but arity should be >= 2.") + featureArity.foreach { arity => + require(arity >= 0, s"FeaturesGenerator given categorical arity = $arity, " + + s"but arity should be >= 0.") } - val numFeatures = categoricalArities.length + numContinuous + val numFeatures = featureArity.length private val rng = new java.util.Random() /** - * Generates vector with categorical features first, and continuous features in [0,1] second. + * Generates vector with features in the order given by [[featureArity]] */ override def nextValue(): Vector = { - // Feature ordering matches getCategoricalFeaturesInfo. val arr = new Array[Double](numFeatures) var j = 0 - while (j < categoricalArities.length) { - arr(j) = rng.nextInt(categoricalArities(j)) - j += 1 - } - while (j < numFeatures) { - // Generating some centered data - arr(j) = 2 * rng.nextDouble() - 1 + while (j < featureArity.length) { + if (featureArity(j) == 0) + arr(j) = 2 * rng.nextDouble() - 1 // centered uniform data + else + arr(j) = rng.nextInt(featureArity(j)) j += 1 } Vectors.dense(arr) @@ -88,15 +71,5 @@ class FeaturesGenerator(val categoricalArities: Array[Int], val numContinuous: I rng.setSeed(seed) } - override def copy(): FeaturesGenerator = new FeaturesGenerator(categoricalArities, numContinuous) - - /** - * @return categoricalFeaturesInfo Map storing arity of categorical features. - * E.g., an entry (n -> k) indicates that feature n is categorical - * with k categories indexed from 0: {0, 1, ..., k-1}. - */ - def getCategoricalFeaturesInfo: Map[Int, Int] = { - // Categorical features are indexed from 0 because of the implementation of nextValue(). - categoricalArities.zipWithIndex.map(_.swap).toMap - } -} \ No newline at end of file + override def copy(): FeaturesGenerator = new FeaturesGenerator(featureArity) +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/GLMRegression.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/GLMRegression.scala index 6797e50..36d0e36 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/GLMRegression.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/GLMRegression.scala @@ -3,11 +3,11 @@ package com.databricks.spark.sql.perf.mllib.regression import com.databricks.spark.sql.perf.mllib.OptionImplicits._ import com.databricks.spark.sql.perf.mllib._ import com.databricks.spark.sql.perf.mllib.data.DataGenerator + import org.apache.spark.ml.evaluation.{Evaluator, RegressionEvaluator} import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.regression.GeneralizedLinearRegression -import org.apache.spark.ml.{ModelBuilder, Transformer} -import org.apache.spark.sql._ +import org.apache.spark.ml.{Estimator, ModelBuilder, Transformer} object GLMRegression extends BenchmarkAlgorithm with TestFromTraining with @@ -15,7 +15,7 @@ object GLMRegression extends BenchmarkAlgorithm with TestFromTraining with override protected def initialData(ctx: MLBenchContext) = { import ctx.params._ - DataGenerator.generateFeatures( + DataGenerator.generateContinuousFeatures( ctx.sqlContext, numExamples, ctx.seed(), @@ -36,18 +36,14 @@ object GLMRegression extends BenchmarkAlgorithm with TestFromTraining with m } - override def train( - ctx: MLBenchContext, - trainingSet: DataFrame): Transformer = { - logger.info(s"$this: train: trainingSet=${trainingSet.schema}") + override def getEstimator(ctx: MLBenchContext): Estimator[_] = { import ctx.params._ - val glr = new GeneralizedLinearRegression() + new GeneralizedLinearRegression() .setLink(link) .setFamily(family) .setRegParam(regParam) .setMaxIter(maxIter) .setTol(tol) - glr.fit(trainingSet) } override protected def evaluator(ctx: MLBenchContext): Evaluator = diff --git a/src/main/scala/org/apache/spark/ml/ModelBuilder.scala b/src/main/scala/org/apache/spark/ml/ModelBuilder.scala index 0466ba6..bbae1ff 100644 --- a/src/main/scala/org/apache/spark/ml/ModelBuilder.scala +++ b/src/main/scala/org/apache/spark/ml/ModelBuilder.scala @@ -190,7 +190,7 @@ object TreeBuilder { // nCatsSplit is in {1,...,arity-1}. val nCatsSplit = rng.nextInt(featureArity(feature) - 1) + 1 val splitCategories: Array[Double] = - rng.shuffle(Range(0,featureArity(feature))).map(_.toDouble).toArray.take(nCatsSplit) + rng.shuffle(Range(0,featureArity(feature)).toList).toArray.map(_.toDouble).take(nCatsSplit) new CategoricalSplit(featureIndex = feature, _leftCategories = splitCategories, numCategories = featureArity(feature)) }