diff --git a/src/main/resources/tpch/README b/src/main/resources/tpch/README new file mode 100644 index 0000000..8c07ee3 --- /dev/null +++ b/src/main/resources/tpch/README @@ -0,0 +1,14 @@ +The queries were generated with: + + DSS_QUERIES=queries qgen -d 1 + +with the following changes: +* "first -1" removed +* "first N" changed to "limit N" +* ; (semicolon) at end of queries removed +* Q1: "interval '90' day (3)" changed to "interval '90' day" +* Q7, Q8, Q9: "extract(year from X)" changed to "year(X)" +* Q13: "as c_orders (c_custkey, c_count)" changed to "c_orders" and c_count alias moved inside subquery +* Q15: CREATE VIEW changed to WITH +* Q22: "substring(c_phone from 1 to 2)" changed to "substring(c_phone, 1, 2)" + diff --git a/src/main/resources/tpch/queries/1.sql b/src/main/resources/tpch/queries/1.sql new file mode 100644 index 0000000..0c77db7 --- /dev/null +++ b/src/main/resources/tpch/queries/1.sql @@ -0,0 +1,24 @@ +-- using default substitutions + + +select + l_returnflag, + l_linestatus, + sum(l_quantity) as sum_qty, + sum(l_extendedprice) as sum_base_price, + sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, + sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, + avg(l_quantity) as avg_qty, + avg(l_extendedprice) as avg_price, + avg(l_discount) as avg_disc, + count(*) as count_order +from + lineitem +where + l_shipdate <= date '1998-12-01' - interval '90' day +group by + l_returnflag, + l_linestatus +order by + l_returnflag, + l_linestatus diff --git a/src/main/resources/tpch/queries/10.sql b/src/main/resources/tpch/queries/10.sql new file mode 100644 index 0000000..b843703 --- /dev/null +++ b/src/main/resources/tpch/queries/10.sql @@ -0,0 +1,35 @@ +-- using default substitutions + + +select + c_custkey, + c_name, + sum(l_extendedprice * (1 - l_discount)) as revenue, + c_acctbal, + n_name, + c_address, + c_phone, + c_comment +from + customer, + orders, + lineitem, + nation +where + c_custkey = o_custkey + and l_orderkey = o_orderkey + and o_orderdate >= date '1993-10-01' + and o_orderdate < date '1993-10-01' + interval '3' month + and l_returnflag = 'R' + and c_nationkey = n_nationkey +group by + c_custkey, + c_name, + c_acctbal, + c_phone, + n_name, + c_address, + c_comment +order by + revenue desc +limit 20 diff --git a/src/main/resources/tpch/queries/11.sql b/src/main/resources/tpch/queries/11.sql new file mode 100644 index 0000000..a85be27 --- /dev/null +++ b/src/main/resources/tpch/queries/11.sql @@ -0,0 +1,30 @@ +-- using default substitutions + + +select + ps_partkey, + sum(ps_supplycost * ps_availqty) as value +from + partsupp, + supplier, + nation +where + ps_suppkey = s_suppkey + and s_nationkey = n_nationkey + and n_name = 'GERMANY' +group by + ps_partkey having + sum(ps_supplycost * ps_availqty) > ( + select + sum(ps_supplycost * ps_availqty) * 0.0001000000 + from + partsupp, + supplier, + nation + where + ps_suppkey = s_suppkey + and s_nationkey = n_nationkey + and n_name = 'GERMANY' + ) +order by + value desc diff --git a/src/main/resources/tpch/queries/12.sql b/src/main/resources/tpch/queries/12.sql new file mode 100644 index 0000000..810cc56 --- /dev/null +++ b/src/main/resources/tpch/queries/12.sql @@ -0,0 +1,31 @@ +-- using default substitutions + + +select + l_shipmode, + sum(case + when o_orderpriority = '1-URGENT' + or o_orderpriority = '2-HIGH' + then 1 + else 0 + end) as high_line_count, + sum(case + when o_orderpriority <> '1-URGENT' + and o_orderpriority <> '2-HIGH' + then 1 + else 0 + end) as low_line_count +from + orders, + lineitem +where + o_orderkey = l_orderkey + and l_shipmode in ('MAIL', 'SHIP') + and l_commitdate < l_receiptdate + and l_shipdate < l_commitdate + and l_receiptdate >= date '1994-01-01' + and l_receiptdate < date '1994-01-01' + interval '1' year +group by + l_shipmode +order by + l_shipmode diff --git a/src/main/resources/tpch/queries/13.sql b/src/main/resources/tpch/queries/13.sql new file mode 100644 index 0000000..b3fdbcb --- /dev/null +++ b/src/main/resources/tpch/queries/13.sql @@ -0,0 +1,23 @@ +-- using default substitutions + + +select + c_count, + count(*) as custdist +from + ( + select + c_custkey, + count(o_orderkey) as c_count + from + customer left outer join orders on + c_custkey = o_custkey + and o_comment not like '%special%requests%' + group by + c_custkey + ) as c_orders +group by + c_count +order by + custdist desc, + c_count desc diff --git a/src/main/resources/tpch/queries/14.sql b/src/main/resources/tpch/queries/14.sql new file mode 100644 index 0000000..0ad826f --- /dev/null +++ b/src/main/resources/tpch/queries/14.sql @@ -0,0 +1,16 @@ +-- using default substitutions + + +select + 100.00 * sum(case + when p_type like 'PROMO%' + then l_extendedprice * (1 - l_discount) + else 0 + end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue +from + lineitem, + part +where + l_partkey = p_partkey + and l_shipdate >= date '1995-09-01' + and l_shipdate < date '1995-09-01' + interval '1' month diff --git a/src/main/resources/tpch/queries/15.sql b/src/main/resources/tpch/queries/15.sql new file mode 100644 index 0000000..64d0b48 --- /dev/null +++ b/src/main/resources/tpch/queries/15.sql @@ -0,0 +1,35 @@ +-- using default substitutions + +with revenue0 as + (select + l_suppkey as supplier_no, + sum(l_extendedprice * (1 - l_discount)) as total_revenue + from + lineitem + where + l_shipdate >= date '1996-01-01' + and l_shipdate < date '1996-01-01' + interval '3' month + group by + l_suppkey) + + +select + s_suppkey, + s_name, + s_address, + s_phone, + total_revenue +from + supplier, + revenue0 +where + s_suppkey = supplier_no + and total_revenue = ( + select + max(total_revenue) + from + revenue0 + ) +order by + s_suppkey + diff --git a/src/main/resources/tpch/queries/16.sql b/src/main/resources/tpch/queries/16.sql new file mode 100644 index 0000000..f155702 --- /dev/null +++ b/src/main/resources/tpch/queries/16.sql @@ -0,0 +1,33 @@ +-- using default substitutions + + +select + p_brand, + p_type, + p_size, + count(distinct ps_suppkey) as supplier_cnt +from + partsupp, + part +where + p_partkey = ps_partkey + and p_brand <> 'Brand#45' + and p_type not like 'MEDIUM POLISHED%' + and p_size in (49, 14, 23, 45, 19, 3, 36, 9) + and ps_suppkey not in ( + select + s_suppkey + from + supplier + where + s_comment like '%Customer%Complaints%' + ) +group by + p_brand, + p_type, + p_size +order by + supplier_cnt desc, + p_brand, + p_type, + p_size diff --git a/src/main/resources/tpch/queries/17.sql b/src/main/resources/tpch/queries/17.sql new file mode 100644 index 0000000..fcc60d2 --- /dev/null +++ b/src/main/resources/tpch/queries/17.sql @@ -0,0 +1,20 @@ +-- using default substitutions + + +select + sum(l_extendedprice) / 7.0 as avg_yearly +from + lineitem, + part +where + p_partkey = l_partkey + and p_brand = 'Brand#23' + and p_container = 'MED BOX' + and l_quantity < ( + select + 0.2 * avg(l_quantity) + from + lineitem + where + l_partkey = p_partkey + ) diff --git a/src/main/resources/tpch/queries/18.sql b/src/main/resources/tpch/queries/18.sql new file mode 100644 index 0000000..a74f316 --- /dev/null +++ b/src/main/resources/tpch/queries/18.sql @@ -0,0 +1,36 @@ +-- using default substitutions + + +select + c_name, + c_custkey, + o_orderkey, + o_orderdate, + o_totalprice, + sum(l_quantity) +from + customer, + orders, + lineitem +where + o_orderkey in ( + select + l_orderkey + from + lineitem + group by + l_orderkey having + sum(l_quantity) > 300 + ) + and c_custkey = o_custkey + and o_orderkey = l_orderkey +group by + c_name, + c_custkey, + o_orderkey, + o_orderdate, + o_totalprice +order by + o_totalprice desc, + o_orderdate +limit 100 diff --git a/src/main/resources/tpch/queries/19.sql b/src/main/resources/tpch/queries/19.sql new file mode 100644 index 0000000..0163e15 --- /dev/null +++ b/src/main/resources/tpch/queries/19.sql @@ -0,0 +1,38 @@ +-- using default substitutions + + +select + sum(l_extendedprice* (1 - l_discount)) as revenue +from + lineitem, + part +where + ( + p_partkey = l_partkey + and p_brand = 'Brand#12' + and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG') + and l_quantity >= 1 and l_quantity <= 1 + 10 + and p_size between 1 and 5 + and l_shipmode in ('AIR', 'AIR REG') + and l_shipinstruct = 'DELIVER IN PERSON' + ) + or + ( + p_partkey = l_partkey + and p_brand = 'Brand#23' + and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') + and l_quantity >= 10 and l_quantity <= 10 + 10 + and p_size between 1 and 10 + and l_shipmode in ('AIR', 'AIR REG') + and l_shipinstruct = 'DELIVER IN PERSON' + ) + or + ( + p_partkey = l_partkey + and p_brand = 'Brand#34' + and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG') + and l_quantity >= 20 and l_quantity <= 20 + 10 + and p_size between 1 and 15 + and l_shipmode in ('AIR', 'AIR REG') + and l_shipinstruct = 'DELIVER IN PERSON' + ) diff --git a/src/main/resources/tpch/queries/2.sql b/src/main/resources/tpch/queries/2.sql new file mode 100644 index 0000000..2cf1165 --- /dev/null +++ b/src/main/resources/tpch/queries/2.sql @@ -0,0 +1,47 @@ +-- using default substitutions + + +select + s_acctbal, + s_name, + n_name, + p_partkey, + p_mfgr, + s_address, + s_phone, + s_comment +from + part, + supplier, + partsupp, + nation, + region +where + p_partkey = ps_partkey + and s_suppkey = ps_suppkey + and p_size = 15 + and p_type like '%BRASS' + and s_nationkey = n_nationkey + and n_regionkey = r_regionkey + and r_name = 'EUROPE' + and ps_supplycost = ( + select + min(ps_supplycost) + from + partsupp, + supplier, + nation, + region + where + p_partkey = ps_partkey + and s_suppkey = ps_suppkey + and s_nationkey = n_nationkey + and n_regionkey = r_regionkey + and r_name = 'EUROPE' + ) +order by + s_acctbal desc, + n_name, + s_name, + p_partkey +limit 100 diff --git a/src/main/resources/tpch/queries/20.sql b/src/main/resources/tpch/queries/20.sql new file mode 100644 index 0000000..a7b8706 --- /dev/null +++ b/src/main/resources/tpch/queries/20.sql @@ -0,0 +1,40 @@ +-- using default substitutions + + +select + s_name, + s_address +from + supplier, + nation +where + s_suppkey in ( + select + ps_suppkey + from + partsupp + where + ps_partkey in ( + select + p_partkey + from + part + where + p_name like 'forest%' + ) + and ps_availqty > ( + select + 0.5 * sum(l_quantity) + from + lineitem + where + l_partkey = ps_partkey + and l_suppkey = ps_suppkey + and l_shipdate >= date '1994-01-01' + and l_shipdate < date '1994-01-01' + interval '1' year + ) + ) + and s_nationkey = n_nationkey + and n_name = 'CANADA' +order by + s_name diff --git a/src/main/resources/tpch/queries/21.sql b/src/main/resources/tpch/queries/21.sql new file mode 100644 index 0000000..7a39f25 --- /dev/null +++ b/src/main/resources/tpch/queries/21.sql @@ -0,0 +1,43 @@ +-- using default substitutions + + +select + s_name, + count(*) as numwait +from + supplier, + lineitem l1, + orders, + nation +where + s_suppkey = l1.l_suppkey + and o_orderkey = l1.l_orderkey + and o_orderstatus = 'F' + and l1.l_receiptdate > l1.l_commitdate + and exists ( + select + * + from + lineitem l2 + where + l2.l_orderkey = l1.l_orderkey + and l2.l_suppkey <> l1.l_suppkey + ) + and not exists ( + select + * + from + lineitem l3 + where + l3.l_orderkey = l1.l_orderkey + and l3.l_suppkey <> l1.l_suppkey + and l3.l_receiptdate > l3.l_commitdate + ) + and s_nationkey = n_nationkey + and n_name = 'SAUDI ARABIA' +group by + s_name +order by + numwait desc, + s_name +limit 100 diff --git a/src/main/resources/tpch/queries/22.sql b/src/main/resources/tpch/queries/22.sql new file mode 100644 index 0000000..5c76407 --- /dev/null +++ b/src/main/resources/tpch/queries/22.sql @@ -0,0 +1,40 @@ +-- using default substitutions + + +select + cntrycode, + count(*) as numcust, + sum(c_acctbal) as totacctbal +from + ( + select + substring(c_phone, 1, 2) as cntrycode, + c_acctbal + from + customer + where + substring(c_phone, 1, 2) in + ('13', '31', '23', '29', '30', '18', '17') + and c_acctbal > ( + select + avg(c_acctbal) + from + customer + where + c_acctbal > 0.00 + and substring(c_phone, 1, 2) in + ('13', '31', '23', '29', '30', '18', '17') + ) + and not exists ( + select + * + from + orders + where + o_custkey = c_custkey + ) + ) as custsale +group by + cntrycode +order by + cntrycode diff --git a/src/main/resources/tpch/queries/3.sql b/src/main/resources/tpch/queries/3.sql new file mode 100644 index 0000000..c30c528 --- /dev/null +++ b/src/main/resources/tpch/queries/3.sql @@ -0,0 +1,26 @@ +-- using default substitutions + + +select + l_orderkey, + sum(l_extendedprice * (1 - l_discount)) as revenue, + o_orderdate, + o_shippriority +from + customer, + orders, + lineitem +where + c_mktsegment = 'BUILDING' + and c_custkey = o_custkey + and l_orderkey = o_orderkey + and o_orderdate < date '1995-03-15' + and l_shipdate > date '1995-03-15' +group by + l_orderkey, + o_orderdate, + o_shippriority +order by + revenue desc, + o_orderdate +limit 10 diff --git a/src/main/resources/tpch/queries/4.sql b/src/main/resources/tpch/queries/4.sql new file mode 100644 index 0000000..659f6f5 --- /dev/null +++ b/src/main/resources/tpch/queries/4.sql @@ -0,0 +1,24 @@ +-- using default substitutions + + +select + o_orderpriority, + count(*) as order_count +from + orders +where + o_orderdate >= date '1993-07-01' + and o_orderdate < date '1993-07-01' + interval '3' month + and exists ( + select + * + from + lineitem + where + l_orderkey = o_orderkey + and l_commitdate < l_receiptdate + ) +group by + o_orderpriority +order by + o_orderpriority diff --git a/src/main/resources/tpch/queries/5.sql b/src/main/resources/tpch/queries/5.sql new file mode 100644 index 0000000..bc6fece --- /dev/null +++ b/src/main/resources/tpch/queries/5.sql @@ -0,0 +1,27 @@ +-- using default substitutions + + +select + n_name, + sum(l_extendedprice * (1 - l_discount)) as revenue +from + customer, + orders, + lineitem, + supplier, + nation, + region +where + c_custkey = o_custkey + and l_orderkey = o_orderkey + and l_suppkey = s_suppkey + and c_nationkey = s_nationkey + and s_nationkey = n_nationkey + and n_regionkey = r_regionkey + and r_name = 'ASIA' + and o_orderdate >= date '1994-01-01' + and o_orderdate < date '1994-01-01' + interval '1' year +group by + n_name +order by + revenue desc diff --git a/src/main/resources/tpch/queries/6.sql b/src/main/resources/tpch/queries/6.sql new file mode 100644 index 0000000..92496da --- /dev/null +++ b/src/main/resources/tpch/queries/6.sql @@ -0,0 +1,12 @@ +-- using default substitutions + + +select + sum(l_extendedprice * l_discount) as revenue +from + lineitem +where + l_shipdate >= date '1994-01-01' + and l_shipdate < date '1994-01-01' + interval '1' year + and l_discount between .06 - 0.01 and .06 + 0.01 + and l_quantity < 24 diff --git a/src/main/resources/tpch/queries/7.sql b/src/main/resources/tpch/queries/7.sql new file mode 100644 index 0000000..eb3c51c --- /dev/null +++ b/src/main/resources/tpch/queries/7.sql @@ -0,0 +1,42 @@ +-- using default substitutions + + +select + supp_nation, + cust_nation, + l_year, + sum(volume) as revenue +from + ( + select + n1.n_name as supp_nation, + n2.n_name as cust_nation, + year(l_shipdate) as l_year, + l_extendedprice * (1 - l_discount) as volume + from + supplier, + lineitem, + orders, + customer, + nation n1, + nation n2 + where + s_suppkey = l_suppkey + and o_orderkey = l_orderkey + and c_custkey = o_custkey + and s_nationkey = n1.n_nationkey + and c_nationkey = n2.n_nationkey + and ( + (n1.n_name = 'FRANCE' and n2.n_name = 'GERMANY') + or (n1.n_name = 'GERMANY' and n2.n_name = 'FRANCE') + ) + and l_shipdate between date '1995-01-01' and date '1996-12-31' + ) as shipping +group by + supp_nation, + cust_nation, + l_year +order by + supp_nation, + cust_nation, + l_year diff --git a/src/main/resources/tpch/queries/8.sql b/src/main/resources/tpch/queries/8.sql new file mode 100644 index 0000000..ca15ebf --- /dev/null +++ b/src/main/resources/tpch/queries/8.sql @@ -0,0 +1,40 @@ +-- using default substitutions + + +select + o_year, + sum(case + when nation = 'BRAZIL' then volume + else 0 + end) / sum(volume) as mkt_share +from + ( + select + year(o_orderdate) as o_year, + l_extendedprice * (1 - l_discount) as volume, + n2.n_name as nation + from + part, + supplier, + lineitem, + orders, + customer, + nation n1, + nation n2, + region + where + p_partkey = l_partkey + and s_suppkey = l_suppkey + and l_orderkey = o_orderkey + and o_custkey = c_custkey + and c_nationkey = n1.n_nationkey + and n1.n_regionkey = r_regionkey + and r_name = 'AMERICA' + and s_nationkey = n2.n_nationkey + and o_orderdate between date '1995-01-01' and date '1996-12-31' + and p_type = 'ECONOMY ANODIZED STEEL' + ) as all_nations +group by + o_year +order by + o_year diff --git a/src/main/resources/tpch/queries/9.sql b/src/main/resources/tpch/queries/9.sql new file mode 100644 index 0000000..9925b18 --- /dev/null +++ b/src/main/resources/tpch/queries/9.sql @@ -0,0 +1,35 @@ +-- using default substitutions + + +select + nation, + o_year, + sum(amount) as sum_profit +from + ( + select + n_name as nation, + year(o_orderdate) as o_year, + l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount + from + part, + supplier, + lineitem, + partsupp, + orders, + nation + where + s_suppkey = l_suppkey + and ps_suppkey = l_suppkey + and ps_partkey = l_partkey + and p_partkey = l_partkey + and o_orderkey = l_orderkey + and s_nationkey = n_nationkey + and p_name like '%green%' + ) as profit +group by + nation, + o_year +order by + nation, + o_year desc diff --git a/src/main/scala/com/databricks/spark/sql/perf/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/Tables.scala new file mode 100644 index 0000000..842780b --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/Tables.scala @@ -0,0 +1,278 @@ +/* + * Copyright 2015 Databricks Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databricks.spark.sql.perf + +import scala.sys.process._ + +import org.slf4j.LoggerFactory + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ +import org.apache.spark.sql.{Row, SQLContext, SaveMode} + +trait DataGenerator extends Serializable { + def generate( + sparkContext: SparkContext, + name: String, + partitions: Int, + scaleFactor: String): RDD[String] +} + + +abstract class Tables(sqlContext: SQLContext, scaleFactor: String, + useDoubleForDecimal: Boolean = false, useStringForDate: Boolean = false) + extends Serializable { + + def dataGenerator: DataGenerator + def tables: Seq[Table] + + private val log = LoggerFactory.getLogger(getClass) + + def sparkContext = sqlContext.sparkContext + + case class Table(name: String, partitionColumns: Seq[String], fields: StructField*) { + val schema = StructType(fields) + + def nonPartitioned: Table = { + Table(name, Nil, fields : _*) + } + + /** + * If convertToSchema is true, the data from generator will be parsed into columns and + * converted to `schema`. Otherwise, it just outputs the raw data (as a single STRING column). + */ + def df(convertToSchema: Boolean, numPartition: Int) = { + val generatedData = dataGenerator.generate(sparkContext, name, numPartition, scaleFactor) + val rows = generatedData.mapPartitions { iter => + iter.map { l => + if (convertToSchema) { + val values = l.split("\\|", -1).dropRight(1).map { v => + if (v.equals("")) { + // If the string value is an empty string, we turn it to a null + null + } else { + v + } + } + Row.fromSeq(values) + } else { + Row.fromSeq(Seq(l)) + } + } + } + + if (convertToSchema) { + val stringData = + sqlContext.createDataFrame( + rows, + StructType(schema.fields.map(f => StructField(f.name, StringType)))) + + val convertedData = { + val columns = schema.fields.map { f => + col(f.name).cast(f.dataType).as(f.name) + } + stringData.select(columns: _*) + } + + convertedData + } else { + sqlContext.createDataFrame(rows, StructType(Seq(StructField("value", StringType)))) + } + } + + def convertTypes(): Table = { + val newFields = fields.map { field => + val newDataType = field.dataType match { + case decimal: DecimalType if useDoubleForDecimal => DoubleType + case date: DateType if useStringForDate => StringType + case other => other + } + field.copy(dataType = newDataType) + } + + Table(name, partitionColumns, newFields:_*) + } + + def genData( + location: String, + format: String, + overwrite: Boolean, + clusterByPartitionColumns: Boolean, + filterOutNullPartitionValues: Boolean, + numPartitions: Int): Unit = { + val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Ignore + + val data = df(format != "text", numPartitions) + val tempTableName = s"${name}_text" + data.registerTempTable(tempTableName) + + val writer = if (partitionColumns.nonEmpty) { + if (clusterByPartitionColumns) { + val columnString = data.schema.fields.map { field => + field.name + }.mkString(",") + val partitionColumnString = partitionColumns.mkString(",") + val predicates = if (filterOutNullPartitionValues) { + partitionColumns.map(col => s"$col IS NOT NULL").mkString("WHERE ", " AND ", "") + } else { + "" + } + + val query = + s""" + |SELECT + | $columnString + |FROM + | $tempTableName + |$predicates + |DISTRIBUTE BY + | $partitionColumnString + """.stripMargin + val grouped = sqlContext.sql(query) + println(s"Pre-clustering with partitioning columns with query $query.") + log.info(s"Pre-clustering with partitioning columns with query $query.") + grouped.write + } else { + data.write + } + } else { + data.write + } + writer.format(format).mode(mode) + if (partitionColumns.nonEmpty) { + writer.partitionBy(partitionColumns : _*) + } + println(s"Generating table $name in database to $location with save mode $mode.") + log.info(s"Generating table $name in database to $location with save mode $mode.") + writer.save(location) + sqlContext.dropTempTable(tempTableName) + } + + def createExternalTable(location: String, format: String, databaseName: String, + overwrite: Boolean, discoverPartitions: Boolean = true): Unit = { + + val qualifiedTableName = databaseName + "." + name + val tableExists = sqlContext.tableNames(databaseName).contains(name) + if (overwrite) { + sqlContext.sql(s"DROP TABLE IF EXISTS $databaseName.$name") + } + if (!tableExists || overwrite) { + println(s"Creating external table $name in database $databaseName using data stored in $location.") + log.info(s"Creating external table $name in database $databaseName using data stored in $location.") + sqlContext.createExternalTable(qualifiedTableName, location, format) + } + if (partitionColumns.nonEmpty && discoverPartitions) { + println(s"Discovering partitions for table $name.") + log.info(s"Discovering partitions for table $name.") + sqlContext.sql(s"ALTER TABLE $databaseName.$name RECOVER PARTITIONS") + } + } + + def createTemporaryTable(location: String, format: String): Unit = { + println(s"Creating temporary table $name using data stored in $location.") + log.info(s"Creating temporary table $name using data stored in $location.") + sqlContext.read.format(format).load(location).registerTempTable(name) + } + + def analyzeTable(databaseName: String, analyzeColumns: Boolean = false): Unit = { + println(s"Analyzing table $name.") + log.info(s"Analyzing table $name.") + sqlContext.sql(s"ANALYZE TABLE $databaseName.$name COMPUTE STATISTICS") + if (analyzeColumns) { + val allColumns = fields.map(_.name).mkString(", ") + println(s"Analyzing table $name columns $allColumns.") + log.info(s"Analyzing table $name columns $allColumns.") + sqlContext.sql(s"ANALYZE TABLE $databaseName.$name COMPUTE STATISTICS FOR COLUMNS $allColumns") + } + } + } + + def genData( + location: String, + format: String, + overwrite: Boolean, + partitionTables: Boolean, + clusterByPartitionColumns: Boolean, + filterOutNullPartitionValues: Boolean, + tableFilter: String = "", + numPartitions: Int = 100): Unit = { + var tablesToBeGenerated = if (partitionTables) { + tables + } else { + tables.map(_.nonPartitioned) + } + + if (!tableFilter.isEmpty) { + tablesToBeGenerated = tablesToBeGenerated.filter(_.name == tableFilter) + if (tablesToBeGenerated.isEmpty) { + throw new RuntimeException("Bad table name filter: " + tableFilter) + } + } + + tablesToBeGenerated.foreach { table => + val tableLocation = s"$location/${table.name}" + table.genData(tableLocation, format, overwrite, clusterByPartitionColumns, + filterOutNullPartitionValues, numPartitions) + } + } + + def createExternalTables(location: String, format: String, databaseName: String, + overwrite: Boolean, discoverPartitions: Boolean, tableFilter: String = ""): Unit = { + + val filtered = if (tableFilter.isEmpty) { + tables + } else { + tables.filter(_.name == tableFilter) + } + + sqlContext.sql(s"CREATE DATABASE IF NOT EXISTS $databaseName") + filtered.foreach { table => + val tableLocation = s"$location/${table.name}" + table.createExternalTable(tableLocation, format, databaseName, overwrite, discoverPartitions) + } + sqlContext.sql(s"USE $databaseName") + println(s"The current database has been set to $databaseName.") + log.info(s"The current database has been set to $databaseName.") + } + + def createTemporaryTables(location: String, format: String, tableFilter: String = ""): Unit = { + val filtered = if (tableFilter.isEmpty) { + tables + } else { + tables.filter(_.name == tableFilter) + } + filtered.foreach { table => + val tableLocation = s"$location/${table.name}" + table.createTemporaryTable(tableLocation, format) + } + } + + def analyzeTables(databaseName: String, analyzeColumns: Boolean = false, tableFilter: String = ""): Unit = { + val filtered = if (tableFilter.isEmpty) { + tables + } else { + tables.filter(_.name == tableFilter) + } + filtered.foreach { table => + table.analyzeTable(databaseName, analyzeColumns) + } + } + + +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDSTables.scala similarity index 67% rename from src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala rename to src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDSTables.scala index 4d44f99..29553c7 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDSTables.scala @@ -18,274 +18,52 @@ package com.databricks.spark.sql.perf.tpcds import scala.sys.process._ -import org.slf4j.LoggerFactory +import com.databricks.spark.sql.perf +import com.databricks.spark.sql.perf.{DataGenerator, Table, Tables} -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types._ -import org.apache.spark.sql.{Row, SQLContext, SaveMode} +import org.apache.spark.SparkContext +import org.apache.spark.sql.SQLContext -class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int, - useDoubleForDecimal: Boolean = false, useStringForDate: Boolean = false) - extends Serializable { - - import sqlContext.implicits._ - - private val log = LoggerFactory.getLogger(getClass) - - def sparkContext = sqlContext.sparkContext +class DSDGEN(dsdgenDir: String) extends DataGenerator { val dsdgen = s"$dsdgenDir/dsdgen" - case class Table(name: String, partitionColumns: Seq[String], fields: StructField*) { - val schema = StructType(fields) - - def nonPartitioned: Table = { - Table(name, Nil, fields : _*) - } - - /** - * If convertToSchema is true, the data from generator will be parsed into columns and - * converted to `schema`. Otherwise, it just outputs the raw data (as a single STRING column). - */ - def df(convertToSchema: Boolean, numPartition: Int) = { - val partitions = if (partitionColumns.isEmpty) 1 else numPartition - val generatedData = { - sparkContext.parallelize(1 to partitions, partitions).flatMap { i => - val localToolsDir = if (new java.io.File(dsdgen).exists) { - dsdgenDir - } else if (new java.io.File(s"/$dsdgen").exists) { - s"/$dsdgenDir" - } else { - sys.error(s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install") - } - - // Note: RNGSEED is the RNG seed used by the data generator. Right now, it is fixed to 100. - val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else "" - val commands = Seq( - "bash", "-c", - s"cd $localToolsDir && ./dsdgen -table $name -filter Y -scale $scaleFactor -RNGSEED 100 $parallel") - println(commands) - commands.lines - } - } - - generatedData.setName(s"$name, sf=$scaleFactor, strings") - - val rows = generatedData.mapPartitions { iter => - iter.map { l => - if (convertToSchema) { - val values = l.split("\\|", -1).dropRight(1).map { v => - if (v.equals("")) { - // If the string value is an empty string, we turn it to a null - null - } else { - v - } - } - Row.fromSeq(values) - } else { - Row.fromSeq(Seq(l)) - } - } - } - - if (convertToSchema) { - val stringData = - sqlContext.createDataFrame( - rows, - StructType(schema.fields.map(f => StructField(f.name, StringType)))) - - val convertedData = { - val columns = schema.fields.map { f => - col(f.name).cast(f.dataType).as(f.name) - } - stringData.select(columns: _*) - } - - convertedData - } else { - sqlContext.createDataFrame(rows, StructType(Seq(StructField("value", StringType)))) - } - } - - def convertTypes(): Table = { - val newFields = fields.map { field => - val newDataType = field.dataType match { - case decimal: DecimalType if useDoubleForDecimal => DoubleType - case date: DateType if useStringForDate => StringType - case other => other - } - field.copy(dataType = newDataType) - } - - Table(name, partitionColumns, newFields:_*) - } - - def genData( - location: String, - format: String, - overwrite: Boolean, - clusterByPartitionColumns: Boolean, - filterOutNullPartitionValues: Boolean, - numPartitions: Int): Unit = { - val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Ignore - - val data = df(format != "text", numPartitions) - val tempTableName = s"${name}_text" - data.registerTempTable(tempTableName) - - val writer = if (partitionColumns.nonEmpty) { - if (clusterByPartitionColumns) { - val columnString = data.schema.fields.map { field => - field.name - }.mkString(",") - val partitionColumnString = partitionColumns.mkString(",") - val predicates = if (filterOutNullPartitionValues) { - partitionColumns.map(col => s"$col IS NOT NULL").mkString("WHERE ", " AND ", "") - } else { - "" - } - - val query = - s""" - |SELECT - | $columnString - |FROM - | $tempTableName - |$predicates - |DISTRIBUTE BY - | $partitionColumnString - """.stripMargin - val grouped = sqlContext.sql(query) - println(s"Pre-clustering with partitioning columns with query $query.") - log.info(s"Pre-clustering with partitioning columns with query $query.") - grouped.write + def generate(sparkContext: SparkContext, name: String, partitions: Int, scaleFactor: String) = { + val generatedData = { + sparkContext.parallelize(1 to partitions, partitions).flatMap { i => + val localToolsDir = if (new java.io.File(dsdgen).exists) { + dsdgenDir + } else if (new java.io.File(s"/$dsdgen").exists) { + s"/$dsdgenDir" } else { - data.write + sys.error(s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install") } - } else { - // If the table is not partitioned, coalesce the data to a single file. - data.coalesce(1).write - } - writer.format(format).mode(mode) - if (partitionColumns.nonEmpty) { - writer.partitionBy(partitionColumns : _*) - } - println(s"Generating table $name in database to $location with save mode $mode.") - log.info(s"Generating table $name in database to $location with save mode $mode.") - writer.save(location) - sqlContext.dropTempTable(tempTableName) - } - def createExternalTable(location: String, format: String, databaseName: String, - overwrite: Boolean, discoverPartitions: Boolean = true): Unit = { - - val qualifiedTableName = databaseName + "." + name - val tableExists = sqlContext.tableNames(databaseName).contains(name) - if (overwrite) { - sqlContext.sql(s"DROP TABLE IF EXISTS $databaseName.$name") - } - if (!tableExists || overwrite) { - println(s"Creating external table $name in database $databaseName using data stored in $location.") - log.info(s"Creating external table $name in database $databaseName using data stored in $location.") - sqlContext.createExternalTable(qualifiedTableName, location, format) - } - if (partitionColumns.nonEmpty && discoverPartitions) { - println(s"Discovering partitions for table $name.") - log.info(s"Discovering partitions for table $name.") - sqlContext.sql(s"ALTER TABLE $databaseName.$name RECOVER PARTITIONS") + // Note: RNGSEED is the RNG seed used by the data generator. Right now, it is fixed to 100. + val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else "" + val commands = Seq( + "bash", "-c", + s"cd $localToolsDir && ./dsdgen -table $name -filter Y -scale $scaleFactor -RNGSEED 100 $parallel") + println(commands) + commands.lines } } - def createTemporaryTable(location: String, format: String): Unit = { - println(s"Creating temporary table $name using data stored in $location.") - log.info(s"Creating temporary table $name using data stored in $location.") - sqlContext.read.format(format).load(location).registerTempTable(name) - } - - def analyzeTable(databaseName: String, analyzeColumns: Boolean = false): Unit = { - println(s"Analyzing table $name.") - log.info(s"Analyzing table $name.") - sqlContext.sql(s"ANALYZE TABLE $databaseName.$name COMPUTE STATISTICS") - if (analyzeColumns) { - val allColumns = fields.map(_.name).mkString(", ") - println(s"Analyzing table $name columns $allColumns.") - log.info(s"Analyzing table $name columns $allColumns.") - sqlContext.sql(s"ANALYZE TABLE $databaseName.$name COMPUTE STATISTICS FOR COLUMNS $allColumns") - } - } + generatedData.setName(s"$name, sf=$scaleFactor, strings") + generatedData } +} - def genData( - location: String, - format: String, - overwrite: Boolean, - partitionTables: Boolean, - clusterByPartitionColumns: Boolean, - filterOutNullPartitionValues: Boolean, - tableFilter: String = "", - numPartitions: Int = 100): Unit = { - var tablesToBeGenerated = if (partitionTables) { - tables - } else { - tables.map(_.nonPartitioned) - } - if (!tableFilter.isEmpty) { - tablesToBeGenerated = tablesToBeGenerated.filter(_.name == tableFilter) - if (tablesToBeGenerated.isEmpty) { - throw new RuntimeException("Bad table name filter: " + tableFilter) - } - } - - tablesToBeGenerated.foreach { table => - val tableLocation = s"$location/${table.name}" - table.genData(tableLocation, format, overwrite, clusterByPartitionColumns, - filterOutNullPartitionValues, numPartitions) - } - } - - def createExternalTables(location: String, format: String, databaseName: String, - overwrite: Boolean, discoverPartitions: Boolean, tableFilter: String = ""): Unit = { - - val filtered = if (tableFilter.isEmpty) { - tables - } else { - tables.filter(_.name == tableFilter) - } - - sqlContext.sql(s"CREATE DATABASE IF NOT EXISTS $databaseName") - filtered.foreach { table => - val tableLocation = s"$location/${table.name}" - table.createExternalTable(tableLocation, format, databaseName, overwrite, discoverPartitions) - } - sqlContext.sql(s"USE $databaseName") - println(s"The current database has been set to $databaseName.") - log.info(s"The current database has been set to $databaseName.") - } - - def createTemporaryTables(location: String, format: String, tableFilter: String = ""): Unit = { - val filtered = if (tableFilter.isEmpty) { - tables - } else { - tables.filter(_.name == tableFilter) - } - filtered.foreach { table => - val tableLocation = s"$location/${table.name}" - table.createTemporaryTable(tableLocation, format) - } - } - - def analyzeTables(databaseName: String, analyzeColumns: Boolean = false, tableFilter: String = ""): Unit = { - val filtered = if (tableFilter.isEmpty) { - tables - } else { - tables.filter(_.name == tableFilter) - } - filtered.foreach { table => - table.analyzeTable(databaseName, analyzeColumns) - } - } +class TPCDSTables( + sqlContext: SQLContext, + dsdgenDir: String, + scaleFactor: String, + useDoubleForDecimal: Boolean = false, + useStringForDate: Boolean = false) + extends Tables(sqlContext, scaleFactor, useDoubleForDecimal, useStringForDate) { + import sqlContext.implicits._ + val dataGenerator = new DSDGEN(dsdgenDir) val tables = Seq( Table("catalog_sales", partitionColumns = "cs_sold_date_sk" :: Nil, diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpch/TPCH.scala b/src/main/scala/com/databricks/spark/sql/perf/tpch/TPCH.scala new file mode 100644 index 0000000..bbd35d0 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/tpch/TPCH.scala @@ -0,0 +1,177 @@ +/* + * Copyright 2015 Databricks Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databricks.spark.sql.perf.tpch +import scala.sys.process._ + +import com.databricks.spark.sql.perf.{Benchmark, DataGenerator, Table, Tables} +import com.databricks.spark.sql.perf.ExecutionMode.CollectResults +import org.apache.commons.io.IOUtils + +import org.apache.spark.SparkContext +import org.apache.spark.sql.SQLContext + +class DBGEN(dbgenDir: String, params: Seq[String]) extends DataGenerator { + val dbgen = s"$dbgenDir/dbgen" + def generate(sparkContext: SparkContext,name: String, partitions: Int, scaleFactor: String) = { + val smallTables = Seq("nation", "region") + val numPartitions = if (partitions > 1 && !smallTables.contains(name)) partitions else 1 + val generatedData = { + sparkContext.parallelize(1 to numPartitions, numPartitions).flatMap { i => + val localToolsDir = if (new java.io.File(dbgen).exists) { + dbgenDir + } else if (new java.io.File(s"/$dbgenDir").exists) { + s"/$dbgenDir" + } else { + sys.error(s"Could not find dbgen at $dbgen or /$dbgenDir. Run install") + } + val parallel = if (numPartitions > 1) s"-C $partitions -S $i" else "" + val shortTableNames = Map( + "customer" -> "c", + "lineitem" -> "L", + "nation" -> "n", + "orders" -> "O", + "part" -> "P", + "region" -> "r", + "supplier" -> "s", + "partsupp" -> "S" + ) + val paramsString = params.mkString(" ") + val commands = Seq( + "bash", "-c", + s"cd $localToolsDir && ./dbgen -q $paramsString -T ${shortTableNames(name)} -s $scaleFactor $parallel") + println(commands) + commands.lines + }.repartition(numPartitions) + } + + generatedData.setName(s"$name, sf=$scaleFactor, strings") + generatedData + } +} + +class TPCHTables( + sqlContext: SQLContext, + dbgenDir: String, + scaleFactor: String, + useDoubleForDecimal: Boolean = false, + useStringForDate: Boolean = false, + generatorParams: Seq[String] = Nil) + extends Tables(sqlContext, scaleFactor, useDoubleForDecimal, useStringForDate) { + import sqlContext.implicits._ + + val dataGenerator = new DBGEN(dbgenDir, generatorParams) + + val tables = Seq( + Table("part", + partitionColumns = Nil, + 'p_partkey.long, + 'p_name.string, + 'p_mfgr.string, + 'p_brand.string, + 'p_type.string, + 'p_size.int, + 'p_container.string, + 'p_retailprice.decimal(12, 2), + 'p_comment.string + ), + Table("supplier", + partitionColumns = Nil, + 's_suppkey.long, + 's_name.string, + 's_address.string, + 's_nationkey.long, + 's_phone.string, + 's_acctbal.decimal(12, 2), + 's_comment.string + ), + Table("partsupp", + partitionColumns = Nil, + 'ps_partkey.long, + 'ps_suppkey.long, + 'ps_availqty.int, + 'ps_supplycost.decimal(12, 2), + 'ps_comment.string + ), + Table("customer", + partitionColumns = Nil, + 'c_custkey.long, + 'c_name.string, + 'c_address.string, + 'c_nationkey.string, + 'c_phone.string, + 'c_acctbal.decimal(12, 2), + 'c_mktsegment.string, + 'c_comment.string + ), + Table("orders", + partitionColumns = Nil, + 'o_orderkey.long, + 'o_custkey.long, + 'o_orderstatus.string, + 'o_totalprice.decimal(12, 2), + 'o_orderdate.date, + 'o_orderpriority.string, + 'o_clerk.string, + 'o_shippriority.int, + 'o_comment.string + ), + Table("lineitem", + partitionColumns = Nil, + 'l_orderkey.long, + 'l_partkey.long, + 'l_suppkey.long, + 'l_linenumber.int, + 'l_quantity.decimal(12, 2), + 'l_extendedprice.decimal(12, 2), + 'l_discount.decimal(12, 2), + 'l_tax.decimal(12, 2), + 'l_returnflag.string, + 'l_linestatus.string, + 'l_shipdate.date, + 'l_commitdate.date, + 'l_receiptdate.date, + 'l_shipinstruct.string, + 'l_shipmode.string, + 'l_comment.string + ), + Table("nation", + partitionColumns = Nil, + 'n_nationkey.long, + 'n_name.string, + 'n_regionkey.long, + 'n_comment.string + ), + Table("region", + partitionColumns = Nil, + 'r_regionkey.long, + 'r_name.string, + 'r_comment.string + ) + ).map(_.convertTypes()) +} + +class TPCH(@transient sqlContext: SQLContext) + extends Benchmark(sqlContext) { + + val queries = (1 to 22).map { q => + val queryContent: String = IOUtils.toString( + getClass().getClassLoader().getResourceAsStream(s"tpch/queries/$q.sql")) + Query(s"Q$q", queryContent, description = "TPCH Query", + executionMode = CollectResults) + } + val queriesMap = queries.map(q => q.name.split("-").get(0) -> q).toMap +}