RATIS-2242 change consistency criteria of heartbeat during appendLog#1215
szetszwo merged 2 commits into apache:master
szetszwo
left a comment
@SzyWilliam, thanks for finding the problem and working on this! Please see the inline comment.
  // Check if "previous" is contained in current state.
- if (previous != null && !state.containsTermIndex(previous)) {
+ if (previous != null && !state.containsTermIndex(previous) && appendLogFuture.get().isDone()) {
This will disable the inconsistency check whenever the server is busy, since appendLogFuture.get().isDone() will be false most of the time. We need to remember the entries being appended to the log; see https://issues.apache.org/jira/secure/attachment/13074189/1215_review.patch
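The idea of remembering the entries being appended can be sketched roughly as follows. This is only an illustration and not the code from the linked patch: the class InFlightIndices, its Range type, and all method names are hypothetical. It keeps the in-flight index ranges in a TreeMap keyed by start index, so a heartbeat whose previous entry is still being appended can be recognized without waiting for the append future to complete.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch; not the class from the linked patch. */
class InFlightIndices {
  /** Consecutive indices [startIndex, nextIndex) that share one term. */
  static final class Range {
    final long term;
    final long startIndex;
    final long nextIndex;

    Range(long term, long startIndex, long nextIndex) {
      this.term = term;
      this.startIndex = startIndex;
      this.nextIndex = nextIndex;
    }

    boolean contains(long t, long index) {
      return term == t && startIndex <= index && index < nextIndex;
    }
  }

  /** In-flight ranges keyed by their start index. */
  private final TreeMap<Long, Range> map = new TreeMap<>();

  /** Remember a range before its asynchronous append starts. */
  synchronized void append(Range r) {
    final Map.Entry<Long, Range> last = map.lastEntry();
    // Ranges must be contiguous: the new range starts where the last one ends.
    if (last != null && last.getValue().nextIndex != r.startIndex) {
      throw new IllegalStateException(
          "expected startIndex " + last.getValue().nextIndex + " but got " + r.startIndex);
    }
    map.put(r.startIndex, r);
  }

  /** Forget a range once its append has completed. */
  synchronized void remove(Range r) {
    map.remove(r.startIndex, r);
  }

  /** Is (term, index) covered by a range that is still being appended? */
  synchronized boolean contains(long term, long index) {
    final Map.Entry<Long, Range> floor = map.floorEntry(index);
    return floor != null && floor.getValue().contains(term, index);
  }

  public static void main(String[] args) {
    final InFlightIndices inFlight = new InFlightIndices();
    // The follower has started, but not finished, appending indices 10..12 of term 1.
    final Range r = new Range(1, 10, 13);
    inFlight.append(r);
    System.out.println(inFlight.contains(1, 12)); // true
    inFlight.remove(r);
    System.out.println(inFlight.contains(1, 12)); // false
  }
}
```

Under this assumption, the follower would treat previous as consistent if it is found either in the log state or in the in-flight set, so the check stays active even when the server is busy.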
@szetszwo Thanks very much for the patch! I’m really impressed by how clean and intuitive the design of remembering the entries is. It's always exciting to enjoy the art of programming.
szetszwo
left a comment
+1 the change looks good.
It could throw IllegalStateException. Then the ref is retained but never released. There seem to be two async calls, appendEntries1 and then appendEntries2; the failure happens when appendEntries2 runs faster than appendEntries1.
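The ordering hazard described here can be illustrated with a small, self-contained sketch. It is not Ratis code, and it is not the fix chosen in this pull request (the review instead stops routing the call through executeSubmitServerRequestAsync). The class SerialTasks and its methods are hypothetical; the sketch only shows one common way to keep asynchronous tasks in submission order, by chaining each task onto the future of the previous one, similar in spirit to the appendLogFuture chain that appears in the reviewed code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Hypothetical sketch: runs async tasks strictly in submission order. */
class SerialTasks {
  private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

  /**
   * Start the task only after all previously submitted tasks have completed.
   * Note: if an earlier task fails, later tasks are skipped and fail as well.
   */
  synchronized CompletableFuture<Void> submit(Supplier<CompletableFuture<Void>> task) {
    tail = tail.thenCompose(ignored -> task.get());
    return tail;
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** The first task is slow and the second is fast, yet the order is preserved. */
  static List<Integer> demo() {
    final ExecutorService pool = Executors.newFixedThreadPool(2);
    final List<Integer> order = Collections.synchronizedList(new ArrayList<>());
    final SerialTasks serial = new SerialTasks();
    try {
      serial.submit(() -> CompletableFuture.runAsync(() -> {
        sleep(50);
        order.add(1);
      }, pool));
      serial.submit(() -> CompletableFuture.runAsync(() -> order.add(2), pool)).join();
      return new ArrayList<>(order);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [1, 2]
  }
}
```

If both tasks were handed directly to the two-thread pool instead, the fast second task could finish before the slow first one, which is the kind of reordering the comment above describes.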
gRPC calls onNext one-by-one but our RaftServerImpl.executeSubmitServerRequestAsync method uses
szetszwo
left a comment
@SzyWilliam, thanks for the update!
As mentioned in my previous comments, we should not use executeSubmitServerRequestAsync.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 6b41c8c2a..523a41833 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -653,7 +653,8 @@ class RaftServerProxy implements RaftServer {
try {
final RaftGroupId groupId = ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
return getImplFuture(groupId)
- .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.appendEntriesAsync(requestRef)));
+ .thenCompose(impl -> JavaUtils.callAsUnchecked(
+ () -> impl.appendEntriesAsync(requestRef), CompletionException::new));
} finally {
requestRef.release();
}

// validate index0
final long index0 = indices.startIndex;
final Map.Entry<Long, ConsecutiveIndices> lastEntry = map.lastEntry();
if (lastEntry != null) {
  Preconditions.assertSame(lastEntry.getValue().getNextIndex(), index0, "index0");
}
map.put(index0, indices);
Let's remove index0.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 976a05008..c5010a534 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -138,13 +138,12 @@ public final class ServerImplUtils {
synchronized void append(List<ConsecutiveIndices> entriesTermIndices) {
for(ConsecutiveIndices indices : entriesTermIndices) {
- // validate index0
- final long index0 = indices.startIndex;
+ // validate startIndex
final Map.Entry<Long, ConsecutiveIndices> lastEntry = map.lastEntry();
if (lastEntry != null) {
- Preconditions.assertSame(lastEntry.getValue().getNextIndex(), index0, "index0");
+ Preconditions.assertSame(lastEntry.getValue().getNextIndex(), indices.startIndex, "startIndex");
}
- map.put(index0, indices);
+ map.put(indices.startIndex, indices);
}
}

entriesRef.retain();
final List<LogEntryProto> entries = entriesRef.retain();
final List<ConsecutiveIndices> entriesTermIndices = ConsecutiveIndices.convert(entries);
appendLogTermIndices.append(entriesTermIndices);
We actually need to retain twice: once for the sync part and once for the async part.
private CompletableFuture<Void> appendLog(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
final List<ConsecutiveIndices> entriesTermIndices;
try(UncheckedAutoCloseableSupplier<List<LogEntryProto>> entries = entriesRef.retainAndReleaseOnClose()) {
entriesTermIndices = ConsecutiveIndices.convert(entries.get());
appendLogTermIndices.append(entriesTermIndices);
}
entriesRef.retain();
return appendLogFuture.updateAndGet(f -> f.thenCompose(
...
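The retain-twice pattern suggested above can be illustrated with a toy reference counter. This is a sketch under stated assumptions: the Ref class below is a hypothetical stand-in and not the Ratis ReferenceCountedObject, and appendLog here only mimics the shape of the real method. The first retain covers the synchronous work and is released before the method returns, even if that work throws. The second retain covers the asynchronous stage and is released when that stage completes, normally or exceptionally.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of retaining once for sync use and once for async use. */
class RetainTwiceSketch {
  /** Toy reference counter; a stand-in, not the Ratis ReferenceCountedObject. */
  static final class Ref<T> {
    private final T value;
    private final AtomicInteger count = new AtomicInteger();

    Ref(T value) {
      this.value = value;
    }

    T retain() {
      count.incrementAndGet();
      return value;
    }

    void release() {
      if (count.decrementAndGet() < 0) {
        throw new IllegalStateException("released more often than retained");
      }
    }

    int count() {
      return count.get();
    }
  }

  static CompletableFuture<Integer> appendLog(Ref<List<String>> ref, CompletableFuture<Void> previous) {
    // Retain #1: synchronous use, released in finally even if the work throws.
    final int size;
    final List<String> entries = ref.retain();
    try {
      size = entries.size();
    } finally {
      ref.release();
    }

    // Retain #2: held while the async stage is pending, released on completion.
    ref.retain();
    return previous.thenApply(ignored -> size)
        .whenComplete((result, error) -> ref.release());
  }

  public static void main(String[] args) {
    final Ref<List<String>> ref = new Ref<>(Arrays.asList("e1", "e2"));
    final CompletableFuture<Void> previous = new CompletableFuture<>();
    final CompletableFuture<Integer> future = appendLog(ref, previous);
    System.out.println(ref.count()); // 1: only the async retain is outstanding
    previous.complete(null);
    System.out.println(future.join()); // 2
    System.out.println(ref.count()); // 0: the async retain has been released
  }
}
```

Releasing in whenComplete rather than in a success-only callback means the count also returns to zero when the earlier stage fails, which avoids the retained-but-not-released situation raised earlier in the review.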
@szetszwo Thanks very much for the suggested code changes. I just came back from a long Lunar New Year break; sorry for the delay. Let me fix this patch today!
szetszwo
left a comment
+1 the change looks good.
What changes were proposed in this pull request?
When both the appendLog channel and the heartbeat channel are enabled:
Leader:
The appendLog channel sends an AppendEntries RPC with (previous = nextIndex0, entries = [e1, e2, ...]) and then updates the follower's nextIndex to nextIndex1.
The heartbeat channel subsequently sends AppendEntries RPCs with (previous = nextIndex1, entries = empty).
Follower:
If a heartbeat is handled while the entries are still being appended, previous is not yet contained in the follower's state, so an inconsistency reply is triggered by
https://github.com/apache/ratis/blob/8353a017fe6545fbfb74960ecb3a0f4396c478d2/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java#L1720-1724
What is the link to the Apache JIRA?
https://issues.apache.org/jira/projects/RATIS/issues/RATIS-2242?filter=allissues
How was this patch tested?
Unit tests