Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/resilient-resume-hook.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@workflow/core": minor
"workflow": minor
"@workflow/world": minor
---

Make `resumeHook()` resilient to transient `hook_received` event write failures (429/5xx) by carrying the payload on the queue message for the runtime to materialize. Returned `Hook` gets a new `resilientResume: true` flag when this fallback path is taken.
72 changes: 72 additions & 0 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
healthCheck,
start as rawStart,
resumeHook,
setWorld,
} from '../src/runtime';
import {
cliCancel,
Expand Down Expand Up @@ -2297,6 +2298,77 @@ describe('e2e', () => {
}
);

// ============================================================
// Resilient resume: hook payload delivered even when hook_received fails
// ============================================================
test(
'resilient resume: hookWorkflow receives payload when hook_received returns 500',
{ timeout: 60_000 },
async () => {
const token = `resilient-resume-${Math.random().toString(36).slice(2)}`;
const customData = Math.random().toString(36).slice(2);

// Start the hook-awaiting workflow normally
const run = await start(await e2e('hookWorkflow'), [token, customData]);

// Wait for the hook to be registered
await sleep(5_000);

// Build a stubbed world whose events.create throws a 500 on the
// hook_received write, but passes all other events through. The queue
// dispatch should still succeed, and the workflow runtime should
// materialize the missing hook_received event from `hookInput` on the
// queue message (resilient resume).
const realWorld = await getWorld();
const stubbedWorld: World = {
...realWorld,
events: {
...realWorld.events,
create: (async (...args: Parameters<World['events']['create']>) => {
const [, event] = args;
if (event.eventType === 'hook_received') {
throw new WorkflowWorldError('Simulated storage outage', {
status: 500,
});
}
return realWorld.events.create(...args);
}) as World['events']['create'],
},
};

const hook = await getHookByToken(token);
expect(hook.runId).toBe(run.runId);

// Swap in the stubbed world for the duration of the resumeHook() call.
// `resumeHook` uses `getWorld()` internally (no `world` option), so we
// use `setWorld()` to replace the cached instance and restore the real
// one afterwards.
setWorld(stubbedWorld);
let resumedHook;
try {
resumedHook = await resumeHook(hook, {
message: 'via-resilient-resume',
customData: (hook.metadata as any)?.customData,
done: true,
});
} finally {
setWorld(realWorld);
}

// The direct hook_received write failed with 500, so resumeHook should
// have taken the resilient path and flagged the returned hook.
expect(resumedHook.resilientResume).toBe(true);

// Despite hook_received failing, the workflow should still receive
// the payload via the runtime's queue-payload fallback.
const returnValue = await run.returnValue;
expect(returnValue).toHaveLength(1);
expect(returnValue[0].message).toBe('via-resilient-resume');
expect(returnValue[0].customData).toBe(customData);
expect(returnValue[0].done).toBe(true);
}
);

test(
'getterStepWorkflow - getter functions with "use step" directive',
{ timeout: 60_000 },
Expand Down
81 changes: 81 additions & 0 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
EntityConflictError,
HookNotFoundError,
RUN_ERROR_CODES,
RunExpiredError,
WorkflowRuntimeError,
Expand Down Expand Up @@ -125,6 +126,7 @@ export function workflowEntrypoint(
traceCarrier: traceContext,
requestedAt,
runInput,
hookInput,
} = WorkflowInvokePayloadSchema.parse(message_);
const { requestId } = metadata;
// Extract the workflow name from the topic name
Expand Down Expand Up @@ -450,6 +452,85 @@ export function workflowEntrypoint(
}
}

// --- Resilient resume: materialize missing hook_received ---
// When `resumeHook()` fires its hook_received event write and
// queue dispatch in parallel, the event write may fail with
// a transient 429/5xx while the queue dispatch succeeds.
// In that case `hookInput` is present on the queue payload,
// carrying the dehydrated payload + a client-minted
// idempotency key (`resumeId`). If no existing hook_received
// event already carries that `resumeId`, we materialize one
// here so the workflow replay sees the payload. Mirrors
// `start()`'s resilient path for run_created → run_started.
Comment on lines +456 to +464
Copy link

Copilot AI Apr 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment explains resilient resume in terms of resumeHook() writing the event and queueing “in parallel”, but resumeHook() is now sequential specifically to avoid the duplicate-materialization race. Please update this block to reflect the current ordering and the actual condition for hookInput being present (only when the direct write failed with a retryable error).

Suggested change
// When `resumeHook()` fires its hook_received event write and
// queue dispatch in parallel, the event write may fail with
// a transient 429/5xx while the queue dispatch succeeds.
// In that case `hookInput` is present on the queue payload,
// carrying the dehydrated payload + a client-minted
// idempotency key (`resumeId`). If no existing hook_received
// event already carries that `resumeId`, we materialize one
// here so the workflow replay sees the payload. Mirrors
// `start()`'s resilient path for run_created → run_started.
// `resumeHook()` now tries to write `hook_received` first and
// only enqueues a resume carrying `hookInput` if that direct
// write fails with a retryable error (for example, a transient
// 429/5xx). In that recovery path, `hookInput` contains the
// dehydrated payload plus the client-minted idempotency key
// (`resumeId`). If no existing `hook_received` event already
// carries that `resumeId`, we materialize one here so replay
// can see the payload while avoiding duplicate
// materialization.

Copilot uses AI. Check for mistakes.
if (hookInput) {
const alreadyMaterialized = events.some(
(e) =>
e.eventType === 'hook_received' &&
e.correlationId === hookInput.hookId &&
(e.eventData as { resumeId?: string } | undefined)
?.resumeId === hookInput.resumeId
);
if (!alreadyMaterialized) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd usually say this isn't safe (TOCTOU race): We should pass resumeId to the world and the world should enforce idempotency if possible. I know it's hard in this case, because the World might not be able to enforce uniqueness on resumeId during insert.

However, since events.create -> queue is in sequence, and we only send hookInput if the former fails, this seems like a really niche extra check that doesn't hurt, though I'd assume alreadyMaterialized to always be false (unless there's another race condition) given the above

try {
const result = await world.events.create(
runId,
{
eventType: 'hook_received',
specVersion:
workflowRun.specVersion ?? SPEC_VERSION_CURRENT,
correlationId: hookInput.hookId,
eventData: {
payload: hookInput.payload as any,
resumeId: hookInput.resumeId,
},
},
{ requestId }
);
if (result.event) {
events.push(result.event);
}
runtimeLogger.warn(
'Materialized hook_received event from queue payload (resilient resume)',
{
workflowRunId: runId,
hookId: hookInput.hookId,
resumeId: hookInput.resumeId,
}
);
span?.setAttributes({
...Attribute.HookResilientResumeMaterialized(true),
});
} catch (err) {
if (EntityConflictError.is(err)) {
// Another queue delivery already materialized this
// hook_received event — safe to ignore.
runtimeLogger.info(
'Hook resilient-resume materialization skipped (already exists)',
{
workflowRunId: runId,
hookId: hookInput.hookId,
resumeId: hookInput.resumeId,
}
);
} else if (HookNotFoundError.is(err)) {
// The hook was disposed between resumeHook() and
// this queue delivery. Drop the resume — there is
// no active awaiter to deliver it to.
runtimeLogger.warn(
'Hook was disposed before resilient resume could materialize — dropping payload',
{
workflowRunId: runId,
hookId: hookInput.hookId,
resumeId: hookInput.resumeId,
}
);
} else {
throw err;
}
}
}
}

// Resolve the encryption key for this run's deployment
const rawKey =
await world.getEncryptionKeyForRun?.(workflowRun);
Expand Down
17 changes: 17 additions & 0 deletions packages/core/src/runtime/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { ThrottleError, WorkflowWorldError } from '@workflow/errors';
import type {
Event,
HealthCheckPayload,
Expand All @@ -17,6 +18,22 @@ import { getSpanKind, trace } from '../telemetry.js';
import { version as workflowCoreVersion } from '../version.js';
import { getWorld } from './world.js';

/**
* Checks if an error from events.create() is retryable via the queue-payload
* fallback path. Used by `start()` (resilient start — run_created → run_started
* fallback) and `resumeHook()` (resilient resume — hook_received fallback
* materialized by the workflow runtime from `hookInput` on the queue message).
*
* - ThrottleError (429): rate limited, will likely succeed later
* - WorkflowWorldError with status >= 500: transient server error
*/
export function isRetryableEventError(err: unknown): boolean {
if (ThrottleError.is(err)) return true;
if (WorkflowWorldError.is(err) && err.status && err.status >= 500)
return true;
return false;
}

/** Default timeout for health checks in milliseconds */
const DEFAULT_HEALTH_CHECK_TIMEOUT = 30_000;

Expand Down
Loading
Loading