Skip to content

Commit af54cdf

Browse files
committed
fix: dispatch index shard must be tenant-based, not queue-based
1 parent b855e48 commit af54cdf

File tree

3 files changed

+110
-25
lines changed

3 files changed

+110
-25
lines changed

packages/redis-worker/src/fair-queue/index.ts

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -273,9 +273,9 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
273273
const timestamp = options.timestamp ?? Date.now();
274274
const queueKey = this.keys.queueKey(options.queueId);
275275
const queueItemsKey = this.keys.queueItemsKey(options.queueId);
276-
const shardId = this.masterQueue.getShardForQueue(options.queueId);
276+
const dispatchShardId = this.tenantDispatch.getShardForTenant(options.tenantId);
277277
const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(options.tenantId);
278-
const dispatchKey = this.keys.dispatchKey(shardId);
278+
const dispatchKey = this.keys.dispatchKey(dispatchShardId);
279279

280280
// Validate payload if schema provided and validation enabled
281281
if (this.validateOnEnqueue && this.payloadSchema) {
@@ -332,7 +332,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
332332
[FairQueueAttributes.QUEUE_ID]: options.queueId,
333333
[FairQueueAttributes.TENANT_ID]: options.tenantId,
334334
[FairQueueAttributes.MESSAGE_ID]: messageId,
335-
[FairQueueAttributes.SHARD_ID]: shardId.toString(),
335+
[FairQueueAttributes.SHARD_ID]: dispatchShardId.toString(),
336336
});
337337

338338
this.telemetry.recordEnqueue();
@@ -363,9 +363,9 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
363363
async (span) => {
364364
const queueKey = this.keys.queueKey(options.queueId);
365365
const queueItemsKey = this.keys.queueItemsKey(options.queueId);
366-
const shardId = this.masterQueue.getShardForQueue(options.queueId);
366+
const dispatchShardId = this.tenantDispatch.getShardForTenant(options.tenantId);
367367
const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(options.tenantId);
368-
const dispatchKey = this.keys.dispatchKey(shardId);
368+
const dispatchKey = this.keys.dispatchKey(dispatchShardId);
369369
const now = Date.now();
370370

371371
// Store queue descriptor
@@ -433,7 +433,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
433433
[FairQueueAttributes.QUEUE_ID]: options.queueId,
434434
[FairQueueAttributes.TENANT_ID]: options.tenantId,
435435
[FairQueueAttributes.MESSAGE_COUNT]: messageIds.length,
436-
[FairQueueAttributes.SHARD_ID]: shardId.toString(),
436+
[FairQueueAttributes.SHARD_ID]: dispatchShardId.toString(),
437437
});
438438

439439
this.telemetry.recordEnqueueBatch(messageIds.length);
@@ -1082,8 +1082,11 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
10821082
loopId: string,
10831083
queueId: string,
10841084
tenantId: string,
1085-
shardId: number
1085+
_consumerShardId: number
10861086
): Promise<number> {
1087+
// Dispatch shard is tenant-based (tenantId hash), not queue-based.
1088+
// In-flight/master queue shard is queue-based.
1089+
const dispatchShardId = this.tenantDispatch.getShardForTenant(tenantId);
10871090
const queueKey = this.keys.queueKey(queueId);
10881091
const queueItemsKey = this.keys.queueItemsKey(queueId);
10891092
const descriptor = this.queueDescriptorCache.get(queueId) ?? {
@@ -1126,7 +1129,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
11261129

11271130
if (claimedMessages.length === 0) {
11281131
// Queue is empty, update both old and new indexes and clean up caches
1129-
await this.#updateAllIndexesAfterDequeue(queueId, tenantId, shardId);
1132+
await this.#updateAllIndexesAfterDequeue(queueId, tenantId);
11301133
this.queueDescriptorCache.delete(queueId);
11311134
this.queueCooloffStates.delete(queueId);
11321135
return 0;
@@ -1145,7 +1148,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
11451148
// Release ALL remaining messages (from index i onward) back to queue
11461149
// This prevents messages from being stranded in the in-flight set
11471150
const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(tenantId);
1148-
const dispatchKey = this.keys.dispatchKey(shardId);
1151+
const dispatchKey = this.keys.dispatchKey(dispatchShardId);
11491152
await this.visibilityManager.releaseBatch(
11501153
claimedMessages.slice(i),
11511154
queueId,
@@ -1261,8 +1264,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
12611264
// Update both old and new indexes, clean up caches if queue is empty
12621265
const { queueEmpty } = await this.#updateAllIndexesAfterDequeue(
12631266
queueId,
1264-
descriptor.tenantId,
1265-
shardId
1267+
descriptor.tenantId
12661268
);
12671269
if (queueEmpty) {
12681270
this.queueDescriptorCache.delete(queueId);
@@ -1310,8 +1312,10 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
13101312
: { id: queueId, tenantId: this.keys.extractTenantId(queueId), metadata: {} };
13111313

13121314
// Release back to queue (visibility manager updates dispatch indexes atomically)
1315+
// Dispatch shard is tenant-based, not queue-based
1316+
const dispatchShardId = this.tenantDispatch.getShardForTenant(descriptor.tenantId);
13131317
const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId);
1314-
const dispatchKey = this.keys.dispatchKey(shardId);
1318+
const dispatchKey = this.keys.dispatchKey(dispatchShardId);
13151319
await this.visibilityManager.release(
13161320
messageId,
13171321
queueId,
@@ -1372,12 +1376,13 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
13721376
metadata: storedMessage.metadata ?? {},
13731377
};
13741378

1379+
const dispatchShardId = this.tenantDispatch.getShardForTenant(descriptor.tenantId);
13751380
await this.#handleMessageFailure(
13761381
storedMessage,
13771382
queueId,
13781383
queueKey,
13791384
queueItemsKey,
1380-
shardId,
1385+
dispatchShardId,
13811386
descriptor,
13821387
error
13831388
);
@@ -1392,7 +1397,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
13921397
queueId: string,
13931398
queueKey: string,
13941399
queueItemsKey: string,
1395-
shardId: number,
1400+
dispatchShardId: number,
13961401
descriptor: QueueDescriptor,
13971402
error?: Error
13981403
): Promise<void> {
@@ -1412,7 +1417,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
14121417
// Release with delay, passing the updated message data so the Lua script
14131418
// atomically writes the incremented attempt count when re-queuing.
14141419
const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId);
1415-
const dispatchKey = this.keys.dispatchKey(shardId);
1420+
const dispatchKey = this.keys.dispatchKey(dispatchShardId);
14161421
await this.visibilityManager.release(
14171422
storedMessage.id,
14181423
queueId,
@@ -1529,12 +1534,12 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
15291534
for (let shardId = 0; shardId < this.shardCount; shardId++) {
15301535
const reclaimedMessages = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => {
15311536
const tenantId = this.keys.extractTenantId(queueId);
1532-
const queueShardId = this.masterQueue.getShardForQueue(queueId);
1537+
const dispatchShardId = this.tenantDispatch.getShardForTenant(tenantId);
15331538
return {
15341539
queueKey: this.keys.queueKey(queueId),
15351540
queueItemsKey: this.keys.queueItemsKey(queueId),
15361541
tenantQueueIndexKey: this.keys.tenantQueueIndexKey(tenantId),
1537-
dispatchKey: this.keys.dispatchKey(queueShardId),
1542+
dispatchKey: this.keys.dispatchKey(dispatchShardId),
15381543
tenantId,
15391544
};
15401545
});
@@ -1645,13 +1650,14 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
16451650
*/
16461651
async #updateAllIndexesAfterDequeue(
16471652
queueId: string,
1648-
tenantId: string,
1649-
shardId: number
1653+
tenantId: string
16501654
): Promise<{ queueEmpty: boolean }> {
1655+
const queueShardId = this.masterQueue.getShardForQueue(queueId);
1656+
const dispatchShardId = this.tenantDispatch.getShardForTenant(tenantId);
16511657
const queueKey = this.keys.queueKey(queueId);
1652-
const masterQueueKey = this.keys.masterQueueKey(shardId);
1658+
const masterQueueKey = this.keys.masterQueueKey(queueShardId);
16531659
const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(tenantId);
1654-
const dispatchKey = this.keys.dispatchKey(shardId);
1660+
const dispatchKey = this.keys.dispatchKey(dispatchShardId);
16551661

16561662
// Update legacy master queue (drain path, no-op if queue not there)
16571663
const removedFromMaster = await this.redis.updateMasterQueueIfEmpty(

packages/redis-worker/src/fair-queue/tenantDispatch.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,12 @@ export class TenantDispatch {
4040
}
4141

4242
/**
43-
* Get the shard ID for a queue.
44-
* Uses the same jump consistent hash as MasterQueue for consistency.
43+
* Get the dispatch shard ID for a tenant.
44+
* Uses jump consistent hash on the tenant ID so each tenant
45+
* always maps to exactly one dispatch shard.
4546
*/
46-
getShardForQueue(queueId: string): number {
47-
return jumpHash(queueId, this.shardCount);
47+
getShardForTenant(tenantId: string): number {
48+
return jumpHash(tenantId, this.shardCount);
4849
}
4950

5051
/**

packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -991,6 +991,84 @@ describe("Two-Level Tenant Dispatch", () => {
991991
}
992992
);
993993
});
994+
describe("dispatch shard is tenant-based, not queue-based", () => {
995+
redisTest(
996+
"tenant with queues in different queue shards should appear in only one dispatch shard",
997+
{ timeout: 15000 },
998+
async ({ redisOptions }) => {
999+
const keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
1000+
const redis = createRedisClient(redisOptions);
1001+
const shardCount = 2;
1002+
1003+
const helper = new TestHelper(redisOptions, keys, { shardCount });
1004+
1005+
try {
1006+
const tenantId = "tenant-shard-test";
1007+
1008+
// Find two queue IDs for the same tenant that hash to different queue shards
1009+
// by trying different queue names
1010+
const { MasterQueue: MQ } = await import("../masterQueue.js");
1011+
const mq = new MQ({ redis: redisOptions, keys, shardCount });
1012+
const { TenantDispatch: TD } = await import("../tenantDispatch.js");
1013+
const td = new TD({ redis: redisOptions, keys, shardCount });
1014+
1015+
let queueShard0: string | null = null;
1016+
let queueShard1: string | null = null;
1017+
1018+
for (let i = 0; i < 100; i++) {
1019+
const qId = `tenant:${tenantId}:queue:q${i}`;
1020+
const shard = mq.getShardForQueue(qId);
1021+
if (shard === 0 && !queueShard0) queueShard0 = qId;
1022+
if (shard === 1 && !queueShard1) queueShard1 = qId;
1023+
if (queueShard0 && queueShard1) break;
1024+
}
1025+
1026+
expect(queueShard0).not.toBeNull();
1027+
expect(queueShard1).not.toBeNull();
1028+
1029+
// Both queues belong to the same tenant, so dispatch shard should be the same
1030+
const expectedDispatchShard = td.getShardForTenant(tenantId);
1031+
1032+
// Enqueue to both queues
1033+
await helper.fairQueue.enqueue({
1034+
queueId: queueShard0!,
1035+
tenantId,
1036+
payload: { value: "msg-shard0" },
1037+
});
1038+
await helper.fairQueue.enqueue({
1039+
queueId: queueShard1!,
1040+
tenantId,
1041+
payload: { value: "msg-shard1" },
1042+
});
1043+
1044+
// Verify: tenant should only appear in one dispatch shard
1045+
const dispatch0 = await redis.zrange(keys.dispatchKey(0), 0, -1);
1046+
const dispatch1 = await redis.zrange(keys.dispatchKey(1), 0, -1);
1047+
1048+
const inShard0 = dispatch0.includes(tenantId);
1049+
const inShard1 = dispatch1.includes(tenantId);
1050+
1051+
// Tenant should appear in exactly one shard
1052+
expect(inShard0 !== inShard1).toBe(true);
1053+
1054+
// And it should be the expected one
1055+
if (expectedDispatchShard === 0) {
1056+
expect(inShard0).toBe(true);
1057+
expect(inShard1).toBe(false);
1058+
} else {
1059+
expect(inShard0).toBe(false);
1060+
expect(inShard1).toBe(true);
1061+
}
1062+
1063+
await mq.close();
1064+
await td.close();
1065+
} finally {
1066+
await helper.close();
1067+
await redis.quit();
1068+
}
1069+
}
1070+
);
1071+
});
9941072
});
9951073

9961074
// Helper to wait for a condition

0 commit comments

Comments
 (0)