@@ -283,4 +283,35 @@ public int getTimeout() {
* @throws IllegalStateException if this service's state isn't FAILED.
*/
Throwable failureCause();

/**
* @return true if this endpoint buffers WAL entries and requires explicit flush control before
* persisting replication offsets.
*/
default boolean isBufferedReplicationEndpoint() {
Contributor:
For me, I do not think we need to expose this information to the shipper?

The design here is that, when using a different ReplicationEndpoint, you need to tune the shipper configuration on your own, as the parameters are not only affected by the ReplicationEndpoint; they also depend on the shipper side.

For example, when you want to reduce the pressure of recording the offset, you should increase the recording interval, i.e., increase the batch size, increase the number of ship operations between offset recordings, etc. If you want to reduce the pressure on memory and the target receiver, you should decrease the batch size. For an S3-based replication endpoint there is also a trade-off: if you increase the flush interval you get better performance and fewer files on S3, but failover becomes more complicated because you have to start from much earlier.

So this should be in the documentation; just exposing some configuration from ReplicationEndpoint cannot handle all of the above situations.
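As a concrete illustration of the shipper-side tuning described in this comment (not part of this change), an operator could adjust the existing replication-source batch settings per deployment. The values below are arbitrary examples and the keys should be verified against your HBase version.

// Fragment only: trade offset-recording pressure against memory/receiver pressure
// via the existing replication.source.* batch knobs (illustrative values).
Configuration conf = HBaseConfiguration.create();
conf.setLong("replication.source.size.capacity", 16L * 1024 * 1024); // smaller shipped batches
conf.setInt("replication.source.nb.capacity", 5000); // fewer entries per shipped batch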

return false;
}

/**
* Maximum WAL size (bytes) to buffer before forcing a flush. Only meaningful when
* isBufferedReplicationEndpoint() == true.
*/
default long getMaxBufferSize() {
return -1L;
}

/**
* Maximum time (ms) to wait before forcing a flush. Only meaningful when
* isBufferedReplicationEndpoint() == true.
*/
default long maxFlushInterval() {
return Long.MAX_VALUE;
}

/**
* Hook invoked before persisting replication offsets. Buffered endpoints should flush/close WALs
* here.
*/
default void beforePersistingReplicationOffset() {
Contributor:
From the endpoint's view, this method is just a flush/close of the given WAL, so let's name it accordingly.

Suggested change:
- default void beforePersistingReplicationOffset() {
+ default void flushAndCloseWAL() {

Contributor:
Sorry, this is exactly what we want to avoid here. This method just tells the endpoint what the shipper is going to do, and the endpoint can do anything it wants. For our normal replication framework there are no flush and close operations, as we always send everything out and do not return until we get acks, so basically we do not need to implement this method. And for an S3-based replication endpoint, we need to close the file to persist it on S3. Maybe in the future we will have other types of replication endpoints which do other work, so we do not want to name it flushAndCloseWAL.

}
}
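To make the intended contract of the new default methods concrete, the following is a minimal sketch of a buffered (S3-style) endpoint. It mirrors the BaseReplicationEndpoint boilerplate used by the CountingReplicationEndpoint test class later in this PR; the class name and the bufferWalEntries()/closeAndUploadWalFile() helpers are hypothetical and not part of this change.

package org.apache.hadoop.hbase.replication.regionserver;

import java.util.List;
import java.util.UUID;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.wal.WAL.Entry;

public class BufferedWalBackupEndpoint extends BaseReplicationEndpoint {

  @Override
  public void start() {
    startAsync().awaitRunning();
  }

  @Override
  public void stop() {
    stopAsync().awaitTerminated();
  }

  @Override
  protected void doStart() {
    notifyStarted();
  }

  @Override
  protected void doStop() {
    notifyStopped();
  }

  @Override
  public UUID getPeerUUID() {
    return UUID.randomUUID();
  }

  @Override
  public boolean replicate(ReplicateContext ctx) {
    // Instead of shipping to a peer cluster, stage the entries in a local buffer/WAL file.
    bufferWalEntries(ctx.getEntries());
    return true;
  }

  @Override
  public boolean isBufferedReplicationEndpoint() {
    return true;
  }

  @Override
  public long getMaxBufferSize() {
    return 128L * 1024 * 1024; // size-based trigger: flush after ~128 MB of buffered WAL
  }

  @Override
  public long maxFlushInterval() {
    return 5L * 60 * 1000; // time-based trigger: flush at least every 5 minutes
  }

  @Override
  public void beforePersistingReplicationOffset() {
    // Make the buffered edits durable (e.g. close and upload the staged file to S3)
    // before the shipper records the replication offset.
    closeAndUploadWalFile();
  }

  private void bufferWalEntries(List<Entry> entries) {
    // implementation elided
  }

  private void closeAndUploadWalFile() {
    // implementation elided
  }
}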
@@ -20,7 +20,9 @@
import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -74,6 +76,10 @@ public enum WorkerState {
private final int DEFAULT_TIMEOUT = 20000;
private final int getEntriesTimeout;
private final int shipEditsTimeout;
private long stagedWalSize = 0L;
private long lastStagedFlushTs = EnvironmentEdgeManager.currentTime();
private WALEntryBatch lastShippedBatch;
private final List<Entry> entriesForCleanUpHFileRefs = new ArrayList<>();

public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
ReplicationSourceWALReader walReader) {
@@ -98,6 +104,13 @@ public final void run() {
LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
// Loop until we close down
while (isActive()) {
// check if flush needed for WAL backup, this is need for timeout based flush
Copilot AI (Jan 13, 2026):
The comment has a typo: 'need' should be 'needed'. Should read "this is needed for timeout based flush".

Suggested change:
- // check if flush needed for WAL backup, this is need for timeout based flush
+ // check if flush needed for WAL backup, this is needed for timeout based flush

Contributor:
This is not designed for WAL backup only, I have to say. Here in the shipper we just follow the configured limit, i.e., time based or size based, to determine whether we need to record the log position, and there is a callback to the ReplicationEndpoint before recording; the replication endpoint can use this callback to do whatever it needs, and the shipper does not know the details.

if (shouldPersistLogPosition()) {
IOException error = persistLogPosition();
if (error != null) {
LOG.warn("Exception while persisting replication state", error);
}
}
Comment on lines +108 to +113
Contributor:
nit: I don't think you need to change the return type to IOException; instead you can just use try-catch if you really need to handle any exception differently.

Suggested change:
- if (shouldPersistLogPosition()) {
-   IOException error = persistLogPosition();
-   if (error != null) {
-     LOG.warn("Exception while persisting replication state", error);
-   }
- }
+ if (shouldPersistLogPosition()) {
+   persistLogPosition();
+ }

// Sleep until replication is enabled again
if (!source.isPeerEnabled()) {
// The peer enabled check is in memory, not expensive, so do not need to increase the
@@ -155,7 +168,8 @@ private void shipEdits(WALEntryBatch entryBatch) {
List<Entry> entries = entryBatch.getWalEntries();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
updateLogPosition(entryBatch);
lastShippedBatch = entryBatch;
persistLogPosition();
return;
}
int currentSize = (int) entryBatch.getHeapSize();
@@ -190,13 +204,16 @@ private void shipEdits(WALEntryBatch entryBatch) {
} else {
sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
}
// Clean up hfile references
for (Entry entry : entries) {
cleanUpHFileRefs(entry.getEdit());
LOG.trace("shipped entry {}: ", entry);

stagedWalSize += currentSize;
entriesForCleanUpHFileRefs.addAll(entries);
lastShippedBatch = entryBatch;
if (shouldPersistLogPosition()) {
Contributor:
Rather than having these stagedWalSize and lastShippedBatch as member variables, we should just pass the entryBatch along to shouldPersistLogPosition() (which should be defined/implemented in the endpoints, by the way; see my other related comment) and persistLogPosition().

Contributor:
We want to determine whether we need to persist the log position in the shipper, based on some configuration, not triggered by the replication endpoint. Users can choose different configuration values for different replication endpoint implementations.

Contributor:
IMHO, it doesn't look very cohesive. The shipper seems to be making decisions based on specific endpoint implementations. If new endpoint implementations with different logic for updating the log position are introduced in the future, we would need to revisit the shipper again.

Contributor:
I think time based and size based persistence is enough for most cases? If in the future we have some special endpoint which needs a new type of decision mechanism, we can add one then, no problem.

The problem with only triggering persistence from the endpoint is that we have other considerations about when to persist the log position, like the trade-off between failover cost and pressure on the replication storage. So here I suggest that we introduce general mechanisms to control how the log position is persisted, which users can tune for their particular setup.
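A rough sketch of what such a shipper-side, configuration-driven check could look like (the configuration keys and defaults below are hypothetical, purely to illustrate the idea, and assume the shipper keeps its Configuration in a conf field):

// Hypothetical keys; only illustrating the "general mechanism in the shipper" idea.
private final long persistSizeThreshold =
  conf.getLong("hbase.replication.shipper.persist.size.threshold", 64L * 1024 * 1024);
private final long persistIntervalMs =
  conf.getLong("hbase.replication.shipper.persist.interval.ms", 60000L);

private boolean shouldPersistLogPosition() {
  return stagedWalSize >= persistSizeThreshold
    || EnvironmentEdgeManager.currentTime() - lastStagedFlushTs >= persistIntervalMs;
}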

IOException error = persistLogPosition();
if (error != null) {
throw error; // existing catch block handles retry
}
}
// Log and clean up WAL logs
updateLogPosition(entryBatch);

// offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
// this sizeExcludeBulkLoad has to use same calculation that when calling
@@ -229,6 +246,53 @@ private void shipEdits(WALEntryBatch entryBatch) {
}
}

private boolean shouldPersistLogPosition() {
ReplicationEndpoint endpoint = source.getReplicationEndpoint();
if (!endpoint.isBufferedReplicationEndpoint()) {
// Non-buffering endpoints persist immediately
return true;
}
if (stagedWalSize == 0 || lastShippedBatch == null) {
return false;
}
return stagedWalSize >= endpoint.getMaxBufferSize()
|| (EnvironmentEdgeManager.currentTime() - lastStagedFlushTs >= endpoint.maxFlushInterval());
}
Comment on lines +249 to +260
Contributor:
This should be in the Endpoint, as the decision varies according to the type of endpoint. Today we have basically two types, buffered and non-buffered. If we have new endpoint types in the future, we might again need to come here and add the related logic.

Contributor:
Please see my comment above for more context.


@Nullable
// Returns IOException instead of throwing so callers can decide
// whether to retry (shipEdits) or best-effort log (run()).
private IOException persistLogPosition() {
Contributor:
Suggested change:
- private IOException persistLogPosition() {
+ private void persistLogPosition() throws IOException {

Contributor:
+1
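Taken together, the two suggestions point at a shape roughly like the following at the two call sites (illustrative only, not code from this PR):

// In run(): best effort, log and keep looping.
if (shouldPersistLogPosition()) {
  try {
    persistLogPosition();
  } catch (IOException e) {
    LOG.warn("Exception while persisting replication state", e);
  }
}

// In shipEdits(): let the IOException propagate so the existing catch block
// drives the retry/backoff logic.
if (shouldPersistLogPosition()) {
  persistLogPosition();
}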

if (lastShippedBatch == null) {
return null;
}

ReplicationEndpoint endpoint = source.getReplicationEndpoint();

if (endpoint.isBufferedReplicationEndpoint() && stagedWalSize > 0) {
source.getReplicationEndpoint().beforePersistingReplicationOffset();
}
Comment on lines +272 to +274
Contributor:
Also should be in the endpoint.


stagedWalSize = 0;
lastStagedFlushTs = EnvironmentEdgeManager.currentTime();

// Clean up hfile references
try {
for (Entry entry : entriesForCleanUpHFileRefs) {
cleanUpHFileRefs(entry.getEdit());
LOG.trace("shipped entry {}: ", entry);
}
} catch (IOException e) {
LOG.warn("{} threw exception while cleaning up hfile refs", endpoint.getClass().getName(), e);
return e;
Contributor:
Suggested change:
- return e;
+ throw e;

}
entriesForCleanUpHFileRefs.clear();

// Log and clean up WAL logs
updateLogPosition(lastShippedBatch);
return null;
}

private void cleanUpHFileRefs(WALEdit edit) throws IOException {
String peerId = source.getPeerId();
if (peerId.contains("-")) {
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

/**
* Tests staged WAL flush behavior in ReplicationSourceShipper.
*/
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationSourceShipperBufferedFlush {

static class CountingReplicationEndpoint extends BaseReplicationEndpoint {

private final AtomicInteger beforePersistCalls = new AtomicInteger();

@Override
public void start() {
startAsync().awaitRunning();
}

@Override
public void stop() {
stopAsync().awaitTerminated();
}

@Override
protected void doStart() {
notifyStarted();
}

@Override
protected void doStop() {
notifyStopped();
}

@Override
public boolean replicate(ReplicateContext ctx) {
return true;
}

@Override
public void beforePersistingReplicationOffset() {
beforePersistCalls.incrementAndGet();
}

@Override
public long getMaxBufferSize() {
return 1L; // force immediate flush
}

@Override
public long maxFlushInterval() {
return Long.MAX_VALUE;
}

@Override
public UUID getPeerUUID() {
return null;
}

int getBeforePersistCalls() {
return beforePersistCalls.get();
}
}

@Test
public void testBeforePersistNotCalledForEmptyBatch() throws Exception {
Configuration conf = HBaseConfiguration.create();

CountingReplicationEndpoint endpoint = new CountingReplicationEndpoint();
endpoint.start();

ReplicationSource source = Mockito.mock(ReplicationSource.class);
ReplicationSourceWALReader walReader = Mockito.mock(ReplicationSourceWALReader.class);

Mockito.when(source.isPeerEnabled()).thenReturn(true);
Mockito.when(source.isSourceActive()).thenReturn(true);
Mockito.when(source.getReplicationEndpoint()).thenReturn(endpoint);
Mockito.when(source.getPeerId()).thenReturn("1");
Mockito.when(source.getSourceMetrics()).thenReturn(Mockito.mock(MetricsSource.class));

WALEntryBatch batch = new WALEntryBatch(1, null);
batch.setLastWalPath(new Path("wal"));
batch.setLastWalPosition(1L);
// no entries, no heap size

Mockito.when(walReader.take()).thenReturn(batch).thenReturn(null);
Copilot AI (Jan 13, 2026):
The mock is configured to stub 'walReader.take()' but the ReplicationSourceShipper actually calls 'entryReader.poll(getEntriesTimeout)' (line 118 in ReplicationSourceShipper.java). This means the mock configuration has no effect, and the test will call the unmocked poll() method which will return null, causing the shipper to skip processing any batches. The test should mock 'poll(anyInt())' instead of 'take()'.

Suggested change:
- Mockito.when(walReader.take()).thenReturn(batch).thenReturn(null);
+ Mockito.when(walReader.poll(Mockito.anyInt())).thenReturn(batch).thenReturn(null);


ReplicationSourceShipper shipper =
new ReplicationSourceShipper(conf, "wal-group", source, walReader);

shipper.start();

// Allow loop to run
Waiter.waitFor(conf, 3000, () -> true);

shipper.interrupt();
Comment on lines +123 to +126
Copilot AI (Jan 13, 2026):
The test calls Waiter.waitFor(conf, 3000, () -> true) with a predicate that is always true, so it returns almost immediately and doesn't actually verify any condition or give the shipper time to process the batch. The test should wait for a meaningful condition, such as verifying that the shipper has processed the batch or checking that the thread has reached a specific state; as written, the test is timing-dependent and unreliable.

Suggested change:
- // Allow loop to run
- Waiter.waitFor(conf, 3000, () -> true);
-
- shipper.interrupt();
+ // Wait until the shipper thread has finished processing the batch
+ Waiter.waitFor(conf, 3000, () -> !shipper.isAlive());

Contributor:
Another good suggestion.

shipper.join();

assertEquals(0, endpoint.getBeforePersistCalls());
}
}