diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java index 19f979992..7318d6bb1 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java @@ -55,7 +55,7 @@ public interface SortBuffer { /** Returns true if there is still data can be consumed in this {@link SortBuffer}. */ boolean hasRemaining(); - /** Finishes this {@link SortBuffer} which means no record can be appended any more. */ + /** Finishes this {@link SortBuffer} which means no record can be appended anymore. */ void finish(); /** Whether this {@link SortBuffer} is finished or not. */ diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java index 0684c72f8..dee8d897a 100644 --- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java +++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java @@ -96,7 +96,7 @@ public class HashBasedShuffleWriter extends ShuffleWriter { /** * Are we in the process of stopping? Because map tasks can call stop() with success = true and * then call stop() with success = false if they get an exception, we want to make sure we don't - * try deleting files, etc twice. + * try deleting files, etc. twice. 
*/ private volatile boolean stopping = false; @@ -393,9 +393,9 @@ public class HashBasedShuffleWriter extends ShuffleWriter { private void closeColumnarWrite() throws IOException { SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer) dep.serializer()); for (int i = 0; i < numPartitions; i++) { - final CelebornBatchBuilder buidlers = celebornBatchBuilders[i]; - if (buidlers != null && buidlers.getRowCnt() > 0) { - byte[] buffers = buidlers.buildColumnBytes(); + final CelebornBatchBuilder builders = celebornBatchBuilders[i]; + if (builders != null && builders.getRowCnt() > 0) { + byte[] buffers = builders.buildColumnBytes(); if (dataSize != null) { dataSize.add(buffers.length); } diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java index 3766c623d..98c694306 100644 --- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java +++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java @@ -85,7 +85,7 @@ public class SortBasedShuffleWriter extends ShuffleWriter { /** * Are we in the process of stopping? Because map tasks can call stop() with success = true and * then call stop() with success = false if they get an exception, we want to make sure we don't - * try deleting files, etc twice. + * try deleting files, etc. twice. 
*/ private volatile boolean stopping = false; diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index dada674cc..00e95898e 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -605,7 +605,7 @@ public class ShuffleClientImpl extends ShuffleClient { * @param shuffleMap * @param partitionId * @param epoch - * @param wait wheter to wait for some time for a newer PartitionLocation + * @param wait whether to wait for some time for a newer PartitionLocation * @return */ boolean newerPartitionLocationExists( diff --git a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala index 316e9ba70..f211f9ba6 100644 --- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala @@ -54,7 +54,7 @@ case class ShuffleCommittedInfo( committedPrimaryStorageInfos: ConcurrentHashMap[String, StorageInfo], // unique partition id -> storage info committedReplicaStorageInfos: ConcurrentHashMap[String, StorageInfo], - // unique partition id -> mapid bitmat + // unique partition id -> mapId bitmap committedMapIdBitmap: ConcurrentHashMap[String, RoaringBitmap], // number of partition files currentShuffleFileCount: LongAdder, diff --git a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java index 067d1ddcc..8ad3f267a 100644 --- a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java +++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java @@ -139,7 +139,7 @@ public class MasterClient { LOG.debug("Send rpc message " + message); RpcEndpointRef endpointRef = null; // Use 
AtomicInteger or Integer or any Object which holds an int value is ok, we just need to - // transfer a object to get the change of the current index of master addresses. + // transfer an object to get the change of the current index of master addresses. AtomicInteger currentMasterIdx = new AtomicInteger(0); long sleepLimitTime = 2000; // 2s diff --git a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java index c10b4e65b..761304377 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java +++ b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java @@ -40,9 +40,9 @@ import org.apache.celeborn.common.network.util.TransportFrameDecoder; * setup Netty Channel pipelines with a {@link TransportChannelHandler}. * *

There are two communication protocols that the TransportClient provides, control-plane RPCs - * and data-plane "chunk fetching". The handling of the RPCs is performed outside of the scope of - * the TransportContext (i.e., by a user-provided handler), and it is responsible for setting up - * streams which can be streamed through the data plane in chunks using zero-copy IO. + * and data-plane "chunk fetching". The handling of the RPCs is performed outside the scope of the + * TransportContext (i.e., by a user-provided handler), and it is responsible for setting up streams + * which can be streamed through the data plane in chunks using zero-copy IO. * *

The TransportServer and TransportClientFactory both create a TransportChannelHandler for each * channel. As each TransportChannelHandler contains a TransportClient, this enables server diff --git a/common/src/main/java/org/apache/celeborn/common/network/buffer/ManagedBuffer.java b/common/src/main/java/org/apache/celeborn/common/network/buffer/ManagedBuffer.java index 64d5d1a81..ce320d9d7 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/buffer/ManagedBuffer.java +++ b/common/src/main/java/org/apache/celeborn/common/network/buffer/ManagedBuffer.java @@ -64,7 +64,7 @@ public abstract class ManagedBuffer { public abstract ManagedBuffer release(); /** - * Convert the buffer into an Netty object, used to write the data out. The return value is either + * Convert the buffer into a Netty object, used to write the data out. The return value is either * a {@link io.netty.buffer.ByteBuf} or a {@link io.netty.channel.FileRegion}. * *

If this method returns a ByteBuf, then that buffer's reference count will be incremented and diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java index bd01560c0..e4b12fb96 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java +++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java @@ -120,7 +120,7 @@ public class TransportClientFactory implements Closeable { throws IOException, InterruptedException { // Get connection from the connection pool first. // If it is not found or not active, create a new one. - // Use unresolved address here to avoid DNS resolution each time we creates a client. + // Use unresolved address here to avoid DNS resolution each time we create a client. final InetSocketAddress unresolvedAddress = InetSocketAddress.createUnresolved(remoteHost, remotePort); diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchSuccess.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchSuccess.java index 21a3581f8..baa663ce6 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchSuccess.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchSuccess.java @@ -26,8 +26,8 @@ import org.apache.celeborn.common.network.buffer.NettyManagedBuffer; /** * Response to {@link ChunkFetchRequest} when a chunk exists and has been successfully fetched. * - *

Note that the server-side encoding of this messages does NOT include the buffer itself, as - * this may be written by Netty in a more efficient manner (i.e., zero-copy write). Similarly, the + *

Note that the server-side encoding of this message does NOT include the buffer itself, as this + * may be written by Netty in a more efficient manner (i.e., zero-copy write). Similarly, the * client-side decoding will reuse the Netty ByteBuf as the buffer. */ public final class ChunkFetchSuccess extends ResponseMessage { diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/MessageWithHeader.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/MessageWithHeader.java index 3476d2714..21f11d49f 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/MessageWithHeader.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/MessageWithHeader.java @@ -138,7 +138,7 @@ class MessageWithHeader extends AbstractFileRegion { // SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance // for the case that the passed-in buffer has too many components. int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT); - // If the ByteBuf holds more then one ByteBuffer we should better call nioBuffers(...) + // If the ByteBuf holds more than one ByteBuffer it is better to call nioBuffers(...) // to eliminate extra memory copies. 
int written = 0; if (buf.nioBufferCount() == 1) { diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java b/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java index b7c119afd..f0eae1a7e 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java +++ b/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java @@ -175,7 +175,7 @@ public class TransportChannelHandler extends ChannelInboundHandlerAdapter { requestTimeoutNs / 1000 / 1000); } if (closeIdleConnections) { - // While CloseIdleConnections is enable, we also close idle connection + // While CloseIdleConnections is enabled, we also close idle connections client.timeOut(); ctx.close(); } diff --git a/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java b/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java index 0afb63906..be535cafe 100644 --- a/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java +++ b/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java @@ -296,7 +296,7 @@ public final class ShutdownHookManager { * Indicates if a shutdownHook is registered or not. * * @param shutdownHook shutdownHook to check if registered. - * @return TRUE/FALSE depending if the shutdownHook is is registered. + * @return TRUE/FALSE depending on whether the shutdownHook is registered. 
*/ public boolean hasShutdownHook(Runnable shutdownHook) { return hooks.contains(new HookEntry(shutdownHook, 0)); diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 9aa927138..693250515 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -600,7 +600,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se get(HA_MASTER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT) def haMasterRatisLogPurgeGap: Int = get(HA_MASTER_RATIS_LOG_PURGE_GAP) def haMasterRatisLogInstallSnapshotEnabled: Boolean = - get(HA_MASTER_RATIS_LOG_INSTABLL_SNAPSHOT_ENABLED) + get(HA_MASTER_RATIS_LOG_INSTALL_SNAPSHOT_ENABLED) def haMasterRatisRpcRequestTimeout: Long = get(HA_MASTER_RATIS_RPC_REQUEST_TIMEOUT) def haMasterRatisRetryCacheExpiryTime: Long = get(HA_MASTER_RATIS_SERVER_RETRY_CACHE_EXPIRY_TIME) def haMasterRatisRpcTimeoutMin: Long = get(HA_MASTER_RATIS_RPC_TIMEOUT_MIN) @@ -1672,7 +1672,7 @@ object CelebornConf extends Logging { .bytesConf(ByteUnit.BYTE) .createWithDefaultString("32MB") - val HA_MASTER_RATIS_LOG_INSTABLL_SNAPSHOT_ENABLED: ConfigEntry[Boolean] = + val HA_MASTER_RATIS_LOG_INSTALL_SNAPSHOT_ENABLED: ConfigEntry[Boolean] = buildConf("celeborn.master.ha.ratis.raft.server.log.appender.install.snapshot.enabled") .withAlternative("celeborn.ha.master.ratis.raft.server.log.appender.install.snapshot.enabled") .internal @@ -3191,7 +3191,7 @@ object CelebornConf extends Logging { .withAlternative("celeborn.push.stageEnd.timeout") .categories("client") .doc(s"Timeout for waiting StageEnd. 
" + - s"During this process, there are `${CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY.key}` times for retry opportunities for committing files" + + s"During this process, there are `${CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY.key}` times for retry opportunities for committing files " + s"and 1 times for releasing slots request. User can customize this value according to your setting. " + s"By default, the value is the max timeout value `${NETWORK_IO_CONNECTION_TIMEOUT.key}`.") .version("0.3.0") @@ -3230,7 +3230,7 @@ object CelebornConf extends Logging { buildConf("celeborn.client.rpc.requestPartition.askTimeout") .categories("client") .version("0.2.0") - .doc(s"Timeout for ask operations during requesting change partition location, such as reviving or spliting partition. " + + .doc(s"Timeout for ask operations during requesting change partition location, such as reviving or splitting partition. " + s"During this process, there are `${CLIENT_RESERVE_SLOTS_MAX_RETRIES.key}` times for retry opportunities for reserving slots. " + s"User can customize this value according to your setting. " + s"By default, the value is the max timeout value `${NETWORK_IO_CONNECTION_TIMEOUT.key}`.") @@ -3241,7 +3241,7 @@ object CelebornConf extends Logging { .categories("client") .version("0.2.0") .doc(s"Timeout for ask operations during getting reducer file group information. " + - s"During this process, there are `${CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY.key}` times for retry opportunities for committing files" + + s"During this process, there are `${CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY.key}` times for retry opportunities for committing files " + s"and 1 times for releasing slots request. User can customize this value according to your setting. " + s"By default, the value is the max timeout value `${NETWORK_IO_CONNECTION_TIMEOUT.key}`.") .fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT) @@ -3502,7 +3502,7 @@ object CelebornConf extends Logging { s"`${classOf[DefaultIdentityProvider].getName}`. 
" + s"Optional values: " + s"org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will be obtained by UserGroupInformation.getUserName; " + - s"org.apache.celeborn.common.identity.DefaultIdentityProvider uesr name and tenant id are default values or user-specific values.") + s"org.apache.celeborn.common.identity.DefaultIdentityProvider user name and tenant id are default values or user-specific values.") .version("0.2.0") .stringConf .createWithDefault(classOf[DefaultIdentityProvider].getName) diff --git a/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java b/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java index ca87af68e..927b3e9c1 100644 --- a/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java +++ b/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java @@ -51,7 +51,7 @@ public class PartitionLocationSuiteJ { for (int i = 2; i < 255; ++i) { byte otherMode = (byte) i; - // Should we return replica mode when the parameter passed in is neither 0 or 1? + // Should we return replica mode when the parameter passed in is neither 0 nor 1? assert PartitionLocation.getMode(otherMode) == PartitionLocation.Mode.REPLICA; } } diff --git a/docs/celeborn_ratis_shell.md b/docs/celeborn_ratis_shell.md index 841560849..b7086bb90 100644 --- a/docs/celeborn_ratis_shell.md +++ b/docs/celeborn_ratis_shell.md @@ -35,8 +35,8 @@ then it's convenient for Celeborn Admin to operate the master ratis service. Celeborn directly introduces the ratis-shell into the project, users don't need to set up ratis-shell env from ratis repo. 
User can directly download the Celeborn source tarball from [Download](https://celeborn.apache.org/download) and -build the Celeborn accoriding to [build_and_test](https://celeborn.apache.org/community/contributor_guide/build_and_test/) -or just down load the pre-built binary tarball from [Download](https://celeborn.apache.org/download) +build the Celeborn according to [build_and_test](https://celeborn.apache.org/community/contributor_guide/build_and_test/) +or just download the pre-built binary tarball from [Download](https://celeborn.apache.org/download) to get the binary package `apache-celeborn--bin.tgz`. After getting the binary package `apache-celeborn--bin.tgz`: @@ -64,7 +64,7 @@ Usage: celeborn-ratis sh [generic options] ## generic options The `generic options` pass values for a given ratis-shell property. -It support the following content: +It supports the following content: `-D*`, `-X*`, `-agentlib*`, `-javaagent*` ``` diff --git a/docs/configuration/client.md b/docs/configuration/client.md index 0fac3020a..91a146f01 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -55,7 +55,7 @@ license: | | celeborn.client.push.slowStart.maxSleepTime | 2s | If celeborn.client.push.limit.strategy is set to SLOWSTART, push side will take a sleep strategy for each batch of requests, this controls the max sleep time if the max in flight requests limit is 1 for a long time | 0.3.0 | | celeborn.client.push.sort.randomizePartitionId.enabled | false | Whether to randomize partitionId in push sorter. If true, partitionId will be randomized when sort data to avoid skew when push to worker | 0.3.0 | | celeborn.client.push.splitPartition.threads | 8 | Thread number to process shuffle split request in shuffle client. | 0.3.0 | -| celeborn.client.push.stageEnd.timeout | <value of celeborn.<module>.io.connectionTimeout> | Timeout for waiting StageEnd. 
During this process, there are `celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities for committing filesand 1 times for releasing slots request. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn..io.connectionTimeout`. | 0.3.0 | +| celeborn.client.push.stageEnd.timeout | <value of celeborn.<module>.io.connectionTimeout> | Timeout for waiting StageEnd. During this process, there are `celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities for committing files and 1 times for releasing slots request. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn..io.connectionTimeout`. | 0.3.0 | | celeborn.client.push.takeTaskMaxWaitAttempts | 1 | Max wait times if no task available to push to worker. | 0.3.0 | | celeborn.client.push.takeTaskWaitInterval | 50ms | Wait interval if no task available to push to worker. | 0.3.0 | | celeborn.client.push.timeout | 120s | Timeout for a task to push data rpc message. This value should better be more than twice of `celeborn..push.timeoutCheck.interval` | 0.3.0 | @@ -68,10 +68,10 @@ license: | | celeborn.client.rpc.cache.concurrencyLevel | 32 | The number of write locks to update rpc cache. | 0.3.0 | | celeborn.client.rpc.cache.expireTime | 15s | The time before a cache item is removed. | 0.3.0 | | celeborn.client.rpc.cache.size | 256 | The max cache items count for rpc cache. | 0.3.0 | -| celeborn.client.rpc.getReducerFileGroup.askTimeout | <value of celeborn.<module>.io.connectionTimeout> | Timeout for ask operations during getting reducer file group information. During this process, there are `celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities for committing filesand 1 times for releasing slots request. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn..io.connectionTimeout`. 
| 0.2.0 | +| celeborn.client.rpc.getReducerFileGroup.askTimeout | <value of celeborn.<module>.io.connectionTimeout> | Timeout for ask operations during getting reducer file group information. During this process, there are `celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities for committing files and 1 times for releasing slots request. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn..io.connectionTimeout`. | 0.2.0 | | celeborn.client.rpc.maxParallelism | 1024 | Max parallelism of client on sending RPC requests. | 0.3.0 | | celeborn.client.rpc.registerShuffle.askTimeout | <value of celeborn.<module>.io.connectionTimeout> | Timeout for ask operations during register shuffle. During this process, there are two times for retry opportunities for requesting slots, one request for establishing a connection with Worker and `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn..io.connectionTimeout`. | 0.3.0 | -| celeborn.client.rpc.requestPartition.askTimeout | <value of celeborn.<module>.io.connectionTimeout> | Timeout for ask operations during requesting change partition location, such as reviving or spliting partition. During this process, there are `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn..io.connectionTimeout`. | 0.2.0 | +| celeborn.client.rpc.requestPartition.askTimeout | <value of celeborn.<module>.io.connectionTimeout> | Timeout for ask operations during requesting change partition location, such as reviving or splitting partition. During this process, there are `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. 
User can customize this value according to your setting. By default, the value is the max timeout value `celeborn..io.connectionTimeout`. | 0.2.0 | | celeborn.client.rpc.reserveSlots.askTimeout | <value of celeborn.rpc.askTimeout> | Timeout for LifecycleManager request reserve slots. | 0.3.0 | | celeborn.client.shuffle.batchHandleChangePartition.interval | 100ms | Interval for LifecycleManager to schedule handling change partition requests in batch. | 0.3.0 | | celeborn.client.shuffle.batchHandleChangePartition.threads | 8 | Threads number for LifecycleManager to handle change partition request in batch. | 0.3.0 | diff --git a/docs/configuration/index.md b/docs/configuration/index.md index d1a64cadc..2ee39c40a 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -36,7 +36,7 @@ bufferSize = `celeborn.worker.flusher.buffer.size` # the amount of memory will off-heap-memory = bufferSize * estimatedTasks * 2 + network memory ``` -For example, if an Celeborn worker has 10 storage directories or disks and the buffer size is set to 256 KiB. +For example, if a Celeborn worker has 10 storage directories or disks and the buffer size is set to 256 KiB. The necessary off-heap memory is 10 GiB. Network memory will be consumed when netty reads from a TPC channel, there will need some extra diff --git a/docs/configuration/quota.md b/docs/configuration/quota.md index 9892db75c..5537de934 100644 --- a/docs/configuration/quota.md +++ b/docs/configuration/quota.md @@ -21,7 +21,7 @@ license: | | --- | ------- | ----------- | ----- | | celeborn.quota.configuration.path | <undefined> | Quota configuration file path. The file format should be yaml. Quota configuration file template can be found under conf directory. 
| 0.2.0 | | celeborn.quota.enabled | true | When true, before registering shuffle, LifecycleManager should check if current user have enough quota space, if cluster don't have enough quota space for current user, fallback to Spark's default shuffle | 0.2.0 | -| celeborn.quota.identity.provider | org.apache.celeborn.common.identity.DefaultIdentityProvider | IdentityProvider class name. Default class is `org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values: org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will be obtained by UserGroupInformation.getUserName; org.apache.celeborn.common.identity.DefaultIdentityProvider uesr name and tenant id are default values or user-specific values. | 0.2.0 | +| celeborn.quota.identity.provider | org.apache.celeborn.common.identity.DefaultIdentityProvider | IdentityProvider class name. Default class is `org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values: org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will be obtained by UserGroupInformation.getUserName; org.apache.celeborn.common.identity.DefaultIdentityProvider user name and tenant id are default values or user-specific values. | 0.2.0 | | celeborn.quota.identity.user-specific.tenant | default | Tenant id if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | | celeborn.quota.identity.user-specific.userName | default | User name if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | | celeborn.quota.manager | org.apache.celeborn.common.quota.DefaultQuotaManager | QuotaManger class name. Default class is `org.apache.celeborn.common.quota.DefaultQuotaManager`. | 0.2.0 | diff --git a/docs/monitoring.md b/docs/monitoring.md index 92db7c922..b2ff46162 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -115,7 +115,7 @@ These metrics are exposed by Celeborn master. 
- namespace=ResourceConsumption - **notes:** - - This merics data is generated for each user and they are identified using a metric tag. + - This metrics data is generated for each user and they are identified using a metric tag. - diskFileCount - diskBytesWritten - hdfsFileCount diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java index f20cfff16..73149bc01 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java @@ -94,7 +94,7 @@ public class HARaftServer { private long appTimeoutDeadline; /** - * Returns an Master Ratis server. + * Returns a Master Ratis server. * * @param conf configuration * @param localRaftPeerId raft peer id of this Ratis server diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java index 7a4a2f1ab..5b3582f61 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java @@ -328,7 +328,7 @@ public class StateMachine extends BaseStateMachine { } /** - * Store the current state as an snapshot file in the stateMachineStorage. + * Store the current state as a snapshot file in the stateMachineStorage. 
* * @return the index of the snapshot */ diff --git a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala index 3549e72d5..625debb4c 100644 --- a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala +++ b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala @@ -29,7 +29,7 @@ import org.apache.celeborn.service.deploy.worker.memory.MemoryManager; class HeartbeatTest extends AnyFunSuite with Logging with MiniClusterFeature with HeartbeatFeature with BeforeAndAfterAll with BeforeAndAfterEach { - test("celeborn flink hearbeat test - client <- worker") { + test("celeborn flink heartbeat test - client <- worker") { val (_, clientConf) = getTestHeartbeatFromWorker2ClientConf val flinkShuffleClientImpl = new FlinkShuffleClientImpl( @@ -44,7 +44,7 @@ class HeartbeatTest extends AnyFunSuite with Logging with MiniClusterFeature wit testHeartbeatFromWorker2Client(flinkShuffleClientImpl.getDataClientFactory) } - test("celeborn flink hearbeat test - client <- worker no heartbeat") { + test("celeborn flink heartbeat test - client <- worker no heartbeat") { val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf val flinkShuffleClientImpl = new FlinkShuffleClientImpl( @@ -59,7 +59,7 @@ class HeartbeatTest extends AnyFunSuite with Logging with MiniClusterFeature wit testHeartbeatFromWorker2ClientWithNoHeartbeat(flinkShuffleClientImpl.getDataClientFactory) } - test("celeborn flink hearbeat test - client <- worker timeout") { + test("celeborn flink heartbeat test - client <- worker timeout") { val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithCloseChannelConf val flinkShuffleClientImpl = new FlinkShuffleClientImpl( diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java index abfd2cb39..d9fbdb5a3 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java @@ -254,7 +254,7 @@ public class MapDataPartitionReader implements Comparable buffer.capacity()) { diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index fe125cd41..bb54a2f8c 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -175,9 +175,9 @@ class PushDataHandler extends BaseMessageHandler with Logging { if (location == null) { val (mapId, attemptId) = getMapAttempt(body) // MapperAttempts for a shuffle exists after any CommitFiles request succeeds. - // A shuffle can trigger multiple CommitFiles requests, for reasons like: Hard-Split happens, StageEnd. + // A shuffle can trigger multiple CommitFiles requests, for reasons like: HARD_SPLIT happens, StageEnd. // If MapperAttempts but the value is -1 for the mapId(-1 means the map has not yet finished), - // it's probably because commitFiles for Had-Split happens. + // it's probably because commitFiles for HARD_SPLIT happens. if (shuffleMapperAttempts.containsKey(shuffleKey)) { if (-1 != shuffleMapperAttempts.get(shuffleKey).get(mapId)) { // partition data has already been committed @@ -195,7 +195,7 @@ class PushDataHandler extends BaseMessageHandler with Logging { if (storageManager.shuffleKeySet().contains(shuffleKey)) { // If there is no shuffle key in shuffleMapperAttempts but there is shuffle key // in StorageManager. 
This partition should be HARD_SPLIT partition and - // after worker restart, some task still push data to this HARD_SPLIT partition. + // after worker restart, some tasks still push data to this HARD_SPLIT partition. logInfo(s"[Case2] Receive push data for committed hard split partition of " + s"(shuffle $shuffleKey, map $mapId attempt $attemptId)") callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue))) @@ -437,9 +437,9 @@ class PushDataHandler extends BaseMessageHandler with Logging { if (loc == null) { val (mapId, attemptId) = getMapAttempt(body) // MapperAttempts for a shuffle exists after any CommitFiles request succeeds. - // A shuffle can trigger multiple CommitFiles requests, for reasons like: Hard-Split happens, StageEnd. + // A shuffle can trigger multiple CommitFiles requests, for reasons like: HARD_SPLIT happens, StageEnd. // If MapperAttempts but the value is -1 for the mapId(-1 means the map has not yet finished), - // it's probably because commitFiles for Had-Split happens. + // it's probably because commitFiles for HARD_SPLIT happens. if (shuffleMapperAttempts.containsKey(shuffleKey)) { if (-1 != shuffleMapperAttempts.get(shuffleKey).get(mapId)) { logInfo(s"Receive push merged data from speculative " + @@ -457,7 +457,7 @@ class PushDataHandler extends BaseMessageHandler with Logging { if (storageManager.shuffleKeySet().contains(shuffleKey)) { // If there is no shuffle key in shuffleMapperAttempts but there is shuffle key // in StorageManager. This partition should be HARD_SPLIT partition and - // after worker restart, some task still push data to this HARD_SPLIT partition. + // after worker restart, some tasks still push data to this HARD_SPLIT partition. logInfo(s"Receive push merged data for committed hard split partition of " + s"(shuffle $shuffleKey, map $mapId attempt $attemptId)") callbackWithTimer.onSuccess(