diff --git a/README.md b/README.md
index e265c46..28e03d0 100644
--- a/README.md
+++ b/README.md
@@ -8,6 +8,8 @@ This is a performance testing framework for [Spark SQL](https://spark.apache.org
# Quick Start
+## Running from the command line
+
```
$ bin/run --help
@@ -26,61 +28,94 @@ Usage: spark-sql-perf [options]
$ bin/run --benchmark DatasetPerformance
```
-### MLlib tests
+The first run of `bin/run` will build the library.
+
+## Build
+
+Use `sbt package` or `sbt assembly` to build the library jar.
+
+# MLlib tests
To run MLlib tests, run `bin/run-ml yamlfile`, where `yamlfile` is the path to a YAML configuration file describing the tests to run and their parameters.
# TPC-DS
-## How to use it
-The rest of document will use TPC-DS benchmark as an example. We will add contents to explain how to use other benchmarks add the support of a new benchmark dataset in future.
+## Set up a benchmark
-### Setup a benchmark
Before running any query, a dataset needs to be set up by creating a `Benchmark` object. Generating the TPCDS data requires dsdgen to be built and available on the machines. We have a fork of dsdgen that
-you will need. It can be found [here](https://github.com/davies/tpcds-kit).
+you will need. The fork includes changes to generate TPCDS data to stdout, so that this library can
+pipe it directly to Spark, without intermediate files. Therefore, this library will not work with
+the vanilla TPCDS kit.
+It can be found [here](https://github.com/databricks/tpcds-kit).
```
-// If not done already, you have to set the path for the results
-spark.config("spark.sql.perf.results", "/tmp/results")
+import com.databricks.spark.sql.perf.tpcds.TPCDSTables
-import com.databricks.spark.sql.perf.tpcds.Tables
-// Tables in TPC-DS benchmark used by experiments.
-// dsdgenDir is the location of dsdgen tool installed in your machines.
-// scaleFactor defines the size of the dataset to generate (in GB)
-val tables = new Tables(sqlContext, dsdgenDir, scaleFactor)
+// Set:
+val rootDir = ... // root directory in which to create the data.
+val databaseName = ... // name of the database to create.
+val scaleFactor = ... // size of the dataset to generate (in GB).
+val format = ... // a valid Spark format, e.g. "parquet".
+// Run:
+val tables = new TPCDSTables(sqlContext,
+  dsdgenDir = "/tmp/tpcds-kit/tools", // location of the dsdgen tool
+  scaleFactor = scaleFactor)
-// Generate data.
-// location is the place there the generated data will be written
-// format is a valid spark format like "parquet"
-tables.genData(location, format, overwrite, partitionTables, useDoubleForDecimal, clusterByPartitionColumns, filterOutNullPartitionValues)
+tables.genData(
+  location = rootDir,
+  format = format,
+  overwrite = true,
+  partitionTables = true,
+  clusterByPartitionColumns = true,
+  filterOutNullPartitionValues = false,
+  tableFilter = "", // all tables
+  numPartitions = 100) // how many dsdgen partitions to run.
+
+// Create the specified database
+sql(s"create database $databaseName")
// Create metastore tables in a specified database for your data.
// Once tables are created, the current database will be switched to the specified database.
-tables.createExternalTables(location, format, databaseName, overwrite)
+tables.createExternalTables(rootDir, "parquet", databaseName, overwrite = true, discoverPartitions = true)
// Or, if you want to create temporary tables
-tables.createTemporaryTables(location, format)
-// Setup TPC-DS experiment
-import com.databricks.spark.sql.perf.tpcds.TPCDS
-val tpcds = new TPCDS (sqlContext = sqlContext)
+// tables.createTemporaryTables(location, format)
+
+// For CBO only, gather statistics on all columns:
+tables.analyzeTables(databaseName, analyzeColumns = true)
```
-### Run benchmarking queries
+## Run benchmarking queries
After setup, users can use the `runExperiment` function to run benchmarking queries and record query execution times.
Taking TPC-DS as an example, you can start an experiment by using
```
-val experiment = tpcds.runExperiment(tpcds.interactiveQueries)
-experiment.waitForFinish(60*60*10) // optional: wait for results (with timeout)
+import com.databricks.spark.sql.perf.tpcds.TPCDS
+
+val tpcds = new TPCDS(sqlContext = sqlContext)
+// Set:
+val resultLocation = ... // place to write results
+val iterations = 1 // how many iterations of queries to run.
+val queries = tpcds.tpcds2_4Queries // queries to run.
+val timeout = 24*60*60 // timeout, in seconds.
+// Run:
+val experiment = tpcds.runExperiment(
+  queries,
+  iterations = iterations,
+  resultLocation = resultLocation,
+  forkThread = true)
+experiment.waitForFinish(timeout)
```
+By default, the experiment is started in a background thread. For every experiment run (i.e. every call of `runExperiment`), Spark SQL Perf uses the timestamp of the start time to identify the experiment. Performance results are stored in JSON format, in a sub-directory named by that timestamp under the configured `spark.sql.perf.results` location (for example `/tmp/results/timestamp=1429213883272`).
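+Because every run is keyed by this timestamp, results from repeated runs can be compared directly. A minimal sketch (assuming a running Spark session and the `resultLocation` set above):

```scala
import org.apache.spark.sql.functions._

// Each call of runExperiment produces one timestamp;
// count how many result rows each run recorded.
val all = spark.read.json(resultLocation)
all.groupBy("timestamp").count().orderBy("timestamp").show()
```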
-### Retrieve results
-While the experiment is running you can use `experiment.html` to list the status. Once the experiment is complete, you can load the results from disk.
+## Retrieve results
+While the experiment is running, you can use `experiment.html` to get a summary, or `experiment.getCurrentResults` to get the complete current results.
+Once the experiment is complete, you can still access `experiment.getCurrentResults`, or you can load the results from disk.
```
// Get all experiments' results.
-val resultTable = spark.read.json(spark.conf.get("spark.sql.perf.results"))
+val resultTable = spark.read.json(resultLocation)
resultTable.createOrReplaceTempView("sqlPerformance")
sqlContext.table("sqlPerformance")
// Get the result of a particular run by specifying the timestamp of that run.
@@ -88,3 +123,32 @@ sqlContext.table("sqlPerformance").filter("timestamp = 1429132621024")
// or
val specificResultTable = spark.read.json(experiment.resultPath)
```
+
+You can get a basic summary by running:
+```
+experiment.getCurrentResults // or: spark.read.json(resultLocation).filter("timestamp = 1429132621024")
+  .withColumn("Name", substring(col("name"), 2, 100))
+  .withColumn("Runtime", (col("parsingTime") + col("analysisTime") + col("optimizationTime") + col("planningTime") + col("executionTime")) / 1000.0)
+  .select('Name, 'Runtime)
+```
+
+## Running in Databricks
+
+There are example notebooks in `src/main/notebooks` for running TPCDS in the Databricks environment.
+
+### tpcds_datagen notebook
+
+This notebook can be used to install dsdgen on all worker nodes, run data generation, and create the TPCDS database.
+Note that because of the way dsdgen is installed, it will not work on an autoscaling cluster, and `num_workers` has
+to be updated to the number of worker instances on the cluster.
+Data generation may also break if any of the workers is killed: the restarted worker container will no longer have `dsdgen`.
+
+### tpcds_run notebook
+
+This notebook can be used to run TPCDS queries.
+
+For running parallel TPCDS streams:
+* Create a Cluster and attach the spark-sql-perf library to it.
+* Create a Job using the notebook, attaching it to the created cluster as an "existing cluster".
+* Allow concurrent runs of the created job.
+* Launch an appropriate number of runs of the Job to run in parallel on the cluster.
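+When the parallel streams all write to the same result location, their runs can be summarized together afterwards. A minimal sketch, reusing the result columns from the summary example above (assuming `resultLocation` is the shared results path):

```scala
import org.apache.spark.sql.functions._

// Average per-query Runtime across all parallel streams,
// using the same Name/Runtime derivation as the basic summary.
spark.read.json(resultLocation)
  .withColumn("Name", substring(col("name"), 2, 100))
  .withColumn("Runtime", (col("parsingTime") + col("analysisTime") +
    col("optimizationTime") + col("planningTime") + col("executionTime")) / 1000.0)
  .groupBy("Name")
  .agg(avg("Runtime").as("avgRuntime"))
  .orderBy("Name")
  .show(truncate = false)
```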