Initial port.

This commit is contained in:
Yin Huai 2015-04-15 20:03:14 -07:00
parent e81669ab3b
commit 930751810e
18 changed files with 3228 additions and 0 deletions

19
.gitignore vendored Normal file
View File

@ -0,0 +1,19 @@
*.DS_Store
*.class
*.log
*.pyc
sbt/*.jar
.idea
.idea_modules
# sbt specific
build/*.jar
.cache/
.history/
.lib/
dist/*
target/
lib_managed/
src_managed/
project/boot/
project/plugins/project/

87
LICENSE Normal file
View File

@ -0,0 +1,87 @@
Apache License, Version 2.0
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files.
"Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work.
2. Grant of Copyright License.
Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form.
3. Grant of Patent License.
Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed.
4. Redistribution.
You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions:
You must give any other recipients of the Work or Derivative Works a copy of this License; and
You must cause any modified files to carry prominent notices stating that You changed the files; and
You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and
If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License.
You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License.
5. Submission of Contributions.
Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions.
6. Trademarks.
This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty.
Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License.
8. Limitation of Liability.
In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability.
While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work
To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,42 @@
## Spark SQL Performance Tests (WIP)
### TPC-DS
```
import com.databricks.spark.sql.perf.tpcds.TPCDS
import org.apache.spark.sql.parquet.Tables
// Tables used for TPC-DS.
val tables = Tables(sqlContext)
// Setup TPC-DS experiment
val tpcds =
new TPCDS (
sqlContext = sqlContext,
sparkVersion = "1.3.1",
dataLocation = <the location of data>,
dsdgenDir = <the location of dsdgen in every worker>,
resultsLocation = <the location of performance results>,
tables = tables.tables,
scaleFactor = "2",
collectResults = true)
tpcds.setupExperiment()
// Take a look at the size of every table.
tpcds.allStats.show
// Get all of the queries.
import com.databricks.spark.sql.perf.tpcds.Queries
// Just pick a single query as an example.
val oneQuery = Seq(Queries.q7Derived.head)
// Start the experiment.
val runningExp = tpcds.runExperiment(queries = oneQuery, iterations = 1)
// Get experiments results.
import com.databricks.spark.sql.perf.Results
val results = Results(resultsLocation = <the location of performance results>, sqlContext = sqlContext)
// This is all results.
val allResults = results.allResults
allResults.registerTempTable("results")
// This is the result for a single experiment started at the timestamp represented by 1429132621024 (2015-04-15 14:17:01.024).
allResults.filter("timestamp = 1429132621024")
```

24
build.sbt Normal file
View File

@ -0,0 +1,24 @@
// Your sbt build file. Guides on how to write one can be found at
// http://www.scala-sbt.org/0.13/docs/index.html
scalaVersion := "2.10.4"
sparkVersion := "1.3.0"
sparkPackageName := "databricks/spark-sql-perf"
// Don't forget to set the version
version := "0.0.1-SNAPSHOT"
// All Spark Packages need a license
licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0"))
// Add Spark components this package depends on, e.g, "mllib", ....
sparkComponents ++= Seq("sql", "hive")
// uncomment and change the value below to change the directory where your zip artifact will be created
// spDistDirectory := target.value
// add any sparkPackageDependencies using sparkPackageDependencies.
// e.g. sparkPackageDependencies += "databricks/spark-avro:0.1"

106
build/sbt Executable file
View File

@ -0,0 +1,106 @@
#!/usr/bin/env bash
# When creating new tests for Spark SQL Hive, the HADOOP_CLASSPATH must contain the hive jars so
# that we can run Hive to generate the golden answer. This is not required for normal development
# or testing.
for i in $HIVE_HOME/lib/*
do HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$i
done
export HADOOP_CLASSPATH
realpath () {
(
TARGET_FILE=$1
cd $(dirname $TARGET_FILE)
TARGET_FILE=$(basename $TARGET_FILE)
COUNT=0
while [ -L "$TARGET_FILE" -a $COUNT -lt 100 ]
do
TARGET_FILE=$(readlink $TARGET_FILE)
cd $(dirname $TARGET_FILE)
TARGET_FILE=$(basename $TARGET_FILE)
COUNT=$(($COUNT + 1))
done
echo $(pwd -P)/$TARGET_FILE
)
}
. $(dirname $(realpath $0))/sbt-launch-lib.bash
declare -r noshare_opts="-Dsbt.global.base=project/.sbtboot -Dsbt.boot.directory=project/.boot -Dsbt.ivy.home=project/.ivy"
declare -r sbt_opts_file=".sbtopts"
declare -r etc_sbt_opts_file="/etc/sbt/sbtopts"
usage() {
cat <<EOM
Usage: $script_name [options]
-h | -help print this message
-v | -verbose this runner is chattier
-d | -debug set sbt log level to debug
-no-colors disable ANSI color codes
-sbt-create start sbt even if current directory contains no sbt project
-sbt-dir <path> path to global settings/plugins directory (default: ~/.sbt)
-sbt-boot <path> path to shared boot directory (default: ~/.sbt/boot in 0.11 series)
-ivy <path> path to local Ivy repository (default: ~/.ivy2)
-mem <integer> set memory options (default: $sbt_mem, which is $(get_mem_opts $sbt_mem))
-no-share use all local caches; no sharing
-no-global uses global caches, but does not use global ~/.sbt directory.
-jvm-debug <port> Turn on JVM debugging, open at the given port.
-batch Disable interactive mode
# sbt version (default: from project/build.properties if present, else latest release)
-sbt-version <version> use the specified version of sbt
-sbt-jar <path> use the specified jar as the sbt launcher
-sbt-rc use an RC version of sbt
-sbt-snapshot use a snapshot version of sbt
# java version (default: java from PATH, currently $(java -version 2>&1 | grep version))
-java-home <path> alternate JAVA_HOME
# jvm options and output control
JAVA_OPTS environment variable, if unset uses "$java_opts"
SBT_OPTS environment variable, if unset uses "$default_sbt_opts"
.sbtopts if this file exists in the current directory, it is
prepended to the runner args
/etc/sbt/sbtopts if this file exists, it is prepended to the runner args
-Dkey=val pass -Dkey=val directly to the java runtime
-J-X pass option -X directly to the java runtime
(-J is stripped)
-S-X add -X to sbt's scalacOptions (-J is stripped)
-PmavenProfiles Enable a maven profile for the build.
In the case of duplicated or conflicting options, the order above
shows precedence: JAVA_OPTS lowest, command line options highest.
EOM
}
process_my_args () {
while [[ $# -gt 0 ]]; do
case "$1" in
-no-colors) addJava "-Dsbt.log.noformat=true" && shift ;;
-no-share) addJava "$noshare_opts" && shift ;;
-no-global) addJava "-Dsbt.global.base=$(pwd)/project/.sbtboot" && shift ;;
-sbt-boot) require_arg path "$1" "$2" && addJava "-Dsbt.boot.directory=$2" && shift 2 ;;
-sbt-dir) require_arg path "$1" "$2" && addJava "-Dsbt.global.base=$2" && shift 2 ;;
-debug-inc) addJava "-Dxsbt.inc.debug=true" && shift ;;
-batch) exec </dev/null && shift ;;
-sbt-create) sbt_create=true && shift ;;
*) addResidual "$1" && shift ;;
esac
done
# Now, ensure sbt version is used.
[[ "${sbt_version}XXX" != "XXX" ]] && addJava "-Dsbt.version=$sbt_version"
}
loadConfigFile() {
cat "$1" | sed '/^\#/d'
}
# if sbtopts files exist, prepend their contents to $@ so it can be processed by this runner
[[ -f "$etc_sbt_opts_file" ]] && set -- $(loadConfigFile "$etc_sbt_opts_file") "$@"
[[ -f "$sbt_opts_file" ]] && set -- $(loadConfigFile "$sbt_opts_file") "$@"
run "$@"

194
build/sbt-launch-lib.bash Executable file
View File

@ -0,0 +1,194 @@
#!/usr/bin/env bash
#
# A library to simplify using the SBT launcher from other packages.
# Note: This should be used by tools like giter8/conscript etc.
# TODO - Should we merge the main SBT script with this library?
if test -z "$HOME"; then
declare -r script_dir="$(dirname $script_path)"
else
declare -r script_dir="$HOME/.sbt"
fi
declare -a residual_args
declare -a java_args
declare -a scalac_args
declare -a sbt_commands
declare -a maven_profiles
if test -x "$JAVA_HOME/bin/java"; then
echo -e "Using $JAVA_HOME as default JAVA_HOME."
echo "Note, this will be overridden by -java-home if it is set."
declare java_cmd="$JAVA_HOME/bin/java"
else
declare java_cmd=java
fi
echoerr () {
echo 1>&2 "$@"
}
vlog () {
[[ $verbose || $debug ]] && echoerr "$@"
}
dlog () {
[[ $debug ]] && echoerr "$@"
}
acquire_sbt_jar () {
SBT_VERSION=`awk -F "=" '/sbt\\.version/ {print $2}' ./project/build.properties`
URL1=http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
URL2=http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
JAR=build/sbt-launch-${SBT_VERSION}.jar
sbt_jar=$JAR
if [[ ! -f "$sbt_jar" ]]; then
# Download sbt launch jar if it hasn't been downloaded yet
if [ ! -f ${JAR} ]; then
# Download
printf "Attempting to fetch sbt\n"
JAR_DL=${JAR}.part
if hash curl 2>/dev/null; then
(curl --progress-bar ${URL1} > ${JAR_DL} || curl --progress-bar ${URL2} > ${JAR_DL}) && mv ${JAR_DL} ${JAR}
elif hash wget 2>/dev/null; then
(wget --progress=bar ${URL1} -O ${JAR_DL} || wget --progress=bar ${URL2} -O ${JAR_DL}) && mv ${JAR_DL} ${JAR}
else
printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n"
exit -1
fi
fi
if [ ! -f ${JAR} ]; then
# We failed to download
printf "Our attempt to download sbt locally to ${JAR} failed. Please install sbt manually from http://www.scala-sbt.org/\n"
exit -1
fi
printf "Launching sbt from ${JAR}\n"
fi
}
execRunner () {
# print the arguments one to a line, quoting any containing spaces
[[ $verbose || $debug ]] && echo "# Executing command line:" && {
for arg; do
if printf "%s\n" "$arg" | grep -q ' '; then
printf "\"%s\"\n" "$arg"
else
printf "%s\n" "$arg"
fi
done
echo ""
}
exec "$@"
}
addJava () {
dlog "[addJava] arg = '$1'"
java_args=( "${java_args[@]}" "$1" )
}
enableProfile () {
dlog "[enableProfile] arg = '$1'"
maven_profiles=( "${maven_profiles[@]}" "$1" )
export SBT_MAVEN_PROFILES="${maven_profiles[@]}"
}
addSbt () {
dlog "[addSbt] arg = '$1'"
sbt_commands=( "${sbt_commands[@]}" "$1" )
}
addResidual () {
dlog "[residual] arg = '$1'"
residual_args=( "${residual_args[@]}" "$1" )
}
addDebugger () {
addJava "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=$1"
}
# a ham-fisted attempt to move some memory settings in concert
# so they need not be dicked around with individually.
get_mem_opts () {
local mem=${1:-2048}
local perm=$(( $mem / 4 ))
(( $perm > 256 )) || perm=256
(( $perm < 4096 )) || perm=4096
local codecache=$(( $perm / 2 ))
echo "-Xms${mem}m -Xmx${mem}m -XX:MaxPermSize=${perm}m -XX:ReservedCodeCacheSize=${codecache}m"
}
require_arg () {
local type="$1"
local opt="$2"
local arg="$3"
if [[ -z "$arg" ]] || [[ "${arg:0:1}" == "-" ]]; then
die "$opt requires <$type> argument"
fi
}
is_function_defined() {
declare -f "$1" > /dev/null
}
process_args () {
while [[ $# -gt 0 ]]; do
case "$1" in
-h|-help) usage; exit 1 ;;
-v|-verbose) verbose=1 && shift ;;
-d|-debug) debug=1 && shift ;;
-ivy) require_arg path "$1" "$2" && addJava "-Dsbt.ivy.home=$2" && shift 2 ;;
-mem) require_arg integer "$1" "$2" && sbt_mem="$2" && shift 2 ;;
-jvm-debug) require_arg port "$1" "$2" && addDebugger $2 && shift 2 ;;
-batch) exec </dev/null && shift ;;
-sbt-jar) require_arg path "$1" "$2" && sbt_jar="$2" && shift 2 ;;
-sbt-version) require_arg version "$1" "$2" && sbt_version="$2" && shift 2 ;;
-java-home) require_arg path "$1" "$2" && java_cmd="$2/bin/java" && export JAVA_HOME=$2 && shift 2 ;;
-D*) addJava "$1" && shift ;;
-J*) addJava "${1:2}" && shift ;;
-P*) enableProfile "$1" && shift ;;
*) addResidual "$1" && shift ;;
esac
done
is_function_defined process_my_args && {
myargs=("${residual_args[@]}")
residual_args=()
process_my_args "${myargs[@]}"
}
}
run() {
# no jar? download it.
[[ -f "$sbt_jar" ]] || acquire_sbt_jar "$sbt_version" || {
# still no jar? uh-oh.
echo "Download failed. Obtain the sbt-launch.jar manually and place it at $sbt_jar"
exit 1
}
# process the combined args, then reset "$@" to the residuals
process_args "$@"
set -- "${residual_args[@]}"
argumentCount=$#
# run sbt
execRunner "$java_cmd" \
${SBT_OPTS:-$default_sbt_opts} \
$(get_mem_opts $sbt_mem) \
${java_opts} \
${java_args[@]} \
-jar "$sbt_jar" \
"${sbt_commands[@]}" \
"${residual_args[@]}"
}
runAlternateBoot() {
local bootpropsfile="$1"
shift
addJava "-Dsbt.boot.properties=$bootpropsfile"
run $@
}

2
project/build.properties Normal file
View File

@ -0,0 +1,2 @@
// This file should only contain the version of sbt to use.
sbt.version=0.13.6

9
project/plugins.sbt Normal file
View File

@ -0,0 +1,9 @@
// You may use this file to add plugin dependencies for sbt.
resolvers += "Spark Packages repo" at "https://dl.bintray.com/spark-packages/maven/"
resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/"
addSbtPlugin("org.spark-packages" %% "sbt-spark-package" % "0.1.1")
addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.6.0")

View File

@ -0,0 +1,50 @@
/*
* Copyright 2015 Databricks Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.databricks.spark.sql.perf.bigdata
import com.databricks.spark.sql.perf._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.parquet.TPCDSTableForTest
import org.apache.spark.sql.{Column, SQLContext}
class BigData (
@transient sqlContext: SQLContext,
sparkVersion: String,
dataLocation: String,
resultsLocation: String,
tables: Seq[Table],
scaleFactor: String,
collectResults: Boolean)
extends Experiment(
sqlContext,
sparkVersion,
dataLocation,
resultsLocation,
tables,
scaleFactor,
collectResults) with Serializable {
import sqlContext._
import sqlContext.implicits._
override val experiment = "bigDataBenchmark"
override def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] = {
tables.map(table =>
BigDataTableForTest(table, dataLocation, scaleFactor, sqlContext))
}
}

View File

@ -0,0 +1,128 @@
/*
* Copyright 2015 Databricks Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.databricks.spark.sql.perf.bigdata
import com.databricks.spark.sql.perf.Query
object Queries {
val queries1to3 = Seq(
Query("q1A",
"""
|SELECT
| pageURL,
| pageRank
|FROM rankings
|WHERE
| pageRank > 1000
""".stripMargin),
Query("q1B",
"""
|SELECT
| pageURL,
| pageRank
|FROM rankings
|WHERE
| pageRank > 100
""".stripMargin),
Query("q1C",
"""
|SELECT
| pageURL,
| pageRank
|FROM rankings
|WHERE
| pageRank > 10
""".stripMargin),
Query("q2A",
"""
|SELECT
| SUBSTR(sourceIP, 1, 8),
| SUM(adRevenue)
|FROM uservisits
|GROUP BY
| SUBSTR(sourceIP, 1, 8)
""".stripMargin),
Query("q2B",
"""
|SELECT
| SUBSTR(sourceIP, 1, 10),
| SUM(adRevenue)
|FROM uservisits
|GROUP BY
| SUBSTR(sourceIP, 1, 10)
""".stripMargin),
Query("q2C",
"""
|SELECT
| SUBSTR(sourceIP, 1, 12),
| SUM(adRevenue)
|FROM uservisits
|GROUP BY
| SUBSTR(sourceIP, 1, 12)
""".stripMargin),
Query("q3A",
"""
|SELECT sourceIP, totalRevenue, avgPageRank
|FROM
| (SELECT sourceIP,
| AVG(pageRank) as avgPageRank,
| SUM(adRevenue) as totalRevenue
| FROM Rankings AS R, UserVisits AS UV
| WHERE R.pageURL = UV.destURL
| AND UV.visitDate > "1980-01-01"
| AND UV.visitDate < "1980-04-01"
| GROUP BY UV.sourceIP) tmp
|ORDER BY totalRevenue DESC LIMIT 1
""".stripMargin),
Query("q3B",
"""
|SELECT sourceIP, totalRevenue, avgPageRank
|FROM
| (SELECT sourceIP,
| AVG(pageRank) as avgPageRank,
| SUM(adRevenue) as totalRevenue
| FROM Rankings AS R, UserVisits AS UV
| WHERE R.pageURL = UV.destURL
| AND UV.visitDate > "1980-01-01"
| AND UV.visitDate < "1983-01-01"
| GROUP BY UV.sourceIP) tmp
|ORDER BY totalRevenue DESC LIMIT 1
""".stripMargin),
Query("q3C",
"""
|SELECT sourceIP, totalRevenue, avgPageRank
|FROM
| (SELECT sourceIP,
| AVG(pageRank) as avgPageRank,
| SUM(adRevenue) as totalRevenue
| FROM Rankings AS R, UserVisits AS UV
| WHERE R.pageURL = UV.destURL
| AND UV.visitDate > "1980-01-01"
| AND UV.visitDate < "2010-01-01"
| GROUP BY UV.sourceIP) tmp
|ORDER BY totalRevenue DESC LIMIT 1
""".stripMargin)
)
}

View File

@ -0,0 +1,77 @@
/*
* Copyright 2015 Databricks Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.databricks.spark.sql.perf.bigdata
// This is a hack until parquet has better support for partitioning.
import java.text.SimpleDateFormat
import java.util.Date
import com.databricks.spark.sql.perf._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.{Job, OutputCommitter, RecordWriter, TaskAttemptContext}
import org.apache.spark.SerializableWritable
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Column, ColumnName, SQLContext}
import parquet.hadoop.ParquetOutputFormat
import parquet.hadoop.util.ContextUtil
import scala.sys.process._
case class BigDataTableForTest(
table: Table,
baseDir: String,
scaleFactor: String,
@transient sqlContext: SQLContext)
extends TableForTest(table, baseDir, sqlContext) with Serializable {
@transient val sparkContext = sqlContext.sparkContext
override def generate(): Unit = ???
}
case class Tables(sqlContext: SQLContext) {
import sqlContext.implicits._
val tables = Seq(
Table("rankings",
UnpartitionedTable,
'pageURL .string,
'pageRank .int,
'avgDuration .int),
Table("uservisits",
UnpartitionedTable,
'sourceIP .string,
'destURL .string,
'visitDate .string,
'adRevenue .double,
'userAgent .string,
'countryCode .string,
'languageCode .string,
'searchWord .string,
'duration .int),
Table("documents",
UnpartitionedTable,
'line .string)
)
}

View File

@ -0,0 +1,25 @@
/*
* Copyright 2015 Databricks Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.databricks.spark.sql.perf
import org.apache.spark.sql.SQLContext
case class Results(resultsLocation: String, @transient sqlContext: SQLContext) {
def allResults =
sqlContext.jsonRDD(
sqlContext.sparkContext.textFile(s"$resultsLocation/*/"))
}

View File

@ -0,0 +1,78 @@
/*
* Copyright 2015 Databricks Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.databricks.spark.sql.perf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
case class Query(name: String, sqlText: String)
case class QueryForTest(
query: Query,
collectResults: Boolean,
@transient sqlContext: SQLContext) {
@transient val sparkContext = sqlContext.sparkContext
val name = query.name
def dataFrame = sqlContext.sql(query.sqlText)
def joinTypes = dataFrame.queryExecution.executedPlan.collect {
case k if k.nodeName contains "Join" => k.nodeName
}
val tablesInvolved = dataFrame.queryExecution.logical collect {
case UnresolvedRelation(tableIdentifier, _) => {
// We are ignoring the database name.
tableIdentifier.last
}
}
def benchmarkMs[A](f: => A): Double = {
val startTime = System.nanoTime()
val ret = f
val endTime = System.nanoTime()
(endTime - startTime).toDouble / 1000000
}
def benchmark(description: String = "") = {
try {
sparkContext.setJobDescription(s"Query: ${query.name}, $description")
val queryExecution = dataFrame.queryExecution
// We are not counting the time of ScalaReflection.convertRowToScala.
val execution = if (collectResults) {
benchmarkMs { queryExecution.toRdd.map(_.copy()).collect() }
} else {
benchmarkMs { queryExecution.toRdd.map(_.copy()).foreach {row => Unit } }
}
BenchmarkResult(
name = query.name,
joinTypes = joinTypes,
tables = tablesInvolved,
parsingTime = benchmarkMs { queryExecution.logical },
analysisTime = benchmarkMs { queryExecution.analyzed },
optimizationTime = benchmarkMs { queryExecution.optimizedPlan },
planningTime = benchmarkMs { queryExecution.executedPlan },
executionTime = execution)
} catch {
case e: Exception =>
throw new RuntimeException(
s"Failed to benchmark query ${query.name}\n${dataFrame.queryExecution}", e)
}
}
}

View File

@ -0,0 +1,213 @@
/*
* Copyright 2015 Databricks Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.databricks.spark.sql.perf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.sql.SQLContext
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
case class BenchmarkConfiguration(
sparkVersion: String,
scaleFactor: String,
useDecimal: Boolean,
sqlConf: Map[String, String],
sparkConf: Map[String,String],
cores: Int,
collectResults: Boolean)
case class BenchmarkResult(
name: String,
joinTypes: Seq[String],
tables: Seq[String],
parsingTime: Double,
analysisTime: Double,
optimizationTime: Double,
planningTime: Double,
executionTime: Double)
case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit)
case class ExperimentRun(
timestamp: Long,
experiment: String,
iteration: Int,
tags: Map[String, String],
configuration: BenchmarkConfiguration,
results: Seq[BenchmarkResult])
case class Benchmark(tables: Seq[Table])
abstract class Experiment(
@transient sqlContext: SQLContext,
sparkVersion: String,
dataLocation: String,
resultsLocation: String,
tables: Seq[Table],
scaleFactor: String,
collectResults: Boolean) extends Serializable {
val experiment: String
@transient val sparkContext = sqlContext.sparkContext
def createTablesForTest(tables: Seq[Table]): Seq[TableForTest]
val tablesForTest: Seq[TableForTest] = createTablesForTest(tables)
def checkData(): Unit = {
tablesForTest.foreach { table =>
val fs = FileSystem.get(new java.net.URI(table.outputDir), new Configuration())
val exists = fs.exists(new Path(table.outputDir))
val wasSuccessful = fs.exists(new Path(s"${table.outputDir}/_SUCCESS"))
if (!wasSuccessful) {
if (exists) {
println(s"Table '${table.name}' not generated successfully, regenerating.")
} else {
println(s"Table '${table.name}' does not exist, generating.")
}
fs.delete(new Path(table.outputDir), true)
table.generate()
} else {
println(s"Table ${table.name} already exists.")
}
}
}
def allStats = tablesForTest.map(_.stats).reduceLeft(_.unionAll(_))
def setupExperiment(): Unit = {
checkData()
tablesForTest.foreach(_.createTempTable())
}
def currentConfiguration = BenchmarkConfiguration(
sparkVersion = sparkVersion,
scaleFactor = scaleFactor,
useDecimal = true,
sqlConf = sqlContext.getAllConfs,
sparkConf = sparkContext.getConf.getAll.toMap,
cores = sparkContext.defaultMinPartitions,
collectResults = collectResults)
def runExperiment(
queries: Seq[Query],
iterations: Int = 3,
variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("")) { _ => {} }),
tags: Map[String, String] = Map.empty) = {
val queriesToRun = queries.map(query => QueryForTest(query, collectResults, sqlContext))
class ExperimentStatus {
val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]()
val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]()
val currentMessages = new collection.mutable.ArrayBuffer[String]()
@volatile
var currentQuery = ""
def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match {
case Nil => List(Nil)
case h :: t => for(xh <- h; xt <- cartesianProduct(t)) yield xh :: xt
}
val timestamp = System.currentTimeMillis()
val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList)
val resultsFuture = future {
val results = (1 to iterations).flatMap { i =>
combinations.map { setup =>
val currentOptions = variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map {
case (v, idx) =>
v.setup(v.options(idx))
v.name -> v.options(idx).toString
}
val result = ExperimentRun(
timestamp = timestamp,
experiment = experiment,
iteration = i,
tags = currentOptions.toMap ++ tags,
configuration = currentConfiguration,
queriesToRun.flatMap { q =>
val setup = s"iteration: $i, ${currentOptions.map { case (k, v) => s"$k=$v"}.mkString(", ")}"
currentMessages += s"Running query ${q.name} $setup"
currentQuery = q.name
val singleResult = try q.benchmark(setup) :: Nil catch {
case e: Exception =>
currentMessages += s"Failed to run query ${q.name}: $e"
Nil
}
currentResults ++= singleResult
singleResult
})
currentRuns += result
result
}
}
val resultsTable = sqlContext.createDataFrame(results)
currentMessages += s"Results stored to: $resultsLocation/$timestamp"
resultsTable.toJSON.coalesce(1).saveAsTextFile(s"$resultsLocation/$timestamp")
resultsTable
}
/** Returns results from an actively running experiment. */
def getCurrentResults() = {
val tbl = sqlContext.createDataFrame(currentResults)
tbl.registerTempTable("currentResults")
tbl
}
/** Returns full iterations from an actively running experiment. */
def getCurrentRuns() = {
val tbl = sqlContext.createDataFrame(currentRuns)
tbl.registerTempTable("currentRuns")
tbl
}
def tail(n: Int = 5) = {
currentMessages.takeRight(n).mkString("\n")
}
def status =
if (resultsFuture.isCompleted) {
if (resultsFuture.value.get.isFailure) "Failed" else "Successful"
} else {
"Running"
}
override def toString =
s"""
|=== $status Experiment ===
|Permalink: table("allResults").where('timestamp === ${timestamp}L)
|Queries: ${queriesToRun.map(_.name).map(n => if(n == currentQuery) s"|$n|" else n).mkString(" ")}
|Iterations complete: ${currentRuns.size / combinations.size} / $iterations
|Queries run: ${currentResults.size} / ${iterations * combinations.size * queriesToRun.size}
|Run time: ${(System.currentTimeMillis() - timestamp) / 1000}s
|
|== Logs ==
|${tail()}
""".stripMargin
}
new ExperimentStatus
}
}

View File

@ -0,0 +1,72 @@
/*
* Copyright 2015 Databricks Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.databricks.spark.sql.perf
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext, RecordWriter, Job}
import org.apache.spark.SerializableWritable
import org.apache.spark.sql.{SQLContext, Column}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveMetastoreTypes
import org.apache.spark.sql.types._
import parquet.hadoop.ParquetOutputFormat
import parquet.hadoop.util.ContextUtil
abstract class TableType
case object UnpartitionedTable extends TableType
case class PartitionedTable(partitionColumn: String) extends TableType
case class Table(name: String, tableType: TableType, fields: StructField*)
abstract class TableForTest(
table: Table,
baseDir: String,
@transient sqlContext: SQLContext) extends Serializable {
val schema = StructType(table.fields)
val name = table.name
val outputDir = s"$baseDir/parquet/${name}"
def fromCatalog = sqlContext.table(name)
def stats =
fromCatalog.select(
lit(name) as "tableName",
count("*") as "numRows",
lit(fromCatalog.queryExecution.optimizedPlan.statistics.sizeInBytes.toLong) as "sizeInBytes")
def createTempTable(): Unit = {
sqlContext.sql(
s"""
|CREATE TEMPORARY TABLE ${name}
|USING org.apache.spark.sql.parquet
|OPTIONS (
| path '${outputDir}'
|)
""".stripMargin)
}
def generate(): Unit
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,72 @@
/*
* Copyright 2015 Databricks Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.databricks.spark.sql.perf.tpcds
import com.databricks.spark.sql.perf._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.parquet.TPCDSTableForTest
import org.apache.spark.sql.{Column, SQLContext}
class TPCDS (
@transient sqlContext: SQLContext,
sparkVersion: String,
dataLocation: String,
dsdgenDir: String,
resultsLocation: String,
tables: Seq[Table],
scaleFactor: String,
collectResults: Boolean)
extends Experiment(
sqlContext,
sparkVersion,
dataLocation,
resultsLocation,
tables,
scaleFactor,
collectResults) with Serializable {
import sqlContext._
import sqlContext.implicits._
override val experiment = "tpcds"
def baseDir = s"$dataLocation/scaleFactor=$scaleFactor/useDecimal=true"
override def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] = {
tables.map(table =>
TPCDSTableForTest(table, baseDir, scaleFactor.toInt, dsdgenDir, sqlContext))
}
override def setupExperiment(): Unit = {
super.setupExperiment()
setupBroadcast()
}
def setupBroadcast(skipTables: Seq[String] = Seq("store_sales", "customer")) = {
val skipExpr = skipTables.map(t => !('tableName === t)).reduceLeft[Column](_ && _)
val threshold =
allStats
.where(skipExpr)
.select(max('sizeInBytes))
.first()
.getLong(0)
val setQuery = s"SET spark.sql.autoBroadcastJoinThreshold=$threshold"
println(setQuery)
sql(setQuery)
}
}

View File

@ -0,0 +1,410 @@
/*
* Copyright 2015 Databricks Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.parquet // This is a hack until parquet has better support for partitioning.
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import scala.sys.process._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import com.databricks.spark.sql.perf._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext, RecordWriter, Job}
import org.apache.spark.SerializableWritable
import org.apache.spark.sql.{Column, ColumnName, SQLContext}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import parquet.hadoop.ParquetOutputFormat
import parquet.hadoop.util.ContextUtil
case class TPCDSTableForTest(
table: Table,
baseDir: String,
scaleFactor: Int,
dsdgenDir: String,
@transient sqlContext: SQLContext,
maxRowsPerPartitions: Int = 20 * 1000 * 1000)
extends TableForTest(table, baseDir, sqlContext) with Serializable with SparkHadoopMapReduceUtil {
@transient val sparkContext = sqlContext.sparkContext
val dsdgen = s"$dsdgenDir/dsdgen"
override def generate(): Unit = {
val partitions = table.tableType match {
case PartitionedTable(_) => scaleFactor
case _ => 1
}
val generatedData = {
sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
val localToolsDir = if (new java.io.File(dsdgen).exists) {
dsdgenDir
} else if (new java.io.File(s"/$dsdgen").exists) {
s"/$dsdgenDir"
} else {
sys.error(s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install")
}
val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
val commands = Seq(
"bash", "-c",
s"cd $localToolsDir && ./dsdgen -table ${table.name} -filter Y -scale $scaleFactor $parallel")
println(commands)
commands.lines
}
}
generatedData.setName(s"${table.name}, sf=$scaleFactor, strings")
val rows = generatedData.mapPartitions { iter =>
val currentRow = new GenericMutableRow(schema.fields.size)
iter.map { l =>
(0 until schema.fields.length).foreach(currentRow.setNullAt)
l.split("\\|", -1).zipWithIndex.dropRight(1).foreach { case (f, i) => currentRow(i) = f}
currentRow: Row
}
}
val stringData =
sqlContext.createDataFrame(
rows,
StructType(schema.fields.map(f => StructField(f.name, StringType))))
val convertedData = {
val columns = schema.fields.map { f =>
val columnName = new ColumnName(f.name)
columnName.cast(f.dataType).as(f.name)
}
stringData.select(columns: _*)
}
table.tableType match {
// This is an awful hack... spark sql parquet should support this natively.
case PartitionedTable(partitioningColumn) =>
sqlContext.setConf("spark.sql.planner.externalSort", "true")
val output = convertedData.queryExecution.analyzed.output
val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
val writeSupport =
if (schema.fields.map(_.dataType).forall(_.isPrimitive)) {
classOf[org.apache.spark.sql.parquet.MutableRowWriteSupport]
} else {
classOf[org.apache.spark.sql.parquet.RowWriteSupport]
}
ParquetOutputFormat.setWriteSupportClass(job, writeSupport)
val conf = new SerializableWritable(ContextUtil.getConfiguration(job))
org.apache.spark.sql.parquet.RowWriteSupport.setSchema(schema.toAttributes, conf.value)
val partColumnAttr =
BindReferences.bindReference[Expression](
output.find(_.name == partitioningColumn).get,
output)
// TODO: clusterBy would be faster than orderBy
val orderedConvertedData =
convertedData.filter(new Column(partitioningColumn) isNotNull).orderBy(Column(partitioningColumn) asc)
orderedConvertedData.queryExecution.toRdd.foreachPartition { iter =>
var writer: RecordWriter[Void, Row] = null
val getPartition = new InterpretedMutableProjection(Seq(partColumnAttr))
var currentPartition: Row = null
var hadoopContext: TaskAttemptContext = null
var committer: OutputCommitter = null
var rowCount = 0
var partition = 0
while (iter.hasNext) {
val currentRow = iter.next()
rowCount += 1
if (rowCount >= maxRowsPerPartitions) {
rowCount = 0
partition += 1
println(s"Starting partition $partition")
if (writer != null) {
writer.close(hadoopContext)
committer.commitTask(hadoopContext)
}
writer = null
}
if ((getPartition(currentRow) != currentPartition || writer == null) &&
!getPartition.currentValue.isNullAt(0)) {
rowCount = 0
currentPartition = getPartition.currentValue.copy()
if (writer != null) {
writer.close(hadoopContext)
committer.commitTask(hadoopContext)
}
val job = new Job(conf.value)
val keyType = classOf[Void]
job.setOutputKeyClass(keyType)
job.setOutputValueClass(classOf[Row])
NewFileOutputFormat.setOutputPath(
job,
new Path(s"$outputDir/$partitioningColumn=${currentPartition(0)}"))
val wrappedConf = new SerializableWritable(job.getConfiguration)
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = partition
val attemptNumber = 1
/* "reduce task" <split #> <attempt # = spark task #> */
val attemptId = newTaskAttemptID(jobtrackerID, partition, isMap = false, partition, attemptNumber)
hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
val format = new ParquetOutputFormat[Row]
committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)
writer = format.getRecordWriter(hadoopContext)
}
if (!getPartition.currentValue.isNullAt(0)) {
writer.write(null, currentRow)
}
}
if (writer != null) {
writer.close(hadoopContext)
committer.commitTask(hadoopContext)
}
}
val fs = FileSystem.get(new java.net.URI(outputDir), new Configuration())
fs.create(new Path(s"$outputDir/_SUCCESS")).close()
case _ => convertedData.saveAsParquetFile(outputDir)
}
}
}
case class Tables(sqlContext: SQLContext) {
import sqlContext.implicits._
val tables = Seq(
/* This is another large table that we don't build yet.
Table("inventory",
PartitionedTable("inv_date_sk"),
'inv_date_sk .int,
'inv_item_sk .int,
'inv_warehouse_sk .int,
'inv_quantity_on_hand .int),*/
Table("store_sales",
PartitionedTable("ss_sold_date_sk"),
'ss_sold_date_sk .int,
'ss_sold_time_sk .int,
'ss_item_sk .int,
'ss_customer_sk .int,
'ss_cdemo_sk .int,
'ss_hdemo_sk .int,
'ss_addr_sk .int,
'ss_store_sk .int,
'ss_promo_sk .int,
'ss_ticket_number .int,
'ss_quantity .int,
'ss_wholesale_cost .decimal(7,2),
'ss_list_price .decimal(7,2),
'ss_sales_price .decimal(7,2),
'ss_ext_discount_amt .decimal(7,2),
'ss_ext_sales_price .decimal(7,2),
'ss_ext_wholesale_cost.decimal(7,2),
'ss_ext_list_price .decimal(7,2),
'ss_ext_tax .decimal(7,2),
'ss_coupon_amt .decimal(7,2),
'ss_net_paid .decimal(7,2),
'ss_net_paid_inc_tax .decimal(7,2),
'ss_net_profit .decimal(7,2)),
Table("customer",
UnpartitionedTable,
'c_customer_sk .int,
'c_customer_id .string,
'c_current_cdemo_sk .int,
'c_current_hdemo_sk .int,
'c_current_addr_sk .int,
'c_first_shipto_date_sk .int,
'c_first_sales_date_sk .int,
'c_salutation .string,
'c_first_name .string,
'c_last_name .string,
'c_preferred_cust_flag .string,
'c_birth_day .int,
'c_birth_month .int,
'c_birth_year .int,
'c_birth_country .string,
'c_login .string,
'c_email_address .string,
'c_last_review_date .string),
Table("customer_address",
UnpartitionedTable,
'ca_address_sk .int,
'ca_address_id .string,
'ca_street_number .string,
'ca_street_name .string,
'ca_street_type .string,
'ca_suite_number .string,
'ca_city .string,
'ca_county .string,
'ca_state .string,
'ca_zip .string,
'ca_country .string,
'ca_gmt_offset .decimal(5,2),
'ca_location_type .string),
Table("customer_demographics",
UnpartitionedTable,
'cd_demo_sk .int,
'cd_gender .string,
'cd_marital_status .string,
'cd_education_status .string,
'cd_purchase_estimate .int,
'cd_credit_rating .string,
'cd_dep_count .int,
'cd_dep_employed_count .int,
'cd_dep_college_count .int),
Table("date_dim",
UnpartitionedTable,
'd_date_sk .int,
'd_date_id .string,
'd_date .string,
'd_month_seq .int,
'd_week_seq .int,
'd_quarter_seq .int,
'd_year .int,
'd_dow .int,
'd_moy .int,
'd_dom .int,
'd_qoy .int,
'd_fy_year .int,
'd_fy_quarter_seq .int,
'd_fy_week_seq .int,
'd_day_name .string,
'd_quarter_name .string,
'd_holiday .string,
'd_weekend .string,
'd_following_holiday .string,
'd_first_dom .int,
'd_last_dom .int,
'd_same_day_ly .int,
'd_same_day_lq .int,
'd_current_day .string,
'd_current_week .string,
'd_current_month .string,
'd_current_quarter .string,
'd_current_year .string),
Table("household_demographics",
UnpartitionedTable,
'hd_demo_sk .int,
'hd_income_band_sk .int,
'hd_buy_potential .string,
'hd_dep_count .int,
'hd_vehicle_count .int),
Table("item",
UnpartitionedTable,
'i_item_sk .int,
'i_item_id .string,
'i_rec_start_date .string,
'i_rec_end_date .string,
'i_item_desc .string,
'i_current_price .decimal(7,2),
'i_wholesale_cost .decimal(7,2),
'i_brand_id .int,
'i_brand .string,
'i_class_id .int,
'i_class .string,
'i_category_id .int,
'i_category .string,
'i_manufact_id .int,
'i_manufact .string,
'i_size .string,
'i_formulation .string,
'i_color .string,
'i_units .string,
'i_container .string,
'i_manager_id .int,
'i_product_name .string),
Table("promotion",
UnpartitionedTable,
'p_promo_sk .int,
'p_promo_id .string,
'p_start_date_sk .int,
'p_end_date_sk .int,
'p_item_sk .int,
'p_cost .decimal(15,2),
'p_response_target .int,
'p_promo_name .string,
'p_channel_dmail .string,
'p_channel_email .string,
'p_channel_catalog .string,
'p_channel_tv .string,
'p_channel_radio .string,
'p_channel_press .string,
'p_channel_event .string,
'p_channel_demo .string,
'p_channel_details .string,
'p_purpose .string,
'p_discount_active .string),
Table("store",
UnpartitionedTable,
's_store_sk .int,
's_store_id .string,
's_rec_start_date .string,
's_rec_end_date .string,
's_closed_date_sk .int,
's_store_name .string,
's_number_employees .int,
's_floor_space .int,
's_hours .string,
's_manager .string,
's_market_id .int,
's_geography_class .string,
's_market_desc .string,
's_market_manager .string,
's_division_id .int,
's_division_name .string,
's_company_id .int,
's_company_name .string,
's_street_number .string,
's_street_name .string,
's_street_type .string,
's_suite_number .string,
's_city .string,
's_county .string,
's_state .string,
's_zip .string,
's_country .string,
's_gmt_offset .decimal(5,2),
's_tax_precentage .decimal(5,2)),
Table("time_dim",
UnpartitionedTable,
't_time_sk .int,
't_time_id .string,
't_time .int,
't_hour .int,
't_minute .int,
't_second .int,
't_am_pm .string,
't_shift .string,
't_sub_shift .string,
't_meal_time .string))
}