Skip to content

Commit b65f035

Browse files
authored
Merge branch 'main' into dependabot/go_modules/all-go-7de9e40c71
2 parents ebebcd0 + c753c0b commit b65f035

6 files changed

Lines changed: 129 additions & 13 deletions

File tree

pkg/raft/node.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,18 @@ func (n *Node) NodeID() string {
257257
return n.config.NodeID
258258
}
259259

260+
// LeaderID returns the server ID of the current cluster leader.
261+
// Returns an empty string if the receiver is nil, raft is uninitialized, or no
262+
// leader has been elected yet. The value may be momentarily stale between raft
263+
// leadership changes; callers that need a strong guarantee should cross-check
264+
// with HasQuorum.
265+
func (n *Node) LeaderID() string {
266+
if n == nil || n.raft == nil {
267+
return ""
268+
}
269+
return n.leaderID()
270+
}
271+
260272
func (n *Node) leaderID() string {
261273
_, id := n.raft.LeaderWithID()
262274
return string(id)

pkg/rpc/server/http.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,11 +139,16 @@ func RegisterCustomHTTPEndpoints(mux *http.ServeMux, s store.Store, pm p2p.P2PRP
139139
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
140140
return
141141
}
142+
leaderID := raftNode.LeaderID()
143+
isLeader := raftNode.IsLeader()
144+
if leaderID != "" {
145+
isLeader = leaderID == raftNode.NodeID()
146+
}
142147
rsp := struct {
143148
IsLeader bool `json:"is_leader"`
144149
NodeID string `json:"node_id"`
145150
}{
146-
IsLeader: raftNode.IsLeader(),
151+
IsLeader: isLeader,
147152
NodeID: raftNode.NodeID(),
148153
}
149154
w.Header().Set("Content-Type", "application/json")

pkg/rpc/server/http_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package server
22

33
import (
4+
"encoding/json"
45
"io"
56
"net/http"
67
"net/http/httptest"
@@ -45,6 +46,103 @@ func TestRegisterCustomHTTPEndpoints(t *testing.T) {
4546
mockStore.AssertExpectations(t)
4647
}
4748

49+
type testRaftNodeSource struct {
50+
isLeader bool
51+
leaderID string
52+
nodeID string
53+
}
54+
55+
func (t testRaftNodeSource) IsLeader() bool {
56+
return t.isLeader
57+
}
58+
59+
func (t testRaftNodeSource) LeaderID() string {
60+
return t.leaderID
61+
}
62+
63+
func (t testRaftNodeSource) NodeID() string {
64+
return t.nodeID
65+
}
66+
67+
func TestRegisterCustomHTTPEndpoints_RaftNodeStatus(t *testing.T) {
68+
type bodyShape struct {
69+
IsLeader bool `json:"is_leader"`
70+
NodeID string `json:"node_id"`
71+
}
72+
73+
cases := []struct {
74+
name string
75+
node testRaftNodeSource
76+
method string
77+
wantStatus int
78+
wantIsLeader bool
79+
wantNodeID string
80+
skipBodyDecode bool
81+
}{
82+
{
83+
// leaderID == nodeID: handler derives is_leader=true from LeaderID(),
84+
// regardless of the IsLeader() field on testRaftNodeSource.
85+
name: "leader matches — is_leader true",
86+
node: testRaftNodeSource{leaderID: "node-a", nodeID: "node-a"},
87+
method: http.MethodGet,
88+
wantStatus: http.StatusOK,
89+
wantIsLeader: true,
90+
wantNodeID: "node-a",
91+
},
92+
{
93+
// leaderID != nodeID: handler derives is_leader=false.
94+
name: "leader differs — is_leader false",
95+
node: testRaftNodeSource{leaderID: "node-b", nodeID: "node-a"},
96+
method: http.MethodGet,
97+
wantStatus: http.StatusOK,
98+
wantIsLeader: false,
99+
wantNodeID: "node-a",
100+
},
101+
{
102+
// empty leaderID: fallback — is_leader=false (no elected leader known).
103+
name: "empty leaderID fallback — is_leader false",
104+
node: testRaftNodeSource{leaderID: "", nodeID: "node-a"},
105+
method: http.MethodGet,
106+
wantStatus: http.StatusOK,
107+
wantIsLeader: false,
108+
wantNodeID: "node-a",
109+
},
110+
{
111+
name: "non-GET method — 405",
112+
node: testRaftNodeSource{},
113+
method: http.MethodPost,
114+
wantStatus: http.StatusMethodNotAllowed,
115+
skipBodyDecode: true,
116+
},
117+
}
118+
119+
for _, tc := range cases {
120+
t.Run(tc.name, func(t *testing.T) {
121+
mux := http.NewServeMux()
122+
RegisterCustomHTTPEndpoints(mux, nil, nil, config.DefaultConfig(), nil, zerolog.Nop(), tc.node)
123+
124+
ts := httptest.NewServer(mux)
125+
t.Cleanup(ts.Close)
126+
127+
req, err := http.NewRequest(tc.method, ts.URL+"/raft/node", nil)
128+
require.NoError(t, err)
129+
resp, err := http.DefaultClient.Do(req) //nolint:gosec // test-only request to httptest server
130+
require.NoError(t, err)
131+
t.Cleanup(func() { _ = resp.Body.Close() })
132+
133+
require.Equal(t, tc.wantStatus, resp.StatusCode)
134+
if tc.skipBodyDecode {
135+
return
136+
}
137+
138+
var body bodyShape
139+
require.NoError(t, json.NewDecoder(resp.Body).Decode(&body))
140+
assert.Equal(t, tc.wantIsLeader, body.IsLeader)
141+
assert.Equal(t, tc.wantNodeID, body.NodeID)
142+
})
143+
}
144+
}
145+
48146
func TestHealthReady_aggregatorBlockDelay(t *testing.T) {
49147
logger := zerolog.Nop()
50148

pkg/rpc/server/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,7 @@ func (p *P2PServer) GetNetInfo(
368368

369369
type RaftNodeSource interface {
370370
IsLeader() bool
371+
LeaderID() string
371372
NodeID() string
372373
}
373374

pkg/signer/aws/signer_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,20 +154,20 @@ func TestSign_RetryBehavior(t *testing.T) {
154154
t.Run(name, func(t *testing.T) {
155155
_, der := generateTestEd25519DER(t)
156156

157-
var calls int32
157+
var calls atomic.Int32
158158
signer := newTestSigner(t, &mockKMSClient{
159159
keyID: awsTestKeyID,
160160
pubKeyDER: der,
161161
signFn: func(_ context.Context, _ *kms.SignInput) (*kms.SignOutput, error) {
162-
atomic.AddInt32(&calls, 1)
162+
calls.Add(1)
163163
return nil, spec.signErr
164164
},
165165
}, spec.opts)
166166

167167
_, err := signer.Sign(t.Context(), []byte("hello world"))
168168
require.Error(t, err)
169169
assert.Contains(t, err.Error(), spec.errSubstr)
170-
assert.Equal(t, spec.expectedCall, atomic.LoadInt32(&calls))
170+
assert.Equal(t, spec.expectedCall, calls.Load())
171171
})
172172
}
173173
}

pkg/signer/gcp/signer_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -164,20 +164,20 @@ func TestSign_RetryBehavior(t *testing.T) {
164164
t.Run(name, func(t *testing.T) {
165165
_, publicKeyPEM := generateTestEd25519PEM(t)
166166

167-
var calls int32
167+
var calls atomic.Int32
168168
signer := newTestSigner(t, &mockKMSClient{
169169
keyID: myKeyID,
170170
publicKeyPEM: publicKeyPEM,
171171
signFn: func(_ context.Context, _ *kmspb.AsymmetricSignRequest) (*kmspb.AsymmetricSignResponse, error) {
172-
atomic.AddInt32(&calls, 1)
172+
calls.Add(1)
173173
return nil, spec.signErr
174174
},
175175
}, spec.opts)
176176

177177
_, err := signer.Sign(t.Context(), []byte("hello world"))
178178
require.Error(t, err)
179179
assert.Contains(t, err.Error(), spec.errSubstr)
180-
assert.Equal(t, spec.expectedCall, atomic.LoadInt32(&calls))
180+
assert.Equal(t, spec.expectedCall, calls.Load())
181181
})
182182
}
183183
}
@@ -228,34 +228,34 @@ func TestSign_IntegrityFailures_RetryAndFail(t *testing.T) {
228228
t.Run(name, func(t *testing.T) {
229229
_, publicKeyPEM := generateTestEd25519PEM(t)
230230

231-
var calls int32
231+
var calls atomic.Int32
232232
signer := newTestSigner(t, &mockKMSClient{
233233
keyID: myKeyID,
234234
publicKeyPEM: publicKeyPEM,
235235
signFn: func(_ context.Context, _ *kmspb.AsymmetricSignRequest) (*kmspb.AsymmetricSignResponse, error) {
236-
atomic.AddInt32(&calls, 1)
236+
calls.Add(1)
237237
return spec.responseFn(), nil
238238
},
239239
}, &Options{MaxRetries: 1})
240240

241241
_, err := signer.Sign(t.Context(), []byte("hello world"))
242242
require.Error(t, err)
243243
assert.Contains(t, err.Error(), spec.expectedErrSubstr)
244-
assert.Equal(t, int32(2), atomic.LoadInt32(&calls), "integrity failures should be retried")
244+
assert.Equal(t, int32(2), calls.Load(), "integrity failures should be retried")
245245
})
246246
}
247247
}
248248

249249
func TestSign_IntegrityCheckRecoversOnRetry(t *testing.T) {
250250
_, publicKeyPEM := generateTestEd25519PEM(t)
251251

252-
var calls int32
252+
var calls atomic.Int32
253253
expectedSig := []byte("valid-signature")
254254
mock := &mockKMSClient{
255255
keyID: myKeyID,
256256
publicKeyPEM: publicKeyPEM,
257257
signFn: func(_ context.Context, _ *kmspb.AsymmetricSignRequest) (*kmspb.AsymmetricSignResponse, error) {
258-
attempt := atomic.AddInt32(&calls, 1)
258+
attempt := calls.Add(1)
259259
if attempt == 1 {
260260
return &kmspb.AsymmetricSignResponse{
261261
Name: myKeyID,
@@ -278,7 +278,7 @@ func TestSign_IntegrityCheckRecoversOnRetry(t *testing.T) {
278278
got, err := s.Sign(t.Context(), []byte("hello world"))
279279
require.NoError(t, err)
280280
assert.Equal(t, expectedSig, got)
281-
assert.Equal(t, int32(2), atomic.LoadInt32(&calls), "second attempt should succeed")
281+
assert.Equal(t, int32(2), calls.Load(), "second attempt should succeed")
282282
}
283283

284284
func TestRetryBackoff_Capped(t *testing.T) {

0 commit comments

Comments
 (0)