---
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

      https://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---

# LifecycleManager

## Overview

`LifecycleManager` maintains the following information for each shuffle of the application:

- All active shuffle ids
- The `Worker`s serving each shuffle, and the `PartitionLocation`s on each `Worker`
- The status of each shuffle, i.e. not committed, committing, committed, data lost, or expired
- The newest `PartitionLocation` (i.e. the one with the largest epoch) of each partition id
- The user identifier of this application
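
The bookkeeping above can be sketched as a small in-memory registry. This is a minimal illustration only; the names `ShuffleRegistry`, `ShuffleState`, and `ShuffleStatus` are hypothetical and not Celeborn's actual classes:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the per-shuffle state LifecycleManager keeps in memory.
public class ShuffleRegistry {
    enum ShuffleStatus { NOT_COMMITTED, COMMITTING, COMMITTED, DATA_LOST, EXPIRED }

    static class ShuffleState {
        ShuffleStatus status = ShuffleStatus.NOT_COMMITTED;
        // worker address -> partition ids hosted on that worker
        final Map<String, Set<Integer>> workerPartitions = new ConcurrentHashMap<>();
        // partition id -> largest epoch seen so far
        final Map<Integer, Integer> latestEpoch = new ConcurrentHashMap<>();
    }

    // shuffle id -> state; the key set is the set of active shuffle ids
    static final Map<Integer, ShuffleState> shuffles = new ConcurrentHashMap<>();

    static ShuffleState register(int shuffleId) {
        return shuffles.computeIfAbsent(shuffleId, id -> new ShuffleState());
    }

    // Record a new epoch for a partition, keeping only the largest one seen.
    static int bumpEpoch(int shuffleId, int partitionId, int epoch) {
        return register(shuffleId).latestEpoch.merge(partitionId, epoch, Math::max);
    }
}
```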

Also, `LifecycleManager` exchanges control messages with `ShuffleClient` and the Celeborn `Master`. Typically, it
receives the following requests from `ShuffleClient`:

- RegisterShuffle
- Revive/PartitionSplit
- MapperEnd/StageEnd
- GetReducerFileGroup

To handle these requests, `LifecycleManager` sends requests to `Master` and `Worker`s:

- Heartbeat to `Master`
- RequestSlots to `Master`
- UnregisterShuffle to `Master`
- ReserveSlots to `Worker`
- CommitFiles to `Worker`
- DestroyWorkerSlots to `Worker`

## RegisterShuffle

As described in [PushData](../../developers/shuffleclient#lazy-shuffle-register), `ShuffleClient` lazily sends
RegisterShuffle to `LifecycleManager`, so many concurrent requests may arrive at `LifecycleManager`.

To ensure only one request per shuffle is handled, `LifecycleManager` caches the trailing requests in a set and only
lets the first request through. When the first request finishes, `LifecycleManager` responds to all cached requests.
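
This "first request proceeds, concurrent duplicates share its result" pattern can be sketched as follows. The names here (`RegisterDedup`, `doRegister`) are hypothetical stand-ins for the real registration work, not Celeborn's API:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;

// Hypothetical sketch: only the first RegisterShuffle for a shuffle id does real work;
// concurrent duplicates share the same pending future and all receive the same response.
public class RegisterDedup {
    private final Map<Integer, CompletableFuture<String>> inFlight = new ConcurrentHashMap<>();
    private final IntFunction<String> doRegister; // stands in for the real registration work

    RegisterDedup(IntFunction<String> doRegister) { this.doRegister = doRegister; }

    CompletableFuture<String> register(int shuffleId) {
        // computeIfAbsent guarantees doRegister runs at most once per shuffle id
        return inFlight.computeIfAbsent(shuffleId,
                id -> CompletableFuture.supplyAsync(() -> doRegister.apply(id)));
    }
}
```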

The process of handling RegisterShuffle is as follows:

`LifecycleManager` sends RequestSlots to `Master`, and `Master` allocates slots for the shuffle, as described
[here](../../developers/master#slots-allocation).

Upon receiving the slots allocation result, `LifecycleManager` sends ReserveSlots to all allocated `Worker`s
in parallel. Each `Worker` then selects a disk and initializes storage for each `PartitionLocation`, see
[here](../../developers/storage#local-disk-and-memory-buffer).

After all related `Worker`s have successfully reserved slots, `LifecycleManager` stores the shuffle information in
memory and responds to all pending and future requests.
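
The parallel ReserveSlots fan-out, where the reservation counts as successful only when every worker succeeds, can be sketched as below. `ReserveFanout` and `reserveOn` are hypothetical names; `reserveOn` stands in for the real RPC:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch: fan ReserveSlots out to all allocated workers in parallel
// and treat the reservation as successful only when every worker succeeds.
public class ReserveFanout {
    static boolean reserveAll(List<String> workers, Function<String, Boolean> reserveOn) {
        // Launch all reserve requests in parallel first...
        List<CompletableFuture<Boolean>> futures = workers.stream()
                .map(w -> CompletableFuture.supplyAsync(() -> reserveOn.apply(w)))
                .collect(Collectors.toList());
        // ...then join each response; all workers must report success.
        return futures.stream().allMatch(CompletableFuture::join);
    }
}
```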

## Revive/PartitionSplit

Celeborn handles push data failures with a so-called Revive mechanism, see
[here](../../developers/faulttolerant#handle-pushdata-failure). Similar to [Split](../../developers/pushdata#split),
both ask `LifecycleManager` for a new epoch of `PartitionLocation` for future data pushing.

Upon receiving Revive/PartitionSplit, `LifecycleManager` first checks whether it holds a newer epoch locally; if so,
it simply responds with the newer one. If not, like handling RegisterShuffle, it caches the trailing requests for the
same partition id in a set and only lets the first one through.
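
The local epoch check can be sketched as follows. `EpochCache` is a hypothetical name, and `-1` stands in for "no newer epoch cached, fall through to allocating a new `PartitionLocation`":

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: on Revive/PartitionSplit, answer directly from the local cache
// when a newer epoch already exists for the partition.
public class EpochCache {
    // partition id -> latest known epoch
    static final Map<Integer, Integer> latest = new ConcurrentHashMap<>();

    // Returns the cached newer epoch, or -1 if a new PartitionLocation must be created.
    static int onRevive(int partitionId, int requestedEpoch) {
        Integer cached = latest.get(partitionId);
        return (cached != null && cached > requestedEpoch) ? cached : -1;
    }
}
```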

Unlike RegisterShuffle, `LifecycleManager` does not send RequestSlots to `Master` to ask for new `Worker`s. Instead,
it randomly picks `Worker`s from its local `Worker` list, excluding the failing ones. This design avoids issuing too
many RPCs to `Master`.

`LifecycleManager` then sends ReserveSlots to the picked `Worker`s. On success, it responds with the new
`PartitionLocation`s to the `ShuffleClient`s.
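
Picking replacement workers locally, rather than asking `Master` again, can be sketched as below. `WorkerPicker` is a hypothetical name and this is a simplification of the real selection logic:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: pick replacement workers at random from the locally known
// worker list, skipping the ones that just failed, without another RPC to Master.
public class WorkerPicker {
    static List<String> pick(List<String> known, Set<String> failed, int count) {
        List<String> candidates = new ArrayList<>(known);
        candidates.removeAll(failed);       // exclude failing workers
        Collections.shuffle(candidates);    // random selection
        return candidates.subList(0, Math.min(count, candidates.size()));
    }
}
```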

## MapperEnd/StageEnd

Celeborn needs to know when the shuffle write stage ends in order to persist shuffle data, check whether any data was
lost, and prepare for shuffle read. Since many compute engines do not signal such an event (for example, Spark's
ShuffleManager has no such API), Celeborn has to recognize it itself.

To achieve this, Celeborn requires `ShuffleClient` to specify the number of map tasks in the RegisterShuffle request,
and to send a MapperEnd request to `LifecycleManager` when a map task succeeds. Once MapperEnd has been received for
every map id, `LifecycleManager` knows that the shuffle write stage has ended, and sends CommitFiles to the related
`Worker`s.

For many compute engines, a map task may launch multiple attempts (i.e. speculative execution), and the engine
chooses one of them as the successful attempt. However, there is no way for Celeborn to know which attempt is chosen.
Instead, `LifecycleManager` records the first attempt that sends MapperEnd as the successful one for each map task,
and ignores other attempts. This is correct because compute engines guarantee that all attempts of a map task
generate the same output data.
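
The first-attempt-wins bookkeeping and the stage-end detection can be sketched together. `MapperEndTracker` is a hypothetical name, not a Celeborn class:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: record the first attempt that reports MapperEnd for each map id
// (later attempts are ignored), and detect stage end once every map id has reported.
public class MapperEndTracker {
    private final int numMappers; // declared by ShuffleClient in RegisterShuffle
    private final ConcurrentMap<Integer, Integer> successAttempt = new ConcurrentHashMap<>();

    MapperEndTracker(int numMappers) { this.numMappers = numMappers; }

    // Returns true when this MapperEnd completes the shuffle write stage.
    boolean onMapperEnd(int mapId, int attemptId) {
        successAttempt.putIfAbsent(mapId, attemptId); // first attempt wins
        return successAttempt.size() == numMappers;
    }

    int chosenAttempt(int mapId) { return successAttempt.get(mapId); }
}
```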

Upon receiving CommitFiles, `Worker`s flush buffered data to files and respond with the succeeded and failed
`PartitionLocation`s to `LifecycleManager`, see [here](../../developers/storage#local-disk-and-memory-buffer).
`LifecycleManager` then checks whether any `PartitionLocation` has lost both primary and replica data (marking the
data lost if so), and stores the information in memory.
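
The data-lost condition, where a partition's data counts as lost only when neither its primary nor its replica location committed, can be sketched as (hypothetical names):

```java
import java.util.Set;

// Hypothetical sketch: after CommitFiles responses arrive, a partition's data is lost
// only when it appears in neither the committed-primary nor the committed-replica set.
public class DataLostCheck {
    static boolean dataLost(Set<Integer> committedPrimary,
                            Set<Integer> committedReplica,
                            int partitionId) {
        return !committedPrimary.contains(partitionId)
                && !committedReplica.contains(partitionId);
    }
}
```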

## GetReducerFileGroup

Reduce tasks ask `LifecycleManager` for the `PartitionLocation`s of each partition id to read data. To reduce the
number of RPCs, `ShuffleClient` asks for the mapping from all partition ids to their `PartitionLocation`s through a
single GetReducerFileGroup request and caches it in memory.

Upon receiving the request, `LifecycleManager` responds with the cached mapping, or indicates that data was lost.

## Heartbeat to Master

`LifecycleManager` periodically sends heartbeats to `Master`, piggybacking the following information:

- Bytes and files written by the application, used to calculate the estimated partition size, see
  [here](../../developers/master#maintain-active-shuffles)
- The list of `Worker`s whose status `LifecycleManager` wants `Master` to report

## UnregisterShuffle

When a compute engine tells Celeborn that some shuffle is complete (e.g. through unregisterShuffle in Spark),
`LifecycleManager` first checks and waits for the write stage to end, then puts the shuffle id into an unregistered
set. After an expiration time, it removes the id and sends UnregisterShuffle to `Master` for cleanup, see
[here](../../developers/master#maintain-active-shuffles).

## DestroyWorkerSlots

Normally, `Worker`s clean up the resources of `PartitionLocation`s after being notified that the shuffle is
unregistered. In some abnormal cases, `LifecycleManager` sends DestroyWorkerSlots to clean up early. For example, if
some `Worker`s fail to reserve slots, `LifecycleManager` tells the `Worker`s that successfully reserved slots to
release them.

## Batch RPCs

Some RPCs are of high frequency, for example Revive/PartitionSplit, CommitFiles, and DestroyWorkerSlots. To reduce
the number of RPCs, `LifecycleManager` batches RPCs of the same kind and periodically checks and sends them out
through a dedicated thread.

Users can enable and tune batch RPCs through the following configs:
`celeborn.client.shuffle.batch*`
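
The batching idea can be sketched as a queue drained by a periodic flusher. `RpcBatcher` is a hypothetical name; in the real system the flush would run on a dedicated scheduled thread and send actual RPCs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: requests of the same kind accumulate in a queue, and a
// periodic flusher drains them and sends one batched RPC instead of many small ones.
public class RpcBatcher {
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    private final List<List<String>> sentBatches = new ArrayList<>();

    void enqueue(String request) { pending.add(request); }

    // In the real system this runs periodically on a dedicated thread.
    void flush() {
        List<String> batch = new ArrayList<>();
        for (String r; (r = pending.poll()) != null; ) batch.add(r);
        if (!batch.isEmpty()) sentBatches.add(batch); // one RPC carrying the whole batch
    }

    int batchesSent() { return sentBatches.size(); }
}
```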