Update readme and register temp tables.

This commit is contained in:
Yin Huai 2015-08-20 14:50:53 -07:00
parent edb4daba80
commit 97093a45cd
2 changed files with 34 additions and 12 deletions

View File

@ -11,10 +11,19 @@ The rest of document will use TPC-DS benchmark as an example. We will add conten
Before running any query, a dataset needs to be setup by creating a `Benchmark` object.
```
import org.apache.spark.sql.parquet.Tables
import com.databricks.spark.sql.perf.tpcds.Tables
// Tables in TPC-DS benchmark used by experiments.
val tables = Tables(sqlContext)
// dsdgenDir is the location of dsdgen tool installed in your machines.
val tables = Tables(sqlContext, dsdgenDir, scaleFactor)
// Generate data.
tables.genData(location, format, overwrite)
// Create metastore tables in a specified database for your data.
// Once tables are created, the current database will be switched to the specified database.
tables.createExternalTables(location, format, databaseName, overwrite)
// Or, if you want to create temporary tables
tables.createTemporaryTables(location, format)
// Setup TPC-DS experiment
import com.databricks.spark.sql.perf.tpcds.TPCDS
val tpcds = new TPCDS (sqlContext = sqlContext)
```
@ -31,11 +40,9 @@ For every experiment run (i.e.\ every call of `runExperiment`), Spark SQL Perf w
While the experiment is running you can use `experiment.html` to list the status. Once the experiment is complete, the results will be saved to the table sqlPerformance in json.
```
// Get experiments results.
import com.databricks.spark.sql.perf.Results
val results = Results(resultsLocation = <the root location of performance results>, sqlContext = sqlContext)
// Get the DataFrame representing all results stored in the dir specified by resultsLocation.
val allResults = results.allResults
// Use DataFrame API to get results of a single run.
allResults.filter("timestamp = 1429132621024")
// Get all experiments results.
tpcds.createResultsTable()
sqlContext.sql("sqlPerformance")
// Get the result of a particular run by specifying the timestamp of that run.
sqlContext.sql("sqlPerformance").filter("timestamp = 1429132621024")
```

View File

@ -100,17 +100,22 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
withPartitionColumns.save(location)
}
def createExternalTables(location: String, format: String, databaseName: String, overwrite: Boolean): Unit = {
def createExternalTable(location: String, format: String, databaseName: String, overwrite: Boolean): Unit = {
val qualifiedTableName = databaseName + "." + name
val tableExists = sqlContext.tableNames(databaseName).contains(name)
if (overwrite) {
sqlContext.sql(s"DROP TABLE IF EXISTS $databaseName.$name")
}
if (!tableExists || overwrite) {
logInfo(s"Creating external table $name in database $databaseName.")
logInfo(s"Creating external table $name in database $databaseName using data stored in $location.")
sqlContext.createExternalTable(qualifiedTableName, location, format)
}
}
def createTemporaryTable(location: String, format: String): Unit = {
logInfo(s"Creating temporary table $name using data stored in $location.")
sqlContext.read.format(format).load(location).registerTempTable(name)
}
}
def genData(location: String, format: String, overwrite: Boolean): Unit = {
@ -126,7 +131,17 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
tables.foreach { table =>
val tableLocation =
location + File.separator + format + File.separator + "sf" + scaleFactor + File.separator + table.name
table.createExternalTables(tableLocation, format, databaseName, overwrite)
table.createExternalTable(tableLocation, format, databaseName, overwrite)
}
sqlContext.sql(s"USE $databaseName")
logInfo(s"The current database has been set to $databaseName.")
}
def createTemporaryTables(location: String, format: String): Unit = {
tables.foreach { table =>
val tableLocation =
location + File.separator + format + File.separator + "sf" + scaleFactor + File.separator + table.name
table.createTemporaryTable(tableLocation, format)
}
}