From 6ec83fd0f7410fd7b6eb40e18fa442d7138a765c Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Fri, 1 Sep 2017 04:56:43 +0800 Subject: [PATCH] Add benchmark for LinearSVC/OneHotEncoder/VectorSlicer/VectorAssembler/StringIndexer/Tokenizer (#112) Add benchmark for: LinearSVC OneHotEncoder VectorSlicer VectorAssembler StringIndexer Tokenizer --- .../spark/sql/perf/mllib/MLLib.scala | 2 +- .../perf/mllib/classification/LinearSVC.scala | 45 +++++++++++ .../sql/perf/mllib/data/dataGeneration.scala | 81 +++++++++++++++++++ .../perf/mllib/feature/OneHotEncoder.scala | 34 ++++++++ .../perf/mllib/feature/StringIndexer.scala | 35 ++++++++ .../sql/perf/mllib/feature/Tokenizer.scala | 33 ++++++++ .../perf/mllib/feature/VectorAssembler.scala | 49 +++++++++++ .../sql/perf/mllib/feature/VectorSlicer.scala | 35 ++++++++ .../databricks/spark/sql/perf/results.scala | 8 +- src/main/scala/configs/mllib-small.yaml | 29 +++++++ .../org/apache/spark/ml/ModelBuilder.scala | 8 +- .../ClassificationModelBuilder.scala | 13 +++ 12 files changed, 368 insertions(+), 4 deletions(-) create mode 100644 src/main/scala/com/databricks/spark/sql/perf/mllib/classification/LinearSVC.scala create mode 100644 src/main/scala/com/databricks/spark/sql/perf/mllib/feature/OneHotEncoder.scala create mode 100644 src/main/scala/com/databricks/spark/sql/perf/mllib/feature/StringIndexer.scala create mode 100644 src/main/scala/com/databricks/spark/sql/perf/mllib/feature/Tokenizer.scala create mode 100644 src/main/scala/com/databricks/spark/sql/perf/mllib/feature/VectorAssembler.scala create mode 100644 src/main/scala/com/databricks/spark/sql/perf/mllib/feature/VectorSlicer.scala create mode 100644 src/main/scala/org/apache/spark/ml/classification/ClassificationModelBuilder.scala diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala index eceef96..80f73de 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala +++ 
b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala @@ -10,7 +10,7 @@ import org.apache.spark.{SparkConf, SparkContext} import com.databricks.spark.sql.perf._ -class MLLib(@transient sqlContext: SQLContext) +class MLLib(sqlContext: SQLContext) extends Benchmark(sqlContext) with Serializable { def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate())) diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/LinearSVC.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/LinearSVC.scala new file mode 100644 index 0000000..f3a87c6 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/LinearSVC.scala @@ -0,0 +1,45 @@ +package com.databricks.spark.sql.perf.mllib.classification + +import org.apache.spark.ml.evaluation.{Evaluator, MulticlassClassificationEvaluator} +import org.apache.spark.ml.{ModelBuilder, PipelineStage, Transformer} +import org.apache.spark.ml +import org.apache.spark.ml.linalg.Vectors + +import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import com.databricks.spark.sql.perf.mllib._ +import com.databricks.spark.sql.perf.mllib.data.DataGenerator + +object LinearSVC extends BenchmarkAlgorithm + with TestFromTraining with TrainingSetFromTransformer with ScoringWithEvaluator { + + override protected def initialData(ctx: MLBenchContext) = { + import ctx.params._ + DataGenerator.generateContinuousFeatures( + ctx.sqlContext, + numExamples, + ctx.seed(), + numPartitions, + numFeatures) + } + + override protected def trueModel(ctx: MLBenchContext): Transformer = { + val rng = ctx.newGenerator() + val coefficients = + Vectors.dense(Array.fill[Double](ctx.params.numFeatures)(2 * rng.nextDouble() - 1)) + // Small intercept to prevent some skew in the data. 
+ val intercept = 0.01 * (2 * rng.nextDouble - 1) + ModelBuilder.newLinearSVCModel(coefficients, intercept) + } + + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { + import ctx.params._ + new ml.classification.LinearSVC() + .setTol(tol) + .setMaxIter(maxIter) + .setRegParam(regParam) + } + + override protected def evaluator(ctx: MLBenchContext): Evaluator = + new MulticlassClassificationEvaluator() +} + diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/data/dataGeneration.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/data/dataGeneration.scala index 07a68a6..e0d98cf 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/data/dataGeneration.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/data/dataGeneration.scala @@ -103,6 +103,31 @@ object DataGenerator { (sql.createDataFrame(trainPruned), sql.createDataFrame(finalTest)) } + + def generateRandString( + sql: SQLContext, + numExamples: Long, + seed: Long, + numPartitions: Int, + distinctCount: Int, + dataColName: String): DataFrame = { + val rdd: RDD[String] = RandomRDDs.randomRDD(sql.sparkContext, + new RandStringGenerator(distinctCount), numExamples, numPartitions, seed) + sql.createDataFrame(rdd.map(Tuple1.apply)).toDF(dataColName) + } + + def generateDoc( + sql: SQLContext, + numExamples: Long, + seed: Long, + numPartitions: Int, + vocabSize: Int, + avgDocLength: Int, + dataColName: String): DataFrame = { + val rdd: RDD[String] = RandomRDDs.randomRDD(sql.sparkContext, + new DocGenerator(vocabSize, avgDocLength), numExamples, numPartitions, seed) + sql.createDataFrame(rdd.map(Tuple1.apply)).toDF(dataColName) + } } @@ -189,3 +214,59 @@ class GaussianMixtureDataGenerator( override def copy(): GaussianMixtureDataGenerator = new GaussianMixtureDataGenerator(numCenters, numFeatures, seed) } + +class RandStringGenerator( + distinctCount: Int) extends RandomDataGenerator[String] { + + private val rng = new java.util.Random() + + override def 
nextValue(): String = { + rng.nextInt(distinctCount).toString + } + + override def setSeed(seed: Long) { + rng.setSeed(seed) + } + + override def copy(): RandStringGenerator = new RandStringGenerator(distinctCount) +} + +class DocGenerator( + vocabSize: Int, + avgDocLength: Int, + maxDocLength: Int = 65535) extends RandomDataGenerator[String] { + + private val wordRng = new java.util.Random() + private val docLengthRng = new PoissonGenerator(avgDocLength) + + override def setSeed(seed: Long) { + wordRng.setSeed(seed) + docLengthRng.setSeed(seed) + } + + override def nextValue(): String = { + val docLength = DataGenUtil.nextPoisson(docLengthRng, v => v > 0 && v <= maxDocLength).toInt + val sb = new StringBuffer() + + var i = 0 + while (i < docLength) { + sb.append(" ") + sb.append(wordRng.nextInt(vocabSize).toString) + i += 1 + } + sb.toString + } + + override def copy(): DocGenerator = + new DocGenerator(vocabSize, avgDocLength) +} + +object DataGenUtil { + def nextPoisson(rng: PoissonGenerator, condition: Double => Boolean): Double = { + var value = 0.0 + do { + value = rng.nextValue() + } while (!condition(value)) + value + } +} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/OneHotEncoder.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/OneHotEncoder.scala new file mode 100644 index 0000000..9ad4ceb --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/OneHotEncoder.scala @@ -0,0 +1,34 @@ +package com.databricks.spark.sql.perf.mllib.feature + +import org.apache.spark.ml +import org.apache.spark.ml.linalg.Vector +import org.apache.spark.ml.PipelineStage +import org.apache.spark.sql._ + +import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import com.databricks.spark.sql.perf.mllib.data.DataGenerator +import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext, TestFromTraining} + +/** Object for testing OneHotEncoder performance */ +object 
OneHotEncoder extends BenchmarkAlgorithm with TestFromTraining with UnaryTransformer { + + override def trainingDataSet(ctx: MLBenchContext): DataFrame = { + import ctx.params._ + import ctx.sqlContext.implicits._ + + DataGenerator.generateMixedFeatures( + ctx.sqlContext, + numExamples, + ctx.seed(), + numPartitions, + Array.fill(1)(featureArity.get) + ).rdd.map { case Row(vec: Vector) => + vec(0) // extract the single generated double value for each row + }.toDF(inputCol) + } + + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { + new ml.feature.OneHotEncoder() + .setInputCol(inputCol) + } +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/StringIndexer.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/StringIndexer.scala new file mode 100644 index 0000000..852cefa --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/StringIndexer.scala @@ -0,0 +1,35 @@ +package com.databricks.spark.sql.perf.mllib.feature + +import org.apache.spark.ml +import org.apache.spark.ml.linalg.Vector +import org.apache.spark.ml.PipelineStage +import org.apache.spark.sql._ + +import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import com.databricks.spark.sql.perf.mllib.data.DataGenerator +import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext, TestFromTraining} + +/** Object for testing StringIndexer performance */ +object StringIndexer extends BenchmarkAlgorithm with TestFromTraining with UnaryTransformer { + + override def trainingDataSet(ctx: MLBenchContext): DataFrame = { + import ctx.params._ + import ctx.sqlContext.implicits._ + + DataGenerator.generateRandString(ctx.sqlContext, + numExamples, + ctx.seed(), + numPartitions, + vocabSize, + inputCol) + } + + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { + import ctx.params._ + import ctx.sqlContext.implicits._ + + new ml.feature.StringIndexer() + .setInputCol(inputCol) + 
.setHandleInvalid("skip") + } +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/Tokenizer.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/Tokenizer.scala new file mode 100644 index 0000000..aa06666 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/Tokenizer.scala @@ -0,0 +1,33 @@ +package com.databricks.spark.sql.perf.mllib.feature + +import org.apache.spark.ml +import org.apache.spark.ml.linalg.Vector +import org.apache.spark.ml.PipelineStage +import org.apache.spark.sql._ + +import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import com.databricks.spark.sql.perf.mllib.data.DataGenerator +import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext, TestFromTraining} + +/** Object for testing Tokenizer performance */ +object Tokenizer extends BenchmarkAlgorithm with TestFromTraining with UnaryTransformer { + + override def trainingDataSet(ctx: MLBenchContext): DataFrame = { + import ctx.params._ + import ctx.sqlContext.implicits._ + + DataGenerator.generateDoc( + ctx.sqlContext, + numExamples, + ctx.seed(), + numPartitions, + vocabSize, + docLength, + inputCol) + } + + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { + new ml.feature.Tokenizer() + .setInputCol(inputCol) + } +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/VectorAssembler.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/VectorAssembler.scala new file mode 100644 index 0000000..914abec --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/VectorAssembler.scala @@ -0,0 +1,49 @@ +package com.databricks.spark.sql.perf.mllib.feature + +import org.apache.spark.ml +import org.apache.spark.ml.linalg.{Vector, Vectors} +import org.apache.spark.ml.PipelineStage +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ + +import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import 
com.databricks.spark.sql.perf.mllib.data.DataGenerator +import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext, TestFromTraining} + +/** Object for testing VectorAssembler performance */ +object VectorAssembler extends BenchmarkAlgorithm with TestFromTraining { + + override def trainingDataSet(ctx: MLBenchContext): DataFrame = { + import ctx.params._ + import ctx.sqlContext.implicits._ + + var df = DataGenerator.generateContinuousFeatures( + ctx.sqlContext, + numExamples, + ctx.seed(), + numPartitions, + numFeatures * numInputCols + ) + val sliceVec = udf { (v: Vector, from: Int, until: Int) => + Vectors.dense(v.toArray.slice(from, until)) + } + for (i <- (1 to numInputCols.get)) { + val colName = s"inputCol${i.toString}" + val fromIndex = (i - 1) * numFeatures + val untilIndex = i * numFeatures + df = df.withColumn(colName, sliceVec(col("features"), lit(fromIndex), lit(untilIndex))) + } + df.drop(col("features")) + df + } + + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { + import ctx.params._ + + val inputCols = (1 to numInputCols.get) + .map(i => s"inputCol${i.toString}").toArray + + new ml.feature.VectorAssembler() + .setInputCols(inputCols) + } +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/VectorSlicer.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/VectorSlicer.scala new file mode 100644 index 0000000..77b66ca --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/feature/VectorSlicer.scala @@ -0,0 +1,35 @@ +package com.databricks.spark.sql.perf.mllib.feature + +import org.apache.spark.ml +import org.apache.spark.ml.PipelineStage +import org.apache.spark.sql._ + +import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import com.databricks.spark.sql.perf.mllib.data.DataGenerator +import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext, TestFromTraining} + +/** Object for testing VectorSlicer performance */ +object 
VectorSlicer extends BenchmarkAlgorithm with TestFromTraining { + + override def trainingDataSet(ctx: MLBenchContext): DataFrame = { + import ctx.params._ + + DataGenerator.generateContinuousFeatures( + ctx.sqlContext, + numExamples, + ctx.seed(), + numPartitions, + numFeatures + ) + } + + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { + import ctx.params._ + + val indices = (0 until numFeatures by 2).toArray + + new ml.feature.VectorSlicer() + .setInputCol("features") + .setIndices(indices) + } +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/results.scala b/src/main/scala/com/databricks/spark/sql/perf/results.scala index 11cfcde..04898a0 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/results.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/results.scala @@ -123,11 +123,13 @@ class MLParams( val docLength: Option[Int] = None, val elasticNetParam: Option[Double] = None, val family: Option[String] = None, + val featureArity: Option[Int] = None, val k: Option[Int] = None, val link: Option[String] = None, val maxIter: Option[Int] = None, val numClasses: Option[Int] = None, val numFeatures: Option[Int] = None, + val numInputCols: Option[Int] = None, val numItems: Option[Int] = None, val numUsers: Option[Int] = None, val optimizer: Option[String] = None, @@ -161,11 +163,13 @@ class MLParams( docLength: Option[Int] = docLength, elasticNetParam: Option[Double] = elasticNetParam, family: Option[String] = family, + featureArity: Option[Int] = featureArity, k: Option[Int] = k, link: Option[String] = link, maxIter: Option[Int] = maxIter, numClasses: Option[Int] = numClasses, numFeatures: Option[Int] = numFeatures, + numInputCols: Option[Int] = numInputCols, numItems: Option[Int] = numItems, numUsers: Option[Int] = numUsers, vocabSize: Option[Int] = vocabSize, @@ -177,8 +181,8 @@ class MLParams( new MLParams(randomSeed = randomSeed, numExamples = numExamples, numTestExamples = numTestExamples, numPartitions = numPartitions, 
bucketizerNumBuckets = bucketizerNumBuckets, depth = depth, docLength = docLength, - elasticNetParam = elasticNetParam, family = family, k = k, link = link, maxIter = maxIter, - numClasses = numClasses, numFeatures = numFeatures, + elasticNetParam = elasticNetParam, family = family, featureArity = featureArity, k = k, link = link, maxIter = maxIter, + numClasses = numClasses, numFeatures = numFeatures, numInputCols = numInputCols, numItems = numItems, numUsers = numUsers, optimizer = optimizer, regParam = regParam, rank = rank, smoothing = smoothing, tol = tol, vocabSize = vocabSize) } diff --git a/src/main/scala/configs/mllib-small.yaml b/src/main/scala/configs/mllib-small.yaml index c0bb650..558ccf0 100644 --- a/src/main/scala/configs/mllib-small.yaml +++ b/src/main/scala/configs/mllib-small.yaml @@ -90,3 +90,32 @@ benchmarks: smoothing: 1.0 numClasses: 10 numFeatures: [10] + - name: feature.OneHotEncoder + params: + numExamples: 100 + featureArity: 10 + - name: feature.StringIndexer + params: + numExamples: 100 + vocabSize: 10 + - name: feature.Tokenizer + params: + numExamples: 100 + vocabSize: 10 + docLength: 10 + - name: feature.VectorAssembler + params: + numExamples: 100 + numInputCols: 5 + numFeatures: 10 + - name: feature.VectorSlicer + params: + numExamples: 100 + numFeatures: 10 + - name: classification.LinearSVC + params: + numExamples: 100 + numFeatures: 10 + regParam: 0.1 + tol: 0.001 + maxIter: 10 \ No newline at end of file diff --git a/src/main/scala/org/apache/spark/ml/ModelBuilder.scala b/src/main/scala/org/apache/spark/ml/ModelBuilder.scala index b45b68a..7ba8eb4 100644 --- a/src/main/scala/org/apache/spark/ml/ModelBuilder.scala +++ b/src/main/scala/org/apache/spark/ml/ModelBuilder.scala @@ -1,6 +1,6 @@ package org.apache.spark.ml -import org.apache.spark.ml.classification.{DecisionTreeClassificationModel, LogisticRegressionModel, NaiveBayesModel} +import org.apache.spark.ml.classification.{ClassificationModelBuilder, 
DecisionTreeClassificationModel, LinearSVCModel, LogisticRegressionModel, NaiveBayesModel} import org.apache.spark.ml.linalg.{Matrix, Vector} import org.apache.spark.ml.regression.{DecisionTreeRegressionModel, GeneralizedLinearRegressionModel, LinearRegressionModel} import org.apache.spark.ml.tree._ @@ -56,6 +56,12 @@ object ModelBuilder { val model = new NaiveBayesModel("naivebayes-uid", pi, theta) model.set(model.modelType, "multinomial") } + + def newLinearSVCModel( + coefficients: Vector, + intercept: Double): LinearSVCModel = { + ClassificationModelBuilder.newLinearSVCModel(coefficients, intercept) + } } /** diff --git a/src/main/scala/org/apache/spark/ml/classification/ClassificationModelBuilder.scala b/src/main/scala/org/apache/spark/ml/classification/ClassificationModelBuilder.scala new file mode 100644 index 0000000..485b883 --- /dev/null +++ b/src/main/scala/org/apache/spark/ml/classification/ClassificationModelBuilder.scala @@ -0,0 +1,13 @@ +package org.apache.spark.ml.classification + +import org.apache.spark.ml.linalg.{Matrix, Vector} + + +object ClassificationModelBuilder { + + def newLinearSVCModel( + coefficients: Vector, + intercept: Double): LinearSVCModel = { + new LinearSVCModel("linearSVC", coefficients, intercept) + } +}