diff --git a/build/make-distribution.sh b/build/make-distribution.sh
index 55b419d15..1c9bbe6e5 100755
--- a/build/make-distribution.sh
+++ b/build/make-distribution.sh
@@ -264,8 +264,8 @@ function sbt_build_service {
   echo "Celeborn $VERSION$GITREVSTRING" > "$DIST_DIR/RELEASE"
   echo "Build flags: $@" >> "$DIST_DIR/RELEASE"
 
-  if [[ $@ == *"hadoop-aws"* ]]; then
-    SBT_MAVEN_PROFILES="hadoop-aws"
+  if [[ $@ == *"aws"* ]]; then
+    export SBT_MAVEN_PROFILES="aws"
   fi
 
   BUILD_COMMAND=("$SBT" clean package)
diff --git a/common/pom.xml b/common/pom.xml
index 3c34be923..0f4da3e7a 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -209,25 +209,5 @@
-    <profile>
-      <id>hadoop-aws</id>
-      <activation>
-        <property>
-          <name>hadoop-aws-deps</name>
-        </property>
-      </activation>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-aws</artifactId>
-          <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-          <groupId>com.amazonaws</groupId>
-          <artifactId>aws-java-sdk-bundle</artifactId>
-          <version>${aws.version}</version>
-        </dependency>
-      </dependencies>
-    </profile>
   </profiles>
 </project>
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 9bf3ea4d4..3e48d9388 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1147,7 +1147,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
 
   def s3SecretKey: String = get(S3_SECRET_KEY).getOrElse("")
 
-  def s3Endpoint: String = get(S3_ENDPOINT).getOrElse("")
+  def s3EndpointRegion: String = get(S3_ENDPOINT_REGION).getOrElse("")
+
+  def s3MultiplePartUploadMaxRetries: Int = get(S3_MPU_MAX_RETRIES)
 
   def s3Dir: String = {
     get(S3_DIR).map {
@@ -3062,14 +3064,22 @@ object CelebornConf extends Logging {
       .stringConf
       .createOptional
 
-  val S3_ENDPOINT: OptionalConfigEntry[String] =
-    buildConf("celeborn.storage.s3.endpoint")
+  val S3_ENDPOINT_REGION: OptionalConfigEntry[String] =
+    buildConf("celeborn.storage.s3.endpoint.region")
       .categories("worker", "master", "client")
       .version("0.6.0")
       .doc("S3 endpoint for Celeborn to store shuffle data.")
       .stringConf
       .createOptional
 
+  val S3_MPU_MAX_RETRIES: ConfigEntry[Int] =
+    buildConf("celeborn.storage.s3.mpu.maxRetries")
+      .categories("worker")
+      .version("0.6.0")
+      .doc("S3 MPU upload max retries.")
+      .intConf
+      .createWithDefault(5)
+
   val WORKER_DISK_RESERVE_SIZE: ConfigEntry[Long] =
     buildConf("celeborn.worker.storage.disk.reserve.size")
       .withAlternative("celeborn.worker.disk.reserve.size")
@@ -3552,7 +3562,7 @@ object CelebornConf extends Logging {
       .version("0.6.0")
       .doc("Size of buffer used by a S3 flusher.")
       .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("4m")
+      .createWithDefaultString("6m")
 
   val WORKER_WRITER_CLOSE_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.worker.writer.close.timeout")
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
index 356711d75..b703da07c 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -49,9 +49,9 @@ object CelebornHadoopUtils extends Logging {
       }
     }
     if (conf.s3Dir.nonEmpty) {
-      if (conf.s3AccessKey.isEmpty || conf.s3SecretKey.isEmpty || conf.s3Endpoint.isEmpty) {
+      if (conf.s3AccessKey.isEmpty || conf.s3SecretKey.isEmpty || conf.s3EndpointRegion.isEmpty) {
         throw new CelebornException(
-          "S3 storage is enabled but s3AccessKey, s3SecretKey, or s3Endpoint is not set")
+          "S3 storage is enabled but s3AccessKey, s3SecretKey, or s3EndpointRegion is not set")
       }
       hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
       hadoopConf.set(
@@ -59,7 +59,7 @@ object CelebornHadoopUtils extends Logging {
         "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
       hadoopConf.set("fs.s3a.access.key", conf.s3AccessKey)
       hadoopConf.set("fs.s3a.secret.key", conf.s3SecretKey)
-      hadoopConf.set("fs.s3a.endpoint", conf.s3Endpoint)
+      hadoopConf.set("fs.s3a.endpoint.region", conf.s3EndpointRegion)
     }
     appendSparkHadoopConfigs(conf, hadoopConf)
     hadoopConf
diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server
index 74e71214a..0edc78b70 100644
--- a/dev/deps/dependencies-server
+++ b/dev/deps/dependencies-server
@@ -19,6 +19,7 @@ HikariCP/4.0.3//HikariCP-4.0.3.jar
 RoaringBitmap/1.0.6//RoaringBitmap-1.0.6.jar
 aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar
 ap-loader-all/3.0-8//ap-loader-all-3.0-8.jar
+aws-java-sdk-bundle/1.12.367//aws-java-sdk-bundle-1.12.367.jar
 classgraph/4.8.138//classgraph-4.8.138.jar
 commons-cli/1.5.0//commons-cli-1.5.0.jar
 commons-crypto/1.0.0//commons-crypto-1.0.0.jar
@@ -27,6 +28,7 @@ commons-lang3/3.17.0//commons-lang3-3.17.0.jar
 commons-logging/1.1.3//commons-logging-1.1.3.jar
 failureaccess/1.0.2//failureaccess-1.0.2.jar
 guava/33.1.0-jre//guava-33.1.0-jre.jar
+hadoop-aws/3.3.6//hadoop-aws-3.3.6.jar
 hadoop-client-api/3.3.6//hadoop-client-api-3.3.6.jar
 hadoop-client-runtime/3.3.6//hadoop-client-runtime-3.3.6.jar
 hk2-api/2.6.1//hk2-api-2.6.1.jar
@@ -145,4 +147,5 @@ swagger-integration/2.2.1//swagger-integration-2.2.1.jar
 swagger-jaxrs2/2.2.1//swagger-jaxrs2-2.2.1.jar
 swagger-models/2.2.1//swagger-models-2.2.1.jar
 swagger-ui/4.9.1//swagger-ui-4.9.1.jar
+wildfly-openssl/1.1.3.Final//wildfly-openssl-1.1.3.Final.jar
 zstd-jni/1.5.2-1//zstd-jni-1.5.2-1.jar
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index bec9c56ae..2bb7a08f3 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -132,6 +132,6 @@ license: |
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | false | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 |  |
 | celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 |  |
 | celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 |  |
-| celeborn.storage.s3.endpoint | &lt;undefined&gt; | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 |  |
+| celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 |  |
 | celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 |  |
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index d3300369f..4359527c0 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -89,6 +89,6 @@ license: |
 | celeborn.storage.hdfs.kerberos.principal | &lt;undefined&gt; | false | Kerberos principal for HDFS storage connection. | 0.3.2 |  |
 | celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 |  |
 | celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 |  |
-| celeborn.storage.s3.endpoint | &lt;undefined&gt; | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 |  |
+| celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 |  |
 | celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 |  |
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 36e2cca97..97e262718 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -48,7 +48,8 @@ license: |
 | celeborn.storage.hdfs.kerberos.principal | &lt;undefined&gt; | false | Kerberos principal for HDFS storage connection. | 0.3.2 |  |
 | celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 |  |
 | celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 |  |
-| celeborn.storage.s3.endpoint | &lt;undefined&gt; | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 |  |
+| celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 |  |
+| celeborn.storage.s3.mpu.maxRetries | 5 | false | S3 MPU upload max retries. | 0.6.0 |  |
 | celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 |  |
 | celeborn.worker.activeConnection.max | &lt;undefined&gt; | false | If the number of active connections on a worker exceeds this configuration value, the worker will be marked as high-load in the heartbeat report, and the master will not include that node in the response of RequestSlots. | 0.3.1 |  |
 | celeborn.worker.applicationRegistry.cache.size | 10000 | false | Cache size of the application registry on Workers. | 0.5.0 |  |
@@ -84,7 +85,7 @@ license: |
 | celeborn.worker.flusher.hdd.threads | 1 | false | Flusher's thread count per disk used for write data to HDD disks. | 0.2.0 |  |
 | celeborn.worker.flusher.hdfs.buffer.size | 4m | false | Size of buffer used by a HDFS flusher. | 0.3.0 |  |
 | celeborn.worker.flusher.hdfs.threads | 8 | false | Flusher's thread count used for write data to HDFS. | 0.2.0 |  |
-| celeborn.worker.flusher.s3.buffer.size | 4m | false | Size of buffer used by a S3 flusher. | 0.6.0 |  |
+| celeborn.worker.flusher.s3.buffer.size | 6m | false | Size of buffer used by a S3 flusher. | 0.6.0 |  |
 | celeborn.worker.flusher.s3.threads | 8 | false | Flusher's thread count used for write data to S3. | 0.6.0 |  |
 | celeborn.worker.flusher.shutdownTimeout | 3s | false | Timeout for a flusher to shutdown. | 0.2.0 |  |
 | celeborn.worker.flusher.ssd.threads | 16 | false | Flusher's thread count per disk used for write data to SSD disks. | 0.2.0 |  |
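Pulling the configuration changes together before the build-system hunks: `celeborn.storage.s3.endpoint` becomes `celeborn.storage.s3.endpoint.region` because the AWS client is now built from a region name rather than an endpoint URL, `celeborn.storage.s3.mpu.maxRetries` bounds retries of the worker's multipart-upload calls, and the S3 flusher buffer default rises from 4m to 6m, plausibly to stay above S3's 5 MiB minimum size for non-final multipart parts. A minimal sketch of the resulting settings; the keys are from this diff, while the values, the `s3a://` path style, and the region are illustrative placeholders:

```properties
# Sketch only: placeholder values, except mpu.maxRetries (default 5)
# and flusher.s3.buffer.size (new default 6m) which come from the diff.
celeborn.storage.s3.dir                  s3a://my-bucket/celeborn
celeborn.storage.s3.access.key           AKIA...
celeborn.storage.s3.secret.key           <secret>
celeborn.storage.s3.endpoint.region     us-east-1
celeborn.storage.s3.mpu.maxRetries      5
celeborn.worker.flusher.s3.buffer.size  6m
```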
diff --git a/master/pom.xml b/master/pom.xml
index 51286b9ef..88f2f9ece 100644
--- a/master/pom.xml
+++ b/master/pom.xml
@@ -163,26 +163,6 @@
-    <profile>
-      <id>hadoop-aws</id>
-      <activation>
-        <property>
-          <name>hadoop-aws-deps</name>
-        </property>
-      </activation>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-aws</artifactId>
-          <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-          <groupId>com.amazonaws</groupId>
-          <artifactId>aws-java-sdk-bundle</artifactId>
-          <version>${aws.version}</version>
-        </dependency>
-      </dependencies>
-    </profile>
     <profile>
       <id>hadoop-2</id>
@@ -198,5 +178,20 @@
     </profile>
+    <profile>
+      <id>aws</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-aws</artifactId>
+          <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk-bundle</artifactId>
+          <version>${aws.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
   </profiles>
 </project>
diff --git a/multipart-uploader/pom.xml b/multipart-uploader/pom.xml
new file mode 100644
index 000000000..cfdbbb4ee
--- /dev/null
+++ b/multipart-uploader/pom.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.celeborn</groupId>
+    <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+    <version>${project.version}</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>celeborn-multipart-uploader_${scala.binary.version}</artifactId>
+  <packaging>jar</packaging>
+  <name>Celeborn Multipart Uploader</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-service_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-bundle</artifactId>
+      <version>${aws.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
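The old `hadoop-aws` profile (activated by defining the `hadoop-aws-deps` property) is folded into a plain `aws` profile, and the new `celeborn-multipart-uploader` module carries the AWS dependencies. Building a distribution that bundles these jars should then reduce to naming the profile; a hypothetical invocation, noting that `sbt_build_service` above only substring-matches `"$@"` for `aws`, and that `--sbt-enabled` is assumed here to be the script's existing switch for the sbt path:

```sh
# Maven path: -P is standard Maven profile syntax and satisfies
# the *"aws"* match in make-distribution.sh.
./build/make-distribution.sh -Paws

# sbt path (switch name assumed): exports SBT_MAVEN_PROFILES="aws".
./build/make-distribution.sh --sbt-enabled -Paws
```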
diff --git a/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java b/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
new file mode 100644
index 000000000..f1699c884
--- /dev/null
+++ b/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.retry.PredefinedRetryPolicies;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ListPartsRequest;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PartListing;
+import com.amazonaws.services.s3.model.PartSummary;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+
+import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class S3MultipartUploadHandler implements MultipartUploadHandler {
+
+  private static final Logger logger = LoggerFactory.getLogger(S3MultipartUploadHandler.class);
+
+  private String uploadId;
+
+  private AmazonS3 s3Client;
+
+  private String key;
+
+  private String bucketName;
+
+  private String s3AccessKey;
+
+  private String s3SecretKey;
+
+  private String s3EndpointRegion;
+
+  private Integer s3MultiplePartUploadMaxRetries;
+
+  public S3MultipartUploadHandler(String bucketName, String s3AccessKey, String s3SecretKey, String s3EndpointRegion, String key, Integer s3MultiplePartUploadMaxRetries) {
+    this.bucketName = bucketName;
+    this.s3AccessKey = s3AccessKey;
+    this.s3SecretKey = s3SecretKey;
+    this.s3EndpointRegion = s3EndpointRegion;
+    this.s3MultiplePartUploadMaxRetries = s3MultiplePartUploadMaxRetries;
+    BasicAWSCredentials basicAWSCredentials =
+        new BasicAWSCredentials(s3AccessKey, s3SecretKey);
+    ClientConfiguration clientConfig = new ClientConfiguration()
+        .withRetryPolicy(PredefinedRetryPolicies.getDefaultRetryPolicyWithCustomMaxRetries(s3MultiplePartUploadMaxRetries))
+        .withMaxErrorRetry(s3MultiplePartUploadMaxRetries);
+    this.s3Client =
+        AmazonS3ClientBuilder.standard()
+            .withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials))
+            .withRegion(s3EndpointRegion)
+            .withClientConfiguration(clientConfig)
+            .build();
+    this.key = key;
+  }
+
+  @Override
+  public void startUpload() {
+    InitiateMultipartUploadRequest initRequest =
+        new InitiateMultipartUploadRequest(bucketName, key);
+    InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest);
+    this.uploadId = initResponse.getUploadId();
+  }
+
+  @Override
+  public void putPart(InputStream inputStream, Integer partNumber, Boolean finalFlush) throws IOException {
+    try (InputStream inStream = inputStream) {
+      int partSize = inStream.available();
+      if (partSize == 0) {
+        logger.debug("key {} uploadId {} part size is 0 for part number {} finalFlush {}", key, uploadId, partNumber, finalFlush);
+        return;
+      }
+      UploadPartRequest uploadRequest =
+          new UploadPartRequest()
+              .withBucketName(bucketName)
+              .withKey(key)
+              .withUploadId(uploadId)
+              .withPartNumber(partNumber)
+              .withInputStream(inStream)
+              .withPartSize(partSize)
+              .withLastPart(finalFlush);
+      s3Client.uploadPart(uploadRequest);
+      logger.debug("key {} uploadId {} part number {} uploaded with size {} finalFlush {}", key, uploadId, partNumber, partSize, finalFlush);
+    } catch (RuntimeException | IOException e) {
+      logger.error("Failed to upload part", e);
+      throw e;
+    }
+  }
+
+  @Override
+  public void complete() {
+    List<PartETag> partETags = new ArrayList<>();
+    ListPartsRequest listPartsRequest = new ListPartsRequest(bucketName, key, uploadId);
+    PartListing partListing;
+    do {
+      partListing = s3Client.listParts(listPartsRequest);
+      for (PartSummary part : partListing.getParts()) {
+        partETags.add(new PartETag(part.getPartNumber(), part.getETag()));
+      }
+      listPartsRequest.setPartNumberMarker(partListing.getNextPartNumberMarker());
+    } while (partListing.isTruncated());
+    if (partETags.size() == 0) {
+      logger.debug("bucket {} key {} uploadId {} has no parts uploaded, aborting upload", bucketName, key, uploadId);
+      abort();
+      logger.debug("bucket {} key {} upload completed with size {}", bucketName, key, 0);
+      return;
+    }
+    ProgressListener progressListener = progressEvent -> {
+      logger.debug("key {} uploadId {} progress event type {} transferred {} bytes", key, uploadId, progressEvent.getEventType(), progressEvent.getBytesTransferred());
+    };
+
+    CompleteMultipartUploadRequest compRequest =
+        new CompleteMultipartUploadRequest(bucketName, key, uploadId, partETags)
+            .withGeneralProgressListener(progressListener);
+    CompleteMultipartUploadResult compResult = s3Client.completeMultipartUpload(compRequest);
+    logger.debug("bucket {} key {} uploadId {} upload completed location is in {} ", bucketName, key, uploadId, compResult.getLocation());
+  }
+
+  @Override
+  public void abort() {
+    AbortMultipartUploadRequest abortMultipartUploadRequest =
+        new AbortMultipartUploadRequest(bucketName, key, uploadId);
+    s3Client.abortMultipartUpload(abortMultipartUploadRequest);
+  }
+
+  @Override
+  public void close() {
+    if (s3Client != null) {
+      s3Client.shutdown();
+    }
+  }
+}
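A hedged usage sketch for the handler above: the constructor arguments mirror its field list, and the call order (startUpload, putPart per flushed buffer, complete or abort, close) is inferred from this file alone, so the bucket, key, and region values are placeholders:

```java
package org.apache.celeborn; // same package as the handler, for the sketch

import java.io.ByteArrayInputStream;

public class S3MpuSketch {
  public static void main(String[] args) throws Exception {
    S3MultipartUploadHandler handler =
        new S3MultipartUploadHandler(
            "my-bucket", "<access-key>", "<secret-key>", "us-east-1",
            "celeborn/app-1/0/shuffle.data", 5); // retries feed the AWS retry policy
    handler.startUpload();
    // putPart() sizes the part via available() and closes the stream itself.
    byte[] part = new byte[6 * 1024 * 1024]; // non-final parts must be >= 5 MiB on S3
    handler.putPart(new ByteArrayInputStream(part), 1, true);
    handler.complete(); // lists the uploaded parts, then completes the MPU
    handler.close();
  }
}
```

The 6 MiB part here lines up with the new `celeborn.worker.flusher.s3.buffer.size` default.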
diff --git a/pom.xml b/pom.xml
index 1da158f70..a8549b5fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,6 +38,7 @@
     <module>service</module>
     <module>master</module>
     <module>worker</module>
+    <module>multipart-uploader</module>
     <module>web</module>
     <module>cli</module>
@@ -71,6 +72,7 @@
     <hadoop.version>3.3.6</hadoop.version>
+    <aws.version>1.12.532</aws.version>
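Finally, circling back to the CelebornHadoopUtils hunks: the credentials-provider key itself sits on the single line git elided between the two hunks (old line 58), for which `fs.s3a.aws.credentials.provider` is the standard s3a name. The resulting Hadoop configuration, as a sketch with placeholder credentials:

```java
import org.apache.hadoop.conf.Configuration;

// Sketch of the s3a settings Celeborn derives from CelebornConf after this
// change; only the last key differs from before (fs.s3a.endpoint is gone).
public class S3aConfSketch {
  static Configuration newS3aConf(String accessKey, String secretKey, String region) {
    Configuration hadoopConf = new Configuration();
    hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
    hadoopConf.set(
        "fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
    hadoopConf.set("fs.s3a.access.key", accessKey);   // conf.s3AccessKey
    hadoopConf.set("fs.s3a.secret.key", secretKey);   // conf.s3SecretKey
    hadoopConf.set("fs.s3a.endpoint.region", region); // conf.s3EndpointRegion
    return hadoopConf;
  }
}
```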