From 2e08ee585d56e59e7cef9a2d6fb6c61f95c20333 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 5 May 2026 09:50:03 +0900 Subject: [PATCH 1/4] [core] V2: pre-schedule the wait timer before inline-executing a step MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix `Promise.race(step, sleep)` semantics in V2 mixed suspensions without losing inline step execution. Inline `await executeStep(...)` blocks the V2 handler for the full step duration, but `wait_completed` events are only created on the *next* loop iteration's "complete elapsed waits" pass. So if the sleep is shorter than the step, replay always picked the step because the wait_completed event hadn't been written yet — `sleepWinsRaceWorkflow` returned `'step'` instead of `'sleep'`. Fix: when a suspension contains both an owned inline step and at least one pending wait, queue a delayed self-message with `delaySeconds = suspensionResult.timeoutSeconds` *before* starting inline execution. The queued continuation fires in a separate function invocation while the step is still running. That parallel invocation's "complete elapsed waits" pass writes wait_completed, replay observes the elapsed wait, and `Promise.race` resolves with the sleep correctly. The original (still-running) inline invocation finishes its step, sees `run_completed` on the next loop iteration, and exits. This preserves inline-step execution speed for the step-wins case: the step finishes inline and the workflow returns directly. The eagerly-queued wait continuation fires after the step has won and just no-ops on the terminal run. Test plan: - New e2e tests `sleepWinsRaceWorkflow` and `stepWinsRaceWorkflow` exercising `Promise.race` between a step function and `sleep()`, in both directions. - Verified locally against `nextjs-turbopack` workbench: both pass. Event log confirms `wait_completed` is created at t≈1s after `wait_created` (1s sleep) instead of at t≈11s after the inline step finishes. Eager-processing changelog updated with a "Mixed Suspensions" section describing the pre-scheduled wait approach and its rationale. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../fix-step-vs-wait-race-eager-wait.md | 5 +++ .../docs/changelog/eager-processing.mdx | 6 ++- packages/core/e2e/e2e.test.ts | 16 +++++++ packages/core/src/runtime.ts | 43 +++++++++++++++++++ workbench/example/workflows/99_e2e.ts | 30 +++++++++++++ 5 files changed, 99 insertions(+), 1 deletion(-) create mode 100644 .changeset/fix-step-vs-wait-race-eager-wait.md diff --git a/.changeset/fix-step-vs-wait-race-eager-wait.md b/.changeset/fix-step-vs-wait-race-eager-wait.md new file mode 100644 index 0000000000..b64acdbb3f --- /dev/null +++ b/.changeset/fix-step-vs-wait-race-eager-wait.md @@ -0,0 +1,5 @@ +--- +"@workflow/core": patch +--- + +Fix `Promise.race(step, sleep)` semantics in V2 mixed suspensions: when a workflow suspension contains both pending steps and at least one wait (sleep), the runtime now pre-schedules a delayed self-message for the wait timeout *before* inline-executing the step. Without this, an inline step longer than the sleep would block the handler past the sleep's `resumeAt`, the wait timer would never fire on time, and replay would always resolve the race with the step. Pre-scheduling the wait continuation lets the wait timer fire in a parallel function invocation while the step is still running, restoring V1's race semantics while preserving inline step execution for the step-wins case. diff --git a/docs/content/docs/changelog/eager-processing.mdx b/docs/content/docs/changelog/eager-processing.mdx index 32218b33cc..d3ecb24c9b 100644 --- a/docs/content/docs/changelog/eager-processing.mdx +++ b/docs/content/docs/changelog/eager-processing.mdx @@ -230,7 +230,11 @@ When an inline step fails with retries remaining: ### Mixed Suspensions -A suspension may contain steps, hooks, and waits simultaneously. The handler creates events for all, executes any pending step inline, and returns with the wait timeout if applicable. The workflow will re-suspend on next replay for the still-pending hooks/waits. +A suspension may contain steps, hooks, and waits simultaneously. The handler creates events for all, then executes any pending step inline and returns with the wait timeout if applicable. The workflow will re-suspend on next replay for the still-pending hooks/waits. + +When a suspension contains **both pending steps and at least one wait**, the handler additionally enqueues a delayed self-message **before** starting inline step execution. The delay is the wait's `timeoutSeconds`. The reason: inline `await executeStep(...)` blocks the handler for the full step duration, but `wait_completed` events are only created on the *next* loop iteration's "complete elapsed waits" pass — so a longer-running step would prevent the wait from firing on time, breaking `Promise.race(step, sleep)` semantics. Pre-scheduling the wait continuation lets the wait timer fire in a separate function invocation while the inline step is still running. That parallel invocation observes the elapsed wait, writes `wait_completed`, replays, resolves the race with `'sleep'`, and completes the run. The original (still-running inline) invocation eventually finishes its step, sees `run_completed` on the next loop iteration, and exits cleanly. + +This pre-schedule is in addition to the `{ timeoutSeconds }` return value the handler emits for the same suspension. Duplicate continuations are harmless: every replay observes the current event log and either advances the workflow, exits on a terminal event, or no-ops. ### Hook Conflicts diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 76d9fc88dd..e10b0f3ab5 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -581,6 +581,22 @@ describe('e2e', () => { expect(elapsed).toBeLessThan(25_000); }); + test('sleepWinsRaceWorkflow', { timeout: 60_000 }, async () => { + const run = await start(await e2e('sleepWinsRaceWorkflow'), []); + const returnValue = await run.returnValue; + expect(returnValue.winner).toBe('sleep'); + // Sleep is 1s; step would take 10s. Should resolve in ~1s, well under 5s. + expect(returnValue.durationMs).toBeLessThan(5_000); + }); + + test('stepWinsRaceWorkflow', { timeout: 60_000 }, async () => { + const run = await start(await e2e('stepWinsRaceWorkflow'), []); + const returnValue = await run.returnValue; + expect(returnValue.winner).toBe('step'); + // Step is 1s; sleep would take 10s. Should resolve in ~1s, well under 5s. + expect(returnValue.durationMs).toBeLessThan(5_000); + }); + test('nullByteWorkflow', { timeout: 60_000 }, async () => { const run = await start(await e2e('nullByteWorkflow'), []); const returnValue = await run.returnValue; diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 3c2bfb8f01..54b88bf5b9 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -886,6 +886,49 @@ export function workflowEntrypoint( | (typeof pendingSteps)[number] | undefined = ownedPendingSteps[0]; + // Eagerly schedule the wait timer's continuation + // BEFORE we inline-execute the step. Inline `await + // executeStep(...)` blocks the handler for the full + // step duration; if the wait timer were only encoded + // as the function's return value (`{ timeoutSeconds + // }`), it would not fire until the inline step + // finished. That defeats `Promise.race(step, sleep)` + // semantics whenever the sleep is shorter than the + // step. Queueing a delayed self-message now lets the + // wait timer fire in a separate function invocation + // while the step is still running, so the parallel + // replay can observe `wait_completed` and resolve + // the race correctly. + // + // Only the inline executor needs to do this — if + // we're not inlining (no owned step), the + // `{ timeoutSeconds }` return below already conveys + // the wait timer to the queue without blocking. + // + // Note: this scheduled continuation is in addition + // to the `{ timeoutSeconds }` return value below. + // Duplicate continuations are harmless — each replay + // observes the current event log and either advances + // the workflow, exits on a terminal event, or no-ops. + if ( + inlineStep && + suspensionResult.timeoutSeconds !== undefined + ) { + const traceCarrier = await serializeTraceCarrier(); + await queueMessage( + world, + getWorkflowQueueName(workflowName), + { + runId, + traceCarrier, + requestedAt: new Date(), + }, + { + delaySeconds: suspensionResult.timeoutSeconds, + } + ); + } + // Queue every pending step except the one we're // executing inline. This mirrors V1's unconditional // enqueue-with-idempotency pattern and is what makes diff --git a/workbench/example/workflows/99_e2e.ts b/workbench/example/workflows/99_e2e.ts index 131535650e..54c46fe725 100644 --- a/workbench/example/workflows/99_e2e.ts +++ b/workbench/example/workflows/99_e2e.ts @@ -215,6 +215,36 @@ export async function parallelSleepWorkflow() { ////////////////////////////////////////////////////////// +async function delayMsStep(ms: number, label: string) { + 'use step'; + await new Promise((resolve) => setTimeout(resolve, ms)); + return label; +} + +export async function sleepWinsRaceWorkflow() { + 'use workflow'; + const startTime = Date.now(); + const winner = await Promise.race([ + delayMsStep(10_000, 'step'), + sleep('1s').then(() => 'sleep'), + ]); + const endTime = Date.now(); + return { winner, durationMs: endTime - startTime }; +} + +export async function stepWinsRaceWorkflow() { + 'use workflow'; + const startTime = Date.now(); + const winner = await Promise.race([ + delayMsStep(1_000, 'step'), + sleep('10s').then(() => 'sleep'), + ]); + const endTime = Date.now(); + return { winner, durationMs: endTime - startTime }; +} + +////////////////////////////////////////////////////////// + async function nullByteStep() { 'use step'; return 'null byte \0'; From a5148ce6a38ea028e9f2fa42a1cffdce76dfed22 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 5 May 2026 10:13:55 +0900 Subject: [PATCH 2/4] [world-local] Honor delaySeconds before message delivery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The local queue's `queue()` enqueue path ignored the `delaySeconds` option entirely — every message was delivered immediately, regardless of the requested delay. VQS-side queues (used by world-vercel and world-postgres) honor delaySeconds at the broker, so this brings world-local in line with production semantics. The runtime needs this to land before the wait-as-continuation unification in the next commit: that change starts queueing wait timers as fresh delayed continuations instead of returning `{ timeoutSeconds }`. Without delaySeconds support, those wait continuations would fire instantly in dev and trigger spurious replays. Sleep happens outside the queue's worker semaphore so a delayed message doesn't tie up a worker slot during its delay window — other immediate messages are free to dispatch in parallel. New tests in queue.test.ts cover: - delaySeconds > 0 → setTimeout called with the right ms value - delaySeconds === 0 → no setTimeout (immediate dispatch) - delaySeconds omitted → no setTimeout (immediate dispatch) --- packages/world-local/src/queue.test.ts | 83 ++++++++++++++++++++++++++ packages/world-local/src/queue.ts | 11 ++++ 2 files changed, 94 insertions(+) diff --git a/packages/world-local/src/queue.test.ts b/packages/world-local/src/queue.test.ts index 8f0b4f4491..2cdbcbd312 100644 --- a/packages/world-local/src/queue.test.ts +++ b/packages/world-local/src/queue.test.ts @@ -167,3 +167,86 @@ describe('queue timeout re-enqueue', () => { expect(mockSetTimeout).not.toHaveBeenCalled(); }); }); + +describe('queue delaySeconds', () => { + let localQueue: ReturnType; + + beforeEach(() => { + localQueue = createQueue({ baseUrl: 'http://localhost:3000' }); + }); + + afterEach(async () => { + await localQueue.close(); + }); + + it('honors delaySeconds before delivering the message', async () => { + const { setTimeout: mockSetTimeout } = await import('node:timers/promises'); + vi.mocked(mockSetTimeout).mockClear(); + + let callCount = 0; + const handler = localQueue.createQueueHandler('__wkf_step_', async () => { + callCount++; + return undefined; + }); + + localQueue.registerHandler('__wkf_step_', handler); + + await localQueue.queue('__wkf_step_test' as any, stepPayload, { + delaySeconds: 7, + }); + + await vi.waitFor(() => { + expect(callCount).toBe(1); + }); + + // setTimeout should have been called with the delay (7s = 7000ms) + // before the message was delivered. + expect(mockSetTimeout).toHaveBeenCalledWith(7000); + }); + + it('does not call setTimeout for delaySeconds: 0', async () => { + const { setTimeout: mockSetTimeout } = await import('node:timers/promises'); + vi.mocked(mockSetTimeout).mockClear(); + + let callCount = 0; + const handler = localQueue.createQueueHandler('__wkf_step_', async () => { + callCount++; + return undefined; + }); + + localQueue.registerHandler('__wkf_step_', handler); + + await localQueue.queue('__wkf_step_test' as any, stepPayload, { + delaySeconds: 0, + }); + + await vi.waitFor(() => { + expect(callCount).toBe(1); + }); + + // setTimeout should NOT have been called for delaySeconds: 0 (the + // delay-honoring branch is gated on `delaySeconds > 0`). + expect(mockSetTimeout).not.toHaveBeenCalled(); + }); + + it('does not call setTimeout when delaySeconds is omitted', async () => { + const { setTimeout: mockSetTimeout } = await import('node:timers/promises'); + vi.mocked(mockSetTimeout).mockClear(); + + let callCount = 0; + const handler = localQueue.createQueueHandler('__wkf_step_', async () => { + callCount++; + return undefined; + }); + + localQueue.registerHandler('__wkf_step_', handler); + + await localQueue.queue('__wkf_step_test' as any, stepPayload); + + await vi.waitFor(() => { + expect(callCount).toBe(1); + }); + + expect(mockSetTimeout).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/world-local/src/queue.ts b/packages/world-local/src/queue.ts index 5a7bf6d7f7..f4bae9c634 100644 --- a/packages/world-local/src/queue.ts +++ b/packages/world-local/src/queue.ts @@ -133,6 +133,17 @@ export function createQueue(config: Partial): LocalQueue { } (async () => { + // Honor the caller's requested delivery delay before acquiring a queue + // slot. Sleeping outside the semaphore so a delayed message doesn't + // hold a worker hostage for its delay window — the worker should be + // free to process other (immediate) messages until this one is ready. + // VQS-side queues honor delaySeconds at the broker, so this brings + // world-local in line with production behavior. + if (opts?.delaySeconds && opts.delaySeconds > 0) { + const delayMs = Math.min(opts.delaySeconds * 1000, MAX_SAFE_TIMEOUT_MS); + await setTimeout(delayMs); + } + const token = semaphore.tryAcquire(); if (!token) { console.warn( From 6776b690eba85003745b3e4d5fb0b9db22ca8820 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 5 May 2026 10:17:11 +0900 Subject: [PATCH 3/4] [core] V2: unify wait+step queue dispatch in suspension processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the asymmetric "steps go to the queue, waits become a { timeoutSeconds } return value" pattern with a single Promise.all batch that queues every pending operation we are not running inline. Before this change, suspension processing had three branches that all needed to keep the wait/step asymmetry consistent: - pendingSteps.length === 0 returned { timeoutSeconds } - inlineStep + waits eagerly queued a delayed self-message AND set inlineStep to undefined (Option A) AND returned { timeoutSeconds } - inlineStep retry path returned { timeoutSeconds } if there were waits After this change, every suspension goes through one path: for non-inline pendingSteps: queue stepId message if timeoutSeconds defined: queue delayed continuation await Promise.all(dispatches) if !inlineStep: return await executeStep(inlineStep) Behaviorally, this restores inline step execution even when the suspension also has a wait (Option A's carve-out is no longer necessary): the wait timer fires in a separate function invocation on the queue, in parallel with the inline step. If the sleep wins the race, that parallel invocation observes wait_completed via the "complete elapsed waits" pass and finishes the run; if the step wins, the wait continuation fires later and no-ops on the terminal run via the existing terminal-event check. Other cleanups: - The inline-step retry path no longer needs to forward suspensionResult.timeoutSeconds — the wait timer was already enqueued as part of the unified dispatch above. - A dead post-step `if (timeoutSeconds && pendingSteps.length === 1)` block (just a comment, no body) is removed; the loop's "complete elapsed waits" pass handles the same case correctly. - Step queueing now uses a shared `traceCarrier` rather than re-serializing per step. Retry/throttle and hook-conflict paths still return { timeoutSeconds } since their semantics are "redeliver THIS message after a delay" rather than "schedule a fresh wait timer." Those can be unified in a follow-up. Test plan: - New e2e tests `sleepWinsRaceWorkflow` and `stepWinsRaceWorkflow` pass against the `nextjs-turbopack` workbench. - Event log inspection confirms wait_completed fires at t≈1s (after wait_created at t≈0s) for the sleep-wins case, and that the inline step runs only once (no duplicate step_started events that the earlier eager-queue approach produced in dev). - All 842 @workflow/core unit tests pass. - All 346 @workflow/world-local unit tests pass (with the delaySeconds support added in the previous commit). Requires the world-local delaySeconds fix in the prior commit; without it, wait continuations would fire instantly in dev and the parallel replay would re-enter handleSuspension before the wait elapsed (recoverable via existing redelivery, but inefficient). --- packages/core/src/runtime.ts | 191 +++++++++++++---------------------- 1 file changed, 70 insertions(+), 121 deletions(-) diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 55054ac9d5..22916e15af 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -858,16 +858,6 @@ export function workflowEntrypoint( const pendingSteps = suspensionResult.pendingSteps; - if (pendingSteps.length === 0) { - // No steps — only waits/hooks - if (suspensionResult.timeoutSeconds !== undefined) { - return { - timeoutSeconds: suspensionResult.timeoutSeconds, - }; - } - return; - } - // Inline execution is gated on ownership: only the // handler that actually wrote the step_created event // may run the step body inline. The world-level @@ -881,83 +871,39 @@ export function workflowEntrypoint( ); // Pick one owned step to execute inline (if any). - // The rest of the pending steps are queued below. - // - // Skip inline execution entirely when the suspension - // also has a pending wait (sleep): an inline `await - // executeStep(...)` blocks the handler for the full - // step duration, so the wait timer never has a chance - // to fire on time. That defeats `Promise.race(step, - // sleep)` semantics — if the sleep is shorter than - // the step, replay still picks the step because - // wait_completed is only created on the *next* loop - // iteration, which doesn't run until the step - // finishes. Queueing every step in this case lets - // the wait timeout drive a continuation in parallel, - // matching V1's behavior where each step ran in a - // separate function invocation. + // The rest of the pending steps, plus any wait + // timer, are queued below in a single parallel + // batch. const inlineStep: | (typeof pendingSteps)[number] - | undefined = - suspensionResult.timeoutSeconds === undefined - ? ownedPendingSteps[0] - : undefined; + | undefined = ownedPendingSteps[0]; - // Eagerly schedule the wait timer's continuation - // BEFORE we inline-execute the step. Inline `await - // executeStep(...)` blocks the handler for the full - // step duration; if the wait timer were only encoded - // as the function's return value (`{ timeoutSeconds - // }`), it would not fire until the inline step - // finished. That defeats `Promise.race(step, sleep)` - // semantics whenever the sleep is shorter than the - // step. Queueing a delayed self-message now lets the - // wait timer fire in a separate function invocation - // while the step is still running, so the parallel - // replay can observe `wait_completed` and resolve - // the race correctly. + // Unified queue dispatch for everything we are NOT + // inline-executing. Steps are queued with stepId so + // the receiver runs them; the wait timer is queued + // as a generic continuation that fires after the + // wait elapses and lets the next replay observe the + // elapsed wait via the "complete elapsed waits" + // pass. // - // Only the inline executor needs to do this — if - // we're not inlining (no owned step), the - // `{ timeoutSeconds }` return below already conveys - // the wait timer to the queue without blocking. + // Step queueing is unconditional (covers crash + // recovery: if a prior handler wrote step_created + // but crashed before queueing, a later handler will + // queue them; idempotencyKey on correlationId + // dedupes redundant queues across concurrent + // handlers). // - // Note: this scheduled continuation is in addition - // to the `{ timeoutSeconds }` return value below. - // Duplicate continuations are harmless — each replay - // observes the current event log and either advances - // the workflow, exits on a terminal event, or no-ops. - if ( - inlineStep && - suspensionResult.timeoutSeconds !== undefined - ) { - const traceCarrier = await serializeTraceCarrier(); - await queueMessage( - world, - getWorkflowQueueName(workflowName), - { - runId, - traceCarrier, - requestedAt: new Date(), - }, - { - delaySeconds: suspensionResult.timeoutSeconds, - } - ); - } - - // Queue every pending step except the one we're - // executing inline. This mirrors V1's unconditional - // enqueue-with-idempotency pattern and is what makes - // crash recovery work: if a prior handler wrote - // step_created events but crashed before enqueuing, - // a later handler (e.g., from flow-message - // redelivery or reenqueueActiveRuns) will enqueue - // the orphaned steps. In the happy path with a - // single owner, concurrent handlers' queue attempts - // dedupe on correlationId. Skipping the inline step - // avoids a queue handler racing against our own - // inline executor. + // The wait continuation is what makes + // `Promise.race(step, sleep)` behave correctly with + // inline step execution: even if the inline step + // blocks this handler for the full step duration, + // the wait timer fires in a separate function + // invocation. If the sleep wins, that parallel + // invocation completes the run; if the step wins, + // the wait continuation fires later and no-ops on + // the terminal run. + const traceCarrier = await serializeTraceCarrier(); + const dispatches: Promise[] = []; for (const step of pendingSteps) { if ( inlineStep && @@ -965,31 +911,45 @@ export function workflowEntrypoint( ) { continue; } - const traceCarrier = await serializeTraceCarrier(); - await queueMessage( - world, - getWorkflowQueueName(workflowName), - { - runId, - stepId: step.correlationId, - stepName: step.stepName, - traceCarrier, - requestedAt: new Date(), - }, - { - idempotencyKey: step.correlationId, - } + dispatches.push( + queueMessage( + world, + getWorkflowQueueName(workflowName), + { + runId, + stepId: step.correlationId, + stepName: step.stepName, + traceCarrier, + requestedAt: new Date(), + }, + { + idempotencyKey: step.correlationId, + } + ) + ); + } + if (suspensionResult.timeoutSeconds !== undefined) { + dispatches.push( + queueMessage( + world, + getWorkflowQueueName(workflowName), + { + runId, + traceCarrier, + requestedAt: new Date(), + }, + { + delaySeconds: suspensionResult.timeoutSeconds, + } + ) ); } + await Promise.all(dispatches); - // Nothing to execute inline — we already queued all - // pending steps above, exit and let the queue drive. + // Nothing to execute inline — everything has been + // queued (or no work needs scheduling). Exit and let + // the queue drive subsequent replays. if (!inlineStep) { - if (suspensionResult.timeoutSeconds !== undefined) { - return { - timeoutSeconds: suspensionResult.timeoutSeconds, - }; - } return; } @@ -1004,8 +964,12 @@ export function workflowEntrypoint( }); if (stepResult.type === 'retry') { - // Step needs retry — queue self with stepId for retry - const traceCarrier = await serializeTraceCarrier(); + // Step needs retry — queue self with stepId for retry. + // Any pending wait timer was already enqueued as part + // of the unified dispatch above, so we can return + // unconditionally here. + const retryTraceCarrier = + await serializeTraceCarrier(); await queueMessage( world, getWorkflowQueueName(workflowName), @@ -1013,19 +977,13 @@ export function workflowEntrypoint( runId, stepId: inlineStep.correlationId, stepName: inlineStep.stepName, - traceCarrier, + traceCarrier: retryTraceCarrier, requestedAt: new Date(), }, { delaySeconds: stepResult.timeoutSeconds, } ); - // If there are also waits, return their timeout - if (suspensionResult.timeoutSeconds !== undefined) { - return { - timeoutSeconds: suspensionResult.timeoutSeconds, - }; - } return; } @@ -1068,15 +1026,6 @@ export function workflowEntrypoint( ); return; } - - if ( - suspensionResult.timeoutSeconds !== undefined && - pendingSteps.length === 1 - ) { - // Only 1 step and there's also waits/hooks, - // step is done, but we need the wait timeout - // Loop back to replay which will re-evaluate - } } else { // User code error from runWorkflow — create run_failed. if (err instanceof Error) { From 3f67059bcc838bb39c42290ae888389553d21cbe Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 5 May 2026 10:18:22 +0900 Subject: [PATCH 4/4] [docs] V2 unified suspension dispatch + changeset Update the "Mixed Suspensions" section in eager-processing.mdx to describe the unified parallel-dispatch model: - All non-inline pendingSteps are queued with stepId - The wait timer (if any) is queued as a delayed continuation - All dispatched in one Promise.all batch - One owned step is then inline-executed (if any) The doc previously described Option A (the carve-out where waits forced all steps to be queued); the unified model removes that carve-out and explains why the wait continuation works in parallel with the inline step. Also notes the dependency on world-local's new delaySeconds support (landed earlier in the same PR series). Changeset bumps both @workflow/core and @workflow/world-local since both packages have user-observable behavior changes. --- .../fix-step-vs-wait-race-eager-wait.md | 3 ++- .../docs/changelog/eager-processing.mdx | 26 ++++++++++++++----- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/.changeset/fix-step-vs-wait-race-eager-wait.md b/.changeset/fix-step-vs-wait-race-eager-wait.md index b64acdbb3f..940a48bc05 100644 --- a/.changeset/fix-step-vs-wait-race-eager-wait.md +++ b/.changeset/fix-step-vs-wait-race-eager-wait.md @@ -1,5 +1,6 @@ --- "@workflow/core": patch +"@workflow/world-local": patch --- -Fix `Promise.race(step, sleep)` semantics in V2 mixed suspensions: when a workflow suspension contains both pending steps and at least one wait (sleep), the runtime now pre-schedules a delayed self-message for the wait timeout *before* inline-executing the step. Without this, an inline step longer than the sleep would block the handler past the sleep's `resumeAt`, the wait timer would never fire on time, and replay would always resolve the race with the step. Pre-scheduling the wait continuation lets the wait timer fire in a parallel function invocation while the step is still running, restoring V1's race semantics while preserving inline step execution for the step-wins case. +V2 suspension processing: unify wait + step queue dispatch into a single parallel batch. The runtime now queues every pending operation (non-inline steps + wait timer) in one `Promise.all` and then inline-executes one owned step (if any). The asymmetric `{ timeoutSeconds }` return contract for waits is dropped from suspension processing; waits become normal queue continuations with `delaySeconds`. This restores inline step execution for `Promise.race(step, sleep)` workflows without any of the carve-outs the prior fix needed: even when the inline step blocks the handler, the wait continuation fires in parallel and drives the next replay. As part of the same change, `world-local`'s queue now honors `delaySeconds` (matches `world-vercel` / `world-postgres`); without this, wait continuations would fire instantly in dev. diff --git a/docs/content/docs/changelog/eager-processing.mdx b/docs/content/docs/changelog/eager-processing.mdx index d163d13c42..c1ee43d8ac 100644 --- a/docs/content/docs/changelog/eager-processing.mdx +++ b/docs/content/docs/changelog/eager-processing.mdx @@ -230,15 +230,29 @@ When an inline step fails with retries remaining: ### Mixed Suspensions -A suspension may contain steps, hooks, and waits simultaneously. The handler creates events for all, then chooses between inline execution and queue dispatch: +A suspension may contain steps, hooks, and waits simultaneously. The handler creates events for all, then dispatches everything we are not running inline as a single parallel batch of queue messages: -- **Steps only** (no waits): one owned step is executed inline; the rest are queued. The loop continues after the inline step completes. -- **Steps + at least one wait**: every step is queued (no inline execution). The handler returns with the wait timeout. Whichever lands first — a step's continuation or the wait timer — drives the next replay. -- **Hooks / waits only**: handler returns with the wait timeout (or no timeout, for hook-only suspensions). The next continuation is driven by external resume or the wait timer. +``` +ownedPendingSteps = pendingSteps.filter(owned by this handler) +inlineStep = ownedPendingSteps[0] // optional + +dispatches = [ + ...for each non-inline pendingStep: queue stepId message (idempotency=correlationId), + ...if timeoutSeconds defined: queue delayed continuation (delaySeconds=timeoutSeconds), +] +await Promise.all(dispatches) + +if (!inlineStep) return +await executeStep(inlineStep) +``` + +The wait timer is queued as its own continuation rather than encoded in the handler's return value (`{ timeoutSeconds }`). This is what makes `Promise.race(step, sleep)` behave correctly: even when the inline step blocks the handler for the full step duration, the wait continuation fires in a separate function invocation. If the sleep wins, that parallel invocation observes `wait_completed` via the "complete elapsed waits" pass and finishes the run; if the step wins, the wait continuation fires later and no-ops on the terminal run via the existing terminal-event check. + +Step queueing remains unconditional (covers crash recovery: if a prior handler wrote `step_created` but crashed before queueing, a later handler will queue it; idempotency keys dedupe redundant queues across concurrent handlers). -The "no inline when there's a wait" carve-out is necessary to preserve `Promise.race(step, sleep)` semantics. Inline `await executeStep(...)` blocks the handler for the full step duration, and `wait_completed` events are only created on the *next* loop iteration's "complete elapsed waits" pass — so a longer-running step would always swallow the shorter sleep and `Promise.race` would resolve incorrectly. Queueing the step in this case lets the wait timer drive a continuation in parallel, matching V1's behavior where each step ran in a separate function invocation. +The retry/throttle and hook-conflict paths still return `{ timeoutSeconds }` since their semantics are "redeliver THIS message after a delay" rather than "schedule a fresh wait timer." Those can be unified in a follow-up. -Pure step suspensions (without waits) still benefit from inline execution; the carve-out only costs an extra queue roundtrip when a step and a sleep coexist. +The unified dispatch requires `world-local` to honor `delaySeconds` on the queue (added in the same PR series). Without it, the wait continuation would fire instantly in dev and trigger a spurious replay before the wait elapsed (recoverable via redelivery, but inefficient and observable as duplicate `step_started` events under contention). ### Hook Conflicts