From 70da4f490e78496b0f7fa2b7a67f8aaa47bbf322 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Sat, 16 May 2015 19:31:55 -0700 Subject: [PATCH] Move dataframe into benchmark. --- .../com/databricks/spark/sql/perf/query.scala | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/src/main/scala/com/databricks/spark/sql/perf/query.scala b/src/main/scala/com/databricks/spark/sql/perf/query.scala index d83fbb8..c7191a3 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/query.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/query.scala @@ -29,19 +29,6 @@ case class QueryForTest( val name = query.name - def dataFrame = sqlContext.sql(query.sqlText) - - def joinTypes = dataFrame.queryExecution.executedPlan.collect { - case k if k.nodeName contains "Join" => k.nodeName - } - - val tablesInvolved = dataFrame.queryExecution.logical collect { - case UnresolvedRelation(tableIdentifier, _) => { - // We are ignoring the database name. - tableIdentifier.last - } - } - def benchmarkMs[A](f: => A): Double = { val startTime = System.nanoTime() val ret = f @@ -52,6 +39,7 @@ case class QueryForTest( def benchmark(description: String = "") = { try { sparkContext.setJobDescription(s"Query: ${query.name}, $description") + val dataFrame = sqlContext.sql(query.sqlText) val queryExecution = dataFrame.queryExecution // We are not counting the time of ScalaReflection.convertRowToScala. val parsingTime = benchmarkMs { queryExecution.logical } @@ -78,6 +66,17 @@ case class QueryForTest( benchmarkMs { dataFrame.rdd.foreach {row => Unit } } } + val joinTypes = dataFrame.queryExecution.executedPlan.collect { + case k if k.nodeName contains "Join" => k.nodeName + } + + val tablesInvolved = dataFrame.queryExecution.logical collect { + case UnresolvedRelation(tableIdentifier, _) => { + // We are ignoring the database name. + tableIdentifier.last + } + } + BenchmarkResult( name = query.name, joinTypes = joinTypes, @@ -91,7 +90,7 @@ case class QueryForTest( } catch { case e: Exception => throw new RuntimeException( - s"Failed to benchmark query ${query.name}\n${dataFrame.queryExecution}", e) + s"Failed to benchmark query ${query.name}", e) } } }