[CELEBORN-1092] Introduce JVM monitoring in Celeborn Worker using JVMQuake

### What changes were proposed in this pull request?

Introduce JVM monitoring in Celeborn Worker using JVMQuake to enable early detection of memory management issues and facilitate fast failure.

### Why are the changes needed?

When facing out-of-control memory management in Celeborn Worker we typically use JVMkill as a remedy by killing the process and generating a heap dump for post-analysis. However, even with jvmkill protection, we may still encounter issues caused by JVM running out of memory, such as repeated execution of Full GC without performing any useful work during the pause time. Since the JVM does not exhaust 100% of resources, JVMkill will not be triggered. Therefore JVMQuake is introduced to provide more granular monitoring of GC behavior, enabling early detection of memory management issues and facilitating fast failure. Refers to the principle of [jvmquake](https://github.com/Netflix-Skunkworks/jvmquake) which is a JVMTI agent that attaches to your JVM and automatically signals and kills it when the program has become unstable.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`JVMQuakeSuite`

Closes #2061 from SteNicholas/CELEBORN-1092.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
This commit is contained in:
SteNicholas 2023-11-28 20:45:08 +08:00 committed by Fu Chen
parent 6623309237
commit 4dfcd9b56b
25 changed files with 501 additions and 4 deletions

View File

@ -304,6 +304,8 @@ javax.servlet:javax.servlet-api
MIT License
------------
See license/LISENCE-jdktools.txt for details.
com.github.olivergondza:maven-jdk-tools-wrapper
See license/LICENSE-slf4j.txt for details.
org.slf4j:jul-to-slf4j
org.slf4j:slf4j-api

View File

@ -689,6 +689,21 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def workerFetchHeartbeatEnabled: Boolean = get(WORKER_FETCH_HEARTBEAT_ENABLED)
def workerPartitionSplitEnabled: Boolean = get(WORKER_PARTITION_SPLIT_ENABLED)
def workerActiveConnectionMax: Option[Long] = get(WORKER_ACTIVE_CONNECTION_MAX)
def workerJvmQuakeEnabled: Boolean = get(WORKER_JVM_QUAKE_ENABLED)
def workerJvmQuakeCheckInterval: Long = get(WORKER_JVM_QUAKE_CHECK_INTERVAL)
def workerJvmQuakeRuntimeWeight: Double = get(WORKER_JVM_QUAKE_RUNTIME_WEIGHT)
def workerJvmQuakeDumpEnabled: Boolean = get(WORKER_JVM_QUAKE_DUMP_ENABLED)
def workerJvmQuakeDumpPath: String = get(WORKER_JVM_QUAKE_DUMP_PATH)
def workerJvmQuakeDumpThreshold: Duration =
getTimeAsMs(
WORKER_JVM_QUAKE_DUMP_THRESHOLD.key,
WORKER_JVM_QUAKE_DUMP_THRESHOLD.defaultValueString).microsecond
def workerJvmQuakeKillThreshold: Duration =
getTimeAsMs(
WORKER_JVM_QUAKE_KILL_THRESHOLD.key,
WORKER_JVM_QUAKE_KILL_THRESHOLD.defaultValueString).microsecond
def workerJvmQuakeExitCode: Int = get(WORKER_JVM_QUAKE_EXIT_CODE)
// //////////////////////////////////////////////////////
// Metrics System //
@ -2896,6 +2911,77 @@ object CelebornConf extends Logging {
.longConf
.createOptional
val WORKER_JVM_QUAKE_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.jvmQuake.enabled")
.categories("worker")
.version("0.4.0")
.doc("When true, Celeborn worker will start the jvm quake to monitor of gc behavior, " +
"which enables early detection of memory management issues and facilitates fast failure.")
.booleanConf
.createWithDefault(false)
val WORKER_JVM_QUAKE_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.jvmQuake.check.interval")
.categories("worker")
.version("0.4.0")
.doc("Interval of gc behavior checking for worker jvm quake.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")
val WORKER_JVM_QUAKE_RUNTIME_WEIGHT: ConfigEntry[Double] =
buildConf("celeborn.worker.jvmQuake.runtimeWeight")
.categories("worker")
.version("0.4.0")
.doc(
"The factor by which to multiply running JVM time, when weighing it against GCing time. " +
"'Deficit' is accumulated as `gc_time - runtime * runtime_weight`, and is compared against threshold " +
"to determine whether to take action.")
.doubleConf
.createWithDefault(5)
val WORKER_JVM_QUAKE_DUMP_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.worker.jvmQuake.dump.threshold")
.categories("worker")
.version("0.4.0")
.doc("The threshold of heap dump for the maximum GC 'deficit' which can be accumulated before jvmquake takes action. " +
"Meanwhile, there is no heap dump generated when dump threshold is greater than kill threshold.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
val WORKER_JVM_QUAKE_KILL_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.worker.jvmQuake.kill.threshold")
.categories("worker")
.version("0.4.0")
.doc("The threshold of system kill for the maximum GC 'deficit' which can be accumulated before jvmquake takes action.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")
val WORKER_JVM_QUAKE_DUMP_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.jvmQuake.dump.enabled")
.categories("worker")
.version("0.4.0")
.doc("Whether to heap dump for the maximum GC 'deficit' during worker jvm quake.")
.booleanConf
.createWithDefault(true)
val WORKER_JVM_QUAKE_DUMP_PATH: ConfigEntry[String] =
buildConf("celeborn.worker.jvmQuake.dump.path")
.categories("worker")
.version("0.4.0")
.doc("The path of heap dump for the maximum GC 'deficit' during worker jvm quake.")
.stringConf
.transform(_.replace("<tmp>", System.getProperty("java.io.tmpdir"))
.replace("<pid>", Utils.getProcessId))
.createWithDefault(s"<tmp>/jvm-quake/dump/<pid>")
val WORKER_JVM_QUAKE_EXIT_CODE: ConfigEntry[Int] =
buildConf("celeborn.worker.jvmQuake.exitCode")
.categories("worker")
.version("0.4.0")
.doc("The exit code of system kill for the maximum GC 'deficit' during worker jvm quake.")
.intConf
.createWithDefault(502)
val APPLICATION_HEARTBEAT_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.client.application.heartbeatInterval")
.withAlternative("celeborn.application.heartbeatInterval")

View File

@ -1091,4 +1091,6 @@ object Utils extends Logging {
}
labelPart(0).trim -> labelPart(1).trim
}
def getProcessId: String = ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
}

View File

@ -56,9 +56,9 @@ function mvn_build_classpath() {
}
function sbt_build_client_classpath() {
PATTERN="$SBT_PROJECT / Runtime / externalDependencyClasspath"
PATTERN="$SBT_PROJECT / Runtime / managedClasspath"
deps=$(
$SBT -P$MODULE "clean; export Runtime/externalDependencyClasspath" | \
$SBT -P$MODULE "clean; export Runtime/managedClasspath" | \
awk -v pat="$PATTERN" '$0 ~ pat { found=1 } found { print }' | \
awk 'NR==2' | \
tr ":" "\n"
@ -96,8 +96,8 @@ function sbt_build_client_classpath() {
}
function sbt_build_server_classpath() {
$SBT "error; clean; export externalDependencyClasspath" | \
awk '/externalDependencyClasspath/ { found=1 } found { print }' | \
$SBT "error; clean; export managedClasspath" | \
awk '/managedClasspath/ { found=1 } found { print }' | \
awk 'NR % 2 == 0' | \
# This will skip the last line
sed '$d' | \

View File

@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar

View File

@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar

View File

@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar

View File

@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar

View File

@ -122,6 +122,7 @@ kerby-util/1.0.1//kerby-util-1.0.1.jar
kerby-xdr/1.0.1//kerby-xdr-1.0.1.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar

View File

@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.4.0//lz4-java-1.4.0.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar

View File

@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.7.1//lz4-java-1.7.1.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar

View File

@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.7.1//lz4-java-1.7.1.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar

View File

@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.7.1//lz4-java-1.7.1.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar

View File

@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar

View File

@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar

View File

@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar

View File

@ -37,6 +37,7 @@ log4j-api/2.17.2//log4j-api-2.17.2.jar
log4j-core/2.17.2//log4j-core-2.17.2.jar
log4j-slf4j-impl/2.17.2//log4j-slf4j-impl-2.17.2.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar

View File

@ -70,6 +70,14 @@ license: |
| celeborn.worker.graceful.shutdown.timeout | 600s | The worker's graceful shutdown timeout time. | 0.2.0 |
| celeborn.worker.http.host | &lt;localhost&gt; | Worker's http host. | 0.4.0 |
| celeborn.worker.http.port | 9096 | Worker's http port. | 0.4.0 |
| celeborn.worker.jvmQuake.check.interval | 1s | Interval of gc behavior checking for worker jvm quake. | 0.4.0 |
| celeborn.worker.jvmQuake.dump.enabled | true | Whether to heap dump for the maximum GC 'deficit' during worker jvm quake. | 0.4.0 |
| celeborn.worker.jvmQuake.dump.path | &lt;tmp&gt;/jvm-quake/dump/&lt;pid&gt; | The path of heap dump for the maximum GC 'deficit' during worker jvm quake. | 0.4.0 |
| celeborn.worker.jvmQuake.dump.threshold | 30s | The threshold of heap dump for the maximum GC 'deficit' which can be accumulated before jvmquake takes action. Meanwhile, there is no heap dump generated when dump threshold is greater than kill threshold. | 0.4.0 |
| celeborn.worker.jvmQuake.enabled | false | When true, Celeborn worker will start the jvm quake to monitor of gc behavior, which enables early detection of memory management issues and facilitates fast failure. | 0.4.0 |
| celeborn.worker.jvmQuake.exitCode | 502 | The exit code of system kill for the maximum GC 'deficit' during worker jvm quake. | 0.4.0 |
| celeborn.worker.jvmQuake.kill.threshold | 60s | The threshold of system kill for the maximum GC 'deficit' which can be accumulated before jvmquake takes action. | 0.4.0 |
| celeborn.worker.jvmQuake.runtimeWeight | 5.0 | The factor by which to multiply running JVM time, when weighing it against GCing time. 'Deficit' is accumulated as `gc_time - runtime * runtime_weight`, and is compared against threshold to determine whether to take action. | 0.4.0 |
| celeborn.worker.monitor.disk.check.interval | 30s | Intervals between device monitor to check disk. | 0.3.0 |
| celeborn.worker.monitor.disk.check.timeout | 30s | Timeout time for worker check device status. | 0.3.0 |
| celeborn.worker.monitor.disk.checklist | readwrite,diskusage | Monitor type for disk, available items are: iohang, readwrite and diskusage. | 0.2.0 |

View File

@ -0,0 +1,21 @@
The MIT License
Copyright (c) 2016 Red Hat, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@ -122,6 +122,7 @@
<!-- for JDK-17 test-->
<extraJavaTestArgs>-XX:+IgnoreUnrecognizedVMOptions
--add-exports=jdk.internal.jvmstat/sun.jvmstat.monitor=ALL-UNNAMED
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
@ -1156,6 +1157,13 @@
<properties>
<java.version>8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.github.olivergondza</groupId>
<artifactId>maven-jdk-tools-wrapper</artifactId>
<version>0.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Based on https://github.com/google/error-prone/blob/f8e33bc460be82ab22256a7ef8b979d7a2cacaba/docs/installation.md#jdk-8 -->

View File

@ -50,6 +50,7 @@ object Dependencies {
val junitVersion = "4.13.2"
val leveldbJniVersion = "1.8"
val log4j2Version = "2.17.2"
val jdkToolsVersion = "0.1"
val metricsVersion = "3.2.6"
val mockitoVersion = "4.11.0"
val nettyVersion = "4.1.93.Final"
@ -71,6 +72,7 @@ object Dependencies {
val commonsIo = "commons-io" % "commons-io" % commonsIoVersion
val commonsLang3 = "org.apache.commons" % "commons-lang3" % commonsLang3Version
val commonsLogging = "commons-logging" % "commons-logging" % commonsLoggingVersion
val jdkTools = "com.github.olivergondza" % "maven-jdk-tools-wrapper" % jdkToolsVersion
val findbugsJsr305 = "com.google.code.findbugs" % "jsr305" % findbugsVersion
val guava = "com.google.guava" % "guava" % guavaVersion excludeAll(
ExclusionRule("org.checkerframework", "checker-qual"),
@ -167,6 +169,7 @@ object CelebornCommonSettings {
Test / javaOptions ++= Seq(
"-Xmx4g",
"-XX:+IgnoreUnrecognizedVMOptions",
"--add-exports=jdk.internal.jvmstat/sun.jvmstat.monitor=ALL-UNNAMED",
"--add-opens=java.base/java.lang=ALL-UNNAMED",
"--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
"--add-opens=java.base/java.lang.reflect=ALL-UNNAMED",
@ -322,6 +325,7 @@ object CelebornCommon {
Dependencies.commonsLang3,
Dependencies.hadoopClientApi,
Dependencies.hadoopClientRuntime,
Dependencies.jdkTools,
Dependencies.ratisClient,
Dependencies.ratisCommon,
Dependencies.leveldbJniAll,

77
project/JDKTools.scala Normal file
View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.build
import java.io.FileNotFoundException
import sbt._
import sbt.Keys._
import scala.sys.process._
import scala.util.Try
/**
* Represents the main plugin to add tooling settings to projects using libraries from the Scala Debugger project.
*/
object JDKTools extends AutoPlugin {
override def requires = plugins.JvmPlugin
override def trigger = allRequirements
override def projectSettings: Seq[Def.Setting[_]] = inConfig(Compile)(settings)
lazy val settings = baseScalaDebuggerToolsSettings
lazy val baseScalaDebuggerToolsSettings: Seq[Def.Setting[_]] =
if (System.getProperty("java.specification.version").startsWith("1."))
Seq(
// JDK Dependency (just for sbt, must exist on classpath for execution, cannot be redistributed)
unmanagedJars += {
Attributed.blank(JavaTools)
}
)
else
// on Java 9+, we don't need to do anything at all
Seq()
//
// NOTE: Taken from Ensime Server project (when under BSD 3-clause)
// https://github.com/ensime/ensime-server/blob/master/project/EnsimeBuild.scala
//
// WORKAROUND: https://github.com/typelevel/scala/issues/75
lazy val JavaTools: File = List(
// manual
sys.env.get("JAVA_HOME"),
sys.env.get("JDK_HOME"),
// osx
Try("/usr/libexec/java_home".!!).toOption,
// fallback
// sys.props.get("java.home") returns jre home for JDK8
sys.props.get("java.home").map(new File(_).getParent),
sys.props.get("java.home")
).flatten.map { n =>
new File(n.trim + "/lib/tools.jar")
}.find(_.exists()).getOrElse(
throw new FileNotFoundException(
"""
|Could not automatically find the JDK/lib/tools.jar.
|You must explicitly set JDK_HOME or JAVA_HOME.
""".stripMargin)
)
}

View File

@ -51,6 +51,7 @@ import org.apache.celeborn.service.deploy.worker.WorkerSource.ACTIVE_CONNECTION_
import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
import org.apache.celeborn.service.deploy.worker.memory.{ChannelsLimiter, MemoryManager}
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingState
import org.apache.celeborn.service.deploy.worker.monitor.JVMQuake
import org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter, StorageManager}
private[celeborn] class Worker(
@ -275,6 +276,12 @@ private[celeborn] class Worker(
private val userResourceConsumptions =
JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, Long)]()
private var jvmQuake: JVMQuake = _
if (conf.workerJvmQuakeEnabled) {
jvmQuake = JVMQuake.create(conf, workerInfo.toUniqueId().replace(":", "-"))
jvmQuake.start()
}
workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () =>
workerInfo.getShuffleKeySet.size
}
@ -431,6 +438,9 @@ private[celeborn] class Worker(
if (!stopped) {
logInfo("Stopping Worker.")
if (jvmQuake != null) {
jvmQuake.stop()
}
if (sendHeartbeatTask != null) {
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
sendHeartbeatTask.cancel(false)

View File

@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.service.deploy.worker.monitor
import java.io.File
import java.lang.management.ManagementFactory
import java.nio.file.Files
import java.util.UUID
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import com.google.common.annotations.VisibleForTesting
import com.sun.management.HotSpotDiagnosticMXBean
import sun.jvmstat.monitor._
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.util.{ThreadUtils, Utils}
/**
* The JVM quake provides granular monitoring of GC behavior, which enables early detection of memory management
* issues and facilitates fast failure.
*
* Note: The principle is in alignment with GC instability detection algorithm for jvmquake project of Netflix:
* https://github.com/Netflix-Skunkworks/jvmquake.
*
* @param conf Celeborn configuration with jvm quake config.
*/
class JVMQuake(conf: CelebornConf, uniqueId: String = UUID.randomUUID().toString) extends Logging {
import JVMQuake._
val dumpFile = s"worker-quake-heapdump-$uniqueId.hprof"
var heapDumped: Boolean = false
private[this] val enabled = conf.workerJvmQuakeEnabled
private[this] val checkInterval = conf.workerJvmQuakeCheckInterval
private[this] val runtimeWeight = conf.workerJvmQuakeRuntimeWeight
private[this] val dumpThreshold = conf.workerJvmQuakeDumpThreshold.toNanos
private[this] val killThreshold = conf.workerJvmQuakeKillThreshold.toNanos
private[this] val dumpEnabled = conf.workerJvmQuakeDumpEnabled
private[this] val dumpPath = conf.workerJvmQuakeDumpPath
private[this] val exitCode = conf.workerJvmQuakeExitCode
private[this] var lastExitTime: Long = 0L
private[this] var lastGCTime: Long = 0L
private[this] var bucket: Long = 0L
private[this] var scheduler: ScheduledExecutorService = _
def start(): Unit = {
if (enabled) {
lastExitTime = getLastExitTime
lastGCTime = getLastGCTime
scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("jvm-quake")
scheduler.scheduleWithFixedDelay(
new Runnable() {
override def run(): Unit = {
JVMQuake.this.run()
}
},
0,
checkInterval,
TimeUnit.MILLISECONDS)
}
}
def stop(): Unit = {
if (enabled) {
scheduler.shutdown()
}
}
private def run(): Unit = {
val currentExitTime = getLastExitTime
val currentGCTime = getLastGCTime
val gcTime = currentGCTime - lastGCTime
val runTime = currentExitTime - lastExitTime - gcTime
bucket = Math.max(0, bucket + gcTime - BigDecimal(runTime * runtimeWeight).toLong)
logDebug(s"Time: (gc time: ${Utils.msDurationToString(gcTime)}, execution time: ${Utils.msDurationToString(runTime)})")
logDebug(
s"Capacity: (bucket: $bucket, dump threshold: $dumpThreshold, kill threshold: $killThreshold)")
if (bucket > dumpThreshold) {
logError(s"JVM GC has reached the threshold: bucket: $bucket, dumpThreshold: $dumpThreshold.")
if (shouldHeapDump) {
val savePath = getHeapDumpSavePath
val linkPath = getHeapDumpLinkPath
heapDump(savePath, linkPath)
} else if (bucket > killThreshold) {
logError(s"Exit JVM with $exitCode. JVM GC has reached the threshold: bucket: $bucket, killThreshold: $killThreshold.")
System.exit(exitCode)
}
}
lastExitTime = currentExitTime
lastGCTime = currentGCTime
}
def shouldHeapDump: Boolean = {
dumpEnabled && !heapDumped
}
@VisibleForTesting
def getHeapDumpSavePath: String =
dumpPath
@VisibleForTesting
def getHeapDumpLinkPath: String =
s"${new File(dumpPath).getParent}/link/${Utils.getProcessId}"
private def heapDump(savePath: String, linkPath: String, live: Boolean = false): Unit = {
val saveDir = new File(savePath)
if (!saveDir.exists()) {
saveDir.mkdirs()
}
val heapDump = new File(saveDir, dumpFile)
if (heapDump.exists()) {
// Each worker process only generates one heap dump. Skip when heap dump of worker already exists.
logWarning(s"Skip because heap dump of worker already exists: $heapDump.")
heapDumped = true
return
}
logInfo(s"Starting heap dump: $heapDump.")
ManagementFactory.newPlatformMXBeanProxy(
ManagementFactory.getPlatformMBeanServer,
"com.sun.management:type=HotSpotDiagnostic",
classOf[HotSpotDiagnosticMXBean]).dumpHeap(heapDump.getAbsolutePath, live)
val linkDir = new File(linkPath)
if (linkDir.exists()) {
// Each worker process only generates one heap dump. Skip when symbolic link of heap dump exists.
logWarning(s"Skip because symbolic link of heap dump exists: $linkPath.")
} else if (!linkDir.getParentFile.exists()) {
linkDir.getParentFile.mkdirs()
}
try {
Files.createSymbolicLink(linkDir.toPath, saveDir.toPath)
logInfo(s"Created symbolic link: $linkPath.")
} catch {
case e: Exception =>
logError("Create symbolic link failed.", e)
} finally {
heapDumped = true
logInfo(s"Finished heap dump: $dumpFile.")
}
}
}
object JVMQuake {
private[this] var quake: JVMQuake = _
def create(conf: CelebornConf, uniqueId: String): JVMQuake = {
set(new JVMQuake(conf, uniqueId))
quake
}
def get: JVMQuake = {
quake
}
def set(quake: JVMQuake): Unit = {
this.quake = quake
}
private[this] lazy val monitoredVm: MonitoredVm = {
val host = MonitoredHost.getMonitoredHost(new HostIdentifier("localhost"))
host.getMonitoredVm(new VmIdentifier("local://%s@localhost".format(Utils.getProcessId)))
}
private[this] lazy val ygcExitTimeMonitor: Monitor =
monitoredVm.findByName("sun.gc.collector.0.lastExitTime")
private[this] lazy val fgcExitTimeMonitor: Monitor =
monitoredVm.findByName("sun.gc.collector.1.lastExitTime")
private[this] lazy val ygcTimeMonitor: Monitor = monitoredVm.findByName("sun.gc.collector.0.time")
private[this] lazy val fgcTimeMonitor: Monitor = monitoredVm.findByName("sun.gc.collector.1.time")
private def getLastExitTime: Long = Math.max(
ygcExitTimeMonitor.getValue.asInstanceOf[Long],
fgcExitTimeMonitor.getValue.asInstanceOf[Long])
private def getLastGCTime: Long =
ygcTimeMonitor.getValue.asInstanceOf[Long] + fgcTimeMonitor.getValue.asInstanceOf[Long]
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.service.deploy.worker.monitor
import java.io.File
import scala.collection.mutable.ArrayBuffer
import org.junit.Assert.assertTrue
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.CelebornConf._
import org.apache.celeborn.common.util.JavaUtils
class JVMQuakeSuite extends CelebornFunSuite {
private val allocation = new ArrayBuffer[Array[Byte]]()
override def afterEach(): Unit = {
allocation.clear()
System.gc()
}
test("[CELEBORN-1092] Introduce JVM monitoring in Celeborn Worker using JVMQuake") {
val quake = new JVMQuake(new CelebornConf().set(WORKER_JVM_QUAKE_ENABLED.key, "true")
.set(WORKER_JVM_QUAKE_RUNTIME_WEIGHT.key, "1")
.set(WORKER_JVM_QUAKE_DUMP_THRESHOLD.key, "1s")
.set(WORKER_JVM_QUAKE_KILL_THRESHOLD.key, "2s"))
quake.start()
allocateMemory(quake)
quake.stop()
assertTrue(quake.heapDumped)
val heapDump = new File(s"${quake.getHeapDumpSavePath}/${quake.dumpFile}")
assert(heapDump.exists())
JavaUtils.deleteRecursively(heapDump)
JavaUtils.deleteRecursively(new File(quake.getHeapDumpLinkPath))
}
def allocateMemory(quake: JVMQuake): Unit = {
val capacity = 1024 * 100
while (allocation.size * capacity < Runtime.getRuntime.maxMemory / 4) {
val bytes = new Array[Byte](capacity)
allocation.append(bytes)
}
while (quake.shouldHeapDump) {
for (index <- allocation.indices) {
val bytes = new Array[Byte](capacity)
allocation(index) = bytes
}
}
}
}