diff --git a/CONFIGURATION_GUIDE.md b/CONFIGURATION_GUIDE.md index fd4e68bf7..b67f6e9cd 100644 --- a/CONFIGURATION_GUIDE.md +++ b/CONFIGURATION_GUIDE.md @@ -204,7 +204,8 @@ So we should set `rss.worker.flush.queue.capacity=6553` and each RSS worker has | `rss.resume.memory.ratio` | 0.5 | double | If direct memory usage is less than this limit, worker will resume receive | | `rss.worker.reserveForSingleSort.memory` | 1mb | string | Reserve memory when sorting a shuffle file off-heap. | | `rss.storage.hint` | memory | string | Available enumerations : memory,ssd,hdd,hdfs,oss | -| `rss.initial.partition.size` | int | string | Initial estimated partition size, default is 64m, and it will change according to runtime stats. | +| `rss.initial.partition.size` | 64m | string | Initial estimated partition size (default is 64m); the estimate is then adjusted according to runtime stats. | +| `rss.minimum.estimate.partition.size` | 8m | string | Partitions smaller than this size are ignored when estimating the partition size. | | `rss.disk.flusher.useMountPoint` | bool | true | True means that each disk will get one disk flush. False means that a disk flush will be attached to a working directory. | | `rss.rpc.askTimeout` | 240s | string | Timeout for sending rpc messages. | | `rss.rpc.lookupTimeout` | 30s | string | Timeout for creating new connection. This value should be less than `rss.worker.timeout` to avoid worker lost in HA mode. 
| diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala index 6e6dc33ad..c926c2b29 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala @@ -535,10 +535,6 @@ object RssConf extends Logging { conf.getTimeAsMs("rss.expire.emptyDir.duration", "2h") } - def initialPartitionSize(conf: RssConf): Long = { - Utils.byteStringAsBytes(conf.get("rss.initial.partition.size", "64m")) - } - /** * * @param conf @@ -610,6 +606,14 @@ object RssConf extends Logging { conf.getDouble("rss.disk.group.gradient", 0.1) } + def initialPartitionSize(conf: RssConf): Long = { + Utils.byteStringAsBytes(conf.get("rss.initial.partition.size", "64m")) + } + + def minimumPartitionSizeForEstimation(conf: RssConf): Long = { + Utils.byteStringAsBytes(conf.get("rss.minimum.estimate.partition.size", "8m")) + } + def partitionSizeUpdaterInitialDelay(conf: RssConf): Long = { Utils.timeStringAsMs(conf.get("rss.partition.size.update.initial.delay", "5m")) } diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Controller.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Controller.scala index d64c3971a..f3be4661c 100644 --- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Controller.scala +++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Controller.scala @@ -52,6 +52,7 @@ private[deploy] class Controller( var timer: HashedWheelTimer = _ var commitThreadPool: ThreadPoolExecutor = _ var asyncReplyPool: ScheduledExecutorService = _ + val minimumPartitionSizeForEstimation = RssConf.minimumPartitionSizeForEstimation(conf) def init(worker: Worker): Unit = { workerSource = worker.workerSource @@ -178,7 +179,7 @@ private[deploy] class Controller( committedIds: jSet[String], failedIds: jSet[String], committedStorageHints: 
ConcurrentHashMap[String, StorageInfo], - writtenList: LinkedBlockingQueue[Long], + partitionSizeList: LinkedBlockingQueue[Long], master: Boolean = true): CompletableFuture[Void] = { var future: CompletableFuture[Void] = null @@ -202,7 +203,10 @@ private[deploy] class Controller( val bytes = fileWriter.close() if (bytes > 0L) { committedStorageHints.put(uniqueId, fileWriter.getStorageInfo) - writtenList.add(bytes) + if (bytes >= minimumPartitionSizeForEstimation) { + logDebug(s"bytes $bytes above threshold $minimumPartitionSizeForEstimation") + partitionSizeList.add(bytes) + } committedIds.add(uniqueId) } } catch { @@ -255,7 +259,7 @@ private[deploy] class Controller( val failedSlaveIds = ConcurrentHashMap.newKeySet[String]() val committedMasterStorageHints = new ConcurrentHashMap[String, StorageInfo]() val committedSlaveStorageHints = new ConcurrentHashMap[String, StorageInfo]() - val committedWrittenSize = new LinkedBlockingQueue[Long]() + val partitionSizeList = new LinkedBlockingQueue[Long]() val masterFuture = commitFiles( @@ -264,7 +268,7 @@ private[deploy] class Controller( committedMasterIds, failedMasterIds, committedMasterStorageHints, - committedWrittenSize + partitionSizeList ) val slaveFuture = commitFiles( shuffleKey, @@ -272,7 +276,7 @@ private[deploy] class Controller( committedSlaveIds, failedSlaveIds, committedSlaveStorageHints, - committedWrittenSize, + partitionSizeList, false ) @@ -306,8 +310,8 @@ private[deploy] class Controller( new jHashMap[String, StorageInfo](committedMasterStorageHints) val committedSlaveStorageAndDiskHintList = new jHashMap[String, StorageInfo](committedSlaveStorageHints) - val totalWritten = committedWrittenSize.asScala.sum - val fileCount = committedWrittenSize.size() + val totalSize = partitionSizeList.asScala.sum + val fileCount = partitionSizeList.size() // reply if (failedMasterIds.isEmpty && failedSlaveIds.isEmpty) { logInfo(s"CommitFiles for $shuffleKey success with ${committedMasterIds.size()}" + @@ -321,7 
+325,7 @@ private[deploy] class Controller( List.empty.asJava, committedMasterStorageAndDiskHintList, committedSlaveStorageAndDiskHintList, - totalWritten, + totalSize, fileCount ) ) @@ -337,7 +341,7 @@ private[deploy] class Controller( failedSlaveIdList, committedMasterStorageAndDiskHintList, committedSlaveStorageAndDiskHintList, - totalWritten, + totalSize, fileCount ) )