add benchmark for FPGrowth (#113)

Note:
Add an `ItemSetGenerator` class, using the following algorithm:

1. Create P = `numItems` items (integers 0 to P-1).
2. Generate `numExamples` rows, where each row (an itemset) is built as follows:
  2.1 Choose the size of the itemset from a Poisson distribution.
  2.2 Generate `size - 2` items by drawing integers from a Poisson distribution, rejecting duplicates as needed.
  2.3 Add 2 derived items in order to plant actual association rules in the data (see the worked example below):
    2.3.1 For each itemset, take the first item and add the new item (firstItem + P / 2) % P to the set.
    2.3.2 For each itemset, take the first 2 items, add them together modulo P, and add the resulting item (firstItem + secondItem) % P to the set.
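For example, with P = 6, every row whose first two base items are 1 and 2 also contains (1 + 6/2) % 6 = 4 and (1 + 2) % 6 = 3, so the rules {1} => {4} and {1, 2} => {3} hold deterministically. A minimal sketch of the derivation (the values here are illustrative, not taken from the benchmark config):

    val P = 6                               // numItems
    val base = Seq(1, 2, 5)                 // base items drawn in step 2.2
    val newItem1 = (base(0) + P / 2) % P    // (1 + 3) % 6 = 4
    val newItem2 = (base(0) + base(1)) % P  // (1 + 2) % 6 = 3
    // final itemset: 1, 2, 5, 4, 3
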
WeichenXu 2017-09-05 01:48:05 +08:00 committed by jkbradley
parent bcda8fc1e5
commit f08bf31d18
5 changed files with 116 additions and 4 deletions


@@ -0,0 +1,59 @@
package com.databricks.spark.sql.perf.mllib.data

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.mllib.random.{PoissonGenerator, RandomDataGenerator}

class ItemSetGenerator(
    val numItems: Int,
    val avgItemSetSize: Int)
  extends RandomDataGenerator[Array[String]] {

  assert(avgItemSetSize > 2)
  assert(numItems > 2)

  private val rng = new java.util.Random()
  private val itemSetSizeRng = new PoissonGenerator(avgItemSetSize - 2)
  private val itemRng = new PoissonGenerator(numItems / 2.0)

  override def setSeed(seed: Long) {
    rng.setSeed(seed)
    itemSetSizeRng.setSeed(seed)
    itemRng.setSeed(seed)
  }

  override def nextValue(): Array[String] = {
    // 1. generate size of itemset
    val size = DataGenUtil.nextPoisson(itemSetSizeRng, v => v >= 1 && v <= numItems).toInt

    val arrayBuff = new ArrayBuffer[Int](size + 2)

    // 2. generate items in the itemset
    var i = 0
    while (i < size) {
      val nextVal = DataGenUtil.nextPoisson(itemRng, (item: Double) => {
        item >= 0 && item < numItems && !arrayBuff.contains(item)
      }).toInt
      arrayBuff.append(nextVal)
      i += 1
    }

    // 3. generate association rules by adding two computed items
    // 3.1 add a new item = (firstItem + numItems / 2) % numItems
    val newItem1 = (arrayBuff(0) + numItems / 2) % numItems
    if (!arrayBuff.contains(newItem1)) {
      arrayBuff.append(newItem1)
    }
    // 3.2 add a new item = (firstItem + secondItem) % numItems
    if (arrayBuff.size >= 2) {
      val newItem2 = (arrayBuff(0) + arrayBuff(1)) % numItems
      if (!arrayBuff.contains(newItem2)) {
        arrayBuff.append(newItem2)
      }
    }
    arrayBuff.map(_.toString).toArray
  }

  override def copy(): ItemSetGenerator = new ItemSetGenerator(numItems, avgItemSetSize)
}
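
The generator can also be driven directly, outside the RandomRDDs wrapper added in the next file. A minimal sketch, assuming only the class above (parameter values are illustrative):

    val gen = new ItemSetGenerator(numItems = 100, avgItemSetSize = 10)
    gen.setSeed(42L)
    val itemSet: Array[String] = gen.nextValue()  // one itemset, items encoded as strings
    // repeated nextValue() calls yield further rows; copy() clones the generator config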


@@ -5,8 +5,7 @@ import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.recommendation.ALS.Rating
import org.apache.spark.mllib.random._
import org.apache.spark.rdd.{PairRDDFunctions, RDD}
-import org.apache.spark.sql.{SQLContext, DataFrame}
+import org.apache.spark.sql.{DataFrame, SQLContext}
object DataGenerator {
@@ -128,6 +127,22 @@ object DataGenerator {
      new DocGenerator(vocabSize, avgDocLength), numExamples, numPartitions, seed)
    sql.createDataFrame(rdd.map(Tuple1.apply)).toDF(dataColName)
  }

  def generateItemSet(
      sql: SQLContext,
      numExamples: Long,
      seed: Long,
      numPartitions: Int,
      numItems: Int,
      avgItemSetSize: Int): DataFrame = {
    val rdd: RDD[Array[String]] = RandomRDDs.randomRDD(
      sql.sparkContext,
      new ItemSetGenerator(numItems, avgItemSetSize),
      numExamples,
      numPartitions,
      seed)
    sql.createDataFrame(rdd.map(Tuple1.apply)).toDF("items")
  }
}
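
A usage sketch of the new method; `sqlContext` stands for any live SQLContext, and the parameter values are illustrative:

    val training = DataGenerator.generateItemSet(
      sqlContext,
      numExamples = 1000L,
      seed = 42L,
      numPartitions = 4,
      numItems = 100,
      avgItemSetSize = 10)
    training.printSchema()  // a single array-of-string column named "items"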


@@ -0,0 +1,31 @@
package com.databricks.spark.sql.perf.mllib.fpm

import org.apache.spark.ml
import org.apache.spark.ml.PipelineStage
import org.apache.spark.sql.DataFrame

import com.databricks.spark.sql.perf.mllib._
import com.databricks.spark.sql.perf.mllib.OptionImplicits._
import com.databricks.spark.sql.perf.mllib.data.DataGenerator

/** Object containing methods used in performance tests for FPGrowth */
object FPGrowth extends BenchmarkAlgorithm with TestFromTraining {

  def trainingDataSet(ctx: MLBenchContext): DataFrame = {
    import ctx.params._
    DataGenerator.generateItemSet(
      ctx.sqlContext,
      numExamples,
      ctx.seed(),
      numPartitions,
      numItems,
      itemSetSize)
  }

  override def getPipelineStage(ctx: MLBenchContext): PipelineStage = {
    new ml.fpm.FPGrowth()
      .setItemsCol("items")
  }
}
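
The benchmark harness drives this pipeline stage itself; for standalone experimentation, a sketch of fitting the stage on generated data follows (the minSupport/minConfidence thresholds are illustrative additions, not set by the benchmark):

    // assumes `training` is a DataFrame from DataGenerator.generateItemSet (see above)
    val model = new ml.fpm.FPGrowth()
      .setItemsCol("items")
      .setMinSupport(0.1)
      .setMinConfidence(0.5)
      .fit(training)
    model.freqItemsets.show()      // frequent itemsets with their frequencies
    model.associationRules.show()  // should recover the rules planted by the generator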


@@ -124,6 +124,7 @@ class MLParams(
    val elasticNetParam: Option[Double] = None,
    val family: Option[String] = None,
    val featureArity: Option[Int] = None,
    val itemSetSize: Option[Int] = None,
    val k: Option[Int] = None,
    val link: Option[String] = None,
    val maxIter: Option[Int] = None,
@@ -164,6 +165,7 @@ class MLParams(
    elasticNetParam: Option[Double] = elasticNetParam,
    family: Option[String] = family,
    featureArity: Option[Int] = featureArity,
    itemSetSize: Option[Int] = itemSetSize,
    k: Option[Int] = k,
    link: Option[String] = link,
    maxIter: Option[Int] = maxIter,
@@ -181,8 +183,8 @@ class MLParams(
    new MLParams(randomSeed = randomSeed, numExamples = numExamples,
      numTestExamples = numTestExamples, numPartitions = numPartitions,
      bucketizerNumBuckets = bucketizerNumBuckets, depth = depth, docLength = docLength,
-     elasticNetParam = elasticNetParam, family = family, featureArity = featureArity, k = k,
-     link = link, maxIter = maxIter,
+     elasticNetParam = elasticNetParam, family = family, featureArity = featureArity,
+     itemSetSize = itemSetSize, k = k, link = link, maxIter = maxIter,
      numClasses = numClasses, numFeatures = numFeatures, numInputCols = numInputCols,
      numItems = numItems, numUsers = numUsers, optimizer = optimizer, regParam = regParam,
      rank = rank, smoothing = smoothing, tol = tol, vocabSize = vocabSize)


@@ -146,3 +146,8 @@ benchmarks:
    depth: 3
    numFeatures: 5
    maxIter: 3
- name: fpm.FPGrowth
  params:
    numExamples: 200
    numItems: 6
    itemSetSize: 3