[ISSUE-273][FOLLOW-UP] Add minimum partition size threshold for parti… (#346)

This commit is contained in:
Keyong Zhou 2022-08-13 18:29:43 +08:00 committed by GitHub
parent 766b3118d7
commit 89903d162e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 14 deletions

View File

@ -204,7 +204,8 @@ So we should set `rss.worker.flush.queue.capacity=6553` and each RSS worker has
| `rss.resume.memory.ratio` | 0.5 | double | If direct memory usage is less than this limit, worker will resume receive |
| `rss.worker.reserveForSingleSort.memory` | 1mb | string | Reserve memory when sorting a shuffle file off-heap. |
| `rss.storage.hint` | memory | string | Available enumerations : memory,ssd,hdd,hdfs,oss |
| `rss.initial.partition.size` | int | string | Initial estimated partition size, default is 64m, and it will change according to runtime stats. |
| `rss.initial.partition.size` | 64m | string | Initial estimated partition size, default is 64m, and it will change according to runtime stats. |
| `rss.minimum.estimate.partition.size` | 8m | string | Ignore partition size smaller than this configuration for partition size estimation. |
| `rss.disk.flusher.useMountPoint` | bool | true | True means that each disk will get one disk flush. False means that a disk flush will be attached to a working directory. |
| `rss.rpc.askTimeout` | 240s | string | Timeout for sending rpc messages. |
| `rss.rpc.lookupTimeout` | 30s | string | Timeout for creating new connection. This value should be less than `rss.worker.timeout` to avoid worker lost in HA mode. |

View File

@ -535,10 +535,6 @@ object RssConf extends Logging {
conf.getTimeAsMs("rss.expire.emptyDir.duration", "2h")
}
def initialPartitionSize(conf: RssConf): Long = {
Utils.byteStringAsBytes(conf.get("rss.initial.partition.size", "64m"))
}
/**
*
* @param conf
@ -610,6 +606,14 @@ object RssConf extends Logging {
conf.getDouble("rss.disk.group.gradient", 0.1)
}
def initialPartitionSize(conf: RssConf): Long = {
Utils.byteStringAsBytes(conf.get("rss.initial.partition.size", "64m"))
}
def minimumPartitionSizeForEstimation(conf: RssConf): Long = {
Utils.byteStringAsBytes(conf.get("rss.minimum.estimate.partition.size", "8m"))
}
def partitionSizeUpdaterInitialDelay(conf: RssConf): Long = {
Utils.timeStringAsMs(conf.get("rss.partition.size.update.initial.delay", "5m"))
}

View File

@ -52,6 +52,7 @@ private[deploy] class Controller(
var timer: HashedWheelTimer = _
var commitThreadPool: ThreadPoolExecutor = _
var asyncReplyPool: ScheduledExecutorService = _
val minimumPartitionSizeForEstimation = RssConf.minimumPartitionSizeForEstimation(conf)
def init(worker: Worker): Unit = {
workerSource = worker.workerSource
@ -178,7 +179,7 @@ private[deploy] class Controller(
committedIds: jSet[String],
failedIds: jSet[String],
committedStorageHints: ConcurrentHashMap[String, StorageInfo],
writtenList: LinkedBlockingQueue[Long],
partitionSizeList: LinkedBlockingQueue[Long],
master: Boolean = true): CompletableFuture[Void] = {
var future: CompletableFuture[Void] = null
@ -202,7 +203,10 @@ private[deploy] class Controller(
val bytes = fileWriter.close()
if (bytes > 0L) {
committedStorageHints.put(uniqueId, fileWriter.getStorageInfo)
writtenList.add(bytes)
if (bytes >= minimumPartitionSizeForEstimation) {
logDebug(s"bytes $bytes above threshold $minimumPartitionSizeForEstimation")
partitionSizeList.add(bytes)
}
committedIds.add(uniqueId)
}
} catch {
@ -255,7 +259,7 @@ private[deploy] class Controller(
val failedSlaveIds = ConcurrentHashMap.newKeySet[String]()
val committedMasterStorageHints = new ConcurrentHashMap[String, StorageInfo]()
val committedSlaveStorageHints = new ConcurrentHashMap[String, StorageInfo]()
val committedWrittenSize = new LinkedBlockingQueue[Long]()
val partitionSizeList = new LinkedBlockingQueue[Long]()
val masterFuture =
commitFiles(
@ -264,7 +268,7 @@ private[deploy] class Controller(
committedMasterIds,
failedMasterIds,
committedMasterStorageHints,
committedWrittenSize
partitionSizeList
)
val slaveFuture = commitFiles(
shuffleKey,
@ -272,7 +276,7 @@ private[deploy] class Controller(
committedSlaveIds,
failedSlaveIds,
committedSlaveStorageHints,
committedWrittenSize,
partitionSizeList,
false
)
@ -306,8 +310,8 @@ private[deploy] class Controller(
new jHashMap[String, StorageInfo](committedMasterStorageHints)
val committedSlaveStorageAndDiskHintList =
new jHashMap[String, StorageInfo](committedSlaveStorageHints)
val totalWritten = committedWrittenSize.asScala.sum
val fileCount = committedWrittenSize.size()
val totalSize = partitionSizeList.asScala.sum
val fileCount = partitionSizeList.size()
// reply
if (failedMasterIds.isEmpty && failedSlaveIds.isEmpty) {
logInfo(s"CommitFiles for $shuffleKey success with ${committedMasterIds.size()}" +
@ -321,7 +325,7 @@ private[deploy] class Controller(
List.empty.asJava,
committedMasterStorageAndDiskHintList,
committedSlaveStorageAndDiskHintList,
totalWritten,
totalSize,
fileCount
)
)
@ -337,7 +341,7 @@ private[deploy] class Controller(
failedSlaveIdList,
committedMasterStorageAndDiskHintList,
committedSlaveStorageAndDiskHintList,
totalWritten,
totalSize,
fileCount
)
)