Compare commits

..

3 Commits

| Author | SHA1 | Message | Date |
|---|---|---|---|
| Xiangrui Meng | 7f454dffec | specify scala version in travis | 2018-06-14 10:20:19 -07:00 |
| Lu WANG | 322efe9343 | rm scala 2.10 | 2018-06-14 10:10:20 -07:00 |
| Lu WANG | cb9479a58e | spark-sql-perf with spark2.3 | 2018-06-14 09:38:20 -07:00 |
40 changed files with 232 additions and 1884 deletions


@@ -1,8 +1,7 @@
language: scala
scala:
- 2.12.10
- 2.11.8
sudo: false
dist: trusty
jdk:
oraclejdk8
cache:


@@ -18,8 +18,6 @@ Usage: spark-sql-perf [options]
-b <value> | --benchmark <value>
the name of the benchmark to run
-m <value> | --master <value>
the master url to use
-f <value> | --filter <value>
a filter on the name of the queries to run
-i <value> | --iterations <value>
@@ -34,18 +32,7 @@ The first run of `bin/run` will build the library.
## Build
Use `sbt package` or `sbt assembly` to build the library jar.
Use `sbt +package` to build for scala 2.11 and 2.12.
## Local performance tests
The framework contains twelve benchmarks that can be executed in local mode. They are organized into three classes and target different components and functions of Spark:
* [DatasetPerformance](https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/DatasetPerformance.scala) compares the performance of the old RDD API with the newer DataFrame and Dataset APIs.
These benchmarks can be launched with the command `bin/run --benchmark DatasetPerformance`
* [JoinPerformance](https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala) compares the performance of joining different table sizes and shapes with different join types.
These benchmarks can be launched with the command `bin/run --benchmark JoinPerformance`
* [AggregationPerformance](https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala) compares the performance of aggregating different table sizes using different aggregation types.
These benchmarks can be launched with the command `bin/run --benchmark AggregationPerformance`
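Combining the flags from the usage text above, a full invocation for one of these suites can be assembled as below. The `--filter` value and `--iterations` count here are illustrative, not defaults:

```shell
# Compose a bin/run invocation from the documented flags.
benchmark="DatasetPerformance"   # one of the three suites listed above
filter="range"                   # illustrative: run only queries matching this name
iterations=3                     # illustrative: measure each query 3 times
cmd="bin/run --benchmark $benchmark --filter $filter --iterations $iterations"
echo "$cmd"
```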
Use `sbt package` or `sbt assembly` to build the library jar.
# MLlib tests
@@ -67,11 +54,31 @@ TPCDS kit needs to be installed on all cluster executor nodes under the same path.
It can be found [here](https://github.com/databricks/tpcds-kit).
```
// Generate the data
build/sbt "test:runMain com.databricks.spark.sql.perf.tpcds.GenTPCDSData -d <dsdgenDir> -s <scaleFactor> -l <location> -f <format>"
```
```
import com.databricks.spark.sql.perf.tpcds.TPCDSTables
// Set:
val rootDir = ... // root directory of location to create data in.
val databaseName = ... // name of database to create.
val scaleFactor = ... // scaleFactor defines the size of the dataset to generate (in GB).
val format = ... // a valid Spark data source format, e.g. "parquet".
// Run:
val tables = new TPCDSTables(sqlContext,
dsdgenDir = "/tmp/tpcds-kit/tools", // location of dsdgen
scaleFactor = scaleFactor,
useDoubleForDecimal = false, // true to replace DecimalType with DoubleType
useStringForDate = false) // true to replace DateType with StringType
tables.genData(
location = rootDir,
format = format,
overwrite = true, // overwrite the data that is already there
partitionTables = true, // create the partitioned fact tables
clusterByPartitionColumns = true, // shuffle to get partitions coalesced into single files.
filterOutNullPartitionValues = false, // true to filter out the partition with NULL key value
tableFilter = "", // "" means generate all tables
numPartitions = 100) // how many dsdgen partitions to run - number of input tasks.
```
```
// Create the specified database
sql(s"create database $databaseName")
// Create metastore tables in a specified database for your data.
```
@@ -133,21 +140,9 @@ experiment.getCurrentResults // or: spark.read.json(resultLocation).filter("time
.select('Name, 'Runtime)
```
# TPC-H
## Running in Databricks
TPC-H can be run similarly to TPC-DS, replacing `tpcds` with `tpch`.
Take a look at the data generator and the `tpch_run` notebook code below.
## Running in Databricks workspace (or spark-shell)
There are example notebooks in `src/main/notebooks` for running TPCDS and TPCH in the Databricks environment.
_These scripts can also be run from spark-shell command line with minor modifications using `:load file_name.scala`._
### TPC-multi_datagen notebook
This notebook (or Scala script) can be used to generate both TPCDS and TPCH data at selected scale factors.
It is a newer version of the `tpcds_datagen` notebook below. To use it:
* Edit the config variables at the top of the script.
* Run the whole notebook.
There are example notebooks in `src/main/notebooks` for running TPCDS in the Databricks environment.
### tpcds_datagen notebook
@@ -165,7 +160,3 @@ For running parallel TPCDS streams:
* Create a Job using the notebook and attaching to the created cluster as "existing cluster".
* Allow concurrent runs of the created job.
* Launch an appropriate number of Runs of the Job to run in parallel on the cluster.
### tpch_run notebook
This notebook can be used to run TPCH queries. Data needs to be generated first.


@@ -5,16 +5,16 @@ name := "spark-sql-perf"
organization := "com.databricks"
scalaVersion := "2.12.10"
scalaVersion := "2.11.8"
crossScalaVersions := Seq("2.12.10")
crossScalaVersions := Seq("2.11.8")
sparkPackageName := "databricks/spark-sql-perf"
// All Spark Packages need a license
licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0"))
sparkVersion := "3.0.0"
sparkVersion := "2.3.0"
sparkComponents ++= Seq("sql", "hive", "mllib")
@@ -32,13 +32,17 @@ initialCommands in console :=
|import sqlContext.implicits._
""".stripMargin
libraryDependencies += "com.github.scopt" %% "scopt" % "3.7.1"
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.5"
libraryDependencies += "com.twitter" %% "util-jvm" % "6.45.0" % "provided"
libraryDependencies += "com.github.scopt" %% "scopt" % "3.3.0"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test"
libraryDependencies += "com.twitter" %% "util-jvm" % "6.23.0" % "provided"
libraryDependencies += "org.yaml" % "snakeyaml" % "1.23"
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.1" % "test"
libraryDependencies += "org.yaml" % "snakeyaml" % "1.17"
libraryDependencies += "com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2"
fork := true


@@ -26,13 +26,6 @@ else
declare java_cmd=java
fi
if test -x "$SBT_HOME"; then
echo -e "Using $SBT_HOME as default SBT_HOME - should be the jar name!"
# Could be at /usr/share/sbt-launcher-packaging/bin/sbt-launch.jar
# so this would be export SBT_HOME=/usr/share/sbt-launcher-packaging/bin/sbt-launch.jar
sbt_jar=${SBT_HOME}
fi
echoerr () {
echo 1>&2 "$@"
}
@@ -172,9 +165,7 @@ process_args () {
}
run() {
# first check SBT_HOME is present so we use what's already available
sbt_jar=$SBT_HOME
# if there's no jar let's download it.
# no jar? download it.
[[ -f "$sbt_jar" ]] || acquire_sbt_jar "$sbt_version" || {
# still no jar? uh-oh.
echo "Download failed. Obtain the sbt-launch.jar manually and place it at $sbt_jar"


@@ -1,2 +1,2 @@
// This file should only contain the version of sbt to use.
sbt.version=0.13.18
sbt.version=0.13.8


@@ -1,6 +1,6 @@
// You may use this file to add plugin dependencies for sbt.
resolvers += "Spark Packages repo" at "https://repos.spark-packages.org/"
resolvers += "Spark Packages repo" at "https://dl.bintray.com/spark-packages/maven/"
resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/"
@@ -14,4 +14,4 @@ addSbtPlugin("com.databricks" %% "sbt-databricks" % "0.1.3")
addSbtPlugin("me.lessis" % "bintray-sbt" % "0.3.0")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")


@@ -1,2 +0,0 @@
results
metastore_db


@@ -1,123 +0,0 @@
# SparkR \*apply() benchmark
This is a performance benchmark for the SparkR \*apply API, including spark.lapply, dapply/dapplyCollect, and gapply/gapplyCollect.
`define_benchmark.r` generates data in different types and sizes.
`run_benchmark.r` runs the tests, saves results to results/results.csv, and writes plots to .png files.
## Requirements
- R library: microbenchmark
- databricks/spark
To install microbenchmark, run the following in an R shell:
```
install.packages("microbenchmark")
```
Build [Databricks Spark](https://github.com/databricks/spark) locally ([instructions](https://databricks.atlassian.net/wiki/spaces/UN/pages/194805843/0.+Building+and+Running+Spark+Locally)).
Use the path to the root of that repository as SPARK_HOME in the shell commands below.
## How to run
In shell:
```
sparkr-tests $ ./run_benchmark.sh <your SPARK_HOME> small # run small test (~10 min)
sparkr-tests $ ./run_benchmark.sh <your SPARK_HOME> medium # run medium test (~30 min)
sparkr-tests $ ./run_benchmark.sh <your SPARK_HOME> large # run large test (~90 min)
```
## Synthetic Data
For benchmarking spark.lapply, we generate
```
lists
```
1. with different types (7):
```
Length = 100,
Type = integer,
logical,
double,
character1,
character10,
character100,
character1k.
```
2. with different lengths (4):
```
Type = integer,
Length = 10,
100,
1k,
10k
```
For benchmarking dapply+nrow/dapplyCollect, we generate
```
data.frame
```
1. with different types (7):
```
nrow = 100,
ncol = 1,
Type = integer,
logical,
double,
character1,
character10,
character100,
character1k.
```
2. with different lengths (4):
```
ncol = 1,
Type = integer,
nrow = 10,
100,
1k,
10k
```
3. with different ncols (3):
```
nrow = 100,
Type = integer,
ncol = 1,
10,
100
```
For benchmarking gapply+nrow/gapplyCollect, we generate
```
data.frame
```
1. with different number of keys (3):
```
ncol = 2,
nrow = 1k,
Type = <integer, double>,
nkeys = 10,
100,
1000
```
2. with different lengths (4):
```
ncol = 2,
Type = <integer, double>,
nkeys = 10,
nrow = 10,
100,
1k,
10k
```
3. with different key types (3):
```
ncol = 2,
nkeys = 10,
nrow = 1k,
Type = <integer, double>
<character10, double>
<character100, double>
```


@@ -1,671 +0,0 @@
library("SparkR")
library("microbenchmark")
library("magrittr")
library("ggplot2")
library("pryr")
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
dir_path <- "results/"
# For benchmarking spark.lapply, we generate
# lists
# 1. with different types (7):
# Length = 100,
# Type = integer,
# logical,
# double,
# character1,
# character10,
# character100,
# character1k.
data.list.type.double <- runif(100)
data.list.type.int <- as.integer(data.list.type.double * 100)
data.list.type.logical <- data.list.type.double < 0.5
data.list.type.char1 <- sapply(1:100, function(x) { "a" })
data.list.type.char10 <- sapply(1:100, function(x) { "0123456789" })
data.list.type.char100 <- sapply(1:100, function(x) { paste(replicate(20, "hello"), collapse = "") })
data.list.type.char1k <- sapply(1:100, function(x) { paste(replicate(200, "hello"), collapse = "") })
# 2. with different lengths (4):
# Type = integer,
# Length = 10,
# 100,
# 1k,
# 10k
data.list.len.10 <- 1:10
data.list.len.100 <- 1:100
data.list.len.1k <- 1:1000
data.list.len.10k <- 1:10000
# For benchmarking dapply+nrow/dapplyCollect, we generate
# data.frame
# 1. with different types (7):
# nrow = 100,
# ncol = 1,
# Type = integer,
# logical,
# double,
# character1,
# character10,
# character100,
# character1k.
data.df.type.double <- data.frame(data.list.type.double) %>% createDataFrame %>% cache
data.df.type.int <- data.frame(data.list.type.int) %>% createDataFrame %>% cache
data.df.type.logical <- data.frame(data.list.type.logical) %>% createDataFrame %>% cache
data.df.type.char1 <- data.frame(data.list.type.char1) %>% createDataFrame %>% cache
data.df.type.char10 <- data.frame(data.list.type.char10) %>% createDataFrame %>% cache
data.df.type.char100 <- data.frame(data.list.type.char100) %>% createDataFrame %>% cache
data.df.type.char1k <- data.frame(data.list.type.char1k) %>% createDataFrame %>% cache
# counting to materialize the cache
data.df.type.double %>% nrow
data.df.type.int %>% nrow
data.df.type.logical %>% nrow
data.df.type.char1 %>% nrow
data.df.type.char10 %>% nrow
data.df.type.char100 %>% nrow
data.df.type.char1k %>% nrow
# 2. with different lengths (4):
# ncol = 1,
# Type = integer,
# nrow = 10,
# 100,
# 1k,
# 10k
data.df.len.10 <- data.frame(data.list.len.10) %>% createDataFrame %>% cache
data.rdf.len.100 <- data.frame(data.list.len.100)
data.df.len.100 <- data.rdf.len.100 %>% createDataFrame %>% cache
data.df.len.1k <- data.frame(data.list.len.1k) %>% createDataFrame %>% cache
data.df.len.10k <- data.frame(data.list.len.10k) %>% createDataFrame %>% cache
# counting to materialize the cache
data.df.len.10 %>% nrow
data.df.len.100 %>% nrow
data.df.len.1k %>% nrow
data.df.len.10k %>% nrow
# 3. with different ncols (3):
# nrow = 100,
# Type = integer,
# ncol = 1,
# 10,
# 100
data.df.ncol.1 <- data.df.len.100
data.df.ncol.10 <- data.frame(rep(data.rdf.len.100, each = 10)) %>% createDataFrame %>% cache
data.df.ncol.100 <- data.frame(rep(data.rdf.len.100, each = 100)) %>% createDataFrame %>% cache
# counting to materialize the cache
data.df.ncol.1 %>% nrow
data.df.ncol.10 %>% nrow
data.df.ncol.100 %>% nrow
# For benchmarking gapply+nrow/gapplyCollect, we generate
# data.frame
# 1. with different number of keys (3):
# ncol = 2,
# nrow = 1k,
# Type = <integer, double>,
# nkeys = 10,
# 100,
# 1000
data.rand.1k <- runif(1000)
data.df.nkey.10 <- data.frame(key = rep(1:10, 100), val = data.rand.1k) %>% createDataFrame %>% cache
data.df.nkey.100 <- data.frame(key = rep(1:100, 10), val = data.rand.1k) %>% createDataFrame %>% cache
data.df.nkey.1k <- data.frame(key = 1:1000, val = data.rand.1k) %>% createDataFrame %>% cache
# counting to materialize the cache
data.df.nkey.10 %>% nrow
data.df.nkey.100 %>% nrow
data.df.nkey.1k %>% nrow
# 2. with different lengths (4):
# ncol = 2,
# Type = <integer, double>,
# nkeys = 10,
# nrow = 10,
# 100,
# 1k,
# 10k
data.df.nrow.10 <- data.frame(key = 1:10, val = runif(10)) %>% createDataFrame %>% cache
data.df.nrow.100 <- data.frame(key = rep(1:10, 10), val = runif(100)) %>% createDataFrame %>% cache
data.df.nrow.1k <- data.frame(key = rep(1:10, 100), val = data.rand.1k) %>% createDataFrame %>% cache
data.df.nrow.10k <- data.frame(key = rep(1:10, 1000), val = runif(10000)) %>% createDataFrame %>% cache
# counting to materialize the cache
data.df.nrow.10 %>% nrow
data.df.nrow.100 %>% nrow
data.df.nrow.1k %>% nrow
data.df.nrow.10k %>% nrow
# 3. with different key types (3):
# ncol = 2,
# nkeys = 10,
# nrow = 1k,
# Type = <integer, double>
# <character10, double>
# <character100, double>
key.char10 <- sapply(1:10, function(x) { sprintf("%010d", x) })
key.char100 <- sapply(1:10, function(x) { sprintf("%0100d", x) })
data.df.keytype.int <- data.df.nrow.1k
data.df.keytype.char10 <- data.frame(key = rep(key.char10, 100), val = data.rand.1k) %>% createDataFrame %>% cache
data.df.keytype.char100 <- data.frame(key = rep(key.char100, 10), val = data.rand.1k) %>% createDataFrame %>% cache
# counting to materialize the cache
data.df.keytype.int %>% nrow
data.df.keytype.char10 %>% nrow
data.df.keytype.char100 %>% nrow
# ========== benchmark functions ==============
# plot utils
plot.box.mbm <- function(mbm_result) {
ggplot(mbm_result, aes(x = expr, y = time/1000000)) +
geom_boxplot(outlier.colour = "red", outlier.shape = 1) + labs(x = "", y = "Time (milliseconds)") +
geom_jitter(position=position_jitter(0.2), alpha = 0.2) +
coord_flip()
}
plot.box.throughput <- function(throughput_result) {
ggplot(throughput_result, aes(x = expr, y = throughput)) +
geom_boxplot(outlier.colour = "red", outlier.shape = 1) + labs(x = "", y = "Throughput (B/s)") +
geom_jitter(position=position_jitter(0.2), alpha = 0.2) +
coord_flip()
}
# dummy function
func.dummy <- function(x) { x }
# group function
gfunc.mean <- function(key, sdf) {
y <- data.frame(key, mean(sdf$val), stringsAsFactors = F)
names(y) <- c("key", "val")
y
}
# lapply, type
run.mbm.spark.lapply.type <- function(n) {
microbenchmark(
"lapply.int.100" = { spark.lapply(data.list.type.int, func.dummy) },
"lapply.double.100" = { spark.lapply(data.list.type.double, func.dummy) },
"lapply.logical.100" = { spark.lapply(data.list.type.logical, func.dummy) },
"lapply.char1.100" = { spark.lapply(data.list.type.char1, func.dummy) },
"lapply.char10.100" = { spark.lapply(data.list.type.char10, func.dummy) },
"lapply.char100.100" = { spark.lapply(data.list.type.char100, func.dummy) },
"lapply.char1k.100" = { spark.lapply(data.list.type.char1k, func.dummy) },
times = n
)
}
# lapply, len
run.mbm.spark.lapply.len <- function(mode, n) {
if (mode == "small") {
microbenchmark(
"lapply.len.10" = { spark.lapply(data.list.len.10, func.dummy) },
"lapply.len.100" = { spark.lapply(data.list.len.100, func.dummy) },
times = n
)
} else if (mode == "medium") {
microbenchmark(
"lapply.len.10" = { spark.lapply(data.list.len.10, func.dummy) },
"lapply.len.100" = { spark.lapply(data.list.len.100, func.dummy) },
"lapply.len.1k" = { spark.lapply(data.list.len.1k, func.dummy) },
times = n
)
} else {
microbenchmark(
"lapply.len.10" = { spark.lapply(data.list.len.10, func.dummy) },
"lapply.len.100" = { spark.lapply(data.list.len.100, func.dummy) },
"lapply.len.1k" = { spark.lapply(data.list.len.1k, func.dummy) },
"lapply.len.10k" = { spark.lapply(data.list.len.10k, func.dummy) },
times = n
)
}
}
# dapply, type
run.mbm.dapply.type <- function(n) {
microbenchmark(
"dapply.int.100" = { dapply(data.df.type.int, func.dummy, schema(data.df.type.int)) %>% nrow },
"dapply.double.100" = { dapply(data.df.type.double, func.dummy, schema(data.df.type.double)) %>% nrow },
"dapply.logical.100" = { dapply(data.df.type.logical, func.dummy, schema(data.df.type.logical)) %>% nrow },
"dapply.char1.100" = { dapply(data.df.type.char1, func.dummy, schema(data.df.type.char1)) %>% nrow },
"dapply.char10.100" = { dapply(data.df.type.char10, func.dummy, schema(data.df.type.char10)) %>% nrow },
"dapply.char100.100" = { dapply(data.df.type.char100, func.dummy, schema(data.df.type.char100)) %>% nrow },
"dapply.char1k.100" = { dapply(data.df.type.char1k, func.dummy, schema(data.df.type.char1k)) %>% nrow },
times = n
)
}
# dapply, len
run.mbm.dapply.len <- function(mode, n) {
if (mode == "small") {
microbenchmark(
"dapply.len.10" = { dapply(data.df.len.10, func.dummy, schema(data.df.len.10)) %>% nrow },
"dapply.len.100" = { dapply(data.df.len.100, func.dummy, schema(data.df.len.100)) %>% nrow },
times = n
)
} else if (mode == "medium") {
microbenchmark(
"dapply.len.10" = { dapply(data.df.len.10, func.dummy, schema(data.df.len.10)) %>% nrow },
"dapply.len.100" = { dapply(data.df.len.100, func.dummy, schema(data.df.len.100)) %>% nrow },
"dapply.len.1k" = { dapply(data.df.len.1k, func.dummy, schema(data.df.len.1k)) %>% nrow },
times = n
)
} else {
microbenchmark(
"dapply.len.10" = { dapply(data.df.len.10, func.dummy, schema(data.df.len.10)) %>% nrow },
"dapply.len.100" = { dapply(data.df.len.100, func.dummy, schema(data.df.len.100)) %>% nrow },
"dapply.len.1k" = { dapply(data.df.len.1k, func.dummy, schema(data.df.len.1k)) %>% nrow },
"dapply.len.10k" = { dapply(data.df.len.10k, func.dummy, schema(data.df.len.10k)) %>% nrow },
times = n
)
}
}
# dapply, ncol
run.mbm.dapply.ncol <- function(mode, n) {
if (mode == "small") {
microbenchmark(
"dapply.ncol.1" = { dapply(data.df.ncol.1, func.dummy, schema(data.df.ncol.1)) %>% nrow },
"dapply.ncol.10" = { dapply(data.df.ncol.10, func.dummy, schema(data.df.ncol.10)) %>% nrow },
times = n
)
} else {
microbenchmark(
"dapply.ncol.1" = { dapply(data.df.ncol.1, func.dummy, schema(data.df.ncol.1)) %>% nrow },
"dapply.ncol.10" = { dapply(data.df.ncol.10, func.dummy, schema(data.df.ncol.10)) %>% nrow },
"dapply.ncol.100" = { dapply(data.df.ncol.100, func.dummy, schema(data.df.ncol.100)) %>% nrow },
times = n
)
}
}
# dapplyCollect, type
run.mbm.dapplyCollect.type <- function(n) {
microbenchmark(
"dapplyCollect.int.100" = { dapplyCollect(data.df.type.int, func.dummy) },
"dapplyCollect.double.100" = { dapplyCollect(data.df.type.double, func.dummy) },
"dapplyCollect.logical.100" = { dapplyCollect(data.df.type.logical, func.dummy) },
"dapplyCollect.char1.100" = { dapplyCollect(data.df.type.char1, func.dummy) },
"dapplyCollect.char10.100" = { dapplyCollect(data.df.type.char10, func.dummy) },
"dapplyCollect.char100.100" = { dapplyCollect(data.df.type.char100, func.dummy) },
"dapplyCollect.char1k.100" = { dapplyCollect(data.df.type.char1k, func.dummy) },
times = n
)
}
# dapplyCollect, len
run.mbm.dapplyCollect.len <- function(mode, n) {
if (mode != "large") {
microbenchmark(
"dapplyCollect.len.10" = { dapplyCollect(data.df.len.10, func.dummy) },
"dapplyCollect.len.100" = { dapplyCollect(data.df.len.100, func.dummy) },
"dapplyCollect.len.1k" = { dapplyCollect(data.df.len.1k, func.dummy) },
times = n
)
} else {
microbenchmark(
"dapplyCollect.len.10" = { dapplyCollect(data.df.len.10, func.dummy) },
"dapplyCollect.len.100" = { dapplyCollect(data.df.len.100, func.dummy) },
"dapplyCollect.len.1k" = { dapplyCollect(data.df.len.1k, func.dummy) },
"dapplyCollect.len.10k" = { dapplyCollect(data.df.len.10k, func.dummy) },
times = n
)
}
}
# dapplyCollect, ncol
run.mbm.dapplyCollect.ncol <- function(mode, n) {
if (mode == "small") {
microbenchmark(
"dapplyCollect.ncol.1" = { dapplyCollect(data.df.ncol.1, func.dummy) },
"dapplyCollect.ncol.10" = { dapplyCollect(data.df.ncol.10, func.dummy) },
times = n
)
} else {
microbenchmark(
"dapplyCollect.ncol.1" = { dapplyCollect(data.df.ncol.1, func.dummy) },
"dapplyCollect.ncol.10" = { dapplyCollect(data.df.ncol.10, func.dummy) },
"dapplyCollect.ncol.100" = { dapplyCollect(data.df.ncol.100, func.dummy) },
times = n
)
}
}
# gapply, nkey
run.mbm.gapply.nkey <- function(mode, n) {
if (mode == "small") {
microbenchmark(
"gapply.nkey.10" = { gapply(data.df.nkey.10, data.df.nkey.10$key, gfunc.mean, schema(data.df.nkey.10)) %>% nrow },
"gapply.nkey.100" = { gapply(data.df.nkey.100, data.df.nkey.100$key, gfunc.mean, schema(data.df.nkey.100)) %>% nrow },
times = n
)
} else if (mode == "medium") {
microbenchmark(
"gapply.nkey.10" = { gapply(data.df.nkey.10, data.df.nkey.10$key, gfunc.mean, schema(data.df.nkey.10)) %>% nrow },
"gapply.nkey.100" = { gapply(data.df.nkey.100, data.df.nkey.100$key, gfunc.mean, schema(data.df.nkey.100)) %>% nrow },
"gapply.nkey.1k" = { gapply(data.df.nkey.1k, data.df.nkey.1k$key, gfunc.mean, schema(data.df.nkey.1k)) %>% nrow },
times = n
)
} else {
microbenchmark(
"gapply.nkey.10" = { gapply(data.df.nkey.10, data.df.nkey.10$key, gfunc.mean, schema(data.df.nkey.10)) %>% nrow },
"gapply.nkey.100" = { gapply(data.df.nkey.100, data.df.nkey.100$key, gfunc.mean, schema(data.df.nkey.100)) %>% nrow },
"gapply.nkey.1k" = { gapply(data.df.nkey.1k, data.df.nkey.1k$key, gfunc.mean, schema(data.df.nkey.1k)) %>% nrow },
times = n
)
}
}
# gapply, nrow
run.mbm.gapply.nrow <- function(mode, n) {
if (mode == "small") {
microbenchmark(
"gapply.nrow.10" = { gapply(data.df.nrow.10, data.df.nrow.10$key, gfunc.mean, schema(data.df.nrow.10)) %>% nrow },
"gapply.nrow.100" = { gapply(data.df.nrow.100, data.df.nrow.100$key, gfunc.mean, schema(data.df.nrow.100)) %>% nrow },
times = n
)
} else if (mode == "medium") {
microbenchmark(
"gapply.nrow.10" = { gapply(data.df.nrow.10, data.df.nrow.10$key, gfunc.mean, schema(data.df.nrow.10)) %>% nrow },
"gapply.nrow.100" = { gapply(data.df.nrow.100, data.df.nrow.100$key, gfunc.mean, schema(data.df.nrow.100)) %>% nrow },
"gapply.nrow.1k" = { gapply(data.df.nrow.1k, data.df.nrow.1k$key, gfunc.mean, schema(data.df.nrow.1k)) %>% nrow },
times = n
)
} else {
microbenchmark(
"gapply.nrow.10" = { gapply(data.df.nrow.10, data.df.nrow.10$key, gfunc.mean, schema(data.df.nrow.10)) %>% nrow },
"gapply.nrow.100" = { gapply(data.df.nrow.100, data.df.nrow.100$key, gfunc.mean, schema(data.df.nrow.100)) %>% nrow },
"gapply.nrow.1k" = { gapply(data.df.nrow.1k, data.df.nrow.1k$key, gfunc.mean, schema(data.df.nrow.1k)) %>% nrow },
"gapply.nrow.10k" = { gapply(data.df.nrow.10k, data.df.nrow.10k$key, gfunc.mean, schema(data.df.nrow.10k)) %>% nrow },
times = n
)
}
}
# gapply, keytype
run.mbm.gapply.keytype <- function(n) {
microbenchmark(
"gapply.keytype.int" = { gapply(data.df.keytype.int, data.df.keytype.int$key, gfunc.mean, schema(data.df.keytype.int)) %>% nrow },
"gapply.keytype.char10" = { gapply(data.df.keytype.char10, data.df.keytype.char10$key, gfunc.mean, schema(data.df.keytype.char10)) %>% nrow },
"gapply.keytype.char100" = { gapply(data.df.keytype.char100, data.df.keytype.char100$key, gfunc.mean, schema(data.df.keytype.char100)) %>% nrow },
times = n
)
}
# gapplyCollect, nkey
run.mbm.gapplyCollect.nkey <- function(mode, n) {
if (mode == "small") {
microbenchmark(
"gapplyCollect.nkey.10" = { gapplyCollect(data.df.nkey.10, data.df.nkey.10$key, gfunc.mean) },
"gapplyCollect.nkey.100" = { gapplyCollect(data.df.nkey.100, data.df.nkey.100$key, gfunc.mean) },
times = n
)
} else if (mode == "medium") {
microbenchmark(
"gapplyCollect.nkey.10" = { gapplyCollect(data.df.nkey.10, data.df.nkey.10$key, gfunc.mean) },
"gapplyCollect.nkey.100" = { gapplyCollect(data.df.nkey.100, data.df.nkey.100$key, gfunc.mean) },
"gapplyCollect.nkey.1k" = { gapplyCollect(data.df.nkey.1k, data.df.nkey.1k$key, gfunc.mean) },
times = n
)
} else {
microbenchmark(
"gapplyCollect.nkey.10" = { gapplyCollect(data.df.nkey.10, data.df.nkey.10$key, gfunc.mean) },
"gapplyCollect.nkey.100" = { gapplyCollect(data.df.nkey.100, data.df.nkey.100$key, gfunc.mean) },
"gapplyCollect.nkey.1k" = { gapplyCollect(data.df.nkey.1k, data.df.nkey.1k$key, gfunc.mean) },
times = n
)
}
}
# gapplyCollect, nrow
run.mbm.gapplyCollect.nrow <- function(mode, n) {
if (mode == "small") {
microbenchmark(
"gapplyCollect.nrow.10" = { gapplyCollect(data.df.nrow.10, data.df.nrow.10$key, gfunc.mean) },
"gapplyCollect.nrow.100" = { gapplyCollect(data.df.nrow.100, data.df.nrow.100$key, gfunc.mean) },
times = n
)
} else if (mode == "medium") {
microbenchmark(
"gapplyCollect.nrow.10" = { gapplyCollect(data.df.nrow.10, data.df.nrow.10$key, gfunc.mean) },
"gapplyCollect.nrow.100" = { gapplyCollect(data.df.nrow.100, data.df.nrow.100$key, gfunc.mean) },
"gapplyCollect.nrow.1k" = { gapplyCollect(data.df.nrow.1k, data.df.nrow.1k$key, gfunc.mean) },
times = n
)
} else {
microbenchmark(
"gapplyCollect.nrow.10" = { gapplyCollect(data.df.nrow.10, data.df.nrow.10$key, gfunc.mean) },
"gapplyCollect.nrow.100" = { gapplyCollect(data.df.nrow.100, data.df.nrow.100$key, gfunc.mean) },
"gapplyCollect.nrow.1k" = { gapplyCollect(data.df.nrow.1k, data.df.nrow.1k$key, gfunc.mean) },
"gapplyCollect.nrow.10k" = { gapplyCollect(data.df.nrow.10k, data.df.nrow.10k$key, gfunc.mean) },
times = n
)
}
}
# gapplyCollect, keytype
run.mbm.gapplyCollect.keytype <- function(n) {
microbenchmark(
"gapplyCollect.keytype.int" = { gapplyCollect(data.df.keytype.int, data.df.keytype.int$key, gfunc.mean) },
"gapplyCollect.keytype.char10" = { gapplyCollect(data.df.keytype.char10, data.df.keytype.char10$key, gfunc.mean) },
"gapplyCollect.keytype.char100" = { gapplyCollect(data.df.keytype.char100, data.df.keytype.char100$key, gfunc.mean) },
times = n
)
}
# ============== compute object sizes ==================
ser <- function(data) {
serialize(data, connection = NULL)
}
get.sizes <- function(obj_names, objs) {
obj_sizes <- objs %>% lapply(ser) %>% lapply(object.size)
data.frame(cbind(obj_names, obj_sizes))
}
# lapply
sizes.lapply.type <- get.sizes(
list(
"lapply.double.100",
"lapply.int.100",
"lapply.logical.100",
"lapply.char1.100",
"lapply.char10.100",
"lapply.char100.100",
"lapply.char1k.100"),
list(
data.list.type.double,
data.list.type.int,
data.list.type.logical,
data.list.type.char1,
data.list.type.char10,
data.list.type.char100,
data.list.type.char1k)
)
sizes.lapply.len <- get.sizes(
list(
"lapply.len.10",
"lapply.len.100",
"lapply.len.1k",
"lapply.len.10k"),
list(
data.list.len.10,
data.list.len.100,
data.list.len.1k,
data.list.len.10k)
)
# data for dapply
df.dapply.type <- lapply(list(
data.list.type.double,
data.list.type.int,
data.list.type.logical,
data.list.type.char1,
data.list.type.char10,
data.list.type.char100,
data.list.type.char1k), data.frame)
df.dapply.len <- lapply(list(
data.list.len.10,
data.list.len.100,
data.list.len.1k,
data.list.len.10k), data.frame)
df.dapply.ncol <- lapply(list(
data.list.len.100,
rep(data.rdf.len.100, each = 10),
rep(data.rdf.len.100, each = 100)), data.frame)
# dapply
sizes.dapply.type <- get.sizes(
list(
"dapply.double.100",
"dapply.int.100",
"dapply.logical.100",
"dapply.char1.100",
"dapply.char10.100",
"dapply.char100.100",
"dapply.char1k.100"),
df.dapply.type
)
sizes.dapply.len <- get.sizes(
list(
"dapply.len.10",
"dapply.len.100",
"dapply.len.1k",
"dapply.len.10k"),
df.dapply.len
)
sizes.dapply.ncol <- get.sizes(
list(
"dapply.ncol.1",
"dapply.ncol.10",
"dapply.ncol.100"),
df.dapply.ncol
)
# dapplyCollect
sizes.dapplyCollect.type <- get.sizes(
list(
"dapplyCollect.double.100",
"dapplyCollect.int.100",
"dapplyCollect.logical.100",
"dapplyCollect.char1.100",
"dapplyCollect.char10.100",
"dapplyCollect.char100.100",
"dapplyCollect.char1k.100"),
df.dapply.type
)
sizes.dapplyCollect.len <- get.sizes(
list(
"dapplyCollect.len.10",
"dapplyCollect.len.100",
"dapplyCollect.len.1k",
"dapplyCollect.len.10k"),
df.dapply.len
)
sizes.dapplyCollect.ncol <- get.sizes(
list(
"dapplyCollect.ncol.1",
"dapplyCollect.ncol.10",
"dapplyCollect.ncol.100"),
df.dapply.ncol
)
# data for gapply
df.gapply.nkey <- list(
data.frame(key = rep(1:10, 100), val = data.rand.1k),
data.frame(key = rep(1:100, 10), val = data.rand.1k),
data.frame(key = 1:1000, val = data.rand.1k))
df.gapply.nrow <- list(
data.frame(key = 1:10, val = runif(10)),
data.frame(key = rep(1:10, 10), val = runif(100)),
data.frame(key = rep(1:10, 100), val = data.rand.1k),
data.frame(key = rep(1:10, 1000), val = runif(10000)))
df.gapply.keytype <- list(
data.frame(key = rep(1:10, 100), val = data.rand.1k),
data.frame(key = rep(key.char10, 100), val = data.rand.1k),
data.frame(key = rep(key.char100, 10), val = data.rand.1k))
# gapply
sizes.gapply.nkey <- get.sizes(
list(
"gapply.nkey.10",
"gapply.nkey.100",
"gapply.nkey.1k"),
df.gapply.nkey
)
sizes.gapply.nrow <- get.sizes(
list(
"gapply.nrow.10",
"gapply.nrow.100",
"gapply.nrow.1k",
"gapply.nrow.10k"),
df.gapply.nrow
)
sizes.gapply.keytype <- get.sizes(
list(
"gapply.keytype.int",
"gapply.keytype.char10",
"gapply.keytype.char100"),
df.gapply.keytype
)
# gapplyCollect
sizes.gapplyCollect.nkey <- get.sizes(
list(
"gapplyCollect.nkey.10",
"gapplyCollect.nkey.100",
"gapplyCollect.nkey.1k"),
df.gapply.nkey
)
sizes.gapplyCollect.nrow <- get.sizes(
list(
"gapplyCollect.nrow.10",
"gapplyCollect.nrow.100",
"gapplyCollect.nrow.1k",
"gapplyCollect.nrow.10k"),
df.gapply.nrow
)
sizes.gapplyCollect.keytype <- get.sizes(
list(
"gapplyCollect.keytype.int",
"gapplyCollect.keytype.char10",
"gapplyCollect.keytype.char100"),
df.gapply.keytype
)
df.sizes <- rbind(
sizes.lapply.type,
sizes.lapply.len,
sizes.dapply.type,
sizes.dapply.len,
sizes.dapply.ncol,
sizes.dapplyCollect.type,
sizes.dapplyCollect.len,
sizes.dapplyCollect.ncol,
sizes.gapply.nkey,
sizes.gapply.nrow,
sizes.gapply.keytype,
sizes.gapplyCollect.nkey,
sizes.gapplyCollect.nrow,
sizes.gapplyCollect.keytype)
df.sizes$obj_names <- unlist(df.sizes$obj_names)
df.sizes$obj_sizes <- unlist(df.sizes$obj_sizes)


@@ -1,165 +0,0 @@
# ========== generating results =================
source("define_benchmark.r")
args = commandArgs(trailingOnly=TRUE)
if (args[1] == "small") {
mode = "small"
n1 = 10
n2 = 10
} else if (args[1] == "medium") {
mode = "medium"
n1 = 10
n2 = 20
} else {
mode = "large"
n1 = 20
n2 = 20
}
N = 20
# lapply, type
mbm.spark.lapply.type <- run.mbm.spark.lapply.type(n2)
p <- mbm.spark.lapply.type %>% plot.box.mbm
filename <- sprintf("%slapply.type.%s.png", dir_path, mode)
ggsave(filename, plot = p, width=7, height=4)
# lapply, len
mbm.spark.lapply.len <- run.mbm.spark.lapply.len(mode, n1)
p <- mbm.spark.lapply.len %>% plot.box.mbm
filename <- sprintf("%slapply.len.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# dapply, type
mbm.dapply.type <- run.mbm.dapply.type(n2)
p <- mbm.dapply.type %>% plot.box.mbm
filename <- sprintf("%sdapply.type.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# dapply, len
mbm.dapply.len <- run.mbm.dapply.len(mode, n1)
p <- mbm.dapply.len %>% plot.box.mbm
filename <- sprintf("%sdapply.len.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# dapply, ncol
mbm.dapply.ncol <- run.mbm.dapply.ncol(mode, n1)
p <- mbm.dapply.ncol %>% plot.box.mbm
filename <- sprintf("%sdapply.ncol.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# dapplyCollect, type
mbm.dapplyCollect.type <- run.mbm.dapplyCollect.type(N)
p <- mbm.dapplyCollect.type %>% plot.box.mbm
filename <- sprintf("%sdapplyCollect.type.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# dapplyCollect, len
mbm.dapplyCollect.len <- run.mbm.dapplyCollect.len(mode, N)
p <- mbm.dapplyCollect.len %>% plot.box.mbm
filename <- sprintf("%sdapplyCollect.len.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# dapplyCollect, ncol
mbm.dapplyCollect.ncol <- run.mbm.dapplyCollect.ncol(mode, n1)
p <- mbm.dapplyCollect.ncol %>% plot.box.mbm
filename <- sprintf("%sdapplyCollect.ncol.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# gapply, nkey
mbm.gapply.nkey <- run.mbm.gapply.nkey(mode, n1)
p <- mbm.gapply.nkey %>% plot.box.mbm
filename <- sprintf("%sgapply.nkey.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# gapply, nrow
mbm.gapply.nrow <- run.mbm.gapply.nrow(mode, n1)
p <- mbm.gapply.nrow %>% plot.box.mbm
filename <- sprintf("%sgapply.nrow.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# gapply, keytype
mbm.gapply.keytype <- run.mbm.gapply.keytype(n1)
p <- mbm.gapply.keytype %>% plot.box.mbm
filename <- sprintf("%sgapply.keytype.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# gapplyCollect, nkey
mbm.gapplyCollect.nkey <- run.mbm.gapplyCollect.nkey(mode, n1)
p <- mbm.gapplyCollect.nkey %>% plot.box.mbm
filename <- sprintf("%sgapplyCollect.nkey.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# gapplyCollect, nrow
mbm.gapplyCollect.nrow <- run.mbm.gapplyCollect.nrow(mode, n1)
p <- mbm.gapplyCollect.nrow %>% plot.box.mbm
filename <- sprintf("%sgapplyCollect.nrow.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
# gapplyCollect, keytype
mbm.gapplyCollect.keytype <- run.mbm.gapplyCollect.keytype(n1)
p <- mbm.gapplyCollect.keytype %>% plot.box.mbm
filename <- sprintf("%sgapplyCollect.keytype.%s.png", dir_path, mode)
ggsave(filename, width=7, height=4)
tmp <- rbind(
mbm.spark.lapply.type,
mbm.spark.lapply.len,
mbm.dapply.type,
mbm.dapply.len,
mbm.dapply.ncol,
mbm.dapplyCollect.type,
mbm.dapplyCollect.len,
mbm.dapplyCollect.ncol,
mbm.gapply.nkey,
mbm.gapply.nrow,
mbm.gapply.keytype,
mbm.gapplyCollect.nkey,
mbm.gapplyCollect.nrow,
mbm.gapplyCollect.keytype)
# compute throughput
tmp_size <- merge(tmp, df.sizes, by.x = "expr", by.y = "obj_names", all.x=TRUE)
tmp_size$throughput <- round(tmp_size$obj_sizes*1000000/tmp_size$time, digits=2) # bytes per second
# plot throughput
p <- tmp_size %>% plot.box.throughput
filename <- sprintf("%sall.throughput.%s.png", dir_path, mode)
ggsave(filename, width=7, height=6)
# save raw data to csv file
towrite <- tmp_size[order(tmp_size$expr, tmp_size$time),]
write.csv(towrite, file="results/results.csv", row.names = F)
# save mean value in ml.perf_metrics format
# timestamp: timestamp, benchmarkId: string, benchmarkName: string,
# metricName: string, metricValue: string, isLargerBetter: boolean, parameters map<string, string>
op <- options(digits.secs = 3)
curTimestamp <- Sys.time()
benchmarkName <- "com.databricks.spark.sql.perf.sparkr.UserDefinedFunction"
metricName <- "throughput.byte.per.second"
isLargerBetter <- TRUE
perf_metric <- aggregate(towrite$throughput, list(towrite$expr), mean)
names(perf_metric) <- c("benchmarkId", "throughput")
perf_metric$timestamp <- curTimestamp
perf_metric$benchmarkName <- benchmarkName
perf_metric$metricName <- metricName
perf_metric$isLargerBetter <- isLargerBetter
perf_metric$parameters <- NULL
write.csv(perf_metric, file="results/perf_metrics.csv", row.names = F)
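The throughput column above divides object size by measured time and scales by 1e6, and the per-benchmark metric is the mean of those throughputs. A minimal Python sketch of that arithmetic (assuming, per the script's "bytes per second" comment, that times are in microseconds — the names here are illustrative, not part of the R script):

```python
# Sketch of the throughput computation above.
# Assumption: time is in microseconds, so size_bytes * 1e6 / time_us
# yields bytes per second, matching the script's comment.
def throughput(size_bytes, time_us):
    return round(size_bytes * 1_000_000 / time_us, 2)

def mean_throughput(rows):
    # rows: iterable of (benchmark_id, size_bytes, time_us),
    # mirroring the aggregate(..., mean) call in the script
    sums, counts = {}, {}
    for bench, size, t in rows:
        sums[bench] = sums.get(bench, 0.0) + throughput(size, t)
        counts[bench] = counts.get(bench, 0) + 1
    return {b: sums[b] / counts[b] for b in sums}
```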

View File

@ -1,29 +0,0 @@
#!/bin/bash
# Usage: ./run_benchmark.sh /Users/liang/spark small
if [[ ($# -ne 1 && $# -ne 2) ]]; then
echo "Usage: $0 <your SPARK_HOME> [small/medium/large] (default: small)" >&2
exit 1
fi
if ! [ -e "$1" ]; then
echo "$1 not found" >&2
exit 1
fi
if ! [ -d "$1" ]; then
echo "$1 not a directory" >&2
exit 1
fi
if [[ ( $# -eq 2 && "$2" != "small" && "$2" != "medium" && "$2" != "large" ) ]]; then
echo "The second argument should be 'small' or 'medium' or 'large'" >&2
exit 1
fi
mkdir -p results
export SPARK_HOME=$1
export R_LIBS_USER=$SPARK_HOME/R/lib
if [[ $# -eq 1 ]]; then
Rscript run_benchmark.r small
else
Rscript run_benchmark.r $2
fi

View File

@ -1,277 +0,0 @@
// Databricks notebook source
// Multi TPC-H and TPC-DS generator and database importer using spark-sql-perf, typically to generate parquet files in S3/blobstore objects
val benchmarks = Seq("TPCDS", "TPCH") // Options: "TPCDS", "TPCH"
val scaleFactors = Seq("1", "10", "100", "1000", "10000") // "1", "10", "100", "1000", "10000" list of scale factors to generate and import
val baseLocation = s"s3a://mybucket" // S3 bucket, blob, or local root path
val baseDatagenFolder = "/tmp" // usually /tmp if enough space is available for datagen files
// Output file formats
val fileFormat = "parquet" // only parquet was tested
val shuffle = true // If true, partitions will be coalesced into a single file during generation up to spark.sql.files.maxRecordsPerFile (if set)
val overwrite = false // whether to delete existing files (does not check whether existing results are complete when not overwriting)
// Generate stats for CBO
val createTableStats = true
val createColumnStats = true
val workers: Int = spark.conf.get("spark.databricks.clusterUsageTags.clusterTargetWorkers").toInt //number of nodes, assumes one executor per node
val cores: Int = Runtime.getRuntime.availableProcessors.toInt //number of CPU-cores
val dbSuffix = "" // set only if creating multiple DBs or source file folders with different settings, use a leading _
val TPCDSUseLegacyOptions = false // set to generate file/DB naming and table options compatible with older results
// COMMAND ----------
// Imports, fail fast if we are missing any library
// For datagens
import java.io._
import scala.sys.process._
// spark-sql-perf
import com.databricks.spark.sql.perf._
import com.databricks.spark.sql.perf.tpch._
import com.databricks.spark.sql.perf.tpcds._
// Spark/Hadoop config
import org.apache.spark.deploy.SparkHadoopUtil
// COMMAND ----------
// Set Spark config to produce same and comparable source files across systems
// do not change unless you want to derive from default source file composition, in that case also set a DB suffix
spark.sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
// Prevent very large files. 20 million records creates between 500 and 1500MB files in TPCH
spark.sqlContext.setConf("spark.sql.files.maxRecordsPerFile", "20000000") // This also regulates the file coalesce
// COMMAND ----------
// Checks that we have the correct number of worker nodes to start the data generation
// Make sure you have set the workers variable correctly, as the datagen binaries need to be present on all nodes
val targetWorkers: Int = workers
def numWorkers: Int = sc.getExecutorMemoryStatus.size - 1
def waitForWorkers(requiredWorkers: Int, tries: Int) : Unit = {
for (i <- 0 to (tries-1)) {
if (numWorkers == requiredWorkers) {
println(s"Waited ${i}s. for $numWorkers workers to be ready")
return
}
if (i % 60 == 0) println(s"Waiting ${i}s. for workers to be ready, got only $numWorkers workers")
Thread sleep 1000
}
throw new Exception(s"Timed out waiting for workers to be ready after ${tries}s.")
}
waitForWorkers(targetWorkers, 3600) //wait up to an hour
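The polling loop above can be sketched language-agnostically: check readiness once per second, report progress periodically, and fail after the timeout. A hedged Python sketch (the `num_ready` callable stands in for `sc.getExecutorMemoryStatus.size - 1` and is an assumption of this sketch, not part of the notebook):

```python
import time

# Minimal sketch of the waitForWorkers loop: poll once per second
# until the readiness check reports the required count, or raise
# after `tries` attempts.
def wait_for_workers(num_ready, required, tries, sleep=time.sleep):
    for i in range(tries):
        if num_ready() == required:
            return i  # seconds waited before the cluster was ready
        sleep(1)
    raise TimeoutError(f"workers not ready after {tries}s")
```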
// COMMAND ----------
// Time command helper
def time[R](block: => R): R = {
val t0 = System.currentTimeMillis() //nanoTime()
val result = block // call-by-name
val t1 = System.currentTimeMillis() //nanoTime()
println("Elapsed time: " + (t1 - t0) + "ms")
result
}
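The `time { ... }` helper above is a call-by-name wrapper that measures wall-clock milliseconds around a block and passes its result through. An equivalent sketch in Python, using a zero-argument callable in place of the by-name parameter:

```python
import time

# Sketch of the time {...} helper: run a zero-argument block,
# print elapsed wall-clock milliseconds, and return the result.
def timed(block):
    t0 = time.monotonic()
    result = block()
    elapsed_ms = (time.monotonic() - t0) * 1000
    print(f"Elapsed time: {elapsed_ms:.0f}ms")
    return result
```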
// COMMAND ----------
// FOR INSTALLING TPCH DBGEN (with the stdout patch)
def installDBGEN(url: String = "https://github.com/databricks/tpch-dbgen.git", useStdout: Boolean = true, baseFolder: String = "/tmp")(i: java.lang.Long): String = {
// check if we want the revision which makes dbgen output to stdout
val checkoutRevision: String = if (useStdout) "git checkout 0469309147b42abac8857fa61b4cf69a6d3128a8 -- bm_utils.c" else ""
Seq("mkdir", "-p", baseFolder).!
val pw = new PrintWriter(new File(s"${baseFolder}/dbgen_$i.sh" ))
pw.write(s"""
rm -rf ${baseFolder}/dbgen
rm -rf ${baseFolder}/dbgen_install_$i
mkdir ${baseFolder}/dbgen_install_$i
cd ${baseFolder}/dbgen_install_$i
git clone '$url'
cd tpch-dbgen
$checkoutRevision
make
ln -sf ${baseFolder}/dbgen_install_$i/tpch-dbgen ${baseFolder}/dbgen || echo "ln -sf failed"
test -e ${baseFolder}/dbgen/dbgen
echo "OK"
""")
pw.close
Seq("chmod", "+x", s"${baseFolder}/dbgen_$i.sh").!
Seq(s"${baseFolder}/dbgen_$i.sh").!!
}
// COMMAND ----------
// FOR INSTALLING TPCDS DSDGEN (with the stdout patch)
// Note: it assumes Debian/Ubuntu host, edit package manager if not
def installDSDGEN(url: String = "https://github.com/databricks/tpcds-kit.git", useStdout: Boolean = true, baseFolder: String = "/tmp")(i: java.lang.Long): String = {
Seq("mkdir", "-p", baseFolder).!
val pw = new PrintWriter(new File(s"${baseFolder}/dsdgen_$i.sh" ))
pw.write(s"""
sudo apt-get update
sudo apt-get -y --force-yes install gcc make flex bison byacc git
rm -rf ${baseFolder}/dsdgen
rm -rf ${baseFolder}/dsdgen_install_$i
mkdir ${baseFolder}/dsdgen_install_$i
cd ${baseFolder}/dsdgen_install_$i
git clone '$url'
cd tpcds-kit/tools
make -f Makefile.suite
ln -sf ${baseFolder}/dsdgen_install_$i/tpcds-kit/tools ${baseFolder}/dsdgen || echo "ln -sf failed"
${baseFolder}/dsdgen/dsdgen -h
test -e ${baseFolder}/dsdgen/dsdgen
echo "OK"
""")
pw.close
Seq("chmod", "+x", s"${baseFolder}/dsdgen_$i.sh").!
Seq(s"${baseFolder}/dsdgen_$i.sh").!!
}
// COMMAND ----------
// install (build) the data generators in all nodes
val res = spark.range(0, workers, 1, workers).map(worker => benchmarks.map{
case "TPCDS" => s"TPCDS worker $worker\n" + installDSDGEN(baseFolder = baseDatagenFolder)(worker)
case "TPCH" => s"TPCH worker $worker\n" + installDBGEN(baseFolder = baseDatagenFolder)(worker)
}).collect()
// COMMAND ----------
// Set the benchmark name, tables, and location for each benchmark
// returns (dbname, tables, location)
def getBenchmarkData(benchmark: String, scaleFactor: String) = benchmark match {
case "TPCH" => (
s"tpch_sf${scaleFactor}_${fileFormat}${dbSuffix}",
new TPCHTables(spark.sqlContext, dbgenDir = s"${baseDatagenFolder}/dbgen", scaleFactor = scaleFactor, useDoubleForDecimal = false, useStringForDate = false, generatorParams = Nil),
s"$baseLocation/tpch/sf${scaleFactor}_${fileFormat}")
case "TPCDS" if !TPCDSUseLegacyOptions => (
s"tpcds_sf${scaleFactor}_${fileFormat}${dbSuffix}",
new TPCDSTables(spark.sqlContext, dsdgenDir = s"${baseDatagenFolder}/dsdgen", scaleFactor = scaleFactor, useDoubleForDecimal = false, useStringForDate = false),
s"$baseLocation/tpcds-2.4/sf${scaleFactor}_${fileFormat}")
case "TPCDS" if TPCDSUseLegacyOptions => (
s"tpcds_sf${scaleFactor}_nodecimal_nodate_withnulls${dbSuffix}",
new TPCDSTables(spark.sqlContext, s"${baseDatagenFolder}/dsdgen", scaleFactor = scaleFactor, useDoubleForDecimal = true, useStringForDate = true),
s"$baseLocation/tpcds/sf$scaleFactor-$fileFormat/useDecimal=false,useDate=false,filterNull=false")
}
// COMMAND ----------
// Data generation
def isPartitioned (tables: Tables, tableName: String) : Boolean =
util.Try(tables.tables.find(_.name == tableName).get.partitionColumns.nonEmpty).getOrElse(false)
def loadData(tables: Tables, location: String, scaleFactor: String) = {
val tableNames = tables.tables.map(_.name)
tableNames.foreach { tableName =>
// generate data
time {
tables.genData(
location = location,
format = fileFormat,
overwrite = overwrite,
partitionTables = true,
// whether to coalesce into a single file (only one writer for non-partitioned tables = slow)
clusterByPartitionColumns = shuffle, //if (isPartitioned(tables, tableName)) false else true,
filterOutNullPartitionValues = false,
tableFilter = tableName,
// this controls parallelism of datagen and the number of writers (# of files for non-partitioned tables)
// in general we want many writers to S3, and smaller tasks for large scale factors to avoid OOM and shuffle errors
numPartitions = if (scaleFactor.toInt <= 100 || !isPartitioned(tables, tableName)) (workers * cores) else (workers * cores * 4))
}
}
}
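The `numPartitions` expression at the end of `loadData` encodes a simple rule: at scale factor 100 and below, or for unpartitioned tables, use one task per core; for large partitioned tables, use 4x as many smaller tasks to avoid OOM and shuffle errors. A sketch of just that decision (function name is illustrative):

```python
# Sketch of the numPartitions rule in loadData above.
def num_partitions(scale_factor, partitioned, workers, cores):
    if scale_factor <= 100 or not partitioned:
        return workers * cores       # one writer task per core
    return workers * cores * 4       # smaller tasks for big, partitioned tables
```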
// COMMAND ----------
// Create the DB, import the data, and create the external tables
def createExternal(location: String, dbname: String, tables: Tables) = {
tables.createExternalTables(location, fileFormat, dbname, overwrite = overwrite, discoverPartitions = true)
}
def loadDB(dbname: String, tables: Tables, location: String) = {
val tableNames = tables.tables.map(_.name)
time {
println(s"Creating external tables at $location")
createExternal(location, dbname, tables)
}
// Show table information and attempt to vacuum
tableNames.foreach { tableName =>
println(s"Table $tableName has " + util.Try(sql(s"SHOW PARTITIONS $tableName").count() + " partitions").getOrElse(s"no partitions"))
util.Try(sql(s"VACUUM $tableName RETAIN 0.0 HOURS")).getOrElse(println(s"Cannot VACUUM $tableName"))
sql(s"DESCRIBE EXTENDED $tableName").show(999, false)
println
}
}
// COMMAND ----------
def setScaleConfig(scaleFactor: String): Unit = {
// Avoid OOM when shuffling large scale factors
// and errors like 2GB shuffle limit at 10TB like: Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Too large frame: 9640891355
// For 10TB 16x4core nodes were needed with the config below, 8x for 1TB and below.
// About 24hrs. for SF 1 to 10,000.
if (scaleFactor.toInt >= 10000) {
spark.conf.set("spark.sql.shuffle.partitions", "20000")
SparkHadoopUtil.get.conf.set("parquet.memory.pool.ratio", "0.1")
}
else if (scaleFactor.toInt >= 1000) {
spark.conf.set("spark.sql.shuffle.partitions", "2001") //one above 2000 to use HighlyCompressedMapStatus
SparkHadoopUtil.get.conf.set("parquet.memory.pool.ratio", "0.3")
}
else {
spark.conf.set("spark.sql.shuffle.partitions", "200") //default
SparkHadoopUtil.get.conf.set("parquet.memory.pool.ratio", "0.5")
}
}
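`setScaleConfig` maps the scale factor onto two knobs: `spark.sql.shuffle.partitions` and the parquet memory pool ratio. The tiered selection can be sketched as a pure function (names illustrative; the actual notebook sets Spark/Hadoop config directly):

```python
# Sketch of setScaleConfig's tiers: returns
# (shuffle_partitions, parquet_memory_pool_ratio) for a scale factor.
def scale_config(scale_factor):
    if scale_factor >= 10000:
        return 20000, 0.1
    if scale_factor >= 1000:
        return 2001, 0.3  # one above 2000 to use HighlyCompressedMapStatus
    return 200, 0.5       # Spark defaults
```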
// COMMAND ----------
// Generate the data, import the tables, generate stats for selected benchmarks and scale factors
scaleFactors.foreach { scaleFactor => {
// First set some config settings affecting OOMs/performance
setScaleConfig(scaleFactor)
benchmarks.foreach{ benchmark => {
val (dbname, tables, location) = getBenchmarkData(benchmark, scaleFactor)
// Start the actual loading
time {
println(s"Generating data for $benchmark SF $scaleFactor at $location")
loadData(tables = tables, location = location, scaleFactor = scaleFactor)
}
time {
println(s"\nImporting data for $benchmark into DB $dbname from $location")
loadDB(dbname = dbname, tables = tables, location = location)
}
if (createTableStats) time {
println(s"\nGenerating table statistics for DB $dbname (with analyzeColumns=$createColumnStats)")
tables.analyzeTables(dbname, analyzeColumns = createColumnStats)
}
}}
}}
// COMMAND ----------
// Print table structure for manual validation
scaleFactors.foreach { scaleFactor =>
benchmarks.foreach{ benchmark => {
val (dbname, tables, location) = getBenchmarkData(benchmark, scaleFactor)
sql(s"use $dbname")
time {
sql(s"show tables").select("tableName").collect().foreach{ tableName =>
val name: String = tableName.toString().drop(1).dropRight(1)
println(s"Printing table information for $benchmark SF $scaleFactor table $name")
val count = sql(s"select count(*) as ${name}_count from $name").collect()(0)(0)
println(s"Table $name has " + util.Try(sql(s"SHOW PARTITIONS $name").count() + " partitions").getOrElse(s"no partitions") + s" and $count rows.")
sql(s"describe extended $name").show(999, false)
}
}
println
}}
}

View File

@ -1,79 +0,0 @@
// Databricks notebook source
// TPCH runner (from spark-sql-perf) to be used on existing tables
// edit the main configuration below
val scaleFactors = Seq(1, 10, 100, 1000) //set scale factors to run
val format = "parquet" // format that has already been generated
def perfDatasetsLocation(scaleFactor: Int, format: String) =
s"s3a://my-bucket/tpch/sf${scaleFactor}_${format}"
val resultLocation = "s3a://my-bucket/results"
val iterations = 2
def databaseName(scaleFactor: Int, format: String) = s"tpch_sf${scaleFactor}_${format}"
val randomizeQueries = false // set to true for concurrency tests
// Experiment metadata for results, edit if outside Databricks
val configuration = "default" //use default when using the out-of-box config
val runtype = "TPCH run" // Edit
val workers = 10 // Edit to the number of workers
val workerInstanceType = "my_VM_instance" // Edit to the instance type
// Make sure spark-sql-perf library is available (use the assembly version)
import com.databricks.spark.sql.perf.tpch._
import org.apache.spark.sql.functions._
// default config (for all notebooks)
var config : Map[String, String] = Map (
"spark.sql.broadcastTimeout" -> "7200" // Enable for SF 10,000
)
// Set the spark config
for ((k, v) <- config) spark.conf.set(k, v)
// Print the custom configs first
for ((k,v) <- config) println(k, spark.conf.get(k))
// Print all for easy debugging
print(spark.conf.getAll)
val tpch = new TPCH(sqlContext = spark.sqlContext)
// filter queries (if selected)
import com.databricks.spark.sql.perf.Query
import com.databricks.spark.sql.perf.ExecutionMode.CollectResults
import org.apache.commons.io.IOUtils
val queries = (1 to 22).map { q =>
val queryContent: String = IOUtils.toString(
getClass().getClassLoader().getResourceAsStream(s"tpch/queries/$q.sql"))
new Query(s"Q$q", spark.sqlContext.sql(queryContent), description = s"TPCH Query $q",
executionMode = CollectResults)
}
// COMMAND ----------
scaleFactors.foreach{ scaleFactor =>
println("DB SF " + databaseName(scaleFactor, format))
sql(s"USE ${databaseName(scaleFactor, format)}")
val experiment = tpch.runExperiment(
queries,
iterations = iterations,
resultLocation = resultLocation,
tags = Map(
"runtype" -> runtype,
"date" -> java.time.LocalDate.now.toString,
"database" -> databaseName(scaleFactor, format),
"scale_factor" -> scaleFactor.toString,
"spark_version" -> spark.version,
"system" -> "Spark",
"workers" -> workers,
"workerInstanceType" -> workerInstanceType,
"configuration" -> configuration
)
)
println(s"Running SF $scaleFactor")
experiment.waitForFinish(36 * 60 * 60) //36hours
val summary = experiment.getCurrentResults
.withColumn("Name", substring(col("name"), 2, 100))
.withColumn("Runtime", (col("parsingTime") + col("analysisTime") + col("optimizationTime") + col("planningTime") + col("executionTime")) / 1000.0)
.select('Name, 'Runtime)
summary.show(9999, false)
}
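The `Runtime` column in the summary above sums the five recorded query phases (all in milliseconds) and converts to seconds. A one-line sketch of that derivation (parameter names mirror the result columns):

```python
# Sketch of the Runtime column: sum the per-phase millisecond
# timings and convert to seconds.
def runtime_seconds(parsing, analysis, optimization, planning, execution):
    return (parsing + analysis + optimization + planning + execution) / 1000.0
```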

View File

@ -1,5 +1,5 @@
output: /databricks/spark/sql/mllib-perf-ci
timeoutSeconds: 43200 # This limit is for all benchmarks and should be bumped as more are added.
timeoutSeconds: 1000 # This limit is for all benchmarks and should be bumped as more are added.
common:
numExamples: 1000000
numTestExamples: 1000000
@ -9,58 +9,53 @@ common:
benchmarks:
- name: classification.DecisionTreeClassification
params:
depth: [5, 12]
depth: [5, 10]
numClasses: 4
- name: classification.GBTClassification
params:
numFeatures: 1500
depth: 4
numFeatures: 3000
depth: 5
numClasses: 4
maxIter: 8
maxIter: 10
- name: classification.RandomForestClassification
params:
depth: 10
numFeatures: 1000
numClasses: 4
maxIter: 100
maxIter: 200 # number of trees
- name: classification.LogisticRegression
params:
numExamples: 700000
numFeatures: 5000
elasticNetParam: [0.0, 0.5]
regParam: 0.01
tol: 0.0
maxIter: 10
maxIter: 20
- name: classification.LinearSVC
params:
numExamples: 500000
regParam: 0.01
tol: 0
maxIter: 10
maxIter: 20
- name: classification.NaiveBayes
params:
numExamples: 2000000
numFeatures: 5000
numClasses: 10
numClasses: 2
smoothing: 1.0
- name: clustering.GaussianMixture
params:
numFeatures: 30
k: 15
maxIter: 15
tol: 0.0
numExamples: 100000
numTestExamples: 100000
numFeatures: 1000
k: 10
maxIter: 10
tol: 0.01
- name: clustering.KMeans
params:
k: 20
k: 50
maxIter: 20
tol: 1e-3
- name: clustering.LDA
params:
numExamples: 200000
docLength: 100
vocabSize: 5000
k: 20
maxIter: 10
k: 60
maxIter: 20
optimizer:
- em
- online
@ -98,48 +93,40 @@ benchmarks:
params:
numExamples: 10000
vocabSize: 100
docLength: 300
docLength: 1000
numSynonymsToFind: 3
- name: fpm.FPGrowth
params:
numExamples: 10000000
numItems: 10000
itemSetSize: [4, 14]
itemSetSize: [4, 10]
- name: recommendation.ALS
params:
numExamples: 20000000
numExamples: 50000000
numTestExamples: 50000000
numUsers: 4000000
numItems: 4000000
numUsers: 6000000
numItems: 6000000
regParam: 0.01
rank: 10
maxIter: 8
maxIter: 10
- name: regression.DecisionTreeRegression
params:
depth: [4, 7]
- name: regression.GBTRegression
params:
numFeatures: 1000
depth: 4
maxIter: 8
depth: [5, 10]
- name: regression.GLMRegression
params:
numExamples: 400000
numExamples: 500000
numTestExamples: 500000
numFeatures: 500
numFeatures: 1000
link: log
family: gaussian
tol: 0.0
maxIter: 8
maxIter: 10
regParam: 0.1
- name: regression.LinearRegression
params:
numFeatures: 5000
regParam: 0.01
tol: 0.0
maxIter: 9
maxIter: 20
- name: regression.RandomForestRegression
params:
depth: 7
numFeatures: 500
maxIter: 16
depth: 10
maxIter: 4

View File

@ -142,12 +142,6 @@ benchmarks:
depth: 3
numClasses: 4
numFeatures: 5
- name: regression.GBTRegression
params:
numExamples: 100
numTestExamples: 10
depth: 3
maxIter: 3
- name: regression.GLMRegression
params:
numExamples: 100

View File

@ -1,14 +0,0 @@
select
count(*) as total,
count(ss_sold_date_sk) as not_null_total,
--count(distinct ss_sold_date_sk) as unique_days,
max(ss_sold_date_sk) as max_ss_sold_date_sk,
max(ss_sold_time_sk) as max_ss_sold_time_sk,
max(ss_item_sk) as max_ss_item_sk,
max(ss_customer_sk) as max_ss_customer_sk,
max(ss_cdemo_sk) as max_ss_cdemo_sk,
max(ss_hdemo_sk) as max_ss_hdemo_sk,
max(ss_addr_sk) as max_ss_addr_sk,
max(ss_store_sk) as max_ss_store_sk,
max(ss_promo_sk) as max_ss_promo_sk
from store_sales

View File

@ -1,55 +1,27 @@
package com.databricks.spark.sql.perf
class AggregationPerformance extends Benchmark {
import org.apache.spark.sql.{Row, SQLContext}
trait AggregationPerformance extends Benchmark {
import sqlContext.implicits._
import ExecutionMode._
val sizes = (1 to 6).map(math.pow(10, _).toInt)
val x = Table(
"1milints", {
val df = sqlContext.range(0, 1000000).repartition(1)
df.createTempView("1milints")
df
})
val joinTables = Seq(
Table(
"100milints", {
val df = sqlContext.range(0, 100000000).repartition(10)
df.createTempView("100milints")
df
}),
Table(
"1bilints", {
val df = sqlContext.range(0, 1000000000).repartition(10)
df.createTempView("1bilints")
df
}
)
)
val sizes = (1 to 6).map(math.pow(10, _).toInt).toSeq
val variousCardinality = sizes.map { size =>
Table(s"ints$size", {
val df = sparkContext.parallelize(1 to size).flatMap { group =>
Table(s"ints$size",
sparkContext.parallelize(1 to size).flatMap { group =>
(1 to 10000).map(i => (group, i))
}.toDF("a", "b")
df.createTempView(s"ints$size")
df
})
}.toDF("a", "b"))
}
val lowCardinality = sizes.map { size =>
val fullSize = size * 10000L
Table(
s"twoGroups$fullSize", {
val df = sqlContext.range(0, fullSize).select($"id" % 2 as 'a, $"id" as 'b)
df.createTempView(s"twoGroups$fullSize")
df
})
s"twoGroups$fullSize",
sqlContext.range(0, fullSize).select($"id" % 2 as 'a, $"id" as 'b))
}
val newAggreation = Variation("aggregationType", Seq("new", "old")) {
@ -57,7 +29,7 @@ class AggregationPerformance extends Benchmark {
case "new" => sqlContext.setConf("spark.sql.useAggregate2", "true")
}
val varyNumGroupsAvg: Seq[Benchmarkable] = variousCardinality.map(_.name).map { table =>
val varyNumGroupsAvg: Seq[Query] = variousCardinality.map(_.name).map { table =>
Query(
s"avg-$table",
s"SELECT AVG(b) FROM $table GROUP BY a",
@ -65,7 +37,7 @@ class AggregationPerformance extends Benchmark {
executionMode = ForeachResults)
}
val twoGroupsAvg: Seq[Benchmarkable] = lowCardinality.map(_.name).map { table =>
val twoGroupsAvg: Seq[Query] = lowCardinality.map(_.name).map { table =>
Query(
s"avg-$table",
s"SELECT AVG(b) FROM $table GROUP BY a",
@ -73,7 +45,7 @@ class AggregationPerformance extends Benchmark {
executionMode = ForeachResults)
}
val complexInput: Seq[Benchmarkable] =
val complexInput =
Seq("1milints", "100milints", "1bilints").map { table =>
Query(
s"aggregation-complex-input-$table",
@ -82,7 +54,7 @@ class AggregationPerformance extends Benchmark {
executionMode = CollectResults)
}
val aggregates: Seq[Benchmarkable] =
val aggregates =
Seq("1milints", "100milints", "1bilints").flatMap { table =>
Seq("SUM", "AVG", "COUNT", "STDDEV").map { agg =>
Query(

View File

@ -22,10 +22,9 @@ import scala.concurrent._
import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.util.{Success, Try, Failure => SFailure}
import scala.util.control.NonFatal
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, DataFrame, SQLContext, SparkSession}
import org.apache.spark.sql.{Dataset, DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.SparkContext
@ -42,7 +41,7 @@ abstract class Benchmark(
import Benchmark._
def this() = this(SparkSession.builder.getOrCreate().sqlContext)
def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate()))
val resultsLocation =
sqlContext.getAllConfs.getOrElse(
@ -335,7 +334,7 @@ object Benchmark {
.flatMap { query =>
try {
query.newDataFrame().queryExecution.logical.collect {
case r: UnresolvedRelation => r.tableName
case UnresolvedRelation(t) => t.table
}
} catch {
// ignore the queries that can't be parsed
@ -436,9 +435,7 @@ object Benchmark {
.format("json")
.save(resultPath)
} catch {
case NonFatal(e) =>
logMessage(s"Failed to write data: $e")
throw e
case e: Throwable => logMessage(s"Failed to write data: $e")
}
logCollection()
@ -451,7 +448,7 @@ object Benchmark {
val location = cpu.collectLogs(sqlContext, fs, timestamp)
logMessage(s"cpu results recorded to $location")
} catch {
case NonFatal(e) =>
case e: Throwable =>
logMessage(s"Error collecting logs: $e")
throw e
}
@ -476,14 +473,14 @@ object Benchmark {
/** Returns results from an actively running experiment. */
def getCurrentResults() = {
val tbl = sqlContext.createDataFrame(currentResults)
tbl.createOrReplaceTempView("currentResults")
tbl.registerTempTable("currentResults")
tbl
}
/** Returns full iterations from an actively running experiment. */
def getCurrentRuns() = {
val tbl = sqlContext.createDataFrame(currentRuns)
tbl.createOrReplaceTempView("currentRuns")
tbl.registerTempTable("currentRuns")
tbl
}

View File

@ -18,25 +18,22 @@ package com.databricks.spark.sql.perf
import java.util.UUID
import org.slf4j.LoggerFactory
import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging}
import scala.concurrent.duration._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
import org.apache.spark.sql.{SQLContext,SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkEnv, SparkContext}
/** A trait to describe things that can be benchmarked. */
trait Benchmarkable {
@transient protected[this] val sqlSession = SparkSession.builder.getOrCreate()
@transient protected[this] val sqlContext = sqlSession.sqlContext
@transient protected[this] val sparkContext = sqlSession.sparkContext
trait Benchmarkable extends Logging {
@transient protected[this] val sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
@transient protected[this] val sparkContext = sqlContext.sparkContext
val name: String
protected val executionMode: ExecutionMode
lazy val logger = LoggerFactory.getLogger(this.getClass.getName)
final def benchmark(
includeBreakdown: Boolean,

View File

@ -104,7 +104,7 @@ package object cpu {
}
val counts = cpuLogs.groupBy($"stack").agg(count($"*")).collect().flatMap {
case Row(stackLines: Array[String], count: Long) => stackLines.toSeq.map(toStackElement) -> count :: Nil
case Row(stackLines: Seq[String], count: Long) => stackLines.map(toStackElement) -> count :: Nil
case other => println(s"Failed to parse $other"); Nil
}.toMap
val profile = new com.twitter.jvm.CpuProfile(counts, com.twitter.util.Duration.fromSeconds(10), cpuLogs.count().toInt, 0)

View File

@ -16,31 +16,9 @@
package com.databricks.spark.sql.perf
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.{Encoder, SQLContext}
import org.apache.spark.sql.expressions.Aggregator
object TypedAverage extends Aggregator[Long, SumAndCount, Double] {
override def zero: SumAndCount = SumAndCount(0L, 0)
override def reduce(b: SumAndCount, a: Long): SumAndCount = {
b.count += 1
b.sum += a
b
}
override def bufferEncoder = Encoders.product
override def outputEncoder = Encoders.scalaDouble
override def finish(reduction: SumAndCount): Double = reduction.sum.toDouble / reduction.count
override def merge(b1: SumAndCount, b2: SumAndCount): SumAndCount = {
b1.count += b2.count
b1.sum += b2.sum
b1
}
}
case class Data(id: Long)
case class SumAndCount(var sum: Long, var count: Int)
@ -54,7 +32,7 @@ class DatasetPerformance extends Benchmark {
val rdd = sparkContext.range(1, numLongs)
val smallNumLongs = 1000000
val smallds = sqlContext.range(1, smallNumLongs).as[Long]
val smallds = sqlContext.range(1, smallNumLongs)
val smallrdd = sparkContext.range(1, smallNumLongs)
def allBenchmarks = range ++ backToBackFilters ++ backToBackMaps ++ computeAverage
@ -121,10 +99,32 @@ class DatasetPerformance extends Benchmark {
.map(d => Data(d.id + 1L)))
)
val average = new Aggregator[Long, SumAndCount, Double] {
override def zero: SumAndCount = SumAndCount(0, 0)
override def reduce(b: SumAndCount, a: Long): SumAndCount = {
b.count += 1
b.sum += a
b
}
override def bufferEncoder = implicitly[Encoder[SumAndCount]]
override def outputEncoder = implicitly[Encoder[Double]]
override def finish(reduction: SumAndCount): Double = reduction.sum.toDouble / reduction.count
override def merge(b1: SumAndCount, b2: SumAndCount): SumAndCount = {
b1.count += b2.count
b1.sum += b2.sum
b1
}
}.toColumn
val computeAverage = Seq(
new Query(
"DS: average",
smallds.select(TypedAverage.toColumn).toDF(),
smallds.as[Long].select(average).toDF(),
executionMode = ExecutionMode.CollectResults),
new Query(
"DF: average",
@ -140,4 +140,4 @@ class DatasetPerformance extends Benchmark {
sumAndCount._1.toDouble / sumAndCount._2
})
)
}
}
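The typed-average `Aggregator` in the diff above follows the standard zero/reduce/merge/finish contract: a (sum, count) buffer is folded over the input, partial buffers from different partitions are merged, and `finish` produces the mean. A hedged Python sketch of that contract (plain tuples stand in for the `SumAndCount` case class):

```python
from functools import reduce as fold

# Sketch of the Aggregator contract: (sum, count) buffer,
# per-element reduce, buffer merge, and a finish step.
def zero():
    return (0, 0)

def reduce_one(buf, a):
    return (buf[0] + a, buf[1] + 1)

def merge(b1, b2):
    return (b1[0] + b2[0], b1[1] + b2[1])

def finish(buf):
    return buf[0] / buf[1]

def average(values):
    # single-partition fold; Spark would merge per-partition buffers
    return finish(fold(reduce_one, values, zero()))
```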

View File

@ -3,8 +3,8 @@ package com.databricks.spark.sql.perf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
class JoinPerformance extends Benchmark {
trait JoinPerformance extends Benchmark {
// 1.5 mb, 1 file
import ExecutionMode._
import sqlContext.implicits._
@ -12,27 +12,22 @@ class JoinPerformance extends Benchmark {
private val table = sqlContext.table _
val x = Table(
"1milints", { // 1.5 mb, 1 file
val df = sqlContext.range(0, 1000000).repartition(1)
df.createTempView("1milints")
df
})
"1milints",
sqlContext.range(0, 1000000)
.repartition(1))
val joinTables = Seq(
// 143.542mb, 10 files
Table(
"100milints", { // 143.542mb, 10 files
val df = sqlContext.range(0, 100000000).repartition(10)
df.createTempView("100milints")
df
}),
"100milints",
sqlContext.range(0, 100000000)
.repartition(10)),
// 1.4348gb, 10 files
Table(
"1bilints", { // 143.542mb, 10 files
val df = sqlContext.range(0, 1000000000).repartition(10)
df.createTempView("1bilints")
df
}
)
"1bilints",
sqlContext.range(0, 1000000000)
.repartition(10))
)
val sortMergeJoin = Variation("sortMergeJoin", Seq("on", "off")) {
@@ -40,7 +35,7 @@ class JoinPerformance extends Benchmark {
case "on" => sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")
}
val singleKeyJoins: Seq[Benchmarkable] = Seq("1milints", "100milints", "1bilints").flatMap { table1 =>
val singleKeyJoins = Seq("1milints", "100milints", "1bilints").flatMap { table1 =>
Seq("1milints", "100milints", "1bilints").flatMap { table2 =>
Seq("JOIN", "RIGHT JOIN", "LEFT JOIN", "FULL OUTER JOIN").map { join =>
Query(
@@ -68,9 +63,9 @@ class JoinPerformance extends Benchmark {
val varyNumMatches = Seq(1, 2, 4, 8, 16).map { numCopies =>
val ints = table("100milints")
val copiedInts = Seq.fill(numCopies)(ints).reduce(_ union _)
val copiedInts = Seq.fill(numCopies)(ints).reduce(_ unionAll _)
new Query(
s"join - numMatches: $numCopies",
copiedInts.as("a").join(ints.as("b"), $"a.id" === $"b.id"))
}
}
}

View File

@@ -54,7 +54,10 @@ class Query(
}
lazy val tablesInvolved = buildDataFrame.queryExecution.logical collect {
case r: UnresolvedRelation => r.tableName
case UnresolvedRelation(tableIdentifier) => {
// We are ignoring the database name.
tableIdentifier.table
}
}
def newDataFrame() = buildDataFrame
@@ -85,11 +88,10 @@ class Query(
val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan.p(i)))
val indexMap = physicalOperators.map { case (index, op) => (op, index) }.toMap
val timeMap = new mutable.HashMap[Int, Double]
val maxFields = 999 // Maximum number of fields that will be converted to strings
physicalOperators.reverse.map {
case (index, node) =>
messages += s"Breakdown: ${node.simpleString(maxFields)}"
messages += s"Breakdown: ${node.simpleString}"
val newNode = buildDataFrame.queryExecution.executedPlan.p(index)
val executionTime = measureTimeMs {
newNode.execute().foreach((row: Any) => Unit)
@@ -102,7 +104,7 @@ class Query(
BreakdownResult(
node.nodeName,
node.simpleString(maxFields).replaceAll("#\\d+", ""),
node.simpleString.replaceAll("#\\d+", ""),
index,
childIndexes,
executionTime,
@@ -120,7 +122,7 @@ class Query(
val executionTime = measureTimeMs {
executionMode match {
case ExecutionMode.CollectResults => dataFrame.collect()
case ExecutionMode.ForeachResults => dataFrame.foreach { _ => ():Unit }
case ExecutionMode.ForeachResults => dataFrame.foreach { row => Unit }
case ExecutionMode.WriteParquet(location) =>
dataFrame.write.parquet(s"$location/$name.parquet")
case ExecutionMode.HashResults =>

View File

@@ -17,14 +17,14 @@
package com.databricks.spark.sql.perf
import java.net.InetAddress
import java.io.File
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.{SparkContext, SparkConf}
import scala.util.Try
case class RunConfig(
master: String = "local[*]",
benchmarkName: String = null,
filter: Option[String] = None,
iterations: Int = 3,
@@ -37,9 +37,6 @@ object RunBenchmark {
def main(args: Array[String]): Unit = {
val parser = new scopt.OptionParser[RunConfig]("spark-sql-perf") {
head("spark-sql-perf", "0.2.0")
opt[String]('m', "master")
.action { (x, c) => c.copy(master = x) }
.text("the Spark master to use, default to local[*]")
opt[String]('b', "benchmark")
.action { (x, c) => c.copy(benchmarkName = x) }
.text("the name of the benchmark to run")
@@ -67,17 +64,14 @@ object RunBenchmark {
def run(config: RunConfig): Unit = {
val conf = new SparkConf()
.setMaster(config.master)
.setAppName(getClass.getName)
.setMaster("local[*]")
.setAppName(getClass.getName)
val sparkSession = SparkSession.builder.config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val sqlContext = sparkSession.sqlContext
val sc = SparkContext.getOrCreate(conf)
val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
sqlContext.setConf("spark.sql.perf.results",
new File("performance").toURI.toString)
sqlContext.setConf("spark.sql.perf.results", new java.io.File("performance").toURI.toString)
val benchmark = Try {
Class.forName(config.benchmarkName)
.newInstance()
@@ -108,8 +102,7 @@ object RunBenchmark {
experiment.waitForFinish(1000 * 60 * 30)
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
val toShow = experiment.getCurrentRuns()
experiment.getCurrentRuns()
.withColumn("result", explode($"results"))
.select("result.*")
.groupBy("name")
@@ -117,13 +110,9 @@ object RunBenchmark {
min($"executionTime") as 'minTimeMs,
max($"executionTime") as 'maxTimeMs,
avg($"executionTime") as 'avgTimeMs,
stddev($"executionTime") as 'stdDev,
(stddev($"executionTime") / avg($"executionTime") * 100) as 'stdDevPercent)
stddev($"executionTime") as 'stdDev)
.orderBy("name")
println("Showing at most 100 query results now")
toShow.show(100)
.show(truncate = false)
println(s"""Results: sqlContext.read.json("${experiment.resultPath}")""")
config.baseline.foreach { baseTimestamp =>
@@ -147,4 +136,4 @@ object RunBenchmark {
data.show(truncate = false)
}
}
}
}

View File

@@ -179,7 +179,7 @@ abstract class Tables(sqlContext: SQLContext, scaleFactor: String,
val data = df(format != "text", numPartitions)
val tempTableName = s"${name}_text"
data.createOrReplaceTempView(tempTableName)
data.registerTempTable(tempTableName)
val writer = if (partitionColumns.nonEmpty) {
if (clusterByPartitionColumns) {
@@ -211,24 +211,9 @@ abstract class Tables(sqlContext: SQLContext, scaleFactor: String,
data.write
}
} else {
// treat non-partitioned tables as "one partition" that we want to coalesce
if (clusterByPartitionColumns) {
// in case data has more than maxRecordsPerFile, split into multiple writers to improve datagen speed
// files will be truncated to maxRecordsPerFile value, so the final result will be the same
val numRows = data.count
val maxRecordPerFile = util.Try(sqlContext.getConf("spark.sql.files.maxRecordsPerFile").toInt).getOrElse(0)
println(s"Data has $numRows rows clustered $clusterByPartitionColumns for $maxRecordPerFile")
log.info(s"Data has $numRows rows clustered $clusterByPartitionColumns for $maxRecordPerFile")
if (maxRecordPerFile > 0 && numRows > maxRecordPerFile) {
val numFiles = (numRows.toDouble/maxRecordPerFile).ceil.toInt
println(s"Coalescing into $numFiles files")
log.info(s"Coalescing into $numFiles files")
data.coalesce(numFiles).write
} else {
data.coalesce(1).write
}
// treat non-partitioned tables as "one partition" that we want to coalesce
data.coalesce(1).write
} else {
data.write
}
@@ -266,7 +251,7 @@ abstract class Tables(sqlContext: SQLContext, scaleFactor: String,
def createTemporaryTable(location: String, format: String): Unit = {
println(s"Creating temporary table $name using data stored in $location.")
log.info(s"Creating temporary table $name using data stored in $location.")
sqlContext.read.format(format).load(location).createOrReplaceTempView(name)
sqlContext.read.format(format).load(location).registerTempTable(name)
}
def analyzeTable(databaseName: String, analyzeColumns: Boolean = false): Unit = {
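The coalescing branch removed in the `Tables` hunk above sizes the number of output files from the row count and the `spark.sql.files.maxRecordsPerFile` setting. A minimal sketch of that arithmetic (plain Scala, illustrative only; `numOutputFiles` is a hypothetical helper, not part of the repo):

```scala
// How many files to coalesce into: split only when a positive cap exists
// and the data exceeds it; otherwise write a single file, as in the diff.
def numOutputFiles(numRows: Long, maxRecordsPerFile: Int): Int =
  if (maxRecordsPerFile > 0 && numRows > maxRecordsPerFile)
    math.ceil(numRows.toDouble / maxRecordsPerFile).toInt
  else 1

// 1,000,000 rows with a 300,000-record cap -> 4 files
val capped = numOutputFiles(1000000L, 300000)
// cap unset (0) -> 1 file
val uncapped = numOutputFiles(1000000L, 0)
```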

View File

@@ -1,5 +1,6 @@
package com.databricks.spark.sql.perf.mllib
import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging}
import org.apache.spark.ml.attribute.{NominalAttribute, NumericAttribute}
import org.apache.spark.ml.{Estimator, PipelineStage, Transformer}
import org.apache.spark.ml.evaluation.Evaluator
@@ -20,7 +21,7 @@ import com.databricks.spark.sql.perf._
*
* It is assumed that the implementation is going to be an object.
*/
trait BenchmarkAlgorithm {
trait BenchmarkAlgorithm extends Logging {
def trainingDataSet(ctx: MLBenchContext): DataFrame
@@ -34,21 +35,12 @@ trait BenchmarkAlgorithm {
/**
* The unnormalized score of the training procedure on a dataset. The normalization is
* performed by the caller.
* This calls `count()` on the transformed data to attempt to materialize the result for
* recording timing metrics.
*/
@throws[Exception]("if scoring fails")
def score(
ctx: MLBenchContext,
testSet: DataFrame,
model: Transformer): MLMetric = {
val output = model.transform(testSet)
// We create a useless UDF to make sure the entire DataFrame is instantiated.
val fakeUDF = udf { (_: Any) => 0 }
val columns = testSet.columns
output.select(sum(fakeUDF(struct(columns.map(col) : _*)))).first()
MLMetric.Invalid
}
model: Transformer): MLMetric = MLMetric.Invalid
def name: String = {
this.getClass.getCanonicalName.replace("$", "")

View File

@@ -2,7 +2,7 @@ package com.databricks.spark.sql.perf.mllib
import com.databricks.spark.sql.perf.mllib.classification.LogisticRegression
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext,SparkSession}
import org.apache.spark.sql.SQLContext
import com.databricks.spark.sql.perf.{MLParams}
import OptionImplicits._
@@ -27,9 +27,8 @@ object MLBenchmarks {
)
)
val sparkSession = SparkSession.builder.getOrCreate()
val sqlContext: SQLContext = sparkSession.sqlContext
val context = sqlContext.sparkContext
val context = SparkContext.getOrCreate()
val sqlContext: SQLContext = SQLContext.getOrCreate(context)
def benchmarkObjects: Seq[MLPipelineStageBenchmarkable] = benchmarks.map { mlb =>
new MLPipelineStageBenchmarkable(mlb.params, mlb.benchmark, sqlContext)

View File

@@ -4,7 +4,7 @@ package com.databricks.spark.sql.perf.mllib
import scala.io.Source
import scala.language.implicitConversions
import org.slf4j.LoggerFactory
import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
@@ -18,7 +18,7 @@ class MLLib(sqlContext: SQLContext)
def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate()))
}
object MLLib {
object MLLib extends Logging {
/**
* Runs a set of preprogrammed experiments and blocks on completion.
@@ -26,9 +26,6 @@ object MLLib {
* @param runConfig a configuration that is av
* @return
*/
lazy val logger = LoggerFactory.getLogger(this.getClass.getName)
def runDefault(runConfig: RunConfig): DataFrame = {
val ml = new MLLib()
val benchmarks = MLBenchmarks.benchmarkObjects
@@ -57,21 +54,6 @@ object MLLib {
run(yamlFile = configFile)
}
private[mllib] def getConf(yamlFile: String = null, yamlConfig: String = null): YamlConfig = {
Option(yamlFile).map(YamlConfig.readFile).getOrElse {
require(yamlConfig != null)
YamlConfig.readString(yamlConfig)
}
}
private[mllib] def getBenchmarks(conf: YamlConfig): Seq[MLPipelineStageBenchmarkable] = {
val sqlContext = com.databricks.spark.sql.perf.mllib.MLBenchmarks.sqlContext
val benchmarksDescriptions = conf.runnableBenchmarks
benchmarksDescriptions.map { mlb =>
new MLPipelineStageBenchmarkable(mlb.params, mlb.benchmark, sqlContext)
}
}
/**
* Runs all the experiments and blocks on completion
*
@@ -80,12 +62,20 @@
*/
def run(yamlFile: String = null, yamlConfig: String = null): DataFrame = {
logger.info("Starting run")
val conf = getConf(yamlFile, yamlConfig)
val conf: YamlConfig = Option(yamlFile).map(YamlConfig.readFile).getOrElse {
require(yamlConfig != null)
YamlConfig.readString(yamlConfig)
}
val sparkConf = new SparkConf().setAppName("MLlib QA").setMaster("local[2]")
val sc = SparkContext.getOrCreate(sparkConf)
sc.setLogLevel("INFO")
val b = new com.databricks.spark.sql.perf.mllib.MLLib()
val benchmarks = getBenchmarks(conf)
val sqlContext = com.databricks.spark.sql.perf.mllib.MLBenchmarks.sqlContext
val benchmarksDescriptions = conf.runnableBenchmarks
val benchmarks = benchmarksDescriptions.map { mlb =>
new MLPipelineStageBenchmarkable(mlb.params, mlb.benchmark, sqlContext)
}
println(s"${benchmarks.size} benchmarks identified:")
val str = benchmarks.map(_.prettyPrint).mkString("\n")
println(str)

View File

@@ -1,7 +1,8 @@
package com.databricks.spark.sql.perf.mllib
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging}
import org.apache.spark.ml.{Estimator, Transformer}
import org.apache.spark.sql._
@@ -13,7 +14,7 @@ class MLPipelineStageBenchmarkable(
params: MLParams,
test: BenchmarkAlgorithm,
sqlContext: SQLContext)
extends Benchmarkable with Serializable {
extends Benchmarkable with Serializable with Logging {
import MLPipelineStageBenchmarkable._
@@ -26,7 +27,7 @@ class MLPipelineStageBenchmarkable(
override protected val executionMode: ExecutionMode = ExecutionMode.SparkPerfResults
override protected[mllib] def beforeBenchmark(): Unit = {
override protected def beforeBenchmark(): Unit = {
logger.info(s"$this beforeBenchmark")
try {
testData = test.testDataSet(param)
@@ -36,12 +37,21 @@ class MLPipelineStageBenchmarkable(
trainingData.cache()
trainingData.count()
} catch {
case NonFatal(e) =>
case e: Throwable =>
println(s"$this error in beforeBenchmark: ${e.getStackTraceString}")
throw e
}
}
override protected def afterBenchmark(sc: SparkContext): Unit = {
// Best-effort clean up of weakly referenced RDDs, shuffles, and broadcasts
// Remove any leftover blocks that still exist
sc.getExecutorStorageStatus
.flatMap { status => status.blocks.map { case (bid, _) => bid } }
.foreach { bid => SparkEnv.get.blockManager.master.removeBlock(bid) }
super.afterBenchmark(sc)
}
override protected def doBenchmark(
includeBreakdown: Boolean,
description: String,

View File

@@ -51,7 +51,6 @@ object NaiveBayes extends BenchmarkAlgorithm
// Initialize new Naive Bayes model
val pi = Vectors.dense(piArray)
val theta = new DenseMatrix(numClasses, numFeatures, thetaArray.flatten, true)
ModelBuilderSSP.newNaiveBayesModel(pi, theta)
}

View File

@@ -13,41 +13,35 @@ import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext,
/** Object for testing VectorAssembler performance */
object VectorAssembler extends BenchmarkAlgorithm with TestFromTraining {
private def getInputCols(numInputCols: Int): Array[String] = {
Array.tabulate(numInputCols)(i => s"c${i}")
}
override def trainingDataSet(ctx: MLBenchContext): DataFrame = {
import ctx.params._
import ctx.sqlContext.implicits._
require(numInputCols.get <= numFeatures.get,
s"numInputCols (${numInputCols}) cannot be greater than numFeatures (${numFeatures}).")
val df = DataGenerator.generateContinuousFeatures(
var df = DataGenerator.generateContinuousFeatures(
ctx.sqlContext,
numExamples,
ctx.seed(),
numPartitions,
numFeatures)
val slice = udf { (v: Vector, numSlices: Int) =>
val data = v.toArray
val n = data.length.toLong
(0 until numSlices).map { i =>
val start = ((i * n) / numSlices).toInt
val end = ((i + 1) * n / numSlices).toInt
Vectors.dense(data.slice(start, end))
}
numFeatures * numInputCols
)
val sliceVec = udf { (v: Vector, from: Int, until: Int) =>
Vectors.dense(v.toArray.slice(from, until))
}
val inputCols = getInputCols(numInputCols.get)
df.select(slice(col("features"), lit(numInputCols.get)).as("slices"))
.select((0 until numInputCols.get).map(i => col("slices")(i).as(inputCols(i))): _*)
for (i <- (1 to numInputCols.get)) {
val colName = s"inputCol${i.toString}"
val fromIndex = (i - 1) * numFeatures
val untilIndex = i * numFeatures
df = df.withColumn(colName, sliceVec(col("features"), lit(fromIndex), lit(untilIndex)))
}
df.drop(col("features"))
}
override def getPipelineStage(ctx: MLBenchContext): PipelineStage = {
import ctx.params._
val inputCols = getInputCols(numInputCols.get)
val inputCols = (1 to numInputCols.get)
.map(i => s"inputCol${i.toString}").toArray
new ml.feature.VectorAssembler()
.setInputCols(inputCols)
}
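The `slice` UDF in the `VectorAssembler` hunk above partitions a feature vector into nearly equal chunks with the integer arithmetic `start = i*n/numSlices`, `end = (i+1)*n/numSlices`. The same logic on a plain array (illustrative sketch, no Spark; `slices` is a hypothetical helper):

```scala
// Split an array into numSlices contiguous chunks whose sizes differ by
// at most one element; every element lands in exactly one chunk.
def slices(data: Array[Double], numSlices: Int): Seq[Array[Double]] = {
  val n = data.length.toLong
  (0 until numSlices).map { i =>
    val start = ((i * n) / numSlices).toInt
    val end   = (((i + 1) * n) / numSlices).toInt
    data.slice(start, end)
  }
}

// 10 elements into 3 chunks -> sizes 3, 3, 4
val parts = slices(Array.tabulate(10)(_.toDouble), 3)
```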

View File

@@ -1,18 +0,0 @@
package com.databricks.spark.sql.perf.mllib.regression
import org.apache.spark.ml.PipelineStage
import org.apache.spark.ml.regression.GBTRegressor
import com.databricks.spark.sql.perf.mllib.OptionImplicits._
import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext,
TreeOrForestRegressor}
object GBTRegression extends BenchmarkAlgorithm with TreeOrForestRegressor {
override def getPipelineStage(ctx: MLBenchContext): PipelineStage = {
import ctx.params._
new GBTRegressor()
.setMaxDepth(depth)
.setMaxIter(maxIter)
.setSeed(ctx.seed())
}
}

View File

@@ -1,121 +0,0 @@
/*
* Copyright 2015 Databricks Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.databricks.spark.sql.perf.tpcds
import org.apache.spark.sql.SparkSession
case class GenTPCDSDataConfig(
master: String = "local[*]",
dsdgenDir: String = null,
scaleFactor: String = null,
location: String = null,
format: String = null,
useDoubleForDecimal: Boolean = false,
useStringForDate: Boolean = false,
overwrite: Boolean = false,
partitionTables: Boolean = true,
clusterByPartitionColumns: Boolean = true,
filterOutNullPartitionValues: Boolean = true,
tableFilter: String = "",
numPartitions: Int = 100)
/**
* Gen TPCDS data.
* To run this:
* {{{
* build/sbt "test:runMain <this class> -d <dsdgenDir> -s <scaleFactor> -l <location> -f <format>"
* }}}
*/
object GenTPCDSData {
def main(args: Array[String]): Unit = {
val parser = new scopt.OptionParser[GenTPCDSDataConfig]("Gen-TPC-DS-data") {
opt[String]('m', "master")
.action { (x, c) => c.copy(master = x) }
.text("the Spark master to use, default to local[*]")
opt[String]('d', "dsdgenDir")
.action { (x, c) => c.copy(dsdgenDir = x) }
.text("location of dsdgen")
.required()
opt[String]('s', "scaleFactor")
.action((x, c) => c.copy(scaleFactor = x))
.text("scaleFactor defines the size of the dataset to generate (in GB)")
opt[String]('l', "location")
.action((x, c) => c.copy(location = x))
.text("root directory of location to create data in")
opt[String]('f', "format")
.action((x, c) => c.copy(format = x))
.text("valid spark format, Parquet, ORC ...")
opt[Boolean]('i', "useDoubleForDecimal")
.action((x, c) => c.copy(useDoubleForDecimal = x))
.text("true to replace DecimalType with DoubleType")
opt[Boolean]('e', "useStringForDate")
.action((x, c) => c.copy(useStringForDate = x))
.text("true to replace DateType with StringType")
opt[Boolean]('o', "overwrite")
.action((x, c) => c.copy(overwrite = x))
.text("overwrite the data that is already there")
opt[Boolean]('p', "partitionTables")
.action((x, c) => c.copy(partitionTables = x))
.text("create the partitioned fact tables")
opt[Boolean]('c', "clusterByPartitionColumns")
.action((x, c) => c.copy(clusterByPartitionColumns = x))
.text("shuffle to get partitions coalesced into single files")
opt[Boolean]('v', "filterOutNullPartitionValues")
.action((x, c) => c.copy(filterOutNullPartitionValues = x))
.text("true to filter out the partition with NULL key value")
opt[String]('t', "tableFilter")
.action((x, c) => c.copy(tableFilter = x))
.text("\"\" means generate all tables")
opt[Int]('n', "numPartitions")
.action((x, c) => c.copy(numPartitions = x))
.text("how many dsdgen partitions to run - number of input tasks.")
help("help")
.text("prints this usage text")
}
parser.parse(args, GenTPCDSDataConfig()) match {
case Some(config) =>
run(config)
case None =>
System.exit(1)
}
}
private def run(config: GenTPCDSDataConfig) {
val spark = SparkSession
.builder()
.appName(getClass.getName)
.master(config.master)
.getOrCreate()
val tables = new TPCDSTables(spark.sqlContext,
dsdgenDir = config.dsdgenDir,
scaleFactor = config.scaleFactor,
useDoubleForDecimal = config.useDoubleForDecimal,
useStringForDate = config.useStringForDate)
tables.genData(
location = config.location,
format = config.format,
overwrite = config.overwrite,
partitionTables = config.partitionTables,
clusterByPartitionColumns = config.clusterByPartitionColumns,
filterOutNullPartitionValues = config.filterOutNullPartitionValues,
tableFilter = config.tableFilter,
numPartitions = config.numPartitions)
}
}

View File

@@ -17,9 +17,10 @@
package com.databricks.spark.sql.perf.tpcds
import scala.collection.mutable
import com.databricks.spark.sql.perf._
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.SQLContext
/**
* TPC-DS benchmark's dataset.
@@ -34,7 +35,7 @@ class TPCDS(@transient sqlContext: SQLContext)
with Tpcds_2_4_Queries
with Serializable {
def this() = this(SparkSession.builder.getOrCreate().sqlContext)
def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate()))
/*
def setupBroadcast(skipTables: Seq[String] = Seq("store_sales", "customer")) = {

View File

@@ -27,7 +27,7 @@ trait Tpcds_2_4_Queries extends Benchmark {
import ExecutionMode._
val queryNames = Seq(
private val queryNames = Seq(
"q1", "q2", "q3", "q4", "q5", "q6", "q7", "q8", "q9", "q10",
"q11", "q12", "q13", "q14a", "q14b", "q15", "q16", "q17", "q18", "q19",
"q20", "q21", "q22", "q23a", "q23b", "q24a", "q24b", "q25", "q26", "q27",

View File

@@ -77,7 +77,7 @@ class TPCHTables(
val tables = Seq(
Table("part",
partitionColumns = "p_brand" :: Nil,
partitionColumns = Nil,
'p_partkey.long,
'p_name.string,
'p_mfgr.string,
@@ -107,18 +107,18 @@ class TPCHTables(
'ps_comment.string
),
Table("customer",
partitionColumns = "c_mktsegment" :: Nil,
partitionColumns = Nil,
'c_custkey.long,
'c_name.string,
'c_address.string,
'c_nationkey.long,
'c_nationkey.string,
'c_phone.string,
'c_acctbal.decimal(12, 2),
'c_mktsegment.string,
'c_comment.string
),
Table("orders",
partitionColumns = "o_orderdate" :: Nil,
partitionColumns = Nil,
'o_orderkey.long,
'o_custkey.long,
'o_orderstatus.string,
@@ -130,7 +130,7 @@ class TPCHTables(
'o_comment.string
),
Table("lineitem",
partitionColumns = "l_shipdate" :: Nil,
partitionColumns = Nil,
'l_orderkey.long,
'l_partkey.long,
'l_suppkey.long,

View File

@@ -59,7 +59,7 @@ object ModelBuilderSSP {
}
def newNaiveBayesModel(pi: Vector, theta: Matrix): NaiveBayesModel = {
val model = new NaiveBayesModel("naivebayes-uid", pi, theta, null)
val model = new NaiveBayesModel("naivebayes-uid", pi, theta)
model.set(model.modelType, "multinomial")
}
@@ -160,9 +160,9 @@ object TreeBuilder {
labelGenerator.setSeed(rng.nextLong)
// We use a dummy impurityCalculator for all nodes.
val impurityCalculator = if (isRegression) {
ImpurityCalculator.getCalculator("variance", Array.fill[Double](3)(0.0), 0L)
ImpurityCalculator.getCalculator("variance", Array.fill[Double](3)(0.0))
} else {
ImpurityCalculator.getCalculator("gini", Array.fill[Double](labelType)(0.0), 0L)
ImpurityCalculator.getCalculator("gini", Array.fill[Double](labelType)(0.0))
}
randomBalancedDecisionTreeHelper(depth, featureArity, impurityCalculator,

View File

@@ -1,9 +1,11 @@
package com.databricks.spark.sql.perf
import org.apache.spark.sql.hive.test.TestHive
import org.scalatest.FunSuite
class DatasetPerformanceSuite extends FunSuite {
ignore("run benchmark") {
TestHive // Init HiveContext
val benchmark = new DatasetPerformance() {
override val numLongs = 100
}

View File

@@ -1,46 +1,10 @@
package com.databricks.spark.sql.perf.mllib
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.scalatest.FunSuite
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.Row
import org.apache.spark.sql.{Row, SparkSession}
class MLLibSuite extends FunSuite with BeforeAndAfterAll {
private var sparkSession: SparkSession = _
var savedLevels: Map[String, Level] = _
override def beforeAll(): Unit = {
super.beforeAll()
sparkSession = SparkSession.builder.master("local[2]").appName("MLlib QA").getOrCreate()
// Travis limits the size of the log file produced by a build. Because we do run a small
// version of all the ML benchmarks in this suite, we produce a ton of logs. Here we set the
// log level to ERROR, just for this suite, to avoid displeasing travis.
savedLevels = Seq("akka", "org", "com.databricks").map { name =>
val logger = Logger.getLogger(name)
val curLevel = logger.getLevel
logger.setLevel(Level.ERROR)
name -> curLevel
}.toMap
}
override def afterAll(): Unit = {
savedLevels.foreach { case (name, level) =>
Logger.getLogger(name).setLevel(level)
}
try {
if (sparkSession != null) {
sparkSession.stop()
}
// To avoid RPC rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
sparkSession = null
} finally {
super.afterAll()
}
}
class MLLibTest extends FunSuite {
test("test MlLib benchmarks with mllib-small.yaml.") {
val results = MLLib.run(yamlConfig = MLLib.smallConfig)
@@ -56,11 +20,4 @@ class MLLibSuite extends FunSuite with BeforeAndAfterAll {
fail("Unable to run all benchmarks successfully, see console output for more info.")
}
}
test("test before benchmark methods for pipeline benchmarks.") {
val benchmarks = MLLib.getBenchmarks(MLLib.getConf(yamlConfig = MLLib.smallConfig))
benchmarks.foreach { b =>
b.beforeBenchmark()
}
}
}

View File

@@ -1 +1 @@
version in ThisBuild := "0.5.1-SNAPSHOT"
version in ThisBuild := "0.5.0-SNAPSHOT"