### What changes were proposed in this pull request?
Switch the leveldbjni dependency to `org.openlabtesting.leveldbjni` on the Linux aarch64 platform to support LevelDB.
### Why are the changes needed?
A Celeborn worker cannot start on ARM devices when the DB backend is `LevelDB`, so leveldbjni should be supported on the aarch64 platform.
aarch64 uses `org.openlabtesting.leveldbjni:leveldbjni-all:1.8`, while other platforms use `org.fusesource.leveldbjni:leveldbjni-all:1.8`. Meanwhile, some Hadoop dependencies also depend on `org.fusesource.leveldbjni:leveldbjni-all`, and Hadoop merged the equivalent change on trunk (see
[HADOOP-16614](https://issues.apache.org/jira/browse/HADOOP-16614)); therefore the `org.fusesource.leveldbjni` dependency should be excluded from those Hadoop packages.
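The kind of Maven exclusion involved can be sketched as below; the Hadoop artifact shown is only an illustrative example, not the exact list of modules changed in this PR:

```xml
<!-- Illustrative sketch: exclude the fusesource leveldbjni pulled in
     transitively by a Hadoop dependency, so the platform-appropriate
     leveldbjni artifact (openlabtesting on aarch64) is used instead. -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client-api</artifactId>
  <exclusions>
    <exclusion>
      <groupId>org.fusesource.leveldbjni</groupId>
      <artifactId>leveldbjni-all</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```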
Backport:
- https://github.com/apache/spark/pull/26636
- https://github.com/apache/spark/pull/31036
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes #2476 from SteNicholas/CELEBORN-1380.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Split the `rpc` transport module into `rpc_app` and `rpc_service` to allow for them to be independently configured.
### Why are the changes needed?
We need the ability to independently configure communication between application components (driver/executors in spark applications) and those to/from Celeborn service (master/workers) components.
This is particularly relevant for TLS support where applications might be running with TLS disabled for their rpc services or using self-signed certificates (see CELEBORN-1354 for an example), while services would have signed certs.
### Does this PR introduce _any_ user-facing change?
Yes, it allows users to independently configure the RPC environment within the application and that to/from services.
Backward compatibility is maintained: the existing `rpc` module is the fallback when the `rpc_app` or `rpc_service` config is not found.
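The fallback behavior can be sketched as below; the key names are illustrative stand-ins, not Celeborn's actual configuration keys:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of module-name fallback: look up a transport config under the
// specific module (e.g. "rpc_app" or "rpc_service") and fall back to the
// legacy "rpc" module when the specific key is absent.
public class ModuleFallback {
  static String getWithFallback(Map<String, String> conf, String module,
      String fallbackModule, String key) {
    String specific = conf.get("celeborn." + module + "." + key);
    return specific != null ? specific : conf.get("celeborn." + fallbackModule + "." + key);
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put("celeborn.rpc.io.threads", "8");
    conf.put("celeborn.rpc_app.io.threads", "4");
    // rpc_app has its own value; rpc_service falls back to rpc
    System.out.println(getWithFallback(conf, "rpc_app", "rpc", "io.threads"));     // 4
    System.out.println(getWithFallback(conf, "rpc_service", "rpc", "io.threads")); // 8
  }
}
```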
### How was this patch tested?
Unit tests were enhanced, existing tests pass.
Closes #2460 from mridulm/split_rpc_module-retry1.
Lead-authored-by: Mridul Muralidharan <mridul@gmail.com>
Co-authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
`LevelDBProvider`/`RocksDBProvider` now create non-existent multi-level directories for LevelDB/RocksDB initialization.
### Why are the changes needed?
`RocksDBProvider` currently creates the database if missing via `Options#setCreateIfMissing` when initializing RocksDB, which causes the following exception when `dbFile` is a non-existent multi-level directory.
```
2024-04-09T03:19:35.6807077Z 24/04/09 03:19:35,679 ERROR [pool-1-thread-1-ScalaTest-running-StorageManagerSuite] RocksDBProvider: error opening rocksdb file /tmp/recover/recovery.rdb. Creating new file, will not be able to recover state for existing applications
2024-04-09T03:19:35.6810066Z org.rocksdb.RocksDBException: While mkdir if missing: /tmp/recover/recovery.rdb: No such file or directory
2024-04-09T03:19:35.6811303Z at org.rocksdb.RocksDB.open(Native Method)
2024-04-09T03:19:35.6812052Z at org.rocksdb.RocksDB.open(RocksDB.java:259)
2024-04-09T03:19:35.6813431Z at org.apache.celeborn.service.deploy.worker.shuffledb.RocksDBProvider.initRockDB(RocksDBProvider.java:66)
2024-04-09T03:19:35.6815230Z at org.apache.celeborn.service.deploy.worker.shuffledb.DBProvider.initDB(DBProvider.java:39)
2024-04-09T03:19:35.6816975Z at org.apache.celeborn.service.deploy.worker.storage.StorageManager.<init>(StorageManager.scala:216)
2024-04-09T03:19:35.6818904Z at org.apache.celeborn.service.deploy.worker.storage.StorageManagerSuite.$anonfun$new$1(StorageManagerSuite.scala:30)
2024-04-09T03:19:35.6820538Z at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
2024-04-09T03:19:35.6821620Z at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
2024-04-09T03:19:35.6822585Z at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
2024-04-09T03:19:35.6823948Z at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
2024-04-09T03:19:35.6824908Z at org.scalatest.Transformer.apply(Transformer.scala:22)
2024-04-09T03:19:35.6825862Z at org.scalatest.Transformer.apply(Transformer.scala:20)
2024-04-09T03:19:35.6827073Z at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
2024-04-09T03:19:35.6828439Z at org.apache.celeborn.CelebornFunSuite.withFixture(CelebornFunSuite.scala:157)
2024-04-09T03:19:35.6829909Z at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
2024-04-09T03:19:35.6831386Z at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
2024-04-09T03:19:35.6832590Z at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
2024-04-09T03:19:35.6833727Z at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
2024-04-09T03:19:35.6835034Z at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
2024-04-09T03:19:35.6836660Z at org.apache.celeborn.CelebornFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(CelebornFunSuite.scala:35)
2024-04-09T03:19:35.6838253Z at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
2024-04-09T03:19:35.6839512Z at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
2024-04-09T03:19:35.6840766Z at org.apache.celeborn.CelebornFunSuite.runTest(CelebornFunSuite.scala:35)
2024-04-09T03:19:35.6842131Z at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
2024-04-09T03:19:35.6843459Z at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
2024-04-09T03:19:35.6844543Z at scala.collection.immutable.List.foreach(List.scala:431)
2024-04-09T03:19:35.6845566Z at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
2024-04-09T03:19:35.6846677Z at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
2024-04-09T03:19:35.6847722Z at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
2024-04-09T03:19:35.6849045Z at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
2024-04-09T03:19:35.6850358Z at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
2024-04-09T03:19:35.6851608Z at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
2024-04-09T03:19:35.6852566Z at org.scalatest.Suite.run(Suite.scala:1114)
2024-04-09T03:19:35.6853295Z at org.scalatest.Suite.run$(Suite.scala:1096)
2024-04-09T03:19:35.6854857Z at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
2024-04-09T03:19:35.6856472Z at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
2024-04-09T03:19:35.6857654Z at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
2024-04-09T03:19:35.6858737Z at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
2024-04-09T03:19:35.6859974Z at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
2024-04-09T03:19:35.6861519Z at org.apache.celeborn.CelebornFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(CelebornFunSuite.scala:35)
2024-04-09T03:19:35.6863041Z at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
2024-04-09T03:19:35.6864233Z at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
2024-04-09T03:19:35.6865355Z at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
2024-04-09T03:19:35.6866487Z at org.apache.celeborn.CelebornFunSuite.run(CelebornFunSuite.scala:35)
2024-04-09T03:19:35.6867764Z at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
2024-04-09T03:19:35.6869119Z at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
2024-04-09T03:19:35.6870146Z at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
2024-04-09T03:19:35.6871069Z at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2024-04-09T03:19:35.6872490Z at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2024-04-09T03:19:35.6873824Z at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2024-04-09T03:19:35.6874805Z at java.lang.Thread.run(Thread.java:750)
2024-04-09T03:19:35.6887377Z 24/04/09 03:19:35,687 ERROR [pool-1-thread-1-ScalaTest-running-StorageManagerSuite] StorageManager: Init level DB failed:
2024-04-09T03:19:35.6889076Z java.io.IOException: Unable to create state store
2024-04-09T03:19:35.6890473Z at org.apache.celeborn.service.deploy.worker.shuffledb.RocksDBProvider.initRockDB(RocksDBProvider.java:98)
2024-04-09T03:19:35.6894605Z at org.apache.celeborn.service.deploy.worker.shuffledb.DBProvider.initDB(DBProvider.java:39)
2024-04-09T03:19:35.6904452Z at org.apache.celeborn.service.deploy.worker.storage.StorageManager.<init>(StorageManager.scala:216)
2024-04-09T03:19:35.6936013Z at org.apache.celeborn.service.deploy.worker.storage.StorageManagerSuite.$anonfun$new$1(StorageManagerSuite.scala:30)
2024-04-09T03:19:35.6937634Z at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
2024-04-09T03:19:35.6938639Z at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
2024-04-09T03:19:35.6939493Z at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
2024-04-09T03:19:35.6940348Z at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
2024-04-09T03:19:35.6941200Z at org.scalatest.Transformer.apply(Transformer.scala:22)
2024-04-09T03:19:35.6942029Z at org.scalatest.Transformer.apply(Transformer.scala:20)
2024-04-09T03:19:35.6943079Z at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
2024-04-09T03:19:35.6944350Z at org.apache.celeborn.CelebornFunSuite.withFixture(CelebornFunSuite.scala:157)
2024-04-09T03:19:35.6945683Z at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
2024-04-09T03:19:35.6947057Z at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
2024-04-09T03:19:35.6948181Z at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
2024-04-09T03:19:35.6949222Z at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
2024-04-09T03:19:35.6950415Z at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
2024-04-09T03:19:35.6951915Z at org.apache.celeborn.CelebornFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(CelebornFunSuite.scala:35)
2024-04-09T03:19:35.6953391Z at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
2024-04-09T03:19:35.6954811Z at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
2024-04-09T03:19:35.6955990Z at org.apache.celeborn.CelebornFunSuite.runTest(CelebornFunSuite.scala:35)
2024-04-09T03:19:35.6957249Z at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
2024-04-09T03:19:35.6958473Z at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
2024-04-09T03:19:35.6959465Z at scala.collection.immutable.List.foreach(List.scala:431)
2024-04-09T03:19:35.6960422Z at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
2024-04-09T03:19:35.6961411Z at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
2024-04-09T03:19:35.6962347Z at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
2024-04-09T03:19:35.6963404Z at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
2024-04-09T03:19:35.6964635Z at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
2024-04-09T03:19:35.6965797Z at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
2024-04-09T03:19:35.6966679Z at org.scalatest.Suite.run(Suite.scala:1114)
2024-04-09T03:19:35.6967341Z at org.scalatest.Suite.run$(Suite.scala:1096)
2024-04-09T03:19:35.6968835Z at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
2024-04-09T03:19:35.6970329Z at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
2024-04-09T03:19:35.6971604Z at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
2024-04-09T03:19:35.6972573Z at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
2024-04-09T03:19:35.6973695Z at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
2024-04-09T03:19:35.6975279Z at org.apache.celeborn.CelebornFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(CelebornFunSuite.scala:35)
2024-04-09T03:19:35.6976986Z at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
2024-04-09T03:19:35.6978212Z at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
2024-04-09T03:19:35.6979969Z at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
2024-04-09T03:19:35.6982049Z at org.apache.celeborn.CelebornFunSuite.run(CelebornFunSuite.scala:35)
2024-04-09T03:19:35.6982851Z at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
2024-04-09T03:19:35.6983608Z at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
2024-04-09T03:19:35.6984185Z at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
2024-04-09T03:19:35.6984688Z at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2024-04-09T03:19:35.6985336Z at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2024-04-09T03:19:35.6986127Z at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2024-04-09T03:19:35.6986690Z at java.lang.Thread.run(Thread.java:750)
2024-04-09T03:19:35.6987383Z Caused by: org.rocksdb.RocksDBException: While mkdir if missing: /tmp/recover/recovery.rdb: No such file or directory
2024-04-09T03:19:35.6988110Z at org.rocksdb.RocksDB.open(Native Method)
2024-04-09T03:19:35.6988520Z at org.rocksdb.RocksDB.open(RocksDB.java:259)
2024-04-09T03:19:35.6989257Z at org.apache.celeborn.service.deploy.worker.shuffledb.RocksDBProvider.initRockDB(RocksDBProvider.java:96)
2024-04-09T03:19:35.6989929Z ... 48 more
```
Because `mkdir` cannot create a multi-level directory in one call, RocksDB's [`CreateDirIfMissing`](https://github.com/facebook/rocksdb/blob/main/env/fs_posix.cc#L637) does not support creating a non-existent multi-level directory, and LevelDB's [`CreateDir`](https://github.com/google/leveldb/blob/main/util/env_posix.cc#L625) has the same limitation. Therefore `LevelDBProvider`/`RocksDBProvider` should create the non-existent multi-level directory themselves before LevelDB/RocksDB initialization.
```cpp
IOStatus CreateDirIfMissing(const std::string& name,
                            const IOOptions& /*opts*/,
                            IODebugContext* /*dbg*/) override {
  if (mkdir(name.c_str(), 0755) != 0) {
    if (errno != EEXIST) {
      return IOError("While mkdir if missing", name, errno);
    } else if (!DirExists(name)) {  // Check that name is actually a
                                    // directory.
      // Message is taken from mkdir
      return IOStatus::IOError("`" + name +
                               "' exists but is not a directory");
    }
  }
  return IOStatus::OK();
}
```
```cpp
Status CreateDir(const std::string& dirname) override {
  if (::mkdir(dirname.c_str(), 0755) != 0) {
    return PosixError(dirname, errno);
  }
  return Status::OK();
}
```
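The fix can be sketched as below: since both engines only `mkdir()` the final path component, the provider should create the whole missing directory chain itself before opening the database. The method name here is illustrative, not the provider's actual API:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch: create any missing parent directories before RocksDB/LevelDB open.
public class DbDirInit {
  static File ensureDbDir(File dbFile) throws IOException {
    // Files.createDirectories creates the whole chain, unlike File#mkdir,
    // which fails when intermediate directories are missing.
    Files.createDirectories(dbFile.toPath());
    return dbFile;
  }

  public static void main(String[] args) throws IOException {
    Path base = Files.createTempDirectory("celeborn-test");
    // Mirrors the failing /tmp/recover/recovery.rdb shape from the log above
    File dbFile = new File(base.toFile(), "recover/recovery.rdb");
    ensureDbDir(dbFile); // previously this path would fail inside RocksDB.open
    System.out.println(dbFile.isDirectory()); // true
  }
}
```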
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`DBProviderSuiteJ#testRockDBCheckVersionFailed`
Closes #2458 from SteNicholas/CELEBORN-1386.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Catch throwable and release unused buffers.
### Why are the changes needed?
When the BufferAllocator tries to allocate a new ByteBuf, it may throw an `OutOfDirectMemoryError`. We should catch `Throwable` in `ReadBufferDispatcher` to prevent the dispatcher thread from exiting.
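The hardening pattern can be sketched as below; the types are simplified stand-ins for Netty's allocator and `ByteBuf`, not Celeborn's actual classes:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: catch Throwable (not just Exception) so an allocator Error does
// not kill the dispatcher thread, and release buffers already allocated
// for the failed request.
public class DispatcherSketch {
  interface Allocator { byte[] allocate(int size); }

  static int processed = 0;

  static void dispatchOnce(Allocator alloc, int buffers) {
    List<byte[]> allocated = new ArrayList<>();
    try {
      for (int i = 0; i < buffers; i++) {
        allocated.add(alloc.allocate(1024));
      }
      processed++;
    } catch (Throwable t) { // OutOfDirectMemoryError is an Error, not an Exception
      allocated.clear();    // stand-in for releasing each ByteBuf
    }
  }

  public static void main(String[] args) {
    Allocator failing = size -> { throw new OutOfMemoryError("direct memory"); };
    dispatchOnce(failing, 4);        // the loop survives the Error
    dispatchOnce(s -> new byte[s], 4);
    System.out.println(processed);   // 1
  }
}
```

A plain `catch (Exception e)` would miss `OutOfDirectMemoryError`, since it is a `VirtualMachineError` subtype, which is why the dispatcher thread previously died.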
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes #2452 from RexXiong/CELEBORN-1379.
Lead-authored-by: lvshuang.xjs <lvshuang.xjs@taobao.com>
Co-authored-by: Shuang <lvshuang.xjs@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Log celeborn config for debugging purposes.
### Why are the changes needed?
Help with debugging
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
tested the patch internally.
Closes #2442 from akpatnam25/CELEBORN-1368.
Authored-by: Aravind Patnam <apatnam@linkedin.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Introduce `ActiveSlots` metric to represent the disk resource demand currently in the cluster.
### Why are the changes needed?
It's recommended to introduce `ActiveSlots` metric to represent the disk resource demand currently in the cluster.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
In our test cluster, we can see the value of `ActiveSlots` increase and then return to 0 after the application finishes, while `SlotsAllocated` keeps increasing.

Closes #2386 from CodingCat/slots_decrease.
Lead-authored-by: CodingCat <zhunansjtu@gmail.com>
Co-authored-by: Nan Zhu <CodingCat@users.noreply.github.com>
Co-authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Add SSLFactory, and wire up TLS support with rest of Celeborn to enable secure over the wire communication.
### Why are the changes needed?
Add support for TLS to secure wire communication.
This is the last PR to add basic support for TLS.
There will be a follow-up for CELEBORN-1356 and documentation, of course!
### Does this PR introduce _any_ user-facing change?
Yes, completes basic support for TLS in Celeborn.
### How was this patch tested?
Existing tests, augmented with additional unit tests.
Closes #2438 from mridulm/add-sslfactory-and-related-changes.
Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
This ensures that an authenticated client does not update the metadata belonging to another application.
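The ownership check can be sketched as below; the names are illustrative, not Celeborn's actual API:

```java
// Sketch: the application id bound to the authenticated channel must match
// the application id in the request before any metadata update is applied.
public class AppMetaGuard {
  static void checkAuthorized(String channelAppId, String requestAppId) {
    if (!channelAppId.equals(requestAppId)) {
      throw new SecurityException(
          "Client " + channelAppId + " cannot modify meta of " + requestAppId);
    }
  }

  public static void main(String[] args) {
    checkAuthorized("app-1", "app-1"); // same app: allowed
    try {
      checkAuthorized("app-1", "app-2"); // cross-app update: rejected
    } catch (SecurityException e) {
      System.out.println("rejected");
    }
  }
}
```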
### Why are the changes needed?
The changes are needed for authentication support.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Closes #2441 from otterc/CELEBORN-1365.
Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
This ensures that an authenticated client is not trying to push or fetch data which belongs to another application.
### Why are the changes needed?
The changes are needed for authentication support.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Closes #2431 from otterc/CELEBORN-1360.
Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Fix the UT flakiness caused by the HTTP server port already being in use.
For the Jetty HttpServer, if binding the port fails, the thrown exception is an `IOException` whose cause is a `BindException`; we should retry in that case.
Before:
```
case e: BindException => // retry to setup mini cluster
```
Now:
```
case e: IOException
    if e.isInstanceOf[BindException] || Option(e.getCause).exists(
      _.isInstanceOf[BindException]) => // retry to setup mini cluster
```
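The widened predicate can be expressed in plain Java as a sketch; `BindException` itself extends `IOException`, so both the exception and its cause must be checked:

```java
import java.io.IOException;
import java.net.BindException;

// Sketch: retry when the failure is a BindException either directly or
// wrapped as the cause of an IOException (as Jetty surfaces it).
public class BindRetry {
  static boolean shouldRetry(IOException e) {
    // instanceof is null-safe, so a missing cause simply yields false
    return e instanceof BindException || e.getCause() instanceof BindException;
  }

  public static void main(String[] args) {
    System.out.println(shouldRetry(new BindException("in use")));                  // true
    System.out.println(shouldRetry(new IOException(new BindException("in use")))); // true
    System.out.println(shouldRetry(new IOException("unrelated")));                 // false
  }
}
```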
### Why are the changes needed?
To fix the UT flakiness caused by the HTTP server port already being in use.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Will trigger GA three times.
Closes #2424 from turboFei/set_connector_stop_timeout.
Authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Previously, there was no HTTP request spec covering query parameters, HTTP methods, or response media types, and each API required its own HttpEndpoint class.
In this PR, we refine the code for the HTTP service and provide a Swagger UI.
Note that this PR does not change the original API request and response behavior, including the metrics APIs.
TODO:
1. define DTO
2. http request authentication
<img width="1900" alt="image" src="https://github.com/apache/incubator-celeborn/assets/6757692/7f8c2363-170d-4bdf-b2c9-74260e31d3e5">
<img width="1138" alt="image" src="https://github.com/apache/incubator-celeborn/assets/6757692/3ae6ec8e-00a8-475b-bb37-0329536185f6">
### Why are the changes needed?
To close CELEBORN-1317
### Does this PR introduce _any_ user-facing change?
The API is aligned with the previous behavior.
### How was this patch tested?
UT.
Closes #2371 from turboFei/jetty.
Authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Batch OpenStream RPCs by Worker to avoid too many RPCs.
### Why are the changes needed?
Same as above: batching OpenStream RPCs by worker avoids issuing too many RPCs.
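The batching idea can be sketched as below; the data shapes are illustrative, not Celeborn's actual request types:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: instead of one OpenStream RPC per partition, group the requested
// partitions by the worker that hosts them and send one batched request per
// worker, so the RPC count scales with workers rather than partitions.
public class BatchOpenStream {
  static Map<String, List<Integer>> groupByWorker(Map<Integer, String> partitionToWorker) {
    Map<String, List<Integer>> byWorker = new HashMap<>();
    partitionToWorker.forEach((partition, worker) ->
        byWorker.computeIfAbsent(worker, w -> new ArrayList<>()).add(partition));
    return byWorker; // one RPC per key instead of one per partition
  }

  public static void main(String[] args) {
    Map<Integer, String> locations = Map.of(0, "worker-a", 1, "worker-a", 2, "worker-b");
    System.out.println(groupByWorker(locations).size()); // 2 RPCs instead of 3
  }
}
```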
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passes GA and Manual tests.
Closes #2362 from waitinfuture/1144.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Introduce the JVM profiler `JVMProfiler` in the Celeborn Worker, using async-profiler to capture CPU and memory profiles.
### Why are the changes needed?
[async-profiler](https://github.com/async-profiler) is a sampling profiler for any JDK based on the HotSpot JVM that does not suffer from Safepoint bias problem. It has low overhead and doesn’t rely on JVMTI. It avoids the safepoint bias problem by using the `AsyncGetCallTrace` API provided by HotSpot JVM to profile the Java code paths, and Linux’s perf_events to profile the native code paths. It features HotSpot-specific APIs to collect stack traces and to track memory allocations.
The feature introduces a profiler plugin that adds no overhead unless enabled and can be configured to accept profiler arguments as a configuration parameter. It supports turning profiling on/off and includes the jars/binaries needed for profiling.
Backport [[SPARK-46094] Support Executor JVM Profiling](https://github.com/apache/spark/pull/44021).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Worker cluster test.
Closes #2409 from SteNicholas/CELEBORN-1299.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
To fix a bug that might cause persisted committed file info to be lost.
### Why are the changes needed?
A worker cleans its persisted committed file info on start but does not put it back; if the worker restarts again, the committed file info is lost.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA.
Closes #2390 from FMX/b863-1.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Enable custom network location aware replication, based on a custom impl of `DNSToSwitchMapping`.
### Why are the changes needed?
Resolution of the network location of multiple workers at the master can be expensive at times. This way, each worker resolves its own network location and sends it to the master via the RegisterWorker transport message. If a worker cannot resolve its location, we fall back to attempting resolution at the master (during update meta or reload of a snapshot). Proposal: [Celeborn Custom Network Location Aware Replication](https://docs.google.com/document/d/11M_MKKnIXCTExJHMX-OMTq7SBpkl8fJMlpy8hLgmev0/edit#heading=h.s3vnydz589z5)
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Updated the unit tests.
Closes #2367 from akpatnam25/CELEBORN-1313.
Authored-by: Aravind Patnam <apatnam@linkedin.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
This enables a Celeborn Worker to retrieve the application meta from the Master if it hasn't received the secret from the Master before the application attempts to connect to it. Additionally, the Celeborn Worker's SecretRegistry has been converted into an LRU cache to prevent unbounded growth of the registry.
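An LRU bound of this kind can be sketched with `LinkedHashMap`'s access-order mode; the capacity and names are illustrative, not the actual SecretRegistry API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: an LRU-bounded secret registry, so the worker-side registry of
// application secrets cannot grow without bound.
public class LruSecretRegistry extends LinkedHashMap<String, String> {
  private final int capacity;

  LruSecretRegistry(int capacity) {
    super(16, 0.75f, true); // accessOrder = true gives LRU iteration order
    this.capacity = capacity;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
    return size() > capacity; // evict the least recently used entry
  }

  public static void main(String[] args) {
    LruSecretRegistry registry = new LruSecretRegistry(2);
    registry.put("app-1", "secret1");
    registry.put("app-2", "secret2");
    registry.get("app-1");            // touch app-1, so app-2 becomes eldest
    registry.put("app-3", "secret3"); // evicts app-2
    System.out.println(registry.keySet()); // [app-1, app-3]
  }
}
```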
### Why are the changes needed?
This is the last change needed for auth support in Celeborn (https://issues.apache.org/jira/browse/CELEBORN-1011).
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added UTs and part of a bigger change which will be tested end-to-end.
Closes #2363 from otterc/CELEBORN-1179.
Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Override `toString` method for `StoreVersion`.
### Why are the changes needed?
Avoid displaying the `StoreVersion` hashCode in the `IOException` thrown after the `checkVersion` check fails in `RocksDBProvider`/`LevelDBProvider`, which shows something like:
```
cannot read state DB with version org.apache.celeborn.service.deploy.worker.shuffledb.StoreVersion1f, incompatible with current version org.apache.celeborn.service.deploy.worker.shuffledb.StoreVersion3e
```
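The effect of the override can be sketched as below; without `toString`, error messages fall back to `Object`'s class-name-plus-hash form, which is what produced the unreadable text above. The major/minor fields are an assumption about `StoreVersion`'s layout for illustration:

```java
// Sketch: override toString so version mismatches print a readable value
// instead of the default Object representation.
public class StoreVersionSketch {
  final int major, minor;

  StoreVersionSketch(int major, int minor) {
    this.major = major;
    this.minor = minor;
  }

  @Override
  public String toString() {
    return "StoreVersion[major=" + major + ",minor=" + minor + "]";
  }

  public static void main(String[] args) {
    System.out.println(new StoreVersionSketch(1, 0)); // StoreVersion[major=1,minor=0]
  }
}
```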
Backport https://github.com/apache/spark/pull/44624.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- `DBProviderSuiteJ`
Closes #2372 from SteNicholas/CELEBORN-1316.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
When we shut down the master or a worker, log the received signal as a record.
### Why are the changes needed?
This makes it convenient to track the status of the master and workers.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
local test
```bash
./sbin/stop-all.sh
```
```
12:20:59.932 [SIGTERM handler] ERROR org.apache.celeborn.service.deploy.master.Master - RECEIVED SIGNAL TERM
```
```
12:20:59.563 [SIGTERM handler] ERROR org.apache.celeborn.service.deploy.worker.Worker - RECEIVED SIGNAL TERM
```
Closes #2334 from cxzl25/CELEBORN-1293.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Close the `RocksDB`/`LevelDB` instance when `checkVersion` throws an exception.
Backport [[SPARK-46389][CORE] Manually close the RocksDB/LevelDB instance when checkVersion throw Exception](https://github.com/apache/spark/pull/44327).
### Why are the changes needed?
In the process of initializing the DB in `RocksDBProvider`/`LevelDBProvider`, there is a `checkVersion` step that may throw an exception. After the exception is thrown, the upper-level caller cannot hold the already-opened RocksDB/LevelDB instance, so it cannot perform resource cleanup, which poses a potential risk of handle leakage. So this PR manually closes the `RocksDB`/`LevelDB` instance when `checkVersion` throws an exception.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #2369 from SteNicholas/CELEBORN-1315.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Fix a `NullPointerException` in `FetchHandler#handleEndStreamFromClient` that occurs after a stream is recycled and its corresponding `streamId` is removed from `streams`.
### Why are the changes needed?
`FetchHandler#handleEndStreamFromClient` needs to get the shuffle key to record the application's active connection. However, `recycleStream` may be invoked in `MapPartitionDataReader`, which removes the corresponding `streamId` before `FetchHandler#handleEndStreamFromClient` runs and causes a `NullPointerException`.
```
24/03/05 13:27:14,522 DEBUG [worker-credit-stream-manager-recycler] MapDataPartition: release all for stream: 990159671000
24/03/05 13:27:14,524 DEBUG [worker-credit-stream-manager-recycler] MapDataPartition: release map data partition FileInfo{file=/data00/home/guoyangze/data/celeborn-worker/shuffle_data/1709616425086-343fe33c97559405b474412efc0d9ce5/0/0-0-0, chunkOffsets=0, userIdentifier=`default`.`default`, partitionType=MAP}
24/03/05 13:27:14,531 ERROR [fetch-server-11-1] TransportRequestHandler: Error while invoking handler#receive() on RPC id 18
java.lang.NullPointerException
at org.apache.celeborn.service.deploy.worker.storage.CreditStreamManager.getStreamShuffleKey(CreditStreamManager.java:189)
at org.apache.celeborn.service.deploy.worker.FetchHandler.handleEndStreamFromClient(FetchHandler.scala:369)
at org.apache.celeborn.service.deploy.worker.FetchHandler.handleRpcRequest(FetchHandler.scala:143)
at org.apache.celeborn.service.deploy.worker.FetchHandler.receive(FetchHandler.scala:97)
at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:96)
at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
```
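The defensive fix can be sketched as below; the names mirror the stack trace, but the types are simplified stand-ins for the actual `CreditStreamManager`:

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch: after a stream is recycled its streamId is gone from the map, so
// the handler must tolerate a null lookup instead of dereferencing it.
public class StreamShuffleKeys {
  private final ConcurrentHashMap<Long, String> streams = new ConcurrentHashMap<>();

  void register(long streamId, String shuffleKey) { streams.put(streamId, shuffleKey); }

  void recycle(long streamId) { streams.remove(streamId); }

  // Returns null instead of throwing when the stream was already recycled
  String getStreamShuffleKey(long streamId) {
    return streams.get(streamId);
  }

  public static void main(String[] args) {
    StreamShuffleKeys mgr = new StreamShuffleKeys();
    mgr.register(990159671000L, "app-shuffle-0");
    mgr.recycle(990159671000L); // e.g. recycled by MapPartitionDataReader
    String key = mgr.getStreamShuffleKey(990159671000L);
    if (key != null) {
      // record the application's active connection for the shuffle key
    }
    System.out.println(key); // null, handled instead of throwing NPE
  }
}
```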
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #2356 from SteNicholas/CELEBORN-1282.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
This enables Celeborn Master to persist application meta in Ratis and also push it to Celeborn Workers when it receives the requests for slots from the LifecycleManager.
### Why are the changes needed?
This change is required for adding authentication. ([CELEBORN-1011](https://issues.apache.org/jira/browse/CELEBORN-1011)).
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added some UTs.
Closes #2346 from otterc/CELEBORN-1234.
Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
This adds an internal port and auth support to Celeborn Workers.
1. Internal port is used by a worker to receive messages from Celeborn Master.
2. Authentication support for secure communication with clients. This change doesn't add the support in clients to communicate to the Workers securely. That will be in a future change.
This change targets just adding the port and auth support to Worker. The following items from the proposal are still pending:
- Persisting the app secrets in Ratis.
- Forwarding secrets to Workers and having ability for the workers to pull registration info from the Master.
- Secured communication between workers and clients.
### Why are the changes needed?
It is needed for adding authentication support to Celeborn ([CELEBORN-1011](https://issues.apache.org/jira/browse/CELEBORN-1011))
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
Part of a bigger change. For this change, only modified existing UTs.
Closes #2292 from otterc/CELEBORN-1256.
Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: waitinfuture <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
On the Master side, calculating sub resource consumption occupies CPU, causing RPC timeouts and missing Prometheus metrics.
<img width="1781" alt="Screenshot 2024-02-28 12 04 19" src="https://github.com/apache/incubator-celeborn/assets/46485123/ba49a4ac-ec49-4234-8758-c0db9242abf6">
The Worker side generates too much metrics data.
### Why are the changes needed?
Fix performance issue
### Does this PR introduce _any_ user-facing change?
Remove app level metrics
### How was this patch tested?
Closes #2342 from AngersZhuuuu/CELEBORN-1292.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
This PR addresses an NPE issue that occurs when the `Worker#registered` member is accessed before it is initialized.
The problem occurs because the `TransportChannelHandler` might be served before the worker is registered.
```
24/02/01 15:07:32,090 WARN [push-server-6-6] TransportChannelHandler: Exception in connection from /xx.xx.xx.xx:xxx
java.lang.NullPointerException
at org.apache.celeborn.service.deploy.worker.PushDataHandler.checkRegistered(PushDataHandler.scala:714)
at org.apache.celeborn.common.network.server.TransportRequestHandler.checkRegistered(TransportRequestHandler.java:82)
at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:76)
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:151)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:750)
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes #2274 from cfmcgrady/check-registered.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
### What changes were proposed in this pull request?
Optimize the handling of exceptions during the push of replica data: now `PUSH_DATA_CONNECTION_EXCEPTION_REPLICA` is only thrown in specific scenarios.
### Why are the changes needed?
When handling exceptions related to pushing replica data in the worker, unmatched exceptions, such as 'file already closed,' are uniformly transformed into REPLICATE_DATA_CONNECTION_EXCEPTION_COUNT and returned to the client. The client then excludes the peer node based on this count, which may not be appropriate in certain scenarios. For instance, in the case of an exception like 'file already closed,' it typically occurs during multiple splits and commitFile operations. Excluding a large number of nodes under such circumstances is clearly not in line with expectations.

### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Through existing UTs.
Closes #2323 from lyy-pineapple/CELEBORN-1282.
Authored-by: liangyongyuan <liangyongyuan@xiaomi.com>
Signed-off-by: waitinfuture <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Change the default value of `celeborn.worker.graceful.shutdown.recoverDbBackend` from `LEVELDB` to `ROCKSDB`.
### Why are the changes needed?
Because LevelDB support will be removed, the default value of `celeborn.worker.graceful.shutdown.recoverDbBackend` is changed to `ROCKSDB` instead of `LEVELDB` in preparation for the LevelDB deprecation.
Backport:
[[SPARK-45351][CORE] Change spark.shuffle.service.db.backend default value to ROCKSDB](https://github.com/apache/spark/pull/43142)
[[SPARK-45413][CORE] Add warning for prepare drop LevelDB support](https://github.com/apache/spark/pull/43217)
### Does this PR introduce _any_ user-facing change?
The default value of `celeborn.worker.graceful.shutdown.recoverDbBackend` is changed from `LEVELDB` to `ROCKSDB`.
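Users who want to retain the previous behavior after upgrading can set the old default back explicitly. A minimal `celeborn-defaults.conf` fragment (the key is the one from this PR; setting it is only needed if an existing LevelDB recover DB should keep being used):

```
celeborn.worker.graceful.shutdown.recoverDbBackend LEVELDB
```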
### How was this patch tested?
No.
Closes #2320 from SteNicholas/CELEBORN-1280.
Lead-authored-by: SteNicholas <programgeek@163.com>
Co-authored-by: Nicholas Jiang <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Rename `celeborn.worker.sortPartition.reservedMemory.enabled` to `celeborn.worker.sortPartition.prefetch.enabled`. Address [r1469066327](https://github.com/apache/incubator-celeborn/pull/2264/files#r1469066327) of pan3793.
### Why are the changes needed?
`celeborn.worker.sortPartition.reservedMemory.enabled` is misleading: the feature actually prefetches the original partition files during the first sequential reading pass to leverage the Linux PageCache mechanism and speed up the subsequent random reads of them. The name `celeborn.worker.sortPartition.prefetch.enabled` is more accurate.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes #2312 from SteNicholas/CELEBORN-1254.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
In this PR, when getting device disk info, we check that the directory is writable to make sure the capacity reported to the Celeborn master is correct and does not include non-writable directories.
### Why are the changes needed?
To ignore bad disk when initializing the worker.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UT.
Closes #2233 from turboFei/check_disk_init.
Lead-authored-by: Fei Wang <fwang12@ebay.com>
Co-authored-by: Keyong Zhou <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
This adds a secured port to Celeborn Master which is used for secure communication with LifecycleManager.
This is part of adding authentication support in Celeborn (see CELEBORN-1011).
This change targets just adding the secured port to Master. The following items from the proposal are still pending:
1. Persisting the app secrets in Ratis.
2. Forwarding secrets to Workers and having ability for the workers to pull registration info from the Master.
3. Secured and internal port in Workers.
4. Secured communication between workers and clients.
In addition, since we are supporting both secured and unsecured communication for backward compatibility and seamless rolling upgrades, there is an additional change needed. An app which registers with the Master can try to talk to the workers on unsecured ports, which is a security breach. So the workers need to know whether an app registered with the Master or not, and for that the Master has to propagate the list of unsecured apps to Celeborn workers as well. We can discuss this more in https://issues.apache.org/jira/browse/CELEBORN-1261
### Why are the changes needed?
It is needed for adding authentication support to Celeborn (CELEBORN-1011)
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
Added a simple UT.
Closes #2281 from otterc/CELEBORN-1257.
Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Fix `Worker#computeResourceConsumption` `NullPointerException` for `userResourceConsumption` that does not contain given `userIdentifier`.
### Why are the changes needed?
When the `userResourceConsumption` of `workerInfo` does not contain the given `userIdentifier`, `Worker#computeResourceConsumption` causes a `NullPointerException` for the worker dimension resource consumption metrics.
```
24/02/05 17:36:15,983 ERROR [worker-forward-message-scheduler] Utils: Uncaught exception in thread worker-forward-message-scheduler
java.lang.NullPointerException
at org.apache.celeborn.service.deploy.worker.Worker.$anonfun$gaugeResourceConsumption$1(Worker.scala:555)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at org.apache.celeborn.common.metrics.source.GaugeSupplier$$anon$3.getValue(AbstractSource.scala:453)
at org.apache.celeborn.common.metrics.source.AbstractSource.addGauge(AbstractSource.scala:79)
at org.apache.celeborn.common.metrics.source.AbstractSource.addGauge(AbstractSource.scala:99)
at org.apache.celeborn.service.deploy.worker.Worker.gaugeResourceConsumption(Worker.scala:554)
at org.apache.celeborn.service.deploy.worker.Worker.$anonfun$handleResourceConsumption$1(Worker.scala:537)
at org.apache.celeborn.service.deploy.worker.Worker.$anonfun$handleResourceConsumption$1$adapted(Worker.scala:536)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
at org.apache.celeborn.service.deploy.worker.Worker.handleResourceConsumption(Worker.scala:536)
at org.apache.celeborn.service.deploy.worker.Worker.org$apache$celeborn$service$deploy$worker$Worker$$heartbeatToMaster(Worker.scala:362)
at org.apache.celeborn.service.deploy.worker.Worker$$anon$1.$anonfun$run$1(Worker.scala:395)
at org.apache.celeborn.common.util.Utils$.tryLogNonFatalError(Utils.scala:230)
at org.apache.celeborn.service.deploy.worker.Worker$$anon$1.run(Worker.scala:395)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
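The defensive pattern behind this kind of fix can be sketched as follows (a hypothetical `SafeGauge` helper for illustration, not the actual `Worker.scala` change):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: read a user's resource consumption with an explicit default so a
// metrics gauge callback never dereferences a missing map entry.
public class SafeGauge {
    public static long diskBytes(Map<String, Long> userConsumption, String user) {
        // getOrDefault avoids the NullPointerException thrown when the given
        // userIdentifier has no recorded consumption yet.
        return userConsumption.getOrDefault(user, 0L);
    }
}
```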
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA and cluster.
Closes #2288 from SteNicholas/CELEBORN-1252.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Fix `Worker#computeResourceConsumption` `NullPointerException` with null `subResourceConsumptions`.
### Why are the changes needed?
With a null `subResourceConsumptions`, `Worker#computeResourceConsumption` causes a `NullPointerException` for the application dimension resource consumption metrics.
```
24/02/04 13:58:13,757 ERROR [worker-forward-message-scheduler] Utils: Uncaught exception in thread worker-forward-message-scheduler
java.lang.NullPointerException
at org.apache.celeborn.service.deploy.worker.Worker.computeResourceConsumption(Worker.scala:581)
at org.apache.celeborn.service.deploy.worker.Worker.$anonfun$gaugeResourceConsumption$1(Worker.scala:555)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at org.apache.celeborn.common.metrics.source.GaugeSupplier$$anon$3.getValue(AbstractSource.scala:453)
at org.apache.celeborn.common.metrics.source.AbstractSource.addGauge(AbstractSource.scala:79)
at org.apache.celeborn.common.metrics.source.AbstractSource.addGauge(AbstractSource.scala:99)
at org.apache.celeborn.service.deploy.worker.Worker.gaugeResourceConsumption(Worker.scala:554)
at org.apache.celeborn.service.deploy.worker.Worker.$anonfun$handleResourceConsumption$1(Worker.scala:537)
at org.apache.celeborn.service.deploy.worker.Worker.$anonfun$handleResourceConsumption$1$adapted(Worker.scala:536)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
at org.apache.celeborn.service.deploy.worker.Worker.handleResourceConsumption(Worker.scala:536)
at org.apache.celeborn.service.deploy.worker.Worker.org$apache$celeborn$service$deploy$worker$Worker$$heartbeatToMaster(Worker.scala:362)
at org.apache.celeborn.service.deploy.worker.Worker$$anon$1.$anonfun$run$1(Worker.scala:395)
at org.apache.celeborn.common.util.Utils$.tryLogNonFatalError(Utils.scala:230)
at org.apache.celeborn.service.deploy.worker.Worker$$anon$1.run(Worker.scala:395)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA and cluster.
Closes #2286 from SteNicholas/CELEBORN-1174.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
`WorkerSource` supports application dimension `ActiveConnectionCount` metric to record the number of registered connections for each application.
### Why are the changes needed?
The `ActiveConnectionCount` metric currently records the total number of registered connections. It's recommended to support an application dimension `ActiveConnectionCount` metric that records the number of registered connections for each application in the Worker, giving users the actual number of registered connections per application.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal tests.
Closes #2167 from SteNicholas/CELEBORN-1182.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Introduce application dimension resource consumption metrics for `ResourceConsumptionSource`.
### Why are the changes needed?
`ResourceConsumption` namespace metrics are currently generated for each user and identified using a metric tag. It's recommended to introduce application dimension resource consumption metrics that expose the application dimension resource consumption of the Master and Worker. By monitoring resource consumption in the application dimension, users can see the actual resource consumption of each application.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- `WorkerInfoSuite#WorkerInfo toString output`
- `PbSerDeUtilsTest#fromAndToPbResourceConsumption`
- `MasterStateMachineSuiteJ#testObjSerde`
Closes #2161 from SteNicholas/CELEBORN-1174.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
1. Support Celeborn Master (Leader) managing workers by sending events during heartbeat
2. Add a worker status to Worker so that the status of the workers can be known (such as during decommission...)
3. Add an HTTP interface for the master to handleWorkerEvent/getWorkerEvent
### Why are the changes needed?
Currently, we only support managing the status of workers on the worker side. This PR supports the master managing the status of all workers. By sending events such as Decommission/Graceful/Exit during heartbeat, workers can asynchronously execute the command from the master. Meanwhile, we can't know the worker status during worker decommission, so this PR adds a worker status to tell the exact status of the worker.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes #2255 from RexXiong/CELEBORN-1245.
Authored-by: Shuang <lvshuang.xjs@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Improve flusher's robustness.
### Why are the changes needed?
Flusher's thread might be terminated due to uncaught exceptions.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA and cluster.
Closes #2254 from FMX/b1248.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
This PR improves the test implementations so that test nodes start on random ports instead of using hardcoded ones.
### Why are the changes needed?
Currently the test nodes are started on hardcoded ports, which prevents running multiple builds on the same CI/CD server (not an uncommon setup in many companies' infra).
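The usual way to get a collision-free port in tests is to bind to port 0 and let the OS assign an ephemeral one; a minimal sketch (not the actual test utility added in this PR):

```java
import java.net.ServerSocket;

// Sketch: ask the OS for a free ephemeral port instead of hardcoding one,
// so parallel builds on the same CI/CD host do not collide.
public class RandomPort {
    public static int freePort() throws Exception {
        try (ServerSocket socket = new ServerSocket(0)) { // port 0 = OS-assigned
            return socket.getLocalPort();
        }
    }
}
```

There is a small race (the port could be taken between closing the probe socket and reusing the number), which is usually acceptable in tests.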
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
It runs very well in our private CI/CD infra with many parallel builds.
Closes #2237 from CodingCat/enhance_test.
Authored-by: CodingCat <zhunansjtu@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Introduce `celeborn.worker.sortPartition.reservedMemory.enabled` to support having `PartitionFilesSorter` seek to the position of each block without warming up for non-HDFS files.
### Why are the changes needed?
File sorting includes three steps: reading files, sorting MapIds, and writing files. The default block of Celeborn is 256k, and the number of blocks is about 1000, so the sorting process is very fast, and the main overhead is file reading and writing. There are roughly three options for the entire sorting process:
1. Memory of the file size is allocated in advance, the file is read in as a whole, MapId is parsed and sorted, and Blocks are written back to the disk in MapId order.
2. No memory is allocated, seek to the location of each block, parse and sort the MapId, and transfer the Blocks of the original file to the new file in the order of MapId.
3. Allocate a small block of memory (such as 256k), read the entire file sequentially, parse and sort the MapId, and transfer the block of the original file to the new file in the order of MapId.
From an IO perspective, at first glance, solution 1 uses sufficient memory and reads and writes the whole file sequentially; solution 2 has random reading and random writing; solution 3 has sequential writing. Intuitively solution 1 has better performance. Due to the existence of PageCache, when writing the file in solution 3, the original file is likely to be cached in PageCache. `PartitionFilesSorter` supports solution 3 with PageCache at present, which has better performance, especially on HDD disks. It's better to also support solution 2 behind a switch config, seeking to the position of each block without warming up for non-HDFS files, especially on SSD disks.
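Solution 2 can be sketched roughly as below (a simplified illustration with a hypothetical `Block` type; the real `PartitionFilesSorter` tracks per-mapId block offsets differently):

```java
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of "solution 2": no warm-up read of the whole file; seek to each
// block's offset in the original file and write blocks out in mapId order.
public class SortBySeek {
    // A pushed block: its mapId plus offset/length in the original file.
    public record Block(int mapId, long offset, int length) {}

    public static void sortBySeek(String in, String out, List<Block> blocks) throws Exception {
        List<Block> sorted = new ArrayList<>(blocks);
        sorted.sort(Comparator.comparingInt(Block::mapId)); // order blocks by mapId
        try (RandomAccessFile src = new RandomAccessFile(in, "r");
             RandomAccessFile dst = new RandomAccessFile(out, "rw")) {
            for (Block b : sorted) {
                byte[] buf = new byte[b.length()];
                src.seek(b.offset());   // random read: jump straight to the block
                src.readFully(buf);
                dst.write(buf);         // sequential write in mapId order
            }
        }
    }
}
```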
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA and cluster.
Closes #2264 from SteNicholas/CELEBORN-1254.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
The resource consumption of a worker does not update when the update interval of resource consumption is greater than the heartbeat interval.
<img width="1741" alt="Screenshot 2024-01-24 14 49 50" src="https://github.com/apache/incubator-celeborn/assets/46485123/21cfd412-c69e-4955-8bc8-155ee470697d">
This pull request introduces the changes below:
1. Avoid the master repeatedly adding a gauge for the same user
2. For the worker, user resource consumption can be obtained directly from the worker's snapshot and doesn't need an update interval
### Why are the changes needed?
No.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes #2260 from AngersZhuuuu/CELEBORN-1252.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Introduce `OpenStreamSuccessCount`, `FetchChunkSuccessCount` and `WriteDataSuccessCount` metric to expose the count of opening stream, fetching chunk and writing data successfully in current worker.
### Why are the changes needed?
The ratio of failed open stream, fetch chunk and write data operations is important for Celeborn performance and for balancing the health of the cluster, but it currently lacks the count of successfully opened streams, fetched chunks and written data.
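With both counters available, the failure ratio becomes a simple derived value; a minimal sketch (hypothetical helper, not part of this PR):

```java
// Sketch: once success counts are exposed alongside failure counts, the
// failure ratio of an operation (open stream, fetch chunk, write data) can
// be computed directly from the two counters.
public class FailureRatio {
    public static double of(long failed, long succeeded) {
        long total = failed + succeeded;
        return total == 0 ? 0.0 : (double) failed / total; // guard empty window
    }
}
```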
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes #2252 from AngersZhuuuu/CELEBORN-1246.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Unify celeborn thread name format with the following pattern:
- client: `celeborn-client-[component]-[function]er`
- service: `[master|worker]-[component]-[function]er`
- other: `celeborn-[component]-[function]er`
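The three patterns can be sketched as simple formatting helpers (hypothetical names, not Celeborn's actual `ThreadUtils` API):

```java
// Sketch of the unified thread-name patterns listed above.
public class ThreadNames {
    public static String client(String component, String function) {
        return "celeborn-client-" + component + "-" + function;
    }
    public static String service(String role, String component, String function) {
        return role + "-" + component + "-" + function; // role: "master" or "worker"
    }
    public static String other(String component, String function) {
        return "celeborn-" + component + "-" + function;
    }
}
```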
### Why are the changes needed?
It's recommended to unify the Celeborn thread name format, especially on the client side for applications.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes #2248 from AngersZhuuuu/CELEBORN-1242.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Make all single thread use standard ThreadUtils to simplify the code
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #2229 from AngersZhuuuu/CELEBORN-1226.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
deviceCheckThreadPool is only initialized when diskCheck is enabled
### Why are the changes needed?
deviceCheckThreadPool is only initialized when diskCheck is enabled
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing UTs.
Closes #2242 from leixm/issue_1238.
Authored-by: xianminglei <xianming.lei@shopee.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Refactor metrics name.
### Why are the changes needed?
Easier to understand the meaning of metrics
### Does this PR introduce _any_ user-facing change?
METRICS.md
migration.md
monitoring.md
### How was this patch tested?
Existing UTs.
Closes #2240 from leixm/metrics_name.
Authored-by: xianminglei <xianming.lei@shopee.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
I tested 1T TPCDS with the following Celeborn 8-worker cluster setup:
1. Workers have fixed ports for rpc/push/replicate
2. `spark.celeborn.client.spark.fetch.throwsFetchFailure` is enabled
3. graceful shutdown is enabled
I randomly kill -9 or run ./sbin/stop-worker.sh (non-graceful and graceful shutdown, respectively) on some workers and start them immediately. Then I encountered result incorrectness with low probability (1 out of 99 queries).
After digging into it, I found the reason is as follows:
1. At time T1, all workers are serving shuffle 602
2. At time T2, I run stop-worker.sh for worker2, and then run kill -9 and start worker1. Since the workers are configured with fixed ports, clients think they are OK and the Master lets them re-register, which will also succeed. And worker2's in-memory state is clean.
3. At time T3, push requests to worker2 fail and revive on worker1, so worker1 has a reservation for shuffle 602. Then I start worker2.
4. At time T4, LifecycleManager sends CommitFiles to all workers; on worker1, it just logs that some PartitionLocations don't exist and ignores them.
5. CommitFiles succeeds, but worker1 lost some data before restarting, and no error happens.
The following snapshot shows the process.

This PR fixes this by treating unfound PartitionLocations as failed when handling CommitFiles.
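The fix can be sketched as follows (simplified, with location ids as plain strings; the real handler works on `PartitionLocation` objects):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Sketch: when handling CommitFiles, partition locations the worker cannot
// find are reported back as failed instead of being logged and ignored.
public class CommitFilesCheck {
    public static List<String> commit(Set<String> knownLocations,
                                      List<String> requested,
                                      List<String> failed) {
        List<String> committed = new ArrayList<>();
        for (String loc : requested) {
            if (knownLocations.contains(loc)) {
                committed.add(loc);
            } else {
                failed.add(loc); // previously: only logged, silently ignored
            }
        }
        return committed;
    }
}
```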
### Why are the changes needed?
ditto
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test
Closes #2235 from waitinfuture/1233.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
`PushDataHandler` should build the replicate factory to get the client for sending replicate data, instead of using the push client factory.
### Why are the changes needed?
`PushDataHandler` uses the push client factory to create the client for replicating, but it should use the replicate factory; otherwise the replicate module configuration does not take effect for replication on the worker server.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA and cluster.
Closes #2232 from SteNicholas/CELEBORN-1225.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Format the timestamp when recording worker failure information.
### Why are the changes needed?
Currently the long-typed timestamp is difficult to read and confusing without reading the source code.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #2230 from turboFei/date_format.
Authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
To record hdfs writer in worker.
To fix a bug introduced by https://github.com/apache/incubator-celeborn/pull/2130.
### Why are the changes needed?
If the HDFS writer is not recorded, the worker won't clean up the HDFS shuffle file when a partition is broken until the master cleans HDFS.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA and cluster.
Closes #2227 from FMX/b1222.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
`takeBuffer()` should avoid checking `source.metricsCollectCriticalEnabled` twice.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #2223 from AngersZhuuuu/CELEBORN-1219.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
We met a case where the trim action stopped but didn't set trimInProcess back; the worker then won't trigger a new trim and keeps push data paused, already pushed data won't be released (replicate data can still work well), and the worker never recovers to receive push data requests.
This PR makes the logic more robust.
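The robustness pattern amounts to resetting the in-progress flag in a `finally` block; a minimal sketch (hypothetical `TrimGuard`, not the actual worker code):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: always reset the trimInProcess flag, even when the trim action
// throws, so a single failed trim cannot block all future trims.
public class TrimGuard {
    public final AtomicBoolean trimInProcess = new AtomicBoolean(false);

    public void trim(Runnable action) {
        if (!trimInProcess.compareAndSet(false, true)) {
            return; // another trim is already running
        }
        try {
            action.run();
        } finally {
            trimInProcess.set(false); // reset unconditionally
        }
    }
}
```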
### Why are the changes needed?
Make logic more robust
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No
Closes #2224 from AngersZhuuuu/CELEBORN-1220.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
`OpenStream` should register the stream via `ChunkStreamManager`, which is used to obtain the disk file when closing the stream for the `ReusedExchange` operator.
Follow-up of #1932.
### Why are the changes needed?
`OpenStream` does not register a chunk stream for reading local or DFS shuffle. Therefore `LocalPartitionReader` and `DfsPartitionReader` could not obtain the disk file from `ChunkStreamManager`, which causes the below `NullPointerException` when closing the stream.
```
ERROR [fetch-server-11-11] TransportRequestHandler: Error while invoking handler#receive() on RPC id 4
java.lang.NullPointerException
at org.apache.celeborn.service.deploy.worker.storage.ChunkStreamManager.getShuffleKeyAndFileName(ChunkStreamManager.java:188)
at org.apache.celeborn.service.deploy.worker.FetchHandler.handleEndStreamFromClient(FetchHandler.scala:344)
at org.apache.celeborn.service.deploy.worker.FetchHandler.handleRpcRequest(FetchHandler.scala:137)
at org.apache.celeborn.service.deploy.worker.FetchHandler.receive(FetchHandler.scala:94)
at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:96)
at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:151)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:745)
```
In summary, `FetchHandler` only closes streams registered via `ChunkStreamManager`. `LocalPartitionReader` and `DfsPartitionReader` should register streams via `ChunkStreamManager#registerStream` so that closing the stream can delete the original unsorted disk file in the `ReusedExchange` operator.
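The register-then-close contract described above can be sketched minimally; names and the string-based lookup are simplifications of the real `ChunkStreamManager`, whose actual API differs:

```java
import java.util.HashMap;
import java.util.Map;

public class ChunkStreamManagerSketch {
    // streamId -> "shuffleKey/fileName"; a stream not registered here
    // yields a null lookup, which is what surfaced as the NPE above.
    Map<Long, String> streams = new HashMap<>();
    long nextStreamId = 1;

    long registerStream(String shuffleKey, String fileName) {
        long id = nextStreamId++;
        streams.put(id, shuffleKey + "/" + fileName);
        return id;
    }

    String getShuffleKeyAndFileName(long streamId) {
        return streams.get(streamId); // null for unregistered streams
    }

    public static void main(String[] args) {
        ChunkStreamManagerSketch m = new ChunkStreamManagerSketch();
        long id = m.registerStream("app-1-shuffle-0", "part-0");
        System.out.println(m.getShuffleKeyAndFileName(id));   // app-1-shuffle-0/part-0
        System.out.println(m.getShuffleKeyAndFileName(999L)); // null
    }
}
```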
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- `FetchHandlerSuiteJ#testLocalReadSortFileOnceOriginalFileBeDeleted`
- `FetchHandlerSuiteJ#testDoNotDeleteOriginalFileWhenNonRangeLocalReadWorkInProgress`
- `ReusedExchangeSuite`
Closes#2209 from SteNicholas/CELEBORN-1177.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
### What changes were proposed in this pull request?
### Why are the changes needed?
```
common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java:[110,14] [MissingOverride] getFileLength implements method in FileInfo; expected Override
common/src/main/java/org/apache/celeborn/common/meta/MapFileMeta.java:[40,38] [InconsistentCapitalization] Found the field 'numSubPartitions' with the same name as the parameter 'numSubpartitions' but with different capitalization.
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java:[164,65] [UnnecessaryParentheses] These grouping parentheses are unnecessary; it is unlikely the code will be misinterpreted without them
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA
Closes#2222 from cxzl25/CELEBORN-1133-FOLLOWUP.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Introduce `PausePushDataAndReplicateTime` metric to record time for a worker to stop receiving pushData from clients and other workers.
### Why are the changes needed?
`PausePushData` counts the times a worker stops receiving pushData from clients because of back pressure, while `PausePushDataAndReplicate` counts the times a worker stops receiving pushData from both clients and other workers because of back pressure. However, `PausePushDataTime` records the time a worker stops receiving pushData from clients or other workers, whose definition is confusing for users. It's therefore recommended to introduce a `PausePushDataAndReplicateTime` metric that records the time a worker stops receiving pushData from clients and other workers because of back pressure.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- [Celeborn Dashboard](https://stenicholas.grafana.net/d/U_qgru_7z/celeborn?orgId=1&refresh=5s)
- `MemoryManagerSuite#[CELEBORN-882] Test MemoryManager check memory thread logic`
Closes#2221 from SteNicholas/CELEBORN-1215.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Introduce `WriteDataHardSplitCount` metric to record `HARD_SPLIT` partitions of PushData and PushMergedData.
### Why are the changes needed?
As `PushDataHandler#handlePushData` and `PushDataHandler#handlePushMergedData` log at the DEBUG level, a `WriteDataHardSplitCount` metric should be introduced to record `HARD_SPLIT` partitions of PushData and PushMergedData for `PushDataHandler`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
[Celeborn Dashboard](https://stenicholas.grafana.net/d/U_qgru_7z/celeborn?orgId=1&refresh=5s)
Closes#2217 from SteNicholas/CELEBORN-1214.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
1. Let `cleanupExpiredShuffleKey` hold the same lock as the add path and the cleaner thread
2. Fix removal during iteration
3. Simply call `condition.await()` instead of a while loop with `condition.await(500ms)`
### Why are the changes needed?
The usage of `LinkedBlockingQueue queue` in `PartitionFilesCleaner` is not a typical producer-consumer model but an order-agnostic buffer, which is confusing on a first reading of the code.
Although `LinkedBlockingQueue` is a thread-safe collection, that only means iteration won't throw a `ConcurrentModificationException` if the queue is modified concurrently; the iteration is not guaranteed to see all queue entries. Since `cleanupExpiredShuffleKey` is not guarded by the lock, element removal from the cleaner thread may break the iteration and eventually cause a memory leak.
Ref: https://stackoverflow.com/questions/37945981/concurrently-iterating-over-a-blockingqueue
Another issue is removal during iteration. The typical usage is
```
Iterator<E> itr = collection.iterator();
while (itr.hasNext()) {
  if (shouldRemove(itr.next())) {
    itr.remove();
  }
}
```
The key point is that `itr.remove()` should be called instead of `collection.remove(x)`.
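As a runnable illustration of the pattern (a generic Java sketch, not Celeborn code): removing through the iterator is safe, whereas calling `collection.remove(x)` inside the loop throws `ConcurrentModificationException` on most collections.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IteratorRemoval {
    // Remove matching elements through the iterator itself; calling
    // list.remove(x) inside this loop would throw
    // ConcurrentModificationException.
    static void removeEvens(List<Integer> list) {
        Iterator<Integer> itr = list.iterator();
        while (itr.hasNext()) {
            if (itr.next() % 2 == 0) {
                itr.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6));
        removeEvens(list);
        System.out.println(list); // [1, 3, 5]
    }
}
```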
### Does this PR introduce _any_ user-facing change?
I suppose there is a potential memory leak issue on the Worker without this patch.
### How was this patch tested?
Pass GA.
Closes#2205 from pan3793/CELEBORN-1210.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Introduces `ChunkStreamCount`, `OpenStreamFailCount` metrics about opening stream of `FetchHandler`:
- `WorkerSource` adds `ChunkStreamCount`, `OpenStreamFailCount` metrics.
- Corrects the grafana dashboard of `celeborn-dashboard.json`. `celeborn-dashboard.json` has been verified via [Celeborn Dashboard](https://stenicholas.grafana.net/d/U_qgru_7z/celeborn?orgId=1&refresh=5s). For example:
1. `"expr": "metrics_RunningApplicationCount_Value"`
2. Moves the panel position of `FetchChunkFailCount` to `FetchRelatives` instead of `PushRelatives`.
3. Updates the `gridPos` of some panels.
### Why are the changes needed?
There are no metrics about opening streams in `FetchHandler` for Celeborn Worker.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
[Celeborn Dashboard](https://stenicholas.grafana.net/d/U_qgru_7z/celeborn?orgId=1&refresh=5s)
Closes#2212 from SteNicholas/CELEBORN-1100.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Add a cache in the partition sorter and limit its max size.
### Why are the changes needed?
To reduce memory consumption during partition sort by tweaking the index cache.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA and cluster.
Closes#2194 from FMX/B1201.
Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Co-authored-by: Keyong Zhou <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Rename FileWriter to PartitionLocationDataWriter; add storageManager to the constructor and remove fileInfo and flusher from it.
FileInfo(userIdentifier, partitionSplitEnabled, fileMeta)
- NonMemoryFileInfo(streams, filePath, storageType, bytesFlushed)
- MemoryFileInfo(length, buffer)
FileMeta
- ReduceFileMeta(chunkOffsets, sorted)
- MapFileMeta(bufferSize, numSubPartitions)
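The hierarchy above can be sketched as plain classes; field types and names here are assumptions based on the PR description, not the actual Celeborn signatures:

```java
import java.util.List;

public class FileInfoSketch {
    static abstract class FileMeta { }

    static class ReduceFileMeta extends FileMeta {
        List<Long> chunkOffsets;
        boolean sorted;
    }

    static class MapFileMeta extends FileMeta {
        int bufferSize;
        int numSubPartitions;
    }

    // Common state lives in the base class; the meta object decides
    // whether the file serves a reduce or map partition.
    static abstract class FileInfo {
        String userIdentifier;
        boolean partitionSplitEnabled;
        FileMeta fileMeta;
    }

    static class NonMemoryFileInfo extends FileInfo {
        String filePath;
        String storageType;
        long bytesFlushed;
    }

    static class MemoryFileInfo extends FileInfo {
        int length;
        byte[] buffer;
    }

    public static void main(String[] args) {
        NonMemoryFileInfo disk = new NonMemoryFileInfo();
        disk.fileMeta = new ReduceFileMeta();
        MemoryFileInfo mem = new MemoryFileInfo();
        mem.fileMeta = new MapFileMeta();
        System.out.println(disk.fileMeta instanceof ReduceFileMeta); // true
        System.out.println(mem.fileMeta instanceof MapFileMeta);     // true
    }
}
```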
### Why are the changes needed?
1. To make concepts more clear.
2. To support memory storage and HDFS slot management.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA and cluster test with worker kill.
Closes#2130 from FMX/b1133.
Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Co-authored-by: Keyong Zhou <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
`MapperAttempts` for a shuffle replies `MAP_ENDED` when the mapper has already ended, for the case of receiving push data or push merged data from a speculative task.
Follow up #1591.
### Why are the changes needed?
When the mapper has already ended and the worker receives push data or push merged data from a speculative task, `PushDataHandler` should trigger MapEnd instead of StageEnd. Meanwhile, `ShuffleClientImpl` should handle `STAGE_ENDED` as MapEnd; otherwise other tasks of the stage could not send shuffle data, causing data loss.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal test.
Closes#2190 from SteNicholas/CELEBORN-678.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Introduce `RunningApplicationCount` metric and `/applications` API to record running applications for Celeborn worker.
### Why are the changes needed?
The `RunningApplicationCount` metric currently only monitors the count of running applications in the cluster for the master. Meanwhile, the `/listTopDiskUsedApps` API lists the top disk-usage application ids for master and worker. Therefore a `RunningApplicationCount` metric and an `/applications` API could be introduced to record the running applications of a worker.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal tests.
Closes#2172 from SteNicholas/CELEBORN-1189.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
1. Fix MissingOverride, DefaultCharset, UnnecessaryParentheses Rule
2. Exclude generated sources, FutureReturnValueIgnored, TypeParameterUnusedInFormals, UnusedVariable
### Why are the changes needed?
```
./build/make-distribution.sh --release
```
We get a lot of WARNINGs.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA
Closes#2177 from cxzl25/error_prone_patch.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
### What changes were proposed in this pull request?
I'm testing the main branch and encountered the following scenario.
I ran `sbin/stop-worker.sh` near-simultaneously on 3 out of 6 workers, expecting the 3 workers
to shut down soon because I enabled graceful shutdown. However, only the first worker I stopped
shut down in 15s as expected; the other two did not shut down until the shutdown timeout.
After digging into it, I found `LifecycleManager#reserveSlotsWithRetry` will reserve the same location
twice:
1. At T1, only worker1 is shut down; pushes receive HARD_SPLIT and go to revive
2. At T2, LifecycleManager handles revive requests in batch and tries to reallocate the locations to other workers
3. At T3, reserving on worker3 succeeds because it's not shut down yet, but reserving on worker2 fails because it is shut down
4. At T4, LifecycleManager re-allocates the failed slots to workers other than worker1 and worker2. However, at this time worker3 is also shut down, so reserving on worker3 fails
5. At T5, it re-allocates the slots that failed to worker3. However, `getFailedPartitionLocations` will return the slots allocated to worker3 in step 3 and increment the epoch to 2. At this time, worker3 has slots of epoch 1, but they will never be pushed to because the newer epoch 3 is generated at the same time
6. Since the epoch 2 locations in worker3 will never be pushed to, they never get a chance to return HARD_SPLIT; as a result worker3 can't fast-shutdown until timeout.
This PR fixes this by destroying, within `reserveSlotsWithRetry`, the slots that failed to be reserved.
### Why are the changes needed?
ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual test.
Closes#2163 from waitinfuture/1178.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title.
### Why are the changes needed?
As title.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passes UTs.
Closes#2162 from waitinfuture/1175-2.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
One user reported that LifecycleManager's parmap can create a huge number of threads and cause OOM.

There are four places where parmap is called:
1. When LifecycleManager commits files
2. When LifecycleManager reserves slots
3. When LifecycleManager setup connection to workers
4. When LifecycleManager call destroy slots
This PR fixes the fourth one. In more detail, this PR eliminates `parmap` when destroying slots and also replaces `askSync` with `ask`.
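The idea behind swapping `askSync` under `parmap` for plain `ask` can be sketched with `CompletableFuture` (names like `ask`/`destroyAll` are illustrative, not the Celeborn RPC API): fire all requests without blocking and join at the end, instead of holding one blocking thread per worker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AsyncDestroy {
    // Hypothetical non-blocking ask: returns a future instead of
    // blocking the caller the way askSync would.
    static CompletableFuture<String> ask(String worker) {
        return CompletableFuture.supplyAsync(() -> worker + ":destroyed");
    }

    static List<String> destroyAll(List<String> workers) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String w : workers) {
            futures.add(ask(w)); // fire all asks; no per-worker blocking thread
        }
        // One join pass at the end instead of N parallel blocking calls.
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            results.add(f.join());
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(destroyAll(Arrays.asList("w1", "w2", "w3")));
    }
}
```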
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test and GA.
Closes#2156 from waitinfuture/1167.
Lead-authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Co-authored-by: cxzl25 <cxzl25@users.noreply.github.com>
Co-authored-by: Keyong Zhou <waitinfuture@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Introduce `FetchChunkFailCount` metric to expose the count of failed chunk fetches in the current worker.
### Why are the changes needed?
Metrics for the count of failed PushData or PushMergedData in the current worker are already supported. It's better to also support a `FetchChunkFailCount` metric to expose the count of failed chunk fetches in the current worker.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal test.
Closes#2151 from SteNicholas/CELEBORN-1164.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
When testing the main branch I encountered the exception below:
```
23/12/12 16:03:03,262 WARN [fetch-server-11-52] DefaultPromise: An exception was thrown by org.apache.celeborn.service.deploy.worker.FetchHandler$$anon$2.operationComplete()
io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454)
at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1383)
at io.netty.buffer.AbstractByteBuf.getInt(AbstractByteBuf.java:433)
at io.netty.buffer.ByteBufUtil.hashCode(ByteBufUtil.java:208)
at io.netty.buffer.AbstractByteBuf.hashCode(AbstractByteBuf.java:1342)
at java.util.WeakHashMap.hash(WeakHashMap.java:298)
at java.util.WeakHashMap.getEntry(WeakHashMap.java:426)
at java.util.WeakHashMap.containsKey(WeakHashMap.java:417)
at org.apache.commons.lang3.builder.ToStringStyle.isRegistered(ToStringStyle.java:207)
at org.apache.commons.lang3.builder.ToStringStyle.appendInternal(ToStringStyle.java:492)
at org.apache.commons.lang3.builder.ToStringStyle.append(ToStringStyle.java:466)
at org.apache.commons.lang3.builder.ToStringBuilder.append(ToStringBuilder.java:845)
at org.apache.celeborn.common.network.buffer.NettyManagedBuffer.toString(NettyManagedBuffer.java:82)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuffer.append(StringBuffer.java:269)
at org.apache.commons.lang3.builder.ToStringStyle.appendDetail(ToStringStyle.java:614)
at org.apache.commons.lang3.builder.ToStringStyle.appendInternal(ToStringStyle.java:579)
at org.apache.commons.lang3.builder.ToStringStyle.append(ToStringStyle.java:466)
at org.apache.commons.lang3.builder.ToStringBuilder.append(ToStringBuilder.java:845)
at org.apache.celeborn.common.network.protocol.RpcRequest.toString(RpcRequest.java:96)
at org.apache.celeborn.service.deploy.worker.FetchHandler$$anon$2.operationComplete(FetchHandler.scala:403)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105)
at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:728)
at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:283)
at io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:242)
at io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:212)
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:407)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:361)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:782)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:750)
```
This is because in https://github.com/apache/incubator-celeborn/pull/2123 `release` is called in `TransportRequestHandler#processRpcRequest`, but `FetchHandler#handleChunkFetchRequest` references `req` in a callback, which runs later.
### Why are the changes needed?
ditto
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test
Closes#2148 from waitinfuture/1162.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Currently `BaseMessageHandler` has a single `receive` API used for all messages. This makes message handling messy when multiple handlers are added:
- `req.body.release()` is only invoked when the handler actually processes the message rather than delegating it.
- every handler has to create an instance of `RpcResponseCallback` for RPC messages, which is exactly the same in each.
Instead, releasing the message body and creating a callback for RPC messages can be done in `TransportRequestHandler`. This avoids:
- code duplication of `RpcResponseCallback` in every RPC handler
- every new request handler having to release the request body; it will always be done in `TransportRequestHandler`.
Please note that this is how it is in Apache Spark and with Sasl Authentication, we will add a SaslRpcHandler (https://github.com/apache/incubator-celeborn/pull/2105) which wraps the underlying message handler.
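The centralization described above can be sketched as follows. All class and method names here are simplified stand-ins modeled on the PR description, not the actual Celeborn classes: the transport layer builds the callback once and releases the body in a `finally`, so individual (possibly wrapped) handlers do neither.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CentralizedRelease {
    interface RpcResponseCallback {
        void onSuccess(String response);
        void onFailure(Throwable cause);
    }

    static class Request {
        final String body;
        final AtomicInteger refCnt = new AtomicInteger(1);
        Request(String body) { this.body = body; }
        void release() { refCnt.decrementAndGet(); }
    }

    // A handler only implements business logic; it never touches refCnt
    // and never builds its own callback.
    interface MessageHandler {
        void receive(Request req, RpcResponseCallback cb);
    }

    // The transport layer owns the request lifecycle: one callback, one
    // release, regardless of which handler (or wrapper) processed it.
    static void processRpcRequest(Request req, MessageHandler handler) {
        RpcResponseCallback cb = new RpcResponseCallback() {
            public void onSuccess(String response) { System.out.println("reply: " + response); }
            public void onFailure(Throwable cause) { System.out.println("error: " + cause); }
        };
        try {
            handler.receive(req, cb);
        } finally {
            req.release(); // done exactly once, in one place
        }
    }

    public static void main(String[] args) {
        Request req = new Request("ping");
        processRpcRequest(req, (r, cb) -> cb.onSuccess(r.body + "-pong"));
        System.out.println("refCnt=" + req.refCnt.get()); // 0
    }
}
```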
### Why are the changes needed?
The changes are needed for adding authentication to Celeborn. See [CELEBORN-1011](https://issues.apache.org/jira/browse/CELEBORN-1011).
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing UTs and added some more UTs.
Closes#2123 from otterc/CELEBORN-1147.
Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Avoid NPE in DeviceMonitor#readWriteError
### Why are the changes needed?
The NullPointerException in the finally block will mask the real exception.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
manually test
Closes#2137 from jiang13021/celeborn-1154.
Authored-by: jiang13021 <jiangyanze.jyz@antgroup.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
If the user does not use Prometheus to collect monitoring metrics but some other system, metrics in JSON format would be more user-friendly. This PR supports JSON format for metrics.
### Why are the changes needed?
Ditto.
### Does this PR introduce _any_ user-facing change?
Metrics supports JSON format
### How was this patch tested?
Cluster test.
Closes#2089 from suizhe007/CELEBORN-1122.
Authored-by: qinrui <qr7972@gmail.com>
Signed-off-by: Shuang <lvshuang.tb@gmail.com>
### What changes were proposed in this pull request?
Introduce JVM monitoring in Celeborn Worker using JVMQuake to enable early detection of memory management issues and facilitate fast failure.
### Why are the changes needed?
When facing out-of-control memory management in Celeborn Worker, we typically use jvmkill as a remedy by killing the process and generating a heap dump for post-analysis. However, even with jvmkill protection, we may still encounter issues caused by the JVM running out of memory, such as repeated execution of Full GC without performing any useful work during the pause time. Since the JVM does not exhaust 100% of resources, jvmkill will not be triggered. Therefore JVMQuake is introduced to provide more granular monitoring of GC behavior, enabling early detection of memory management issues and facilitating fast failure. This refers to the principle of [jvmquake](https://github.com/Netflix-Skunkworks/jvmquake), a JVMTI agent that attaches to your JVM and automatically signals and kills it when the program has become unstable.
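The jvmquake-style heuristic referenced above can be sketched as a running "GC debt" that grows while the JVM collects and is paid down while it does useful work; the process is killed when the debt exceeds a limit. This is a toy illustration of the principle, with made-up names and units, not the agent's real implementation:

```java
public class GcDebt {
    static long debtNanos = 0;

    // gcRatioThreshold = 1 means one second of application time "pays
    // off" one second of GC time. Returns true when the accumulated
    // debt exceeds the limit, i.e. when the JVM should be signaled/killed.
    static boolean update(long gcNanos, long appNanos, long gcRatioThreshold, long limitNanos) {
        debtNanos = Math.max(0, debtNanos + gcNanos - appNanos * gcRatioThreshold);
        return debtNanos > limitNanos;
    }

    public static void main(String[] args) {
        // Intervals dominated by GC (1.5s GC vs 0.1s app work) push the
        // debt over a 2s limit after a couple of samples.
        boolean kill = false;
        for (int i = 0; i < 3; i++) {
            kill = update(1_500_000_000L, 100_000_000L, 1, 2_000_000_000L);
        }
        System.out.println(kill); // true
    }
}
```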
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`JVMQuakeSuite`
Closes#2061 from SteNicholas/CELEBORN-1092.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
### What changes were proposed in this pull request?
1. Remove UNKNOWN_DISK from StorageInfo.
2. Enable load-aware slots allocation when there is HDFS.
### Why are the changes needed?
To support the application's config about available storage types.
### Does this PR introduce _any_ user-facing change?
no.
### How was this patch tested?
GA and Cluster.
Closes#2098 from FMX/B1081-1.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.tb@gmail.com>
### What changes were proposed in this pull request?
Currently, Celeborn uses replication to handle shuffle data lost for celeborn shuffle reader, this PR implements an alternative solution by Spark stage resubmission.
Design doc:
https://docs.google.com/document/d/1dkG6fww3g99VAb1wkphNlUES_MpngVPNg8601chmVp8/edit
### Why are the changes needed?
Spark stage resubmission uses fewer resources compared with replication, and some Celeborn users are also asking for it.
### Does this PR introduce _any_ user-facing change?
A new config `celeborn.client.fetch.throwsFetchFailure` is introduced to enable this feature.
### How was this patch tested?
Two UTs are attached, and we also tested it in Ant Group's dev Spark cluster.
Closes#1924 from ErikFang/Re-run-Spark-Stage-for-Celeborn-Shuffle-Fetch-Failure.
Lead-authored-by: Erik.fang <fmerik@gmail.com>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
`ConcurrentHashMap.contains` means `containsValue`, not `containsKey`. In the current codebase there is a misuse of the `contains` method of `ConcurrentHashMap`.
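A small standalone demonstration of the pitfall: `contains(Object)` is a legacy method inherited from `Hashtable` that checks values, so a key passed to it silently returns the wrong answer.

```java
import java.util.concurrent.ConcurrentHashMap;

public class ContainsMisuse {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("shuffleKey", 42);

        // contains(Object) checks VALUES, not keys (legacy Hashtable API)
        System.out.println(map.contains("shuffleKey")); // false - not a value
        System.out.println(map.contains(42));           // true - 42 is a value

        // The intended check is containsKey
        System.out.println(map.containsKey("shuffleKey")); // true
    }
}
```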
### Why are the changes needed?
ConcurrentHashMap.contains misuse
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No
Closes#2102 from lyy-pineapple/hashMap.
Authored-by: liangyongyuan <liangyongyuan@xiaomi.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
The changes ensure that the data is at least written into the flush buffer before a message is sent back to the client. Previously, the message could be sent before this happens.
### Why are the changes needed?
Changes are needed because currently the primary sends a response back to the client before the data is even written into the flush buffer for local persistence (the persist is async). Additionally, this prevents data corruption issues where data may not be properly present on the primary but only on the replica, while the client fetches only from the primary.
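The ordering guarantee can be reduced to two lines: stage the bytes first, acknowledge second. This is a hedged sketch with hypothetical names (`flushBuffer`, `handlePushData`), not the actual worker code:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class AckAfterBuffer {
    static BlockingQueue<byte[]> flushBuffer = new LinkedBlockingQueue<>();

    static void handlePushData(byte[] data, Runnable ackClient) throws InterruptedException {
        flushBuffer.put(data); // 1. stage the data for the async flusher first
        ackClient.run();       // 2. only then acknowledge the client, so an
                               //    ack implies the data is at least queued
    }

    public static void main(String[] args) throws InterruptedException {
        handlePushData("chunk".getBytes(), () -> System.out.println("ACK sent"));
        System.out.println("buffered=" + flushBuffer.size()); // 1
    }
}
```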
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Will let CI run, and also tested on our internal cluster
Closes#2064 from akpatnam25/CELEBORN-1106.
Lead-authored-by: Aravind Patnam <apatnam@linkedin.com>
Co-authored-by: Aravind Patnam <akpatnam25@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Follow-up for `celeborn.worker.storage.disk.reserve.ratio` support: cache `minimumUsableSize` in a variable instead of calculating it for every push data.
### Why are the changes needed?
Cache `minimumUsableSize` in a variable instead of calculating it for every push data, because `DiskUtils.getMinimumUsableSize` is costly.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`SlotsAllocatorSuiteJ`
Closes#2083 from SteNicholas/CELEBORN-1110.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Currently, `WorkerInfo` is used in many places, and `allocationBuckets` is only used when a worker wants to collect the `SLOTS_ALLOCATED` metric. If there are lots of workers in the cluster, this wastes a certain amount of memory since each `WorkerInfo` maintains an `Array[Int](61)`, so remove it from `WorkerInfo`.
Also refactor the `SLOTS_ALLOCATED` metric from a gauge to a counter. Originally, this metric approximated one hour's total, and only when there are continuous tasks. Refactoring it into a counter reduces the cost of maintaining time windows (storage, timely data expiration, etc.), and it can be transformed more flexibly according to user needs on the Prometheus side.
### Why are the changes needed?
Ditto.
### Does this PR introduce _any_ user-facing change?
Yes. The metrics_SlotsAllocated_Count metric changes from a 1-hour gauge to an increasing counter.
### How was this patch tested?
Cluster test.
Closes#2078 from onebox-li/improve-SlotsAllocated.
Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Support `celeborn.worker.storage.disk.reserve.ratio` to configure worker reserved ratio for each disk.
### Why are the changes needed?
`CelebornConf` currently supports configuring an absolute reserved space for each worker disk. It could additionally support `celeborn.worker.storage.disk.reserve.ratio` to configure the reserved ratio for each disk. The minimum usable size for each disk should then be the max of the reserved space and the space calculated via the reserved ratio.
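The max-of-both rule can be expressed in one helper. The method name and signature are illustrative, not the actual `DiskUtils` API:

```java
public class DiskReserve {
    // Usable-size floor: the larger of the absolute reserve and the
    // ratio-based reserve computed from the disk's total space.
    static long minimumUsableSize(long totalSpace, long reserveSize, double reserveRatio) {
        long ratioBased = (long) (totalSpace * reserveRatio);
        return Math.max(reserveSize, ratioBased);
    }

    public static void main(String[] args) {
        long gb = 1024L * 1024 * 1024;
        // 1 TiB disk, 5 GiB absolute reserve, 1% ratio -> ratio wins (~10.24 GiB)
        System.out.println(minimumUsableSize(1024 * gb, 5 * gb, 0.01));
        // 100 GiB disk, 5 GiB absolute reserve, 1% ratio -> absolute reserve wins
        System.out.println(minimumUsableSize(100 * gb, 5 * gb, 0.01));
    }
}
```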
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`SlotsAllocatorSuiteJ`
Closes#2071 from SteNicholas/CELEBORN-1110.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Support manually excluding a worker given its worker id. The worker is added into the excluded workers manually.
### Why are the changes needed?
Celeborn currently supports shuffle-client-side excluding of workers on fetch and push failure. It's also necessary to exclude workers manually for maintaining a Celeborn cluster.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- `HttpUtilsSuite`
- `DefaultMetaSystemSuiteJ#testHandleWorkerExclude`
- `RatisMasterStatusSystemSuiteJ#testHandleWorkerExclude`
- `MasterStateMachineSuiteJ#testObjSerde`
Closes#1997 from SteNicholas/CELEBORN-448.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
1. Support `celeborn.storage.activeTypes` in the client.
2. Master will ignore slots for `UNKNOWN_DISK`.
### Why are the changes needed?
Enable client application to select storage types to use.
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
GA and cluster.
Closes#2045 from FMX/B1081.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.tb@gmail.com>
### What changes were proposed in this pull request?
The `cleaner` of `Worker` executes `StorageManager#cleanupExpiredShuffleKey` to clean expired shuffle keys with a daemon cached thread pool. This optimization speeds up cleaning, including expired shuffle keys of `ChunkStreamManager`, to avoid a memory leak.
### Why are the changes needed?
`ChunkStreamManager#streams` could leak memory when cleanup is slower than expiration for a worker's expired shuffles. The cleanup of expired shuffle keys in `ChunkStreamManager` should be optimized to avoid a memory leak, which can drive the worker's VM thread to 100%.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`WorkerSuite#clean up`.
Closes#2053 from SteNicholas/CELEBORN-1094.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Add the metric `ResourceConsumption` to monitor each user's quota usage of Celeborn Worker.
### Why are the changes needed?
The `ResourceConsumption` metric currently monitors each user's quota usage on Celeborn Master. The usage on Celeborn Worker also needs to be monitored.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal tests.
Closes#2059 from SteNicholas/CELEBORN-247.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Separate the `overHighWatermark` check into a dedicated thread, so that the value can be shared more cheaply and the `CongestionController#isUserCongested` logic is lightened.
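The pattern can be sketched as a background thread refreshing a volatile flag that the hot path only reads (a hypothetical sketch, not Celeborn's actual `CongestionController` code):

```java
// Hypothetical sketch: a dedicated checker periodically recomputes the
// high-watermark state, while the request path only reads a volatile flag.
public class CongestionFlag {
    private volatile boolean overHighWatermark = false;

    // Called periodically from the dedicated check thread.
    void refresh(long usedBytes, long highWatermarkBytes) {
        overHighWatermark = usedBytes > highWatermarkBytes;
    }

    // Cheap check on the hot path: a single volatile read, no recomputation.
    public boolean isUserCongested() {
        return overHighWatermark;
    }
}
```

A `ScheduledExecutorService` would drive `refresh` at a fixed cadence; every caller of `isUserCongested` then shares the same precomputed result.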
### Why are the changes needed?
Ditto.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test and UT.
Closes#2041 from onebox-li/congest-check.
Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
When a bad disk occurs, cleaning up expired shuffle keys can cause a `NullPointerException` in the thread pool obtained from `diskOperators` in `StorageManager`.
Therefore, cleaning up expired shuffle keys only from good disks avoids the above problem.
https://issues.apache.org/jira/browse/CELEBORN-1103
### Why are the changes needed?
bugfix
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#2060 from suizhe007/CELEBORN-1103.
Lead-authored-by: qinrui <qr7972@gmail.com>
Co-authored-by: qinrui <51885730+suizhe007@users.noreply.github.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Use `scheduleWithFixedDelay` instead of `scheduleAtFixedRate` in the thread pools of Celeborn Master and Worker.
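The semantic difference can be sketched as pure next-start-time calculations (a hypothetical illustration, not Celeborn's code): `scheduleAtFixedRate` anchors each run to the previous start, so a task that overruns its period is re-run back to back, while `scheduleWithFixedDelay` always leaves a full delay after the previous run ends.

```java
// Hypothetical sketch of the two scheduling policies' next-start rules.
public class SchedulePolicy {
    // scheduleAtFixedRate: next start is anchored to the previous *start*,
    // so an overrunning task is re-executed immediately, with no idle gap.
    public static long nextStartFixedRate(long prevStartMs, long periodMs) {
        return prevStartMs + periodMs;
    }

    // scheduleWithFixedDelay: next start is anchored to the previous *end*,
    // so there is always a full delay of idle time between runs.
    public static long nextStartFixedDelay(long prevEndMs, long delayMs) {
        return prevEndMs + delayMs;
    }
}
```

For a periodic cleanup whose duration can exceed its period, fixed delay prevents the task from piling up and monopolizing the pool.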
### Why are the changes needed?
Follow-up of #1970.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal tests.
Closes#2048 from SteNicholas/CELEBORN-1032.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Refine some TimeSlidingHub code.
### Why are the changes needed?
Ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT.
Closes#2033 from onebox-li/refine.
Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: Shuang <lvshuang.tb@gmail.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
This PR addresses a NPE issue that occurs when the `workerSource` member is accessed before it is initialized. To resolve this issue, we initialize the `workerSource` member when the handlers are created.
```
23/10/24 16:27:03,363 ERROR [fetch-server-11-1] TransportChannelHandler: Exception from request handler while channel is active
java.lang.NullPointerException
at org.apache.celeborn.service.deploy.worker.FetchHandler.channelActive(FetchHandler.scala:412)
at org.apache.celeborn.common.network.server.TransportRequestHandler.channelActive(TransportRequestHandler.java:66)
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelActive(TransportChannelHandler.java:120)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:69)
at io.netty.handler.timeout.IdleStateHandler.channelActive(IdleStateHandler.java:271)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:260)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1398)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:258)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238)
at io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:895)
at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:522)
at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:429)
at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:486)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
...
23/10/24 16:27:03,423 INFO [main] Worker: Starting Worker <ip>:<port1>:<port2>:<port3> with {/data1=DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: /data1, usableSpace: 250.0 GiB, avgFlushTime: 0 ns, avgFetchTime: 0 ns, activeSlots: 0) status: HEALTHY dirs /data1/celeborn/worker/celeborn-worker/shuffle_data, /data=DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: /data, usableSpace: 250.0 GiB, avgFlushTime: 0 ns, avgFetchTime: 0 ns, activeSlots: 0) status: HEALTHY dirs /data/celeborn/worker/celeborn-worker/shuffle_data} slots.
...
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes#2034 from cfmcgrady/fix-start-worker-npe.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
Recently, while testing on the main branch, we discovered that the partition sorter task might fail with a `NoSuchFileException`, leading to the entire job's failure. Upon further investigation, we identified that the root cause of this issue is the potential addition of the same sorting task to the sorter queue multiple times.
```
23/10/22 01:02:15,334 DEBUG [worker-file-sorter-execute-9530] PartitionFilesSorter: sort complete for application_1653035898918_4284043-9975 /data1/celeborn/worker/celeborn-worker/shuffle_data/application_1653035898918_4284043/9975/0-0-0
...
23/10/22 01:02:15,335 ERROR [worker-file-sorter-execute-9532] PartitionFilesSorter: Sorting shuffle file for application_1653035898918_4284043-9975-0-0-0 /data1/celeborn/worker/cele
born-worker/shuffle_data/application_1653035898918_4284043/9975/0-0-0 failed, detail:
java.nio.file.NoSuchFileException: /data1/celeborn/worker/celeborn-worker/shuffle_data/application_1653035898918_4284043/9975/0-0-0
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:177)
at java.nio.channels.FileChannel.open(FileChannel.java:287)
at java.nio.channels.FileChannel.open(FileChannel.java:335)
at org.apache.celeborn.common.util.FileChannelUtils.openReadableFileChannel(FileChannelUtils.java:33)
at org.apache.celeborn.service.deploy.worker.storage.PartitionFilesSorter$FileSorter.initializeFiles(PartitionFilesSorter.java:641)
at org.apache.celeborn.service.deploy.worker.storage.PartitionFilesSorter$FileSorter.sort(PartitionFilesSorter.java:559)
at org.apache.celeborn.service.deploy.worker.storage.PartitionFilesSorter.lambda$null$0(PartitionFilesSorter.java:146)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
...
```
Before this PR, there was a scenario where sorter tasks for the same `fileId` could arrive after being removed from the `sorting` state, and they could be mistakenly added to the sorter queue. To address this, we moved the code block that checks the `fileId`'s status in `sorted` inside the `synchronized (sorting)` block. This change ensures that tasks are not added to the sorter queue multiple times because if a `fileId`'s sorter task has already completed and its status has been removed from `sorting`, it will definitely be present in `sorted`. This behavior is consistent with how it worked prior to version 0.3.0.
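The check-then-add described above can be sketched as follows (hypothetical names, not the actual `PartitionFilesSorter` code): both the `sorted` membership test and the `sorting` check-and-add happen under the same lock, so a task for the same `fileId` can never be enqueued twice.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the dedup fix: checking "already sorted?" and
// "currently sorting?" atomically under one lock.
public class SortTaskDeduper {
    private final Set<String> sorting = new HashSet<>();
    private final Set<String> sorted = new HashSet<>();

    // Returns true iff the caller should submit a sort task for fileId.
    public boolean trySubmit(String fileId) {
        synchronized (sorting) {
            if (sorted.contains(fileId)) {
                return false;            // already sorted, nothing to do
            }
            return sorting.add(fileId);  // false if a sort is already in flight
        }
    }

    // Called when the sort task completes: the invariant is that a fileId
    // removed from `sorting` is always present in `sorted`.
    public void markSorted(String fileId) {
        synchronized (sorting) {
            sorting.remove(fileId);
            sorted.add(fileId);
        }
    }
}
```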
### Does this PR introduce _any_ user-facing change?
No, only bug fix
### How was this patch tested?
Pass GA
Closes#2031 from cfmcgrady/fix-no-such-file-exception.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Remove the unnecessary index increment in `Master#timeoutDeadWorkers`.
### Why are the changes needed?
The index increment in `Master#timeoutDeadWorkers` is unnecessary.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes#2027 from SteNicholas/timeout-dead-workers.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Add a new metric `ActiveShuffleFileCount` for the active shuffle file count of Celeborn Worker.
### Why are the changes needed?
The `ActiveShuffleSize` metric currently reports the active shuffle size of the peer worker. Therefore, it's better to introduce `ActiveShuffleFileCount` to report the active shuffle file count of Celeborn Worker.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal tests.
Closes#2009 from SteNicholas/CELEBORN-916.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
- Minor improvement in `ChunkStreamManager.numShuffleSteams`: replace `.flatMap(Set::stream).count()` with `.mapToLong(Set::size).sum()`
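The two forms are equivalent in result; the `mapToLong` version sums set sizes directly instead of materializing a stream element per id. A minimal side-by-side sketch (hypothetical class name, standing in for the counting done in `ChunkStreamManager`):

```java
import java.util.Map;
import java.util.Set;

// Two equivalent ways to count stream ids across all shuffles.
public class StreamCount {
    // Old form: flattens every id into one stream, then counts elements.
    public static long viaFlatMap(Map<String, Set<Long>> shuffleStreamIds) {
        return shuffleStreamIds.values().stream().flatMap(Set::stream).count();
    }

    // New form: sums the per-shuffle set sizes, no per-element traversal.
    public static long viaMapToLong(Map<String, Set<Long>> shuffleStreamIds) {
        return shuffleStreamIds.values().stream().mapToLong(Set::size).sum();
    }
}
```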
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI tests.
Closes#2013 from bowenliang123/numShuffleSteams.
Authored-by: liangbowen <liangbowen@gf.com.cn>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Fix misc error prone reports.
As detailed in the jira, they are:
* Reference equality of boxed primitive types: see [BoxedPrimitiveEquality](https://errorprone.info/bugpattern/BoxedPrimitiveEquality)
* Calling run directly - since use is legitimate, mark it as ignore. See: [DoNotCall](https://errorprone.info/bugpattern/DoNotCall)
* `Ignore` test instead of catching `AssertionError` and ignoring it. See: [AssertionFailureIgnored](https://errorprone.info/bugpattern/AssertionFailureIgnored)
Fix misc error prone reports.
No
Unit tests
Closes#2019 from mridulm/fix-misc-issues.
Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Ensure appropriate lock is held when accessing/mutating state - as marked with `GuardedBy`.
More [here](https://errorprone.info/bugpattern/GuardedBy).
This also fixes [DCL](https://errorprone.info/bugpattern/DoubleCheckedLocking) bugs observed.
Fix bug with locking as identified by error-prone
No
Unit tests
Closes#2018 from mridulm/fix-locking-issues-found.
Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
This PR modifies the log level from debug to trace for the `PushDataHandler#checkDiskFullAndSplit` method, which is invoked with each `PushData` request. This change is aimed at addressing the issue of excessive log volume caused by the high frequency of `PushData` requests.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes#2007 from cfmcgrady/check-disk-full-and-split-log-level.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
The config key `celeborn.worker.sortPartition.eagerlyRemoveOriginalFiles.enabled` has become unnecessary as a result of #1932
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes#1999 from cfmcgrady/celeborn-1047.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Add a configuration `celeborn.worker.storage.expireDirs.timeout` to the worker with a default value of 6h. This configuration sets the expiration time for app local directories.
https://issues.apache.org/jira/browse/CELEBORN-1046
### Why are the changes needed?
When Celeborn periodically deletes the directories of apps, it determines whether the app needs to be deleted based on the shuffleKeySet in memory. However, this method may not accurately indicate the completion of the app and could potentially lead to the unintentional deletion of shuffle data.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1998 from wilsonjie/CELEBORN-1046.
Authored-by: sunjunjie <sunjunjie@zto.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
`StreamChunkSlice`, `ChunkFetchRequest` and `TransportableError` should be merged into transport messages to enhance Celeborn's compatibility.
### Why are the changes needed?
1. Improves Celeborn's transport flexibility to change RPC.
2. Makes Celeborn compatible with the 0.2 client.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- `FetchHandlerSuiteJ`
- `RequestTimeoutIntegrationSuiteJ`
- `ChunkFetchIntegrationSuiteJ`
Closes#1982 from SteNicholas/CELEBORN-772.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.tb@gmail.com>
### What changes were proposed in this pull request?
Replace
```properties
celeborn.metrics.master.prometheus.host
celeborn.metrics.master.prometheus.port
celeborn.metrics.worker.prometheus.host
celeborn.metrics.worker.prometheus.port
```
With
```properties
celeborn.master.http.host
celeborn.master.http.port
celeborn.worker.http.host
celeborn.worker.http.port
```
### Why are the changes needed?
The ports bound by `celeborn.metrics.master.prometheus.port` and `celeborn.metrics.worker.prometheus.port` not only serve Prometheus metrics but also provide some useful API services.
https://celeborn.apache.org/docs/latest/monitoring/#rest-api
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1919 from cxzl25/CELEBORN-983.
Lead-authored-by: sychen <sychen@ctrip.com>
Co-authored-by: Keyong Zhou <zhouky@apache.org>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
The newly added `deadlocks` metric in `ThreadStatesGaugeSet` is a `Set<String>`, which is invalid as a gauge value. So add a filter at the `addGauge` entrance.
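The filter can be sketched as rejecting any gauge whose sampled value is not numeric (a hypothetical sketch with a plain `Supplier`, not the actual Dropwizard-based `addGauge`):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch: only register gauges with numeric values, so a gauge
// like "deadlocks" (a Set<String>) is skipped instead of emitting an invalid
// metric sample.
public class NumericGaugeRegistry {
    private final Map<String, Supplier<Object>> gauges = new LinkedHashMap<>();

    // Returns true if the gauge was registered, false if it was filtered out.
    public boolean addGauge(String name, Supplier<Object> gauge) {
        Object value = gauge.get();
        if (!(value instanceof Number)) {
            return false; // reject non-numeric gauges at the entrance
        }
        gauges.put(name, gauge);
        return true;
    }

    public int size() {
        return gauges.size();
    }
}
```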
### Why are the changes needed?
Ditto
### Does this PR introduce _any_ user-facing change?
Removes the unused metric. Note that the template uses `metrics_jvm_thread_deadlock_count_Value`.
### How was this patch tested?
Manual test
Closes#1981 from onebox-li/fix-1037.
Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Fix some typos
### Why are the changes needed?
Ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
-
Closes#1983 from onebox-li/fix-typo.
Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1971 from cxzl25/typo.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
`celeborn.metrics.prometheus.path`
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1965 from cxzl25/CELEBORN-1028.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
1. This is developer-friendly for debugging unit tests in IntelliJ IDEA; for example, Netty's memory leak reports are logged at the error level and won't cause unit tests to be marked as fatal.
```
23/10/09 09:57:26,422 ERROR [fetch-server-52-2] ResourceLeakDetector: LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:403)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:140)
io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:120)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:150)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:750)
```
2. This won't increase console output or affect the stability of CI.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes#1958 from cfmcgrady/ut-console-log-level.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
The `ReusedExchange` operator has the potential to generate different types of fetch requests, including both non-range and range requests. Currently, an issue arises due to the synchronous deletion of the original file by the Celeborn worker upon completion of sorting. This issue leads to the failure of non-range requests following a range request for the same partition.
The following snippet reproduces this bug:
```scala
val sparkConf = new SparkConf().setAppName("celeborn-test").setMaster("local[2]")
.set("spark.shuffle.manager", "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
.set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", masterInfo._1.rpcEnv.address.toString)
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "100")
.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "100")
val spark = SparkSession.builder()
.config(sparkConf)
.getOrCreate()
spark.range(0, 1000, 1, 10)
.selectExpr("id as k1", "id as v1")
.createOrReplaceTempView("ta")
spark.range(0, 1000, 1, 10)
.selectExpr("id % 1 as k21", "id % 1 as k22", "id as v2")
.createOrReplaceTempView("tb")
spark.range(140)
.select(
col("id").cast("long").as("k3"),
concat(col("id").cast("string"), lit("a")).as("v3"))
.createOrReplaceTempView("tc")
spark.sql(
"""
|SELECT *
|FROM ta
|LEFT JOIN tb ON ta.k1 = tb.k21
|LEFT JOIN tc ON tb.k22 = tc.k3
|""".stripMargin)
.createOrReplaceTempView("v1")
spark.sql(
"""
|SELECT * FROM v1 WHERE v3 IS NOT NULL
|UNION
|SELECT * FROM v1
|""".stripMargin)
.collect()
```
This PR proposes a solution that introduces an asynchronous thread for the removal of the original file. This modification ensures that both non-range and range fetch requests fetch only the sorted file once it has been generated for a given partition.
Activity diagram of `openStream` (figure omitted).
### Does this PR introduce _any_ user-facing change?
No, only bug fix
### How was this patch tested?
UT
Closes#1932 from cfmcgrady/fix-partition-sort-bug-v4.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
When reading shuffle data, the file was accidentally deleted:
```
2023-09-22 16:32:36,810 [storage-scheduler] INFO org.apache.celeborn.service.deploy.worker.storage.StorageManager[51]: Delete expired app dir /data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1.
2023-09-22 16:32:36,810 [Disk-cleaner-/data8-6] DEBUG org.apache.celeborn.service.deploy.worker.storage.StorageManager[47]: Deleted expired shuffle file /data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1/32/924-0-0.
2023-09-22 16:32:53,304 [fetch-server-11-31] DEBUG org.apache.celeborn.service.deploy.worker.FetchHandler[47]: Received chunk fetch request application_1689848866482_12296544_1-32 924-0-0 0 2147483647 get file info FileInfo{file=/data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1/32/924-0-0, chunkOffsets=0,558, userIdentifier=`default`.`default`, partitionType=REDUCE}
java.io.FileNotFoundException: /data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1/32/924-0-0 (No such file or directory)
```
This happens because, when cleaning up the directories of expired apps, the file directory is created first and only then added to the `fileInfos` collection. As a result, when the shuffleKeySet is read, running apps may not be present in it yet, causing their files to be mistakenly deleted.
https://issues.apache.org/jira/browse/CELEBORN-1005
### Why are the changes needed?
bugfix
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1937 from wilsonjie/CELEBORN-1005.
Lead-authored-by: sunjunjie <sunjunjie@zto.com>
Co-authored-by: junjie.sun <40379361+wilsonjie@users.noreply.github.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
`BacklogAnnouncement`, `BufferStreamEnd`, and `ReadAddCredit` should be merged into transport messages to enhance Celeborn's compatibility.
### Why are the changes needed?
1. Improves Celeborn's transport flexibility to change RPC.
2. Makes Celeborn compatible with the 0.2 client.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- `TransportFrameDecoderWithBufferSupplierSuiteJ`
Closes#1905 from SteNicholas/CELEBORN-770.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.tb@gmail.com>
### What changes were proposed in this pull request?
1. Rename the config key from `celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled` to `celeborn.worker.sortPartition.eagerlyRemoveOriginalFiles.enabled`
2. Make this config internal
### Why are the changes needed?
Make the config key clearer.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes#1934 from cfmcgrady/celeborn-988-followup.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
`PushDataHandShake`, `RegionFinish`, and `RegionStart` should be merged into transport messages to enhance Celeborn's compatibility.
### Why are the changes needed?
1. Improves Celeborn's transport flexibility to change RPC.
2. Makes Celeborn compatible with the 0.2 client.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- `RemoteShuffleOutputGateSuiteJ`
Closes#1910 from SteNicholas/CELEBORN-771.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Add an exception handler when calling `CelebornHadoopUtils.getHadoopFS(conf)` on Master and Worker, to avoid concealing HDFS initialization exception information.
### What changes were proposed in this pull request?
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1923 from leemingzixxoo/main.
Authored-by: ming.li <ming.li@dmall.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
This PR adds a new configuration option, `celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled`, allowing users to control whether the `PartitionFilesSorter` deletes the original unsorted file.
### Why are the changes needed?
https://github.com/apache/incubator-celeborn/pull/1907#issuecomment-1723420513
### Does this PR introduce _any_ user-facing change?
Users have the option to prevent the `PartitionSorter` from deleting the original unsorted file by configuring `celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled = false`.
### How was this patch tested?
Pass GA
Closes#1922 from cfmcgrady/make-delete-configurable.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
### Why are the changes needed?
The leveldbjni native library does not support macOS on Apple Silicon (arm64).
```java
java.lang.UnsatisfiedLinkError: Could not load library. Reasons: [no leveldbjni64-1.8 in java.library.path, no leveldbjni-1.8 in java.library.path, no leveldbjni in java.library.path, /private/var/folders/tc/r2n_8g6j4731h7clfqwntg880000gn/T/libleveldbjni-64-1-4616234670453989010.8: dlopen(/private/var/folders/tc/r2n_8g6j4731h7clfqwntg880000gn/T/libleveldbjni-64-1-4616234670453989010.8, 0x0001): tried: '/private/var/folders/tc/r2n_8g6j4731h7clfqwntg880000gn/T/libleveldbjni-64-1-4616234670453989010.8' (fat file, but missing compatible architecture (have 'x86_64,i386', need 'arm64')), '/System/Volumes/Preboot/Cryptexes/OS/private/var/folders/tc/r2n_8g6j4731h7clfqwntg880000gn/T/libleveldbjni-64-1-4616234670453989010.8' (no such file), '/private/var/folders/tc/r2n_8g6j4731h7clfqwntg880000gn/T/libleveldbjni-64-1-4616234670453989010.8' (fat file, but missing compatible architecture (have 'x86_64,i386', need 'arm64'))]
at org.fusesource.hawtjni.runtime.Library.doLoad(Library.java:182)
at org.fusesource.hawtjni.runtime.Library.load(Library.java:140)
at org.fusesource.leveldbjni.JniDBFactory.<clinit>(JniDBFactory.java:48)
at org.apache.celeborn.service.deploy.worker.shuffledb.LevelDBProvider.initLevelDB(LevelDBProvider.java:49)
at org.apache.celeborn.service.deploy.worker.shuffledb.DBProvider.initDB(DBProvider.java:30)
at org.apache.celeborn.service.deploy.worker.storage.StorageManager.<init>(StorageManager.scala:197)
at org.apache.celeborn.service.deploy.worker.Worker.<init>(Worker.scala:109)
at org.apache.celeborn.service.deploy.worker.Worker$.main(Worker.scala:734)
at org.apache.celeborn.service.deploy.worker.Worker.main(Worker.scala)
```
The released `leveldbjni-all` of `org.fusesource.leveldbjni` does not support AArch64 Linux, so we need to use `org.openlabtesting.leveldbjni`.
See https://issues.apache.org/jira/browse/HADOOP-16614
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
local test
Closes#1913 from cxzl25/CELEBORN-977.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
```
celeborn.worker.graceful.shutdown.enabled=true
```
```
23/09/15 17:17:29,887 ERROR [main] Worker: Initialize worker failed.
java.lang.AssertionError: assertion failed: If enable graceful shutdown, the worker should use stable server port.
at scala.Predef$.assert(Predef.scala:223)
at org.apache.celeborn.service.deploy.worker.Worker.<init>(Worker.scala:87)
at org.apache.celeborn.service.deploy.worker.Worker$.main(Worker.scala:734)
at org.apache.celeborn.service.deploy.worker.Worker.main(Worker.scala)
```
```
23/09/15 17:51:25,937 ERROR [main] Worker: Initialize worker failed.
java.lang.AssertionError: assertion failed: If enable graceful shutdown, the worker should use non-zero port. celeborn.worker.rpc.port=0, celeborn.worker.fetch.port=9193, celeborn.worker.push.port=9192, celeborn.worker.replicate.port=9194
at scala.Predef$.assert(Predef.scala:223)
at org.apache.celeborn.service.deploy.worker.Worker.<init>(Worker.scala:91)
at org.apache.celeborn.service.deploy.worker.Worker$.main(Worker.scala:738)
at org.apache.celeborn.service.deploy.worker.Worker.main(Worker.scala)
```
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1917 from cxzl25/CELEBORN-981.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As Title
### Why are the changes needed?
As Title
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
PASS GA
Closes#1909 from jiaoqingbo/CELEBORN-962.
Authored-by: jiaoqingbo <1178404354@qq.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Add `PausePushDataTime` metrics
### Why are the changes needed?
Record each Celeborn worker's pause time.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Cluster Test
Closes#1800 from zwangsheng/CELEBORN-882.
Lead-authored-by: zwangsheng <2213335496@qq.com>
Co-authored-by: zwangsheng <binjieyang@apache.org>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Use Java API to obtain disk capacity information.
References:
- `hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DF.java` (L84-L104, commit bf605c8acc)
- `jdk/src/solaris/native/java/io/UnixFileSystem_md.c` (L439-L467, commit 599bb77c45)
### Why are the changes needed?
Some OSes do not support the `df -B1` command, and the worker will throw an `ArrayIndexOutOfBoundsException`.
We can replace the `df` command with the Java API, which is more portable.
```java
23/09/08 22:03:25,522 ERROR [worker-disk-checker] LocalDeviceMonitor: Device check failed.
java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: -4
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:206)
at org.apache.celeborn.common.util.Utils$.tryWithTimeoutAndCallback(Utils.scala:858)
at org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor$.highDiskUsage(DeviceMonitor.scala:258)
at org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$9(DeviceMonitor.scala:136)
at org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$9$adapted(DeviceMonitor.scala:135)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$2(DeviceMonitor.scala:135)
at org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$2$adapted(DeviceMonitor.scala:110)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.run(DeviceMonitor.scala:110)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -4
at org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor$.$anonfun$highDiskUsage$1(DeviceMonitor.scala:240)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at org.apache.celeborn.common.util.Utils$$anon$3.call(Utils.scala:851)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
```
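A minimal sketch of the Java API in question (a hypothetical helper, not the actual `DeviceMonitor` code): `java.io.File` exposes the same filesystem capacity numbers on every platform, with no shell command or output parsing involved.

```java
import java.io.File;

public class DiskCapacity {
    // Hypothetical helper showing the Java API used in place of parsing
    // `df -B1` output. Returns {total, free, usable} bytes of the file
    // system containing `path`.
    static long[] capacity(String path) {
        File f = new File(path);
        return new long[] {f.getTotalSpace(), f.getFreeSpace(), f.getUsableSpace()};
    }
}
```

The disk usage ratio can then be computed as `1.0 - usable / (double) total`.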
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1892 from cxzl25/CELEBORN-959.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Move the constructor of Worker into a try-catch block.
### Why are the changes needed?
There are some exceptions thrown from Worker's constructor instead of Worker.initialize(), so it is necessary to catch these exceptions.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Start a worker with conf
```
celeborn.worker.directMemoryRatioToPauseReceive=0.7
celeborn.worker.directMemoryRatioToResume=0.7
```
An `IllegalArgumentException` will be caught.
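The invalid configuration above (resume ratio not below the pause ratio) can be rejected by an explicit precondition. A hypothetical sketch of such a check — the message format is illustrative, not Celeborn's actual error text:

```java
public class MemoryRatioCheck {
    // Illustrative validation mirroring the kind of check that throws from
    // Worker's constructor: the resume ratio must be strictly lower than the
    // pause-receive ratio, otherwise the worker could never resume.
    static void validate(double pauseReceiveRatio, double resumeRatio) {
        if (resumeRatio >= pauseReceiveRatio) {
            throw new IllegalArgumentException(
                "celeborn.worker.directMemoryRatioToResume (" + resumeRatio
                + ") must be less than celeborn.worker.directMemoryRatioToPauseReceive ("
                + pauseReceiveRatio + ")");
        }
    }
}
```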
Closes#1894 from jiang13021/celenorn-961.
Authored-by: jiang13021 <jiangyanze.jyz@antgroup.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
The worker throws `FileNotFoundException` while fetching a chunk:
```
java.io.FileNotFoundException: /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/871-0-0 (No such file or directory
```
Before committing shuffle files, the files are deleted by the storage-scheduler thread:
```
2023-09-07 19:38:16,506 [INFO] [dispatcher-event-loop-44] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Create file /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/986-0-0 success
2023-09-07 19:38:16,506 [INFO] [dispatcher-event-loop-44] - org.apache.celeborn.service.deploy.worker.Controller -Logging.scala(51) -Reserved 29 primary location and 0 replica location for application_1693206141914_540726_1-1
2023-09-07 19:38:16,537 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
2023-09-07 19:38:16,580 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
2023-09-07 19:38:16,629 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
2023-09-07 19:38:16,661 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
2023-09-07 19:38:16,681 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
2023-09-07 19:38:17,355 [INFO] [dispatcher-event-loop-12] - org.apache.celeborn.service.deploy.worker.Controller -Logging.scala(51) -Start commitFiles for application_1693206141914_540726_1-1
2023-09-07 19:38:17,362 [INFO] [async-reply] - org.apache.celeborn.service.deploy.worker.Controller -Logging.scala(51) -CommitFiles for application_1693206141914_540726_1-1 success with 29 committed primary partitions, 0 empty primary partitions, 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions, 0 failed replica partitions.
java.io.FileNotFoundException: /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/976-0-0 (No such file or directory)
java.io.FileNotFoundException: /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/482-0-0 (No such file or directory)
java.io.FileNotFoundException: /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/658-0-0 (No such file or directory)
```
There may be a concurrency problem in this method.
``` scala
private def cleanupExpiredAppDirs(): Unit = {
val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1)
disksSnapshot().filter(_.status != DiskStatus.IO_HANG).foreach { diskInfo =>
diskInfo.dirs.foreach {
case workingDir if workingDir.exists() =>
workingDir.listFiles().foreach { appDir =>
// Don't delete shuffleKey's data that exist correct shuffle file info.
if (!appIds.contains(appDir.getName)) {
val threadPool = diskOperators.get(diskInfo.mountPoint)
deleteDirectory(appDir, threadPool)
logInfo(s"Delete expired app dir $appDir.")
}
}
// workingDir not exist when initializing worker on new disk
case _ => // do nothing
}
}
}
```
We should find all app directories first, then get the active shuffle keys.
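The fixed ordering can be sketched as follows (a hypothetical, simplified stand-in for the `StorageManager` code): snapshot the on-disk app dirs first, then read the active shuffle keys. An app registered between the two steps appears in the active set but not in the dir snapshot, so its directory can never be deleted by that pass.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

public class ExpiredAppDirCleaner {
    // Sketch of the fixed ordering, not the actual cleanupExpiredAppDirs code.
    static List<String> expiredDirs(Supplier<List<String>> listAppDirs,
                                    Supplier<Set<String>> activeAppIds) {
        List<String> dirs = listAppDirs.get();   // step 1: snapshot app dirs
        Set<String> active = activeAppIds.get(); // step 2: active shuffle keys
        List<String> expired = new ArrayList<>();
        for (String dir : dirs) {
            if (!active.contains(dir)) {
                expired.add(dir); // only dirs that existed before step 2 qualify
            }
        }
        return expired;
    }
}
```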
https://issues.apache.org/jira/browse/CELEBORN-881
### Why are the changes needed?
Bugfix.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passes GA and manual test.
Closes#1889 from zy-jordan/CELEBORN-881.
Lead-authored-by: hongzhaoyang <15316036153@163.com>
Co-authored-by: Keyong Zhou <waitinfuture@gmail.com>
Co-authored-by: Keyong Zhou <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Use `TimeUnit.NANOSECONDS.toMillis` instead of dividing by `1000_000`.
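As a minimal illustration of the replacement (the wrapper method is hypothetical; the PR simply swaps the expression in place):

```java
import java.util.concurrent.TimeUnit;

public class NanosToMillis {
    // TimeUnit.NANOSECONDS.toMillis makes the unit conversion explicit and
    // avoids easy-to-misread literals such as 1000_000.
    static long toMillis(long nanos) {
        return TimeUnit.NANOSECONDS.toMillis(nanos);
    }
}
```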
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1888 from cxzl25/CELEBORN-957.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Fix duplicated sending of commitFiles for MapPartition, and fix BufferStreamEnd not being sent while opening a MapPartition split.
### Why are the changes needed?
After opening a partition split for MapPartition, there are two errors.
- ERROR1: The worker does not send a stream end to the client because of a concurrent thread synchronization problem. After the idle timeout, the client closes the channel and throws the exception **"xx is lost, notify related stream xx"**
```java
2023-09-06T04:40:47.7549935Z 23/09/06 04:40:47,753 WARN [Keyed Aggregation -> Map -> Sink: Unnamed (5/8)#0] Task: Keyed Aggregation -> Map -> Sink: Unnamed (5/8)#0 (c1cade728ddb3a32e0bf72acb1d87588_c27dcf7b54ef6bfd6cff02ca8870b681_4_0) switched from RUNNING to FAILED with failure cause:
2023-09-06T04:40:47.7550644Z java.io.IOException: Client localhost/127.0.0.1:38485 is lost, notify related stream 256654410004
2023-09-06T04:40:47.7551219Z at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:142)
2023-09-06T04:40:47.7551886Z at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77)
2023-09-06T04:40:47.7552576Z at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:57)
2023-09-06T04:40:47.7553250Z at org.apache.celeborn.plugin.flink.network.ReadClientHandler.lambda$channelInactive$0(ReadClientHandler.java:119)
2023-09-06T04:40:47.7553806Z at java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1597)
2023-09-06T04:40:47.7554564Z at org.apache.celeborn.plugin.flink.network.ReadClientHandler.channelInactive(ReadClientHandler.java:110)
2023-09-06T04:40:47.7555270Z at org.apache.celeborn.common.network.server.TransportRequestHandler.channelInactive(TransportRequestHandler.java:71)
2023-09-06T04:40:47.7556005Z at org.apache.celeborn.common.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:136)
2023-09-06T04:40:47.7556710Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
2023-09-06T04:40:47.7557370Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7558172Z at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
2023-09-06T04:40:47.7558803Z at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
2023-09-06T04:40:47.7559368Z at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
2023-09-06T04:40:47.7559954Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
2023-09-06T04:40:47.7560589Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7561222Z at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
2023-09-06T04:40:47.7561829Z at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
2023-09-06T04:40:47.7562620Z at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelInactive(TransportFrameDecoderWithBufferSupplier.java:206)
2023-09-06T04:40:47.7563506Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
2023-09-06T04:40:47.7564207Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7564829Z at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
2023-09-06T04:40:47.7565417Z at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
2023-09-06T04:40:47.7566014Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
2023-09-06T04:40:47.7566654Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7567317Z at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
2023-09-06T04:40:47.7567813Z at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
2023-09-06T04:40:47.7568297Z at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
2023-09-06T04:40:47.7568830Z at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
2023-09-06T04:40:47.7569402Z at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
2023-09-06T04:40:47.7569894Z at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
2023-09-06T04:40:47.7570356Z at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
2023-09-06T04:40:47.7570841Z at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
2023-09-06T04:40:47.7571319Z at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
2023-09-06T04:40:47.7571721Z at java.lang.Thread.run(Thread.java:750)
```
- ERROR2: The client sends duplicated commitFiles to the worker. Because of an inconsistent `unHandledPartitions` state, both batchCommit and finalCommit send commitFiles:
``` java
2023-09-06T04:36:48.3146773Z 23/09/06 04:36:48,314 WARN [Worker-CommitFiles-1] Controller: Get Partition Location for 1693975002919-61094c8156f918062a5fae12d551bc90-0 0-1 but didn't exist.
```
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
ci
Closes#1881 from zhongqiangczq/fix-split-test.
Authored-by: zhongqiang.czq <zhongqiang.czq@alibaba-inc.com>
Signed-off-by: zhongqiang.czq <zhongqiang.czq@alibaba-inc.com>
### What changes were proposed in this pull request?
When shutdown type is decommission, we should change the `ShutdownHookManager#HookEntry`'s
timeout to `celeborn.worker.decommission.forceExitTimeout`.
### Why are the changes needed?
ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual test
Closes#1877 from waitinfuture/945.
Lead-authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Co-authored-by: Keyong Zhou <waitinfuture@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
In MapPartition, data is split into regions.
1. Unlike ReducePartition, whose partition split can occur while pushing data, MapPartition must keep its data ordering, so a partition split is only done at the time of sending PushDataHandShake or RegionStart messages (as shown in the following image). That is to say, a partition split only appears at the beginning of a region, never inside a region.
> Notice: the client side may think it failed to push a HandShake or RegionStart message while the worker side still receives the message normally. After the client revives successfully, it does not push any messages to the old partition, so the worker holding the old partition creates an empty file. After committing files, the worker returns empty commit ids. That is to say, empty files are filtered out after committing files, and the ReduceTask will not read any empty files.

2. PushData/RegionFinish do not care about the following cases:
- Disk full
- ExceedPartitionSplitThreshold
- Worker shutting down
If one of the above three conditions appears, PushData and RegionFinish can still proceed as normal. Workers should consider the shutting-down case and try their best to wait for all regions to finish before shutting down.
If PushData or RegionFinish fails (e.g. due to a network timeout), the MapTask fails and another MapTask attempt starts.

3. How does shuffle read support partition split?
The ReduceTask should get the split partitions in order and open streams by partition epoch in order.
### Why are the changes needed?
Partition split is not supported by MapPartition until now.
There is still a risk that a partition file's size is too large to store the file on the worker's disk.
To avoid this risk, this PR introduces partition split in shuffle read and shuffle write.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
UT and manual TPCDS test
Closes#1550 from FMX/CELEBORN-627.
Lead-authored-by: zhongqiang.czq <zhongqiang.czq@alibaba-inc.com>
Co-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Co-authored-by: Ethan Feng <ethanfeng@apache.org>
Signed-off-by: zhongqiang.czq <zhongqiang.czq@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
As title
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
PASS GA
Closes#1872 from jiaoqingbo/939.
Authored-by: jiaoqingbo <1178404354@qq.com>
Signed-off-by: Shuang <lvshuang.tb@gmail.com>
### What changes were proposed in this pull request?
As Title
### Why are the changes needed?
As Title
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
PASS GA
Closes#1873 from jiaoqingbo/940.
Authored-by: jiaoqingbo <1178404354@qq.com>
Signed-off-by: Shuang <lvshuang.tb@gmail.com>
### What changes were proposed in this pull request?
Separate local read test.
### Why are the changes needed?
Ditto.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
Pass GA.
Closes#1871 from FMX/CELEBORN-752-FOLLOWUP.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
For Spark clusters, support reading local shuffle files if Celeborn is co-deployed with YARN NodeManagers. This PR helps to reduce the number of active connections.
### Why are the changes needed?
Ditto.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA and cluster. The performance is identical whether or not the local reader is enabled, but the active connection number may vary according to your connections per peer.
<img width="951" alt="截屏2023-08-16 20 20 14" src="https://github.com/apache/incubator-celeborn/assets/4150993/9106e731-28fc-4e78-9c05-ae6a269d249a">
The active connection number changed from 3745 to 2894. This PR will help to improve cluster stability.
Closes#1812 from FMX/CELEBORN-752.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Add metrics about active shuffle data size in every worker and update Grafana dashboard. The metric value will decrease when shuffle is expired.
### Why are the changes needed?
Ditto.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
Cluster.
<img width="733" alt="截屏2023-08-30 17 00 11" src="https://github.com/apache/incubator-celeborn/assets/4150993/48e28c1c-2b49-45d7-b3ba-358674ff3f3d">
Closes#1867 from FMX/CELEBORN-933.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Modify log content and location to accurately describe state changes
### Why are the changes needed?
In the previous implementation, when servingState was PUSH_PAUSED and lastState was PUSH_AND_REPLICATE_PAUSED, the code only triggered the resume of REPLICATE_MODULE, but the log showed "Trigger action: PAUSE PUSH, RESUME REPLICATE".
The above log content is not accurate.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
PASS GA
Closes#1864 from jiaoqingbo/934.
Authored-by: jiaoqingbo <1178404354@qq.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Don't stop LocalFlusher when notify error.
### Why are the changes needed?
If LocalDeviceMonitor finds that the count of a non-critical error (e.g. disk full) exceeds the threshold, it notifies the error to the observed devices, and LocalFlusher does stopAndCleanFlusher, which interrupts the flush threads. Then, even if the disk recovers from the error, new disk buffers pushed into the flush queue can no longer be flushed. This causes persistently high memory usage, which triggers pausePushDataAndReplicate, and the worker cannot recover from it.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Manual.
1. Run a spark job with shuffle.
2. Copy data in a worker to trigger full disk error.
3. See logs as below

4. Confirm flush threads are still running.
5. Submit another Spark job with shuffle, and observe the disk buffer increase and decrease while flushing to disk.

Closes#1860 from liying919/opt-disk-buffer.
Authored-by: 宪英 <xianying.ly@antgroup.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
This PR
1. Strengthening assertion conditions.
2. Enabling the previously ignored `testLargeFile` scenario.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes#1848 from cfmcgrady/refine-partition-files-sorter-suite.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Using `awaitTermination` instead of `shutdownNow`.
### Why are the changes needed?
Calling the `wait` method without holding the object's monitor results in an `IllegalMonitorStateException`,
and in the `saveAllCommittedFileInfosToDB` function it is hard to acquire that monitor.
The `wait` method's documentation mentions:
>IllegalMonitorStateException – if the current thread is not the owner of the object's monitor.
We can use `awaitTermination` in place of `shutdownNow`.
According to the description of the `shutdownNow` method:
> This method does not wait for actively executing tasks to terminate. Use awaitTermination to do that.
And `awaitTermination` mentions:
> Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.
In all, `awaitTermination` is applicable to the current scenario.
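The shutdown idiom discussed here can be sketched as a small hypothetical helper, following the `ExecutorService` javadoc (`shutdownAndWait` is illustrative, not Celeborn's actual code): request an orderly shutdown, block with `awaitTermination`, and only interrupt remaining tasks if the timeout elapses.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class GracefulShutdown {
    // Hypothetical sketch of a graceful shutdown: stop accepting new tasks,
    // wait for in-flight ones to finish, and force only on timeout.
    static boolean shutdownAndWait(ExecutorService pool, long timeoutSeconds)
            throws InterruptedException {
        pool.shutdown(); // no new tasks; running tasks continue
        if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
            pool.shutdownNow(); // timed out: interrupt remaining tasks
            return false;
        }
        return true;
    }
}
```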
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT
Closes#1849 from zwangsheng/CELEBORN-926.
Authored-by: zwangsheng <2213335496@qq.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As Title
### Why are the changes needed?
As Title
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
PASS GA
Closes#1847 from jiaoqingbo/CELEBORN-924.
Authored-by: jiaoqingbo <1178404354@qq.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Adding a flag indicating high load in the worker's heartbeat allows the master to better schedule the workers
### Why are the changes needed?
In our production environment, there is a node with abnormally high load, but the master is not aware of this situation. It assigned numerous jobs to this node, and as a result, the stability of these jobs has been affected.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
UT
Closes#1840 from JQ-Cao/920.
Lead-authored-by: Keyong Zhou <zky.zhoukeyong@alibaba-inc.com>
Co-authored-by: caojiaqing <caojiaqing@bilibili.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
- Tweak pause and resume logic
- Add unit test
```mermaid
graph TB
A(NON_PAUSE)
B(PAUSE_PUSH)
C(PAUSE_PUSH_AND_REPLICATE)
A --> | pause push listener | B
B --> | resume push listener | A
A --> | pause push and replicate listeners | C
C --> | resume push and replicate listeners | A
B --> | pause replicate listener | C
C --> | resume replicate listener | B
```
### Why are the changes needed?
Add unit test for those pause and resume logic.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Add unit test.
Closes#1835 from zwangsheng/CELEBORN-908.
Authored-by: zwangsheng <2213335496@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Tweak the logic of `MemoryManager#currentServingState` and add a unit test for this function.
```mermaid
graph TB
A(Check Used Memory) --> B{Reach Pause Replicate Threshold}
B --> | N | C{Reach Pause Push Threshold}
B --> | Y | Z(Trigger Pause Push and Replicate)
C --> | N | D{Reach Resume Threshold}
C --> | Y | Y(Trigger Pause Push but Resume Replicate)
D --> | N | E{In Pause Mode}
D --> | Y | X(Trigger Resume Push and Replicate)
E --> | N | U(Do Nothing)
E --> | Y | Y
```
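The decision flow in the diagram above can be restated as a small hypothetical function (the parameter names and threshold values are illustrative, not the actual `MemoryManager` fields):

```java
public class ServingState {
    enum State { NONE_PAUSED, PUSH_PAUSED, PUSH_AND_REPLICATE_PAUSED }

    // Sketch of the decision flow: check the pause-replicate threshold first,
    // then the pause-push threshold, then the resume threshold; between resume
    // and pause thresholds, keep pausing push only if already in a pause mode.
    static State currentServingState(double usedRatio,
                                     double pauseReplicateRatio,
                                     double pausePushRatio,
                                     double resumeRatio,
                                     State last) {
        if (usedRatio >= pauseReplicateRatio) return State.PUSH_AND_REPLICATE_PAUSED;
        if (usedRatio >= pausePushRatio) return State.PUSH_PAUSED;
        if (usedRatio < resumeRatio) return State.NONE_PAUSED;
        return last == State.NONE_PAUSED ? State.NONE_PAUSED : State.PUSH_PAUSED;
    }
}
```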
### Why are the changes needed?
Make this method's logic clear, and add a unit test to ensure the logic won't be accidentally modified.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Add Unit Test
Closes#1811 from zwangsheng/CELEBORN-888.
Authored-by: zwangsheng <2213335496@qq.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
This PR introduces support for Scala 2.13
1. Resolved a compilation issue specific to Scala 2.13
2. Successfully validated compatibility with Scala 2.13 through the comprehensive suite of unit tests
3. Enabled SBT CI for Scala 2.13 within the "server" module and the "spark client"
For more detailed guidance on migrating to Scala 2.13, please consult the following resources:
1. https://www.scala-lang.org/blog/2017/02/28/collections-rework.html
2. https://docs.scala-lang.org/overviews/core/collections-migration-213.html
### Why are the changes needed?
As title
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes#1825 from cfmcgrady/scala213.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
When the worker has graceful shutdown enabled, `exitKind` is not set correctly.
### Why are the changes needed?
Ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manually test
Closes#1822 from onebox-li/fix-worker-shutdown.
Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1768 from AngersZhuuuu/CELEBORN-847.
Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: Keyong Zhou <zhouky@apache.org>
Co-authored-by: Keyong Zhou <waitinfuture@gmail.com>
Co-authored-by: Keyong Zhou <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
This PR fixes a bug that in rare cases may cause data loss.
### Why are the changes needed?
I received a bug report from one of the users that in an extreme case a small amount of data loss happens. I
reproduced the bug under the following conditions:
1. Shuffle data size for one partition id is relatively large, for example 400GB
2. `celeborn.client.shuffle.partitionSplit.mode` is set to HARD
3. `celeborn.client.shuffle.batchHandleCommitPartition.enabled` is enabled
Meanwhile, there are warning messages in the worker's log:
```
23/08/11 17:10:04,501 WARN [push-server-6-44] PushDataHandler: Append data failed for task(shuffle application_1691635581416_0021-0, map 746, attempt 0), caused by AlreadyClosedException, endedAttempt -1, error message: FileWriter has already closed!, fileName /mnt/disk1/celeborn-worker/shuffle_data/application_1691635581416_0021/0/0-107-0
23/08/11 17:12:04,445 WARN [push-server-6-35] PushDataHandler: Append data failed for task(shuffle application_1691635581416_0021-0, map 3016, attempt 0), caused by AlreadyClosedException, endedAttempt -1, error message: FileWriter has already closed!, fileName /mnt/disk3/celeborn-worker/shuffle_data/application_1691635581416_0021/0/0-356-0
```

After digging into it, I found the cause of the data loss is as follows:
1. For some partition id in some worker, the file size exceeds `celeborn.client.shuffle.partitionSplit.threshold`, then
`CommitManager` in `LifecycleManager` will trigger `CommitFiles` because `batchHandleCommitPartition` is enabled
2. Before `CommitFile` finishes, `PushDataHandler` receives `PushData` or `PushMergedData`, it finds that the partition has not committed yet, and is preparing to call `fileWriter.incrementPendingWrites()` and `callback.onSuccess`
3. Before `PushDataHandler` calls `fileWriter.incrementPendingWrites()`, the `CommitFiles` finishes and the FileWriter
successfully closes.
4. Then `PushDataHandler` calls `fileWriter.incrementPendingWrites()` and `callback.onSuccess`. From this point on,
`ShuffleClient` thinks the `PushData` succeeded. However, when `PushDataHandler` calls `fileWriter.write()`, it
finds the writer already closed and throws the above exception, which is ignored, so the data loss happens.
This PR fixes this by checking whether FileWriter has closed after calling `incrementPendingWrites`. If true,
`PushDataHandler` calls `onFailure`.
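The race and its fix can be sketched with a simplified, hypothetical stand-in for the worker's FileWriter (not the actual Celeborn class): after incrementing pending writes, re-check whether the writer was closed concurrently by `CommitFiles`; if so, fail the push instead of acknowledging it.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class FileWriterSketch {
    final AtomicBoolean closed = new AtomicBoolean(false);
    final AtomicInteger pendingWrites = new AtomicInteger();

    // Sketch of the fixed push path: the re-check of `closed` after the
    // increment is the addition; without it, a close racing in between
    // lets the push be acked and the data silently dropped.
    boolean tryBeginWrite() {
        pendingWrites.incrementAndGet();
        if (closed.get()) {
            pendingWrites.decrementAndGet();
            return false; // caller responds with onFailure; client revives
        }
        return true; // safe to proceed with write and callback.onSuccess
    }

    void close() {
        closed.set(true);
    }
}
```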
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
Closes#1808 from waitinfuture/890.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
As title
### Does this PR introduce _any_ user-facing change?
Yes, the thread local cache of shared `PooledByteBufAllocator` can be disabled by setting `celeborn.network.memory.allocator.allowCache=false`
### How was this patch tested?
Pass GA
Closes#1716 from cfmcgrady/allow-cache.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
1. Expose the config check logic during `MemoryManager#initialization` in the user configuration doc.
2. Add Preconditions Error Message
3. Add unit test to make sure that part of the logic isn't altered by mistake
### Why are the changes needed?
User-friendly
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
Add Unit Test
Closes#1801 from zwangsheng/CELEBORN-883.
Authored-by: zwangsheng <2213335496@qq.com>
Signed-off-by: zwangsheng <2213335496@qq.com>
### What changes were proposed in this pull request?
1. Wrap IOException into PartitionUnRetryAbleException when fetching
2. Improve message logging when open stream/read data error
### Why are the changes needed?
When opening a stream, many different IOExceptions may be encountered, such as NoSuchFileException, FileNotFoundException, FileCorruptedException, etc. These checked exceptions should be wrapped into PartitionUnRetryAbleException so the client can choose to regenerate the data.
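A minimal sketch of the wrapping idea, with a local stand-in exception class (the real `PartitionUnRetryAbleException` lives in Celeborn; the class and `wrap` helper here are illustrative only):

```java
import java.io.IOException;

public class FetchExceptionWrapper {
    // Hypothetical stand-in for Celeborn's PartitionUnRetryAbleException:
    // a marker type telling the client the partition cannot be retried
    // and the data should be regenerated instead.
    static class PartitionUnRetryAbleException extends IOException {
        PartitionUnRetryAbleException(String msg, Throwable cause) {
            super(msg, cause);
        }
    }

    // Wrap any checked I/O failure from open-stream into the marker type,
    // preserving the original exception as the cause for logging.
    static PartitionUnRetryAbleException wrap(String fileName, IOException e) {
        return new PartitionUnRetryAbleException(
            "Read file " + fileName + " failed: " + e.getMessage(), e);
    }
}
```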
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT & Manual test
Closes#1796 from RexXiong/CELEBORN-878-IO-Exception.
Authored-by: Shuang <lvshuang.tb@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Merge OpenStream and StreamHandler into transport messages to enhance Celeborn's compatibility.
### Why are the changes needed?
1. Improve flexibility to change RPC.
2. Compatible with 0.2 client.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
UT and cluster.
Closes#1750 from FMX/CELEBORN-760.
Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Co-authored-by: Ethan Feng <fengmingxiao.fmx@alibaba-inc.com>
Co-authored-by: Keyong Zhou <zhouky@apache.org>
Co-authored-by: Keyong Zhou <waitinfuture@gmail.com>
Co-authored-by: Keyong Zhou <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
As title
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes#1791 from cfmcgrady/enrich-fetch-log.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Support worker recovery after a crash when graceful shutdown is enabled on workers.
1. Persist committed file info to LevelDB.
2. Load LevelDB when the worker starts.
3. Clean expired file infos in LevelDB.
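The three steps above can be sketched as follows; a `HashMap` stands in for the worker's LevelDB, and the key format and expiry field are hypothetical, not Celeborn's actual schema:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Illustrative only: an in-memory map plays the role of the LevelDB that
// survives a worker restart; real code would serialize through leveldbjni.
public class CommittedFileStore {
    private final Map<String, Long> db = new HashMap<>(); // key -> expireTimeMs

    // 1. Persist committed file info on commit.
    public void persist(String shuffleKey, String fileName, long expireTimeMs) {
        db.put(shuffleKey + "/" + fileName, expireTimeMs);
    }

    // 2. Reload on worker restart (here: report surviving entries).
    public int load() {
        return db.size();
    }

    // 3. Clean expired file infos.
    public int cleanExpired(long nowMs) {
        int removed = 0;
        for (Iterator<Map.Entry<String, Long>> it = db.entrySet().iterator(); it.hasNext(); ) {
            if (it.next().getValue() <= nowMs) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```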
### Why are the changes needed?
Ditto.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA and cluster. After testing on a cluster, I found that 8k file infos consume about 2MB of disk space; the space is reclaimed shortly after the shuffle expires.
Closes#1779 from FMX/CELEBORN-863.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Adding new metrics to record the number of registered connections
### Why are the changes needed?
Monitor the number of active connections on worker nodes
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
no
Closes#1773 from JQ-Cao/852.
Authored-by: caojiaqing <caojiaqing@bilibili.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1759 from AngersZhuuuu/CELEBORN-832.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
Decrease sort memory counter after sorting procedure is complete.
### Why are the changes needed?
Fix incorrect sort memory counter.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
UT.
Closes#1766 from FMX/CELEBORN-845.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Pass the exit kind to each component. Depending on the exit kind:
- GRACEFUL_SHUTDOWN: behaves as the original code did when `graceful == true`.
- Others: the LevelDB files are cleaned up.
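A sketch of the dispatch described above; the enum values other than `GRACEFUL_SHUTDOWN` are hypothetical placeholders, not Celeborn's actual exit kinds:

```java
// Illustrative: keep the recovery LevelDB only for a graceful shutdown,
// clean it up for every other exit kind.
public class ExitKindDemo {
    public enum ExitKind { GRACEFUL_SHUTDOWN, IMMEDIATE, ABNORMAL }

    // Returns true when the recovery state (the LevelDB files) should be kept.
    public static boolean keepRecoveryDb(ExitKind kind) {
        return kind == ExitKind.GRACEFUL_SHUTDOWN;
    }
}
```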
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1748 from AngersZhuuuu/CELEBORN-819.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1756 from AngersZhuuuu/CELEBORN-656-FOLLOWUP.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Eliminate `chunksBeingTransferred` calculation when `celeborn.shuffle.io.maxChunksBeingTransferred` is not configured
### Why are the changes needed?
I observed high CPU usage on `ChunkStreamManager#chunksBeingTransferred` calculation. We can eliminate the method call if no threshold is configured, and investigate how to improve the method itself in the future.
<img width="1947" alt="image" src="https://github.com/apache/incubator-celeborn/assets/26535726/412c6a41-c0ce-440c-ae99-4424cb8702d3">
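The fast path can be sketched as follows; treating an unset `celeborn.shuffle.io.maxChunksBeingTransferred` as `Long.MAX_VALUE` is an assumption of this sketch, not necessarily how Celeborn encodes "not configured":

```java
import java.util.function.LongSupplier;

// Illustrative guard: only invoke the potentially expensive chunk-count scan
// when a threshold is actually configured.
public class ChunkLimitGuard {
    public static boolean exceedsLimit(long maxChunksBeingTransferred,
                                       LongSupplier chunksBeingTransferred) {
        if (maxChunksBeingTransferred == Long.MAX_VALUE) {
            return false; // no limit configured: skip the costly computation entirely
        }
        return chunksBeingTransferred.getAsLong() >= maxChunksBeingTransferred;
    }
}
```

With the guard, the hot request path never touches the per-stream accounting unless the operator opted into the limit.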
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI and Review.
Closes#1749 from pan3793/CELEBORN-827.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1742 from AngersZhuuuu/CELEBORN-820.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
As title.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passes GA.
Closes#1739 from AngersZhuuuu/CELEBORN-815.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Fix some typos and grammar
### Why are the changes needed?
Ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
manually test
Closes#1733 from onebox-li/fix-typo.
Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
cleanup the unnecessary TODO which introduced in https://github.com/apache/incubator-celeborn/pull/1727
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Review
Closes#1728 from cfmcgrady/shutdown.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
Recently, while conducting the sbt build test, it came to my attention that certain resources such as ports and threads were not being released promptly.
This pull request introduces a new method, `shutdown(graceful: Boolean)`, to the `Service` trait. When invoked by `MiniClusterFeature.shutdownMiniCluster`, it calls `worker.shutdown(graceful = false)`. This implementation aims to prevent possible memory leaks during CI processes.
Before this PR the unit tests in the `client/common/master/service/worker` modules resulted in leaked ports.
```
$ jps
1138131 Jps
1130743 sbt-launch-1.9.0.jar
$ netstat -lntp | grep 1130743
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp 0 0 127.0.0.1:12345 0.0.0.0:* LISTEN 1130743/java
tcp 0 0 0.0.0.0:41563 0.0.0.0:* LISTEN 1130743/java
tcp 0 0 0.0.0.0:42905 0.0.0.0:* LISTEN 1130743/java
tcp 0 0 0.0.0.0:44419 0.0.0.0:* LISTEN 1130743/java
tcp 0 0 0.0.0.0:45025 0.0.0.0:* LISTEN 1130743/java
tcp 0 0 0.0.0.0:44799 0.0.0.0:* LISTEN 1130743/java
tcp 0 0 0.0.0.0:39053 0.0.0.0:* LISTEN 1130743/java
tcp 0 0 0.0.0.0:39029 0.0.0.0:* LISTEN 1130743/java
tcp 0 0 0.0.0.0:39475 0.0.0.0:* LISTEN 1130743/java
tcp 0 0 0.0.0.0:40153 0.0.0.0:* LISTEN 1130743/java
tcp 0 0 0.0.0.0:33051 0.0.0.0:* LISTEN 1130743/java
tcp 0 0 0.0.0.0:33449 0.0.0.0:* LISTEN 1130743/java
tcp 0 0 0.0.0.0:34073 0.0.0.0:* LISTEN 1130743/java
tcp 0 0 0.0.0.0:35347 0.0.0.0:* LISTEN 1130743/java
tcp 0 0 0.0.0.0:35971 0.0.0.0:* LISTEN 1130743/java
tcp 0 0 0.0.0.0:36799 0.0.0.0:* LISTEN 1130743/java
tcp 0 0 192.168.1.151:40775 0.0.0.0:* LISTEN 1130743/java
tcp 0 0 192.168.1.151:44457 0.0.0.0:* LISTEN 1130743/java
```
After this PR:
```
$ jps
1114423 Jps
1107544 sbt-launch-1.9.0.jar
$ netstat -lntp | grep 1107544
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes#1727 from cfmcgrady/shutdown.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
…up client
### What changes were proposed in this pull request?
Add heartbeat from client to lifecycle manager. In this PR the heartbeat request contains the local shuffle ids from the client; the lifecycle manager checks them against its local set and returns the ids it doesn't know. Upon receiving the response, the client calls ```unregisterShuffle``` for cleanup.
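The id diff at the heart of this heartbeat can be sketched as below (class and method names are illustrative, not Celeborn's actual API):

```java
import java.util.HashSet;
import java.util.Set;

// Server side of the sketch: compute the shuffle ids present on the client
// but unknown to the lifecycle manager; the client unregisters exactly those.
public class HeartbeatDiff {
    public static Set<Integer> unknownIds(Set<Integer> clientIds, Set<Integer> managerIds) {
        Set<Integer> unknown = new HashSet<>(clientIds);
        unknown.removeAll(managerIds);
        return unknown;
    }
}
```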
### Why are the changes needed?
Before this PR, the client side ```unregisterShuffle``` is never called. When running TPCDS 3T with Spark Thriftserver without DRA, I found the executor's heap contained 1.6 million PartitionLocation objects (and StorageInfo). After this PR, the number of PartitionLocation objects decreases to 275 thousand.
This heartbeat can be extended in the future for other purposes, e.g. reporting the client's metrics.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passes GA and manual test.
Closes#1719 from waitinfuture/798.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
According to https://github.com/apache/incubator-celeborn/pull/1709#discussion_r1260133078
### Why are the changes needed?
ditto
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passes GA.
Closes#1711 from waitinfuture/790-fu.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title.
### Why are the changes needed?
Before this PR, ```Flusher#takeBuffer``` returns a ```CompositeByteBuf``` which is unpooled and on heap:
```
buffer = Unpooled.compositeBuffer(maxComponents)
```
```
public static CompositeByteBuf compositeBuffer(int maxNumComponents) {
    return new CompositeByteBuf(ALLOC, /*direct*/ false, maxNumComponents);
}
```
When consolidation happens, the data will be copied from direct memory to heap memory, causing OOM and perf degradation.
With this PR, in my test cases of shuffling 14G for three 1G/1G workers, I don't see disk buffer larger than direct memory,
nor do I encounter high GC.
### Does this PR introduce _any_ user-facing change?
This patch fixes some OOM issues.
### How was this patch tested?
Passes GA and manual test.
Closes#1709 from waitinfuture/790.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Add chunk related UTs for FileWriter.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passes GA.
Closes#1705 from waitinfuture/787.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Add a configuration `celeborn.worker.shuffle.partitionSplit.max` to ensure that, in soft mode, individual partition files are limited to a size smaller than the configured value
### Why are the changes needed?
In soft mode, there may be situations where individual partition files are exceptionally large, which can result in excessively long sort times in skewed scenarios.
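A sketch of the split decision under these assumptions (the soft threshold and the method shape are illustrative; only the `celeborn.worker.shuffle.partitionSplit.max` hard cap is from this PR):

```java
// Illustrative: in soft mode the writer normally splits lazily at the soft
// threshold, but the configured hard cap forces a split regardless, so no
// single partition file can grow unboundedly in skewed scenarios.
public class SplitPolicy {
    public enum Split { NONE, SOFT, HARD }

    public static Split check(long fileLengthBytes, long softThresholdBytes, long hardMaxBytes) {
        if (fileLengthBytes >= hardMaxBytes) return Split.HARD;      // enforced cap
        if (fileLengthBytes >= softThresholdBytes) return Split.SOFT; // advisory split
        return Split.NONE;
    }
}
```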
### Does this PR introduce _any_ user-facing change?
`celeborn.worker.shuffle.partitionSplit.max` default value is 2g
### How was this patch tested?
none
Closes#1701 from JQ-Cao/785.
Authored-by: caojiaqing <caojiaqing@bilibili.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Final flush with a non-empty buffer.
### Why are the changes needed?
This bug was introduced by [CELEBORN-626](https://github.com/apache/incubator-celeborn/pull/1534/files)
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass UT
Closes#1702 from RexXiong/CELEBORN-626-dev.
Authored-by: Shuang <lvshuang.tb@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Make max components configurable for FileWriter#flushBuffer.
### Why are the changes needed?
When the max components of ```CompositeByteBuf``` is too big (hard-coded to 256 before this PR), Netty's off-heap memory
usage will be several times bigger than the true usage:
```
Direct memory usage: 1044.0 MiB/4.0 GiB, disk buffer size: 255.9 MiB
```
When set to 1, netty's memory usage will be very close to disk buffer:
```
Direct memory usage: 376.0 MiB/4.0 GiB, disk buffer size: 353.0 MiB
```
but when it is set too low, for example 1, performance might decrease, especially for the sort pusher:
```
max components: 1 vs. 16
shuffle write time: 34s vs. 30s
```
For the hash pusher, the difference is not as big:
```
max components: 1 vs. 8
shuffle write time: 25s vs. 23s
```
This PR makes the parameter configurable.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passes GA, and manual test.
Closes#1697 from waitinfuture/782.
Lead-authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Co-authored-by: Keyong Zhou <waitinfuture@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
To fix the sorted file size summary counter overflow. The sorted file size summary can be larger than an integer's max value. This should be applied to branch-0.3, branch-0.2, and the main branch.
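Why widening the counter fixes the overflow can be shown with a small comparison (illustrative code, not Celeborn's metric implementation):

```java
// Summing sorted file sizes in an int wraps past 2 GiB, while a long
// counter stays correct well beyond that.
public class CounterWidth {
    public static int sumInt(long[] sizes) {
        int total = 0;
        for (long s : sizes) total += (int) s; // silently overflows past Integer.MAX_VALUE
        return total;
    }

    public static long sumLong(long[] sizes) {
        long total = 0;
        for (long s : sizes) total += s;
        return total;
    }
}
```

Three 1 GiB files already push the int sum negative, which is exactly the kind of wrong graph shown below.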
### Why are the changes needed?
This graph is incorrect.
<img width="1500" alt="lQLPJx4a6A76xo7NBOHNC7iwLBx50tXnv5IEoQTQxMAMAA_3000_1249" src="https://github.com/apache/incubator-celeborn/assets/4150993/8d288087-64d7-4569-ba04-05bb1915118a">
### Does this PR introduce _any_ user-facing change?
Yes. The Celeborn worker's metric for the sorted file summary is incorrect, and this patch corrects it.
### How was this patch tested?
UT.
Closes#1694 from FMX/CELEBORN-779.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Rename
```
enum MemoryManagerStat { resumeAll, pausePushDataAndReplicate, pausePushDataAndResumeReplicate }
```
to
```
enum ServingState { NONE_PAUSED, PUSH_AND_REPLICATE_PAUSED, PUSH_PAUSED }
```
### Why are the changes needed?
`MemoryManagerStat` indicates the worker serving functionalities, and it's weird to represent the state using verbs.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GA.
Closes#1691 from pan3793/CELEBORN-778.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Fix compute bug
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1693 from AngersZhuuuu/CELEBORN-777.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
In `TimeSlidingHub.add()`, `_deque` is cleared and then the pair is added.
```
if (nodesToAdd >= maxQueueSize) {
    // The new node exceed existing sliding list, need to clear all old nodes
    // and create a new sliding list
    _deque.clear();
    _deque.add(Pair.of(currentTimestamp, (N) newNode.clone()));
    sumNode = (N) newNode.clone();
    return;
}
```
Then when `BufferStatusHub.avgBytesPerSec()` is called, `currentNumBytes` can be `> 0` while `getCurrentTimeWindowsInMills` may return 0, causing a division-by-zero error.
```
public long avgBytesPerSec() {
    long currentNumBytes = sum().numBytes();
    if (currentNumBytes > 0) {
        return currentNumBytes * 1000 / (long) getCurrentTimeWindowsInMills();
    }
    return 0L;
}
```
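A guarded version of the computation might look like the following sketch (a standalone method, not the actual `BufferStatusHub` code):

```java
// After the sliding window is reset, the elapsed window can be 0 even though
// bytes were recorded, so the divisor must be guarded as well.
public class RateCalc {
    public static long avgBytesPerSec(long numBytes, long windowMillis) {
        if (numBytes > 0 && windowMillis > 0) {
            return numBytes * 1000 / windowMillis;
        }
        return 0L; // no data, or an empty/just-reset time window
    }
}
```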
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1690 from AngersZhuuuu/CELEBORN-777.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
Fix the refactor bug of CELEBORN-614 (https://github.com/apache/incubator-celeborn/pull/1517).
### Why are the changes needed?
This is a bug fix: the condition `writer.getException != null` was accidentally inverted during CELEBORN-614 (https://github.com/apache/incubator-celeborn/pull/1517), which caused the trim to become a no-op.
### Does this PR introduce _any_ user-facing change?
No. The bug was caused by an unreleased commit.
### How was this patch tested?
Set Worker off-heap memory to 2G, and run 1T tera sort.
Before: the trim did not trigger disk buffer flush, so the worker could not recover from the paused push-data state and the job failed.
After: the trim correctly triggers disk buffer flush and releases the worker memory, and the job succeeds.
<img width="1653" alt="image" src="https://github.com/apache/incubator-celeborn/assets/26535726/9ef62c78-e6a9-497f-9dac-d3f712e830cc">
Closes#1689 from pan3793/CELEBORN-614-followup.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Make the Celeborn leader clean expired app dirs on HDFS when an application is lost.
### Why are the changes needed?
If Celeborn is working on HDFS, the storage manager cleans expired app directories on startup, and a newly created worker will attempt to delete any app directories it does not know.
This can cause in-use app directories to be deleted unexpectedly.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
UT and cluster.
Closes#1678 from FMX/CELEBORN-764.
Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Co-authored-by: Ethan Feng <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Rename the remaining RSS-related class names, filenames, etc.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1664 from AngersZhuuuu/CELEBORN-751.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
eliminate comments introduced in https://github.com/apache/incubator-celeborn/pull/1650
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes#1672 from cfmcgrady/primary-replica-followup.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
- gauge method definition improvement. i.e.
before
```
def addGauge[T](name: String, f: Unit => T, labels: Map[String, String])
```
after
```
def addGauge[T](name: String, labels: Map[String, String])(f: () => T)
```
which improves the caller-side code style
```
addGauge(name, labels) { () =>
...
}
```
- remove unnecessary Java/Scala collection conversion. Since Scala 2.11 does not support SAM, the extra implicit function is required.
- leverage Logging to defer message evaluation
- UPPER_CASE string constants
### Why are the changes needed?
Improve code quality and performance(maybe)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GA.
Closes#1670 from pan3793/CELEBORN-757.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Support to decide whether to compress shuffle data through configuration.
### Why are the changes needed?
Currently, Celeborn compresses all shuffle data, but for example, the shuffle data of Gluten has already been compressed. In this case, no additional compression is required. Therefore, configuration needs to be provided for users to decide whether to use Celeborn’s compression according to the actual situation.
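A sketch of such a toggle; the `Deflater` codec below is a stand-in, since Celeborn's actual codecs and config key are not shown in this description:

```java
import java.util.Arrays;
import java.util.zip.Deflater;

// Illustrative: when compression is disabled, already-compressed data
// (e.g. from Gluten) passes through untouched instead of being re-compressed.
public class CompressionToggle {
    public static byte[] maybeCompress(byte[] data, boolean compressionEnabled) {
        if (!compressionEnabled) {
            return data; // pass through as-is
        }
        Deflater deflater = new Deflater();
        deflater.setInput(data);
        deflater.finish();
        byte[] buf = new byte[data.length + 64];
        int n = deflater.deflate(buf);
        deflater.end();
        return Arrays.copyOf(buf, n);
    }
}
```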
### Does this PR introduce _any_ user-facing change?
no.
Closes#1669 from kerwin-zk/celeborn-755.
Authored-by: xiyu.zk <xiyu.zk@alibaba-inc.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Netty has exposed the public API `PlatformDependent.usedDirectMemory()` to get Netty's used direct memory since [netty-4.1.35.Final](https://github.com/netty/netty/pull/8945); this PR simplifies the logic accordingly.
### Why are the changes needed?
Simplifies the logic for getting Netty's used direct memory.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes#1662 from cfmcgrady/netty-used-memory.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
In order to distinguish it from the existing master/worker, refactor data replication terminology to 'primary/replica' for improved clarity and inclusivity in the codebase
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
existing tests
Closes#1639 from cfmcgrady/primary-replica.
Lead-authored-by: Fu Chen <cfmcgrady@gmail.com>
Co-authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Rename RssHARetryClient to MasterClient
### Why are the changes needed?
Code refactor
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GA.
Closes#1661 from AngersZhuuuu/CELEBORN-748.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Print time/bytes in human-readable format
### Why are the changes needed?
Make logs readable
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GA.
Closes#1659 from pan3793/minor.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Rename HeartbeatResponse to HeartbeatFromWorkerResponse
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1651 from AngersZhuuuu/CELEBORN-739.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
This PR is an integral component of #1639. It primarily focuses on updating configuration settings and metrics terminology, while ensuring compatibility with older client versions by refraining from introducing RPC-related changes.
### Why are the changes needed?
In order to distinguish it from the existing master/worker, refactor data replication terminology to 'primary/replica' for improved clarity and inclusivity in the codebase
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
existing tests.
Closes#1650 from cfmcgrady/primary-replica-metrics.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Remove unused RPC GetWorkerInfo & GetWorkerInfosResponse
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1647 from AngersZhuuuu/CELEBORN-735.
Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
Remove unused RPC ThreadDump & ThreadDumpResponse
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1645 from AngersZhuuuu/CELEBORN-732.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Refine the congestion relevant code/log/comments
### Why are the changes needed?
ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
manually test
Closes#1637 from onebox-li/improve-congestion.
Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Fix the flaky test by enlarging `celeborn.client.shuffle.expired.checkInterval`
### Why are the changes needed?
```
RssHashCheckDiskSuite:
- celeborn spark integration test - hash-checkDiskFull *** FAILED ***
868 was not less than 0 (RssHashCheckDiskSuite.scala:83)
```
https://github.com/apache/incubator-celeborn/actions/runs/5396767745/jobs/9800766633
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GA, and should observe CI,
Closes#1640 from pan3793/CELEBORN-727.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
To clean the remnant application directory after Celeborn Worker is restarted.
### Why are the changes needed?
Remnant application directories will not be deleted, because `hadoopFs.listFiles(path, false)` does not list directories.
### Does this PR introduce _any_ user-facing change?
No.
Closes#1641 from Demon-Liang/0.3-dev.
Authored-by: Demon Liang <liangdingwen.ldw@alipay.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
(cherry picked from commit 42a9160c8ceaf79bae514c54dafcb5b8e12d5251)
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Unify all blacklist related code and comment
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1638 from AngersZhuuuu/CELEBORN-666-FOLLOWUP.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
This PR batches revive requests and periodically sends them to the LifecycleManager to reduce the number of RPC requests.
In more detail: this PR changes the Revive message to support multiple unique partitions, and also passes a set of unique mapIds for checking MapEnd. Each time ShuffleClientImpl wants to revive, it adds a ReviveRequest to ReviveManager and waits for the result. ReviveManager batches revive requests and periodically sends them to the LifecycleManager (deduplicated by partitionId). The LifecycleManager constructs a ChangeLocationsCallContext, and after all locations are notified, it replies to ShuffleClientImpl.
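The batching/dedup core can be sketched as below (names mirror the description; the actual RPC wiring and waiting logic are omitted):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative: individual revive requests are queued cheaply, deduplicated by
// partition id, and drained periodically as a single batch RPC.
public class ReviveBatcher {
    public static class ReviveRequest {
        public final int partitionId;
        public ReviveRequest(int partitionId) { this.partitionId = partitionId; }
    }

    private final Map<Integer, ReviveRequest> pending = new LinkedHashMap<>();

    // Called per failed push by the client; no RPC happens here.
    public synchronized void add(ReviveRequest req) {
        pending.putIfAbsent(req.partitionId, req); // dedupe by partitionId
    }

    // Called periodically: drain all pending requests into one batch.
    public synchronized List<ReviveRequest> drainBatch() {
        List<ReviveRequest> batch = new ArrayList<>(pending.values());
        pending.clear();
        return batch;
    }
}
```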
### Why are the changes needed?
In my test of 3T TPCDS q23a with 3 Celeborn workers, when a worker is killed the LifecycleManager receives about 48k Revive requests:
```
[emr-usermaster-1-1 logs]$ cat spark-emr-user-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-master-1-1.c-fa08904e94c028d1.out.1 |grep -i revive |wc -l
64364
```
After this PR, number of ReviveBatch requests reduces to 708:
```
[emr-usermaster-1-1 logs]$ cat spark-emr-user-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-master-1-1.c-fa08904e94c028d1.out |grep -i revive |wc -l
2573
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test. I have tested:
1. Disable graceful shutdown, kill one worker, job succeeds
2. Disable graceful shutdown, kill two workers successively, job fails as expected
3. Enable graceful shutdown, restart two workers successively, job succeeds
4. Enable graceful shutdown, restart two workers successively, then kill the third one, job succeeds
Closes#1588 from waitinfuture/656-2.
Lead-authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Co-authored-by: Keyong Zhou <zhouky@apache.org>
Co-authored-by: Keyong Zhou <waitinfuture@gmail.com>
Signed-off-by: Shuang <lvshuang.tb@gmail.com>
### What changes were proposed in this pull request?
1. Celeborn supports storage type selection. HDD, SSD, and HDFS are available for now.
2. Add new buffer size for HDFS file writers.
3. Workers support empty working dirs.
### Why are the changes needed?
Support HDFS only scenario.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
UT and cluster.
Closes#1619 from FMX/CELEBORN-568.
Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Co-authored-by: Ethan Feng <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Celeborn-generated Hadoop configuration should respect the Celeborn conf
### Why are the changes needed?
On the Spark client side, it should be written like `spark.celeborn.hadoop.xxx.xx`
On the server side, it should be written like `celeborn.hadoop.xxx.xxx`
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1629 from AngersZhuuuu/CELEBORN-719.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
This PR aims to make network local address binding support both IP and FQDN strategies.
Additionally, it refactors `ShuffleClientImpl#genAddressPair` from `${hostAndPort}-${hostAndPort}` to `Pair<String, String>`, which works properly when using IPs but may not with FQDNs, because an FQDN may contain `-`
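The refactor away from the `-`-separated string can be sketched as follows; `AbstractMap.SimpleImmutableEntry` stands in for the `Pair` type:

```java
import java.util.AbstractMap;
import java.util.Map;

// Illustrative: return a structured pair instead of encoding two host:port
// strings joined by '-', which breaks when the FQDN itself contains '-'.
public class AddressPair {
    public static Map.Entry<String, String> genAddressPair(String primary, String replica) {
        // No string concatenation/splitting: each side keeps its own field.
        return new AbstractMap.SimpleImmutableEntry<>(primary, replica);
    }
}
```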
### Why are the changes needed?
Currently, when the bind hostname is not set explicitly, Celeborn finds the first non-loopback address and always binds to the IP. This is not suitable for K8s cases, as a StatefulSet has a stable FQDN but the Pod IP changes whenever the Pod restarts.
`ShuffleClientImpl#genAddressPair` must be changed, otherwise it may cause:
```
java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 11657 in stage 0.0 failed 4 times, most recent failure: Lost task 11657.3 in stage 0.0 (TID 12747) (10.153.253.198 executor 157): java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.celeborn.client.ShuffleClientImpl.doPushMergedData(ShuffleClientImpl.java:874)
at org.apache.celeborn.client.ShuffleClientImpl.pushOrMergeData(ShuffleClientImpl.java:735)
at org.apache.celeborn.client.ShuffleClientImpl.mergeData(ShuffleClientImpl.java:827)
at org.apache.spark.shuffle.celeborn.SortBasedPusher.pushData(SortBasedPusher.java:140)
at org.apache.spark.shuffle.celeborn.SortBasedPusher.insertRecord(SortBasedPusher.java:192)
at org.apache.spark.shuffle.celeborn.SortBasedShuffleWriter.fastWrite0(SortBasedShuffleWriter.java:192)
at org.apache.spark.shuffle.celeborn.SortBasedShuffleWriter.write(SortBasedShuffleWriter.java:145)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1508)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
```
### Does this PR introduce _any_ user-facing change?
Yes, a new configuration `celeborn.network.bind.preferIpAddress` is introduced, and the default value is `true` to preserve the existing behavior.
### How was this patch tested?
Manually testing with `celeborn.network.bind.preferIpAddress=false`
```
Server: 10.178.96.64
Address: 10.178.96.64#53
Name: celeborn-master-0.celeborn-master-svc.spark.svc.cluster.local
Address: 10.153.143.252
Server: 10.178.96.64
Address: 10.178.96.64#53
Name: celeborn-master-1.celeborn-master-svc.spark.svc.cluster.local
Address: 10.153.173.94
Server: 10.178.96.64
Address: 10.178.96.64#53
Name: celeborn-master-2.celeborn-master-svc.spark.svc.cluster.local
Address: 10.153.149.42
starting org.apache.celeborn.service.deploy.worker.Worker, logging to /opt/celeborn/logs/celeborn--org.apache.celeborn.service.deploy.worker.Worker-1-celeborn-worker-4.out
2023-06-25 23:49:52 [INFO] [main] org.apache.celeborn.common.rpc.netty.Dispatcher#51 - Dispatcher numThreads: 4
2023-06-25 23:49:52 [INFO] [main] org.apache.celeborn.common.network.client.TransportClientFactory#91 - mode NIO threads 64
2023-06-25 23:49:52 [INFO] [main] org.apache.celeborn.common.rpc.netty.NettyRpcEnvFactory#51 - Starting RPC Server [WorkerSys] on celeborn-worker-4.celeborn-worker-svc.spark.svc.cluster.local:0 with advisor endpoint celeborn-worker-4.celeborn-worker-svc.spark.svc.cluster.local:0
2023-06-25 23:49:52 [INFO] [main] org.apache.celeborn.common.util.Utils#51 - Successfully started service 'WorkerSys' on port 38303.
```
Closes#1622 from pan3793/CELEBORN-713.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Change Celeborn Master URL from `rss://<host>:<port>` to `celeborn://<host>:<port>`
### Why are the changes needed?
Respect the project name.
### Does this PR introduce _any_ user-facing change?
Yes, migration guide is updated accordingly.
### How was this patch tested?
Pass GA.
Closes#1624 from pan3793/CELEBORN-715.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
1. Separated push data timeout tests and push merge data timeout tests in `PushDataTimeoutTest`
2. Updated the test result assertions
3. Reworked the `pushdata timeout will add to blacklist` test
### Why are the changes needed?
Ensure that the timeout behavior is correctly implemented.
https://github.com/apache/incubator-celeborn/pull/1613#discussion_r1236423721
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
existing tests
Closes#1620 from cfmcgrady/push-timeout-test.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Make `appUniqueId` a member of `ShuffleClientImpl` and remove `applicationId` from client-side RPC messages, so it won't cause compatibility issues.
### Why are the changes needed?
Currently a Celeborn client is bound to a single application id, so there's no need to pass `applicationId` around in many client-side RPC messages.
### Does this PR introduce _any_ user-facing change?
In some logs the application id will not be printed, which should not be a problem.
### How was this patch tested?
UTs.
Closes#1621 from waitinfuture/appid.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
- Fix commit metrics in application heartbeat
- Change client side application heartbeat message log level to info
- Improve heartbeat logs by unifying the word "heartbeat"
### Why are the changes needed?
`commitHandler.commitMetrics()` has side effects, so calling it multiple times to get values independently is incorrect:
```
def commitMetrics(): (Long, Long) = (totalWritten.sumThenReset(), fileCount.sumThenReset())
```
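The side effect can be seen with a plain `LongAdder` (assumed here to be the type backing these counters, since it provides `sumThenReset`): the first call drains the counter, so a second, supposedly independent read observes zero.

```java
import java.util.concurrent.atomic.LongAdder;

public class SumThenResetDemo {
    public static void main(String[] args) {
        LongAdder totalWritten = new LongAdder();
        totalWritten.add(42);
        // The first call reads 42 and resets the adder to 0...
        long first = totalWritten.sumThenReset();
        // ...so a second, supposedly independent read sees 0, not 42.
        long second = totalWritten.sumThenReset();
        System.out.println(first + " then " + second); // 42 then 0
    }
}
```

This is why the metrics must be read once per heartbeat and the pair of values reused, rather than fetched independently.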
### Does this PR introduce _any_ user-facing change?
Yes, bug fix.
### How was this patch tested?
Review.
Closes#1617 from pan3793/CELEBORN-708.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Fixes compatibility issue caused by change of push data timeout.
### Why are the changes needed?
When I tested a branch-0.2 client against a main-branch server, I got the following error:
```
23/06/20 17:42:34,538 ERROR [push-timeout-checker-12] PushDataHandler: PushData replication failed for partitionLocation: PartitionLocation[
id-epoch:767-0
host-rpcPort-pushPort-fetchPort-replicatePort:192.168.1.17-37687-42605-42891-41319
mode:MASTER
peer:(host-rpcPort-pushPort-fetchPort-replicatePort:192.168.1.18-45201-35749-41831-46853)
storage hint:StorageInfo{type=MEMORY, mountPoint='/mnt/disk4', finalResult=false, filePath=}
mapIdBitMap:null]
org.apache.celeborn.common.exception.CelebornIOException: PUSH_DATA_TIMEOUT_SLAVE
at org.apache.celeborn.common.network.client.TransportResponseHandler.failExpiredPushRequest(TransportResponseHandler.java:125)
at org.apache.celeborn.common.network.client.TransportResponseHandler.lambda$new$0(TransportResponseHandler.java:96)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
```
The error occurs because the main branch added a new field `pushDataTimeout` to ReserveSlots, so when the client is branch-0.2 the field defaults to 0, which always triggers a timeout.
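A minimal sketch of the kind of guard that restores compatibility (method and parameter names are illustrative, not the actual Celeborn code): treat the missing-field default of 0 as "not provided" and fall back to a server-side value instead of expiring immediately.

```java
public class PushDataTimeoutCompat {
    // Old clients (e.g. branch-0.2) never set pushDataTimeout, so the
    // decoded default is 0; interpret 0 as "not provided" rather than
    // "expire immediately".
    static long resolvePushDataTimeout(long fromRequest, long serverDefault) {
        return fromRequest > 0 ? fromRequest : serverDefault;
    }

    public static void main(String[] args) {
        System.out.println(resolvePushDataTimeout(0L, 30_000L));      // old client: falls back
        System.out.println(resolvePushDataTimeout(10_000L, 30_000L)); // new client: honored
    }
}
```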
### Does this PR introduce _any_ user-facing change?
Yes, this PR fixes a bug that occurs when the client is branch-0.2 and the server is on main.
### How was this patch tested?
Manual test.
Closes#1611 from waitinfuture/701.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
1. For each PartitionLocation returned from RegisterShuffle, remove its worker from the local excluded list to refresh the local information.
2. ChangeLocationResponse also returns whether oldPartition is excluded in LifecycleManager. If so, remove it from the Executor's excluded list.
3. Always trigger commit files for shutting-down workers returned from HeartbeatFromApplicationResponse.
4. HeartbeatFromWorker sends the correct disk infos regardless of whether the worker is shutting down.
After this PR, the priority of excluded lists is Master > LifecycleManager > Executor.
### Why are the changes needed?
During testing with graceful shutdown turned on (workers have static rpc/push/fetch/replicate ports) and consistently restarting one out of three workers, I encountered several bugs.
1. First I killed worker A, so the Executor client's local excluded list contained A. After A stopped, I started it again, and the master then offered slots on A, so at that point we should remove A from the executor's excluded list.
2. When I kill-and-restart a worker twice within a window shorter than the app heartbeat interval, the second time WorkerStatusTracker does not trigger commit files, because the local cache for the worker has not been refreshed.
3. When a worker is shutting down, its heartbeat passes empty diskInfos, and the master blindly adds it to the excluded list. We want a worker to be either in the excluded list or in the shutting-down list, exclusively. If a worker is in the excluded list, LifecycleManager will not trigger commit files when handling the heartbeat response; on the other hand, if a worker is in the shutting-down list, LifecycleManager will trigger commit files on it. So we must ensure a shutting-down worker ends up in the shutting-down list.
### Does this PR introduce _any_ user-facing change?
Yes, it fixes several bugs described above.
### How was this patch tested?
Manual test.
Closes#1606 from waitinfuture/696.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.tb@gmail.com>
### What changes were proposed in this pull request?
In DeviceInfo, the `deviceStatAvailable` variable's name and its assignment do not match.
### Why are the changes needed?
ditto
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
Cluster test
Closes#1607 from onebox-li/fix-deviceStatAvailable.
Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
If dataDir does not exist, DeviceMonitor::readWriteError skips the check.
As a result, a disk marked READ_OR_WRITE_FAILURE due to a FileWriter creation error may flip back to HEALTHY in the subsequent disk check.
```
23/06/19 19:09:52,718 ERROR [dispatcher-event-loop-16] StorageManager: Create FileWriter for /data7/rss_storage/rss-worker/shuffle_data/application_1684305681931_1943199/180/79-0-0 of mount /data7 failed, report to DeviceMonitor
java.io.IOException: No such file or directory
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at org.apache.celeborn.service.deploy.worker.storage.StorageManager.createWriter(StorageManager.scala:330)
...
at java.lang.Thread.run(Thread.java:745)
23/06/19 19:09:52,718 ERROR [dispatcher-event-loop-16] LocalDeviceMonitor: Receive non-critical exception, disk: /data7, java.io.IOException: java.io.IOException: No such file or directory
updated state
DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: /data7, usableSpace: 536870912000, avgFlushTime: 0, avgFetchTime: 0, activeSlots: 0) status: READ_OR_WRITE_FAILURE dirs /data7/rss_storage/rss-worker/shuffle_data
after disk checker, updated state
DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: /data7, usableSpace: 536870912000, avgFlushTime: 0, avgFetchTime: 0, activeSlots: 0) status: HEALTHY dirs /data7/rss_storage/rss-worker/shuffle_data
```
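A sketch of the intended behavior (hypothetical method shape, not the actual DeviceMonitor code): a vanished data directory should itself count as a read/write error rather than being skipped, so the periodic checker cannot flip a failed disk back to HEALTHY.

```java
import java.io.File;

public class ReadWriteErrorSketch {
    static boolean readWriteError(File dataDir) {
        if (!dataDir.exists()) {
            // Previously the check was skipped here, letting a disk that
            // failed FileWriter creation be reported healthy again.
            return true;
        }
        // ... a real probe would attempt a small write/read under dataDir ...
        return false;
    }

    public static void main(String[] args) {
        System.out.println(readWriteError(new File("/nonexistent/rss_storage")));
    }
}
```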
### Why are the changes needed?
ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Cluster test
Closes#1608 from onebox-li/fix-disk-check.
Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Correct the FsPermission 755.
### Why are the changes needed?
We should use octal 0755 or the string "755" instead of the decimal value to represent UNIX permissions. The string form is chosen because octal literals are deprecated in Scala.
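The distinction, sketched with plain JDK calls (outside of Hadoop's `FsPermission`): decimal 755 encodes entirely different bits than octal 0755 (`rwxr-xr-x`), and parsing the string with radix 8 recovers the intended value.

```java
public class OctalPermissionDemo {
    public static void main(String[] args) {
        int decimal = 755;                      // wrong: bit pattern 0b1011110011
        int octal = Integer.parseInt("755", 8); // right: 493 == 0755 (rwxr-xr-x)
        System.out.println(decimal == octal);   // false
        System.out.println(octal);              // 493
    }
}
```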
### Does this PR introduce _any_ user-facing change?
Yes, it's a bug fix.
### How was this patch tested?
Manually review.
Closes#1597 from pan3793/CELEBORN-685.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Refactor WorkerInfo
1. make `diskInfos` and `userResourceConsumption` new maps instead of using the passed-in references
2. remove `endpoint` from the constructor
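Why item 1 matters, as a minimal Java sketch: mutating a map through a reference the class does not own can hit an unmodifiable (e.g. Scala-wrapped) view and throw `UnsupportedOperationException: remove`, mirroring the exception in the stack trace; a defensive copy into a map the class owns makes removal safe.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class DefensiveCopyDemo {
    public static void main(String[] args) {
        Map<String, Integer> passedIn =
            Collections.unmodifiableMap(new HashMap<>(Map.of("/data7", 1)));
        try {
            passedIn.remove("/data7"); // mutating the caller's reference directly
        } catch (UnsupportedOperationException e) {
            System.out.println("remove failed on the passed-in view");
        }
        Map<String, Integer> owned = new HashMap<>(passedIn); // defensive copy
        owned.remove("/data7");                               // safe
        System.out.println(owned.isEmpty());                  // true
    }
}
```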
### Why are the changes needed?
When manually testing stop-worker.sh with graceful shutdown turned on, I got the following exception:
```
23/06/19 11:04:25,665 INFO [worker-forward-message-scheduler] RssHARetryClient: connect to master master-1-1:9097.
23/06/19 11:04:27,168 ERROR [worker-forward-message-scheduler] RssHARetryClient: Send rpc with failure, has tried 15, max try 15!
org.apache.celeborn.common.exception.CelebornException: Exception thrown in awaitResult:
at org.apache.celeborn.common.util.ThreadUtils$.awaitResult(ThreadUtils.scala:231)
at org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:74)
at org.apache.celeborn.common.haclient.RssHARetryClient.sendMessageInner(RssHARetryClient.java:150)
at org.apache.celeborn.common.haclient.RssHARetryClient.askSync(RssHARetryClient.java:118)
at org.apache.celeborn.service.deploy.worker.Worker.org$apache$celeborn$service$deploy$worker$Worker$$heartBeatToMaster(Worker.scala:306)
at org.apache.celeborn.service.deploy.worker.Worker$$anon$1.$anonfun$run$1(Worker.scala:332)
at org.apache.celeborn.common.util.Utils$.tryLogNonFatalError(Utils.scala:186)
at org.apache.celeborn.service.deploy.worker.Worker$$anon$1.run(Worker.scala:332)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.celeborn.common.exception.CelebornIOException: remove
at org.apache.celeborn.service.deploy.master.clustermeta.ha.HAHelper.sendFailure(HAHelper.java:65)
at org.apache.celeborn.service.deploy.master.Master.executeWithLeaderChecker(Master.scala:210)
at org.apache.celeborn.service.deploy.master.Master$$anonfun$receiveAndReply$1.applyOrElse(Master.scala:315)
at org.apache.celeborn.common.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
at org.apache.celeborn.common.rpc.netty.Inbox.safelyCall(Inbox.scala:222)
at org.apache.celeborn.common.rpc.netty.Inbox.process(Inbox.scala:110)
at org.apache.celeborn.common.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:229)
... 3 more
Caused by: java.lang.UnsupportedOperationException: remove
at scala.collection.convert.Wrappers$MapWrapper$$anon$2$$anon$3.remove(Wrappers.scala:236)
at java.util.AbstractMap.remove(AbstractMap.java:254)
at org.apache.celeborn.common.meta.WorkerInfo.$anonfun$updateThenGetDiskInfos$2(WorkerInfo.scala:225)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.celeborn.common.meta.WorkerInfo.updateThenGetDiskInfos(WorkerInfo.scala:224)
at org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager.lambda$updateWorkerHeartbeatMeta$5(AbstractMetaManager.java:205)
at java.util.Optional.ifPresent(Optional.java:159)
at org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager.updateWorkerHeartbeatMeta(AbstractMetaManager.java:203)
at org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager.handleWorkerHeartbeat(SingleMasterMetaManager.java:105)
at org.apache.celeborn.service.deploy.master.Master.org$apache$celeborn$service$deploy$master$Master$$handleHeartbeatFromWorker(Master.scala:428)
at org.apache.celeborn.service.deploy.master.Master$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$20(Master.scala:326)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.celeborn.service.deploy.master.Master.executeWithLeaderChecker(Master.scala:207)
... 8 more
```
According to the suggestion from https://github.com/apache/incubator-celeborn/pull/1602#issuecomment-1596722991
### Does this PR introduce _any_ user-facing change?
Yes, it fixes bug described in https://github.com/apache/incubator-celeborn/pull/1602
### How was this patch tested?
UTs and manual test.
Closes#1605 from waitinfuture/695.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
1. Use `<arg>-Ywarn-unused-import</arg>` to remove some unused imports.
The flag cannot be fully enabled at this stage
because we have the following code:
```
// Can Remove this if celeborn don't support scala211 in future
import org.apache.celeborn.common.util.FunctionConverter._
```
2. Fix Scala case matches that are not fully covered, to avoid `scala.MatchError`
3. Fix some Scala compilation warnings
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1600 from cxzl25/cleanup_code.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1601 from cxzl25/CELEBORN-689.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Celeborn fetch chunk should also support timeout checking.
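A hedged sketch of the idea (illustrative names, not Celeborn's actual TransportResponseHandler): record when each fetch is issued, and let a periodic checker fail any request older than the configured fetch timeout with `FETCH_DATA_TIMEOUT`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FetchTimeoutChecker {
    private final Map<Long, Long> issuedAt = new ConcurrentHashMap<>();
    private final long fetchTimeoutMs;

    FetchTimeoutChecker(long fetchTimeoutMs) {
        this.fetchTimeoutMs = fetchTimeoutMs;
    }

    void onFetchIssued(long streamChunkId, long nowMs) {
        issuedAt.put(streamChunkId, nowMs);
    }

    void onFetchSucceeded(long streamChunkId) {
        issuedAt.remove(streamChunkId);
    }

    // Called periodically (e.g. from a scheduled executor): collects and
    // drops the ids that exceeded the timeout, so the caller can fail them.
    List<Long> expire(long nowMs) {
        List<Long> expired = new ArrayList<>();
        issuedAt.forEach((id, ts) -> {
            if (nowMs - ts > fetchTimeoutMs) {
                expired.add(id);
            }
        });
        expired.forEach(issuedAt::remove);
        return expired;
    }
}
```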
#### Test case
```
executor instance 20
SQL:
SELECT count(1) from (select /*+ REPARTITION(100) */ * from spark_auxiliary.t50g) tmp;
--conf spark.celeborn.client.spark.shuffle.writer=sort \
--conf spark.celeborn.client.fetch.excludeWorkerOnFailure.enabled=true \
--conf spark.celeborn.client.push.timeout=10s \
--conf spark.celeborn.client.push.replicate.enabled=true \
--conf spark.celeborn.client.push.revive.maxRetries=10 \
--conf spark.celeborn.client.reserveSlots.maxRetries=10 \
--conf spark.celeborn.client.registerShuffle.maxRetries=3 \
--conf spark.celeborn.client.push.blacklist.enabled=true \
--conf spark.celeborn.client.blacklistSlave.enabled=true \
--conf spark.celeborn.client.fetch.timeout=30s \
--conf spark.celeborn.client.push.data.timeout=30s \
--conf spark.celeborn.client.push.limit.inFlight.timeout=600s \
--conf spark.celeborn.client.push.maxReqsInFlight=32 \
--conf spark.celeborn.client.shuffle.compression.codec=ZSTD \
--conf spark.celeborn.rpc.askTimeout=30s \
--conf spark.celeborn.client.rpc.reserveSlots.askTimeout=30s \
--conf spark.celeborn.client.shuffle.batchHandleChangePartition.enabled=true \
--conf spark.celeborn.client.shuffle.batchHandleCommitPartition.enabled=true \
--conf spark.celeborn.client.shuffle.batchHandleReleasePartition.enabled=true
```
Tested with 3 workers and a `Thread.sleep(100s)` added before the worker handles `ChunkFetchRequest`.
Before patch
<img width="1783" alt="截屏2023-06-14 上午11 20 55" src="https://github.com/apache/incubator-celeborn/assets/46485123/182dff7d-a057-4077-8368-d1552104d206">
After patch
<img width="1792" alt="image" src="https://github.com/apache/incubator-celeborn/assets/46485123/3c8b7933-8ace-426d-8e9f-04e0aabfac8e">
The log shows the fetch timeout checker working:
```
23/06/14 11:14:54 ERROR WorkerPartitionReader: Fetch chunk 0 failed.
org.apache.celeborn.common.exception.CelebornIOException: FETCH_DATA_TIMEOUT
at org.apache.celeborn.common.network.client.TransportResponseHandler.failExpiredFetchRequest(TransportResponseHandler.java:147)
at org.apache.celeborn.common.network.client.TransportResponseHandler.lambda$new$1(TransportResponseHandler.java:103)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
23/06/14 11:14:54 WARN RssInputStream: Fetch chunk failed 1/6 times for location PartitionLocation[
id-epoch:35-0
host-rpcPort-pushPort-fetchPort-replicatePort:10.169.48.203-9092-9094-9093-9095
mode:MASTER
peer:(host-rpcPort-pushPort-fetchPort-replicatePort:10.169.48.202-9092-9094-9093-9095)
storage hint:StorageInfo{type=HDD, mountPoint='/mnt/ssd/0', finalResult=true, filePath=}
mapIdBitMap:null], change to peer
org.apache.celeborn.common.exception.CelebornIOException: Fetch chunk 0 failed.
at org.apache.celeborn.client.read.WorkerPartitionReader$1.onFailure(WorkerPartitionReader.java:98)
at org.apache.celeborn.common.network.client.TransportResponseHandler.failExpiredFetchRequest(TransportResponseHandler.java:146)
at org.apache.celeborn.common.network.client.TransportResponseHandler.lambda$new$1(TransportResponseHandler.java:103)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.celeborn.common.exception.CelebornIOException: FETCH_DATA_TIMEOUT
at org.apache.celeborn.common.network.client.TransportResponseHandler.failExpiredFetchRequest(TransportResponseHandler.java:147)
... 8 more
23/06/14 11:14:54 INFO SortBasedShuffleWriter: Memory used 72.0 MB
```
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1587 from AngersZhuuuu/CELEBORN-676.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
Give the Heartbeat message a one-byte body and skip this byte when decoding.
### Why are the changes needed?
The Heartbeat message may be split into two Netty buffers; the `empty buffer` (which is not actually needed, but must be kept) is then wrongly removed, and decodeNext throws an NPE. See:
```java
while (headerBuf.readableBytes() < HEADER_SIZE) {
  ByteBuf next = buffers.getFirst();
  int toRead = Math.min(next.readableBytes(), HEADER_SIZE - headerBuf.readableBytes());
  headerBuf.writeBytes(next, toRead);
  if (!next.isReadable()) {
    buffers.removeFirst().release();
  }
}
```
```
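A simplified sketch of the fix (plain `ByteBuffer` and an illustrative frame layout, not Celeborn's actual codec): encode the heartbeat with a one-byte body so no zero-length buffer ever trails the header, and consume that byte when decoding.

```java
import java.nio.ByteBuffer;

public class HeartbeatFrame {
    static final byte TYPE_HEARTBEAT = 9; // illustrative message type id

    // Header = [type:1][bodyLength:4]; the body is a single placeholder
    // byte, so the frame is never followed by an empty body buffer.
    static ByteBuffer encodeHeartbeat() {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 1);
        buf.put(TYPE_HEARTBEAT).putInt(1).put((byte) 0);
        buf.flip();
        return buf;
    }

    static boolean decodeIsHeartbeat(ByteBuffer frame) {
        byte type = frame.get();
        int bodyLength = frame.getInt();
        if (type == TYPE_HEARTBEAT) {
            frame.position(frame.position() + bodyLength); // skip the dummy byte
            return true;
        }
        return false;
    }
}
```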
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT & MANUAL
Closes#1589 from RexXiong/CELEBORN-675.
Authored-by: Shuang <lvshuang.tb@gmail.com>
Signed-off-by: zhongqiang.czq <zhongqiang.czq@alibaba-inc.com>
### What changes were proposed in this pull request?
Add hasPeer method to PartitionLocation
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1583 from AngersZhuuuu/CELEBORN-671.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
…eful is disabled
### What changes were proposed in this pull request?
Worker should report WorkerLost instead of WorkerUnavailable in its shutdown hook if graceful shutdown is disabled.
### Why are the changes needed?
To avoid unnecessary commit file requests from the lifecycle manager, since it's not a graceful shutdown.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Closes#1580 from waitinfuture/668.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
In this PR, the worker always reports node unavailable regardless of whether graceful shutdown is turned on or off.
### Why are the changes needed?
To inform the master of the shutting-down worker as soon as possible.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
Closes#1575 from waitinfuture/662.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
This PR upgrades
- `mockito` from 1.10.19 and 3.6.0 to 4.11.0
- `scalatest` from 3.2.3 to 3.2.16
- `mockito-scalatest` from 1.16.37 to 1.17.14
### Why are the changes needed?
Housekeeping, making test dependencies up-to-date and unified.
### Does this PR introduce _any_ user-facing change?
No, it only affects test.
### How was this patch tested?
Pass GA.
Closes#1562 from pan3793/CELEBORN-650.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
…ngth
### What changes were proposed in this pull request?
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1519 from zhongqiangczq/mapfilelength.
Authored-by: zhongqiang.czq <zhongqiang.czq@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Refine the logic here to make it easier to understand.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#1555 from AngersZhuuuu/CELEBORN-645.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
`SimpleDateFormat` is not thread-safe; replace it with the thread-safe `FastDateFormat`.
### Why are the changes needed?
`FastDateFormat` is a fast and thread-safe version of `java.text.SimpleDateFormat`.
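The hazard and the fix, sketched with JDK types (`FastDateFormat` lives in commons-lang3; the JDK's immutable `DateTimeFormatter` used here has the same thread-safety property): an immutable formatter can be shared across threads, whereas a shared `SimpleDateFormat` carries mutable `Calendar` state and can silently corrupt output under concurrency.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadSafeFormatDemo {
    // Immutable, hence safe to share; a shared SimpleDateFormat mutates
    // internal Calendar state and must not be used like this.
    static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) throws Exception {
        LocalDateTime t = LocalDateTime.of(2023, 6, 14, 11, 14, 54);
        ExecutorService pool = Executors.newFixedThreadPool(8);
        List<Future<String>> results = pool.invokeAll(
            Collections.nCopies(100, (Callable<String>) () -> FMT.format(t)));
        for (Future<String> r : results) {
            if (!"2023-06-14 11:14:54".equals(r.get())) {
                throw new AssertionError("corrupted format output");
            }
        }
        pool.shutdown();
        System.out.println("all 100 concurrent formats consistent");
    }
}
```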
### Does this PR introduce _any_ user-facing change?
Yes, it's a bug fix.
### How was this patch tested?
Manually review.
Closes#1545 from pan3793/CELEBORN-636.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Ethan Feng <ethanfeng@apache.org>
### What changes were proposed in this pull request?
Lock the flushBuffer field and flush method to ensure thread-safe access.
### Why are the changes needed?
At stage end, the worker commits files and FileWriters are closed, but a speculative task may still push data to a FileWriter. If the push task increments numPendingWrites, the commit thread, which holds the FileWriter object lock, must wait for the pending writes to decrement to 0; but the push thread needs the FileWriter object lock to decrement numPendingWrites. This causes a deadlock.
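A hedged sketch of the locking shape that avoids this (an illustrative class, not the actual FileWriter): keep `numPendingWrites` lock-free and take only a dedicated flush-buffer lock, so the committer can wait for the counter to drain without holding anything the push threads need.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class FileWriterSketch {
    private final AtomicInteger numPendingWrites = new AtomicInteger();
    private final Object flushLock = new Object(); // guards only flushBuffer

    void pushData(byte[] data) {
        numPendingWrites.incrementAndGet();
        try {
            synchronized (flushLock) {
                // append data to flushBuffer
            }
        } finally {
            // Decrementing needs no monitor the committer might be holding.
            numPendingWrites.decrementAndGet();
        }
    }

    void commit() throws InterruptedException {
        // Wait for in-flight pushes while holding no lock...
        while (numPendingWrites.get() > 0) {
            Thread.sleep(1);
        }
        synchronized (flushLock) {
            // ...then flush and close safely.
        }
    }

    int pending() {
        return numPendingWrites.get();
    }
}
```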
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT
Closes#1534 from RexXiong/CELEBORN-626.
Authored-by: Shuang <lvshuang.tb@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>