-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlockfile_reader_test.go
More file actions
310 lines (256 loc) · 7.73 KB
/
lockfile_reader_test.go
File metadata and controls
310 lines (256 loc) · 7.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
package bbolt
import (
"os"
"path/filepath"
"sync/atomic"
"testing"
)
// openTestLockFileN opens a lock file named "test.db-lock" inside a fresh
// per-test temporary directory, passing maxReaders through to openLockFile.
// The calling test is failed immediately if the file cannot be opened, and
// the lock file is closed via t.Cleanup when the test finishes.
func openTestLockFileN(t *testing.T, maxReaders int) *LockFile {
	t.Helper()

	lockPath := filepath.Join(t.TempDir(), "test.db-lock")
	lockFile, openErr := openLockFile(lockPath, maxReaders)
	if openErr != nil {
		t.Fatalf("openLockFile: %v", openErr)
	}

	t.Cleanup(func() { lockFile.Close() })
	return lockFile
}
// TestAcquireReleaseReaderSlot walks one reader slot through its lifecycle:
// after AcquireReaderSlot the slot must hold this process's pid and an idle
// txid, and after ReleaseReaderSlot its pid must read back as zero.
func TestAcquireReleaseReaderSlot(t *testing.T) {
	const maxReaders = 4

	lf := openTestLockFileN(t, maxReaders)
	wantPid := uint32(os.Getpid())

	slot, err := lf.AcquireReaderSlot()
	if err != nil {
		t.Fatalf("AcquireReaderSlot: %v", err)
	}
	// The returned index has to fall inside the range the file was opened with.
	if slot < 0 || slot >= maxReaders {
		t.Fatalf("unexpected slot index: %d", slot)
	}

	// While held, the slot records the owning pid ...
	if got := atomic.LoadUint32(lf.slotPidPtr(slot)); got != wantPid {
		t.Errorf("slot pid = %d, want %d", got, wantPid)
	}
	// ... and no transaction id has been published yet.
	if got := atomic.LoadUint64(lf.slotTxidPtr(slot)); got != txidIdle {
		t.Errorf("slot txid = %d, want txidIdle (%d)", got, txidIdle)
	}

	lf.ReleaseReaderSlot(slot)

	// Releasing must reset the pid to zero.
	if got := atomic.LoadUint32(lf.slotPidPtr(slot)); got != 0 {
		t.Errorf("after release: slot pid = %d, want 0", got)
	}
}
// TestMultipleReaderSlots acquires several slots from one lock file and
// checks that every returned index is distinct and holds this process's pid
// before releasing them all again.
func TestMultipleReaderSlots(t *testing.T) {
	const numSlots = 5

	lf := openTestLockFileN(t, 8)
	wantPid := uint32(os.Getpid())

	acquired := make([]int, 0, numSlots)
	for i := 0; i < numSlots; i++ {
		slot, err := lf.AcquireReaderSlot()
		if err != nil {
			t.Fatalf("AcquireReaderSlot[%d]: %v", i, err)
		}
		acquired = append(acquired, slot)
	}

	// Each index may show up only once, and each slot must be owned by us.
	used := make(map[int]struct{}, numSlots)
	for i, slot := range acquired {
		if _, dup := used[slot]; dup {
			t.Fatalf("duplicate slot index: %d", slot)
		}
		used[slot] = struct{}{}

		if got := atomic.LoadUint32(lf.slotPidPtr(slot)); got != wantPid {
			t.Errorf("slot[%d] pid = %d, want %d", i, got, wantPid)
		}
	}

	// Hand every slot back.
	for i := range acquired {
		lf.ReleaseReaderSlot(acquired[i])
	}
}
// TestReaderSlotFull takes every reader slot of a small lock file and then
// expects one further AcquireReaderSlot call to return ErrReaderTableFull.
func TestReaderSlotFull(t *testing.T) {
	const maxReaders = 3
	lf := openTestLockFileN(t, maxReaders)

	// Occupy the whole table.
	for i := 0; i < maxReaders; i++ {
		if _, err := lf.AcquireReaderSlot(); err != nil {
			t.Fatalf("AcquireReaderSlot[%d]: %v", i, err)
		}
	}

	// One request beyond capacity must be refused with the sentinel error.
	if _, err := lf.AcquireReaderSlot(); err != ErrReaderTableFull {
		t.Fatalf("expected ErrReaderTableFull, got: %v", err)
	}
}
// TestSetClearTxid checks that a freshly acquired slot's txid starts idle,
// reflects the value written by SetSlotTxid, and is idle again after
// ClearSlotTxid.
func TestSetClearTxid(t *testing.T) {
	lf := openTestLockFileN(t, 4)

	slot, err := lf.AcquireReaderSlot()
	if err != nil {
		t.Fatalf("AcquireReaderSlot: %v", err)
	}
	defer lf.ReleaseReaderSlot(slot)

	// currentTxid atomically reads the txid word of the slot under test.
	currentTxid := func() uint64 {
		return atomic.LoadUint64(lf.slotTxidPtr(slot))
	}

	// Before any transaction is registered the slot is idle.
	if got := currentTxid(); got != txidIdle {
		t.Errorf("initial txid = %d, want txidIdle", got)
	}

	// A published txid must be readable back verbatim.
	lf.SetSlotTxid(slot, 42)
	if got := currentTxid(); got != 42 {
		t.Errorf("after SetSlotTxid: txid = %d, want 42", got)
	}

	// Clearing puts the slot back into the idle state.
	lf.ClearSlotTxid(slot)
	if got := currentTxid(); got != txidIdle {
		t.Errorf("after ClearSlotTxid: txid = %d, want txidIdle", got)
	}
}
// TestOldestReaderTxid checks OldestReaderTxid against a changing set of
// readers. It expects the fallback when no slot has a txid set, the smallest
// txid among slots that have one, and the fallback again once every txid has
// been cleared.
func TestOldestReaderTxid(t *testing.T) {
	lf := openTestLockFileN(t, 8)

	// No readers: should return fallback.
	fallback := uint64(100)
	if got := lf.OldestReaderTxid(fallback); got != fallback {
		t.Errorf("no readers: OldestReaderTxid = %d, want %d", got, fallback)
	}

	// Acquire three slots. Every acquire error is checked so that the steps
	// below never run against a slot index that was returned with an error.
	slot1, err := lf.AcquireReaderSlot()
	if err != nil {
		t.Fatalf("AcquireReaderSlot (slot1): %v", err)
	}
	defer lf.ReleaseReaderSlot(slot1)

	slot2, err := lf.AcquireReaderSlot()
	if err != nil {
		t.Fatalf("AcquireReaderSlot (slot2): %v", err)
	}
	defer lf.ReleaseReaderSlot(slot2)

	slot3, err := lf.AcquireReaderSlot()
	if err != nil {
		t.Fatalf("AcquireReaderSlot (slot3): %v", err)
	}
	defer lf.ReleaseReaderSlot(slot3)

	// Slot1: idle (no active tx)
	// Slot2: txid 50
	// Slot3: txid 30
	lf.SetSlotTxid(slot2, 50)
	lf.SetSlotTxid(slot3, 30)

	if got := lf.OldestReaderTxid(fallback); got != 30 {
		t.Errorf("OldestReaderTxid = %d, want 30", got)
	}

	// Clear slot3, now slot2 is the only active reader.
	lf.ClearSlotTxid(slot3)
	if got := lf.OldestReaderTxid(fallback); got != 50 {
		t.Errorf("after clearing slot3: OldestReaderTxid = %d, want 50", got)
	}

	// Clear slot2, no active readers. Should return fallback.
	lf.ClearSlotTxid(slot2)
	if got := lf.OldestReaderTxid(fallback); got != fallback {
		t.Errorf("all idle: OldestReaderTxid = %d, want %d", got, fallback)
	}
}
// TestOldestReaderTxidWithZeroTxid checks that a slot whose txid is set to 0
// is counted as an active reader: OldestReaderTxid must return 0 rather than
// the supplied fallback.
func TestOldestReaderTxidWithZeroTxid(t *testing.T) {
	lf := openTestLockFileN(t, 4)

	// Fail fast on an acquire error instead of continuing with an invalid slot.
	slot, err := lf.AcquireReaderSlot()
	if err != nil {
		t.Fatalf("AcquireReaderSlot: %v", err)
	}
	defer lf.ReleaseReaderSlot(slot)

	// A reader at txid 0 is valid and should be the minimum.
	lf.SetSlotTxid(slot, 0)
	if got := lf.OldestReaderTxid(100); got != 0 {
		t.Errorf("OldestReaderTxid = %d, want 0", got)
	}
}
// TestReaderSlotReuse checks that a slot can be acquired again after one has
// been released. Whether the second acquire returns the same index is
// deliberately not asserted; only success is required.
func TestReaderSlotReuse(t *testing.T) {
	lf := openTestLockFileN(t, 4)

	// Acquire and release a slot. The error is checked so that a failure here
	// is reported as such rather than surfacing in the second acquire.
	first, err := lf.AcquireReaderSlot()
	if err != nil {
		t.Fatalf("AcquireReaderSlot: %v", err)
	}
	lf.ReleaseReaderSlot(first)

	// Acquire again: this must succeed now that a slot has been freed.
	second, err := lf.AcquireReaderSlot()
	if err != nil {
		t.Fatalf("AcquireReaderSlot after release: %v", err)
	}
	lf.ReleaseReaderSlot(second)
}
// TestClearStaleReaders plants a reader entry with a made-up pid directly in
// a slot and expects ClearStaleReaders to report at least one cleared entry
// and to leave that slot with pid 0 and an idle txid.
func TestClearStaleReaders(t *testing.T) {
	lf := openTestLockFileN(t, 8)

	// Spawning and killing a real process is awkward inside a unit test, so
	// a dead reader is faked by storing a bogus pid straight into a slot.
	// PID 1 (init/launchd) is always alive and therefore unsuitable; a very
	// large pid is used because it almost certainly does not exist.
	//
	// NOTE: this exercises the mechanics of ClearStaleReaders only. Fully
	// verifying cross-process stale detection needs a subprocess harness
	// (see Step 5 multi-process tests).
	const bogusPid uint32 = 4000000 // unlikely to exist
	const slotIdx = 2

	// Write the fake reader by hand, bypassing AcquireReaderSlot.
	atomic.StoreUint32(lf.slotPidPtr(slotIdx), bogusPid)
	atomic.StoreUint64(lf.slotTxidPtr(slotIdx), 10)

	// Sanity check: the pid must actually be in place before clearing.
	if stored := atomic.LoadUint32(lf.slotPidPtr(slotIdx)); stored != bogusPid {
		t.Fatal("failed to set stale pid")
	}

	if n := lf.ClearStaleReaders(); n < 1 {
		t.Errorf("ClearStaleReaders = %d, want >= 1", n)
	}

	// The planted entry must be wiped: pid zeroed, txid idle.
	if gotPid := atomic.LoadUint32(lf.slotPidPtr(slotIdx)); gotPid != 0 {
		t.Errorf("stale slot pid = %d, want 0", gotPid)
	}
	if gotTxid := atomic.LoadUint64(lf.slotTxidPtr(slotIdx)); gotTxid != txidIdle {
		t.Errorf("stale slot txid = %d, want txidIdle", gotTxid)
	}
}
// TestClearStaleReadersPreservesLiveSlots checks that ClearStaleReaders
// reports nothing cleared and leaves the pid and txid of a slot held by this
// running process unchanged.
func TestClearStaleReadersPreservesLiveSlots(t *testing.T) {
	lf := openTestLockFileN(t, 8)

	// Acquire a real slot (our process is alive). The error is checked so the
	// assertions below are never evaluated against an invalid slot index.
	slot, err := lf.AcquireReaderSlot()
	if err != nil {
		t.Fatalf("AcquireReaderSlot: %v", err)
	}
	defer lf.ReleaseReaderSlot(slot)
	lf.SetSlotTxid(slot, 42)

	// ClearStaleReaders should not touch our slot.
	if cleared := lf.ClearStaleReaders(); cleared != 0 {
		t.Errorf("ClearStaleReaders = %d, want 0 (no stale)", cleared)
	}

	if pid := atomic.LoadUint32(lf.slotPidPtr(slot)); pid != uint32(os.Getpid()) {
		t.Errorf("live slot pid = %d, want %d", pid, os.Getpid())
	}
	if txid := atomic.LoadUint64(lf.slotTxidPtr(slot)); txid != 42 {
		t.Errorf("live slot txid = %d, want 42", txid)
	}
}
// TestOldestReaderTxidClearsStale plants a reader entry with a made-up pid
// and a low txid in slot 0, then registers a real reader at txid 50. It
// expects OldestReaderTxid to disregard the planted entry (returning 50) and
// to leave slot 0 with pid 0 afterwards.
func TestOldestReaderTxidClearsStale(t *testing.T) {
	lf := openTestLockFileN(t, 8)

	// Insert a stale reader with a very old txid.
	stalePid := uint32(4000001)
	atomic.StoreUint32(lf.slotPidPtr(0), stalePid)
	atomic.StoreUint64(lf.slotTxidPtr(0), 5)

	// Acquire a real slot with a higher txid. The error is checked so the
	// test does not continue with an invalid slot index.
	slot, err := lf.AcquireReaderSlot()
	if err != nil {
		t.Fatalf("AcquireReaderSlot: %v", err)
	}
	defer lf.ReleaseReaderSlot(slot)
	lf.SetSlotTxid(slot, 50)

	// OldestReaderTxid should clear the stale reader first, then return 50.
	if got := lf.OldestReaderTxid(100); got != 50 {
		t.Errorf("OldestReaderTxid = %d, want 50 (stale reader should be cleared)", got)
	}

	// Verify the stale slot was cleared.
	if pid := atomic.LoadUint32(lf.slotPidPtr(0)); pid != 0 {
		t.Errorf("stale slot pid = %d after OldestReaderTxid, want 0", pid)
	}
}