HBASE-29823 Control WAL flush and offset persistence from ReplicationSourceShipper #7617
base: master
New default methods added to the `ReplicationEndpoint` interface:

```java
@@ -283,4 +283,35 @@ public int getTimeout() {
   * @throws IllegalStateException if this service's state isn't FAILED.
   */
  Throwable failureCause();

  /**
   * @return true if this endpoint buffers WAL entries and requires explicit flush control before
   *         persisting replication offsets.
   */
  default boolean isBufferedReplicationEndpoint() {
    return false;
  }

  /**
   * Maximum WAL size (bytes) to buffer before forcing a flush. Only meaningful when
   * isBufferedReplicationEndpoint() == true.
   */
  default long getMaxBufferSize() {
    return -1L;
  }

  /**
   * Maximum time (ms) to wait before forcing a flush. Only meaningful when
   * isBufferedReplicationEndpoint() == true.
   */
  default long maxFlushInterval() {
    return Long.MAX_VALUE;
  }

  /**
   * Hook invoked before persisting replication offsets. Buffered endpoints should flush/close WALs
   * here.
   */
  default void beforePersistingReplicationOffset() {
```
Contributor: From the endpoint's view, this method is just a flush/close of the given WAL, so let's name it accordingly.

Contributor: Sorry, this is exactly what we want to avoid here. This method just tells the endpoint what the shipper is going to do, and the endpoint can do anything it wants. For our normal replication framework there are no flush and close operations, as we always send everything out and do not return until we get acks, so basically we do not need to implement this method. For an S3 based replication endpoint, we need to close the file to persist it on S3. Maybe in the future we will have other types of replication endpoint which do other work, so we do not want to name it after flush/close.
```java
  }
}
```
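For illustration, a buffered endpoint, for example one that stages WAL entries into files destined for S3, might opt in to these new defaults roughly as sketched below. This is not code from the PR: the class name and the staging behaviour described in the comments are hypothetical, and the lifecycle overrides simply mirror the pattern used by the test endpoint later in this diff.

```java
package org.apache.hadoop.hbase.replication;

import java.util.UUID;

/**
 * Hypothetical example only: a buffered endpoint that stages WAL entries locally and makes them
 * durable when the shipper is about to record the replication offset.
 */
public class BufferedFileReplicationEndpoint extends BaseReplicationEndpoint {

  @Override
  public void start() {
    startAsync().awaitRunning();
  }

  @Override
  public void stop() {
    stopAsync().awaitTerminated();
  }

  @Override
  protected void doStart() {
    notifyStarted();
  }

  @Override
  protected void doStop() {
    notifyStopped();
  }

  @Override
  public boolean isBufferedReplicationEndpoint() {
    return true; // entries are staged, so the shipper must control offset persistence
  }

  @Override
  public long getMaxBufferSize() {
    return 128L * 1024 * 1024; // example value: flush after ~128 MB of staged WAL data
  }

  @Override
  public long maxFlushInterval() {
    return 60_000L; // example value: or after 60 seconds, whichever comes first
  }

  @Override
  public void beforePersistingReplicationOffset() {
    // Flush/close the staged file here so the data is durable (e.g. persisted on S3)
    // before the shipper advances the replication offset. Implementation omitted.
  }

  @Override
  public boolean replicate(ReplicateContext ctx) {
    // A real implementation would append ctx.getEntries() to the staged output instead of
    // shipping them to a remote cluster. Omitted in this sketch.
    return true;
  }

  @Override
  public UUID getPeerUUID() {
    return null; // no remote cluster UUID for a file-based endpoint
  }
}
```

With an endpoint like this, it is the shipper, not the endpoint, that decides when the offset may be persisted, which is the behaviour the new default methods advertise.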
Changes to `ReplicationSourceShipper`:

```java
@@ -20,7 +20,9 @@
import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

@@ -74,6 +76,10 @@ public enum WorkerState {
  private final int DEFAULT_TIMEOUT = 20000;
  private final int getEntriesTimeout;
  private final int shipEditsTimeout;
  private long stagedWalSize = 0L;
  private long lastStagedFlushTs = EnvironmentEdgeManager.currentTime();
  private WALEntryBatch lastShippedBatch;
  private final List<Entry> entriesForCleanUpHFileRefs = new ArrayList<>();

  public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
    ReplicationSourceWALReader walReader) {

@@ -98,6 +104,13 @@ public final void run() {
    LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
    // Loop until we close down
    while (isActive()) {
      // check if flush needed for WAL backup, this is need for timeout based flush
```
Suggested change:

```diff
-      // check if flush needed for WAL backup, this is need for timeout based flush
+      // check if flush needed for WAL backup, this is needed for timeout based flush
```
This is not designed for WAL backup only, I need to say. Here in the shipper we just follow the configured limit, i.e. time based or size based, to determine whether we need to record the log position, and there is a callback to ReplicationEndpoint before recording; the replication endpoint can use this callback to do whatever it needs, the shipper does not know the details.
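As a rough illustration of that size/time based check, the shipper-side decision could look like the fragment below. This is a sketch only, written against the fields added in this patch (`stagedWalSize`, `lastStagedFlushTs`) and the new endpoint accessors; the method name `shouldPersistLogPosition` is taken from the review suggestions in this thread, and the actual body in the PR may differ.

```java
// Sketch only: a size/time based check inside ReplicationSourceShipper, using the fields
// added by this patch and the new ReplicationEndpoint accessors.
private boolean shouldPersistLogPosition() {
  ReplicationEndpoint endpoint = source.getReplicationEndpoint();
  if (!endpoint.isBufferedReplicationEndpoint()) {
    // Non-buffered endpoints keep the existing behavior of recording the offset right after
    // each successful shipment, so no deferred persistence is needed here.
    return false;
  }
  long maxSize = endpoint.getMaxBufferSize();
  if (maxSize > 0 && stagedWalSize >= maxSize) {
    return true; // size based trigger
  }
  // time based trigger
  return EnvironmentEdgeManager.currentTime() - lastStagedFlushTs >= endpoint.maxFlushInterval();
}
```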
nit: I don't think you need to change the return type to IOException; instead you can just use try-catch if you really need to handle any exception differently.

Suggested change:

```diff
-      if (shouldPersistLogPosition()) {
-        IOException error = persistLogPosition();
-        if (error != null) {
-          LOG.warn("Exception while persisting replication state", error);
-        }
-      }
+      if (shouldPersistLogPosition()) {
+        persistLogPosition();
+      }
```
Rather than having these stagedWalSize and lastShippedBatch as global variables, we should just pass the entryBatch along to shouldPersistLogPosition() (which should be defined/implemented in the endpoints, btw, see my other related comment) and persistLogPosition().
We want to determine whether we need to persist the log position in the shipper, based on some configurations, not triggered by the replication endpoint. Users can choose different configuration values based on different replication endpoint implementations.
IMHO, it doesn't look very cohesive. The shipper seems to be making decisions based on specific endpoint implementations. If new endpoint implementations with different logic for updating the log position come up in the future, we would need to revisit the shipper again.
I think time based and size based persistence is enough for most cases? If in the future we have some special endpoint which needs a new kind of decision, we can add a new mechanism, no problem.

The reason we do not want to trigger persistence only from the endpoint is that we have other considerations about when to persist the log position, like the trade-off between failover and pressure on the replication storage. So here I suggest we introduce general mechanisms to control the persistence of the log position, which users can tune for different approaches.
This should be in the Endpoint, as the decision varies according to the type of endpoint. Today we have basically two types, buffered and non-buffered. If we have new endpoint types in the future, we might again need to come here and add the related logic.
Please see my comment above for more context.

For me, I do not think we need to expose this information to the shipper. The design here is that, when using different ReplicationEndpoint implementations, you need to tune the shipper configuration yourself, as the parameters are not only affected by the ReplicationEndpoint, they also depend on the shipper side.

For example, when you want to reduce the pressure of recording the offset, you should increase the recording interval, i.e. increase the batch size, increase the number of shipments between offset recordings, etc. And if you want to reduce the pressure on memory and on the target receiver, you should decrease the batch size. For an S3 based replication endpoint there is also a trade-off: if you increase the flush interval, you get better performance and fewer files on S3, but failover will be more complicated as you need to restart from long before.

So this should be in the documentation; just exposing some configuration from ReplicationEndpoint cannot handle all the above situations.
Suggested change:

```diff
-  private IOException persistLogPosition() {
+  private void persistLogPosition() throws IOException {
```
+1
Also should be in the endpoint.
Suggested change:

```diff
-      return e;
+      throw e;
```
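To make the suggested signature concrete, here is a sketch of what `persistLogPosition` could look like with the void plus `throws IOException` shape discussed above. It assumes the offset is recorded through `source.logPositionAndCleanOldLogs(...)`, which the shipper's existing offset-update path uses, and that the endpoint callback runs first; the exact body in the PR, and which call can actually throw IOException, may differ.

```java
// Sketch only, following the reviewer-suggested signature. Which of these calls can actually
// throw IOException depends on the final API.
private void persistLogPosition() throws IOException {
  if (lastShippedBatch == null) {
    return; // nothing has been shipped since the last flush, so nothing to record
  }
  // Let the endpoint make its buffered data durable (e.g. close the staged file on S3)
  // before the offset is advanced past it.
  source.getReplicationEndpoint().beforePersistingReplicationOffset();
  // Record the new offset and allow old WALs to be cleaned up.
  source.logPositionAndCleanOldLogs(lastShippedBatch);
  // Reset the staging counters for the next flush window.
  stagedWalSize = 0L;
  lastStagedFlushTs = EnvironmentEdgeManager.currentTime();
}
```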
New test file:

```java
@@ -0,0 +1,131 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

/**
 * Tests staged WAL flush behavior in ReplicationSourceShipper.
 */
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationSourceShipperBufferedFlush {

  static class CountingReplicationEndpoint extends BaseReplicationEndpoint {

    private final AtomicInteger beforePersistCalls = new AtomicInteger();

    @Override
    public void start() {
      startAsync().awaitRunning();
    }

    @Override
    public void stop() {
      stopAsync().awaitTerminated();
    }

    @Override
    protected void doStart() {
      notifyStarted();
    }

    @Override
    protected void doStop() {
      notifyStopped();
    }

    @Override
    public boolean replicate(ReplicateContext ctx) {
      return true;
    }

    @Override
    public void beforePersistingReplicationOffset() {
      beforePersistCalls.incrementAndGet();
    }

    @Override
    public long getMaxBufferSize() {
      return 1L; // force immediate flush
    }

    @Override
    public long maxFlushInterval() {
      return Long.MAX_VALUE;
    }

    @Override
    public UUID getPeerUUID() {
      return null;
    }

    int getBeforePersistCalls() {
      return beforePersistCalls.get();
    }
  }

  @Test
  public void testBeforePersistNotCalledForEmptyBatch() throws Exception {
    Configuration conf = HBaseConfiguration.create();

    CountingReplicationEndpoint endpoint = new CountingReplicationEndpoint();
    endpoint.start();

    ReplicationSource source = Mockito.mock(ReplicationSource.class);
    ReplicationSourceWALReader walReader = Mockito.mock(ReplicationSourceWALReader.class);

    Mockito.when(source.isPeerEnabled()).thenReturn(true);
    Mockito.when(source.isSourceActive()).thenReturn(true);
    Mockito.when(source.getReplicationEndpoint()).thenReturn(endpoint);
    Mockito.when(source.getPeerId()).thenReturn("1");
    Mockito.when(source.getSourceMetrics()).thenReturn(Mockito.mock(MetricsSource.class));

    WALEntryBatch batch = new WALEntryBatch(1, null);
    batch.setLastWalPath(new Path("wal"));
    batch.setLastWalPosition(1L);
    // no entries, no heap size

    Mockito.when(walReader.take()).thenReturn(batch).thenReturn(null);
```
Suggested change:

```diff
-    Mockito.when(walReader.take()).thenReturn(batch).thenReturn(null);
+    Mockito.when(walReader.poll(Mockito.anyInt())).thenReturn(batch).thenReturn(null);
```
Copilot AI (Jan 13, 2026): The test uses a fixed wait time of 3000 ms with an empty lambda that always returns true (`Waiter.waitFor(conf, 3000, () -> true)`). This just sleeps unconditionally and doesn't actually verify any condition, which makes the test timing-dependent and unreliable. The test should wait for a meaningful condition, such as verifying that the shipper has processed the batch or checking that the thread has reached a specific state.
Suggested change:

```diff
-    // Allow loop to run
-    Waiter.waitFor(conf, 3000, () -> true);
-    shipper.interrupt();
+    // Wait until the shipper thread has finished processing the batch
+    Waiter.waitFor(conf, 3000, () -> !shipper.isAlive());
```
Another good suggestion.
So, this should be in the documentation, just exposing some configuration from ReplicationEndpoint can not handle all the above situations.