[CELEBORN-1490][CIP-6] Impl worker write process for Flink Hybrid Shuffle

### What changes were proposed in this pull request?

Impl worker write process for Flink Hybrid Shuffle.

### Why are the changes needed?

We support the tiered producer writing data from Flink to the worker. In this PR, we enable the worker to write this kind of data to storage.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?
No tests are needed.

Closes #2741 from reswqa/cip6-6-pr.

Authored-by: Weijie Guo <reswqa@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
This commit is contained in:
Weijie Guo 2024-09-25 10:27:55 +08:00 committed by Shuang
parent 909d6c3b9c
commit d8809793f3
10 changed files with 745 additions and 25 deletions

View File

@ -7555,7 +7555,7 @@
"h": 9,
"w": 12,
"x": 0,
"y": 280
"y": 10
},
"id": 139,
"options": {
@ -7647,7 +7647,7 @@
"h": 9,
"w": 12,
"x": 12,
"y": 280
"y": 10
},
"id": 141,
"options": {
@ -7739,7 +7739,7 @@
"h": 9,
"w": 12,
"x": 0,
"y": 289
"y": 19
},
"id": 142,
"options": {
@ -7831,7 +7831,7 @@
"h": 9,
"w": 12,
"x": 12,
"y": 289
"y": 19
},
"id": 143,
"options": {
@ -7923,7 +7923,7 @@
"h": 9,
"w": 12,
"x": 0,
"y": 298
"y": 28
},
"id": 144,
"options": {
@ -8015,7 +8015,7 @@
"h": 9,
"w": 12,
"x": 12,
"y": 298
"y": 28
},
"id": 145,
"options": {
@ -8107,7 +8107,7 @@
"h": 9,
"w": 12,
"x": 0,
"y": 307
"y": 37
},
"id": 146,
"options": {
@ -8199,7 +8199,7 @@
"h": 9,
"w": 12,
"x": 12,
"y": 307
"y": 37
},
"id": 147,
"options": {
@ -8291,7 +8291,7 @@
"h": 9,
"w": 12,
"x": 0,
"y": 316
"y": 46
},
"id": 148,
"options": {
@ -8383,7 +8383,7 @@
"h": 9,
"w": 12,
"x": 12,
"y": 316
"y": 46
},
"id": 149,
"options": {
@ -8475,7 +8475,7 @@
"h": 9,
"w": 12,
"x": 0,
"y": 325
"y": 55
},
"id": 150,
"options": {
@ -8567,7 +8567,7 @@
"h": 9,
"w": 12,
"x": 12,
"y": 325
"y": 55
},
"id": 151,
"options": {
@ -8658,7 +8658,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 334
"y": 64
},
"id": 153,
"options": {
@ -8749,7 +8749,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 334
"y": 64
},
"id": 154,
"options": {
@ -8840,7 +8840,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 342
"y": 72
},
"id": 155,
"options": {
@ -8870,6 +8870,465 @@
],
"title": "metrics_RegionStartFailCount_Count",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 72
},
"id": 200,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "metrics_SegmentStartFailCount_Count",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
"title": "metrics_SegmentStartFailCount_Count",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "ms"
},
"overrides": []
},
"gridPos": {
"h": 9,
"w": 12,
"x": 0,
"y": 80
},
"id": 198,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "metrics_ReplicaSegmentStartTime_Mean",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
"title": "metrics_ReplicaSegmentStartTime_Mean",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "ms"
},
"overrides": []
},
"gridPos": {
"h": 9,
"w": 12,
"x": 12,
"y": 80
},
"id": 199,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "metrics_ReplicaSegmentStartTime_Max",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
"title": "metrics_ReplicaSegmentStartTime_Max",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "ms"
},
"overrides": []
},
"gridPos": {
"h": 9,
"w": 12,
"x": 0,
"y": 89
},
"id": 196,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "metrics_PrimarySegmentStartTime_Mean",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
"title": "metrics_PrimarySegmentStartTime_Mean",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "ms"
},
"overrides": []
},
"gridPos": {
"h": 9,
"w": 12,
"x": 12,
"y": 89
},
"id": 197,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "metrics_PrimarySegmentStartTime_Max",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
"title": "metrics_PrimarySegmentStartTime_Max",
"type": "timeseries"
}
],
"title": "MapPartitionRelatives",

View File

@ -179,6 +179,8 @@ These metrics are exposed by Celeborn worker.
| FetchChunkFailCount | The count of fetching chunk failed in current worker. |
| PrimaryPushDataTime | The time for a worker to handle a pushData RPC sent from a celeborn client. |
| ReplicaPushDataTime | The time for a worker to handle a pushData RPC sent from a celeborn worker by replicating. |
| PrimarySegmentStartTime | The time for a worker to handle a segmentStart RPC sent from a celeborn client. |
| ReplicaSegmentStartTime | The time for a worker to handle a segmentStart RPC sent from a celeborn worker by replicating. |
| WriteDataHardSplitCount | The count of writing PushData or PushMergedData to HARD_SPLIT partition in current worker. |
| WriteDataSuccessCount | The count of writing PushData or PushMergedData succeed in current worker. |
| WriteDataFailCount | The count of writing PushData or PushMergedData failed in current worker. |
@ -191,6 +193,7 @@ These metrics are exposed by Celeborn worker.
| PushDataHandshakeFailCount | The count of PushDataHandshake failed in current worker. |
| RegionStartFailCount | The count of RegionStart failed in current worker. |
| RegionFinishFailCount | The count of RegionFinish failed in current worker. |
| SegmentStartFailCount | The count of SegmentStart failed in current worker. |
| PrimaryPushDataHandshakeTime | PrimaryPushDataHandshake means handle PushData of primary partition location. |
| ReplicaPushDataHandshakeTime | ReplicaPushDataHandshake means handle PushData of replica partition location. |
| PrimaryRegionStartTime | PrimaryRegionStart means handle RegionStart of primary partition location. |

View File

@ -40,7 +40,7 @@ import org.apache.celeborn.common.util.Utils;
/*
* map partition file writer, it will create index for each partition
*/
public final class MapPartitionDataWriter extends PartitionDataWriter {
public class MapPartitionDataWriter extends PartitionDataWriter {
private static final Logger logger = LoggerFactory.getLogger(MapPartitionDataWriter.class);
private int numSubpartitions;
@ -115,7 +115,7 @@ public final class MapPartitionDataWriter extends PartitionDataWriter {
long length = data.readableBytes();
totalBytes += length;
numSubpartitionBytes[partitionId] += length;
super.write(data);
writeDataToFile(data);
isRegionFinished = false;
}
@ -186,6 +186,10 @@ public final class MapPartitionDataWriter extends PartitionDataWriter {
}
public void regionFinish() throws IOException {
// TODO: When region is finished, flush the data to be ready for the reading, in scenarios that
// the upstream task writes and the downstream task reads simultaneously, such as flink hybrid
// shuffle
logger.debug("FileWriter:{} regionFinish", diskFileInfo.getFilePath());
if (regionStartingOffset == totalBytes) {
return;
@ -232,6 +236,10 @@ public final class MapPartitionDataWriter extends PartitionDataWriter {
isRegionFinished = true;
}
protected void writeDataToFile(ByteBuf data) throws IOException {
super.write(data);
}
private synchronized void destroyIndex() {
try {
if (indexChannel != null) {
@ -246,7 +254,10 @@ public final class MapPartitionDataWriter extends PartitionDataWriter {
}
@SuppressWarnings("ByteBufferBackingArray")
private void flushIndex() throws IOException {
protected void flushIndex() throws IOException {
// TODO: force flush the index file channel in scenarios which the upstream task writes and
// downstream task reads simultaneously, such as flink hybrid shuffle
if (indexBuffer != null) {
logger.debug("flushIndex start:{}", diskFileInfo.getIndexPath());
long startTime = System.currentTimeMillis();
@ -275,7 +286,11 @@ public final class MapPartitionDataWriter extends PartitionDataWriter {
}
}
private ByteBuffer allocateIndexBuffer(int numSubpartitions) {
protected MapFileMeta getFileMeta() {
return (MapFileMeta) diskFileInfo.getFileMeta();
}
protected ByteBuffer allocateIndexBuffer(int numSubpartitions) {
// the returned buffer size is no smaller than 4096 bytes to improve disk IO performance
int minBufferSize = 4096;
@ -313,4 +328,28 @@ public final class MapPartitionDataWriter extends PartitionDataWriter {
}
return false;
}
public int getCurrentSubpartition() {
return currentSubpartition;
}
public long[] getNumSubpartitionBytes() {
return numSubpartitionBytes;
}
public long getTotalBytes() {
return totalBytes;
}
public void setCurrentSubpartition(int currentSubpartition) {
this.currentSubpartition = currentSubpartition;
}
public void setTotalBytes(long totalBytes) {
this.totalBytes = totalBytes;
}
public void setRegionFinished(boolean regionFinished) {
isRegionFinished = regionFinished;
}
}

View File

@ -214,6 +214,9 @@ public abstract class PartitionDataWriter implements DeviceObserver {
@VisibleForTesting
public void flush(boolean finalFlush, boolean fromEvict) throws IOException {
// TODO: force flush buffer in scenarios where the upstream task writes and the downstream task
// reads simultaneously, such as flink hybrid shuffle.
// flushBuffer == null here means this writer is already closed
if (flushBuffer != null) {
int numBytes = flushBuffer.readableBytes();

View File

@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.service.deploy.worker.storage.segment;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.meta.MapFileMeta;
import org.apache.celeborn.common.metrics.source.AbstractSource;
import org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor;
import org.apache.celeborn.service.deploy.worker.storage.MapPartitionDataWriter;
import org.apache.celeborn.service.deploy.worker.storage.PartitionDataWriterContext;
import org.apache.celeborn.service.deploy.worker.storage.StorageManager;
/**
 * Writes the shuffle file in map partition format with segment-granularity visibility. The shuffle
 * file is written incrementally, allowing completed segments to be read while the rest of the file
 * is still being produced, rather than readers having to wait for the entire shuffle file to be
 * completed (e.g. Flink hybrid shuffle, where the upstream task writes and the downstream task
 * reads simultaneously).
 */
public class SegmentMapPartitionFileWriter extends MapPartitionDataWriter {
public static final Logger logger = LoggerFactory.getLogger(SegmentMapPartitionFileWriter.class);
/**
 * subPartitionId -> started (boolean). There are 3 cases: 1. If the subPartition key does not
 * exist, it indicates that the subPartition has not sent the {@link
 * org.apache.celeborn.common.protocol.PbSegmentStart}. Therefore, shuffle data should not be
 * written in this case. 2. If the subPartition key exists and the started value is true, it means
 * that the subPartition has initiated the segment. In this situation, the next buffer for this
 * subPartition will be the first buffer of the current segment. The information about the first
 * buffer will be recorded in {@code MapFileMeta#subPartitionSegmentIndexes}. 3. If the
 * subPartition key exists and the started value is false, it means that the subPartition has
 * initiated the segment, but the next buffer for this subPartition is not the first buffer of the
 * current segment.
 */
private final Map<Integer, Boolean> subPartitionHasStartSegment;
// Current buffer index per subPartition; allocated in pushDataHandShake once the number of
// subPartitions is known, and incremented once per buffer written in write().
private int[] subPartitionBufferIndex;
/**
 * Creates a segment-granularity-visible map partition writer.
 *
 * @param storageManager storage manager that owns the underlying file
 * @param workerSource metrics source of this worker
 * @param conf Celeborn configuration
 * @param deviceMonitor monitor observing the device this file is written to
 * @param writerContext context describing the partition location being written
 * @throws IOException if the parent writer fails to initialize the underlying file
 */
public SegmentMapPartitionFileWriter(
StorageManager storageManager,
AbstractSource workerSource,
CelebornConf conf,
DeviceMonitor deviceMonitor,
PartitionDataWriterContext writerContext)
throws IOException {
super(storageManager, workerSource, conf, deviceMonitor, writerContext);
this.subPartitionHasStartSegment = new HashMap<>();
}
// Handshake: besides the parent's setup, allocate the per-subPartition buffer counters and mark
// the file meta as segment-granularity visible (and the writer as open) so readers may consume
// finished segments before the file is closed.
@Override
public void pushDataHandShake(int numSubpartitions, int bufferSize) {
super.pushDataHandShake(numSubpartitions, bufferSize);
subPartitionBufferIndex = new int[numSubpartitions];
Arrays.fill(subPartitionBufferIndex, 0);
getFileMeta().setIsWriterClosed(false);
getFileMeta().setSegmentGranularityVisible(true);
}
// Writes one buffer. The buffer is expected to start with a four-int header
// (subPartitionId, attemptId, batchId, size) — presumably the same push-data header format the
// parent MapPartitionDataWriter uses; TODO(review) confirm against the client-side encoder.
// The header is only peeked for validation/bookkeeping: the reader index is reset before the
// whole buffer (header included) is written to the file.
@Override
public void write(ByteBuf data) throws IOException {
data.markReaderIndex();
int subPartitionId = data.readInt();
int attemptId = data.readInt();
int batchId = data.readInt();
int size = data.readInt();
// Reject data for a subPartition that never sent PbSegmentStart (case 1 in the map's javadoc).
if (!subPartitionHasStartSegment.containsKey(subPartitionId)) {
throw new IllegalStateException(
String.format(
"This partition may not start a segment: subPartitionId:%s attemptId:%s batchId:%s size:%s",
subPartitionId, attemptId, batchId, size));
}
int currentSubpartition = getCurrentSubpartition();
// The subPartitionId must be non-decreasing within a region; going backwards is a protocol error.
if (subPartitionId < currentSubpartition) {
throw new IOException(
String.format(
"Must writing data in reduce partition index order, but now supPartitionId is %s and the previous supPartitionId is %s, attemptId is %s, batchId is %s, size is %s",
subPartitionId, currentSubpartition, attemptId, batchId, size));
}
// Restore the reader index so the header bytes are persisted together with the payload.
data.resetReaderIndex();
logger.debug(
"mappartition filename:{} write partition:{} currentSubPartition:{} attemptId:{} batchId:{} size:{}",
diskFileInfo.getFilePath(),
subPartitionId,
currentSubpartition,
attemptId,
batchId,
size);
if (subPartitionId > currentSubpartition) {
setCurrentSubpartition(subPartitionId);
}
// Account the full buffer (header + payload) against the file and subPartition totals.
long length = data.readableBytes();
setTotalBytes(getTotalBytes() + length);
getNumSubpartitionBytes()[subPartitionId] += length;
// flushBuffer == null means no flush buffer is currently held; acquire one before writing.
if (flushBuffer == null) {
takeBuffer();
}
writeDataToFile(data);
setRegionFinished(false);
MapFileMeta mapFileMeta = getFileMeta();
// Only when the sub partition has started the segment, the buffer index (this is the first
// buffer of this segment) will be added.
if (subPartitionHasStartSegment.get(subPartitionId)) {
mapFileMeta.addSegmentIdAndFirstBufferIndex(
subPartitionId,
subPartitionBufferIndex[subPartitionId],
mapFileMeta.getPartitionWritingSegmentId(subPartitionId));
logger.debug(
"Add a segment id, partitionId:{}, bufferIndex:{}, segmentId: {}, filename:{}, attemptId:{}.",
subPartitionId,
subPartitionBufferIndex[subPartitionId],
mapFileMeta.getPartitionWritingSegmentId(subPartitionId),
diskFileInfo.getFilePath(),
attemptId);
// After the first buffer index of the segment is added, the following buffers in the segment
// should not be added anymore, so the subPartitionHasStartSegment is updated to false.
subPartitionHasStartSegment.put(subPartitionId, false);
}
subPartitionBufferIndex[subPartitionId]++;
}
// Closes the underlying file via the parent, clears segment-start state, and marks the writer as
// closed in the file meta so readers know no more segments will appear.
// Returns the final file length reported by the parent close().
@Override
public synchronized long close() throws IOException {
subPartitionHasStartSegment.clear();
long fileLength = super.close();
logger.debug("Close {} for file {}", this, getFile());
getFileMeta().setIsWriterClosed(true);
return fileLength;
}
@Override
public String toString() {
return String.format("SegmentMapPartitionFileWriter{filePath=%s}", diskFileInfo.getFilePath());
}
// Handles a SegmentStart RPC for the given subPartition: records the new writing segment id in
// the file meta and flags that the next buffer for this subPartition begins that segment.
public void segmentStart(int partitionId, int segmentId) {
getFileMeta().addPartitionSegmentId(partitionId, segmentId);
subPartitionHasStartSegment.put(partitionId, true);
}
}

View File

@ -190,7 +190,8 @@ private[deploy] class Controller(
partitionType,
rangeReadFilter,
userIdentifier,
partitionSplitEnabled)
partitionSplitEnabled,
isSegmentGranularityVisible)
primaryLocs.add(new WorkingPartition(location, writer))
} else {
primaryLocs.add(location)
@ -230,7 +231,8 @@ private[deploy] class Controller(
partitionType,
rangeReadFilter,
userIdentifier,
partitionSplitEnabled)
partitionSplitEnabled,
isSegmentGranularityVisible)
replicaLocs.add(new WorkingPartition(location, writer))
} else {
replicaLocs.add(location)

View File

@ -38,13 +38,14 @@ import org.apache.celeborn.common.network.client.{RpcResponseCallback, Transport
import org.apache.celeborn.common.network.protocol.{Message, PushData, PushDataHandShake, PushMergedData, RegionFinish, RegionStart, RequestMessage, RpcFailure, RpcRequest, RpcResponse, TransportMessage}
import org.apache.celeborn.common.network.protocol.Message.Type
import org.apache.celeborn.common.network.server.BaseMessageHandler
import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, PbPushDataHandShake, PbRegionFinish, PbRegionStart}
import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, PbPushDataHandShake, PbRegionFinish, PbRegionStart, PbSegmentStart}
import org.apache.celeborn.common.protocol.PbPartitionLocation.Mode
import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.common.unsafe.Platform
import org.apache.celeborn.common.util.{DiskUtils, ExceptionUtils, Utils}
import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
import org.apache.celeborn.service.deploy.worker.storage.{HdfsFlusher, LocalFlusher, MapPartitionDataWriter, PartitionDataWriter, S3Flusher, StorageManager}
import org.apache.celeborn.service.deploy.worker.storage.segment.SegmentMapPartitionFileWriter
class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler with Logging {
@ -916,6 +917,16 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
rf.getShuffleKey,
rf.getPartitionUniqueId,
false)
case ss: PbSegmentStart =>
(
msg,
null,
false,
Type.SEGMENT_START,
ss.getMode,
ss.getShuffleKey,
ss.getPartitionUniqueId,
false)
}
} catch {
case _: Exception =>
@ -980,6 +991,8 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
(WorkerSource.PRIMARY_REGION_START_TIME, WorkerSource.REPLICA_REGION_START_TIME)
case Type.REGION_FINISH =>
(WorkerSource.PRIMARY_REGION_FINISH_TIME, WorkerSource.REPLICA_REGION_FINISH_TIME)
case Type.SEGMENT_START =>
(WorkerSource.PRIMARY_SEGMENT_START_TIME, WorkerSource.REPLICA_SEGMENT_START_TIME)
case _ => throw new IllegalArgumentException(s"Not support $messageType yet")
}
@ -1064,6 +1077,14 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
isBroadcast)
case Type.REGION_FINISH =>
fileWriter.asInstanceOf[MapPartitionDataWriter].regionFinish()
case Type.SEGMENT_START =>
val (subPartitionId, segmentId) =
(
pbMsg.asInstanceOf[PbSegmentStart].getSubPartitionId,
pbMsg.asInstanceOf[PbSegmentStart].getSegmentId)
fileWriter.asInstanceOf[SegmentMapPartitionFileWriter].segmentStart(
subPartitionId,
segmentId)
case _ => throw new IllegalArgumentException(s"Not support $messageType yet")
}
// for primary , send data to replica
@ -1123,6 +1144,9 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
case Type.REGION_FINISH =>
workerSource.incCounter(WorkerSource.REGION_FINISH_FAIL_COUNT)
callback.onFailure(new CelebornIOException(StatusCode.REGION_FINISH_FAIL_REPLICA, e))
case Type.SEGMENT_START =>
workerSource.incCounter(WorkerSource.SEGMENT_START_FAIL_COUNT)
callback.onFailure(new CelebornIOException(StatusCode.SEGMENT_START_FAIL_REPLICA, e))
case _ =>
workerSource.incCounter(WorkerSource.REPLICATE_DATA_FAIL_COUNT)
if (e.isInstanceOf[CelebornIOException]) {
@ -1177,6 +1201,9 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
case Type.REGION_FINISH => (
StatusCode.REGION_FINISH_FAIL_PRIMARY,
StatusCode.REGION_FINISH_FAIL_REPLICA)
case Type.SEGMENT_START => (
StatusCode.SEGMENT_START_FAIL_PRIMARY,
StatusCode.SEGMENT_START_FAIL_REPLICA)
case _ => throw new IllegalArgumentException(s"Not support $messageType yet")
}
callback.onFailure(new CelebornIOException(

View File

@ -56,6 +56,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, MetricsSyste
addCounter(REGION_START_FAIL_COUNT)
addCounter(REGION_FINISH_FAIL_COUNT)
addCounter(ACTIVE_CONNECTION_COUNT)
addCounter(SEGMENT_START_FAIL_COUNT)
addCounter(SLOTS_ALLOCATED)
@ -72,6 +73,8 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, MetricsSyste
addTimer(REPLICA_REGION_START_TIME)
addTimer(PRIMARY_REGION_FINISH_TIME)
addTimer(REPLICA_REGION_FINISH_TIME)
addTimer(PRIMARY_SEGMENT_START_TIME)
addTimer(REPLICA_SEGMENT_START_TIME)
addTimer(FETCH_CHUNK_TIME)
addTimer(OPEN_STREAM_TIME)
@ -151,12 +154,15 @@ object WorkerSource {
val PUSH_DATA_HANDSHAKE_FAIL_COUNT = "PushDataHandshakeFailCount"
val REGION_START_FAIL_COUNT = "RegionStartFailCount"
val REGION_FINISH_FAIL_COUNT = "RegionFinishFailCount"
val SEGMENT_START_FAIL_COUNT = "SegmentStartFailCount"
val PRIMARY_PUSH_DATA_HANDSHAKE_TIME = "PrimaryPushDataHandshakeTime"
val REPLICA_PUSH_DATA_HANDSHAKE_TIME = "ReplicaPushDataHandshakeTime"
val PRIMARY_REGION_START_TIME = "PrimaryRegionStartTime"
val REPLICA_REGION_START_TIME = "ReplicaRegionStartTime"
val PRIMARY_REGION_FINISH_TIME = "PrimaryRegionFinishTime"
val REPLICA_REGION_FINISH_TIME = "ReplicaRegionFinishTime"
val PRIMARY_SEGMENT_START_TIME = "PrimarySegmentStartTime"
val REPLICA_SEGMENT_START_TIME = "ReplicaSegmentStartTime"
// pause push data
val PAUSE_PUSH_DATA_TIME = "PausePushDataTime"

View File

@ -44,6 +44,8 @@ private[worker] class LocalFlushTask(
fileChannel.write(buffer)
}
}
// TODO: force flush file channel in scenarios where the upstream task writes and the downstream task reads simultaneously, such as flink hybrid shuffle.
}
}

View File

@ -49,6 +49,7 @@ import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.MemoryPressureListener
import org.apache.celeborn.service.deploy.worker.shuffledb.{DB, DBBackend, DBProvider}
import org.apache.celeborn.service.deploy.worker.storage.StorageManager.hadoopFs
import org.apache.celeborn.service.deploy.worker.storage.segment.SegmentMapPartitionFileWriter
final private[worker] class StorageManager(conf: CelebornConf, workerSource: AbstractSource)
extends ShuffleRecoverHelper with DeviceObserver with Logging with MemoryPressureListener {
@ -407,7 +408,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
partitionType,
rangeReadFilter,
userIdentifier,
true)
true,
isSegmentGranularityVisible = false)
}
@throws[IOException]
@ -420,7 +422,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
partitionType: PartitionType,
rangeReadFilter: Boolean,
userIdentifier: UserIdentifier,
partitionSplitEnabled: Boolean): PartitionDataWriter = {
partitionSplitEnabled: Boolean,
isSegmentGranularityVisible: Boolean): PartitionDataWriter = {
if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage && !hasS3Storage) {
throw new IOException("No available working dirs!")
}
@ -438,7 +441,14 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
val writer =
try {
partitionType match {
case PartitionType.MAP => new MapPartitionDataWriter(
case PartitionType.MAP =>
if (isSegmentGranularityVisible) new SegmentMapPartitionFileWriter(
this,
workerSource,
conf,
deviceMonitor,
partitionDataWriterContext)
else new MapPartitionDataWriter(
this,
workerSource,
conf,