From 34a3ce5c25e4d05298afc82223073a8ab46a00fa Mon Sep 17 00:00:00 2001 From: Karthik Kalyanaraman Date: Wed, 29 Apr 2026 16:26:13 -0700 Subject: [PATCH] docs(cookbook): add Upgrading Workflows guide MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New /cookbook/advanced/upgrading-workflows page covering long-running workflows that respawn themselves on the latest deployment instead of migrating in-flight runs. Documents two methods: - Method 1: spawn the successor on every iteration via start() with deploymentId: "latest" wrapped in a "use step" helper. - Method 2: a dedicated upgradeHook raced against the work hook so upgrades are triggered explicitly via a separate endpoint — ideal for rolling out a fix to a fleet of in-flight runs after a deploy. Wires the page into cookbook-tree.ts, advanced/meta.json, and the cookbook index landing page. Made-with: Cursor --- docs/content/docs/cookbook/advanced/meta.json | 1 + .../cookbook/advanced/upgrading-workflows.mdx | 207 ++++++++++++++++++ docs/content/docs/cookbook/index.mdx | 1 + docs/lib/cookbook-tree.ts | 8 + 4 files changed, 217 insertions(+) create mode 100644 docs/content/docs/cookbook/advanced/upgrading-workflows.mdx diff --git a/docs/content/docs/cookbook/advanced/meta.json b/docs/content/docs/cookbook/advanced/meta.json index 1b8ea39d44..48610b096d 100644 --- a/docs/content/docs/cookbook/advanced/meta.json +++ b/docs/content/docs/cookbook/advanced/meta.json @@ -3,6 +3,7 @@ "pages": [ "child-workflows", "distributed-abort-controller", + "upgrading-workflows", "serializable-steps", "publishing-libraries" ] diff --git a/docs/content/docs/cookbook/advanced/upgrading-workflows.mdx b/docs/content/docs/cookbook/advanced/upgrading-workflows.mdx new file mode 100644 index 0000000000..16881aa75d --- /dev/null +++ b/docs/content/docs/cookbook/advanced/upgrading-workflows.mdx @@ -0,0 +1,207 @@ +--- +title: Upgrading Workflows +description: Identify a clean upgrade point in a long-running workflow and spawn a fresh run on the latest deployment carrying state forward. +type: guide +summary: 'Identify a clean upgrade point and hand off to a fresh run via `start(self, [state], { deploymentId: "latest" })` — either automatically on every iteration, or on demand via a dedicated upgrade hook.' +related: + - /cookbook/common-patterns/workflow-composition + - /docs/api-reference/workflow-api/start + - /docs/foundations/hooks +--- + +Workflows that block on external events for days, weeks, or months can outlive many deployments. **The key is to identify a clean upgrade point in the workflow** — a moment where it's safe to checkpoint state and start fresh — and then call [`start()`](/docs/api-reference/workflow-api/start) with `deploymentId: "latest"` to spawn a new run carrying that state forward. The current run ends; the next run begins on whatever deployment is live at that moment, so shipped fixes apply immediately without ever migrating an in-flight run. + +A clean upgrade point is any spot in the workflow where: + +- All in-progress side effects have completed (or aren't needed by the next iteration) +- The relevant state can be serialized into the workflow's input arguments +- It's natural for the workflow to "checkpoint" — typically right after handling an external event, completing a batch, or finishing a logical phase + +There are two ways to apply this: + +1. **Upgrade on every iteration** ([Method 1](#method-1-upgrade-on-every-iteration)). Each run handles a single event and unconditionally hands off to a fresh run on the latest deployment before exiting. Simple — no extra triggers — but every event pays the respawn cost. +2. **Upgrade on demand via a dedicated hook** ([Method 2](#method-2-upgrade-on-demand-via-a-dedicated-hook)). A single long-lived run handles many events in a loop and only respawns when an `upgradeHook` fires. A separate endpoint resumes that hook from your control plane (e.g. after a deploy). More control and fewer respawns, at the cost of an explicit trigger. + +### When to use each + +- **Method 1** when iterations are short and frequent, the work is cheap to checkpoint, and you want shipped fixes to apply on the very next event. Long-lived "session" workflows (subscriptions, queues, FSMs) that already process events one at a time fit this naturally. +- **Method 2** when iterations are infrequent or expensive (you don't want to respawn on every event), or when you need to roll out a fix to a fleet of in-flight runs after a deploy by fanning out to a control-plane endpoint. Also fits when "upgrade" should be an explicit operation rather than a side effect of handling each event. + +## Method 1: Upgrade on every iteration + +Each run inherits state via its argument, blocks on a hook, processes the resume, then unconditionally hands off to its successor. The `start()` call is wrapped in a `"use step"` function (required) and passes `deploymentId: "latest"` so the new run lands on the freshest code. + +```typescript lineNumbers +import { defineHook, getWorkflowMetadata } from "workflow"; +import { start } from "workflow/api"; + +declare function processItem(itemId: string): Promise; // @setup + +interface QueueState { + processed: number; + cursor: string | null; +} + +export const nextItemHook = defineHook<{ itemId: string }>(); + +async function spawnSelfOnLatest(state: QueueState): Promise { + "use step"; // [!code highlight] + + // `deploymentId: "latest"` resolves to whichever deployment is current + // when this spawn lands — NOT the deployment running this code. + const next = await start(longRunningQueue, [state], { // [!code highlight] + deploymentId: "latest", // [!code highlight] + }); // [!code highlight] + return next.runId; +} + +export async function longRunningQueue( + state: QueueState = { processed: 0, cursor: null }, +): Promise { + "use workflow"; + + const { workflowRunId } = getWorkflowMetadata(); + + // Block until something fires the hook — could be hours, days, or longer. + // Per-run hook tokens (workflowRunId) keep concurrent chains isolated. + const { itemId } = await nextItemHook.create({ token: workflowRunId }); // [!code highlight] + + await processItem(itemId); + + // Hand off to a fresh run on the latest deployment. THIS run ends here. + await spawnSelfOnLatest({ // [!code highlight] + processed: state.processed + 1, // [!code highlight] + cursor: itemId, // [!code highlight] + }); // [!code highlight] +} +``` + +### Resuming the hook + +Any server-side code can resume the currently-active iteration by calling `.resume()` with the run ID: + +```typescript +import { nextItemHook } from "@/workflows/long-running-queue"; + +export async function POST(req: Request) { + const { runId, itemId } = await req.json(); + + await nextItemHook.resume(runId, { itemId }); // [!code highlight] + + return Response.json({ success: true }); +} +``` + +The caller tracks the active `runId` (e.g. in a database, KV, or returned from the previous iteration) and updates it whenever the chain advances. + +## Method 2: Upgrade on demand via a dedicated hook + +Use a single long-running workflow that handles events in a loop. Define a second hook — `upgradeHook` — alongside the work hook, and race them. While only the work hook fires, the run keeps handling events on its current deployment. When `upgradeHook` resumes, the workflow captures current state and respawns on the latest deployment, then exits. + +```typescript lineNumbers +import { defineHook, getWorkflowMetadata } from "workflow"; +import { start } from "workflow/api"; + +declare function processItem(itemId: string): Promise; // @setup + +interface QueueState { + processed: number; + cursor: string | null; +} + +export const nextItemHook = defineHook<{ itemId: string }>(); +export const upgradeHook = defineHook<{ reason?: string }>(); // [!code highlight] + +async function spawnSelfOnLatest(state: QueueState): Promise { + "use step"; + + const next = await start(longRunningQueue, [state], { + deploymentId: "latest", + }); + return next.runId; +} + +export async function longRunningQueue( + state: QueueState = { processed: 0, cursor: null }, +): Promise { + "use workflow"; + + const { workflowRunId } = getWorkflowMetadata(); + + while (true) { + // Race a normal work event against the upgrade signal. + const event = await Promise.race([ // [!code highlight] + nextItemHook + .create({ token: workflowRunId }) + .then((payload) => ({ kind: "work" as const, payload })), + upgradeHook // [!code highlight] + .create({ token: workflowRunId }) // [!code highlight] + .then(() => ({ kind: "upgrade" as const })), // [!code highlight] + ]); + + if (event.kind === "upgrade") { // [!code highlight] + // Checkpoint current state and hand off to a fresh run + // on whatever deployment is live now. THIS run ends here. + await spawnSelfOnLatest(state); // [!code highlight] + return; // [!code highlight] + } + + await processItem(event.payload.itemId); + state = { + processed: state.processed + 1, + cursor: event.payload.itemId, + }; + } +} +``` + +### Triggering the upgrade + +Expose a separate endpoint that resumes `upgradeHook` for a given run. Call it from your deploy pipeline, an admin UI, or a fan-out script that iterates over every active run after shipping a fix. + +```typescript +import { upgradeHook } from "@/workflows/long-running-queue"; + +export async function POST(req: Request) { + const { runId, reason } = await req.json(); + + // The workflow exits its loop, captures state, and respawns + // on the latest deployment. + await upgradeHook.resume(runId, { reason }); // [!code highlight] + + return Response.json({ success: true }); +} +``` + +To upgrade a fleet of runs after a deploy, list active runs (e.g. from a tracking store) and call this endpoint for each. + +## How it works + +1. **`deploymentId: "latest"` is the upgrade knob.** Without it, the spawn pins to the current deployment. With it, the new run resolves to whatever deployment is current when the runtime picks it up — so any shipped fix applies starting from that respawn. Both methods rely on this. +2. **`start()` from a step.** [`start()`](/docs/api-reference/workflow-api/start) is not allowed directly inside `"use workflow"` functions — wrap it in a `"use step"` helper to keep the spawn deterministic across replays. +3. **State carries through the function argument.** The accumulating context flows from run N to run N+1 as a serialized argument. No external store is required for the state itself. +4. **Per-run hook tokens.** Using `workflowRunId` as the hook token scopes each iteration's wait to its own run, so multiple chains can run concurrently without interfering. +5. **Method 1 vs Method 2 is just where the spawn happens.** In Method 1 every run spawns its successor unconditionally before exiting — there is no long-lived process to migrate. In Method 2 the spawn happens only when the upgrade hook fires; otherwise the loop keeps handling events on the same run. + +## Adapting to your use case + +- **Combine with a sleep.** Race the hook against `sleep()` so iterations also tick on a timer: `Promise.race([hook, sleep("1d")])` lets the workflow advance even if no external event arrives. +- **Stateless successors.** If the next iteration doesn't need the previous state (e.g. a pure event router), call `start(longRunningQueue, [], { deploymentId: "latest" })` and skip the argument plumbing. +- **Persist state externally.** If state needs to be readable from outside the workflow (dashboards, debugging, recovery), write it to a database in a step before spawning the next run. +- **Track the active runId externally.** Whatever resumes the hook needs to know the current run. Have the spawn step write the new `runId` to a KV/database keyed by a stable session identifier so resumers always look up the latest one. + +## Caveats + +- **Backward compatibility matters.** Because the next run executes on a different deployment, the workflow's input arguments and return type must remain compatible across deployments. Adding required fields, removing fields, or changing types can cause serialization failures. See the [`deploymentId: "latest"` callout](/docs/api-reference/workflow-api/start#using-deploymentid-latest). +- **Workflow identity is the function name + file path.** Renaming the function or moving the file across a deployment changes the workflow ID — the next iteration will fail to resolve. Treat the workflow's name and location as stable interfaces. +- **There is a tiny gap between iterations.** The current run ends as soon as `start()` returns; the next run starts asynchronously. A resume that arrives in that window can fail with "hook not found." Make resumers retry, or have the API persist pending payloads and apply them once the next iteration is ready. +- **Method 2: track active runs externally.** Because Method 2's runs are long-lived, the set of in-flight runs only changes when one starts, completes, or upgrades. Persist run IDs (and clean them up on completion or upgrade) so a rollout script can fan out reliably. After resuming `upgradeHook`, also update the tracked run ID once the new run reports back, the same way you would in Method 1. +- **`start()` must be called from a step**, never directly from the workflow body. + +## Key APIs + +- [`"use workflow"`](/docs/foundations/workflows-and-steps) — marks the orchestrator function +- [`"use step"`](/docs/foundations/workflows-and-steps) — required wrapper for `start()` calls +- [`start()`](/docs/api-reference/workflow-api/start) with [`deploymentId: "latest"`](/docs/api-reference/workflow-api/start#using-deploymentid-latest) — spawn the successor on the newest deployment +- [`defineHook()`](/docs/api-reference/workflow/define-hook) — suspend the workflow until an external event resumes it +- [`getWorkflowMetadata()`](/docs/api-reference/workflow/get-workflow-metadata) — exposes `workflowRunId` for per-run hook tokens diff --git a/docs/content/docs/cookbook/index.mdx b/docs/content/docs/cookbook/index.mdx index 94cb6d9d18..ea1ecbb724 100644 --- a/docs/content/docs/cookbook/index.mdx +++ b/docs/content/docs/cookbook/index.mdx @@ -34,5 +34,6 @@ A curated collection of workflow patterns with clean, copy-paste code examples f - [**Child Workflows**](/cookbook/advanced/child-workflows) — Spawn and orchestrate child workflows from a parent - [**Distributed Abort Controller**](/cookbook/advanced/distributed-abort-controller) — Build a cross-process abort controller using workflow streams and hooks +- [**Upgrading Workflows**](/cookbook/advanced/upgrading-workflows) — Identify a clean upgrade point in a long-running workflow and spawn a fresh run on the latest deployment carrying state forward - [**Serializable Steps**](/cookbook/advanced/serializable-steps) — Wrap non-serializable third-party objects so they cross the workflow boundary - [**Publishing Libraries**](/cookbook/advanced/publishing-libraries) — Ship npm packages that export reusable workflow functions diff --git a/docs/lib/cookbook-tree.ts b/docs/lib/cookbook-tree.ts index cca5a77183..1592534074 100644 --- a/docs/lib/cookbook-tree.ts +++ b/docs/lib/cookbook-tree.ts @@ -51,6 +51,7 @@ export const slugToCategory: Record = { // Advanced 'child-workflows': 'advanced', 'distributed-abort-controller': 'advanced', + 'upgrading-workflows': 'advanced', 'serializable-steps': 'advanced', 'publishing-libraries': 'advanced', }; @@ -183,6 +184,13 @@ export const recipes: Record = { 'Build a cross-process abort controller using workflow streams and hooks to coordinate cancellation by semantic ID.', category: 'advanced', }, + 'upgrading-workflows': { + slug: 'upgrading-workflows', + title: 'Upgrading Workflows', + description: + 'Identify a clean upgrade point in a long-running workflow and spawn a fresh run on the latest deployment carrying state forward.', + category: 'advanced', + }, 'serializable-steps': { slug: 'serializable-steps', title: 'Serializable Steps',