From 5954a74372b0eca1ebecd820f1ef8b05c8772586 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 27 Feb 2026 09:46:03 +0000 Subject: [PATCH] fix: legacy master queue drain should never re-add entries --- packages/redis-worker/src/fair-queue/index.ts | 14 ++--- .../fair-queue/tests/tenantDispatch.test.ts | 57 +++++++++++++++++++ 2 files changed, 64 insertions(+), 7 deletions(-) diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts index bfb60b6c55..cf9d7d6197 100644 --- a/packages/redis-worker/src/fair-queue/index.ts +++ b/packages/redis-worker/src/fair-queue/index.ts @@ -1763,7 +1763,7 @@ return (#ARGV - 1) / 3 `, }); - // Update master queue if queue is empty (legacy, used for drain) + // Remove queue from legacy master queue if empty (drain-only, never re-adds) this.redis.defineCommand("updateMasterQueueIfEmpty", { numberOfKeys: 2, lua: ` @@ -1775,13 +1775,13 @@ local count = redis.call('ZCARD', queueKey) if count == 0 then redis.call('ZREM', masterQueueKey, queueId) return 1 -else - local oldest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') - if #oldest >= 2 then - redis.call('ZADD', masterQueueKey, oldest[2], queueId) - end - return 0 end + +-- Queue still has messages but don't re-add to legacy master queue. +-- New enqueues go through the V2 dispatch path, so we only drain here. +-- Just remove it so it doesn't linger. +redis.call('ZREM', masterQueueKey, queueId) +return 0 `, }); diff --git a/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts b/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts index feb1c93a0d..b26fb98119 100644 --- a/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts @@ -456,6 +456,63 @@ describe("Two-Level Tenant Dispatch", () => { await redis.quit(); } ); + + redisTest( + "should not re-populate legacy master queue when completing messages", + { timeout: 20000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = createRedisClient(redisOptions); + + // Simulate pre-deploy state: queue with 2 messages in old master queue + const queueId = "tenant:t1:queue:legacy-noreinsert"; + const queueKey = keys.queueKey(queueId); + const queueItemsKey = keys.queueItemsKey(queueId); + const masterQueueKey = keys.masterQueueKey(0); + const timestamp = Date.now(); + + for (let i = 0; i < 2; i++) { + const msg: StoredMessage = { + id: `legacy-msg-${i}`, + queueId, + tenantId: "t1", + payload: { value: `msg-${i}` }, + timestamp: timestamp + i, + attempt: 1, + }; + await redis.zadd(queueKey, timestamp + i, `legacy-msg-${i}`); + await redis.hset(queueItemsKey, `legacy-msg-${i}`, JSON.stringify(msg)); + } + await redis.zadd(masterQueueKey, timestamp, queueId); + + const processed: string[] = []; + const helper = new TestHelper(redisOptions, keys); + + helper.onMessage(async (ctx) => { + processed.push(ctx.message.payload.value); + await ctx.complete(); + + // After completing the first message, the queue still has 1 message + // The legacy master queue should NOT be re-populated + if (processed.length === 1) { + const masterCount = await redis.zcard(masterQueueKey); + // Should be 0 (drained) not 1 (re-added) + expect(masterCount).toBe(0); + } + }); + helper.start(); + + await waitFor(() => processed.length === 2, 10000); + expect(processed).toHaveLength(2); + + // Final check: master queue should be completely empty + const masterFinal = await redis.zcard(masterQueueKey); + expect(masterFinal).toBe(0); + + await helper.close(); + await redis.quit(); + } + ); }); describe("DRR selectQueuesFromDispatch", () => {