From a2d39723180468fb9bdd7c36c0344c982292b302 Mon Sep 17 00:00:00 2001 From: zhaohehuhu Date: Fri, 22 Nov 2024 15:03:53 +0800 Subject: [PATCH] [CELEBORN-1530] support MPU for S3 ### What changes were proposed in this pull request? as title ### Why are the changes needed? AWS S3 doesn't support append, so Celeborn had to copy the historical data from s3 to worker and write to s3 again, which heavily scales out the write. This PR implements a better solution via MPU to avoid copy-and-write. ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ![WechatIMG257](https://github.com/user-attachments/assets/968d9162-e690-4767-8bed-e490e3055753) I conducted an experiment with a 1GB input dataset to compare the performance of Celeborn using only S3 storage versus using SSD storage. The results showed that Celeborn with SSD storage was approximately three times faster than with only S3 storage. Screenshot 2024-11-16 at 13 02 10 The above screenshot is the second test with 5000 mapper and reducer that I did. Closes #2830 from zhaohehuhu/dev-1021. Lead-authored-by: zhaohehuhu Co-authored-by: He Zhao Signed-off-by: mingji --- build/make-distribution.sh | 4 +- common/pom.xml | 20 --- .../apache/celeborn/common/CelebornConf.scala | 18 +- .../common/util/CelebornHadoopUtils.scala | 6 +- dev/deps/dependencies-server | 3 + docs/configuration/client.md | 2 +- docs/configuration/master.md | 2 +- docs/configuration/worker.md | 5 +- master/pom.xml | 35 ++-- multipart-uploader/pom.xml | 57 ++++++ .../celeborn/S3MultipartUploadHandler.java | 164 ++++++++++++++++++ pom.xml | 22 +-- project/CelebornBuild.scala | 36 ++-- .../service/mpu/MultipartUploadHandler.java | 34 ++++ worker/pom.xml | 13 ++ .../worker/storage/PartitionDataWriter.java | 71 +++++++- .../deploy/worker/storage/FlushTask.scala | 34 ++-- 17 files changed, 422 insertions(+), 104 deletions(-) create mode 100644 multipart-uploader/pom.xml create mode 100644 multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java create mode 100644 service/src/main/java/org/apache/celeborn/server/common/service/mpu/MultipartUploadHandler.java diff --git a/build/make-distribution.sh b/build/make-distribution.sh index 55b419d15..1c9bbe6e5 100755 --- a/build/make-distribution.sh +++ b/build/make-distribution.sh @@ -264,8 +264,8 @@ function sbt_build_service { echo "Celeborn $VERSION$GITREVSTRING" > "$DIST_DIR/RELEASE" echo "Build flags: $@" >> "$DIST_DIR/RELEASE" - if [[ $@ == *"hadoop-aws"* ]]; then - SBT_MAVEN_PROFILES="hadoop-aws" + if [[ $@ == *"aws"* ]]; then + export SBT_MAVEN_PROFILES="aws" fi BUILD_COMMAND=("$SBT" clean package) diff --git a/common/pom.xml b/common/pom.xml index 3c34be923..0f4da3e7a 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -209,25 +209,5 @@ - - hadoop-aws - - - hadoop-aws-deps - - - - - org.apache.hadoop - hadoop-aws - ${hadoop.version} - - - com.amazonaws - aws-java-sdk-bundle - ${aws.version} - - - diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 9bf3ea4d4..3e48d9388 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1147,7 +1147,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def s3SecretKey: String = get(S3_SECRET_KEY).getOrElse("") - def s3Endpoint: String = get(S3_ENDPOINT).getOrElse("") + def s3EndpointRegion: String = get(S3_ENDPOINT_REGION).getOrElse("") + + def s3MultiplePartUploadMaxRetries: Int = get(S3_MPU_MAX_RETRIES) def s3Dir: String = { get(S3_DIR).map { @@ -3062,14 +3064,22 @@ object CelebornConf extends Logging { .stringConf .createOptional - val S3_ENDPOINT: OptionalConfigEntry[String] = - buildConf("celeborn.storage.s3.endpoint") + val S3_ENDPOINT_REGION: OptionalConfigEntry[String] = + buildConf("celeborn.storage.s3.endpoint.region") .categories("worker", "master", "client") .version("0.6.0") .doc("S3 endpoint for Celeborn to store shuffle data.") .stringConf .createOptional + val S3_MPU_MAX_RETRIES: ConfigEntry[Int] = + buildConf("celeborn.storage.s3.mpu.maxRetries") + .categories("worker") + .version("0.6.0") + .doc("S3 MPU upload max retries.") + .intConf + .createWithDefault(5) + val WORKER_DISK_RESERVE_SIZE: ConfigEntry[Long] = buildConf("celeborn.worker.storage.disk.reserve.size") .withAlternative("celeborn.worker.disk.reserve.size") @@ -3552,7 +3562,7 @@ object CelebornConf extends Logging { .version("0.6.0") .doc("Size of buffer used by a S3 flusher.") .bytesConf(ByteUnit.BYTE) - .createWithDefaultString("4m") + .createWithDefaultString("6m") val WORKER_WRITER_CLOSE_TIMEOUT: ConfigEntry[Long] = buildConf("celeborn.worker.writer.close.timeout") diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala index 356711d75..b703da07c 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala @@ -49,9 +49,9 @@ object CelebornHadoopUtils extends Logging { } if (conf.s3Dir.nonEmpty) { - if (conf.s3AccessKey.isEmpty || conf.s3SecretKey.isEmpty || conf.s3Endpoint.isEmpty) { + if (conf.s3AccessKey.isEmpty || conf.s3SecretKey.isEmpty || conf.s3EndpointRegion.isEmpty) { throw new CelebornException( - "S3 storage is enabled but s3AccessKey, s3SecretKey, or s3Endpoint is not set") + "S3 storage is enabled but s3AccessKey, s3SecretKey, or s3EndpointRegion is not set") } hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") hadoopConf.set( @@ -59,7 +59,7 @@ object CelebornHadoopUtils extends Logging { "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") hadoopConf.set("fs.s3a.access.key", conf.s3AccessKey) hadoopConf.set("fs.s3a.secret.key", conf.s3SecretKey) - hadoopConf.set("fs.s3a.endpoint", conf.s3Endpoint) + hadoopConf.set("fs.s3a.endpoint.region", conf.s3EndpointRegion) } appendSparkHadoopConfigs(conf, hadoopConf) hadoopConf diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server index 74e71214a..0edc78b70 100644 --- a/dev/deps/dependencies-server +++ b/dev/deps/dependencies-server @@ -19,6 +19,7 @@ HikariCP/4.0.3//HikariCP-4.0.3.jar RoaringBitmap/1.0.6//RoaringBitmap-1.0.6.jar aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar ap-loader-all/3.0-8//ap-loader-all-3.0-8.jar +aws-java-sdk-bundle/1.12.367//aws-java-sdk-bundle-1.12.367.jar classgraph/4.8.138//classgraph-4.8.138.jar commons-cli/1.5.0//commons-cli-1.5.0.jar commons-crypto/1.0.0//commons-crypto-1.0.0.jar @@ -27,6 +28,7 @@ commons-lang3/3.17.0//commons-lang3-3.17.0.jar commons-logging/1.1.3//commons-logging-1.1.3.jar failureaccess/1.0.2//failureaccess-1.0.2.jar guava/33.1.0-jre//guava-33.1.0-jre.jar +hadoop-aws/3.3.6//hadoop-aws-3.3.6.jar hadoop-client-api/3.3.6//hadoop-client-api-3.3.6.jar hadoop-client-runtime/3.3.6//hadoop-client-runtime-3.3.6.jar hk2-api/2.6.1//hk2-api-2.6.1.jar @@ -145,4 +147,5 @@ swagger-integration/2.2.1//swagger-integration-2.2.1.jar swagger-jaxrs2/2.2.1//swagger-jaxrs2-2.2.1.jar swagger-models/2.2.1//swagger-models-2.2.1.jar swagger-ui/4.9.1//swagger-ui-4.9.1.jar +wildfly-openssl/1.1.3.Final//wildfly-openssl-1.1.3.Final.jar zstd-jni/1.5.2-1//zstd-jni-1.5.2-1.jar diff --git a/docs/configuration/client.md b/docs/configuration/client.md index bec9c56ae..2bb7a08f3 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -132,6 +132,6 @@ license: | | celeborn.storage.hdfs.dir | <undefined> | false | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | | | celeborn.storage.s3.access.key | <undefined> | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | | | celeborn.storage.s3.dir | <undefined> | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | | -| celeborn.storage.s3.endpoint | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | | +| celeborn.storage.s3.endpoint.region | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | | | celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | | diff --git a/docs/configuration/master.md b/docs/configuration/master.md index d3300369f..4359527c0 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -89,6 +89,6 @@ license: | | celeborn.storage.hdfs.kerberos.principal | <undefined> | false | Kerberos principal for HDFS storage connection. | 0.3.2 | | | celeborn.storage.s3.access.key | <undefined> | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | | | celeborn.storage.s3.dir | <undefined> | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | | -| celeborn.storage.s3.endpoint | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | | +| celeborn.storage.s3.endpoint.region | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | | | celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | | diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 36e2cca97..97e262718 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -48,7 +48,8 @@ license: | | celeborn.storage.hdfs.kerberos.principal | <undefined> | false | Kerberos principal for HDFS storage connection. | 0.3.2 | | | celeborn.storage.s3.access.key | <undefined> | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | | | celeborn.storage.s3.dir | <undefined> | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | | -| celeborn.storage.s3.endpoint | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | | +| celeborn.storage.s3.endpoint.region | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | | +| celeborn.storage.s3.mpu.maxRetries | 5 | false | S3 MPU upload max retries. | 0.6.0 | | | celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | | | celeborn.worker.activeConnection.max | <undefined> | false | If the number of active connections on a worker exceeds this configuration value, the worker will be marked as high-load in the heartbeat report, and the master will not include that node in the response of RequestSlots. | 0.3.1 | | | celeborn.worker.applicationRegistry.cache.size | 10000 | false | Cache size of the application registry on Workers. | 0.5.0 | | @@ -84,7 +85,7 @@ license: | | celeborn.worker.flusher.hdd.threads | 1 | false | Flusher's thread count per disk used for write data to HDD disks. | 0.2.0 | | | celeborn.worker.flusher.hdfs.buffer.size | 4m | false | Size of buffer used by a HDFS flusher. | 0.3.0 | | | celeborn.worker.flusher.hdfs.threads | 8 | false | Flusher's thread count used for write data to HDFS. | 0.2.0 | | -| celeborn.worker.flusher.s3.buffer.size | 4m | false | Size of buffer used by a S3 flusher. | 0.6.0 | | +| celeborn.worker.flusher.s3.buffer.size | 6m | false | Size of buffer used by a S3 flusher. | 0.6.0 | | | celeborn.worker.flusher.s3.threads | 8 | false | Flusher's thread count used for write data to S3. | 0.6.0 | | | celeborn.worker.flusher.shutdownTimeout | 3s | false | Timeout for a flusher to shutdown. | 0.2.0 | | | celeborn.worker.flusher.ssd.threads | 16 | false | Flusher's thread count per disk used for write data to SSD disks. | 0.2.0 | | diff --git a/master/pom.xml b/master/pom.xml index 51286b9ef..88f2f9ece 100644 --- a/master/pom.xml +++ b/master/pom.xml @@ -163,26 +163,6 @@ - - hadoop-aws - - - hadoop-aws-deps - - - - - org.apache.hadoop - hadoop-aws - ${hadoop.version} - - - com.amazonaws - aws-java-sdk-bundle - ${aws.version} - - - hadoop-2 @@ -198,5 +178,20 @@ + + aws + + + org.apache.hadoop + hadoop-aws + ${hadoop.version} + + + com.amazonaws + aws-java-sdk-bundle + ${aws.version} + + + diff --git a/multipart-uploader/pom.xml b/multipart-uploader/pom.xml new file mode 100644 index 000000000..cfdbbb4ee --- /dev/null +++ b/multipart-uploader/pom.xml @@ -0,0 +1,57 @@ + + + + 4.0.0 + + org.apache.celeborn + celeborn-parent_${scala.binary.version} + ${project.version} + ../pom.xml + + + celeborn-multipart-uploader_${scala.binary.version} + jar + Celeborn Multipart Uploader + + + + org.apache.celeborn + celeborn-service_${scala.binary.version} + ${project.version} + + + org.apache.hadoop + hadoop-aws + ${hadoop.version} + + + com.amazonaws + aws-java-sdk-bundle + ${aws.version} + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + + diff --git a/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java b/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java new file mode 100644 index 000000000..f1699c884 --- /dev/null +++ b/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.event.ProgressListener; +import com.amazonaws.retry.PredefinedRetryPolicies; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; +import com.amazonaws.services.s3.model.ListPartsRequest; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.PartListing; +import com.amazonaws.services.s3.model.PartSummary; +import com.amazonaws.services.s3.model.UploadPartRequest; + +import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class S3MultipartUploadHandler implements MultipartUploadHandler { + + private static final Logger logger = LoggerFactory.getLogger(S3MultipartUploadHandler.class); + + private String uploadId; + + private AmazonS3 s3Client; + + private String key; + + private String bucketName; + + private String s3AccessKey; + + private String s3SecretKey; + + private String s3EndpointRegion; + + private Integer s3MultiplePartUploadMaxRetries; + + public S3MultipartUploadHandler(String bucketName, String s3AccessKey, String s3SecretKey, String s3EndpointRegion, String key, Integer s3MultiplePartUploadMaxRetries) { + this.bucketName = bucketName; + this.s3AccessKey = s3AccessKey; + this.s3SecretKey = s3SecretKey; + this.s3EndpointRegion = s3EndpointRegion; + this.s3MultiplePartUploadMaxRetries = s3MultiplePartUploadMaxRetries; + BasicAWSCredentials basicAWSCredentials = + new BasicAWSCredentials(s3AccessKey, s3SecretKey); + ClientConfiguration clientConfig = new ClientConfiguration() + .withRetryPolicy(PredefinedRetryPolicies.getDefaultRetryPolicyWithCustomMaxRetries(s3MultiplePartUploadMaxRetries)) + .withMaxErrorRetry(s3MultiplePartUploadMaxRetries); + this.s3Client = + AmazonS3ClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials)) + .withRegion(s3EndpointRegion) + .withClientConfiguration(clientConfig) + .build(); + this.key = key; + } + + @Override + public void startUpload() { + InitiateMultipartUploadRequest initRequest = + new InitiateMultipartUploadRequest(bucketName, key); + InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest); + this.uploadId = initResponse.getUploadId(); + } + + @Override + public void putPart(InputStream inputStream, Integer partNumber, Boolean finalFlush) throws IOException { + try (InputStream inStream = inputStream) { + int partSize = inStream.available(); + if (partSize == 0) { + logger.debug("key {} uploadId {} part size is 0 for part number {} finalFlush {}", key, uploadId, partNumber, finalFlush); + return; + } + UploadPartRequest uploadRequest = + new UploadPartRequest() + .withBucketName(bucketName) + .withKey(key) + .withUploadId(uploadId) + .withPartNumber(partNumber) + .withInputStream(inStream) + .withPartSize(partSize) + .withLastPart(finalFlush); + s3Client.uploadPart(uploadRequest); + logger.debug("key {} uploadId {} part number {} uploaded with size {} finalFlush {}", key, uploadId, partNumber, partSize, finalFlush); + } catch (RuntimeException | IOException e) { + logger.error("Failed to upload part", e); + throw e; + } + } + + @Override + public void complete() { + List partETags = new ArrayList<>(); + ListPartsRequest listPartsRequest = new ListPartsRequest(bucketName, key, uploadId); + PartListing partListing; + do { + partListing = s3Client.listParts(listPartsRequest); + for (PartSummary part : partListing.getParts()) { + partETags.add(new PartETag(part.getPartNumber(), part.getETag())); + } + listPartsRequest.setPartNumberMarker(partListing.getNextPartNumberMarker()); + } while (partListing.isTruncated()); + if (partETags.size() == 0){ + logger.debug("bucket {} key {} uploadId {} has no parts uploaded, aborting upload", bucketName, key, uploadId); + abort(); + logger.debug("bucket {} key {} upload completed with size {}", bucketName, key, 0); + return; + } + ProgressListener progressListener = progressEvent -> { + logger.debug("key {} uploadId {} progress event type {} transferred {} bytes", key, uploadId, progressEvent.getEventType(), progressEvent.getBytesTransferred()); + }; + + CompleteMultipartUploadRequest compRequest = + new CompleteMultipartUploadRequest( + bucketName, key, uploadId, partETags) + .withGeneralProgressListener(progressListener); + CompleteMultipartUploadResult compResult = s3Client.completeMultipartUpload(compRequest); + logger.debug("bucket {} key {} uploadId {} upload completed location is in {} ", bucketName, key, uploadId, compResult.getLocation()); + } + + @Override + public void abort() { + AbortMultipartUploadRequest abortMultipartUploadRequest = + new AbortMultipartUploadRequest(bucketName, key, uploadId); + s3Client.abortMultipartUpload(abortMultipartUploadRequest); + } + + @Override + public void close() { + if (s3Client != null) { + s3Client.shutdown(); + } + } +} diff --git a/pom.xml b/pom.xml index 1da158f70..a8549b5fc 100644 --- a/pom.xml +++ b/pom.xml @@ -38,6 +38,7 @@ service master worker + web cli @@ -71,6 +72,7 @@ 3.3.6 + 1.12.532