-
Notifications
You must be signed in to change notification settings - Fork 2
Ensure shard router commits to all groups #243
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,8 +5,6 @@ import ( | |
| "log" | ||
| "net" | ||
| "strconv" | ||
| "sync" | ||
| "sync/atomic" | ||
| "testing" | ||
| "time" | ||
|
|
||
|
|
@@ -19,7 +17,7 @@ import ( | |
| "github.com/cockroachdb/errors" | ||
| "github.com/hashicorp/go-hclog" | ||
| "github.com/hashicorp/raft" | ||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
| "google.golang.org/grpc" | ||
| "google.golang.org/grpc/credentials/insecure" | ||
| ) | ||
|
|
@@ -42,57 +40,7 @@ func shutdown(nodes []Node) { | |
| } | ||
| } | ||
|
|
||
| type portsAdress struct { | ||
| grpc int | ||
| raft int | ||
| redis int | ||
| dynamo int | ||
| grpcAddress string | ||
| raftAddress string | ||
| redisAddress string | ||
| dynamoAddress string | ||
| } | ||
|
|
||
| const ( | ||
| // raft and the grpc requested by the client use grpc and are received on the same port | ||
| grpcPort = 50000 | ||
| raftPort = 50000 | ||
|
|
||
| redisPort = 63790 | ||
| dynamoPort = 28000 | ||
| ) | ||
|
|
||
| var mu sync.Mutex | ||
| var portGrpc atomic.Int32 | ||
| var portRaft atomic.Int32 | ||
| var portRedis atomic.Int32 | ||
| var portDynamo atomic.Int32 | ||
|
|
||
| func init() { | ||
| portGrpc.Store(raftPort) | ||
| portRaft.Store(grpcPort) | ||
| portRedis.Store(redisPort) | ||
| portDynamo.Store(dynamoPort) | ||
| } | ||
|
|
||
| func portAssigner() portsAdress { | ||
| mu.Lock() | ||
| defer mu.Unlock() | ||
| gp := portGrpc.Add(1) | ||
| rp := portRaft.Add(1) | ||
| rd := portRedis.Add(1) | ||
| dn := portDynamo.Add(1) | ||
| return portsAdress{ | ||
| grpc: int(gp), | ||
| raft: int(rp), | ||
| redis: int(rd), | ||
| dynamo: int(dn), | ||
| grpcAddress: net.JoinHostPort("localhost", strconv.Itoa(int(gp))), | ||
| raftAddress: net.JoinHostPort("localhost", strconv.Itoa(int(rp))), | ||
| redisAddress: net.JoinHostPort("localhost", strconv.Itoa(int(rd))), | ||
| dynamoAddress: net.JoinHostPort("localhost", strconv.Itoa(int(dn))), | ||
| } | ||
| } | ||
| // Node groups the servers and addresses used in tests. | ||
|
|
||
| type Node struct { | ||
| grpcAddress string | ||
|
|
@@ -131,43 +79,38 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) { | |
| waitInterval = 100 * time.Millisecond | ||
| ) | ||
|
|
||
| cfg := raft.Configuration{} | ||
| ports := make([]portsAdress, n) | ||
|
|
||
| ctx := context.Background() | ||
| var lc net.ListenConfig | ||
|
|
||
| // port assign | ||
| // allocate listeners for gRPC/raft in advance so ports are reserved | ||
| grpcListeners := make([]net.Listener, n) | ||
| cfg := raft.Configuration{} | ||
| for i := 0; i < n; i++ { | ||
| ports[i] = portAssigner() | ||
| } | ||
| l, err := net.Listen("tcp", "127.0.0.1:0") | ||
| require.NoError(t, err) | ||
| grpcListeners[i] = l | ||
| addr := l.Addr().String() | ||
| grpcAdders = append(grpcAdders, addr) | ||
|
|
||
| // build raft node config | ||
| for i := 0; i < n; i++ { | ||
| var suffrage raft.ServerSuffrage | ||
| if i == 0 { | ||
| suffrage = raft.Voter | ||
| } else { | ||
| suffrage = raft.Nonvoter | ||
| } | ||
|
|
||
| server := raft.Server{ | ||
| cfg.Servers = append(cfg.Servers, raft.Server{ | ||
| Suffrage: suffrage, | ||
| ID: raft.ServerID(strconv.Itoa(i)), | ||
| Address: raft.ServerAddress(ports[i].raftAddress), | ||
| } | ||
| cfg.Servers = append(cfg.Servers, server) | ||
| Address: raft.ServerAddress(addr), | ||
| }) | ||
| } | ||
|
|
||
| for i := 0; i < n; i++ { | ||
| st := store.NewRbMemoryStore() | ||
| trxSt := store.NewMemoryStoreDefaultTTL() | ||
| fsm := kv.NewKvFSM(st, trxSt) | ||
|
|
||
| port := ports[i] | ||
|
|
||
| r, tm, err := newRaft(strconv.Itoa(i), port.raftAddress, fsm, i == 0, cfg) | ||
| assert.NoError(t, err) | ||
| r, tm, err := newRaft(strconv.Itoa(i), grpcAdders[i], fsm, i == 0, cfg) | ||
| require.NoError(t, err) | ||
|
|
||
| s := grpc.NewServer() | ||
| trx := kv.NewTransaction(r) | ||
|
|
@@ -181,34 +124,32 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) { | |
| leaderhealth.Setup(r, s, []string{"Example"}) | ||
| raftadmin.Register(s, r) | ||
|
|
||
| grpcSock, err := lc.Listen(ctx, "tcp", port.grpcAddress) | ||
| assert.NoError(t, err) | ||
|
|
||
| grpcAdders = append(grpcAdders, port.grpcAddress) | ||
| redisAdders = append(redisAdders, port.redisAddress) | ||
| go func() { | ||
| assert.NoError(t, s.Serve(grpcSock)) | ||
| }() | ||
| go func(l net.Listener) { | ||
| require.NoError(t, s.Serve(l)) | ||
| }(grpcListeners[i]) | ||
|
|
||
| l, err := lc.Listen(ctx, "tcp", port.redisAddress) | ||
| assert.NoError(t, err) | ||
| l, err := net.Listen("tcp", "127.0.0.1:0") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚫 [golangci] reported by reviewdog 🐶 |
||
| require.NoError(t, err) | ||
| redisAddr := l.Addr().String() | ||
| redisAdders = append(redisAdders, redisAddr) | ||
| rd := NewRedisServer(l, st, coordinator) | ||
| go func() { | ||
| assert.NoError(t, rd.Run()) | ||
| require.NoError(t, rd.Run()) | ||
| }() | ||
|
|
||
| dl, err := lc.Listen(ctx, "tcp", port.dynamoAddress) | ||
| assert.NoError(t, err) | ||
| dl, err := net.Listen("tcp", "127.0.0.1:0") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚫 [golangci] reported by reviewdog 🐶 |
||
| require.NoError(t, err) | ||
| dynamoAddr := dl.Addr().String() | ||
| ds := NewDynamoDBServer(dl, st, coordinator) | ||
| go func() { | ||
| assert.NoError(t, ds.Run()) | ||
| require.NoError(t, ds.Run()) | ||
| }() | ||
|
|
||
| nodes = append(nodes, newNode( | ||
| port.grpcAddress, | ||
| port.raftAddress, | ||
| port.redisAddress, | ||
| port.dynamoAddress, | ||
| grpcAdders[i], | ||
| grpcAdders[i], | ||
| redisAddr, | ||
| dynamoAddr, | ||
| r, | ||
| tm, | ||
| s, | ||
|
|
@@ -219,7 +160,7 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) { | |
|
|
||
| d := &net.Dialer{Timeout: time.Second} | ||
| for _, n := range nodes { | ||
| assert.Eventually(t, func() bool { | ||
| require.Eventually(t, func() bool { | ||
| conn, err := d.DialContext(ctx, "tcp", n.grpcAddress) | ||
| if err != nil { | ||
| return false | ||
|
|
@@ -239,7 +180,7 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) { | |
| }, waitTimeout, waitInterval) | ||
| } | ||
|
|
||
| assert.Eventually(t, func() bool { | ||
| require.Eventually(t, func() bool { | ||
| return nodes[0].raft.State() == raft.Leader | ||
| }, waitTimeout, waitInterval) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,20 +59,25 @@ func (s *ShardRouter) process(reqs []*pb.Request, fn func(*routerGroup, []*pb.Re | |
| } | ||
|
|
||
| var max uint64 | ||
| var errs error | ||
| for gid, rs := range grouped { | ||
| g, ok := s.getGroup(gid) | ||
| if !ok { | ||
| return nil, errors.Wrapf(ErrInvalidRequest, "unknown group %d", gid) | ||
| err := errors.Wrapf(ErrInvalidRequest, "unknown group %d", gid) | ||
| errs = errors.CombineErrors(errs, err) | ||
| continue | ||
| } | ||
| r, err := fn(g, rs) | ||
| if err != nil { | ||
| return nil, errors.WithStack(err) | ||
| errs = errors.CombineErrors(errs, errors.WithStack(err)) | ||
| continue | ||
| } | ||
|
Comment on lines
71
to
74
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When an error occurs during the |
||
| if r.CommitIndex > max { | ||
| max = r.CommitIndex | ||
| } | ||
| } | ||
| return &TransactionResponse{CommitIndex: max}, nil | ||
| resp := &TransactionResponse{CommitIndex: max} | ||
| return resp, errs | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚫 [golangci] reported by reviewdog 🐶 |
||
| } | ||
|
|
||
| func (s *ShardRouter) getGroup(id uint64) (*routerGroup, bool) { | ||
|
|
||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚫 [golangci] reported by reviewdog 🐶 elastickv/kv/shard_router_test.go Line 17 in ce94d05
|
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -3,6 +3,7 @@ package kv | |||||||||||||
| import ( | ||||||||||||||
| "context" | ||||||||||||||
| "fmt" | ||||||||||||||
| "strings" | ||||||||||||||
| "testing" | ||||||||||||||
| "time" | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -253,3 +254,32 @@ func TestShardRouterCommitFailure(t *testing.T) { | |||||||||||||
| t.Fatalf("unexpected abort on successful group") | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| func TestShardRouterCommitMultipleFailures(t *testing.T) { | ||||||||||||||
| e := distribution.NewEngine() | ||||||||||||||
| e.UpdateRoute([]byte("a"), []byte("m"), 1) | ||||||||||||||
| e.UpdateRoute([]byte("m"), nil, 2) | ||||||||||||||
|
|
||||||||||||||
| router := NewShardRouter(e) | ||||||||||||||
|
|
||||||||||||||
| fail1 := &fakeTM{commitErr: true} | ||||||||||||||
| fail2 := &fakeTM{commitErr: true} | ||||||||||||||
| router.Register(1, fail1, nil) | ||||||||||||||
| router.Register(2, fail2, nil) | ||||||||||||||
|
|
||||||||||||||
| reqs := []*pb.Request{ | ||||||||||||||
| {IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("b"), Value: []byte("v1")}}}, | ||||||||||||||
| {IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("x"), Value: []byte("v2")}}}, | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| _, err := router.Commit(reqs) | ||||||||||||||
| if err == nil { | ||||||||||||||
| t.Fatalf("expected error") | ||||||||||||||
| } | ||||||||||||||
| if c := strings.Count(fmt.Sprintf("%+v", err), "commit fail"); c < 2 { | ||||||||||||||
| t.Fatalf("expected combined errors, got %d: %+v", c, err) | ||||||||||||||
| } | ||||||||||||||
|
Comment on lines
+279
to
+281
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This check is a bit brittle because it relies on the verbose string formatting of the error (
Suggested change
|
||||||||||||||
| if fail1.commitCalls == 0 || fail2.commitCalls == 0 { | ||||||||||||||
| t.Fatalf("expected commits on both groups") | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚫 [golangci] reported by reviewdog 🐶
net.Listen must not be called. use (*net.ListenConfig).Listen (noctx)