Benchmark for SparkR UDF *apply() APIs

Add a benchmark for SparkR `spark.lapply, dapply/dapplyCollect, gapply/gapplyCollect` APIs. Test on synthesized data with different types and sizes.

Author: Liang Zhang <liang.zhang@databricks.com>

Closes #163 from morewood/sparkr.
This commit is contained in:
Liang Zhang 2018-07-12 17:12:35 -07:00 committed by Hossein
parent 8e8c08d75b
commit 0ab6bf606b
5 changed files with 990 additions and 0 deletions

2
src/main/R/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
results
metastore_db

123
src/main/R/README.md Normal file
View File

@ -0,0 +1,123 @@
# SparkR \*apply() benchmark
This is a performance task for SparkR \*apply API, including spark.lapply, dapply/dapplyCollect, gapply/gapplyCollect.
`define_benchmark.r` generates data in different types and sizes.
`run_benchmark.r` runs tests and save results in results/results.csv and plots in .png files.
## Requirements
- R library: microbenchmark
- databricks/spark
To install microbenchmark, run R script in R shell:
```
install.packages("microbenchmark")
```
Build [Databricks Spark](https://github.com/databricks/spark) locally ([instructions](https://databricks.atlassian.net/wiki/spaces/UN/pages/194805843/0.+Building+and+Running+Spark+Locally)).
Use the path to the root of the above repository as SPARK_HOME, and use it in the shell command below.
## How to run
In shell:
```
sparkr-tests $ ./run_benchmark.sh <your SPARK_HOME> small # run small test (~10 min)
sparkr-tests $ ./run_benchmark.sh <your SPARK_HOME> medium # run medium test (~30 min)
sparkr-tests $ ./run_benchmark.sh <your SPARK_HOME> large # run large test (~90 min)
```
## Synthetic Data
For benchmarking spark.lapply, we generate
```
lists
```
1. with different types (7):
```
Length = 100,
Type = integer,
logical,
double,
character1,
character10,
character100,
character1k.
```
2. with different lengths (4):
```
Type = integer,
Length = 10,
100,
1k,
10k
```
For benchmarking dapply+rnow/dapplyCollect, we generate
```
data.frame
```
1. with different types (7):
```
nrow = 100,
ncol = 1,
Type = integer,
logical,
double,
character1,
character10,
character100,
character1k.
```
2. with different lengths (4):
```
ncol = 1,
Type = integer,
nrow = 10,
100,
1k,
10k
```
3. with different ncols (3):
```
nrow = 100,
Type = integer,
ncol = 1,
10,
100
```
For benchmarking gapply+rnow/gapplyCollect, we generate
```
data.frame
```
1. with different number of keys (3):
```
ncol = 2,
nrow = 1k,
Type = <integer, double>,
nkeys = 10,
100,
1000
```
2. with different lengths (4):
```
ncol = 2,
Type = <integer, double>,
nkeys = 10,
nrow = 10,
100,
1k,
10k
```
3. with different key types (3):
```
ncol = 2,
nkeys = 10,
nrow = 1k,
Type = <integer, double>
<character10, double>
<character100, double>
```

View File

@ -0,0 +1,671 @@
library("SparkR")
library("microbenchmark")
library("magrittr")
library("ggplot2")
library("pryr")
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
dir_path <- "results/"
# For benchmarking spark.lapply, we generate
# lists
# 1. with different types (7):
# Length = 100,
# Type = integer,
# logical,
# double,
# character1,
# character10,
# character100,
# character1k.
data.list.type.double <- runif(100)
data.list.type.int <- as.integer(data.list.type.double * 100)
data.list.type.logical <- data.list.type.double < 0.5
data.list.type.char1 <- sapply(1:100, function(x) { "a" })
data.list.type.char10 <- sapply(1:100, function(x) { "0123456789" })
data.list.type.char100 <- sapply(1:100, function(x) { paste(replicate(20, "hello"), collapse = "") })
data.list.type.char1k <- sapply(1:100, function(x) { paste(replicate(200, "hello"), collapse = "") })
# 2. with different lengths (4):
# Type = integer,
# Length = 10,
# 100,
# 1k,
# 10k
data.list.len.10 <- 1:10
data.list.len.100 <- 1:100
data.list.len.1k <- 1:1000
data.list.len.10k <- 1:10000
# For benchmarking dapply+rnow/dapplyCollect, we generate
# data.frame
# 1. with different types (7):
# nrow = 100,
# ncol = 1,
# Type = integer,
# logical,
# double,
# character1,
# character10,
# character100,
# character1k.
data.df.type.double <- data.frame(data.list.type.double) %>% createDataFrame %>% cache
data.df.type.int <- data.frame(data.list.type.int) %>% createDataFrame %>% cache
data.df.type.logical <- data.frame(data.list.type.logical) %>% createDataFrame %>% cache
data.df.type.char1 <- data.frame(data.list.type.char1) %>% createDataFrame %>% cache
data.df.type.char10 <- data.frame(data.list.type.char10) %>% createDataFrame %>% cache
data.df.type.char100 <- data.frame(data.list.type.char100) %>% createDataFrame %>% cache
data.df.type.char1k <- data.frame(data.list.type.char1k) %>% createDataFrame %>% cache
# counting to materialize the cache
data.df.type.double %>% nrow
data.df.type.int %>% nrow
data.df.type.logical %>% nrow
data.df.type.char1 %>% nrow
data.df.type.char10 %>% nrow
data.df.type.char100 %>% nrow
data.df.type.char1k %>% nrow
# 2. with different lengths (4):
# ncol = 1,
# Type = integer,
# nrow = 10,
# 100,
# 1k,
# 10k
data.df.len.10 <- data.frame(data.list.len.10) %>% createDataFrame %>% cache
data.rdf.len.100 <- data.frame(data.list.len.100)
data.df.len.100 <- data.rdf.len.100 %>% createDataFrame %>% cache
data.df.len.1k <- data.frame(data.list.len.1k) %>% createDataFrame %>% cache
data.df.len.10k <- data.frame(data.list.len.10k) %>% createDataFrame %>% cache
# counting to materialize the cache
data.df.len.10 %>% nrow
data.df.len.100 %>% nrow
data.df.len.1k %>% nrow
data.df.len.10k %>% nrow
# 3. with different ncols (3):
# nrow = 100,
# Type = integer,
# ncol = 1,
# 10,
# 100
data.df.ncol.1 <- data.df.len.100
data.df.ncol.10 <- data.frame(rep(data.rdf.len.100, each = 10)) %>% createDataFrame %>% cache
data.df.ncol.100 <- data.frame(rep(data.rdf.len.100, each = 100)) %>% createDataFrame %>% cache
# counting to materialize the cache
data.df.ncol.1 %>% nrow
data.df.ncol.10 %>% nrow
data.df.ncol.100 %>% nrow
# For benchmarking gapply+rnow/gapplyCollect, we generate
# data.frame
# 1. with different number of keys (3):
# ncol = 2,
# nrow = 1k,
# Type = <integer, double>,
# nkeys = 10,
# 100,
# 1000
data.rand.1k <- runif(1000)
data.df.nkey.10 <- data.frame(key = rep(1:10, 100), val = data.rand.1k) %>% createDataFrame %>% cache
data.df.nkey.100 <- data.frame(key = rep(1:100, 10), val = data.rand.1k) %>% createDataFrame %>% cache
data.df.nkey.1k <- data.frame(key = 1:1000, val = data.rand.1k) %>% createDataFrame %>% cache
# counting to materialize the cache
data.df.nkey.10 %>% nrow
data.df.nkey.100 %>% nrow
data.df.nkey.1k %>% nrow
# 2. with different lengths (4):
# ncol = 2,
# Type = <integer, double>,
# nkeys = 10,
# nrow = 10,
# 100,
# 1k,
# 10k
data.df.nrow.10 <- data.frame(key = 1:10, val = runif(10)) %>% createDataFrame %>% cache
data.df.nrow.100 <- data.frame(key = rep(1:10, 10), val = runif(100)) %>% createDataFrame %>% cache
data.df.nrow.1k <- data.frame(key = rep(1:10, 100), val = data.rand.1k) %>% createDataFrame %>% cache
data.df.nrow.10k <- data.frame(key = rep(1:10, 1000), val = runif(10000)) %>% createDataFrame %>% cache
# counting to materialize the cache
data.df.nrow.10 %>% nrow
data.df.nrow.100 %>% nrow
data.df.nrow.1k %>% nrow
data.df.nrow.10k %>% nrow
# 3. with different key types (3):
# ncol = 2,
# nkeys = 10,
# nrow = 1k,
# Type = <integer, double>
# <character10, double>
# <character100, double>
key.char10 <- sapply(1:10, function(x) { sprintf("%010d", x) })
key.char100 <- sapply(1:10, function(x) { sprintf("%0100d", x) })
data.df.keytype.int <- data.df.nrow.1k
data.df.keytype.char10 <- data.frame(key = rep(key.char10, 100), val = data.rand.1k) %>% createDataFrame %>% cache
data.df.keytype.char100 <- data.frame(key = rep(key.char100, 10), val = data.rand.1k) %>% createDataFrame %>% cache
# counting to materialize the cache
data.df.keytype.int %>% nrow
data.df.keytype.char10 %>% nrow
data.df.keytype.char100 %>% nrow
# ========== benchmark functions ==============
# plot utils
plot.box.mbm <- function(mbm_result) {
ggplot(mbm_result, aes(x = expr, y = time/1000000)) +
geom_boxplot(outlier.colour = "red", outlier.shape = 1) + labs(x = "", y = "Time (milliseconds)") +
geom_jitter(position=position_jitter(0.2), alpha = 0.2) +
coord_flip()
}
plot.box.throughput <- function(throughput_result) {
ggplot(throughput_result, aes(x = expr, y = throughput)) +
geom_boxplot(outlier.colour = "red", outlier.shape = 1) + labs(x = "", y = "Throughput (B/s)") +
geom_jitter(position=position_jitter(0.2), alpha = 0.2) +
coord_flip()
}
# dummy function
func.dummy <- function(x) { x }
# group function
gfunc.mean <- function(key, sdf) {
y <- data.frame(key, mean(sdf$val), stringsAsFactors = F)
names(y) <- c("key", "val")
y
}
# lapply, type
run.mbm.spark.lapply.type <- function(n) {
microbenchmark(
"lapply.int.100" = { spark.lapply(data.list.type.int, func.dummy) },
"lapply.double.100" = { spark.lapply(data.list.type.double, func.dummy) },
"lapply.logical.100" = { spark.lapply(data.list.type.logical, func.dummy) },
"lapply.char1.100" = { spark.lapply(data.list.type.char1, func.dummy) },
"lapply.char10.100" = { spark.lapply(data.list.type.char10, func.dummy) },
"lapply.char100.100" = { spark.lapply(data.list.type.char100, func.dummy) },
"lapply.char1k.100" = { spark.lapply(data.list.type.char1k, func.dummy) },
times = n
)
}
# lapply, len
run.mbm.spark.lapply.len <- function(mode, n) {
if (mode == "small") {
microbenchmark(
"lapply.len.10" = { spark.lapply(data.list.len.10, func.dummy) },
"lapply.len.100" = { spark.lapply(data.list.len.100, func.dummy) },
times = n
)
} else if (mode == "medium") {
microbenchmark(
"lapply.len.10" = { spark.lapply(data.list.len.10, func.dummy) },
"lapply.len.100" = { spark.lapply(data.list.len.100, func.dummy) },
"lapply.len.1k" = { spark.lapply(data.list.len.1k, func.dummy) },
times = n
)
} else {
microbenchmark(
"lapply.len.10" = { spark.lapply(data.list.len.10, func.dummy) },
"lapply.len.100" = { spark.lapply(data.list.len.100, func.dummy) },
"lapply.len.1k" = { spark.lapply(data.list.len.1k, func.dummy) },
"lapply.len.10k" = { spark.lapply(data.list.len.10k, func.dummy) },
times = n
)
}
}
# dapply, type
run.mbm.dapply.type <- function(n) {
microbenchmark(
"dapply.int.100" = { dapply(data.df.type.int, func.dummy, schema(data.df.type.int)) %>% nrow },
"dapply.double.100" = { dapply(data.df.type.double, func.dummy, schema(data.df.type.double)) %>% nrow },
"dapply.logical.100" = { dapply(data.df.type.logical, func.dummy, schema(data.df.type.logical)) %>% nrow },
"dapply.char1.100" = { dapply(data.df.type.char1, func.dummy, schema(data.df.type.char1)) %>% nrow },
"dapply.char10.100" = { dapply(data.df.type.char10, func.dummy, schema(data.df.type.char10)) %>% nrow },
"dapply.char100.100" = { dapply(data.df.type.char100, func.dummy, schema(data.df.type.char100)) %>% nrow },
"dapply.char1k.100" = { dapply(data.df.type.char1k, func.dummy, schema(data.df.type.char1k)) %>% nrow },
times = n
)
}
# dapply, len
run.mbm.dapply.len <- function(mode, n) {
if (mode == "small") {
microbenchmark(
"dapply.len.10" = { dapply(data.df.len.10, func.dummy, schema(data.df.len.10)) %>% nrow },
"dapply.len.100" = { dapply(data.df.len.100, func.dummy, schema(data.df.len.100)) %>% nrow },
times = n
)
} else if (mode == "medium") {
microbenchmark(
"dapply.len.10" = { dapply(data.df.len.10, func.dummy, schema(data.df.len.10)) %>% nrow },
"dapply.len.100" = { dapply(data.df.len.100, func.dummy, schema(data.df.len.100)) %>% nrow },
"dapply.len.1k" = { dapply(data.df.len.1k, func.dummy, schema(data.df.len.1k)) %>% nrow },
times = n
)
} else {
microbenchmark(
"dapply.len.10" = { dapply(data.df.len.10, func.dummy, schema(data.df.len.10)) %>% nrow },
"dapply.len.100" = { dapply(data.df.len.100, func.dummy, schema(data.df.len.100)) %>% nrow },
"dapply.len.1k" = { dapply(data.df.len.1k, func.dummy, schema(data.df.len.1k)) %>% nrow },
"dapply.len.10k" = { dapply(data.df.len.10k, func.dummy, schema(data.df.len.10k)) %>% nrow },
times = n
)
}
}
# dapply, ncol
run.mbm.dapply.ncol <- function(mode, n) {
if (mode == "small") {
microbenchmark(
"dapply.ncol.1" = { dapply(data.df.ncol.1, func.dummy, schema(data.df.ncol.1)) %>% nrow },
"dapply.ncol.10" = { dapply(data.df.ncol.10, func.dummy, schema(data.df.ncol.10)) %>% nrow },
times = n
)
} else {
microbenchmark(
"dapply.ncol.1" = { dapply(data.df.ncol.1, func.dummy, schema(data.df.ncol.1)) %>% nrow },
"dapply.ncol.10" = { dapply(data.df.ncol.10, func.dummy, schema(data.df.ncol.10)) %>% nrow },
"dapply.ncol.100" = { dapply(data.df.ncol.100, func.dummy, schema(data.df.ncol.100)) %>% nrow },
times = n
)
}
}
# dapplyCollect, type
run.mbm.dapplyCollect.type <- function(n) {
microbenchmark(
"dapplyCollect.int.100" = { dapplyCollect(data.df.type.int, func.dummy) },
"dapplyCollect.double.100" = { dapplyCollect(data.df.type.double, func.dummy) },
"dapplyCollect.logical.100" = { dapplyCollect(data.df.type.logical, func.dummy) },
"dapplyCollect.char1.100" = { dapplyCollect(data.df.type.char1, func.dummy) },
"dapplyCollect.char10.100" = { dapplyCollect(data.df.type.char10, func.dummy) },
"dapplyCollect.char100.100" = { dapplyCollect(data.df.type.char100, func.dummy) },
"dapplyCollect.char1k.100" = { dapplyCollect(data.df.type.char1k, func.dummy) },
times = n
)
}
# dapplyCollect, len
run.mbm.dapplyCollect.len <- function(mode, n) {
if (mode != "large") {
microbenchmark(
"dapplyCollect.len.10" = { dapplyCollect(data.df.len.10, func.dummy) },
"dapplyCollect.len.100" = { dapplyCollect(data.df.len.100, func.dummy) },
"dapplyCollect.len.1k" = { dapplyCollect(data.df.len.1k, func.dummy) },
times = n
)
} else {
microbenchmark(
"dapplyCollect.len.10" = { dapplyCollect(data.df.len.10, func.dummy) },
"dapplyCollect.len.100" = { dapplyCollect(data.df.len.100, func.dummy) },
"dapplyCollect.len.1k" = { dapplyCollect(data.df.len.1k, func.dummy) },
"dapplyCollect.len.10k" = { dapplyCollect(data.df.len.10k, func.dummy) },
times = n
)
}
}
# dapplyCollect, ncol
run.mbm.dapplyCollect.ncol <- function(mode, n) {
if (mode == "small") {
microbenchmark(
"dapplyCollect.ncol.1" = { dapplyCollect(data.df.ncol.1, func.dummy) },
"dapplyCollect.ncol.10" = { dapplyCollect(data.df.ncol.10, func.dummy) },
times = n
)
} else {
microbenchmark(
"dapplyCollect.ncol.1" = { dapplyCollect(data.df.ncol.1, func.dummy) },
"dapplyCollect.ncol.10" = { dapplyCollect(data.df.ncol.10, func.dummy) },
"dapplyCollect.ncol.100" = { dapplyCollect(data.df.ncol.100, func.dummy) },
times = n
)
}
}
# gapply, nkey
run.mbm.gapply.nkey <- function(mode, n) {
if (mode == "small") {
microbenchmark(
"gapply.nkey.10" = { gapply(data.df.nkey.10, data.df.nkey.10$key, gfunc.mean, schema(data.df.nkey.10)) %>% nrow },
"gapply.nkey.100" = { gapply(data.df.nkey.100, data.df.nkey.100$key, gfunc.mean, schema(data.df.nkey.100)) %>% nrow },
times = n
)
} else if (mode == "medium") {
microbenchmark(
"gapply.nkey.10" = { gapply(data.df.nkey.10, data.df.nkey.10$key, gfunc.mean, schema(data.df.nkey.10)) %>% nrow },
"gapply.nkey.100" = { gapply(data.df.nkey.100, data.df.nkey.100$key, gfunc.mean, schema(data.df.nkey.100)) %>% nrow },
"gapply.nkey.1k" = { gapply(data.df.nkey.1k, data.df.nkey.1k$key, gfunc.mean, schema(data.df.nkey.1k)) %>% nrow },
times = n
)
} else {
microbenchmark(
"gapply.nkey.10" = { gapply(data.df.nkey.10, data.df.nkey.10$key, gfunc.mean, schema(data.df.nkey.10)) %>% nrow },
"gapply.nkey.100" = { gapply(data.df.nkey.100, data.df.nkey.100$key, gfunc.mean, schema(data.df.nkey.100)) %>% nrow },
"gapply.nkey.1k" = { gapply(data.df.nkey.1k, data.df.nkey.1k$key, gfunc.mean, schema(data.df.nkey.1k)) %>% nrow },
times = n
)
}
}
# gapply, nrow
run.mbm.gapply.nrow <- function(mode, n) {
if (mode == "small") {
microbenchmark(
"gapply.nrow.10" = { gapply(data.df.nrow.10, data.df.nrow.10$key, gfunc.mean, schema(data.df.nrow.10)) %>% nrow },
"gapply.nrow.100" = { gapply(data.df.nrow.100, data.df.nrow.100$key, gfunc.mean, schema(data.df.nrow.100)) %>% nrow },
times = n
)
} else if (mode == "medium") {
microbenchmark(
"gapply.nrow.10" = { gapply(data.df.nrow.10, data.df.nrow.10$key, gfunc.mean, schema(data.df.nrow.10)) %>% nrow },
"gapply.nrow.100" = { gapply(data.df.nrow.100, data.df.nrow.100$key, gfunc.mean, schema(data.df.nrow.100)) %>% nrow },
"gapply.nrow.1k" = { gapply(data.df.nrow.1k, data.df.nrow.1k$key, gfunc.mean, schema(data.df.nrow.1k)) %>% nrow },
times = n
)
} else {
microbenchmark(
"gapply.nrow.10" = { gapply(data.df.nrow.10, data.df.nrow.10$key, gfunc.mean, schema(data.df.nrow.10)) %>% nrow },
"gapply.nrow.100" = { gapply(data.df.nrow.100, data.df.nrow.100$key, gfunc.mean, schema(data.df.nrow.100)) %>% nrow },
"gapply.nrow.1k" = { gapply(data.df.nrow.1k, data.df.nrow.1k$key, gfunc.mean, schema(data.df.nrow.1k)) %>% nrow },
"gapply.nrow.10k" = { gapply(data.df.nrow.10k, data.df.nrow.10k$key, gfunc.mean, schema(data.df.nrow.10k)) %>% nrow },
times = n
)
}
}
# gapply, keytype
run.mbm.gapply.keytype <- function(n) {
microbenchmark(
"gapply.keytype.int" = { gapply(data.df.keytype.int, data.df.keytype.int$key, gfunc.mean, schema(data.df.keytype.int)) %>% nrow },
"gapply.keytype.char10" = { gapply(data.df.keytype.char10, data.df.keytype.char10$key, gfunc.mean, schema(data.df.keytype.char10)) %>% nrow },
"gapply.keytype.char100" = { gapply(data.df.keytype.char100, data.df.keytype.char100$key, gfunc.mean, schema(data.df.keytype.char100)) %>% nrow },
times = n
)
}
# gapplyCollect, nkey
run.mbm.gapplyCollect.nkey <- function(mode, n) {
if (mode == "small") {
microbenchmark(
"gapplyCollect.nkey.10" = { gapplyCollect(data.df.nkey.10, data.df.nkey.10$key, gfunc.mean) },
"gapplyCollect.nkey.100" = { gapplyCollect(data.df.nkey.100, data.df.nkey.100$key, gfunc.mean) },
times = n
)
} else if (mode == "medium") {
microbenchmark(
"gapplyCollect.nkey.10" = { gapplyCollect(data.df.nkey.10, data.df.nkey.10$key, gfunc.mean) },
"gapplyCollect.nkey.100" = { gapplyCollect(data.df.nkey.100, data.df.nkey.100$key, gfunc.mean) },
"gapplyCollect.nkey.1k" = { gapplyCollect(data.df.nkey.1k, data.df.nkey.1k$key, gfunc.mean) },
times = n
)
} else {
microbenchmark(
"gapplyCollect.nkey.10" = { gapplyCollect(data.df.nkey.10, data.df.nkey.10$key, gfunc.mean) },
"gapplyCollect.nkey.100" = { gapplyCollect(data.df.nkey.100, data.df.nkey.100$key, gfunc.mean) },
"gapplyCollect.nkey.1k" = { gapplyCollect(data.df.nkey.1k, data.df.nkey.1k$key, gfunc.mean) },
times = n
)
}
}
# gapplyCollect, nrow
run.mbm.gapplyCollect.nrow <- function(mode, n) {
if (mode == "small") {
microbenchmark(
"gapplyCollect.nrow.10" = { gapplyCollect(data.df.nrow.10, data.df.nrow.10$key, gfunc.mean) },
"gapplyCollect.nrow.100" = { gapplyCollect(data.df.nrow.100, data.df.nrow.100$key, gfunc.mean) },
times = n
)
} else if (mode == "medium") {
microbenchmark(
"gapplyCollect.nrow.10" = { gapplyCollect(data.df.nrow.10, data.df.nrow.10$key, gfunc.mean) },
"gapplyCollect.nrow.100" = { gapplyCollect(data.df.nrow.100, data.df.nrow.100$key, gfunc.mean) },
"gapplyCollect.nrow.1k" = { gapplyCollect(data.df.nrow.1k, data.df.nrow.1k$key, gfunc.mean) },
times = n
)
} else {
microbenchmark(
"gapplyCollect.nrow.10" = { gapplyCollect(data.df.nrow.10, data.df.nrow.10$key, gfunc.mean) },
"gapplyCollect.nrow.100" = { gapplyCollect(data.df.nrow.100, data.df.nrow.100$key, gfunc.mean) },
"gapplyCollect.nrow.1k" = { gapplyCollect(data.df.nrow.1k, data.df.nrow.1k$key, gfunc.mean) },
"gapplyCollect.nrow.10k" = { gapplyCollect(data.df.nrow.10k, data.df.nrow.10k$key, gfunc.mean) },
times = n
)
}
}
# gapplyCollect, keytype
run.mbm.gapplyCollect.keytype <- function(n) {
microbenchmark(
"gapplyCollect.keytype.int" = { gapplyCollect(data.df.keytype.int, data.df.keytype.int$key, gfunc.mean) },
"gapplyCollect.keytype.char10" = { gapplyCollect(data.df.keytype.char10, data.df.keytype.char10$key, gfunc.mean) },
"gapplyCollect.keytype.char100" = { gapplyCollect(data.df.keytype.char100, data.df.keytype.char100$key, gfunc.mean) },
times = n
)
}
# ============== compute object sizes ==================
ser <- function(data) {
serialize(data, connection = NULL)
}
get.sizes <- function(obj_names, objs) {
obj_sizes <- objs %>% lapply(ser) %>% lapply(object.size)
data.frame(cbind(obj_names, obj_sizes))
}
# lapply
sizes.lapply.type <- get.sizes(
list(
"lapply.double.100",
"lapply.int.100",
"lapply.logical.100",
"lapply.char1.100",
"lapply.char10.100",
"lapply.char100.100",
"lapply.char1k.100"),
list(
data.list.type.double,
data.list.type.int,
data.list.type.logical,
data.list.type.char1,
data.list.type.char10,
data.list.type.char100,
data.list.type.char1k)
)
sizes.lapply.len <- get.sizes(
list(
"lapply.len.10",
"lapply.len.100",
"lapply.len.1k",
"lapply.len.10k"),
list(
data.list.len.10,
data.list.len.100,
data.list.len.1k,
data.list.len.10k)
)
# data for dapply
df.dapply.type <- lapply(list(
data.list.type.double,
data.list.type.int,
data.list.type.logical,
data.list.type.char1,
data.list.type.char10,
data.list.type.char100,
data.list.type.char1k), data.frame)
df.dapply.len <- lapply(list(
data.list.len.10,
data.list.len.100,
data.list.len.1k,
data.list.len.10k), data.frame)
df.dapply.ncol <- lapply(list(
data.list.len.100,
rep(data.rdf.len.100, each = 10),
rep(data.rdf.len.100, each = 100)), data.frame)
# dapply
sizes.dapply.type <- get.sizes(
list(
"dapply.double.100",
"dapply.int.100",
"dapply.logical.100",
"dapply.char1.100",
"dapply.char10.100",
"dapply.char100.100",
"dapply.char1k.100"),
df.dapply.type
)
sizes.dapply.len <- get.sizes(
list(
"dapply.len.10",
"dapply.len.100",
"dapply.len.1k",
"dapply.len.10k"),
df.dapply.len
)
sizes.dapply.ncol <- get.sizes(
list(
"dapply.ncol.1",
"dapply.ncol.10",
"dapply.ncol.100"),
df.dapply.ncol
)
# dapplyCollect
sizes.dapplyCollect.type <- get.sizes(
list(
"dapplyCollect.double.100",
"dapplyCollect.int.100",
"dapplyCollect.logical.100",
"dapplyCollect.char1.100",
"dapplyCollect.char10.100",
"dapplyCollect.char100.100",
"dapplyCollect.char1k.100"),
df.dapply.type
)
sizes.dapplyCollect.len <- get.sizes(
list(
"dapplyCollect.len.10",
"dapplyCollect.len.100",
"dapplyCollect.len.1k",
"dapplyCollect.len.10k"),
df.dapply.len
)
sizes.dapplyCollect.ncol <- get.sizes(
list(
"dapplyCollect.ncol.1",
"dapplyCollect.ncol.10",
"dapplyCollect.ncol.100"),
df.dapply.ncol
)
# data for gapply
df.gapply.nkey <- list(
data.frame(key = rep(1:10, 100), val = data.rand.1k),
data.frame(key = rep(1:100, 10), val = data.rand.1k),
data.frame(key = 1:1000, val = data.rand.1k))
df.gapply.nrow <- list(
data.frame(key = 1:10, val = runif(10)),
data.frame(key = rep(1:10, 10), val = runif(100)),
data.frame(key = rep(1:10, 100), val = data.rand.1k),
data.frame(key = rep(1:10, 1000), val = runif(10000)))
df.gapply.keytype <- list(
data.frame(key = rep(1:10, 100), val = data.rand.1k),
data.frame(key = rep(key.char10, 100), val = data.rand.1k),
data.frame(key = rep(key.char100, 10), val = data.rand.1k))
# gapply
sizes.gapply.nkey <- get.sizes(
list(
"gapply.nkey.10",
"gapply.nkey.100",
"gapply.nkey.1k"),
df.gapply.nkey
)
sizes.gapply.nrow <- get.sizes(
list(
"gapply.nrow.10",
"gapply.nrow.100",
"gapply.nrow.1k",
"gapply.nrow.10k"),
df.gapply.nrow
)
sizes.gapply.keytype <- get.sizes(
list(
"gapply.keytype.int",
"gapply.keytype.char10",
"gapply.keytype.char100"),
df.gapply.keytype
)
# gapplyCollect
sizes.gapplyCollect.nkey <- get.sizes(
list(
"gapplyCollect.nkey.10",
"gapplyCollect.nkey.100",
"gapplyCollect.nkey.1k"),
df.gapply.nkey
)
sizes.gapplyCollect.nrow <- get.sizes(
list(
"gapplyCollect.nrow.10",
"gapplyCollect.nrow.100",
"gapplyCollect.nrow.1k",
"gapplyCollect.nrow.10k"),
df.gapply.nrow
)
sizes.gapplyCollect.keytype <- get.sizes(
list(
"gapplyCollect.keytype.int",
"gapplyCollect.keytype.char10",
"gapplyCollect.keytype.char100"),
df.gapply.keytype
)
df.sizes <- rbind(
sizes.lapply.type,
sizes.lapply.len,
sizes.dapply.type,
sizes.dapply.len,
sizes.dapply.ncol,
sizes.dapplyCollect.type,
sizes.dapplyCollect.len,
sizes.dapplyCollect.ncol,
sizes.gapply.nkey,
sizes.gapply.nrow,
sizes.gapply.keytype,
sizes.gapplyCollect.nkey,
sizes.gapplyCollect.nrow,
sizes.gapplyCollect.keytype)
df.sizes$obj_names <- unlist(df.sizes$obj_names)
df.sizes$obj_sizes <- unlist(df.sizes$obj_sizes)

165
src/main/R/run_benchmark.r Normal file
View File

@ -0,0 +1,165 @@
# ========== generating results =================
source("define_benchmark.r")
args = commandArgs(trailingOnly=TRUE)
if (args[1] == "small") {
mode = "small"
n1 = 10
n2 = 10
} else if (args[1] == "medium") {
mode = "medium"
n1 = 10
n2 = 20
} else {
mode = "large"
n1 = 20
n2 = 20
}
N = 20
# lapply, type
mbm.spark.lapply.type <- run.mbm.spark.lapply.type(n2)
p <- mbm.spark.lapply.type %>% plot.box.mbm
filename <- sprintf("%slapply.type.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# lapply, len
mbm.spark.lapply.len <- run.mbm.spark.lapply.len(mode, n1)
p <- mbm.spark.lapply.len %>% plot.box.mbm
filename <- sprintf("%slapply.len.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# dapply, type
mbm.dapply.type <- run.mbm.dapply.type(n2)
p <- mbm.dapply.type %>% plot.box.mbm
filename <- sprintf("%sdapply.type.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# dapply, len
mbm.dapply.len <- run.mbm.dapply.len(mode, n1)
p <- mbm.dapply.len %>% plot.box.mbm
filename <- sprintf("%sdapply.len.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# dapply, ncol
mbm.dapply.ncol <- run.mbm.dapply.ncol(mode, n1)
p <- mbm.dapply.ncol %>% plot.box.mbm
filename <- sprintf("%sdapply.ncol.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# dapplyCollect, type
mbm.dapplyCollect.type <- run.mbm.dapplyCollect.type(N)
p <- mbm.dapplyCollect.type %>% plot.box.mbm
filename <- sprintf("%sdapplyCollect.type.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# dapplyCollect, len
mbm.dapplyCollect.len <- run.mbm.dapplyCollect.len(mode, N)
p <- mbm.dapplyCollect.len %>% plot.box.mbm
filename <- sprintf("%sdapplyCollect.len.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# dapplyCollect, ncol
mbm.dapplyCollect.ncol <- run.mbm.dapplyCollect.ncol(mode, n1)
p <- mbm.dapplyCollect.ncol %>% plot.box.mbm
filename <- sprintf("%sdapplyCollect.ncol.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# gapply, nkey
mbm.gapply.nkey <- run.mbm.gapply.nkey(mode, n1)
p <- mbm.gapply.nkey %>% plot.box.mbm
filename <- sprintf("%sgapply.nkey.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# gapply, nrow
mbm.gapply.nrow <- run.mbm.gapply.nrow(mode, n1)
p <- mbm.gapply.nrow %>% plot.box.mbm
filename <- sprintf("%sgapply.nrow.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# gapply, keytype
mbm.gapply.keytype <- run.mbm.gapply.keytype(n1)
p <- mbm.gapply.keytype %>% plot.box.mbm
filename <- sprintf("%sgapply.keytype.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# gapplyCollect, nkey
mbm.gapplyCollect.nkey <- run.mbm.gapplyCollect.nkey(mode, n1)
p <- mbm.gapplyCollect.nkey %>% plot.box.mbm
filename <- sprintf("%sgapplyCollect.nkey.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# gapplyCollect, nrow
mbm.gapplyCollect.nrow <- run.mbm.gapplyCollect.nrow(mode, n1)
p <- mbm.gapplyCollect.nrow %>% plot.box.mbm
filename <- sprintf("%sgapplyCollect.nrow.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# gapplyCollect, keytype
mbm.gapplyCollect.keytype <- run.mbm.gapplyCollect.keytype(n1)
p <- mbm.gapplyCollect.keytype %>% plot.box.mbm
filename <- sprintf("%sgapplyCollect.keytype.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
tmp <- rbind(
mbm.spark.lapply.type,
mbm.spark.lapply.len,
mbm.dapply.type,
mbm.dapply.len,
mbm.dapply.ncol,
mbm.dapplyCollect.type,
mbm.dapplyCollect.len,
mbm.dapplyCollect.ncol,
mbm.gapply.nkey,
mbm.gapply.nrow,
mbm.gapply.keytype,
mbm.gapplyCollect.nkey,
mbm.gapplyCollect.nrow,
mbm.gapplyCollect.keytype)
# compute throughput
tmp_size <- merge(tmp, df.sizes, by.x = "expr", by.y = "obj_names", all.x=TRUE)
tmp_size$throughput <- round(tmp_size$obj_sizes*1000000/tmp_size$time, digits=2) # bytes per second
# plot throughput
p <- tmp_size %>% plot.box.throughput
filename <- sprintf("%sall.throughput.%s.png", dir_path, mode)
ggsave(filename, width=7, height=6)
# save raw data to csv file
towrite <- tmp_size[order(tmp_size$expr, tmp_size$time),]
write.csv(towrite, file="results/results.csv", row.names = F)
# save mean value in ml.perf_metrics format
# timestamp: timestamp, benchmarkId: string, benchmarkName: string,
# metricName: string, metricValue: string, isLargerBetter: boolean, parameters map<string, string>
op <- options(digits.secs = 3)
curTimestamp <- Sys.time()
benchmarkName <- "com.databricks.spark.sql.perf.sparkr.UserDefinedFunction"
metricName <- "throughput.byte.per.second"
isLargerBetter <- TRUE
perf_metric <- aggregate(towrite$throughput, list(towrite$expr), mean)
names(perf_metric) <- c("benchmarkId", "throughput")
perf_metric$timestamp <- curTimestamp
perf_metric$benchmarkName <- benchmarkName
perf_metric$metricName <- metricName
perf_metric$isLargerBetter <- isLargerBetter
perf_metric$parameters <- NULL
write.csv(perf_metric, file="results/perf_metrics.csv", row.names = F)

29
src/main/R/run_benchmark.sh Executable file
View File

@ -0,0 +1,29 @@
#!/bin/bash
# Usage: ./run_benchmark.sh /Users/liang/spark small
if [[ ($# -ne 1 && $# -ne 2) ]]; then
echo "Usage: $0 <your SPARK_HOME> [small/medium/large] (default: small)" >&2
exit 1
fi
if ! [ -e "$1" ]; then
echo "$1 not found" >&2
exit 1
fi
if ! [ -d "$1" ]; then
echo "$1 not a directory" >&2
exit 1
fi
if [[ ( $# -eq 2 && ($2 -ne "small" || $2 -ne "medium" || $2 -ne "large" ) ) ]]; then
echo "The second argument should be 'small' or 'medium' or 'large'" >&2
exit 1
fi
mkdir -p results
export SPARK_HOME=$1
export R_LIBS_USER=$SPARK_HOME/R/lib
if [[ $# -eq 1 ]]; then
Rscript run_benchmark.r small
else
Rscript run_benchmark.r $2
fi