diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index 5edd5b3e8c92..fbb8275750c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -283,4 +283,35 @@ public int getTimeout() {
    * @throws IllegalStateException if this service's state isn't FAILED.
    */
   Throwable failureCause();
+
+  /**
+   * @return true if this endpoint buffers WAL entries and requires explicit flush control before
+   *         persisting replication offsets.
+   */
+  default boolean isBufferedReplicationEndpoint() {
+    return false;
+  }
+
+  /**
+   * Maximum WAL size (bytes) to buffer before forcing a flush. Only meaningful when
+   * isBufferedReplicationEndpoint() == true.
+   */
+  default long getMaxBufferSize() {
+    return -1L;
+  }
+
+  /**
+   * Maximum time (ms) to wait before forcing a flush. Only meaningful when
+   * isBufferedReplicationEndpoint() == true.
+   */
+  default long maxFlushInterval() {
+    return Long.MAX_VALUE;
+  }
+
+  /**
+   * Hook invoked before persisting replication offsets. Buffered endpoints should flush/close WALs
+   * here.
+   */
+  default void beforePersistingReplicationOffset() {
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 4709e607fc70..96c1c54d54ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -20,7 +20,9 @@
 import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
 import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;
 
+import edu.umd.cs.findbugs.annotations.Nullable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -74,6 +76,10 @@ public enum WorkerState {
   private final int DEFAULT_TIMEOUT = 20000;
   private final int getEntriesTimeout;
   private final int shipEditsTimeout;
+  private long stagedWalSize = 0L;
+  private long lastStagedFlushTs = EnvironmentEdgeManager.currentTime();
+  private WALEntryBatch lastShippedBatch;
+  private final List<Entry> entriesForCleanUpHFileRefs = new ArrayList<>();
 
   public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
     ReplicationSourceWALReader walReader) {
@@ -98,6 +104,13 @@ public final void run() {
     LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
     // Loop until we close down
     while (isActive()) {
+      // Check if a flush is needed for the WAL backup; this is needed for the timeout-based flush
+      if (shouldPersistLogPosition()) {
+        IOException error = persistLogPosition();
+        if (error != null) {
+          LOG.warn("Exception while persisting replication state", error);
+        }
+      }
       // Sleep until replication is enabled again
       if (!source.isPeerEnabled()) {
         // The peer enabled check is in memory, not expensive, so do not need to increase the
@@ -155,7 +168,8 @@ private void shipEdits(WALEntryBatch entryBatch) {
     List<Entry> entries = entryBatch.getWalEntries();
     int sleepMultiplier = 0;
     if (entries.isEmpty()) {
-      updateLogPosition(entryBatch);
+      lastShippedBatch = entryBatch;
+      persistLogPosition();
       return;
     }
     int currentSize = (int) entryBatch.getHeapSize();
@@ -190,13 +204,16 @@ private void shipEdits(WALEntryBatch entryBatch) {
         } else {
           sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
         }
-        // Clean up hfile references
-        for (Entry entry : entries) {
-          cleanUpHFileRefs(entry.getEdit());
-          LOG.trace("shipped entry {}: ", entry);
+
+        stagedWalSize += currentSize;
+        entriesForCleanUpHFileRefs.addAll(entries);
+        lastShippedBatch = entryBatch;
+        if (shouldPersistLogPosition()) {
+          IOException error = persistLogPosition();
+          if (error != null) {
+            throw error; // existing catch block handles retry
+          }
         }
-        // Log and clean up WAL logs
-        updateLogPosition(entryBatch);
 
         // offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
         // this sizeExcludeBulkLoad has to use same calculation that when calling
@@ -229,6 +246,53 @@ private void shipEdits(WALEntryBatch entryBatch) {
     }
   }
 
+  private boolean shouldPersistLogPosition() {
+    ReplicationEndpoint endpoint = source.getReplicationEndpoint();
+    if (!endpoint.isBufferedReplicationEndpoint()) {
+      // Non-buffering endpoints persist immediately
+      return true;
+    }
+    if (stagedWalSize == 0 || lastShippedBatch == null) {
+      return false;
+    }
+    return stagedWalSize >= endpoint.getMaxBufferSize()
+      || (EnvironmentEdgeManager.currentTime() - lastStagedFlushTs >= endpoint.maxFlushInterval());
+  }
+
+  @Nullable
+  // Returns IOException instead of throwing so callers can decide
+  // whether to retry (shipEdits) or best-effort log (run()).
+  private IOException persistLogPosition() {
+    if (lastShippedBatch == null) {
+      return null;
+    }
+
+    ReplicationEndpoint endpoint = source.getReplicationEndpoint();
+
+    if (endpoint.isBufferedReplicationEndpoint() && stagedWalSize > 0) {
+      source.getReplicationEndpoint().beforePersistingReplicationOffset();
+    }
+
+    stagedWalSize = 0;
+    lastStagedFlushTs = EnvironmentEdgeManager.currentTime();
+
+    // Clean up hfile references
+    try {
+      for (Entry entry : entriesForCleanUpHFileRefs) {
+        cleanUpHFileRefs(entry.getEdit());
+        LOG.trace("shipped entry {}: ", entry);
+      }
+    } catch (IOException e) {
+      LOG.warn("{} threw exception while cleaning up hfile refs", endpoint.getClass().getName(), e);
+      return e;
+    }
+    entriesForCleanUpHFileRefs.clear();
+
+    // Log and clean up WAL logs
+    updateLogPosition(lastShippedBatch);
+    return null;
+  }
+
   private void cleanUpHFileRefs(WALEdit edit) throws IOException {
     String peerId = source.getPeerId();
     if (peerId.contains("-")) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceShipperBufferedFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceShipperBufferedFlush.java
new file mode 100644
index 000000000000..66480f8dacb8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceShipperBufferedFlush.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+/**
+ * Tests staged WAL flush behavior in ReplicationSourceShipper.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationSourceShipperBufferedFlush {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationSourceShipperBufferedFlush.class);
+
+  static class CountingReplicationEndpoint extends BaseReplicationEndpoint {
+
+    private final AtomicInteger beforePersistCalls = new AtomicInteger();
+
+    @Override
+    public void start() {
+      startAsync().awaitRunning();
+    }
+
+    @Override
+    public void stop() {
+      stopAsync().awaitTerminated();
+    }
+
+    @Override
+    protected void doStart() {
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      notifyStopped();
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext ctx) {
+      return true;
+    }
+
+    @Override
+    public boolean isBufferedReplicationEndpoint() {
+      return true; // buffered endpoint, so explicit flush control applies
+    }
+
+    @Override
+    public void beforePersistingReplicationOffset() {
+      beforePersistCalls.incrementAndGet();
+    }
+
+    @Override
+    public long getMaxBufferSize() {
+      return 1L; // force immediate flush
+    }
+
+    @Override
+    public long maxFlushInterval() {
+      return Long.MAX_VALUE;
+    }
+
+    @Override
+    public UUID getPeerUUID() {
+      return null;
+    }
+
+    int getBeforePersistCalls() {
+      return beforePersistCalls.get();
+    }
+  }
+
+  @Test
+  public void testBeforePersistNotCalledForEmptyBatch() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+
+    CountingReplicationEndpoint endpoint = new CountingReplicationEndpoint();
+    endpoint.start();
+
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    ReplicationSourceWALReader walReader = Mockito.mock(ReplicationSourceWALReader.class);
+
+    Mockito.when(source.isPeerEnabled()).thenReturn(true);
+    Mockito.when(source.isSourceActive()).thenReturn(true);
+    Mockito.when(source.getReplicationEndpoint()).thenReturn(endpoint);
+    Mockito.when(source.getPeerId()).thenReturn("1");
+    Mockito.when(source.getSourceMetrics()).thenReturn(Mockito.mock(MetricsSource.class));
+
+    WALEntryBatch batch = new WALEntryBatch(1, new Path("wal"));
+    batch.setLastWalPosition(1L);
+    // no entries, no heap size
+
+    Mockito.when(walReader.take()).thenReturn(batch).thenReturn(null);
+
+    ReplicationSourceShipper shipper =
+      new ReplicationSourceShipper(conf, "wal-group", source, walReader);
+
+    shipper.start();
+
+    // Give the shipper loop time to consume the empty batch
+    Thread.sleep(3000);
+
+    shipper.interrupt();
+    shipper.join();
+
+    assertEquals(0, endpoint.getBeforePersistCalls());
+  }
+}
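
Illustrative sketch (not part of this patch): a minimal endpoint that opts in to the new buffered-flush hooks. The class name, the in-memory staging list standing in for a real backup WAL writer, and the size/interval thresholds are assumptions made only for this example.

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.wal.WAL.Entry;

public class StagingBackupReplicationEndpoint extends BaseReplicationEndpoint {

  // Stand-in for a real backup WAL writer; a real endpoint would append to durable storage.
  private final List<Entry> staged = new ArrayList<>();
  private final UUID peerUuid = UUID.randomUUID();

  @Override
  public boolean isBufferedReplicationEndpoint() {
    return true; // opt in to explicit flush control by ReplicationSourceShipper
  }

  @Override
  public long getMaxBufferSize() {
    return 64L * 1024 * 1024; // size-based flush trigger (illustrative value)
  }

  @Override
  public long maxFlushInterval() {
    return 60_000L; // time-based flush trigger in ms (illustrative value)
  }

  @Override
  public boolean replicate(ReplicateContext context) {
    // Stage the shipped entries; nothing is durable yet. Returning true is safe only because
    // the shipper calls beforePersistingReplicationOffset() before saving its offset.
    staged.addAll(context.getEntries());
    return true;
  }

  @Override
  public void beforePersistingReplicationOffset() {
    // Make everything staged so far durable (a real endpoint would flush/close its backup WAL).
    staged.clear();
  }

  @Override
  public UUID getPeerUUID() {
    return peerUuid;
  }

  @Override
  public void start() {
    startAsync().awaitRunning();
  }

  @Override
  public void stop() {
    stopAsync().awaitTerminated();
  }

  @Override
  protected void doStart() {
    notifyStarted();
  }

  @Override
  protected void doStop() {
    notifyStopped();
  }
}

The intent of the contract is that persistLogPosition() only calls updateLogPosition() after beforePersistingReplicationOffset() returns, so a crash between shipping and flushing can at worst re-ship staged entries; it can never advance the persisted offset past data the endpoint has not made durable.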