diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 846b87702f..b9075e4238 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -772,9 +772,8 @@ private CompletableFuture checkLeaderState(RaftClientRequest re */ private CompletableFuture checkLeaderState(RaftClientRequest request, CacheEntry entry) { if (!getInfo().isLeader()) { - NotLeaderException exception = generateNotLeaderException(); - final RaftClientReply reply = newExceptionReply(request, exception); - return RetryCacheImpl.failWithReply(reply, entry); + return retryCache.failWithReplyAndInvalidate( + newExceptionReply(request, generateNotLeaderException()), entry); } if (!getInfo().isLeaderReady()) { final CacheEntry cacheEntry = retryCache.getIfPresent(ClientInvocationId.valueOf(request)); @@ -832,8 +831,8 @@ private CompletableFuture appendTransaction( final LeaderStateImpl unsyncedLeaderState = role.getLeaderState().orElse(null); if (unsyncedLeaderState == null) { - final RaftClientReply reply = newExceptionReply(request, generateNotLeaderException()); - return RetryCacheImpl.failWithReply(reply, cacheEntry); + return retryCache.failWithReplyAndInvalidate( + newExceptionReply(request, generateNotLeaderException()), cacheEntry); } final PendingRequests.Permit unsyncedPermit = unsyncedLeaderState.tryAcquirePendingRequest(request.getMessage()); if (unsyncedPermit == null) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java index 4da459ae9e..0a0c89d95a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java @@ -231,6 +231,10 @@ CacheQueryResult queryCache(RaftClientRequest request) { } } + void invalidate(ClientInvocationId key) { + cache.invalidate(key); + } + void invalidateRepliedRequests(RaftClientRequest request) { final ClientId clientId = request.getClientId(); final Iterable callIds = request.getRepliedCallIds(); @@ -263,8 +267,22 @@ static CompletableFuture failWithReply( if (entry != null) { entry.failWithReply(reply); return entry.getReplyFuture(); - } else { - return CompletableFuture.completedFuture(reply); } + return CompletableFuture.completedFuture(reply); + } + + /** + * Fail the cache entry with the given reply and remove it from the cache. + * This should be used when we want to fail a request without caching the failure, + * such as when leadership is lost before processing the request. + */ + CompletableFuture failWithReplyAndInvalidate( + RaftClientReply reply, CacheEntry entry) { + if (entry != null) { + entry.failWithReply(reply); + invalidate(entry.getKey()); + return entry.getReplyFuture(); + } + return CompletableFuture.completedFuture(reply); } }