From 6136ecea6ea667a9b9d55f2c6447dad23ed28d06 Mon Sep 17 00:00:00 2001
From: Nico Poggi
Date: Mon, 10 Sep 2018 23:18:33 +0200
Subject: [PATCH] TPC-H data generator and instructions (#136)

* Adding basic partitioning to TPCH tables, following the VectorH paper as a baseline

* Multi datagen (TPC-H and TPC-DS) and multi scale factor notebook/script. Generates all the selected scale factors and benchmarks in one run.

* TPCH runner notebook, also usable as a script for spark-shell

* Adding basic TPCH documentation
---
 README.md                                     |  20 +-
 src/main/notebooks/TPC-multi_datagen.scala    | 277 ++++++++++++++++++
 src/main/notebooks/tpch_run.scala             |  79 +++++
 .../databricks/spark/sql/perf/tpch/TPCH.scala |   8 +-
 4 files changed, 378 insertions(+), 6 deletions(-)
 create mode 100644 src/main/notebooks/TPC-multi_datagen.scala
 create mode 100644 src/main/notebooks/tpch_run.scala

diff --git a/README.md b/README.md
index 7d241c9..7a27871 100644
--- a/README.md
+++ b/README.md
@@ -140,9 +140,21 @@ experiment.getCurrentResults // or: spark.read.json(resultLocation).filter("time
   .select('Name, 'Runtime)
 ```
-## Running in Databricks.
+# TPC-H
 
-There are example notebooks in `src/main/notebooks` for running TPCDS in the Databricks environment.
+TPC-H can be run similarly to TPC-DS by replacing `tpcds` with `tpch`.
+Take a look at the data generator and the `tpch_run` notebook code below.
+
+## Running in a Databricks workspace (or spark-shell)
+
+There are example notebooks in `src/main/notebooks` for running TPCDS and TPCH in the Databricks environment.
+_These scripts can also be run from the spark-shell command line, with minor modifications, using `:load file_name.scala`._
+
+### TPC-multi_datagen notebook
+This notebook (or Scala script) can be used to generate both TPCDS and TPCH data at the selected scale factors.
+It is a newer version of the `tpcds_datagen` notebook below; a sketch of the underlying API call is shown after the steps. To use it:
+* Edit the config variables at the top of the script.
+* Run the whole notebook.
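+
+For reference, the generation step of the notebook boils down to a `TPCHTables.genData` call
+from this library. A minimal standalone sketch (the bucket path, scale factor, and partition
+count below are placeholder values):
+
+```scala
+import com.databricks.spark.sql.perf.tpch._
+
+// assumes dbgen has already been built under /tmp/dbgen on all worker nodes
+val tables = new TPCHTables(spark.sqlContext, dbgenDir = "/tmp/dbgen", scaleFactor = "1",
+  useDoubleForDecimal = false, useStringForDate = false)
+tables.genData(location = "s3a://mybucket/tpch/sf1_parquet", format = "parquet",
+  overwrite = true, partitionTables = true, clusterByPartitionColumns = true,
+  filterOutNullPartitionValues = false, tableFilter = "", // empty filter = all tables
+  numPartitions = 100)
+```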
 
 ### tpcds_datagen notebook
 
@@ -160,3 +172,7 @@ For running parallel TPCDS streams:
 * Create a Job using the notebook and attaching to the created cluster as "existing cluster".
 * Allow concurrent runs of the created job.
 * Launch an appropriate number of Runs of the Job to run in parallel on the cluster.
+
+### tpch_run notebook
+
+This notebook can be used to run the TPCH queries. The data needs to be generated first.
\ No newline at end of file
diff --git a/src/main/notebooks/TPC-multi_datagen.scala b/src/main/notebooks/TPC-multi_datagen.scala
new file mode 100644
index 0000000..eef98f4
--- /dev/null
+++ b/src/main/notebooks/TPC-multi_datagen.scala
@@ -0,0 +1,277 @@
+// Databricks notebook source
+// Multi TPC-H and TPC-DS data generator and database importer using spark-sql-perf,
+// typically used to generate Parquet files in S3/blob store objects
+val benchmarks = Seq("TPCDS", "TPCH") // Options: "TPCDS", "TPCH"
+val scaleFactors = Seq("1", "10", "100", "1000", "10000") // list of scale factors to generate and import
+
+val baseLocation = "s3a://mybucket" // S3 bucket, blob, or local root path
+val baseDatagenFolder = "/tmp" // usually /tmp, if enough space is available for the datagen files
+
+// Output file formats
+val fileFormat = "parquet" // only Parquet has been tested
+val shuffle = true // if true, partitions will be coalesced into a single file during generation, up to spark.sql.files.maxRecordsPerFile (if set)
+val overwrite = false // whether to delete existing files (when false, it does not check that existing results are complete)
+
+// Generate stats for CBO
+val createTableStats = true
+val createColumnStats = true
+
+val workers: Int = spark.conf.get("spark.databricks.clusterUsageTags.clusterTargetWorkers").toInt // number of worker nodes, assumes one executor per node
+val cores: Int = Runtime.getRuntime.availableProcessors // number of CPU cores per node (measured on the driver, assumes uniform nodes)
+
+val dbSuffix = "" // set only if creating multiple DBs or source file folders with different settings; use a leading _
+val TPCDSUseLegacyOptions = false // set to generate file/DB naming and table options compatible with older results
+
+// COMMAND ----------
+
+// Imports; fail fast if any library is missing
+
+// For the data generators
+import java.io._
+import scala.sys.process._
+
+// spark-sql-perf
+import com.databricks.spark.sql.perf._
+import com.databricks.spark.sql.perf.tpch._
+import com.databricks.spark.sql.perf.tpcds._
+
+// Spark/Hadoop config
+import org.apache.spark.deploy.SparkHadoopUtil
+
+// COMMAND ----------
+
+// Set the Spark config to produce the same, comparable source files across systems.
+// Do not change unless you want to deviate from the default source file composition; in that case, also set a DB suffix.
+spark.sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
+
+// Prevent very large files: 20 million records yields files of roughly 500 to 1500 MB in TPCH
+spark.sqlContext.setConf("spark.sql.files.maxRecordsPerFile", "20000000") // this also regulates the file coalesce
+
+// COMMAND ----------
+
+// Check that we have the expected number of worker nodes before starting the data generation.
+// Make sure the workers variable is set correctly, as the datagen binaries need to be present on all nodes.
+val targetWorkers: Int = workers
+def numWorkers: Int = sc.getExecutorMemoryStatus.size - 1 // minus one for the driver
+def waitForWorkers(requiredWorkers: Int, tries: Int): Unit = {
+  for (i <- 0 until tries) {
+    if (numWorkers == requiredWorkers) {
+      println(s"Waited ${i}s for $numWorkers workers to be ready")
+      return
+    }
+    if (i % 60 == 0) println(s"Waiting ${i}s for workers to be ready, got only $numWorkers workers")
+    Thread.sleep(1000)
+  }
+  throw new Exception(s"Timed out waiting for workers to be ready after ${tries}s")
+}
+waitForWorkers(targetWorkers, 3600) // wait up to an hour
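+
+// Optional sanity probe (a sketch, not part of the original flow): list the distinct
+// hostnames that tasks actually land on; the count should match targetWorkers.
+// import spark.implicits._
+// spark.range(0, workers * cores, 1, workers * cores)
+//   .mapPartitions(_ => Iterator(java.net.InetAddress.getLocalHost.getHostName))
+//   .distinct().collect().foreach(println)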
+
+// COMMAND ----------
+
+// Timing helper
+def time[R](block: => R): R = {
+  val t0 = System.currentTimeMillis() // or nanoTime() for finer resolution
+  val result = block // call-by-name
+  val t1 = System.currentTimeMillis()
+  println("Elapsed time: " + (t1 - t0) + "ms")
+  result
+}
+
+// COMMAND ----------
+
+// Install TPCH dbgen (with the stdout patch)
+def installDBGEN(url: String = "https://github.com/databricks/tpch-dbgen.git", useStdout: Boolean = true, baseFolder: String = "/tmp")(i: java.lang.Long): String = {
+  // check out the revision that makes dbgen write to stdout, if requested
+  val checkoutRevision: String = if (useStdout) "git checkout 0469309147b42abac8857fa61b4cf69a6d3128a8 -- bm_utils.c" else ""
+  Seq("mkdir", "-p", baseFolder).!
+  val pw = new PrintWriter(new File(s"${baseFolder}/dbgen_$i.sh"))
+  pw.write(s"""
+rm -rf ${baseFolder}/dbgen
+rm -rf ${baseFolder}/dbgen_install_$i
+mkdir ${baseFolder}/dbgen_install_$i
+cd ${baseFolder}/dbgen_install_$i
+git clone '$url'
+cd tpch-dbgen
+$checkoutRevision
+make
+ln -sf ${baseFolder}/dbgen_install_$i/tpch-dbgen ${baseFolder}/dbgen || echo "ln -sf failed"
+test -e ${baseFolder}/dbgen/dbgen
+echo "OK"
+  """)
+  pw.close()
+  Seq("chmod", "+x", s"${baseFolder}/dbgen_$i.sh").!
+  Seq(s"${baseFolder}/dbgen_$i.sh").!!
+}
+
+// COMMAND ----------
+
+// Install TPCDS dsdgen (with the stdout patch)
+// Note: assumes a Debian/Ubuntu host; edit the package-manager commands if not
+def installDSDGEN(url: String = "https://github.com/databricks/tpcds-kit.git", useStdout: Boolean = true, baseFolder: String = "/tmp")(i: java.lang.Long): String = {
+  Seq("mkdir", "-p", baseFolder).!
+  val pw = new PrintWriter(new File(s"${baseFolder}/dsdgen_$i.sh"))
+  pw.write(s"""
+sudo apt-get update
+sudo apt-get -y --force-yes install gcc make flex bison byacc git
+rm -rf ${baseFolder}/dsdgen
+rm -rf ${baseFolder}/dsdgen_install_$i
+mkdir ${baseFolder}/dsdgen_install_$i
+cd ${baseFolder}/dsdgen_install_$i
+git clone '$url'
+cd tpcds-kit/tools
+make -f Makefile.suite
+ln -sf ${baseFolder}/dsdgen_install_$i/tpcds-kit/tools ${baseFolder}/dsdgen || echo "ln -sf failed"
+${baseFolder}/dsdgen/dsdgen -h
+test -e ${baseFolder}/dsdgen/dsdgen
+echo "OK"
+  """)
+  pw.close()
+  Seq("chmod", "+x", s"${baseFolder}/dsdgen_$i.sh").!
+  Seq(s"${baseFolder}/dsdgen_$i.sh").!!
+}
+
+// COMMAND ----------
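+
+// The next cell builds the generators once per node: spark.range(0, workers, 1, workers)
+// creates exactly one partition per worker, so each map task runs the installer script
+// on a different executor (assuming one executor per node, as noted above).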
+
+// Install (build) the data generators on all worker nodes
+val res = spark.range(0, workers, 1, workers).map(worker => benchmarks.map {
+    case "TPCDS" => s"TPCDS worker $worker\n" + installDSDGEN(baseFolder = baseDatagenFolder)(worker)
+    case "TPCH" => s"TPCH worker $worker\n" + installDBGEN(baseFolder = baseDatagenFolder)(worker)
+  }).collect()
+
+// COMMAND ----------
+
+// Set the database name, tables, and file location for each benchmark
+// returns (dbname, tables, location)
+def getBenchmarkData(benchmark: String, scaleFactor: String) = benchmark match {
+
+  case "TPCH" => (
+    s"tpch_sf${scaleFactor}_${fileFormat}${dbSuffix}",
+    new TPCHTables(spark.sqlContext, dbgenDir = s"${baseDatagenFolder}/dbgen", scaleFactor = scaleFactor, useDoubleForDecimal = false, useStringForDate = false, generatorParams = Nil),
+    s"$baseLocation/tpch/sf${scaleFactor}_${fileFormat}")
+
+  case "TPCDS" if !TPCDSUseLegacyOptions => (
+    s"tpcds_sf${scaleFactor}_${fileFormat}${dbSuffix}",
+    new TPCDSTables(spark.sqlContext, dsdgenDir = s"${baseDatagenFolder}/dsdgen", scaleFactor = scaleFactor, useDoubleForDecimal = false, useStringForDate = false),
+    s"$baseLocation/tpcds-2.4/sf${scaleFactor}_${fileFormat}")
+
+  case "TPCDS" if TPCDSUseLegacyOptions => (
+    s"tpcds_sf${scaleFactor}_nodecimal_nodate_withnulls${dbSuffix}",
+    new TPCDSTables(spark.sqlContext, s"${baseDatagenFolder}/dsdgen", scaleFactor = scaleFactor, useDoubleForDecimal = true, useStringForDate = true),
+    s"$baseLocation/tpcds/sf$scaleFactor-$fileFormat/useDecimal=false,useDate=false,filterNull=false")
+}
+
+// COMMAND ----------
+
+// Data generation
+def isPartitioned(tables: Tables, tableName: String): Boolean =
+  util.Try(tables.tables.find(_.name == tableName).get.partitionColumns.nonEmpty).getOrElse(false)
+
+def loadData(tables: Tables, location: String, scaleFactor: String) = {
+  val tableNames = tables.tables.map(_.name)
+  tableNames.foreach { tableName =>
+    // generate data
+    time {
+      tables.genData(
+        location = location,
+        format = fileFormat,
+        overwrite = overwrite,
+        partitionTables = true,
+        // whether to coalesce into a single file (only one writer for non-partitioned tables = slow)
+        clusterByPartitionColumns = shuffle, // if (isPartitioned(tables, tableName)) false else true,
+        filterOutNullPartitionValues = false,
+        tableFilter = tableName,
+        // this controls datagen parallelism and the number of writers (= number of files for non-partitioned tables);
+        // in general we want many writers to S3, and smaller tasks at large scale factors to avoid OOM and shuffle errors
+        numPartitions = if (scaleFactor.toInt <= 100 || !isPartitioned(tables, tableName)) (workers * cores) else (workers * cores * 4))
+    }
+  }
+}
+
+// COMMAND ----------
+
+// Create the database and import the data
+def createExternal(location: String, dbname: String, tables: Tables) = {
+  tables.createExternalTables(location, fileFormat, dbname, overwrite = overwrite, discoverPartitions = true)
+}
+
+def loadDB(dbname: String, tables: Tables, location: String) = {
+  val tableNames = tables.tables.map(_.name)
+  time {
+    println(s"Creating external tables at $location")
+    createExternal(location, dbname, tables)
+  }
+  // Show table information and attempt to vacuum (VACUUM is not supported on all deployments, hence the Try)
+  tableNames.foreach { tableName =>
+    println(s"Table $tableName has " + util.Try(sql(s"SHOW PARTITIONS $tableName").count() + " partitions").getOrElse(s"no partitions"))
+    util.Try(sql(s"VACUUM $tableName RETAIN 0.0 HOURS")).getOrElse(println(s"Cannot VACUUM $tableName"))
+    sql(s"DESCRIBE EXTENDED $tableName").show(999, false)
+    println()
+  }
+}
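+
+// Usage sketch (hypothetical values; the driver loop further below does the equivalent
+// for every configured benchmark and scale factor):
+// val (dbname, tables, location) = getBenchmarkData("TPCH", "1")
+// loadData(tables = tables, location = location, scaleFactor = "1")
+// loadDB(dbname = dbname, tables = tables, location = location)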
HOURS"))getOrElse(println(s"Cannot VACUUM $tableName")) + sql(s"DESCRIBE EXTENDED $tableName").show(999, false) + println + } +} + +// COMMAND ---------- + +def setScaleConfig(scaleFactor: String): Unit = { + // Avoid OOM when shuffling large scale fators + // and errors like 2GB shuffle limit at 10TB like: Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Too large frame: 9640891355 + // For 10TB 16x4core nodes were needed with the config below, 8x for 1TB and below. + // About 24hrs. for SF 1 to 10,000. + if (scaleFactor.toInt >= 10000) { + spark.conf.set("spark.sql.shuffle.partitions", "20000") + SparkHadoopUtil.get.conf.set("parquet.memory.pool.ratio", "0.1") + } + else if (scaleFactor.toInt >= 1000) { + spark.conf.set("spark.sql.shuffle.partitions", "2001") //one above 2000 to use HighlyCompressedMapStatus + SparkHadoopUtil.get.conf.set("parquet.memory.pool.ratio", "0.3") + } + else { + spark.conf.set("spark.sql.shuffle.partitions", "200") //default + SparkHadoopUtil.get.conf.set("parquet.memory.pool.ratio", "0.5") + } +} + +// COMMAND ---------- + +// Generate the data, import the tables, generate stats for selected benchmarks and scale factors +scaleFactors.foreach { scaleFactor => { + + // First set some config settings affecting OOMs/performance + setScaleConfig(scaleFactor) + + benchmarks.foreach{ benchmark => { + val (dbname, tables, location) = getBenchmarkData(benchmark, scaleFactor) + // Start the actual loading + time { + println(s"Generating data for $benchmark SF $scaleFactor at $location") + loadData(tables = tables, location = location, scaleFactor = scaleFactor) + } + time { + println(s"\nImporting data for $benchmark into DB $dbname from $location") + loadDB(dbname = dbname, tables = tables, location = location) + } + if (createTableStats) time { + println(s"\nGenerating table statistics for DB $dbname (with analyzeColumns=$createColumnStats)") + tables.analyzeTables(dbname, analyzeColumns = createColumnStats) + } + }} +}} + + +// COMMAND ---------- + +// Print table structure for manual validation +scaleFactors.foreach { scaleFactor => + benchmarks.foreach{ benchmark => { + val (dbname, tables, location) = getBenchmarkData(benchmark, scaleFactor) + sql(s"use $dbname") + time { + sql(s"show tables").select("tableName").collect().foreach{ tableName => + val name: String = tableName.toString().drop(1).dropRight(1) + println(s"Printing table information for $benchmark SF $scaleFactor table $name") + val count = sql(s"select count(*) as ${name}_count from $name").collect()(0)(0) + println(s"Table $name has " + util.Try(sql(s"SHOW PARTITIONS $name").count() + " partitions").getOrElse(s"no partitions") + s" and $count rows.") + sql(s"describe extended $name").show(999, false) + } + } + println + }} +} diff --git a/src/main/notebooks/tpch_run.scala b/src/main/notebooks/tpch_run.scala new file mode 100644 index 0000000..d93edc2 --- /dev/null +++ b/src/main/notebooks/tpch_run.scala @@ -0,0 +1,79 @@ +// Databricks notebook source +// TPCH runner (from spark-sql-perf) to be used on existing tables +// edit the main configuration below + +val scaleFactors = Seq(1, 10, 100, 1000) //set scale factors to run +val format = "parquet" //format has have already been generated + +def perfDatasetsLocation(scaleFactor: Int, format: String) = + s"s3a://my-bucket/tpch/sf${scaleFactor}_${format}" + +val resultLocation = "s3a://my-bucket/results" +val iterations = 2 +def databaseName(scaleFactor: Int, format: String) = s"tpch_sf${scaleFactor}_${format}" +val 
+
+// COMMAND ----------
+
+scaleFactors.foreach { scaleFactor =>
+  println("DB SF " + databaseName(scaleFactor, format))
+  sql(s"USE ${databaseName(scaleFactor, format)}")
+  val experiment = tpch.runExperiment(
+    queries,
+    iterations = iterations,
+    resultLocation = resultLocation,
+    tags = Map(
+      "runtype" -> runtype,
+      "date" -> java.time.LocalDate.now.toString,
+      "database" -> databaseName(scaleFactor, format),
+      "scale_factor" -> scaleFactor.toString,
+      "spark_version" -> spark.version,
+      "system" -> "Spark",
+      "workers" -> workers.toString, // tag values must be strings
+      "workerInstanceType" -> workerInstanceType,
+      "configuration" -> configuration
+    )
+  )
+  println(s"Running SF $scaleFactor")
+  experiment.waitForFinish(36 * 60 * 60) // 36 hours, in seconds
+  val summary = experiment.getCurrentResults
+    .withColumn("Name", substring(col("name"), 2, 100))
+    .withColumn("Runtime", (col("parsingTime") + col("analysisTime") + col("optimizationTime") + col("planningTime") + col("executionTime")) / 1000.0)
+    .select('Name, 'Runtime)
+  summary.show(9999, false)
+}
\ No newline at end of file
diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpch/TPCH.scala b/src/main/scala/com/databricks/spark/sql/perf/tpch/TPCH.scala
index 6d78f13..52f7a3a 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/tpch/TPCH.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/tpch/TPCH.scala
@@ -77,7 +77,7 @@ class TPCHTables(
   val tables = Seq(
     Table("part",
-      partitionColumns = Nil,
+      partitionColumns = "p_brand" :: Nil,
       'p_partkey.long,
       'p_name.string,
       'p_mfgr.string,
@@ -107,7 +107,7 @@
       'ps_comment.string
     ),
     Table("customer",
-      partitionColumns = Nil,
+      partitionColumns = "c_mktsegment" :: Nil,
       'c_custkey.long,
       'c_name.string,
       'c_address.string,
@@ -118,7 +118,7 @@
       'c_comment.string
     ),
     Table("orders",
-      partitionColumns = Nil,
+      partitionColumns = "o_orderdate" :: Nil,
       'o_orderkey.long,
       'o_custkey.long,
       'o_orderstatus.string,
@@ -130,7 +130,7 @@
       'o_comment.string
     ),
     Table("lineitem",
-      partitionColumns = Nil,
+      partitionColumns = "l_shipdate" :: Nil,
       'l_orderkey.long,
       'l_partkey.long,
       'l_suppkey.long,