[ISSUE-813][Refactor] Refactor flusher configurations. (#813)

* Refactor flusher configurations.

* Refactor flusher configurations.

* Update.

* remove brackets.

* update docs.

* rename.

* update.

* update docs.

* update.

* update.

* update.

* update.

* update.

* update.

* update.

* format.

* update.

* update.
This commit is contained in:
Ethan Feng 2022-10-20 15:23:17 +08:00 committed by GitHub
parent 1e5bed2da7
commit 5c761a8df3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 286 additions and 197 deletions

View File

@ -1210,7 +1210,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
candidates: List[WorkerInfo],
slots: WorkerResource): Boolean = {
var requestSlots = slots
val maxRetryTimes = RssConf.reserveSlotsMaxRetry(conf)
val maxRetryTimes = RssConf.reserveSlotsMaxRetries(conf)
val retryWaitInterval = RssConf.reserveSlotsRetryWait(conf)
var retryTimes = 1
var noAvailableSlots = false

View File

@ -24,6 +24,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import scala.collection.JavaConverters._
import scala.util.Try
import org.apache.celeborn.common.RssConf.log
import org.apache.celeborn.common.identity.DefaultIdentityProvider
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.internal.config._
@ -369,7 +370,85 @@ class RssConf(loadDefaults: Boolean) extends Cloneable with Logging with Seriali
def checkSlotsFinishedTimeoutMs: Long = get(WORKER_CHECK_SLOTS_FINISHED_TIMEOUT)
def workerRecoverPath: String = get(WORKER_RECOVER_PATH)
def partitionSorterCloseAwaitTimeMs: Long = get(PARTITION_SORTER_SHUTDOWN_TIMEOUT)
def workerDiskFlusherShutdownTimeoutMs: Long = get(WORKER_FLUSHER_SHUTDOWN_TIMEOUT)
def workerFlusherShutdownTimeoutMs: Long = get(WORKER_FLUSHER_SHUTDOWN_TIMEOUT)
def shuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
def writerCloseTimeoutMs: Long = get(WORKER_WRITER_CLOSE_TIMEOUT)
def hddFlusherThreads: Int = get(WORKER_FLUSHER_HDD_THREADS)
def ssdFlusherThreads: Int = get(WORKER_FLUSHER_SSD_THREADS)
def hdfsFlusherThreads: Int = get(WORKER_FLUSHER_HDFS_THREADS)
def avgFlushTimeSlidingWindowSize: Int = get(WORKER_FLUSHER_AVGFLUSHTIME_SLIDINGWINDOW_SIZE)
def avgFlushTimeSlidingWindowMinCount: Int =
get(WORKER_FLUSHER_AVGFLUSHTIME_SLIDINGWINDOW_MINCOUNT)
def diskReserveSize: Long = get(WORKER_DISK_RESERVE_SIZE)
def deviceMonitorEnabled: Boolean = get(WORKER_DEVICE_MONITOR_ENABLED)
def deviceMonitorCheckList: Seq[String] = get(WORKER_DEVICE_MONITOR_CHECKLIST)
def diskCheckInterval: Long = get(WORKER_DISK_CHECK_INTERVAL)
def sysBlockDir: String = get(WORKER_DEVICEMONITOR_SYS_BLOCKDIR)
def createWriterCreateMaxAttempts: Int = get(WORKER_WRITER_CREATE_MAXATTEMPTS)
def workerStorageBaseDirPrefix: String = get(WORKER_STORAGE_BASE_DIR_PREFIX)
def workerStorageBaseDirNumber: Int = get(WORKER_STORAGE_BASE_DIR_COUNT)
/**
* @return workingDir, usable space, flusher thread count, disk type
* check more details at CONFIGURATION_GUIDE.md
*/
def workerBaseDirs: Seq[(String, Long, Int, Type)] = {
// I assume there is no disk is bigger than 1 PB in recent days.
val defaultMaxCapacity = Utils.byteStringAsBytes("1PB")
get(WORKER_STORAGE_DIRS).map { storageDirs: Seq[String] =>
storageDirs.map { str =>
var maxCapacity = defaultMaxCapacity
var diskType = HDD
var flushThread = -1
val (dir, attributes) = str.split(":").toList match {
case _dir :: tail => (_dir, tail)
case nil => throw new IllegalArgumentException(s"Illegal storage dir: $nil")
}
attributes.foreach {
case capacityStr if capacityStr.toLowerCase.startsWith("capacity=") =>
maxCapacity = Utils.byteStringAsBytes(capacityStr.split("=")(1))
case diskTypeStr if diskTypeStr.toLowerCase.startsWith("disktype=") =>
diskType = Type.valueOf(diskTypeStr.split("=")(1))
if (diskType == Type.MEMORY) {
throw new IOException(s"Invalid diskType: $diskType")
}
case threadCountStr if threadCountStr.toLowerCase.startsWith("flushthread=") =>
flushThread = threadCountStr.split("=")(1).toInt
case illegal =>
throw new IllegalArgumentException(s"Illegal attribute: $illegal")
}
if (flushThread == -1) {
flushThread = diskType match {
case HDD => hddFlusherThreads
case SSD => ssdFlusherThreads
}
}
(dir, maxCapacity, flushThread, diskType)
}
}.getOrElse {
val prefix = workerStorageBaseDirPrefix
val number = workerStorageBaseDirNumber
(1 to number).map { i =>
(s"$prefix$i", defaultMaxCapacity, hddFlusherThreads, HDD)
}
}
}
def partitionSplitMinimumSize: Long = {
getSizeAsBytes("rss.partition.split.minimum.size", "1m")
}
def hdfsDir: String = {
get(HDFS_DIR).map {
hdfsDir =>
if (!Utils.isHdfsPath(hdfsDir)) {
log.error(s"${HDFS_DIR.key} configuration is wrong $hdfsDir. Disable HDFS support.")
""
} else {
hdfsDir
}
}.getOrElse("")
}
}
object RssConf extends Logging {
@ -748,7 +827,7 @@ object RssConf extends Logging {
def registerShuffleRetryWait(conf: RssConf): Long = conf.get(SHUFFLE_REGISTER_RETRY_WAIT)
def reserveSlotsMaxRetry(conf: RssConf): Int = conf.get(RESERVE_SLOTS_MAX_RETRIES)
def reserveSlotsMaxRetries(conf: RssConf): Int = conf.get(RESERVE_SLOTS_MAX_RETRIES)
def reserveSlotsRetryWait(conf: RssConf): Long = conf.get(RESERVE_SLOTS_RETRY_WAIT)
@ -1110,29 +1189,38 @@ object RssConf extends Logging {
.toSequence
.createOptional
val WORKER_FLUSH_BUFFER_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.flush.buffer.size")
val WORKER_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.flusher.buffer.size")
.withAlternative("rss.worker.flush.buffer.size")
.categories("worker")
.doc("Size of buffer used by a single flusher.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("256k")
def workerHeartbeatTimeoutMs(conf: RssConf): Long = conf.get(WORKER_HEARTBEAT_TIMEOUT)
def workerReplicateThreads(conf: RssConf): Int = conf.get(WORKER_REPLICATE_THREADS)
def workerCommitThreads(conf: RssConf): Int = conf.get(WORKER_COMMIT_THREADS)
def workerFlushBufferSize(conf: RssConf): Long = conf.get(WORKER_FLUSH_BUFFER_SIZE)
def workerFlusherBufferSize(conf: RssConf): Long = conf.get(WORKER_FLUSHER_BUFFER_SIZE)
def flushTimeout(conf: RssConf): Long = {
conf.getTimeAsSeconds("rss.flush.timeout", "120s")
}
val WORKER_SHUFFLE_COMMIT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.shuffle.commit.timeout")
.withAlternative("rss.flush.timeout")
.categories("worker")
.doc("Timeout for a Celeborn worker to commit a shuffle.")
.version("0.2.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("120s")
def fileWriterTimeoutMs(conf: RssConf): Long = {
conf.getTimeAsMs("rss.filewriter.timeout", "120s")
}
val WORKER_WRITER_CLOSE_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.writer.close.timeout")
.withAlternative("rss.filewriter.timeout")
.categories("worker")
.doc("Timeout for a file writer to close")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("120s")
def appExpireDurationMs(conf: RssConf): Long = {
conf.getTimeAsMs("rss.expire.nonEmptyDir.duration", "1d")
@ -1142,63 +1230,71 @@ object RssConf extends Logging {
conf.get("rss.worker.workingDirName", "hadoop/rss-worker/shuffle_data")
}
/**
* @return workingDir, usable space, flusher thread count, disk type
* check more details at CONFIGURATION_GUIDE.md
*/
def workerBaseDirs(conf: RssConf): Seq[(String, Long, Int, Type)] = {
// I assume there is no disk is bigger than 1 PB in recent days.
val defaultMaxCapacity = Utils.byteStringAsBytes("1PB")
conf.get(WORKER_STORAGE_DIRS).map { storageDirs: Seq[String] =>
storageDirs.map { str =>
var maxCapacity = defaultMaxCapacity
var diskType = HDD
var flushThread = -1
val (dir, attributes) = str.split(":").toList match {
case _dir :: tail => (_dir, tail)
case nil => throw new IllegalArgumentException(s"Illegal storage dir: $nil")
}
attributes.foreach {
case capacityStr if capacityStr.toLowerCase.startsWith("capacity=") =>
maxCapacity = Utils.byteStringAsBytes(capacityStr.split("=")(1))
case diskTypeStr if diskTypeStr.toLowerCase.startsWith("disktype=") =>
diskType = Type.valueOf(diskTypeStr.split("=")(1))
if (diskType == Type.MEMORY) {
throw new IOException(s"Invalid diskType: $diskType")
}
case threadCountStr if threadCountStr.toLowerCase.startsWith("flushthread=") =>
flushThread = threadCountStr.split("=")(1).toInt
case illegal =>
throw new IllegalArgumentException(s"Illegal attribute: $illegal")
}
if (flushThread == -1) {
flushThread = diskType match {
case HDD => HDDFlusherThread(conf)
case SSD => SSDFlusherThread(conf)
}
}
(dir, maxCapacity, flushThread, diskType)
}
}.getOrElse {
val prefix = RssConf.workerBaseDirPrefix(conf)
val number = RssConf.workerBaseDirNumber(conf)
(1 to number).map { i =>
(s"$prefix$i", defaultMaxCapacity, HDDFlusherThread(conf), HDD)
}
}
}
val WORKER_FLUSHER_HDD_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.flusher.hdd.threads")
.withAlternative("rss.flusher.hdd.thread.count")
.categories("worker")
.doc("Flusher's thread count used for write data to HDD disks.")
.version("0.2.0")
.intConf
.createWithDefault(1)
def HDDFlusherThread(conf: RssConf): Int = {
conf.getInt("rss.flusher.hdd.thread.count", 1)
}
val WORKER_FLUSHER_SSD_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.flusher.ssd.threads")
.withAlternative("rss.flusher.hdd.thread.count")
.categories("worker")
.doc("Flusher's thread count used for write data to SSD disks.")
.version("0.2.0")
.intConf
.createWithDefault(8)
def SSDFlusherThread(conf: RssConf): Int = {
conf.getInt("rss.flusher.ssd.thread.count", 8)
}
val WORKER_FLUSHER_HDFS_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.flusher.hdfs.threads")
.withAlternative("rss.worker.hdfs.flusher.thread.count")
.categories("worker")
.doc("Flusher's thread count used for write data to HDFS.")
.version("0.2.0")
.intConf
.createWithDefault(4)
def diskMinimumReserveSize(conf: RssConf): Long = {
Utils.byteStringAsBytes(conf.get("rss.disk.minimum.reserve.size", "5G"))
}
val WORKER_FLUSHER_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.flusher.shutdown.timeout")
.withAlternative("rss.worker.diskFlusherShutdownTimeoutMs")
.categories("worker")
.doc("Timeout for a flusher to shutdown.")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("3s")
val WORKER_DISK_RESERVE_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.disk.reserve.size")
.withAlternative("rss.disk.minimum.reserve.size")
.categories("worker")
.doc("Celeborn worker reserved space for each disk.")
.version("0.2.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("5G")
val WORKER_FLUSHER_AVGFLUSHTIME_SLIDINGWINDOW_SIZE: ConfigEntry[Int] =
buildConf("celeborn.worker.flusher.avgFlushTime.slidingWindow.size")
.withAlternative("rss.flusher.avg.time.window")
.categories("worker")
.doc("The minimum flush count to enter a sliding window" +
" to calculate statistics about flushed time and count.")
.version("0.2.0")
.intConf
.createWithDefault(20)
val WORKER_FLUSHER_AVGFLUSHTIME_SLIDINGWINDOW_MINCOUNT: ConfigEntry[Int] =
buildConf("celeborn.worker.flusher.avgFlushTime.slidingWindow.minCount")
.withAlternative("rss.flusher.avg.time.minimum.count")
.categories("worker")
.doc("The minimum flush count to enter a sliding window" +
" to calculate statistics about flushed time and count.")
.version("0.2.0")
.internal
.intConf
.createWithDefault(1000)
/**
* @return This configuration is a guidance for load-aware slot allocation algorithm. This value
@ -1228,14 +1324,6 @@ object RssConf extends Logging {
Utils.timeStringAsMs(conf.get("rss.partition.size.update.interval", "10m"))
}
def workerBaseDirPrefix(conf: RssConf): String = {
conf.get("rss.worker.base.dir.prefix", "/mnt/disk")
}
def workerBaseDirNumber(conf: RssConf): Int = {
conf.getInt("rss.worker.base.dir.number", 16)
}
def stageEndTimeout(conf: RssConf): Long = {
conf.getTimeAsMs("rss.stage.end.timeout", "240s")
}
@ -1282,13 +1370,6 @@ object RssConf extends Logging {
.booleanConf
.createWithDefault(true)
val METRICS_TIMER_SLIDING_SIZE: ConfigEntry[Int] =
buildConf("celeborn.metrics.timer.sliding.size")
.withAlternative("rss.metrics.system.timer.sliding.size")
.categories("master", "worker", "metrics")
.intConf
.createWithDefault(4000)
val METRICS_SAMPLE_RATE: ConfigEntry[Double] =
buildConf("celeborn.metrics.sample.rate")
.withAlternative("rss.metrics.system.sample.rate")
@ -1336,8 +1417,6 @@ object RssConf extends Logging {
def metricsSystemEnable(conf: RssConf): Boolean = conf.get(METRICS_ENABLED)
def metricsTimerSlidingSize(conf: RssConf): Int = conf.get(METRICS_TIMER_SLIDING_SIZE)
def metricsSampleRate(conf: RssConf): Double = conf.get(METRICS_SAMPLE_RATE)
def metricsSamplePerfCritical(conf: RssConf): Boolean = {
@ -1420,25 +1499,48 @@ object RssConf extends Logging {
.toSequence
.createWithDefaultString("readwrite,diskusage")
def deviceMonitorEnabled(conf: RssConf): Boolean = conf.get(WORKER_DEVICE_MONITOR_ENABLED)
val WORKER_DISK_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.deviceMonitor.check.interval")
.withAlternative("rss.disk.check.interval")
.categories("worker")
.doc("Intervals between device monitor to check disk.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")
def deviceMonitorCheckList(conf: RssConf): Seq[String] = conf.get(WORKER_DEVICE_MONITOR_CHECKLIST)
val WORKER_DEVICEMONITOR_SYS_BLOCKDIR: ConfigEntry[String] =
buildConf("celeborn.worker.deviceMonitor.sys.block.dir")
.withAlternative("rss.sys.block.dir")
.categories("worker")
.doc("The directory where linux file block information is stored.")
.stringConf
.createWithDefault("/sys/block")
def diskCheckIntervalMs(conf: RssConf): Long = {
conf.getTimeAsMs("rss.disk.check.interval", "60s")
}
val WORKER_WRITER_CREATE_MAXATTEMPTS: ConfigEntry[Int] =
buildConf("celeborn.worker.writer.create.maxAttempts")
.withAlternative("rss.create.file.writer.retry.count")
.categories("worker")
.doc("Retry count for a file writer to create if its creation was failed.")
.intConf
.createWithDefault(3)
def slowFlushIntervalMs(conf: RssConf): Long = {
conf.getTimeAsMs("rss.slow.flush.interval", "10s")
}
val WORKER_STORAGE_BASE_DIR_PREFIX: ConfigEntry[String] =
buildConf("celeborn.worker.storage.base.dir.prefix")
.withAlternative("rss.worker.base.dir.prefix")
.categories("worker")
.doc("Base directory for Celeborn worker to write if \'base.dir\' is not set.")
.stringConf
.createWithDefault("/mnt/disk")
def sysBlockDir(conf: RssConf): String = {
conf.get("rss.sys.block.dir", "/sys/block")
}
def createFileWriterRetryCount(conf: RssConf): Int = {
conf.getInt("rss.create.file.writer.retry.count", 3)
}
val WORKER_STORAGE_BASE_DIR_COUNT: ConfigEntry[Int] =
buildConf("celeborn.worker.storage.base.dir.number")
.withAlternative("rss.worker.base.dir.number")
.categories("worker")
.doc("How many directories will be create if \'base.dir\' is not set. " +
"The directory name is a combination of \'dir.prefix\' " +
"and from zero to \"dir.number\" step by one. " +
"No sub directory will be created.")
.intConf
.createWithDefault(16)
def workerStatusCheckTimeout(conf: RssConf): Long = {
conf.getTimeAsSeconds("rss.worker.status.check.timeout", "30s")
@ -1476,10 +1578,6 @@ object RssConf extends Logging {
conf.getSizeAsBytes("rss.partition.split.threshold", "256m")
}
def partitionSplitMinimumSize(conf: RssConf): Long = {
conf.getSizeAsBytes("rss.partition.split.minimum.size", "1m")
}
def batchHandleChangePartitionEnabled(conf: RssConf): Boolean = {
conf.getBoolean("rss.change.partition.batch.enabled", false)
}
@ -1597,7 +1695,8 @@ object RssConf extends Logging {
buildConf("celeborn.worker.graceful.shutdown.checkSlotsFinished.interval")
.withAlternative("rss.worker.checkSlots.interval")
.categories("worker")
.doc("The wait interval of checking whether all released slots to be committed or destroyed during worker graceful shutdown")
.doc("The wait interval of checking whether all released slots " +
"to be committed or destroyed during worker graceful shutdown")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")
@ -1606,7 +1705,8 @@ object RssConf extends Logging {
buildConf("celeborn.worker.graceful.shutdown.checkSlotsFinished.timeout")
.withAlternative("rss.worker.checkSlots.timeout")
.categories("worker")
.doc("The wait time of waiting for the released slots to be committed or destroyed during worker graceful shutdown.")
.doc("The wait time of waiting for the released slots" +
" to be committed or destroyed during worker graceful shutdown.")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("480s")
@ -1625,20 +1725,12 @@ object RssConf extends Logging {
buildConf("celeborn.worker.graceful.shutdown.partitionSorter.shutdownTimeout")
.withAlternative("rss.worker.partitionSorterCloseAwaitTime")
.categories("worker")
.doc("The wait time of waiting for sorting partition files during worker graceful shutdown.")
.doc("The wait time of waiting for sorting partition files" +
" during worker graceful shutdown.")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("120s")
val WORKER_FLUSHER_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.graceful.shutdown.flusher.shutdownTimeout")
.withAlternative("rss.worker.diskFlusherShutdownTimeoutMs")
.categories("worker")
.doc("The timeout to wait for diskOperators to execute remaining jobs before being shutdown immediately.")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("3s")
def offerSlotsAlgorithm(conf: RssConf): String = {
var algorithm = conf.get("rss.offer.slots.algorithm", "roundrobin")
if (algorithm != "loadaware" && algorithm != "roundrobin") {
@ -1649,27 +1741,13 @@ object RssConf extends Logging {
algorithm
}
def flushAvgTimeWindow(conf: RssConf): Int = {
conf.getInt("rss.flusher.avg.time.window", 20);
}
def flushAvgTimeMinimumCount(conf: RssConf): Int = {
conf.getInt("rss.flusher.avg.time.minimum.count", 1000);
}
def hdfsDir(conf: RssConf): String = {
val hdfsDir = conf.get("rss.worker.hdfs.dir", "")
if (hdfsDir.nonEmpty && !Utils.isHdfsPath(hdfsDir)) {
log.error(s"rss.worker.hdfs.dir configuration is wrong $hdfsDir. Disable hdfs support.")
""
} else {
hdfsDir
}
}
def hdfsFlusherThreadCount(conf: RssConf): Int = {
conf.getInt("rss.worker.hdfs.flusher.thread.count", 4)
}
val HDFS_DIR: OptionalConfigEntry[String] =
buildConf("celeborn.storage.hdfs.dir")
.withAlternative("rss.worker.hdfs.dir")
.categories("worker", "client")
.doc("HDFS dir configuration for Celeborn to access HDFS.")
.stringConf
.createOptional
def rangeReadFilterEnabled(conf: RssConf): Boolean = {
conf.getBoolean("rss.range.read.filter.enabled", false)

View File

@ -36,7 +36,7 @@ class RssConfSuite extends RssFunSuite {
val conf = new RssConf()
val defaultMaxUsableSpace = 1024L * 1024 * 1024 * 1024 * 1024
conf.set("celeborn.worker.storage.dirs", "/mnt/disk1")
val parsedDirs = RssConf.workerBaseDirs(conf)
val parsedDirs = conf.workerBaseDirs
assert(parsedDirs.size == 1)
assert(parsedDirs.head._3 == 1)
assert(parsedDirs.head._2 == defaultMaxUsableSpace)
@ -46,7 +46,7 @@ class RssConfSuite extends RssFunSuite {
val conf = new RssConf()
val defaultMaxUsableSpace = 1024L * 1024 * 1024 * 1024 * 1024
conf.set("celeborn.worker.storage.dirs", "/mnt/disk1:disktype=SSD:capacity=10g")
val parsedDirs = RssConf.workerBaseDirs(conf)
val parsedDirs = conf.workerBaseDirs
assert(parsedDirs.size == 1)
assert(parsedDirs.head._3 == 8)
assert(parsedDirs.head._2 == 10 * 1024 * 1024 * 1024L)
@ -55,7 +55,7 @@ class RssConfSuite extends RssFunSuite {
test("basedir test3") {
val conf = new RssConf()
conf.set("celeborn.worker.storage.dirs", "/mnt/disk1:disktype=SSD:capacity=10g:flushthread=3")
val parsedDirs = RssConf.workerBaseDirs(conf)
val parsedDirs = conf.workerBaseDirs
assert(parsedDirs.size == 1)
assert(parsedDirs.head._3 == 3)
assert(parsedDirs.head._2 == 10 * 1024 * 1024 * 1024L)
@ -67,7 +67,7 @@ class RssConfSuite extends RssFunSuite {
"celeborn.worker.storage.dirs",
"/mnt/disk1:disktype=SSD:capacity=10g:flushthread=3," +
"/mnt/disk2:disktype=HDD:capacity=15g:flushthread=7")
val parsedDirs = RssConf.workerBaseDirs(conf)
val parsedDirs = conf.workerBaseDirs
assert(parsedDirs.size == 2)
assert(parsedDirs.head._1 == "/mnt/disk1")
assert(parsedDirs.head._3 == 3)

View File

@ -33,5 +33,6 @@ license: |
| celeborn.shuffle.writer.mode | `hash` | Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count it huge. | 0.2.0 |
| celeborn.slot.reserve.maxRetries | `3` | Max retry times for client to reserve slots. | |
| celeborn.slot.reserve.retryWait | `3s` | Wait time before next retry if reserve slots failed. | |
| celeborn.storage.hdfs.dir | `<undefined>` | HDFS dir configuration for Celeborn to access HDFS. | |
| celeborn.worker.excluded.interval | `30s` | Interval for client to refresh excluded worker list. | |
<!--end-include-->

View File

@ -29,7 +29,6 @@ license: |
| celeborn.master.port | `9097` | Port for master to bind. | 0.2.0 |
| celeborn.metrics.enabled | `true` | When true, enable metrics system. | |
| celeborn.metrics.sample.rate | `1.0` | | |
| celeborn.metrics.timer.sliding.size | `4000` | | |
| celeborn.metrics.timer.sliding.window.size | `4096` | | |
| celeborn.worker.heartbeat.timeout | `120s` | Worker heartbeat timeout. | |
<!--end-include-->

View File

@ -20,7 +20,6 @@ license: |
| celeborn.master.metrics.prometheus.port | `9098` | | |
| celeborn.metrics.enabled | `true` | When true, enable metrics system. | |
| celeborn.metrics.sample.rate | `1.0` | | |
| celeborn.metrics.timer.sliding.size | `4000` | | |
| celeborn.metrics.timer.sliding.window.size | `4096` | | |
| celeborn.worker.metrics.prometheus.host | `0.0.0.0` | | |
| celeborn.worker.metrics.prometheus.port | `9096` | | |

View File

@ -19,17 +19,24 @@ license: |
| celeborn.master.endpoints | `<localhost>:9097` | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. | 0.2.0 |
| celeborn.metrics.enabled | `true` | When true, enable metrics system. | |
| celeborn.metrics.sample.rate | `1.0` | | |
| celeborn.metrics.timer.sliding.size | `4000` | | |
| celeborn.metrics.timer.sliding.window.size | `4096` | | |
| celeborn.shuffle.chuck.size | `8m` | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | |
| celeborn.storage.hdfs.dir | `<undefined>` | HDFS dir configuration for Celeborn to access HDFS. | |
| celeborn.worker.commit.threads | `32` | Thread number of worker to commit shuffle data files asynchronously. | |
| celeborn.worker.deviceMonitor.check.interval | `60s` | Intervals between device monitor to check disk. | |
| celeborn.worker.deviceMonitor.checklist | `readwrite,diskusage` | Select what the device needs to detect, available items are: iohang, readwrite and diskusage. | |
| celeborn.worker.deviceMonitor.enabled | `true` | When true, worker will monitor device and report to master. | |
| celeborn.worker.flush.buffer.size | `256k` | Size of buffer used by a single flusher. | |
| celeborn.worker.deviceMonitor.sys.block.dir | `/sys/block` | The directory where linux file block information is stored. | |
| celeborn.worker.disk.reserve.size | `5G` | Celeborn worker reserved space for each disk. | 0.2.0 |
| celeborn.worker.flusher.avgFlushTime.slidingWindow.size | `20` | The minimum flush count to enter a sliding window to calculate statistics about flushed time and count. | 0.2.0 |
| celeborn.worker.flusher.buffer.size | `256k` | Size of buffer used by a single flusher. | |
| celeborn.worker.flusher.hdd.threads | `1` | Flusher's thread count used for write data to HDD disks. | 0.2.0 |
| celeborn.worker.flusher.hdfs.threads | `4` | Flusher's thread count used for write data to HDFS. | 0.2.0 |
| celeborn.worker.flusher.shutdown.timeout | `3s` | Timeout for a flusher to shutdown. | 0.2.0 |
| celeborn.worker.flusher.ssd.threads | `8` | Flusher's thread count used for write data to SSD disks. | 0.2.0 |
| celeborn.worker.graceful.shutdown.checkSlotsFinished.interval | `1s` | The wait interval of checking whether all released slots to be committed or destroyed during worker graceful shutdown | 0.2.0 |
| celeborn.worker.graceful.shutdown.checkSlotsFinished.timeout | `480s` | The wait time of waiting for the released slots to be committed or destroyed during worker graceful shutdown. | 0.2.0 |
| celeborn.worker.graceful.shutdown.enabled | `false` | When true, during worker shutdown, the worker will wait for all released slots to be committed or destroyed. | 0.2.0 |
| celeborn.worker.graceful.shutdown.flusher.shutdownTimeout | `3s` | The timeout to wait for diskOperators to execute remaining jobs before being shutdown immediately. | 0.2.0 |
| celeborn.worker.graceful.shutdown.partitionSorter.shutdownTimeout | `120s` | The wait time of waiting for sorting partition files during worker graceful shutdown. | 0.2.0 |
| celeborn.worker.graceful.shutdown.recoverPath | `<tmp>/recover` | The path to store levelDB. | 0.2.0 |
| celeborn.worker.graceful.shutdown.timeout | `600s` | The worker's graceful shutdown timeout time. | 0.2.0 |
@ -37,5 +44,10 @@ license: |
| celeborn.worker.metrics.prometheus.host | `0.0.0.0` | | |
| celeborn.worker.metrics.prometheus.port | `9096` | | |
| celeborn.worker.replicate.threads | `64` | Thread number of worker to replicate shuffle data. | |
| celeborn.worker.shuffle.commit.timeout | `120s` | Timeout for a Celeborn worker to commit a shuffle. | 0.2.0 |
| celeborn.worker.storage.base.dir.number | `16` | How many directories will be create if 'base.dir' is not set. The directory name is a combination of 'dir.prefix' and from zero to "dir.number" step by one. No sub directory will be created. | |
| celeborn.worker.storage.base.dir.prefix | `/mnt/disk` | Base directory for Celeborn worker to write if 'base.dir' is not set. | |
| celeborn.worker.storage.dirs | `<undefined>` | Directory list to store shuffle data. It's recommended to configure one directory on each disk. Storage size limit can be set for each directory. For the sake of performance, there should be no more than 2 flush threads on the same disk partition if you are using HDD, and should be 8 or more flush threads on the same disk partition if you are using SSD. For example: dir1[:capacity=][:disktype=][:flushthread=],dir2[:capacity=][:disktype=][:flushthread=] | |
| celeborn.worker.writer.close.timeout | `120s` | Timeout for a file writer to close | 0.2.0 |
| celeborn.worker.writer.create.maxAttempts | `3` | Retry count for a file writer to create if its creation was failed. | |
<!--end-include-->

View File

@ -103,7 +103,7 @@ private[celeborn] class Master(
private def workersSnapShot: util.List[WorkerInfo] =
statusSystem.workers.synchronized(new util.ArrayList[WorkerInfo](statusSystem.workers))
private def minimumUsableSize = RssConf.diskMinimumReserveSize(conf)
private def diskReserveSize = conf.diskReserveSize
private def diskGroups = RssConf.diskGroups(conf)
@ -498,7 +498,7 @@ private[celeborn] class Master(
workersNotBlacklisted(),
requestSlots.partitionIdList,
requestSlots.shouldReplicate,
minimumUsableSize,
diskReserveSize,
diskGroups,
diskGroupGradient)
}

View File

@ -574,6 +574,7 @@
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<failIfNoTests>false</failIfNoTests>
<failIfNoSpecifiedTests>false</failIfNoSpecifiedTests>
</configuration>
<executions>
<execution>

View File

@ -66,9 +66,9 @@ public final class FileWriter implements DeviceObserver {
private CompositeByteBuf flushBuffer;
private final long chunkSize;
private final long timeoutMs;
private final long writerCloseTimeoutMs;
private final long flushBufferSize;
private final long flusherBufferSize;
private final DeviceMonitor deviceMonitor;
private final AbstractSource source; // metrics
@ -109,9 +109,9 @@ public final class FileWriter implements DeviceObserver {
this.flushWorkerIndex = flusher.getWorkerIndex();
this.chunkSize = RssConf.shuffleChunkSize(rssConf);
this.nextBoundary = this.chunkSize;
this.timeoutMs = RssConf.fileWriterTimeoutMs(rssConf);
this.writerCloseTimeoutMs = rssConf.writerCloseTimeoutMs();
this.splitThreshold = splitThreshold;
this.flushBufferSize = RssConf.workerFlushBufferSize(rssConf);
this.flusherBufferSize = RssConf.workerFlusherBufferSize(rssConf);
this.deviceMonitor = deviceMonitor;
this.splitMode = splitMode;
this.partitionType = partitionType;
@ -212,7 +212,7 @@ public final class FileWriter implements DeviceObserver {
mapIdBitMap.add(mapId);
}
if (flushBuffer.readableBytes() != 0
&& flushBuffer.readableBytes() + numBytes >= this.flushBufferSize) {
&& flushBuffer.readableBytes() + numBytes >= this.flusherBufferSize) {
flush(false);
takeBuffer();
}
@ -344,7 +344,7 @@ public final class FileWriter implements DeviceObserver {
}
private void waitOnNoPending(AtomicInteger counter) throws IOException {
long waitTime = timeoutMs;
long waitTime = writerCloseTimeoutMs;
while (counter.get() > 0 && waitTime > 0) {
try {
notifier.checkException();
@ -390,7 +390,7 @@ public final class FileWriter implements DeviceObserver {
}
private void addTask(FlushTask task) throws IOException {
if (!flusher.addTask(task, timeoutMs, flushWorkerIndex)) {
if (!flusher.addTask(task, writerCloseTimeoutMs, flushWorkerIndex)) {
IOException e = new IOException("Add flush task timeout.");
notifier.setException(e);
throw e;

View File

@ -399,18 +399,18 @@ private[deploy] class Controller(
if (future != null) {
val result = new AtomicReference[CompletableFuture[Unit]]()
val flushTimeout = RssConf.flushTimeout(conf)
val shuffleCommitTimeout = conf.shuffleCommitTimeout
val timeout = timer.newTimeout(
new TimerTask {
override def run(timeout: Timeout): Unit = {
if (result.get() != null) {
result.get().cancel(true)
logWarning(s"After waiting $flushTimeout s, cancel all commit file jobs.")
logWarning(s"After waiting $shuffleCommitTimeout s, cancel all commit file jobs.")
}
}
},
flushTimeout,
shuffleCommitTimeout,
TimeUnit.SECONDS)
result.set(future.handleAsync(
@ -427,7 +427,7 @@ private[deploy] class Controller(
Thread.currentThread().interrupt()
throw ie
case _: TimeoutException =>
logWarning(s"While handling commitFiles, timeout after $flushTimeout s.")
logWarning(s"While handling commitFiles, timeout after $shuffleCommitTimeout s.")
case throwable: Throwable =>
logError("While handling commitFiles, exception occurs.", throwable)
}

View File

@ -49,7 +49,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
var pushClientFactory: TransportClientFactory = _
var registered: AtomicBoolean = _
var workerInfo: WorkerInfo = _
var diskMinimumReserveSize: Long = _
var diskReserveSize: Long = _
var partitionSplitMinimumSize: Long = _
def init(worker: Worker): Unit = {
@ -62,10 +62,10 @@ class PushDataHandler extends BaseMessageHandler with Logging {
pushClientFactory = worker.pushClientFactory
registered = worker.registered
workerInfo = worker.workerInfo
diskMinimumReserveSize = RssConf.diskMinimumReserveSize(worker.conf)
partitionSplitMinimumSize = RssConf.partitionSplitMinimumSize(worker.conf)
diskReserveSize = worker.conf.diskReserveSize
partitionSplitMinimumSize = worker.conf.partitionSplitMinimumSize
logInfo(s"diskMinimumReserveSize $diskMinimumReserveSize")
logInfo(s"diskReserveSize $diskReserveSize")
}
override def receive(client: TransportClient, msg: RequestMessage): Unit =
@ -213,7 +213,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
}
val diskFull = workerInfo.diskInfos
.get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
.actualUsableSpace < diskMinimumReserveSize
.actualUsableSpace < diskReserveSize
if ((diskFull && fileWriter.getFileInfo.getFileLength > partitionSplitMinimumSize) ||
(isMaster && fileWriter.getFileInfo.getFileLength > fileWriter.getSplitThreshold())) {
if (fileWriter.getSplitMode == PartitionSplitMode.SOFT) {

View File

@ -455,7 +455,7 @@ private[deploy] object Worker extends Logging {
// address of the Master should be used in the end.
workerArgs.master.foreach { master =>
conf.set(
"celeborn.master.endpoints",
MASTER_ENDPOINTS.key,
RpcAddress.fromRssURL(master).toString.replace("rss://", ""))
}

View File

@ -30,7 +30,6 @@ import org.apache.commons.io.FileUtils
import org.slf4j.LoggerFactory
import org.apache.celeborn.common.RssConf
import org.apache.celeborn.common.RssConf.{deviceMonitorCheckList, diskCheckIntervalMs}
import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, DiskStatus}
import org.apache.celeborn.common.util.ThreadUtils
import org.apache.celeborn.common.util.Utils._
@ -62,7 +61,7 @@ class LocalDeviceMonitor(
}
val observers: jSet[DeviceObserver] = ConcurrentHashMap.newKeySet[DeviceObserver]()
val sysBlockDir = RssConf.sysBlockDir(rssConf)
val sysBlockDir = rssConf.sysBlockDir
val statFile = new File(s"$sysBlockDir/${deviceInfo.name}/stat")
val inFlightFile = new File(s"$sysBlockDir/${deviceInfo.name}/inflight")
@ -181,13 +180,13 @@ class LocalDeviceMonitor(
// (deviceName -> ObservedDevice)
var observedDevices: util.Map[DeviceInfo, ObservedDevice] = _
val diskCheckInterval = diskCheckIntervalMs(rssConf)
val diskCheckInterval = rssConf.diskCheckInterval
// we should choose what the device needs to detect
val monitorCheckList = deviceMonitorCheckList(rssConf)
val checkIoHang = monitorCheckList.contains("iohang")
val checkReadWrite = monitorCheckList.contains("readwrite")
val checkDiskUsage = monitorCheckList.contains("diskusage")
val deviceMonitorCheckList = rssConf.deviceMonitorCheckList
val checkIoHang = deviceMonitorCheckList.contains("iohang")
val checkReadWrite = deviceMonitorCheckList.contains("readwrite")
val checkDiskUsage = deviceMonitorCheckList.contains("diskusage")
private val diskChecker =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-disk-checker")
@ -293,7 +292,7 @@ object DeviceMonitor {
deviceInfos: util.Map[String, DeviceInfo],
diskInfos: util.Map[String, DiskInfo]): DeviceMonitor = {
try {
if (RssConf.deviceMonitorEnabled(rssConf)) {
if (rssConf.deviceMonitorEnabled) {
val monitor = new LocalDeviceMonitor(rssConf, deviceObserver, deviceInfos, diskInfos)
monitor.init()
logger.info("Device monitor init success")
@ -310,18 +309,18 @@ object DeviceMonitor {
/**
* check if the disk is high usage
* @param rssConf conf
* @param conf conf
* @param diskRootPath disk root path
* @return true if high disk usage
*/
def highDiskUsage(rssConf: RssConf, diskRootPath: String): Boolean = {
def highDiskUsage(conf: RssConf, diskRootPath: String): Boolean = {
tryWithTimeoutAndCallback({
val usage = runCommand(s"df -B 1G $diskRootPath").trim.split("[ \t]+")
val totalSpace = usage(usage.length - 5)
val freeSpace = usage(usage.length - 3)
val used_percent = usage(usage.length - 2)
val status = freeSpace.toLong < RssConf.diskMinimumReserveSize(rssConf) / 1024 / 1024 / 1024
val status = freeSpace.toLong < conf.diskReserveSize / 1024 / 1024 / 1024
if (status) {
logger.warn(s"$diskRootPath usage:{total:$totalSpace GB," +
s" free:$freeSpace GB, used_percent:$used_percent}")
@ -329,7 +328,7 @@ object DeviceMonitor {
status
})(false)(
deviceCheckThreadPool,
RssConf.workerStatusCheckTimeout(rssConf),
RssConf.workerStatusCheckTimeout(conf),
s"Disk: $diskRootPath Usage Check Timeout")
}

View File

@ -37,8 +37,8 @@ import org.apache.celeborn.service.deploy.worker.WorkerSource
abstract private[worker] class Flusher(
val workerSource: AbstractSource,
val threadCount: Int,
val flushAvgTimeWindowSize: Int,
val flushAvgTimeMinimumCount: Int) extends Logging {
val avgFlushTimeSlidingWindowSize: Int,
val avgFlushTimeSlidingWindowMinCount: Int) extends Logging {
protected lazy val flusherId = System.identityHashCode(this)
protected val workingQueues = new Array[LinkedBlockingQueue[FlushTask]](threadCount)
protected val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf]()
@ -46,7 +46,7 @@ abstract private[worker] class Flusher(
protected var nextWorkerIndex: Int = 0
protected val flushCount = new LongAdder
protected val flushTotalTime = new LongAdder
protected val avgTimeWindow = new Array[(Long, Long)](flushAvgTimeWindowSize)
protected val avgTimeWindow = new Array[(Long, Long)](avgFlushTimeSlidingWindowSize)
protected var avgTimeWindowCurrentIndex = 0
val lastBeginFlushTime: AtomicLongArray = new AtomicLongArray(threadCount)
@ -56,7 +56,7 @@ abstract private[worker] class Flusher(
init()
private def init(): Unit = {
for (i <- 0 until flushAvgTimeWindowSize) {
for (i <- 0 until avgFlushTimeSlidingWindowSize) {
avgTimeWindow(i) = (0L, 0L)
}
for (i <- 0 until lastBeginFlushTime.length()) {
@ -113,9 +113,9 @@ abstract private[worker] class Flusher(
}
val currentFlushTime = flushTotalTime.sumThenReset()
val currentFlushCount = flushCount.sumThenReset()
if (currentFlushCount >= flushAvgTimeMinimumCount) {
if (currentFlushCount >= avgFlushTimeSlidingWindowMinCount) {
avgTimeWindow(avgTimeWindowCurrentIndex) = (currentFlushTime, currentFlushCount)
avgTimeWindowCurrentIndex = (avgTimeWindowCurrentIndex + 1) % flushAvgTimeWindowSize
avgTimeWindowCurrentIndex = (avgTimeWindowCurrentIndex + 1) % avgFlushTimeSlidingWindowSize
}
var totalFlushTime = 0L
@ -177,12 +177,12 @@ private[worker] class LocalFlusher(
val deviceMonitor: DeviceMonitor,
threadCount: Int,
val mountPoint: String,
flushAvgTimeWindowSize: Int,
avgFlushTimeSlidingWindowSize: Int,
flushAvgTimeMinimumCount: Int,
val diskType: StorageInfo.Type) extends Flusher(
workerSource,
threadCount,
flushAvgTimeWindowSize,
avgFlushTimeSlidingWindowSize,
flushAvgTimeMinimumCount)
with DeviceObserver with Logging {
@ -216,13 +216,13 @@ private[worker] class LocalFlusher(
final private[worker] class HdfsFlusher(
workerSource: AbstractSource,
threadCount: Int,
hdfsFlusherThreads: Int,
flushAvgTimeWindowSize: Int,
flushAvgTimeMinimumCount: Int) extends Flusher(
avgFlushTimeSlidingWindowMinCount: Int) extends Flusher(
workerSource,
threadCount,
hdfsFlusherThreads,
flushAvgTimeWindowSize,
flushAvgTimeMinimumCount) with Logging {
avgFlushTimeSlidingWindowMinCount) with Logging {
override def toString: String = s"HdfsFlusher@$flusherId"

View File

@ -54,7 +54,7 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
val (deviceInfos, diskInfos) = {
val workingDirInfos =
RssConf.workerBaseDirs(conf).map { case (workdir, maxSpace, flusherThread, storageType) =>
conf.workerBaseDirs.map { case (workdir, maxSpace, flusherThread, storageType) =>
(new File(workdir, RssConf.workingDirName(conf)), maxSpace, flusherThread, storageType)
}
@ -104,8 +104,8 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
deviceMonitor,
diskInfo.threadCount,
diskInfo.mountPoint,
RssConf.flushAvgTimeWindow(conf),
RssConf.flushAvgTimeMinimumCount(conf),
conf.avgFlushTimeSlidingWindowSize,
conf.avgFlushTimeSlidingWindowMinCount,
diskInfo.storageType)
flushers.put(diskInfo.mountPoint, flusher)
}
@ -118,7 +118,7 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
deviceMonitor.startCheck()
val hdfsDir = RssConf.hdfsDir(conf)
val hdfsDir = conf.hdfsDir
val hdfsPermission = FsPermission.createImmutable(755)
val hdfsWriters = new util.ArrayList[FileWriter]()
val hdfsFlusher =
@ -129,9 +129,9 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
StorageManager.hdfsFs = FileSystem.get(hdfsConfiguration)
Some(new HdfsFlusher(
workerSource,
RssConf.hdfsFlusherThreadCount(conf),
RssConf.flushAvgTimeWindow(conf),
RssConf.flushAvgTimeMinimumCount(conf)))
conf.hdfsFlusherThreads,
conf.avgFlushTimeSlidingWindowSize,
conf.avgFlushTimeSlidingWindowMinCount))
} else {
None
}
@ -261,7 +261,7 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
var retryCount = 0
var exception: IOException = null
val suggestedMountPoint = location.getStorageInfo.getMountPoint
while (retryCount < RssConf.createFileWriterRetryCount(conf)) {
while (retryCount < conf.createWriterCreateMaxAttempts) {
val diskInfo = diskInfos.get(suggestedMountPoint)
val dirs =
if (diskInfo != null && diskInfo.status.equals(DiskStatus.HEALTHY)) {
@ -523,7 +523,7 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract
diskOperators.size()) { entry =>
ThreadUtils.shutdown(
entry._2,
conf.workerDiskFlusherShutdownTimeoutMs.milliseconds)
conf.workerFlusherShutdownTimeoutMs.milliseconds)
}
}
storageScheduler.shutdownNow()