Skip to content

Commit d79ca44

Browse files
committed
fix: fallback dispatch scheduler was reading empty master queue
The fallback path for schedulers without selectQueuesFromDispatch was building allQueues from the dispatch index but then ignoring it and calling scheduler.selectQueues with the old master queue key (which is empty for new messages). Fixed to group the fetched queues by tenant directly with capacity filtering.
1 parent 8d0a826 commit d79ca44

File tree

2 files changed

+17
-17
lines changed

2 files changed

+17
-17
lines changed

packages/redis-worker/src/fair-queue/index.ts

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import type {
2727
GlobalRateLimiter,
2828
QueueCooloffState,
2929
QueueDescriptor,
30-
QueueWithScore,
3130
SchedulerContext,
3231
StoredMessage,
3332
TenantQueues,
@@ -968,8 +967,8 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
968967

969968
/**
970969
* Fallback for schedulers that don't implement selectQueuesFromDispatch.
971-
* Reads dispatch index, fetches per-tenant queues, flattens into the
972-
* old-style master queue key format, and calls selectQueues.
970+
* Reads dispatch index, fetches per-tenant queues, groups by tenant,
971+
* and filters at-capacity tenants. No DRR deficit tracking in this path.
973972
*/
974973
async #fallbackDispatchToLegacyScheduler(
975974
loopId: string,
@@ -981,23 +980,20 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
981980
const tenants = await this.tenantDispatch.getTenantsFromShard(shardId);
982981
if (tenants.length === 0) return [];
983982

984-
// For each tenant, get their queues and build a flat list
985-
const allQueues: QueueWithScore[] = [];
983+
// For each tenant, get their queues and build grouped result
984+
const tenantQueues: TenantQueues[] = [];
986985
for (const { tenantId } of tenants) {
986+
if (this.concurrencyManager) {
987+
const atCapacity = await this.concurrencyManager.isAtCapacity("tenant", tenantId);
988+
if (atCapacity) continue;
989+
}
987990
const queues = await this.tenantDispatch.getQueuesForTenant(tenantId);
988-
allQueues.push(...queues);
991+
if (queues.length > 0) {
992+
tenantQueues.push({ tenantId, queues: queues.map((q) => q.queueId) });
993+
}
989994
}
990995

991-
if (allQueues.length === 0) return [];
992-
993-
// Use the base scheduler context (without dispatch methods)
994-
const baseContext = this.#createSchedulerContext();
995-
996-
// We need a temporary master queue key for the scheduler. Rather than
997-
// writing to Redis, we'll group the queues by tenant ourselves and
998-
// apply the same logic as the legacy selectQueues.
999-
const masterQueueKey = this.keys.masterQueueKey(shardId);
1000-
return this.scheduler.selectQueues(masterQueueKey, loopId, baseContext);
996+
return tenantQueues;
1001997
}
1002998

1003999
/**

scripts/enhance-release-pr.mjs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,11 @@ function formatPrBody({ version, packageEntries, serverEntries, rawBody }) {
302302
"These changes affect the self-hosted Docker image and Trigger.dev Cloud:"
303303
);
304304
lines.push("");
305-
for (const entry of allServer) lines.push(`- ${entry.text}`);
305+
for (const entry of allServer) {
306+
// Indent continuation lines so multi-line entries stay inside the list item
307+
const indented = entry.text.replace(/\n/g, "\n ");
308+
lines.push(`- ${indented}`);
309+
}
306310
lines.push("");
307311
}
308312

0 commit comments

Comments
 (0)