celeborn/docs/developers/integrate.md
KenGeng 2097fcdfea [CELEBORN-1870] Fix typos in in 'Developer' documents
### What changes were proposed in this pull request?
Fix typo in 'Developer' documents.

### Why are the changes needed?
Improve the accurary of the doc.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Only doc changed. No test.

Closes #3108 from bgeng777/CELEBORN-1870.

Authored-by: KenGeng <samuelgeng7@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-02-19 14:31:14 +08:00

180 lines
6.6 KiB
Markdown

---
license: |
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---
# Integrating Celeborn
## Overview
The core components of Celeborn, i.e. `Master`, `Worker`, and `Client` are all engine irrelevant. Developers can
integrate Celeborn with various engines or applications by using or extending Celeborn's `Client`, as the officially
supported plugins for Spark/Flink/MapReduce.
This article briefly describes an example of integrating Celeborn into a simple distributed application using
Celeborn `Client`.
## Background
Say we have a distributed application who has two phases:
- Write phase that parallel tasks write data to some data service, each record is classified into some logical id,
say partition id.
- Read phase that parallel tasks read data from the data service, each task read data from a specified partition id.
Suppose the application has failover mechanism so that it's acceptable that when some data is lost the application
will rerun tasks.
Say developers of this application is searching for a suitable data service, and accidentally finds this article.
## Step One: Setup Celeborn Cluster
First, you need an available Celeborn Cluster. Refer to [QuickStart](../../) to set up a simple cluster in a
single node, or [Deploy](../../deploy) to set up a multi-node cluster, standalone or on K8s.
## Step Two: Create LifecycleManager
As described in [Client](../../developers/client), `Client` is separated into `LifecycleManager`, which is singleton
through an application; and `ShuffleClient`, which can have multiple instances.
Step two is to create a `LifecycleManager` instance, using the following API:
```scala
class LifecycleManager(val appUniqueId: String, val conf: CelebornConf)
```
- `appUniqueId` is the application id. Celeborn cluster stores, serves, and cleans up data in the granularity of
(application id, shuffle id)
- `conf` is an object of `CelebornConf`. The only required configuration is the address of Celeborn `Master`. For
the thorough description of configs, refer to [Configuration](../../configuration)
The example java code to create an `LifecycleManager` instance is as follows:
```java
CelebornConf celebornConf = new CelebornConf();
celebornConf.set("celeborn.master.endpoints", "<Master IP>:<Master Port>");
LifecycleManager lm = new LifecycleManager("myApp", celebornConf);
```
`LifecycleManager` object automatically starts necessary service after creation, so there is no need to call
other APIs to initialize it. You can get `LifecycleManager`'s address after creating it, which is needed to
create `ShuffleClient`.
```java
String host = lm.getHost();
int port = lm.getPort();
```
## Step Three: Create ShuffleClient
With `LifecycleManager`'s host and port, you can create `ShuffleClient` using the following API:
```java
public static ShuffleClient get(
String appUniqueId,
String host,
int port,
CelebornConf conf,
UserIdentifier userIdentifier)
```
- `appUniqueId` is the application id, same as above.
- `host` is the host of `LifecycleManager`
- `port` is the port of `LifecycleManager`
- `conf` is an object of `CelebornConf`, safe to pass an empty object
- `userIdentifier` specifies user identity, safe to pass null
You can create a `ShuffleClient` object using the following code:
```java
ShuffleClient shuffleClient =
ShuffleClient.get(
"myApp",
<LifecycleManager Host>,
<LifecycleManager Port>,
new CelebornConf(),
null);
```
This method returns a singleton `ShuffleClientImpl` object, and it's recommended to use this way as `ShuffleClientImpl`
maintains status and reuses resource across all shuffles. To make it work, you have to ensure that the
`LifecycleManager`'s host and port are reachable.
In practice, one `ShuffleClient` instance is created in each Executor process of Spark, or in each TaskManager
process of Flink.
## Step Four: Push Data
You can then push data with `ShuffleClient` with [pushData](../../developers/shuffleclient#api-specification), like
the following:
```java
int bytesWritten =
shuffleClient.pushData(
shuffleId,
mapId,
attemptId,
partitionId,
data,
0,
length,
numMappers,
numPartitions);
```
Each call of `pushData` passes a byte array containing data from the same partition id. In addition to specifying the
shuffleId, mapId, attemptId that the data belongs, `ShuffleClient` should also specify the number of mappers and the
number of partitions for [Lazy Register](../../developers/shuffleclient#lazy-shuffle-register).
After the map task finishes, `ShuffleClient` should call `mapperEnd` to tell `LifecycleManager` that the map task
finishes pushing its data:
```java
public abstract void mapperEnd(
int shuffleId,
int mapId,
int attempted,
int numMappers)
```
- `shuffleId` shuffle id of the current task
- `mapId` map id of the current task
- `attemptId` attempt id of the current task
- `numMappers` number of map ids in this shuffle
## Step Five: Read Data
After all tasks successfully called `mapperEnd`, you can start reading data from some partition id, using the
[readPartition API](../../developers/shuffleclient#api-specification_1), as the following code:
```java
InputStream inputStream = shuffleClient.readPartition(
shuffleId,
partitionId,
attemptId,
startMapIndex,
endMapIndex);
int byte = inputstream.read();
```
For simplicity, to read the whole data from the partition, you can pass 0 and `Integer.MAX_VALUE` to `startMapIndex`
and `endMapIndex`. This method will create an InputStream for the data, and guarantees no data lost and no
duplicate reading, else exception will be thrown.
## Step Six: Clean Up
After the shuffle finishes, you can call `LifecycleManager.unregisterShuffle` to clean up resources related to the
shuffle:
```java
lm.unregisterShuffle(0);
```
It's safe not to call `unregisterShuffle`, because Celeborn `Master` recognizes application finish by heartbeat
timeout, and will self-clean resources in the cluster.