[CELEBORN-1530] support MPU for S3

### What changes were proposed in this pull request?

as title

### Why are the changes needed?
AWS S3 doesn't support append, so Celeborn had to copy the historical data from S3 to the worker and write it back to S3 again, which heavily amplifies the write traffic. This PR implements a better solution via MPU (S3 multipart upload) to avoid the copy-and-write.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

![WechatIMG257](https://github.com/user-attachments/assets/968d9162-e690-4767-8bed-e490e3055753)

I conducted an experiment with a 1GB input dataset to compare the performance of Celeborn using only S3 storage versus using SSD storage. The results showed that Celeborn with SSD storage was approximately three times faster than with only S3 storage.

<img width="1728" alt="Screenshot 2024-11-16 at 13 02 10" src="https://github.com/user-attachments/assets/8f879c47-c01a-4004-9eae-1c266c1f3ef2">

The above screenshot is from the second test, which I ran with 5000 mappers and 5000 reducers.

Closes #2830 from zhaohehuhu/dev-1021.

Lead-authored-by: zhaohehuhu <luoyedeyi@163.com>
Co-authored-by: He Zhao <luoyedeyi459@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
This commit is contained in:
zhaohehuhu 2024-11-22 15:03:53 +08:00 committed by mingji
parent 317cb973dc
commit a2d3972318
17 changed files with 422 additions and 104 deletions

View File

@ -264,8 +264,8 @@ function sbt_build_service {
echo "Celeborn $VERSION$GITREVSTRING" > "$DIST_DIR/RELEASE"
echo "Build flags: $@" >> "$DIST_DIR/RELEASE"
if [[ $@ == *"hadoop-aws"* ]]; then
SBT_MAVEN_PROFILES="hadoop-aws"
if [[ $@ == *"aws"* ]]; then
export SBT_MAVEN_PROFILES="aws"
fi
BUILD_COMMAND=("$SBT" clean package)

View File

@ -209,25 +209,5 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>hadoop-aws</id>
<activation>
<property>
<name>hadoop-aws-deps</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
<version>${aws.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>

View File

@ -1147,7 +1147,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def s3SecretKey: String = get(S3_SECRET_KEY).getOrElse("")
def s3Endpoint: String = get(S3_ENDPOINT).getOrElse("")
def s3EndpointRegion: String = get(S3_ENDPOINT_REGION).getOrElse("")
def s3MultiplePartUploadMaxRetries: Int = get(S3_MPU_MAX_RETRIES)
def s3Dir: String = {
get(S3_DIR).map {
@ -3062,14 +3064,22 @@ object CelebornConf extends Logging {
.stringConf
.createOptional
val S3_ENDPOINT: OptionalConfigEntry[String] =
buildConf("celeborn.storage.s3.endpoint")
val S3_ENDPOINT_REGION: OptionalConfigEntry[String] =
buildConf("celeborn.storage.s3.endpoint.region")
.categories("worker", "master", "client")
.version("0.6.0")
.doc("S3 endpoint for Celeborn to store shuffle data.")
.stringConf
.createOptional
val S3_MPU_MAX_RETRIES: ConfigEntry[Int] =
buildConf("celeborn.storage.s3.mpu.maxRetries")
.categories("worker")
.version("0.6.0")
.doc("S3 MPU upload max retries.")
.intConf
.createWithDefault(5)
val WORKER_DISK_RESERVE_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.storage.disk.reserve.size")
.withAlternative("celeborn.worker.disk.reserve.size")
@ -3552,7 +3562,7 @@ object CelebornConf extends Logging {
.version("0.6.0")
.doc("Size of buffer used by a S3 flusher.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("4m")
.createWithDefaultString("6m")
val WORKER_WRITER_CLOSE_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.writer.close.timeout")

View File

@ -49,9 +49,9 @@ object CelebornHadoopUtils extends Logging {
}
if (conf.s3Dir.nonEmpty) {
if (conf.s3AccessKey.isEmpty || conf.s3SecretKey.isEmpty || conf.s3Endpoint.isEmpty) {
if (conf.s3AccessKey.isEmpty || conf.s3SecretKey.isEmpty || conf.s3EndpointRegion.isEmpty) {
throw new CelebornException(
"S3 storage is enabled but s3AccessKey, s3SecretKey, or s3Endpoint is not set")
"S3 storage is enabled but s3AccessKey, s3SecretKey, or s3EndpointRegion is not set")
}
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set(
@ -59,7 +59,7 @@ object CelebornHadoopUtils extends Logging {
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
hadoopConf.set("fs.s3a.access.key", conf.s3AccessKey)
hadoopConf.set("fs.s3a.secret.key", conf.s3SecretKey)
hadoopConf.set("fs.s3a.endpoint", conf.s3Endpoint)
hadoopConf.set("fs.s3a.endpoint.region", conf.s3EndpointRegion)
}
appendSparkHadoopConfigs(conf, hadoopConf)
hadoopConf

View File

@ -19,6 +19,7 @@ HikariCP/4.0.3//HikariCP-4.0.3.jar
RoaringBitmap/1.0.6//RoaringBitmap-1.0.6.jar
aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar
ap-loader-all/3.0-8//ap-loader-all-3.0-8.jar
aws-java-sdk-bundle/1.12.367//aws-java-sdk-bundle-1.12.367.jar
classgraph/4.8.138//classgraph-4.8.138.jar
commons-cli/1.5.0//commons-cli-1.5.0.jar
commons-crypto/1.0.0//commons-crypto-1.0.0.jar
@ -27,6 +28,7 @@ commons-lang3/3.17.0//commons-lang3-3.17.0.jar
commons-logging/1.1.3//commons-logging-1.1.3.jar
failureaccess/1.0.2//failureaccess-1.0.2.jar
guava/33.1.0-jre//guava-33.1.0-jre.jar
hadoop-aws/3.3.6//hadoop-aws-3.3.6.jar
hadoop-client-api/3.3.6//hadoop-client-api-3.3.6.jar
hadoop-client-runtime/3.3.6//hadoop-client-runtime-3.3.6.jar
hk2-api/2.6.1//hk2-api-2.6.1.jar
@ -145,4 +147,5 @@ swagger-integration/2.2.1//swagger-integration-2.2.1.jar
swagger-jaxrs2/2.2.1//swagger-jaxrs2-2.2.1.jar
swagger-models/2.2.1//swagger-models-2.2.1.jar
swagger-ui/4.9.1//swagger-ui-4.9.1.jar
wildfly-openssl/1.1.3.Final//wildfly-openssl-1.1.3.Final.jar
zstd-jni/1.5.2-1//zstd-jni-1.5.2-1.jar

View File

@ -132,6 +132,6 @@ license: |
| celeborn.storage.hdfs.dir | &lt;undefined&gt; | false | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | |
| celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint | &lt;undefined&gt; | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 endpoint region for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | |
<!--end-include-->

View File

@ -89,6 +89,6 @@ license: |
| celeborn.storage.hdfs.kerberos.principal | &lt;undefined&gt; | false | Kerberos principal for HDFS storage connection. | 0.3.2 | |
| celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint | &lt;undefined&gt; | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 endpoint region for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | |
<!--end-include-->

View File

@ -48,7 +48,8 @@ license: |
| celeborn.storage.hdfs.kerberos.principal | &lt;undefined&gt; | false | Kerberos principal for HDFS storage connection. | 0.3.2 | |
| celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint | &lt;undefined&gt; | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 endpoint region for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.mpu.maxRetries | 5 | false | S3 MPU upload max retries. | 0.6.0 | |
| celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.worker.activeConnection.max | &lt;undefined&gt; | false | If the number of active connections on a worker exceeds this configuration value, the worker will be marked as high-load in the heartbeat report, and the master will not include that node in the response of RequestSlots. | 0.3.1 | |
| celeborn.worker.applicationRegistry.cache.size | 10000 | false | Cache size of the application registry on Workers. | 0.5.0 | |
@ -84,7 +85,7 @@ license: |
| celeborn.worker.flusher.hdd.threads | 1 | false | Flusher's thread count per disk used for write data to HDD disks. | 0.2.0 | |
| celeborn.worker.flusher.hdfs.buffer.size | 4m | false | Size of buffer used by a HDFS flusher. | 0.3.0 | |
| celeborn.worker.flusher.hdfs.threads | 8 | false | Flusher's thread count used for write data to HDFS. | 0.2.0 | |
| celeborn.worker.flusher.s3.buffer.size | 4m | false | Size of buffer used by a S3 flusher. | 0.6.0 | |
| celeborn.worker.flusher.s3.buffer.size | 6m | false | Size of buffer used by a S3 flusher. | 0.6.0 | |
| celeborn.worker.flusher.s3.threads | 8 | false | Flusher's thread count used for write data to S3. | 0.6.0 | |
| celeborn.worker.flusher.shutdownTimeout | 3s | false | Timeout for a flusher to shutdown. | 0.2.0 | |
| celeborn.worker.flusher.ssd.threads | 16 | false | Flusher's thread count per disk used for write data to SSD disks. | 0.2.0 | |

View File

@ -163,26 +163,6 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>hadoop-aws</id>
<activation>
<property>
<name>hadoop-aws-deps</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
<version>${aws.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>hadoop-2</id>
<activation>
@ -198,5 +178,20 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>aws</id>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
<version>${aws.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-parent_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>celeborn-multipart-uploader_${scala.binary.version}</artifactId>
<packaging>jar</packaging>
<name>Celeborn Multipart Uploader</name>
<dependencies>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-service_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
<version>${aws.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.retry.PredefinedRetryPolicies;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ListPartsRequest;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PartListing;
import com.amazonaws.services.s3.model.PartSummary;
import com.amazonaws.services.s3.model.UploadPartRequest;
import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link MultipartUploadHandler} backed by the AWS S3 SDK's multipart upload (MPU) API.
 * Lets shuffle data be appended to an S3 object part-by-part, avoiding the
 * copy-and-rewrite that S3's lack of native append would otherwise require.
 *
 * <p>Lifecycle: {@link #startUpload()} once, {@link #putPart} per flushed buffer,
 * then {@link #complete()} and {@link #close()} ({@link #abort()} on failure).
 */
public class S3MultipartUploadHandler implements MultipartUploadHandler {
private static final Logger logger = LoggerFactory.getLogger(S3MultipartUploadHandler.class);
// Upload id assigned by S3 in startUpload(); identifies this MPU in every later request.
private String uploadId;
private AmazonS3 s3Client;
// Object key (path within the bucket) of the file being written.
private String key;
private String bucketName;
private String s3AccessKey;
private String s3SecretKey;
private String s3EndpointRegion;
// Bounds both the SDK retry policy and the client's max error retries.
private Integer s3MultiplePartUploadMaxRetries;
/**
 * Builds an S3 client with static credentials and a bounded retry policy.
 *
 * @param bucketName target S3 bucket
 * @param s3AccessKey AWS access key id
 * @param s3SecretKey AWS secret access key
 * @param s3EndpointRegion AWS region of the bucket's endpoint
 * @param key object key to write within the bucket
 * @param s3MultiplePartUploadMaxRetries max retries for SDK requests
 */
public S3MultipartUploadHandler(String bucketName, String s3AccessKey, String s3SecretKey, String s3EndpointRegion, String key, Integer s3MultiplePartUploadMaxRetries) {
this.bucketName = bucketName;
this.s3AccessKey = s3AccessKey;
this.s3SecretKey = s3SecretKey;
this.s3EndpointRegion = s3EndpointRegion;
this.s3MultiplePartUploadMaxRetries = s3MultiplePartUploadMaxRetries;
BasicAWSCredentials basicAWSCredentials =
new BasicAWSCredentials(s3AccessKey, s3SecretKey);
// Apply the configured retry budget to both the retry policy and error-retry count.
ClientConfiguration clientConfig = new ClientConfiguration()
.withRetryPolicy(PredefinedRetryPolicies.getDefaultRetryPolicyWithCustomMaxRetries(s3MultiplePartUploadMaxRetries))
.withMaxErrorRetry(s3MultiplePartUploadMaxRetries);
this.s3Client =
AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials))
.withRegion(s3EndpointRegion)
.withClientConfiguration(clientConfig)
.build();
this.key = key;
}
/** Initiates the multipart upload and records the S3-assigned upload id. */
@Override
public void startUpload() {
InitiateMultipartUploadRequest initRequest =
new InitiateMultipartUploadRequest(bucketName, key);
InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest);
this.uploadId = initResponse.getUploadId();
}
/**
 * Uploads one part. The stream is always closed (try-with-resources), and
 * empty parts are silently skipped — S3 rejects zero-length non-final parts.
 *
 * <p>NOTE(review): part size comes from {@code available()}, which is only a
 * reliable total for fully in-memory streams such as ByteArrayInputStream —
 * TODO confirm callers never pass other stream types.
 */
@Override
public void putPart(InputStream inputStream, Integer partNumber, Boolean finalFlush) throws IOException {
try (InputStream inStream = inputStream) {
int partSize = inStream.available();
if (partSize == 0) {
logger.debug("key {} uploadId {} part size is 0 for part number {} finalFlush {}", key, uploadId, partNumber, finalFlush);
return;
}
UploadPartRequest uploadRequest =
new UploadPartRequest()
.withBucketName(bucketName)
.withKey(key)
.withUploadId(uploadId)
.withPartNumber(partNumber)
.withInputStream(inStream)
.withPartSize(partSize)
.withLastPart(finalFlush);
s3Client.uploadPart(uploadRequest);
logger.debug("key {} uploadId {} part number {} uploaded with size {} finalFlush {}", key, uploadId, partNumber, partSize, finalFlush);
} catch (RuntimeException | IOException e) {
// Log locally for diagnostics, then rethrow so the caller can abort the upload.
logger.error("Failed to upload part", e);
throw e;
}
}
/**
 * Completes the upload. Part ETags are rebuilt by listing parts from S3
 * (paginated via the part-number marker) rather than tracked locally, so this
 * works even if parts were uploaded by retried requests. If no part was ever
 * uploaded, the MPU is aborted instead — S3 cannot complete an upload with
 * zero parts.
 */
@Override
public void complete() {
List<PartETag> partETags = new ArrayList<>();
ListPartsRequest listPartsRequest = new ListPartsRequest(bucketName, key, uploadId);
PartListing partListing;
do {
partListing = s3Client.listParts(listPartsRequest);
for (PartSummary part : partListing.getParts()) {
partETags.add(new PartETag(part.getPartNumber(), part.getETag()));
}
// Advance the pagination marker until S3 reports no more truncated pages.
listPartsRequest.setPartNumberMarker(partListing.getNextPartNumberMarker());
} while (partListing.isTruncated());
if (partETags.size() == 0){
logger.debug("bucket {} key {} uploadId {} has no parts uploaded, aborting upload", bucketName, key, uploadId);
abort();
logger.debug("bucket {} key {} upload completed with size {}", bucketName, key, 0);
return;
}
// Debug-level visibility into the completion request's transfer progress.
ProgressListener progressListener = progressEvent -> {
logger.debug("key {} uploadId {} progress event type {} transferred {} bytes", key, uploadId, progressEvent.getEventType(), progressEvent.getBytesTransferred());
};
CompleteMultipartUploadRequest compRequest =
new CompleteMultipartUploadRequest(
bucketName, key, uploadId, partETags)
.withGeneralProgressListener(progressListener);
CompleteMultipartUploadResult compResult = s3Client.completeMultipartUpload(compRequest);
logger.debug("bucket {} key {} uploadId {} upload completed location is in {} ", bucketName, key, uploadId, compResult.getLocation());
}
/** Aborts the upload, discarding any parts uploaded so far. */
@Override
public void abort() {
AbortMultipartUploadRequest abortMultipartUploadRequest =
new AbortMultipartUploadRequest(bucketName, key, uploadId);
s3Client.abortMultipartUpload(abortMultipartUploadRequest);
}
/** Shuts down the S3 client; call after complete() or abort(). */
@Override
public void close() {
if (s3Client != null) {
s3Client.shutdown();
}
}
}

22
pom.xml
View File

@ -38,6 +38,7 @@
<module>service</module>
<module>master</module>
<module>worker</module>
<module>web</module>
<module>cli</module>
</modules>
@ -71,6 +72,7 @@
<!-- use hadoop-3 as default -->
<hadoop.version>3.3.6</hadoop.version>
<aws.version>1.12.532</aws.version>
<!--
If you change codahale.metrics.version, you also need to change
the link to metrics.dropwizard.io in docs/monitoring.md.
@ -1342,23 +1344,13 @@
</dependencies>
</profile>
<profile>
<id>hadoop-aws</id>
<id>aws</id>
<modules>
<module>multipart-uploader</module>
</modules>
<properties>
<hadoop-aws-deps>true</hadoop-aws-deps>
<aws.version>1.12.367</aws.version>
<aws-deps>true</aws-deps>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
<version>${aws.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-2.4</id>

View File

@ -248,7 +248,7 @@ object CelebornCommonSettings {
"Build-Revision" -> gitHeadCommit.value.getOrElse("N/A"),
"Build-Branch" -> gitCurrentBranch.value,
"Build-Time" -> java.time.ZonedDateTime.now().format(java.time.format.DateTimeFormatter.ISO_DATE_TIME)),
// -target cannot be passed as a parameter to javadoc. See https://github.com/sbt/sbt/issues/355
Compile / compile / javacOptions ++= Seq("-target", "1.8"),
@ -370,7 +370,8 @@ object CelebornBuild extends sbt.internal.BuildDef {
CelebornService.service,
CelebornWorker.worker,
CelebornMaster.master,
CelebornCli.cli) ++ maybeSparkClientModules ++ maybeFlinkClientModules ++ maybeMRClientModules ++ maybeWebModules
CelebornCli.cli,
CeleborMPU.celeborMPU) ++ maybeSparkClientModules ++ maybeFlinkClientModules ++ maybeMRClientModules ++ maybeWebModules
}
// ThisBuild / parallelExecution := false
@ -494,13 +495,23 @@ object CelebornSpi {
)
}
object CeleborMPU {
lazy val hadoopAwsDependencies = Seq(Dependencies.hadoopAws, Dependencies.awsClient)
lazy val celeborMPU = Project("celeborn-multipart-uploader", file("multipart-uploader"))
.dependsOn(CelebornService.service % "test->test;compile->compile")
.settings (
commonSettings,
libraryDependencies ++= Seq(
Dependencies.log4j12Api,
Dependencies.log4jSlf4jImpl,
) ++ hadoopAwsDependencies
)
}
object CelebornCommon {
lazy val hadoopAwsDependencies = if(profiles.exists(_.startsWith("hadoop-aws"))){
Seq(Dependencies.hadoopAws, Dependencies.awsClient)
} else {
Seq.empty
}
lazy val common = Project("celeborn-common", file("common"))
.dependsOn(CelebornSpi.spi)
@ -538,7 +549,7 @@ object CelebornCommon {
// SSL support
Dependencies.bouncycastleBcprovJdk18on,
Dependencies.bouncycastleBcpkixJdk18on
) ++ commonUnitTestDependencies ++ hadoopAwsDependencies,
) ++ commonUnitTestDependencies,
Compile / sourceGenerators += Def.task {
val file = (Compile / sourceManaged).value / "org" / "apache" / "celeborn" / "package.scala"
@ -645,13 +656,18 @@ object CelebornMaster {
}
object CelebornWorker {
lazy val worker = Project("celeborn-worker", file("worker"))
var worker = Project("celeborn-worker", file("worker"))
.dependsOn(CelebornService.service)
.dependsOn(CelebornCommon.common % "test->test;compile->compile")
.dependsOn(CelebornService.service % "test->test;compile->compile")
.dependsOn(CelebornClient.client % "test->compile")
.dependsOn(CelebornMaster.master % "test->compile")
.settings (
if (profiles.exists(_.startsWith("aws"))) {
worker = worker.dependsOn(CeleborMPU.celeborMPU)
}
worker = worker.settings(
commonSettings,
libraryDependencies ++= Seq(
Dependencies.apLoader,

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.server.common.service.mpu;
import java.io.IOException;
import java.io.InputStream;
/**
 * Abstraction over an object store's multipart upload (MPU) lifecycle, used to
 * append shuffle data to storage systems (such as S3) that lack native append.
 * Expected call order: startUpload, putPart (repeated), then complete or
 * abort, followed by close.
 */
public interface MultipartUploadHandler {
/** Initiates the multipart upload with the backing store. */
void startUpload();
/**
 * Uploads one part of the object.
 *
 * @param inputStream data for this part; the implementation may consume and close it
 * @param partNumber 1-based index of this part within the upload
 * @param finalFlush whether this is the last part of the upload
 * @throws IOException if reading the stream or uploading the part fails
 */
void putPart(InputStream inputStream, Integer partNumber, Boolean finalFlush) throws IOException;
/** Completes the upload, assembling the uploaded parts into the final object. */
void complete();
/** Aborts the upload, discarding any parts uploaded so far. */
void abort();
/** Releases client resources; call after complete() or abort(). */
void close();
}

View File

@ -141,4 +141,17 @@
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>aws</id>
<dependencies>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-multipart-uploader_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>

View File

@ -19,6 +19,7 @@ package org.apache.celeborn.service.deploy.worker.storage;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.TimeUnit;
@ -32,6 +33,7 @@ import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
@ -49,6 +51,8 @@ import org.apache.celeborn.common.protocol.PartitionSplitMode;
import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.common.unsafe.Platform;
import org.apache.celeborn.common.util.FileChannelUtils;
import org.apache.celeborn.reflect.DynConstructors;
import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
import org.apache.celeborn.service.deploy.worker.WorkerSource;
import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController;
import org.apache.celeborn.service.deploy.worker.congestcontrol.UserBufferInfo;
@ -113,6 +117,12 @@ public abstract class PartitionDataWriter implements DeviceObserver {
private UserCongestionControlContext userCongestionControlContext = null;
protected MultipartUploadHandler s3MultipartUploadHandler;
protected int partNumber = 1;
private final int s3MultiplePartUploadMaxRetries;
public PartitionDataWriter(
StorageManager storageManager,
AbstractSource workerSource,
@ -137,6 +147,7 @@ public abstract class PartitionDataWriter implements DeviceObserver {
this.s3FlusherBufferSize = conf.workerS3FlusherBufferSize();
this.metricsCollectCriticalEnabled = conf.metricsCollectCriticalEnabled();
this.chunkSize = conf.shuffleChunkSize();
this.s3MultiplePartUploadMaxRetries = conf.s3MultiplePartUploadMaxRetries();
Tuple4<MemoryFileInfo, Flusher, DiskFileInfo, File> createFileResult =
storageManager.createFile(writerContext, supportInMemory);
@ -187,6 +198,38 @@ public abstract class PartitionDataWriter implements DeviceObserver {
// If we reuse DFS output stream, we will exhaust the memory soon.
try {
hadoopFs.create(this.diskFileInfo.getDfsPath(), true).close();
if (diskFileInfo.isS3()) {
Configuration configuration = hadoopFs.getConf();
String s3AccessKey = configuration.get("fs.s3a.access.key");
String s3SecretKey = configuration.get("fs.s3a.secret.key");
String s3EndpointRegion = configuration.get("fs.s3a.endpoint.region");
URI uri = hadoopFs.getUri();
String bucketName = uri.getHost();
int index = diskFileInfo.getFilePath().indexOf(bucketName);
String key = diskFileInfo.getFilePath().substring(index + bucketName.length() + 1);
this.s3MultipartUploadHandler =
(MultipartUploadHandler)
DynConstructors.builder()
.impl(
"org.apache.celeborn.S3MultipartUploadHandler",
String.class,
String.class,
String.class,
String.class,
String.class,
Integer.class)
.build()
.newInstance(
bucketName,
s3AccessKey,
s3SecretKey,
s3EndpointRegion,
key,
s3MultiplePartUploadMaxRetries);
s3MultipartUploadHandler.startUpload();
}
} catch (IOException e) {
try {
// If create file failed, wait 10 ms and retry
@ -236,7 +279,14 @@ public abstract class PartitionDataWriter implements DeviceObserver {
} else if (diskFileInfo.isHdfs()) {
task = new HdfsFlushTask(flushBuffer, diskFileInfo.getDfsPath(), notifier, false);
} else if (diskFileInfo.isS3()) {
task = new S3FlushTask(flushBuffer, diskFileInfo.getDfsPath(), notifier, false);
task =
new S3FlushTask(
flushBuffer,
notifier,
false,
s3MultipartUploadHandler,
partNumber++,
finalFlush);
}
MemoryManager.instance().releaseMemoryFileStorage(numBytes);
MemoryManager.instance().incrementDiskBuffer(numBytes);
@ -264,7 +314,14 @@ public abstract class PartitionDataWriter implements DeviceObserver {
} else if (diskFileInfo.isHdfs()) {
task = new HdfsFlushTask(flushBuffer, diskFileInfo.getDfsPath(), notifier, true);
} else if (diskFileInfo.isS3()) {
task = new S3FlushTask(flushBuffer, diskFileInfo.getDfsPath(), notifier, true);
task =
new S3FlushTask(
flushBuffer,
notifier,
true,
s3MultipartUploadHandler,
partNumber++,
finalFlush);
}
}
}
@ -303,6 +360,11 @@ public abstract class PartitionDataWriter implements DeviceObserver {
}
if (notifier.hasException()) {
if (s3MultipartUploadHandler != null) {
logger.warn("Abort s3 multipart upload for {}", diskFileInfo.getFilePath());
s3MultipartUploadHandler.complete();
s3MultipartUploadHandler.close();
}
return;
}
@ -461,7 +523,10 @@ public abstract class PartitionDataWriter implements DeviceObserver {
}
finalClose.run();
if (s3MultipartUploadHandler != null) {
s3MultipartUploadHandler.complete();
s3MultipartUploadHandler.close();
}
// unregister from DeviceMonitor
if (diskFileInfo != null && !this.diskFileInfo.isDFS()) {
logger.debug("file info {} unregister from device monitor", diskFileInfo);

View File

@ -17,13 +17,14 @@
package org.apache.celeborn.service.deploy.worker.storage
import java.io.ByteArrayInputStream
import java.nio.channels.FileChannel
import io.netty.buffer.{ByteBufUtil, CompositeByteBuf}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.IOUtils
import org.apache.celeborn.common.protocol.StorageInfo.Type
import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler
abstract private[worker] class FlushTask(
val buffer: CompositeByteBuf,
@ -64,29 +65,16 @@ private[worker] class HdfsFlushTask(
private[worker] class S3FlushTask(
buffer: CompositeByteBuf,
val path: Path,
notifier: FlushNotifier,
keepBuffer: Boolean) extends FlushTask(buffer, notifier, keepBuffer) {
keepBuffer: Boolean,
s3MultipartUploader: MultipartUploadHandler,
partNumber: Int,
finalFlush: Boolean = false)
extends FlushTask(buffer, notifier, keepBuffer) {
override def flush(): Unit = {
val hadoopFs = StorageManager.hadoopFs.get(Type.S3)
if (hadoopFs.exists(path)) {
val conf = hadoopFs.getConf
val tempPath = new Path(path.getParent, path.getName + ".tmp")
val outputStream = hadoopFs.create(tempPath, true, 256 * 1024)
val inputStream = hadoopFs.open(path)
try {
IOUtils.copyBytes(inputStream, outputStream, conf, false)
} finally {
inputStream.close()
}
outputStream.write(ByteBufUtil.getBytes(buffer))
outputStream.close()
hadoopFs.delete(path, false)
hadoopFs.rename(tempPath, path)
} else {
val s3Stream = hadoopFs.create(path, true, 256 * 1024)
s3Stream.write(ByteBufUtil.getBytes(buffer))
s3Stream.close()
}
val bytes = ByteBufUtil.getBytes(buffer)
val inputStream = new ByteArrayInputStream(bytes)
s3MultipartUploader.putPart(inputStream, partNumber, finalFlush)
}
}