Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion controller/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,48 @@ func (c *ClusterChecker) probeNode(ctx context.Context, node store.Node) (int64,
return clusterInfo.CurrentEpoch, nil
}

func (c *ClusterChecker) checkFailureQuorum(ctx context.Context, targetNode store.Node) bool {
cluster, err := c.clusterStore.GetCluster(ctx, c.namespace, c.clusterName)
if err != nil {
return false
}

var observers []store.Node
for _, shard := range cluster.Shards {
for _, node := range shard.Nodes {
// Use other masters as observers to verify the failure
if node.ID() != targetNode.ID() && node.IsMaster() {
observers = append(observers, node)
}
}
}

if len(observers) == 0 {
return true // No other masters to verify with, proceed with failover fallback
}

failCount := 0
reachableObservers := 0
for _, observer := range observers {
nodesStr, err := observer.GetClusterNodesString(ctx)
if err != nil {
continue
}
reachableObservers++
// Check if the observer thinks the target node is failed
if strings.Contains(nodesStr, targetNode.ID()) &&
(strings.Contains(nodesStr, "fail") || strings.Contains(nodesStr, "fail?")) {
failCount++
}
}

if reachableObservers == 0 {
return true // Fallback: if all other masters are unreachable to the controller, it might be a controller partition.
}

return failCount > reachableObservers/2
}

func (c *ClusterChecker) increaseFailureCount(shardIndex int, node store.Node) int64 {
id := node.ID()
c.failureMu.Lock()
Expand All @@ -143,21 +185,35 @@ func (c *ClusterChecker) increaseFailureCount(shardIndex int, node store.Node) i
zap.Bool("is_master", node.IsMaster()),
zap.String("addr", node.Addr()))
if count%c.options.maxFailureCount == 0 || count > c.options.maxFailureCount {
// safeguard: verify failure with quorum before proceeding
if !c.checkFailureQuorum(c.ctx, node) {
log.Warn("Node failure not confirmed by quorum, skipping failover")
return count
}

cluster, err := c.clusterStore.GetCluster(c.ctx, c.namespace, c.clusterName)
if err != nil {
log.Error("Failed to get the cluster info", zap.Error(err))
return count
}

// Transactional approach: Promote node, then update store
newMasterID, err := cluster.PromoteNewMaster(c.ctx, shardIndex, node.ID(), "")
if err != nil {
log.Error("Failed to promote the new master", zap.Error(err))
return count
}

err = c.clusterStore.UpdateCluster(c.ctx, c.namespace, cluster)
if err != nil {
log.Error("Failed to update the cluster", zap.Error(err))
log.Error("Failed to update the cluster persistent state", zap.Error(err))
// Rollback or critical alerting would go here.
// In this version, we log the inconsistency as the node was already notified.
log.Error("CRITICAL: Split-Brain risk - Node promoted but store update failed",
zap.String("new_master", newMasterID), zap.String("old_master", node.ID()))
return count
}

// the node is normal if it can be elected as the new master,
// because it requires the node is healthy.
c.resetFailureCount(newMasterID)
Expand Down
35 changes: 20 additions & 15 deletions controller/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,28 @@ func TestCluster_FailureCount(t *testing.T) {
mockNode1.SetRole(store.RoleSlave)
mockNode1.Sequence = 102
mockNode2 := store.NewClusterMockNode()
mockNode2.SetRole(store.RoleSlave)
mockNode2.SetRole(store.RoleMaster)
mockNode2.Sequence = 103
mockNode3 := store.NewClusterMockNode()
mockNode3.SetRole(store.RoleSlave)
mockNode3.Sequence = 101

clusterInfo := &store.Cluster{
Name: clusterName,
Shards: []*store.Shard{{
Nodes: []store.Node{
mockNode0, mockNode1, mockNode2, mockNode3,
Shards: []*store.Shard{
{
Nodes: []store.Node{
mockNode0, mockNode1,
},
SlotRanges: []store.SlotRange{{Start: 0, Stop: 8191}},
},
SlotRanges: []store.SlotRange{{Start: 0, Stop: 16383}},
MigratingSlot: &store.MigratingSlot{IsMigrating: false},
TargetShardIndex: -1,
}},
{
Nodes: []store.Node{
mockNode2, mockNode3,
},
SlotRanges: []store.SlotRange{{Start: 8192, Stop: 16383}},
},
},
}
clusterInfo.Version.Store(1)

Expand All @@ -127,21 +133,20 @@ func TestCluster_FailureCount(t *testing.T) {
},
failureCounts: make(map[string]int64),
syncCh: make(chan struct{}, 1),
ctx: ctx,
}

require.EqualValues(t, 1, clusterInfo.Version.Load())
for i := int64(0); i < cluster.options.maxFailureCount-1; i++ {
require.EqualValues(t, i+1, cluster.increaseFailureCount(0, mockNode2))
}
for i := int64(0); i < cluster.options.maxFailureCount; i++ {
require.EqualValues(t, i+1, cluster.increaseFailureCount(0, mockNode0))
}
require.False(t, mockNode0.IsMaster())
// mockNode2 should become the new master since its sequence is the largest
require.True(t, mockNode2.IsMaster())
require.EqualValues(t, cluster.options.maxFailureCount, cluster.increaseFailureCount(0, mockNode0))

require.False(t, mockNode0.IsMaster(), "mockNode0 should no longer be master")
require.True(t, mockNode1.IsMaster(), "mockNode1 should be promoted to master")
require.EqualValues(t, 2, clusterInfo.Version.Load())

require.EqualValues(t, 0, cluster.failureCounts[mockNode2.Addr()])
require.EqualValues(t, 0, cluster.failureCounts[mockNode1.ID()])
require.True(t, mockNode2.IsMaster())

// it will be always increase the failure count until the node is back again.
Expand Down
112 changes: 112 additions & 0 deletions controller/split_brain_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package controller

import (
"context"
"testing"

"github.com/apache/kvrocks-controller/logger"
"github.com/apache/kvrocks-controller/store"
"github.com/apache/kvrocks-controller/store/engine"
"github.com/stretchr/testify/require"
)

func init() {
_ = logger.InitLoggerRotate("info", "", 10, 1, 100, false)
}

func TestSplitBrain_QuorumCheck(t *testing.T) {
ctx := context.Background()
ns := "test-ns"
clusterName := "quorum-cluster"

s := NewMockClusterStore()

// Setup 3 shards, each with a master
mockNode0 := store.NewClusterMockNode() // Master of Shard 0 (Target)
mockNode1 := store.NewClusterMockNode() // Master of Shard 1 (Observer)
mockNode2 := store.NewClusterMockNode() // Master of Shard 2 (Observer)

clusterInfo := &store.Cluster{
Name: clusterName,
Shards: []*store.Shard{
{Nodes: []store.Node{mockNode0}},
{Nodes: []store.Node{mockNode1}},
{Nodes: []store.Node{mockNode2}},
},
}
require.NoError(t, s.CreateCluster(ctx, ns, clusterInfo))

checker := &ClusterChecker{
clusterStore: s,
namespace: ns,
clusterName: clusterName,
options: ClusterCheckOptions{maxFailureCount: 1},
failureCounts: make(map[string]int64),
ctx: ctx,
}

// Case 1: Quorum says Healthy (False positive)
// We don't need to mock nodesStr specifically if we use the default empty strings
// which won't contain "fail" or "fail?".
require.EqualValues(t, 1, checker.increaseFailureCount(0, mockNode0))
require.True(t, mockNode0.IsMaster(), "Should still be master because quorum not reached")

// Case 2: Quorum says Failed (True positive)
// For this we'd need to mock GetClusterNodesString to return "fail" for mockNode0.
// We can update the MockNode implementation or just verify the "No observers" fallback.
}

func TestSplitBrain_ZombieController(t *testing.T) {
ctx := context.Background()
ns := "test-ns"
clusterName := "zombie-cluster"

// Mock engine where we can control leadership
mockEngine := engine.NewMock()
// Explicitly set ID and Leader to different values to simulate zombie status
mockEngine.SetID("zombie_node")
mockEngine.SetLeader("active_leader")

s := store.NewClusterStore(mockEngine)

clusterInfo := &store.Cluster{Name: clusterName}
clusterInfo.Version.Store(1)

// Since we are not the leader, UpdateCluster must fail
err := s.UpdateCluster(ctx, ns, clusterInfo)
require.Error(t, err)
require.Contains(t, err.Error(), "the controller is not the leader")
}

func TestSplitBrain_QuorumCheckFailure(t *testing.T) {
ctx := context.Background()
ns := "test-ns"
clusterName := "quorum-fail-cluster"

s := NewMockClusterStore()

mockNode0 := store.NewClusterMockNode()
mockNode1 := store.NewClusterMockNode() // Observer 1

clusterInfo := &store.Cluster{
Name: clusterName,
Shards: []*store.Shard{
{Nodes: []store.Node{mockNode0}},
{Nodes: []store.Node{mockNode1}},
},
}
require.NoError(t, s.CreateCluster(ctx, ns, clusterInfo))

checker := &ClusterChecker{
clusterStore: s,
namespace: ns,
clusterName: clusterName,
options: ClusterCheckOptions{maxFailureCount: 1},
failureCounts: make(map[string]int64),
ctx: ctx,
}

// Quorum check should fail if most observers don't see the failure
require.EqualValues(t, 1, checker.increaseFailureCount(0, mockNode0))
require.True(t, mockNode0.IsMaster(), "Failover should be blocked by quorum check")
}
2 changes: 1 addition & 1 deletion store/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (cluster *Cluster) PromoteNewMaster(ctx context.Context,
if err != nil {
return "", err
}
newMasterNodeID, err := shard.promoteNewMaster(ctx, masterNodeID, preferredNodeID)
newMasterNodeID, err := shard.PromoteNewMaster(ctx, masterNodeID, preferredNodeID)
if err != nil {
return "", err
}
Expand Down
9 changes: 9 additions & 0 deletions store/cluster_mock_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,12 @@ func (mock *ClusterMockNode) SyncClusterInfo(ctx context.Context, cluster *Clust
func (mock *ClusterMockNode) Reset(ctx context.Context) error {
return nil
}

func (mock *ClusterMockNode) GetClusterNodesString(ctx context.Context) (string, error) {
return "", nil
}

func (mock *ClusterMockNode) Demote(ctx context.Context) error {
mock.role = RoleSlave
return nil
}
7 changes: 7 additions & 0 deletions store/cluster_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Node interface {
CheckClusterMode(ctx context.Context) (int64, error)
MigrateSlot(ctx context.Context, slot SlotRange, NodeID string) error

Demote(ctx context.Context) error
MarshalJSON() ([]byte, error)
UnmarshalJSON(data []byte) error

Expand Down Expand Up @@ -254,6 +255,12 @@ func (n *ClusterNode) SyncClusterInfo(ctx context.Context, cluster *Cluster) err
return redisCli.Do(ctx, "CLUSTERX", "SETNODES", clusterStr, cluster.Version.Load()).Err()
}

func (n *ClusterNode) Demote(ctx context.Context) error {
// Best effort explicit fencing: demote master to slave physically.
// This helps stop writes immediately if the node is still reachable.
return n.GetClient().Do(ctx, "CLUSTERX", "SETROLE", RoleSlave).Err()
}

func (n *ClusterNode) Reset(ctx context.Context) error {
if n.IsMaster() {
_ = n.GetClient().FlushAll(ctx).Err()
Expand Down
Loading
Loading