Skip to content

Commit 170951a

Browse files
committed
retryable errs cleanup
1 parent 4999666 commit 170951a

2 files changed

Lines changed: 95 additions & 9 deletions

File tree

apps/sim/app/api/schedules/execute/route.test.ts

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -615,13 +615,15 @@ describe('Scheduled Workflow Execution API Route', () => {
615615
)
616616
})
617617

618-
it('releases schedule claims when lookup fails before enqueue', async () => {
618+
it('defers schedule claims when retryable lookup infrastructure fails before enqueue', async () => {
619619
const claimedAt = new Date('2025-01-01T00:00:00.000Z')
620620
const schedule = {
621621
...SINGLE_SCHEDULE[0],
622622
lastQueuedAt: claimedAt,
623623
}
624-
mockGetJob.mockRejectedValueOnce(new Error('queue lookup failed'))
624+
mockGetJob.mockRejectedValueOnce(
625+
Object.assign(new Error('queue lookup failed'), { code: 'ECONNRESET' })
626+
)
625627
dbChainMockFns.limit
626628
.mockResolvedValueOnce(SINGLE_CLAIMED_SCHEDULE_ROWS)
627629
.mockResolvedValueOnce([])
@@ -641,6 +643,60 @@ describe('Scheduled Workflow Execution API Route', () => {
641643
)
642644
})
643645

646+
it('marks schedules failed when non-retryable setup errors happen before enqueue', async () => {
647+
const claimedAt = new Date('2025-01-01T00:00:00.000Z')
648+
const schedule = {
649+
...SINGLE_SCHEDULE[0],
650+
lastQueuedAt: claimedAt,
651+
}
652+
mockGetJob.mockRejectedValueOnce(new Error('bad setup invariant'))
653+
dbChainMockFns.limit
654+
.mockResolvedValueOnce(SINGLE_CLAIMED_SCHEDULE_ROWS)
655+
.mockResolvedValueOnce([])
656+
dbChainMockFns.returning.mockReturnValueOnce([schedule]).mockReturnValueOnce([])
657+
658+
const response = await GET(createMockRequest())
659+
660+
expect(response.status).toBe(200)
661+
expect(mockEnqueue).not.toHaveBeenCalled()
662+
expect(dbChainMockFns.set).toHaveBeenCalledWith(
663+
expect.objectContaining({
664+
lastQueuedAt: null,
665+
lastFailedAt: expect.any(Date),
666+
nextRunAt: expect.any(Date),
667+
infraRetryCount: 0,
668+
})
669+
)
670+
expect(dbChainMockFns.set).not.toHaveBeenCalledWith(
671+
expect.objectContaining({
672+
infraRetryCount: 1,
673+
})
674+
)
675+
})
676+
677+
it('uses one backend mode decision for slot accounting and schedule processing', async () => {
678+
mockShouldExecuteInline.mockReturnValue(true)
679+
const randomSpy = vi.spyOn(Math, 'random').mockReturnValue(0)
680+
dbChainMockFns.limit
681+
.mockResolvedValueOnce(SINGLE_CLAIMED_SCHEDULE_ROWS)
682+
.mockResolvedValueOnce([])
683+
dbChainMockFns.returning
684+
.mockReturnValueOnce(SINGLE_SCHEDULE)
685+
.mockResolvedValueOnce([{ id: 'job-id-1' }])
686+
687+
try {
688+
const response = await GET(createMockRequest())
689+
690+
expect(response.status).toBe(200)
691+
expect(mockShouldExecuteInline).toHaveBeenCalledTimes(1)
692+
expect(mockExecuteScheduleJob).toHaveBeenCalledWith(
693+
expect.objectContaining({ scheduleId: 'schedule-1' })
694+
)
695+
} finally {
696+
randomSpy.mockRestore()
697+
}
698+
})
699+
644700
it('restores the original claim token when an active durable job owns the occurrence', async () => {
645701
const originalClaim = new Date()
646702
const staleReclaim = new Date(originalClaim.getTime() + 60_000)

apps/sim/app/api/schedules/execute/route.ts

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import type { ExecuteSchedulesResponse } from '@/lib/api/contracts/schedules'
1212
import { verifyCronAuth } from '@/lib/auth/internal'
1313
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
1414
import { JOB_STATUS, type Job } from '@/lib/core/async-jobs/types'
15+
import { isRetryableInfrastructureError } from '@/lib/core/errors/retryable-infrastructure'
1516
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
1617
import { generateRequestId } from '@/lib/core/utils/request'
1718
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
@@ -400,6 +401,33 @@ async function deferClaimedScheduleAfterQueueFailure(
400401
})
401402
}
402403

404+
async function handleClaimedScheduleSetupFailure(
405+
schedule: ClaimedSchedule,
406+
requestId: string,
407+
expectedLastQueuedAt: Date,
408+
error: unknown,
409+
retryContext: string,
410+
failureContext: string
411+
): Promise<void> {
412+
if (isRetryableInfrastructureError(error)) {
413+
await deferClaimedScheduleAfterQueueFailure(
414+
schedule,
415+
requestId,
416+
expectedLastQueuedAt,
417+
error,
418+
retryContext
419+
)
420+
return
421+
}
422+
423+
logger.error(`[${requestId}] Non-retryable schedule setup failure`, {
424+
scheduleId: schedule.id,
425+
workflowId: schedule.workflowId,
426+
error: toError(error).message,
427+
})
428+
await markClaimedScheduleFailed(schedule, requestId, expectedLastQueuedAt, failureContext)
429+
}
430+
403431
async function recoverStaleDatabaseScheduleJobs(now: Date): Promise<void> {
404432
const staleStartedBefore = getStaleScheduleExecutionCutoff(now)
405433

@@ -721,7 +749,8 @@ async function processScheduleItem(
721749
schedule: ClaimedSchedule,
722750
queuedAt: Date,
723751
requestId: string,
724-
jobQueue: JobQueue
752+
jobQueue: JobQueue,
753+
useDatabaseFallback: boolean
725754
) {
726755
const queueTime = schedule.lastQueuedAt ?? queuedAt
727756
const executionId = generateId()
@@ -756,7 +785,6 @@ async function processScheduleItem(
756785
let enqueuedJobId: string | null = null
757786

758787
try {
759-
const useDatabaseFallback = shouldExecuteInline()
760788
const delayMs = Math.floor(Math.random() * SCHEDULE_JITTER_MAX_MS)
761789

762790
const scheduleJobId = buildScheduleExecutionJobId(schedule)
@@ -908,12 +936,13 @@ async function processScheduleItem(
908936
`[${requestId}] Failed to enqueue schedule execution for workflow ${schedule.workflowId}`,
909937
error
910938
)
911-
await deferClaimedScheduleAfterQueueFailure(
939+
await handleClaimedScheduleSetupFailure(
912940
schedule,
913941
requestId,
914942
queueTime,
915943
error,
916-
`Failed to defer schedule ${schedule.id} after enqueue failure`
944+
`Failed to defer schedule ${schedule.id} after enqueue failure`,
945+
`Failed to mark schedule ${schedule.id} failed after non-retryable enqueue failure`
917946
)
918947
return
919948
}
@@ -1004,12 +1033,13 @@ async function processScheduleItem(
10041033
error
10051034
)
10061035
if (!enqueuedJobId) {
1007-
await deferClaimedScheduleAfterQueueFailure(
1036+
await handleClaimedScheduleSetupFailure(
10081037
schedule,
10091038
requestId,
10101039
queueTime,
10111040
error,
1012-
`Failed to defer schedule ${schedule.id} after pre-enqueue failure`
1041+
`Failed to defer schedule ${schedule.id} after pre-enqueue failure`,
1042+
`Failed to mark schedule ${schedule.id} failed after non-retryable setup failure`
10131043
)
10141044
}
10151045
}
@@ -1117,7 +1147,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
11171147
const schedulePromises =
11181148
dueSchedules.length > 0
11191149
? dueSchedules.map((schedule) =>
1120-
processScheduleItem(schedule, queuedAt, requestId, jobQueue)
1150+
processScheduleItem(schedule, queuedAt, requestId, jobQueue, useDatabaseFallback)
11211151
)
11221152
: []
11231153

0 commit comments

Comments
 (0)