---
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---

# ShuffleClient

## Overview
ShuffleClient is responsible for pushing and reading shuffle data. It's a singleton in each leaf process,
i.e. Executor in Apache Spark, or TaskManager in Apache Flink. This article describes the detailed design of
pushing and reading data.

## Push Data
### API specification
The push data API is as follows:
```java
public abstract int pushData(
    int shuffleId,
    int mapId,
    int attemptId,
    int partitionId,
    byte[] data,
    int offset,
    int length,
    int numMappers,
    int numPartitions)
```

- `shuffleId` is the unique shuffle id of the application
- `mapId` is the map id of the shuffle
- `attemptId` is the attempt id of the map task, i.e. speculative task or task rerun for Apache Spark
- `partitionId` is the partition id the data belongs to
- `data`, `offset`, `length` specify the bytes to be pushed
- `numMappers` is the number of map tasks in the shuffle
- `numPartitions` is the number of partitions in the shuffle

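A minimal caller-side sketch is shown below, assuming a `ShuffleClient` instance `client` has already been obtained from the engine integration; the variables and the `serializeRecords` helper are illustrative, only the `pushData` signature above comes from this document:

```java
// Hypothetical caller-side sketch, not the actual engine integration code.
byte[] data = serializeRecords();   // engine-specific serialization (assumed helper)

int bytesPushed = client.pushData(
    shuffleId,        // unique shuffle id of the application
    mapId,            // map task id
    attemptId,        // attempt id of the map task
    partitionId,      // partition the bytes belong to
    data,
    0,                // offset into the byte array
    data.length,      // number of bytes to push
    numMappers,       // number of map tasks in the shuffle
    numPartitions);   // number of partitions in the shuffle
```
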
### Lazy Shuffle Register
The first time `pushData` is called, Client will check whether the shuffle id has been registered. If not,
it sends `RegisterShuffle` to `LifecycleManager`, `LifecycleManager` then sends `RequestSlots` to `Master`.
`RequestSlots` specifies how many `PartitionLocation`s this shuffle requires, each `PartitionLocation` logically
corresponds to data of some partition id.

Upon receiving `RequestSlots`, `Master` allocates slots for the shuffle among `Worker`s. If replication is turned on,
`Master` allocates a pair of `Worker`s for each `PartitionLocation` to store two replicas for each `PartitionLocation`.
The detailed allocation strategy can be found in [Slots Allocation](../../developers/master#slots-allocation). `Master` then
responds to `LifecycleManager` with the allocated `PartitionLocation`s.

`LifecycleManager` caches the `PartitionLocation`s for the shuffle and responds to each `RegisterShuffle` RPC from
`ShuffleClient`s.

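Conceptually, the registration check is a cache lookup keyed by shuffle id. The sketch below only illustrates the idea; `ShuffleLocations`, `registeredShuffles`, and `registerWithLifecycleManager` are hypothetical names, not the real Celeborn classes:

```java
// Illustrative sketch of lazy shuffle registration, not the real implementation.
// java.util.concurrent.ConcurrentHashMap
private final ConcurrentHashMap<Integer, ShuffleLocations> registeredShuffles =
    new ConcurrentHashMap<>();

ShuffleLocations getOrRegister(int shuffleId, int numMappers, int numPartitions) {
  // Only the first pushData call for a shuffle id pays the registration cost;
  // later calls hit the local cache.
  return registeredShuffles.computeIfAbsent(
      shuffleId,
      id -> registerWithLifecycleManager(id, numMappers, numPartitions));
}
```
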
### Normal Push
In normal cases, the process of pushing data is as follows:

- `ShuffleClient` compresses data, currently supports `zstd` and `lz4`
- `ShuffleClient` adds a header to the data: `mapId`, `attemptId`, `batchId` and `size`. The `batchId` is a unique
  id for the data batch inside the (`mapId`, `attemptId`), for the purpose of de-duplication (see the sketch after this list)
- `ShuffleClient` sends `PushData` to the `Worker` on which the current `PartitionLocation` is allocated, and holds push
  state for this push
- `Worker` receives the data, does replication if needed, then responds with a success ACK to `ShuffleClient`. For more details
  about how data is replicated and stored in `Worker`s, please refer to [Worker](../../developers/worker)
- Upon receiving the success ACK from `Worker`, `ShuffleClient` considers this push successful and updates the push state

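The sketch below illustrates the documented header fields (`mapId`, `attemptId`, `batchId`, `size`) being written ahead of the compressed payload; the exact byte layout and the `compressed`/`nextBatchId` variables are assumptions, not the actual wire format:

```java
// Illustrative only: prepends the four documented header fields to the compressed bytes.
// java.nio.ByteBuffer; the real Celeborn wire layout may differ.
ByteBuffer batch = ByteBuffer.allocate(4 * Integer.BYTES + compressed.length);
batch.putInt(mapId);
batch.putInt(attemptId);
batch.putInt(nextBatchId++);      // unique within (mapId, attemptId); enables de-duplication
batch.putInt(compressed.length);  // size of the compressed payload
batch.put(compressed);
```
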
### Push or Merge?
If the size of data to be pushed is small, say hundreds of bytes, it will be very inefficient to send it over the wire.
So `ShuffleClient` offers another API, `mergeData`, to batch data locally before sending it to `Worker`.

`mergeData` merges data with the same target into `DataBatches`. `Same target` means the destinations for both the
primary and the replica are the same. When the size of a `DataBatches` exceeds a threshold (defaults to `64KiB`),
`ShuffleClient` triggers pushing and sends `PushMergedData` to the destination.

Upon receiving `PushMergedData`, `Worker` unpacks it into data segments, each for a specific `PartitionLocation`, then
stores them accordingly.

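A simplified sketch of the merge-then-flush decision follows; `buffers`, `PushTarget`, `sendPushMergedData`, and the methods shown on `DataBatches` are illustrative, only the `DataBatches`/`PushMergedData` names and the `64KiB` default come from this document:

```java
// Illustrative sketch of batching small pushes per target, not the real client code.
private static final int PUSH_BUFFER_MAX_SIZE = 64 * 1024;  // documented default threshold

void mergeLocally(PushTarget target, byte[] batch) {
  DataBatches batches = buffers.computeIfAbsent(target, t -> new DataBatches());
  batches.add(batch);
  if (batches.totalSize() > PUSH_BUFFER_MAX_SIZE) {
    // Threshold exceeded: flush all buffered batches for this target
    // in a single PushMergedData request.
    sendPushMergedData(target, batches.drain());
  }
}
```
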
### Async Push
Celeborn's `ShuffleClient` pushes data asynchronously so that it does not block the compute engine's execution; this is
implemented in `DataPusher`.

Whenever the compute engine decides to push data, it calls `DataPusher#addTask`. `DataPusher` creates a `PushTask` which
contains the data, and adds the `PushTask` to a non-blocking queue. `DataPusher` continuously polls the queue
and invokes `ShuffleClient#pushData` to do the actual push.

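A minimal producer/consumer sketch of this pattern is shown below (it uses a blocking queue for brevity, whereas the document describes a non-blocking queue; the `PushTask` accessors and the `running` flag are assumptions):

```java
// Illustrative sketch of asynchronous pushing, not the real DataPusher internals.
// java.util.concurrent.LinkedBlockingQueue
private final LinkedBlockingQueue<PushTask> queue = new LinkedBlockingQueue<>();

void addTask(PushTask task) throws InterruptedException {
  queue.put(task);  // called by the compute engine; no network I/O happens here
}

// Runs on a background thread and performs the actual network push.
void pushLoop() throws InterruptedException {
  while (running) {
    PushTask task = queue.take();
    shuffleClient.pushData(
        task.shuffleId(), task.mapId(), task.attemptId(), task.partitionId(),
        task.buffer(), 0, task.size(), task.numMappers(), task.numPartitions());
  }
}
```
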
### Split
As mentioned before, Celeborn will split a `PartitionLocation` when any of the following conditions happens:

- `PartitionLocation` file exceeds the threshold (defaults to 1GiB)
- Usable space of the local disk is less than the threshold (defaults to 5GiB)
- `Worker` is in `Graceful Shutdown` state
- Push data fails

For the first three cases, `Worker` informs `ShuffleClient` that it should trigger split; for the last case,
`ShuffleClient` triggers split itself.

There are two kinds of Split (see the sketch after this list):

- `HARD_SPLIT`, meaning the old `PartitionLocation` epoch refuses to accept any data, and future data of the
  `PartitionLocation` will only be pushed after the new `PartitionLocation` epoch is ready
- `SOFT_SPLIT`, meaning the old `PartitionLocation` epoch continues to accept data; when the new epoch is ready, `ShuffleClient`
  switches to the new location transparently

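A rough sketch of how a client might react to the two split signals follows; the response object, `waitForNewEpoch`, `switchWhenReady`, and `repush` are all illustrative, only the `HARD_SPLIT`/`SOFT_SPLIT` semantics come from this document:

```java
// Illustrative handling of a split signal returned by a Worker, not the real client code.
switch (pushResponse.splitStatus()) {
  case HARD_SPLIT: {
    // The old epoch refuses further data: wait until the new PartitionLocation
    // epoch is ready, then push this batch to the new location.
    PartitionLocation newLocation = waitForNewEpoch(partitionId);
    repush(newLocation, batch);
    break;
  }
  case SOFT_SPLIT:
    // The old epoch still accepts this batch; switch to the new location
    // transparently for subsequent pushes once it is ready.
    switchWhenReady(partitionId);
    break;
  default:
    break;  // no split requested
}
```
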
The process of `SOFT_SPLIT` is as follows:

*(figure: the SOFT_SPLIT process)*

`LifecycleManager` keeps the split information and tells the reducer to read from all splits of the `PartitionLocation`
to guarantee no data is lost.

## Read Data
### API specification
`ShuffleClient` provides an API that creates an InputStream to read data from a partition id. Users can also set
`startMapIndex` and `endMapIndex` to read data within the map range.
```java
public abstract CelebornInputStream readPartition(
    int shuffleId,
    int appShuffleId,
    int partitionId,
    int attemptNumber,
    int startMapIndex,
    int endMapIndex,
    ExceptionMaker exceptionMaker,
    MetricsCallback metricsCallback)
```

- `shuffleId` is the unique shuffle id of Celeborn
- `appShuffleId` is the unique shuffle id of the application
- `partitionId` is the partition id to read from
- `attemptNumber` is the attempt id of the reduce task, can be safely set to any value
- `startMapIndex` is the start map index of the interested map range, set to 0 if you want to read all
  partition data
- `endMapIndex` is the end map index of the interested map range, set to `Integer.MAX_VALUE` if you want
  to read all partition data
- `exceptionMaker` is the maker of exceptions, including the fetch failure exception
- `metricsCallback` is the callback of monitoring metrics, e.g. to increase read bytes and time

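A minimal caller-side sketch that reads the whole partition is shown below, assuming a `ShuffleClient` instance `client` and engine-provided `exceptionMaker`/`metricsCallback` objects; it also assumes `CelebornInputStream` can be consumed like a standard `java.io.InputStream`:

```java
// Hypothetical caller-side sketch; only the readPartition signature comes from this doc.
CelebornInputStream in = client.readPartition(
    shuffleId,
    appShuffleId,
    partitionId,
    attemptNumber,
    0,                   // startMapIndex: start from the first map task
    Integer.MAX_VALUE,   // endMapIndex: read through the last map task
    exceptionMaker,      // engine-provided exception maker
    metricsCallback);    // engine-provided metrics callback
try {
  int b;
  while ((b = in.read()) != -1) {
    // consume shuffle bytes; deserialization is engine-specific
  }
} finally {
  in.close();
}
```
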
The returned input stream is guaranteed to be `Exactly Once`, meaning no data is lost and no data is read more than
once; otherwise an exception will be thrown, see [Here](../../developers/faulttolerant#exactly-once).

### Get PartitionLocations
To read data from a partition id, `ShuffleClient` first checks whether the mapping from the partition id to all its
`PartitionLocation`s is locally cached. If not, `ShuffleClient` sends `GetReducerFileGroup` to `LifecycleManager`
for the mapping, see [Here](../../developers/lifecyclemanager#getreducerfilegroup).

### Read from PartitionLocation
`ShuffleClient` creates a `PartitionReader` for each `PartitionLocation`.
As described [Here](../../developers/storage#multi-layered-storage), `PartitionLocation` data can be stored in
different media, i.e. memory, local disk, or a distributed filesystem. For the former two, it creates
a `WorkerPartitionReader` to read from `Worker`; for the last one, it creates a `DfsPartitionReader` to read
directly from the distributed filesystem.

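The choice of reader can be thought of as a dispatch on where the data lives. The sketch below is illustrative; the `storageType` accessor and its constants are assumptions, while `PartitionReader`, `WorkerPartitionReader`, and `DfsPartitionReader` are named in this document:

```java
// Illustrative reader selection, not the actual factory code; constructor arguments omitted.
PartitionReader createReader(PartitionLocation location) {
  switch (location.storageType()) {  // hypothetical accessor
    case MEMORY:
    case LOCAL_DISK:
      return new WorkerPartitionReader(location);  // fetch chunks through the Worker
    case DFS:
      return new DfsPartitionReader(location);     // read directly from the distributed filesystem
    default:
      throw new IllegalStateException("unknown storage type");
  }
}
```
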
As described [Here](../../developers/storage#reducepartition), the file is chunked. `WorkerPartitionReader` asynchronously
requests multiple chunks from `Worker`, and the reduce task consumes the data whenever it is available.

If an exception occurs when fetching a chunk, `ShuffleClient` will restart reading from the beginning of another
(if replication is turned on, else retry the same) `PartitionLocation`. The reason to restart reading the whole
`PartitionLocation` instead of the chunk is that chunks with the same index in the primary and the replica are not
guaranteed to contain the same data, as explained [Here](../../developers/storage#reducepartition).

`ShuffleClient` chains the `PartitionReader`s and wraps them in an InputStream. To avoid duplicated reads,
`CelebornInputStream` discards data from unsuccessful attempts, and records the batch ids it has seen within an attempt.

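A simplified sketch of that de-duplication check follows; `successfulAttempt` and `seenBatchIds` are illustrative names, only the behavior (drop unsuccessful attempts, drop repeated batch ids) comes from this document:

```java
// Illustrative de-duplication check, not the real CelebornInputStream code.
// successfulAttempt[mapId] holds the attempt id that succeeded for that map task;
// seenBatchIds maps mapId -> set of batch ids already returned to the reader.
boolean shouldKeep(int mapId, int attemptId, int batchId) {
  if (attemptId != successfulAttempt[mapId]) {
    return false;  // data from an unsuccessful attempt is discarded
  }
  return seenBatchIds
      .computeIfAbsent(mapId, k -> new HashSet<Integer>())
      .add(batchId);  // false if this batch id was already seen, i.e. a duplicate
}
```
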
### Read from Map Range
If the user specifies `startMapIndex` and `endMapIndex`, `CelebornInputStream` will only return data within the range.
Under the hood, `Worker` only responds with data within the range. This is achieved by sorting and indexing the file
by map id upon receiving such a range read request, then returning the continuous data range of interest.

Notice that the sort on read is only triggered upon map range read, not for the common case where the whole partition
data is requested.

Celeborn also optionally records map ids for each `PartitionLocation`. In the case of map range reading,
`CelebornInputStream` will filter out `PartitionLocation`s that are out of the specified range.
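A minimal sketch of that filtering step is shown below; `mapIdsOf` is a hypothetical lookup of the map ids recorded for a location, and the half-open range convention is an assumption:

```java
// Illustrative filtering of PartitionLocations for a map range read, not the real code.
// java.util.stream.Collectors
List<PartitionLocation> inRange = locations.stream()
    .filter(loc -> mapIdsOf(loc).stream()
        .anyMatch(mapId -> mapId >= startMapIndex && mapId < endMapIndex))
    .collect(Collectors.toList());
```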