From 5c761a8df3ffab7add287c8ffb122574f4e25796 Mon Sep 17 00:00:00 2001 From: Ethan Feng Date: Thu, 20 Oct 2022 15:23:17 +0800 Subject: [PATCH] [ISSUE-813][Refactor] Refactor flusher configurations. (#813) * Refactor flusher configurations. * Refactor flusher configurations. * Update. * remove brackets. * update docs. * rename. * update. * update docs. * update. * update. * update. * update. * update. * update. * update. * format. * update. * update. --- .../celeborn/client/LifecycleManager.scala | 2 +- .../org/apache/celeborn/common/RssConf.scala | 348 +++++++++++------- .../apache/celeborn/common/RssConfSuite.scala | 8 +- docs/configuration/client.md | 1 + docs/configuration/master.md | 1 - docs/configuration/metrics.md | 1 - docs/configuration/worker.md | 18 +- .../service/deploy/master/Master.scala | 4 +- pom.xml | 1 + .../deploy/worker/storage/FileWriter.java | 14 +- .../service/deploy/worker/Controller.scala | 8 +- .../deploy/worker/PushDataHandler.scala | 10 +- .../service/deploy/worker/Worker.scala | 2 +- .../deploy/worker/storage/DeviceMonitor.scala | 23 +- .../deploy/worker/storage/Flusher.scala | 24 +- .../worker/storage/StorageManager.scala | 18 +- 16 files changed, 286 insertions(+), 197 deletions(-) diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index 17b1574f0..89537f1f1 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -1210,7 +1210,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit candidates: List[WorkerInfo], slots: WorkerResource): Boolean = { var requestSlots = slots - val maxRetryTimes = RssConf.reserveSlotsMaxRetry(conf) + val maxRetryTimes = RssConf.reserveSlotsMaxRetries(conf) val retryWaitInterval = RssConf.reserveSlotsRetryWait(conf) var retryTimes = 1 var noAvailableSlots = false diff --git a/common/src/main/scala/org/apache/celeborn/common/RssConf.scala b/common/src/main/scala/org/apache/celeborn/common/RssConf.scala index fc3aea043..dd912e7ec 100644 --- a/common/src/main/scala/org/apache/celeborn/common/RssConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/RssConf.scala @@ -24,6 +24,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import scala.collection.JavaConverters._ import scala.util.Try +import org.apache.celeborn.common.RssConf.log import org.apache.celeborn.common.identity.DefaultIdentityProvider import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.internal.config._ @@ -369,7 +370,85 @@ class RssConf(loadDefaults: Boolean) extends Cloneable with Logging with Seriali def checkSlotsFinishedTimeoutMs: Long = get(WORKER_CHECK_SLOTS_FINISHED_TIMEOUT) def workerRecoverPath: String = get(WORKER_RECOVER_PATH) def partitionSorterCloseAwaitTimeMs: Long = get(PARTITION_SORTER_SHUTDOWN_TIMEOUT) - def workerDiskFlusherShutdownTimeoutMs: Long = get(WORKER_FLUSHER_SHUTDOWN_TIMEOUT) + def workerFlusherShutdownTimeoutMs: Long = get(WORKER_FLUSHER_SHUTDOWN_TIMEOUT) + def shuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT) + def writerCloseTimeoutMs: Long = get(WORKER_WRITER_CLOSE_TIMEOUT) + def hddFlusherThreads: Int = get(WORKER_FLUSHER_HDD_THREADS) + def ssdFlusherThreads: Int = get(WORKER_FLUSHER_SSD_THREADS) + def hdfsFlusherThreads: Int = get(WORKER_FLUSHER_HDFS_THREADS) + def avgFlushTimeSlidingWindowSize: Int = get(WORKER_FLUSHER_AVGFLUSHTIME_SLIDINGWINDOW_SIZE) + def avgFlushTimeSlidingWindowMinCount: Int = + get(WORKER_FLUSHER_AVGFLUSHTIME_SLIDINGWINDOW_MINCOUNT) + def diskReserveSize: Long = get(WORKER_DISK_RESERVE_SIZE) + def deviceMonitorEnabled: Boolean = get(WORKER_DEVICE_MONITOR_ENABLED) + def deviceMonitorCheckList: Seq[String] = get(WORKER_DEVICE_MONITOR_CHECKLIST) + def diskCheckInterval: Long = get(WORKER_DISK_CHECK_INTERVAL) + def sysBlockDir: String = get(WORKER_DEVICEMONITOR_SYS_BLOCKDIR) + def createWriterCreateMaxAttempts: Int = get(WORKER_WRITER_CREATE_MAXATTEMPTS) + def workerStorageBaseDirPrefix: String = get(WORKER_STORAGE_BASE_DIR_PREFIX) + def workerStorageBaseDirNumber: Int = get(WORKER_STORAGE_BASE_DIR_COUNT) + + /** + * @return workingDir, usable space, flusher thread count, disk type + * check more details at CONFIGURATION_GUIDE.md + */ + def workerBaseDirs: Seq[(String, Long, Int, Type)] = { + // I assume there is no disk is bigger than 1 PB in recent days. + val defaultMaxCapacity = Utils.byteStringAsBytes("1PB") + get(WORKER_STORAGE_DIRS).map { storageDirs: Seq[String] => + storageDirs.map { str => + var maxCapacity = defaultMaxCapacity + var diskType = HDD + var flushThread = -1 + val (dir, attributes) = str.split(":").toList match { + case _dir :: tail => (_dir, tail) + case nil => throw new IllegalArgumentException(s"Illegal storage dir: $nil") + } + attributes.foreach { + case capacityStr if capacityStr.toLowerCase.startsWith("capacity=") => + maxCapacity = Utils.byteStringAsBytes(capacityStr.split("=")(1)) + case diskTypeStr if diskTypeStr.toLowerCase.startsWith("disktype=") => + diskType = Type.valueOf(diskTypeStr.split("=")(1)) + if (diskType == Type.MEMORY) { + throw new IOException(s"Invalid diskType: $diskType") + } + case threadCountStr if threadCountStr.toLowerCase.startsWith("flushthread=") => + flushThread = threadCountStr.split("=")(1).toInt + case illegal => + throw new IllegalArgumentException(s"Illegal attribute: $illegal") + } + if (flushThread == -1) { + flushThread = diskType match { + case HDD => hddFlusherThreads + case SSD => ssdFlusherThreads + } + } + (dir, maxCapacity, flushThread, diskType) + } + }.getOrElse { + val prefix = workerStorageBaseDirPrefix + val number = workerStorageBaseDirNumber + (1 to number).map { i => + (s"$prefix$i", defaultMaxCapacity, hddFlusherThreads, HDD) + } + } + } + + def partitionSplitMinimumSize: Long = { + getSizeAsBytes("rss.partition.split.minimum.size", "1m") + } + + def hdfsDir: String = { + get(HDFS_DIR).map { + hdfsDir => + if (!Utils.isHdfsPath(hdfsDir)) { + log.error(s"${HDFS_DIR.key} configuration is wrong $hdfsDir. Disable HDFS support.") + "" + } else { + hdfsDir + } + }.getOrElse("") + } } object RssConf extends Logging { @@ -748,7 +827,7 @@ object RssConf extends Logging { def registerShuffleRetryWait(conf: RssConf): Long = conf.get(SHUFFLE_REGISTER_RETRY_WAIT) - def reserveSlotsMaxRetry(conf: RssConf): Int = conf.get(RESERVE_SLOTS_MAX_RETRIES) + def reserveSlotsMaxRetries(conf: RssConf): Int = conf.get(RESERVE_SLOTS_MAX_RETRIES) def reserveSlotsRetryWait(conf: RssConf): Long = conf.get(RESERVE_SLOTS_RETRY_WAIT) @@ -1110,29 +1189,38 @@ object RssConf extends Logging { .toSequence .createOptional - val WORKER_FLUSH_BUFFER_SIZE: ConfigEntry[Long] = - buildConf("celeborn.worker.flush.buffer.size") + val WORKER_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] = + buildConf("celeborn.worker.flusher.buffer.size") .withAlternative("rss.worker.flush.buffer.size") .categories("worker") .doc("Size of buffer used by a single flusher.") .bytesConf(ByteUnit.BYTE) .createWithDefaultString("256k") - def workerHeartbeatTimeoutMs(conf: RssConf): Long = conf.get(WORKER_HEARTBEAT_TIMEOUT) def workerReplicateThreads(conf: RssConf): Int = conf.get(WORKER_REPLICATE_THREADS) def workerCommitThreads(conf: RssConf): Int = conf.get(WORKER_COMMIT_THREADS) - def workerFlushBufferSize(conf: RssConf): Long = conf.get(WORKER_FLUSH_BUFFER_SIZE) + def workerFlusherBufferSize(conf: RssConf): Long = conf.get(WORKER_FLUSHER_BUFFER_SIZE) - def flushTimeout(conf: RssConf): Long = { - conf.getTimeAsSeconds("rss.flush.timeout", "120s") - } + val WORKER_SHUFFLE_COMMIT_TIMEOUT: ConfigEntry[Long] = + buildConf("celeborn.worker.shuffle.commit.timeout") + .withAlternative("rss.flush.timeout") + .categories("worker") + .doc("Timeout for a Celeborn worker to commit a shuffle.") + .version("0.2.0") + .timeConf(TimeUnit.SECONDS) + .createWithDefaultString("120s") - def fileWriterTimeoutMs(conf: RssConf): Long = { - conf.getTimeAsMs("rss.filewriter.timeout", "120s") - } + val WORKER_WRITER_CLOSE_TIMEOUT: ConfigEntry[Long] = + buildConf("celeborn.worker.writer.close.timeout") + .withAlternative("rss.filewriter.timeout") + .categories("worker") + .doc("Timeout for a file writer to close") + .version("0.2.0") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("120s") def appExpireDurationMs(conf: RssConf): Long = { conf.getTimeAsMs("rss.expire.nonEmptyDir.duration", "1d") @@ -1142,63 +1230,71 @@ object RssConf extends Logging { conf.get("rss.worker.workingDirName", "hadoop/rss-worker/shuffle_data") } - /** - * @return workingDir, usable space, flusher thread count, disk type - * check more details at CONFIGURATION_GUIDE.md - */ - def workerBaseDirs(conf: RssConf): Seq[(String, Long, Int, Type)] = { - // I assume there is no disk is bigger than 1 PB in recent days. - val defaultMaxCapacity = Utils.byteStringAsBytes("1PB") - conf.get(WORKER_STORAGE_DIRS).map { storageDirs: Seq[String] => - storageDirs.map { str => - var maxCapacity = defaultMaxCapacity - var diskType = HDD - var flushThread = -1 - val (dir, attributes) = str.split(":").toList match { - case _dir :: tail => (_dir, tail) - case nil => throw new IllegalArgumentException(s"Illegal storage dir: $nil") - } - attributes.foreach { - case capacityStr if capacityStr.toLowerCase.startsWith("capacity=") => - maxCapacity = Utils.byteStringAsBytes(capacityStr.split("=")(1)) - case diskTypeStr if diskTypeStr.toLowerCase.startsWith("disktype=") => - diskType = Type.valueOf(diskTypeStr.split("=")(1)) - if (diskType == Type.MEMORY) { - throw new IOException(s"Invalid diskType: $diskType") - } - case threadCountStr if threadCountStr.toLowerCase.startsWith("flushthread=") => - flushThread = threadCountStr.split("=")(1).toInt - case illegal => - throw new IllegalArgumentException(s"Illegal attribute: $illegal") - } - if (flushThread == -1) { - flushThread = diskType match { - case HDD => HDDFlusherThread(conf) - case SSD => SSDFlusherThread(conf) - } - } - (dir, maxCapacity, flushThread, diskType) - } - }.getOrElse { - val prefix = RssConf.workerBaseDirPrefix(conf) - val number = RssConf.workerBaseDirNumber(conf) - (1 to number).map { i => - (s"$prefix$i", defaultMaxCapacity, HDDFlusherThread(conf), HDD) - } - } - } + val WORKER_FLUSHER_HDD_THREADS: ConfigEntry[Int] = + buildConf("celeborn.worker.flusher.hdd.threads") + .withAlternative("rss.flusher.hdd.thread.count") + .categories("worker") + .doc("Flusher's thread count used for write data to HDD disks.") + .version("0.2.0") + .intConf + .createWithDefault(1) - def HDDFlusherThread(conf: RssConf): Int = { - conf.getInt("rss.flusher.hdd.thread.count", 1) - } + val WORKER_FLUSHER_SSD_THREADS: ConfigEntry[Int] = + buildConf("celeborn.worker.flusher.ssd.threads") + .withAlternative("rss.flusher.hdd.thread.count") + .categories("worker") + .doc("Flusher's thread count used for write data to SSD disks.") + .version("0.2.0") + .intConf + .createWithDefault(8) - def SSDFlusherThread(conf: RssConf): Int = { - conf.getInt("rss.flusher.ssd.thread.count", 8) - } + val WORKER_FLUSHER_HDFS_THREADS: ConfigEntry[Int] = + buildConf("celeborn.worker.flusher.hdfs.threads") + .withAlternative("rss.worker.hdfs.flusher.thread.count") + .categories("worker") + .doc("Flusher's thread count used for write data to HDFS.") + .version("0.2.0") + .intConf + .createWithDefault(4) - def diskMinimumReserveSize(conf: RssConf): Long = { - Utils.byteStringAsBytes(conf.get("rss.disk.minimum.reserve.size", "5G")) - } + val WORKER_FLUSHER_SHUTDOWN_TIMEOUT: ConfigEntry[Long] = + buildConf("celeborn.worker.flusher.shutdown.timeout") + .withAlternative("rss.worker.diskFlusherShutdownTimeoutMs") + .categories("worker") + .doc("Timeout for a flusher to shutdown.") + .version("0.2.0") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("3s") + + val WORKER_DISK_RESERVE_SIZE: ConfigEntry[Long] = + buildConf("celeborn.worker.disk.reserve.size") + .withAlternative("rss.disk.minimum.reserve.size") + .categories("worker") + .doc("Celeborn worker reserved space for each disk.") + .version("0.2.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefaultString("5G") + + val WORKER_FLUSHER_AVGFLUSHTIME_SLIDINGWINDOW_SIZE: ConfigEntry[Int] = + buildConf("celeborn.worker.flusher.avgFlushTime.slidingWindow.size") + .withAlternative("rss.flusher.avg.time.window") + .categories("worker") + .doc("The minimum flush count to enter a sliding window" + + " to calculate statistics about flushed time and count.") + .version("0.2.0") + .intConf + .createWithDefault(20) + + val WORKER_FLUSHER_AVGFLUSHTIME_SLIDINGWINDOW_MINCOUNT: ConfigEntry[Int] = + buildConf("celeborn.worker.flusher.avgFlushTime.slidingWindow.minCount") + .withAlternative("rss.flusher.avg.time.minimum.count") + .categories("worker") + .doc("The minimum flush count to enter a sliding window" + + " to calculate statistics about flushed time and count.") + .version("0.2.0") + .internal + .intConf + .createWithDefault(1000) /** * @return This configuration is a guidance for load-aware slot allocation algorithm. This value @@ -1228,14 +1324,6 @@ object RssConf extends Logging { Utils.timeStringAsMs(conf.get("rss.partition.size.update.interval", "10m")) } - def workerBaseDirPrefix(conf: RssConf): String = { - conf.get("rss.worker.base.dir.prefix", "/mnt/disk") - } - - def workerBaseDirNumber(conf: RssConf): Int = { - conf.getInt("rss.worker.base.dir.number", 16) - } - def stageEndTimeout(conf: RssConf): Long = { conf.getTimeAsMs("rss.stage.end.timeout", "240s") } @@ -1282,13 +1370,6 @@ object RssConf extends Logging { .booleanConf .createWithDefault(true) - val METRICS_TIMER_SLIDING_SIZE: ConfigEntry[Int] = - buildConf("celeborn.metrics.timer.sliding.size") - .withAlternative("rss.metrics.system.timer.sliding.size") - .categories("master", "worker", "metrics") - .intConf - .createWithDefault(4000) - val METRICS_SAMPLE_RATE: ConfigEntry[Double] = buildConf("celeborn.metrics.sample.rate") .withAlternative("rss.metrics.system.sample.rate") @@ -1336,8 +1417,6 @@ object RssConf extends Logging { def metricsSystemEnable(conf: RssConf): Boolean = conf.get(METRICS_ENABLED) - def metricsTimerSlidingSize(conf: RssConf): Int = conf.get(METRICS_TIMER_SLIDING_SIZE) - def metricsSampleRate(conf: RssConf): Double = conf.get(METRICS_SAMPLE_RATE) def metricsSamplePerfCritical(conf: RssConf): Boolean = { @@ -1420,25 +1499,48 @@ object RssConf extends Logging { .toSequence .createWithDefaultString("readwrite,diskusage") - def deviceMonitorEnabled(conf: RssConf): Boolean = conf.get(WORKER_DEVICE_MONITOR_ENABLED) + val WORKER_DISK_CHECK_INTERVAL: ConfigEntry[Long] = + buildConf("celeborn.worker.deviceMonitor.check.interval") + .withAlternative("rss.disk.check.interval") + .categories("worker") + .doc("Intervals between device monitor to check disk.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("60s") - def deviceMonitorCheckList(conf: RssConf): Seq[String] = conf.get(WORKER_DEVICE_MONITOR_CHECKLIST) + val WORKER_DEVICEMONITOR_SYS_BLOCKDIR: ConfigEntry[String] = + buildConf("celeborn.worker.deviceMonitor.sys.block.dir") + .withAlternative("rss.sys.block.dir") + .categories("worker") + .doc("The directory where linux file block information is stored.") + .stringConf + .createWithDefault("/sys/block") - def diskCheckIntervalMs(conf: RssConf): Long = { - conf.getTimeAsMs("rss.disk.check.interval", "60s") - } + val WORKER_WRITER_CREATE_MAXATTEMPTS: ConfigEntry[Int] = + buildConf("celeborn.worker.writer.create.maxAttempts") + .withAlternative("rss.create.file.writer.retry.count") + .categories("worker") + .doc("Retry count for a file writer to create if its creation was failed.") + .intConf + .createWithDefault(3) - def slowFlushIntervalMs(conf: RssConf): Long = { - conf.getTimeAsMs("rss.slow.flush.interval", "10s") - } + val WORKER_STORAGE_BASE_DIR_PREFIX: ConfigEntry[String] = + buildConf("celeborn.worker.storage.base.dir.prefix") + .withAlternative("rss.worker.base.dir.prefix") + .categories("worker") + .doc("Base directory for Celeborn worker to write if \'base.dir\' is not set.") + .stringConf + .createWithDefault("/mnt/disk") - def sysBlockDir(conf: RssConf): String = { - conf.get("rss.sys.block.dir", "/sys/block") - } - - def createFileWriterRetryCount(conf: RssConf): Int = { - conf.getInt("rss.create.file.writer.retry.count", 3) - } + val WORKER_STORAGE_BASE_DIR_COUNT: ConfigEntry[Int] = + buildConf("celeborn.worker.storage.base.dir.number") + .withAlternative("rss.worker.base.dir.number") + .categories("worker") + .doc("How many directories will be create if \'base.dir\' is not set. " + + "The directory name is a combination of \'dir.prefix\' " + + "and from zero to \"dir.number\" step by one. " + + "No sub directory will be created.") + .intConf + .createWithDefault(16) def workerStatusCheckTimeout(conf: RssConf): Long = { conf.getTimeAsSeconds("rss.worker.status.check.timeout", "30s") @@ -1476,10 +1578,6 @@ object RssConf extends Logging { conf.getSizeAsBytes("rss.partition.split.threshold", "256m") } - def partitionSplitMinimumSize(conf: RssConf): Long = { - conf.getSizeAsBytes("rss.partition.split.minimum.size", "1m") - } - def batchHandleChangePartitionEnabled(conf: RssConf): Boolean = { conf.getBoolean("rss.change.partition.batch.enabled", false) } @@ -1597,7 +1695,8 @@ object RssConf extends Logging { buildConf("celeborn.worker.graceful.shutdown.checkSlotsFinished.interval") .withAlternative("rss.worker.checkSlots.interval") .categories("worker") - .doc("The wait interval of checking whether all released slots to be committed or destroyed during worker graceful shutdown") + .doc("The wait interval of checking whether all released slots " + + "to be committed or destroyed during worker graceful shutdown") .version("0.2.0") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("1s") @@ -1606,7 +1705,8 @@ object RssConf extends Logging { buildConf("celeborn.worker.graceful.shutdown.checkSlotsFinished.timeout") .withAlternative("rss.worker.checkSlots.timeout") .categories("worker") - .doc("The wait time of waiting for the released slots to be committed or destroyed during worker graceful shutdown.") + .doc("The wait time of waiting for the released slots" + + " to be committed or destroyed during worker graceful shutdown.") .version("0.2.0") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("480s") @@ -1625,20 +1725,12 @@ object RssConf extends Logging { buildConf("celeborn.worker.graceful.shutdown.partitionSorter.shutdownTimeout") .withAlternative("rss.worker.partitionSorterCloseAwaitTime") .categories("worker") - .doc("The wait time of waiting for sorting partition files during worker graceful shutdown.") + .doc("The wait time of waiting for sorting partition files" + + " during worker graceful shutdown.") .version("0.2.0") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("120s") - val WORKER_FLUSHER_SHUTDOWN_TIMEOUT: ConfigEntry[Long] = - buildConf("celeborn.worker.graceful.shutdown.flusher.shutdownTimeout") - .withAlternative("rss.worker.diskFlusherShutdownTimeoutMs") - .categories("worker") - .doc("The timeout to wait for diskOperators to execute remaining jobs before being shutdown immediately.") - .version("0.2.0") - .timeConf(TimeUnit.MILLISECONDS) - .createWithDefaultString("3s") - def offerSlotsAlgorithm(conf: RssConf): String = { var algorithm = conf.get("rss.offer.slots.algorithm", "roundrobin") if (algorithm != "loadaware" && algorithm != "roundrobin") { @@ -1649,27 +1741,13 @@ object RssConf extends Logging { algorithm } - def flushAvgTimeWindow(conf: RssConf): Int = { - conf.getInt("rss.flusher.avg.time.window", 20); - } - - def flushAvgTimeMinimumCount(conf: RssConf): Int = { - conf.getInt("rss.flusher.avg.time.minimum.count", 1000); - } - - def hdfsDir(conf: RssConf): String = { - val hdfsDir = conf.get("rss.worker.hdfs.dir", "") - if (hdfsDir.nonEmpty && !Utils.isHdfsPath(hdfsDir)) { - log.error(s"rss.worker.hdfs.dir configuration is wrong $hdfsDir. Disable hdfs support.") - "" - } else { - hdfsDir - } - } - - def hdfsFlusherThreadCount(conf: RssConf): Int = { - conf.getInt("rss.worker.hdfs.flusher.thread.count", 4) - } + val HDFS_DIR: OptionalConfigEntry[String] = + buildConf("celeborn.storage.hdfs.dir") + .withAlternative("rss.worker.hdfs.dir") + .categories("worker", "client") + .doc("HDFS dir configuration for Celeborn to access HDFS.") + .stringConf + .createOptional def rangeReadFilterEnabled(conf: RssConf): Boolean = { conf.getBoolean("rss.range.read.filter.enabled", false) diff --git a/common/src/test/scala/org/apache/celeborn/common/RssConfSuite.scala b/common/src/test/scala/org/apache/celeborn/common/RssConfSuite.scala index bee34a61a..56d57619b 100644 --- a/common/src/test/scala/org/apache/celeborn/common/RssConfSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/RssConfSuite.scala @@ -36,7 +36,7 @@ class RssConfSuite extends RssFunSuite { val conf = new RssConf() val defaultMaxUsableSpace = 1024L * 1024 * 1024 * 1024 * 1024 conf.set("celeborn.worker.storage.dirs", "/mnt/disk1") - val parsedDirs = RssConf.workerBaseDirs(conf) + val parsedDirs = conf.workerBaseDirs assert(parsedDirs.size == 1) assert(parsedDirs.head._3 == 1) assert(parsedDirs.head._2 == defaultMaxUsableSpace) @@ -46,7 +46,7 @@ class RssConfSuite extends RssFunSuite { val conf = new RssConf() val defaultMaxUsableSpace = 1024L * 1024 * 1024 * 1024 * 1024 conf.set("celeborn.worker.storage.dirs", "/mnt/disk1:disktype=SSD:capacity=10g") - val parsedDirs = RssConf.workerBaseDirs(conf) + val parsedDirs = conf.workerBaseDirs assert(parsedDirs.size == 1) assert(parsedDirs.head._3 == 8) assert(parsedDirs.head._2 == 10 * 1024 * 1024 * 1024L) @@ -55,7 +55,7 @@ class RssConfSuite extends RssFunSuite { test("basedir test3") { val conf = new RssConf() conf.set("celeborn.worker.storage.dirs", "/mnt/disk1:disktype=SSD:capacity=10g:flushthread=3") - val parsedDirs = RssConf.workerBaseDirs(conf) + val parsedDirs = conf.workerBaseDirs assert(parsedDirs.size == 1) assert(parsedDirs.head._3 == 3) assert(parsedDirs.head._2 == 10 * 1024 * 1024 * 1024L) @@ -67,7 +67,7 @@ class RssConfSuite extends RssFunSuite { "celeborn.worker.storage.dirs", "/mnt/disk1:disktype=SSD:capacity=10g:flushthread=3," + "/mnt/disk2:disktype=HDD:capacity=15g:flushthread=7") - val parsedDirs = RssConf.workerBaseDirs(conf) + val parsedDirs = conf.workerBaseDirs assert(parsedDirs.size == 2) assert(parsedDirs.head._1 == "/mnt/disk1") assert(parsedDirs.head._3 == 3) diff --git a/docs/configuration/client.md b/docs/configuration/client.md index 4ec14ebfb..d5e17cb57 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -33,5 +33,6 @@ license: | | celeborn.shuffle.writer.mode | `hash` | Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count it huge. | 0.2.0 | | celeborn.slot.reserve.maxRetries | `3` | Max retry times for client to reserve slots. | | | celeborn.slot.reserve.retryWait | `3s` | Wait time before next retry if reserve slots failed. | | +| celeborn.storage.hdfs.dir | `` | HDFS dir configuration for Celeborn to access HDFS. | | | celeborn.worker.excluded.interval | `30s` | Interval for client to refresh excluded worker list. | | diff --git a/docs/configuration/master.md b/docs/configuration/master.md index ab3c22b4d..79c40e96c 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -29,7 +29,6 @@ license: | | celeborn.master.port | `9097` | Port for master to bind. | 0.2.0 | | celeborn.metrics.enabled | `true` | When true, enable metrics system. | | | celeborn.metrics.sample.rate | `1.0` | | | -| celeborn.metrics.timer.sliding.size | `4000` | | | | celeborn.metrics.timer.sliding.window.size | `4096` | | | | celeborn.worker.heartbeat.timeout | `120s` | Worker heartbeat timeout. | | diff --git a/docs/configuration/metrics.md b/docs/configuration/metrics.md index d5c07baf9..086f9850c 100644 --- a/docs/configuration/metrics.md +++ b/docs/configuration/metrics.md @@ -20,7 +20,6 @@ license: | | celeborn.master.metrics.prometheus.port | `9098` | | | | celeborn.metrics.enabled | `true` | When true, enable metrics system. | | | celeborn.metrics.sample.rate | `1.0` | | | -| celeborn.metrics.timer.sliding.size | `4000` | | | | celeborn.metrics.timer.sliding.window.size | `4096` | | | | celeborn.worker.metrics.prometheus.host | `0.0.0.0` | | | | celeborn.worker.metrics.prometheus.port | `9096` | | | diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index a6b62b04a..b5052ac6f 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -19,17 +19,24 @@ license: | | celeborn.master.endpoints | `:9097` | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `:[,:]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. | 0.2.0 | | celeborn.metrics.enabled | `true` | When true, enable metrics system. | | | celeborn.metrics.sample.rate | `1.0` | | | -| celeborn.metrics.timer.sliding.size | `4000` | | | | celeborn.metrics.timer.sliding.window.size | `4096` | | | | celeborn.shuffle.chuck.size | `8m` | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | | +| celeborn.storage.hdfs.dir | `` | HDFS dir configuration for Celeborn to access HDFS. | | | celeborn.worker.commit.threads | `32` | Thread number of worker to commit shuffle data files asynchronously. | | +| celeborn.worker.deviceMonitor.check.interval | `60s` | Intervals between device monitor to check disk. | | | celeborn.worker.deviceMonitor.checklist | `readwrite,diskusage` | Select what the device needs to detect, available items are: iohang, readwrite and diskusage. | | | celeborn.worker.deviceMonitor.enabled | `true` | When true, worker will monitor device and report to master. | | -| celeborn.worker.flush.buffer.size | `256k` | Size of buffer used by a single flusher. | | +| celeborn.worker.deviceMonitor.sys.block.dir | `/sys/block` | The directory where linux file block information is stored. | | +| celeborn.worker.disk.reserve.size | `5G` | Celeborn worker reserved space for each disk. | 0.2.0 | +| celeborn.worker.flusher.avgFlushTime.slidingWindow.size | `20` | The minimum flush count to enter a sliding window to calculate statistics about flushed time and count. | 0.2.0 | +| celeborn.worker.flusher.buffer.size | `256k` | Size of buffer used by a single flusher. | | +| celeborn.worker.flusher.hdd.threads | `1` | Flusher's thread count used for write data to HDD disks. | 0.2.0 | +| celeborn.worker.flusher.hdfs.threads | `4` | Flusher's thread count used for write data to HDFS. | 0.2.0 | +| celeborn.worker.flusher.shutdown.timeout | `3s` | Timeout for a flusher to shutdown. | 0.2.0 | +| celeborn.worker.flusher.ssd.threads | `8` | Flusher's thread count used for write data to SSD disks. | 0.2.0 | | celeborn.worker.graceful.shutdown.checkSlotsFinished.interval | `1s` | The wait interval of checking whether all released slots to be committed or destroyed during worker graceful shutdown | 0.2.0 | | celeborn.worker.graceful.shutdown.checkSlotsFinished.timeout | `480s` | The wait time of waiting for the released slots to be committed or destroyed during worker graceful shutdown. | 0.2.0 | | celeborn.worker.graceful.shutdown.enabled | `false` | When true, during worker shutdown, the worker will wait for all released slots to be committed or destroyed. | 0.2.0 | -| celeborn.worker.graceful.shutdown.flusher.shutdownTimeout | `3s` | The timeout to wait for diskOperators to execute remaining jobs before being shutdown immediately. | 0.2.0 | | celeborn.worker.graceful.shutdown.partitionSorter.shutdownTimeout | `120s` | The wait time of waiting for sorting partition files during worker graceful shutdown. | 0.2.0 | | celeborn.worker.graceful.shutdown.recoverPath | `/recover` | The path to store levelDB. | 0.2.0 | | celeborn.worker.graceful.shutdown.timeout | `600s` | The worker's graceful shutdown timeout time. | 0.2.0 | @@ -37,5 +44,10 @@ license: | | celeborn.worker.metrics.prometheus.host | `0.0.0.0` | | | | celeborn.worker.metrics.prometheus.port | `9096` | | | | celeborn.worker.replicate.threads | `64` | Thread number of worker to replicate shuffle data. | | +| celeborn.worker.shuffle.commit.timeout | `120s` | Timeout for a Celeborn worker to commit a shuffle. | 0.2.0 | +| celeborn.worker.storage.base.dir.number | `16` | How many directories will be create if 'base.dir' is not set. The directory name is a combination of 'dir.prefix' and from zero to "dir.number" step by one. No sub directory will be created. | | +| celeborn.worker.storage.base.dir.prefix | `/mnt/disk` | Base directory for Celeborn worker to write if 'base.dir' is not set. | | | celeborn.worker.storage.dirs | `` | Directory list to store shuffle data. It's recommended to configure one directory on each disk. Storage size limit can be set for each directory. For the sake of performance, there should be no more than 2 flush threads on the same disk partition if you are using HDD, and should be 8 or more flush threads on the same disk partition if you are using SSD. For example: dir1[:capacity=][:disktype=][:flushthread=],dir2[:capacity=][:disktype=][:flushthread=] | | +| celeborn.worker.writer.close.timeout | `120s` | Timeout for a file writer to close | 0.2.0 | +| celeborn.worker.writer.create.maxAttempts | `3` | Retry count for a file writer to create if its creation was failed. | | diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index aae03b02f..4bf6cb65d 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -103,7 +103,7 @@ private[celeborn] class Master( private def workersSnapShot: util.List[WorkerInfo] = statusSystem.workers.synchronized(new util.ArrayList[WorkerInfo](statusSystem.workers)) - private def minimumUsableSize = RssConf.diskMinimumReserveSize(conf) + private def diskReserveSize = conf.diskReserveSize private def diskGroups = RssConf.diskGroups(conf) @@ -498,7 +498,7 @@ private[celeborn] class Master( workersNotBlacklisted(), requestSlots.partitionIdList, requestSlots.shouldReplicate, - minimumUsableSize, + diskReserveSize, diskGroups, diskGroupGradient) } diff --git a/pom.xml b/pom.xml index afd00ea2d..14cba5914 100644 --- a/pom.xml +++ b/pom.xml @@ -574,6 +574,7 @@ 1 false false + false diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java index 6f12e2f0f..0b7c6694e 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java @@ -66,9 +66,9 @@ public final class FileWriter implements DeviceObserver { private CompositeByteBuf flushBuffer; private final long chunkSize; - private final long timeoutMs; + private final long writerCloseTimeoutMs; - private final long flushBufferSize; + private final long flusherBufferSize; private final DeviceMonitor deviceMonitor; private final AbstractSource source; // metrics @@ -109,9 +109,9 @@ public final class FileWriter implements DeviceObserver { this.flushWorkerIndex = flusher.getWorkerIndex(); this.chunkSize = RssConf.shuffleChunkSize(rssConf); this.nextBoundary = this.chunkSize; - this.timeoutMs = RssConf.fileWriterTimeoutMs(rssConf); + this.writerCloseTimeoutMs = rssConf.writerCloseTimeoutMs(); this.splitThreshold = splitThreshold; - this.flushBufferSize = RssConf.workerFlushBufferSize(rssConf); + this.flusherBufferSize = RssConf.workerFlusherBufferSize(rssConf); this.deviceMonitor = deviceMonitor; this.splitMode = splitMode; this.partitionType = partitionType; @@ -212,7 +212,7 @@ public final class FileWriter implements DeviceObserver { mapIdBitMap.add(mapId); } if (flushBuffer.readableBytes() != 0 - && flushBuffer.readableBytes() + numBytes >= this.flushBufferSize) { + && flushBuffer.readableBytes() + numBytes >= this.flusherBufferSize) { flush(false); takeBuffer(); } @@ -344,7 +344,7 @@ public final class FileWriter implements DeviceObserver { } private void waitOnNoPending(AtomicInteger counter) throws IOException { - long waitTime = timeoutMs; + long waitTime = writerCloseTimeoutMs; while (counter.get() > 0 && waitTime > 0) { try { notifier.checkException(); @@ -390,7 +390,7 @@ public final class FileWriter implements DeviceObserver { } private void addTask(FlushTask task) throws IOException { - if (!flusher.addTask(task, timeoutMs, flushWorkerIndex)) { + if (!flusher.addTask(task, writerCloseTimeoutMs, flushWorkerIndex)) { IOException e = new IOException("Add flush task timeout."); notifier.setException(e); throw e; diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala index 6145b19f9..0634e9439 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala @@ -399,18 +399,18 @@ private[deploy] class Controller( if (future != null) { val result = new AtomicReference[CompletableFuture[Unit]]() - val flushTimeout = RssConf.flushTimeout(conf) + val shuffleCommitTimeout = conf.shuffleCommitTimeout val timeout = timer.newTimeout( new TimerTask { override def run(timeout: Timeout): Unit = { if (result.get() != null) { result.get().cancel(true) - logWarning(s"After waiting $flushTimeout s, cancel all commit file jobs.") + logWarning(s"After waiting $shuffleCommitTimeout s, cancel all commit file jobs.") } } }, - flushTimeout, + shuffleCommitTimeout, TimeUnit.SECONDS) result.set(future.handleAsync( @@ -427,7 +427,7 @@ private[deploy] class Controller( Thread.currentThread().interrupt() throw ie case _: TimeoutException => - logWarning(s"While handling commitFiles, timeout after $flushTimeout s.") + logWarning(s"While handling commitFiles, timeout after $shuffleCommitTimeout s.") case throwable: Throwable => logError("While handling commitFiles, exception occurs.", throwable) } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index aad8a714a..11f9729f5 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -49,7 +49,7 @@ class PushDataHandler extends BaseMessageHandler with Logging { var pushClientFactory: TransportClientFactory = _ var registered: AtomicBoolean = _ var workerInfo: WorkerInfo = _ - var diskMinimumReserveSize: Long = _ + var diskReserveSize: Long = _ var partitionSplitMinimumSize: Long = _ def init(worker: Worker): Unit = { @@ -62,10 +62,10 @@ class PushDataHandler extends BaseMessageHandler with Logging { pushClientFactory = worker.pushClientFactory registered = worker.registered workerInfo = worker.workerInfo - diskMinimumReserveSize = RssConf.diskMinimumReserveSize(worker.conf) - partitionSplitMinimumSize = RssConf.partitionSplitMinimumSize(worker.conf) + diskReserveSize = worker.conf.diskReserveSize + partitionSplitMinimumSize = worker.conf.partitionSplitMinimumSize - logInfo(s"diskMinimumReserveSize $diskMinimumReserveSize") + logInfo(s"diskReserveSize $diskReserveSize") } override def receive(client: TransportClient, msg: RequestMessage): Unit = @@ -213,7 +213,7 @@ class PushDataHandler extends BaseMessageHandler with Logging { } val diskFull = workerInfo.diskInfos .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint) - .actualUsableSpace < diskMinimumReserveSize + .actualUsableSpace < diskReserveSize if ((diskFull && fileWriter.getFileInfo.getFileLength > partitionSplitMinimumSize) || (isMaster && fileWriter.getFileInfo.getFileLength > fileWriter.getSplitThreshold())) { if (fileWriter.getSplitMode == PartitionSplitMode.SOFT) { diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index eb51f11a4..12d0a7c15 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -455,7 +455,7 @@ private[deploy] object Worker extends Logging { // address of the Master should be used in the end. workerArgs.master.foreach { master => conf.set( - "celeborn.master.endpoints", + MASTER_ENDPOINTS.key, RpcAddress.fromRssURL(master).toString.replace("rss://", "")) } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala index 2f079b3e6..0fc0b607c 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala @@ -30,7 +30,6 @@ import org.apache.commons.io.FileUtils import org.slf4j.LoggerFactory import org.apache.celeborn.common.RssConf -import org.apache.celeborn.common.RssConf.{deviceMonitorCheckList, diskCheckIntervalMs} import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, DiskStatus} import org.apache.celeborn.common.util.ThreadUtils import org.apache.celeborn.common.util.Utils._ @@ -62,7 +61,7 @@ class LocalDeviceMonitor( } val observers: jSet[DeviceObserver] = ConcurrentHashMap.newKeySet[DeviceObserver]() - val sysBlockDir = RssConf.sysBlockDir(rssConf) + val sysBlockDir = rssConf.sysBlockDir val statFile = new File(s"$sysBlockDir/${deviceInfo.name}/stat") val inFlightFile = new File(s"$sysBlockDir/${deviceInfo.name}/inflight") @@ -181,13 +180,13 @@ class LocalDeviceMonitor( // (deviceName -> ObservedDevice) var observedDevices: util.Map[DeviceInfo, ObservedDevice] = _ - val diskCheckInterval = diskCheckIntervalMs(rssConf) + val diskCheckInterval = rssConf.diskCheckInterval // we should choose what the device needs to detect - val monitorCheckList = deviceMonitorCheckList(rssConf) - val checkIoHang = monitorCheckList.contains("iohang") - val checkReadWrite = monitorCheckList.contains("readwrite") - val checkDiskUsage = monitorCheckList.contains("diskusage") + val deviceMonitorCheckList = rssConf.deviceMonitorCheckList + val checkIoHang = deviceMonitorCheckList.contains("iohang") + val checkReadWrite = deviceMonitorCheckList.contains("readwrite") + val checkDiskUsage = deviceMonitorCheckList.contains("diskusage") private val diskChecker = ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-disk-checker") @@ -293,7 +292,7 @@ object DeviceMonitor { deviceInfos: util.Map[String, DeviceInfo], diskInfos: util.Map[String, DiskInfo]): DeviceMonitor = { try { - if (RssConf.deviceMonitorEnabled(rssConf)) { + if (rssConf.deviceMonitorEnabled) { val monitor = new LocalDeviceMonitor(rssConf, deviceObserver, deviceInfos, diskInfos) monitor.init() logger.info("Device monitor init success") @@ -310,18 +309,18 @@ object DeviceMonitor { /** * check if the disk is high usage - * @param rssConf conf + * @param conf conf * @param diskRootPath disk root path * @return true if high disk usage */ - def highDiskUsage(rssConf: RssConf, diskRootPath: String): Boolean = { + def highDiskUsage(conf: RssConf, diskRootPath: String): Boolean = { tryWithTimeoutAndCallback({ val usage = runCommand(s"df -B 1G $diskRootPath").trim.split("[ \t]+") val totalSpace = usage(usage.length - 5) val freeSpace = usage(usage.length - 3) val used_percent = usage(usage.length - 2) - val status = freeSpace.toLong < RssConf.diskMinimumReserveSize(rssConf) / 1024 / 1024 / 1024 + val status = freeSpace.toLong < conf.diskReserveSize / 1024 / 1024 / 1024 if (status) { logger.warn(s"$diskRootPath usage:{total:$totalSpace GB," + s" free:$freeSpace GB, used_percent:$used_percent}") @@ -329,7 +328,7 @@ object DeviceMonitor { status })(false)( deviceCheckThreadPool, - RssConf.workerStatusCheckTimeout(rssConf), + RssConf.workerStatusCheckTimeout(conf), s"Disk: $diskRootPath Usage Check Timeout") } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala index cb0a3e301..fc79a51df 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala @@ -37,8 +37,8 @@ import org.apache.celeborn.service.deploy.worker.WorkerSource abstract private[worker] class Flusher( val workerSource: AbstractSource, val threadCount: Int, - val flushAvgTimeWindowSize: Int, - val flushAvgTimeMinimumCount: Int) extends Logging { + val avgFlushTimeSlidingWindowSize: Int, + val avgFlushTimeSlidingWindowMinCount: Int) extends Logging { protected lazy val flusherId = System.identityHashCode(this) protected val workingQueues = new Array[LinkedBlockingQueue[FlushTask]](threadCount) protected val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf]() @@ -46,7 +46,7 @@ abstract private[worker] class Flusher( protected var nextWorkerIndex: Int = 0 protected val flushCount = new LongAdder protected val flushTotalTime = new LongAdder - protected val avgTimeWindow = new Array[(Long, Long)](flushAvgTimeWindowSize) + protected val avgTimeWindow = new Array[(Long, Long)](avgFlushTimeSlidingWindowSize) protected var avgTimeWindowCurrentIndex = 0 val lastBeginFlushTime: AtomicLongArray = new AtomicLongArray(threadCount) @@ -56,7 +56,7 @@ abstract private[worker] class Flusher( init() private def init(): Unit = { - for (i <- 0 until flushAvgTimeWindowSize) { + for (i <- 0 until avgFlushTimeSlidingWindowSize) { avgTimeWindow(i) = (0L, 0L) } for (i <- 0 until lastBeginFlushTime.length()) { @@ -113,9 +113,9 @@ abstract private[worker] class Flusher( } val currentFlushTime = flushTotalTime.sumThenReset() val currentFlushCount = flushCount.sumThenReset() - if (currentFlushCount >= flushAvgTimeMinimumCount) { + if (currentFlushCount >= avgFlushTimeSlidingWindowMinCount) { avgTimeWindow(avgTimeWindowCurrentIndex) = (currentFlushTime, currentFlushCount) - avgTimeWindowCurrentIndex = (avgTimeWindowCurrentIndex + 1) % flushAvgTimeWindowSize + avgTimeWindowCurrentIndex = (avgTimeWindowCurrentIndex + 1) % avgFlushTimeSlidingWindowSize } var totalFlushTime = 0L @@ -177,12 +177,12 @@ private[worker] class LocalFlusher( val deviceMonitor: DeviceMonitor, threadCount: Int, val mountPoint: String, - flushAvgTimeWindowSize: Int, + avgFlushTimeSlidingWindowSize: Int, flushAvgTimeMinimumCount: Int, val diskType: StorageInfo.Type) extends Flusher( workerSource, threadCount, - flushAvgTimeWindowSize, + avgFlushTimeSlidingWindowSize, flushAvgTimeMinimumCount) with DeviceObserver with Logging { @@ -216,13 +216,13 @@ private[worker] class LocalFlusher( final private[worker] class HdfsFlusher( workerSource: AbstractSource, - threadCount: Int, + hdfsFlusherThreads: Int, flushAvgTimeWindowSize: Int, - flushAvgTimeMinimumCount: Int) extends Flusher( + avgFlushTimeSlidingWindowMinCount: Int) extends Flusher( workerSource, - threadCount, + hdfsFlusherThreads, flushAvgTimeWindowSize, - flushAvgTimeMinimumCount) with Logging { + avgFlushTimeSlidingWindowMinCount) with Logging { override def toString: String = s"HdfsFlusher@$flusherId" diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index b0befc157..8045525c0 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -54,7 +54,7 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract val (deviceInfos, diskInfos) = { val workingDirInfos = - RssConf.workerBaseDirs(conf).map { case (workdir, maxSpace, flusherThread, storageType) => + conf.workerBaseDirs.map { case (workdir, maxSpace, flusherThread, storageType) => (new File(workdir, RssConf.workingDirName(conf)), maxSpace, flusherThread, storageType) } @@ -104,8 +104,8 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract deviceMonitor, diskInfo.threadCount, diskInfo.mountPoint, - RssConf.flushAvgTimeWindow(conf), - RssConf.flushAvgTimeMinimumCount(conf), + conf.avgFlushTimeSlidingWindowSize, + conf.avgFlushTimeSlidingWindowMinCount, diskInfo.storageType) flushers.put(diskInfo.mountPoint, flusher) } @@ -118,7 +118,7 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract deviceMonitor.startCheck() - val hdfsDir = RssConf.hdfsDir(conf) + val hdfsDir = conf.hdfsDir val hdfsPermission = FsPermission.createImmutable(755) val hdfsWriters = new util.ArrayList[FileWriter]() val hdfsFlusher = @@ -129,9 +129,9 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract StorageManager.hdfsFs = FileSystem.get(hdfsConfiguration) Some(new HdfsFlusher( workerSource, - RssConf.hdfsFlusherThreadCount(conf), - RssConf.flushAvgTimeWindow(conf), - RssConf.flushAvgTimeMinimumCount(conf))) + conf.hdfsFlusherThreads, + conf.avgFlushTimeSlidingWindowSize, + conf.avgFlushTimeSlidingWindowMinCount)) } else { None } @@ -261,7 +261,7 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract var retryCount = 0 var exception: IOException = null val suggestedMountPoint = location.getStorageInfo.getMountPoint - while (retryCount < RssConf.createFileWriterRetryCount(conf)) { + while (retryCount < conf.createWriterCreateMaxAttempts) { val diskInfo = diskInfos.get(suggestedMountPoint) val dirs = if (diskInfo != null && diskInfo.status.equals(DiskStatus.HEALTHY)) { @@ -523,7 +523,7 @@ final private[worker] class StorageManager(conf: RssConf, workerSource: Abstract diskOperators.size()) { entry => ThreadUtils.shutdown( entry._2, - conf.workerDiskFlusherShutdownTimeoutMs.milliseconds) + conf.workerFlusherShutdownTimeoutMs.milliseconds) } } storageScheduler.shutdownNow()