Add benchmark for LinearSVC/OneHotEncoder/VectorSlicer/VectorAssembler/StringIndexer/Tokenizer (#112)

Add benchmark for:

LinearSVC
OneHotEncoder
VectorSlicer
VectorAssembler
StringIndexer
Tokenizer
This commit is contained in:
WeichenXu 2017-09-01 04:56:43 +08:00 committed by jkbradley
parent 737a1bc355
commit 6ec83fd0f7
12 changed files with 368 additions and 4 deletions

View File

@ -10,7 +10,7 @@ import org.apache.spark.{SparkConf, SparkContext}
import com.databricks.spark.sql.perf._
class MLLib(@transient sqlContext: SQLContext)
class MLLib(sqlContext: SQLContext)
extends Benchmark(sqlContext) with Serializable {
def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate()))

View File

@ -0,0 +1,45 @@
package com.databricks.spark.sql.perf.mllib.classification
import org.apache.spark.ml.evaluation.{Evaluator, MulticlassClassificationEvaluator}
import org.apache.spark.ml.{ModelBuilder, PipelineStage, Transformer}
import org.apache.spark.ml
import org.apache.spark.ml.linalg.Vectors
import com.databricks.spark.sql.perf.mllib.OptionImplicits._
import com.databricks.spark.sql.perf.mllib._
import com.databricks.spark.sql.perf.mllib.data.DataGenerator
/**
 * Benchmark for LinearSVC: generates continuous features, labels them with a
 * randomly drawn ground-truth linear model, then times fitting of
 * ml.classification.LinearSVC and scores with a multiclass evaluator.
 */
object LinearSVC extends BenchmarkAlgorithm
  with TestFromTraining with TrainingSetFromTransformer with ScoringWithEvaluator {

  override protected def initialData(ctx: MLBenchContext) = {
    import ctx.params._
    DataGenerator.generateContinuousFeatures(
      ctx.sqlContext,
      numExamples,
      ctx.seed(),
      numPartitions,
      numFeatures)
  }

  override protected def trueModel(ctx: MLBenchContext): Transformer = {
    val rng = ctx.newGenerator()
    // Coefficients drawn uniformly from [-1, 1).
    val coefficients =
      Vectors.dense(Array.fill[Double](ctx.params.numFeatures)(2 * rng.nextDouble() - 1))
    // Small intercept to prevent some skew in the data.
    // (parens added to nextDouble() for consistency with the call above;
    // side-effecting 0-arity methods keep their parentheses)
    val intercept = 0.01 * (2 * rng.nextDouble() - 1)
    ModelBuilder.newLinearSVCModel(coefficients, intercept)
  }

  override def getPipelineStage(ctx: MLBenchContext): PipelineStage = {
    import ctx.params._
    new ml.classification.LinearSVC()
      .setTol(tol)
      .setMaxIter(maxIter)
      .setRegParam(regParam)
  }

  override protected def evaluator(ctx: MLBenchContext): Evaluator =
    new MulticlassClassificationEvaluator()
}

View File

@ -103,6 +103,31 @@ object DataGenerator {
(sql.createDataFrame(trainPruned), sql.createDataFrame(finalTest))
}
/**
 * Generates a single-column DataFrame of random numeric strings.
 *
 * @param distinctCount number of distinct values ("0" .. (distinctCount - 1).toString,
 *                      per RandStringGenerator)
 * @param dataColName   name of the output string column
 */
def generateRandString(
    sql: SQLContext,
    numExamples: Long,
    seed: Long,
    numPartitions: Int,
    distinctCount: Int,
    dataColName: String): DataFrame = {
  val strings: RDD[String] = RandomRDDs.randomRDD(
    sql.sparkContext, new RandStringGenerator(distinctCount), numExamples, numPartitions, seed)
  sql.createDataFrame(strings.map(Tuple1.apply)).toDF(dataColName)
}
/**
 * Generates a single-column DataFrame of synthetic "documents": space-separated
 * integer word ids produced by DocGenerator.
 *
 * @param vocabSize    number of distinct word ids
 * @param avgDocLength mean document length (Poisson parameter of DocGenerator)
 * @param dataColName  name of the output string column
 */
def generateDoc(
    sql: SQLContext,
    numExamples: Long,
    seed: Long,
    numPartitions: Int,
    vocabSize: Int,
    avgDocLength: Int,
    dataColName: String): DataFrame = {
  val docs: RDD[String] = RandomRDDs.randomRDD(
    sql.sparkContext, new DocGenerator(vocabSize, avgDocLength), numExamples, numPartitions, seed)
  sql.createDataFrame(docs.map(Tuple1.apply)).toDF(dataColName)
}
}
@ -189,3 +214,59 @@ class GaussianMixtureDataGenerator(
// Fresh generator built from the same constructor arguments (including the same
// seed value); note the copy's internal state is re-initialized, not shared.
override def copy(): GaussianMixtureDataGenerator =
  new GaussianMixtureDataGenerator(numCenters, numFeatures, seed)
}
/**
 * Random data generator producing numeric strings drawn uniformly from
 * "0" .. (distinctCount - 1).toString. Call setSeed before sampling for
 * reproducible output.
 */
class RandStringGenerator(
    distinctCount: Int) extends RandomDataGenerator[String] {

  private val random = new java.util.Random()

  override def nextValue(): String = String.valueOf(random.nextInt(distinctCount))

  override def setSeed(seed: Long) {
    random.setSeed(seed)
  }

  override def copy(): RandStringGenerator = new RandStringGenerator(distinctCount)
}
/**
 * Random data generator producing space-separated "documents" of integer word ids
 * in [0, vocabSize). Document length is Poisson-distributed with mean avgDocLength,
 * rejection-sampled into the range (0, maxDocLength].
 *
 * @param vocabSize    number of distinct word ids
 * @param avgDocLength mean document length (Poisson parameter)
 * @param maxDocLength hard upper bound on document length
 */
class DocGenerator(
    vocabSize: Int,
    avgDocLength: Int,
    maxDocLength: Int = 65535) extends RandomDataGenerator[String] {

  private val wordRng = new java.util.Random()
  private val docLengthRng = new PoissonGenerator(avgDocLength)

  override def setSeed(seed: Long) {
    wordRng.setSeed(seed)
    docLengthRng.setSeed(seed)
  }

  override def nextValue(): String = {
    val docLength = DataGenUtil.nextPoisson(docLengthRng, v => v > 0 && v <= maxDocLength).toInt
    val sb = new StringBuilder
    var i = 0
    while (i < docLength) {
      // Bug fix: separator only BETWEEN words. The original prepended a space to
      // every word, giving the document a leading space — which whitespace
      // tokenization turns into a spurious empty first token.
      if (i > 0) sb.append(' ')
      sb.append(wordRng.nextInt(vocabSize))
      i += 1
    }
    sb.toString
  }

  // Bug fix: propagate maxDocLength. The original omitted it, so copies silently
  // reverted to the default (65535) even when a custom bound was configured.
  override def copy(): DocGenerator =
    new DocGenerator(vocabSize, avgDocLength, maxDocLength)
}
/** Shared helpers for the random data generators in this file. */
object DataGenUtil {

  /**
   * Samples repeatedly from `rng` until `condition` accepts a draw, returning the
   * first accepted value. Like the original do/while loop, this never terminates
   * if `condition` can never be satisfied.
   */
  def nextPoisson(rng: PoissonGenerator, condition: Double => Boolean): Double =
    Iterator.continually(rng.nextValue()).dropWhile(v => !condition(v)).next()
}

View File

@ -0,0 +1,34 @@
package com.databricks.spark.sql.perf.mllib.feature
import org.apache.spark.ml
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.PipelineStage
import org.apache.spark.sql._
import com.databricks.spark.sql.perf.mllib.OptionImplicits._
import com.databricks.spark.sql.perf.mllib.data.DataGenerator
import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext, TestFromTraining}
/** Object for testing OneHotEncoder performance */
/** Object for testing OneHotEncoder performance */
object OneHotEncoder extends BenchmarkAlgorithm with TestFromTraining with UnaryTransformer {

  override def trainingDataSet(ctx: MLBenchContext): DataFrame = {
    import ctx.params._
    import ctx.sqlContext.implicits._
    // One categorical feature with `featureArity` distinct values, generated as a
    // one-element vector per row.
    val categorical = DataGenerator.generateMixedFeatures(
      ctx.sqlContext,
      numExamples,
      ctx.seed(),
      numPartitions,
      Array(featureArity.get))
    // Unwrap each single-element vector into a bare double column for the encoder.
    categorical.rdd
      .map { case Row(vec: Vector) => vec(0) }
      .toDF(inputCol)
  }

  override def getPipelineStage(ctx: MLBenchContext): PipelineStage = {
    new ml.feature.OneHotEncoder()
      .setInputCol(inputCol)
  }
}

View File

@ -0,0 +1,35 @@
package com.databricks.spark.sql.perf.mllib.feature
import org.apache.spark.ml
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.PipelineStage
import org.apache.spark.sql._
import com.databricks.spark.sql.perf.mllib.OptionImplicits._
import com.databricks.spark.sql.perf.mllib.data.DataGenerator
import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext, TestFromTraining}
/** Object for testing StringIndexer performance */
/** Object for testing StringIndexer performance */
object StringIndexer extends BenchmarkAlgorithm with TestFromTraining with UnaryTransformer {

  override def trainingDataSet(ctx: MLBenchContext): DataFrame = {
    import ctx.params._
    // generateRandString already returns a DataFrame, so the original's unused
    // `import ctx.sqlContext.implicits._` was removed.
    DataGenerator.generateRandString(ctx.sqlContext,
      numExamples,
      ctx.seed(),
      numPartitions,
      vocabSize,
      inputCol)
  }

  override def getPipelineStage(ctx: MLBenchContext): PipelineStage = {
    // No params or implicits are read here; the original's unused
    // `import ctx.params._` and `import ctx.sqlContext.implicits._` were removed.
    new ml.feature.StringIndexer()
      .setInputCol(inputCol)
      .setHandleInvalid("skip")
  }
}

View File

@ -0,0 +1,33 @@
package com.databricks.spark.sql.perf.mllib.feature
import org.apache.spark.ml
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.PipelineStage
import org.apache.spark.sql._
import com.databricks.spark.sql.perf.mllib.OptionImplicits._
import com.databricks.spark.sql.perf.mllib.data.DataGenerator
import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext, TestFromTraining}
/** Object for testing Tokenizer performance */
/** Object for testing Tokenizer performance */
object Tokenizer extends BenchmarkAlgorithm with TestFromTraining with UnaryTransformer {

  override def trainingDataSet(ctx: MLBenchContext): DataFrame = {
    import ctx.params._
    // generateDoc already returns a DataFrame with documents in `inputCol`, so
    // the original's unused `import ctx.sqlContext.implicits._` was removed.
    DataGenerator.generateDoc(
      ctx.sqlContext,
      numExamples,
      ctx.seed(),
      numPartitions,
      vocabSize,
      docLength,
      inputCol)
  }

  override def getPipelineStage(ctx: MLBenchContext): PipelineStage = {
    new ml.feature.Tokenizer()
      .setInputCol(inputCol)
  }
}

View File

@ -0,0 +1,49 @@
package com.databricks.spark.sql.perf.mllib.feature
import org.apache.spark.ml
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.PipelineStage
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import com.databricks.spark.sql.perf.mllib.OptionImplicits._
import com.databricks.spark.sql.perf.mllib.data.DataGenerator
import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext, TestFromTraining}
/** Object for testing VectorAssembler performance */
/** Object for testing VectorAssembler performance */
object VectorAssembler extends BenchmarkAlgorithm with TestFromTraining {

  override def trainingDataSet(ctx: MLBenchContext): DataFrame = {
    import ctx.params._
    // Generate one wide vector column, then slice it into numInputCols separate
    // vector columns of numFeatures each for VectorAssembler to reassemble.
    val base = DataGenerator.generateContinuousFeatures(
      ctx.sqlContext,
      numExamples,
      ctx.seed(),
      numPartitions,
      numFeatures * numInputCols)
    val sliceVec = udf { (v: Vector, from: Int, until: Int) =>
      Vectors.dense(v.toArray.slice(from, until))
    }
    val withInputCols = (1 to numInputCols.get).foldLeft(base) { (df, i) =>
      df.withColumn(
        s"inputCol$i",
        sliceVec(col("features"), lit((i - 1) * numFeatures), lit(i * numFeatures)))
    }
    // Bug fix: the original called `df.drop(col("features"))` and discarded the
    // result — DataFrames are immutable and drop() returns a NEW DataFrame — so
    // the wide "features" column was never actually removed. Return the dropped
    // frame instead.
    withInputCols.drop(col("features"))
  }

  override def getPipelineStage(ctx: MLBenchContext): PipelineStage = {
    import ctx.params._
    val inputCols = (1 to numInputCols.get).map(i => s"inputCol$i").toArray
    new ml.feature.VectorAssembler()
      .setInputCols(inputCols)
  }
}

View File

@ -0,0 +1,35 @@
package com.databricks.spark.sql.perf.mllib.feature
import org.apache.spark.ml
import org.apache.spark.ml.PipelineStage
import org.apache.spark.sql._
import com.databricks.spark.sql.perf.mllib.OptionImplicits._
import com.databricks.spark.sql.perf.mllib.data.DataGenerator
import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext, TestFromTraining}
/** Object for testing VectorSlicer performance */
/** Object for testing VectorSlicer performance */
object VectorSlicer extends BenchmarkAlgorithm with TestFromTraining {

  override def trainingDataSet(ctx: MLBenchContext): DataFrame = {
    import ctx.params._
    DataGenerator.generateContinuousFeatures(
      ctx.sqlContext, numExamples, ctx.seed(), numPartitions, numFeatures)
  }

  override def getPipelineStage(ctx: MLBenchContext): PipelineStage = {
    import ctx.params._
    // Slice out every other feature (the even indices 0, 2, 4, ...).
    val evenIndices = Range(0, numFeatures, 2).toArray
    new ml.feature.VectorSlicer()
      .setInputCol("features")
      .setIndices(evenIndices)
  }
}

View File

@ -123,11 +123,13 @@ class MLParams(
val docLength: Option[Int] = None,
val elasticNetParam: Option[Double] = None,
val family: Option[String] = None,
val featureArity: Option[Int] = None,
val k: Option[Int] = None,
val link: Option[String] = None,
val maxIter: Option[Int] = None,
val numClasses: Option[Int] = None,
val numFeatures: Option[Int] = None,
val numInputCols: Option[Int] = None,
val numItems: Option[Int] = None,
val numUsers: Option[Int] = None,
val optimizer: Option[String] = None,
@ -161,11 +163,13 @@ class MLParams(
docLength: Option[Int] = docLength,
elasticNetParam: Option[Double] = elasticNetParam,
family: Option[String] = family,
featureArity: Option[Int] = featureArity,
k: Option[Int] = k,
link: Option[String] = link,
maxIter: Option[Int] = maxIter,
numClasses: Option[Int] = numClasses,
numFeatures: Option[Int] = numFeatures,
numInputCols: Option[Int] = numInputCols,
numItems: Option[Int] = numItems,
numUsers: Option[Int] = numUsers,
vocabSize: Option[Int] = vocabSize,
@ -177,8 +181,8 @@ class MLParams(
new MLParams(randomSeed = randomSeed, numExamples = numExamples,
numTestExamples = numTestExamples, numPartitions = numPartitions,
bucketizerNumBuckets = bucketizerNumBuckets, depth = depth, docLength = docLength,
elasticNetParam = elasticNetParam, family = family, k = k, link = link, maxIter = maxIter,
numClasses = numClasses, numFeatures = numFeatures,
elasticNetParam = elasticNetParam, family = family, featureArity = featureArity, k = k, link = link, maxIter = maxIter,
numClasses = numClasses, numFeatures = numFeatures, numInputCols = numInputCols,
numItems = numItems, numUsers = numUsers, optimizer = optimizer, regParam = regParam,
rank = rank, smoothing = smoothing, tol = tol, vocabSize = vocabSize)
}

View File

@ -90,3 +90,32 @@ benchmarks:
smoothing: 1.0
numClasses: 10
numFeatures: [10]
- name: feature.OneHotEncoder
params:
numExamples: 100
featureArity: 10
- name: feature.StringIndexer
params:
numExamples: 100
vocabSize: 10
- name: feature.Tokenizer
params:
numExamples: 100
vocabSize: 10
docLength: 10
- name: feature.VectorAssembler
params:
numExamples: 100
numInputCols: 5
numFeatures: 10
- name: feature.VectorSlicer
params:
numExamples: 100
numFeatures: 10
- name: classification.LinearSVC
params:
numExamples: 100
numFeatures: 10
regParam: 0.1
tol: 0.001
maxIter: 10

View File

@ -1,6 +1,6 @@
package org.apache.spark.ml
import org.apache.spark.ml.classification.{DecisionTreeClassificationModel, LogisticRegressionModel, NaiveBayesModel}
import org.apache.spark.ml.classification.{ClassificationModelBuilder, DecisionTreeClassificationModel, LinearSVCModel, LogisticRegressionModel, NaiveBayesModel}
import org.apache.spark.ml.linalg.{Matrix, Vector}
import org.apache.spark.ml.regression.{DecisionTreeRegressionModel, GeneralizedLinearRegressionModel, LinearRegressionModel}
import org.apache.spark.ml.tree._
@ -56,6 +56,12 @@ object ModelBuilder {
val model = new NaiveBayesModel("naivebayes-uid", pi, theta)
model.set(model.modelType, "multinomial")
}
/**
 * Builds a LinearSVCModel with the given coefficient vector and intercept.
 * Delegates to ClassificationModelBuilder, which sits inside Spark's
 * org.apache.spark.ml.classification package — presumably to reach the model's
 * non-public constructor (NOTE(review): confirm against the Spark version used).
 */
def newLinearSVCModel(
    coefficients: Vector,
    intercept: Double): LinearSVCModel = {
  ClassificationModelBuilder.newLinearSVCModel(coefficients, intercept)
}
}
/**

View File

@ -0,0 +1,13 @@
package org.apache.spark.ml.classification
import org.apache.spark.ml.linalg.{Matrix, Vector}
/**
 * Helper for constructing classification models directly from parameter values.
 * Declared inside org.apache.spark.ml.classification — presumably so it can call
 * the LinearSVCModel constructor, which is not public API (TODO confirm this holds
 * across Spark versions).
 */
object ClassificationModelBuilder {

  /**
   * Builds a LinearSVCModel with a fixed uid ("linearSVC"), the given coefficient
   * vector, and the given intercept.
   */
  def newLinearSVCModel(
      coefficients: Vector,
      intercept: Double): LinearSVCModel = {
    new LinearSVCModel("linearSVC", coefficients, intercept)
  }
}