Move DataFrame creation, along with joinTypes and tablesInvolved, into the benchmark method.

This commit is contained in:
Yin Huai 2015-05-16 19:31:55 -07:00
parent 9156e14f4b
commit 70da4f490e

View File

@@ -29,19 +29,6 @@ case class QueryForTest(
val name = query.name
def dataFrame = sqlContext.sql(query.sqlText)
def joinTypes = dataFrame.queryExecution.executedPlan.collect {
case k if k.nodeName contains "Join" => k.nodeName
}
val tablesInvolved = dataFrame.queryExecution.logical collect {
case UnresolvedRelation(tableIdentifier, _) => {
// We are ignoring the database name.
tableIdentifier.last
}
}
def benchmarkMs[A](f: => A): Double = {
val startTime = System.nanoTime()
val ret = f
@@ -52,6 +39,7 @@ case class QueryForTest(
def benchmark(description: String = "") = {
try {
sparkContext.setJobDescription(s"Query: ${query.name}, $description")
val dataFrame = sqlContext.sql(query.sqlText)
val queryExecution = dataFrame.queryExecution
// We are not counting the time of ScalaReflection.convertRowToScala.
val parsingTime = benchmarkMs { queryExecution.logical }
@@ -78,6 +66,17 @@ case class QueryForTest(
benchmarkMs { dataFrame.rdd.foreach {row => Unit } }
}
val joinTypes = dataFrame.queryExecution.executedPlan.collect {
case k if k.nodeName contains "Join" => k.nodeName
}
val tablesInvolved = dataFrame.queryExecution.logical collect {
case UnresolvedRelation(tableIdentifier, _) => {
// We are ignoring the database name.
tableIdentifier.last
}
}
BenchmarkResult(
name = query.name,
joinTypes = joinTypes,
@@ -91,7 +90,7 @@ case class QueryForTest(
} catch {
case e: Exception =>
throw new RuntimeException(
s"Failed to benchmark query ${query.name}\n${dataFrame.queryExecution}", e)
s"Failed to benchmark query ${query.name}", e)
}
}
}