[CELEBORN-2045] Add logger sinks to allow persist metrics data and avoid possible worker OOM

### What changes were proposed in this pull request?
1. Add a new sink and allow the user to store metrics to files.
2. Celeborn will scrape its metrics periodically to make sure that the metric data won't be too large to cause OOM.

### Why are the changes needed?
A long-running worker ran out of memory and found out that the metrics are huge in the heap dump.
As you can see below, the biggest object is the time metric queue, and I got 1.6 million records.
<img width="1516" alt="Screenshot 2025-06-24 at 09 59 30" src="https://github.com/user-attachments/assets/691c7bc2-b974-4cc0-8d5a-bf626ab903c0" />
<img width="1239" alt="Screenshot 2025-06-24 at 14 45 10" src="https://github.com/user-attachments/assets/ebdf5a4d-c941-4f1e-911f-647aa156b37a" />

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
Cluster.

Closes #3346 from FMX/b2045.

Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Co-authored-by: Ethan Feng <ethanfeng@apache.org>
Co-authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
mingji 2025-06-26 18:42:20 -07:00 committed by Wang, Fei
parent 0fc7827ab8
commit 7a0eee332a
11 changed files with 258 additions and 2 deletions

View File

@ -900,6 +900,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
get(METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD) get(METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD)
def metricsJsonPrettyEnabled: Boolean = get(METRICS_JSON_PRETTY_ENABLED) def metricsJsonPrettyEnabled: Boolean = get(METRICS_JSON_PRETTY_ENABLED)
def metricsWorkerAppLevelEnabled: Boolean = get(METRICS_WORKER_APP_LEVEL_ENABLED) def metricsWorkerAppLevelEnabled: Boolean = get(METRICS_WORKER_APP_LEVEL_ENABLED)
def metricsLoggerSinkScrapeInterval: Long = get(METRICS_LOGGERSINK_SCRAPE_INTERVAL)
def metricsLoggerSinkScrapeOutputEnabled: Boolean = get(METRICS_LOGGERSINK_SCRAPE_OUTPUT_ENABLED)
// ////////////////////////////////////////////////////// // //////////////////////////////////////////////////////
// Quota // // Quota //
@ -5637,6 +5639,28 @@ object CelebornConf extends Logging {
.booleanConf .booleanConf
.createWithDefault(true) .createWithDefault(true)
// Interval at which LoggerSink drains each source's accumulated metrics.
// Draining bounds the in-memory timer-metric queues, which otherwise can grow
// until the worker runs out of memory (see CELEBORN-2045).
val METRICS_LOGGERSINK_SCRAPE_INTERVAL: ConfigEntry[Long] =
  buildConf("celeborn.metrics.loggerSink.scrape.interval")
    .categories("metrics")
    .version("0.6.0")
    .doc("The interval at which the logger sink scrapes its own metrics. " +
      "This config takes effect only if the logger sink is enabled. " +
      "If you do not scrape metrics periodically with an external collector, " +
      "add `org.apache.celeborn.common.metrics.sink.LoggerSink` to metrics.properties.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("30min")
// Controls whether LoggerSink also logs the scraped metrics text. Scraping
// (and therefore queue cleanup) happens regardless of this flag; this only
// toggles the log output.
val METRICS_LOGGERSINK_SCRAPE_OUTPUT_ENABLED: ConfigEntry[Boolean] =
  buildConf("celeborn.metrics.loggerSink.output.enabled")
    .categories("metrics")
    .version("0.6.0")
    .doc("Whether to output scraped metrics to the logger. " +
      "This config takes effect only if the logger sink is enabled. " +
      "If you do not scrape metrics periodically with an external collector, " +
      "add `org.apache.celeborn.common.metrics.sink.LoggerSink` to metrics.properties.")
    .booleanConf
    .createWithDefault(false)
val IDENTITY_PROVIDER: ConfigEntry[String] = val IDENTITY_PROVIDER: ConfigEntry[String] =
buildConf("celeborn.identity.provider") buildConf("celeborn.identity.provider")
.withAlternative("celeborn.quota.identity.provider") .withAlternative("celeborn.quota.identity.provider")

View File

@ -627,6 +627,8 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
sum sum
} }
// Use this method to get metrics; note that it also clears the timeMetrics queue.
// Do not use the LogReporter to report metrics.
override def getMetrics: String = { override def getMetrics: String = {
var leftMetricsNum = metricsCapacity var leftMetricsNum = metricsCapacity
val sb = new mutable.StringBuilder val sb = new mutable.StringBuilder

View File

@ -83,6 +83,23 @@
</Delete> </Delete>
</DefaultRolloverStrategy> </DefaultRolloverStrategy>
</RollingRandomAccessFile> </RollingRandomAccessFile>
<RollingRandomAccessFile name="metricsAuditFile" fileName="${env:CELEBORN_LOG_DIR}/audit/metrics-audit.log"
filePattern="${env:CELEBORN_LOG_DIR}/audit/metrics-audit.log.%d-%i">
<PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} %p [%t] %c{1}: %m%n%ex"/>
<Policies>
<SizeBasedTriggeringPolicy size="200 MB"/>
</Policies>
<DefaultRolloverStrategy max="7">
<Delete basePath="${env:CELEBORN_LOG_DIR}/audit" maxDepth="1">
<IfFileName glob="metrics-audit.log*">
<IfAny>
<IfAccumulatedFileSize exceeds="1 GB"/>
<IfAccumulatedFileCount exceeds="10"/>
</IfAny>
</IfFileName>
</Delete>
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
</Appenders> </Appenders>
<Loggers> <Loggers>
@ -107,5 +124,8 @@
<Logger name="org.apache.celeborn.service.deploy.master.audit.ShuffleAuditLogger" level="INFO" additivity="false"> <Logger name="org.apache.celeborn.service.deploy.master.audit.ShuffleAuditLogger" level="INFO" additivity="false">
<Appender-ref ref="shuffleAuditFile" level="INFO"/> <Appender-ref ref="shuffleAuditFile" level="INFO"/>
</Logger> </Logger>
<Logger name="org.apache.celeborn.common.metrics.sink.LoggerSink" level="INFO" additivity="false">
<Appender-ref ref="metricsAuditFile" level="INFO"/>
</Logger>
</Loggers> </Loggers>
</Configuration> </Configuration>

View File

@ -17,3 +17,4 @@
*.sink.prometheusServlet.class=org.apache.celeborn.common.metrics.sink.PrometheusServlet *.sink.prometheusServlet.class=org.apache.celeborn.common.metrics.sink.PrometheusServlet
*.sink.jsonServlet.class=org.apache.celeborn.common.metrics.sink.JsonServlet *.sink.jsonServlet.class=org.apache.celeborn.common.metrics.sink.JsonServlet
*.sink.loggerSink.class=org.apache.celeborn.common.metrics.sink.LoggerSink

View File

@ -26,6 +26,8 @@ license: |
| celeborn.metrics.extraLabels | | false | If default metric labels are not enough, extra metric labels can be customized. Labels' pattern is: `<label1_key>=<label1_value>[,<label2_key>=<label2_value>]*`; e.g. `env=prod,version=1` | 0.3.0 | | | celeborn.metrics.extraLabels | | false | If default metric labels are not enough, extra metric labels can be customized. Labels' pattern is: `<label1_key>=<label1_value>[,<label2_key>=<label2_value>]*`; e.g. `env=prod,version=1` | 0.3.0 | |
| celeborn.metrics.json.path | /metrics/json | false | URI context path of json metrics HTTP server. | 0.4.0 | | | celeborn.metrics.json.path | /metrics/json | false | URI context path of json metrics HTTP server. | 0.4.0 | |
| celeborn.metrics.json.pretty.enabled | true | false | When true, view metrics in json pretty format | 0.4.0 | | | celeborn.metrics.json.pretty.enabled | true | false | When true, view metrics in json pretty format | 0.4.0 | |
| celeborn.metrics.loggerSink.output.enabled | false | false | Whether to output scraped metrics to the logger. This config takes effect only if the logger sink is enabled. If you do not scrape metrics periodically with an external collector, add `org.apache.celeborn.common.metrics.sink.LoggerSink` to metrics.properties. | 0.6.0 | |
| celeborn.metrics.loggerSink.scrape.interval | 30min | false | The interval at which the logger sink scrapes its own metrics. This config takes effect only if the logger sink is enabled. If you do not scrape metrics periodically with an external collector, add `org.apache.celeborn.common.metrics.sink.LoggerSink` to metrics.properties. | 0.6.0 | |
| celeborn.metrics.prometheus.path | /metrics/prometheus | false | URI context path of prometheus metrics HTTP server. | 0.4.0 | | | celeborn.metrics.prometheus.path | /metrics/prometheus | false | URI context path of prometheus metrics HTTP server. | 0.4.0 | |
| celeborn.metrics.sample.rate | 1.0 | false | It controls if Celeborn collect timer metrics for some operations. Its value should be in [0.0, 1.0]. | 0.2.0 | | | celeborn.metrics.sample.rate | 1.0 | false | It controls if Celeborn collect timer metrics for some operations. Its value should be in [0.0, 1.0]. | 0.2.0 | |
| celeborn.metrics.timer.slidingWindow.size | 4096 | false | The sliding window size of timer metric. | 0.2.0 | | | celeborn.metrics.timer.slidingWindow.size | 4096 | false | The sliding window size of timer metric. | 0.2.0 | |

View File

@ -100,9 +100,14 @@ license: |
- Since 0.6.0, the client respects the spark.celeborn.storage.availableTypes configuration, - Since 0.6.0, the client respects the spark.celeborn.storage.availableTypes configuration,
ensuring revived partition locations no longer default to memory storage. In contrast, clients prior ensuring revived partition locations no longer default to memory storage. In contrast, clients prior
to 0.6.0 default to memory storage for revived partitions. This means that if memory storage is enabled in to 0.6.0 default to memory storage for revived partitions. This means that if memory storage is enabled in
worker nodes, pre-0.6.0 clients may inadvertently utilize memory storage for an application even when memory worker nodes, clients prior to 0.6.0 may inadvertently utilize memory storage for an application even when memory
storage is not enabled for that app. storage is not enabled for that app.
- Since 0.6.0, we have added a new sink `org.apache.celeborn.common.metrics.sink.LoggerSink` to ensure that Celeborn
metrics are scraped periodically. If you don't have an external collector that scrapes metrics periodically, it's
recommended to enable this sink so that accumulated metrics data cannot grow large enough to cause a worker OOM.
Don't forget to update the `metrics.properties` accordingly.
## Upgrading from 0.5.0 to 0.5.1 ## Upgrading from 0.5.0 to 0.5.1
- Since 0.5.1, Celeborn master REST API `/exclude` request uses media type `application/x-www-form-urlencoded` instead of `text/plain`. - Since 0.5.1, Celeborn master REST API `/exclude` request uses media type `application/x-www-form-urlencoded` instead of `text/plain`.

View File

@ -45,7 +45,12 @@ Each instance can report to zero or more _sinks_. Sinks are contained in the
* `CSVSink`: Exports metrics data to CSV files at regular intervals. * `CSVSink`: Exports metrics data to CSV files at regular intervals.
* `PrometheusServlet`: Adds a servlet within the existing Celeborn REST API to serve metrics data in Prometheus format. * `PrometheusServlet`: Adds a servlet within the existing Celeborn REST API to serve metrics data in Prometheus format.
* `JsonServlet`: Adds a servlet within the existing Celeborn REST API to serve metrics data in JSON format.
* `GraphiteSink`: Sends metrics to a Graphite node. * `GraphiteSink`: Sends metrics to a Graphite node.
* `LoggerSink`: Scrapes metrics periodically and outputs them to the logger files if
`celeborn.metrics.loggerSink.output.enabled` is enabled. This acts as a safety valve to ensure that
metrics data does not accumulate in memory for a long time. If you don't have a metrics collector that
collects metrics from Celeborn periodically, it's important to enable this sink.
The syntax of the metrics configuration file and the parameters available for each sink are defined The syntax of the metrics configuration file and the parameters available for each sink are defined
in an example configuration file, in an example configuration file,
@ -66,6 +71,8 @@ This example shows a list of Celeborn configuration parameters for a CSV sink:
Default values of the Celeborn metrics configuration are as follows: Default values of the Celeborn metrics configuration are as follows:
``` ```
*.sink.prometheusServlet.class=org.apache.celeborn.common.metrics.sink.PrometheusServlet *.sink.prometheusServlet.class=org.apache.celeborn.common.metrics.sink.PrometheusServlet
*.sink.jsonServlet.class=org.apache.celeborn.common.metrics.sink.JsonServlet
*.sink.loggerSink.class=org.apache.celeborn.common.metrics.sink.LoggerSink
``` ```
Additional sources can be configured using the metrics configuration file or the configuration Additional sources can be configured using the metrics configuration file or the configuration

View File

@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.matching.Regex import scala.util.matching.Regex
import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry} import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
import com.google.common.annotations.VisibleForTesting
import org.eclipse.jetty.servlet.ServletContextHandler import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.CelebornConf
@ -39,7 +40,8 @@ class MetricsSystem(
conf: CelebornConf) extends Logging { conf: CelebornConf) extends Logging {
private[this] val metricsConfig = new MetricsConfig(conf) private[this] val metricsConfig = new MetricsConfig(conf)
private val sinks = new ArrayBuffer[Sink] @VisibleForTesting
val sinks = new ArrayBuffer[Sink]
private val sources = new CopyOnWriteArrayList[Source] private val sources = new CopyOnWriteArrayList[Source]
private val registry = new MetricRegistry() private val registry = new MetricRegistry()
private val prometheusServletPath = conf.get(METRICS_PROMETHEUS_PATH) private val prometheusServletPath = conf.get(METRICS_PROMETHEUS_PATH)
@ -156,6 +158,11 @@ class MetricsSystem(
sources.asScala.toSeq, sources.asScala.toSeq,
jsonServletPath, jsonServletPath,
conf.metricsJsonPrettyEnabled.asInstanceOf[Object]).asInstanceOf[JsonServlet]) conf.metricsJsonPrettyEnabled.asInstanceOf[Object]).asInstanceOf[JsonServlet])
} else if (kv._1 == "loggerSink") {
val sink = Utils.classForName(classPath)
.getConstructor(classOf[Seq[Source]], classOf[CelebornConf])
.newInstance(sources.asScala.toSeq, conf)
sinks += sink.asInstanceOf[Sink]
} else { } else {
val sink = Utils.classForName(classPath) val sink = Utils.classForName(classPath)
.getConstructor(classOf[Properties], classOf[MetricRegistry]) .getConstructor(classOf[Properties], classOf[MetricRegistry])

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.common.metrics.sink
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

import scala.util.control.NonFatal

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.metrics.source.Source
import org.apache.celeborn.common.util.ThreadUtils
/**
 * This sink does not follow the standard sink interface: besides optionally
 * logging the scraped metrics, it has the duty to clean the sources' internal
 * state. Calling `source.getMetrics` drains the accumulated time-metrics
 * queue, which would otherwise grow without bound and can OOM a long-running
 * worker.
 *
 * @param sources the metrics sources to scrape periodically
 * @param conf the Celeborn configuration
 */
class LoggerSink(sources: Seq[Source], conf: CelebornConf) extends Sink with Logging {
  val metricsLoggerSinkScrapeOutputEnabled = conf.metricsLoggerSinkScrapeOutputEnabled
  val metricsLoggerSinkScrapeInterval = conf.metricsLoggerSinkScrapeInterval

  // Single daemon thread so scraping never blocks worker shutdown.
  val metricScrapeThread: ScheduledExecutorService =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("metrics-scrape-thread")

  override def start(): Unit = {
    metricScrapeThread.scheduleWithFixedDelay(
      new Runnable {
        override def run(): Unit = {
          // Guard the whole scrape: an exception escaping a task submitted via
          // scheduleWithFixedDelay cancels all subsequent executions, which would
          // silently stop queue cleanup and reintroduce the OOM risk.
          try {
            sources.foreach { source =>
              // The method `source.getMetrics` clears the `timeMetric` queue.
              // This is essential because the queue can grow large enough
              // to make the worker run out of memory.
              val metricsData = source.getMetrics
              if (metricsLoggerSinkScrapeOutputEnabled) {
                logInfo(s"Source ${source.sourceName} scraped metrics: ${metricsData}")
              }
            }
          } catch {
            case NonFatal(e) => logError("Failed to scrape metrics.", e)
          }
        }
      },
      metricsLoggerSinkScrapeInterval,
      metricsLoggerSinkScrapeInterval,
      TimeUnit.MILLISECONDS)
  }

  override def stop(): Unit = {
    ThreadUtils.shutdown(metricScrapeThread)
  }

  // Reporting is handled by the scheduled scrape; nothing to do on demand.
  override def report(): Unit = {}
}

View File

@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
*.sink.prometheusServlet.class=org.apache.celeborn.common.metrics.sink.PrometheusServlet
*.sink.jsonServlet.class=org.apache.celeborn.common.metrics.sink.JsonServlet
*.sink.loggerSink.class=org.apache.celeborn.common.metrics.sink.LoggerSink

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.server.common.metrics.sink
import org.apache.logging.log4j.message.SimpleMessage
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.metrics.MetricsSystem
import org.apache.celeborn.common.metrics.sink.LoggerSink
import org.apache.celeborn.common.metrics.source.JVMSource
import org.apache.celeborn.common.network.TestHelper
class LoggerSinkSuite extends CelebornFunSuite {

  /** Builds a CelebornConf with metrics enabled and the logger-sink metrics config loaded. */
  private def newMetricsConf(): CelebornConf = {
    val conf = new CelebornConf()
    conf
      .set(CelebornConf.METRICS_ENABLED.key, "true")
      .set(
        CelebornConf.METRICS_CONF.key,
        TestHelper.getResourceAsAbsolutePath("/metrics2.properties"))
    conf
  }

  test("test load logger sink case") {
    val conf = newMetricsConf()
    val metricsSystem = MetricsSystem.createMetricsSystem("test", conf)
    metricsSystem.registerSource(new JVMSource(conf, "test"))
    metricsSystem.start(true)
    // metrics2.properties declares a loggerSink entry, so one must be instantiated.
    val hasLoggerSink = metricsSystem.sinks.exists(_.isInstanceOf[LoggerSink])
    metricsSystem.stop()
    assert(hasLoggerSink)
  }

  test("test logger sink configs case") {
    val conf = newMetricsConf()
    conf.set("celeborn.metrics.loggerSink.scrape.interval", "10s")
    conf.set("celeborn.metrics.loggerSink.output.enabled", "true")
    val metricsSystem = MetricsSystem.createMetricsSystem("test", conf)
    metricsSystem.registerSource(new JVMSource(conf, "test"))
    metricsSystem.start(true)
    // The sink must pick up the overridden scrape interval and output flag.
    metricsSystem.sinks.collect { case loggerSink: LoggerSink =>
      assert(loggerSink.metricsLoggerSinkScrapeOutputEnabled == true)
      assert(loggerSink.metricsLoggerSinkScrapeInterval == 10000)
    }
    metricsSystem.stop()
  }

  test("test logger sink validity case") {
    val conf = newMetricsConf()
    conf.set("celeborn.metrics.loggerSink.scrape.interval", "3s")
    conf.set("celeborn.metrics.loggerSink.output.enabled", "true")
    val metricsSystem = MetricsSystem.createMetricsSystem("test", conf)
    val jvmSource = new JVMSource(conf, "test")
    metricsSystem.registerSource(jvmSource)
    metricsSystem.start(true)
    (1 to 5).foreach(i => jvmSource.timerMetrics.add(s"test$i"))
    Thread.sleep(100)
    // The first scrape has not fired yet, so the queued timer metrics are still present.
    assert(jvmSource.timerMetrics.size() != 0)
    // Wait past the 3s scrape interval so the sink drains the queue.
    Thread.sleep(10000)
    metricsSystem.stop()
    assert(jvmSource.timerMetrics.size() == 0)
  }
}