Add exception handler when calling CelebornHadoopUtils.getHadoopFS(conf) on Master and Worker, Avoid Concealing Initialization HDFS Exception Information
### What changes were proposed in this pull request?
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1923 from leemingzixxoo/main.
Authored-by: ming.li <ming.li@dmall.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
If Worker lost or lost after graceful shutdown, Master would retain these lostWorker/shutdownWorkers meta permanently,
These meta would cause some noisy message in lifecycleManager. For these meta better to delete them after a while
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT & E2E test
Closes#1916 from RexXiong/CELEBORN-468.
Authored-by: Shuang <lvshuang.tb@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Worker will firstly register failed after worker gracefully restart in HA mode, it will be really registered after one heartbeat.
<img width="889" alt="image" src="https://github.com/apache/incubator-celeborn/assets/19429353/371aa0e0-b2e9-4c1f-9e40-276dc1460219">
This is because master here uses same `requestId` to submit request, causing the second request not be processed correctly, due to Ratis `RetryCache`.
Master logs like below:
(worker gracefully stop)
Master: Receive ReportNodeFailure
(worker start)
Master: Received RegisterWorker request
Master: Received heartbeat from unknown worker
Master: Registered worker
So here improve AbstractMetaManager#updateRegisterWorkerMeta to cover `WorkerRemove` logic. For back compatibility and possible inconsistencies during rolling upgrade, temporarily fix duplicate requestId and keep remove function. And we can try to remove `WorkerRemove` logic in the future version.
### Why are the changes needed?
Ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Cluster test
Closes#1863 from onebox-li/fix-restart-register.
Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
The master checks the number of healthy disks in the woker and decides whether to exclude it.
### Why are the changes needed?
When the disks of all the workers are unhealthy, HDFS is not enabled, and the master does not exclude the workers, the spark client calls checkWorkersAvailable and returns available, and the shuffle write ultimately fails without fallback.
```java
23/09/08 23:20:44 ERROR LifecycleManager: Aggregated error of reserveSlots for shuffleId 9 failure:
[reserveSlots] Failed to reserve buffers for shuffleId 9 from worker Host:1.2.3.4:RpcPort:55803:PushPort:55805:FetchPort:55807:ReplicatePort:55806. Reason: Local storage has no available dirs!
23/09/08 23:20:44 ERROR LifecycleManager: Retry reserve slots for 9 failed caused by not enough slots.
23/09/08 23:20:44 WARN LifecycleManager: Reserve buffers for 9 still fail after retrying, clear buffers.
23/09/08 23:20:44 ERROR LifecycleManager: reserve buffer for 9 failed, reply to all.
23/09/08 23:20:44 ERROR ShuffleClientImpl: LifecycleManager request slots return RESERVE_SLOTS_FAILED, retry again, remain retry times 0.
23/09/08 23:20:47 WARN TaskSetManager: Lost task 8.0 in stage 27.0 (TID 89) (1.2.3.4 executor driver): TaskKilled (Stage cancelled)
23/09/08 23:20:59 ERROR MasterClient: Send rpc with failure, has tried 15, max try 15!
org.apache.celeborn.common.exception.CelebornException: Exception thrown in awaitResult:
at org.apache.celeborn.common.util.ThreadUtils$.awaitResult(ThreadUtils.scala:229)
at org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:74)
at org.apache.celeborn.common.client.MasterClient.sendMessageInner(MasterClient.java:150)
at org.apache.celeborn.common.client.MasterClient.askSync(MasterClient.java:118)
at org.apache.celeborn.client.LifecycleManager.requestMasterRequestSlots(LifecycleManager.scala:1033)
at org.apache.celeborn.client.LifecycleManager.requestMasterRequestSlotsWithRetry(LifecycleManager.scala:1022)
at org.apache.celeborn.client.LifecycleManager.org$apache$celeborn$client$LifecycleManager$$offerAndReserveSlots(LifecycleManager.scala:402)
at org.apache.celeborn.client.LifecycleManager$$anonfun$receiveAndReply$1.applyOrElse(LifecycleManager.scala:210)
```
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
local test
```
23/09/08 23:23:27 WARN CelebornShuffleFallbackPolicyRunner: No workers available for current user `default`.`default`.
23/09/08 23:23:27 WARN SparkShuffleManager: Fallback to vanilla Spark SortShuffleManager for shuffle: 10
23/09/08 23:23:28 WARN CelebornShuffleFallbackPolicyRunner: No workers available for current user `default`.`default`.
23/09/08 23:23:28 WARN SparkShuffleManager: Fallback to vanilla Spark SortShuffleManager for shuffle: 11
100000
Time taken: 0.192 seconds, Fetched 1 row(s)
```
```
Closes#1893 from cxzl25/CELEBORN-960.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
`CelebornShuffleFallbackPolicyRunner` could not only check quota, but also check whether cluster has available workers. If there is no available workers, fallback to external shuffle.
### Why are the changes needed?
`CelebornShuffleFallbackPolicyRunner` adds a check for available workers.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- `SparkShuffleManagerSuite#testClusterNotAvailableWithAvailableWorkers`
Closes#1814 from SteNicholas/CELEBORN-830.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Adding a flag indicating high load in the worker's heartbeat allows the master to better schedule the workers
### Why are the changes needed?
In our production environment, there is a node with abnormally high load, but the master is not aware of this situation. It assigned numerous jobs to this node, and as a result, the stability of these jobs has been affected.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
UT
Closes#1840 from JQ-Cao/920.
Lead-authored-by: Keyong Zhou <zky.zhoukeyong@alibaba-inc.com>
Co-authored-by: caojiaqing <caojiaqing@bilibili.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Keep ReleaseSlots RPC to make sure that 0.3 client can worker with 0.3.1-SNAPSHOT and 0.4.0-SNAPSHOT.
This PR will need to merged into main and branch-0.3.
### Why are the changes needed?
Ditto.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA and cluster.
Closes#1794 from FMX/CELEBORN-846-FOLLOWUP.
Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Co-authored-by: Ethan Feng <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Add config to limit max workers when offering slots, the config can be set both
in server side and client side. Celeborn will choose the smaller positive configs from client and master.
### Why are the changes needed?
For large Celeborn clusters, users may want to limit the number of workers that
a shuffle can spread, reasons are:
1. One worker failure will not affect all applications
2. One huge shuffle will not affect all applications
3. It's more efficient to limit a shuffle within a restricted number of workers, say 100, than
spreading across a large number of workers, say 1000, because the network connections
in pushing data is `number of ShuffleClient` * `number of allocated Workers`
The recommended number of Workers should depend on workload and Worker hardware,
and this can be configured per application, so it's relatively flexible.
### Does this PR introduce _any_ user-facing change?
No, added a new configuration.
### How was this patch tested?
Added ITs and passes GA.
Closes#1790 from waitinfuture/152.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
Reduce duplicate code segments, improve code readability and maintenance difficulty.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit Test
Closes#1786 from zwangsheng/CELEBORN-872.
Authored-by: zwangsheng <2213335496@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Fallback in following order:
1. usableDisks is empty (no need to call iter)
2. under replicate case, first usableDisks == 1 fast fallback
3. count distinct worker
### Why are the changes needed?
Clear about the logic here
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit Test
Closes#1781 from zwangsheng/CELEBORN-868.
Authored-by: zwangsheng <2213335496@qq.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
CELEBORN-791 removed sending the ReleaseSlotsRequest from worker, so Master is not required to handle it.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1767 from AngersZhuuuu/CELEBORN-846.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
Pass exit kind to each component, if the exit kind match:
- GRACEFUL_SHUTDOWN: Behavior as origin code's graceful == true
- Others: will clean the level db file.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1748 from AngersZhuuuu/CELEBORN-819.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1742 from AngersZhuuuu/CELEBORN-820.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
Fix some typos and grammar
### Why are the changes needed?
Ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
manually test
Closes#1733 from onebox-li/fix-typo.
Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Master won't simulate slots allocations and use active slots sent from worker.
### Why are the changes needed?
I have observed that a new worker might allocate more slots than other workers when using the round-robin slot allocation algorithm.
There is a logic error in processing heartbeat from worker. It will update disk info's active slots to max(current disk info active slots, disk info sent from worker active slots). If I registered a huge shuffle, master will allocate more slots than a disk's max slots and mark them as unknown disk slots but worker will count the unknown disk slots as active slots and report it to the master. Then the slots release logic can not distinguish unknown slots from a number so the release will not decrease active slots properly.
Due to the gap between work and master, so I think it's OK to remove slots allocation simulation from worker and use active slots from worker.
Before this patch:
<img width="928" alt="截屏2023-07-12 16 51 15" src="https://github.com/apache/incubator-celeborn/assets/4150993/9c8a46d9-26a8-42f5-a956-938273277c9b">
After this patch:
<img width="509" alt="截屏2023-07-12 16 25 52" src="https://github.com/apache/incubator-celeborn/assets/4150993/c49b3d91-14ea-4eb8-9b71-9aab73541faf">
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
UT and cluster.
Closes#1710 from FMX/CELEBORN-791.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Clean unused GetBlacklist & GetBlacklistResponse
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1656 from AngersZhuuuu/CELEBORN-733.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
Make Celeborn leader clean expired app dirs on HDFS when an application is Lost.
### Why are the changes needed?
If Celeborn is working on HDFS, the storage manager starts and cleans expired app directories, and the newly created worker will want to delete any unknown app directories.
This will cause using app directories to be deleted unexpectedly.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
UT and cluster.
Closes#1678 from FMX/CELEBORN-764.
Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Co-authored-by: Ethan Feng <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Rename remain rss related class name and filenames etc...
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1664 from AngersZhuuuu/CELEBORN-751.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
- gauge method definition improvement. i.e.
before
```
def addGauge[T](name: String, f: Unit => T, labels: Map[String, String])
```
after
```
def addGauge[T](name: String, labels: Map[String, String])(f: () => T)
```
which improves the caller-side code style
```
addGauge(name, labels) { () =>
...
}
```
- remove unnecessary Java/Scala collection conversion. Since Scala 2.11 does not support SAM, the extra implicit function is required.
- leverage Logging to defer message evaluation
- UPPER_CASE string constants
### Why are the changes needed?
Improve code quality and performance(maybe)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GA.
Closes#1670 from pan3793/CELEBORN-757.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
In order to distinguish it from the existing master/worker, refactor data replication terminology to 'primary/replica' for improved clarity and inclusivity in the codebase
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
existing tests
Closes#1639 from cfmcgrady/primary-replica.
Lead-authored-by: Fu Chen <cfmcgrady@gmail.com>
Co-authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Rename RssHARetryClient to MasterClient
### Why are the changes needed?
Code refactor
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GA.
Closes#1661 from AngersZhuuuu/CELEBORN-748.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Rename HeartbeatResponse to HeartbeatFromWorkerResponse
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1651 from AngersZhuuuu/CELEBORN-739.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Remove unused RPC GetWorkerInfo & GetWorkerInfosResponse
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1647 from AngersZhuuuu/CELEBORN-735.
Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
In this pr, we rename all RPC blacklist fields, it won't have have compatibility issues.
For RPC `GetBlacklist` and `GetBlacklistResponse` we won't change it, since it won't be used in next release, so we can remove these two RPC in next release.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1643 from AngersZhuuuu/CELEBORN-666-RPC.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Unify all blacklist related code and comment
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1638 from AngersZhuuuu/CELEBORN-666-FOLLOWUP.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
…nse with lower versions
### What changes were proposed in this pull request?
The master side will check HeartbeatFromApplication's reply field. if reply is true then it replies HeartbeatFromApplicationResponse otherwise OneWayMessageResponse.
The reply field is default false before the version 0.2.1, so master can be compatible with older client version
### Why are the changes needed?
Before the version `0.2.1`, the response of HeartbeatFromApplication is` OneWayMessageResponse`, but from `0.3.0`, the response of HeartbeatFromApplication is modified to `HeartbeatFromApplicationResponse`.
if the version of `client side `is `0.2.1` and the version of `server side is 0.3.0`, the `compatiblity issue `will occur.
The following compatiblity error will be printted.
``` java
java.io.InvalidObjectException: enum constant HEARTBEAT_FROM_APPLICATION_RESPONSE does not exist in class org.apache.celeborn.common.protocol.MessageType
at java.io.ObjectInputStream.readEnum(ObjectInputStream.java:2157) ~[?:1.8.0_362]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1662) ~[?:1.8.0_362]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430) ~[?:1.8.0_362]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354) ~[?:1.8.0_362]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212) ~[?:1.8.0_362]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) ~[?:1.8.0_362]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502) ~[?:1.8.0_362]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460) ~[?:1.8.0_362]
at org.apache.celeborn.common.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76) ~[celeborn-client-spark-3-shaded_2.12-0.2.1-incubating.jar:?]
```
``` java
Caused by: java.lang.ClassCastException: Cannot cast org.apache.celeborn.common.protocol.message.ControlMessages$HeartbeatFromApplicationResponse to org.apache.celeborn.common.protocol.message.ControlMessages$OneWayMessageResponse$
at java.lang.Class.cast(Class.java:3369) ~[?:1.8.0_362]
at scala.concurrent.Future.$anonfun$mapTo$1(Future.scala:500) ~[scala-library-2.12.15.jar:?]
at scala.util.Success.$anonfun$map$1(Try.scala:255) ~[scala-library-2.12.15.jar:?]
at scala.util.Success.map(Try.scala:213) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.BatchingExecutor$Batch.processBatch$1(BatchingExecutor.scala:67) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.BatchingExecutor$Batch.$anonfun$run$1(BatchingExecutor.scala:82) ~[scala-library-2.12.15.jar:?]
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:59) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:875) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:110) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:107) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:873) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.Promise.trySuccess(Promise.scala:94) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.Promise.trySuccess$(Promise.scala:94) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.Promise$DefaultPromise.trySuccess(Promise.scala:187) ~[scala-library-2.12.15.jar:?]
at org.apache.celeborn.common.rpc.netty.NettyRpcEnv.onSuccess$1(NettyRpcEnv.scala:218) ~[celeborn-client-spark-3-shaded_2.12-0.2.1-incubating.jar:?]
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
The pr is tested manually and the testing process is as follows:
1. server side is deploy using the code of latest branch-0.3.
2. spark client is deploy the version of 0.2.1, then run spark-sql to execute 3 tpcds queries( query1.sql/querey2/quere3.sql whose datasize is 1T), finnally verify that the queries are executed successfully and no above compatiblity error printted
3. spark client is deploy the version of 0.3.0, then run spark-sql to execute 3 tpcds queries( query1.sql/querey2/quere3.sql whose datasize is 1T), finnally verify that the queries are executed successfully and no above compatiblity error printted
This patch had conflicts when merged, resolved by
Committer: Cheng Pan <chengpan@apache.org>
Closes#1635 from zhongqiangczq/heartbeat2.
Authored-by: zhongqiang.czq <zhongqiang.czq@alibaba-inc.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
1. Celeborn supports storage type selection. HDD, SSD, and HDFS are available for now.
2. Add new buffer size for HDFS file writers.
3. Worker support empty working dirs.
### Why are the changes needed?
Support HDFS only scenario.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
UT and cluster.
Closes#1619 from FMX/CELEBORN-568.
Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Co-authored-by: Ethan Feng <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Celeborn generate hadoop configuration should respect Celeborn conf
### Why are the changes needed?
In spark client side we should write like `spark.celeborn.hadoop.xxx.xx`
In server side we should write like `celeborn.hadoop.xxx.xxx`
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1629 from AngersZhuuuu/CELEBORN-719.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
This PR aims to make network local address binding support both IP and FQDN strategy.
Additional, it refactors the `ShuffleClientImpl#genAddressPair`, from `${hostAndPort}-${hostAndPort}` to `Pair<String, String>`, which works properly when using IP but may not on FQDN because FQDN may contain `-`
### Why are the changes needed?
Currently, when the bind hostname is not set explicitly, Celeborn will find the first non-loopback address and always uses the IP to bind, this is not suitable for K8s cases, as the STS has a stable FQDN but Pod IP will be changed once Pod restarting.
For `ShuffleClientImpl#genAddressPair`, it must be changed otherwise may cause
```
java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 11657 in stage 0.0 failed 4 times, most recent failure: Lost task 11657.3 in stage 0.0 (TID 12747) (10.153.253.198 executor 157): java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.celeborn.client.ShuffleClientImpl.doPushMergedData(ShuffleClientImpl.java:874)
at org.apache.celeborn.client.ShuffleClientImpl.pushOrMergeData(ShuffleClientImpl.java:735)
at org.apache.celeborn.client.ShuffleClientImpl.mergeData(ShuffleClientImpl.java:827)
at org.apache.spark.shuffle.celeborn.SortBasedPusher.pushData(SortBasedPusher.java:140)
at org.apache.spark.shuffle.celeborn.SortBasedPusher.insertRecord(SortBasedPusher.java:192)
at org.apache.spark.shuffle.celeborn.SortBasedShuffleWriter.fastWrite0(SortBasedShuffleWriter.java:192)
at org.apache.spark.shuffle.celeborn.SortBasedShuffleWriter.write(SortBasedShuffleWriter.java:145)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1508)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
```
### Does this PR introduce _any_ user-facing change?
Yes, a new configuration `celeborn.network.bind.preferIpAddress` is introduced, and the default value is `true` to preserve the existing behavior.
### How was this patch tested?
Manually testing with `celeborn.network.bind.preferIpAddress=false`
```
Server: 10.178.96.64
Address: 10.178.96.64#53
Name: celeborn-master-0.celeborn-master-svc.spark.svc.cluster.local
Address: 10.153.143.252
Server: 10.178.96.64
Address: 10.178.96.64#53
Name: celeborn-master-1.celeborn-master-svc.spark.svc.cluster.local
Address: 10.153.173.94
Server: 10.178.96.64
Address: 10.178.96.64#53
Name: celeborn-master-2.celeborn-master-svc.spark.svc.cluster.local
Address: 10.153.149.42
starting org.apache.celeborn.service.deploy.worker.Worker, logging to /opt/celeborn/logs/celeborn--org.apache.celeborn.service.deploy.worker.Worker-1-celeborn-worker-4.out
2023-06-25 23:49:52 [INFO] [main] org.apache.celeborn.common.rpc.netty.Dispatcher#51 - Dispatcher numThreads: 4
2023-06-25 23:49:52 [INFO] [main] org.apache.celeborn.common.network.client.TransportClientFactory#91 - mode NIO threads 64
2023-06-25 23:49:52 [INFO] [main] org.apache.celeborn.common.rpc.netty.NettyRpcEnvFactory#51 - Starting RPC Server [WorkerSys] on celeborn-worker-4.celeborn-worker-svc.spark.svc.cluster.local:0 with advisor endpoint celeborn-worker-4.celeborn-worker-svc.spark.svc.cluster.local:0
2023-06-25 23:49:52 [INFO] [main] org.apache.celeborn.common.util.Utils#51 - Successfully started service 'WorkerSys' on port 38303.
```
Closes#1622 from pan3793/CELEBORN-713.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Change Celeborn Master URL from `rss://<host>:<port>` to `celeborn://<host>:<port>`
### Why are the changes needed?
Respect the project name.
### Does this PR introduce _any_ user-facing change?
Yes, migration guide is updated accordingly.
### How was this patch tested?
Pass GA.
Closes#1624 from pan3793/CELEBORN-715.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Remove environment variables `CELEBORN_MASTER_HOST` and `CELEBORN_MASTER_PORT`, and makes `CELEBORN_LOCAL_HOSTNAME` takes effect on both master and worker.
### Why are the changes needed?
There are many different ways to configure the master/worker host and port, which makes the thing complex and inconsistent.
After this change,
#### master
1. cli args `--host` `--port` takes the highest priority
2. then lookup env `CELEBORN_LOCAL_HOSTNAME`
3. things are different when HA enabled and disabled
3.1. when HA is disabled, lookup configurations `celeborn.master.host` and `celeborn.master.port`
3.2. when HA is enabled, each node needs to know the whole cluster info,
```
celeborn.master.ha.node.1.host clb-1
celeborn.master.ha.node.1.port 9097
celeborn.master.ha.node.2.host clb-2
celeborn.master.ha.node.2.port 9097
celeborn.master.ha.node.3.host clb-3
celeborn.master.ha.node.3.port 9097
```
in addition, `celeborn.master.ha.node.id=1` can be used to indicate the node id, otherwise, the master will try to bind each host to match the node id.
#### worker
1. cli args `--host` `--port` takes the highest priority
2. then lookup env `CELEBORN_LOCAL_HOSTNAME`
things are simple than the master case because each worker is not required to know others.
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
UT.
Closes#1616 from pan3793/CELEBORN-707.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
- Fix commit metrics in application heartbeat
- Change client side application heartbeat message log level to info
- Improve heartbeat log by unify the word "heartbeat"
### Why are the changes needed?
`commitHandler.commitMetrics()` has side effects, multiple calls to get values independently is incorrect.
```
def commitMetrics(): (Long, Long) = (totalWritten.sumThenReset(), fileCount.sumThenReset())
```
### Does this PR introduce _any_ user-facing change?
Yes, bug fix.
### How was this patch tested?
Review.
Closes#1617 from pan3793/CELEBORN-708.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Refactor WorkerInfo
1. make ```diskInfos```, ```userResourceConsumption``` new maps instead of using the passed in reference
2. remove ```endpoint``` from the constructor
### Why are the changes needed?
When manually test stop-worker.sh with graceful turned on, I got the following Exception
```
23/06/19 11:04:25,665 INFO [worker-forward-message-scheduler] RssHARetryClient: connect to master master-1-1:9097.
23/06/19 11:04:27,168 ERROR [worker-forward-message-scheduler] RssHARetryClient: Send rpc with failure, has tried 15, max try 15!
org.apache.celeborn.common.exception.CelebornException: Exception thrown in awaitResult:
at org.apache.celeborn.common.util.ThreadUtils$.awaitResult(ThreadUtils.scala:231)
at org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:74)
at org.apache.celeborn.common.haclient.RssHARetryClient.sendMessageInner(RssHARetryClient.java:150)
at org.apache.celeborn.common.haclient.RssHARetryClient.askSync(RssHARetryClient.java:118)
at org.apache.celeborn.service.deploy.worker.Worker.org$apache$celeborn$service$deploy$worker$Worker$$heartBeatToMaster(Worker.scala:306)
at org.apache.celeborn.service.deploy.worker.Worker$$anon$1.$anonfun$run$1(Worker.scala:332)
at org.apache.celeborn.common.util.Utils$.tryLogNonFatalError(Utils.scala:186)
at org.apache.celeborn.service.deploy.worker.Worker$$anon$1.run(Worker.scala:332)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.celeborn.common.exception.CelebornIOException: remove
at org.apache.celeborn.service.deploy.master.clustermeta.ha.HAHelper.sendFailure(HAHelper.java:65)
at org.apache.celeborn.service.deploy.master.Master.executeWithLeaderChecker(Master.scala:210)
at org.apache.celeborn.service.deploy.master.Master$$anonfun$receiveAndReply$1.applyOrElse(Master.scala:315)
at org.apache.celeborn.common.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
at org.apache.celeborn.common.rpc.netty.Inbox.safelyCall(Inbox.scala:222)
at org.apache.celeborn.common.rpc.netty.Inbox.process(Inbox.scala:110)
at org.apache.celeborn.common.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:229)
... 3 more
Caused by: java.lang.UnsupportedOperationException: remove
at scala.collection.convert.Wrappers$MapWrapper$$anon$2$$anon$3.remove(Wrappers.scala:236)
at java.util.AbstractMap.remove(AbstractMap.java:254)
at org.apache.celeborn.common.meta.WorkerInfo.$anonfun$updateThenGetDiskInfos$2(WorkerInfo.scala:225)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.celeborn.common.meta.WorkerInfo.updateThenGetDiskInfos(WorkerInfo.scala:224)
at org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager.lambda$updateWorkerHeartbeatMeta$5(AbstractMetaManager.java:205)
at java.util.Optional.ifPresent(Optional.java:159)
at org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager.updateWorkerHeartbeatMeta(AbstractMetaManager.java:203)
at org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager.handleWorkerHeartbeat(SingleMasterMetaManager.java:105)
at org.apache.celeborn.service.deploy.master.Master.org$apache$celeborn$service$deploy$master$Master$$handleHeartbeatFromWorker(Master.scala:428)
at org.apache.celeborn.service.deploy.master.Master$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$20(Master.scala:326)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.celeborn.service.deploy.master.Master.executeWithLeaderChecker(Master.scala:207)
... 8 more
```
According to the suggestion from https://github.com/apache/incubator-celeborn/pull/1602#issuecomment-1596722991
### Does this PR introduce _any_ user-facing change?
Yes, it fixes bug described in https://github.com/apache/incubator-celeborn/pull/1602
### How was this patch tested?
UTs and manual test.
Closes#1605 from waitinfuture/695.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
1. Use `<arg>-Ywarn-unused-import</arg>` to remove some unused imports
There is no way to use `<arg>-Ywarn-unused-import</arg>` at this stage
Because we have the following code
```
// Can Remove this if celeborn don't support scala211 in future
import org.apache.celeborn.common.util.FunctionConverter._
```
2. Fix scala case match not fully covered, avoid `scala.MatchError`
3. Fixed some scala compilation warnings
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1600 from cxzl25/cleanup_code.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1594 from AngersZhuuuu/CELEBORN-682.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Now worker will send WorkLost too, master should also reply WorkerLostResponse when it's unknown worker
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1584 from AngersZhuuuu/CELEBORN-668-FOLLOWUP.
Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: Keyong Zhou <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
…eful is disabled
### What changes were proposed in this pull request?
Worker should report WorkerLost instead of WorkerUnavailable in it's shutdown hook if graceful shutdown is disabled.
### Why are the changes needed?
To avoid unnecessary commit file requests from lifecycle manager since it's not graceful shutdown.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Closes#1580 from waitinfuture/668.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Throw exception when raft client request not success.
### Why are the changes needed?
Ratis client may not throw exception when submit request not success. Current only LeaderNotReadyException,NotLeaderException throw out exception this may cause some inconsistent.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT
Closes#1556 from RexXiong/CELEBORN-646.
Authored-by: Shuang <lvshuang.tb@gmail.com>
Signed-off-by: Shuang <lvshuang.tb@gmail.com>
### What changes were proposed in this pull request?
`SimpleDateFormat` is not thread-safe, replace it with a thread-safe `FastDateFormat`
### Why are the changes needed?
`FastDateFormat` is a fast and thread-safe version of `java.text.SimpleDateFormat`.
### Does this PR introduce _any_ user-facing change?
Yes, it's a bug fix.
### How was this patch tested?
Manually review.
Closes#1545 from pan3793/CELEBORN-636.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Ethan Feng <ethanfeng@apache.org>