config/config.go (4 additions, 0 deletions)
@@ -43,6 +43,10 @@ type AdminConfig struct {
type FailOverConfig struct {
PingIntervalSeconds int `yaml:"ping_interval_seconds"`
MaxPingCount int64 `yaml:"max_ping_count"`
// EnableSlaveHAUpdate controls whether HA logic marks failed slave nodes and
// propagates the updated topology. Requires kvrocks to support node status
// modification (new versions only). Defaults to false for backward compatibility.
EnableSlaveHAUpdate bool `yaml:"enable_slave_ha_update"`
}

type ControllerConfig struct {
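Note: a minimal sketch of how the new flag round-trips from YAML. The struct is copied from the diff above; the sample values and the yaml.v3 parser are assumptions, not part of this change.

package main

import (
    "fmt"

    "gopkg.in/yaml.v3"
)

// FailOverConfig mirrors the struct in the diff above.
type FailOverConfig struct {
    PingIntervalSeconds int   `yaml:"ping_interval_seconds"`
    MaxPingCount        int64 `yaml:"max_ping_count"`
    EnableSlaveHAUpdate bool  `yaml:"enable_slave_ha_update"`
}

func main() {
    raw := []byte("ping_interval_seconds: 3\nmax_ping_count: 5\nenable_slave_ha_update: true\n")
    var cfg FailOverConfig
    if err := yaml.Unmarshal(raw, &cfg); err != nil {
        panic(err)
    }
    fmt.Printf("%+v\n", cfg) // {PingIntervalSeconds:3 MaxPingCount:5 EnableSlaveHAUpdate:true}
}

Leaving enable_slave_ha_update unset yields Go's zero value (false), which is what preserves the old behavior by default.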
consts/errors.go (1 addition, 0 deletions)
@@ -38,5 +38,6 @@ var (
ErrShardIsServicing = errors.New("shard is servicing")
ErrShardSlotIsMigrating = errors.New("shard slot is migrating")
ErrShardNoMatchNewMaster = errors.New("no match new master in shard")
ErrCannotOfflineMaster = errors.New("cannot take master node offline, failover first")
ErrSlotStartAndStopEqual = errors.New("start and stop of a range cannot be equal")
)
controller/cluster.go (37 additions, 8 deletions)
@@ -38,8 +38,9 @@ var (
)

type ClusterCheckOptions struct {
-pingInterval    time.Duration
-maxFailureCount int64
+pingInterval        time.Duration
+maxFailureCount     int64
+enableSlaveHAUpdate bool
}

type ClusterChecker struct {
@@ -104,6 +105,11 @@ func (c *ClusterChecker) WithMaxFailureCount(count int64) *ClusterChecker {
return c
}

func (c *ClusterChecker) WithSlaveHAUpdate(enable bool) *ClusterChecker {
c.options.enableSlaveHAUpdate = enable
return c
}

func (c *ClusterChecker) probeNode(ctx context.Context, node store.Node) (int64, error) {
clusterInfo, err := node.GetClusterInfo(ctx)
if err != nil {
@@ -132,17 +138,37 @@ func (c *ClusterChecker) increaseFailureCount(shardIndex int, node store.Node) i
count := c.failureCounts[id]
c.failureMu.Unlock()

-// don't add the node into the failover candidates if it's not a master node
if !node.IsMaster() {
+if c.options.enableSlaveHAUpdate && count >= c.options.maxFailureCount && !node.Failed() {
+log := logger.Get().With(
+zap.String("cluster_name", c.clusterName),
+zap.String("id", node.ID()),
+zap.String("addr", node.Addr()),
+zap.Int64("failure_count", count))
+cluster, err := c.clusterStore.GetCluster(c.ctx, c.namespace, c.clusterName)
+if err != nil {
+log.Error("Failed to get the cluster info", zap.Error(err))
+return count
+}
+if err := cluster.SetNodeStatusByID(node.ID(), store.NodeStatusFailed); err != nil {
+log.Error("Failed to set slave node as failed", zap.Error(err))
+return count
+}
+if err := c.clusterStore.UpdateCluster(c.ctx, c.namespace, cluster); err != nil {
+log.Error("Failed to update the cluster", zap.Error(err))
+return count
+}
+log.Info("Marked slave node as failed due to probe failures")
+}
return count
}

+log := logger.Get().With(
+zap.String("cluster_name", c.clusterName),
+zap.String("id", node.ID()),
+zap.Bool("is_master", node.IsMaster()),
+zap.String("addr", node.Addr()))
if count%c.options.maxFailureCount == 0 || count > c.options.maxFailureCount {
-log := logger.Get().With(
-zap.String("cluster_name", c.clusterName),
-zap.String("id", node.ID()),
-zap.Bool("is_master", node.IsMaster()),
-zap.String("addr", node.Addr()))
cluster, err := c.clusterStore.GetCluster(c.ctx, c.namespace, c.clusterName)
if err != nil {
log.Error("Failed to get the cluster info", zap.Error(err))
@@ -188,6 +214,9 @@ func (c *ClusterChecker) syncClusterToNodes(ctx context.Context) error {
version := clusterInfo.Version.Load()
for _, shard := range clusterInfo.Shards {
for _, node := range shard.Nodes {
if node.Failed() {
continue
}
go func(n store.Node) {
log := logger.Get().With(
zap.String("namespace", c.namespace),
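A standalone sketch (simplified names, not the controller's actual types) of the gating in increaseFailureCount above: a persistently failing slave is marked failed exactly once, when its failure count first reaches the threshold, so the cluster topology version is bumped only a single time.

package main

import "fmt"

// shouldMarkSlaveFailed reproduces the condition guarding the slave HA update.
func shouldMarkSlaveFailed(enabled bool, count, maxFailureCount int64, alreadyFailed bool) bool {
    return enabled && count >= maxFailureCount && !alreadyFailed
}

func main() {
    failed := false
    for count := int64(1); count <= 6; count++ {
        if shouldMarkSlaveFailed(true, count, 3, failed) {
            failed = true
            fmt.Printf("count=%d: mark slave failed and push updated topology\n", count)
        }
    }
    // Prints once, at count=3; later probes are no-ops because the node is already failed.
}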
controller/cluster_test.go (75 additions, 3 deletions)
@@ -122,8 +122,9 @@ func TestCluster_FailureCount(t *testing.T) {
namespace: ns,
clusterName: clusterName,
options: ClusterCheckOptions{
-pingInterval:    time.Second,
-maxFailureCount: 3,
+pingInterval:        time.Second,
+maxFailureCount:     3,
+enableSlaveHAUpdate: true,
},
failureCounts: make(map[string]int64),
syncCh: make(chan struct{}, 1),
@@ -144,14 +145,85 @@
require.EqualValues(t, 0, cluster.failureCounts[mockNode2.Addr()])
require.True(t, mockNode2.IsMaster())

-// it will be always increase the failure count until the node is back again.
+// Slave failure count keeps increasing; at threshold the slave is auto-marked as failed.
for i := int64(0); i < cluster.options.maxFailureCount*2; i++ {
require.EqualValues(t, i+1, cluster.increaseFailureCount(0, mockNode3))
}
require.True(t, mockNode3.Failed())
require.EqualValues(t, 3, clusterInfo.Version.Load())
cluster.resetFailureCount(mockNode3.ID())
require.EqualValues(t, 0, cluster.failureCounts[mockNode3.ID()])
}

func TestCluster_SlaveFailureAutoOffline(t *testing.T) {
ctx := context.Background()
ns := "test-ns"
clusterName := "test-slave-offline"

s := NewMockClusterStore()
mockMaster := store.NewClusterMockNode()
mockMaster.SetRole(store.RoleMaster)
mockMaster.Sequence = 100

mockSlave1 := store.NewClusterMockNode()
mockSlave1.SetRole(store.RoleSlave)
mockSlave1.Sequence = 90

mockSlave2 := store.NewClusterMockNode()
mockSlave2.SetRole(store.RoleSlave)
mockSlave2.Sequence = 80

clusterInfo := &store.Cluster{
Name: clusterName,
Shards: []*store.Shard{{
Nodes: []store.Node{mockMaster, mockSlave1, mockSlave2},
SlotRanges: []store.SlotRange{{Start: 0, Stop: 16383}},
MigratingSlot: &store.MigratingSlot{IsMigrating: false},
TargetShardIndex: -1,
}},
}
clusterInfo.Version.Store(1)
require.NoError(t, s.CreateCluster(ctx, ns, clusterInfo))

checker := &ClusterChecker{
clusterStore: s,
namespace: ns,
clusterName: clusterName,
options: ClusterCheckOptions{
pingInterval: time.Second,
maxFailureCount: 3,
enableSlaveHAUpdate: true,
},
failureCounts: make(map[string]int64),
syncCh: make(chan struct{}, 1),
}

// Slave should not be marked as failed before reaching threshold
require.False(t, mockSlave1.Failed())
for i := int64(0); i < checker.options.maxFailureCount-1; i++ {
checker.increaseFailureCount(0, mockSlave1)
}
require.False(t, mockSlave1.Failed())
require.EqualValues(t, 1, clusterInfo.Version.Load())

// Slave should be marked as failed when reaching threshold
checker.increaseFailureCount(0, mockSlave1)
require.True(t, mockSlave1.Failed())
require.EqualValues(t, 2, clusterInfo.Version.Load())

// Subsequent failures should not trigger another update (already failed)
checker.increaseFailureCount(0, mockSlave1)
require.True(t, mockSlave1.Failed())
require.EqualValues(t, 2, clusterInfo.Version.Load())

// Other slaves are not affected
require.False(t, mockSlave2.Failed())

// Master should not be affected by slave offline logic
require.True(t, mockMaster.IsMaster())
require.False(t, mockMaster.Failed())
}

func TestCluster_LoadAndProbe(t *testing.T) {
ctx := context.Background()
ns := "test-ns"
controller/controller.go (2 additions, 1 deletion)
@@ -189,7 +189,8 @@ func (c *Controller) addCluster(namespace, clusterName string) {

cluster := NewClusterChecker(c.clusterStore, namespace, clusterName).
WithPingInterval(time.Duration(c.config.FailOver.PingIntervalSeconds) * time.Second).
-WithMaxFailureCount(c.config.FailOver.MaxPingCount)
+WithMaxFailureCount(c.config.FailOver.MaxPingCount).
+WithSlaveHAUpdate(c.config.FailOver.EnableSlaveHAUpdate)
cluster.Start()

c.mu.Lock()
server/api/node.go (37 additions, 0 deletions)
@@ -23,7 +23,10 @@ package api
import (
"strconv"

"go.uber.org/zap"

"github.com/apache/kvrocks-controller/consts"
"github.com/apache/kvrocks-controller/logger"
"github.com/apache/kvrocks-controller/server/helper"
"github.com/gin-gonic/gin"

@@ -82,3 +85,37 @@ func (handler *NodeHandler) Remove(c *gin.Context) {
}
helper.ResponseNoContent(c)
}

func (handler *NodeHandler) SetStatus(c *gin.Context) {
ns := c.Param("namespace")
cluster, _ := c.MustGet(consts.ContextKeyCluster).(*store.Cluster)

var req struct {
Addrs []string `json:"addrs" binding:"required"`
Online bool `json:"online"`
}
if err := c.ShouldBindJSON(&req); err != nil {
helper.ResponseBadRequest(c, err)
return
}

var err error
if req.Online {
err = cluster.SetNodesOnline(req.Addrs)
} else {
err = cluster.SetNodesOffline(req.Addrs)
}
if err != nil {
helper.ResponseError(c, err)
return
}

if err := handler.s.UpdateCluster(c, ns, cluster); err != nil {
helper.ResponseError(c, err)
return
}
if err := cluster.SyncToNodes(c); err != nil {
logger.Get().With(zap.Error(err)).Warn("Failed to sync cluster info to nodes after status change")
}
helper.ResponseOK(c, nil)
}
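A client-side sketch of calling the new status endpoint. The request shape matches the handler above; the base URL, port, namespace, cluster name, and node address are illustrative assumptions.

package main

import (
    "bytes"
    "fmt"
    "net/http"
)

func main() {
    // Take one slave offline; {"online": true} would bring it back.
    body := []byte(`{"addrs": ["127.0.0.1:6380"], "online": false}`)
    url := "http://127.0.0.1:9379/api/v1/namespaces/test-ns/clusters/test-cluster/nodes/status"
    resp, err := http.Post(url, "application/json", bytes.NewReader(body))
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    // Expect 200 OK for a slave; offlining a master is rejected with 400
    // (ErrCannotOfflineMaster), per the helper.go mapping below.
    fmt.Println(resp.Status)
}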
server/api/node_test.go (49 additions, 0 deletions)
@@ -124,3 +124,52 @@ func TestNodeBasics(t *testing.T) {
runRemove(t, cluster.Shards[0].Nodes[1].ID(), http.StatusNoContent)
})
}

func TestNodeSetStatus(t *testing.T) {
ns := "test-ns"
cluster, err := store.NewCluster("test-cluster", []string{"127.0.0.1:1234", "127.0.0.1:1235"}, 2)
require.NoError(t, err)

handler := &NodeHandler{s: store.NewClusterStore(engine.NewMock())}
require.NoError(t, handler.s.CreateCluster(context.Background(), ns, cluster))

slaveAddr := cluster.Shards[0].Nodes[1].Addr()
masterAddr := cluster.Shards[0].Nodes[0].Addr()

runSetStatus := func(t *testing.T, addrs []string, online bool, expectedCode int) {
var req struct {
Addrs []string `json:"addrs"`
Online bool `json:"online"`
}
req.Addrs = addrs
req.Online = online

recorder := httptest.NewRecorder()
ctx := GetTestContext(recorder)
body, err := json.Marshal(req)
require.NoError(t, err)

ctx.Set(consts.ContextKeyStore, handler.s)
ctx.Request.Body = io.NopCloser(bytes.NewBuffer(body))
ctx.Params = []gin.Param{
{Key: "namespace", Value: ns},
{Key: "cluster", Value: cluster.Name},
}
middleware.RequiredCluster(ctx)
require.Equal(t, http.StatusOK, recorder.Code)
handler.SetStatus(ctx)
require.Equal(t, expectedCode, recorder.Code)
}

t.Run("offline slave", func(t *testing.T) {
runSetStatus(t, []string{slaveAddr}, false, http.StatusOK)
})

t.Run("online slave", func(t *testing.T) {
runSetStatus(t, []string{slaveAddr}, true, http.StatusOK)
})

t.Run("offline master rejected", func(t *testing.T) {
runSetStatus(t, []string{masterAddr}, false, http.StatusBadRequest)
})
}
server/helper/helper.go (2 additions, 0 deletions)
@@ -77,6 +77,8 @@ func ResponseError(c *gin.Context, err error) {
code = http.StatusForbidden
} else if errors.Is(err, consts.ErrInvalidArgument) {
code = http.StatusBadRequest
} else if errors.Is(err, consts.ErrCannotOfflineMaster) {
code = http.StatusBadRequest
}
c.JSON(code, Response{
Error: &Error{Message: err.Error()},
server/route.go (1 addition, 0 deletions)
@@ -70,6 +70,7 @@ func (srv *Server) initHandlers() {
clusters.GET("/:cluster", middleware.RequiredCluster, handler.Cluster.Get)
clusters.DELETE("/:cluster", middleware.RequiredCluster, handler.Cluster.Remove)
clusters.POST("/:cluster/migrate", handler.Cluster.MigrateSlot)
clusters.POST("/:cluster/nodes/status", middleware.RequiredCluster, handler.Node.SetStatus)
}

shards := clusters.Group("/:cluster/shards")