diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/FlinkResultPartitionInfo.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/FlinkResultPartitionInfo.java index a8591d3e2..596c1ee5c 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/FlinkResultPartitionInfo.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/FlinkResultPartitionInfo.java @@ -18,7 +18,9 @@ package org.apache.celeborn.plugin.flink; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.shuffle.PartitionDescriptor; import org.apache.flink.runtime.shuffle.ProducerDescriptor; @@ -26,39 +28,50 @@ import org.apache.celeborn.plugin.flink.utils.FlinkUtils; public class FlinkResultPartitionInfo { private final JobID jobID; - private final PartitionDescriptor partitionDescriptor; - private final ProducerDescriptor producerDescriptor; + private final ResultPartitionID resultPartitionId; + private final IntermediateResultPartitionID partitionId; + private final ExecutionAttemptID producerId; + + public FlinkResultPartitionInfo(JobID jobID, ResultPartitionID resultPartitionId) { + this.jobID = jobID; + this.resultPartitionId = resultPartitionId; + this.partitionId = resultPartitionId.getPartitionId(); + this.producerId = resultPartitionId.getProducerId(); + } public FlinkResultPartitionInfo( JobID jobID, PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) { this.jobID = jobID; - this.partitionDescriptor = partitionDescriptor; - this.producerDescriptor = producerDescriptor; + this.resultPartitionId = + new ResultPartitionID( + partitionDescriptor.getPartitionId(), producerDescriptor.getProducerExecutionId()); + this.partitionId = partitionDescriptor.getPartitionId(); + this.producerId = producerDescriptor.getProducerExecutionId(); } public ResultPartitionID getResultPartitionId() { - return new ResultPartitionID( - partitionDescriptor.getPartitionId(), producerDescriptor.getProducerExecutionId()); + return resultPartitionId; } public String getShuffleId() { - return FlinkUtils.toShuffleId(jobID, partitionDescriptor.getResultId()); + return FlinkUtils.toShuffleId(jobID, partitionId.getIntermediateDataSetID()); } public int getTaskId() { - return partitionDescriptor.getPartitionId().getPartitionNumber(); + return partitionId.getPartitionNumber(); } public String getAttemptId() { - return FlinkUtils.toAttemptId(producerDescriptor.getProducerExecutionId()); + return FlinkUtils.toAttemptId(producerId); } @Override public String toString() { final StringBuilder sb = new StringBuilder("FlinkResultPartitionInfo{"); sb.append("jobID=").append(jobID); - sb.append(", partitionDescriptor=").append(partitionDescriptor.getPartitionId()); - sb.append(", producerDescriptor=").append(producerDescriptor.getProducerExecutionId()); + sb.append(", resultPartitionId=").append(resultPartitionId); + sb.append(", partitionId=").append(partitionId); + sb.append(", producerId=").append(producerId); sb.append('}'); return sb.toString(); } diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java new file mode 100644 index 000000000..326a11985 --- /dev/null +++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierFactory.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.plugin.flink.tiered; + +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; +import org.apache.flink.runtime.util.ConfigurationParserUtils; + +import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.plugin.flink.utils.FlinkUtils; + +/** + * The factory class of the Celeborn client, used as a tier of flink hybrid shuffle tiered storage. + */ +public class CelebornTierFactory implements TierFactory { + + private CelebornConf conf; + + /** + * The bytes size of a single buffer, default value is 32KB, it will be set according to the flink + * configuration in {@link CelebornTierFactory#setup}. + */ + private int bufferSizeBytes = -1; + + /** + * The max bytes size of a single segment, it will determine how many buffer can save in a single + * segment. + */ + private static int NUM_BYTES_PER_SEGMENT = 8 * 1024 * 1024; + + private static final String CELEBORN_TIER_NAME = CelebornTierFactory.class.getSimpleName(); + + @Override + public void setup(Configuration configuration) { + conf = FlinkUtils.toCelebornConf(configuration); + this.bufferSizeBytes = ConfigurationParserUtils.getPageSize(configuration); + } + + @Override + public TieredStorageMemorySpec getMasterAgentMemorySpec() { + return new TieredStorageMemorySpec(getCelebornTierName(), 0); + } + + @Override + public TieredStorageMemorySpec getProducerAgentMemorySpec() { + return new TieredStorageMemorySpec(getCelebornTierName(), 1); + } + + @Override + public TieredStorageMemorySpec getConsumerAgentMemorySpec() { + return new TieredStorageMemorySpec(getCelebornTierName(), 0); + } + + @Override + public TierMasterAgent createMasterAgent( + TieredStorageResourceRegistry tieredStorageResourceRegistry) { + return new CelebornTierMasterAgent(conf); + } + + @Override + public TierProducerAgent createProducerAgent( + int numPartitions, + int numSubpartitions, + TieredStoragePartitionId partitionId, + String dataFileBasePath, + boolean isBroadcastOnly, + TieredStorageMemoryManager storageMemoryManager, + TieredStorageNettyService nettyService, + TieredStorageResourceRegistry resourceRegistry, + BatchShuffleReadBufferPool bufferPool, + ScheduledExecutorService ioExecutor, + List shuffleDescriptors, + int maxRequestedBuffers) { + // TODO impl this in the follow-up PR. + return null; + } + + @Override + public TierConsumerAgent createConsumerAgent( + List tieredStorageConsumerSpecs, + List shuffleDescriptors, + TieredStorageNettyService nettyService) { + // TODO impl this in the follow-up PR. + return null; + } + + public static String getCelebornTierName() { + return CELEBORN_TIER_NAME; + } +} diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java new file mode 100644 index 000000000..fe10a889a --- /dev/null +++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.plugin.flink.tiered; + +import static org.apache.celeborn.plugin.flink.utils.Utils.checkState; + +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleHandler; +import org.apache.flink.runtime.shuffle.JobShuffleContext; +import org.apache.flink.runtime.shuffle.PartitionWithMetrics; +import org.apache.flink.util.ExecutorUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.celeborn.client.LifecycleManager; +import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.common.util.JavaUtils; +import org.apache.celeborn.common.util.ThreadUtils; +import org.apache.celeborn.plugin.flink.FlinkResultPartitionInfo; +import org.apache.celeborn.plugin.flink.RemoteShuffleResource; +import org.apache.celeborn.plugin.flink.ShuffleResourceDescriptor; +import org.apache.celeborn.plugin.flink.ShuffleResourceTracker; +import org.apache.celeborn.plugin.flink.ShuffleTaskInfo; +import org.apache.celeborn.plugin.flink.utils.FlinkUtils; + +public class CelebornTierMasterAgent implements TierMasterAgent { + + private static final Logger LOG = LoggerFactory.getLogger(CelebornTierMasterAgent.class); + + // Flink JobId -> Celeborn register shuffleIds + private final Map> jobShuffleIds = JavaUtils.newConcurrentHashMap(); + + private final ShuffleTaskInfo shuffleTaskInfo = new ShuffleTaskInfo(); + + private final ScheduledExecutorService executor = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-client-tier-master-executor"); + private final long lifecycleManagerTimestamp; + + private final CelebornConf conf; + + private ShuffleResourceTracker shuffleResourceTracker; + + private String celebornAppId; + + private volatile LifecycleManager lifecycleManager; + + public CelebornTierMasterAgent(CelebornConf conf) { + this.conf = conf; + this.lifecycleManagerTimestamp = System.currentTimeMillis(); + } + + @Override + public void registerJob(JobID jobID, TierShuffleHandler tierShuffleHandler) { + if (lifecycleManager == null) { + synchronized (CelebornTierMasterAgent.class) { + if (lifecycleManager == null) { + celebornAppId = FlinkUtils.toCelebornAppId(lifecycleManagerTimestamp, jobID); + LOG.info("CelebornAppId: {}", celebornAppId); + // The default value of this config option is false. If set to true, Celeborn will use + // local allocated workers as candidate being checked workers, this is more useful for + // map partition to regenerate the lost data. So if not set, set to true as default for + // flink. + conf.setIfMissing(CelebornConf.CLIENT_CHECKED_USE_ALLOCATED_WORKERS(), true); + lifecycleManager = new LifecycleManager(celebornAppId, conf); + this.shuffleResourceTracker = new ShuffleResourceTracker(executor, lifecycleManager); + } + } + } + + Set previousShuffleIds = jobShuffleIds.putIfAbsent(jobID, new HashSet<>()); + LOG.info("Register job: {}.", jobID); + if (previousShuffleIds != null) { + throw new RuntimeException("Duplicated registration job: " + jobID); + } + shuffleResourceTracker.registerJob(getJobShuffleContext(jobID, tierShuffleHandler)); + } + + @Override + public void unregisterJob(JobID jobID) { + LOG.info("Unregister job: {}.", jobID); + Set shuffleIds = jobShuffleIds.remove(jobID); + if (shuffleIds != null) { + executor.execute( + () -> { + for (Integer shuffleId : shuffleIds) { + lifecycleManager.unregisterShuffle(shuffleId); + shuffleTaskInfo.removeExpiredShuffle(shuffleId); + } + shuffleResourceTracker.unRegisterJob(jobID); + }); + } + } + + @Override + public TierShuffleDescriptor addPartitionAndGetShuffleDescriptor( + JobID jobID, ResultPartitionID resultPartitionID) { + FlinkResultPartitionInfo resultPartitionInfo = + new FlinkResultPartitionInfo(jobID, resultPartitionID); + ShuffleResourceDescriptor shuffleResourceDescriptor = + shuffleTaskInfo.genShuffleResourceDescriptor( + resultPartitionInfo.getShuffleId(), + resultPartitionInfo.getTaskId(), + resultPartitionInfo.getAttemptId()); + Set shuffleIds = jobShuffleIds.get(jobID); + if (shuffleIds == null) { + throw new RuntimeException("Can not find job in master agent, job: " + jobID); + } + shuffleIds.add(shuffleResourceDescriptor.getShuffleId()); + shuffleResourceTracker.addPartitionResource( + jobID, + shuffleResourceDescriptor.getShuffleId(), + shuffleResourceDescriptor.getPartitionId(), + resultPartitionID); + + RemoteShuffleResource remoteShuffleResource = + new RemoteShuffleResource( + lifecycleManager.getHost(), + lifecycleManager.getPort(), + lifecycleManagerTimestamp, + shuffleResourceDescriptor); + return new TierShuffleDescriptorImpl( + celebornAppId, + jobID, + resultPartitionInfo.getShuffleId(), + resultPartitionID, + remoteShuffleResource); + } + + @Override + public void releasePartition(TierShuffleDescriptor shuffleDescriptor) { + checkState(shuffleDescriptor instanceof TierShuffleDescriptorImpl, "Wrong descriptor type."); + try { + TierShuffleDescriptorImpl descriptor = (TierShuffleDescriptorImpl) shuffleDescriptor; + RemoteShuffleResource shuffleResource = descriptor.getShuffleResource(); + ShuffleResourceDescriptor resourceDescriptor = + shuffleResource.getMapPartitionShuffleDescriptor(); + LOG.debug("release partition resource: {}.", resourceDescriptor); + lifecycleManager.releasePartition( + resourceDescriptor.getShuffleId(), resourceDescriptor.getPartitionId()); + shuffleResourceTracker.removePartitionResource( + descriptor.getJobId(), + resourceDescriptor.getShuffleId(), + resourceDescriptor.getPartitionId()); + } catch (Throwable throwable) { + LOG.debug("Failed to release data partition {}.", shuffleDescriptor, throwable); + } + } + + @Override + public void close() { + try { + jobShuffleIds.clear(); + lifecycleManager.stop(); + } catch (Exception e) { + LOG.warn("Encounter exception when shutdown: {}", e.getMessage(), e); + } + + ExecutorUtils.gracefulShutdown(10, TimeUnit.SECONDS, executor); + } + + private JobShuffleContext getJobShuffleContext( + JobID jobID, TierShuffleHandler tierShuffleHandler) { + return new JobShuffleContext() { + @Override + public JobID getJobId() { + return jobID; + } + + @Override + public CompletableFuture stopTrackingAndReleasePartitions( + Collection resultPartitionIds) { + return tierShuffleHandler.onReleasePartitions( + resultPartitionIds.stream() + .map(TieredStorageIdMappingUtils::convertId) + .collect(Collectors.toList())); + } + + @Override + public CompletableFuture> getPartitionWithMetrics( + Duration duration, Set set) { + // TODO we should impl this when we support JM failover. + return CompletableFuture.completedFuture(Collections.emptyList()); + } + + @Override + public void notifyPartitionRecoveryStarted() { + // TODO we should impl this when we support JM failover. + } + }; + } +} diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/TierShuffleDescriptorImpl.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/TierShuffleDescriptorImpl.java new file mode 100644 index 000000000..27450edc8 --- /dev/null +++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/TierShuffleDescriptorImpl.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.plugin.flink.tiered; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; + +import org.apache.celeborn.plugin.flink.RemoteShuffleDescriptor; +import org.apache.celeborn.plugin.flink.RemoteShuffleResource; + +/** + * Wrap the {@link RemoteShuffleDescriptor} to implement {@link TierShuffleDescriptor} interface. + */ +public class TierShuffleDescriptorImpl implements TierShuffleDescriptor { + + private final RemoteShuffleDescriptor remoteShuffleDescriptor; + + public TierShuffleDescriptorImpl( + String celebornAppId, + JobID jobId, + String shuffleId, + ResultPartitionID resultPartitionID, + RemoteShuffleResource shuffleResource) { + this.remoteShuffleDescriptor = + new RemoteShuffleDescriptor( + celebornAppId, jobId, shuffleId, resultPartitionID, shuffleResource); + } + + public ResultPartitionID getResultPartitionID() { + return remoteShuffleDescriptor.getResultPartitionID(); + } + + public String getCelebornAppId() { + return remoteShuffleDescriptor.getCelebornAppId(); + } + + public JobID getJobId() { + return remoteShuffleDescriptor.getJobId(); + } + + public String getShuffleId() { + return remoteShuffleDescriptor.getShuffleId(); + } + + public RemoteShuffleResource getShuffleResource() { + return remoteShuffleDescriptor.getShuffleResource(); + } + + @Override + public String toString() { + return "TierShuffleDescriptorImpl{" + + "remoteShuffleDescriptor=" + + remoteShuffleDescriptor + + '}'; + } +}