Compare commits
3 Commits
master
...
newversion
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7f454dffec | ||
|
|
322efe9343 | ||
|
|
cb9479a58e |
@ -1,8 +1,7 @@
|
||||
language: scala
|
||||
scala:
|
||||
- 2.12.10
|
||||
- 2.11.8
|
||||
sudo: false
|
||||
dist: trusty
|
||||
jdk:
|
||||
oraclejdk8
|
||||
cache:
|
||||
|
||||
63
README.md
63
README.md
@ -18,8 +18,6 @@ Usage: spark-sql-perf [options]
|
||||
|
||||
-b <value> | --benchmark <value>
|
||||
the name of the benchmark to run
|
||||
-m <value> | --master <value>
|
||||
the master url to use
|
||||
-f <value> | --filter <value>
|
||||
a filter on the name of the queries to run
|
||||
-i <value> | --iterations <value>
|
||||
@ -34,18 +32,7 @@ The first run of `bin/run` will build the library.
|
||||
|
||||
## Build
|
||||
|
||||
Use `sbt package` or `sbt assembly` to build the library jar.
|
||||
Use `sbt +package` to build for scala 2.11 and 2.12.
|
||||
|
||||
## Local performance tests
|
||||
The framework contains twelve benchmarks that can be executed in local mode. They are organized into three classes and target different components and functions of Spark:
|
||||
* [DatasetPerformance](https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/DatasetPerformance.scala) compares the performance of the old RDD API with the new Dataframe and Dataset APIs.
|
||||
These benchmarks can be launched with the command `bin/run --benchmark DatasetPerformance`
|
||||
* [JoinPerformance](https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala) compares the performance of joining different table sizes and shapes with different join types.
|
||||
These benchmarks can be launched with the command `bin/run --benchmark JoinPerformance`
|
||||
* [AggregationPerformance](https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala) compares the performance of aggregating different table sizes using different aggregation types.
|
||||
These benchmarks can be launched with the command `bin/run --benchmark AggregationPerformance`
|
||||
|
||||
Use `sbt package` or `sbt assembly` to build the library jar.
|
||||
|
||||
# MLlib tests
|
||||
|
||||
@ -67,11 +54,31 @@ TPCDS kit needs to be installed on all cluster executor nodes under the same pat
|
||||
It can be found [here](https://github.com/databricks/tpcds-kit).
|
||||
|
||||
```
|
||||
// Generate the data
|
||||
build/sbt "test:runMain com.databricks.spark.sql.perf.tpcds.GenTPCDSData -d <dsdgenDir> -s <scaleFactor> -l <location> -f <format>"
|
||||
```
|
||||
import com.databricks.spark.sql.perf.tpcds.TPCDSTables
|
||||
|
||||
// Set:
|
||||
val rootDir = ... // root directory of location to create data in.
|
||||
val databaseName = ... // name of database to create.
|
||||
val scaleFactor = ... // scaleFactor defines the size of the dataset to generate (in GB).
|
||||
val format = ... // a valid Spark format, e.g. "parquet".
|
||||
// Run:
|
||||
val tables = new TPCDSTables(sqlContext,
|
||||
dsdgenDir = "/tmp/tpcds-kit/tools", // location of dsdgen
|
||||
scaleFactor = scaleFactor,
|
||||
useDoubleForDecimal = false, // true to replace DecimalType with DoubleType
|
||||
useStringForDate = false) // true to replace DateType with StringType
|
||||
|
||||
|
||||
tables.genData(
|
||||
location = rootDir,
|
||||
format = format,
|
||||
overwrite = true, // overwrite the data that is already there
|
||||
partitionTables = true, // create the partitioned fact tables
|
||||
clusterByPartitionColumns = true, // shuffle to get partitions coalesced into single files.
|
||||
filterOutNullPartitionValues = false, // true to filter out the partition with NULL key value
|
||||
tableFilter = "", // "" means generate all tables
|
||||
numPartitions = 100) // how many dsdgen partitions to run - number of input tasks.
|
||||
|
||||
```
|
||||
// Create the specified database
|
||||
sql(s"create database $databaseName")
|
||||
// Create metastore tables in a specified database for your data.
|
||||
@ -133,21 +140,9 @@ experiment.getCurrentResults // or: spark.read.json(resultLocation).filter("time
|
||||
.select('Name, 'Runtime)
|
||||
```
|
||||
|
||||
# TPC-H
|
||||
## Running in Databricks.
|
||||
|
||||
TPC-H can be run similarly to TPC-DS, replacing `tpcds` with `tpch`.
|
||||
Take a look at the data generator and `tpch_run` notebook code below.
|
||||
|
||||
## Running in Databricks workspace (or spark-shell)
|
||||
|
||||
There are example notebooks in `src/main/notebooks` for running TPCDS and TPCH in the Databricks environment.
|
||||
_These scripts can also be run from spark-shell command line with minor modifications using `:load file_name.scala`._
|
||||
|
||||
### TPC-multi_datagen notebook
|
||||
This notebook (or scala script) can be used to generate both TPCDS and TPCH data at selected scale factors.
|
||||
It is a newer version of the `tpcds_datagen` notebook below. To use it:
|
||||
* Edit the config variables at the top of the script.
|
||||
* Run the whole notebook.
|
||||
There are example notebooks in `src/main/notebooks` for running TPCDS in the Databricks environment.
|
||||
|
||||
### tpcds_datagen notebook
|
||||
|
||||
@ -165,7 +160,3 @@ For running parallel TPCDS streams:
|
||||
* Create a Job using the notebook and attaching to the created cluster as "existing cluster".
|
||||
* Allow concurrent runs of the created job.
|
||||
* Launch the appropriate number of Runs of the Job to run in parallel on the cluster.
|
||||
|
||||
### tpch_run notebook
|
||||
|
||||
This notebook can be used to run TPCH queries. Data needs to be generated first.
|
||||
18
build.sbt
18
build.sbt
@ -5,16 +5,16 @@ name := "spark-sql-perf"
|
||||
|
||||
organization := "com.databricks"
|
||||
|
||||
scalaVersion := "2.12.10"
|
||||
scalaVersion := "2.11.8"
|
||||
|
||||
crossScalaVersions := Seq("2.12.10")
|
||||
crossScalaVersions := Seq("2.11.8")
|
||||
|
||||
sparkPackageName := "databricks/spark-sql-perf"
|
||||
|
||||
// All Spark Packages need a license
|
||||
licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0"))
|
||||
|
||||
sparkVersion := "3.0.0"
|
||||
sparkVersion := "2.3.0"
|
||||
|
||||
sparkComponents ++= Seq("sql", "hive", "mllib")
|
||||
|
||||
@ -32,13 +32,17 @@ initialCommands in console :=
|
||||
|import sqlContext.implicits._
|
||||
""".stripMargin
|
||||
|
||||
libraryDependencies += "com.github.scopt" %% "scopt" % "3.7.1"
|
||||
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.5"
|
||||
|
||||
libraryDependencies += "com.twitter" %% "util-jvm" % "6.45.0" % "provided"
|
||||
libraryDependencies += "com.github.scopt" %% "scopt" % "3.3.0"
|
||||
|
||||
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test"
|
||||
libraryDependencies += "com.twitter" %% "util-jvm" % "6.23.0" % "provided"
|
||||
|
||||
libraryDependencies += "org.yaml" % "snakeyaml" % "1.23"
|
||||
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.1" % "test"
|
||||
|
||||
libraryDependencies += "org.yaml" % "snakeyaml" % "1.17"
|
||||
|
||||
libraryDependencies += "com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2"
|
||||
|
||||
fork := true
|
||||
|
||||
|
||||
@ -26,13 +26,6 @@ else
|
||||
declare java_cmd=java
|
||||
fi
|
||||
|
||||
if test -x "$SBT_HOME"; then
|
||||
echo -e "Using $SBT_HOME as default SBT_HOME - should be the jar name!"
|
||||
# Could be at /usr/share/sbt-launcher-packaging/bin/sbt-launch.jar
|
||||
# so this would be export SBT_HOME=/usr/share/sbt-launcher-packaging/bin/sbt-launch.jar
|
||||
sbt_jar=${SBT_HOME}
|
||||
fi
|
||||
|
||||
# Print all arguments to standard error instead of standard output.
echoerr () {
  echo "$@" >&2
}
|
||||
@ -172,9 +165,7 @@ process_args () {
|
||||
}
|
||||
|
||||
run() {
|
||||
# first check SBT_HOME is present so we use what's already available
|
||||
sbt_jar=$SBT_HOME
|
||||
# if there's no jar let's download it.
|
||||
# no jar? download it.
|
||||
[[ -f "$sbt_jar" ]] || acquire_sbt_jar "$sbt_version" || {
|
||||
# still no jar? uh-oh.
|
||||
echo "Download failed. Obtain the sbt-launch.jar manually and place it at $sbt_jar"
|
||||
|
||||
@ -1,2 +1,2 @@
|
||||
// This file should only contain the version of sbt to use.
|
||||
sbt.version=0.13.18
|
||||
sbt.version=0.13.8
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
// You may use this file to add plugin dependencies for sbt.
|
||||
|
||||
resolvers += "Spark Packages repo" at "https://repos.spark-packages.org/"
|
||||
resolvers += "Spark Packages repo" at "https://dl.bintray.com/spark-packages/maven/"
|
||||
|
||||
resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/"
|
||||
|
||||
@ -14,4 +14,4 @@ addSbtPlugin("com.databricks" %% "sbt-databricks" % "0.1.3")
|
||||
|
||||
addSbtPlugin("me.lessis" % "bintray-sbt" % "0.3.0")
|
||||
|
||||
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
|
||||
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
|
||||
2
src/main/R/.gitignore
vendored
2
src/main/R/.gitignore
vendored
@ -1,2 +0,0 @@
|
||||
results
|
||||
metastore_db
|
||||
@ -1,123 +0,0 @@
|
||||
# SparkR \*apply() benchmark
|
||||
|
||||
This is a performance task for SparkR \*apply API, including spark.lapply, dapply/dapplyCollect, gapply/gapplyCollect.
|
||||
|
||||
`define_benchmark.r` generates data in different types and sizes.
|
||||
|
||||
`run_benchmark.r` runs tests and saves results in results/results.csv and plots in .png files.
|
||||
|
||||
## Requirements
|
||||
|
||||
- R library: microbenchmark
|
||||
- databricks/spark
|
||||
|
||||
To install microbenchmark, run the following in an R shell:
|
||||
```
|
||||
install.packages("microbenchmark")
|
||||
```
|
||||
|
||||
Build [Databricks Spark](https://github.com/databricks/spark) locally ([instructions](https://databricks.atlassian.net/wiki/spaces/UN/pages/194805843/0.+Building+and+Running+Spark+Locally)).
|
||||
|
||||
Use the path to the root of the above repository as SPARK_HOME, and use it in the shell command below.
|
||||
|
||||
## How to run
|
||||
|
||||
In shell:
|
||||
|
||||
```
|
||||
sparkr-tests $ ./run_benchmark.sh <your SPARK_HOME> small # run small test (~10 min)
|
||||
sparkr-tests $ ./run_benchmark.sh <your SPARK_HOME> medium # run medium test (~30 min)
|
||||
sparkr-tests $ ./run_benchmark.sh <your SPARK_HOME> large # run large test (~90 min)
|
||||
```
|
||||
|
||||
## Synthetic Data
|
||||
|
||||
|
||||
For benchmarking spark.lapply, we generate
|
||||
```
|
||||
lists
|
||||
```
|
||||
1. with different types (7):
|
||||
```
|
||||
Length = 100,
|
||||
Type = integer,
|
||||
logical,
|
||||
double,
|
||||
character1,
|
||||
character10,
|
||||
character100,
|
||||
character1k.
|
||||
```
|
||||
2. with different lengths (4):
|
||||
```
|
||||
Type = integer,
|
||||
Length = 10,
|
||||
100,
|
||||
1k,
|
||||
10k
|
||||
```
|
||||
For benchmarking dapply+nrow/dapplyCollect, we generate
|
||||
```
|
||||
data.frame
|
||||
```
|
||||
1. with different types (7):
|
||||
```
|
||||
nrow = 100,
|
||||
ncol = 1,
|
||||
Type = integer,
|
||||
logical,
|
||||
double,
|
||||
character1,
|
||||
character10,
|
||||
character100,
|
||||
character1k.
|
||||
```
|
||||
2. with different lengths (4):
|
||||
```
|
||||
ncol = 1,
|
||||
Type = integer,
|
||||
nrow = 10,
|
||||
100,
|
||||
1k,
|
||||
10k
|
||||
```
|
||||
3. with different ncols (3):
|
||||
```
|
||||
nrow = 100,
|
||||
Type = integer,
|
||||
ncol = 1,
|
||||
10,
|
||||
100
|
||||
```
|
||||
For benchmarking gapply+nrow/gapplyCollect, we generate
|
||||
```
|
||||
data.frame
|
||||
```
|
||||
1. with different number of keys (3):
|
||||
```
|
||||
ncol = 2,
|
||||
nrow = 1k,
|
||||
Type = <integer, double>,
|
||||
nkeys = 10,
|
||||
100,
|
||||
1000
|
||||
```
|
||||
2. with different lengths (4):
|
||||
```
|
||||
ncol = 2,
|
||||
Type = <integer, double>,
|
||||
nkeys = 10,
|
||||
nrow = 10,
|
||||
100,
|
||||
1k,
|
||||
10k
|
||||
```
|
||||
3. with different key types (3):
|
||||
```
|
||||
ncol = 2,
|
||||
nkeys = 10,
|
||||
nrow = 1k,
|
||||
Type = <integer, double>
|
||||
<character10, double>
|
||||
<character100, double>
|
||||
```
|
||||
@ -1,671 +0,0 @@
|
||||
library("SparkR")
|
||||
library("microbenchmark")
|
||||
library("magrittr")
|
||||
library("ggplot2")
|
||||
library("pryr")
|
||||
|
||||
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
|
||||
|
||||
dir_path <- "results/"
|
||||
|
||||
# For benchmarking spark.lapply, we generate
|
||||
|
||||
# lists
|
||||
|
||||
# 1. with different types (7):
|
||||
# Length = 100,
|
||||
# Type = integer,
|
||||
# logical,
|
||||
# double,
|
||||
# character1,
|
||||
# character10,
|
||||
# character100,
|
||||
# character1k.
|
||||
|
||||
data.list.type.double <- runif(100)
|
||||
data.list.type.int <- as.integer(data.list.type.double * 100)
|
||||
data.list.type.logical <- data.list.type.double < 0.5
|
||||
data.list.type.char1 <- sapply(1:100, function(x) { "a" })
|
||||
data.list.type.char10 <- sapply(1:100, function(x) { "0123456789" })
|
||||
data.list.type.char100 <- sapply(1:100, function(x) { paste(replicate(20, "hello"), collapse = "") })
|
||||
data.list.type.char1k <- sapply(1:100, function(x) { paste(replicate(200, "hello"), collapse = "") })
|
||||
|
||||
# 2. with different lengths (4):
|
||||
# Type = integer,
|
||||
# Length = 10,
|
||||
# 100,
|
||||
# 1k,
|
||||
# 10k
|
||||
|
||||
data.list.len.10 <- 1:10
|
||||
data.list.len.100 <- 1:100
|
||||
data.list.len.1k <- 1:1000
|
||||
data.list.len.10k <- 1:10000
|
||||
|
||||
# For benchmarking dapply+nrow/dapplyCollect, we generate
|
||||
|
||||
# data.frame
|
||||
|
||||
# 1. with different types (7):
|
||||
# nrow = 100,
|
||||
# ncol = 1,
|
||||
# Type = integer,
|
||||
# logical,
|
||||
# double,
|
||||
# character1,
|
||||
# character10,
|
||||
# character100,
|
||||
# character1k.
|
||||
|
||||
data.df.type.double <- data.frame(data.list.type.double) %>% createDataFrame %>% cache
|
||||
data.df.type.int <- data.frame(data.list.type.int) %>% createDataFrame %>% cache
|
||||
data.df.type.logical <- data.frame(data.list.type.logical) %>% createDataFrame %>% cache
|
||||
data.df.type.char1 <- data.frame(data.list.type.char1) %>% createDataFrame %>% cache
|
||||
data.df.type.char10 <- data.frame(data.list.type.char10) %>% createDataFrame %>% cache
|
||||
data.df.type.char100 <- data.frame(data.list.type.char100) %>% createDataFrame %>% cache
|
||||
data.df.type.char1k <- data.frame(data.list.type.char1k) %>% createDataFrame %>% cache
|
||||
|
||||
# counting to materialize the cache
|
||||
data.df.type.double %>% nrow
|
||||
data.df.type.int %>% nrow
|
||||
data.df.type.logical %>% nrow
|
||||
data.df.type.char1 %>% nrow
|
||||
data.df.type.char10 %>% nrow
|
||||
data.df.type.char100 %>% nrow
|
||||
data.df.type.char1k %>% nrow
|
||||
|
||||
# 2. with different lengths (4):
|
||||
# ncol = 1,
|
||||
# Type = integer,
|
||||
# nrow = 10,
|
||||
# 100,
|
||||
# 1k,
|
||||
# 10k
|
||||
|
||||
data.df.len.10 <- data.frame(data.list.len.10) %>% createDataFrame %>% cache
|
||||
data.rdf.len.100 <- data.frame(data.list.len.100)
|
||||
data.df.len.100 <- data.rdf.len.100 %>% createDataFrame %>% cache
|
||||
data.df.len.1k <- data.frame(data.list.len.1k) %>% createDataFrame %>% cache
|
||||
data.df.len.10k <- data.frame(data.list.len.10k) %>% createDataFrame %>% cache
|
||||
|
||||
# counting to materialize the cache
|
||||
data.df.len.10 %>% nrow
|
||||
data.df.len.100 %>% nrow
|
||||
data.df.len.1k %>% nrow
|
||||
data.df.len.10k %>% nrow
|
||||
|
||||
# 3. with different ncols (3):
|
||||
# nrow = 100,
|
||||
# Type = integer,
|
||||
# ncol = 1,
|
||||
# 10,
|
||||
# 100
|
||||
|
||||
data.df.ncol.1 <- data.df.len.100
|
||||
data.df.ncol.10 <- data.frame(rep(data.rdf.len.100, each = 10)) %>% createDataFrame %>% cache
|
||||
data.df.ncol.100 <- data.frame(rep(data.rdf.len.100, each = 100)) %>% createDataFrame %>% cache
|
||||
|
||||
# counting to materialize the cache
|
||||
data.df.ncol.1 %>% nrow
|
||||
data.df.ncol.10 %>% nrow
|
||||
data.df.ncol.100 %>% nrow
|
||||
|
||||
# For benchmarking gapply+nrow/gapplyCollect, we generate
|
||||
|
||||
# data.frame
|
||||
|
||||
# 1. with different number of keys (3):
|
||||
# ncol = 2,
|
||||
# nrow = 1k,
|
||||
# Type = <integer, double>,
|
||||
# nkeys = 10,
|
||||
# 100,
|
||||
# 1000
|
||||
|
||||
data.rand.1k <- runif(1000)
|
||||
|
||||
data.df.nkey.10 <- data.frame(key = rep(1:10, 100), val = data.rand.1k) %>% createDataFrame %>% cache
|
||||
data.df.nkey.100 <- data.frame(key = rep(1:100, 10), val = data.rand.1k) %>% createDataFrame %>% cache
|
||||
data.df.nkey.1k <- data.frame(key = 1:1000, val = data.rand.1k) %>% createDataFrame %>% cache
|
||||
|
||||
# counting to materialize the cache
|
||||
data.df.nkey.10 %>% nrow
|
||||
data.df.nkey.100 %>% nrow
|
||||
data.df.nkey.1k %>% nrow
|
||||
|
||||
# 2. with different lengths (4):
|
||||
# ncol = 2,
|
||||
# Type = <integer, double>,
|
||||
# nkeys = 10,
|
||||
# nrow = 10,
|
||||
# 100,
|
||||
# 1k,
|
||||
# 10k
|
||||
|
||||
data.df.nrow.10 <- data.frame(key = 1:10, val = runif(10)) %>% createDataFrame %>% cache
|
||||
data.df.nrow.100 <- data.frame(key = rep(1:10, 10), val = runif(100)) %>% createDataFrame %>% cache
|
||||
data.df.nrow.1k <- data.frame(key = rep(1:10, 100), val = data.rand.1k) %>% createDataFrame %>% cache
|
||||
data.df.nrow.10k <- data.frame(key = rep(1:10, 1000), val = runif(10000)) %>% createDataFrame %>% cache
|
||||
|
||||
# counting to materialize the cache
|
||||
data.df.nrow.10 %>% nrow
|
||||
data.df.nrow.100 %>% nrow
|
||||
data.df.nrow.1k %>% nrow
|
||||
data.df.nrow.10k %>% nrow
|
||||
|
||||
# 3. with different key types (3):
|
||||
# ncol = 2,
|
||||
# nkeys = 10,
|
||||
# nrow = 1k,
|
||||
# Type = <integer, double>
|
||||
# <character10, double>
|
||||
# <character100, double>
|
||||
|
||||
key.char10 <- sapply(1:10, function(x) { sprintf("%010d", x) })
|
||||
key.char100 <- sapply(1:10, function(x) { sprintf("%0100d", x) })
|
||||
|
||||
data.df.keytype.int <- data.df.nrow.1k
|
||||
data.df.keytype.char10 <- data.frame(key = rep(key.char10, 100), val = data.rand.1k) %>% createDataFrame %>% cache
|
||||
data.df.keytype.char100 <- data.frame(key = rep(key.char100, 10), val = data.rand.1k) %>% createDataFrame %>% cache
|
||||
|
||||
# counting to materialize the cache
|
||||
data.df.keytype.int %>% nrow
|
||||
data.df.keytype.char10 %>% nrow
|
||||
data.df.keytype.char100 %>% nrow
|
||||
|
||||
# ========== benchmark functions ==============
|
||||
|
||||
# plot utils
|
||||
# Horizontal box plot of benchmark timings: one box per value of `expr`, with
# the individual samples overlaid as semi-transparent jittered points.
#   mbm_result: data with `expr` and `time` columns (e.g. a microbenchmark result)
# `time` is divided by 1000000 for the "Time (milliseconds)" axis; this assumes
# the timings are recorded in nanoseconds -- TODO confirm against microbenchmark.
plot.box.mbm <- function(mbm_result) {
  canvas <- ggplot(mbm_result, aes(x = expr, y = time/1000000))
  boxes <- geom_boxplot(outlier.colour = "red", outlier.shape = 1)
  axis_titles <- labs(x = "", y = "Time (milliseconds)")
  samples <- geom_jitter(position=position_jitter(0.2), alpha = 0.2)
  canvas + boxes + axis_titles + samples + coord_flip()
}
|
||||
# Horizontal box plot of throughput: one box per value of `expr`, with the
# individual samples overlaid as semi-transparent jittered points.
#   throughput_result: data with `expr` and `throughput` columns
plot.box.throughput <- function(throughput_result) {
  canvas <- ggplot(throughput_result, aes(x = expr, y = throughput))
  boxes <- geom_boxplot(outlier.colour = "red", outlier.shape = 1)
  axis_titles <- labs(x = "", y = "Throughput (B/s)")
  samples <- geom_jitter(position=position_jitter(0.2), alpha = 0.2)
  canvas + boxes + axis_titles + samples + coord_flip()
}
|
||||
|
||||
# dummy function
|
||||
# Identity function: hands its argument back unchanged, so the benchmarks
# measure the *apply machinery rather than user computation.
func.dummy <- function(x) x
|
||||
|
||||
# group function
|
||||
# Group function for gapply/gapplyCollect: reduces one group to its key plus
# the mean of the group's `val` column.
#   key: grouping key of the current group
#   sdf: data.frame holding the group's rows; must contain a `val` column
# Returns a data.frame whose columns are named "key" and "val".
gfunc.mean <- function(key, sdf) {
  # FALSE is spelled out: `F` is an ordinary variable that can be reassigned,
  # which would silently turn character keys into factors.
  y <- data.frame(key, mean(sdf$val), stringsAsFactors = FALSE)
  names(y) <- c("key", "val")
  y
}
|
||||
|
||||
# lapply, type
|
||||
# Benchmark spark.lapply with the identity function over the seven
# 100-element inputs of different element types.
#   n: number of times each expression is evaluated
# Returns the microbenchmark result.
run.mbm.spark.lapply.type <- function(n) {
  # Unevaluated expressions, handed to microbenchmark through `list`.
  benchmarks <- alist(
    "lapply.int.100" = { spark.lapply(data.list.type.int, func.dummy) },
    "lapply.double.100" = { spark.lapply(data.list.type.double, func.dummy) },
    "lapply.logical.100" = { spark.lapply(data.list.type.logical, func.dummy) },
    "lapply.char1.100" = { spark.lapply(data.list.type.char1, func.dummy) },
    "lapply.char10.100" = { spark.lapply(data.list.type.char10, func.dummy) },
    "lapply.char100.100" = { spark.lapply(data.list.type.char100, func.dummy) },
    "lapply.char1k.100" = { spark.lapply(data.list.type.char1k, func.dummy) }
  )
  microbenchmark(list = benchmarks, times = n)
}
|
||||
|
||||
# lapply, len
|
||||
# Benchmark spark.lapply with the identity function over integer inputs of
# increasing length.
#   mode: "small" runs lengths 10 and 100, "medium" adds 1k, anything else
#         also adds 10k
#   n:    number of times each expression is evaluated
# Returns the microbenchmark result.
run.mbm.spark.lapply.len <- function(mode, n) {
  # Ordered from shortest to longest input; each mode runs a prefix of it.
  benchmarks <- alist(
    "lapply.len.10" = { spark.lapply(data.list.len.10, func.dummy) },
    "lapply.len.100" = { spark.lapply(data.list.len.100, func.dummy) },
    "lapply.len.1k" = { spark.lapply(data.list.len.1k, func.dummy) },
    "lapply.len.10k" = { spark.lapply(data.list.len.10k, func.dummy) }
  )
  included <- if (mode == "small") 2 else if (mode == "medium") 3 else 4
  microbenchmark(list = benchmarks[seq_len(included)], times = n)
}
|
||||
|
||||
# dapply, type
|
||||
# Benchmark dapply with the identity function over the seven 100-row
# DataFrames of different column types; `nrow` is applied to each result.
#   n: number of times each expression is evaluated
# Returns the microbenchmark result.
run.mbm.dapply.type <- function(n) {
  # Unevaluated expressions, handed to microbenchmark through `list`.
  benchmarks <- alist(
    "dapply.int.100" = { dapply(data.df.type.int, func.dummy, schema(data.df.type.int)) %>% nrow },
    "dapply.double.100" = { dapply(data.df.type.double, func.dummy, schema(data.df.type.double)) %>% nrow },
    "dapply.logical.100" = { dapply(data.df.type.logical, func.dummy, schema(data.df.type.logical)) %>% nrow },
    "dapply.char1.100" = { dapply(data.df.type.char1, func.dummy, schema(data.df.type.char1)) %>% nrow },
    "dapply.char10.100" = { dapply(data.df.type.char10, func.dummy, schema(data.df.type.char10)) %>% nrow },
    "dapply.char100.100" = { dapply(data.df.type.char100, func.dummy, schema(data.df.type.char100)) %>% nrow },
    "dapply.char1k.100" = { dapply(data.df.type.char1k, func.dummy, schema(data.df.type.char1k)) %>% nrow }
  )
  microbenchmark(list = benchmarks, times = n)
}
|
||||
|
||||
# dapply, len
|
||||
# Benchmark dapply with the identity function over single-column DataFrames
# with an increasing number of rows; `nrow` is applied to each result.
#   mode: "small" runs 10 and 100 rows, "medium" adds 1k, anything else
#         also adds 10k
#   n:    number of times each expression is evaluated
# Returns the microbenchmark result.
run.mbm.dapply.len <- function(mode, n) {
  # Ordered from fewest to most rows; each mode runs a prefix of it.
  benchmarks <- alist(
    "dapply.len.10" = { dapply(data.df.len.10, func.dummy, schema(data.df.len.10)) %>% nrow },
    "dapply.len.100" = { dapply(data.df.len.100, func.dummy, schema(data.df.len.100)) %>% nrow },
    "dapply.len.1k" = { dapply(data.df.len.1k, func.dummy, schema(data.df.len.1k)) %>% nrow },
    "dapply.len.10k" = { dapply(data.df.len.10k, func.dummy, schema(data.df.len.10k)) %>% nrow }
  )
  included <- if (mode == "small") 2 else if (mode == "medium") 3 else 4
  microbenchmark(list = benchmarks[seq_len(included)], times = n)
}
|
||||
|
||||
# dapply, ncol
|
||||
# Benchmark dapply with the identity function over 100-row DataFrames with an
# increasing number of columns; `nrow` is applied to each result.
#   mode: "small" runs 1 and 10 columns, anything else also runs 100 columns
#   n:    number of times each expression is evaluated
# Returns the microbenchmark result.
run.mbm.dapply.ncol <- function(mode, n) {
  # Ordered from fewest to most columns; each mode runs a prefix of it.
  benchmarks <- alist(
    "dapply.ncol.1" = { dapply(data.df.ncol.1, func.dummy, schema(data.df.ncol.1)) %>% nrow },
    "dapply.ncol.10" = { dapply(data.df.ncol.10, func.dummy, schema(data.df.ncol.10)) %>% nrow },
    "dapply.ncol.100" = { dapply(data.df.ncol.100, func.dummy, schema(data.df.ncol.100)) %>% nrow }
  )
  included <- if (mode == "small") 2 else 3
  microbenchmark(list = benchmarks[seq_len(included)], times = n)
}
|
||||
|
||||
# dapplyCollect, type
|
||||
# Benchmark dapplyCollect with the identity function over the seven 100-row
# DataFrames of different column types.
#   n: number of times each expression is evaluated
# Returns the microbenchmark result.
run.mbm.dapplyCollect.type <- function(n) {
  # Unevaluated expressions, handed to microbenchmark through `list`.
  benchmarks <- alist(
    "dapplyCollect.int.100" = { dapplyCollect(data.df.type.int, func.dummy) },
    "dapplyCollect.double.100" = { dapplyCollect(data.df.type.double, func.dummy) },
    "dapplyCollect.logical.100" = { dapplyCollect(data.df.type.logical, func.dummy) },
    "dapplyCollect.char1.100" = { dapplyCollect(data.df.type.char1, func.dummy) },
    "dapplyCollect.char10.100" = { dapplyCollect(data.df.type.char10, func.dummy) },
    "dapplyCollect.char100.100" = { dapplyCollect(data.df.type.char100, func.dummy) },
    "dapplyCollect.char1k.100" = { dapplyCollect(data.df.type.char1k, func.dummy) }
  )
  microbenchmark(list = benchmarks, times = n)
}
|
||||
|
||||
# dapplyCollect, len
|
||||
# Benchmark dapplyCollect with the identity function over single-column
# DataFrames with an increasing number of rows.
#   mode: "large" runs 10, 100, 1k and 10k rows; every other mode stops at 1k
#   n:    number of times each expression is evaluated
# Returns the microbenchmark result.
run.mbm.dapplyCollect.len <- function(mode, n) {
  # Ordered from fewest to most rows; each mode runs a prefix of it.
  benchmarks <- alist(
    "dapplyCollect.len.10" = { dapplyCollect(data.df.len.10, func.dummy) },
    "dapplyCollect.len.100" = { dapplyCollect(data.df.len.100, func.dummy) },
    "dapplyCollect.len.1k" = { dapplyCollect(data.df.len.1k, func.dummy) },
    "dapplyCollect.len.10k" = { dapplyCollect(data.df.len.10k, func.dummy) }
  )
  included <- if (mode != "large") 3 else 4
  microbenchmark(list = benchmarks[seq_len(included)], times = n)
}
|
||||
|
||||
# dapplyCollect, ncol
|
||||
# Benchmark dapplyCollect with the identity function over 100-row DataFrames
# with an increasing number of columns.
#   mode: "small" runs 1 and 10 columns, anything else also runs 100 columns
#   n:    number of times each expression is evaluated
# Returns the microbenchmark result.
run.mbm.dapplyCollect.ncol <- function(mode, n) {
  # Ordered from fewest to most columns; each mode runs a prefix of it.
  benchmarks <- alist(
    "dapplyCollect.ncol.1" = { dapplyCollect(data.df.ncol.1, func.dummy) },
    "dapplyCollect.ncol.10" = { dapplyCollect(data.df.ncol.10, func.dummy) },
    "dapplyCollect.ncol.100" = { dapplyCollect(data.df.ncol.100, func.dummy) }
  )
  included <- if (mode == "small") 2 else 3
  microbenchmark(list = benchmarks[seq_len(included)], times = n)
}
|
||||
|
||||
# gapply, nkey
|
||||
# Benchmark gapply (group by `key`, mean of `val`) over 1k-row DataFrames with
# an increasing number of distinct keys; `nrow` is applied to each result.
#   mode: "small" runs 10 and 100 keys; every other mode also runs 1k keys
#   n:    number of times each expression is evaluated
# Returns the microbenchmark result.
run.mbm.gapply.nkey <- function(mode, n) {
  # The original "medium" and fall-through branches were duplicates of each
  # other, so they are expressed once here: only "small" differs.
  if (mode == "small") {
    microbenchmark(
      "gapply.nkey.10" = { gapply(data.df.nkey.10, data.df.nkey.10$key, gfunc.mean, schema(data.df.nkey.10)) %>% nrow },
      "gapply.nkey.100" = { gapply(data.df.nkey.100, data.df.nkey.100$key, gfunc.mean, schema(data.df.nkey.100)) %>% nrow },
      times = n
    )
  } else {
    microbenchmark(
      "gapply.nkey.10" = { gapply(data.df.nkey.10, data.df.nkey.10$key, gfunc.mean, schema(data.df.nkey.10)) %>% nrow },
      "gapply.nkey.100" = { gapply(data.df.nkey.100, data.df.nkey.100$key, gfunc.mean, schema(data.df.nkey.100)) %>% nrow },
      "gapply.nkey.1k" = { gapply(data.df.nkey.1k, data.df.nkey.1k$key, gfunc.mean, schema(data.df.nkey.1k)) %>% nrow },
      times = n
    )
  }
}
|
||||
|
||||
# gapply, nrow
|
||||
# Benchmark gapply (group by `key`, mean of `val`) over 10-key DataFrames with
# an increasing number of rows; `nrow` is applied to each result.
#   mode: "small" runs 10 and 100 rows, "medium" adds 1k, anything else
#         also adds 10k
#   n:    number of times each expression is evaluated
# Returns the microbenchmark result.
run.mbm.gapply.nrow <- function(mode, n) {
  # Ordered from fewest to most rows; each mode runs a prefix of it.
  benchmarks <- alist(
    "gapply.nrow.10" = { gapply(data.df.nrow.10, data.df.nrow.10$key, gfunc.mean, schema(data.df.nrow.10)) %>% nrow },
    "gapply.nrow.100" = { gapply(data.df.nrow.100, data.df.nrow.100$key, gfunc.mean, schema(data.df.nrow.100)) %>% nrow },
    "gapply.nrow.1k" = { gapply(data.df.nrow.1k, data.df.nrow.1k$key, gfunc.mean, schema(data.df.nrow.1k)) %>% nrow },
    "gapply.nrow.10k" = { gapply(data.df.nrow.10k, data.df.nrow.10k$key, gfunc.mean, schema(data.df.nrow.10k)) %>% nrow }
  )
  included <- if (mode == "small") 2 else if (mode == "medium") 3 else 4
  microbenchmark(list = benchmarks[seq_len(included)], times = n)
}
|
||||
|
||||
# gapply, keytype
|
||||
# Benchmark gapply (group by `key`, mean of `val`) over DataFrames whose key
# column is an integer, a 10-character string or a 100-character string;
# `nrow` is applied to each result.
#   n: number of times each expression is evaluated
# Returns the microbenchmark result.
run.mbm.gapply.keytype <- function(n) {
  # Unevaluated expressions, handed to microbenchmark through `list`.
  benchmarks <- alist(
    "gapply.keytype.int" = { gapply(data.df.keytype.int, data.df.keytype.int$key, gfunc.mean, schema(data.df.keytype.int)) %>% nrow },
    "gapply.keytype.char10" = { gapply(data.df.keytype.char10, data.df.keytype.char10$key, gfunc.mean, schema(data.df.keytype.char10)) %>% nrow },
    "gapply.keytype.char100" = { gapply(data.df.keytype.char100, data.df.keytype.char100$key, gfunc.mean, schema(data.df.keytype.char100)) %>% nrow }
  )
  microbenchmark(list = benchmarks, times = n)
}
|
||||
|
||||
# gapplyCollect, nkey
|
||||
# Benchmark gapplyCollect (group by `key`, mean of `val`) over 1k-row
# DataFrames with an increasing number of distinct keys.
#   mode: "small" runs 10 and 100 keys; every other mode also runs 1k keys
#   n:    number of times each expression is evaluated
# Returns the microbenchmark result.
run.mbm.gapplyCollect.nkey <- function(mode, n) {
  # The original "medium" and fall-through branches were duplicates of each
  # other, so they are expressed once here: only "small" differs.
  if (mode == "small") {
    microbenchmark(
      "gapplyCollect.nkey.10" = { gapplyCollect(data.df.nkey.10, data.df.nkey.10$key, gfunc.mean) },
      "gapplyCollect.nkey.100" = { gapplyCollect(data.df.nkey.100, data.df.nkey.100$key, gfunc.mean) },
      times = n
    )
  } else {
    microbenchmark(
      "gapplyCollect.nkey.10" = { gapplyCollect(data.df.nkey.10, data.df.nkey.10$key, gfunc.mean) },
      "gapplyCollect.nkey.100" = { gapplyCollect(data.df.nkey.100, data.df.nkey.100$key, gfunc.mean) },
      "gapplyCollect.nkey.1k" = { gapplyCollect(data.df.nkey.1k, data.df.nkey.1k$key, gfunc.mean) },
      times = n
    )
  }
}
|
||||
|
||||
# gapplyCollect, nrow
|
||||
# Benchmark gapplyCollect (group by `key`, mean of `val`) over 10-key
# DataFrames with an increasing number of rows.
#   mode: "small" runs 10 and 100 rows, "medium" adds 1k, anything else
#         also adds 10k
#   n:    number of times each expression is evaluated
# Returns the microbenchmark result.
run.mbm.gapplyCollect.nrow <- function(mode, n) {
  # Ordered from fewest to most rows; each mode runs a prefix of it.
  benchmarks <- alist(
    "gapplyCollect.nrow.10" = { gapplyCollect(data.df.nrow.10, data.df.nrow.10$key, gfunc.mean) },
    "gapplyCollect.nrow.100" = { gapplyCollect(data.df.nrow.100, data.df.nrow.100$key, gfunc.mean) },
    "gapplyCollect.nrow.1k" = { gapplyCollect(data.df.nrow.1k, data.df.nrow.1k$key, gfunc.mean) },
    "gapplyCollect.nrow.10k" = { gapplyCollect(data.df.nrow.10k, data.df.nrow.10k$key, gfunc.mean) }
  )
  included <- if (mode == "small") 2 else if (mode == "medium") 3 else 4
  microbenchmark(list = benchmarks[seq_len(included)], times = n)
}
|
||||
|
||||
# gapplyCollect, keytype
|
||||
run.mbm.gapplyCollect.keytype <- function(n) {
  # Times gapplyCollect(df, df$key, gfunc.mean) on the three data.df.keytype.*
  # inputs (int, char10, char100), repeating each expression n times.
  # Returns the microbenchmark result.
  timings <- microbenchmark(
    "gapplyCollect.keytype.int" = { gapplyCollect(data.df.keytype.int, data.df.keytype.int$key, gfunc.mean) },
    "gapplyCollect.keytype.char10" = { gapplyCollect(data.df.keytype.char10, data.df.keytype.char10$key, gfunc.mean) },
    "gapplyCollect.keytype.char100" = { gapplyCollect(data.df.keytype.char100, data.df.keytype.char100$key, gfunc.mean) },
    times = n
  )
  timings
}
|
||||
|
||||
# ============== compute object sizes ==================
|
||||
# Serialize `data` in memory: with a NULL connection, serialize() returns the
# serialized bytes as a raw vector instead of writing them anywhere.
ser <- function(data) serialize(data, NULL)
|
||||
get.sizes <- function(obj_names, objs) {
  # For each element of `objs`, measure object.size() of its serialized form
  # (see ser), then pair the sizes column-wise with `obj_names`.
  # Returns a data.frame with columns obj_names and obj_sizes.
  obj_sizes <- lapply(objs, function(obj) object.size(ser(obj)))
  data.frame(cbind(obj_names, obj_sizes))
}
|
||||
|
||||
# lapply
|
||||
sizes.lapply.type <- get.sizes(
|
||||
list(
|
||||
"lapply.double.100",
|
||||
"lapply.int.100",
|
||||
"lapply.logical.100",
|
||||
"lapply.char1.100",
|
||||
"lapply.char10.100",
|
||||
"lapply.char100.100",
|
||||
"lapply.char1k.100"),
|
||||
list(
|
||||
data.list.type.double,
|
||||
data.list.type.int,
|
||||
data.list.type.logical,
|
||||
data.list.type.char1,
|
||||
data.list.type.char10,
|
||||
data.list.type.char100,
|
||||
data.list.type.char1k)
|
||||
)
|
||||
sizes.lapply.len <- get.sizes(
|
||||
list(
|
||||
"lapply.len.10",
|
||||
"lapply.len.100",
|
||||
"lapply.len.1k",
|
||||
"lapply.len.10k"),
|
||||
list(
|
||||
data.list.len.10,
|
||||
data.list.len.100,
|
||||
data.list.len.1k,
|
||||
data.list.len.10k)
|
||||
)
|
||||
|
||||
# data for dapply
|
||||
df.dapply.type <- lapply(list(
|
||||
data.list.type.double,
|
||||
data.list.type.int,
|
||||
data.list.type.logical,
|
||||
data.list.type.char1,
|
||||
data.list.type.char10,
|
||||
data.list.type.char100,
|
||||
data.list.type.char1k), data.frame)
|
||||
df.dapply.len <- lapply(list(
|
||||
data.list.len.10,
|
||||
data.list.len.100,
|
||||
data.list.len.1k,
|
||||
data.list.len.10k), data.frame)
|
||||
df.dapply.ncol <- lapply(list(
|
||||
data.list.len.100,
|
||||
rep(data.rdf.len.100, each = 10),
|
||||
rep(data.rdf.len.100, each = 100)), data.frame)
|
||||
|
||||
# dapply
|
||||
sizes.dapply.type <- get.sizes(
|
||||
list(
|
||||
"dapply.double.100",
|
||||
"dapply.int.100",
|
||||
"dapply.logical.100",
|
||||
"dapply.char1.100",
|
||||
"dapply.char10.100",
|
||||
"dapply.char100.100",
|
||||
"dapply.char1k.100"),
|
||||
df.dapply.type
|
||||
)
|
||||
sizes.dapply.len <- get.sizes(
|
||||
list(
|
||||
"dapply.len.10",
|
||||
"dapply.len.100",
|
||||
"dapply.len.1k",
|
||||
"dapply.len.10k"),
|
||||
df.dapply.len
|
||||
)
|
||||
sizes.dapply.ncol <- get.sizes(
|
||||
list(
|
||||
"dapply.ncol.1",
|
||||
"dapply.ncol.10",
|
||||
"dapply.ncol.100"),
|
||||
df.dapply.ncol
|
||||
)
|
||||
|
||||
# dapplyCollect
|
||||
sizes.dapplyCollect.type <- get.sizes(
|
||||
list(
|
||||
"dapplyCollect.double.100",
|
||||
"dapplyCollect.int.100",
|
||||
"dapplyCollect.logical.100",
|
||||
"dapplyCollect.char1.100",
|
||||
"dapplyCollect.char10.100",
|
||||
"dapplyCollect.char100.100",
|
||||
"dapplyCollect.char1k.100"),
|
||||
df.dapply.type
|
||||
)
|
||||
sizes.dapplyCollect.len <- get.sizes(
|
||||
list(
|
||||
"dapplyCollect.len.10",
|
||||
"dapplyCollect.len.100",
|
||||
"dapplyCollect.len.1k",
|
||||
"dapplyCollect.len.10k"),
|
||||
df.dapply.len
|
||||
)
|
||||
sizes.dapplyCollect.ncol <- get.sizes(
|
||||
list(
|
||||
"dapplyCollect.ncol.1",
|
||||
"dapplyCollect.ncol.10",
|
||||
"dapplyCollect.ncol.100"),
|
||||
df.dapply.ncol
|
||||
)
|
||||
|
||||
# data for gapply
|
||||
df.gapply.nkey <- list(
|
||||
data.frame(key = rep(1:10, 100), val = data.rand.1k),
|
||||
data.frame(key = rep(1:100, 10), val = data.rand.1k),
|
||||
data.frame(key = 1:1000, val = data.rand.1k))
|
||||
df.gapply.nrow <- list(
|
||||
data.frame(key = 1:10, val = runif(10)),
|
||||
data.frame(key = rep(1:10, 10), val = runif(100)),
|
||||
data.frame(key = rep(1:10, 100), val = data.rand.1k),
|
||||
data.frame(key = rep(1:10, 1000), val = runif(10000)))
|
||||
df.gapply.keytype <- list(
|
||||
data.frame(key = rep(1:10, 100), val = data.rand.1k),
|
||||
data.frame(key = rep(key.char10, 100), val = data.rand.1k),
|
||||
data.frame(key = rep(key.char100, 10), val = data.rand.1k))
|
||||
|
||||
# gapply
|
||||
sizes.gapply.nkey <- get.sizes(
|
||||
list(
|
||||
"gapply.nkey.10",
|
||||
"gapply.nkey.100",
|
||||
"gapply.nkey.1k"),
|
||||
df.gapply.nkey
|
||||
)
|
||||
sizes.gapply.nrow <- get.sizes(
|
||||
list(
|
||||
"gapply.nrow.10",
|
||||
"gapply.nrow.100",
|
||||
"gapply.nrow.1k",
|
||||
"gapply.nrow.10k"),
|
||||
df.gapply.nrow
|
||||
)
|
||||
sizes.gapply.keytype <- get.sizes(
|
||||
list(
|
||||
"gapply.keytype.int",
|
||||
"gapply.keytype.char10",
|
||||
"gapply.keytype.char100"),
|
||||
df.gapply.keytype
|
||||
)
|
||||
|
||||
# gapplyCollect
|
||||
sizes.gapplyCollect.nkey <- get.sizes(
|
||||
list(
|
||||
"gapplyCollect.nkey.10",
|
||||
"gapplyCollect.nkey.100",
|
||||
"gapplyCollect.nkey.1k"),
|
||||
df.gapply.nkey
|
||||
)
|
||||
sizes.gapplyCollect.nrow <- get.sizes(
|
||||
list(
|
||||
"gapplyCollect.nrow.10",
|
||||
"gapplyCollect.nrow.100",
|
||||
"gapplyCollect.nrow.1k",
|
||||
"gapplyCollect.nrow.10k"),
|
||||
df.gapply.nrow
|
||||
)
|
||||
sizes.gapplyCollect.keytype <- get.sizes(
|
||||
list(
|
||||
"gapplyCollect.keytype.int",
|
||||
"gapplyCollect.keytype.char10",
|
||||
"gapplyCollect.keytype.char100"),
|
||||
df.gapply.keytype
|
||||
)
|
||||
|
||||
df.sizes <- rbind(
|
||||
sizes.lapply.type,
|
||||
sizes.lapply.len,
|
||||
sizes.dapply.type,
|
||||
sizes.dapply.len,
|
||||
sizes.dapply.ncol,
|
||||
sizes.dapplyCollect.type,
|
||||
sizes.dapplyCollect.len,
|
||||
sizes.dapplyCollect.ncol,
|
||||
sizes.gapply.nkey,
|
||||
sizes.gapply.nrow,
|
||||
sizes.gapply.keytype,
|
||||
sizes.gapplyCollect.nkey,
|
||||
sizes.gapplyCollect.nrow,
|
||||
sizes.gapplyCollect.keytype)
|
||||
df.sizes$obj_names <- unlist(df.sizes$obj_names)
|
||||
df.sizes$obj_sizes <- unlist(df.sizes$obj_sizes)
|
||||
@ -1,165 +0,0 @@
|
||||
# ========== generating results =================
|
||||
|
||||
source("define_benchmark.r")
|
||||
|
||||
# Benchmark mode comes from the first command-line argument.
args <- commandArgs(trailingOnly = TRUE)

# With no argument, args[1] is NA and `if (NA == "small")` would abort the
# script, so fall back to "small" explicitly.
requested.mode <- if (length(args) >= 1 && !is.na(args[1])) args[1] else "small"

# n1 / n2 are the repetition counts passed to the individual benchmarks below.
if (requested.mode == "small") {
  mode <- "small"
  n1 <- 10
  n2 <- 10
} else if (requested.mode == "medium") {
  mode <- "medium"
  n1 <- 10
  n2 <- 20
} else {
  # Any other value is treated as "large".
  mode <- "large"
  n1 <- 20
  n2 <- 20
}
N <- 20
|
||||
|
||||
# lapply, type
|
||||
mbm.spark.lapply.type <- run.mbm.spark.lapply.type(n2)
|
||||
p <- mbm.spark.lapply.type %>% plot.box.mbm
|
||||
filename <- sprintf("%slapply.type.%s.png", dir_path, mode)
|
||||
ggsave(filename, width=7, height=4)
|
||||
|
||||
|
||||
# lapply, len
|
||||
mbm.spark.lapply.len <- run.mbm.spark.lapply.len(mode, n1)
|
||||
p <- mbm.spark.lapply.len %>% plot.box.mbm
|
||||
filename <- sprintf("%slapply.len.%s.png", dir_path, mode)
|
||||
ggsave(filename, width=7, height=4)
|
||||
|
||||
|
||||
# dapply, type
|
||||
mbm.dapply.type <- run.mbm.dapply.type(n2)
|
||||
p <- mbm.dapply.type %>% plot.box.mbm
|
||||
filename <- sprintf("%sdapply.type.%s.png", dir_path, mode)
|
||||
ggsave(filename, width=7, height=4)
|
||||
|
||||
|
||||
# dapply, len
|
||||
mbm.dapply.len <- run.mbm.dapply.len(mode, n1)
|
||||
p <- mbm.dapply.len %>% plot.box.mbm
|
||||
filename <- sprintf("%sdapply.len.%s.png", dir_path, mode)
|
||||
ggsave(filename, width=7, height=4)
|
||||
|
||||
|
||||
# dapply, ncol
|
||||
mbm.dapply.ncol <- run.mbm.dapply.ncol(mode, n1)
|
||||
p <- mbm.dapply.ncol %>% plot.box.mbm
|
||||
filename <- sprintf("%sdapply.ncol.%s.png", dir_path, mode)
|
||||
ggsave(filename, width=7, height=4)
|
||||
|
||||
|
||||
# dapplyCollect, type
|
||||
mbm.dapplyCollect.type <- run.mbm.dapplyCollect.type(N)
|
||||
p <- mbm.dapplyCollect.type %>% plot.box.mbm
|
||||
filename <- sprintf("%sdapplyCollect.type.%s.png", dir_path, mode)
|
||||
ggsave(filename, width=7, height=4)
|
||||
|
||||
|
||||
# dapplyCollect, len
|
||||
mbm.dapplyCollect.len <- run.mbm.dapplyCollect.len(mode, N)
|
||||
p <- mbm.dapplyCollect.len %>% plot.box.mbm
|
||||
filename <- sprintf("%sdapplyCollect.len.%s.png", dir_path, mode)
|
||||
ggsave(filename, width=7, height=4)
|
||||
|
||||
|
||||
# dapplyCollect, ncol
|
||||
mbm.dapplyCollect.ncol <- run.mbm.dapplyCollect.ncol(mode, n1)
|
||||
p <- mbm.dapplyCollect.ncol %>% plot.box.mbm
|
||||
filename <- sprintf("%sdapplyCollect.ncol.%s.png", dir_path, mode)
|
||||
ggsave(filename, width=7, height=4)
|
||||
|
||||
|
||||
# gapply, nkey
|
||||
mbm.gapply.nkey <- run.mbm.gapply.nkey(mode, n1)
|
||||
p <- mbm.gapply.nkey %>% plot.box.mbm
|
||||
filename <- sprintf("%sgapply.nkey.%s.png", dir_path, mode)
|
||||
ggsave(filename, width=7, height=4)
|
||||
|
||||
|
||||
# gapply, nrow
|
||||
mbm.gapply.nrow <- run.mbm.gapply.nrow(mode, n1)
|
||||
p <- mbm.gapply.nrow %>% plot.box.mbm
|
||||
filename <- sprintf("%sgapply.nrow.%s.png", dir_path, mode)
|
||||
ggsave(filename, width=7, height=4)
|
||||
|
||||
|
||||
# gapply, keytype
|
||||
mbm.gapply.keytype <- run.mbm.gapply.keytype(n1)
|
||||
p <- mbm.gapply.keytype %>% plot.box.mbm
|
||||
filename <- sprintf("%sgapply.keytype.%s.png", dir_path, mode)
|
||||
ggsave(filename, width=7, height=4)
|
||||
|
||||
|
||||
# gapplyCollect, nkey
|
||||
mbm.gapplyCollect.nkey <- run.mbm.gapplyCollect.nkey(mode, n1)
|
||||
p <- mbm.gapplyCollect.nkey %>% plot.box.mbm
|
||||
filename <- sprintf("%sgapplyCollect.nkey.%s.png", dir_path, mode)
|
||||
ggsave(filename, width=7, height=4)
|
||||
|
||||
|
||||
# gapplyCollect, nrow
|
||||
mbm.gapplyCollect.nrow <- run.mbm.gapplyCollect.nrow(mode, n1)
|
||||
p <- mbm.gapplyCollect.nrow %>% plot.box.mbm
|
||||
filename <- sprintf("%sgapplyCollect.nrow.%s.png", dir_path, mode)
|
||||
ggsave(filename, width=7, height=4)
|
||||
|
||||
|
||||
# gapplyCollect, keytype
|
||||
mbm.gapplyCollect.keytype <- run.mbm.gapplyCollect.keytype(n1)
|
||||
p <- mbm.gapplyCollect.keytype %>% plot.box.mbm
|
||||
filename <- sprintf("%sgapplyCollect.keytype.%s.png", dir_path, mode)
|
||||
ggsave(filename, width=7, height=4)
|
||||
|
||||
|
||||
tmp <- rbind(
|
||||
mbm.spark.lapply.type,
|
||||
mbm.spark.lapply.len,
|
||||
mbm.dapply.type,
|
||||
mbm.dapply.len,
|
||||
mbm.dapply.ncol,
|
||||
mbm.dapplyCollect.type,
|
||||
mbm.dapplyCollect.len,
|
||||
mbm.dapplyCollect.ncol,
|
||||
mbm.gapply.nkey,
|
||||
mbm.gapply.nrow,
|
||||
mbm.gapply.keytype,
|
||||
mbm.gapplyCollect.nkey,
|
||||
mbm.gapplyCollect.nrow,
|
||||
mbm.gapplyCollect.keytype)
|
||||
|
||||
# compute throughput
# Attach the serialized object size to every timing row (left join on the
# benchmark name), then divide size by elapsed time.
tmp_size <- merge(tmp, df.sizes, by.x = "expr", by.y = "obj_names", all.x=TRUE)
# microbenchmark reports `time` in nanoseconds, so bytes/ns must be scaled by
# 1e9 (not 1e6) to yield the bytes per second the metric name promises.
tmp_size$throughput <- round(tmp_size$obj_sizes * 1e9 / tmp_size$time, digits = 2) # bytes per second
|
||||
|
||||
# plot throughput
|
||||
p <- tmp_size %>% plot.box.throughput
|
||||
filename <- sprintf("%sall.throughput.%s.png", dir_path, mode)
|
||||
ggsave(filename, width=7, height=6)
|
||||
|
||||
# save raw data to csv file
|
||||
towrite <- tmp_size[order(tmp_size$expr, tmp_size$time),]
|
||||
write.csv(towrite, file="results/results.csv", row.names = F)
|
||||
|
||||
# save mean value in ml.perf_metrics format
|
||||
# timestamp: timestamp, benchmarkId: string, benchmarkName: string,
|
||||
# metricName: string, metricValue: string, isLargerBetter: boolean, parameters map<string, string>
|
||||
op <- options(digits.secs = 3)
|
||||
curTimestamp <- Sys.time()
|
||||
benchmarkName <- "com.databricks.spark.sql.perf.sparkr.UserDefinedFunction"
|
||||
metricName <- "throughput.byte.per.second"
|
||||
isLargerBetter <- TRUE
|
||||
|
||||
perf_metric <- aggregate(towrite$throughput, list(towrite$expr), mean)
|
||||
names(perf_metric) <- c("benchmarkId", "throughput")
|
||||
perf_metric$timestamp <- curTimestamp
|
||||
perf_metric$benchmarkName <- benchmarkName
|
||||
perf_metric$metricName <- metricName
|
||||
perf_metric$isLargerBetter <- isLargerBetter
|
||||
perf_metric$parameters <- NULL
|
||||
write.csv(perf_metric, file="results/perf_metrics.csv", row.names = F)
|
||||
@ -1,29 +0,0 @@
|
||||
#!/bin/bash
# Runs run_benchmark.r with SPARK_HOME and R_LIBS_USER pointing at the given
# Spark installation. Results are written under ./results.
# Usage: ./run_benchmark.sh /Users/liang/spark small

# Exactly one or two arguments are accepted.
if [[ $# -ne 1 && $# -ne 2 ]]; then
  echo "Usage: $0 <your SPARK_HOME> [small/medium/large] (default: small)" >&2
  exit 1
fi

if ! [ -e "$1" ]; then
  echo "$1 not found" >&2
  exit 1
fi

if ! [ -d "$1" ]; then
  echo "$1 not a directory" >&2
  exit 1
fi

# Use string comparison (!=) joined with &&. The previous arithmetic -ne test
# evaluated both sides as numbers (0 for these words) and was joined with ||,
# so an invalid mode was never rejected.
if [[ $# -eq 2 && "$2" != "small" && "$2" != "medium" && "$2" != "large" ]]; then
  echo "The second argument should be 'small' or 'medium' or 'large'" >&2
  exit 1
fi

mkdir -p results

export SPARK_HOME="$1"
export R_LIBS_USER="$SPARK_HOME/R/lib"

# The mode defaults to "small" when only SPARK_HOME is given.
if [[ $# -eq 1 ]]; then
  Rscript run_benchmark.r small
else
  Rscript run_benchmark.r "$2"
fi
|
||||
@ -1,277 +0,0 @@
|
||||
// Databricks notebook source
|
||||
// Multi TPC-H and TPC-DS generator and database importer using spark-sql-perf, typically used to generate parquet files in S3/blobstore objects
|
||||
val benchmarks = Seq("TPCDS", "TPCH") // Options: "TPCDS", "TPCH"
|
||||
val scaleFactors = Seq("1", "10", "100", "1000", "10000") // "1", "10", "100", "1000", "10000" list of scale factors to generate and import
|
||||
|
||||
val baseLocation = s"s3a://mybucket" // S3 bucket, blob, or local root path
|
||||
val baseDatagenFolder = "/tmp" // usually /tmp if enough space is available for datagen files
|
||||
|
||||
// Output file formats
|
||||
val fileFormat = "parquet" // only parquet was tested
|
||||
val shuffle = true // If true, partitions will be coalesced into a single file during generation up to spark.sql.files.maxRecordsPerFile (if set)
|
||||
val overwrite = false // whether to delete existing files (doesn't check if results are complete on no-overwrite)
|
||||
|
||||
// Generate stats for CBO
|
||||
val createTableStats = true
|
||||
val createColumnStats = true
|
||||
|
||||
val workers: Int = spark.conf.get("spark.databricks.clusterUsageTags.clusterTargetWorkers").toInt //number of nodes, assumes one executor per node
|
||||
val cores: Int = Runtime.getRuntime.availableProcessors.toInt //number of CPU-cores
|
||||
|
||||
val dbSuffix = "" // set only if creating multiple DBs or source file folders with different settings, use a leading _
|
||||
val TPCDSUseLegacyOptions = false // set to generate file/DB naming and table options compatible with older results
|
||||
|
||||
// COMMAND ----------
|
||||
|
||||
// Imports, fail fast if we are missing any library
|
||||
|
||||
// For datagens
|
||||
import java.io._
|
||||
import scala.sys.process._
|
||||
|
||||
// spark-sql-perf
|
||||
import com.databricks.spark.sql.perf._
|
||||
import com.databricks.spark.sql.perf.tpch._
|
||||
import com.databricks.spark.sql.perf.tpcds._
|
||||
|
||||
// Spark/Hadoop config
|
||||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
|
||||
// COMMAND ----------
|
||||
|
||||
// Set Spark config to produce same and comparable source files across systems
|
||||
// do not change unless you want to derive from default source file composition, in that case also set a DB suffix
|
||||
spark.sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
|
||||
|
||||
// Prevent very large files. 20 million records creates between 500 and 1500MB files in TPCH
|
||||
spark.sqlContext.setConf("spark.sql.files.maxRecordsPerFile", "20000000") // This also regulates the file coalesce
|
||||
|
||||
// COMMAND ----------
|
||||
|
||||
// Checks that we have the correct number of worker nodes to start the data generation
|
||||
// Make sure you have set the workers variable correctly, as the datagens binaries need to be present in all nodes
|
||||
val targetWorkers: Int = workers
|
||||
// Number of entries in sc.getExecutorMemoryStatus minus one; declared as a def, so it is re-evaluated on every use.
// NOTE(review): the "- 1" presumably excludes the driver's own entry — confirm.
def numWorkers: Int = sc.getExecutorMemoryStatus.size - 1
|
||||
/**
 * Blocks until `numWorkers` reports exactly `requiredWorkers`, polling once per second.
 *
 * @param requiredWorkers worker count that must be reached
 * @param tries           maximum number of one-second polls before giving up
 * @throws Exception if the count is not reached within `tries` polls
 */
def waitForWorkers(requiredWorkers: Int, tries: Int) : Unit = {
  import scala.annotation.tailrec

  // A tail-recursive poll replaces `return` inside a for-comprehension, which is a
  // non-local return implemented by throwing a control exception.
  @tailrec
  def poll(elapsed: Int): Unit = {
    if (elapsed >= tries) {
      throw new Exception(s"Timed out waiting for workers to be ready after ${tries}s.")
    }
    // Sample once per poll so the comparison and the printed count always agree.
    val current = numWorkers
    if (current == requiredWorkers) {
      println(s"Waited ${elapsed}s. for $current workers to be ready")
    } else {
      // Progress message only once a minute to keep the output short.
      if (elapsed % 60 == 0) println(s"Waiting ${elapsed}s. for workers to be ready, got only $current workers")
      Thread.sleep(1000)
      poll(elapsed + 1)
    }
  }

  poll(0)
}
|
||||
waitForWorkers(targetWorkers, 3600) //wait up to an hour
|
||||
|
||||
// COMMAND ----------
|
||||
|
||||
// Time command helper
|
||||
/**
 * Evaluates `block` once, prints the wall-clock time it took in milliseconds,
 * and returns the block's result.
 */
def time[R](block: => R): R = {
  val startMillis = System.currentTimeMillis()
  val outcome = block // by-name argument is evaluated here
  val elapsedMillis = System.currentTimeMillis() - startMillis
  println("Elapsed time: " + elapsedMillis + "ms")
  outcome
}
|
||||
|
||||
// COMMAND ----------
|
||||
|
||||
// FOR INSTALLING TPCH DBGEN (with the stdout patch)
|
||||
def installDBGEN(url: String = "https://github.com/databricks/tpch-dbgen.git", useStdout: Boolean = true, baseFolder: String = "/tmp")(i: java.lang.Long): String = {
|
||||
// check if we want the revision which makes dbgen output to stdout
|
||||
val checkoutRevision: String = if (useStdout) "git checkout 0469309147b42abac8857fa61b4cf69a6d3128a8 -- bm_utils.c" else ""
|
||||
Seq("mkdir", "-p", baseFolder).!
|
||||
val pw = new PrintWriter(new File(s"${baseFolder}/dbgen_$i.sh" ))
|
||||
pw.write(s"""
|
||||
rm -rf ${baseFolder}/dbgen
|
||||
rm -rf ${baseFolder}/dbgen_install_$i
|
||||
mkdir ${baseFolder}/dbgen_install_$i
|
||||
cd ${baseFolder}/dbgen_install_$i
|
||||
git clone '$url'
|
||||
cd tpch-dbgen
|
||||
$checkoutRevision
|
||||
make
|
||||
ln -sf ${baseFolder}/dbgen_install_$i/tpch-dbgen ${baseFolder}/dbgen || echo "ln -sf failed"
|
||||
test -e ${baseFolder}/dbgen/dbgen
|
||||
echo "OK"
|
||||
""")
|
||||
pw.close
|
||||
Seq("chmod", "+x", s"${baseFolder}/dbgen_$i.sh").!
|
||||
Seq(s"${baseFolder}/dbgen_$i.sh").!!
|
||||
}
|
||||
|
||||
// COMMAND ----------
|
||||
|
||||
// FOR INSTALLING TPCDS DSDGEN (with the stdout patch)
|
||||
// Note: it assumes Debian/Ubuntu host, edit package manager if not
|
||||
def installDSDGEN(url: String = "https://github.com/databricks/tpcds-kit.git", useStdout: Boolean = true, baseFolder: String = "/tmp")(i: java.lang.Long): String = {
|
||||
Seq("mkdir", "-p", baseFolder).!
|
||||
val pw = new PrintWriter(new File(s"${baseFolder}/dsdgen_$i.sh" ))
|
||||
pw.write(s"""
|
||||
sudo apt-get update
|
||||
sudo apt-get -y --force-yes install gcc make flex bison byacc git
|
||||
rm -rf ${baseFolder}/dsdgen
|
||||
rm -rf ${baseFolder}/dsdgen_install_$i
|
||||
mkdir ${baseFolder}/dsdgen_install_$i
|
||||
cd ${baseFolder}/dsdgen_install_$i
|
||||
git clone '$url'
|
||||
cd tpcds-kit/tools
|
||||
make -f Makefile.suite
|
||||
ln -sf ${baseFolder}/dsdgen_install_$i/tpcds-kit/tools ${baseFolder}/dsdgen || echo "ln -sf failed"
|
||||
${baseFolder}/dsdgen/dsdgen -h
|
||||
test -e ${baseFolder}/dsdgen/dsdgen
|
||||
echo "OK"
|
||||
""")
|
||||
pw.close
|
||||
Seq("chmod", "+x", s"${baseFolder}/dsdgen_$i.sh").!
|
||||
Seq(s"${baseFolder}/dsdgen_$i.sh").!!
|
||||
}
|
||||
|
||||
// COMMAND ----------
|
||||
|
||||
// install (build) the data generators in all nodes
|
||||
val res = spark.range(0, workers, 1, workers).map(worker => benchmarks.map{
|
||||
case "TPCDS" => s"TPCDS worker $worker\n" + installDSDGEN(baseFolder = baseDatagenFolder)(worker)
|
||||
case "TPCH" => s"TPCH worker $worker\n" + installDBGEN(baseFolder = baseDatagenFolder)(worker)
|
||||
}).collect()
|
||||
|
||||
// COMMAND ----------
|
||||
|
||||
// Set the benchmark name, tables, and location for each benchmark
|
||||
// returns (dbname, tables, location)
|
||||
def getBenchmarkData(benchmark: String, scaleFactor: String) = benchmark match {
|
||||
|
||||
case "TPCH" => (
|
||||
s"tpch_sf${scaleFactor}_${fileFormat}${dbSuffix}",
|
||||
new TPCHTables(spark.sqlContext, dbgenDir = s"${baseDatagenFolder}/dbgen", scaleFactor = scaleFactor, useDoubleForDecimal = false, useStringForDate = false, generatorParams = Nil),
|
||||
s"$baseLocation/tpch/sf${scaleFactor}_${fileFormat}")
|
||||
|
||||
case "TPCDS" if !TPCDSUseLegacyOptions => (
|
||||
s"tpcds_sf${scaleFactor}_${fileFormat}${dbSuffix}",
|
||||
new TPCDSTables(spark.sqlContext, dsdgenDir = s"${baseDatagenFolder}/dsdgen", scaleFactor = scaleFactor, useDoubleForDecimal = false, useStringForDate = false),
|
||||
s"$baseLocation/tpcds-2.4/sf${scaleFactor}_${fileFormat}")
|
||||
|
||||
case "TPCDS" if TPCDSUseLegacyOptions => (
|
||||
s"tpcds_sf${scaleFactor}_nodecimal_nodate_withnulls${dbSuffix}",
|
||||
new TPCDSTables(spark.sqlContext, s"${baseDatagenFolder}/dsdgen", scaleFactor = scaleFactor, useDoubleForDecimal = true, useStringForDate = true),
|
||||
s"$baseLocation/tpcds/sf$scaleFactor-$fileFormat/useDecimal=false,useDate=false,filterNull=false")
|
||||
}
|
||||
|
||||
// COMMAND ----------
|
||||
|
||||
// Data generation
|
||||
// True when the first table named `tableName` in `tables` declares at least one partition column.
// Returns false when no such table exists or when the lookup throws.
def isPartitioned (tables: Tables, tableName: String) : Boolean =
  util.Try {
    tables.tables
      .find(table => table.name == tableName)
      .exists(table => table.partitionColumns.nonEmpty)
  }.getOrElse(false)
|
||||
|
||||
def loadData(tables: Tables, location: String, scaleFactor: String) = {
|
||||
val tableNames = tables.tables.map(_.name)
|
||||
tableNames.foreach { tableName =>
|
||||
// generate data
|
||||
time {
|
||||
tables.genData(
|
||||
location = location,
|
||||
format = fileFormat,
|
||||
overwrite = overwrite,
|
||||
partitionTables = true,
|
||||
// whether to coalesce into a single file (only one writer for non-partitioned tables = slow)
|
||||
clusterByPartitionColumns = shuffle, //if (isPartitioned(tables, tableName)) false else true,
|
||||
filterOutNullPartitionValues = false,
|
||||
tableFilter = tableName,
|
||||
// this controls parallelism on datagen and number of writers (# of files for non-partitioned)
|
||||
// in general we want many writers to S3, and smaller tasks for large scale factors to avoid OOM and shuffle errors
|
||||
numPartitions = if (scaleFactor.toInt <= 100 || !isPartitioned(tables, tableName)) (workers * cores) else (workers * cores * 4))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// COMMAND ----------
|
||||
|
||||
// Create the DB, import data, create
|
||||
def createExternal(location: String, dbname: String, tables: Tables) = {
|
||||
tables.createExternalTables(location, fileFormat, dbname, overwrite = overwrite, discoverPartitions = true)
|
||||
}
|
||||
|
||||
// Creates the external tables for `dbname` at `location`, then for every table prints its
// partition count, attempts a VACUUM, and shows its extended description.
def loadDB(dbname: String, tables: Tables, location: String) = {
  val tableNames = tables.tables.map(_.name)
  time {
    println(s"Creating external tables at $location")
    createExternal(location, dbname, tables)
  }
  // Show table information and attempt to vacuum
  tableNames.foreach { tableName =>
    // SHOW PARTITIONS fails on non-partitioned tables; report that case as "no partitions".
    println(s"Table $tableName has " + util.Try(sql(s"SHOW PARTITIONS $tableName").count() + " partitions").getOrElse(s"no partitions"))
    // The retention literal was previously "0.0." (stray dot), which is invalid SQL and made
    // this attempt fail unconditionally. VACUUM stays best-effort: a failure is only reported.
    util.Try(sql(s"VACUUM $tableName RETAIN 0.0 HOURS")).getOrElse(println(s"Cannot VACUUM $tableName"))
    sql(s"DESCRIBE EXTENDED $tableName").show(999, false)
    println()
  }
}
|
||||
|
||||
// COMMAND ----------
|
||||
|
||||
// Picks the shuffle partition count and the parquet memory pool ratio for a scale factor
// and applies both settings.
def setScaleConfig(scaleFactor: String): Unit = {
  // Avoid OOM when shuffling large scale factors
  // and errors like 2GB shuffle limit at 10TB like: Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Too large frame: 9640891355
  // For 10TB 16x4core nodes were needed with the config below, 8x for 1TB and below.
  // About 24hrs. for SF 1 to 10,000.
  val (shufflePartitions, parquetPoolRatio) = scaleFactor.toInt match {
    case sf if sf >= 10000 => ("20000", "0.1")
    case sf if sf >= 1000  => ("2001", "0.3") // one above 2000 to use HighlyCompressedMapStatus
    case _                 => ("200", "0.5")  // default
  }
  spark.conf.set("spark.sql.shuffle.partitions", shufflePartitions)
  SparkHadoopUtil.get.conf.set("parquet.memory.pool.ratio", parquetPoolRatio)
}
|
||||
|
||||
// COMMAND ----------
|
||||
|
||||
// Generate the data, import the tables, generate stats for selected benchmarks and scale factors
|
||||
scaleFactors.foreach { scaleFactor => {
|
||||
|
||||
// First set some config settings affecting OOMs/performance
|
||||
setScaleConfig(scaleFactor)
|
||||
|
||||
benchmarks.foreach{ benchmark => {
|
||||
val (dbname, tables, location) = getBenchmarkData(benchmark, scaleFactor)
|
||||
// Start the actual loading
|
||||
time {
|
||||
println(s"Generating data for $benchmark SF $scaleFactor at $location")
|
||||
loadData(tables = tables, location = location, scaleFactor = scaleFactor)
|
||||
}
|
||||
time {
|
||||
println(s"\nImporting data for $benchmark into DB $dbname from $location")
|
||||
loadDB(dbname = dbname, tables = tables, location = location)
|
||||
}
|
||||
if (createTableStats) time {
|
||||
println(s"\nGenerating table statistics for DB $dbname (with analyzeColumns=$createColumnStats)")
|
||||
tables.analyzeTables(dbname, analyzeColumns = createColumnStats)
|
||||
}
|
||||
}}
|
||||
}}
|
||||
|
||||
|
||||
// COMMAND ----------
|
||||
|
||||
// Print table structure for manual validation
|
||||
// For every scale factor and benchmark: switch to the benchmark's database and, for each of
// its tables, print the row count, the partition count and the extended description.
scaleFactors.foreach { scaleFactor =>
  benchmarks.foreach { benchmark =>
    // Only the database name is used in this cell.
    val (dbname, _, _) = getBenchmarkData(benchmark, scaleFactor)
    sql(s"use $dbname")
    time {
      sql(s"show tables").select("tableName").collect().foreach { row =>
        // Read the column value directly rather than trimming the brackets off Row.toString.
        val name: String = row.getString(0)
        println(s"Printing table information for $benchmark SF $scaleFactor table $name")
        val count = sql(s"select count(*) as ${name}_count from $name").collect()(0)(0)
        // SHOW PARTITIONS fails on non-partitioned tables; report that case as "no partitions".
        println(s"Table $name has " + util.Try(sql(s"SHOW PARTITIONS $name").count() + " partitions").getOrElse(s"no partitions") + s" and $count rows.")
        sql(s"describe extended $name").show(999, false)
      }
    }
    println()
  }
}
|
||||
@ -1,79 +0,0 @@
|
||||
// Databricks notebook source
|
||||
// TPCH runner (from spark-sql-perf) to be used on existing tables
|
||||
// edit the main configuration below
|
||||
|
||||
val scaleFactors = Seq(1, 10, 100, 1000) //set scale factors to run
|
||||
val format = "parquet" // file format of the tables; the data must have already been generated
|
||||
|
||||
def perfDatasetsLocation(scaleFactor: Int, format: String) =
|
||||
s"s3a://my-bucket/tpch/sf${scaleFactor}_${format}"
|
||||
|
||||
val resultLocation = "s3a://my-bucket/results"
|
||||
val iterations = 2
|
||||
def databaseName(scaleFactor: Int, format: String) = s"tpch_sf${scaleFactor}_${format}"
|
||||
val randomizeQueries = false //to use on concurrency tests
|
||||
|
||||
// Experiment metadata for results, edit if outside Databricks
|
||||
val configuration = "default" //use default when using the out-of-box config
|
||||
val runtype = "TPCH run" // Edit
|
||||
val workers = 10 // Edit to the number of workers
|
||||
val workerInstanceType = "my_VM_instance" // Edit to the instance type
|
||||
|
||||
// Make sure spark-sql-perf library is available (use the assembly version)
|
||||
import com.databricks.spark.sql.perf.tpch._
|
||||
import org.apache.spark.sql.functions._
|
||||
|
||||
// default config (for all notebooks)
|
||||
var config : Map[String, String] = Map (
|
||||
"spark.sql.broadcastTimeout" -> "7200" // Enable for SF 10,000
|
||||
)
|
||||
// Set the spark config
|
||||
for ((k, v) <- config) spark.conf.set(k, v)
|
||||
// Print the custom configs first
|
||||
// Print each custom key with the value Spark actually holds for it. The tuple is built
// explicitly instead of relying on argument auto-tupling in println(a, b).
for ((k, _) <- config) println((k, spark.conf.get(k)))
|
||||
// Print all for easy debugging
|
||||
print(spark.conf.getAll)
|
||||
|
||||
val tpch = new TPCH(sqlContext = spark.sqlContext)
|
||||
|
||||
// filter queries (if selected)
|
||||
import com.databricks.spark.sql.perf.Query
|
||||
import com.databricks.spark.sql.perf.ExecutionMode.CollectResults
|
||||
import org.apache.commons.io.IOUtils
|
||||
|
||||
// Builds one Query per TPC-H query file (tpch/queries/1.sql .. 22.sql on the classpath),
// named Q1..Q22 and executed in CollectResults mode.
val queries = (1 to 22).map { q =>
  val resourcePath = s"tpch/queries/$q.sql"
  val stream = getClass().getClassLoader().getResourceAsStream(resourcePath)
  // getResourceAsStream returns null for a missing resource; fail with a clear message
  // instead of a bare NullPointerException.
  require(stream != null, s"Query resource not found on classpath: $resourcePath")
  // Read with an explicit charset and always close the stream.
  val queryContent: String =
    try IOUtils.toString(stream, java.nio.charset.StandardCharsets.UTF_8)
    finally stream.close()
  new Query(s"Q$q", spark.sqlContext.sql(queryContent), description = s"TPCH Query $q",
    executionMode = CollectResults)
}
|
||||
|
||||
// COMMAND ----------
|
||||
|
||||
// Runs the TPC-H queries against the database of each scale factor, waits for the
// experiment to finish and prints a per-query runtime summary in seconds.
scaleFactors.foreach { scaleFactor =>
  val database = databaseName(scaleFactor, format)
  println("DB SF " + database)
  sql(s"USE $database")
  val experiment = tpch.runExperiment(
    queries,
    iterations = iterations,
    resultLocation = resultLocation,
    tags = Map(
      "runtype" -> runtype,
      "date" -> java.time.LocalDate.now.toString,
      "database" -> database,
      "scale_factor" -> scaleFactor.toString,
      "spark_version" -> spark.version,
      "system" -> "Spark",
      // workers is an Int; convert it so every tag value is a String.
      "workers" -> workers.toString,
      "workerInstanceType" -> workerInstanceType,
      "configuration" -> configuration
    )
  )
  println(s"Running SF $scaleFactor")
  experiment.waitForFinish(36 * 60 * 60) //36hours
  val summary = experiment.getCurrentResults
    // substring positions are 1-based: start at the 2nd character of the query name
    .withColumn("Name", substring(col("name"), 2, 100))
    // the summed phase times are divided by 1000.0 (milliseconds to seconds, assuming the inputs are ms — confirm)
    .withColumn("Runtime", (col("parsingTime") + col("analysisTime") + col("optimizationTime") + col("planningTime") + col("executionTime")) / 1000.0)
    // col(...) instead of symbol literals, so no implicit conversions are required
    .select(col("Name"), col("Runtime"))
  summary.show(9999, false)
}
|
||||
@ -1,5 +1,5 @@
|
||||
output: /databricks/spark/sql/mllib-perf-ci
|
||||
timeoutSeconds: 43200 # This limit is for all benchmarks and should be bumped as more are added.
|
||||
timeoutSeconds: 1000 # This limit is for all benchmarks and should be bumped as more are added.
|
||||
common:
|
||||
numExamples: 1000000
|
||||
numTestExamples: 1000000
|
||||
@ -9,58 +9,53 @@ common:
|
||||
benchmarks:
|
||||
- name: classification.DecisionTreeClassification
|
||||
params:
|
||||
depth: [5, 12]
|
||||
depth: [5, 10]
|
||||
numClasses: 4
|
||||
- name: classification.GBTClassification
|
||||
params:
|
||||
numFeatures: 1500
|
||||
depth: 4
|
||||
numFeatures: 3000
|
||||
depth: 5
|
||||
numClasses: 4
|
||||
maxIter: 8
|
||||
maxIter: 10
|
||||
- name: classification.RandomForestClassification
|
||||
params:
|
||||
depth: 10
|
||||
numFeatures: 1000
|
||||
numClasses: 4
|
||||
maxIter: 100
|
||||
maxIter: 200 # number of trees
|
||||
- name: classification.LogisticRegression
|
||||
params:
|
||||
numExamples: 700000
|
||||
numFeatures: 5000
|
||||
elasticNetParam: [0.0, 0.5]
|
||||
regParam: 0.01
|
||||
tol: 0.0
|
||||
maxIter: 10
|
||||
maxIter: 20
|
||||
- name: classification.LinearSVC
|
||||
params:
|
||||
numExamples: 500000
|
||||
regParam: 0.01
|
||||
tol: 0
|
||||
maxIter: 10
|
||||
maxIter: 20
|
||||
- name: classification.NaiveBayes
|
||||
params:
|
||||
numExamples: 2000000
|
||||
numFeatures: 5000
|
||||
numClasses: 10
|
||||
numClasses: 2
|
||||
smoothing: 1.0
|
||||
- name: clustering.GaussianMixture
|
||||
params:
|
||||
numFeatures: 30
|
||||
k: 15
|
||||
maxIter: 15
|
||||
tol: 0.0
|
||||
numExamples: 100000
|
||||
numTestExamples: 100000
|
||||
numFeatures: 1000
|
||||
k: 10
|
||||
maxIter: 10
|
||||
tol: 0.01
|
||||
- name: clustering.KMeans
|
||||
params:
|
||||
k: 20
|
||||
k: 50
|
||||
maxIter: 20
|
||||
tol: 1e-3
|
||||
- name: clustering.LDA
|
||||
params:
|
||||
numExamples: 200000
|
||||
docLength: 100
|
||||
vocabSize: 5000
|
||||
k: 20
|
||||
maxIter: 10
|
||||
k: 60
|
||||
maxIter: 20
|
||||
optimizer:
|
||||
- em
|
||||
- online
|
||||
@ -98,48 +93,40 @@ benchmarks:
|
||||
params:
|
||||
numExamples: 10000
|
||||
vocabSize: 100
|
||||
docLength: 300
|
||||
docLength: 1000
|
||||
numSynonymsToFind: 3
|
||||
- name: fpm.FPGrowth
|
||||
params:
|
||||
numExamples: 10000000
|
||||
numItems: 10000
|
||||
itemSetSize: [4, 14]
|
||||
itemSetSize: [4, 10]
|
||||
- name: recommendation.ALS
|
||||
params:
|
||||
numExamples: 20000000
|
||||
numExamples: 50000000
|
||||
numTestExamples: 50000000
|
||||
numUsers: 4000000
|
||||
numItems: 4000000
|
||||
numUsers: 6000000
|
||||
numItems: 6000000
|
||||
regParam: 0.01
|
||||
rank: 10
|
||||
maxIter: 8
|
||||
maxIter: 10
|
||||
- name: regression.DecisionTreeRegression
|
||||
params:
|
||||
depth: [4, 7]
|
||||
- name: regression.GBTRegression
|
||||
params:
|
||||
numFeatures: 1000
|
||||
depth: 4
|
||||
maxIter: 8
|
||||
depth: [5, 10]
|
||||
- name: regression.GLMRegression
|
||||
params:
|
||||
numExamples: 400000
|
||||
numExamples: 500000
|
||||
numTestExamples: 500000
|
||||
numFeatures: 500
|
||||
numFeatures: 1000
|
||||
link: log
|
||||
family: gaussian
|
||||
tol: 0.0
|
||||
maxIter: 8
|
||||
maxIter: 10
|
||||
regParam: 0.1
|
||||
- name: regression.LinearRegression
|
||||
params:
|
||||
numFeatures: 5000
|
||||
regParam: 0.01
|
||||
tol: 0.0
|
||||
maxIter: 9
|
||||
maxIter: 20
|
||||
- name: regression.RandomForestRegression
|
||||
params:
|
||||
depth: 7
|
||||
numFeatures: 500
|
||||
maxIter: 16
|
||||
depth: 10
|
||||
maxIter: 4
|
||||
|
||||
@ -142,12 +142,6 @@ benchmarks:
|
||||
depth: 3
|
||||
numClasses: 4
|
||||
numFeatures: 5
|
||||
- name: regression.GBTRegression
|
||||
params:
|
||||
numExamples: 100
|
||||
numTestExamples: 10
|
||||
depth: 3
|
||||
maxIter: 3
|
||||
- name: regression.GLMRegression
|
||||
params:
|
||||
numExamples: 100
|
||||
|
||||
@ -1,14 +0,0 @@
|
||||
select
|
||||
count(*) as total,
|
||||
count(ss_sold_date_sk) as not_null_total,
|
||||
--count(distinct ss_sold_date_sk) as unique_days,
|
||||
max(ss_sold_date_sk) as max_ss_sold_date_sk,
|
||||
max(ss_sold_time_sk) as max_ss_sold_time_sk,
|
||||
max(ss_item_sk) as max_ss_item_sk,
|
||||
max(ss_customer_sk) as max_ss_customer_sk,
|
||||
max(ss_cdemo_sk) as max_ss_cdemo_sk,
|
||||
max(ss_hdemo_sk) as max_ss_hdemo_sk,
|
||||
max(ss_addr_sk) as max_ss_addr_sk,
|
||||
max(ss_store_sk) as max_ss_store_sk,
|
||||
max(ss_promo_sk) as max_ss_promo_sk
|
||||
from store_sales
|
||||
@ -1,55 +1,27 @@
|
||||
package com.databricks.spark.sql.perf
|
||||
|
||||
class AggregationPerformance extends Benchmark {
|
||||
import org.apache.spark.sql.{Row, SQLContext}
|
||||
|
||||
trait AggregationPerformance extends Benchmark {
|
||||
|
||||
import sqlContext.implicits._
|
||||
import ExecutionMode._
|
||||
|
||||
|
||||
val sizes = (1 to 6).map(math.pow(10, _).toInt)
|
||||
|
||||
val x = Table(
|
||||
"1milints", {
|
||||
val df = sqlContext.range(0, 1000000).repartition(1)
|
||||
df.createTempView("1milints")
|
||||
df
|
||||
})
|
||||
|
||||
val joinTables = Seq(
|
||||
Table(
|
||||
"100milints", {
|
||||
val df = sqlContext.range(0, 100000000).repartition(10)
|
||||
df.createTempView("100milints")
|
||||
df
|
||||
}),
|
||||
|
||||
Table(
|
||||
"1bilints", {
|
||||
val df = sqlContext.range(0, 1000000000).repartition(10)
|
||||
df.createTempView("1bilints")
|
||||
df
|
||||
}
|
||||
)
|
||||
)
|
||||
val sizes = (1 to 6).map(math.pow(10, _).toInt).toSeq
|
||||
|
||||
val variousCardinality = sizes.map { size =>
|
||||
Table(s"ints$size", {
|
||||
val df = sparkContext.parallelize(1 to size).flatMap { group =>
|
||||
Table(s"ints$size",
|
||||
sparkContext.parallelize(1 to size).flatMap { group =>
|
||||
(1 to 10000).map(i => (group, i))
|
||||
}.toDF("a", "b")
|
||||
df.createTempView(s"ints$size")
|
||||
df
|
||||
})
|
||||
}.toDF("a", "b"))
|
||||
}
|
||||
|
||||
val lowCardinality = sizes.map { size =>
|
||||
val fullSize = size * 10000L
|
||||
Table(
|
||||
s"twoGroups$fullSize", {
|
||||
val df = sqlContext.range(0, fullSize).select($"id" % 2 as 'a, $"id" as 'b)
|
||||
df.createTempView(s"twoGroups$fullSize")
|
||||
df
|
||||
})
|
||||
s"twoGroups$fullSize",
|
||||
sqlContext.range(0, fullSize).select($"id" % 2 as 'a, $"id" as 'b))
|
||||
}
|
||||
|
||||
val newAggreation = Variation("aggregationType", Seq("new", "old")) {
|
||||
@ -57,7 +29,7 @@ class AggregationPerformance extends Benchmark {
|
||||
case "new" => sqlContext.setConf("spark.sql.useAggregate2", "true")
|
||||
}
|
||||
|
||||
val varyNumGroupsAvg: Seq[Benchmarkable] = variousCardinality.map(_.name).map { table =>
|
||||
val varyNumGroupsAvg: Seq[Query] = variousCardinality.map(_.name).map { table =>
|
||||
Query(
|
||||
s"avg-$table",
|
||||
s"SELECT AVG(b) FROM $table GROUP BY a",
|
||||
@ -65,7 +37,7 @@ class AggregationPerformance extends Benchmark {
|
||||
executionMode = ForeachResults)
|
||||
}
|
||||
|
||||
val twoGroupsAvg: Seq[Benchmarkable] = lowCardinality.map(_.name).map { table =>
|
||||
val twoGroupsAvg: Seq[Query] = lowCardinality.map(_.name).map { table =>
|
||||
Query(
|
||||
s"avg-$table",
|
||||
s"SELECT AVG(b) FROM $table GROUP BY a",
|
||||
@ -73,7 +45,7 @@ class AggregationPerformance extends Benchmark {
|
||||
executionMode = ForeachResults)
|
||||
}
|
||||
|
||||
val complexInput: Seq[Benchmarkable] =
|
||||
val complexInput =
|
||||
Seq("1milints", "100milints", "1bilints").map { table =>
|
||||
Query(
|
||||
s"aggregation-complex-input-$table",
|
||||
@ -82,7 +54,7 @@ class AggregationPerformance extends Benchmark {
|
||||
executionMode = CollectResults)
|
||||
}
|
||||
|
||||
val aggregates: Seq[Benchmarkable] =
|
||||
val aggregates =
|
||||
Seq("1milints", "100milints", "1bilints").flatMap { table =>
|
||||
Seq("SUM", "AVG", "COUNT", "STDDEV").map { agg =>
|
||||
Query(
|
||||
|
||||
@ -22,10 +22,9 @@ import scala.concurrent._
|
||||
import scala.concurrent.duration._
|
||||
import scala.language.implicitConversions
|
||||
import scala.util.{Success, Try, Failure => SFailure}
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.{Dataset, DataFrame, SQLContext, SparkSession}
|
||||
import org.apache.spark.sql.{Dataset, DataFrame, SQLContext}
|
||||
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
|
||||
import org.apache.spark.SparkContext
|
||||
|
||||
@ -42,7 +41,7 @@ abstract class Benchmark(
|
||||
|
||||
import Benchmark._
|
||||
|
||||
def this() = this(SparkSession.builder.getOrCreate().sqlContext)
|
||||
def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate()))
|
||||
|
||||
val resultsLocation =
|
||||
sqlContext.getAllConfs.getOrElse(
|
||||
@ -335,7 +334,7 @@ object Benchmark {
|
||||
.flatMap { query =>
|
||||
try {
|
||||
query.newDataFrame().queryExecution.logical.collect {
|
||||
case r: UnresolvedRelation => r.tableName
|
||||
case UnresolvedRelation(t) => t.table
|
||||
}
|
||||
} catch {
|
||||
// ignore the queries that can't be parsed
|
||||
@ -436,9 +435,7 @@ object Benchmark {
|
||||
.format("json")
|
||||
.save(resultPath)
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
logMessage(s"Failed to write data: $e")
|
||||
throw e
|
||||
case e: Throwable => logMessage(s"Failed to write data: $e")
|
||||
}
|
||||
|
||||
logCollection()
|
||||
@ -451,7 +448,7 @@ object Benchmark {
|
||||
val location = cpu.collectLogs(sqlContext, fs, timestamp)
|
||||
logMessage(s"cpu results recorded to $location")
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
case e: Throwable =>
|
||||
logMessage(s"Error collecting logs: $e")
|
||||
throw e
|
||||
}
|
||||
@ -476,14 +473,14 @@ object Benchmark {
|
||||
/** Returns results from an actively running experiment. */
|
||||
def getCurrentResults() = {
|
||||
val tbl = sqlContext.createDataFrame(currentResults)
|
||||
tbl.createOrReplaceTempView("currentResults")
|
||||
tbl.registerTempTable("currentResults")
|
||||
tbl
|
||||
}
|
||||
|
||||
/** Returns full iterations from an actively running experiment. */
|
||||
def getCurrentRuns() = {
|
||||
val tbl = sqlContext.createDataFrame(currentRuns)
|
||||
tbl.createOrReplaceTempView("currentRuns")
|
||||
tbl.registerTempTable("currentRuns")
|
||||
tbl
|
||||
}
|
||||
|
||||
|
||||
@ -18,25 +18,22 @@ package com.databricks.spark.sql.perf
|
||||
|
||||
import java.util.UUID
|
||||
|
||||
import org.slf4j.LoggerFactory
|
||||
import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging}
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import org.apache.spark.sql.{SQLContext,SparkSession}
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.{SparkEnv, SparkContext}
|
||||
|
||||
|
||||
/** A trait to describe things that can be benchmarked. */
|
||||
trait Benchmarkable {
|
||||
@transient protected[this] val sqlSession = SparkSession.builder.getOrCreate()
|
||||
@transient protected[this] val sqlContext = sqlSession.sqlContext
|
||||
@transient protected[this] val sparkContext = sqlSession.sparkContext
|
||||
trait Benchmarkable extends Logging {
|
||||
@transient protected[this] val sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
|
||||
@transient protected[this] val sparkContext = sqlContext.sparkContext
|
||||
|
||||
val name: String
|
||||
protected val executionMode: ExecutionMode
|
||||
lazy val logger = LoggerFactory.getLogger(this.getClass.getName)
|
||||
|
||||
final def benchmark(
|
||||
includeBreakdown: Boolean,
|
||||
|
||||
@ -104,7 +104,7 @@ package object cpu {
|
||||
}
|
||||
|
||||
val counts = cpuLogs.groupBy($"stack").agg(count($"*")).collect().flatMap {
|
||||
case Row(stackLines: Array[String], count: Long) => stackLines.toSeq.map(toStackElement) -> count :: Nil
|
||||
case Row(stackLines: Seq[String], count: Long) => stackLines.map(toStackElement) -> count :: Nil
|
||||
case other => println(s"Failed to parse $other"); Nil
|
||||
}.toMap
|
||||
val profile = new com.twitter.jvm.CpuProfile(counts, com.twitter.util.Duration.fromSeconds(10), cpuLogs.count().toInt, 0)
|
||||
|
||||
@ -16,31 +16,9 @@
|
||||
|
||||
package com.databricks.spark.sql.perf
|
||||
|
||||
import org.apache.spark.sql.Encoders
|
||||
import org.apache.spark.sql.{Encoder, SQLContext}
|
||||
import org.apache.spark.sql.expressions.Aggregator
|
||||
|
||||
object TypedAverage extends Aggregator[Long, SumAndCount, Double] {
|
||||
override def zero: SumAndCount = SumAndCount(0L, 0)
|
||||
|
||||
override def reduce(b: SumAndCount, a: Long): SumAndCount = {
|
||||
b.count += 1
|
||||
b.sum += a
|
||||
b
|
||||
}
|
||||
|
||||
override def bufferEncoder = Encoders.product
|
||||
|
||||
override def outputEncoder = Encoders.scalaDouble
|
||||
|
||||
override def finish(reduction: SumAndCount): Double = reduction.sum.toDouble / reduction.count
|
||||
|
||||
override def merge(b1: SumAndCount, b2: SumAndCount): SumAndCount = {
|
||||
b1.count += b2.count
|
||||
b1.sum += b2.sum
|
||||
b1
|
||||
}
|
||||
}
|
||||
|
||||
case class Data(id: Long)
|
||||
|
||||
case class SumAndCount(var sum: Long, var count: Int)
|
||||
@ -54,7 +32,7 @@ class DatasetPerformance extends Benchmark {
|
||||
val rdd = sparkContext.range(1, numLongs)
|
||||
|
||||
val smallNumLongs = 1000000
|
||||
val smallds = sqlContext.range(1, smallNumLongs).as[Long]
|
||||
val smallds = sqlContext.range(1, smallNumLongs)
|
||||
val smallrdd = sparkContext.range(1, smallNumLongs)
|
||||
|
||||
def allBenchmarks = range ++ backToBackFilters ++ backToBackMaps ++ computeAverage
|
||||
@ -121,10 +99,32 @@ class DatasetPerformance extends Benchmark {
|
||||
.map(d => Data(d.id + 1L)))
|
||||
)
|
||||
|
||||
val average = new Aggregator[Long, SumAndCount, Double] {
|
||||
override def zero: SumAndCount = SumAndCount(0, 0)
|
||||
|
||||
override def reduce(b: SumAndCount, a: Long): SumAndCount = {
|
||||
b.count += 1
|
||||
b.sum += a
|
||||
b
|
||||
}
|
||||
|
||||
override def bufferEncoder = implicitly[Encoder[SumAndCount]]
|
||||
|
||||
override def outputEncoder = implicitly[Encoder[Double]]
|
||||
|
||||
override def finish(reduction: SumAndCount): Double = reduction.sum.toDouble / reduction.count
|
||||
|
||||
override def merge(b1: SumAndCount, b2: SumAndCount): SumAndCount = {
|
||||
b1.count += b2.count
|
||||
b1.sum += b2.sum
|
||||
b1
|
||||
}
|
||||
}.toColumn
|
||||
|
||||
val computeAverage = Seq(
|
||||
new Query(
|
||||
"DS: average",
|
||||
smallds.select(TypedAverage.toColumn).toDF(),
|
||||
smallds.as[Long].select(average).toDF(),
|
||||
executionMode = ExecutionMode.CollectResults),
|
||||
new Query(
|
||||
"DF: average",
|
||||
@ -140,4 +140,4 @@ class DatasetPerformance extends Benchmark {
|
||||
sumAndCount._1.toDouble / sumAndCount._2
|
||||
})
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@ -3,8 +3,8 @@ package com.databricks.spark.sql.perf
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
class JoinPerformance extends Benchmark {
|
||||
|
||||
trait JoinPerformance extends Benchmark {
|
||||
// 1.5 mb, 1 file
|
||||
|
||||
import ExecutionMode._
|
||||
import sqlContext.implicits._
|
||||
@ -12,27 +12,22 @@ class JoinPerformance extends Benchmark {
|
||||
private val table = sqlContext.table _
|
||||
|
||||
val x = Table(
|
||||
"1milints", { // 1.5 mb, 1 file
|
||||
val df = sqlContext.range(0, 1000000).repartition(1)
|
||||
df.createTempView("1milints")
|
||||
df
|
||||
})
|
||||
"1milints",
|
||||
sqlContext.range(0, 1000000)
|
||||
.repartition(1))
|
||||
|
||||
val joinTables = Seq(
|
||||
// 143.542mb, 10 files
|
||||
Table(
|
||||
"100milints", { // 143.542mb, 10 files
|
||||
val df = sqlContext.range(0, 100000000).repartition(10)
|
||||
df.createTempView("100milints")
|
||||
df
|
||||
}),
|
||||
"100milints",
|
||||
sqlContext.range(0, 100000000)
|
||||
.repartition(10)),
|
||||
|
||||
// 1.4348gb, 10 files
|
||||
Table(
|
||||
"1bilints", { // 1.4348gb, 10 files
|
||||
val df = sqlContext.range(0, 1000000000).repartition(10)
|
||||
df.createTempView("1bilints")
|
||||
df
|
||||
}
|
||||
)
|
||||
"1bilints",
|
||||
sqlContext.range(0, 1000000000)
|
||||
.repartition(10))
|
||||
)
|
||||
|
||||
val sortMergeJoin = Variation("sortMergeJoin", Seq("on", "off")) {
|
||||
@ -40,7 +35,7 @@ class JoinPerformance extends Benchmark {
|
||||
case "on" => sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")
|
||||
}
|
||||
|
||||
val singleKeyJoins: Seq[Benchmarkable] = Seq("1milints", "100milints", "1bilints").flatMap { table1 =>
|
||||
val singleKeyJoins = Seq("1milints", "100milints", "1bilints").flatMap { table1 =>
|
||||
Seq("1milints", "100milints", "1bilints").flatMap { table2 =>
|
||||
Seq("JOIN", "RIGHT JOIN", "LEFT JOIN", "FULL OUTER JOIN").map { join =>
|
||||
Query(
|
||||
@ -68,9 +63,9 @@ class JoinPerformance extends Benchmark {
|
||||
|
||||
val varyNumMatches = Seq(1, 2, 4, 8, 16).map { numCopies =>
|
||||
val ints = table("100milints")
|
||||
val copiedInts = Seq.fill(numCopies)(ints).reduce(_ union _)
|
||||
val copiedInts = Seq.fill(numCopies)(ints).reduce(_ unionAll _)
|
||||
new Query(
|
||||
s"join - numMatches: $numCopies",
|
||||
copiedInts.as("a").join(ints.as("b"), $"a.id" === $"b.id"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -54,7 +54,10 @@ class Query(
|
||||
}
|
||||
|
||||
lazy val tablesInvolved = buildDataFrame.queryExecution.logical collect {
|
||||
case r: UnresolvedRelation => r.tableName
|
||||
case UnresolvedRelation(tableIdentifier) => {
|
||||
// We are ignoring the database name.
|
||||
tableIdentifier.table
|
||||
}
|
||||
}
|
||||
|
||||
def newDataFrame() = buildDataFrame
|
||||
@ -85,11 +88,10 @@ class Query(
|
||||
val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan.p(i)))
|
||||
val indexMap = physicalOperators.map { case (index, op) => (op, index) }.toMap
|
||||
val timeMap = new mutable.HashMap[Int, Double]
|
||||
val maxFields = 999 // Maximum number of fields that will be converted to strings
|
||||
|
||||
physicalOperators.reverse.map {
|
||||
case (index, node) =>
|
||||
messages += s"Breakdown: ${node.simpleString(maxFields)}"
|
||||
messages += s"Breakdown: ${node.simpleString}"
|
||||
val newNode = buildDataFrame.queryExecution.executedPlan.p(index)
|
||||
val executionTime = measureTimeMs {
|
||||
newNode.execute().foreach((row: Any) => Unit)
|
||||
@ -102,7 +104,7 @@ class Query(
|
||||
|
||||
BreakdownResult(
|
||||
node.nodeName,
|
||||
node.simpleString(maxFields).replaceAll("#\\d+", ""),
|
||||
node.simpleString.replaceAll("#\\d+", ""),
|
||||
index,
|
||||
childIndexes,
|
||||
executionTime,
|
||||
@ -120,7 +122,7 @@ class Query(
|
||||
val executionTime = measureTimeMs {
|
||||
executionMode match {
|
||||
case ExecutionMode.CollectResults => dataFrame.collect()
|
||||
case ExecutionMode.ForeachResults => dataFrame.foreach { _ => ():Unit }
|
||||
case ExecutionMode.ForeachResults => dataFrame.foreach { row => Unit }
|
||||
case ExecutionMode.WriteParquet(location) =>
|
||||
dataFrame.write.parquet(s"$location/$name.parquet")
|
||||
case ExecutionMode.HashResults =>
|
||||
|
||||
@ -17,14 +17,14 @@
|
||||
package com.databricks.spark.sql.perf
|
||||
|
||||
import java.net.InetAddress
|
||||
import java.io.File
|
||||
import org.apache.spark.sql.{SQLContext, SparkSession}
|
||||
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.{SparkContext, SparkConf}
|
||||
|
||||
import scala.util.Try
|
||||
|
||||
case class RunConfig(
|
||||
master: String = "local[*]",
|
||||
benchmarkName: String = null,
|
||||
filter: Option[String] = None,
|
||||
iterations: Int = 3,
|
||||
@ -37,9 +37,6 @@ object RunBenchmark {
|
||||
def main(args: Array[String]): Unit = {
|
||||
val parser = new scopt.OptionParser[RunConfig]("spark-sql-perf") {
|
||||
head("spark-sql-perf", "0.2.0")
|
||||
opt[String]('m', "master")
|
||||
.action { (x, c) => c.copy(master = x) }
|
||||
.text("the Spark master to use, default to local[*]")
|
||||
opt[String]('b', "benchmark")
|
||||
.action { (x, c) => c.copy(benchmarkName = x) }
|
||||
.text("the name of the benchmark to run")
|
||||
@ -67,17 +64,14 @@ object RunBenchmark {
|
||||
|
||||
def run(config: RunConfig): Unit = {
|
||||
val conf = new SparkConf()
|
||||
.setMaster(config.master)
|
||||
.setAppName(getClass.getName)
|
||||
.setMaster("local[*]")
|
||||
.setAppName(getClass.getName)
|
||||
|
||||
val sparkSession = SparkSession.builder.config(conf).getOrCreate()
|
||||
val sc = sparkSession.sparkContext
|
||||
val sqlContext = sparkSession.sqlContext
|
||||
val sc = SparkContext.getOrCreate(conf)
|
||||
val sqlContext = SQLContext.getOrCreate(sc)
|
||||
import sqlContext.implicits._
|
||||
|
||||
sqlContext.setConf("spark.sql.perf.results",
|
||||
new File("performance").toURI.toString)
|
||||
|
||||
sqlContext.setConf("spark.sql.perf.results", new java.io.File("performance").toURI.toString)
|
||||
val benchmark = Try {
|
||||
Class.forName(config.benchmarkName)
|
||||
.newInstance()
|
||||
@ -108,8 +102,7 @@ object RunBenchmark {
|
||||
experiment.waitForFinish(1000 * 60 * 30)
|
||||
|
||||
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
|
||||
|
||||
val toShow = experiment.getCurrentRuns()
|
||||
experiment.getCurrentRuns()
|
||||
.withColumn("result", explode($"results"))
|
||||
.select("result.*")
|
||||
.groupBy("name")
|
||||
@ -117,13 +110,9 @@ object RunBenchmark {
|
||||
min($"executionTime") as 'minTimeMs,
|
||||
max($"executionTime") as 'maxTimeMs,
|
||||
avg($"executionTime") as 'avgTimeMs,
|
||||
stddev($"executionTime") as 'stdDev,
|
||||
(stddev($"executionTime") / avg($"executionTime") * 100) as 'stdDevPercent)
|
||||
stddev($"executionTime") as 'stdDev)
|
||||
.orderBy("name")
|
||||
|
||||
println("Showing at most 100 query results now")
|
||||
toShow.show(100)
|
||||
|
||||
.show(truncate = false)
|
||||
println(s"""Results: sqlContext.read.json("${experiment.resultPath}")""")
|
||||
|
||||
config.baseline.foreach { baseTimestamp =>
|
||||
@ -147,4 +136,4 @@ object RunBenchmark {
|
||||
data.show(truncate = false)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -179,7 +179,7 @@ abstract class Tables(sqlContext: SQLContext, scaleFactor: String,
|
||||
|
||||
val data = df(format != "text", numPartitions)
|
||||
val tempTableName = s"${name}_text"
|
||||
data.createOrReplaceTempView(tempTableName)
|
||||
data.registerTempTable(tempTableName)
|
||||
|
||||
val writer = if (partitionColumns.nonEmpty) {
|
||||
if (clusterByPartitionColumns) {
|
||||
@ -211,24 +211,9 @@ abstract class Tables(sqlContext: SQLContext, scaleFactor: String,
|
||||
data.write
|
||||
}
|
||||
} else {
|
||||
// treat non-partitioned tables as "one partition" that we want to coalesce
|
||||
if (clusterByPartitionColumns) {
|
||||
// in case data has more than maxRecordsPerFile, split into multiple writers to improve datagen speed
|
||||
// files will be truncated to maxRecordsPerFile value, so the final result will be the same
|
||||
val numRows = data.count
|
||||
val maxRecordPerFile = util.Try(sqlContext.getConf("spark.sql.files.maxRecordsPerFile").toInt).getOrElse(0)
|
||||
|
||||
println(s"Data has $numRows rows clustered $clusterByPartitionColumns for $maxRecordPerFile")
|
||||
log.info(s"Data has $numRows rows clustered $clusterByPartitionColumns for $maxRecordPerFile")
|
||||
|
||||
if (maxRecordPerFile > 0 && numRows > maxRecordPerFile) {
|
||||
val numFiles = (numRows.toDouble/maxRecordPerFile).ceil.toInt
|
||||
println(s"Coalescing into $numFiles files")
|
||||
log.info(s"Coalescing into $numFiles files")
|
||||
data.coalesce(numFiles).write
|
||||
} else {
|
||||
data.coalesce(1).write
|
||||
}
|
||||
// treat non-partitioned tables as "one partition" that we want to coalesce
|
||||
data.coalesce(1).write
|
||||
} else {
|
||||
data.write
|
||||
}
|
||||
@ -266,7 +251,7 @@ abstract class Tables(sqlContext: SQLContext, scaleFactor: String,
|
||||
def createTemporaryTable(location: String, format: String): Unit = {
|
||||
println(s"Creating temporary table $name using data stored in $location.")
|
||||
log.info(s"Creating temporary table $name using data stored in $location.")
|
||||
sqlContext.read.format(format).load(location).createOrReplaceTempView(name)
|
||||
sqlContext.read.format(format).load(location).registerTempTable(name)
|
||||
}
|
||||
|
||||
def analyzeTable(databaseName: String, analyzeColumns: Boolean = false): Unit = {
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package com.databricks.spark.sql.perf.mllib
|
||||
|
||||
import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging}
|
||||
import org.apache.spark.ml.attribute.{NominalAttribute, NumericAttribute}
|
||||
import org.apache.spark.ml.{Estimator, PipelineStage, Transformer}
|
||||
import org.apache.spark.ml.evaluation.Evaluator
|
||||
@ -20,7 +21,7 @@ import com.databricks.spark.sql.perf._
|
||||
*
|
||||
* It is assumed that the implementation is going to be an object.
|
||||
*/
|
||||
trait BenchmarkAlgorithm {
|
||||
trait BenchmarkAlgorithm extends Logging {
|
||||
|
||||
def trainingDataSet(ctx: MLBenchContext): DataFrame
|
||||
|
||||
@ -34,21 +35,12 @@ trait BenchmarkAlgorithm {
|
||||
/**
|
||||
* The unnormalized score of the training procedure on a dataset. The normalization is
|
||||
* performed by the caller.
|
||||
* This aggregates a no-op UDF over the transformed data to attempt to materialize the result for
|
||||
* recording timing metrics.
|
||||
*/
|
||||
@throws[Exception]("if scoring fails")
|
||||
def score(
|
||||
ctx: MLBenchContext,
|
||||
testSet: DataFrame,
|
||||
model: Transformer): MLMetric = {
|
||||
val output = model.transform(testSet)
|
||||
// We create a useless UDF to make sure the entire DataFrame is instantiated.
|
||||
val fakeUDF = udf { (_: Any) => 0 }
|
||||
val columns = testSet.columns
|
||||
output.select(sum(fakeUDF(struct(columns.map(col) : _*)))).first()
|
||||
MLMetric.Invalid
|
||||
}
|
||||
model: Transformer): MLMetric = MLMetric.Invalid
|
||||
|
||||
def name: String = {
|
||||
this.getClass.getCanonicalName.replace("$", "")
|
||||
|
||||
@ -2,7 +2,7 @@ package com.databricks.spark.sql.perf.mllib
|
||||
|
||||
import com.databricks.spark.sql.perf.mllib.classification.LogisticRegression
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.sql.{SQLContext,SparkSession}
|
||||
import org.apache.spark.sql.SQLContext
|
||||
|
||||
import com.databricks.spark.sql.perf.{MLParams}
|
||||
import OptionImplicits._
|
||||
@ -27,9 +27,8 @@ object MLBenchmarks {
|
||||
)
|
||||
)
|
||||
|
||||
val sparkSession = SparkSession.builder.getOrCreate()
|
||||
val sqlContext: SQLContext = sparkSession.sqlContext
|
||||
val context = sqlContext.sparkContext
|
||||
val context = SparkContext.getOrCreate()
|
||||
val sqlContext: SQLContext = SQLContext.getOrCreate(context)
|
||||
|
||||
def benchmarkObjects: Seq[MLPipelineStageBenchmarkable] = benchmarks.map { mlb =>
|
||||
new MLPipelineStageBenchmarkable(mlb.params, mlb.benchmark, sqlContext)
|
||||
|
||||
@ -4,7 +4,7 @@ package com.databricks.spark.sql.perf.mllib
|
||||
import scala.io.Source
|
||||
import scala.language.implicitConversions
|
||||
|
||||
import org.slf4j.LoggerFactory
|
||||
import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging}
|
||||
|
||||
import org.apache.spark.sql.{DataFrame, SQLContext}
|
||||
import org.apache.spark.{SparkConf, SparkContext}
|
||||
@ -18,7 +18,7 @@ class MLLib(sqlContext: SQLContext)
|
||||
def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate()))
|
||||
}
|
||||
|
||||
object MLLib {
|
||||
object MLLib extends Logging {
|
||||
|
||||
/**
|
||||
* Runs a set of preprogrammed experiments and blocks on completion.
|
||||
@ -26,9 +26,6 @@ object MLLib {
|
||||
* @param runConfig the [[RunConfig]] to use for this run
|
||||
* @return
|
||||
*/
|
||||
|
||||
lazy val logger = LoggerFactory.getLogger(this.getClass.getName)
|
||||
|
||||
def runDefault(runConfig: RunConfig): DataFrame = {
|
||||
val ml = new MLLib()
|
||||
val benchmarks = MLBenchmarks.benchmarkObjects
|
||||
@ -57,21 +54,6 @@ object MLLib {
|
||||
run(yamlFile = configFile)
|
||||
}
|
||||
|
||||
private[mllib] def getConf(yamlFile: String = null, yamlConfig: String = null): YamlConfig = {
|
||||
Option(yamlFile).map(YamlConfig.readFile).getOrElse {
|
||||
require(yamlConfig != null)
|
||||
YamlConfig.readString(yamlConfig)
|
||||
}
|
||||
}
|
||||
|
||||
private[mllib] def getBenchmarks(conf: YamlConfig): Seq[MLPipelineStageBenchmarkable] = {
|
||||
val sqlContext = com.databricks.spark.sql.perf.mllib.MLBenchmarks.sqlContext
|
||||
val benchmarksDescriptions = conf.runnableBenchmarks
|
||||
benchmarksDescriptions.map { mlb =>
|
||||
new MLPipelineStageBenchmarkable(mlb.params, mlb.benchmark, sqlContext)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs all the experiments and blocks on completion
|
||||
*
|
||||
@ -80,12 +62,20 @@ object MLLib {
|
||||
*/
|
||||
def run(yamlFile: String = null, yamlConfig: String = null): DataFrame = {
|
||||
logger.info("Starting run")
|
||||
val conf = getConf(yamlFile, yamlConfig)
|
||||
val conf: YamlConfig = Option(yamlFile).map(YamlConfig.readFile).getOrElse {
|
||||
require(yamlConfig != null)
|
||||
YamlConfig.readString(yamlConfig)
|
||||
}
|
||||
|
||||
val sparkConf = new SparkConf().setAppName("MLlib QA").setMaster("local[2]")
|
||||
val sc = SparkContext.getOrCreate(sparkConf)
|
||||
sc.setLogLevel("INFO")
|
||||
val b = new com.databricks.spark.sql.perf.mllib.MLLib()
|
||||
val benchmarks = getBenchmarks(conf)
|
||||
val sqlContext = com.databricks.spark.sql.perf.mllib.MLBenchmarks.sqlContext
|
||||
val benchmarksDescriptions = conf.runnableBenchmarks
|
||||
val benchmarks = benchmarksDescriptions.map { mlb =>
|
||||
new MLPipelineStageBenchmarkable(mlb.params, mlb.benchmark, sqlContext)
|
||||
}
|
||||
println(s"${benchmarks.size} benchmarks identified:")
|
||||
val str = benchmarks.map(_.prettyPrint).mkString("\n")
|
||||
println(str)
|
||||
|
||||
@ -1,7 +1,8 @@
|
||||
package com.databricks.spark.sql.perf.mllib
|
||||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging}
|
||||
|
||||
import org.apache.spark.ml.{Estimator, Transformer}
|
||||
import org.apache.spark.sql._
|
||||
@ -13,7 +14,7 @@ class MLPipelineStageBenchmarkable(
|
||||
params: MLParams,
|
||||
test: BenchmarkAlgorithm,
|
||||
sqlContext: SQLContext)
|
||||
extends Benchmarkable with Serializable {
|
||||
extends Benchmarkable with Serializable with Logging {
|
||||
|
||||
import MLPipelineStageBenchmarkable._
|
||||
|
||||
@ -26,7 +27,7 @@ class MLPipelineStageBenchmarkable(
|
||||
|
||||
override protected val executionMode: ExecutionMode = ExecutionMode.SparkPerfResults
|
||||
|
||||
override protected[mllib] def beforeBenchmark(): Unit = {
|
||||
override protected def beforeBenchmark(): Unit = {
|
||||
logger.info(s"$this beforeBenchmark")
|
||||
try {
|
||||
testData = test.testDataSet(param)
|
||||
@ -36,12 +37,21 @@ class MLPipelineStageBenchmarkable(
|
||||
trainingData.cache()
|
||||
trainingData.count()
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
case e: Throwable =>
|
||||
println(s"$this error in beforeBenchmark: ${e.getStackTraceString}")
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
override protected def afterBenchmark(sc: SparkContext): Unit = {
|
||||
// Best-effort clean up of weakly referenced RDDs, shuffles, and broadcasts
|
||||
// Remove any leftover blocks that still exist
|
||||
sc.getExecutorStorageStatus
|
||||
.flatMap { status => status.blocks.map { case (bid, _) => bid } }
|
||||
.foreach { bid => SparkEnv.get.blockManager.master.removeBlock(bid) }
|
||||
super.afterBenchmark(sc)
|
||||
}
|
||||
|
||||
override protected def doBenchmark(
|
||||
includeBreakdown: Boolean,
|
||||
description: String,
|
||||
|
||||
@ -51,7 +51,6 @@ object NaiveBayes extends BenchmarkAlgorithm
|
||||
// Initialize new Naive Bayes model
|
||||
val pi = Vectors.dense(piArray)
|
||||
val theta = new DenseMatrix(numClasses, numFeatures, thetaArray.flatten, true)
|
||||
|
||||
ModelBuilderSSP.newNaiveBayesModel(pi, theta)
|
||||
}
|
||||
|
||||
|
||||
@ -13,41 +13,35 @@ import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext,
|
||||
/** Object for testing VectorAssembler performance */
|
||||
object VectorAssembler extends BenchmarkAlgorithm with TestFromTraining {
|
||||
|
||||
private def getInputCols(numInputCols: Int): Array[String] = {
|
||||
Array.tabulate(numInputCols)(i => s"c${i}")
|
||||
}
|
||||
|
||||
override def trainingDataSet(ctx: MLBenchContext): DataFrame = {
|
||||
import ctx.params._
|
||||
import ctx.sqlContext.implicits._
|
||||
|
||||
require(numInputCols.get <= numFeatures.get,
|
||||
s"numInputCols (${numInputCols}) cannot be greater than numFeatures (${numFeatures}).")
|
||||
|
||||
val df = DataGenerator.generateContinuousFeatures(
|
||||
var df = DataGenerator.generateContinuousFeatures(
|
||||
ctx.sqlContext,
|
||||
numExamples,
|
||||
ctx.seed(),
|
||||
numPartitions,
|
||||
numFeatures)
|
||||
|
||||
val slice = udf { (v: Vector, numSlices: Int) =>
|
||||
val data = v.toArray
|
||||
val n = data.length.toLong
|
||||
(0 until numSlices).map { i =>
|
||||
val start = ((i * n) / numSlices).toInt
|
||||
val end = ((i + 1) * n / numSlices).toInt
|
||||
Vectors.dense(data.slice(start, end))
|
||||
}
|
||||
numFeatures * numInputCols
|
||||
)
|
||||
val sliceVec = udf { (v: Vector, from: Int, until: Int) =>
|
||||
Vectors.dense(v.toArray.slice(from, until))
|
||||
}
|
||||
|
||||
val inputCols = getInputCols(numInputCols.get)
|
||||
df.select(slice(col("features"), lit(numInputCols.get)).as("slices"))
|
||||
.select((0 until numInputCols.get).map(i => col("slices")(i).as(inputCols(i))): _*)
|
||||
for (i <- (1 to numInputCols.get)) {
|
||||
val colName = s"inputCol${i.toString}"
|
||||
val fromIndex = (i - 1) * numFeatures
|
||||
val untilIndex = i * numFeatures
|
||||
df = df.withColumn(colName, sliceVec(col("features"), lit(fromIndex), lit(untilIndex)))
|
||||
}
|
||||
df.drop(col("features"))
|
||||
}
|
||||
|
||||
override def getPipelineStage(ctx: MLBenchContext): PipelineStage = {
|
||||
import ctx.params._
|
||||
val inputCols = getInputCols(numInputCols.get)
|
||||
|
||||
val inputCols = (1 to numInputCols.get)
|
||||
.map(i => s"inputCol${i.toString}").toArray
|
||||
|
||||
new ml.feature.VectorAssembler()
|
||||
.setInputCols(inputCols)
|
||||
}
|
||||
|
||||
@ -1,18 +0,0 @@
|
||||
package com.databricks.spark.sql.perf.mllib.regression
|
||||
|
||||
import org.apache.spark.ml.PipelineStage
|
||||
import org.apache.spark.ml.regression.GBTRegressor
|
||||
|
||||
import com.databricks.spark.sql.perf.mllib.OptionImplicits._
|
||||
import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext,
|
||||
TreeOrForestRegressor}
|
||||
|
||||
/** Benchmark definition that exercises Spark ML's `GBTRegressor`. */
object GBTRegression extends BenchmarkAlgorithm with TreeOrForestRegressor {

  /**
   * Creates the regressor stage under test.
   *
   * The max depth and max iteration count come from the benchmark parameters
   * (`ctx.params`), the seed from the benchmark context.
   */
  override def getPipelineStage(ctx: MLBenchContext): PipelineStage = {
    import ctx.params._
    val regressor = new GBTRegressor()
    regressor.setMaxDepth(depth)
    regressor.setMaxIter(maxIter)
    // The last setter returns the regressor itself, which is the stage we hand back.
    regressor.setSeed(ctx.seed())
  }
}
|
||||
@ -1,121 +0,0 @@
|
||||
/*
|
||||
* Copyright 2015 Databricks Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.databricks.spark.sql.perf.tpcds
|
||||
|
||||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
/**
 * Settings collected from the command line by [[GenTPCDSData]].
 *
 * The string settings without a meaningful default (`dsdgenDir`, `scaleFactor`,
 * `location`, `format`) stay `null` until the corresponding option is supplied.
 *
 * @param master                       the Spark master to use
 * @param dsdgenDir                    location of dsdgen
 * @param scaleFactor                  size of the dataset to generate (in GB)
 * @param location                     root directory to create the data in
 * @param format                       Spark data source format (Parquet, ORC, ...)
 * @param useDoubleForDecimal          true to replace DecimalType with DoubleType
 * @param useStringForDate             true to replace DateType with StringType
 * @param overwrite                    overwrite the data that is already there
 * @param partitionTables              create the partitioned fact tables
 * @param clusterByPartitionColumns    shuffle to get partitions coalesced into single files
 * @param filterOutNullPartitionValues true to filter out the partition with NULL key value
 * @param tableFilter                  table selection; "" means generate all tables
 * @param numPartitions                how many dsdgen partitions to run (number of input tasks)
 */
case class GenTPCDSDataConfig(
    master: String = "local[*]",
    dsdgenDir: String = null,
    scaleFactor: String = null,
    location: String = null,
    format: String = null,
    useDoubleForDecimal: Boolean = false,
    useStringForDate: Boolean = false,
    overwrite: Boolean = false,
    partitionTables: Boolean = true,
    clusterByPartitionColumns: Boolean = true,
    filterOutNullPartitionValues: Boolean = true,
    tableFilter: String = "",
    numPartitions: Int = 100)
|
||||
|
||||
/**
 * Command-line entry point that generates TPC-DS data.
 *
 * To run this:
 * {{{
 *   build/sbt "test:runMain <this class> -d <dsdgenDir> -s <scaleFactor> -l <location> -f <format>"
 * }}}
 */
object GenTPCDSData {

  /**
   * Parses `args` into a [[GenTPCDSDataConfig]] and starts the data generation.
   *
   * When the arguments cannot be parsed the JVM exits with status 1.
   */
  def main(args: Array[String]): Unit = {
    val parser = new scopt.OptionParser[GenTPCDSDataConfig]("Gen-TPC-DS-data") {
      opt[String]('m', "master")
        .action { (x, c) => c.copy(master = x) }
        .text("the Spark master to use, default to local[*]")
      opt[String]('d', "dsdgenDir")
        .action { (x, c) => c.copy(dsdgenDir = x) }
        .text("location of dsdgen")
        .required()
      // scaleFactor, location and format default to null in the config, so they are
      // mandatory like dsdgenDir; otherwise a null would be forwarded to the generator.
      opt[String]('s', "scaleFactor")
        .action((x, c) => c.copy(scaleFactor = x))
        .text("scaleFactor defines the size of the dataset to generate (in GB)")
        .required()
      opt[String]('l', "location")
        .action((x, c) => c.copy(location = x))
        .text("root directory of location to create data in")
        .required()
      opt[String]('f', "format")
        .action((x, c) => c.copy(format = x))
        .text("valid spark format, Parquet, ORC ...")
        .required()
      opt[Boolean]('i', "useDoubleForDecimal")
        .action((x, c) => c.copy(useDoubleForDecimal = x))
        .text("true to replace DecimalType with DoubleType")
      opt[Boolean]('e', "useStringForDate")
        .action((x, c) => c.copy(useStringForDate = x))
        .text("true to replace DateType with StringType")
      opt[Boolean]('o', "overwrite")
        .action((x, c) => c.copy(overwrite = x))
        .text("overwrite the data that is already there")
      opt[Boolean]('p', "partitionTables")
        .action((x, c) => c.copy(partitionTables = x))
        .text("create the partitioned fact tables")
      opt[Boolean]('c', "clusterByPartitionColumns")
        .action((x, c) => c.copy(clusterByPartitionColumns = x))
        .text("shuffle to get partitions coalesced into single files")
      opt[Boolean]('v', "filterOutNullPartitionValues")
        .action((x, c) => c.copy(filterOutNullPartitionValues = x))
        .text("true to filter out the partition with NULL key value")
      opt[String]('t', "tableFilter")
        .action((x, c) => c.copy(tableFilter = x))
        .text("\"\" means generate all tables")
      opt[Int]('n', "numPartitions")
        .action((x, c) => c.copy(numPartitions = x))
        .text("how many dsdgen partitions to run - number of input tasks.")
      help("help")
        .text("prints this usage text")
    }

    parser.parse(args, GenTPCDSDataConfig()) match {
      case Some(config) =>
        run(config)
      case None =>
        // The arguments could not be parsed into a config.
        System.exit(1)
    }
  }

  /**
   * Obtains a Spark session for the configured master and generates the tables.
   *
   * @param config the parsed command-line settings
   */
  private def run(config: GenTPCDSDataConfig): Unit = {
    val spark = SparkSession
      .builder()
      .appName(getClass.getName)
      .master(config.master)
      .getOrCreate()

    val tables = new TPCDSTables(spark.sqlContext,
      dsdgenDir = config.dsdgenDir,
      scaleFactor = config.scaleFactor,
      useDoubleForDecimal = config.useDoubleForDecimal,
      useStringForDate = config.useStringForDate)

    tables.genData(
      location = config.location,
      format = config.format,
      overwrite = config.overwrite,
      partitionTables = config.partitionTables,
      clusterByPartitionColumns = config.clusterByPartitionColumns,
      filterOutNullPartitionValues = config.filterOutNullPartitionValues,
      tableFilter = config.tableFilter,
      numPartitions = config.numPartitions)
  }
}
|
||||
@ -17,9 +17,10 @@
|
||||
package com.databricks.spark.sql.perf.tpcds
|
||||
|
||||
import scala.collection.mutable
|
||||
|
||||
import com.databricks.spark.sql.perf._
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.sql.{SQLContext, SparkSession}
|
||||
import org.apache.spark.sql.SQLContext
|
||||
|
||||
/**
|
||||
* TPC-DS benchmark's dataset.
|
||||
@ -34,7 +35,7 @@ class TPCDS(@transient sqlContext: SQLContext)
|
||||
with Tpcds_2_4_Queries
|
||||
with Serializable {
|
||||
|
||||
def this() = this(SparkSession.builder.getOrCreate().sqlContext)
|
||||
def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate()))
|
||||
|
||||
/*
|
||||
def setupBroadcast(skipTables: Seq[String] = Seq("store_sales", "customer")) = {
|
||||
|
||||
@ -27,7 +27,7 @@ trait Tpcds_2_4_Queries extends Benchmark {
|
||||
|
||||
import ExecutionMode._
|
||||
|
||||
val queryNames = Seq(
|
||||
private val queryNames = Seq(
|
||||
"q1", "q2", "q3", "q4", "q5", "q6", "q7", "q8", "q9", "q10",
|
||||
"q11", "q12", "q13", "q14a", "q14b", "q15", "q16", "q17", "q18", "q19",
|
||||
"q20", "q21", "q22", "q23a", "q23b", "q24a", "q24b", "q25", "q26", "q27",
|
||||
|
||||
@ -77,7 +77,7 @@ class TPCHTables(
|
||||
|
||||
val tables = Seq(
|
||||
Table("part",
|
||||
partitionColumns = "p_brand" :: Nil,
|
||||
partitionColumns = Nil,
|
||||
'p_partkey.long,
|
||||
'p_name.string,
|
||||
'p_mfgr.string,
|
||||
@ -107,18 +107,18 @@ class TPCHTables(
|
||||
'ps_comment.string
|
||||
),
|
||||
Table("customer",
|
||||
partitionColumns = "c_mktsegment" :: Nil,
|
||||
partitionColumns = Nil,
|
||||
'c_custkey.long,
|
||||
'c_name.string,
|
||||
'c_address.string,
|
||||
'c_nationkey.long,
|
||||
'c_nationkey.string,
|
||||
'c_phone.string,
|
||||
'c_acctbal.decimal(12, 2),
|
||||
'c_mktsegment.string,
|
||||
'c_comment.string
|
||||
),
|
||||
Table("orders",
|
||||
partitionColumns = "o_orderdate" :: Nil,
|
||||
partitionColumns = Nil,
|
||||
'o_orderkey.long,
|
||||
'o_custkey.long,
|
||||
'o_orderstatus.string,
|
||||
@ -130,7 +130,7 @@ class TPCHTables(
|
||||
'o_comment.string
|
||||
),
|
||||
Table("lineitem",
|
||||
partitionColumns = "l_shipdate" :: Nil,
|
||||
partitionColumns = Nil,
|
||||
'l_orderkey.long,
|
||||
'l_partkey.long,
|
||||
'l_suppkey.long,
|
||||
|
||||
@ -59,7 +59,7 @@ object ModelBuilderSSP {
|
||||
}
|
||||
|
||||
def newNaiveBayesModel(pi: Vector, theta: Matrix): NaiveBayesModel = {
|
||||
val model = new NaiveBayesModel("naivebayes-uid", pi, theta, null)
|
||||
val model = new NaiveBayesModel("naivebayes-uid", pi, theta)
|
||||
model.set(model.modelType, "multinomial")
|
||||
}
|
||||
|
||||
@ -160,9 +160,9 @@ object TreeBuilder {
|
||||
labelGenerator.setSeed(rng.nextLong)
|
||||
// We use a dummy impurityCalculator for all nodes.
|
||||
val impurityCalculator = if (isRegression) {
|
||||
ImpurityCalculator.getCalculator("variance", Array.fill[Double](3)(0.0), 0L)
|
||||
ImpurityCalculator.getCalculator("variance", Array.fill[Double](3)(0.0))
|
||||
} else {
|
||||
ImpurityCalculator.getCalculator("gini", Array.fill[Double](labelType)(0.0), 0L)
|
||||
ImpurityCalculator.getCalculator("gini", Array.fill[Double](labelType)(0.0))
|
||||
}
|
||||
|
||||
randomBalancedDecisionTreeHelper(depth, featureArity, impurityCalculator,
|
||||
|
||||
@ -1,9 +1,11 @@
|
||||
package com.databricks.spark.sql.perf
|
||||
|
||||
import org.apache.spark.sql.hive.test.TestHive
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
class DatasetPerformanceSuite extends FunSuite {
|
||||
ignore("run benchmark") {
|
||||
TestHive // Init HiveContext
|
||||
val benchmark = new DatasetPerformance() {
|
||||
override val numLongs = 100
|
||||
}
|
||||
|
||||
@ -1,46 +1,10 @@
|
||||
package com.databricks.spark.sql.perf.mllib
|
||||
|
||||
import org.scalatest.{BeforeAndAfterAll, FunSuite}
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
import org.apache.log4j.{Level, Logger}
|
||||
import org.apache.spark.sql.Row
|
||||
|
||||
import org.apache.spark.sql.{Row, SparkSession}
|
||||
|
||||
class MLLibSuite extends FunSuite with BeforeAndAfterAll {
|
||||
|
||||
private var sparkSession: SparkSession = _
|
||||
var savedLevels: Map[String, Level] = _
|
||||
|
||||
/**
 * Creates the local Spark session used by the suite and lowers the log level of the
 * noisy loggers to ERROR, remembering their previous levels in `savedLevels`.
 */
override def beforeAll(): Unit = {
  super.beforeAll()
  val sessionBuilder = SparkSession.builder.master("local[2]").appName("MLlib QA")
  sparkSession = sessionBuilder.getOrCreate()

  // Travis caps the size of the build log. This suite runs a small version of every
  // ML benchmark and is therefore very chatty, so the loggers below are switched to
  // ERROR for the duration of the suite only.
  val quietedLoggers = Seq("akka", "org", "com.databricks")
  val previousLevels = for (loggerName <- quietedLoggers) yield {
    val target = Logger.getLogger(loggerName)
    val levelBefore = target.getLevel
    target.setLevel(Level.ERROR)
    (loggerName, levelBefore)
  }
  savedLevels = previousLevels.toMap
}
|
||||
|
||||
/**
 * Restores the log levels saved by `beforeAll`, stops the Spark session and then
 * delegates to `super.afterAll()`.
 *
 * Each cleanup step runs even if the previous one throws, so a failure while
 * restoring log levels cannot leak the session or skip the parent's cleanup.
 */
override def afterAll(): Unit = {
  try {
    // savedLevels is still null if beforeAll() did not get as far as assigning it.
    Option(savedLevels).foreach { levels =>
      levels.foreach { case (name, level) =>
        Logger.getLogger(name).setLevel(level)
      }
    }
  } finally {
    try {
      if (sparkSession != null) {
        sparkSession.stop()
      }
      // To avoid RPC rebinding to the same port, since it doesn't unbind immediately on shutdown
      System.clearProperty("spark.driver.port")
      sparkSession = null
    } finally {
      super.afterAll()
    }
  }
}
|
||||
class MLLibTest extends FunSuite {
|
||||
|
||||
test("test MlLib benchmarks with mllib-small.yaml.") {
|
||||
val results = MLLib.run(yamlConfig = MLLib.smallConfig)
|
||||
@ -56,11 +20,4 @@ class MLLibSuite extends FunSuite with BeforeAndAfterAll {
|
||||
fail("Unable to run all benchmarks successfully, see console output for more info.")
|
||||
}
|
||||
}
|
||||
|
||||
test("test before benchmark methods for pipeline benchmarks.") {
|
||||
val benchmarks = MLLib.getBenchmarks(MLLib.getConf(yamlConfig = MLLib.smallConfig))
|
||||
benchmarks.foreach { b =>
|
||||
b.beforeBenchmark()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1 +1 @@
|
||||
version in ThisBuild := "0.5.1-SNAPSHOT"
|
||||
version in ThisBuild := "0.5.0-SNAPSHOT"
|
||||
|
||||
Loading…
Reference in New Issue
Block a user