From 00aa49e8e4ced6221a8ebf0b2de396639e1ccbf3 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Thu, 20 Aug 2015 16:46:12 -0700 Subject: [PATCH] Add support for CPU Profiling. --- build.sbt | 15 ++- .../databricks/spark/sql/perf/Benchmark.scala | 25 +++- .../spark/sql/perf/CpuProfile.scala | 125 ++++++++++++++++++ 3 files changed, 162 insertions(+), 3 deletions(-) create mode 100644 src/main/scala/com/databricks/spark/sql/perf/CpuProfile.scala diff --git a/build.sbt b/build.sbt index cbc582a..679e1a1 100644 --- a/build.sbt +++ b/build.sbt @@ -10,6 +10,17 @@ version := "0.0.4-SNAPSHOT" // All Spark Packages need a license licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0")) -sparkVersion := "1.4.0" +sparkVersion := "1.4.1" -sparkComponents ++= Seq("sql", "hive") \ No newline at end of file +sparkComponents ++= Seq("sql", "hive") + +initialCommands in console := + """ + |import org.apache.spark.sql.hive.test.TestHive + |import TestHive.implicits + |import TestHive.sql + """.stripMargin + +libraryDependencies += "com.twitter" %% "util-jvm" % "6.23.0" % "provided" + +libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.1" % "test" \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala index c73c8b2..ff4d645 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -25,7 +25,8 @@ import scala.concurrent.ExecutionContext.Implicits.global import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.functions._ + +import com.databricks.spark.sql.perf.cpu._ /** * A collection of queries that test a particular aspect of Spark SQL. 
@@ -204,6 +205,28 @@ abstract class Benchmark( } } + def scheduleCpuCollection(fs: FS) = resultsFuture.onComplete { _ => + currentMessages += s"Begining CPU log collection" + try { + val location = cpu.collectLogs(sqlContext, fs, timestamp) + currentMessages += s"cpu results recorded to $location" + } catch { + case e: Throwable => + currentMessages += s"Error collecting logs: $e" + throw e + } + } + + def cpuProfile = new Profile(sqlContext, sqlContext.read.json(getCpuLocation(timestamp))) + + def cpuProfileHtml(fs: FS) = { + s""" + |
<h2>CPU Profile</h2>
+ |Permalink: sqlContext.read.json("${getCpuLocation(timestamp)}")
+ |${cpuProfile.buildGraph(fs)} + """.stripMargin + } + /** Waits for the finish of the experiment. */ def waitForFinish(timeoutInSeconds: Int) = { Await.result(resultsFuture, timeoutInSeconds.seconds) diff --git a/src/main/scala/com/databricks/spark/sql/perf/CpuProfile.scala b/src/main/scala/com/databricks/spark/sql/perf/CpuProfile.scala new file mode 100644 index 0000000..fbb7278 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/CpuProfile.scala @@ -0,0 +1,125 @@ +/* + * Copyright 2015 Databricks Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databricks.spark.sql.perf + +import java.io.{FileOutputStream, File} + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql.{DataFrame, SQLContext, Row} +import org.apache.spark.sql.functions._ + +import scala.language.reflectiveCalls +import scala.sys.process._ + +import org.apache.hadoop.fs.{FileSystem, Path} + +import com.twitter.jvm.CpuProfile + +/** + * A collection of utilities for parsing stacktraces that have been recorded in JSON and generating visualizations + * on where time is being spent. 
+ */ +package object cpu { + + type FS = { + def cp(from: String, to: String, recurse: Boolean): Boolean + def rm(dir: String, recurse: Boolean): Boolean + } + + private val resultsLocation = "/spark/sql/cpu" + + lazy val pprof = { + run( + "sudo apt-get install -y graphviz", + "cp /dbfs/home/michael/pprof ./", + "chmod 755 pprof") + + "./pprof" + } + + def getCpuLocation(timestamp: Long) = s"$resultsLocation/timestamp=$timestamp" + + def collectLogs(sqlContext: SQLContext, fs: FS, timestamp: Long): String = { + import sqlContext.implicits._ + + def sc = sqlContext.sparkContext + + def copyLogFiles() = { + val path = "pwd".!!.trim + val hostname = "hostname".!!.trim + + val conf = new Configuration() + val fs = FileSystem.get(conf) + fs.copyFromLocalFile(new Path(s"$path/logs/cpu.json"), new Path(s"$resultsLocation/timestamp=$timestamp/$hostname")) + } + + fs.rm(getCpuLocation(timestamp), true) + + copyLogFiles() + sc.parallelize((1 to 100)).foreach { i => copyLogFiles() } + getCpuLocation(timestamp) + } + + def run(cmds: String*) = { + val output = new StringBuilder + + def append(line: String): Unit = output.synchronized { + println(line) + output.append(line) + output.append("\n") + } + + val processLogger = ProcessLogger(append, append) + + val exitCode = Seq("/bin/bash", "-c", cmds.mkString(" && ")) ! 
processLogger + + (exitCode, output.toString()) + } + + class Profile(private val sqlContext: SQLContext, cpuLogs: DataFrame) { + import sqlContext.implicits._ + + def hosts = cpuLogs.select($"tags.hostName").distinct.collect().map(_.getString(0)) + + def buildGraph(fs: FS) = { + val stackLine = """(.*)\.([^\(]+)\(([^:]+)(:{0,1}\d*)\)""".r + def toStackElement(s: String) = s match { + case stackLine(cls, method, file, "") => new StackTraceElement(cls, method, file, 0) + case stackLine(cls, method, file, line) => new StackTraceElement(cls, method, file, line.stripPrefix(":").toInt) + } + + val counts = cpuLogs.groupBy($"stack").agg(count($"*")).collect().map { + case Row(stackLines: Seq[String], count: Long) => stackLines.map(toStackElement) -> count + }.toMap + val profile = new com.twitter.jvm.CpuProfile(counts, com.twitter.util.Duration.fromSeconds(10), cpuLogs.count().toInt, 0) + + val outfile = File.createTempFile("cpu", "profile") + val svgFile = File.createTempFile("cpu", "svg") + + profile.writeGoogleProfile(new FileOutputStream(outfile)) + + println(run( + "cp /dbfs/home/michael/pprof ./", + "chmod 755 pprof", + s"$pprof --svg ${outfile.getCanonicalPath} > ${svgFile.getCanonicalPath}")) + + val timestamp = System.currentTimeMillis() + fs.cp(s"file://$svgFile", s"/FileStore/cpu.profiles/$timestamp.svg", false) + s"""CPU Usage Visualization""" + } + } +} \ No newline at end of file