[CELEBORN-546][FLINK] Use autoIncrement partitionId replace encode(mapId, attemptId) for generating partitionId (#1447)

This commit is contained in:
Shuang 2023-04-22 16:33:22 +08:00 committed by GitHub
parent e3ae2f0e17
commit d68deecaaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 95 additions and 292 deletions

View File

@ -58,6 +58,10 @@ public class RemoteShuffleDescriptor implements ShuffleDescriptor {
return jobId;
}
public String getShuffleId() {
return shuffleId;
}
public RemoteShuffleResource getShuffleResource() {
return shuffleResource;
}

View File

@ -127,18 +127,16 @@ public class RemoteShuffleMaster implements ShuffleMaster<RemoteShuffleDescripto
FlinkResultPartitionInfo resultPartitionInfo =
new FlinkResultPartitionInfo(jobID, partitionDescriptor, producerDescriptor);
ShuffleTask shuffleTask =
encodeExternalShuffleTask(
ShuffleResourceDescriptor shuffleResourceDescriptor =
genShuffleResourceDescriptor(
resultPartitionInfo.getShuffleId(),
resultPartitionInfo.getTaskId(),
resultPartitionInfo.getAttemptId());
synchronized (shuffleIds) {
shuffleIds.add(shuffleTask.getShuffleId());
shuffleIds.add(shuffleResourceDescriptor.getShuffleId());
}
ShuffleResourceDescriptor shuffleResourceDescriptor =
new ShuffleResourceDescriptor(shuffleTask);
RemoteShuffleResource remoteShuffleResource =
new RemoteShuffleResource(
lifecycleManager.getRssMetaServiceHost(),
@ -238,18 +236,20 @@ public class RemoteShuffleMaster implements ShuffleMaster<RemoteShuffleDescripto
ThreadUtils.shutdownExecutors(10, executor);
}
private ShuffleTask encodeExternalShuffleTask(
private ShuffleResourceDescriptor genShuffleResourceDescriptor(
String taskShuffleId, int mapId, String taskAttemptId) {
int shuffleId = shuffleTaskInfo.getShuffleId(taskShuffleId);
int attemptId = shuffleTaskInfo.getAttemptId(taskShuffleId, mapId, taskAttemptId);
int attemptId = shuffleTaskInfo.genAttemptId(shuffleId, mapId);
int partitionId = shuffleTaskInfo.genPartitionId(shuffleId);
LOG.info(
"encode task from ({}, {}, {}) to ({}, {},, {})",
"Assign for ({}, {}, {}) resource ({}, {}, {}, {})",
taskShuffleId,
mapId,
taskAttemptId,
shuffleId,
mapId,
attemptId);
return new ShuffleTask(shuffleId, mapId, attemptId);
attemptId,
partitionId);
return new ShuffleResourceDescriptor(shuffleId, mapId, attemptId, partitionId);
}
}

View File

@ -73,6 +73,7 @@ public class RemoteShuffleOutputGate {
private int shuffleId;
private int mapId;
private int attemptId;
private int partitionId;
private String rssMetaServiceHost;
private int rssMetaServicePort;
private UserIdentifier userIdentifier;
@ -104,6 +105,8 @@ public class RemoteShuffleOutputGate {
this.mapId = shuffleDesc.getShuffleResource().getMapPartitionShuffleDescriptor().getMapId();
this.attemptId =
shuffleDesc.getShuffleResource().getMapPartitionShuffleDescriptor().getAttemptId();
this.partitionId =
shuffleDesc.getShuffleResource().getMapPartitionShuffleDescriptor().getPartitionId();
this.rssMetaServiceHost = shuffleDesc.getShuffleResource().getRssMetaServiceHost();
this.rssMetaServicePort = shuffleDesc.getShuffleResource().getRssMetaServicePort();
this.flinkShuffleClient = getShuffleClient();
@ -236,7 +239,7 @@ public class RemoteShuffleOutputGate {
if (isFirstHandShake) {
partitionLocation =
flinkShuffleClient.registerMapPartitionTask(
applicationId, shuffleId, numMappers, mapId, attemptId);
applicationId, shuffleId, numMappers, mapId, attemptId, partitionId);
Utils.checkNotNull(partitionLocation);
currentRegionIndex = 0;

View File

@ -19,8 +19,6 @@ package org.apache.celeborn.plugin.flink;
import java.io.Serializable;
import org.apache.celeborn.common.util.PackedPartitionId;
public class ShuffleResourceDescriptor implements Serializable {
private static final long serialVersionUID = -1251659747395561342L;
@ -30,11 +28,11 @@ public class ShuffleResourceDescriptor implements Serializable {
private final int attemptId;
private final int partitionId;
public ShuffleResourceDescriptor(ShuffleTask shuffleTask) {
this.shuffleId = shuffleTask.getShuffleId();
this.mapId = shuffleTask.getMapId();
this.attemptId = shuffleTask.getAttemptId();
this.partitionId = PackedPartitionId.packedPartitionId(mapId, attemptId);
public ShuffleResourceDescriptor(int shuffleId, int mapId, int attemptId, int partitionId) {
this.shuffleId = shuffleId;
this.mapId = mapId;
this.attemptId = attemptId;
this.partitionId = partitionId;
}
public int getShuffleId() {

View File

@ -1,42 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.plugin.flink;
/** Immutable identifier of a shuffle write task: (shuffleId, mapId, attemptId). */
public class ShuffleTask {
  private final int shuffleId;
  private final int mapId;
  private final int attemptId;

  /**
   * @param shuffleId Celeborn shuffle id this task writes to
   * @param mapId map task index within the shuffle
   * @param attemptId attempt index of the map task
   */
  public ShuffleTask(int shuffleId, int mapId, int attemptId) {
    this.shuffleId = shuffleId;
    this.mapId = mapId;
    this.attemptId = attemptId;
  }

  /** Returns the Celeborn shuffle id. */
  public int getShuffleId() {
    return shuffleId;
  }

  /** Returns the map task index. */
  public int getMapId() {
    return mapId;
  }

  /** Returns the attempt index of the map task. */
  public int getAttemptId() {
    return attemptId;
  }
}

View File

@ -18,17 +18,15 @@
package org.apache.celeborn.plugin.flink;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.celeborn.common.util.JavaUtils;
public class ShuffleTaskInfo {
private int currentShuffleIndex = 0;
// task shuffle id -> mapId_taskAttemptId -> attemptIdx
private ConcurrentHashMap<String, ConcurrentHashMap<String, Integer>>
taskShuffleAttemptIdToAttemptId = JavaUtils.newConcurrentHashMap();
// map attemptId index
private ConcurrentHashMap<String, ConcurrentHashMap<Integer, Integer>> taskShuffleAttemptIdIndex =
JavaUtils.newConcurrentHashMap();
private ConcurrentHashMap<Integer, ConcurrentHashMap<Integer, AtomicInteger>>
shuffleIdMapAttemptIdIndex = JavaUtils.newConcurrentHashMap();
// task shuffle id -> celeborn shuffle id
private ConcurrentHashMap<String, Integer> taskShuffleIdToShuffleId =
JavaUtils.newConcurrentHashMap();
@ -36,6 +34,9 @@ public class ShuffleTaskInfo {
private ConcurrentHashMap<Integer, String> shuffleIdToTaskShuffleId =
JavaUtils.newConcurrentHashMap();
private ConcurrentHashMap<Integer, AtomicInteger> shuffleIdPartitionIdIndex =
JavaUtils.newConcurrentHashMap();
public int getShuffleId(String taskShuffleId) {
synchronized (taskShuffleIdToShuffleId) {
if (taskShuffleIdToShuffleId.containsKey(taskShuffleId)) {
@ -43,6 +44,8 @@ public class ShuffleTaskInfo {
} else {
taskShuffleIdToShuffleId.put(taskShuffleId, currentShuffleIndex);
shuffleIdToTaskShuffleId.put(currentShuffleIndex, taskShuffleId);
shuffleIdMapAttemptIdIndex.put(currentShuffleIndex, JavaUtils.newConcurrentHashMap());
shuffleIdPartitionIdIndex.put(currentShuffleIndex, new AtomicInteger(0));
int tempShuffleIndex = currentShuffleIndex;
currentShuffleIndex = currentShuffleIndex + 1;
return tempShuffleIndex;
@ -50,36 +53,24 @@ public class ShuffleTaskInfo {
}
}
public int getAttemptId(String taskShuffleId, int mapId, String attemptId) {
ConcurrentHashMap<Integer, Integer> attemptIndex =
taskShuffleAttemptIdIndex.computeIfAbsent(
taskShuffleId, (id) -> JavaUtils.newConcurrentHashMap());
ConcurrentHashMap<String, Integer> attemptIdMap =
taskShuffleAttemptIdToAttemptId.computeIfAbsent(
taskShuffleId, (id) -> JavaUtils.newConcurrentHashMap());
String mapAttemptId = mapId + "_" + attemptId;
synchronized (attemptIndex) {
if (!attemptIdMap.containsKey(mapAttemptId)) {
if (attemptIndex.containsKey(mapId)) {
int index = attemptIndex.get(mapId);
attemptIdMap.put(mapAttemptId, index + 1);
attemptIndex.put(mapId, index + 1);
} else {
attemptIdMap.put(mapAttemptId, 0);
attemptIndex.put(mapId, 0);
}
}
}
public int genAttemptId(int shuffleId, int mapId) {
AtomicInteger currentAttemptIndex =
shuffleIdMapAttemptIdIndex
.get(shuffleId)
.computeIfAbsent(mapId, (id) -> new AtomicInteger(0));
return currentAttemptIndex.getAndIncrement();
}
return attemptIdMap.get(mapAttemptId);
public int genPartitionId(int shuffleId) {
return shuffleIdPartitionIdIndex.get(shuffleId).getAndIncrement();
}
public void removeExpiredShuffle(int shuffleId) {
if (shuffleIdToTaskShuffleId.containsKey(shuffleId)) {
shuffleIdPartitionIdIndex.remove(shuffleId);
shuffleIdMapAttemptIdIndex.remove(shuffleId);
String taskShuffleId = shuffleIdToTaskShuffleId.remove(shuffleId);
taskShuffleIdToShuffleId.remove(taskShuffleId);
taskShuffleAttemptIdIndex.remove(taskShuffleId);
taskShuffleAttemptIdToAttemptId.remove(taskShuffleId);
}
}
}

View File

@ -47,7 +47,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.util.PackedPartitionId;
import org.apache.celeborn.plugin.flink.config.PluginConf;
import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
@ -124,8 +123,6 @@ public class RemoteShuffleMasterTest {
mapPartitionShuffleDescriptor =
remoteShuffleDescriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
Assert.assertEquals(0, mapPartitionShuffleDescriptor.getShuffleId());
Assert.assertEquals(
PackedPartitionId.packedPartitionId(1, 1), mapPartitionShuffleDescriptor.getPartitionId());
Assert.assertEquals(1, mapPartitionShuffleDescriptor.getAttemptId());
Assert.assertEquals(1, mapPartitionShuffleDescriptor.getMapId());
}

View File

@ -58,7 +58,8 @@ public class RemoteShuffleOutputGateSuiteJ {
PartitionLocation partitionLocation =
new PartitionLocation(1, 0, "localhost", 123, 245, 789, 238, PartitionLocation.Mode.MASTER);
when(shuffleClient.registerMapPartitionTask(any(), anyInt(), anyInt(), anyInt(), anyInt()))
when(shuffleClient.registerMapPartitionTask(
any(), anyInt(), anyInt(), anyInt(), anyInt(), anyInt()))
.thenAnswer(t -> partitionLocation);
doNothing()
.when(remoteShuffleOutputGate.flinkShuffleClient)

View File

@ -60,7 +60,6 @@ import org.apache.flink.util.function.SupplierWithException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
@ -557,16 +556,12 @@ public class RemoteShuffleResultPartitionSuiteJ {
Random random = new Random();
byte[] bytes = new byte[16];
random.nextBytes(bytes);
ShuffleTask shuffleTask = Mockito.mock(ShuffleTask.class);
Mockito.when(shuffleTask.getAttemptId()).thenReturn(1);
Mockito.when(shuffleTask.getMapId()).thenReturn(1);
Mockito.when(shuffleTask.getShuffleId()).thenReturn(1);
return new RemoteShuffleDescriptor(
new JobID(bytes).toString(),
new JobID(bytes),
new JobID(bytes).toString(),
new ResultPartitionID(),
new RemoteShuffleResource("1", 2, new ShuffleResourceDescriptor(shuffleTask)));
new RemoteShuffleResource("1", 2, new ShuffleResourceDescriptor(1, 1, 1, 0)));
}
/** Data written and its {@link Buffer.DataType}. */

View File

@ -34,21 +34,21 @@ public class ShuffleTaskInfoSuitJ {
int encodeShuffleId0 = shuffleTaskInfo.getShuffleId("shuffleId");
Assert.assertEquals(encodeShuffleId0, 0);
int encodeAttemptId011 = shuffleTaskInfo.getAttemptId("shuffleId", 1, "attempt1");
int encodeAttemptId112 = shuffleTaskInfo.getAttemptId("shuffleId1", 1, "attempt2");
int encodeAttemptId021 = shuffleTaskInfo.getAttemptId("shuffleId", 2, "attempt1");
int encodeAttemptId012 = shuffleTaskInfo.getAttemptId("shuffleId", 1, "attempt2");
int encodeAttemptId011 = shuffleTaskInfo.genAttemptId(encodeShuffleId1, 1);
int encodeAttemptId112 = shuffleTaskInfo.genAttemptId(encodeShuffleId1, 1);
int encodeAttemptId021 = shuffleTaskInfo.genAttemptId(encodeShuffleId0, 2);
int encodeAttemptId012 = shuffleTaskInfo.genAttemptId(encodeShuffleId0, 1);
Assert.assertEquals(encodeAttemptId011, 0);
Assert.assertEquals(encodeAttemptId112, 0);
Assert.assertEquals(encodeAttemptId112, 1);
Assert.assertEquals(encodeAttemptId021, 0);
Assert.assertEquals(encodeAttemptId012, 1);
Assert.assertEquals(encodeAttemptId012, 0);
// remove shuffleId and reEncode
shuffleTaskInfo.removeExpiredShuffle(encodeShuffleId);
int encodeShuffleIdNew = shuffleTaskInfo.getShuffleId("shuffleId");
Assert.assertEquals(encodeShuffleIdNew, 2);
int encodeAttemptId211 = shuffleTaskInfo.getAttemptId("shuffleId", 1, "attempt1");
int encodeAttemptId211 = shuffleTaskInfo.genAttemptId(encodeShuffleIdNew, 1);
Assert.assertEquals(encodeAttemptId211, 0);
}

View File

@ -196,7 +196,8 @@ public abstract class ShuffleClient {
public abstract void shutdown();
public abstract PartitionLocation registerMapPartitionTask(
String appId, int shuffleId, int numMappers, int mapId, int attemptId) throws IOException;
String appId, int shuffleId, int numMappers, int mapId, int attemptId, int partitionId)
throws IOException;
public abstract ConcurrentHashMap<Integer, PartitionLocation> getPartitionLocation(
String applicationId, int shuffleId, int numMappers, int numPartitions);

View File

@ -383,8 +383,8 @@ public class ShuffleClientImpl extends ShuffleClient {
@VisibleForTesting
public PartitionLocation registerMapPartitionTask(
String appId, int shuffleId, int numMappers, int mapId, int attemptId) throws IOException {
int partitionId = PackedPartitionId.packedPartitionId(mapId, attemptId);
String appId, int shuffleId, int numMappers, int mapId, int attemptId, int partitionId)
throws IOException {
logger.info(
"Register MapPartition task for shuffle {} map {} attempt {} partition {} with {} mapper.",
shuffleId,

View File

@ -151,7 +151,7 @@ public class DummyShuffleClient extends ShuffleClient {
@Override
public PartitionLocation registerMapPartitionTask(
String appId, int shuffleId, int numMappers, int mapId, int attemptId) {
String appId, int shuffleId, int numMappers, int mapId, int attemptId, int partitionId) {
return null;
}

View File

@ -28,7 +28,6 @@ import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.util.JavaUtils.timeOutOrMeetCondition
import org.apache.celeborn.common.util.PackedPartitionId
trait WithShuffleClientSuite extends CelebornFunSuite {
@ -58,12 +57,13 @@ trait WithShuffleClientSuite extends CelebornFunSuite {
prepareService()
shuffleId = 1
var location =
shuffleClient.registerMapPartitionTask(APP, shuffleId, numMappers, mapId, attemptId)
Assert.assertEquals(location.getId, PackedPartitionId.packedPartitionId(mapId, attemptId))
shuffleClient.registerMapPartitionTask(APP, shuffleId, numMappers, mapId, attemptId, 1)
Assert.assertEquals(location.getId, 1)
// retry register
location = shuffleClient.registerMapPartitionTask(APP, shuffleId, numMappers, mapId, attemptId)
Assert.assertEquals(location.getId, PackedPartitionId.packedPartitionId(mapId, attemptId))
location =
shuffleClient.registerMapPartitionTask(APP, shuffleId, numMappers, mapId, attemptId, 1)
Assert.assertEquals(location.getId, 1)
// check all allocated slots
var partitionLocationInfos = lifecycleManager.workerSnapshots(shuffleId).values().asScala
@ -73,15 +73,19 @@ trait WithShuffleClientSuite extends CelebornFunSuite {
// another mapId
location =
shuffleClient.registerMapPartitionTask(APP, shuffleId, numMappers, mapId + 1, attemptId)
Assert.assertEquals(location.getId, PackedPartitionId.packedPartitionId(mapId + 1, attemptId))
shuffleClient.registerMapPartitionTask(APP, shuffleId, numMappers, mapId + 1, attemptId, 2)
Assert.assertEquals(location.getId, 2)
// another mapId with another attemptId
location =
shuffleClient.registerMapPartitionTask(APP, shuffleId, numMappers, mapId + 1, attemptId + 1)
Assert.assertEquals(
location.getId,
PackedPartitionId.packedPartitionId(mapId + 1, attemptId + 1))
shuffleClient.registerMapPartitionTask(
APP,
shuffleId,
numMappers,
mapId + 1,
attemptId + 1,
numMappers + 1)
Assert.assertEquals(location.getId, numMappers + 1)
// check all allocated all slots
partitionLocationInfos = lifecycleManager.workerSnapshots(shuffleId).values().asScala
@ -101,7 +105,7 @@ trait WithShuffleClientSuite extends CelebornFunSuite {
// check batch release
lifecycleManager.releasePartition(
shuffleId,
PackedPartitionId.packedPartitionId(mapId, attemptId + 1))
4)
timeOutOrMeetCondition(new Callable[java.lang.Boolean] {
override def call(): lang.Boolean = {
@ -122,7 +126,7 @@ trait WithShuffleClientSuite extends CelebornFunSuite {
// check single release
lifecycleManager.releasePartition(
shuffleId,
PackedPartitionId.packedPartitionId(mapId, attemptId + 1))
4)
Assert.assertEquals(
partitionLocationInfos.map(r => r.getMasterPartitions().size()).sum,
@ -153,15 +157,15 @@ trait WithShuffleClientSuite extends CelebornFunSuite {
}
private def registerAndFinishPartition(): Unit = {
shuffleClient.registerMapPartitionTask(APP, shuffleId, numMappers, mapId, attemptId)
shuffleClient.registerMapPartitionTask(APP, shuffleId, numMappers, mapId + 1, attemptId)
shuffleClient.registerMapPartitionTask(APP, shuffleId, numMappers, mapId + 2, attemptId)
shuffleClient.registerMapPartitionTask(APP, shuffleId, numMappers, mapId, attemptId, 1)
shuffleClient.registerMapPartitionTask(APP, shuffleId, numMappers, mapId + 1, attemptId, 2)
shuffleClient.registerMapPartitionTask(APP, shuffleId, numMappers, mapId + 2, attemptId, 3)
// task number incr to numMappers + 1
shuffleClient.registerMapPartitionTask(APP, shuffleId, numMappers, mapId, attemptId + 1)
shuffleClient.mapPartitionMapperEnd(APP, shuffleId, mapId, attemptId, numMappers, mapId)
shuffleClient.registerMapPartitionTask(APP, shuffleId, numMappers, mapId, attemptId + 1, 9)
shuffleClient.mapPartitionMapperEnd(APP, shuffleId, mapId, attemptId, numMappers, 1)
// retry
shuffleClient.mapPartitionMapperEnd(APP, shuffleId, mapId, attemptId, numMappers, mapId)
shuffleClient.mapPartitionMapperEnd(APP, shuffleId, mapId, attemptId, numMappers, 1)
// another attempt
shuffleClient.mapPartitionMapperEnd(
APP,
@ -169,8 +173,7 @@ trait WithShuffleClientSuite extends CelebornFunSuite {
mapId,
attemptId + 1,
numMappers,
PackedPartitionId
.packedPartitionId(mapId, attemptId + 1))
9)
// another mapper
shuffleClient.mapPartitionMapperEnd(APP, shuffleId, mapId + 1, attemptId, numMappers, mapId + 1)
}

View File

@ -22,7 +22,6 @@ import java.io.Serializable;
import org.roaringbitmap.RoaringBitmap;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.util.PackedPartitionId;
public class PartitionLocation implements Serializable {
public enum Mode {
@ -286,13 +285,9 @@ public class PartitionLocation implements Serializable {
peerAddr = peer.hostAndPorts();
}
return "PartitionLocation["
+ "\n id(rawId-attemptId)-epoch:"
+ "\n id-epoch:"
+ id
+ "("
+ getRawId()
+ "-"
+ getAttemptId()
+ ")-"
+ epoch
+ "\n host-rpcPort-pushPort-fetchPort-replicatePort:"
+ host
@ -326,12 +321,4 @@ public class PartitionLocation implements Serializable {
public void setMapIdBitMap(RoaringBitmap mapIdBitMap) {
this.mapIdBitMap = mapIdBitMap;
}
public int getRawId() {
return PackedPartitionId.getRawPartitionId(id);
}
public int getAttemptId() {
return PackedPartitionId.getAttemptId(id);
}
}

View File

@ -1,58 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.common.util;
import com.google.common.base.Preconditions;
/**
 * Pack for encode/decode id of partition Location for id of partitionLocation attemptId
 * raw_partitionId <br>
 * (upper 8 bits = attemptId) (lower 24 bits = raw id) <br>
 * (0000 0000) (0000 0000 0000 0000 0000 0000)<br>
 *
 * @see org.apache.celeborn.common.protocol.PartitionLocation#id
 */
public class PackedPartitionId {

  /** The maximum partition identifier that can be encoded. Note that partition ids start from 0. */
  static final int MAXIMUM_PARTITION_ID = (1 << 24) - 1; // 16777215

  /** The maximum partition attempt id that can be encoded. Note that attempt ids start from 0. */
  static final int MAXIMUM_ATTEMPT_ID = (1 << 8) - 1; // 255

  /** Mask selecting the raw-partition-id portion of a packed id. */
  static final int MASK_INT_LOWER_24_BITS = (1 << 24) - 1;

  /**
   * Packs a raw partition id and an attempt id into a single int: attemptId occupies the upper 8
   * bits, the raw id the lower 24 bits.
   *
   * @param partitionRawId raw partition id, must be in [0, {@link #MAXIMUM_PARTITION_ID}]
   * @param attemptId attempt id, must be in [0, {@link #MAXIMUM_ATTEMPT_ID}]
   * @return the packed id
   * @throws IllegalArgumentException if either argument is outside its valid range; negative
   *     values are rejected too, since a negative input would set bits outside its field and
   *     corrupt the other component on unpack
   */
  public static int packedPartitionId(int partitionRawId, int attemptId) {
    if (partitionRawId < 0 || partitionRawId > MAXIMUM_PARTITION_ID) {
      throw new IllegalArgumentException(
          "packedPartitionId called with invalid partitionRawId: " + partitionRawId);
    }
    if (attemptId < 0 || attemptId > MAXIMUM_ATTEMPT_ID) {
      throw new IllegalArgumentException(
          "packedPartitionId called with invalid attemptId: " + attemptId);
    }
    return (attemptId << 24) | partitionRawId;
  }

  /**
   * Extracts the raw partition id (lower 24 bits) from a packed id.
   *
   * @param packedPartitionId a value produced by {@link #packedPartitionId(int, int)}
   * @return the raw partition id
   */
  public static int getRawPartitionId(int packedPartitionId) {
    return packedPartitionId & MASK_INT_LOWER_24_BITS;
  }

  /**
   * Extracts the attempt id (upper 8 bits) from a packed id.
   *
   * @param packedPartitionId a value produced by {@link #packedPartitionId(int, int)}
   * @return the attempt id
   */
  public static int getAttemptId(int packedPartitionId) {
    // unsigned shift so the result is always non-negative
    return packedPartitionId >>> 24;
  }
}

View File

@ -20,8 +20,6 @@ package org.apache.celeborn.common.protocol;
import org.junit.Test;
import org.roaringbitmap.RoaringBitmap;
import org.apache.celeborn.common.util.PackedPartitionId;
public class PartitionLocationSuiteJ {
private final int partitionId = 0;
@ -186,12 +184,10 @@ public class PartitionLocationSuiteJ {
bitmap.add(2);
bitmap.add(3);
int attemptId = 10;
int rawPartitionId = 1000;
int newPartitionId = PackedPartitionId.packedPartitionId(rawPartitionId, attemptId);
int partitionId = 1000;
PartitionLocation location3 =
new PartitionLocation(
newPartitionId,
partitionId,
epoch,
host,
rpcPort,
@ -205,7 +201,7 @@ public class PartitionLocationSuiteJ {
String exp1 =
"PartitionLocation[\n"
+ " id(rawId-attemptId)-epoch:0(0-0)-0\n"
+ " id-epoch:0-0\n"
+ " host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4\n"
+ " mode:MASTER\n"
+ " peer:(empty)\n"
@ -213,7 +209,7 @@ public class PartitionLocationSuiteJ {
+ " mapIdBitMap:{}]";
String exp2 =
"PartitionLocation[\n"
+ " id(rawId-attemptId)-epoch:0(0-0)-0\n"
+ " id-epoch:0-0\n"
+ " host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4\n"
+ " mode:MASTER\n"
+ " peer:(host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4)\n"
@ -221,16 +217,12 @@ public class PartitionLocationSuiteJ {
+ " mapIdBitMap:{}]";
String exp3 =
"PartitionLocation[\n"
+ " id(rawId-attemptId)-epoch:167773160(1000-10)-0\n"
+ " id-epoch:1000-0\n"
+ " host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4\n"
+ " mode:MASTER\n"
+ " peer:(host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4)\n"
+ " storage hint:StorageInfo{type=MEMORY, mountPoint='/mnt/disk/0', finalResult=false, filePath=null}\n"
+ " mapIdBitMap:{1,2,3}]";
System.out.println(location1);
System.out.println(location2);
System.out.println(location3);
assert exp1.equals(location1.toString());
assert exp2.equals(location2.toString());
assert exp3.equals(location3.toString());

View File

@ -1,53 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.common.util;
import org.junit.Assert;
import org.junit.Test;
/** Unit tests for {@link PackedPartitionId}: pack/unpack round trips and range validation. */
public class PackedPartitionIdSuiteJ {

  @Test
  public void testNormalPackedPartitionId() {
    verifyRoundTrip(0, 0);
    verifyRoundTrip(555, 1);
    verifyRoundTrip(888, 1);
    verifyRoundTrip(10001, 100);
    // boundary cases: maximum partition id and/or maximum attempt id
    verifyRoundTrip(PackedPartitionId.MAXIMUM_PARTITION_ID, 11);
    verifyRoundTrip(100, PackedPartitionId.MAXIMUM_ATTEMPT_ID);
    verifyRoundTrip(PackedPartitionId.MAXIMUM_PARTITION_ID, PackedPartitionId.MAXIMUM_ATTEMPT_ID);
  }

  @Test(expected = IllegalArgumentException.class)
  public void testAttemptIdGreaterThanMaximumAttemptId() {
    PackedPartitionId.packedPartitionId(0, PackedPartitionId.MAXIMUM_ATTEMPT_ID + 1);
  }

  @Test(expected = IllegalArgumentException.class)
  public void testPartitionIdGreaterThanMaximumPartitionId() {
    PackedPartitionId.packedPartitionId(PackedPartitionId.MAXIMUM_PARTITION_ID + 1, 1);
  }

  /** Packs the pair, then asserts both components survive the round trip unchanged. */
  private void verifyRoundTrip(int partitionRawId, int attemptId) {
    int packed = PackedPartitionId.packedPartitionId(partitionRawId, attemptId);
    Assert.assertEquals(partitionRawId, PackedPartitionId.getRawPartitionId(packed));
    Assert.assertEquals(attemptId, PackedPartitionId.getAttemptId(packed));
  }
}

View File

@ -53,7 +53,7 @@ class ShuffleClientSuite extends WithShuffleClientSuite with MiniClusterFeature
}
assertThrows[IOException] {
() -> shuffleClient.registerMapPartitionTask(APP, 1, 1, 0, 0)
() -> shuffleClient.registerMapPartitionTask(APP, 1, 1, 0, 0, 1)
}
lifecycleManager.stop()

View File

@ -37,7 +37,6 @@ import org.apache.celeborn.common.network.server.BaseMessageHandler
import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType}
import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.common.unsafe.Platform
import org.apache.celeborn.common.util.PackedPartitionId
import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, HdfsFlusher, LocalFlusher, MapPartitionFileWriter, StorageManager}
@ -965,22 +964,13 @@ class PushDataHandler extends BaseMessageHandler with Logging {
callback: RpcResponseCallback,
wrappedCallback: RpcResponseCallback): Boolean = {
if (location == null) {
val (mapId, attemptId) = getMapAttempt(partitionUniqueId)
if (shuffleMapperAttempts.containsKey(shuffleKey) &&
-1 != shuffleMapperAttempts.get(shuffleKey).get(mapId)) {
// partition data has already been committed
logInfo(s"Receive push data from speculative task(shuffle $shuffleKey, map $mapId, " +
s" attempt $attemptId), but this mapper has already been ended.")
wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.STAGE_ENDED.getValue)))
} else {
val msg = s"Partition location wasn't found for task(shuffle $shuffleKey, map $mapId, " +
s"attempt $attemptId, uniqueId $partitionUniqueId)."
logWarning(s"[handle$messageType] $msg")
messageType match {
case Type.PUSH_MERGED_DATA => callback.onFailure(new CelebornIOException(msg))
case _ => callback.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_FAIL_PARTITION_NOT_FOUND))
}
val msg =
s"Partition location wasn't found for task(shuffle $shuffleKey, uniqueId $partitionUniqueId)."
logWarning(s"[handle$messageType] $msg")
messageType match {
case Type.PUSH_MERGED_DATA => callback.onFailure(new CelebornIOException(msg))
case _ => callback.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_FAIL_PARTITION_NOT_FOUND))
}
return true
}
@ -1058,12 +1048,6 @@ class PushDataHandler extends BaseMessageHandler with Logging {
false
}
private def getMapAttempt(
partitionUniqueId: String): (Int, Int) = {
val id = partitionUniqueId.split("-")(0).toInt
(PackedPartitionId.getRawPartitionId(id), PackedPartitionId.getAttemptId(id))
}
private def getClient(host: String, port: Int, partitionId: Int): TransportClient = {
if (conf.workerReplicateRandomConnectionEnabled) {
pushClientFactory.createClient(host, port)