[CELEBORN-810] Fix some typos and grammar
### What changes were proposed in this pull request? Fix some typos and grammar ### Why are the changes needed? Ditto ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? manually test Closes #1733 from onebox-li/fix-typo. Authored-by: onebox-li <lyh-36@163.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
This commit is contained in:
parent
c8ad39d9bd
commit
405b2801fa
@ -55,7 +55,7 @@ public interface SortBuffer {
|
||||
/** Returns true if there is still data can be consumed in this {@link SortBuffer}. */
|
||||
boolean hasRemaining();
|
||||
|
||||
/** Finishes this {@link SortBuffer} which means no record can be appended any more. */
|
||||
/** Finishes this {@link SortBuffer} which means no record can be appended anymore. */
|
||||
void finish();
|
||||
|
||||
/** Whether this {@link SortBuffer} is finished or not. */
|
||||
|
||||
@ -96,7 +96,7 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
|
||||
/**
|
||||
* Are we in the process of stopping? Because map tasks can call stop() with success = true and
|
||||
* then call stop() with success = false if they get an exception, we want to make sure we don't
|
||||
* try deleting files, etc twice.
|
||||
* try deleting files, etc. twice.
|
||||
*/
|
||||
private volatile boolean stopping = false;
|
||||
|
||||
@ -393,9 +393,9 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
|
||||
private void closeColumnarWrite() throws IOException {
|
||||
SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer) dep.serializer());
|
||||
for (int i = 0; i < numPartitions; i++) {
|
||||
final CelebornBatchBuilder buidlers = celebornBatchBuilders[i];
|
||||
if (buidlers != null && buidlers.getRowCnt() > 0) {
|
||||
byte[] buffers = buidlers.buildColumnBytes();
|
||||
final CelebornBatchBuilder builders = celebornBatchBuilders[i];
|
||||
if (builders != null && builders.getRowCnt() > 0) {
|
||||
byte[] buffers = builders.buildColumnBytes();
|
||||
if (dataSize != null) {
|
||||
dataSize.add(buffers.length);
|
||||
}
|
||||
|
||||
@ -85,7 +85,7 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
|
||||
/**
|
||||
* Are we in the process of stopping? Because map tasks can call stop() with success = true and
|
||||
* then call stop() with success = false if they get an exception, we want to make sure we don't
|
||||
* try deleting files, etc twice.
|
||||
* try deleting files, etc. twice.
|
||||
*/
|
||||
private volatile boolean stopping = false;
|
||||
|
||||
|
||||
@ -605,7 +605,7 @@ public class ShuffleClientImpl extends ShuffleClient {
|
||||
* @param shuffleMap
|
||||
* @param partitionId
|
||||
* @param epoch
|
||||
* @param wait wheter to wait for some time for a newer PartitionLocation
|
||||
* @param wait whether to wait for some time for a newer PartitionLocation
|
||||
* @return
|
||||
*/
|
||||
boolean newerPartitionLocationExists(
|
||||
|
||||
@ -54,7 +54,7 @@ case class ShuffleCommittedInfo(
|
||||
committedPrimaryStorageInfos: ConcurrentHashMap[String, StorageInfo],
|
||||
// unique partition id -> storage info
|
||||
committedReplicaStorageInfos: ConcurrentHashMap[String, StorageInfo],
|
||||
// unique partition id -> mapid bitmat
|
||||
// unique partition id -> mapId bitmap
|
||||
committedMapIdBitmap: ConcurrentHashMap[String, RoaringBitmap],
|
||||
// number of partition files
|
||||
currentShuffleFileCount: LongAdder,
|
||||
|
||||
@ -139,7 +139,7 @@ public class MasterClient {
|
||||
LOG.debug("Send rpc message " + message);
|
||||
RpcEndpointRef endpointRef = null;
|
||||
// Use AtomicInteger or Integer or any Object which holds an int value is ok, we just need to
|
||||
// transfer a object to get the change of the current index of master addresses.
|
||||
// transfer an object to get the change of the current index of master addresses.
|
||||
AtomicInteger currentMasterIdx = new AtomicInteger(0);
|
||||
|
||||
long sleepLimitTime = 2000; // 2s
|
||||
|
||||
@ -40,9 +40,9 @@ import org.apache.celeborn.common.network.util.TransportFrameDecoder;
|
||||
* setup Netty Channel pipelines with a {@link TransportChannelHandler}.
|
||||
*
|
||||
* <p>There are two communication protocols that the TransportClient provides, control-plane RPCs
|
||||
* and data-plane "chunk fetching". The handling of the RPCs is performed outside of the scope of
|
||||
* the TransportContext (i.e., by a user-provided handler), and it is responsible for setting up
|
||||
* streams which can be streamed through the data plane in chunks using zero-copy IO.
|
||||
* and data-plane "chunk fetching". The handling of the RPCs is performed outside the scope of the
|
||||
* TransportContext (i.e., by a user-provided handler), and it is responsible for setting up streams
|
||||
* which can be streamed through the data plane in chunks using zero-copy IO.
|
||||
*
|
||||
* <p>The TransportServer and TransportClientFactory both create a TransportChannelHandler for each
|
||||
* channel. As each TransportChannelHandler contains a TransportClient, this enables server
|
||||
|
||||
@ -64,7 +64,7 @@ public abstract class ManagedBuffer {
|
||||
public abstract ManagedBuffer release();
|
||||
|
||||
/**
|
||||
* Convert the buffer into an Netty object, used to write the data out. The return value is either
|
||||
* Convert the buffer into a Netty object, used to write the data out. The return value is either
|
||||
* a {@link io.netty.buffer.ByteBuf} or a {@link io.netty.channel.FileRegion}.
|
||||
*
|
||||
* <p>If this method returns a ByteBuf, then that buffer's reference count will be incremented and
|
||||
|
||||
@ -120,7 +120,7 @@ public class TransportClientFactory implements Closeable {
|
||||
throws IOException, InterruptedException {
|
||||
// Get connection from the connection pool first.
|
||||
// If it is not found or not active, create a new one.
|
||||
// Use unresolved address here to avoid DNS resolution each time we creates a client.
|
||||
// Use unresolved address here to avoid DNS resolution each time we create a client.
|
||||
final InetSocketAddress unresolvedAddress =
|
||||
InetSocketAddress.createUnresolved(remoteHost, remotePort);
|
||||
|
||||
|
||||
@ -26,8 +26,8 @@ import org.apache.celeborn.common.network.buffer.NettyManagedBuffer;
|
||||
/**
|
||||
* Response to {@link ChunkFetchRequest} when a chunk exists and has been successfully fetched.
|
||||
*
|
||||
* <p>Note that the server-side encoding of this messages does NOT include the buffer itself, as
|
||||
* this may be written by Netty in a more efficient manner (i.e., zero-copy write). Similarly, the
|
||||
* <p>Note that the server-side encoding of this message does NOT include the buffer itself, as this
|
||||
* may be written by Netty in a more efficient manner (i.e., zero-copy write). Similarly, the
|
||||
* client-side decoding will reuse the Netty ByteBuf as the buffer.
|
||||
*/
|
||||
public final class ChunkFetchSuccess extends ResponseMessage {
|
||||
|
||||
@ -138,7 +138,7 @@ class MessageWithHeader extends AbstractFileRegion {
|
||||
// SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance
|
||||
// for the case that the passed-in buffer has too many components.
|
||||
int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
|
||||
// If the ByteBuf holds more then one ByteBuffer we should better call nioBuffers(...)
|
||||
// If the ByteBuf holds more than one ByteBuffer we should better call nioBuffers(...)
|
||||
// to eliminate extra memory copies.
|
||||
int written = 0;
|
||||
if (buf.nioBufferCount() == 1) {
|
||||
|
||||
@ -175,7 +175,7 @@ public class TransportChannelHandler extends ChannelInboundHandlerAdapter {
|
||||
requestTimeoutNs / 1000 / 1000);
|
||||
}
|
||||
if (closeIdleConnections) {
|
||||
// While CloseIdleConnections is enable, we also close idle connection
|
||||
// While CloseIdleConnections is enabled, we also close idle connection
|
||||
client.timeOut();
|
||||
ctx.close();
|
||||
}
|
||||
|
||||
@ -296,7 +296,7 @@ public final class ShutdownHookManager {
|
||||
* Indicates if a shutdownHook is registered or not.
|
||||
*
|
||||
* @param shutdownHook shutdownHook to check if registered.
|
||||
* @return TRUE/FALSE depending if the shutdownHook is is registered.
|
||||
* @return TRUE/FALSE depending if the shutdownHook is registered.
|
||||
*/
|
||||
public boolean hasShutdownHook(Runnable shutdownHook) {
|
||||
return hooks.contains(new HookEntry(shutdownHook, 0));
|
||||
|
||||
@ -600,7 +600,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
|
||||
get(HA_MASTER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT)
|
||||
def haMasterRatisLogPurgeGap: Int = get(HA_MASTER_RATIS_LOG_PURGE_GAP)
|
||||
def haMasterRatisLogInstallSnapshotEnabled: Boolean =
|
||||
get(HA_MASTER_RATIS_LOG_INSTABLL_SNAPSHOT_ENABLED)
|
||||
get(HA_MASTER_RATIS_LOG_INSTALL_SNAPSHOT_ENABLED)
|
||||
def haMasterRatisRpcRequestTimeout: Long = get(HA_MASTER_RATIS_RPC_REQUEST_TIMEOUT)
|
||||
def haMasterRatisRetryCacheExpiryTime: Long = get(HA_MASTER_RATIS_SERVER_RETRY_CACHE_EXPIRY_TIME)
|
||||
def haMasterRatisRpcTimeoutMin: Long = get(HA_MASTER_RATIS_RPC_TIMEOUT_MIN)
|
||||
@ -1672,7 +1672,7 @@ object CelebornConf extends Logging {
|
||||
.bytesConf(ByteUnit.BYTE)
|
||||
.createWithDefaultString("32MB")
|
||||
|
||||
val HA_MASTER_RATIS_LOG_INSTABLL_SNAPSHOT_ENABLED: ConfigEntry[Boolean] =
|
||||
val HA_MASTER_RATIS_LOG_INSTALL_SNAPSHOT_ENABLED: ConfigEntry[Boolean] =
|
||||
buildConf("celeborn.master.ha.ratis.raft.server.log.appender.install.snapshot.enabled")
|
||||
.withAlternative("celeborn.ha.master.ratis.raft.server.log.appender.install.snapshot.enabled")
|
||||
.internal
|
||||
@ -3191,7 +3191,7 @@ object CelebornConf extends Logging {
|
||||
.withAlternative("celeborn.push.stageEnd.timeout")
|
||||
.categories("client")
|
||||
.doc(s"Timeout for waiting StageEnd. " +
|
||||
s"During this process, there are `${CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY.key}` times for retry opportunities for committing files" +
|
||||
s"During this process, there are `${CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY.key}` times for retry opportunities for committing files " +
|
||||
s"and 1 times for releasing slots request. User can customize this value according to your setting. " +
|
||||
s"By default, the value is the max timeout value `${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
|
||||
.version("0.3.0")
|
||||
@ -3230,7 +3230,7 @@ object CelebornConf extends Logging {
|
||||
buildConf("celeborn.client.rpc.requestPartition.askTimeout")
|
||||
.categories("client")
|
||||
.version("0.2.0")
|
||||
.doc(s"Timeout for ask operations during requesting change partition location, such as reviving or spliting partition. " +
|
||||
.doc(s"Timeout for ask operations during requesting change partition location, such as reviving or splitting partition. " +
|
||||
s"During this process, there are `${CLIENT_RESERVE_SLOTS_MAX_RETRIES.key}` times for retry opportunities for reserving slots. " +
|
||||
s"User can customize this value according to your setting. " +
|
||||
s"By default, the value is the max timeout value `${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
|
||||
@ -3241,7 +3241,7 @@ object CelebornConf extends Logging {
|
||||
.categories("client")
|
||||
.version("0.2.0")
|
||||
.doc(s"Timeout for ask operations during getting reducer file group information. " +
|
||||
s"During this process, there are `${CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY.key}` times for retry opportunities for committing files" +
|
||||
s"During this process, there are `${CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY.key}` times for retry opportunities for committing files " +
|
||||
s"and 1 times for releasing slots request. User can customize this value according to your setting. " +
|
||||
s"By default, the value is the max timeout value `${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
|
||||
.fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT)
|
||||
@ -3502,7 +3502,7 @@ object CelebornConf extends Logging {
|
||||
s"`${classOf[DefaultIdentityProvider].getName}`. " +
|
||||
s"Optional values: " +
|
||||
s"org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will be obtained by UserGroupInformation.getUserName; " +
|
||||
s"org.apache.celeborn.common.identity.DefaultIdentityProvider uesr name and tenant id are default values or user-specific values.")
|
||||
s"org.apache.celeborn.common.identity.DefaultIdentityProvider user name and tenant id are default values or user-specific values.")
|
||||
.version("0.2.0")
|
||||
.stringConf
|
||||
.createWithDefault(classOf[DefaultIdentityProvider].getName)
|
||||
|
||||
@ -51,7 +51,7 @@ public class PartitionLocationSuiteJ {
|
||||
|
||||
for (int i = 2; i < 255; ++i) {
|
||||
byte otherMode = (byte) i;
|
||||
// Should we return replica mode when the parameter passed in is neither 0 or 1?
|
||||
// Should we return replica mode when the parameter passed in is neither 0 nor 1?
|
||||
assert PartitionLocation.getMode(otherMode) == PartitionLocation.Mode.REPLICA;
|
||||
}
|
||||
}
|
||||
|
||||
@ -35,8 +35,8 @@ then it's convenient for Celeborn Admin to operate the master ratis service.
|
||||
|
||||
Celeborn directly introduces the ratis-shell into the project, users don't need to set up ratis-shell env from ratis repo.
|
||||
User can directly download the Celeborn source tarball from [Download](https://celeborn.apache.org/download) and
|
||||
build the Celeborn accoriding to [build_and_test](https://celeborn.apache.org/community/contributor_guide/build_and_test/)
|
||||
or just down load the pre-built binary tarball from [Download](https://celeborn.apache.org/download)
|
||||
build the Celeborn according to [build_and_test](https://celeborn.apache.org/community/contributor_guide/build_and_test/)
|
||||
or just download the pre-built binary tarball from [Download](https://celeborn.apache.org/download)
|
||||
to get the binary package `apache-celeborn-<VERSION>-bin.tgz`.
|
||||
|
||||
After getting the binary package `apache-celeborn-<VERSION>-bin.tgz`:
|
||||
@ -64,7 +64,7 @@ Usage: celeborn-ratis sh [generic options]
|
||||
|
||||
## generic options
|
||||
The `generic options` pass values for a given ratis-shell property.
|
||||
It support the following content:
|
||||
It supports the following content:
|
||||
`-D*`, `-X*`, `-agentlib*`, `-javaagent*`
|
||||
|
||||
```
|
||||
|
||||
@ -55,7 +55,7 @@ license: |
|
||||
| celeborn.client.push.slowStart.maxSleepTime | 2s | If celeborn.client.push.limit.strategy is set to SLOWSTART, push side will take a sleep strategy for each batch of requests, this controls the max sleep time if the max in flight requests limit is 1 for a long time | 0.3.0 |
|
||||
| celeborn.client.push.sort.randomizePartitionId.enabled | false | Whether to randomize partitionId in push sorter. If true, partitionId will be randomized when sort data to avoid skew when push to worker | 0.3.0 |
|
||||
| celeborn.client.push.splitPartition.threads | 8 | Thread number to process shuffle split request in shuffle client. | 0.3.0 |
|
||||
| celeborn.client.push.stageEnd.timeout | <value of celeborn.<module>.io.connectionTimeout> | Timeout for waiting StageEnd. During this process, there are `celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities for committing filesand 1 times for releasing slots request. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn.<module>.io.connectionTimeout`. | 0.3.0 |
|
||||
| celeborn.client.push.stageEnd.timeout | <value of celeborn.<module>.io.connectionTimeout> | Timeout for waiting StageEnd. During this process, there are `celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities for committing files and 1 times for releasing slots request. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn.<module>.io.connectionTimeout`. | 0.3.0 |
|
||||
| celeborn.client.push.takeTaskMaxWaitAttempts | 1 | Max wait times if no task available to push to worker. | 0.3.0 |
|
||||
| celeborn.client.push.takeTaskWaitInterval | 50ms | Wait interval if no task available to push to worker. | 0.3.0 |
|
||||
| celeborn.client.push.timeout | 120s | Timeout for a task to push data rpc message. This value should better be more than twice of `celeborn.<module>.push.timeoutCheck.interval` | 0.3.0 |
|
||||
@ -68,10 +68,10 @@ license: |
|
||||
| celeborn.client.rpc.cache.concurrencyLevel | 32 | The number of write locks to update rpc cache. | 0.3.0 |
|
||||
| celeborn.client.rpc.cache.expireTime | 15s | The time before a cache item is removed. | 0.3.0 |
|
||||
| celeborn.client.rpc.cache.size | 256 | The max cache items count for rpc cache. | 0.3.0 |
|
||||
| celeborn.client.rpc.getReducerFileGroup.askTimeout | <value of celeborn.<module>.io.connectionTimeout> | Timeout for ask operations during getting reducer file group information. During this process, there are `celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities for committing filesand 1 times for releasing slots request. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn.<module>.io.connectionTimeout`. | 0.2.0 |
|
||||
| celeborn.client.rpc.getReducerFileGroup.askTimeout | <value of celeborn.<module>.io.connectionTimeout> | Timeout for ask operations during getting reducer file group information. During this process, there are `celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities for committing files and 1 times for releasing slots request. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn.<module>.io.connectionTimeout`. | 0.2.0 |
|
||||
| celeborn.client.rpc.maxParallelism | 1024 | Max parallelism of client on sending RPC requests. | 0.3.0 |
|
||||
| celeborn.client.rpc.registerShuffle.askTimeout | <value of celeborn.<module>.io.connectionTimeout> | Timeout for ask operations during register shuffle. During this process, there are two times for retry opportunities for requesting slots, one request for establishing a connection with Worker and `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn.<module>.io.connectionTimeout`. | 0.3.0 |
|
||||
| celeborn.client.rpc.requestPartition.askTimeout | <value of celeborn.<module>.io.connectionTimeout> | Timeout for ask operations during requesting change partition location, such as reviving or spliting partition. During this process, there are `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn.<module>.io.connectionTimeout`. | 0.2.0 |
|
||||
| celeborn.client.rpc.requestPartition.askTimeout | <value of celeborn.<module>.io.connectionTimeout> | Timeout for ask operations during requesting change partition location, such as reviving or splitting partition. During this process, there are `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn.<module>.io.connectionTimeout`. | 0.2.0 |
|
||||
| celeborn.client.rpc.reserveSlots.askTimeout | <value of celeborn.rpc.askTimeout> | Timeout for LifecycleManager request reserve slots. | 0.3.0 |
|
||||
| celeborn.client.shuffle.batchHandleChangePartition.interval | 100ms | Interval for LifecycleManager to schedule handling change partition requests in batch. | 0.3.0 |
|
||||
| celeborn.client.shuffle.batchHandleChangePartition.threads | 8 | Threads number for LifecycleManager to handle change partition request in batch. | 0.3.0 |
|
||||
|
||||
@ -36,7 +36,7 @@ bufferSize = `celeborn.worker.flusher.buffer.size` # the amount of memory will
|
||||
off-heap-memory = bufferSize * estimatedTasks * 2 + network memory
|
||||
```
|
||||
|
||||
For example, if an Celeborn worker has 10 storage directories or disks and the buffer size is set to 256 KiB.
|
||||
For example, if a Celeborn worker has 10 storage directories or disks and the buffer size is set to 256 KiB.
|
||||
The necessary off-heap memory is 10 GiB.
|
||||
|
||||
Network memory will be consumed when netty reads from a TCP channel, which will need some extra
|
||||
|
||||
@ -21,7 +21,7 @@ license: |
|
||||
| --- | ------- | ----------- | ----- |
|
||||
| celeborn.quota.configuration.path | <undefined> | Quota configuration file path. The file format should be yaml. Quota configuration file template can be found under conf directory. | 0.2.0 |
|
||||
| celeborn.quota.enabled | true | When true, before registering shuffle, LifecycleManager should check if current user have enough quota space, if cluster don't have enough quota space for current user, fallback to Spark's default shuffle | 0.2.0 |
|
||||
| celeborn.quota.identity.provider | org.apache.celeborn.common.identity.DefaultIdentityProvider | IdentityProvider class name. Default class is `org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values: org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will be obtained by UserGroupInformation.getUserName; org.apache.celeborn.common.identity.DefaultIdentityProvider uesr name and tenant id are default values or user-specific values. | 0.2.0 |
|
||||
| celeborn.quota.identity.provider | org.apache.celeborn.common.identity.DefaultIdentityProvider | IdentityProvider class name. Default class is `org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values: org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will be obtained by UserGroupInformation.getUserName; org.apache.celeborn.common.identity.DefaultIdentityProvider user name and tenant id are default values or user-specific values. | 0.2.0 |
|
||||
| celeborn.quota.identity.user-specific.tenant | default | Tenant id if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 |
|
||||
| celeborn.quota.identity.user-specific.userName | default | User name if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 |
|
||||
| celeborn.quota.manager | org.apache.celeborn.common.quota.DefaultQuotaManager | QuotaManger class name. Default class is `org.apache.celeborn.common.quota.DefaultQuotaManager`. | 0.2.0 |
|
||||
|
||||
@ -115,7 +115,7 @@ These metrics are exposed by Celeborn master.
|
||||
|
||||
- namespace=ResourceConsumption
|
||||
- **notes:**
|
||||
- This merics data is generated for each user and they are identified using a metric tag.
|
||||
- This metrics data is generated for each user and they are identified using a metric tag.
|
||||
- diskFileCount
|
||||
- diskBytesWritten
|
||||
- hdfsFileCount
|
||||
|
||||
@ -94,7 +94,7 @@ public class HARaftServer {
|
||||
private long appTimeoutDeadline;
|
||||
|
||||
/**
|
||||
* Returns an Master Ratis server.
|
||||
* Returns a Master Ratis server.
|
||||
*
|
||||
* @param conf configuration
|
||||
* @param localRaftPeerId raft peer id of this Ratis server
|
||||
|
||||
@ -328,7 +328,7 @@ public class StateMachine extends BaseStateMachine {
|
||||
}
|
||||
|
||||
/**
|
||||
* Store the current state as an snapshot file in the stateMachineStorage.
|
||||
* Store the current state as a snapshot file in the stateMachineStorage.
|
||||
*
|
||||
* @return the index of the snapshot
|
||||
*/
|
||||
|
||||
@ -29,7 +29,7 @@ import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
|
||||
class HeartbeatTest extends AnyFunSuite with Logging with MiniClusterFeature with HeartbeatFeature
|
||||
with BeforeAndAfterAll with BeforeAndAfterEach {
|
||||
|
||||
test("celeborn flink hearbeat test - client <- worker") {
|
||||
test("celeborn flink heartbeat test - client <- worker") {
|
||||
val (_, clientConf) = getTestHeartbeatFromWorker2ClientConf
|
||||
val flinkShuffleClientImpl =
|
||||
new FlinkShuffleClientImpl(
|
||||
@ -44,7 +44,7 @@ class HeartbeatTest extends AnyFunSuite with Logging with MiniClusterFeature wit
|
||||
testHeartbeatFromWorker2Client(flinkShuffleClientImpl.getDataClientFactory)
|
||||
}
|
||||
|
||||
test("celeborn flink hearbeat test - client <- worker no heartbeat") {
|
||||
test("celeborn flink heartbeat test - client <- worker no heartbeat") {
|
||||
val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf
|
||||
val flinkShuffleClientImpl =
|
||||
new FlinkShuffleClientImpl(
|
||||
@ -59,7 +59,7 @@ class HeartbeatTest extends AnyFunSuite with Logging with MiniClusterFeature wit
|
||||
testHeartbeatFromWorker2ClientWithNoHeartbeat(flinkShuffleClientImpl.getDataClientFactory)
|
||||
}
|
||||
|
||||
test("celeborn flink hearbeat test - client <- worker timeout") {
|
||||
test("celeborn flink heartbeat test - client <- worker timeout") {
|
||||
val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithCloseChannelConf
|
||||
val flinkShuffleClientImpl =
|
||||
new FlinkShuffleClientImpl(
|
||||
|
||||
@ -254,7 +254,7 @@ public class MapDataPartitionReader implements Comparable<MapDataPartitionReader
|
||||
String filename, FileChannel channel, ByteBuffer header, ByteBuf buffer, int headerSize)
|
||||
throws IOException {
|
||||
readHeaderOrIndexBuffer(channel, header, headerSize);
|
||||
// header is combined of mapId(4),attemptId(4),nextBatchId(4) and total Compresszed Length(4)
|
||||
// header is combined of mapId(4),attemptId(4),nextBatchId(4) and total Compressed Length(4)
|
||||
// we need size here, so we read length directly
|
||||
int bufferLength = header.getInt(12);
|
||||
if (bufferLength <= 0 || bufferLength > buffer.capacity()) {
|
||||
|
||||
@ -175,9 +175,9 @@ class PushDataHandler extends BaseMessageHandler with Logging {
|
||||
if (location == null) {
|
||||
val (mapId, attemptId) = getMapAttempt(body)
|
||||
// MapperAttempts for a shuffle exists after any CommitFiles request succeeds.
|
||||
// A shuffle can trigger multiple CommitFiles requests, for reasons like: Hard-Split happens, StageEnd.
|
||||
// A shuffle can trigger multiple CommitFiles requests, for reasons like: HARD_SPLIT happens, StageEnd.
|
||||
// If MapperAttempts exists but the value is -1 for the mapId (-1 means the map has not yet finished),
|
||||
// it's probably because commitFiles for Had-Split happens.
|
||||
// it's probably because commitFiles for HARD_SPLIT happens.
|
||||
if (shuffleMapperAttempts.containsKey(shuffleKey)) {
|
||||
if (-1 != shuffleMapperAttempts.get(shuffleKey).get(mapId)) {
|
||||
// partition data has already been committed
|
||||
@ -195,7 +195,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
|
||||
if (storageManager.shuffleKeySet().contains(shuffleKey)) {
|
||||
// If there is no shuffle key in shuffleMapperAttempts but there is shuffle key
|
||||
// in StorageManager. This partition should be HARD_SPLIT partition and
|
||||
// after worker restart, some task still push data to this HARD_SPLIT partition.
|
||||
// after worker restart, some tasks still push data to this HARD_SPLIT partition.
|
||||
logInfo(s"[Case2] Receive push data for committed hard split partition of " +
|
||||
s"(shuffle $shuffleKey, map $mapId attempt $attemptId)")
|
||||
callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
|
||||
@ -437,9 +437,9 @@ class PushDataHandler extends BaseMessageHandler with Logging {
|
||||
if (loc == null) {
|
||||
val (mapId, attemptId) = getMapAttempt(body)
|
||||
// MapperAttempts for a shuffle exists after any CommitFiles request succeeds.
|
||||
// A shuffle can trigger multiple CommitFiles requests, for reasons like: Hard-Split happens, StageEnd.
|
||||
// A shuffle can trigger multiple CommitFiles requests, for reasons like: HARD_SPLIT happens, StageEnd.
|
||||
// If MapperAttempts exists but the value is -1 for the mapId (-1 means the map has not yet finished),
|
||||
// it's probably because commitFiles for Had-Split happens.
|
||||
// it's probably because commitFiles for HARD_SPLIT happens.
|
||||
if (shuffleMapperAttempts.containsKey(shuffleKey)) {
|
||||
if (-1 != shuffleMapperAttempts.get(shuffleKey).get(mapId)) {
|
||||
logInfo(s"Receive push merged data from speculative " +
|
||||
@ -457,7 +457,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
|
||||
if (storageManager.shuffleKeySet().contains(shuffleKey)) {
|
||||
// If there is no shuffle key in shuffleMapperAttempts but there is shuffle key
|
||||
// in StorageManager. This partition should be HARD_SPLIT partition and
|
||||
// after worker restart, some task still push data to this HARD_SPLIT partition.
|
||||
// after worker restart, some tasks still push data to this HARD_SPLIT partition.
|
||||
logInfo(s"Receive push merged data for committed hard split partition of " +
|
||||
s"(shuffle $shuffleKey, map $mapId attempt $attemptId)")
|
||||
callbackWithTimer.onSuccess(
|
||||
|
||||
Loading…
Reference in New Issue
Block a user