diff --git a/src/main/notebooks/performance.dashboard.scala b/src/main/notebooks/performance.dashboard.scala new file mode 100644 index 0000000..3c12d20 --- /dev/null +++ b/src/main/notebooks/performance.dashboard.scala @@ -0,0 +1,255 @@ +// Databricks notebook source exported at Fri, 11 Sep 2015 06:27:08 UTC +import com.databricks.spark.sql.perf._ + +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.expressions._ +import org.apache.spark.sql.types._ + +sqlContext.read.load("/databricks/spark/sql/sqlPerformanceCompact").registerTempTable("sqlPerformanceCompact") + +val timestampWindow = Window.partitionBy("sparkVersion").orderBy($"timestamp".desc) + +val normalizeVersion = udf((v: String) => v.stripSuffix("-SNAPSHOT")) + +class RecentRuns(prefix: String, inputFunction: DataFrame => DataFrame) { + val inputData = table("sqlPerformanceCompact") + .where($"tags.runtype" === "daily") + .withColumn("sparkVersion", normalizeVersion($"configuration.sparkVersion")) + .withColumn("startTime", from_unixtime($"timestamp" / 1000)) + .withColumn("database", $"tags.database") + .withColumn("jobname", $"tags.jobname") + + val recentRuns = inputFunction(inputData) + .withColumn("runId", denseRank().over(timestampWindow)) + .withColumn("runId", -$"runId") + .filter($"runId" >= -10) + + val baseData = recentRuns + .withColumn("result", explode($"results")) + .withColumn("day", concat(month($"startTime"), lit("-"), dayofmonth($"startTime"))) + .withColumn("runtimeSeconds", runtime / 1000) + .withColumn("queryName", $"result.name") + + baseData.registerTempTable(s"${prefix}_baseData") + + val smoothed = baseData + .where($"iteration" !== 1) + .groupBy("runId", "sparkVersion", "database", "queryName") + .agg(callUDF("percentile", $"runtimeSeconds".cast("long"), lit(0.5)).as('medianRuntimeSeconds)) + .orderBy($"runId", $"sparkVersion") + + smoothed.registerTempTable(s"${prefix}_smoothed") +} + +val tpcds1500 = new RecentRuns("tpcds1500", _.filter($"database" === "tpcds1500")) + +// COMMAND ---------- + +display(tpcds1500.recentRuns.select($"runId", $"sparkVersion", $"startTime", $"timestamp").distinct) + +// COMMAND ---------- + +class GeometricMean extends org.apache.spark.sql.expressions.UserDefinedAggregateFunction { + def bufferSchema: StructType = StructType( + StructField("count", LongType) :: + StructField("product", DoubleType) :: Nil + ) + + def dataType: DataType = DoubleType + + def deterministic: Boolean = true + + def evaluate(buffer: org.apache.spark.sql.Row): Any = { + math.pow(buffer.getDouble(1), 1.toDouble / buffer.getLong(0)) + } + + def initialize(buffer: org.apache.spark.sql.expressions.MutableAggregationBuffer): Unit = { + buffer(0) = 0L + buffer(1) = 1.0 + } + + def inputSchema: org.apache.spark.sql.types.StructType = + StructType(StructField("value", DoubleType) :: Nil) + + def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { + buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0) + buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1) + } + + def update(buffer: MutableAggregationBuffer,input: Row): Unit = { + buffer(0) = buffer.getAs[Long](0) + 1 + buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0) + } +} + +val gm = new GeometricMean +sqlContext.udf.register("gm", gm) + +// COMMAND ---------- + +// MAGIC %python spark_versions = ["1.6.0", "1.5.0", "1.4.1"] +// MAGIC +// MAGIC def plot_single(df, column, height = 30): +// MAGIC for v in spark_versions: +// MAGIC runtimes = df[df['sparkVersion'] == v][column] +// MAGIC padded = ([0.0] * (10 - len(runtimes))) + list(runtimes) +// MAGIC plt.plot(range(10), padded) +// MAGIC +// MAGIC plt.xticks(range(10), list(df[df['sparkVersion'] == v]['runId'])) +// MAGIC +// MAGIC plt.legend(spark_versions, loc='lower left') +// MAGIC plt.ylim([0,height]) + +// COMMAND ---------- + +// MAGIC %md ## TPCDS - 20 Queries, Total Runtime (Smoothed to reduce outliers) +// MAGIC - time for each query = median of 4 iterations +// MAGIC - dropping 1 warmup iteration + +// COMMAND ---------- + +// MAGIC %python +// MAGIC import matplotlib.pyplot as plt +// MAGIC import numpy as np +// MAGIC from pyspark.sql.functions import * +// MAGIC +// MAGIC df = table("tpcds1500_smoothed") \ +// MAGIC .groupBy("runId", "sparkVersion") \ +// MAGIC .agg(sum("medianRuntimeSeconds").alias("totalRunTime")) \ +// MAGIC .toPandas() +// MAGIC +// MAGIC fig = plt.figure(figsize=(12, 2)) +// MAGIC plot_single(df, "totalRunTime", height=1000) +// MAGIC display(fig) + +// COMMAND ---------- + +org.apache.spark.SPARK_VERSION + +// COMMAND ---------- + +// MAGIC %python df['sparkVersion'].unique() + +// COMMAND ---------- + +// MAGIC %md ## TPCDS - 20 Queries Geometric Mean +// MAGIC - time for each query = median of 4 iterations +// MAGIC - dropping 1 warmup iteration + +// COMMAND ---------- + +// MAGIC %python +// MAGIC import matplotlib.pyplot as plt +// MAGIC from pyspark.sql.functions import * +// MAGIC +// MAGIC df = table("tpcds1500_smoothed").groupBy("runId", "sparkVersion").agg(expr("gm(medianRuntimeSeconds) AS score")).toPandas() +// MAGIC +// MAGIC fig = plt.figure(figsize=(12, 2)) +// MAGIC plot_single(df, "score") +// MAGIC display(fig) + +// COMMAND ---------- + +// MAGIC %python +// MAGIC import matplotlib.pyplot as plt +// MAGIC from math import ceil, floor +// MAGIC +// MAGIC def plot_queries(df, queries): +// MAGIC num_queries = len(queries) +// MAGIC rows = int(ceil(num_queries / 2)) +// MAGIC +// MAGIC for idx, name in enumerate(queries): +// MAGIC data = df[df['queryName'] == name] +// MAGIC plt.subplot(rows, 2 , idx) +// MAGIC plt.title(name) +// MAGIC +// MAGIC +// MAGIC plot_single(data, 'medianRuntimeSeconds', 200) + +// COMMAND ---------- + +// MAGIC %md # Interactive Queries + +// COMMAND ---------- + +// MAGIC %python +// MAGIC fig = plt.figure(figsize=(15, 20)) +// MAGIC plot_queries(table("tpcds1500_smoothed").toPandas(), ["q19", "q42", "q52", "q55", "q63", "q68", "q73", "q98"]) +// MAGIC display(fig) + +// COMMAND ---------- + +// MAGIC %md #Reporting Queries + +// COMMAND ---------- + +// MAGIC %python +// MAGIC fig = plt.figure(figsize=(15, 20)) +// MAGIC plot_queries(table("tpcds1500_smoothed").toPandas(), ["q3","q7","q27","q43", "q53", "q89"]) +// MAGIC display(fig) + +// COMMAND ---------- + +// MAGIC %md # Deep Analytics + +// COMMAND ---------- + +// MAGIC %python +// MAGIC fig = plt.figure(figsize=(15, 20)) +// MAGIC plot_queries(table("tpcds1500_smoothed").toPandas(), ["q34", "q46", "q59", "q65", "q79", "ss_max"]) +// MAGIC display(fig) + +// COMMAND ---------- + +val joins = new RecentRuns("joins", _.filter($"jobname" === "joins.daily")) + +// COMMAND ---------- + +display(table("joins_smoothed").groupBy("runId", "sparkVersion").agg(sum("medianRuntimeSeconds").alias("runtime"))) + +// COMMAND ---------- + +// MAGIC %python +// MAGIC df = table("joins_smoothed").groupBy("runId", "sparkVersion").agg(sum("medianRuntimeSeconds").alias("runtime")).toPandas() +// MAGIC +// MAGIC fig = plt.figure(figsize=(12, 2)) +// MAGIC plot_single(df, "runtime", 2500) +// MAGIC display(fig) + +// COMMAND ---------- + +// MAGIC %python +// MAGIC df = table("joins_smoothed").groupBy("runId", "sparkVersion").agg(expr("gm(medianRuntimeSeconds) AS score")).toPandas() +// MAGIC +// MAGIC fig = plt.figure(figsize=(12, 2)) +// MAGIC plot_single(df, "score", 200) +// MAGIC display(fig) + +// COMMAND ---------- + +// MAGIC %python +// MAGIC fig = plt.figure(figsize=(15, 20)) +// MAGIC df = table("joins_smoothed").orderBy("queryName").toPandas() +// MAGIC plot_queries(df, list(df['queryName'].unique())) +// MAGIC display(fig) + +// COMMAND ---------- + +// MAGIC %md # Query Plan Breakdown + +// COMMAND ---------- + +val result = tpcds1500.baseData.where($"queryName" === getArgument("queryName", "ss_max")).groupBy("sparkVersion").agg(first($"result.queryExecution")).orderBy("sparkVersion").collect().map { + case Row(version: String, queryExecution: String) => + s""" + |

$version

+ |
$queryExecution
+ """.stripMargin + +}.mkString("
") + +displayHTML(result) + +// COMMAND ---------- +