[CELEBORN-2222][CIP-14] Support Retrying when createReader failed for CelebornInputStream in CppClient#3583
Conversation
|
@HolyLow @SteNicholas @FMX @RexXiong Could you please help review this PR? Appreciate your help in improving this as needed! |
|
|
||
| int clientFetchMaxRetriesForEachReplica() const; | ||
|
|
||
| Timeout dataIoRetryWait() const; |
There was a problem hiding this comment.
Could this method name align with CelebornConf#networkIoRetryWaitMs for io wait conf of all modules?
| try { | ||
| VLOG(1) << "Create reader for location " << currentLocation->host << ":" | ||
| << currentLocation->fetchPort; | ||
| auto reader = createReader(*currentLocation); |
There was a problem hiding this comment.
This should check whether the partition location is excluded, which aligns with the logic of CelebornInputStream#createReaderWithRetry.
| return reader; | ||
| } catch (const std::exception& e) { | ||
| lastException = std::current_exception(); | ||
| fetchChunkRetryCnt_++; |
There was a problem hiding this comment.
The shuffle client should exclude failed fetch location.
| std::this_thread::sleep_for( | ||
| std::chrono::milliseconds(retryWait_.count())); | ||
| } | ||
| LOG(WARNING) << "CreatePartitionReader failed " << fetchChunkRetryCnt_ |
There was a problem hiding this comment.
Does this aligin with the failure handling of CelebornInputStream#createReaderWithRetry?
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #3583 +/- ##
==========================================
- Coverage 67.13% 67.04% -0.09%
==========================================
Files 357 357
Lines 21860 21924 +64
Branches 1943 1949 +6
==========================================
+ Hits 14674 14696 +22
- Misses 6166 6213 +47
+ Partials 1020 1015 -5 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
Thank you for your comments @SteNicholas , I will take a look over the next couple of days. I suspect some refactoring may need to be done to this PR, I will notify you once done. |
There was a problem hiding this comment.
Pull request overview
This PR implements retry support for createReader failures in the C++ client to match the Java implementation's behavior. It adds retry configuration, peer location helper methods, and implements the retry logic with peer failover.
Changes:
- Added three configuration properties for retry behavior:
clientFetchMaxRetriesForEachReplica,dataIoRetryWait, andclientPushReplicateEnabled - Added helper methods to PartitionLocation for peer access and formatting:
hasPeer(),getPeer(), andhostAndFetchPort() - Implemented retry logic in
createReaderWithRetry()that switches between primary and peer replicas on failure
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| cpp/celeborn/protocol/tests/PartitionLocationTest.cpp | Added unit tests for new PartitionLocation helper methods |
| cpp/celeborn/protocol/PartitionLocation.h | Declared three new helper methods for peer access and port formatting |
| cpp/celeborn/protocol/PartitionLocation.cpp | Implemented the three new helper methods |
| cpp/celeborn/conf/tests/CelebornConfTest.cpp | Added tests for new configuration properties and their default values |
| cpp/celeborn/conf/CelebornConf.h | Declared three new configuration properties and their accessor methods |
| cpp/celeborn/conf/CelebornConf.cpp | Implemented configuration property definitions and accessor methods |
| cpp/celeborn/client/reader/CelebornInputStream.h | Added member variables for retry tracking and retry wait timeout |
| cpp/celeborn/client/reader/CelebornInputStream.cpp | Implemented retry logic with peer failover and sleep between retries |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| VLOG(1) << "Create reader for location " << currentLocation->host << ":" | ||
| << currentLocation->fetchPort; | ||
| auto reader = createReader(*currentLocation); | ||
| fetchChunkRetryCnt_ = 0; |
There was a problem hiding this comment.
Resetting fetchChunkRetryCnt_ to 0 on successful reader creation (line 207) is redundant since it's already reset at line 187 in moveToNextReader() before this function is called. While not harmful, removing this reset would make the code cleaner and align better with the Java implementation which doesn't reset the counter after successful reader creation.
| fetchChunkRetryCnt_ = 0; |
| CELEBORN_FAIL( | ||
| "createPartitionReader failed after " + | ||
| std::to_string(fetchChunkRetryCnt_) + " retries for location " + | ||
| location.hostAndFetchPort()); |
There was a problem hiding this comment.
The captured lastException should be rethrown instead of using CELEBORN_FAIL. The Java implementation throws CelebornIOException with the lastException as the cause. In C++, you should use std::rethrow_exception(lastException) to preserve the original exception information, which is critical for debugging. If you want to add context, you could wrap it in a CelebornRuntimeError similar to the pattern seen in CelebornException.cpp line 35-36.
This PR implements retry support for createReader failures in the C++ client, matching the behavior of the Java implementation. The implementation includes:
Added configuration properties:
Added peer location support methods to PartitionLocation:
Implemented retry logic in createReaderWithRetry():
Added unit tests for new functionality
How was this patch tested?
Unit tests and compiling