[CELEBORN-1490][CIP-6] Introduce tier factory and master agent in flink hybrid shuffle

### What changes were proposed in this pull request?

Implement the `TierFactory` interface to adapt to Flink hybrid shuffle, and introduce `CelebornTierMasterAgent`.

Note: Only the last commit needs review in this PR.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
No need.

Closes #2721 from reswqa/cip6-4-pr.

Authored-by: Weijie Guo <reswqa@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
This commit is contained in:
Weijie Guo 2024-09-13 23:13:01 +08:00 committed by Shuang
parent 9621d1150d
commit 1053129ea4
4 changed files with 438 additions and 11 deletions

View File

@ -18,7 +18,9 @@
package org.apache.celeborn.plugin.flink;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
@ -26,39 +28,50 @@ import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
public class FlinkResultPartitionInfo {
private final JobID jobID;
private final PartitionDescriptor partitionDescriptor;
private final ProducerDescriptor producerDescriptor;
private final ResultPartitionID resultPartitionId;
private final IntermediateResultPartitionID partitionId;
private final ExecutionAttemptID producerId;
/**
 * Creates the partition info from a job id and an already built {@link ResultPartitionID}.
 *
 * <p>The partition id and the producer id are taken from {@code resultPartitionId}.
 *
 * @param jobID the id of the Flink job
 * @param resultPartitionId the result partition id to describe
 */
public FlinkResultPartitionInfo(JobID jobID, ResultPartitionID resultPartitionId) {
this.jobID = jobID;
this.resultPartitionId = resultPartitionId;
this.partitionId = resultPartitionId.getPartitionId();
this.producerId = resultPartitionId.getProducerId();
}
/**
 * Creates the partition info from a partition descriptor and a producer descriptor.
 *
 * <p>The result partition id is built from the descriptor's partition id and the producer's
 * execution id; both ids are also stored individually.
 *
 * @param jobID the id of the Flink job
 * @param partitionDescriptor supplies the partition id
 * @param producerDescriptor supplies the producer execution id
 */
public FlinkResultPartitionInfo(
JobID jobID, PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
this.jobID = jobID;
// NOTE(review): the two descriptor assignments below look like pre-change lines kept by the
// diff rendering; the id fields assigned afterwards carry the same information — confirm.
this.partitionDescriptor = partitionDescriptor;
this.producerDescriptor = producerDescriptor;
this.resultPartitionId =
new ResultPartitionID(
partitionDescriptor.getPartitionId(), producerDescriptor.getProducerExecutionId());
this.partitionId = partitionDescriptor.getPartitionId();
this.producerId = producerDescriptor.getProducerExecutionId();
}
/** Returns the Flink result partition id described by this info object. */
public ResultPartitionID getResultPartitionId() {
// NOTE(review): two return statements appear back to back here; presumably the first is the
// removed pre-change line and the plain field read is its replacement — confirm.
return new ResultPartitionID(
partitionDescriptor.getPartitionId(), producerDescriptor.getProducerExecutionId());
return resultPartitionId;
}
/**
 * Returns the shuffle id string produced by {@code FlinkUtils.toShuffleId} for this job and the
 * data set the partition belongs to.
 */
public String getShuffleId() {
// NOTE(review): two return statements appear back to back here; presumably the first is the
// removed pre-change line and the one reading partitionId is its replacement — confirm.
return FlinkUtils.toShuffleId(jobID, partitionDescriptor.getResultId());
return FlinkUtils.toShuffleId(jobID, partitionId.getIntermediateDataSetID());
}
/** Returns the partition number of the partition id, which is used as the task id. */
public int getTaskId() {
// NOTE(review): two return statements appear back to back here; presumably the first is the
// removed pre-change line and the one reading partitionId is its replacement — confirm.
return partitionDescriptor.getPartitionId().getPartitionNumber();
return partitionId.getPartitionNumber();
}
/** Returns the attempt id string produced by {@code FlinkUtils.toAttemptId} for the producer. */
public String getAttemptId() {
// NOTE(review): two return statements appear back to back here; presumably the first is the
// removed pre-change line and the one reading producerId is its replacement — confirm.
return FlinkUtils.toAttemptId(producerDescriptor.getProducerExecutionId());
return FlinkUtils.toAttemptId(producerId);
}
/** Renders the job id and the partition/producer identifiers for logging. */
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("FlinkResultPartitionInfo{");
sb.append("jobID=").append(jobID);
// NOTE(review): the two descriptor-based appends below look like pre-change lines kept by the
// diff rendering; the three id-based appends after them are presumably the replacement —
// confirm.
sb.append(", partitionDescriptor=").append(partitionDescriptor.getPartitionId());
sb.append(", producerDescriptor=").append(producerDescriptor.getProducerExecutionId());
sb.append(", resultPartitionId=").append(resultPartitionId);
sb.append(", partitionId=").append(partitionId);
sb.append(", producerId=").append(producerId);
sb.append('}');
return sb.toString();
}

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.plugin.flink.tiered;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
/**
 * The factory class of the Celeborn client, used as a tier of flink hybrid shuffle tiered storage.
 *
 * <p>{@link #setup} must be called before {@link #createMasterAgent}, because the Celeborn
 * configuration handed to the master agent is only derived there. The producer and consumer agent
 * factories are not implemented yet and currently return {@code null}.
 */
public class CelebornTierFactory implements TierFactory {

  /** Celeborn configuration derived from the Flink configuration in {@link #setup}. */
  private CelebornConf conf;

  /**
   * The bytes size of a single buffer. It holds the placeholder {@code -1} until {@link
   * CelebornTierFactory#setup} replaces it with the page size parsed from the flink
   * configuration.
   */
  private int bufferSizeBytes = -1;

  /**
   * The max bytes size of a single segment (8 MiB), it will determine how many buffers can be
   * saved in a single segment.
   */
  private static final int NUM_BYTES_PER_SEGMENT = 8 * 1024 * 1024;

  /** The tier name reported in every memory spec; it is the simple name of this class. */
  private static final String CELEBORN_TIER_NAME = CelebornTierFactory.class.getSimpleName();

  /**
   * Derives the Celeborn configuration and the buffer size from the given Flink configuration.
   *
   * @param configuration the Flink configuration
   */
  @Override
  public void setup(Configuration configuration) {
    conf = FlinkUtils.toCelebornConf(configuration);
    this.bufferSizeBytes = ConfigurationParserUtils.getPageSize(configuration);
  }

  /**
   * Returns the memory spec of the master agent, created with the tier name and the value 0
   * (presumably the number of guaranteed buffers; confirm against TieredStorageMemorySpec).
   */
  @Override
  public TieredStorageMemorySpec getMasterAgentMemorySpec() {
    return new TieredStorageMemorySpec(getCelebornTierName(), 0);
  }

  /** Returns the memory spec of the producer agent, created with the tier name and the value 1. */
  @Override
  public TieredStorageMemorySpec getProducerAgentMemorySpec() {
    return new TieredStorageMemorySpec(getCelebornTierName(), 1);
  }

  /** Returns the memory spec of the consumer agent, created with the tier name and the value 0. */
  @Override
  public TieredStorageMemorySpec getConsumerAgentMemorySpec() {
    return new TieredStorageMemorySpec(getCelebornTierName(), 0);
  }

  /**
   * Creates the master agent of the Celeborn tier.
   *
   * @param tieredStorageResourceRegistry not used by this implementation
   * @return a new {@link CelebornTierMasterAgent} using the configuration derived in {@link
   *     #setup}
   */
  @Override
  public TierMasterAgent createMasterAgent(
      TieredStorageResourceRegistry tieredStorageResourceRegistry) {
    return new CelebornTierMasterAgent(conf);
  }

  /**
   * Not implemented yet.
   *
   * @return always {@code null} for now
   */
  @Override
  public TierProducerAgent createProducerAgent(
      int numPartitions,
      int numSubpartitions,
      TieredStoragePartitionId partitionId,
      String dataFileBasePath,
      boolean isBroadcastOnly,
      TieredStorageMemoryManager storageMemoryManager,
      TieredStorageNettyService nettyService,
      TieredStorageResourceRegistry resourceRegistry,
      BatchShuffleReadBufferPool bufferPool,
      ScheduledExecutorService ioExecutor,
      List<TierShuffleDescriptor> shuffleDescriptors,
      int maxRequestedBuffers) {
    // TODO impl this in the follow-up PR.
    return null;
  }

  /**
   * Not implemented yet.
   *
   * @return always {@code null} for now
   */
  @Override
  public TierConsumerAgent createConsumerAgent(
      List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
      List<TierShuffleDescriptor> shuffleDescriptors,
      TieredStorageNettyService nettyService) {
    // TODO impl this in the follow-up PR.
    return null;
  }

  /** Returns the name under which the Celeborn tier registers its memory specs. */
  public static String getCelebornTierName() {
    return CELEBORN_TIER_NAME;
  }
}

View File

@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.plugin.flink.tiered;
import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleHandler;
import org.apache.flink.runtime.shuffle.JobShuffleContext;
import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
import org.apache.flink.util.ExecutorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.client.LifecycleManager;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.plugin.flink.FlinkResultPartitionInfo;
import org.apache.celeborn.plugin.flink.RemoteShuffleResource;
import org.apache.celeborn.plugin.flink.ShuffleResourceDescriptor;
import org.apache.celeborn.plugin.flink.ShuffleResourceTracker;
import org.apache.celeborn.plugin.flink.ShuffleTaskInfo;
import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
public class CelebornTierMasterAgent implements TierMasterAgent {
private static final Logger LOG = LoggerFactory.getLogger(CelebornTierMasterAgent.class);
// Flink JobId -> Celeborn register shuffleIds
private final Map<JobID, Set<Integer>> jobShuffleIds = JavaUtils.newConcurrentHashMap();
private final ShuffleTaskInfo shuffleTaskInfo = new ShuffleTaskInfo();
private final ScheduledExecutorService executor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-client-tier-master-executor");
private final long lifecycleManagerTimestamp;
private final CelebornConf conf;
private ShuffleResourceTracker shuffleResourceTracker;
private String celebornAppId;
private volatile LifecycleManager lifecycleManager;
public CelebornTierMasterAgent(CelebornConf conf) {
this.conf = conf;
this.lifecycleManagerTimestamp = System.currentTimeMillis();
}
@Override
public void registerJob(JobID jobID, TierShuffleHandler tierShuffleHandler) {
if (lifecycleManager == null) {
synchronized (CelebornTierMasterAgent.class) {
if (lifecycleManager == null) {
celebornAppId = FlinkUtils.toCelebornAppId(lifecycleManagerTimestamp, jobID);
LOG.info("CelebornAppId: {}", celebornAppId);
// The default value of this config option is false. If set to true, Celeborn will use
// local allocated workers as candidate being checked workers, this is more useful for
// map partition to regenerate the lost data. So if not set, set to true as default for
// flink.
conf.setIfMissing(CelebornConf.CLIENT_CHECKED_USE_ALLOCATED_WORKERS(), true);
lifecycleManager = new LifecycleManager(celebornAppId, conf);
this.shuffleResourceTracker = new ShuffleResourceTracker(executor, lifecycleManager);
}
}
}
Set<Integer> previousShuffleIds = jobShuffleIds.putIfAbsent(jobID, new HashSet<>());
LOG.info("Register job: {}.", jobID);
if (previousShuffleIds != null) {
throw new RuntimeException("Duplicated registration job: " + jobID);
}
shuffleResourceTracker.registerJob(getJobShuffleContext(jobID, tierShuffleHandler));
}
@Override
public void unregisterJob(JobID jobID) {
LOG.info("Unregister job: {}.", jobID);
Set<Integer> shuffleIds = jobShuffleIds.remove(jobID);
if (shuffleIds != null) {
executor.execute(
() -> {
for (Integer shuffleId : shuffleIds) {
lifecycleManager.unregisterShuffle(shuffleId);
shuffleTaskInfo.removeExpiredShuffle(shuffleId);
}
shuffleResourceTracker.unRegisterJob(jobID);
});
}
}
@Override
public TierShuffleDescriptor addPartitionAndGetShuffleDescriptor(
JobID jobID, ResultPartitionID resultPartitionID) {
FlinkResultPartitionInfo resultPartitionInfo =
new FlinkResultPartitionInfo(jobID, resultPartitionID);
ShuffleResourceDescriptor shuffleResourceDescriptor =
shuffleTaskInfo.genShuffleResourceDescriptor(
resultPartitionInfo.getShuffleId(),
resultPartitionInfo.getTaskId(),
resultPartitionInfo.getAttemptId());
Set<Integer> shuffleIds = jobShuffleIds.get(jobID);
if (shuffleIds == null) {
throw new RuntimeException("Can not find job in master agent, job: " + jobID);
}
shuffleIds.add(shuffleResourceDescriptor.getShuffleId());
shuffleResourceTracker.addPartitionResource(
jobID,
shuffleResourceDescriptor.getShuffleId(),
shuffleResourceDescriptor.getPartitionId(),
resultPartitionID);
RemoteShuffleResource remoteShuffleResource =
new RemoteShuffleResource(
lifecycleManager.getHost(),
lifecycleManager.getPort(),
lifecycleManagerTimestamp,
shuffleResourceDescriptor);
return new TierShuffleDescriptorImpl(
celebornAppId,
jobID,
resultPartitionInfo.getShuffleId(),
resultPartitionID,
remoteShuffleResource);
}
@Override
public void releasePartition(TierShuffleDescriptor shuffleDescriptor) {
checkState(shuffleDescriptor instanceof TierShuffleDescriptorImpl, "Wrong descriptor type.");
try {
TierShuffleDescriptorImpl descriptor = (TierShuffleDescriptorImpl) shuffleDescriptor;
RemoteShuffleResource shuffleResource = descriptor.getShuffleResource();
ShuffleResourceDescriptor resourceDescriptor =
shuffleResource.getMapPartitionShuffleDescriptor();
LOG.debug("release partition resource: {}.", resourceDescriptor);
lifecycleManager.releasePartition(
resourceDescriptor.getShuffleId(), resourceDescriptor.getPartitionId());
shuffleResourceTracker.removePartitionResource(
descriptor.getJobId(),
resourceDescriptor.getShuffleId(),
resourceDescriptor.getPartitionId());
} catch (Throwable throwable) {
LOG.debug("Failed to release data partition {}.", shuffleDescriptor, throwable);
}
}
@Override
public void close() {
try {
jobShuffleIds.clear();
lifecycleManager.stop();
} catch (Exception e) {
LOG.warn("Encounter exception when shutdown: {}", e.getMessage(), e);
}
ExecutorUtils.gracefulShutdown(10, TimeUnit.SECONDS, executor);
}
private JobShuffleContext getJobShuffleContext(
JobID jobID, TierShuffleHandler tierShuffleHandler) {
return new JobShuffleContext() {
@Override
public JobID getJobId() {
return jobID;
}
@Override
public CompletableFuture<?> stopTrackingAndReleasePartitions(
Collection<ResultPartitionID> resultPartitionIds) {
return tierShuffleHandler.onReleasePartitions(
resultPartitionIds.stream()
.map(TieredStorageIdMappingUtils::convertId)
.collect(Collectors.toList()));
}
@Override
public CompletableFuture<Collection<PartitionWithMetrics>> getPartitionWithMetrics(
Duration duration, Set<ResultPartitionID> set) {
// TODO we should impl this when we support JM failover.
return CompletableFuture.completedFuture(Collections.emptyList());
}
@Override
public void notifyPartitionRecoveryStarted() {
// TODO we should impl this when we support JM failover.
}
};
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.plugin.flink.tiered;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
import org.apache.celeborn.plugin.flink.RemoteShuffleDescriptor;
import org.apache.celeborn.plugin.flink.RemoteShuffleResource;
/**
 * A {@link TierShuffleDescriptor} backed by a {@link RemoteShuffleDescriptor}: the constructor
 * builds the wrapped descriptor and every accessor forwards to it.
 */
public class TierShuffleDescriptorImpl implements TierShuffleDescriptor {

  /** The wrapped descriptor that holds all the state exposed by this class. */
  private final RemoteShuffleDescriptor remoteShuffleDescriptor;

  /**
   * Builds the wrapped {@link RemoteShuffleDescriptor} from the given values.
   *
   * @param celebornAppId the Celeborn application id
   * @param jobId the id of the Flink job
   * @param shuffleId the shuffle id string
   * @param resultPartitionID the Flink result partition id
   * @param shuffleResource the remote shuffle resource of the partition
   */
  public TierShuffleDescriptorImpl(
      String celebornAppId,
      JobID jobId,
      String shuffleId,
      ResultPartitionID resultPartitionID,
      RemoteShuffleResource shuffleResource) {
    RemoteShuffleDescriptor wrapped =
        new RemoteShuffleDescriptor(
            celebornAppId, jobId, shuffleId, resultPartitionID, shuffleResource);
    this.remoteShuffleDescriptor = wrapped;
  }

  /** Returns the result partition id of the wrapped descriptor. */
  public ResultPartitionID getResultPartitionID() {
    return this.remoteShuffleDescriptor.getResultPartitionID();
  }

  /** Returns the Celeborn application id of the wrapped descriptor. */
  public String getCelebornAppId() {
    return this.remoteShuffleDescriptor.getCelebornAppId();
  }

  /** Returns the job id of the wrapped descriptor. */
  public JobID getJobId() {
    return this.remoteShuffleDescriptor.getJobId();
  }

  /** Returns the shuffle id of the wrapped descriptor. */
  public String getShuffleId() {
    return this.remoteShuffleDescriptor.getShuffleId();
  }

  /** Returns the shuffle resource of the wrapped descriptor. */
  public RemoteShuffleResource getShuffleResource() {
    return this.remoteShuffleDescriptor.getShuffleResource();
  }

  @Override
  public String toString() {
    StringBuilder text = new StringBuilder("TierShuffleDescriptorImpl{");
    text.append("remoteShuffleDescriptor=").append(remoteShuffleDescriptor);
    text.append('}');
    return text.toString();
  }
}