[ISSUE-415][REFACTOR] Refactor Storage related class to separated scala file (#416)

This commit is contained in:
AngersZhuuuu 2022-08-24 15:18:53 +08:00 committed by GitHub
parent 1d4bb3616e
commit 8ca97d92e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 389 additions and 288 deletions

View File

@ -18,6 +18,7 @@
package com.aliyun.emr.rss.service.deploy.worker;
import com.aliyun.emr.rss.common.protocol.PartitionLocation;
import com.aliyun.emr.rss.service.deploy.worker.storage.FileWriter;
public class WorkingPartition extends PartitionLocation {
private final transient FileWriter fileWriter;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package com.aliyun.emr.rss.service.deploy.worker;
package com.aliyun.emr.rss.service.deploy.worker.storage;
import java.io.File;
import java.io.FileOutputStream;
@ -25,7 +25,6 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
@ -42,6 +41,7 @@ import com.aliyun.emr.rss.common.network.server.MemoryTracker;
import com.aliyun.emr.rss.common.protocol.PartitionSplitMode;
import com.aliyun.emr.rss.common.protocol.PartitionType;
import com.aliyun.emr.rss.common.protocol.StorageInfo;
import com.aliyun.emr.rss.service.deploy.worker.WorkerSource;
/*
* Note: Once FlushNotifier.exception is set, the whole file is not available.
@ -89,37 +89,17 @@ public final class FileWriter implements DeviceObserver {
deviceMonitor.unregisterFileWriter(this);
}
/**
 * Shared bookkeeping for the flushes submitted on behalf of one writer: a counter of
 * outstanding flushes and a slot holding the I/O failure recorded for them, if any.
 */
static class FlushNotifier {
// Number of flushes still outstanding; flusher workers decrement it once a task is handled.
final AtomicInteger numPendingFlushes = new AtomicInteger();
// Most recently recorded flush failure, or null when no flush has failed.
final AtomicReference<IOException> exception = new AtomicReference<>();
/** Records {@code e} as the flush failure, replacing any previously recorded one. */
void setException(IOException e) {
exception.set(e);
}
/** Returns true once a failure has been recorded. */
boolean hasException() {
return exception.get() != null;
}
/**
 * Rethrows the recorded failure, if there is one; otherwise returns normally.
 *
 * @throws IOException the recorded flush failure
 */
void checkException() throws IOException {
IOException e = exception.get();
if (e != null) {
throw e;
}
}
}
private final FlushNotifier notifier = new FlushNotifier();
public FileWriter(
FileInfo fileInfo,
Flusher flusher,
AbstractSource workerSource,
RssConf rssConf,
DeviceMonitor deviceMonitor,
long splitThreshold,
PartitionSplitMode splitMode,
PartitionType partitionType) throws IOException {
FileInfo fileInfo,
Flusher flusher,
AbstractSource workerSource,
RssConf rssConf,
DeviceMonitor deviceMonitor,
long splitThreshold,
PartitionSplitMode splitMode,
PartitionType partitionType) throws IOException {
this.fileInfo = fileInfo;
this.flusher = flusher;
this.flushWorkerIndex = flusher.getWorkerIndex();

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.aliyun.emr.rss.service.deploy.worker.storage;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Shared bookkeeping for the flushes submitted on behalf of one writer: a counter of
 * outstanding flushes and a slot holding the I/O failure recorded for them, if any.
 */
public class FlushNotifier {
  /** Number of flushes still outstanding; flusher workers decrement it per handled task. */
  final AtomicInteger numPendingFlushes = new AtomicInteger();

  /** Most recently recorded flush failure, or {@code null} when no flush has failed. */
  final AtomicReference<IOException> exception = new AtomicReference<>();

  /** Records {@code e} as the flush failure, replacing any previously recorded one. */
  void setException(IOException e) {
    exception.set(e);
  }

  /** Returns {@code true} once a failure has been recorded. */
  boolean hasException() {
    final IOException recorded = exception.get();
    return recorded != null;
  }

  /**
   * Rethrows the recorded failure, if there is one; otherwise returns normally.
   *
   * @throws IOException the recorded flush failure
   */
  void checkException() throws IOException {
    final IOException failure = exception.get();
    if (failure == null) {
      return;
    }
    throw failure;
  }
}

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package com.aliyun.emr.rss.service.deploy.worker;
package com.aliyun.emr.rss.service.deploy.worker.storage;
import java.io.File;
import java.io.FileInputStream;
@ -50,6 +50,9 @@ import com.aliyun.emr.rss.common.network.server.MemoryTracker;
import com.aliyun.emr.rss.common.unsafe.Platform;
import com.aliyun.emr.rss.common.util.ThreadUtils;
import com.aliyun.emr.rss.common.utils.PBSerDeUtils;
import com.aliyun.emr.rss.service.deploy.worker.LevelDBProvider;
import com.aliyun.emr.rss.service.deploy.worker.ShuffleRecoverHelper;
import com.aliyun.emr.rss.service.deploy.worker.WorkerSource;
public class PartitionFilesSorter extends ShuffleRecoverHelper {
private static final Logger logger = LoggerFactory.getLogger(PartitionFilesSorter.class);
@ -79,7 +82,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
"worker-file-sorter-execute", Math.max(Runtime.getRuntime().availableProcessors(), 8), 120);
private final Thread fileSorterSchedulerThread;
PartitionFilesSorter(MemoryTracker memoryTracker, RssConf conf, AbstractSource source) {
public PartitionFilesSorter(MemoryTracker memoryTracker, RssConf conf, AbstractSource source) {
this.sortTimeout = RssConf.partitionSortTimeout(conf);
this.fetchChunkSize = RssConf.chunkSize(conf);
this.reserveMemoryForSingleSort = RssConf.memoryReservedForSingleSort(conf);

View File

@ -36,6 +36,7 @@ import com.aliyun.emr.rss.common.protocol.message.ControlMessages._
import com.aliyun.emr.rss.common.protocol.message.StatusCode
import com.aliyun.emr.rss.common.rpc._
import com.aliyun.emr.rss.common.util.Utils
import com.aliyun.emr.rss.service.deploy.worker.storage.StorageManager
private[deploy] class Controller(
override val rpcEnv: RpcEnv,

View File

@ -36,6 +36,7 @@ import com.aliyun.emr.rss.common.network.server.FileManagedBuffers
import com.aliyun.emr.rss.common.network.server.OneForOneStreamManager
import com.aliyun.emr.rss.common.network.util.NettyUtils
import com.aliyun.emr.rss.common.network.util.TransportConf
import com.aliyun.emr.rss.service.deploy.worker.storage.{PartitionFilesSorter, StorageManager}
class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logging {
var streamManager = new OneForOneStreamManager()

View File

@ -35,7 +35,7 @@ import com.aliyun.emr.rss.common.network.server.BaseMessageHandler
import com.aliyun.emr.rss.common.protocol.{PartitionLocation, PartitionSplitMode}
import com.aliyun.emr.rss.common.protocol.message.StatusCode
import com.aliyun.emr.rss.common.unsafe.Platform
import com.aliyun.emr.rss.service.deploy.worker.storage.{FileWriter, LocalFlusher}
class PushDataHandler extends BaseMessageHandler with Logging {

View File

@ -42,6 +42,7 @@ import com.aliyun.emr.rss.common.util.{ThreadUtils, Utils}
import com.aliyun.emr.rss.common.utils.ShutdownHookManager
import com.aliyun.emr.rss.server.common.http.{HttpServer, HttpServerInitializer}
import com.aliyun.emr.rss.service.deploy.worker.http.HttpRequestHandler
import com.aliyun.emr.rss.service.deploy.worker.storage.{PartitionFilesSorter, StorageManager}
private[deploy] class Worker(
val conf: RssConf,

View File

@ -15,9 +15,9 @@
* limitations under the License.
*/
package com.aliyun.emr.rss.service.deploy.worker
package com.aliyun.emr.rss.service.deploy.worker.storage
import java.io.{BufferedReader, File, FileInputStream, InputStreamReader, IOException}
import java.io._
import java.nio.charset.Charset
import java.util
import java.util.{Set => jSet}
@ -30,8 +30,7 @@ import org.apache.commons.io.FileUtils
import org.slf4j.LoggerFactory
import com.aliyun.emr.rss.common.RssConf
import com.aliyun.emr.rss.common.RssConf.deviceMonitorCheckList
import com.aliyun.emr.rss.common.RssConf.diskCheckIntervalMs
import com.aliyun.emr.rss.common.RssConf.{deviceMonitorCheckList, diskCheckIntervalMs}
import com.aliyun.emr.rss.common.meta.{DeviceInfo, DiskInfo, DiskStatus}
import com.aliyun.emr.rss.common.util.ThreadUtils
import com.aliyun.emr.rss.common.util.Utils._

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.aliyun.emr.rss.service.deploy.worker.storage
import com.aliyun.emr.rss.common.meta.DiskStatus
/**
 * Callback interface for components that want to hear about the state of a disk mount point.
 * Every callback has an empty default body, so implementors override only what they need.
 */
trait DeviceObserver {

  /** Called when the disk at `mountPoint` is reported with the given error status. */
  def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit = ()

  /** Called when the disk at `mountPoint` is reported healthy. */
  def notifyHealthy(mountPoint: String): Unit = ()

  /** Called when the disk at `mountPoint` is reported to have high usage. */
  def notifyHighDiskUsage(mountPoint: String): Unit = ()
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.aliyun.emr.rss.service.deploy.worker.storage
import java.nio.channels.FileChannel
import io.netty.buffer.{ByteBufUtil, CompositeByteBuf}
import org.apache.hadoop.fs.FSDataOutputStream
/**
 * One unit of work for a flusher worker thread: the data to persist plus the notifier
 * through which the outcome is reported.
 *
 * @param buffer   composite buffer holding the bytes to be written
 * @param notifier shared state on which the flusher records failures and decrements the
 *                 pending-flush counter
 */
private[worker] abstract class FlushTask(
val buffer: CompositeByteBuf,
val notifier: FlushNotifier) {
/** Writes the content of `buffer` to the task's destination. */
def flush(): Unit
}
/**
 * Flush task that writes its buffer to a local file through a `FileChannel`.
 *
 * @param buffer      composite buffer holding the bytes to be written
 * @param fileChannel channel of the destination file
 * @param notifier    shared state used by the flusher to report the outcome
 */
private[worker] class LocalFlushTask(
    buffer: CompositeByteBuf,
    fileChannel: FileChannel,
    notifier: FlushNotifier) extends FlushTask(buffer, notifier) {

  /**
   * Writes every readable byte of `buffer` to `fileChannel`.
   *
   * A gathering write is allowed to transfer fewer bytes than requested, so the write is
   * repeated until all NIO views of the buffer are drained instead of trusting a single call.
   */
  override def flush(): Unit = {
    val chunks = buffer.nioBuffers()
    var remaining = chunks.foldLeft(0L)((total, chunk) => total + chunk.remaining())
    while (remaining > 0) {
      // write() advances the positions of the chunks it consumed and returns the byte count.
      remaining -= fileChannel.write(chunks)
    }
  }
}
/**
 * Flush task that writes its buffer to an HDFS output stream.
 *
 * @param buffer   composite buffer holding the bytes to be written
 * @param fsStream destination stream
 * @param notifier shared state used by the flusher to report the outcome
 */
private[worker] class HdfsFlushTask(
    buffer: CompositeByteBuf,
    fsStream: FSDataOutputStream,
    notifier: FlushNotifier) extends FlushTask(buffer, notifier) {

  /** Materializes the buffer content as a byte array and hands it to the stream. */
  override def flush(): Unit = {
    val payload = ByteBufUtil.getBytes(buffer)
    fsStream.write(payload)
  }
}

View File

@ -0,0 +1,234 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.aliyun.emr.rss.service.deploy.worker.storage
import java.io.IOException
import java.nio.channels.ClosedByInterruptException
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLongArray, LongAdder}
import scala.collection.JavaConverters._
import scala.util.Random
import io.netty.buffer.{CompositeByteBuf, Unpooled}
import com.aliyun.emr.rss.common.internal.Logging
import com.aliyun.emr.rss.common.meta.DiskStatus
import com.aliyun.emr.rss.common.metrics.source.AbstractSource
import com.aliyun.emr.rss.common.network.server.MemoryTracker
import com.aliyun.emr.rss.common.protocol.StorageInfo
import com.aliyun.emr.rss.service.deploy.worker.WorkerSource
/**
 * Runs `threadCount` daemon worker threads, each draining its own queue of [[FlushTask]]s,
 * and keeps a pool of reusable `CompositeByteBuf`s plus rolling flush-latency statistics.
 *
 * Subclasses decide how a failed flush is handled by implementing `processIOException`.
 *
 * @param workerSource             metrics source used to sample `WorkerSource.FlushDataTime`
 * @param threadCount              number of worker threads and of per-thread task queues
 * @param flushAvgTimeWindowSize   number of (time, count) samples kept for `averageFlushTime`
 * @param flushAvgTimeMinimumCount minimum number of flushes since the previous
 *                                 `averageFlushTime` call for a new sample to be recorded
 */
private[worker] abstract class Flusher(
    val workerSource: AbstractSource,
    val threadCount: Int,
    val flushAvgTimeWindowSize: Int,
    val flushAvgTimeMinimumCount: Int) extends Logging {
  protected lazy val flusherId = System.identityHashCode(this)
  // One task queue per worker thread; a task is routed by the index passed to addTask.
  protected val workingQueues = new Array[LinkedBlockingQueue[FlushTask]](threadCount)
  // Pool of cleared buffers handed out by takeBuffer and refilled by returnBuffer.
  protected val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf]()
  protected val workers = new Array[Thread](threadCount)
  protected var nextWorkerIndex: Int = 0
  // Flush count and accumulated flush time (nanoseconds) since the last averageFlushTime call.
  protected val flushCount = new LongAdder
  protected val flushTotalTime = new LongAdder
  // Ring buffer of (total flush time, flush count) samples.
  protected val avgTimeWindow = new Array[(Long, Long)](flushAvgTimeWindowSize)
  protected var avgTimeWindowCurrentIndex = 0

  // Per worker: System.nanoTime() at which the flush in progress started, or -1 when idle.
  val lastBeginFlushTime: AtomicLongArray = new AtomicLongArray(threadCount)
  val stopFlag = new AtomicBoolean(false)
  val rand = new Random()

  init()

  /** Resets the statistics and creates and starts one daemon worker thread per queue. */
  private def init(): Unit = {
    for (i <- 0 until flushAvgTimeWindowSize) {
      avgTimeWindow(i) = (0L, 0L)
    }
    for (i <- 0 until lastBeginFlushTime.length()) {
      lastBeginFlushTime.set(i, -1)
    }
    for (index <- 0 until threadCount) {
      workingQueues(index) = new LinkedBlockingQueue[FlushTask]()
      workers(index) = new Thread(s"$this-$index") {
        override def run(): Unit = {
          while (!stopFlag.get()) {
            val task = workingQueues(index).take()
            // Inside this anonymous class `this` is the worker thread itself.
            val key = s"Flusher-$this-${rand.nextInt()}"
            workerSource.sample(WorkerSource.FlushDataTime, key) {
              // A task whose notifier already carries a failure is not written, but its
              // buffer is still recycled and its pending count still decremented below.
              if (!task.notifier.hasException) {
                try {
                  val flushBeginTime = System.nanoTime()
                  lastBeginFlushTime.set(index, flushBeginTime)
                  task.flush()
                  flushTotalTime.add(System.nanoTime() - flushBeginTime)
                  flushCount.increment()
                } catch {
                  // Deliberately ignored; presumably raised when stopAndCleanFlusher
                  // interrupts a worker in the middle of a channel write.
                  case _: ClosedByInterruptException =>
                  case e: IOException =>
                    task.notifier.setException(e)
                    processIOException(e, DiskStatus.ReadOrWriteFailure)
                }
                lastBeginFlushTime.set(index, -1)
              }
              returnBuffer(task.buffer)
              task.notifier.numPendingFlushes.decrementAndGet()
            }
          }
        }
      }
      workers(index).setDaemon(true)
      workers(index).setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
        override def uncaughtException(t: Thread, e: Throwable): Unit = {
          // Name the dying thread; a bare `this` here would print the handler object.
          logError(s"${t.getName} thread terminated.", e)
        }
      })
      workers(index).start()
    }
  }

  /** Advances the round-robin cursor and returns the worker index it now points at. */
  def getWorkerIndex: Int = synchronized {
    nextWorkerIndex = (nextWorkerIndex + 1) % threadCount
    nextWorkerIndex
  }

  /**
   * Folds the counters accumulated since the previous call into the sample window (only when
   * at least `flushAvgTimeMinimumCount` flushes happened) and returns the average flush time
   * over the whole window.
   *
   * @return average nanoseconds per flush over the window, or 0 when the window is empty
   */
  def averageFlushTime(): Long = {
    this match {
      case local: LocalFlusher =>
        logInfo(s"Flush count in ${local.mountPoint}" +
          s" last heartbeat interval: $flushCount")
      case _ =>
    }
    val currentFlushTime = flushTotalTime.sumThenReset()
    val currentFlushCount = flushCount.sumThenReset()
    if (currentFlushCount >= flushAvgTimeMinimumCount) {
      avgTimeWindow(avgTimeWindowCurrentIndex) = (currentFlushTime, currentFlushCount)
      avgTimeWindowCurrentIndex = (avgTimeWindowCurrentIndex + 1) % flushAvgTimeWindowSize
    }
    var totalFlushTime = 0L
    var totalFlushCount = 0L
    avgTimeWindow.foreach { case (sampleTime, sampleCount) =>
      totalFlushTime = totalFlushTime + sampleTime
      totalFlushCount = totalFlushCount + sampleCount
    }
    if (totalFlushCount != 0) {
      totalFlushTime / totalFlushCount
    } else {
      0L
    }
  }

  /** Returns a pooled buffer, or a new composite buffer (up to 256 components) if none is pooled. */
  def takeBuffer(): CompositeByteBuf = {
    var buffer = bufferQueue.poll()
    if (buffer == null) {
      buffer = Unpooled.compositeBuffer(256)
    }
    buffer
  }

  /**
   * Releases the buffer's readable bytes from the `MemoryTracker` disk-buffer accounting,
   * empties the buffer and puts it back into the pool.
   */
  def returnBuffer(buffer: CompositeByteBuf): Unit = {
    MemoryTracker.instance().releaseDiskBuffer(buffer.readableBytes())
    buffer.removeComponents(0, buffer.numComponents())
    buffer.clear()
    // offer() rather than put(): the pool is unbounded so neither can block, but put() is
    // interruptible and would throw InterruptedException on a worker whose interrupt flag
    // is set (after stopAndCleanFlusher or a swallowed ClosedByInterruptException), killing
    // the worker before it decrements the task's pending-flush counter.
    bufferQueue.offer(buffer)
  }

  /**
   * Enqueues `task` on the queue of worker `workerIndex`.
   *
   * @return the result of the timed `offer`: false if the task was not accepted in `timeoutMs`
   */
  def addTask(task: FlushTask, timeoutMs: Long, workerIndex: Int): Boolean = {
    workingQueues(workerIndex).offer(task, timeoutMs, TimeUnit.MILLISECONDS)
  }

  /** Describes how many buffers currently sit in the pool. */
  def bufferQueueInfo(): String = s"$this used buffers: ${bufferQueue.size()}"

  /**
   * Raises the stop flag, interrupts every worker thread and empties the buffers of the tasks
   * still queued. The queued tasks themselves are left in their queues.
   */
  def stopAndCleanFlusher(): Unit = {
    stopFlag.set(true)
    try {
      workers.foreach(_.interrupt())
    } catch {
      case e: Exception =>
        logError(s"Exception when interrupt worker: ${workers.mkString(",")}, $e")
    }
    workingQueues.foreach { queue =>
      queue.asScala.foreach { task =>
        task.buffer.removeComponents(0, task.buffer.numComponents())
        task.buffer.clear()
      }
    }
  }

  /** Invoked on the worker thread after a flush failed with `e`; implemented by subclasses. */
  def processIOException(e: IOException, deviceErrorType: DiskStatus): Unit
}
/**
 * Flusher bound to one local disk mount point. It registers itself with the device monitor
 * on construction, reports write failures to it, and shuts down when the monitor signals a
 * disk error.
 *
 * Two instances are equal when they serve the same mount point.
 *
 * @param workerSource             metrics source passed to [[Flusher]]
 * @param deviceMonitor            monitor this flusher registers with and reports errors to
 * @param threadCount              number of flush worker threads
 * @param mountPoint               mount point of the disk served by this flusher
 * @param flushAvgTimeWindowSize   sample window size passed to [[Flusher]]
 * @param flushAvgTimeMinimumCount minimum sample count passed to [[Flusher]]
 * @param diskType                 storage type of the disk
 */
private[worker] class LocalFlusher(
    workerSource: AbstractSource,
    val deviceMonitor: DeviceMonitor,
    threadCount: Int,
    val mountPoint: String,
    flushAvgTimeWindowSize: Int,
    flushAvgTimeMinimumCount: Int,
    val diskType: StorageInfo.Type) extends Flusher(
    workerSource,
    threadCount,
    flushAvgTimeWindowSize,
    flushAvgTimeMinimumCount)
  with DeviceObserver with Logging {

  deviceMonitor.registerFlusher(this)

  /** Raises the stop flag, logs the failure with its stack trace and reports it to the monitor. */
  override def processIOException(e: IOException, deviceErrorType: DiskStatus): Unit = {
    stopFlag.set(true)
    logError(s"$this write failed, report to DeviceMonitor, exception: $e", e)
    deviceMonitor.reportDeviceError(mountPoint, e, deviceErrorType)
  }

  /** On a disk error notification, stops the workers and unregisters from the monitor. */
  override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit = {
    logError(s"$this is notified Disk $mountPoint $diskStatus! Stop LocalFlusher.")
    stopAndCleanFlusher()
    deviceMonitor.unregisterFlusher(this)
  }

  override def hashCode(): Int = {
    mountPoint.hashCode()
  }

  override def equals(obj: Any): Boolean = obj match {
    case other: LocalFlusher => other.mountPoint == mountPoint
    case _ => false
  }

  override def toString(): String = {
    s"LocalFlusher@$flusherId-$mountPoint"
  }
}
/**
 * Flusher for HDFS-backed files. It is not tied to a device monitor: a failed write simply
 * stops the flusher and is logged.
 *
 * @param workerSource             metrics source passed to [[Flusher]]
 * @param threadCount              number of flush worker threads
 * @param flushAvgTimeWindowSize   sample window size passed to [[Flusher]]
 * @param flushAvgTimeMinimumCount minimum sample count passed to [[Flusher]]
 */
private[worker] final class HdfsFlusher(
    workerSource: AbstractSource,
    threadCount: Int,
    flushAvgTimeWindowSize: Int,
    flushAvgTimeMinimumCount: Int) extends Flusher(
    workerSource,
    threadCount,
    flushAvgTimeWindowSize,
    flushAvgTimeMinimumCount) with Logging {

  override def toString: String = s"HdfsFlusher@$flusherId"

  /** Stops all workers, then logs the failure together with its stack trace. */
  override def processIOException(e: IOException, deviceErrorType: DiskStatus): Unit = {
    stopAndCleanFlusher()
    logError(s"$this write failed, reason $deviceErrorType, exception: $e", e)
  }
}

View File

@ -15,273 +15,37 @@
* limitations under the License.
*/
package com.aliyun.emr.rss.service.deploy.worker
package com.aliyun.emr.rss.service.deploy.worker.storage
import java.io.{File, IOException}
import java.nio.channels.{ClosedByInterruptException, FileChannel}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.util
import java.util.concurrent.{ConcurrentHashMap, Executors, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLongArray, LongAdder}
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.IntUnaryOperator
import scala.collection.JavaConverters._
import scala.util.Random
import com.google.common.util.concurrent.ThreadFactoryBuilder
import io.netty.buffer.{ByteBufUtil, CompositeByteBuf, Unpooled}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
import org.iq80.leveldb.DB
import com.aliyun.emr.rss.common.RssConf
import com.aliyun.emr.rss.common.exception.RssException
import com.aliyun.emr.rss.common.internal.Logging
import com.aliyun.emr.rss.common.meta.{DeviceInfo, DiskInfo}
import com.aliyun.emr.rss.common.meta.DiskStatus
import com.aliyun.emr.rss.common.meta.{DeviceInfo, DiskInfo, DiskStatus}
import com.aliyun.emr.rss.common.metrics.source.AbstractSource
import com.aliyun.emr.rss.common.network.server.{FileInfo, MemoryTracker}
import com.aliyun.emr.rss.common.network.server.FileInfo
import com.aliyun.emr.rss.common.network.server.MemoryTracker.MemoryTrackerListener
import com.aliyun.emr.rss.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, StorageInfo}
import com.aliyun.emr.rss.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType}
import com.aliyun.emr.rss.common.util.{ThreadUtils, Utils}
import com.aliyun.emr.rss.common.utils.PBSerDeUtils
import com.aliyun.emr.rss.service.deploy.worker.FileWriter.FlushNotifier
import com.aliyun.emr.rss.service.deploy.worker._
/**
 * Callback interface for components that want to hear about the state of a disk mount point.
 * Every callback has an empty default body, so implementors override only what they need.
 */
trait DeviceObserver {

  /** Called when the disk at `mountPoint` is reported with the given error status. */
  def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit = ()

  /** Called when the disk at `mountPoint` is reported healthy. */
  def notifyHealthy(mountPoint: String): Unit = ()

  /** Called when the disk at `mountPoint` is reported to have high usage. */
  def notifyHighDiskUsage(mountPoint: String): Unit = ()
}
/**
 * One unit of work for a flusher worker thread: the data to persist plus the notifier
 * through which the outcome is reported.
 *
 * @param buffer   composite buffer holding the bytes to be written
 * @param notifier shared state on which the flusher records failures and decrements the
 *                 pending-flush counter
 */
private[worker] abstract class FlushTask(
val buffer: CompositeByteBuf,
val notifier: FlushNotifier) {
/** Writes the content of `buffer` to the task's destination. */
def flush(): Unit
}
/**
 * Flush task that writes its buffer to a local file through a `FileChannel`.
 *
 * @param buffer      composite buffer holding the bytes to be written
 * @param fileChannel channel of the destination file
 * @param notifier    shared state used by the flusher to report the outcome
 */
private[worker] class LocalFlushTask(
    buffer: CompositeByteBuf,
    fileChannel: FileChannel,
    notifier: FileWriter.FlushNotifier) extends FlushTask(buffer, notifier) {

  /**
   * Writes every readable byte of `buffer` to `fileChannel`.
   *
   * A gathering write is allowed to transfer fewer bytes than requested, so the write is
   * repeated until all NIO views of the buffer are drained instead of trusting a single call.
   */
  override def flush(): Unit = {
    val chunks = buffer.nioBuffers()
    var remaining = chunks.foldLeft(0L)((total, chunk) => total + chunk.remaining())
    while (remaining > 0) {
      // write() advances the positions of the chunks it consumed and returns the byte count.
      remaining -= fileChannel.write(chunks)
    }
  }
}
/**
 * Flush task that writes its buffer to an HDFS output stream.
 *
 * @param buffer   composite buffer holding the bytes to be written
 * @param fsStream destination stream
 * @param notifier shared state used by the flusher to report the outcome
 */
private[worker] class HdfsFlushTask(
    buffer: CompositeByteBuf,
    fsStream: FSDataOutputStream,
    notifier: FileWriter.FlushNotifier) extends FlushTask(buffer, notifier) {

  /** Materializes the buffer content as a byte array and hands it to the stream. */
  override def flush(): Unit = {
    val payload = ByteBufUtil.getBytes(buffer)
    fsStream.write(payload)
  }
}
/**
 * Runs `threadCount` daemon worker threads, each draining its own queue of [[FlushTask]]s,
 * and keeps a pool of reusable `CompositeByteBuf`s plus rolling flush-latency statistics.
 *
 * Subclasses decide how a failed flush is handled by implementing `processIOException`.
 *
 * @param workerSource             metrics source used to sample `WorkerSource.FlushDataTime`
 * @param threadCount              number of worker threads and of per-thread task queues
 * @param flushAvgTimeWindowSize   number of (time, count) samples kept for `averageFlushTime`
 * @param flushAvgTimeMinimumCount minimum number of flushes since the previous
 *                                 `averageFlushTime` call for a new sample to be recorded
 */
private[worker] abstract class Flusher(
    val workerSource: AbstractSource,
    val threadCount: Int,
    val flushAvgTimeWindowSize: Int,
    val flushAvgTimeMinimumCount: Int) extends Logging {
  protected lazy val flusherId = System.identityHashCode(this)
  // One task queue per worker thread; a task is routed by the index passed to addTask.
  protected val workingQueues = new Array[LinkedBlockingQueue[FlushTask]](threadCount)
  // Pool of cleared buffers handed out by takeBuffer and refilled by returnBuffer.
  protected val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf]()
  protected val workers = new Array[Thread](threadCount)
  protected var nextWorkerIndex: Int = 0
  // Flush count and accumulated flush time (nanoseconds) since the last averageFlushTime call.
  protected val flushCount = new LongAdder
  protected val flushTotalTime = new LongAdder
  // Ring buffer of (total flush time, flush count) samples.
  protected val avgTimeWindow = new Array[(Long, Long)](flushAvgTimeWindowSize)
  protected var avgTimeWindowCurrentIndex = 0

  // Per worker: System.nanoTime() at which the flush in progress started, or -1 when idle.
  val lastBeginFlushTime: AtomicLongArray = new AtomicLongArray(threadCount)
  val stopFlag = new AtomicBoolean(false)
  val rand = new Random()

  init()

  /** Resets the statistics and creates and starts one daemon worker thread per queue. */
  private def init(): Unit = {
    for (i <- 0 until flushAvgTimeWindowSize) {
      avgTimeWindow(i) = (0L, 0L)
    }
    for (i <- 0 until lastBeginFlushTime.length()) {
      lastBeginFlushTime.set(i, -1)
    }
    for (index <- 0 until threadCount) {
      workingQueues(index) = new LinkedBlockingQueue[FlushTask]()
      workers(index) = new Thread(s"$this-$index") {
        override def run(): Unit = {
          while (!stopFlag.get()) {
            val task = workingQueues(index).take()
            // Inside this anonymous class `this` is the worker thread itself.
            val key = s"Flusher-$this-${rand.nextInt()}"
            workerSource.sample(WorkerSource.FlushDataTime, key) {
              // A task whose notifier already carries a failure is not written, but its
              // buffer is still recycled and its pending count still decremented below.
              if (!task.notifier.hasException) {
                try {
                  val flushBeginTime = System.nanoTime()
                  lastBeginFlushTime.set(index, flushBeginTime)
                  task.flush()
                  flushTotalTime.add(System.nanoTime() - flushBeginTime)
                  flushCount.increment()
                } catch {
                  // Deliberately ignored; presumably raised when stopAndCleanFlusher
                  // interrupts a worker in the middle of a channel write.
                  case _: ClosedByInterruptException =>
                  case e: IOException =>
                    task.notifier.setException(e)
                    processIOException(e, DiskStatus.ReadOrWriteFailure)
                }
                lastBeginFlushTime.set(index, -1)
              }
              returnBuffer(task.buffer)
              task.notifier.numPendingFlushes.decrementAndGet()
            }
          }
        }
      }
      workers(index).setDaemon(true)
      workers(index).setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
        override def uncaughtException(t: Thread, e: Throwable): Unit = {
          // Name the dying thread; a bare `this` here would print the handler object.
          logError(s"${t.getName} thread terminated.", e)
        }
      })
      workers(index).start()
    }
  }

  /** Advances the round-robin cursor and returns the worker index it now points at. */
  def getWorkerIndex: Int = synchronized {
    nextWorkerIndex = (nextWorkerIndex + 1) % threadCount
    nextWorkerIndex
  }

  /**
   * Folds the counters accumulated since the previous call into the sample window (only when
   * at least `flushAvgTimeMinimumCount` flushes happened) and returns the average flush time
   * over the whole window.
   *
   * @return average nanoseconds per flush over the window, or 0 when the window is empty
   */
  def averageFlushTime(): Long = {
    this match {
      case local: LocalFlusher =>
        logInfo(s"Flush count in ${local.mountPoint}" +
          s" last heartbeat interval: $flushCount")
      case _ =>
    }
    val currentFlushTime = flushTotalTime.sumThenReset()
    val currentFlushCount = flushCount.sumThenReset()
    if (currentFlushCount >= flushAvgTimeMinimumCount) {
      avgTimeWindow(avgTimeWindowCurrentIndex) = (currentFlushTime, currentFlushCount)
      avgTimeWindowCurrentIndex = (avgTimeWindowCurrentIndex + 1) % flushAvgTimeWindowSize
    }
    var totalFlushTime = 0L
    var totalFlushCount = 0L
    avgTimeWindow.foreach { case (sampleTime, sampleCount) =>
      totalFlushTime = totalFlushTime + sampleTime
      totalFlushCount = totalFlushCount + sampleCount
    }
    if (totalFlushCount != 0) {
      totalFlushTime / totalFlushCount
    } else {
      0L
    }
  }

  /** Returns a pooled buffer, or a new composite buffer (up to 256 components) if none is pooled. */
  def takeBuffer(): CompositeByteBuf = {
    var buffer = bufferQueue.poll()
    if (buffer == null) {
      buffer = Unpooled.compositeBuffer(256)
    }
    buffer
  }

  /**
   * Releases the buffer's readable bytes from the `MemoryTracker` disk-buffer accounting,
   * empties the buffer and puts it back into the pool.
   */
  def returnBuffer(buffer: CompositeByteBuf): Unit = {
    MemoryTracker.instance().releaseDiskBuffer(buffer.readableBytes())
    buffer.removeComponents(0, buffer.numComponents())
    buffer.clear()
    // offer() rather than put(): the pool is unbounded so neither can block, but put() is
    // interruptible and would throw InterruptedException on a worker whose interrupt flag
    // is set (after stopAndCleanFlusher or a swallowed ClosedByInterruptException), killing
    // the worker before it decrements the task's pending-flush counter.
    bufferQueue.offer(buffer)
  }

  /**
   * Enqueues `task` on the queue of worker `workerIndex`.
   *
   * @return the result of the timed `offer`: false if the task was not accepted in `timeoutMs`
   */
  def addTask(task: FlushTask, timeoutMs: Long, workerIndex: Int): Boolean = {
    workingQueues(workerIndex).offer(task, timeoutMs, TimeUnit.MILLISECONDS)
  }

  /** Describes how many buffers currently sit in the pool. */
  def bufferQueueInfo(): String = s"$this used buffers: ${bufferQueue.size()}"

  /**
   * Raises the stop flag, interrupts every worker thread and empties the buffers of the tasks
   * still queued. The queued tasks themselves are left in their queues.
   */
  def stopAndCleanFlusher(): Unit = {
    stopFlag.set(true)
    try {
      workers.foreach(_.interrupt())
    } catch {
      case e: Exception =>
        logError(s"Exception when interrupt worker: ${workers.mkString(",")}, $e")
    }
    workingQueues.foreach { queue =>
      queue.asScala.foreach { task =>
        task.buffer.removeComponents(0, task.buffer.numComponents())
        task.buffer.clear()
      }
    }
  }

  /** Invoked on the worker thread after a flush failed with `e`; implemented by subclasses. */
  def processIOException(e: IOException, deviceErrorType: DiskStatus): Unit
}
/**
 * Flusher bound to one local disk mount point. It registers itself with the device monitor
 * on construction, reports write failures to it, and shuts down when the monitor signals a
 * disk error.
 *
 * Two instances are equal when they serve the same mount point.
 *
 * @param workerSource             metrics source passed to [[Flusher]]
 * @param deviceMonitor            monitor this flusher registers with and reports errors to
 * @param threadCount              number of flush worker threads
 * @param mountPoint               mount point of the disk served by this flusher
 * @param flushAvgTimeWindowSize   sample window size passed to [[Flusher]]
 * @param flushAvgTimeMinimumCount minimum sample count passed to [[Flusher]]
 * @param diskType                 storage type of the disk
 */
private[worker] class LocalFlusher(
    workerSource: AbstractSource,
    val deviceMonitor: DeviceMonitor,
    threadCount: Int,
    val mountPoint: String,
    flushAvgTimeWindowSize: Int,
    flushAvgTimeMinimumCount: Int,
    val diskType: StorageInfo.Type) extends Flusher(
    workerSource,
    threadCount,
    flushAvgTimeWindowSize,
    flushAvgTimeMinimumCount)
  with DeviceObserver with Logging {

  deviceMonitor.registerFlusher(this)

  /** Raises the stop flag, logs the failure with its stack trace and reports it to the monitor. */
  override def processIOException(e: IOException, deviceErrorType: DiskStatus): Unit = {
    stopFlag.set(true)
    logError(s"$this write failed, report to DeviceMonitor, exception: $e", e)
    deviceMonitor.reportDeviceError(mountPoint, e, deviceErrorType)
  }

  /** On a disk error notification, stops the workers and unregisters from the monitor. */
  override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit = {
    logError(s"$this is notified Disk $mountPoint $diskStatus! Stop LocalFlusher.")
    stopAndCleanFlusher()
    deviceMonitor.unregisterFlusher(this)
  }

  override def hashCode(): Int = {
    mountPoint.hashCode()
  }

  override def equals(obj: Any): Boolean = obj match {
    case other: LocalFlusher => other.mountPoint == mountPoint
    case _ => false
  }

  override def toString(): String = {
    s"LocalFlusher@$flusherId-$mountPoint"
  }
}
/**
 * Flusher for HDFS-backed files. It is not tied to a device monitor: a failed write simply
 * stops the flusher and is logged.
 *
 * @param workerSource             metrics source passed to [[Flusher]]
 * @param threadCount              number of flush worker threads
 * @param flushAvgTimeWindowSize   sample window size passed to [[Flusher]]
 * @param flushAvgTimeMinimumCount minimum sample count passed to [[Flusher]]
 */
private[worker] final class HdfsFlusher(
    workerSource: AbstractSource,
    threadCount: Int,
    flushAvgTimeWindowSize: Int,
    flushAvgTimeMinimumCount: Int) extends Flusher(
    workerSource,
    threadCount,
    flushAvgTimeWindowSize,
    flushAvgTimeMinimumCount) with Logging {

  override def toString: String = s"HdfsFlusher@$flusherId"

  /** Stops all workers, then logs the failure together with its stack trace. */
  override def processIOException(e: IOException, deviceErrorType: DiskStatus): Unit = {
    stopAndCleanFlusher()
    logError(s"$this write failed, reason $deviceErrorType, exception: $e", e)
  }
}
private[worker] final class StorageManager(
conf: RssConf,
workerSource: AbstractSource)
private[worker] final class StorageManager(conf: RssConf, workerSource: AbstractSource)
extends ShuffleRecoverHelper with DeviceObserver with Logging with MemoryTrackerListener{
// mount point -> filewriter
val workingDirWriters = new ConcurrentHashMap[File, util.ArrayList[FileWriter]]()

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package com.aliyun.emr.rss.service.deploy.worker;
package com.aliyun.emr.rss.service.deploy.worker.storage;
import java.io.File;
import java.io.IOException;
@ -70,6 +70,8 @@ import com.aliyun.emr.rss.common.protocol.PartitionType;
import com.aliyun.emr.rss.common.protocol.StorageInfo;
import com.aliyun.emr.rss.common.util.ThreadUtils;
import com.aliyun.emr.rss.common.util.Utils;
import com.aliyun.emr.rss.service.deploy.worker.FetchHandler;
import com.aliyun.emr.rss.service.deploy.worker.WorkerSource;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package com.aliyun.emr.rss.service.deploy.worker;
package com.aliyun.emr.rss.service.deploy.worker.storage;
import java.io.File;
import java.io.FileOutputStream;
@ -36,6 +36,7 @@ import com.aliyun.emr.rss.common.network.server.FileInfo;
import com.aliyun.emr.rss.common.network.server.MemoryTracker;
import com.aliyun.emr.rss.common.unsafe.Platform;
import com.aliyun.emr.rss.common.util.Utils;
import com.aliyun.emr.rss.service.deploy.worker.WorkerSource;
import static org.mockito.Mockito.when;

View File

@ -15,15 +15,14 @@
* limitations under the License.
*/
package com.aliyun.emr.rss.service.deploy.worker
package com.aliyun.emr.rss.service.deploy.worker.storage
import java.io.File
import java.util.{ArrayList => jArrayList}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.{ArrayList => jArrayList}
import scala.collection.JavaConverters._
import scala.collection.JavaConverters.{bufferAsJavaListConverter, _}
import scala.collection.mutable.ListBuffer
import scala.collection.JavaConverters.bufferAsJavaListConverter
import org.junit.Assert.assertEquals
import org.mockito.ArgumentMatchers._