
Conversation


@ankitsol ankitsol commented Jan 12, 2026

ReplicationSourceShipper currently persists replication offsets immediately after shipping WAL batches. This is unsafe for buffering ReplicationEndpoints (e.g. ContinuousBackupReplicationEndpoint) that stage WAL entries before durably flushing them.

This change adds staged flush support by:

  • Tracking staged WAL size and last flush time in ReplicationSourceShipper
  • Adding optional buffer size and flush interval hooks to ReplicationEndpoint
  • Introducing beforePersistingReplicationOffset() to allow endpoints to flush staged WAL data before offsets are persisted

Default implementations preserve existing behaviour for non-buffering endpoints, which continue to persist offsets immediately.

Added TestReplicationSourceShipperBufferedFlush

  • Verifies beforePersistingReplicationOffset() is not called for empty batches
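
For reviewers skimming the description, a rough sketch of the shape of the new hooks, assuming the defaults described above (-1 buffer size and Long.MAX_VALUE flush interval for non-buffering endpoints); the exact javadoc and signatures in the patch may differ:

// Illustrative sketch only; not the exact code in this patch.
public interface ReplicationEndpoint {

  // ... existing replication endpoint methods ...

  /**
   * Maximum staged WAL size (in bytes) before a flush should be triggered.
   * Non-buffering endpoints keep the default of -1 and persist offsets
   * immediately after every shipped batch.
   */
  default long getMaxBufferSize() {
    return -1;
  }

  /**
   * Maximum time staged WAL data may remain unflushed.
   * Non-buffering endpoints keep the default of Long.MAX_VALUE.
   */
  default long maxFlushInterval() {
    return Long.MAX_VALUE;
  }

  /**
   * Hook invoked before persisting replication offsets. Buffering endpoints
   * flush/close staged WAL data here; the default is a no-op.
   */
  default void beforePersistingReplicationOffset() {
  }
}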


@ankitsol
Author

@Apache9 @anmolnar @vinayakphegde Please review this PR as a follow-up to preview PR #7591

@Apache-HBase

🎊 +1 overall

Vote Subsystem Runtime Logfile Comment
+0 🆗 reexec 0m 28s Docker mode activated.
_ Prechecks _
+1 💚 dupname 0m 0s No case conflicting files found.
+0 🆗 codespell 0m 0s codespell was not available.
+0 🆗 detsecrets 0m 0s detect-secrets was not available.
+1 💚 @author 0m 0s The patch does not contain any @author tags.
+1 💚 hbaseanti 0m 0s Patch does not have any anti-patterns.
_ master Compile Tests _
+1 💚 mvninstall 3m 39s master passed
+1 💚 compile 4m 9s master passed
+1 💚 checkstyle 1m 10s master passed
+1 💚 spotbugs 2m 5s master passed
+1 💚 spotless 1m 4s branch has no errors when running spotless:check.
_ Patch Compile Tests _
+1 💚 mvninstall 3m 45s the patch passed
+1 💚 compile 4m 8s the patch passed
+1 💚 javac 4m 8s the patch passed
+1 💚 blanks 0m 0s The patch has no blanks issues.
+1 💚 checkstyle 1m 10s the patch passed
+1 💚 spotbugs 2m 28s the patch passed
+1 💚 hadoopcheck 13m 50s Patch does not cause any errors with Hadoop 3.3.6 3.4.1.
+1 💚 spotless 0m 56s patch has no errors when running spotless:check.
_ Other Tests _
+1 💚 asflicense 0m 13s The patch does not generate ASF License warnings.
48m 15s
Subsystem Report/Notes
Docker ClientAPI=1.43 ServerAPI=1.43 base: https://ci-hbase.apache.org/job/HBase-PreCommit-GitHub-PR/job/PR-7617/2/artifact/yetus-general-check/output/Dockerfile
GITHUB PR #7617
Optional Tests dupname asflicense javac spotbugs checkstyle codespell detsecrets compile hadoopcheck hbaseanti spotless
uname Linux 4f1801e5c24c 5.4.0-1103-aws #111~18.04.1-Ubuntu SMP Tue May 23 20:04:10 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux
Build tool maven
Personality dev-support/hbase-personality.sh
git revision master / 95000e0
Default Java Eclipse Adoptium-17.0.11+9
Max. process+thread count 83 (vs. ulimit of 30000)
modules C: hbase-server U: hbase-server
Console output https://ci-hbase.apache.org/job/HBase-PreCommit-GitHub-PR/job/PR-7617/2/console
versions git=2.34.1 maven=3.9.8 spotbugs=4.7.3
Powered by Apache Yetus 0.15.0 https://yetus.apache.org

This message was automatically generated.

@Apache-HBase

💔 -1 overall

Vote Subsystem Runtime Logfile Comment
+0 🆗 reexec 0m 14s Docker mode activated.
-0 ⚠️ yetus 0m 3s Unprocessed flag(s): --brief-report-file --spotbugs-strict-precheck --author-ignore-list --blanks-eol-ignore-file --blanks-tabs-ignore-file --quick-hadoopcheck
_ Prechecks _
_ master Compile Tests _
+1 💚 mvninstall 3m 19s master passed
+1 💚 compile 0m 59s master passed
+1 💚 javadoc 0m 30s master passed
+1 💚 shadedjars 6m 20s branch has no errors when building our shaded downstream artifacts.
_ Patch Compile Tests _
+1 💚 mvninstall 2m 55s the patch passed
+1 💚 compile 1m 0s the patch passed
+1 💚 javac 1m 0s the patch passed
+1 💚 javadoc 0m 27s the patch passed
+1 💚 shadedjars 6m 20s patch has no errors when building our shaded downstream artifacts.
_ Other Tests _
-1 ❌ unit 251m 26s /patch-unit-hbase-server.txt hbase-server in the patch failed.
277m 59s
Subsystem Report/Notes
Docker ClientAPI=1.52 ServerAPI=1.52 base: https://ci-hbase.apache.org/job/HBase-PreCommit-GitHub-PR/job/PR-7617/2/artifact/yetus-jdk17-hadoop3-check/output/Dockerfile
GITHUB PR #7617
Optional Tests javac javadoc unit compile shadedjars
uname Linux 82b2e4b8e507 6.14.0-1018-aws #18~24.04.1-Ubuntu SMP Mon Nov 24 19:46:27 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Build tool maven
Personality dev-support/hbase-personality.sh
git revision master / 95000e0
Default Java Eclipse Adoptium-17.0.11+9
Test Results https://ci-hbase.apache.org/job/HBase-PreCommit-GitHub-PR/job/PR-7617/2/testReport/
Max. process+thread count 4718 (vs. ulimit of 30000)
modules C: hbase-server U: hbase-server
Console output https://ci-hbase.apache.org/job/HBase-PreCommit-GitHub-PR/job/PR-7617/2/console
versions git=2.34.1 maven=3.9.8
Powered by Apache Yetus 0.15.0 https://yetus.apache.org

This message was automatically generated.

Copilot AI left a comment

Pull request overview

This PR adds staged flush support to ReplicationSourceShipper to enable safe offset persistence for buffering ReplicationEndpoints (like ContinuousBackupReplicationEndpoint). The shipper now tracks staged WAL size and flush timing, only persisting offsets after buffered data is durably flushed.

Changes:

  • Added buffer management hooks (getMaxBufferSize(), maxFlushInterval(), beforePersistingReplicationOffset()) to ReplicationEndpoint interface
  • Modified ReplicationSourceShipper to stage WAL entries and defer offset persistence based on buffer size and time thresholds
  • Added test to verify beforePersistingReplicationOffset() is not called for empty batches

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 7 comments.

File Description
ReplicationEndpoint.java Added three default methods to support buffered replication: getMaxBufferSize(), maxFlushInterval(), and beforePersistingReplicationOffset()
ReplicationSourceShipper.java Modified to track staged WAL entries and defer offset persistence until buffer/timeout thresholds are met
TestReplicationSourceShipperBufferedFlush.java Added test to verify beforePersistingReplicationOffset() behavior with empty batches


for (Entry entry : entriesForCleanUpHFileRefs) {
try {
cleanUpHFileRefs(entry.getEdit());
} catch (IOException e) {
Copilot AI Jan 13, 2026

The error handling for IOException has been changed to catch and log the exception instead of propagating it. This silently suppresses IOException failures during cleanup, which could hide serious issues like file system problems. If cleanup failures should be non-fatal, this should be explicitly documented, or consider at least incrementing a failure metric to track these errors.

Suggested change
} catch (IOException e) {
} catch (IOException e) {
// Cleanup failures are intentionally treated as non-fatal: replication has already
// succeeded for these entries, so we log the failure and continue.

if (stagedWalSize == 0 || lastShippedBatch == null) {
return false;
}
return (stagedWalSize >= source.getReplicationEndpoint().getMaxBufferSize())
Copilot AI Jan 13, 2026

The condition checks if stagedWalSize is greater than or equal to getMaxBufferSize(), but getMaxBufferSize() can return -1 for non-buffering endpoints (as documented in ReplicationEndpoint). This means the comparison 'stagedWalSize >= -1' would always be true when stagedWalSize > 0, causing immediate flushes for non-buffering endpoints. While this preserves existing behavior, the intent is unclear and could be confusing. Consider explicitly checking for -1 to make the logic more explicit.

Suggested change
return (stagedWalSize >= source.getReplicationEndpoint().getMaxBufferSize())
long maxBufferSize = source.getReplicationEndpoint().getMaxBufferSize();
// For non-buffering endpoints, getMaxBufferSize() returns a negative value (e.g., -1).
// In that case, we always trigger a flush based on size as soon as there is staged data.
boolean sizeBasedFlush =
(maxBufferSize < 0) || (stagedWalSize >= maxBufferSize);
return sizeBasedFlush

Contributor

I can agree with this comment too. You introduced two new properties: getMaxBufferSize() and maxFlushInterval() with default values which should mimic the original behavior. I think you documented it in ReplicationEndpoint interface properly and it would be useful to explicitly check for it too.

Comment on lines +123 to +126
// Allow loop to run
Waiter.waitFor(conf, 3000, () -> true);

shipper.interrupt();
Copilot AI Jan 13, 2026

The test uses a fixed wait time of 3000ms with an empty lambda that always returns true (Waiter.waitFor(conf, 3000, () -> true)). This just sleeps unconditionally and doesn't actually verify any condition. The test should wait for a meaningful condition, such as verifying that the shipper has processed the batch or checking that the thread has reached a specific state. This makes the test timing-dependent and unreliable.

Suggested change
// Allow loop to run
Waiter.waitFor(conf, 3000, () -> true);
shipper.interrupt();
// Wait until the shipper thread has finished processing the batch
Waiter.waitFor(conf, 3000, () -> !shipper.isAlive());

Contributor

Another good suggestion.

batch.setLastWalPosition(1L);
// no entries, no heap size

Mockito.when(walReader.take()).thenReturn(batch).thenReturn(null);
Copilot AI Jan 13, 2026

The mock is configured to stub 'walReader.take()' but the ReplicationSourceShipper actually calls 'entryReader.poll(getEntriesTimeout)' (line 118 in ReplicationSourceShipper.java). This means the mock configuration has no effect, and the test will call the unmocked poll() method which will return null, causing the shipper to skip processing any batches. The test should mock 'poll(anyInt())' instead of 'take()'.

Suggested change
Mockito.when(walReader.take()).thenReturn(batch).thenReturn(null);
Mockito.when(walReader.poll(Mockito.anyInt())).thenReturn(batch).thenReturn(null);

// ContinuousBackupReplicationEndpoint
// and -1 for other ReplicationEndpoint since they don't buffer.
// For other ReplicationEndpoint, everytime a WALEntryBatch is shipped, we update replication
// offset. Please check ReplicationSourceShipper#shouldFlushStagedWal()
Copilot AI Jan 13, 2026

The comment references 'shouldFlushStagedWal()' but the actual method name in ReplicationSourceShipper is 'shouldPersistLogPosition()'. This inconsistency will confuse developers trying to understand the interaction between these components.

Contributor

@ankitsol It seems that this comment is valid.

// ContinuousBackupReplicationEndpoint
// and Long.MAX_VALUE for other ReplicationEndpoint since they don't buffer.
// For other ReplicationEndpoint, everytime a WALEntryBatch is shipped, we update replication
// offset. Please check ReplicationSourceShipper#shouldFlushStagedWal()
Copilot AI Jan 13, 2026

The comment references 'shouldFlushStagedWal()' but the actual method name in ReplicationSourceShipper is 'shouldPersistLogPosition()'. This inconsistency will confuse developers trying to understand the interaction between these components.

Contributor

Same here.

LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
// Loop until we close down
while (isActive()) {
// check if flush needed for WAL backup, this is need for timeout based flush
Copilot AI Jan 13, 2026

The comment has a typo: 'need' should be 'needed'. Should read "this is needed for timeout based flush".

Suggested change
// check if flush needed for WAL backup, this is need for timeout based flush
// check if flush needed for WAL backup, this is needed for timeout based flush

Contributor

@taklwu taklwu left a comment

can you also check if these two failures are related?

TestHRegionWithInMemoryFlush.testParallelIncrementWithMemStoreFlush
TestTags.testFlushAndCompactionwithCombinations

return -1;
}

// WAL entries are buffered in ContinuousBackupReplicationEndpoint before flushing to WAL backup
Contributor

nit: ContinuousBackupReplicationEndpoint is part of #7591 and is not yet committed to master; should we be mentioning it this early in this change?

Contributor

@ankitsol I agree. Please remove these references from this patch.

Comment on lines 265 to 268
} catch (IOException e) {
LOG.warn("{} threw unknown exception:",
source.getReplicationEndpoint().getClass().getName(), e);
}
Contributor

nit: will this be a behavior change? Previously, when cleanUpHFileRefs failed, the exception propagated out of the function, but here we're only logging it.

Contributor

Agree. Either the exception should be rethrown here, or we don't need to catch it at all.

@taklwu taklwu requested a review from Apache9 January 13, 2026 01:10
@ankitsol
Author

can you also check if these two failures are related?

TestHRegionWithInMemoryFlush.testParallelIncrementWithMemStoreFlush
TestTags.testFlushAndCompactionwithCombinations

These are passing locally

@ankitsol
Author

Hi @Apache9 @anmolnar @vinayakphegde just a gentle reminder on this PR when you get a moment 🙂 Thanks!
This PR is a follow-up to #7591

Contributor

@anmolnar anmolnar left a comment

Please see my initial feedback.

// ContinuousBackupReplicationEndpoint
// and -1 for other ReplicationEndpoint since they don't buffer.
// For other ReplicationEndpoint, everytime a WALEntryBatch is shipped, we update replication
// offset. Please check ReplicationSourceShipper#shouldFlushStagedWal()
Contributor

@ankitsol It seems that this comment is valid.

// ContinuousBackupReplicationEndpoint
// and Long.MAX_VALUE for other ReplicationEndpoint since they don't buffer.
// For other ReplicationEndpoint, everytime a WALEntryBatch is shipped, we update replication
// offset. Please check ReplicationSourceShipper#shouldFlushStagedWal()
Contributor

Same here.

if (stagedWalSize == 0 || lastShippedBatch == null) {
return false;
}
return (stagedWalSize >= source.getReplicationEndpoint().getMaxBufferSize())
Contributor

I can agree with this comment too. You introduced two new properties: getMaxBufferSize() and maxFlushInterval() with default values which should mimic the original behavior. I think you documented it in ReplicationEndpoint interface properly and it would be useful to explicitly check for it too.

Comment on lines 265 to 268
} catch (IOException e) {
LOG.warn("{} threw unknown exception:",
source.getReplicationEndpoint().getClass().getName(), e);
}
Contributor

Agree. Either the exception should be rethrown here, or we don't need to catch it at all.

Comment on lines +123 to +126
// Allow loop to run
Waiter.waitFor(conf, 3000, () -> true);

shipper.interrupt();
Contributor

Another good suggestion.

Contributor

@wchevreuil wchevreuil left a comment

Why are we targeting this to master? Shouldn't it be on the feature branch?

Also, as already mentioned by @anmolnar and @taklwu , we should refrain from adding logic that is specific to the continuous backup replication in the generic replication interfaces/classes.

@Apache9
Contributor

Apache9 commented Jan 21, 2026

Why are we targeting this to master? Shouldn't it be on the feature branch?

Also, as already mentioned by @anmolnar and @taklwu , we should refrain from adding logic that is specific to the continuous backup replication in the generic replication interfaces/classes.

It was my suggestion that we target the master branch to add this feature first, and then reimplement the continuous backup feature on top of this change. Of course, we do not need to merge this to master first; this is just for better reviewing. We can land this on a feature branch and then rebase the continuous backup branch.

Thanks.

@anmolnar
Contributor

@wchevreuil @Apache9 Thank you guys for the suggestion. I'm happy to merge it to either branch once it's accepted.

Please also provide feedback on the patch itself. Does it match the approach that you suggested on the feature branch @Apache9 ?

Comment on lines +108 to +113
if (shouldPersistLogPosition()) {
IOException error = persistLogPosition();
if (error != null) {
LOG.warn("Exception while persisting replication state", error);
}
}
Contributor

nit: I don't think you need to change the return type to IOException; instead, you can just use try-catch if you really need to handle any exception differently.

Suggested change
if (shouldPersistLogPosition()) {
IOException error = persistLogPosition();
if (error != null) {
LOG.warn("Exception while persisting replication state", error);
}
}
if (shouldPersistLogPosition()) {
persistLogPosition();
}
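
A minimal sketch of what the caller in run() could then look like under this suggestion, assuming persistLogPosition() is changed to declare throws IOException as proposed in the next comment; this is not code from the patch:

// Sketch only, not from the patch: persistLogPosition() declares
// "throws IOException" and each caller decides how to handle the failure.
if (shouldPersistLogPosition()) {
  try {
    persistLogPosition();
  } catch (IOException e) {
    // In run() a warning is enough; shipEdits() could instead choose to retry.
    LOG.warn("Exception while persisting replication state", e);
  }
}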

@Nullable
// Returns IOException instead of throwing so callers can decide
// whether to retry (shipEdits) or best-effort log (run()).
private IOException persistLogPosition() {
Contributor

Suggested change
private IOException persistLogPosition() {
private void persistLogPosition() throws IOException {

Contributor

+1

}
} catch (IOException e) {
LOG.warn("{} threw exception while cleaning up hfile refs", endpoint.getClass().getName(), e);
return e;
Contributor

Suggested change
return e;
throw e;

@Apache-HBase

🎊 +1 overall

Vote Subsystem Runtime Logfile Comment
+0 🆗 reexec 0m 29s Docker mode activated.
-0 ⚠️ yetus 0m 3s Unprocessed flag(s): --brief-report-file --spotbugs-strict-precheck --author-ignore-list --blanks-eol-ignore-file --blanks-tabs-ignore-file --quick-hadoopcheck
_ Prechecks _
_ master Compile Tests _
+1 💚 mvninstall 3m 27s master passed
+1 💚 compile 1m 6s master passed
+1 💚 javadoc 0m 30s master passed
+1 💚 shadedjars 5m 55s branch has no errors when building our shaded downstream artifacts.
_ Patch Compile Tests _
+1 💚 mvninstall 3m 8s the patch passed
+1 💚 compile 1m 5s the patch passed
+1 💚 javac 1m 5s the patch passed
+1 💚 javadoc 0m 27s the patch passed
+1 💚 shadedjars 5m 51s patch has no errors when building our shaded downstream artifacts.
_ Other Tests _
+1 💚 unit 241m 59s hbase-server in the patch passed.
269m 23s
Subsystem Report/Notes
Docker ClientAPI=1.43 ServerAPI=1.43 base: https://ci-hbase.apache.org/job/HBase-PreCommit-GitHub-PR/job/PR-7617/3/artifact/yetus-jdk17-hadoop3-check/output/Dockerfile
GITHUB PR #7617
Optional Tests javac javadoc unit compile shadedjars
uname Linux d4d0c9ba21ef 5.4.0-1103-aws #111~18.04.1-Ubuntu SMP Tue May 23 20:04:10 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux
Build tool maven
Personality dev-support/hbase-personality.sh
git revision master / daded96
Default Java Eclipse Adoptium-17.0.11+9
Test Results https://ci-hbase.apache.org/job/HBase-PreCommit-GitHub-PR/job/PR-7617/3/testReport/
Max. process+thread count 3736 (vs. ulimit of 30000)
modules C: hbase-server U: hbase-server
Console output https://ci-hbase.apache.org/job/HBase-PreCommit-GitHub-PR/job/PR-7617/3/console
versions git=2.34.1 maven=3.9.8
Powered by Apache Yetus 0.15.0 https://yetus.apache.org

This message was automatically generated.

* @return true if this endpoint buffers WAL entries and requires explicit flush control before
* persisting replication offsets.
*/
default boolean isBufferedReplicationEndpoint() {
Contributor

For me, I do not think we need to expose this information to the shipper?

The design here is that, when using a different ReplicationEndpoint, you need to tune the shipper configuration on your own, as the parameters are not only affected by the ReplicationEndpoint; they also depend on the shipper side.

For example, when you want to reduce the pressure on recording the offset, you should increase the recording interval, i.e. increase the batch size, increase the number of ship operations between offset recordings, etc. And if you want to reduce the pressure on memory and the target receiver, you should decrease the batch size. For an S3 based replication endpoint there is also a trade-off: if you increase the flush interval, you get better performance and fewer files on S3, but failover will be more complicated as you need to start from much further back.

So this should be in the documentation; just exposing some configuration from the ReplicationEndpoint cannot handle all the above situations.

LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
// Loop until we close down
while (isActive()) {
// check if flush needed for WAL backup, this is need for timeout based flush
Contributor

I need to say, this is not designed for WAL backup only. Here, in the shipper, we just follow the configured limit, i.e. time based or size based, to determine whether we need to record the log position, and there is a callback to the ReplicationEndpoint before recording. The replication endpoint can use this callback to do something; the shipper does not know the details.

@Nullable
// Returns IOException instead of throwing so callers can decide
// whether to retry (shipEdits) or best-effort log (run()).
private IOException persistLogPosition() {
Contributor

+1

* Hook invoked before persisting replication offsets. Buffered endpoints should flush/close WALs
* here.
*/
default void beforePersistingReplicationOffset() {
Contributor

From the endpoint's view, this method is just a flush/close of the given WAL, so let's name it accordingly.

Suggested change
default void beforePersistingReplicationOffset() {
default void flushAndCloseWAL() {

Contributor

Sorry, this is exactly what we want to avoid here. This method just tells the endpoint what the shipper is going to do, and the endpoint can do anything it wants. For our normal replication framework, there are no flush and close operations, as we always send everything out and do not return until we get acks, so basically we do not need to implement this method. And for an S3 based replication endpoint, we need to close the file to persist it on S3. Maybe in the future we will have other types of replication endpoint which do other work, so we do not want to name it flushAndCloseWAL.
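
To make the intended contract concrete, a buffering endpoint might override the hook roughly as below; stagedBackupWalWriter and closeAndUpload() are hypothetical names used only for this illustration and do not appear in the patch or in #7591:

// Hypothetical buffering endpoint; identifiers here are illustrative only.
@Override
public void beforePersistingReplicationOffset() {
  // Durably persist everything staged so far (e.g. close the staged file so
  // it is finalized on S3) before the shipper records the replication offset.
  stagedBackupWalWriter.closeAndUpload();
}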

Comment on lines +249 to +260
private boolean shouldPersistLogPosition() {
ReplicationEndpoint endpoint = source.getReplicationEndpoint();
if (!endpoint.isBufferedReplicationEndpoint()) {
// Non-buffering endpoints persist immediately
return true;
}
if (stagedWalSize == 0 || lastShippedBatch == null) {
return false;
}
return stagedWalSize >= endpoint.getMaxBufferSize()
|| (EnvironmentEdgeManager.currentTime() - lastStagedFlushTs >= endpoint.maxFlushInterval());
}
Contributor

This should be in the Endpoint, as the decision varies according to the type of endpoint. Today we have basically two types, buffered and non-buffered. If we have new endpoint types in the future, we might again need to come here and add the related logic.

Contributor

Please see my comment above to get more context

For me, I do not think we need to expose this information to the shipper?

The design here is that, when using a different ReplicationEndpoint, you need to tune the shipper configuration on your own, as the parameters are not only affected by the ReplicationEndpoint; they also depend on the shipper side.

For example, when you want to reduce the pressure on recording the offset, you should increase the recording interval, i.e. increase the batch size, increase the number of ship operations between offset recordings, etc. And if you want to reduce the pressure on memory and the target receiver, you should decrease the batch size. For an S3 based replication endpoint there is also a trade-off: if you increase the flush interval, you get better performance and fewer files on S3, but failover will be more complicated as you need to start from much further back.

So this should be in the documentation; just exposing some configuration from the ReplicationEndpoint cannot handle all the above situations.

stagedWalSize += currentSize;
entriesForCleanUpHFileRefs.addAll(entries);
lastShippedBatch = entryBatch;
if (shouldPersistLogPosition()) {
Contributor

Rather than having these stagedWalSize and lastShippedBatch as global variables, we should just pass the entryBatch along to shouldPersistLogPosition() (which should be defined/implemented in the endpoints, btw; see my other related comment) and persistLogPosition().

Contributor

We want to determine whether we need to persist the log position in the shipper, based on some configuration, not triggered by the replication endpoint. Users can choose different configuration values for different replication endpoint implementations.

Contributor

IMHO, it doesn't look very cohesive. The shipper seems to be making decisions based on specific endpoint implementations. If new endpoint impls with different logic for updating the log position are introduced in the future, we would need to revisit the shipper again.

Contributor

I think time based and size based persistence is enough for most cases? If in the future we have some special endpoint which needs a new way of deciding, we can add a new mechanism, no problem.

The reason we do not want to trigger persistence only from the endpoint is that we have other considerations about when to persist the log position, like the trade-off between failover and pressure on the replication storage. So here I suggest that we introduce general mechanisms to control when the log position is persisted, and users can tune it with different approaches.
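
As a rough sketch of that direction (the two configuration keys below are invented for illustration and are not real HBase properties), the shipper would own the time and size thresholds and only call back into the endpoint before recording:

// Sketch of a shipper-driven decision; the config keys are hypothetical.
long maxStagedBytes =
  conf.getLong("hbase.replication.shipper.offset.flush.size", Long.MAX_VALUE);
long maxStagedAgeMs =
  conf.getLong("hbase.replication.shipper.offset.flush.interval", 0);

boolean shouldPersist = stagedWalSize >= maxStagedBytes
  || EnvironmentEdgeManager.currentTime() - lastStagedFlushTs >= maxStagedAgeMs;

if (shouldPersist) {
  // Callback first: a no-op for ordinary push replication, a flush/close of
  // staged files for a buffering endpoint.
  source.getReplicationEndpoint().beforePersistingReplicationOffset();
  persistLogPosition();
}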

Comment on lines +272 to +274
if (endpoint.isBufferedReplicationEndpoint() && stagedWalSize > 0) {
source.getReplicationEndpoint().beforePersistingReplicationOffset();
}
Contributor

Also should be in the endpoint.
