HBASE-29823 Control WAL flush and offset persistence from ReplicationSourceShipper #7617
base: master
Conversation
@Apache9 @anmolnar @vinayakphegde Please review this PR as a follow-up to preview PR #7591
🎊 +1 overall
This message was automatically generated.

💔 -1 overall
This message was automatically generated.
Pull request overview
This PR adds staged flush support to ReplicationSourceShipper to enable safe offset persistence for buffering ReplicationEndpoints (like ContinuousBackupReplicationEndpoint). The shipper now tracks staged WAL size and flush timing, only persisting offsets after buffered data is durably flushed.
Changes:
- Added buffer management hooks (getMaxBufferSize(), maxFlushInterval(), beforePersistingReplicationOffset()) to ReplicationEndpoint interface
- Modified ReplicationSourceShipper to stage WAL entries and defer offset persistence based on buffer size and time thresholds
- Added test to verify beforePersistingReplicationOffset() is not called for empty batches
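For orientation, a minimal sketch of what these default hooks might look like, inferred from the description and review comments in this PR rather than copied from the patch (the interface name here is a stand-in for ReplicationEndpoint):

```java
// Sketch only: the default hooks described in this PR, with the documented defaults
// (-1 for getMaxBufferSize(), Long.MAX_VALUE for maxFlushInterval()).
public interface BufferingReplicationHooks {

  /** Max staged WAL bytes before the shipper should persist the offset; -1 means "do not buffer". */
  default long getMaxBufferSize() {
    return -1;
  }

  /** Max time in ms between offset persistence; Long.MAX_VALUE means "no time based flush". */
  default long maxFlushInterval() {
    return Long.MAX_VALUE;
  }

  /** Called before the shipper persists offsets; buffering endpoints flush staged WAL data here. */
  default void beforePersistingReplicationOffset() {
    // no-op for non-buffering endpoints
  }
}
```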
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| ReplicationEndpoint.java | Added three default methods to support buffered replication: getMaxBufferSize(), maxFlushInterval(), and beforePersistingReplicationOffset() |
| ReplicationSourceShipper.java | Modified to track staged WAL entries and defer offset persistence until buffer/timeout thresholds are met |
| TestReplicationSourceShipperBufferedFlush.java | Added test to verify beforePersistingReplicationOffset() behavior with empty batches |
```java
for (Entry entry : entriesForCleanUpHFileRefs) {
  try {
    cleanUpHFileRefs(entry.getEdit());
  } catch (IOException e) {
```
Copilot AI commented on Jan 13, 2026:
The error handling for IOException has been changed to catch and log the exception instead of propagating it. This silently suppresses IOException failures during cleanup, which could hide serious issues like file system problems. If cleanup failures should be non-fatal, this should be explicitly documented, or consider at least incrementing a failure metric to track these errors.
Suggested change:
```java
} catch (IOException e) {
  // Cleanup failures are intentionally treated as non-fatal: replication has already
  // succeeded for these entries, so we log the failure and continue.
```
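If a failure metric is wanted as well, one possible shape would be the following (the metric hook name is hypothetical, not an existing HBase API):

```java
} catch (IOException e) {
  // Non-fatal: replication already succeeded for these entries; log and keep going,
  // but record the failure so operators can spot recurring cleanup problems.
  LOG.warn("Failed to clean up hfile refs after shipping", e);
  // Hypothetical metric hook, shown only to illustrate the reviewer's suggestion.
  source.getSourceMetrics().incrementFailedHFileRefsCleanup();
}
```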
```java
if (stagedWalSize == 0 || lastShippedBatch == null) {
  return false;
}
return (stagedWalSize >= source.getReplicationEndpoint().getMaxBufferSize())
```
Copilot AI commented on Jan 13, 2026:
The condition checks if stagedWalSize is greater than or equal to getMaxBufferSize(), but getMaxBufferSize() can return -1 for non-buffering endpoints (as documented in ReplicationEndpoint). This means the comparison 'stagedWalSize >= -1' would always be true when stagedWalSize > 0, causing immediate flushes for non-buffering endpoints. While this preserves existing behavior, the intent is unclear and could be confusing. Consider explicitly checking for -1 to make the logic more explicit.
Suggested change:
```java
long maxBufferSize = source.getReplicationEndpoint().getMaxBufferSize();
// For non-buffering endpoints, getMaxBufferSize() returns a negative value (e.g., -1).
// In that case, we always trigger a flush based on size as soon as there is staged data.
boolean sizeBasedFlush =
  (maxBufferSize < 0) || (stagedWalSize >= maxBufferSize);
return sizeBasedFlush
```
I can agree with this comment too. You introduced two new properties, getMaxBufferSize() and maxFlushInterval(), with default values which should mimic the original behavior. I think you documented them properly in the ReplicationEndpoint interface, and it would be useful to explicitly check for those defaults too.
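Putting both default checks together, the full condition could be spelled out explicitly, for example (a sketch only, using the field names from the excerpt above and the documented defaults):

```java
// Sketch: make both documented defaults explicit (-1 and Long.MAX_VALUE) instead of
// relying on the raw comparisons.
long maxBufferSize = source.getReplicationEndpoint().getMaxBufferSize();
long maxFlushInterval = source.getReplicationEndpoint().maxFlushInterval();
// A negative buffer size means the endpoint does not buffer: flush as soon as data is staged.
boolean sizeBasedFlush = maxBufferSize < 0 || stagedWalSize >= maxBufferSize;
// Long.MAX_VALUE means the endpoint never requests a time based flush.
boolean timeBasedFlush = maxFlushInterval != Long.MAX_VALUE
  && EnvironmentEdgeManager.currentTime() - lastStagedFlushTs >= maxFlushInterval;
return sizeBasedFlush || timeBasedFlush;
```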
```java
// Allow loop to run
Waiter.waitFor(conf, 3000, () -> true);

shipper.interrupt();
```
Copilot AI commented on Jan 13, 2026:
The test uses a fixed wait time of 3000ms with an empty lambda that always returns true (Waiter.waitFor(conf, 3000, () -> true)). This just sleeps unconditionally and doesn't actually verify any condition. The test should wait for a meaningful condition, such as verifying that the shipper has processed the batch or checking that the thread has reached a specific state. This makes the test timing-dependent and unreliable.
Suggested change:
```java
// Wait until the shipper thread has finished processing the batch
Waiter.waitFor(conf, 3000, () -> !shipper.isAlive());
```
Another good suggestion.
```java
batch.setLastWalPosition(1L);
// no entries, no heap size

Mockito.when(walReader.take()).thenReturn(batch).thenReturn(null);
```
Copilot AI commented on Jan 13, 2026:
The mock is configured to stub 'walReader.take()' but the ReplicationSourceShipper actually calls 'entryReader.poll(getEntriesTimeout)' (line 118 in ReplicationSourceShipper.java). This means the mock configuration has no effect, and the test will call the unmocked poll() method which will return null, causing the shipper to skip processing any batches. The test should mock 'poll(anyInt())' instead of 'take()'.
Suggested change:
```java
Mockito.when(walReader.poll(Mockito.anyInt())).thenReturn(batch).thenReturn(null);
```
```java
// ContinuousBackupReplicationEndpoint
// and -1 for other ReplicationEndpoint since they don't buffer.
// For other ReplicationEndpoint, everytime a WALEntryBatch is shipped, we update replication
// offset. Please check ReplicationSourceShipper#shouldFlushStagedWal()
```
Copilot AI commented on Jan 13, 2026:
The comment references 'shouldFlushStagedWal()' but the actual method name in ReplicationSourceShipper is 'shouldPersistLogPosition()'. This inconsistency will confuse developers trying to understand the interaction between these components.
@ankitsol It seems that this comment is valid.
```java
// ContinuousBackupReplicationEndpoint
// and Long.MAX_VALUE for other ReplicationEndpoint since they don't buffer.
// For other ReplicationEndpoint, everytime a WALEntryBatch is shipped, we update replication
// offset. Please check ReplicationSourceShipper#shouldFlushStagedWal()
```
Copilot AI commented on Jan 13, 2026:
The comment references 'shouldFlushStagedWal()' but the actual method name in ReplicationSourceShipper is 'shouldPersistLogPosition()'. This inconsistency will confuse developers trying to understand the interaction between these components.
Same here.
```java
LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
// Loop until we close down
while (isActive()) {
  // check if flush needed for WAL backup, this is need for timeout based flush
```
Copilot AI commented on Jan 13, 2026:
The comment has a typo: 'need' should be 'needed'. Should read "this is needed for timeout based flush".
Suggested change:
```java
// check if flush needed for WAL backup, this is needed for timeout based flush
```
taklwu left a comment:
can you also check if these two failures are related?
TestHRegionWithInMemoryFlush.testParallelIncrementWithMemStoreFlush
TestTags.testFlushAndCompactionwithCombinations
```java
  return -1;
}

// WAL entries are buffered in ContinuousBackupReplicationEndpoint before flushing to WAL backup
```
nit: ContinuousBackupReplicationEndpoint is part of #7591 and has not yet been committed to master; should we mention it this early in this change?
@ankitsol I agree. Please remove these references from this patch.
```java
} catch (IOException e) {
  LOG.warn("{} threw unknown exception:",
    source.getReplicationEndpoint().getClass().getName(), e);
}
```
nit: will this be a behavior change? Previously, when cleanUpHFileRefs failed, the exception was thrown out of the function, but here we are only logging it.
Agree. Either the exception should be rethrown here, or we don't need to catch it at all.
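For comparison, not catching at all (the pre-change behavior the reviewers refer to) would simply let the IOException propagate out of the cleanup loop, roughly:

```java
// Sketch of the alternative raised above: no catch block, so an IOException from
// cleanUpHFileRefs propagates to the caller as it did before this change.
for (Entry entry : entriesForCleanUpHFileRefs) {
  cleanUpHFileRefs(entry.getEdit());
}
```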
These are passing locally.
Hi @Apache9 @anmolnar @vinayakphegde just a gentle reminder on this PR when you get a moment 🙂 Thanks!
anmolnar left a comment:
Please see my initial feedback.
wchevreuil left a comment:
My suggestion is that we should target the master branch to add this feature first, and then reimplement the continuous backup feature on top of this change. Of course, we do not need to merge this to master first; this is just for better reviewing. We can land this on a feature branch and then rebase the continuous backup branch. Thanks.
@wchevreuil @Apache9 Thank you both for the suggestion. I'm happy to merge it to either branch once it's accepted. Please also provide feedback on the patch itself. Does it match the approach that you suggested on the feature branch, @Apache9?
```java
if (shouldPersistLogPosition()) {
  IOException error = persistLogPosition();
  if (error != null) {
    LOG.warn("Exception while persisting replication state", error);
  }
}
```
nit: I don't think you need to change the return type to IOException; instead you can just use try-catch if you really need to handle the exception differently.
Suggested change:
```java
if (shouldPersistLogPosition()) {
  persistLogPosition();
}
```
```java
@Nullable
// Returns IOException instead of throwing so callers can decide
// whether to retry (shipEdits) or best-effort log (run()).
private IOException persistLogPosition() {
```
Suggested change:
```java
private void persistLogPosition() throws IOException {
```
+1
```java
  }
} catch (IOException e) {
  LOG.warn("{} threw exception while cleaning up hfile refs", endpoint.getClass().getName(), e);
  return e;
```
Suggested change:
```java
throw e;
```
🎊 +1 overall
This message was automatically generated.
```java
 * @return true if this endpoint buffers WAL entries and requires explicit flush control before
 *         persisting replication offsets.
 */
default boolean isBufferedReplicationEndpoint() {
```
For me, I do not think we need to expose this information to the shipper.
The design here is that, when using different ReplicationEndpoints, you need to tune the shipper configuration on your own, as the parameters are not only affected by the ReplicationEndpoint; they also depend on the shipper side.
For example, when you want to reduce the pressure of recording the offset, you should increase the recording interval, i.e. increase the batch size, increase the number of ship iterations between offset records, etc. And if you want to reduce the pressure on memory and the target receiver, you should decrease the batch size. For an S3 based replication endpoint there is also a trade-off: if you increase the flush interval, you get better performance and fewer files on S3, but failover becomes more complicated as you need to start from much earlier.
So this should be in the documentation; just exposing some configuration from the ReplicationEndpoint cannot handle all of the above situations.
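As an illustration of that direction, shipper-side thresholds could be read from the cluster configuration rather than from the endpoint; the property keys below are purely hypothetical and not part of this patch or of HBase:

```java
// Hypothetical shipper-side tuning: thresholds come from Configuration, not from the endpoint.
long persistSizeThreshold = conf.getLong(
  "hbase.replication.source.offset.persist.size.threshold", 64L * 1024 * 1024);
long persistIntervalMs = conf.getLong(
  "hbase.replication.source.offset.persist.interval.ms", 10_000L);
boolean shouldPersist = stagedWalSize >= persistSizeThreshold
  || EnvironmentEdgeManager.currentTime() - lastStagedFlushTs >= persistIntervalMs;
```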
```java
LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
// Loop until we close down
while (isActive()) {
  // check if flush needed for WAL backup, this is need for timeout based flush
```
This is not designed for WAL backup only, I have to say. Here in the shipper, we just follow the configured limits, i.e. time based or size based, to determine whether we need to record the log position, and there is a callback to the ReplicationEndpoint before recording; the replication endpoint can use this callback to do whatever it needs, and the shipper does not know the details.
```java
@Nullable
// Returns IOException instead of throwing so callers can decide
// whether to retry (shipEdits) or best-effort log (run()).
private IOException persistLogPosition() {
```
+1
```java
 * Hook invoked before persisting replication offsets. Buffered endpoints should flush/close WALs
 * here.
 */
default void beforePersistingReplicationOffset() {
```
From the endpoint's point of view, this method is just a flush/close of the given WAL, so let's name it accordingly.
Suggested change:
```java
default void flushAndCloseWAL() {
```
Sorry, this is exactly what we want to avoid here. This method just tells the endpoint what the shipper is going to do, and the endpoint can do anything it wants. For our normal replication framework there are no flush and close operations, as we always send everything out and do not return until we get acks, so basically we do not need to implement this method. And for an S3 based replication endpoint, we need to close the file to persist it on S3. Maybe in the future we will have other types of replication endpoints which do other work, so we do not want to name it flushAndCloseWAL.
```java
private boolean shouldPersistLogPosition() {
  ReplicationEndpoint endpoint = source.getReplicationEndpoint();
  if (!endpoint.isBufferedReplicationEndpoint()) {
    // Non-buffering endpoints persist immediately
    return true;
  }
  if (stagedWalSize == 0 || lastShippedBatch == null) {
    return false;
  }
  return stagedWalSize >= endpoint.getMaxBufferSize()
    || (EnvironmentEdgeManager.currentTime() - lastStagedFlushTs >= endpoint.maxFlushInterval());
}
```
This should be in the Endpoint, as the decision varies according to the type of endpoint. Today we have basically two types, buffered and non-buffered. If we have new endpoint types in the future, we might again need to come here and add the related logic.
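A rough sketch of that alternative, with the decision living in the endpoint (the method name and signature are hypothetical, shown only to make the suggestion concrete):

```java
// Hypothetical endpoint-side decision hook, not part of this patch.
/** Non-buffering endpoints persist offsets after every shipped batch. */
default boolean shouldPersistLogPosition(long stagedWalSize, long msSinceLastFlush) {
  return true;
}

// A buffering endpoint would override it with its own size/time thresholds, e.g.
// return stagedWalSize >= maxBufferSize || msSinceLastFlush >= maxFlushInterval;
```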
Please see my comment above to get more context
```java
stagedWalSize += currentSize;
entriesForCleanUpHFileRefs.addAll(entries);
lastShippedBatch = entryBatch;
if (shouldPersistLogPosition()) {
```
Rather than having these stagedWalSize and lastShippedBatch as global variables, we should just pass the entryBatch along to shouldPersistLogPosition() (which should be defined/implemented in the endpoints, btw; see my other related comment) and persistLogPosition().
We want to determine whether we need to persist the log position in the shipper, based on configuration, not triggered by the replication endpoint. Users can choose different configuration values based on different replication endpoint implementations.
IMHO, it doesn't look very cohesive. The shipper seems to be making decisions based on specific endpoint implementations. If new endpoint implementations with different logic for updating the log position are conceived in the future, we would need to revisit the shipper again.
I think time based and size based persistency is enough for most cases? If in the future we have some special endpoint which needs a new type of decision, we can add a new mechanism, no problem.
The reason we do not want to only trigger persistency from the endpoint is that we have other considerations about when to persist the log position, like the trade-off between failover and pressure on the replication storage. So here I suggest that we introduce general mechanisms to control the persistency of the log position, which users can tune for different approaches.
```java
if (endpoint.isBufferedReplicationEndpoint() && stagedWalSize > 0) {
  source.getReplicationEndpoint().beforePersistingReplicationOffset();
}
```
Also should be in the endpoint.
ReplicationSourceShipper currently persists replication offsets immediately after shipping WAL batches. This is unsafe for buffering ReplicationEndpoints (e.g. ContinuousBackupReplicationEndpoint) that stage WAL entries before durably flushing them.
This change adds staged flush support by:
- Introducing beforePersistingReplicationOffset() to allow endpoints to flush staged WAL data before offsets are persisted
- Default implementations preserve existing behaviour for non-buffering endpoints, which continue to persist offsets immediately
- Added TestReplicationSourceShipperBufferedFlush
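At a high level, the shipper-side flow this describes looks roughly like the following (a simplified sketch assembled from the review excerpts above, not the actual patch code):

```java
// Simplified sketch of the staged-flush shipper loop; names follow the excerpts above.
WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
if (entryBatch != null) {
  shipEdits(entryBatch);                      // send the batch to the ReplicationEndpoint
  stagedWalSize += currentSize;               // currentSize: WAL bytes in the shipped batch
  entriesForCleanUpHFileRefs.addAll(entries);
  lastShippedBatch = entryBatch;
}
if (shouldPersistLogPosition()) {             // size or time threshold reached
  // Let buffering endpoints durably flush staged data before the offset is recorded.
  source.getReplicationEndpoint().beforePersistingReplicationOffset();
  persistLogPosition();                       // only now persist the replication offset
  stagedWalSize = 0;
  lastStagedFlushTs = EnvironmentEdgeManager.currentTime();
}
```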