6 changes: 6 additions & 0 deletions .changeset/fix-step-vs-wait-race-eager-wait.md
@@ -0,0 +1,6 @@
---
"@workflow/core": patch
"@workflow/world-local": patch
---

V2 suspension processing: unify wait + step queue dispatch into a single parallel batch. The runtime now queues every pending operation (non-inline steps + wait timer) in one `Promise.all` and then inline-executes one owned step (if any). The asymmetric `{ timeoutSeconds }` return contract for waits is dropped from suspension processing; waits become normal queue continuations with `delaySeconds`. This restores inline step execution for `Promise.race(step, sleep)` workflows without any of the carve-outs the prior fix needed: even when the inline step blocks the handler, the wait continuation fires in parallel and drives the next replay. As part of the same change, `world-local`'s queue now honors `delaySeconds` (matches `world-vercel` / `world-postgres`); without this, wait continuations would fire instantly in dev.
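As an illustration of the race semantics this restores, here is a minimal stand-alone simulation using plain timers (not the real `@workflow/core` API): the inline step blocks its own invocation for the full step duration, but a wait continuation scheduled as a separate delayed delivery still fires first.

```typescript
// Illustrative simulation only: a 10ms "wait continuation" races a 50ms
// "inline step". Because the wait was dispatched as its own delayed
// delivery, it wins even though the step blocks its handler invocation.
const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));

async function simulate(): Promise<string> {
  let winner = "";
  const record = (who: string) => {
    if (!winner) winner = who; // first to finish wins the race
  };

  // Wait continuation: queued as its own delayed delivery (10ms).
  const waitContinuation = sleep(10).then(() => record("sleep"));
  // Inline step: blocks its handler invocation for 50ms.
  const inlineStep = sleep(50).then(() => record("step"));

  await Promise.all([waitContinuation, inlineStep]);
  return winner;
}
```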
26 changes: 20 additions & 6 deletions docs/content/docs/changelog/eager-processing.mdx
@@ -230,15 +230,29 @@ When an inline step fails with retries remaining:

### Mixed Suspensions

A suspension may contain steps, hooks, and waits simultaneously. The handler creates events for all, then chooses between inline execution and queue dispatch:
A suspension may contain steps, hooks, and waits simultaneously. The handler creates events for all, then dispatches everything we are not running inline as a single parallel batch of queue messages:

- **Steps only** (no waits): one owned step is executed inline; the rest are queued. The loop continues after the inline step completes.
- **Steps + at least one wait**: every step is queued (no inline execution). The handler returns with the wait timeout. Whichever lands first — a step's continuation or the wait timer — drives the next replay.
- **Hooks / waits only**: handler returns with the wait timeout (or no timeout, for hook-only suspensions). The next continuation is driven by external resume or the wait timer.
```
ownedPendingSteps = pendingSteps.filter(owned by this handler)
inlineStep = ownedPendingSteps[0] // optional

dispatches = [
...for each non-inline pendingStep: queue stepId message (idempotency=correlationId),
...if timeoutSeconds defined: queue delayed continuation (delaySeconds=timeoutSeconds),
]
await Promise.all(dispatches)

if (!inlineStep) return
await executeStep(inlineStep)
```
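The pseudocode above can be sketched as self-contained TypeScript. This is a mock, not the real runtime: `Dispatch`, `queueMessage`, and the string-based step ids are illustrative stand-ins for the actual `@workflow/core` internals.

```typescript
// Sketch of unified dispatch: queue every non-inline operation in one
// parallel batch, then hand back the single step to execute inline.
type Dispatch = { kind: "step" | "wait"; id: string; delaySeconds?: number };

const sent: Dispatch[] = [];
async function queueMessage(d: Dispatch): Promise<void> {
  sent.push(d); // stand-in for the real world-level queue send
}

async function processSuspension(
  pendingSteps: string[],
  ownedSteps: string[],
  timeoutSeconds?: number,
): Promise<string | undefined> {
  const inlineStep = ownedSteps[0]; // at most one owned step runs inline
  const dispatches: Promise<void>[] = [];
  for (const step of pendingSteps) {
    if (step === inlineStep) continue; // don't race our own inline executor
    dispatches.push(queueMessage({ kind: "step", id: step }));
  }
  if (timeoutSeconds !== undefined) {
    // The wait timer is just another delayed queue continuation.
    dispatches.push(
      queueMessage({ kind: "wait", id: "wait-timer", delaySeconds: timeoutSeconds }),
    );
  }
  await Promise.all(dispatches); // single parallel batch
  return inlineStep; // caller would `await executeStep(inlineStep)` next
}
```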

The wait timer is queued as its own continuation rather than encoded in the handler's return value (`{ timeoutSeconds }`). This is what makes `Promise.race(step, sleep)` behave correctly: even when the inline step blocks the handler for the full step duration, the wait continuation fires in a separate function invocation. If the sleep wins, that parallel invocation observes `wait_completed` via the "complete elapsed waits" pass and finishes the run; if the step wins, the wait continuation fires later and no-ops on the terminal run via the existing terminal-event check.

Step queueing remains unconditional (covers crash recovery: if a prior handler wrote `step_created` but crashed before queueing, a later handler will queue it; idempotency keys dedupe redundant queues across concurrent handlers).

The "no inline when there's a wait" carve-out is necessary to preserve `Promise.race(step, sleep)` semantics. Inline `await executeStep(...)` blocks the handler for the full step duration, and `wait_completed` events are only created on the *next* loop iteration's "complete elapsed waits" pass — so a longer-running step would always swallow the shorter sleep and `Promise.race` would resolve incorrectly. Queueing the step in this case lets the wait timer drive a continuation in parallel, matching V1's behavior where each step ran in a separate function invocation.
The retry/throttle and hook-conflict paths still return `{ timeoutSeconds }` since their semantics are "redeliver THIS message after a delay" rather than "schedule a fresh wait timer." Those can be unified in a follow-up.

Pure step suspensions (without waits) still benefit from inline execution; the carve-out only costs an extra queue roundtrip when a step and a sleep coexist.
The unified dispatch requires `world-local` to honor `delaySeconds` on the queue (added in the same PR series). Without it, the wait continuation would fire instantly in dev and trigger a spurious replay before the wait elapsed (recoverable via redelivery, but inefficient and observable as duplicate `step_started` events under contention).
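The `delaySeconds` handling can be reduced to a small gate (sketch; the clamp constant mirrors `world-local`'s `MAX_SAFE_TIMEOUT_MS`, and its value here is an assumption):

```typescript
// Sketch of the delaySeconds gate: 0 or undefined means immediate
// delivery; positive delays are clamped to the largest timeout Node
// timers accept (assumed value).
const MAX_SAFE_TIMEOUT_MS = 2 ** 31 - 1;

function deliveryDelayMs(delaySeconds?: number): number {
  if (!delaySeconds || delaySeconds <= 0) return 0; // deliver immediately
  return Math.min(delaySeconds * 1000, MAX_SAFE_TIMEOUT_MS);
}
```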

### Hook Conflicts

152 changes: 72 additions & 80 deletions packages/core/src/runtime.ts
@@ -858,16 +858,6 @@ export function workflowEntrypoint(

const pendingSteps = suspensionResult.pendingSteps;

if (pendingSteps.length === 0) {
// No steps — only waits/hooks
if (suspensionResult.timeoutSeconds !== undefined) {
return {
timeoutSeconds: suspensionResult.timeoutSeconds,
};
}
return;
}

// Inline execution is gated on ownership: only the
// handler that actually wrote the step_created event
// may run the step body inline. The world-level
@@ -881,72 +871,85 @@
);

// Pick one owned step to execute inline (if any).
// The rest of the pending steps are queued below.
//
// Skip inline execution entirely when the suspension
// also has a pending wait (sleep): an inline `await
// executeStep(...)` blocks the handler for the full
// step duration, so the wait timer never has a chance
// to fire on time. That defeats `Promise.race(step,
// sleep)` semantics — if the sleep is shorter than
// the step, replay still picks the step because
// wait_completed is only created on the *next* loop
// iteration, which doesn't run until the step
// finishes. Queueing every step in this case lets
// the wait timeout drive a continuation in parallel,
// matching V1's behavior where each step ran in a
// separate function invocation.
// The rest of the pending steps, plus any wait
// timer, are queued below in a single parallel
// batch.
const inlineStep:
| (typeof pendingSteps)[number]
| undefined =
suspensionResult.timeoutSeconds === undefined
? ownedPendingSteps[0]
: undefined;
| undefined = ownedPendingSteps[0];

// Queue every pending step except the one we're
// executing inline. This mirrors V1's unconditional
// enqueue-with-idempotency pattern and is what makes
// crash recovery work: if a prior handler wrote
// step_created events but crashed before enqueuing,
// a later handler (e.g., from flow-message
// redelivery or reenqueueActiveRuns) will enqueue
// the orphaned steps. In the happy path with a
// single owner, concurrent handlers' queue attempts
// dedupe on correlationId. Skipping the inline step
// avoids a queue handler racing against our own
// inline executor.
// Unified queue dispatch for everything we are NOT
// inline-executing. Steps are queued with stepId so
// the receiver runs them; the wait timer is queued
// as a generic continuation that fires after the
// wait elapses and lets the next replay observe the
// elapsed wait via the "complete elapsed waits"
// pass.
//
// Step queueing is unconditional (covers crash
// recovery: if a prior handler wrote step_created
// but crashed before queueing, a later handler will
// queue them; idempotencyKey on correlationId
// dedupes redundant queues across concurrent
// handlers).
//
// The wait continuation is what makes
// `Promise.race(step, sleep)` behave correctly with
// inline step execution: even if the inline step
// blocks this handler for the full step duration,
// the wait timer fires in a separate function
// invocation. If the sleep wins, that parallel
// invocation completes the run; if the step wins,
// the wait continuation fires later and no-ops on
// the terminal run.
const traceCarrier = await serializeTraceCarrier();
const dispatches: Promise<unknown>[] = [];
for (const step of pendingSteps) {
if (
inlineStep &&
step.correlationId === inlineStep.correlationId
) {
continue;
}
const traceCarrier = await serializeTraceCarrier();
await queueMessage(
world,
getWorkflowQueueName(workflowName),
{
runId,
stepId: step.correlationId,
stepName: step.stepName,
traceCarrier,
requestedAt: new Date(),
},
{
idempotencyKey: step.correlationId,
}
dispatches.push(
queueMessage(
world,
getWorkflowQueueName(workflowName),
{
runId,
stepId: step.correlationId,
stepName: step.stepName,
traceCarrier,
requestedAt: new Date(),
},
{
idempotencyKey: step.correlationId,
}
)
);
}
if (suspensionResult.timeoutSeconds !== undefined) {
dispatches.push(
queueMessage(
world,
getWorkflowQueueName(workflowName),
{
runId,
traceCarrier,
requestedAt: new Date(),
},
{
delaySeconds: suspensionResult.timeoutSeconds,
}
)
);
}
await Promise.all(dispatches);

// Nothing to execute inline — we already queued all
// pending steps above, exit and let the queue drive.
// Nothing to execute inline — everything has been
// queued (or no work needs scheduling). Exit and let
// the queue drive subsequent replays.
if (!inlineStep) {
if (suspensionResult.timeoutSeconds !== undefined) {
return {
timeoutSeconds: suspensionResult.timeoutSeconds,
};
}
return;
}

@@ -961,28 +964,26 @@
});

if (stepResult.type === 'retry') {
// Step needs retry — queue self with stepId for retry
const traceCarrier = await serializeTraceCarrier();
// Step needs retry — queue self with stepId for retry.
// Any pending wait timer was already enqueued as part
// of the unified dispatch above, so we can return
// unconditionally here.
const retryTraceCarrier =
await serializeTraceCarrier();
await queueMessage(
world,
getWorkflowQueueName(workflowName),
{
runId,
stepId: inlineStep.correlationId,
stepName: inlineStep.stepName,
traceCarrier,
traceCarrier: retryTraceCarrier,
requestedAt: new Date(),
},
{
delaySeconds: stepResult.timeoutSeconds,
}
);
// If there are also waits, return their timeout
if (suspensionResult.timeoutSeconds !== undefined) {
return {
timeoutSeconds: suspensionResult.timeoutSeconds,
};
}
return;
}

@@ -1025,15 +1026,6 @@
);
return;
}

if (
suspensionResult.timeoutSeconds !== undefined &&
pendingSteps.length === 1
) {
// Only 1 step and there's also waits/hooks,
// step is done, but we need the wait timeout
// Loop back to replay which will re-evaluate
}
} else {
// User code error from runWorkflow — create run_failed.
if (err instanceof Error) {
83 changes: 83 additions & 0 deletions packages/world-local/src/queue.test.ts
@@ -167,3 +167,86 @@ describe('queue timeout re-enqueue', () => {
expect(mockSetTimeout).not.toHaveBeenCalled();
});
});

describe('queue delaySeconds', () => {
let localQueue: ReturnType<typeof createQueue>;

beforeEach(() => {
localQueue = createQueue({ baseUrl: 'http://localhost:3000' });
});

afterEach(async () => {
await localQueue.close();
});

it('honors delaySeconds before delivering the message', async () => {
const { setTimeout: mockSetTimeout } = await import('node:timers/promises');
vi.mocked(mockSetTimeout).mockClear();

let callCount = 0;
const handler = localQueue.createQueueHandler('__wkf_step_', async () => {
callCount++;
return undefined;
});

localQueue.registerHandler('__wkf_step_', handler);

await localQueue.queue('__wkf_step_test' as any, stepPayload, {
delaySeconds: 7,
});

await vi.waitFor(() => {
expect(callCount).toBe(1);
});

// setTimeout should have been called with the delay (7s = 7000ms)
// before the message was delivered.
expect(mockSetTimeout).toHaveBeenCalledWith(7000);
});

it('does not call setTimeout for delaySeconds: 0', async () => {
const { setTimeout: mockSetTimeout } = await import('node:timers/promises');
vi.mocked(mockSetTimeout).mockClear();

let callCount = 0;
const handler = localQueue.createQueueHandler('__wkf_step_', async () => {
callCount++;
return undefined;
});

localQueue.registerHandler('__wkf_step_', handler);

await localQueue.queue('__wkf_step_test' as any, stepPayload, {
delaySeconds: 0,
});

await vi.waitFor(() => {
expect(callCount).toBe(1);
});

// setTimeout should NOT have been called for delaySeconds: 0 (the
// delay-honoring branch is gated on `delaySeconds > 0`).
expect(mockSetTimeout).not.toHaveBeenCalled();
});

it('does not call setTimeout when delaySeconds is omitted', async () => {
const { setTimeout: mockSetTimeout } = await import('node:timers/promises');
vi.mocked(mockSetTimeout).mockClear();

let callCount = 0;
const handler = localQueue.createQueueHandler('__wkf_step_', async () => {
callCount++;
return undefined;
});

localQueue.registerHandler('__wkf_step_', handler);

await localQueue.queue('__wkf_step_test' as any, stepPayload);

await vi.waitFor(() => {
expect(callCount).toBe(1);
});

expect(mockSetTimeout).not.toHaveBeenCalled();
});
});
11 changes: 11 additions & 0 deletions packages/world-local/src/queue.ts
@@ -133,6 +133,17 @@ export function createQueue(config: Partial<Config>): LocalQueue {
}

(async () => {
// Honor the caller's requested delivery delay before acquiring a queue
// slot. We sleep outside the semaphore so a delayed message doesn't
// hold a worker hostage for its delay window — the worker should be
// free to process other (immediate) messages until this one is ready.
// VQS-side queues honor delaySeconds at the broker, so this brings
// world-local in line with production behavior.
if (opts?.delaySeconds && opts.delaySeconds > 0) {
const delayMs = Math.min(opts.delaySeconds * 1000, MAX_SAFE_TIMEOUT_MS);
await setTimeout(delayMs);
}

const token = semaphore.tryAcquire();
if (!token) {
console.warn(