diff --git a/assets/grafana/celeborn-dashboard.json b/assets/grafana/celeborn-dashboard.json index d2c462c5b..9b07db6c6 100644 --- a/assets/grafana/celeborn-dashboard.json +++ b/assets/grafana/celeborn-dashboard.json @@ -7555,7 +7555,7 @@ "h": 9, "w": 12, "x": 0, - "y": 280 + "y": 10 }, "id": 139, "options": { @@ -7647,7 +7647,7 @@ "h": 9, "w": 12, "x": 12, - "y": 280 + "y": 10 }, "id": 141, "options": { @@ -7739,7 +7739,7 @@ "h": 9, "w": 12, "x": 0, - "y": 289 + "y": 19 }, "id": 142, "options": { @@ -7831,7 +7831,7 @@ "h": 9, "w": 12, "x": 12, - "y": 289 + "y": 19 }, "id": 143, "options": { @@ -7923,7 +7923,7 @@ "h": 9, "w": 12, "x": 0, - "y": 298 + "y": 28 }, "id": 144, "options": { @@ -8015,7 +8015,7 @@ "h": 9, "w": 12, "x": 12, - "y": 298 + "y": 28 }, "id": 145, "options": { @@ -8107,7 +8107,7 @@ "h": 9, "w": 12, "x": 0, - "y": 307 + "y": 37 }, "id": 146, "options": { @@ -8199,7 +8199,7 @@ "h": 9, "w": 12, "x": 12, - "y": 307 + "y": 37 }, "id": 147, "options": { @@ -8291,7 +8291,7 @@ "h": 9, "w": 12, "x": 0, - "y": 316 + "y": 46 }, "id": 148, "options": { @@ -8383,7 +8383,7 @@ "h": 9, "w": 12, "x": 12, - "y": 316 + "y": 46 }, "id": 149, "options": { @@ -8475,7 +8475,7 @@ "h": 9, "w": 12, "x": 0, - "y": 325 + "y": 55 }, "id": 150, "options": { @@ -8567,7 +8567,7 @@ "h": 9, "w": 12, "x": 12, - "y": 325 + "y": 55 }, "id": 151, "options": { @@ -8658,7 +8658,7 @@ "h": 8, "w": 12, "x": 0, - "y": 334 + "y": 64 }, "id": 153, "options": { @@ -8749,7 +8749,7 @@ "h": 8, "w": 12, "x": 12, - "y": 334 + "y": 64 }, "id": 154, "options": { @@ -8840,7 +8840,7 @@ "h": 8, "w": 12, "x": 0, - "y": 342 + "y": 72 }, "id": 155, "options": { @@ -8870,6 +8870,465 @@ ], "title": "metrics_RegionStartFailCount_Count", "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + 
"axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 72 + }, + "id": 200, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "metrics_SegmentStartFailCount_Count", + "legendFormat": "${baseLegend}", + "range": true, + "refId": "A" + } + ], + "title": "metrics_SegmentStartFailCount_Count", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + 
"color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 0, + "y": 80 + }, + "id": 198, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "metrics_ReplicaSegmentStartTime_Mean", + "legendFormat": "${baseLegend}", + "range": true, + "refId": "A" + } + ], + "title": "metrics_ReplicaSegmentStartTime_Mean", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 12, + "y": 80 + }, + "id": 199, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": 
"metrics_ReplicaSegmentStartTime_Max", + "legendFormat": "${baseLegend}", + "range": true, + "refId": "A" + } + ], + "title": "metrics_ReplicaSegmentStartTime_Max", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 0, + "y": 89 + }, + "id": 196, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "metrics_PrimarySegmentStartTime_Mean", + "legendFormat": "${baseLegend}", + "range": true, + "refId": "A" + } + ], + "title": "metrics_PrimarySegmentStartTime_Mean", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + 
"fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 12, + "y": 89 + }, + "id": 197, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "metrics_PrimarySegmentStartTime_Max", + "legendFormat": "${baseLegend}", + "range": true, + "refId": "A" + } + ], + "title": "metrics_PrimarySegmentStartTime_Max", + "type": "timeseries" } ], "title": "MapPartitionRelatives", diff --git a/docs/monitoring.md b/docs/monitoring.md index 8a1629454..5568b8f76 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -179,6 +179,8 @@ These metrics are exposed by Celeborn worker. | FetchChunkFailCount | The count of fetching chunk failed in current worker. | | PrimaryPushDataTime | The time for a worker to handle a pushData RPC sent from a celeborn client. | | ReplicaPushDataTime | The time for a worker to handle a pushData RPC sent from a celeborn worker by replicating. | + | PrimarySegmentStartTime | The time for a worker to handle a segmentStart RPC sent from a celeborn client. | + | ReplicaSegmentStartTime | The time for a worker to handle a segmentStart RPC sent from a celeborn worker by replicating. 
| | WriteDataHardSplitCount | The count of writing PushData or PushMergedData to HARD_SPLIT partition in current worker. | | WriteDataSuccessCount | The count of writing PushData or PushMergedData succeed in current worker. | | WriteDataFailCount | The count of writing PushData or PushMergedData failed in current worker. | @@ -191,6 +193,7 @@ These metrics are exposed by Celeborn worker. | PushDataHandshakeFailCount | The count of PushDataHandshake failed in current worker. | | RegionStartFailCount | The count of RegionStart failed in current worker. | | RegionFinishFailCount | The count of RegionFinish failed in current worker. | + | SegmentStartFailCount | The count of SegmentStart failed in current worker. | | PrimaryPushDataHandshakeTime | PrimaryPushDataHandshake means handle PushData of primary partition location. | | ReplicaPushDataHandshakeTime | ReplicaPushDataHandshake means handle PushData of replica partition location. | | PrimaryRegionStartTime | PrimaryRegionStart means handle RegionStart of primary partition location. 
| diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java index cb671b05f..0a4cb1379 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java @@ -40,7 +40,7 @@ import org.apache.celeborn.common.util.Utils; /* * map partition file writer, it will create index for each partition */ -public final class MapPartitionDataWriter extends PartitionDataWriter { +public class MapPartitionDataWriter extends PartitionDataWriter { private static final Logger logger = LoggerFactory.getLogger(MapPartitionDataWriter.class); private int numSubpartitions; @@ -115,7 +115,7 @@ public final class MapPartitionDataWriter extends PartitionDataWriter { long length = data.readableBytes(); totalBytes += length; numSubpartitionBytes[partitionId] += length; - super.write(data); + writeDataToFile(data); isRegionFinished = false; } @@ -186,6 +186,10 @@ public final class MapPartitionDataWriter extends PartitionDataWriter { } public void regionFinish() throws IOException { + // TODO: When region is finished, flush the data to be ready for the reading, in scenarios that + // the upstream task writes and the downstream task reads simultaneously, such as flink hybrid + // shuffle + logger.debug("FileWriter:{} regionFinish", diskFileInfo.getFilePath()); if (regionStartingOffset == totalBytes) { return; @@ -232,6 +236,10 @@ public final class MapPartitionDataWriter extends PartitionDataWriter { isRegionFinished = true; } + protected void writeDataToFile(ByteBuf data) throws IOException { + super.write(data); + } + private synchronized void destroyIndex() { try { if (indexChannel != null) { @@ -246,7 +254,10 @@ public final class MapPartitionDataWriter extends PartitionDataWriter { } 
@SuppressWarnings("ByteBufferBackingArray") - private void flushIndex() throws IOException { + protected void flushIndex() throws IOException { + // TODO: force flush the index file channel in scenarios which the upstream task writes and + // downstream task reads simultaneously, such as flink hybrid shuffle + if (indexBuffer != null) { logger.debug("flushIndex start:{}", diskFileInfo.getIndexPath()); long startTime = System.currentTimeMillis(); @@ -275,7 +286,11 @@ public final class MapPartitionDataWriter extends PartitionDataWriter { } } - private ByteBuffer allocateIndexBuffer(int numSubpartitions) { + protected MapFileMeta getFileMeta() { + return (MapFileMeta) diskFileInfo.getFileMeta(); + } + + protected ByteBuffer allocateIndexBuffer(int numSubpartitions) { // the returned buffer size is no smaller than 4096 bytes to improve disk IO performance int minBufferSize = 4096; @@ -313,4 +328,28 @@ public final class MapPartitionDataWriter extends PartitionDataWriter { } return false; } + + public int getCurrentSubpartition() { + return currentSubpartition; + } + + public long[] getNumSubpartitionBytes() { + return numSubpartitionBytes; + } + + public long getTotalBytes() { + return totalBytes; + } + + public void setCurrentSubpartition(int currentSubpartition) { + this.currentSubpartition = currentSubpartition; + } + + public void setTotalBytes(long totalBytes) { + this.totalBytes = totalBytes; + } + + public void setRegionFinished(boolean regionFinished) { + isRegionFinished = regionFinished; + } } diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java index 5172193f6..5bb506270 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java @@ -214,6 +214,9 @@ public 
abstract class PartitionDataWriter implements DeviceObserver { @VisibleForTesting public void flush(boolean finalFlush, boolean fromEvict) throws IOException { + // TODO: force flush buffer in scenarios where the upstream task writes and the downstream task + // reads simultaneously, such as flink hybrid shuffle. + // flushBuffer == null here means this writer is already closed if (flushBuffer != null) { int numBytes = flushBuffer.readableBytes(); diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionFileWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionFileWriter.java new file mode 100644 index 000000000..634afc676 --- /dev/null +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionFileWriter.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.celeborn.service.deploy.worker.storage.segment; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import io.netty.buffer.ByteBuf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.common.meta.MapFileMeta; +import org.apache.celeborn.common.metrics.source.AbstractSource; +import org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor; +import org.apache.celeborn.service.deploy.worker.storage.MapPartitionDataWriter; +import org.apache.celeborn.service.deploy.worker.storage.PartitionDataWriterContext; +import org.apache.celeborn.service.deploy.worker.storage.StorageManager; + +/** + * Write the shuffle file in map partition format with segment granularity visibility. This means + * that the shuffle file is written incrementally, allowing it to be read in segments rather + * than waiting for the entire shuffle file to be completed. + */ +public class SegmentMapPartitionFileWriter extends MapPartitionDataWriter { + + public static final Logger logger = LoggerFactory.getLogger(SegmentMapPartitionFileWriter.class); + + /** + * subPartitionId -> started (boolean). There are 3 cases: 1. If the subPartition key is absent, + * it indicates that the subPartition has not sent the {@link + * org.apache.celeborn.common.protocol.PbSegmentStart}. Therefore, shuffle data should not be + * written in this case. 2. If the subPartition key exists and the started value is true, it means + * that the subPartition has initiated the segment. In this situation, the next buffer for this + * subPartition will be the first buffer of the current segment. The information about the first + * buffer will be recorded in {@code MapFileMeta#subPartitionSegmentIndexes}. 3. 
If the + * subPartition key exists and the started value is false, it means that the subPartition has + * initiated the segment, but the next buffer for this subPartition is not the first buffer of the + * current segment. + */ + private final Map<Integer, Boolean> subPartitionHasStartSegment; + + // current buffer index per subPartition + private int[] subPartitionBufferIndex; + + public SegmentMapPartitionFileWriter( + StorageManager storageManager, + AbstractSource workerSource, + CelebornConf conf, + DeviceMonitor deviceMonitor, + PartitionDataWriterContext writerContext) + throws IOException { + super(storageManager, workerSource, conf, deviceMonitor, writerContext); + this.subPartitionHasStartSegment = new HashMap<>(); + } + + @Override + public void pushDataHandShake(int numSubpartitions, int bufferSize) { + super.pushDataHandShake(numSubpartitions, bufferSize); + subPartitionBufferIndex = new int[numSubpartitions]; + Arrays.fill(subPartitionBufferIndex, 0); + getFileMeta().setIsWriterClosed(false); + getFileMeta().setSegmentGranularityVisible(true); + } + + @Override + public void write(ByteBuf data) throws IOException { + data.markReaderIndex(); + int subPartitionId = data.readInt(); + int attemptId = data.readInt(); + int batchId = data.readInt(); + int size = data.readInt(); + + if (!subPartitionHasStartSegment.containsKey(subPartitionId)) { + throw new IllegalStateException( + String.format( + "This partition may not start a segment: subPartitionId:%s attemptId:%s batchId:%s size:%s", + subPartitionId, attemptId, batchId, size)); + } + int currentSubpartition = getCurrentSubpartition(); + // the subPartitionId must be ordered in a region + if (subPartitionId < currentSubpartition) { + throw new IOException( + String.format( + "Must writing data in reduce partition index order, but now supPartitionId is %s and the previous supPartitionId is %s, attemptId is %s, batchId is %s, size is %s", + subPartitionId, currentSubpartition, attemptId, batchId, size)); + } + + 
data.resetReaderIndex(); + logger.debug( + "mappartition filename:{} write partition:{} currentSubPartition:{} attemptId:{} batchId:{} size:{}", + diskFileInfo.getFilePath(), + subPartitionId, + currentSubpartition, + attemptId, + batchId, + size); + + if (subPartitionId > currentSubpartition) { + setCurrentSubpartition(subPartitionId); + } + long length = data.readableBytes(); + setTotalBytes(getTotalBytes() + length); + getNumSubpartitionBytes()[subPartitionId] += length; + if (flushBuffer == null) { + takeBuffer(); + } + writeDataToFile(data); + setRegionFinished(false); + + MapFileMeta mapFileMeta = getFileMeta(); + // Only when the sub partition has started a segment, the buffer index(this is the first buffer + // of this segment) will be added. + if (subPartitionHasStartSegment.get(subPartitionId)) { + mapFileMeta.addSegmentIdAndFirstBufferIndex( + subPartitionId, + subPartitionBufferIndex[subPartitionId], + mapFileMeta.getPartitionWritingSegmentId(subPartitionId)); + logger.debug( + "Add a segment id, partitionId:{}, bufferIndex:{}, segmentId: {}, filename:{}, attemptId:{}.", + subPartitionId, + subPartitionBufferIndex[subPartitionId], + mapFileMeta.getPartitionWritingSegmentId(subPartitionId), + diskFileInfo.getFilePath(), + attemptId); + // After the first buffer index of the segment is added, the following buffers in the segment + // should not be added anymore, so the subPartitionHasStartSegment is updated to false. 
+ subPartitionHasStartSegment.put(subPartitionId, false); + } + subPartitionBufferIndex[subPartitionId]++; + } + + @Override + public synchronized long close() throws IOException { + subPartitionHasStartSegment.clear(); + long fileLength = super.close(); + logger.debug("Close {} for file {}", this, getFile()); + getFileMeta().setIsWriterClosed(true); + return fileLength; + } + + @Override + public String toString() { + return String.format("SegmentMapPartitionFileWriter{filePath=%s}", diskFileInfo.getFilePath()); + } + + public void segmentStart(int partitionId, int segmentId) { + getFileMeta().addPartitionSegmentId(partitionId, segmentId); + subPartitionHasStartSegment.put(partitionId, true); + } +} diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala index 0a141ede8..0b0c53c1f 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala @@ -190,7 +190,8 @@ private[deploy] class Controller( partitionType, rangeReadFilter, userIdentifier, - partitionSplitEnabled) + partitionSplitEnabled, + isSegmentGranularityVisible) primaryLocs.add(new WorkingPartition(location, writer)) } else { primaryLocs.add(location) @@ -230,7 +231,8 @@ private[deploy] class Controller( partitionType, rangeReadFilter, userIdentifier, - partitionSplitEnabled) + partitionSplitEnabled, + isSegmentGranularityVisible) replicaLocs.add(new WorkingPartition(location, writer)) } else { replicaLocs.add(location) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index c98acaf10..9fe99c4fd 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -38,13 +38,14 @@ import org.apache.celeborn.common.network.client.{RpcResponseCallback, Transport import org.apache.celeborn.common.network.protocol.{Message, PushData, PushDataHandShake, PushMergedData, RegionFinish, RegionStart, RequestMessage, RpcFailure, RpcRequest, RpcResponse, TransportMessage} import org.apache.celeborn.common.network.protocol.Message.Type import org.apache.celeborn.common.network.server.BaseMessageHandler -import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, PbPushDataHandShake, PbRegionFinish, PbRegionStart} +import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, PbPushDataHandShake, PbRegionFinish, PbRegionStart, PbSegmentStart} import org.apache.celeborn.common.protocol.PbPartitionLocation.Mode import org.apache.celeborn.common.protocol.message.StatusCode import org.apache.celeborn.common.unsafe.Platform import org.apache.celeborn.common.util.{DiskUtils, ExceptionUtils, Utils} import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController import org.apache.celeborn.service.deploy.worker.storage.{HdfsFlusher, LocalFlusher, MapPartitionDataWriter, PartitionDataWriter, S3Flusher, StorageManager} +import org.apache.celeborn.service.deploy.worker.storage.segment.SegmentMapPartitionFileWriter class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler with Logging { @@ -916,6 +917,16 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler rf.getShuffleKey, rf.getPartitionUniqueId, false) + case ss: PbSegmentStart => + ( + msg, + null, + false, + Type.SEGMENT_START, + ss.getMode, + ss.getShuffleKey, + ss.getPartitionUniqueId, + false) } } catch { case _: Exception => @@ -980,6 +991,8 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler 
(WorkerSource.PRIMARY_REGION_START_TIME, WorkerSource.REPLICA_REGION_START_TIME) case Type.REGION_FINISH => (WorkerSource.PRIMARY_REGION_FINISH_TIME, WorkerSource.REPLICA_REGION_FINISH_TIME) + case Type.SEGMENT_START => + (WorkerSource.PRIMARY_SEGMENT_START_TIME, WorkerSource.REPLICA_SEGMENT_START_TIME) case _ => throw new IllegalArgumentException(s"Not support $messageType yet") } @@ -1064,6 +1077,14 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler isBroadcast) case Type.REGION_FINISH => fileWriter.asInstanceOf[MapPartitionDataWriter].regionFinish() + case Type.SEGMENT_START => + val (subPartitionId, segmentId) = + ( + pbMsg.asInstanceOf[PbSegmentStart].getSubPartitionId, + pbMsg.asInstanceOf[PbSegmentStart].getSegmentId) + fileWriter.asInstanceOf[SegmentMapPartitionFileWriter].segmentStart( + subPartitionId, + segmentId) case _ => throw new IllegalArgumentException(s"Not support $messageType yet") } // for primary , send data to replica @@ -1123,6 +1144,9 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler case Type.REGION_FINISH => workerSource.incCounter(WorkerSource.REGION_FINISH_FAIL_COUNT) callback.onFailure(new CelebornIOException(StatusCode.REGION_FINISH_FAIL_REPLICA, e)) + case Type.SEGMENT_START => + workerSource.incCounter(WorkerSource.SEGMENT_START_FAIL_COUNT) + callback.onFailure(new CelebornIOException(StatusCode.SEGMENT_START_FAIL_REPLICA, e)) case _ => workerSource.incCounter(WorkerSource.REPLICATE_DATA_FAIL_COUNT) if (e.isInstanceOf[CelebornIOException]) { @@ -1177,6 +1201,9 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler case Type.REGION_FINISH => ( StatusCode.REGION_FINISH_FAIL_PRIMARY, StatusCode.REGION_FINISH_FAIL_REPLICA) + case Type.SEGMENT_START => ( + StatusCode.SEGMENT_START_FAIL_PRIMARY, + StatusCode.SEGMENT_START_FAIL_REPLICA) case _ => throw new IllegalArgumentException(s"Not support $messageType yet") } callback.onFailure(new 
CelebornIOException( diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index 5358fab02..b2777c564 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -56,6 +56,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, MetricsSyste addCounter(REGION_START_FAIL_COUNT) addCounter(REGION_FINISH_FAIL_COUNT) addCounter(ACTIVE_CONNECTION_COUNT) + addCounter(SEGMENT_START_FAIL_COUNT) addCounter(SLOTS_ALLOCATED) @@ -72,6 +73,8 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, MetricsSyste addTimer(REPLICA_REGION_START_TIME) addTimer(PRIMARY_REGION_FINISH_TIME) addTimer(REPLICA_REGION_FINISH_TIME) + addTimer(PRIMARY_SEGMENT_START_TIME) + addTimer(REPLICA_SEGMENT_START_TIME) addTimer(FETCH_CHUNK_TIME) addTimer(OPEN_STREAM_TIME) @@ -151,12 +154,15 @@ object WorkerSource { val PUSH_DATA_HANDSHAKE_FAIL_COUNT = "PushDataHandshakeFailCount" val REGION_START_FAIL_COUNT = "RegionStartFailCount" val REGION_FINISH_FAIL_COUNT = "RegionFinishFailCount" + val SEGMENT_START_FAIL_COUNT = "SegmentStartFailCount" val PRIMARY_PUSH_DATA_HANDSHAKE_TIME = "PrimaryPushDataHandshakeTime" val REPLICA_PUSH_DATA_HANDSHAKE_TIME = "ReplicaPushDataHandshakeTime" val PRIMARY_REGION_START_TIME = "PrimaryRegionStartTime" val REPLICA_REGION_START_TIME = "ReplicaRegionStartTime" val PRIMARY_REGION_FINISH_TIME = "PrimaryRegionFinishTime" val REPLICA_REGION_FINISH_TIME = "ReplicaRegionFinishTime" + val PRIMARY_SEGMENT_START_TIME = "PrimarySegmentStartTime" + val REPLICA_SEGMENT_START_TIME = "ReplicaSegmentStartTime" // pause push data val PAUSE_PUSH_DATA_TIME = "PausePushDataTime" diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala index df0d63be3..0c4fa4105 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala @@ -44,6 +44,8 @@ private[worker] class LocalFlushTask( fileChannel.write(buffer) } } + + // TODO: force flush file channel in scenarios where the upstream task writes and the downstream task reads simultaneously, such as flink hybrid shuffle. } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index c7c8f8adf..00e0a2887 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -49,6 +49,7 @@ import org.apache.celeborn.service.deploy.worker.memory.MemoryManager import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.MemoryPressureListener import org.apache.celeborn.service.deploy.worker.shuffledb.{DB, DBBackend, DBProvider} import org.apache.celeborn.service.deploy.worker.storage.StorageManager.hadoopFs +import org.apache.celeborn.service.deploy.worker.storage.segment.SegmentMapPartitionFileWriter final private[worker] class StorageManager(conf: CelebornConf, workerSource: AbstractSource) extends ShuffleRecoverHelper with DeviceObserver with Logging with MemoryPressureListener { @@ -407,7 +408,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs partitionType, rangeReadFilter, userIdentifier, - true) + true, + isSegmentGranularityVisible = false) } @throws[IOException] @@ -420,7 +422,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs partitionType: PartitionType, rangeReadFilter: 
Boolean, userIdentifier: UserIdentifier, - partitionSplitEnabled: Boolean): PartitionDataWriter = { + partitionSplitEnabled: Boolean, + isSegmentGranularityVisible: Boolean): PartitionDataWriter = { if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage && !hasS3Storage) { throw new IOException("No available working dirs!") } @@ -438,7 +441,14 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val writer = try { partitionType match { - case PartitionType.MAP => new MapPartitionDataWriter( + case PartitionType.MAP => + if (isSegmentGranularityVisible) new SegmentMapPartitionFileWriter( + this, + workerSource, + conf, + deviceMonitor, + partitionDataWriterContext) + else new MapPartitionDataWriter( this, workerSource, conf,