Skip to content

Commit 23d2922

Browse files
fix(rate-limiter): hosted-key queue follow-up fixes (#4762)
* fix(rate-limiter): hosted-key queue follow-ups from #4756 review * chore(rate-limiter): trim verbose comments
1 parent 81bcdf2 commit 23d2922

4 files changed

Lines changed: 28 additions & 9 deletions

File tree

apps/sim/background/workflow-column-execution.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ export async function executeWorkflowGroupCellJob(
7070
...currentPayload,
7171
groupId: next.id,
7272
workflowId: next.workflowId,
73+
// Re-derive so a workflow group after an enrichment group doesn't keep a stale enrichmentId.
74+
enrichmentId: next.enrichmentId,
7375
executionId: generateId(),
7476
}
7577
}

apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,16 @@ function interruptibleSleep(ms: number, signal?: AbortSignal): Promise<void> {
6565
return new Promise<void>((resolve) => {
6666
const onAbort = () => {
6767
clearTimeout(timer)
68+
signal.removeEventListener('abort', onAbort)
6869
resolve()
6970
}
7071
const timer = setTimeout(() => {
7172
signal.removeEventListener('abort', onAbort)
7273
resolve()
7374
}, ms)
7475
signal.addEventListener('abort', onAbort, { once: true })
76+
// Catch an abort that fired between the guard above and addEventListener.
77+
if (signal.aborted) onAbort()
7578
})
7679
}
7780

apps/sim/lib/core/rate-limiter/hosted-key/queue.test.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,24 +170,32 @@ describe('HostedKeyQueue', () => {
170170
})
171171

172172
describe('refreshHeartbeat', () => {
173-
it('writes the heartbeat key with TTL', async () => {
174-
mockRedis.set.mockResolvedValueOnce('OK')
173+
it('writes the heartbeat key with TTL and re-extends the queue list TTL', async () => {
174+
mockRedis.pipeline.exec.mockResolvedValueOnce([
175+
[null, 'OK'],
176+
[null, 1],
177+
])
175178

176179
await queue.refreshHeartbeat(provider, workspaceId, ticketId)
177180

178-
expect(mockRedis.set).toHaveBeenCalledWith(
181+
expect(mockRedis.pipeline.set).toHaveBeenCalledWith(
179182
'hosted-queue-tkt:exa:workspace-1:ticket-1',
180183
'1',
181184
'EX',
182185
expect.any(Number)
183186
)
187+
expect(mockRedis.pipeline.expire).toHaveBeenCalledWith(
188+
'hosted-queue:exa:workspace-1',
189+
expect.any(Number)
190+
)
191+
expect(mockRedis.pipeline.exec).toHaveBeenCalledTimes(1)
184192
})
185193

186194
it('is a no-op when Redis is unavailable', async () => {
187195
redisConfigMockFns.mockGetRedisClient.mockReturnValueOnce(null)
188196

189197
await expect(queue.refreshHeartbeat(provider, workspaceId, ticketId)).resolves.toBeUndefined()
190-
expect(mockRedis.set).not.toHaveBeenCalled()
198+
expect(mockRedis.multi).not.toHaveBeenCalled()
191199
})
192200
})
193201

apps/sim/lib/core/rate-limiter/hosted-key/queue.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ const TICKET_HEARTBEAT_TTL_SECONDS = 30
1515
export const HEARTBEAT_REFRESH_INTERVAL_MS = 10_000
1616

1717
/**
18-
* TTL on the queue list itself. Set on every enqueue. Prevents abandoned queues
19-
* (whole workspace went silent) from sticking around forever in Redis.
18+
* TTL on the queue list itself. Set on enqueue and re-extended by the head's heartbeat,
19+
* so a long-waiting head can't let the list expire out from under the waiters behind it.
20+
* Prevents abandoned queues from sticking around forever in Redis.
2021
*/
2122
const QUEUE_LIST_TTL_SECONDS = 600
2223

@@ -147,8 +148,9 @@ export class HostedKeyQueue {
147148
}
148149

149150
/**
150-
* Refresh the ticket's heartbeat. Called periodically by the head while it's
151-
* waiting on the bucket so it doesn't get reaped as dead.
151+
* Refresh the ticket's heartbeat so the head isn't reaped as dead while waiting on the
152+
* bucket. Also re-extends the queue list TTL so a wait outliving {@link QUEUE_LIST_TTL_SECONDS}
153+
* doesn't let the list expire and collapse FIFO ordering.
152154
*/
153155
async refreshHeartbeat(
154156
provider: string,
@@ -158,9 +160,13 @@ export class HostedKeyQueue {
158160
const redis = getRedisClient()
159161
if (!redis) return
160162

163+
const listKey = queueListKey(provider, billingActorId)
161164
const hbKey = heartbeatKey(provider, billingActorId, ticketId)
162165
try {
163-
await redis.set(hbKey, '1', 'EX', TICKET_HEARTBEAT_TTL_SECONDS)
166+
const pipeline = redis.multi()
167+
pipeline.set(hbKey, '1', 'EX', TICKET_HEARTBEAT_TTL_SECONDS)
168+
pipeline.expire(listKey, QUEUE_LIST_TTL_SECONDS)
169+
await pipeline.exec()
164170
} catch (error) {
165171
logger.warn(`Queue heartbeat refresh failed for ${hbKey}`, {
166172
error: toError(error).message,

0 commit comments

Comments
 (0)