diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/TreeOrForestEstimator.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/TreeOrForestEstimator.scala new file mode 100644 index 0000000..c3448e6 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/TreeOrForestEstimator.scala @@ -0,0 +1,74 @@ +package com.databricks.spark.sql.perf.mllib + +import org.apache.spark.ml.{ModelBuilder, Transformer, TreeUtils} +import org.apache.spark.ml.evaluation.{Evaluator, MulticlassClassificationEvaluator, + RegressionEvaluator} +import org.apache.spark.sql.DataFrame + +import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import com.databricks.spark.sql.perf.mllib.data.DataGenerator + +/** Base trait for BenchmarkAlgorithm objects testing a tree or forest estimator */ +private[mllib] trait TreeOrForestEstimator + extends TestFromTraining with TrainingSetFromTransformer with ScoringWithEvaluator { + self: BenchmarkAlgorithm => + + override protected def initialData(ctx: MLBenchContext) = { + import ctx.params._ + val featureArity: Array[Int] = TreeOrForestEstimator.getFeatureArity(ctx) + val data: DataFrame = DataGenerator.generateMixedFeatures(ctx.sqlContext, numExamples, + ctx.seed(), numPartitions, featureArity) + TreeUtils.setMetadata(data, "features", featureArity) + } +} + +/** Base trait for BenchmarkAlgorithm objects testing a tree or forest classifier */ +private[mllib] trait TreeOrForestClassifier extends TreeOrForestEstimator { + self: BenchmarkAlgorithm => + + override protected def evaluator(ctx: MLBenchContext): Evaluator = { + new MulticlassClassificationEvaluator() + } + + override protected def trueModel(ctx: MLBenchContext): Transformer = { + ModelBuilder.newDecisionTreeClassificationModel(ctx.params.depth, ctx.params.numClasses, + TreeOrForestEstimator.getFeatureArity(ctx), ctx.seed()) + } +} + +/** Base trait for BenchmarkAlgorithm objects testing a tree or forest regressor */ +private[mllib] trait TreeOrForestRegressor extends TreeOrForestEstimator { + self: BenchmarkAlgorithm => + + override protected def evaluator(ctx: MLBenchContext): Evaluator = { + new RegressionEvaluator() + } + + override protected def trueModel(ctx: MLBenchContext): Transformer = { + ModelBuilder.newDecisionTreeRegressionModel(ctx.params.depth, + TreeOrForestEstimator.getFeatureArity(ctx), ctx.seed()) + } + +} + +private[mllib] object TreeOrForestEstimator { + + /** + * Get feature arity for tree and tree ensemble tests. + * Currently, this is hard-coded as: + * - 1/4 binary features + * - 1/4 high-arity (20-category) features + * - 1/2 continuous features + * + * @return Array of length numFeatures, where 0 indicates continuous feature and + * value > 0 indicates a categorical feature of that arity. + */ + def getFeatureArity(ctx: MLBenchContext): Array[Int] = { + val numFeatures = ctx.params.numFeatures + val fourthFeatures = numFeatures / 4 + Array.fill[Int](fourthFeatures)(2) ++ // low-arity categorical + Array.fill[Int](fourthFeatures)(20) ++ // high-arity categorical + Array.fill[Int](numFeatures - 2 * fourthFeatures)(0) // continuous + } +} + diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/DecisionTreeClassification.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/DecisionTreeClassification.scala index 31747da..618fc1f 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/DecisionTreeClassification.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/DecisionTreeClassification.scala @@ -2,37 +2,11 @@ package com.databricks.spark.sql.perf.mllib.classification import org.apache.spark.ml._ import org.apache.spark.ml.classification.DecisionTreeClassifier -import org.apache.spark.ml.evaluation.{Evaluator, MulticlassClassificationEvaluator} -import org.apache.spark.sql.DataFrame import com.databricks.spark.sql.perf.mllib.OptionImplicits._ import com.databricks.spark.sql.perf.mllib._ -import com.databricks.spark.sql.perf.mllib.data.DataGenerator - -abstract class TreeOrForestClassification extends BenchmarkAlgorithm - with TestFromTraining with TrainingSetFromTransformer with ScoringWithEvaluator { - - import TreeOrForestClassification.getFeatureArity - - override protected def initialData(ctx: MLBenchContext) = { - import ctx.params._ - val featureArity: Array[Int] = getFeatureArity(ctx) - val data: DataFrame = DataGenerator.generateMixedFeatures(ctx.sqlContext, numExamples, - ctx.seed(), numPartitions, featureArity) - TreeUtils.setMetadata(data, "features", featureArity) - } - - override protected def trueModel(ctx: MLBenchContext): Transformer = { - ModelBuilder.newDecisionTreeClassificationModel(ctx.params.depth, ctx.params.numClasses, - getFeatureArity(ctx), ctx.seed()) - } - - override protected def evaluator(ctx: MLBenchContext): Evaluator = - new MulticlassClassificationEvaluator() -} - -object DecisionTreeClassification extends TreeOrForestClassification { +object DecisionTreeClassification extends BenchmarkAlgorithm with TreeOrForestClassifier { override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { import ctx.params._ @@ -41,24 +15,3 @@ object DecisionTreeClassification extends TreeOrForestClassification { .setSeed(ctx.seed()) } } - -object TreeOrForestClassification { - - /** - * Get feature arity for tree and tree ensemble tests. - * Currently, this is hard-coded as: - * - 1/2 binary features - * - 1/2 high-arity (20-category) features - * - 1/2 continuous features - * - * @return Array of length numFeatures, where 0 indicates continuous feature and - * value > 0 indicates a categorical feature of that arity. - */ - def getFeatureArity(ctx: MLBenchContext): Array[Int] = { - val numFeatures = ctx.params.numFeatures - val fourthFeatures = numFeatures / 4 - Array.fill[Int](fourthFeatures)(2) ++ // low-arity categorical - Array.fill[Int](fourthFeatures)(20) ++ // high-arity categorical - Array.fill[Int](numFeatures - 2 * fourthFeatures)(0) // continuous - } -} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/GBTClassification.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/GBTClassification.scala index 80d3b3c..5a77c1c 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/GBTClassification.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/GBTClassification.scala @@ -1,27 +1,14 @@ package com.databricks.spark.sql.perf.mllib.classification -import org.apache.spark.ml.{ModelBuilder, PipelineStage, Transformer, TreeUtils} import org.apache.spark.ml.classification.GBTClassifier -import org.apache.spark.ml.evaluation.{Evaluator, MulticlassClassificationEvaluator} -import org.apache.spark.sql._ +import org.apache.spark.ml.{ModelBuilder, PipelineStage, Transformer} import com.databricks.spark.sql.perf.mllib.OptionImplicits._ import com.databricks.spark.sql.perf.mllib._ -import com.databricks.spark.sql.perf.mllib.data.DataGenerator +object GBTClassification extends BenchmarkAlgorithm with TreeOrForestClassifier { -object GBTClassification extends BenchmarkAlgorithm - with TestFromTraining with TrainingSetFromTransformer with ScoringWithEvaluator { - - import TreeOrForestClassification.getFeatureArity - - override protected def initialData(ctx: MLBenchContext) = { - import ctx.params._ - val featureArity: Array[Int] = getFeatureArity(ctx) - val data: DataFrame = DataGenerator.generateMixedFeatures(ctx.sqlContext, numExamples, - ctx.seed(), numPartitions, featureArity) - TreeUtils.setMetadata(data, "features", featureArity) - } + import TreeOrForestEstimator.getFeatureArity override protected def trueModel(ctx: MLBenchContext): Transformer = { import ctx.params._ @@ -41,6 +28,4 @@ object GBTClassification extends BenchmarkAlgorithm .setSeed(ctx.seed()) } - override protected def evaluator(ctx: MLBenchContext): Evaluator = - new MulticlassClassificationEvaluator() } diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/RandomForestClassification.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/RandomForestClassification.scala index f67668c..cfb1a95 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/RandomForestClassification.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/RandomForestClassification.scala @@ -7,7 +7,7 @@ import com.databricks.spark.sql.perf.mllib._ import com.databricks.spark.sql.perf.mllib.OptionImplicits._ -object RandomForestClassification extends TreeOrForestClassification { +object RandomForestClassification extends BenchmarkAlgorithm with TreeOrForestClassifier { override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { import ctx.params._ diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/GaussianMixture.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/GaussianMixture.scala new file mode 100644 index 0000000..3c684a7 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/GaussianMixture.scala @@ -0,0 +1,30 @@ +package com.databricks.spark.sql.perf.mllib.clustering + +import org.apache.spark.ml +import org.apache.spark.ml.PipelineStage +import org.apache.spark.sql.DataFrame + +import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext, TestFromTraining} +import com.databricks.spark.sql.perf.mllib.data.DataGenerator + +object GaussianMixture extends BenchmarkAlgorithm with TestFromTraining { + + override def trainingDataSet(ctx: MLBenchContext): DataFrame = { + import ctx.params._ + DataGenerator.generateGaussianMixtureData(ctx.sqlContext, numCenters = k, + numExamples = numExamples, seed = ctx.seed(), numPartitions = numPartitions, + numFeatures = numFeatures) + } + + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { + import ctx.params._ + new ml.clustering.GaussianMixture() + .setK(k) + .setSeed(randomSeed.toLong) + .setMaxIter(maxIter) + .setTol(tol) + } + + // TODO(?) add a scoring method here. +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/KMeans.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/KMeans.scala index 5dae199..9b2f233 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/KMeans.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/KMeans.scala @@ -1,7 +1,7 @@ package com.databricks.spark.sql.perf.mllib.clustering import org.apache.spark.ml -import org.apache.spark.ml.{Estimator, PipelineStage} +import org.apache.spark.ml.{PipelineStage} import org.apache.spark.sql._ import com.databricks.spark.sql.perf.mllib.OptionImplicits._ @@ -23,6 +23,7 @@ object KMeans extends BenchmarkAlgorithm with TestFromTraining { .setK(k) .setSeed(randomSeed.toLong) .setMaxIter(maxIter) + .setTol(tol) } // TODO(?) add a scoring method here. diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/HashingTF.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/HashingTF.scala new file mode 100644 index 0000000..5fb7d76 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/HashingTF.scala @@ -0,0 +1,42 @@ +package com.databricks.spark.sql.perf.mllib.feature + +import scala.util.Random + +import org.apache.spark.ml +import org.apache.spark.ml.PipelineStage +import org.apache.spark.sql._ +import org.apache.spark.sql.functions.split + +import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import com.databricks.spark.sql.perf.mllib.data.DataGenerator +import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext, TestFromTraining} + +object HashingTF extends BenchmarkAlgorithm with TestFromTraining with UnaryTransformer { + + // Sample a random sentence of length up to maxLen from the provided array of words + private def randomSentence(rng: Random, maxLen: Int, dictionary: Array[String]): Array[String] = { + val length = rng.nextInt(maxLen - 1) + 1 + val dictLength = dictionary.length + Array.tabulate[String](length)(_ => dictionary(rng.nextInt(dictLength))) + } + + override def trainingDataSet(ctx: MLBenchContext): DataFrame = { + import ctx.params._ + // To test HashingTF, we generate arrays of (on average) docLength strings, where + // each string is selected from a pool of vocabSize strings + // The expected # of occurrences of each word in our vocabulary is + // (docLength * numExamples) / vocabSize + val df = DataGenerator.generateDoc(ctx.sqlContext, numExamples = numExamples, seed = ctx.seed(), + numPartitions = numPartitions, vocabSize = vocabSize, avgDocLength = docLength, + dataColName = inputCol) + df.withColumn(inputCol, split(df(inputCol), " ")) + } + + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { + import ctx.params._ + new ml.feature.HashingTF() + .setInputCol(inputCol) + .setNumFeatures(numFeatures) + } + +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/DecisionTreeRegression.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/DecisionTreeRegression.scala new file mode 100644 index 0000000..126ffe4 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/DecisionTreeRegression.scala @@ -0,0 +1,18 @@ +package com.databricks.spark.sql.perf.mllib.regression + +import org.apache.spark.ml.PipelineStage +import org.apache.spark.ml.regression.DecisionTreeRegressor + +import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import com.databricks.spark.sql.perf.mllib._ + + +object DecisionTreeRegression extends BenchmarkAlgorithm with TreeOrForestRegressor { + + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { + import ctx.params._ + new DecisionTreeRegressor() + .setMaxDepth(depth) + .setSeed(ctx.seed()) + } +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/RandomForestRegression.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/RandomForestRegression.scala new file mode 100644 index 0000000..c9ed4e8 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/RandomForestRegression.scala @@ -0,0 +1,18 @@ +package com.databricks.spark.sql.perf.mllib.regression + +import org.apache.spark.ml.PipelineStage +import org.apache.spark.ml.regression.RandomForestRegressor + +import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext, + TreeOrForestRegressor} + +object RandomForestRegression extends BenchmarkAlgorithm with TreeOrForestRegressor { + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { + import ctx.params._ + new RandomForestRegressor() + .setMaxDepth(depth) + .setNumTrees(maxIter) + .setSeed(ctx.seed()) + } +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/results.scala b/src/main/scala/com/databricks/spark/sql/perf/results.scala index 04898a0..1ec93fd 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/results.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/results.scala @@ -181,14 +181,14 @@ class MLParams( new MLParams(randomSeed = randomSeed, numExamples = numExamples, numTestExamples = numTestExamples, numPartitions = numPartitions, bucketizerNumBuckets = bucketizerNumBuckets, depth = depth, docLength = docLength, - elasticNetParam = elasticNetParam, family = family, featureArity = featureArity, k = k, link = link, maxIter = maxIter, + elasticNetParam = elasticNetParam, family = family, featureArity = featureArity, k = k, + link = link, maxIter = maxIter, numClasses = numClasses, numFeatures = numFeatures, numInputCols = numInputCols, numItems = numItems, numUsers = numUsers, optimizer = optimizer, regParam = regParam, rank = rank, smoothing = smoothing, tol = tol, vocabSize = vocabSize) } } - object MLParams { val empty = new MLParams() } diff --git a/src/main/scala/configs/mllib-small.yaml b/src/main/scala/configs/mllib-small.yaml index 558ccf0..111df0c 100644 --- a/src/main/scala/configs/mllib-small.yaml +++ b/src/main/scala/configs/mllib-small.yaml @@ -7,12 +7,62 @@ common: numPartitions: 3 randomSeed: [1] benchmarks: + - name: classification.DecisionTreeClassification + params: + numExamples: 100 + numTestExamples: 10 + depth: 3 + numClasses: 4 + numFeatures: 5 + - name: classification.GBTClassification + params: + numExamples: 100 + numTestExamples: 10 + depth: 3 + numClasses: 2 + numFeatures: 5 + maxIter: 3 + - name: classification.LinearSVC + params: + numExamples: 100 + numFeatures: 10 + regParam: 0.1 + tol: 0.001 + maxIter: 10 - name: classification.LogisticRegression params: numFeatures: 100 regParam: 0.1 tol: [0.2, 0.1] maxIter: 10 + - name: classification.NaiveBayes + params: + numExamples: 100 + smoothing: 1.0 + numClasses: 10 + numFeatures: [10] + - name: classification.RandomForestClassification + params: + numExamples: 100 + numTestExamples: 10 + depth: 3 + numClasses: 4 + numFeatures: 5 + maxIter: 3 + - name: clustering.GaussianMixture + params: + numExamples: 10 + numTestExamples: 10 + k: 5 + maxIter: 10 + tol: 0.01 + - name: clustering.KMeans + params: + numExamples: 10 + numTestExamples: 10 + k: 5 + maxIter: 10 + tol: 1e-4 - name: clustering.LDA params: numExamples: 10 @@ -24,72 +74,15 @@ benchmarks: optimizer: - em - online - - name: clustering.KMeans - params: - numExamples: 10 - numTestExamples: 10 - k: 5 - maxIter: 10 - - name: regression.GLMRegression - params: - numExamples: 100 - numTestExamples: 10 - numFeatures: 5 - link: log - family: gaussian - tol: 0.0 - maxIter: 10 - regParam: 0.1 - - name: classification.DecisionTreeClassification - params: - numExamples: 100 - numTestExamples: 10 - depth: 3 - numClasses: 4 - numFeatures: 5 - - name: classification.RandomForestClassification - params: - numExamples: 100 - numTestExamples: 10 - depth: 3 - numClasses: 4 - numFeatures: 5 - maxIter: 3 - - name: classification.GBTClassification - params: - numExamples: 100 - numTestExamples: 10 - depth: 3 - numClasses: 2 - numFeatures: 5 - maxIter: 3 - - name: regression.LinearRegression - params: - numExamples: 100 - numTestExamples: 100 - numFeatures: 100 - regParam: 0.1 - tol: [0.0] - maxIter: 10 - - name: recommendation.ALS - params: - numExamples: 100 - numTestExamples: 100 - numUsers: 100 - numItems: 100 - regParam: 0.1 - rank: 10 - maxIter: 6 - name: feature.Bucketizer params: numExamples: 100 bucketizerNumBuckets: 10 - - name: classification.NaiveBayes + - name: feature.HashingTF params: numExamples: 100 - smoothing: 1.0 - numClasses: 10 - numFeatures: [10] + docLength: 20 + vocabSize: 4 - name: feature.OneHotEncoder params: numExamples: 100 @@ -112,10 +105,44 @@ benchmarks: params: numExamples: 100 numFeatures: 10 - - name: classification.LinearSVC + - name: recommendation.ALS params: numExamples: 100 - numFeatures: 10 + numTestExamples: 100 + numUsers: 100 + numItems: 100 regParam: 0.1 - tol: 0.001 - maxIter: 10 \ No newline at end of file + rank: 10 + maxIter: 6 + - name: regression.DecisionTreeRegression + params: + numExamples: 100 + numTestExamples: 10 + depth: 3 + numClasses: 4 + numFeatures: 5 + - name: regression.GLMRegression + params: + numExamples: 100 + numTestExamples: 10 + numFeatures: 5 + link: log + family: gaussian + tol: 1e-6 + maxIter: 10 + regParam: 0.1 + - name: regression.LinearRegression + params: + numExamples: 100 + numTestExamples: 100 + numFeatures: 100 + regParam: 0.1 + tol: [1e-6] + maxIter: 10 + - name: regression.RandomForestRegression + params: + numExamples: 100 + numTestExamples: 10 + depth: 3 + numFeatures: 5 + maxIter: 3