[CELEBORN-2055] Fix some typos

### What changes were proposed in this pull request?
Inspired by [FLINK-38038](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-38038?filter=allissues]), I used [Tongyi Lingma](https://lingma.aliyun.com/) and qwen3-thinking LLM to identify and fix some typo issues in the Celeborn codebase. For example:
- backLog → backlog
- won`t → won't
- can to be read → can be read
- mapDataPartition → mapPartitionData
- UserDefinePasswordAuthenticationProviderImpl → UserDefinedPasswordAuthenticationProviderImpl

### Why are the changes needed?
Remove typos to improve source code readability for users and ease development for developers.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Code and documentation cleanup does not require additional testing.

Closes #3356 from codenohup/fix-typo.

Authored-by: codenohup <huangxu.walker@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
This commit is contained in:
codenohup 2025-07-10 12:01:02 +08:00 committed by SteNicholas
parent cd5d9cd93d
commit 0fa600ade1
31 changed files with 69 additions and 73 deletions

View File

@ -16,7 +16,7 @@ limitations under the License.
*/}}
{{/*
Common labels for Celeborn master resources
Common labels for Celeborn worker resources
*/}}
{{- define "celeborn.worker.labels" -}}
{{ include "celeborn.labels" . }}
@ -24,7 +24,7 @@ app.kubernetes.io/role: worker
{{- end }}
{{/*
Selector labels for Celeborn master pods
Selector labels for Celeborn worker pods
*/}}
{{- define "celeborn.worker.selectorLabels" -}}
{{ include "celeborn.selectorLabels" . }}

View File

@ -35,7 +35,7 @@ image:
tag: ""
# -- Image pull policy
pullPolicy: Always
# -- Image name for init containter. (your-private-repo/alpine:3.18)
# -- Image name for init container. (your-private-repo/alpine:3.18)
initContainerImage: alpine:3.18
# -- Image pull secrets for private image registry

View File

@ -26,7 +26,7 @@ import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.cli.config.CliConfigManager
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.authentication.HttpAuthSchemes
import org.apache.celeborn.server.common.http.authentication.{UserDefinePasswordAuthenticationProviderImpl, UserDefineTokenAuthenticationProviderImpl}
import org.apache.celeborn.server.common.http.authentication.{UserDefinedPasswordAuthenticationProviderImpl, UserDefineTokenAuthenticationProviderImpl}
import org.apache.celeborn.service.deploy.MiniClusterFeature
import org.apache.celeborn.service.deploy.master.Master
import org.apache.celeborn.service.deploy.worker.Worker
@ -38,11 +38,11 @@ class TestCelebornCliCommands extends CelebornFunSuite with MiniClusterFeature {
.set(CelebornConf.MASTER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC"))
.set(
CelebornConf.MASTER_HTTP_AUTH_BASIC_PROVIDER,
classOf[UserDefinePasswordAuthenticationProviderImpl].getName)
classOf[UserDefinedPasswordAuthenticationProviderImpl].getName)
.set(CelebornConf.WORKER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC"))
.set(
CelebornConf.WORKER_HTTP_AUTH_BASIC_PROVIDER,
classOf[UserDefinePasswordAuthenticationProviderImpl].getName)
classOf[UserDefinedPasswordAuthenticationProviderImpl].getName)
.set(CelebornConf.MASTER_HTTP_AUTH_ADMINISTERS, Seq(CELEBORN_ADMINISTER))
.set(CelebornConf.WORKER_HTTP_AUTH_ADMINISTERS, Seq(CELEBORN_ADMINISTER))
.set(CelebornConf.DYNAMIC_CONFIG_STORE_BACKEND, "DB")
@ -54,7 +54,7 @@ class TestCelebornCliCommands extends CelebornFunSuite with MiniClusterFeature {
private val BASIC_AUTH_HEADER = HttpAuthSchemes.BASIC + " " + new String(
Base64.getEncoder.encode(
s"$CELEBORN_ADMINISTER:${UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD}".getBytes()),
s"$CELEBORN_ADMINISTER:${UserDefinedPasswordAuthenticationProviderImpl.VALID_PASSWORD}".getBytes()),
StandardCharsets.UTF_8)
protected var master: Master = _

View File

@ -193,7 +193,7 @@ public class CelebornBufferStream {
}
}
private void cleanStream(long streamId) {
private void cleanupStream(long streamId) {
if (isOpenSuccess) {
mapShuffleClient.getReadClientHandler().removeHandler(streamId);
clientFactory.unregisterSupplier(streamId);
@ -204,7 +204,7 @@ public class CelebornBufferStream {
public void close() {
synchronized (lock) {
cleanStream(streamId);
cleanupStream(streamId);
isClosed = true;
}
}
@ -222,7 +222,7 @@ public class CelebornBufferStream {
locations.length);
if (currentLocationIndex.get() > 0) {
logger.debug("Get end streamId {}", endedStreamId);
cleanStream(endedStreamId);
cleanupStream(endedStreamId);
}
if (currentLocationIndex.get() < locations.length) {

View File

@ -46,9 +46,7 @@ import org.apache.celeborn.plugin.flink.client.CelebornBufferStream;
import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl;
import org.apache.celeborn.plugin.flink.protocol.SubPartitionReadData;
/**
* Wrap the {@link CelebornBufferStream}, utilize in flink hybrid shuffle integration strategy now.
*/
/** Wrap the {@link CelebornBufferStream}, used in flink hybrid shuffle integration strategy now. */
public class CelebornChannelBufferReader {
private static final Logger LOG = LoggerFactory.getLogger(CelebornChannelBufferReader.class);

View File

@ -79,7 +79,7 @@ public class CelebornTierConsumerAgent implements TierConsumerAgent {
/**
* partitionId -> subPartitionId -> reader, note that subPartitions may share the same reader, as
* a single reader can consume multiple subPartitions to improvement performance.
* a single reader can consume multiple subPartitions to improve performance.
*/
private final Map<
TieredStoragePartitionId, Map<TieredStorageSubpartitionId, CelebornChannelBufferReader>>
@ -111,7 +111,7 @@ public class CelebornTierConsumerAgent implements TierConsumerAgent {
/**
* The notify target is flink inputGate, used in notify input gate which subPartition contain
* shuffle data that can to be read.
* shuffle data that can be read.
*/
private AvailabilityNotifier availabilityNotifier;

View File

@ -56,7 +56,7 @@ public class CelebornTierFactory implements TierFactory {
* The max bytes size of a single segment, it will determine how many buffer can save in a single
* segment.
*/
private static int NUM_BYTES_PER_SEGMENT = 8 * 1024 * 1024;
private static int MAX_BYTES_PER_SEGMENT = 8 * 1024 * 1024;
private static final String CELEBORN_TIER_NAME = CelebornTierFactory.class.getSimpleName();
@ -106,7 +106,7 @@ public class CelebornTierFactory implements TierFactory {
partitionId,
numPartitions,
numSubpartitions,
NUM_BYTES_PER_SEGMENT,
MAX_BYTES_PER_SEGMENT,
bufferSizeBytes,
storageMemoryManager,
resourceRegistry,

View File

@ -46,9 +46,7 @@ import org.apache.celeborn.plugin.flink.client.CelebornBufferStream;
import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl;
import org.apache.celeborn.plugin.flink.protocol.SubPartitionReadData;
/**
* Wrap the {@link CelebornBufferStream}, utilize in flink hybrid shuffle integration strategy now.
*/
/** Wrap the {@link CelebornBufferStream}, used in flink hybrid shuffle integration strategy now. */
public class CelebornChannelBufferReader {
private static final Logger LOG = LoggerFactory.getLogger(CelebornChannelBufferReader.class);

View File

@ -79,7 +79,7 @@ public class CelebornTierConsumerAgent implements TierConsumerAgent {
/**
* partitionId -> subPartitionId -> reader, note that subPartitions may share the same reader, as
* a single reader can consume multiple subPartitions to improvement performance.
* a single reader can consume multiple subPartitions to improve performance.
*/
private final Map<
TieredStoragePartitionId, Map<TieredStorageSubpartitionId, CelebornChannelBufferReader>>
@ -111,7 +111,7 @@ public class CelebornTierConsumerAgent implements TierConsumerAgent {
/**
* The notify target is flink inputGate, used in notify input gate which subPartition contain
* shuffle data that can to be read.
* shuffle data that can be read.
*/
private AvailabilityNotifier availabilityNotifier;

View File

@ -59,7 +59,7 @@ public class CelebornTierFactory implements TierFactory {
* The max bytes size of a single segment, it will determine how many buffer can save in a single
* segment.
*/
private static int NUM_BYTES_PER_SEGMENT = 8 * 1024 * 1024;
private static int MAX_BYTES_PER_SEGMENT = 8 * 1024 * 1024;
private static final String CELEBORN_TIER_NAME = CelebornTierFactory.class.getSimpleName();
@ -110,7 +110,7 @@ public class CelebornTierFactory implements TierFactory {
partitionId,
numPartitions,
numSubpartitions,
NUM_BYTES_PER_SEGMENT,
MAX_BYTES_PER_SEGMENT,
bufferSizeBytes,
storageMemoryManager,
resourceRegistry,

View File

@ -118,7 +118,7 @@ public class SparkUtils {
field.setAccessible(true);
return (SQLMetric) field.get(serializer);
} catch (NoSuchFieldException | IllegalAccessException e) {
logger.warn("Failed to get dataSize metric, aqe won`t work properly.");
logger.warn("Failed to get dataSize metric, aqe won't work properly.");
}
return null;
}

View File

@ -932,7 +932,7 @@ public class ShuffleClientImpl extends ShuffleClient {
} else if (StatusCode.STAGE_ENDED.getValue() == statusCode) {
stageEndShuffleSet.add(shuffleId);
return results;
} else if (StatusCode.SHUFFLE_NOT_REGISTERED.getValue() == statusCode) {
} else if (StatusCode.SHUFFLE_UNREGISTERED.getValue() == statusCode) {
logger.error("SHUFFLE_NOT_REGISTERED!");
return null;
}
@ -1882,7 +1882,7 @@ public class ShuffleClientImpl extends ShuffleClient {
response.pushFailedBatches()),
null,
null);
case SHUFFLE_NOT_REGISTERED:
case SHUFFLE_UNREGISTERED:
logger.warn(
"Request {} return {} for {}.", getReducerFileGroup, response.status(), shuffleId);
// return empty result
@ -1894,7 +1894,7 @@ public class ShuffleClientImpl extends ShuffleClient {
response.pushFailedBatches()),
null,
null);
case STAGE_END_TIME_OUT:
case STAGE_END_TIMEOUT:
case SHUFFLE_DATA_LOST:
exceptionMsg =
String.format(

View File

@ -858,7 +858,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
logError(s"[handleRevive] shuffle $shuffleId not registered!")
contextWrapper.reply(
-1,
StatusCode.SHUFFLE_NOT_REGISTERED,
StatusCode.SHUFFLE_UNREGISTERED,
None,
false)
return
@ -938,7 +938,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
if (!registeredShuffle.contains(shuffleId) && !isSegmentGranularityVisible) {
logWarning(s"[handleGetReducerFileGroup] shuffle $shuffleId not registered, maybe no shuffle data within this stage.")
context.reply(GetReducerFileGroupResponse(
StatusCode.SHUFFLE_NOT_REGISTERED,
StatusCode.SHUFFLE_UNREGISTERED,
JavaUtils.newConcurrentHashMap(),
Array.empty,
serdeVersion = serdeVersion))

View File

@ -57,7 +57,7 @@ case class ChangeLocationsCallContext(
}
newLocs.put(partitionId, (status, available, partitionLocationOpt.getOrElse(null)))
if (newLocs.size() == partitionCount || StatusCode.SHUFFLE_NOT_REGISTERED == status
if (newLocs.size() == partitionCount || StatusCode.SHUFFLE_UNREGISTERED == status
|| StatusCode.STAGE_ENDED == status) {
context.reply(ChangeLocationResponse(endedMapIds, newLocs))
}

View File

@ -339,7 +339,7 @@ abstract class CommitHandler(
status.future.value.get match {
case scala.util.Success(res) =>
res.status match {
case StatusCode.SUCCESS | StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED | StatusCode.REQUEST_FAILED | StatusCode.WORKER_EXCLUDED | StatusCode.COMMIT_FILE_EXCEPTION =>
case StatusCode.SUCCESS | StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_UNREGISTERED | StatusCode.REQUEST_FAILED | StatusCode.WORKER_EXCLUDED | StatusCode.COMMIT_FILE_EXCEPTION =>
if (res.status == StatusCode.SUCCESS) {
logDebug(s"Request commitFiles return ${res.status} for " +
s"$shuffleKey from worker ${worker.readableAddress()}")

View File

@ -489,7 +489,7 @@ public class ShuffleClientSuiteJ {
.thenAnswer(
t -> {
return GetReducerFileGroupResponse$.MODULE$.apply(
StatusCode.SHUFFLE_NOT_REGISTERED,
StatusCode.SHUFFLE_UNREGISTERED,
locations,
new int[0],
Collections.emptySet(),
@ -502,7 +502,7 @@ public class ShuffleClientSuiteJ {
.thenAnswer(
t -> {
return GetReducerFileGroupResponse$.MODULE$.apply(
StatusCode.SHUFFLE_NOT_REGISTERED,
StatusCode.SHUFFLE_UNREGISTERED,
locations,
new int[0],
Collections.emptySet(),
@ -525,7 +525,7 @@ public class ShuffleClientSuiteJ {
.thenAnswer(
t -> {
return GetReducerFileGroupResponse$.MODULE$.apply(
StatusCode.STAGE_END_TIME_OUT,
StatusCode.STAGE_END_TIMEOUT,
locations,
new int[0],
Collections.emptySet(),
@ -538,7 +538,7 @@ public class ShuffleClientSuiteJ {
.thenAnswer(
t -> {
return GetReducerFileGroupResponse$.MODULE$.apply(
StatusCode.STAGE_END_TIME_OUT,
StatusCode.STAGE_END_TIMEOUT,
locations,
new int[0],
Collections.emptySet(),

View File

@ -104,8 +104,8 @@ public class TransportServer implements Closeable {
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.ALLOCATOR, allocator);
if (conf.backLog() > 0) {
bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
if (conf.backlog() > 0) {
bootstrap.option(ChannelOption.SO_BACKLOG, conf.backlog());
}
if (conf.receiveBuf() > 0) {

View File

@ -70,7 +70,7 @@ public class TransportConf {
}
/** Requested maximum length of the queue of incoming connections. Default 0 for no backlog. */
public int backLog() {
public int backlog() {
return celebornConf.networkIoBacklog(module);
}

View File

@ -31,7 +31,7 @@ public enum StatusCode {
// Specific Status
SHUFFLE_ALREADY_REGISTERED(3),
SHUFFLE_NOT_REGISTERED(4),
SHUFFLE_UNREGISTERED(4),
RESERVE_SLOTS_FAILED(5),
SLOT_NOT_AVAILABLE(6),
WORKER_NOT_FOUND(7),
@ -54,7 +54,7 @@ public enum StatusCode {
HARD_SPLIT(21),
SOFT_SPLIT(22),
STAGE_END_TIME_OUT(23),
STAGE_END_TIMEOUT(23),
SHUFFLE_DATA_LOST(24),
WORKER_SHUTDOWN(25),
NO_AVAILABLE_WORKING_DIR(26),

View File

@ -87,7 +87,7 @@ class WorkerPartitionReader
static constexpr auto kDefaultConsumeIter = std::chrono::milliseconds(500);
// TODO: add other params, such as fetchChunkRetryCnt, fetchChunkMaxRetry
// TODO: add other params, such as fetchChunkRetryCnt, fetchChunkMaxRetries
};
} // namespace client
} // namespace celeborn

View File

@ -162,7 +162,7 @@ private[celeborn] class Master(
logError(msg, ioe)
System.exit(1)
} else {
logError("Face unexpected IO exception during staring Ratis server", ioe)
logError("Face unexpected IO exception during starting Ratis server", ioe)
}
}
sys
@ -174,7 +174,7 @@ private[celeborn] class Master(
// Threads
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-message-forwarder")
private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _
private var checkForWorkerTimeoutTask: ScheduledFuture[_] = _
private var checkForApplicationTimeOutTask: ScheduledFuture[_] = _
private var checkForUnavailableWorkerTimeOutTask: ScheduledFuture[_] = _
private var checkForDFSRemnantDirsTimeOutTask: ScheduledFuture[_] = _
@ -336,7 +336,7 @@ private[celeborn] class Master(
"send-application-meta")
}
checkForWorkerTimeOutTask = scheduleCheckTask(workerHeartbeatTimeoutMs, pbCheckForWorkerTimeout)
checkForWorkerTimeoutTask = scheduleCheckTask(workerHeartbeatTimeoutMs, pbCheckForWorkerTimeout)
checkForApplicationTimeOutTask =
scheduleCheckTask(appHeartbeatTimeoutMs / 2, CheckForApplicationTimeOut)
@ -370,7 +370,7 @@ private[celeborn] class Master(
return
}
logInfo("Stopping Celeborn Master.")
Option(checkForWorkerTimeOutTask).foreach(_.cancel(true))
Option(checkForWorkerTimeoutTask).foreach(_.cancel(true))
Option(checkForUnavailableWorkerTimeOutTask).foreach(_.cancel(true))
Option(checkForApplicationTimeOutTask).foreach(_.cancel(true))
Option(checkForDFSRemnantDirsTimeOutTask).foreach(_.cancel(true))

View File

@ -26,7 +26,7 @@ import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.authentication.HttpAuthSchemes
import org.apache.celeborn.common.network.TestHelper
import org.apache.celeborn.server.common.http.HttpAuthUtils.AUTHORIZATION_HEADER
import org.apache.celeborn.server.common.http.authentication.{UserDefinePasswordAuthenticationProviderImpl, UserDefineTokenAuthenticationProviderImpl}
import org.apache.celeborn.server.common.http.authentication.{UserDefinedPasswordAuthenticationProviderImpl, UserDefineTokenAuthenticationProviderImpl}
abstract class ApiBaseResourceAuthenticationSuite extends HttpTestHelper {
val administers = Seq("celeborn", "celeborn2")
@ -38,14 +38,14 @@ abstract class ApiBaseResourceAuthenticationSuite extends HttpTestHelper {
.set(CelebornConf.MASTER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC", "BEARER"))
.set(
CelebornConf.MASTER_HTTP_AUTH_BASIC_PROVIDER,
classOf[UserDefinePasswordAuthenticationProviderImpl].getName)
classOf[UserDefinedPasswordAuthenticationProviderImpl].getName)
.set(
CelebornConf.MASTER_HTTP_AUTH_BEARER_PROVIDER,
classOf[UserDefineTokenAuthenticationProviderImpl].getName)
.set(CelebornConf.WORKER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC", "BEARER"))
.set(
CelebornConf.WORKER_HTTP_AUTH_BASIC_PROVIDER,
classOf[UserDefinePasswordAuthenticationProviderImpl].getName)
classOf[UserDefinedPasswordAuthenticationProviderImpl].getName)
.set(
CelebornConf.WORKER_HTTP_AUTH_BEARER_PROVIDER,
classOf[UserDefineTokenAuthenticationProviderImpl].getName)
@ -67,7 +67,7 @@ abstract class ApiBaseResourceAuthenticationSuite extends HttpTestHelper {
AUTHORIZATION_HEADER,
basicAuthorizationHeader(
"user",
UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD))
UserDefinedPasswordAuthenticationProviderImpl.VALID_PASSWORD))
.get()
assert(HttpServletResponse.SC_OK == response.getStatus)
@ -126,7 +126,7 @@ abstract class ApiBaseResourceAuthenticationSuite extends HttpTestHelper {
AUTHORIZATION_HEADER,
basicAuthorizationHeader(
"no_admin",
UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD))
UserDefinedPasswordAuthenticationProviderImpl.VALID_PASSWORD))
.post(null)
assert(HttpServletResponse.SC_FORBIDDEN == response.getStatus)
@ -137,7 +137,7 @@ abstract class ApiBaseResourceAuthenticationSuite extends HttpTestHelper {
AUTHORIZATION_HEADER,
basicAuthorizationHeader(
admin,
UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD))
UserDefinedPasswordAuthenticationProviderImpl.VALID_PASSWORD))
.post(null)
// pass the admin privilege check, but the api is not found
assert(HttpServletResponse.SC_NOT_FOUND == response.getStatus)

View File

@ -21,10 +21,10 @@ import java.security.Principal
import javax.security.sasl.AuthenticationException
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.server.common.http.authentication.UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD
import org.apache.celeborn.server.common.http.authentication.UserDefinedPasswordAuthenticationProviderImpl.VALID_PASSWORD
import org.apache.celeborn.spi.authentication.{BasicPrincipal, Credential, PasswdAuthenticationProvider, PasswordCredential}
class UserDefinePasswordAuthenticationProviderImpl
class UserDefinedPasswordAuthenticationProviderImpl
extends PasswdAuthenticationProvider with Logging {
override def authenticate(credential: PasswordCredential): Principal = {
val clientIp = credential.extraInfo.get(Credential.CLIENT_IP_KEY)
@ -37,6 +37,6 @@ class UserDefinePasswordAuthenticationProviderImpl
}
}
object UserDefinePasswordAuthenticationProviderImpl {
object UserDefinedPasswordAuthenticationProviderImpl {
val VALID_PASSWORD = "password"
}

View File

@ -30,7 +30,7 @@ import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Assume that max-managed memory for a MapDataPartition is (2^31 * buffersize)
// Assume that max-managed memory for a MapPartitionData is (2^31 * buffersize)
public class BufferQueue {
public static final Logger logger = LoggerFactory.getLogger(BufferQueue.class);

View File

@ -209,10 +209,10 @@ public class MemoryManager {
() -> {
try {
if (creditStreamManager != null) {
int mapDataPartitionCount = creditStreamManager.getActiveMapPartitionCount();
if (mapDataPartitionCount > 0) {
int mapPartitionDataCount = creditStreamManager.getActiveMapPartitionCount();
if (mapPartitionDataCount > 0) {
long currentTarget =
(long) Math.ceil(readBufferTarget * 1.0 / mapDataPartitionCount);
(long) Math.ceil(readBufferTarget * 1.0 / mapPartitionDataCount);
if (Math.abs(lastNotifiedTarget - currentTarget)
> readBufferTargetNotifyThreshold) {
synchronized (readBufferTargetChangeListeners) {
@ -220,7 +220,7 @@ public class MemoryManager {
"read buffer target changed {} -> {} active map partition count {}",
lastNotifiedTarget,
currentTarget,
mapDataPartitionCount);
mapPartitionDataCount);
for (ReadBufferTargetChangeListener changeListener :
readBufferTargetChangeListeners) {
changeListener.onChange(currentTarget);

View File

@ -200,7 +200,7 @@ public class CreditStreamManager {
logger.warn("Ignore AddCredit from stream {}, numCredit {}.", streamId, numCredit);
return;
}
MapPartitionData mapPartitionData = streams.get(streamId).getMapDataPartition();
MapPartitionData mapPartitionData = streams.get(streamId).getMapPartitionData();
addCredit(mapPartitionData, numCredit, streamId);
}
@ -208,7 +208,7 @@ public class CreditStreamManager {
StreamState streamState = streams.get(streamId);
if (streamState != null) {
notifyRequiredSegment(
streamState.getMapDataPartition(), requiredSegmentId, streamId, subPartitionId);
streamState.getMapPartitionData(), requiredSegmentId, streamId, subPartitionId);
} else {
// In flink hybrid shuffle integration strategy, the stream may release in worker before
// client receive bufferStreamEnd,
@ -279,7 +279,7 @@ public class CreditStreamManager {
public void cleanResource(Long streamId) {
logger.debug("received clean stream: {}", streamId);
if (streams.containsKey(streamId)) {
MapPartitionData mapPartitionData = streams.get(streamId).getMapDataPartition();
MapPartitionData mapPartitionData = streams.get(streamId).getMapPartitionData();
if (mapPartitionData != null) {
if (mapPartitionData.releaseReader(streamId)) {
streams.remove(streamId);
@ -340,7 +340,7 @@ public class CreditStreamManager {
return bufferSize;
}
public MapPartitionData getMapDataPartition() {
public MapPartitionData getMapPartitionData() {
return mapPartitionData;
}
}

View File

@ -260,7 +260,7 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis
@Override
public String toString() {
return "MapDataPartition{" + "fileInfo=" + diskFileInfo.getFilePath() + '}';
return "MapPartitionData{" + "fileInfo=" + diskFileInfo.getFilePath() + '}';
}
public ConcurrentHashMap<Long, MapPartitionDataReader> getReaders() {

View File

@ -57,7 +57,7 @@ public class SegmentMapPartitionData extends MapPartitionData {
@Override
public void setupDataPartitionReader(
int startSubIndex, int endSubIndex, long streamId, Channel channel) {
SegmentMapPartitionDataReader mapDataPartitionReader =
SegmentMapPartitionDataReader mapPartitionDataReader =
new SegmentMapPartitionDataReader(
startSubIndex,
endSubIndex,
@ -70,7 +70,7 @@ public class SegmentMapPartitionData extends MapPartitionData {
startSubIndex,
endSubIndex,
streamId);
readers.put(streamId, mapDataPartitionReader);
readers.put(streamId, mapPartitionDataReader);
}
@Override
@ -85,7 +85,7 @@ public class SegmentMapPartitionData extends MapPartitionData {
@Override
public String toString() {
return String.format("SegmentMapDataPartition{filePath=%s}", diskFileInfo.getFilePath());
return String.format("SegmentMapPartitionData{filePath=%s}", diskFileInfo.getFilePath());
}
public void notifyRequiredSegmentId(int segmentId, long streamId, int subPartitionId) {

View File

@ -211,7 +211,7 @@ public class SegmentMapPartitionDataReader extends MapPartitionDataReader {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("SegmentMapDataPartitionReader{");
final StringBuilder sb = new StringBuilder("SegmentMapPartitionDataReader{");
sb.append("startPartitionIndex=").append(startPartitionIndex);
sb.append(", endPartitionIndex=").append(endPartitionIndex);
sb.append(", streamId=").append(streamId);

View File

@ -426,7 +426,7 @@ private[deploy] class Controller(
logError(s"Shuffle $shuffleKey doesn't exist!")
context.reply(
CommitFilesResponse(
StatusCode.SHUFFLE_NOT_REGISTERED,
StatusCode.SHUFFLE_UNREGISTERED,
List.empty.asJava,
List.empty.asJava,
primaryIds,
@ -681,7 +681,7 @@ private[deploy] class Controller(
logWarning(s"Shuffle $shuffleKey not registered!")
context.reply(
DestroyWorkerSlotsResponse(
StatusCode.SHUFFLE_NOT_REGISTERED,
StatusCode.SHUFFLE_UNREGISTERED,
primaryLocations,
replicaLocations))
return

View File

@ -102,9 +102,9 @@ public class CreditStreamManagerSuiteJ {
streamIdConsumer, channel, shuffleKey, 0, 1, 1, diskFileInfo);
MapPartitionData mapPartitionData1 =
creditStreamManager.getStreams().get(registerStream1).getMapDataPartition();
creditStreamManager.getStreams().get(registerStream1).getMapPartitionData();
MapPartitionData mapPartitionData2 =
creditStreamManager.getStreams().get(registerStream2).getMapDataPartition();
creditStreamManager.getStreams().get(registerStream2).getMapPartitionData();
Assert.assertEquals(mapPartitionData1, mapPartitionData2);
mapPartitionData1.getStreamReader(registerStream1).recycle();