update VectorAssembler test such that the dataset size is numExamples * numFeatures (#158)
Currently the dataset size is numExamples * numFeatures * numInputCols, which is much bigger than other ML perf tests. This PR updates its implementation and makes it more efficient at slicing vectors. Tested on the mllib-big.yaml and 3 runs finished in < 2 minutes.
This commit is contained in:
parent
e9ef9788c2
commit
2895ae1139
@ -13,35 +13,41 @@ import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext,
|
||||
/** Object for testing VectorAssembler performance */
|
||||
object VectorAssembler extends BenchmarkAlgorithm with TestFromTraining {
|
||||
|
||||
private def getInputCols(numInputCols: Int): Array[String] = {
|
||||
Array.tabulate(numInputCols)(i => s"c${i}")
|
||||
}
|
||||
|
||||
override def trainingDataSet(ctx: MLBenchContext): DataFrame = {
|
||||
import ctx.params._
|
||||
import ctx.sqlContext.implicits._
|
||||
|
||||
var df = DataGenerator.generateContinuousFeatures(
|
||||
require(numInputCols.get <= numFeatures.get,
|
||||
s"numInputCols (${numInputCols}) cannot be greater than numFeatures (${numFeatures}).")
|
||||
|
||||
val df = DataGenerator.generateContinuousFeatures(
|
||||
ctx.sqlContext,
|
||||
numExamples,
|
||||
ctx.seed(),
|
||||
numPartitions,
|
||||
numFeatures * numInputCols
|
||||
)
|
||||
val sliceVec = udf { (v: Vector, from: Int, until: Int) =>
|
||||
Vectors.dense(v.toArray.slice(from, until))
|
||||
numFeatures)
|
||||
|
||||
val slice = udf { (v: Vector, numSlices: Int) =>
|
||||
val data = v.toArray
|
||||
val n = data.length.toLong
|
||||
(0 until numSlices).map { i =>
|
||||
val start = ((i * n) / numSlices).toInt
|
||||
val end = ((i + 1) * n / numSlices).toInt
|
||||
Vectors.dense(data.slice(start, end))
|
||||
}
|
||||
}
|
||||
for (i <- (1 to numInputCols.get)) {
|
||||
val colName = s"inputCol${i.toString}"
|
||||
val fromIndex = (i - 1) * numFeatures
|
||||
val untilIndex = i * numFeatures
|
||||
df = df.withColumn(colName, sliceVec(col("features"), lit(fromIndex), lit(untilIndex)))
|
||||
}
|
||||
df.drop(col("features"))
|
||||
|
||||
val inputCols = getInputCols(numInputCols.get)
|
||||
df.select(slice(col("features"), lit(numInputCols.get)).as("slices"))
|
||||
.select((0 until numInputCols.get).map(i => col("slices")(i).as(inputCols(i))): _*)
|
||||
}
|
||||
|
||||
override def getPipelineStage(ctx: MLBenchContext): PipelineStage = {
|
||||
import ctx.params._
|
||||
|
||||
val inputCols = (1 to numInputCols.get)
|
||||
.map(i => s"inputCol${i.toString}").toArray
|
||||
|
||||
val inputCols = getInputCols(numInputCols.get)
|
||||
new ml.feature.VectorAssembler()
|
||||
.setInputCols(inputCols)
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user