Skip to content

Commit 1877f2c

Browse files
fix(cron): add staleness takeover to single-flight guard
1 parent cb5b5f4 commit 1877f2c

3 files changed

Lines changed: 88 additions & 13 deletions

File tree

apps/sim/app/api/schedules/execute/route.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1072,7 +1072,11 @@ async function processJobItem(job: ClaimedJob, queuedAt: Date, requestId: string
10721072
}
10731073
}
10741074

1075-
const scheduleTickGuard = createSingleFlight()
1075+
/**
1076+
* A tick self-bounds at `MAX_TICK_DURATION_MS`; allow a grace window beyond that
1077+
* before a hung tick is considered stale and a new one is allowed to take over.
1078+
*/
1079+
const scheduleTickGuard = createSingleFlight({ staleAfterMs: MAX_TICK_DURATION_MS + 60_000 })
10761080

10771081
interface ScheduleTickResult {
10781082
processedCount: number

apps/sim/lib/core/utils/background.test.ts

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ describe('runDetached', () => {
3636

3737
describe('createSingleFlight', () => {
3838
it('starts work and reports active while in flight', async () => {
39-
const guard = createSingleFlight()
39+
const guard = createSingleFlight({ staleAfterMs: 60_000 })
4040
let release: () => void = () => {}
4141
const gate = new Promise<void>((resolve) => {
4242
release = resolve
@@ -52,7 +52,7 @@ describe('createSingleFlight', () => {
5252
})
5353

5454
it('refuses a second run while one is already in flight', async () => {
55-
const guard = createSingleFlight()
55+
const guard = createSingleFlight({ staleAfterMs: 60_000 })
5656
let release: () => void = () => {}
5757
const gate = new Promise<void>((resolve) => {
5858
release = resolve
@@ -67,10 +67,54 @@ describe('createSingleFlight', () => {
6767
})
6868

6969
it('clears the active flag even when work rejects', async () => {
70-
const guard = createSingleFlight()
70+
const guard = createSingleFlight({ staleAfterMs: 60_000 })
7171

7272
expect(guard.run('task', () => Promise.reject(new Error('boom')))).toBe(true)
7373
await flushMicrotasks()
7474
expect(guard.isActive()).toBe(false)
7575
})
76+
77+
it('takes over a stale run whose work never settles', async () => {
78+
const guard = createSingleFlight({ staleAfterMs: 10 })
79+
80+
// A run whose promise never settles — its `finally` never fires.
81+
expect(guard.run('task', () => new Promise<void>(() => {}))).toBe(true)
82+
expect(guard.run('task', () => Promise.resolve())).toBe(false)
83+
84+
await new Promise((resolve) => setTimeout(resolve, 20))
85+
86+
const second = vi.fn().mockResolvedValue(undefined)
87+
expect(guard.run('task', second)).toBe(true)
88+
await flushMicrotasks()
89+
expect(second).toHaveBeenCalledTimes(1)
90+
expect(guard.isActive()).toBe(false)
91+
})
92+
93+
it('does not let a late stale run clear a newer run slot', async () => {
94+
const guard = createSingleFlight({ staleAfterMs: 10 })
95+
96+
let releaseStale: () => void = () => {}
97+
const stale = new Promise<void>((resolve) => {
98+
releaseStale = resolve
99+
})
100+
expect(guard.run('task', () => stale)).toBe(true)
101+
102+
await new Promise((resolve) => setTimeout(resolve, 20))
103+
104+
// New run takes over the stale slot.
105+
let releaseFresh: () => void = () => {}
106+
const fresh = new Promise<void>((resolve) => {
107+
releaseFresh = resolve
108+
})
109+
expect(guard.run('task', () => fresh)).toBe(true)
110+
111+
// The original stale run settling late must not release the newer slot.
112+
releaseStale()
113+
await flushMicrotasks()
114+
expect(guard.isActive()).toBe(true)
115+
116+
releaseFresh()
117+
await flushMicrotasks()
118+
expect(guard.isActive()).toBe(false)
119+
})
76120
})

apps/sim/lib/core/utils/background.ts

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,34 +25,61 @@ export function runDetached(label: string, work: () => Promise<unknown>): void {
2525
})
2626
}
2727

28+
interface SingleFlightOptions {
29+
/**
30+
* How long a run may hold the slot before it is treated as stale. A later
31+
* `run` call past this window takes over and starts a fresh run, so a hung
32+
* task (one whose promise never settles) cannot wedge the slot permanently.
33+
* This is the in-process equivalent of a distributed lock's TTL.
34+
*/
35+
staleAfterMs: number
36+
}
37+
2838
/**
2939
* A per-process single-flight guard. Prevents a long-running detached task from
3040
* piling up when it is invoked again before the previous run finishes.
3141
*
3242
* This guards a single Node process only — cross-replica deduplication must be
3343
* handled by the underlying work (e.g. database row claiming or a distributed
3444
* lock).
45+
*
46+
* A held slot is released when its run settles, or — if the run hangs and never
47+
* settles — taken over by the next `run` call after `staleAfterMs`. Ownership is
48+
* tracked by token so a stale run that settles late cannot clear a newer run's
49+
* slot.
3550
*/
36-
export function createSingleFlight() {
37-
let active = false
51+
export function createSingleFlight({ staleAfterMs }: SingleFlightOptions) {
52+
let activeToken: symbol | null = null
53+
let activeSince = 0
3854

3955
return {
40-
/** Whether a run is currently in flight in this process. */
41-
isActive: (): boolean => active,
56+
/** Whether a run currently holds the slot in this process; a hung run still counts until a later `run` call takes it over. */
57+
isActive: (): boolean => activeToken !== null,
4258

4359
/**
44-
* Starts `work` detached if no run is active.
60+
* Starts `work` detached unless a non-stale run already holds the slot.
4561
*
46-
* @returns `true` if a new run started, `false` if one was already in flight.
62+
* @returns `true` if a new run started, `false` if a non-stale run already holds the slot.
4763
*/
4864
run(label: string, work: () => Promise<unknown>): boolean {
49-
if (active) return false
50-
active = true
65+
const now = Date.now()
66+
if (activeToken !== null) {
67+
if (now - activeSince < staleAfterMs) return false
68+
logger.warn(
69+
`Single-flight "${label}" held for ${now - activeSince}ms (> ${staleAfterMs}ms); starting a new run`
70+
)
71+
}
72+
73+
const token = Symbol(label)
74+
activeToken = token
75+
activeSince = now
5176
runDetached(label, () =>
5277
Promise.resolve()
5378
.then(work)
5479
.finally(() => {
55-
active = false
80+
if (activeToken === token) {
81+
activeToken = null
82+
}
5683
})
5784
)
5885
return true

0 commit comments

Comments
 (0)