From 81d0b5fda40a5ef45581b512dafe053b5797f0b1 Mon Sep 17 00:00:00 2001 From: paragrf Date: Tue, 31 Mar 2026 11:41:37 +0800 Subject: [PATCH 1/4] feat(api): add Online/Offline interfaces --- consts/errors.go | 1 + server/api/node.go | 37 +++++++++++++++++++++ server/api/node_test.go | 49 ++++++++++++++++++++++++++++ server/helper/helper.go | 2 ++ server/route.go | 1 + store/cluster.go | 59 +++++++++++++++++++++++++++++++--- store/cluster_node.go | 14 ++++++++ store/cluster_shard.go | 6 +++- store/cluster_shard_test.go | 36 +++++++++++++++++++++ store/cluster_test.go | 64 +++++++++++++++++++++++++++++++++++++ 10 files changed, 264 insertions(+), 5 deletions(-) diff --git a/consts/errors.go b/consts/errors.go index bf7e51d2..4d3f1bc2 100644 --- a/consts/errors.go +++ b/consts/errors.go @@ -38,5 +38,6 @@ var ( ErrShardIsServicing = errors.New("shard is servicing") ErrShardSlotIsMigrating = errors.New("shard slot is migrating") ErrShardNoMatchNewMaster = errors.New("no match new master in shard") + ErrCannotOfflineMaster = errors.New("cannot take master node offline, failover first") ErrSlotStartAndStopEqual = errors.New("start and stop of a range cannot be equal") ) diff --git a/server/api/node.go b/server/api/node.go index 44071084..cf60ec47 100644 --- a/server/api/node.go +++ b/server/api/node.go @@ -23,7 +23,10 @@ package api import ( "strconv" + "go.uber.org/zap" + "github.com/apache/kvrocks-controller/consts" + "github.com/apache/kvrocks-controller/logger" "github.com/apache/kvrocks-controller/server/helper" "github.com/gin-gonic/gin" @@ -82,3 +85,37 @@ func (handler *NodeHandler) Remove(c *gin.Context) { } helper.ResponseNoContent(c) } + +func (handler *NodeHandler) SetStatus(c *gin.Context) { + ns := c.Param("namespace") + cluster, _ := c.MustGet(consts.ContextKeyCluster).(*store.Cluster) + + var req struct { + Addrs []string `json:"addrs" binding:"required"` + Online bool `json:"online"` + } + if err := c.ShouldBindJSON(&req); err != nil { + helper.ResponseBadRequest(c, err) + return + } + + var err error + if req.Online { + err = cluster.SetNodesOnline(req.Addrs) + } else { + err = cluster.SetNodesOffline(req.Addrs) + } + if err != nil { + helper.ResponseError(c, err) + return + } + + if err := handler.s.UpdateCluster(c, ns, cluster); err != nil { + helper.ResponseError(c, err) + return + } + if err := cluster.SyncToNodes(c); err != nil { + logger.Get().With(zap.Error(err)).Warn("Failed to sync cluster info to nodes after status change") + } + helper.ResponseOK(c, nil) +} diff --git a/server/api/node_test.go b/server/api/node_test.go index 86291856..a93e0ca4 100644 --- a/server/api/node_test.go +++ b/server/api/node_test.go @@ -124,3 +124,52 @@ func TestNodeBasics(t *testing.T) { runRemove(t, cluster.Shards[0].Nodes[1].ID(), http.StatusNoContent) }) } + +func TestNodeSetStatus(t *testing.T) { + ns := "test-ns" + cluster, err := store.NewCluster("test-cluster", []string{"127.0.0.1:1234", "127.0.0.1:1235"}, 2) + require.NoError(t, err) + + handler := &NodeHandler{s: store.NewClusterStore(engine.NewMock())} + require.NoError(t, handler.s.CreateCluster(context.Background(), ns, cluster)) + + slaveAddr := cluster.Shards[0].Nodes[1].Addr() + masterAddr := cluster.Shards[0].Nodes[0].Addr() + + runSetStatus := func(t *testing.T, addrs []string, online bool, expectedCode int) { + var req struct { + Addrs []string `json:"addrs"` + Online bool `json:"online"` + } + req.Addrs = addrs + req.Online = online + + recorder := httptest.NewRecorder() + ctx := GetTestContext(recorder) + body, err := json.Marshal(req) + require.NoError(t, err) + + ctx.Set(consts.ContextKeyStore, handler.s) + ctx.Request.Body = io.NopCloser(bytes.NewBuffer(body)) + ctx.Params = []gin.Param{ + {Key: "namespace", Value: ns}, + {Key: "cluster", Value: cluster.Name}, + } + middleware.RequiredCluster(ctx) + require.Equal(t, http.StatusOK, recorder.Code) + handler.SetStatus(ctx) + require.Equal(t, expectedCode, recorder.Code) + } + + t.Run("offline slave", func(t *testing.T) { + runSetStatus(t, []string{slaveAddr}, false, http.StatusOK) + }) + + t.Run("online slave", func(t *testing.T) { + runSetStatus(t, []string{slaveAddr}, true, http.StatusOK) + }) + + t.Run("offline master rejected", func(t *testing.T) { + runSetStatus(t, []string{masterAddr}, false, http.StatusBadRequest) + }) +} diff --git a/server/helper/helper.go b/server/helper/helper.go index 896a2a47..88550c05 100644 --- a/server/helper/helper.go +++ b/server/helper/helper.go @@ -77,6 +77,8 @@ func ResponseError(c *gin.Context, err error) { code = http.StatusForbidden } else if errors.Is(err, consts.ErrInvalidArgument) { code = http.StatusBadRequest + } else if errors.Is(err, consts.ErrCannotOfflineMaster) { + code = http.StatusBadRequest } c.JSON(code, Response{ Error: &Error{Message: err.Error()}, diff --git a/server/route.go b/server/route.go index 109f8dd4..b5eb94a2 100644 --- a/server/route.go +++ b/server/route.go @@ -70,6 +70,7 @@ func (srv *Server) initHandlers() { clusters.GET("/:cluster", middleware.RequiredCluster, handler.Cluster.Get) clusters.DELETE("/:cluster", middleware.RequiredCluster, handler.Cluster.Remove) clusters.POST("/:cluster/migrate", handler.Cluster.MigrateSlot) + clusters.POST("/:cluster/nodes/status", middleware.RequiredCluster, handler.Node.SetStatus) } shards := clusters.Group("/:cluster/shards") diff --git a/store/cluster.go b/store/cluster.go index e33e41a1..0a6933f0 100644 --- a/store/cluster.go +++ b/store/cluster.go @@ -157,6 +157,48 @@ func (cluster *Cluster) SyncToNodes(ctx context.Context) error { return nil } +func (cluster *Cluster) findNodeByAddr(addr string) Node { + for _, shard := range cluster.Shards { + for _, node := range shard.Nodes { + if node.Addr() == addr { + return node + } + } + } + return nil +} + +func (cluster *Cluster) SetNodesOffline(addrs []string) error { + // Validate all addrs first: must exist and must not be master. + for _, addr := range addrs { + node := cluster.findNodeByAddr(addr) + if node == nil { + return fmt.Errorf("node %s: %w", addr, consts.ErrNotFound) + } + if node.IsMaster() { + return fmt.Errorf("node %s: %w", addr, consts.ErrCannotOfflineMaster) + } + } + for _, addr := range addrs { + cluster.findNodeByAddr(addr).SetFailed(true) + } + return nil +} + +func (cluster *Cluster) SetNodesOnline(addrs []string) error { + // Validate all addrs first: must exist. + for _, addr := range addrs { + node := cluster.findNodeByAddr(addr) + if node == nil { + return fmt.Errorf("node %s: %w", addr, consts.ErrNotFound) + } + } + for _, addr := range addrs { + cluster.findNodeByAddr(addr).SetFailed(false) + } + return nil +} + func (cluster *Cluster) GetNodes() []Node { nodes := make([]Node, 0) for i := 0; i < len(cluster.Shards); i++ { @@ -273,10 +315,19 @@ func ParseCluster(clusterStr string) (*Cluster, error) { addr: strings.Split(fields[1], "@")[0], } - if strings.Contains(fields[2], ",") { - node.role = strings.Split(fields[2], ",")[1] - } else { - node.role = fields[2] + // Parse comma-separated flags (e.g. "slave,fail", "myself,master") + // to extract role and failed state. + roleFlags := strings.Split(fields[2], ",") + for _, flag := range roleFlags { + switch flag { + case RoleMaster: + node.role = RoleMaster + case RoleSlave: + node.role = RoleSlave + case "fail": + node.failed = true + } + // ignore: myself, pfail, handshake, noaddr, nofailover, noflags } var err error diff --git a/store/cluster_node.go b/store/cluster_node.go index 8fcd1063..7dbbcd67 100755 --- a/store/cluster_node.go +++ b/store/cluster_node.go @@ -59,9 +59,11 @@ type Node interface { Password() string Addr() string IsMaster() bool + Failed() bool SetRole(string) SetPassword(string) + SetFailed(bool) Reset(ctx context.Context) error GetClusterNodeInfo(ctx context.Context) (*ClusterNodeInfo, error) @@ -82,6 +84,7 @@ type ClusterNode struct { role string password string createdAt int64 + failed bool } type ClusterInfo struct { @@ -121,6 +124,14 @@ func (n *ClusterNode) SetRole(role string) { n.role = role } +func (n *ClusterNode) Failed() bool { + return n.failed +} + +func (n *ClusterNode) SetFailed(failed bool) { + n.failed = failed +} + func (n *ClusterNode) Addr() string { return n.addr } @@ -272,6 +283,7 @@ func (n *ClusterNode) MarshalJSON() ([]byte, error) { "role": n.role, "password": n.password, "created_at": n.createdAt, + "failed": n.failed, }) } @@ -282,6 +294,7 @@ func (n *ClusterNode) UnmarshalJSON(bytes []byte) error { Role string `json:"role"` Password string `json:"password"` CreatedAt int64 `json:"created_at"` + Failed bool `json:"failed"` } if err := json.Unmarshal(bytes, &data); err != nil { return err @@ -292,5 +305,6 @@ func (n *ClusterNode) UnmarshalJSON(bytes []byte) error { n.role = data.Role n.password = data.Password n.createdAt = data.CreatedAt + n.failed = data.Failed return nil } diff --git a/store/cluster_shard.go b/store/cluster_shard.go index 1181283a..910943b5 100644 --- a/store/cluster_shard.go +++ b/store/cluster_shard.go @@ -293,7 +293,11 @@ func (shard *Shard) ToSlotsString() (string, error) { } } } else { - builder.WriteString(RoleSlave) + if node.Failed() { + builder.WriteString(RoleSlave + ",fail") + } else { + builder.WriteString(RoleSlave) + } builder.WriteByte(' ') builder.WriteString(shard.Nodes[masterNodeIndex].ID()) } diff --git a/store/cluster_shard_test.go b/store/cluster_shard_test.go index 1406f351..f47e79ac 100644 --- a/store/cluster_shard_test.go +++ b/store/cluster_shard_test.go @@ -78,3 +78,39 @@ func TestShard_IsServicing(t *testing.T) { shard.SlotRanges = []SlotRange{{Start: -1, Stop: -1}} require.False(t, shard.IsServicing()) } + +func TestToSlotsString_WithFailedSlave(t *testing.T) { + shard := NewShard() + shard.SlotRanges = []SlotRange{{Start: 0, Stop: 100}} + + master := NewClusterNode("127.0.0.1:6379", "") + master.SetRole(RoleMaster) + + slave := NewClusterNode("127.0.0.1:6380", "") + slave.SetRole(RoleSlave) + slave.SetFailed(true) + + shard.Nodes = []Node{master, slave} + + result, err := shard.ToSlotsString() + require.NoError(t, err) + require.Contains(t, result, "slave,fail "+master.ID()) +} + +func TestToSlotsString_WithOnlineSlave(t *testing.T) { + shard := NewShard() + shard.SlotRanges = []SlotRange{{Start: 0, Stop: 100}} + + master := NewClusterNode("127.0.0.1:6379", "") + master.SetRole(RoleMaster) + + slave := NewClusterNode("127.0.0.1:6380", "") + slave.SetRole(RoleSlave) + + shard.Nodes = []Node{master, slave} + + result, err := shard.ToSlotsString() + require.NoError(t, err) + require.Contains(t, result, "slave "+master.ID()) + require.NotContains(t, result, "slave,fail") +} diff --git a/store/cluster_test.go b/store/cluster_test.go index 975f03f6..eb8fe164 100644 --- a/store/cluster_test.go +++ b/store/cluster_test.go @@ -106,3 +106,67 @@ func TestCluster_PromoteNewMaster(t *testing.T) { require.NoError(t, err) require.Equal(t, node2.ID(), newMasterID) } + +func TestCluster_SetNodesOffline(t *testing.T) { + cluster, err := NewCluster("test", []string{"node1", "node2"}, 2) + require.NoError(t, err) + require.Len(t, cluster.Shards, 1) + + masterAddr := cluster.Shards[0].Nodes[0].Addr() + slaveAddr := cluster.Shards[0].Nodes[1].Addr() + + // Cannot offline master + err = cluster.SetNodesOffline([]string{masterAddr}) + require.ErrorIs(t, err, consts.ErrCannotOfflineMaster) + + // Can offline slave + err = cluster.SetNodesOffline([]string{slaveAddr}) + require.NoError(t, err) + require.True(t, cluster.Shards[0].Nodes[1].Failed()) + + // Addr not found + err = cluster.SetNodesOffline([]string{"nonexistent:1234"}) + require.ErrorIs(t, err, consts.ErrNotFound) + + // Atomic: if any addr is invalid, none are applied + cluster.Shards[0].Nodes[1].SetFailed(false) + err = cluster.SetNodesOffline([]string{slaveAddr, "nonexistent:1234"}) + require.ErrorIs(t, err, consts.ErrNotFound) + require.False(t, cluster.Shards[0].Nodes[1].Failed()) // not modified +} + +func TestCluster_SetNodesOnline(t *testing.T) { + cluster, err := NewCluster("test", []string{"node1", "node2"}, 2) + require.NoError(t, err) + + slaveAddr := cluster.Shards[0].Nodes[1].Addr() + + // First offline + err = cluster.SetNodesOffline([]string{slaveAddr}) + require.NoError(t, err) + require.True(t, cluster.Shards[0].Nodes[1].Failed()) + + // Then online + err = cluster.SetNodesOnline([]string{slaveAddr}) + require.NoError(t, err) + require.False(t, cluster.Shards[0].Nodes[1].Failed()) +} + +func TestParseCluster_WithFailFlag(t *testing.T) { + // Build a cluster nodes string with a failed slave + clusterStr := "cfb28ef1deee4e0fa78da86abe5d24c8589b4f09 127.0.0.1:30001 master - 0 0 1 connected 0-5460\n" + + "e44242e22c74bbe4deab41c6a9dfb68e099f2f08 127.0.0.1:30004 slave,fail cfb28ef1deee4e0fa78da86abe5d24c8589b4f09 0 0 1 connected" + + cluster, err := ParseCluster(clusterStr) + require.NoError(t, err) + require.Len(t, cluster.Shards, 1) + require.Len(t, cluster.Shards[0].Nodes, 2) + + master := cluster.Shards[0].Nodes[0] + require.True(t, master.IsMaster()) + require.False(t, master.Failed()) + + slave := cluster.Shards[0].Nodes[1] + require.False(t, slave.IsMaster()) + require.True(t, slave.Failed()) +} From 87937e4d791de2787970cb50c7689115e970e22d Mon Sep 17 00:00:00 2001 From: paragrf Date: Thu, 9 Apr 2026 11:45:57 +0800 Subject: [PATCH 2/4] feat(ha): topology is also updated upon slave node failure --- controller/cluster.go | 35 ++++++++++++++---- controller/cluster_test.go | 72 +++++++++++++++++++++++++++++++++++++- store/cluster.go | 26 ++++++++++---- store/cluster_test.go | 23 ++++++++++++ 4 files changed, 143 insertions(+), 13 deletions(-) diff --git a/controller/cluster.go b/controller/cluster.go index 8d4e70d3..8e0ba4d2 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -132,17 +132,37 @@ func (c *ClusterChecker) increaseFailureCount(shardIndex int, node store.Node) i count := c.failureCounts[id] c.failureMu.Unlock() - // don't add the node into the failover candidates if it's not a master node if !node.IsMaster() { + if count >= c.options.maxFailureCount && !node.Failed() { + log := logger.Get().With( + zap.String("cluster_name", c.clusterName), + zap.String("id", node.ID()), + zap.String("addr", node.Addr()), + zap.Int64("failure_count", count)) + cluster, err := c.clusterStore.GetCluster(c.ctx, c.namespace, c.clusterName) + if err != nil { + log.Error("Failed to get the cluster info", zap.Error(err)) + return count + } + if err := cluster.SetNodeFailedByID(node.ID(), true); err != nil { + log.Error("Failed to set slave node as failed", zap.Error(err)) + return count + } + if err := c.clusterStore.UpdateCluster(c.ctx, c.namespace, cluster); err != nil { + log.Error("Failed to update the cluster", zap.Error(err)) + return count + } + log.Info("Marked slave node as failed due to probe failures") + } return count } - log := logger.Get().With( - zap.String("cluster_name", c.clusterName), - zap.String("id", node.ID()), - zap.Bool("is_master", node.IsMaster()), - zap.String("addr", node.Addr())) if count%c.options.maxFailureCount == 0 || count > c.options.maxFailureCount { + log := logger.Get().With( + zap.String("cluster_name", c.clusterName), + zap.String("id", node.ID()), + zap.Bool("is_master", node.IsMaster()), + zap.String("addr", node.Addr())) cluster, err := c.clusterStore.GetCluster(c.ctx, c.namespace, c.clusterName) if err != nil { log.Error("Failed to get the cluster info", zap.Error(err)) @@ -188,6 +208,9 @@ func (c *ClusterChecker) syncClusterToNodes(ctx context.Context) error { version := clusterInfo.Version.Load() for _, shard := range clusterInfo.Shards { for _, node := range shard.Nodes { + if node.Failed() { + continue + } go func(n store.Node) { log := logger.Get().With( zap.String("namespace", c.namespace), diff --git a/controller/cluster_test.go b/controller/cluster_test.go index a1a720a6..70d99f2b 100644 --- a/controller/cluster_test.go +++ b/controller/cluster_test.go @@ -144,14 +144,84 @@ func TestCluster_FailureCount(t *testing.T) { require.EqualValues(t, 0, cluster.failureCounts[mockNode2.Addr()]) require.True(t, mockNode2.IsMaster()) - // it will be always increase the failure count until the node is back again. + // Slave failure count keeps increasing; at threshold the slave is auto-marked as failed. for i := int64(0); i < cluster.options.maxFailureCount*2; i++ { require.EqualValues(t, i+1, cluster.increaseFailureCount(0, mockNode3)) } + require.True(t, mockNode3.Failed()) + require.EqualValues(t, 3, clusterInfo.Version.Load()) cluster.resetFailureCount(mockNode3.ID()) require.EqualValues(t, 0, cluster.failureCounts[mockNode3.ID()]) } +func TestCluster_SlaveFailureAutoOffline(t *testing.T) { + ctx := context.Background() + ns := "test-ns" + clusterName := "test-slave-offline" + + s := NewMockClusterStore() + mockMaster := store.NewClusterMockNode() + mockMaster.SetRole(store.RoleMaster) + mockMaster.Sequence = 100 + + mockSlave1 := store.NewClusterMockNode() + mockSlave1.SetRole(store.RoleSlave) + mockSlave1.Sequence = 90 + + mockSlave2 := store.NewClusterMockNode() + mockSlave2.SetRole(store.RoleSlave) + mockSlave2.Sequence = 80 + + clusterInfo := &store.Cluster{ + Name: clusterName, + Shards: []*store.Shard{{ + Nodes: []store.Node{mockMaster, mockSlave1, mockSlave2}, + SlotRanges: []store.SlotRange{{Start: 0, Stop: 16383}}, + MigratingSlot: &store.MigratingSlot{IsMigrating: false}, + TargetShardIndex: -1, + }}, + } + clusterInfo.Version.Store(1) + require.NoError(t, s.CreateCluster(ctx, ns, clusterInfo)) + + checker := &ClusterChecker{ + clusterStore: s, + namespace: ns, + clusterName: clusterName, + options: ClusterCheckOptions{ + pingInterval: time.Second, + maxFailureCount: 3, + }, + failureCounts: make(map[string]int64), + syncCh: make(chan struct{}, 1), + } + + // Slave should not be marked as failed before reaching threshold + require.False(t, mockSlave1.Failed()) + for i := int64(0); i < checker.options.maxFailureCount-1; i++ { + checker.increaseFailureCount(0, mockSlave1) + } + require.False(t, mockSlave1.Failed()) + require.EqualValues(t, 1, clusterInfo.Version.Load()) + + // Slave should be marked as failed when reaching threshold + checker.increaseFailureCount(0, mockSlave1) + require.True(t, mockSlave1.Failed()) + require.EqualValues(t, 2, clusterInfo.Version.Load()) + + // Subsequent failures should not trigger another update (already failed) + checker.increaseFailureCount(0, mockSlave1) + require.True(t, mockSlave1.Failed()) + require.EqualValues(t, 2, clusterInfo.Version.Load()) + + // Other slaves are not affected + require.False(t, mockSlave2.Failed()) + + // Master should not be affected by slave offline logic + require.True(t, mockMaster.IsMaster()) + require.False(t, mockMaster.Failed()) +} + func TestCluster_LoadAndProbe(t *testing.T) { ctx := context.Background() ns := "test-ns" diff --git a/store/cluster.go b/store/cluster.go index 0a6933f0..68b2ba00 100644 --- a/store/cluster.go +++ b/store/cluster.go @@ -168,8 +168,20 @@ func (cluster *Cluster) findNodeByAddr(addr string) Node { return nil } +func (cluster *Cluster) SetNodeFailedByID(nodeID string, failed bool) error { + for _, shard := range cluster.Shards { + for _, node := range shard.Nodes { + if node.ID() == nodeID { + node.SetFailed(failed) + return nil + } + } + } + return fmt.Errorf("node %s: %w", nodeID, consts.ErrNotFound) +} + func (cluster *Cluster) SetNodesOffline(addrs []string) error { - // Validate all addrs first: must exist and must not be master. + nodes := make([]Node, 0, len(addrs)) for _, addr := range addrs { node := cluster.findNodeByAddr(addr) if node == nil { @@ -178,23 +190,25 @@ func (cluster *Cluster) SetNodesOffline(addrs []string) error { if node.IsMaster() { return fmt.Errorf("node %s: %w", addr, consts.ErrCannotOfflineMaster) } + nodes = append(nodes, node) } - for _, addr := range addrs { - cluster.findNodeByAddr(addr).SetFailed(true) + for _, node := range nodes { + node.SetFailed(true) } return nil } func (cluster *Cluster) SetNodesOnline(addrs []string) error { - // Validate all addrs first: must exist. + nodes := make([]Node, 0, len(addrs)) for _, addr := range addrs { node := cluster.findNodeByAddr(addr) if node == nil { return fmt.Errorf("node %s: %w", addr, consts.ErrNotFound) } + nodes = append(nodes, node) } - for _, addr := range addrs { - cluster.findNodeByAddr(addr).SetFailed(false) + for _, node := range nodes { + node.SetFailed(false) } return nil } diff --git a/store/cluster_test.go b/store/cluster_test.go index eb8fe164..7c5d6ad9 100644 --- a/store/cluster_test.go +++ b/store/cluster_test.go @@ -107,6 +107,29 @@ func TestCluster_PromoteNewMaster(t *testing.T) { require.Equal(t, node2.ID(), newMasterID) } +func TestCluster_SetNodeFailedByID(t *testing.T) { + cluster, err := NewCluster("test", []string{"node1", "node2", "node3"}, 3) + require.NoError(t, err) + require.Len(t, cluster.Shards, 1) + + slaveNode := cluster.Shards[0].Nodes[1] + require.False(t, slaveNode.Failed()) + + // Set failed by ID + err = cluster.SetNodeFailedByID(slaveNode.ID(), true) + require.NoError(t, err) + require.True(t, slaveNode.Failed()) + + // Set back to not-failed + err = cluster.SetNodeFailedByID(slaveNode.ID(), false) + require.NoError(t, err) + require.False(t, slaveNode.Failed()) + + // Non-existent node ID + err = cluster.SetNodeFailedByID("nonexistent-id", true) + require.ErrorIs(t, err, consts.ErrNotFound) +} + func TestCluster_SetNodesOffline(t *testing.T) { cluster, err := NewCluster("test", []string{"node1", "node2"}, 2) require.NoError(t, err) From 377ef72a4ea76ab11df97c01ebd576163bd44fd2 Mon Sep 17 00:00:00 2001 From: paragrf Date: Thu, 23 Apr 2026 15:50:38 +0800 Subject: [PATCH 3/4] feature:Enhance compatibility for diverse node statuses --- controller/cluster.go | 2 +- store/cluster.go | 40 ++++++++++++----------- store/cluster_node.go | 65 ++++++++++++++++++++++++++++++------- store/cluster_shard_test.go | 2 +- store/cluster_test.go | 20 ++++++------ 5 files changed, 86 insertions(+), 43 deletions(-) diff --git a/controller/cluster.go b/controller/cluster.go index 8e0ba4d2..e6594374 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -144,7 +144,7 @@ func (c *ClusterChecker) increaseFailureCount(shardIndex int, node store.Node) i log.Error("Failed to get the cluster info", zap.Error(err)) return count } - if err := cluster.SetNodeFailedByID(node.ID(), true); err != nil { + if err := cluster.SetNodeStatusByID(node.ID(), store.NodeStatusFailed); err != nil { log.Error("Failed to set slave node as failed", zap.Error(err)) return count } diff --git a/store/cluster.go b/store/cluster.go index 68b2ba00..00c0ee39 100644 --- a/store/cluster.go +++ b/store/cluster.go @@ -168,11 +168,11 @@ func (cluster *Cluster) findNodeByAddr(addr string) Node { return nil } -func (cluster *Cluster) SetNodeFailedByID(nodeID string, failed bool) error { +func (cluster *Cluster) SetNodeStatusByID(nodeID string, status NodeStatus) error { for _, shard := range cluster.Shards { for _, node := range shard.Nodes { if node.ID() == nodeID { - node.SetFailed(failed) + node.SetStatus(status) return nil } } @@ -180,37 +180,39 @@ func (cluster *Cluster) SetNodeFailedByID(nodeID string, failed bool) error { return fmt.Errorf("node %s: %w", nodeID, consts.ErrNotFound) } -func (cluster *Cluster) SetNodesOffline(addrs []string) error { +// setNodesStatus finds nodes by address, applies an optional per-node validation, +// and only updates status when all nodes pass — ensuring all-or-nothing semantics. +func (cluster *Cluster) setNodesStatus(addrs []string, status NodeStatus, validate func(Node) error) error { nodes := make([]Node, 0, len(addrs)) for _, addr := range addrs { node := cluster.findNodeByAddr(addr) if node == nil { return fmt.Errorf("node %s: %w", addr, consts.ErrNotFound) } - if node.IsMaster() { - return fmt.Errorf("node %s: %w", addr, consts.ErrCannotOfflineMaster) + if validate != nil { + if err := validate(node); err != nil { + return err + } } nodes = append(nodes, node) } for _, node := range nodes { - node.SetFailed(true) + node.SetStatus(status) } return nil } -func (cluster *Cluster) SetNodesOnline(addrs []string) error { - nodes := make([]Node, 0, len(addrs)) - for _, addr := range addrs { - node := cluster.findNodeByAddr(addr) - if node == nil { - return fmt.Errorf("node %s: %w", addr, consts.ErrNotFound) +func (cluster *Cluster) SetNodesOffline(addrs []string) error { + return cluster.setNodesStatus(addrs, NodeStatusFailed, func(node Node) error { + if node.IsMaster() { + return fmt.Errorf("node %s: %w", node.Addr(), consts.ErrCannotOfflineMaster) } - nodes = append(nodes, node) - } - for _, node := range nodes { - node.SetFailed(false) - } - return nil + return nil + }) +} + +func (cluster *Cluster) SetNodesOnline(addrs []string) error { + return cluster.setNodesStatus(addrs, NodeStatusNormal, nil) } func (cluster *Cluster) GetNodes() []Node { @@ -339,7 +341,7 @@ func ParseCluster(clusterStr string) (*Cluster, error) { case RoleSlave: node.role = RoleSlave case "fail": - node.failed = true + node.status = NodeStatusFailed } // ignore: myself, pfail, handshake, noaddr, nofailover, noflags } diff --git a/store/cluster_node.go b/store/cluster_node.go index 7dbbcd67..c4d3a942 100755 --- a/store/cluster_node.go +++ b/store/cluster_node.go @@ -42,6 +42,21 @@ const ( NodeIDLen = 40 ) +type NodeStatus string + +const ( + NodeStatusNormal NodeStatus = "normal" + NodeStatusFailed NodeStatus = "failed" +) + +func (s NodeStatus) IsValid() bool { + switch s { + case NodeStatusNormal, NodeStatusFailed: + return true + } + return false +} + const ( dialTimeout = 3200 * time.Millisecond readTimeout = 3 * time.Second @@ -59,11 +74,12 @@ type Node interface { Password() string Addr() string IsMaster() bool + Status() NodeStatus Failed() bool SetRole(string) SetPassword(string) - SetFailed(bool) + SetStatus(NodeStatus) Reset(ctx context.Context) error GetClusterNodeInfo(ctx context.Context) (*ClusterNodeInfo, error) @@ -84,7 +100,7 @@ type ClusterNode struct { role string password string createdAt int64 - failed bool + status NodeStatus } type ClusterInfo struct { @@ -104,6 +120,7 @@ func NewClusterNode(addr, password string) *ClusterNode { addr: addr, password: password, role: RoleMaster, + status: NodeStatusNormal, createdAt: time.Now().Unix(), } } @@ -124,12 +141,24 @@ func (n *ClusterNode) SetRole(role string) { n.role = role } +func (n *ClusterNode) Status() NodeStatus { + return n.status +} + +func (n *ClusterNode) SetStatus(status NodeStatus) { + n.status = status +} + func (n *ClusterNode) Failed() bool { - return n.failed + return n.status == NodeStatusFailed } func (n *ClusterNode) SetFailed(failed bool) { - n.failed = failed + if failed { + n.status = NodeStatusFailed + } else { + n.status = NodeStatusNormal + } } func (n *ClusterNode) Addr() string { @@ -283,18 +312,20 @@ func (n *ClusterNode) MarshalJSON() ([]byte, error) { "role": n.role, "password": n.password, "created_at": n.createdAt, - "failed": n.failed, + "status": n.status, }) } func (n *ClusterNode) UnmarshalJSON(bytes []byte) error { var data struct { - ID string `json:"id"` - Addr string `json:"addr"` - Role string `json:"role"` - Password string `json:"password"` - CreatedAt int64 `json:"created_at"` - Failed bool `json:"failed"` + ID string `json:"id"` + Addr string `json:"addr"` + Role string `json:"role"` + Password string `json:"password"` + CreatedAt int64 `json:"created_at"` + Status NodeStatus `json:"status"` + // Failed is kept for backward compatibility with persisted data + Failed bool `json:"failed"` } if err := json.Unmarshal(bytes, &data); err != nil { return err @@ -305,6 +336,16 @@ func (n *ClusterNode) UnmarshalJSON(bytes []byte) error { n.role = data.Role n.password = data.Password n.createdAt = data.CreatedAt - n.failed = data.Failed + switch { + case data.Status != "" && !data.Status.IsValid(): + return fmt.Errorf("unknown node status: %q", data.Status) + case data.Status != "": + n.status = data.Status + case data.Failed: + // backward compatibility with persisted data that used the old "failed" bool field + n.status = NodeStatusFailed + default: + n.status = NodeStatusNormal + } return nil } diff --git a/store/cluster_shard_test.go b/store/cluster_shard_test.go index f47e79ac..971bde1c 100644 --- a/store/cluster_shard_test.go +++ b/store/cluster_shard_test.go @@ -88,7 +88,7 @@ func TestToSlotsString_WithFailedSlave(t *testing.T) { slave := NewClusterNode("127.0.0.1:6380", "") slave.SetRole(RoleSlave) - slave.SetFailed(true) + slave.SetStatus(NodeStatusFailed) shard.Nodes = []Node{master, slave} diff --git a/store/cluster_test.go b/store/cluster_test.go index 7c5d6ad9..31ae9058 100644 --- a/store/cluster_test.go +++ b/store/cluster_test.go @@ -107,26 +107,26 @@ func TestCluster_PromoteNewMaster(t *testing.T) { require.Equal(t, node2.ID(), newMasterID) } -func TestCluster_SetNodeFailedByID(t *testing.T) { +func TestCluster_SetNodeStatusByID(t *testing.T) { cluster, err := NewCluster("test", []string{"node1", "node2", "node3"}, 3) require.NoError(t, err) require.Len(t, cluster.Shards, 1) slaveNode := cluster.Shards[0].Nodes[1] - require.False(t, slaveNode.Failed()) + require.Equal(t, NodeStatusNormal, slaveNode.Status()) - // Set failed by ID - err = cluster.SetNodeFailedByID(slaveNode.ID(), true) + // Set to failed + err = cluster.SetNodeStatusByID(slaveNode.ID(), NodeStatusFailed) require.NoError(t, err) - require.True(t, slaveNode.Failed()) + require.Equal(t, NodeStatusFailed, slaveNode.Status()) - // Set back to not-failed - err = cluster.SetNodeFailedByID(slaveNode.ID(), false) + // Set back to normal + err = cluster.SetNodeStatusByID(slaveNode.ID(), NodeStatusNormal) require.NoError(t, err) - require.False(t, slaveNode.Failed()) + require.Equal(t, NodeStatusNormal, slaveNode.Status()) // Non-existent node ID - err = cluster.SetNodeFailedByID("nonexistent-id", true) + err = cluster.SetNodeStatusByID("nonexistent-id", NodeStatusFailed) require.ErrorIs(t, err, consts.ErrNotFound) } @@ -152,7 +152,7 @@ func TestCluster_SetNodesOffline(t *testing.T) { require.ErrorIs(t, err, consts.ErrNotFound) // Atomic: if any addr is invalid, none are applied - cluster.Shards[0].Nodes[1].SetFailed(false) + cluster.Shards[0].Nodes[1].SetStatus(NodeStatusNormal) err = cluster.SetNodesOffline([]string{slaveAddr, "nonexistent:1234"}) require.ErrorIs(t, err, consts.ErrNotFound) require.False(t, cluster.Shards[0].Nodes[1].Failed()) // not modified From cc055b5839ab98fa87fdd64d1599119a432e9dce Mon Sep 17 00:00:00 2001 From: paragrf Date: Fri, 24 Apr 2026 14:10:57 +0800 Subject: [PATCH 4/4] bugfix:compatible with legacy Kvrocks --- config/config.go | 4 ++++ controller/cluster.go | 12 +++++++++--- controller/cluster_test.go | 10 ++++++---- controller/controller.go | 3 ++- store/cluster_node.go | 5 ----- 5 files changed, 21 insertions(+), 13 deletions(-) diff --git a/config/config.go b/config/config.go index 69558fb0..d88862a2 100644 --- a/config/config.go +++ b/config/config.go @@ -43,6 +43,10 @@ type AdminConfig struct { type FailOverConfig struct { PingIntervalSeconds int `yaml:"ping_interval_seconds"` MaxPingCount int64 `yaml:"max_ping_count"` + // EnableSlaveHAUpdate controls whether HA logic marks failed slave nodes and + // propagates the updated topology. Requires kvrocks to support node status + // modification (new versions only). Defaults to false for backward compatibility. + EnableSlaveHAUpdate bool `yaml:"enable_slave_ha_update"` } type ControllerConfig struct { diff --git a/controller/cluster.go b/controller/cluster.go index e6594374..22242a9b 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -38,8 +38,9 @@ var ( ) type ClusterCheckOptions struct { - pingInterval time.Duration - maxFailureCount int64 + pingInterval time.Duration + maxFailureCount int64 + enableSlaveHAUpdate bool } type ClusterChecker struct { @@ -104,6 +105,11 @@ func (c *ClusterChecker) WithMaxFailureCount(count int64) *ClusterChecker { return c } +func (c *ClusterChecker) WithSlaveHAUpdate(enable bool) *ClusterChecker { + c.options.enableSlaveHAUpdate = enable + return c +} + func (c *ClusterChecker) probeNode(ctx context.Context, node store.Node) (int64, error) { clusterInfo, err := node.GetClusterInfo(ctx) if err != nil { @@ -133,7 +139,7 @@ func (c *ClusterChecker) increaseFailureCount(shardIndex int, node store.Node) i c.failureMu.Unlock() if !node.IsMaster() { - if count >= c.options.maxFailureCount && !node.Failed() { + if c.options.enableSlaveHAUpdate && count >= c.options.maxFailureCount && !node.Failed() { log := logger.Get().With( zap.String("cluster_name", c.clusterName), zap.String("id", node.ID()), diff --git a/controller/cluster_test.go b/controller/cluster_test.go index 70d99f2b..a3774446 100644 --- a/controller/cluster_test.go +++ b/controller/cluster_test.go @@ -122,8 +122,9 @@ func TestCluster_FailureCount(t *testing.T) { namespace: ns, clusterName: clusterName, options: ClusterCheckOptions{ - pingInterval: time.Second, - maxFailureCount: 3, + pingInterval: time.Second, + maxFailureCount: 3, + enableSlaveHAUpdate: true, }, failureCounts: make(map[string]int64), syncCh: make(chan struct{}, 1), @@ -189,8 +190,9 @@ func TestCluster_SlaveFailureAutoOffline(t *testing.T) { namespace: ns, clusterName: clusterName, options: ClusterCheckOptions{ - pingInterval: time.Second, - maxFailureCount: 3, + pingInterval: time.Second, + maxFailureCount: 3, + enableSlaveHAUpdate: true, }, failureCounts: make(map[string]int64), syncCh: make(chan struct{}, 1), diff --git a/controller/controller.go b/controller/controller.go index 806c34a9..5d2c446d 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -189,7 +189,8 @@ func (c *Controller) addCluster(namespace, clusterName string) { cluster := NewClusterChecker(c.clusterStore, namespace, clusterName). WithPingInterval(time.Duration(c.config.FailOver.PingIntervalSeconds) * time.Second). - WithMaxFailureCount(c.config.FailOver.MaxPingCount) + WithMaxFailureCount(c.config.FailOver.MaxPingCount). + WithSlaveHAUpdate(c.config.FailOver.EnableSlaveHAUpdate) cluster.Start() c.mu.Lock() diff --git a/store/cluster_node.go b/store/cluster_node.go index c4d3a942..d0b25237 100755 --- a/store/cluster_node.go +++ b/store/cluster_node.go @@ -324,8 +324,6 @@ func (n *ClusterNode) UnmarshalJSON(bytes []byte) error { Password string `json:"password"` CreatedAt int64 `json:"created_at"` Status NodeStatus `json:"status"` - // Failed is kept for backward compatibility with persisted data - Failed bool `json:"failed"` } if err := json.Unmarshal(bytes, &data); err != nil { return err @@ -341,9 +339,6 @@ func (n *ClusterNode) UnmarshalJSON(bytes []byte) error { return fmt.Errorf("unknown node status: %q", data.Status) case data.Status != "": n.status = data.Status - case data.Failed: - // backward compatibility with persisted data that used the old "failed" bool field - n.status = NodeStatusFailed default: n.status = NodeStatusNormal }