Coalesce(n) instead of hardcoded (1) for large tables/partitions

When using `clusterByPartitionColumns`, coalesce into multiple files (instead of a hardcoded 1) when the number of records is larger than `spark.sql.files.maxRecordsPerFile`. This has the cost of a count operation, but enables multiple writers for large scale factors. It improves cluster utilization and reduces total generation time, especially for non-partitioned tables (or tables with few partitions, e.g., TPC-H) at large scale factors. The cost of the count is amortized, since partitioning the data is needed anyway for the multiple writers, skipping a stage.
Additionally, updates the deprecated `registerTempTable` to `createOrReplaceTempView` to avoid warnings.
This commit is contained in:
Nico Poggi 2018-10-16 11:21:05 +02:00 committed by GitHub
parent 3c1c9e9070
commit 0367ff65a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -179,7 +179,7 @@ abstract class Tables(sqlContext: SQLContext, scaleFactor: String,
val data = df(format != "text", numPartitions)
val tempTableName = s"${name}_text"
data.registerTempTable(tempTableName)
data.createOrReplaceTempView(tempTableName)
val writer = if (partitionColumns.nonEmpty) {
if (clusterByPartitionColumns) {
@ -211,9 +211,24 @@ abstract class Tables(sqlContext: SQLContext, scaleFactor: String,
data.write
}
} else {
// treat non-partitioned tables as "one partition" that we want to coalesce
if (clusterByPartitionColumns) {
// treat non-partitioned tables as "one partition" that we want to coalesce
data.coalesce(1).write
// in case data has more than maxRecordsPerFile, split into multiple writers to improve datagen speed
// files will be truncated to maxRecordsPerFile value, so the final result will be the same
val numRows = data.count
val maxRecordPerFile = util.Try(sqlContext.getConf("spark.sql.files.maxRecordsPerFile").toInt).getOrElse(0)
println(s"Data has $numRows rows clustered $clusterByPartitionColumns for $maxRecordPerFile")
log.info(s"Data has $numRows rows clustered $clusterByPartitionColumns for $maxRecordPerFile")
if (maxRecordPerFile > 0 && numRows > maxRecordPerFile) {
val numFiles = ((numRows)/maxRecordPerFile).ceil.toInt
println(s"Coalescing into $numFiles files")
log.info(s"Coalescing into $numFiles files")
data.coalesce(numFiles).write
} else {
data.coalesce(1).write
}
} else {
data.write
}
@ -251,7 +266,7 @@ abstract class Tables(sqlContext: SQLContext, scaleFactor: String,
def createTemporaryTable(location: String, format: String): Unit = {
println(s"Creating temporary table $name using data stored in $location.")
log.info(s"Creating temporary table $name using data stored in $location.")
sqlContext.read.format(format).load(location).registerTempTable(name)
sqlContext.read.format(format).load(location).createOrReplaceTempView(name)
}
def analyzeTable(databaseName: String, analyzeColumns: Boolean = false): Unit = {