Skip to content

Commit 82d194c

Browse files
improvement(cron): drop single-flight guard, rely on DB row claiming
1 parent 1877f2c commit 82d194c

5 files changed

Lines changed: 6 additions & 175 deletions

File tree

apps/sim/app/api/schedules/execute/route.test.ts

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -877,21 +877,5 @@ describe('Scheduled Workflow Execution API Route', () => {
877877
const data = await response.json()
878878
expect(data).toMatchObject({ status: 'started' })
879879
})
880-
881-
it('returns already_running while a tick is in flight in the same process', async () => {
882-
// Both calls run synchronously up to their `return` (no awaits before the
883-
// single-flight guard), so the detached tick from the first call cannot
884-
// progress past its first `await` before the second call checks the guard.
885-
const first = GET(createMockRequest())
886-
const second = GET(createMockRequest())
887-
888-
const [firstData, secondData] = await Promise.all([
889-
first.then((r) => r.json()),
890-
second.then((r) => r.json()),
891-
])
892-
893-
expect(firstData).toMatchObject({ status: 'started' })
894-
expect(secondData).toMatchObject({ status: 'already_running' })
895-
})
896880
})
897881
})

apps/sim/app/api/schedules/execute/route.ts

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
1414
import { JOB_STATUS, type Job } from '@/lib/core/async-jobs/types'
1515
import { isRetryableInfrastructureError } from '@/lib/core/errors/retryable-infrastructure'
1616
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
17-
import { createSingleFlight } from '@/lib/core/utils/background'
17+
import { runDetached } from '@/lib/core/utils/background'
1818
import { generateRequestId } from '@/lib/core/utils/request'
1919
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
2020
import {
@@ -1072,12 +1072,6 @@ async function processJobItem(job: ClaimedJob, queuedAt: Date, requestId: string
10721072
}
10731073
}
10741074

1075-
/**
1076-
* A tick self-bounds at `MAX_TICK_DURATION_MS`; allow a grace window beyond that
1077-
* before a hung tick is considered stale and a new one is allowed to take over.
1078-
*/
1079-
const scheduleTickGuard = createSingleFlight({ staleAfterMs: MAX_TICK_DURATION_MS + 60_000 })
1080-
10811075
interface ScheduleTickResult {
10821076
processedCount: number
10831077
totalSchedules: number
@@ -1191,11 +1185,11 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
11911185
return authError
11921186
}
11931187

1194-
const started = scheduleTickGuard.run('schedule-execution-tick', () => runScheduleTick(requestId))
1188+
runDetached('schedule-execution-tick', () => runScheduleTick(requestId))
11951189

11961190
const response = {
1197-
message: started ? 'Scheduled execution started' : 'Scheduled execution already in progress',
1198-
status: started ? 'started' : 'already_running',
1191+
message: 'Scheduled execution started',
1192+
status: 'started',
11991193
} satisfies ExecuteSchedulesResponse
12001194

12011195
return NextResponse.json(response, { status: 202 })

apps/sim/lib/api/contracts/schedules.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ const messageResponseSchema = z.object({
134134

135135
export const executeSchedulesResponseSchema = z.object({
136136
message: z.string(),
137-
status: z.enum(['started', 'already_running']),
137+
status: z.literal('started'),
138138
})
139139

140140
export type ExecuteSchedulesResponse = z.output<typeof executeSchedulesResponseSchema>

apps/sim/lib/core/utils/background.test.ts

Lines changed: 1 addition & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
* @vitest-environment node
33
*/
44
import { describe, expect, it, vi } from 'vitest'
5-
import { createSingleFlight, runDetached } from '@/lib/core/utils/background'
5+
import { runDetached } from '@/lib/core/utils/background'
66

77
const flushMicrotasks = () => new Promise((resolve) => setTimeout(resolve, 0))
88

@@ -33,88 +33,3 @@ describe('runDetached', () => {
3333
await flushMicrotasks()
3434
})
3535
})
36-
37-
describe('createSingleFlight', () => {
38-
it('starts work and reports active while in flight', async () => {
39-
const guard = createSingleFlight({ staleAfterMs: 60_000 })
40-
let release: () => void = () => {}
41-
const gate = new Promise<void>((resolve) => {
42-
release = resolve
43-
})
44-
45-
const started = guard.run('task', () => gate)
46-
expect(started).toBe(true)
47-
expect(guard.isActive()).toBe(true)
48-
49-
release()
50-
await flushMicrotasks()
51-
expect(guard.isActive()).toBe(false)
52-
})
53-
54-
it('refuses a second run while one is already in flight', async () => {
55-
const guard = createSingleFlight({ staleAfterMs: 60_000 })
56-
let release: () => void = () => {}
57-
const gate = new Promise<void>((resolve) => {
58-
release = resolve
59-
})
60-
61-
expect(guard.run('task', () => gate)).toBe(true)
62-
expect(guard.run('task', () => Promise.resolve())).toBe(false)
63-
64-
release()
65-
await flushMicrotasks()
66-
expect(guard.run('task', () => Promise.resolve())).toBe(true)
67-
})
68-
69-
it('clears the active flag even when work rejects', async () => {
70-
const guard = createSingleFlight({ staleAfterMs: 60_000 })
71-
72-
expect(guard.run('task', () => Promise.reject(new Error('boom')))).toBe(true)
73-
await flushMicrotasks()
74-
expect(guard.isActive()).toBe(false)
75-
})
76-
77-
it('takes over a stale run whose work never settles', async () => {
78-
const guard = createSingleFlight({ staleAfterMs: 10 })
79-
80-
// A run whose promise never settles — its `finally` never fires.
81-
expect(guard.run('task', () => new Promise<void>(() => {}))).toBe(true)
82-
expect(guard.run('task', () => Promise.resolve())).toBe(false)
83-
84-
await new Promise((resolve) => setTimeout(resolve, 20))
85-
86-
const second = vi.fn().mockResolvedValue(undefined)
87-
expect(guard.run('task', second)).toBe(true)
88-
await flushMicrotasks()
89-
expect(second).toHaveBeenCalledTimes(1)
90-
expect(guard.isActive()).toBe(false)
91-
})
92-
93-
it('does not let a late stale run clear a newer run slot', async () => {
94-
const guard = createSingleFlight({ staleAfterMs: 10 })
95-
96-
let releaseStale: () => void = () => {}
97-
const stale = new Promise<void>((resolve) => {
98-
releaseStale = resolve
99-
})
100-
expect(guard.run('task', () => stale)).toBe(true)
101-
102-
await new Promise((resolve) => setTimeout(resolve, 20))
103-
104-
// New run takes over the stale slot.
105-
let releaseFresh: () => void = () => {}
106-
const fresh = new Promise<void>((resolve) => {
107-
releaseFresh = resolve
108-
})
109-
expect(guard.run('task', () => fresh)).toBe(true)
110-
111-
// The original stale run settling late must not release the newer slot.
112-
releaseStale()
113-
await flushMicrotasks()
114-
expect(guard.isActive()).toBe(true)
115-
116-
releaseFresh()
117-
await flushMicrotasks()
118-
expect(guard.isActive()).toBe(false)
119-
})
120-
})

apps/sim/lib/core/utils/background.ts

Lines changed: 0 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -24,65 +24,3 @@ export function runDetached(label: string, work: () => Promise<unknown>): void {
2424
logger.error(`Background task failed: ${label}`, toError(error))
2525
})
2626
}
27-
28-
interface SingleFlightOptions {
29-
/**
30-
* How long a run may hold the slot before it is treated as stale. A later
31-
* `run` call past this window takes over and starts a fresh run, so a hung
32-
* task (one whose promise never settles) cannot wedge the slot permanently.
33-
* This is the in-process equivalent of a distributed lock's TTL.
34-
*/
35-
staleAfterMs: number
36-
}
37-
38-
/**
39-
* A per-process single-flight guard. Prevents a long-running detached task from
40-
* piling up when it is invoked again before the previous run finishes.
41-
*
42-
* This guards a single Node process only — cross-replica deduplication must be
43-
* handled by the underlying work (e.g. database row claiming or a distributed
44-
* lock).
45-
*
46-
* A held slot is released when its run settles, or — if the run hangs and never
47-
* settles — taken over by the next `run` call after `staleAfterMs`. Ownership is
48-
* tracked by token so a stale run that settles late cannot clear a newer run's
49-
* slot.
50-
*/
51-
export function createSingleFlight({ staleAfterMs }: SingleFlightOptions) {
52-
let activeToken: symbol | null = null
53-
let activeSince = 0
54-
55-
return {
56-
/** Whether a run currently holds the slot in this process. */
57-
isActive: (): boolean => activeToken !== null,
58-
59-
/**
60-
* Starts `work` detached unless a non-stale run already holds the slot.
61-
*
62-
* @returns `true` if a new run started, `false` if a run was already in flight.
63-
*/
64-
run(label: string, work: () => Promise<unknown>): boolean {
65-
const now = Date.now()
66-
if (activeToken !== null) {
67-
if (now - activeSince < staleAfterMs) return false
68-
logger.warn(
69-
`Single-flight "${label}" held for ${now - activeSince}ms (> ${staleAfterMs}ms); starting a new run`
70-
)
71-
}
72-
73-
const token = Symbol(label)
74-
activeToken = token
75-
activeSince = now
76-
runDetached(label, () =>
77-
Promise.resolve()
78-
.then(work)
79-
.finally(() => {
80-
if (activeToken === token) {
81-
activeToken = null
82-
}
83-
})
84-
)
85-
return true
86-
},
87-
}
88-
}

0 commit comments

Comments
 (0)