Skip to content

Commit 5954a74

Browse files
committed
fix: legacy master queue drain should never re-add entries
1 parent 719a44d commit 5954a74

File tree

2 files changed

+64
-7
lines changed

2 files changed

+64
-7
lines changed

packages/redis-worker/src/fair-queue/index.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1763,7 +1763,7 @@ return (#ARGV - 1) / 3
17631763
`,
17641764
});
17651765

1766-
// Update master queue if queue is empty (legacy, used for drain)
1766+
// Remove queue from legacy master queue if empty (drain-only, never re-adds)
17671767
this.redis.defineCommand("updateMasterQueueIfEmpty", {
17681768
numberOfKeys: 2,
17691769
lua: `
@@ -1775,13 +1775,13 @@ local count = redis.call('ZCARD', queueKey)
17751775
if count == 0 then
17761776
redis.call('ZREM', masterQueueKey, queueId)
17771777
return 1
1778-
else
1779-
local oldest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES')
1780-
if #oldest >= 2 then
1781-
redis.call('ZADD', masterQueueKey, oldest[2], queueId)
1782-
end
1783-
return 0
17841778
end
1779+
1780+
-- Queue still has messages but don't re-add to legacy master queue.
1781+
-- New enqueues go through the V2 dispatch path, so we only drain here.
1782+
-- Just remove it so it doesn't linger.
1783+
redis.call('ZREM', masterQueueKey, queueId)
1784+
return 0
17851785
`,
17861786
});
17871787

packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,63 @@ describe("Two-Level Tenant Dispatch", () => {
456456
await redis.quit();
457457
}
458458
);
459+
460+
redisTest(
461+
"should not re-populate legacy master queue when completing messages",
462+
{ timeout: 20000 },
463+
async ({ redisOptions }) => {
464+
const keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
465+
const redis = createRedisClient(redisOptions);
466+
467+
// Simulate pre-deploy state: queue with 2 messages in old master queue
468+
const queueId = "tenant:t1:queue:legacy-noreinsert";
469+
const queueKey = keys.queueKey(queueId);
470+
const queueItemsKey = keys.queueItemsKey(queueId);
471+
const masterQueueKey = keys.masterQueueKey(0);
472+
const timestamp = Date.now();
473+
474+
for (let i = 0; i < 2; i++) {
475+
const msg: StoredMessage<TestPayload> = {
476+
id: `legacy-msg-${i}`,
477+
queueId,
478+
tenantId: "t1",
479+
payload: { value: `msg-${i}` },
480+
timestamp: timestamp + i,
481+
attempt: 1,
482+
};
483+
await redis.zadd(queueKey, timestamp + i, `legacy-msg-${i}`);
484+
await redis.hset(queueItemsKey, `legacy-msg-${i}`, JSON.stringify(msg));
485+
}
486+
await redis.zadd(masterQueueKey, timestamp, queueId);
487+
488+
const processed: string[] = [];
489+
const helper = new TestHelper(redisOptions, keys);
490+
491+
helper.onMessage(async (ctx) => {
492+
processed.push(ctx.message.payload.value);
493+
await ctx.complete();
494+
495+
// After completing the first message, the queue still has 1 message
496+
// The legacy master queue should NOT be re-populated
497+
if (processed.length === 1) {
498+
const masterCount = await redis.zcard(masterQueueKey);
499+
// Should be 0 (drained) not 1 (re-added)
500+
expect(masterCount).toBe(0);
501+
}
502+
});
503+
helper.start();
504+
505+
await waitFor(() => processed.length === 2, 10000);
506+
expect(processed).toHaveLength(2);
507+
508+
// Final check: master queue should be completely empty
509+
const masterFinal = await redis.zcard(masterQueueKey);
510+
expect(masterFinal).toBe(0);
511+
512+
await helper.close();
513+
await redis.quit();
514+
}
515+
);
459516
});
460517

461518
describe("DRR selectQueuesFromDispatch", () => {

0 commit comments

Comments
 (0)