Make resumeHook() resilient to transient hook_received event write failures #1834
TooTallNate wants to merge 1 commit into main
When events.create('hook_received') fails with a retryable error (429/5xx),
resumeHook() now dispatches the queue message with a `hookInput` payload
carrying the dehydrated hook payload. The workflow runtime materializes the
missing hook_received event from that payload on its next delivery, mirroring
the existing resilient-start behavior of start() / run_created / run_started.
Returned Hook carries a new `resilientResume: true` flag when the fallback
path was taken. Both write paths share a client-minted `resumeId` as an
idempotency key so the runtime can dedup if the direct write actually
committed but the client saw a transient error.
Uses a sequential write-then-queue flow (not parallel) to avoid a dedup race
on the happy path: hook_received events have no entity-level conflict guard
(unlike run_created), so a duplicate written before the direct write commits
would double-deliver the payload to the workflow.
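The sequential write-then-queue flow described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's implementation: `WorldSketch`, `createHookReceived`, `enqueue`, `isTransient`, and `mintResumeId` are hypothetical names, and the error shape (an object with a numeric `status`) is assumed.

```typescript
// Hypothetical, simplified shapes for illustration only.
interface HookInput {
  hookId: string;
  resumeId: string;
  payload: unknown;
}

interface QueueMessage {
  runId: string;
  hookInput?: HookInput; // present only on the fallback path
}

interface WorldSketch {
  createHookReceived(input: HookInput): Promise<void>;
  enqueue(message: QueueMessage): Promise<void>;
}

// Assumed classification: an error carrying status 429 or 5xx is transient.
function isTransient(err: unknown): boolean {
  if (typeof err !== 'object' || err === null) return false;
  const status = (err as { status?: unknown }).status;
  return (
    typeof status === 'number' &&
    (status === 429 || (status >= 500 && status < 600))
  );
}

// Stand-in ID generator; a real client would likely use a UUID or ULID.
function mintResumeId(): string {
  return `resume_${Date.now().toString(36)}_${Math.random().toString(36).slice(2)}`;
}

async function resumeHookSketch(
  world: WorldSketch,
  runId: string,
  hookId: string,
  payload: unknown
): Promise<{ hookId: string; resilientResume: boolean }> {
  const input: HookInput = { hookId, resumeId: mintResumeId(), payload };
  let fallback = false;
  try {
    // Step 1: direct write, awaited before anything is queued.
    await world.createHookReceived(input);
  } catch (err) {
    // Non-retryable errors surface to the caller unchanged.
    if (!isTransient(err)) throw err;
    fallback = true;
  }
  // Step 2: queue dispatch. It carries hookInput only if step 1 failed transiently.
  await world.enqueue(fallback ? { runId, hookInput: input } : { runId });
  return { hookId, resilientResume: fallback };
}
```

Because the queue message is only sent after the direct write has settled, the queue handler never observes a `hookInput` for a write that is still in flight, which is the race the sequential ordering is meant to avoid.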
🦋 Changeset detected
Latest commit: 63ceeb2
The changes in this PR will be included in the next version bump. This PR includes changesets to release 21 packages.
📊 Benchmark Results
Benchmarks run (💻 Local Development; result tables not captured):
- workflow with no steps, 1 step, and 10/25/50 sequential steps
- Promise.all with 10/25/50 concurrent steps
- Promise.race with 10/25/50 concurrent steps
- workflow with 10/25/50 sequential data payload steps (10KB)
- workflow with 10/25/50 concurrent data payload steps (10KB)
- Stream benchmarks (include TTFB metrics): workflow with stream; stream pipeline with 5 transform steps (1MB); 10 parallel streams (1MB each); fan-out fan-in 10 streams (1MB each)
❌ Some benchmark jobs failed. Check the workflow run for details.
🧪 E2E Test Results
❌ Some tests failed
Failed tests: 💻 Local Development (2 failed): vite-stable (2 failed)
Details by category:
- ❌ 💻 Local Development
- ✅ 📦 Local Production
- ✅ 🐘 Local Postgres
- ✅ 📋 Other
❌ Some E2E test jobs failed. Check the workflow run for details.
Pull request overview
Adds resilient behavior to resumeHook() to mirror start()’s resilient-start pattern: if hook_received event creation fails transiently (429/5xx) but queue dispatch succeeds, the runtime can reconstruct (“materialize”) the missing hook_received event from data carried on the queue message.
Changes:
- Extend queue payload and hook event schemas to optionally carry `hookInput` (`hookId`, `resumeId`, `payload`) and `resumeId` for dedup/materialization.
- Update `resumeHook()` to mint a `resumeId`, attempt a direct `hook_received` write first, and fall back to queue-carried `hookInput` only on retryable event-write failures.
- Add runtime-side materialization logic plus unit/e2e tests, and extract `isRetryableEventError` for shared use.
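To make the "optional and additive" nature of the new fields concrete, here is a minimal sketch of how a consumer might read `hookInput` off a queue message. The interfaces and the `isHookInput` / `readHookInput` helpers are hypothetical, inferred only from the field names listed above; the actual schema definitions in `packages/world/src/queue.ts` are not shown in this conversation and may use a validation library.

```typescript
// Hypothetical shapes inferred from the field names in this PR.
interface HookInput {
  hookId: string;
  resumeId: string;
  payload: unknown;
}

// Runtime guard: true only when all three fields are present with usable types.
function isHookInput(value: unknown): value is HookInput {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v['hookId'] === 'string' &&
    typeof v['resumeId'] === 'string' &&
    'payload' in v
  );
}

// Older messages have no hookInput at all; they simply yield undefined.
function readHookInput(message: unknown): HookInput | undefined {
  if (typeof message !== 'object' || message === null) return undefined;
  const candidate = (message as Record<string, unknown>)['hookInput'];
  return isHookInput(candidate) ? candidate : undefined;
}
```

A message produced by an older client, with no `hookInput`, takes the same path as a happy-path message from a new client, which is why no consumer-side migration should be needed under these assumptions.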
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| packages/world/src/queue.ts | Adds HookInputSchema and optional hookInput on workflow invoke payloads. |
| packages/world/src/events.ts | Adds optional resumeId to hook_received event data for dedup. |
| packages/core/src/telemetry/semantic-conventions.ts | Adds span attributes for resilient resume and materialization. |
| packages/core/src/runtime/start.ts | Switches to shared isRetryableEventError helper. |
| packages/core/src/runtime/resume-hook.ts | Implements sequential write-then-queue behavior and resilient fallback signaling. |
| packages/core/src/runtime/resume-hook.test.ts | Adds unit coverage for resilient resume behavior and ordering. |
| packages/core/src/runtime/helpers.ts | Extracts shared isRetryableEventError. |
| packages/core/src/runtime.ts | Materializes missing hook_received from hookInput during workflow execution. |
| packages/core/e2e/e2e.test.ts | Adds e2e validating payload delivery when hook_received write fails with 500. |
| .changeset/resilient-resume-hook.md | Declares minor bumps and documents the new resilient resume behavior. |
| * ## Resilient resume | ||
| * | ||
| * `resumeHook()` fires the `hook_received` event creation and the workflow | ||
| * queue dispatch in parallel. If the event creation fails with a retryable | ||
| * error (429/5xx) but the queue dispatch succeeds, the workflow runtime will | ||
| * materialize the missing `hook_received` event from the payload carried on | ||
| * the queue message — the returned hook has `resilientResume: true` to | ||
| * signal this fallback path was taken. This mirrors the resilient-start | ||
| * behavior of {@link start}. |
The JSDoc for the “Resilient resume” section says resumeHook() fires events.create and the queue dispatch in parallel, but the implementation below is explicitly sequential (events.create first, then queue) to avoid the dedup race. Update the doc comment to match the actual behavior so callers/operators aren’t misled about ordering/latency and failure modes.
| // When `resumeHook()` fires its hook_received event write and | ||
| // queue dispatch in parallel, the event write may fail with | ||
| // a transient 429/5xx while the queue dispatch succeeds. | ||
| // In that case `hookInput` is present on the queue payload, | ||
| // carrying the dehydrated payload + a client-minted | ||
| // idempotency key (`resumeId`). If no existing hook_received | ||
| // event already carries that `resumeId`, we materialize one | ||
| // here so the workflow replay sees the payload. Mirrors | ||
| // `start()`'s resilient path for run_created → run_started. |
This comment explains resilient resume in terms of resumeHook() writing the event and queueing “in parallel”, but resumeHook() is now sequential specifically to avoid the duplicate-materialization race. Please update this block to reflect the current ordering and the actual condition for hookInput being present (only when the direct write failed with a retryable error).
Suggested change
| - // When `resumeHook()` fires its hook_received event write and | |
| - // queue dispatch in parallel, the event write may fail with | |
| - // a transient 429/5xx while the queue dispatch succeeds. | |
| - // In that case `hookInput` is present on the queue payload, | |
| - // carrying the dehydrated payload + a client-minted | |
| - // idempotency key (`resumeId`). If no existing hook_received | |
| - // event already carries that `resumeId`, we materialize one | |
| - // here so the workflow replay sees the payload. Mirrors | |
| - // `start()`'s resilient path for run_created → run_started. | |
| + // `resumeHook()` now tries to write `hook_received` first and | |
| + // only enqueues a resume carrying `hookInput` if that direct | |
| + // write fails with a retryable error (for example, a transient | |
| + // 429/5xx). In that recovery path, `hookInput` contains the | |
| + // dehydrated payload plus the client-minted idempotency key | |
| + // (`resumeId`). If no existing `hook_received` event already | |
| + // carries that `resumeId`, we materialize one here so replay | |
| + // can see the payload while avoiding duplicate | |
| + // materialization. | |
| describe('isRetryableEventError', () => { | ||
| // Indirectly tested via resumeHook above. The helper is also unit-covered | ||
| // via start.test.ts's resilient start suite; no duplicate tests needed. | ||
| it('is exercised via resumeHook resilient resume tests', () => { | ||
| expect(SPEC_VERSION_CURRENT).toBeGreaterThanOrEqual( | ||
| SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT | ||
| ); | ||
| }); | ||
| }); |
The describe('isRetryableEventError') block doesn’t actually test isRetryableEventError—it only asserts a relationship between spec version constants. This is brittle (unrelated constant changes could fail the suite) and provides no coverage of the helper’s behavior. Either remove this block or replace it with direct unit tests for isRetryableEventError (e.g., in helpers.test.ts).
Suggested change
| - describe('isRetryableEventError', () => { | |
| - // Indirectly tested via resumeHook above. The helper is also unit-covered | |
| - // via start.test.ts's resilient start suite; no duplicate tests needed. | |
| - it('is exercised via resumeHook resilient resume tests', () => { | |
| - expect(SPEC_VERSION_CURRENT).toBeGreaterThanOrEqual( | |
| - SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT | |
| - ); | |
| - }); | |
| - }); | |
LGTM, seems like a straight improvement.
I wish we had world-side constraints that allowed us to parallelize the calls like with start. Alternatively, we could completely drop the event creation and just do the queue, but that wouldn't be backwards compatible, so it'd be a bigger headache. So current PR state is fine.
| // First, attempt the direct hook_received event write. This is | ||
| // sequential (not parallel with queue dispatch) to avoid a race | ||
| // where the queue handler processes the message before the event | ||
| // write has committed, which would otherwise cause the runtime | ||
| // fallback to materialize a duplicate hook_received event. |
Would it? We don't have this issue with lazy start, and that's what the idempotency key is for, right? Or I guess it would require World-side support and we're avoiding that to keep the scope small?
| 'Hook event creation failed, but the workflow was re-triggered via the queue. ' + | ||
| 'The hook_received event will be materialized by the runtime via the resilient resume path.', |
A bit verbose
Suggested change
| - 'Hook event creation failed, but the workflow was re-triggered via the queue. ' + | |
| - 'The hook_received event will be materialized by the runtime via the resilient resume path.', | |
| + 'hook_received event could not immediately be created, re-trying via queue.', | |
| (e.eventData as { resumeId?: string } | undefined) | ||
| ?.resumeId === hookInput.resumeId | ||
| ); | ||
| if (!alreadyMaterialized) { |
I'd usually say this isn't safe (TOCTOU race): We should pass resumeId to the world and the world should enforce idempotency if possible. I know it's hard in this case, because the World might not be able to enforce uniqueness on resumeId during insert.
However, since events.create -> queue is in sequence, and we only send hookInput if the former fails, this seems like a really niche extra check that doesn't hurt, though I'd assume alreadyMaterialized to always be false (unless there's another race condition) given the above
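The check under discussion can be sketched against an in-memory event list. `EventSketch` and `materializeHookReceived` are hypothetical names, and the event shape is assumed. In a single-threaded in-memory array the check and the append cannot interleave with another writer; against a real event store they can, which is the TOCTOU concern raised above.

```typescript
// Hypothetical shapes for illustration only.
interface HookInput {
  hookId: string;
  resumeId: string;
  payload: unknown;
}

interface EventSketch {
  eventType: string;
  hookId?: string;
  eventData?: { resumeId?: string; payload?: unknown };
}

// Appends a hook_received event unless one with the same resumeId already exists.
// Returns true when an event was materialized.
function materializeHookReceived(
  events: EventSketch[],
  hookInput: HookInput | undefined
): boolean {
  // Happy-path messages carry no hookInput, so there is nothing to do.
  if (!hookInput) return false;

  const alreadyMaterialized = events.some(
    (e) =>
      e.eventType === 'hook_received' &&
      e.eventData?.resumeId === hookInput.resumeId
  );
  if (alreadyMaterialized) return false;

  events.push({
    eventType: 'hook_received',
    hookId: hookInput.hookId,
    eventData: { resumeId: hookInput.resumeId, payload: hookInput.payload },
  });
  return true;
}
```

Redelivery of the same queue message is the case this guard covers in the sketch: the second delivery finds the `resumeId` already present and appends nothing.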
Summary
Brings `resumeHook()` to feature-parity with `start()`'s resilient-start behavior: when `events.create('hook_received')` fails with a transient 429/5xx but the queue dispatch succeeded, the workflow runtime materializes the missing `hook_received` event from a payload carried on the queue message.
- `hookInput` field on `WorkflowInvokePayloadSchema` (`hookId` + `resumeId` + `payload`) and optional `resumeId` on `HookReceivedEventSchema.eventData`. Both are additive/optional; no server changes required (verified against workflow-server's `hook_received` write path).
- `resumeHook()` returns `ResumedHook = Hook & { resilientResume?: boolean }`; the flag is set when the fallback path was taken.
- Extracts `isRetryableEventError` into a shared helper used by both `start()` and `resumeHook()`.

Scope
Does not extend the pattern to other primitives (steps, waits, `wakeUp()`, `cancelRun()`). Only `start()` / `resumeHook()` meet all three conditions that justify this pattern: externally-initiated + paired-with-queue-dispatch + carries-data-the-runtime-cannot-reconstruct. Step/wait events either run inside durable queue handlers (retry handles them) or carry no payload (a lighter mechanism would suffice).

Test plan
- `packages/core/src/runtime/resume-hook.test.ts` (9 unit tests): happy path, all failure combinations (retryable/non-retryable events error, queue error, both), sequential-ordering check, legacy-spec fail-fast.
- `resilient resume: hookWorkflow receives payload when hook_received returns 500` in `packages/core/e2e/e2e.test.ts`: stubs the world to make `events.create('hook_received')` throw 500 and verifies the workflow still receives the payload.
- `nextjs-turbopack` workbench locally: all 13 hook-related e2e tests pass (including the new one and existing resilient-start).
- `@workflow/world-testing`'s hooks test (initial parallel-dispatch approach was double-delivering payloads; switched to sequential).