From f08bf31d18e4ce9011f900fce30d5209be171223 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Tue, 5 Sep 2017 01:48:05 +0800 Subject: [PATCH] add benchmark for FPGrowth (#113) Note: Add an `ItemSetGenerator` class, using the following algorithm: 1. Create P=`numItems` items (integers 0 to P-1) 2. Generate `numExamples` rows, where each row (an itemset) is selected as follows: 2.1 Choose the size of the itemset from a Poisson distribution 2.2 Generate `size - 2` items by choosing integers from a Poisson distribution. Eliminate duplicates as needed. 2.3 Add 2 new items in order to create actual association rules. 2.3.1 For each itemset, pick the first item, and compute a new item = (firstItem + P / 2) % P, add new item to the set. 2.3.2 For each itemset, pick the first 2 items (integers) and add them together (modulo P) to compute a new item to add to the set. --- .../perf/mllib/data/ItemSetGenerator.scala | 59 +++++++++++++++++++ .../sql/perf/mllib/data/dataGeneration.scala | 19 +++++- .../spark/sql/perf/mllib/fpm/FPGrowth.scala | 31 ++++++++++ .../databricks/spark/sql/perf/results.scala | 6 +- src/main/scala/configs/mllib-small.yaml | 5 ++ 5 files changed, 116 insertions(+), 4 deletions(-) create mode 100644 src/main/scala/com/databricks/spark/sql/perf/mllib/data/ItemSetGenerator.scala create mode 100644 src/main/scala/com/databricks/spark/sql/perf/mllib/fpm/FPGrowth.scala diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/data/ItemSetGenerator.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/data/ItemSetGenerator.scala new file mode 100644 index 0000000..ec47b87 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/data/ItemSetGenerator.scala @@ -0,0 +1,59 @@ +package com.databricks.spark.sql.perf.mllib.data + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.mllib.random.{PoissonGenerator, RandomDataGenerator} + +class ItemSetGenerator( + val numItems: Int, + val avgItemSetSize: Int) + extends 
RandomDataGenerator[Array[String]] { + + assert(avgItemSetSize > 2) + assert(numItems > 2) + + private val rng = new java.util.Random() + private val itemSetSizeRng = new PoissonGenerator(avgItemSetSize - 2) + private val itemRng = new PoissonGenerator(numItems / 2.0) + + override def setSeed(seed: Long) { + rng.setSeed(seed) + itemSetSizeRng.setSeed(seed) + itemRng.setSeed(seed) + } + + override def nextValue(): Array[String] = { + // 1. generate size of itemset + val size = DataGenUtil.nextPoisson(itemSetSizeRng, v => v >= 1 && v <= numItems).toInt + val arrayBuff = new ArrayBuffer[Int](size + 2) + + // 2. generate items in the itemset + var i = 0 + while (i < size) { + val nextVal = DataGenUtil.nextPoisson(itemRng, (item: Double) => { + item >= 0 && item < numItems && !arrayBuff.contains(item) + }).toInt + arrayBuff.append(nextVal) + i += 1 + } + + // 3 generate association rules by adding two computed items + + // 3.1 add a new item = (firstItem + numItems / 2) % numItems + val newItem1 = (arrayBuff(0) + numItems / 2) % numItems + if (!arrayBuff.contains(newItem1)) { + arrayBuff.append(newItem1) + } + // 3.2 add a new item = (firstItem + secondItem) % numItems + if (arrayBuff.size >= 2) { + val newItem2 = (arrayBuff(0) + arrayBuff(1)) % numItems + if (!arrayBuff.contains(newItem2)) { + arrayBuff.append(newItem2) + } + } + arrayBuff.map(_.toString).toArray + } + + override def copy(): ItemSetGenerator + = new ItemSetGenerator(numItems, avgItemSetSize) +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/data/dataGeneration.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/data/dataGeneration.scala index e0d98cf..d283815 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/data/dataGeneration.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/data/dataGeneration.scala @@ -5,8 +5,7 @@ import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.recommendation.ALS.Rating import org.apache.spark.mllib.random._ 
import org.apache.spark.rdd.{PairRDDFunctions, RDD} -import org.apache.spark.sql.{SQLContext, DataFrame} - +import org.apache.spark.sql.{DataFrame, SQLContext} object DataGenerator { @@ -128,6 +127,22 @@ object DataGenerator { new DocGenerator(vocabSize, avgDocLength), numExamples, numPartitions, seed) sql.createDataFrame(rdd.map(Tuple1.apply)).toDF(dataColName) } + + def generateItemSet( + sql: SQLContext, + numExamples: Long, + seed: Long, + numPartitions: Int, + numItems: Int, + avgItemSetSize: Int): DataFrame = { + val rdd: RDD[Array[String]] = RandomRDDs.randomRDD( + sql.sparkContext, + new ItemSetGenerator(numItems, avgItemSetSize), + numExamples, + numPartitions, + seed) + sql.createDataFrame(rdd.map(Tuple1.apply)).toDF("items") + } } diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/fpm/FPGrowth.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/fpm/FPGrowth.scala new file mode 100644 index 0000000..35cf8a7 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/fpm/FPGrowth.scala @@ -0,0 +1,31 @@ +package com.databricks.spark.sql.perf.mllib.fpm + +import org.apache.spark.ml +import org.apache.spark.ml.PipelineStage +import org.apache.spark.sql.DataFrame + +import com.databricks.spark.sql.perf.mllib._ +import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import com.databricks.spark.sql.perf.mllib.data.DataGenerator + + +/** Object containing methods used in performance tests for FPGrowth */ +object FPGrowth extends BenchmarkAlgorithm with TestFromTraining { + + def trainingDataSet(ctx: MLBenchContext): DataFrame = { + import ctx.params._ + + DataGenerator.generateItemSet( + ctx.sqlContext, + numExamples, + ctx.seed(), + numPartitions, + numItems, + itemSetSize) + } + + override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { + new ml.fpm.FPGrowth() + .setItemsCol("items") + } +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/results.scala 
b/src/main/scala/com/databricks/spark/sql/perf/results.scala index 1ec93fd..a9f46c1 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/results.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/results.scala @@ -124,6 +124,7 @@ class MLParams( val elasticNetParam: Option[Double] = None, val family: Option[String] = None, val featureArity: Option[Int] = None, + val itemSetSize: Option[Int] = None, val k: Option[Int] = None, val link: Option[String] = None, val maxIter: Option[Int] = None, @@ -164,6 +165,7 @@ class MLParams( elasticNetParam: Option[Double] = elasticNetParam, family: Option[String] = family, featureArity: Option[Int] = featureArity, + itemSetSize: Option[Int] = itemSetSize, k: Option[Int] = k, link: Option[String] = link, maxIter: Option[Int] = maxIter, @@ -181,8 +183,8 @@ class MLParams( new MLParams(randomSeed = randomSeed, numExamples = numExamples, numTestExamples = numTestExamples, numPartitions = numPartitions, bucketizerNumBuckets = bucketizerNumBuckets, depth = depth, docLength = docLength, - elasticNetParam = elasticNetParam, family = family, featureArity = featureArity, k = k, - link = link, maxIter = maxIter, + elasticNetParam = elasticNetParam, family = family, featureArity = featureArity, + itemSetSize = itemSetSize, k = k, link = link, maxIter = maxIter, numClasses = numClasses, numFeatures = numFeatures, numInputCols = numInputCols, numItems = numItems, numUsers = numUsers, optimizer = optimizer, regParam = regParam, rank = rank, smoothing = smoothing, tol = tol, vocabSize = vocabSize) diff --git a/src/main/scala/configs/mllib-small.yaml b/src/main/scala/configs/mllib-small.yaml index 111df0c..d01f2be 100644 --- a/src/main/scala/configs/mllib-small.yaml +++ b/src/main/scala/configs/mllib-small.yaml @@ -146,3 +146,8 @@ benchmarks: depth: 3 numFeatures: 5 maxIter: 3 + - name: fpm.FPGrowth + params: + numExamples: 200 + numItems: 6 + itemSetSize: 3