From 44a0cc4314abe10a23c9dd4e940c10f06e880715 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 23 Feb 2026 11:09:08 +0000 Subject: [PATCH 1/7] input stream waitpoints and tests in the hello world reference project docs: version input streams under 4.4.2 and add docs site pages Move input streams documentation from rules/4.3.0 to rules/4.4.2, reverting 4.3.0 to its pre-input-streams state. Add comprehensive input streams documentation to the Mintlify docs site with pages for task-side usage, backend sending, and React hook patterns. - rules/4.4.2/realtime.md: input streams docs (moved from 4.3.0) - rules/manifest.json: add 4.4.2 version, revert 4.3.0 realtime - docs/tasks/input-streams.mdx: main input streams guide - docs/realtime/backend/input-streams.mdx: backend sending patterns - docs/realtime/react-hooks/input-streams.mdx: React UI patterns - docs/docs.json: add input streams to navigation - Cross-references from existing streams and realtime overview docs https://claude.ai/code/session_01SJHJts7r2yAxmoKLLz8vpc input stream waitpoints and tests in the hello world reference project docs --- docs/realtime/backend/input-streams.mdx | 151 +++++++++ docs/tasks/input-streams.mdx | 405 ++++++++++++++++++++++++ docs/wait-for-token.mdx | 4 + docs/wait.mdx | 11 +- 4 files changed, 566 insertions(+), 5 deletions(-) create mode 100644 docs/realtime/backend/input-streams.mdx create mode 100644 docs/tasks/input-streams.mdx diff --git a/docs/realtime/backend/input-streams.mdx b/docs/realtime/backend/input-streams.mdx new file mode 100644 index 00000000000..46261a00d15 --- /dev/null +++ b/docs/realtime/backend/input-streams.mdx @@ -0,0 +1,151 @@ +--- +title: Input Streams +sidebarTitle: Input Streams +description: Send data into running tasks from your backend code +--- + +The Input Streams API allows you to send data into running Trigger.dev tasks from your backend code. This enables bidirectional communication — while [output streams](/realtime/backend/streams) let you read data from tasks, input streams let you push data into them. + + + To learn how to receive input stream data inside your tasks, see our [Input Streams](/tasks/input-streams) documentation. For frontend applications using React, see our [React hooks input streams documentation](/realtime/react-hooks/input-streams). + + +## Sending data to a running task + +### Using defined input streams (Recommended) + +The recommended approach is to use [defined input streams](/tasks/input-streams#defining-input-streams) for full type safety: + +```ts +import { cancelSignal, approval } from "./trigger/streams"; + +// Cancel a running AI stream +await cancelSignal.send(runId, { reason: "User clicked stop" }); + +// Approve a draft +await approval.send(runId, { approved: true, reviewer: "alice@example.com" }); +``` + +The `.send()` method is fully typed — the data parameter must match the generic type you defined on the input stream. + + + `.send()` works the same regardless of how the task is listening — whether it uses `.wait()` (suspending), `.once()` (non-suspending), or `.on()` (continuous). The sender doesn't need to know how the task is consuming the data. See [Input Streams](/tasks/input-streams) for details on each receiving method. + + +## Practical examples + +### Cancel from a Next.js API route + +```ts app/api/cancel/route.ts +import { cancelStream } from "@/trigger/streams"; + +export async function POST(req: Request) { + const { runId } = await req.json(); + + await cancelStream.send(runId, { reason: "User clicked stop" }); + + return Response.json({ cancelled: true }); +} +``` + +### Approval workflow API + +```ts app/api/approve/route.ts +import { approval } from "@/trigger/streams"; + +export async function POST(req: Request) { + const { runId, approved, reviewer } = await req.json(); + + await approval.send(runId, { + approved, + reviewer, + }); + + return Response.json({ success: true }); +} +``` + +### Remix action handler + +```ts app/routes/api.approve.ts +import { json, type ActionFunctionArgs } from "@remix-run/node"; +import { approval } from "~/trigger/streams"; + +export async function action({ request }: ActionFunctionArgs) { + const formData = await request.formData(); + const runId = formData.get("runId") as string; + const approved = formData.get("approved") === "true"; + const reviewer = formData.get("reviewer") as string; + + await approval.send(runId, { approved, reviewer }); + + return json({ success: true }); +} +``` + +### Express handler + +```ts +import express from "express"; +import { cancelSignal } from "./trigger/streams"; + +const app = express(); +app.use(express.json()); + +app.post("/api/cancel", async (req, res) => { + const { runId, reason } = req.body; + + await cancelSignal.send(runId, { reason }); + + res.json({ cancelled: true }); +}); +``` + +### Sending from another task + +You can send input stream data from one task to another running task: + +```ts +import { task } from "@trigger.dev/sdk"; +import { approval } from "./streams"; + +export const reviewerTask = task({ + id: "auto-reviewer", + run: async (payload: { targetRunId: string }) => { + // Perform automated review logic... + const isApproved = await performReview(); + + // Send approval to the waiting task + await approval.send(payload.targetRunId, { + approved: isApproved, + reviewer: "auto-reviewer", + }); + }, +}); +``` + +## Error handling + +The `.send()` method will throw if: + +- The run has already completed, failed, or been canceled +- The payload exceeds the 1MB size limit +- The run ID is invalid + +```ts +import { cancelSignal } from "./trigger/streams"; + +try { + await cancelSignal.send(runId, { reason: "User clicked stop" }); +} catch (error) { + console.error("Failed to send:", error); + // Handle the error — the run may have already completed +} +``` + +## Important notes + +- Maximum payload size per `.send()` call is **1MB** +- You cannot send data to a completed, failed, or canceled run +- Data sent before a listener is registered inside the task is **buffered** and delivered when a listener attaches +- Input streams require [Realtime Streams v2](/tasks/streams#enabling-streams-v2) (enabled by default in SDK 4.1.0+) diff --git a/docs/tasks/input-streams.mdx b/docs/tasks/input-streams.mdx new file mode 100644 index 00000000000..83f7bf2f581 --- /dev/null +++ b/docs/tasks/input-streams.mdx @@ -0,0 +1,405 @@ +--- +title: "Input Streams" +sidebarTitle: "Input Streams" +description: "Send data into running tasks from your backend or frontend using input streams." +--- + +Input Streams let you send data **into** a running task from your backend or frontend. While [output streams](/tasks/streams) send data out of tasks, input streams complete the loop — enabling bidirectional communication with your running tasks. + + + Input Streams require SDK version **4.4.2 or later** and [Realtime Streams v2](/tasks/streams#enabling-streams-v2) (enabled by default in SDK 4.1.0+). + + +## Overview + +Input Streams solve three common problems: + +- **Cancelling AI streams mid-generation.** When you use AI SDK's `streamText` inside a task, the LLM keeps generating tokens until it's done — even if the user navigated away or clicked "Stop generating." With input streams, your frontend sends a cancel signal and the task aborts the LLM call immediately. + +- **Human-in-the-loop workflows.** A task generates a draft email, then pauses and waits for the user to approve or edit it before sending. + +- **Interactive agents.** An AI agent running as a task needs follow-up information from the user mid-execution — clarifying a question, choosing between options, or providing additional context. + +## Quick Start + +### 1. Define your input streams + +Define input streams in a shared file so both your task code and your backend/frontend can import them: + +```ts trigger/streams.ts +import { streams } from "@trigger.dev/sdk"; + +// Typed input stream — the generic parameter defines the shape of data sent in +export const cancelSignal = streams.input<{ reason?: string }>({ + id: "cancel", +}); + +export const approval = streams.input<{ approved: boolean; reviewer: string }>({ + id: "approval", +}); +``` + +### 2. Receive data inside your task + +```ts trigger/draft-email.ts +import { task } from "@trigger.dev/sdk"; +import { approval } from "./streams"; + +export const draftEmailTask = task({ + id: "draft-email", + run: async (payload: { to: string; subject: string }) => { + const draft = await generateDraft(payload); + + // Suspend until someone sends approval — no compute cost while waiting + const { approved, reviewer } = await approval.wait({ timeout: "7d" }).unwrap(); + + if (approved) { + await sendEmail(draft); + return { sent: true, reviewer }; + } + + return { sent: false, reviewer }; + }, +}); +``` + +### 3. Send data from your backend + +```ts +import { approval } from "./trigger/streams"; + +// Approve a draft from your API route +await approval.send(runId, { approved: true, reviewer: "alice@example.com" }); +``` + +## Defining Input Streams + +Use `streams.input()` to define a typed input stream. The generic parameter controls the shape of data that can be sent: + +```ts trigger/streams.ts +import { streams } from "@trigger.dev/sdk"; + +// Simple signal (no data needed beyond the type) +export const cancelSignal = streams.input<{ reason?: string }>({ + id: "cancel", +}); + +// Structured data +export const approval = streams.input<{ approved: boolean; reviewer: string }>({ + id: "approval", +}); + +// Complex objects +export const userResponse = streams.input<{ + action: "approve" | "reject" | "edit"; + message?: string; + edits?: Record; +}>({ + id: "user-response", +}); +``` + +Type safety is enforced through the generic parameter — both `.send()` and the receiving methods (`.wait()`, `.once()`, `.on()`, `.peek()`) share the same type. + +## Receiving Data Inside a Task + +### Choosing the right method + +| Method | Task suspended? | Compute cost while waiting | Best for | +|--------|----------------|---------------------------|----------| +| `.wait()` | **Yes** | **None** — process freed | Approval gates, human-in-the-loop, long waits | +| `.once()` | No | Full — process stays alive | Short waits, or when doing concurrent work | +| `.on(handler)` | No | Full — process stays alive | Continuous listening (cancel signals, live updates) | + +### `wait()` — Suspend until data arrives + +Suspends the task entirely, freeing compute resources. The task resumes when data arrives via `.send()`. This is the most efficient option when the task has nothing else to do while waiting. + +Returns a [`ManualWaitpointPromise`](/wait-for-token) — the same type returned by `wait.forToken()`. + +```ts +import { task } from "@trigger.dev/sdk"; +import { approval } from "./streams"; + +export const publishPost = task({ + id: "publish-post", + run: async (payload: { postId: string }) => { + const draft = await prepareDraft(payload.postId); + await notifyReviewer(draft); + + // Suspend until reviewer responds — no compute cost while waiting + const result = await approval.wait({ timeout: "7d" }); + + if (result.ok) { + if (result.output.approved) { + await publish(draft); + return { published: true, reviewer: result.output.reviewer }; + } + return { published: false, reason: result.output.comment }; + } + + // Timed out after 7 days + return { published: false, timedOut: true }; + }, +}); +``` + +Use `.unwrap()` to throw on timeout instead of checking `ok`: + +```ts +// Throws WaitpointTimeoutError if the timeout is reached +const data = await approval.wait({ timeout: "24h" }).unwrap(); +console.log(data.approved); // TData directly +``` + +#### Options + +| Option | Type | Description | +|--------|------|-------------| +| `timeout` | `string` | Maximum wait time before timeout. Period format: `"30s"`, `"5m"`, `"1h"`, `"24h"`, `"7d"`. | +| `idempotencyKey` | `string` | Reuse the same waitpoint across retries. If the task retries, it resumes the same wait instead of creating a new one. | +| `idempotencyKeyTTL` | `string` | Expiration for the idempotency key. After this period, the same key creates a new waitpoint. | +| `tags` | `string[]` | Tags for the underlying waitpoint, useful for filtering via `wait.listTokens()`. | + +#### Idempotent waits for retries + +```ts +export const processOrder = task({ + id: "process-order", + retry: { maxAttempts: 3 }, + run: async (payload: { orderId: string }) => { + await prepareOrder(payload.orderId); + + // Same idempotency key across retries — won't create duplicate waitpoints + const result = await approval.wait({ + timeout: "48h", + idempotencyKey: `order-approval-${payload.orderId}`, + }); + + if (!result.ok) { + throw new Error("Approval timed out after 48 hours"); + } + + await fulfillOrder(payload.orderId, result.output); + }, +}); +``` + +### `once()` — Wait for the next value (non-suspending) + +Blocks until data arrives, but keeps the task process alive. Useful for short waits or when doing concurrent work. + +```ts +import { task } from "@trigger.dev/sdk"; +import { approval } from "./streams"; + +export const draftEmailTask = task({ + id: "draft-email", + run: async (payload: { to: string; subject: string }) => { + const draft = await generateDraft(payload); + + // Task pauses here until someone sends approval + const result = await approval.once(); + + if (result.approved) { + await sendEmail(draft); + return { sent: true, reviewer: result.reviewer }; + } + + return { sent: false, reviewer: result.reviewer }; + }, +}); +``` + +`once()` accepts options for timeouts and abort signals: + +```ts +// With a timeout — rejects if no data arrives within 5 minutes +const result = await approval.once({ timeoutMs: 300_000 }); + +// With an abort signal +const controller = new AbortController(); +const result = await approval.once({ signal: controller.signal }); +``` + +### `on()` — Listen for every value + +Registers a persistent handler that fires on every piece of data. Returns a subscription with an `.off()` method. + +```ts +import { task } from "@trigger.dev/sdk"; +import { cancelSignal } from "./streams"; + +export const streamingTask = task({ + id: "streaming-task", + run: async (payload: { prompt: string }) => { + const controller = new AbortController(); + + // Listen for cancel signals + const sub = cancelSignal.on((data) => { + console.log("Cancelled:", data.reason); + controller.abort(); + }); + + try { + const result = await streamText({ + model: openai("gpt-4o"), + prompt: payload.prompt, + abortSignal: controller.signal, + }); + return result; + } finally { + sub.off(); // Always clean up the listener + } + }, +}); +``` + + + Always call `.off()` in a `finally` block to avoid memory leaks. The subscription stays active for the lifetime of the run if not cleaned up. + + +### `peek()` — Non-blocking check + +Returns the most recent buffered value without waiting, or `undefined` if nothing has been received yet. + +```ts +const latest = cancelSignal.peek(); +if (latest) { + // A cancel was already sent before we checked +} +``` + +## Sending Data to a Running Task + +Use `.send()` from your backend to push data into a running task. See the [backend input streams guide](/realtime/backend/input-streams) for detailed examples including API route patterns. + +```ts +import { cancelSignal, approval } from "./trigger/streams"; + +// Cancel a running AI stream +await cancelSignal.send(runId, { reason: "User clicked stop" }); + +// Approve a draft +await approval.send(runId, { approved: true, reviewer: "alice@example.com" }); +``` + +## Complete Example: Cancellable AI Streaming + +This is the most common use case — streaming an AI response while allowing the user to cancel mid-generation. + +### Define the streams + +```ts trigger/streams.ts +import { streams } from "@trigger.dev/sdk"; + +export const aiOutput = streams.define({ id: "ai" }); +export const cancelStream = streams.input<{ reason?: string }>({ id: "cancel" }); +``` + +### Create the task + +```ts trigger/ai-task.ts +import { task } from "@trigger.dev/sdk"; +import { streamText } from "ai"; +import { openai } from "@ai-sdk/openai"; +import { aiOutput, cancelStream } from "./streams"; + +export const aiTask = task({ + id: "ai-chat", + run: async (payload: { prompt: string }) => { + const controller = new AbortController(); + + // If the user cancels, abort the LLM call + const sub = cancelStream.on(() => { + controller.abort(); + }); + + try { + const result = streamText({ + model: openai("gpt-4o"), + prompt: payload.prompt, + abortSignal: controller.signal, + }); + + // Stream output to the frontend in real-time + const { waitUntilComplete } = aiOutput.pipe(result.textStream); + await waitUntilComplete(); + + return { text: await result.text }; + } finally { + sub.off(); + } + }, +}); +``` + +### Cancel from a backend API route + +```ts app/api/cancel/route.ts +import { cancelStream } from "./trigger/streams"; + +export async function POST(req: Request) { + const { runId } = await req.json(); + await cancelStream.send(runId, { reason: "User clicked stop" }); + return Response.json({ cancelled: true }); +} +``` + +### Display in your frontend + +```tsx components/ai-chat.tsx +"use client"; + +import { useRealtimeStream } from "@trigger.dev/react-hooks"; +import { aiOutput } from "@/trigger/streams"; + +export function AIChat({ + runId, + accessToken, +}: { + runId: string; + accessToken: string; +}) { + const { parts, error } = useRealtimeStream(aiOutput, runId, { + accessToken, + timeoutInSeconds: 300, + }); + + const handleCancel = async () => { + await fetch("/api/cancel", { + method: "POST", + body: JSON.stringify({ runId }), + }); + }; + + if (error) return
Error: {error.message}
; + if (!parts) return
Loading...
; + + return ( +
+
{parts.join("")}
+ +
+ ); +} +``` + +## Important Notes + +- Input streams require [Realtime Streams v2](/tasks/streams#enabling-streams-v2) (enabled by default in SDK 4.1.0+). If you're on an older version, calling `.on()` or `.once()` will throw with instructions to enable it. +- You cannot send data to a completed, failed, or canceled run. +- Maximum payload size per `.send()` call is 1MB. +- Data sent before any listener is registered is buffered and delivered when a listener attaches (for `.once()` and `.on()`). +- `.wait()` handles the buffering race automatically — if data was sent before `.wait()` is called, it will still be received. +- Type safety is enforced through the generic parameter on `streams.input()`. + +## Best Practices + +1. **Use `.wait()` for long waits**: If the task has nothing else to do until data arrives (approval gates, human-in-the-loop), use `.wait()` to free compute resources. Use `.once()` only for short waits or when doing concurrent work. +2. **Always clean up listeners**: Call `.off()` in a `finally` block when using `.on()` to prevent memory leaks +3. **Use timeouts**: Both `.wait()` and `.once()` support timeouts — always set one to avoid indefinite hangs +4. **Use idempotency keys with `.wait()`**: If your task has retries enabled, pass an `idempotencyKey` to `.wait()` so retries resume the same wait instead of creating a new one +5. **Define streams in shared files**: Keep your `streams.input()` definitions in a shared location (like `trigger/streams.ts`) so both task code and backend/frontend can import them with full type safety +6. **Combine with output streams**: Input streams pair naturally with [output streams](/tasks/streams) for full bidirectional communication — stream AI output to the frontend while accepting cancel signals from it +7. **Use descriptive stream IDs**: Choose clear IDs like `"cancel"`, `"approval"`, or `"user-response"` instead of generic names diff --git a/docs/wait-for-token.mdx b/docs/wait-for-token.mdx index 98d7ec96e7d..2b8f56a003e 100644 --- a/docs/wait-for-token.mdx +++ b/docs/wait-for-token.mdx @@ -7,6 +7,10 @@ Waitpoint tokens pause task runs until you complete the token. They're commonly You can complete a token using the SDK or by making a POST request to the token's URL. + + If you're waiting for data from an [input stream](/tasks/input-streams), use [`inputStream.wait()`](/tasks/input-streams#wait--suspend-until-data-arrives) instead — it uses waitpoint tokens internally but provides a simpler API with full type safety from your stream definition. + + ## Usage To get started using wait tokens, you need to first create a token using the `wait.createToken` function: diff --git a/docs/wait.mdx b/docs/wait.mdx index cfe5b2385bf..06956e0f660 100644 --- a/docs/wait.mdx +++ b/docs/wait.mdx @@ -10,8 +10,9 @@ Waiting allows you to write complex tasks as a set of async code, without having -| Function | What it does | -| :--------------------------------- | :----------------------------------------------- | -| [wait.for()](/wait-for) | Waits for a specific period of time, e.g. 1 day. | -| [wait.until()](/wait-until) | Waits until the provided `Date`. | -| [wait.forToken()](/wait-for-token) | Pauses runs until a token is completed. | +| Function | What it does | +| :------------------------------------------------ | :--------------------------------------------------------------- | +| [wait.for()](/wait-for) | Waits for a specific period of time, e.g. 1 day. | +| [wait.until()](/wait-until) | Waits until the provided `Date`. | +| [wait.forToken()](/wait-for-token) | Pauses runs until a token is completed. | +| [inputStream.wait()](/tasks/input-streams#wait--suspend-until-data-arrives) | Pauses runs until data arrives on an input stream. | From bac2c7d2e43c227cd65637abeb3676ccccb9c603 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 22 Feb 2026 10:02:50 +0000 Subject: [PATCH 2/7] docs: version input streams under 4.4.2 and add docs site pages Move input streams documentation from rules/4.3.0 to rules/4.4.2, reverting 4.3.0 to its pre-input-streams state. Add comprehensive input streams documentation to the Mintlify docs site with pages for task-side usage, backend sending, and React hook patterns. - rules/4.4.2/realtime.md: input streams docs (moved from 4.3.0) - rules/manifest.json: add 4.4.2 version, revert 4.3.0 realtime - docs/tasks/input-streams.mdx: main input streams guide - docs/realtime/backend/input-streams.mdx: backend sending patterns - docs/realtime/react-hooks/input-streams.mdx: React UI patterns - docs/docs.json: add input streams to navigation - Cross-references from existing streams and realtime overview docs https://claude.ai/code/session_01SJHJts7r2yAxmoKLLz8vpc --- docs/realtime/backend/input-streams.mdx | 151 ++++++++++++++++++++++++ 1 file changed, 151 insertions(+) diff --git a/docs/realtime/backend/input-streams.mdx b/docs/realtime/backend/input-streams.mdx index 46261a00d15..b1a5b4537fd 100644 --- a/docs/realtime/backend/input-streams.mdx +++ b/docs/realtime/backend/input-streams.mdx @@ -1,3 +1,4 @@ +<<<<<<< ours --- title: Input Streams sidebarTitle: Input Streams @@ -149,3 +150,153 @@ try { - You cannot send data to a completed, failed, or canceled run - Data sent before a listener is registered inside the task is **buffered** and delivered when a listener attaches - Input streams require [Realtime Streams v2](/tasks/streams#enabling-streams-v2) (enabled by default in SDK 4.1.0+) +||||||| +======= +--- +title: Input Streams +sidebarTitle: Input Streams +description: Send data into running tasks from your backend code +--- + +The Input Streams API allows you to send data into running Trigger.dev tasks from your backend code. This enables bidirectional communication — while [output streams](/realtime/backend/streams) let you read data from tasks, input streams let you push data into them. + + + To learn how to receive input stream data inside your tasks, see our [Input Streams](/tasks/input-streams) documentation. For frontend applications using React, see our [React hooks input streams documentation](/realtime/react-hooks/input-streams). + + +## Sending data to a running task + +### Using defined input streams (Recommended) + +The recommended approach is to use [defined input streams](/tasks/input-streams#defining-input-streams) for full type safety: + +```ts +import { cancelSignal, approval } from "./trigger/streams"; + +// Cancel a running AI stream +await cancelSignal.send(runId, { reason: "User clicked stop" }); + +// Approve a draft +await approval.send(runId, { approved: true, reviewer: "alice@example.com" }); +``` + +The `.send()` method is fully typed — the data parameter must match the generic type you defined on the input stream. + +## Practical examples + +### Cancel from a Next.js API route + +```ts app/api/cancel/route.ts +import { cancelStream } from "@/trigger/streams"; + +export async function POST(req: Request) { + const { runId } = await req.json(); + + await cancelStream.send(runId, { reason: "User clicked stop" }); + + return Response.json({ cancelled: true }); +} +``` + +### Approval workflow API + +```ts app/api/approve/route.ts +import { approval } from "@/trigger/streams"; + +export async function POST(req: Request) { + const { runId, approved, reviewer } = await req.json(); + + await approval.send(runId, { + approved, + reviewer, + }); + + return Response.json({ success: true }); +} +``` + +### Remix action handler + +```ts app/routes/api.approve.ts +import { json, type ActionFunctionArgs } from "@remix-run/node"; +import { approval } from "~/trigger/streams"; + +export async function action({ request }: ActionFunctionArgs) { + const formData = await request.formData(); + const runId = formData.get("runId") as string; + const approved = formData.get("approved") === "true"; + const reviewer = formData.get("reviewer") as string; + + await approval.send(runId, { approved, reviewer }); + + return json({ success: true }); +} +``` + +### Express handler + +```ts +import express from "express"; +import { cancelSignal } from "./trigger/streams"; + +const app = express(); +app.use(express.json()); + +app.post("/api/cancel", async (req, res) => { + const { runId, reason } = req.body; + + await cancelSignal.send(runId, { reason }); + + res.json({ cancelled: true }); +}); +``` + +### Sending from another task + +You can send input stream data from one task to another running task: + +```ts +import { task } from "@trigger.dev/sdk"; +import { approval } from "./streams"; + +export const reviewerTask = task({ + id: "auto-reviewer", + run: async (payload: { targetRunId: string }) => { + // Perform automated review logic... + const isApproved = await performReview(); + + // Send approval to the waiting task + await approval.send(payload.targetRunId, { + approved: isApproved, + reviewer: "auto-reviewer", + }); + }, +}); +``` + +## Error handling + +The `.send()` method will throw if: + +- The run has already completed, failed, or been canceled +- The payload exceeds the 1MB size limit +- The run ID is invalid + +```ts +import { cancelSignal } from "./trigger/streams"; + +try { + await cancelSignal.send(runId, { reason: "User clicked stop" }); +} catch (error) { + console.error("Failed to send:", error); + // Handle the error — the run may have already completed +} +``` + +## Important notes + +- Maximum payload size per `.send()` call is **1MB** +- You cannot send data to a completed, failed, or canceled run +- Data sent before a listener is registered inside the task is **buffered** and delivered when a listener attaches +- Input streams require [Realtime Streams v2](/tasks/streams#enabling-streams-v2) (enabled by default in SDK 4.1.0+) +>>>>>>> theirs From 1b6ebc2a38f722b0bec72b26ff6b7b6c9df59177 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 27 Feb 2026 14:27:52 +0000 Subject: [PATCH 3/7] fix conflict --- docs/realtime/backend/input-streams.mdx | 160 ++---------------------- 1 file changed, 7 insertions(+), 153 deletions(-) diff --git a/docs/realtime/backend/input-streams.mdx b/docs/realtime/backend/input-streams.mdx index b1a5b4537fd..0b2718bab0d 100644 --- a/docs/realtime/backend/input-streams.mdx +++ b/docs/realtime/backend/input-streams.mdx @@ -1,4 +1,3 @@ -<<<<<<< ours --- title: Input Streams sidebarTitle: Input Streams @@ -8,7 +7,9 @@ description: Send data into running tasks from your backend code The Input Streams API allows you to send data into running Trigger.dev tasks from your backend code. This enables bidirectional communication — while [output streams](/realtime/backend/streams) let you read data from tasks, input streams let you push data into them. - To learn how to receive input stream data inside your tasks, see our [Input Streams](/tasks/input-streams) documentation. For frontend applications using React, see our [React hooks input streams documentation](/realtime/react-hooks/input-streams). + To learn how to receive input stream data inside your tasks, see our [Input + Streams](/tasks/input-streams) documentation. For frontend applications using React, see our + [React hooks input streams documentation](/realtime/react-hooks/input-streams). ## Sending data to a running task @@ -30,7 +31,10 @@ await approval.send(runId, { approved: true, reviewer: "alice@example.com" }); The `.send()` method is fully typed — the data parameter must match the generic type you defined on the input stream. - `.send()` works the same regardless of how the task is listening — whether it uses `.wait()` (suspending), `.once()` (non-suspending), or `.on()` (continuous). The sender doesn't need to know how the task is consuming the data. See [Input Streams](/tasks/input-streams) for details on each receiving method. + `.send()` works the same regardless of how the task is listening — whether it uses `.wait()` + (suspending), `.once()` (non-suspending), or `.on()` (continuous). The sender doesn't need to know + how the task is consuming the data. See [Input Streams](/tasks/input-streams) for details on each + receiving method. ## Practical examples @@ -150,153 +154,3 @@ try { - You cannot send data to a completed, failed, or canceled run - Data sent before a listener is registered inside the task is **buffered** and delivered when a listener attaches - Input streams require [Realtime Streams v2](/tasks/streams#enabling-streams-v2) (enabled by default in SDK 4.1.0+) -||||||| -======= ---- -title: Input Streams -sidebarTitle: Input Streams -description: Send data into running tasks from your backend code ---- - -The Input Streams API allows you to send data into running Trigger.dev tasks from your backend code. This enables bidirectional communication — while [output streams](/realtime/backend/streams) let you read data from tasks, input streams let you push data into them. - - - To learn how to receive input stream data inside your tasks, see our [Input Streams](/tasks/input-streams) documentation. For frontend applications using React, see our [React hooks input streams documentation](/realtime/react-hooks/input-streams). - - -## Sending data to a running task - -### Using defined input streams (Recommended) - -The recommended approach is to use [defined input streams](/tasks/input-streams#defining-input-streams) for full type safety: - -```ts -import { cancelSignal, approval } from "./trigger/streams"; - -// Cancel a running AI stream -await cancelSignal.send(runId, { reason: "User clicked stop" }); - -// Approve a draft -await approval.send(runId, { approved: true, reviewer: "alice@example.com" }); -``` - -The `.send()` method is fully typed — the data parameter must match the generic type you defined on the input stream. - -## Practical examples - -### Cancel from a Next.js API route - -```ts app/api/cancel/route.ts -import { cancelStream } from "@/trigger/streams"; - -export async function POST(req: Request) { - const { runId } = await req.json(); - - await cancelStream.send(runId, { reason: "User clicked stop" }); - - return Response.json({ cancelled: true }); -} -``` - -### Approval workflow API - -```ts app/api/approve/route.ts -import { approval } from "@/trigger/streams"; - -export async function POST(req: Request) { - const { runId, approved, reviewer } = await req.json(); - - await approval.send(runId, { - approved, - reviewer, - }); - - return Response.json({ success: true }); -} -``` - -### Remix action handler - -```ts app/routes/api.approve.ts -import { json, type ActionFunctionArgs } from "@remix-run/node"; -import { approval } from "~/trigger/streams"; - -export async function action({ request }: ActionFunctionArgs) { - const formData = await request.formData(); - const runId = formData.get("runId") as string; - const approved = formData.get("approved") === "true"; - const reviewer = formData.get("reviewer") as string; - - await approval.send(runId, { approved, reviewer }); - - return json({ success: true }); -} -``` - -### Express handler - -```ts -import express from "express"; -import { cancelSignal } from "./trigger/streams"; - -const app = express(); -app.use(express.json()); - -app.post("/api/cancel", async (req, res) => { - const { runId, reason } = req.body; - - await cancelSignal.send(runId, { reason }); - - res.json({ cancelled: true }); -}); -``` - -### Sending from another task - -You can send input stream data from one task to another running task: - -```ts -import { task } from "@trigger.dev/sdk"; -import { approval } from "./streams"; - -export const reviewerTask = task({ - id: "auto-reviewer", - run: async (payload: { targetRunId: string }) => { - // Perform automated review logic... - const isApproved = await performReview(); - - // Send approval to the waiting task - await approval.send(payload.targetRunId, { - approved: isApproved, - reviewer: "auto-reviewer", - }); - }, -}); -``` - -## Error handling - -The `.send()` method will throw if: - -- The run has already completed, failed, or been canceled -- The payload exceeds the 1MB size limit -- The run ID is invalid - -```ts -import { cancelSignal } from "./trigger/streams"; - -try { - await cancelSignal.send(runId, { reason: "User clicked stop" }); -} catch (error) { - console.error("Failed to send:", error); - // Handle the error — the run may have already completed -} -``` - -## Important notes - -- Maximum payload size per `.send()` call is **1MB** -- You cannot send data to a completed, failed, or canceled run -- Data sent before a listener is registered inside the task is **buffered** and delivered when a listener attaches -- Input streams require [Realtime Streams v2](/tasks/streams#enabling-streams-v2) (enabled by default in SDK 4.1.0+) ->>>>>>> theirs From ba7d95614f7f0845b0dd089d8f87f3762177ebb7 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 27 Feb 2026 19:46:29 +0000 Subject: [PATCH 4/7] document changes to .once() --- docs/tasks/input-streams.mdx | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/docs/tasks/input-streams.mdx b/docs/tasks/input-streams.mdx index 83f7bf2f581..f7d2c0b9091 100644 --- a/docs/tasks/input-streams.mdx +++ b/docs/tasks/input-streams.mdx @@ -108,7 +108,7 @@ Type safety is enforced through the generic parameter — both `.send()` and the | Method | Task suspended? | Compute cost while waiting | Best for | |--------|----------------|---------------------------|----------| | `.wait()` | **Yes** | **None** — process freed | Approval gates, human-in-the-loop, long waits | -| `.once()` | No | Full — process stays alive | Short waits, or when doing concurrent work | +| `.once()` | No | Full — process stays alive | Short waits, concurrent work. Returns result object with `.unwrap()` | | `.on(handler)` | No | Full — process stays alive | Continuous listening (cancel signals, live updates) | ### `wait()` — Suspend until data arrives @@ -189,6 +189,8 @@ export const processOrder = task({ Blocks until data arrives, but keeps the task process alive. Useful for short waits or when doing concurrent work. +Returns an `InputStreamOncePromise` — similar to `ManualWaitpointPromise` from `.wait()`. Await it for a result object, or chain `.unwrap()` to get the data directly. + ```ts import { task } from "@trigger.dev/sdk"; import { approval } from "./streams"; @@ -198,26 +200,36 @@ export const draftEmailTask = task({ run: async (payload: { to: string; subject: string }) => { const draft = await generateDraft(payload); - // Task pauses here until someone sends approval - const result = await approval.once(); + // Task pauses here until someone sends approval (with a 5-minute timeout) + const result = await approval.once({ timeoutMs: 300_000 }); - if (result.approved) { + if (!result.ok) { + // Timed out — result.error is an InputStreamTimeoutError + return { sent: false, timedOut: true }; + } + + if (result.output.approved) { await sendEmail(draft); - return { sent: true, reviewer: result.reviewer }; + return { sent: true, reviewer: result.output.reviewer }; } - return { sent: false, reviewer: result.reviewer }; + return { sent: false, reviewer: result.output.reviewer }; }, }); ``` -`once()` accepts options for timeouts and abort signals: +Use `.unwrap()` to throw on timeout instead of checking `ok`: ```ts -// With a timeout — rejects if no data arrives within 5 minutes -const result = await approval.once({ timeoutMs: 300_000 }); +// Throws InputStreamTimeoutError if no data arrives within 5 minutes +const data = await approval.once({ timeoutMs: 300_000 }).unwrap(); +console.log(data.approved); // TData directly +``` -// With an abort signal +`once()` also accepts an abort signal for cancellation: + +```ts +// With an abort signal — rejects the promise when aborted const controller = new AbortController(); const result = await approval.once({ signal: controller.signal }); ``` From 216cc174808a9b05a4d6da70e79fb8b089bd0d36 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 27 Feb 2026 21:31:33 +0000 Subject: [PATCH 5/7] a couple of docs fixes --- docs/realtime/backend/input-streams.mdx | 3 +-- docs/tasks/input-streams.mdx | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/docs/realtime/backend/input-streams.mdx b/docs/realtime/backend/input-streams.mdx index 0b2718bab0d..6a1a8637602 100644 --- a/docs/realtime/backend/input-streams.mdx +++ b/docs/realtime/backend/input-streams.mdx @@ -8,8 +8,7 @@ The Input Streams API allows you to send data into running Trigger.dev tasks fro To learn how to receive input stream data inside your tasks, see our [Input - Streams](/tasks/input-streams) documentation. For frontend applications using React, see our - [React hooks input streams documentation](/realtime/react-hooks/input-streams). + Streams](/tasks/input-streams) documentation. ## Sending data to a running task diff --git a/docs/tasks/input-streams.mdx b/docs/tasks/input-streams.mdx index f7d2c0b9091..211f78bb3f4 100644 --- a/docs/tasks/input-streams.mdx +++ b/docs/tasks/input-streams.mdx @@ -135,7 +135,7 @@ export const publishPost = task({ await publish(draft); return { published: true, reviewer: result.output.reviewer }; } - return { published: false, reason: result.output.comment }; + return { published: false, reviewer: result.output.reviewer }; } // Timed out after 7 days @@ -349,7 +349,7 @@ export const aiTask = task({ ### Cancel from a backend API route ```ts app/api/cancel/route.ts -import { cancelStream } from "./trigger/streams"; +import { cancelStream } from "@/trigger/streams"; export async function POST(req: Request) { const { runId } = await req.json(); From f7876f7571fe89ed78110b236f390f7b0608b930 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 27 Feb 2026 22:19:49 +0000 Subject: [PATCH 6/7] notes about off() not being necessary anymore --- docs/tasks/input-streams.mdx | 52 ++++++++++++++---------------------- 1 file changed, 20 insertions(+), 32 deletions(-) diff --git a/docs/tasks/input-streams.mdx b/docs/tasks/input-streams.mdx index 211f78bb3f4..0e123e782b1 100644 --- a/docs/tasks/input-streams.mdx +++ b/docs/tasks/input-streams.mdx @@ -236,7 +236,7 @@ const result = await approval.once({ signal: controller.signal }); ### `on()` — Listen for every value -Registers a persistent handler that fires on every piece of data. Returns a subscription with an `.off()` method. +Registers a persistent handler that fires on every piece of data. Handlers are automatically cleaned up when the task run completes, so you don't need to manually unsubscribe. If you need to stop listening early (before the run ends), call `.off()` on the returned subscription. ```ts import { task } from "@trigger.dev/sdk"; @@ -247,30 +247,22 @@ export const streamingTask = task({ run: async (payload: { prompt: string }) => { const controller = new AbortController(); - // Listen for cancel signals - const sub = cancelSignal.on((data) => { + // Listen for cancel signals — automatically cleaned up when run completes + cancelSignal.on((data) => { console.log("Cancelled:", data.reason); controller.abort(); }); - try { - const result = await streamText({ - model: openai("gpt-4o"), - prompt: payload.prompt, - abortSignal: controller.signal, - }); - return result; - } finally { - sub.off(); // Always clean up the listener - } + const result = await streamText({ + model: openai("gpt-4o"), + prompt: payload.prompt, + abortSignal: controller.signal, + }); + return result; }, }); ``` - - Always call `.off()` in a `finally` block to avoid memory leaks. The subscription stays active for the lifetime of the run if not cleaned up. - - ### `peek()` — Non-blocking check Returns the most recent buffered value without waiting, or `undefined` if nothing has been received yet. @@ -323,25 +315,21 @@ export const aiTask = task({ const controller = new AbortController(); // If the user cancels, abort the LLM call - const sub = cancelStream.on(() => { + cancelStream.on(() => { controller.abort(); }); - try { - const result = streamText({ - model: openai("gpt-4o"), - prompt: payload.prompt, - abortSignal: controller.signal, - }); + const result = streamText({ + model: openai("gpt-4o"), + prompt: payload.prompt, + abortSignal: controller.signal, + }); - // Stream output to the frontend in real-time - const { waitUntilComplete } = aiOutput.pipe(result.textStream); - await waitUntilComplete(); + // Stream output to the frontend in real-time + const { waitUntilComplete } = aiOutput.pipe(result.textStream); + await waitUntilComplete(); - return { text: await result.text }; - } finally { - sub.off(); - } + return { text: await result.text }; }, }); ``` @@ -409,7 +397,7 @@ export function AIChat({ ## Best Practices 1. **Use `.wait()` for long waits**: If the task has nothing else to do until data arrives (approval gates, human-in-the-loop), use `.wait()` to free compute resources. Use `.once()` only for short waits or when doing concurrent work. -2. **Always clean up listeners**: Call `.off()` in a `finally` block when using `.on()` to prevent memory leaks +2. **Listeners auto-cleanup**: `.on()` handlers are automatically cleaned up when the task run completes. Call `.off()` only if you need to stop listening early 3. **Use timeouts**: Both `.wait()` and `.once()` support timeouts — always set one to avoid indefinite hangs 4. **Use idempotency keys with `.wait()`**: If your task has retries enabled, pass an `idempotencyKey` to `.wait()` so retries resume the same wait instead of creating a new one 5. **Define streams in shared files**: Keep your `streams.input()` definitions in a shared location (like `trigger/streams.ts`) so both task code and backend/frontend can import them with full type safety From 2f0d2e70852e74a0f1aa227cc9c9bcb43b4edadd Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 28 Feb 2026 09:30:34 +0000 Subject: [PATCH 7/7] Proper docs pass, v2 streams are now the default --- docs/realtime/backend/input-streams.mdx | 10 +- docs/realtime/react-hooks/streams.mdx | 65 +++- docs/tasks/input-streams.mdx | 405 ------------------------ docs/tasks/streams.mdx | 217 ++++++++++--- docs/wait-for-token.mdx | 2 +- docs/wait.mdx | 2 +- 6 files changed, 251 insertions(+), 450 deletions(-) delete mode 100644 docs/tasks/input-streams.mdx diff --git a/docs/realtime/backend/input-streams.mdx b/docs/realtime/backend/input-streams.mdx index 6a1a8637602..1224e24244e 100644 --- a/docs/realtime/backend/input-streams.mdx +++ b/docs/realtime/backend/input-streams.mdx @@ -7,15 +7,15 @@ description: Send data into running tasks from your backend code The Input Streams API allows you to send data into running Trigger.dev tasks from your backend code. This enables bidirectional communication — while [output streams](/realtime/backend/streams) let you read data from tasks, input streams let you push data into them. - To learn how to receive input stream data inside your tasks, see our [Input - Streams](/tasks/input-streams) documentation. + To learn how to receive input stream data inside your tasks, see [Input + Streams](/tasks/streams#input-streams) in the Streams doc. ## Sending data to a running task ### Using defined input streams (Recommended) -The recommended approach is to use [defined input streams](/tasks/input-streams#defining-input-streams) for full type safety: +The recommended approach is to use [defined input streams](/tasks/streams#defining-input-streams) for full type safety: ```ts import { cancelSignal, approval } from "./trigger/streams"; @@ -32,7 +32,7 @@ The `.send()` method is fully typed — the data parameter must match the generi `.send()` works the same regardless of how the task is listening — whether it uses `.wait()` (suspending), `.once()` (non-suspending), or `.on()` (continuous). The sender doesn't need to know - how the task is consuming the data. See [Input Streams](/tasks/input-streams) for details on each + how the task is consuming the data. See [Input Streams](/tasks/streams#input-streams) for details on each receiving method. @@ -152,4 +152,4 @@ try { - Maximum payload size per `.send()` call is **1MB** - You cannot send data to a completed, failed, or canceled run - Data sent before a listener is registered inside the task is **buffered** and delivered when a listener attaches -- Input streams require [Realtime Streams v2](/tasks/streams#enabling-streams-v2) (enabled by default in SDK 4.1.0+) +- Input streams require the current streams implementation (v2 is the default in SDK 4.1.0+). See [Streams](/tasks/streams) for details. diff --git a/docs/realtime/react-hooks/streams.mdx b/docs/realtime/react-hooks/streams.mdx index 670941e8997..f5c1ab0679e 100644 --- a/docs/realtime/react-hooks/streams.mdx +++ b/docs/realtime/react-hooks/streams.mdx @@ -151,7 +151,70 @@ const { parts, error } = useRealtimeStream(runId, { }); ``` -For more information on defining and using streams, see the [Realtime Streams v2](/tasks/streams) documentation. +For more information on defining and using streams, see the [Realtime Streams](/tasks/streams) documentation. + +## useInputStreamSend + +The `useInputStreamSend` hook lets you send data from your frontend into a running task's [input stream](/tasks/streams#input-streams). Use it for cancel buttons, approval forms, or any UI that needs to push typed data into a running task. + +### Basic usage + +Pass the input stream's `id` (string), the run ID, and options such as `accessToken`. You typically get `runId` and `accessToken` from the object returned when you trigger the task (e.g. `handle.id`, `handle.publicAccessToken`). The hook returns `send`, `isLoading`, `error`, and `isReady`: + +```tsx +"use client"; + +import { useInputStreamSend } from "@trigger.dev/react-hooks"; +import { approval } from "@/trigger/streams"; + +export function ApprovalForm({ + runId, + accessToken, +}: { + runId: string; + accessToken: string; +}) { + const { send, isLoading, isReady } = useInputStreamSend( + approval.id, + runId, + { accessToken } + ); + + return ( + + ); +} +``` + +With a generic for type-safe payloads when not using a defined stream: + +```tsx +type ApprovalPayload = { approved: boolean; reviewer: string }; +const { send } = useInputStreamSend("approval", runId, { + accessToken, +}); +send({ approved: true, reviewer: "alice" }); +``` + +### Options and return value + +- **`streamId`**: The input stream identifier (string). Use the `id` from your defined stream (e.g. `approval.id`) or the same string you used in `streams.input({ id: "approval" })`. +- **`runId`**: The run to send input to. When `runId` is undefined, `isReady` is false and `send` will not trigger. +- **`options`**: `accessToken` (required for client usage), `baseURL` (optional). See [Realtime auth](/realtime/auth) for generating a public access token with the right scopes (e.g. input streams write for that run). + +Return value: + +- **`send(data)`**: Sends typed data to the input stream. Uses SWR mutation under the hood. +- **`isLoading`**: True while a send is in progress. +- **`error`**: Set if the last send failed. +- **`isReady`**: True when both `runId` and access token are available. + +For receiving input stream data inside a task (`.wait()`, `.once()`, `.on()`), see [Input Streams](/tasks/streams#input-streams) in the Streams doc. ## useRealtimeRunWithStreams diff --git a/docs/tasks/input-streams.mdx b/docs/tasks/input-streams.mdx deleted file mode 100644 index 0e123e782b1..00000000000 --- a/docs/tasks/input-streams.mdx +++ /dev/null @@ -1,405 +0,0 @@ ---- -title: "Input Streams" -sidebarTitle: "Input Streams" -description: "Send data into running tasks from your backend or frontend using input streams." ---- - -Input Streams let you send data **into** a running task from your backend or frontend. While [output streams](/tasks/streams) send data out of tasks, input streams complete the loop — enabling bidirectional communication with your running tasks. - - - Input Streams require SDK version **4.4.2 or later** and [Realtime Streams v2](/tasks/streams#enabling-streams-v2) (enabled by default in SDK 4.1.0+). - - -## Overview - -Input Streams solve three common problems: - -- **Cancelling AI streams mid-generation.** When you use AI SDK's `streamText` inside a task, the LLM keeps generating tokens until it's done — even if the user navigated away or clicked "Stop generating." With input streams, your frontend sends a cancel signal and the task aborts the LLM call immediately. - -- **Human-in-the-loop workflows.** A task generates a draft email, then pauses and waits for the user to approve or edit it before sending. - -- **Interactive agents.** An AI agent running as a task needs follow-up information from the user mid-execution — clarifying a question, choosing between options, or providing additional context. - -## Quick Start - -### 1. Define your input streams - -Define input streams in a shared file so both your task code and your backend/frontend can import them: - -```ts trigger/streams.ts -import { streams } from "@trigger.dev/sdk"; - -// Typed input stream — the generic parameter defines the shape of data sent in -export const cancelSignal = streams.input<{ reason?: string }>({ - id: "cancel", -}); - -export const approval = streams.input<{ approved: boolean; reviewer: string }>({ - id: "approval", -}); -``` - -### 2. Receive data inside your task - -```ts trigger/draft-email.ts -import { task } from "@trigger.dev/sdk"; -import { approval } from "./streams"; - -export const draftEmailTask = task({ - id: "draft-email", - run: async (payload: { to: string; subject: string }) => { - const draft = await generateDraft(payload); - - // Suspend until someone sends approval — no compute cost while waiting - const { approved, reviewer } = await approval.wait({ timeout: "7d" }).unwrap(); - - if (approved) { - await sendEmail(draft); - return { sent: true, reviewer }; - } - - return { sent: false, reviewer }; - }, -}); -``` - -### 3. Send data from your backend - -```ts -import { approval } from "./trigger/streams"; - -// Approve a draft from your API route -await approval.send(runId, { approved: true, reviewer: "alice@example.com" }); -``` - -## Defining Input Streams - -Use `streams.input()` to define a typed input stream. The generic parameter controls the shape of data that can be sent: - -```ts trigger/streams.ts -import { streams } from "@trigger.dev/sdk"; - -// Simple signal (no data needed beyond the type) -export const cancelSignal = streams.input<{ reason?: string }>({ - id: "cancel", -}); - -// Structured data -export const approval = streams.input<{ approved: boolean; reviewer: string }>({ - id: "approval", -}); - -// Complex objects -export const userResponse = streams.input<{ - action: "approve" | "reject" | "edit"; - message?: string; - edits?: Record; -}>({ - id: "user-response", -}); -``` - -Type safety is enforced through the generic parameter — both `.send()` and the receiving methods (`.wait()`, `.once()`, `.on()`, `.peek()`) share the same type. - -## Receiving Data Inside a Task - -### Choosing the right method - -| Method | Task suspended? | Compute cost while waiting | Best for | -|--------|----------------|---------------------------|----------| -| `.wait()` | **Yes** | **None** — process freed | Approval gates, human-in-the-loop, long waits | -| `.once()` | No | Full — process stays alive | Short waits, concurrent work. Returns result object with `.unwrap()` | -| `.on(handler)` | No | Full — process stays alive | Continuous listening (cancel signals, live updates) | - -### `wait()` — Suspend until data arrives - -Suspends the task entirely, freeing compute resources. The task resumes when data arrives via `.send()`. This is the most efficient option when the task has nothing else to do while waiting. - -Returns a [`ManualWaitpointPromise`](/wait-for-token) — the same type returned by `wait.forToken()`. - -```ts -import { task } from "@trigger.dev/sdk"; -import { approval } from "./streams"; - -export const publishPost = task({ - id: "publish-post", - run: async (payload: { postId: string }) => { - const draft = await prepareDraft(payload.postId); - await notifyReviewer(draft); - - // Suspend until reviewer responds — no compute cost while waiting - const result = await approval.wait({ timeout: "7d" }); - - if (result.ok) { - if (result.output.approved) { - await publish(draft); - return { published: true, reviewer: result.output.reviewer }; - } - return { published: false, reviewer: result.output.reviewer }; - } - - // Timed out after 7 days - return { published: false, timedOut: true }; - }, -}); -``` - -Use `.unwrap()` to throw on timeout instead of checking `ok`: - -```ts -// Throws WaitpointTimeoutError if the timeout is reached -const data = await approval.wait({ timeout: "24h" }).unwrap(); -console.log(data.approved); // TData directly -``` - -#### Options - -| Option | Type | Description | -|--------|------|-------------| -| `timeout` | `string` | Maximum wait time before timeout. Period format: `"30s"`, `"5m"`, `"1h"`, `"24h"`, `"7d"`. | -| `idempotencyKey` | `string` | Reuse the same waitpoint across retries. If the task retries, it resumes the same wait instead of creating a new one. | -| `idempotencyKeyTTL` | `string` | Expiration for the idempotency key. After this period, the same key creates a new waitpoint. | -| `tags` | `string[]` | Tags for the underlying waitpoint, useful for filtering via `wait.listTokens()`. | - -#### Idempotent waits for retries - -```ts -export const processOrder = task({ - id: "process-order", - retry: { maxAttempts: 3 }, - run: async (payload: { orderId: string }) => { - await prepareOrder(payload.orderId); - - // Same idempotency key across retries — won't create duplicate waitpoints - const result = await approval.wait({ - timeout: "48h", - idempotencyKey: `order-approval-${payload.orderId}`, - }); - - if (!result.ok) { - throw new Error("Approval timed out after 48 hours"); - } - - await fulfillOrder(payload.orderId, result.output); - }, -}); -``` - -### `once()` — Wait for the next value (non-suspending) - -Blocks until data arrives, but keeps the task process alive. Useful for short waits or when doing concurrent work. - -Returns an `InputStreamOncePromise` — similar to `ManualWaitpointPromise` from `.wait()`. Await it for a result object, or chain `.unwrap()` to get the data directly. - -```ts -import { task } from "@trigger.dev/sdk"; -import { approval } from "./streams"; - -export const draftEmailTask = task({ - id: "draft-email", - run: async (payload: { to: string; subject: string }) => { - const draft = await generateDraft(payload); - - // Task pauses here until someone sends approval (with a 5-minute timeout) - const result = await approval.once({ timeoutMs: 300_000 }); - - if (!result.ok) { - // Timed out — result.error is an InputStreamTimeoutError - return { sent: false, timedOut: true }; - } - - if (result.output.approved) { - await sendEmail(draft); - return { sent: true, reviewer: result.output.reviewer }; - } - - return { sent: false, reviewer: result.output.reviewer }; - }, -}); -``` - -Use `.unwrap()` to throw on timeout instead of checking `ok`: - -```ts -// Throws InputStreamTimeoutError if no data arrives within 5 minutes -const data = await approval.once({ timeoutMs: 300_000 }).unwrap(); -console.log(data.approved); // TData directly -``` - -`once()` also accepts an abort signal for cancellation: - -```ts -// With an abort signal — rejects the promise when aborted -const controller = new AbortController(); -const result = await approval.once({ signal: controller.signal }); -``` - -### `on()` — Listen for every value - -Registers a persistent handler that fires on every piece of data. Handlers are automatically cleaned up when the task run completes, so you don't need to manually unsubscribe. If you need to stop listening early (before the run ends), call `.off()` on the returned subscription. - -```ts -import { task } from "@trigger.dev/sdk"; -import { cancelSignal } from "./streams"; - -export const streamingTask = task({ - id: "streaming-task", - run: async (payload: { prompt: string }) => { - const controller = new AbortController(); - - // Listen for cancel signals — automatically cleaned up when run completes - cancelSignal.on((data) => { - console.log("Cancelled:", data.reason); - controller.abort(); - }); - - const result = await streamText({ - model: openai("gpt-4o"), - prompt: payload.prompt, - abortSignal: controller.signal, - }); - return result; - }, -}); -``` - -### `peek()` — Non-blocking check - -Returns the most recent buffered value without waiting, or `undefined` if nothing has been received yet. - -```ts -const latest = cancelSignal.peek(); -if (latest) { - // A cancel was already sent before we checked -} -``` - -## Sending Data to a Running Task - -Use `.send()` from your backend to push data into a running task. See the [backend input streams guide](/realtime/backend/input-streams) for detailed examples including API route patterns. - -```ts -import { cancelSignal, approval } from "./trigger/streams"; - -// Cancel a running AI stream -await cancelSignal.send(runId, { reason: "User clicked stop" }); - -// Approve a draft -await approval.send(runId, { approved: true, reviewer: "alice@example.com" }); -``` - -## Complete Example: Cancellable AI Streaming - -This is the most common use case — streaming an AI response while allowing the user to cancel mid-generation. - -### Define the streams - -```ts trigger/streams.ts -import { streams } from "@trigger.dev/sdk"; - -export const aiOutput = streams.define({ id: "ai" }); -export const cancelStream = streams.input<{ reason?: string }>({ id: "cancel" }); -``` - -### Create the task - -```ts trigger/ai-task.ts -import { task } from "@trigger.dev/sdk"; -import { streamText } from "ai"; -import { openai } from "@ai-sdk/openai"; -import { aiOutput, cancelStream } from "./streams"; - -export const aiTask = task({ - id: "ai-chat", - run: async (payload: { prompt: string }) => { - const controller = new AbortController(); - - // If the user cancels, abort the LLM call - cancelStream.on(() => { - controller.abort(); - }); - - const result = streamText({ - model: openai("gpt-4o"), - prompt: payload.prompt, - abortSignal: controller.signal, - }); - - // Stream output to the frontend in real-time - const { waitUntilComplete } = aiOutput.pipe(result.textStream); - await waitUntilComplete(); - - return { text: await result.text }; - }, -}); -``` - -### Cancel from a backend API route - -```ts app/api/cancel/route.ts -import { cancelStream } from "@/trigger/streams"; - -export async function POST(req: Request) { - const { runId } = await req.json(); - await cancelStream.send(runId, { reason: "User clicked stop" }); - return Response.json({ cancelled: true }); -} -``` - -### Display in your frontend - -```tsx components/ai-chat.tsx -"use client"; - -import { useRealtimeStream } from "@trigger.dev/react-hooks"; -import { aiOutput } from "@/trigger/streams"; - -export function AIChat({ - runId, - accessToken, -}: { - runId: string; - accessToken: string; -}) { - const { parts, error } = useRealtimeStream(aiOutput, runId, { - accessToken, - timeoutInSeconds: 300, - }); - - const handleCancel = async () => { - await fetch("/api/cancel", { - method: "POST", - body: JSON.stringify({ runId }), - }); - }; - - if (error) return
Error: {error.message}
; - if (!parts) return
Loading...
; - - return ( -
-
{parts.join("")}
- -
- ); -} -``` - -## Important Notes - -- Input streams require [Realtime Streams v2](/tasks/streams#enabling-streams-v2) (enabled by default in SDK 4.1.0+). If you're on an older version, calling `.on()` or `.once()` will throw with instructions to enable it. -- You cannot send data to a completed, failed, or canceled run. -- Maximum payload size per `.send()` call is 1MB. -- Data sent before any listener is registered is buffered and delivered when a listener attaches (for `.once()` and `.on()`). -- `.wait()` handles the buffering race automatically — if data was sent before `.wait()` is called, it will still be received. -- Type safety is enforced through the generic parameter on `streams.input()`. - -## Best Practices - -1. **Use `.wait()` for long waits**: If the task has nothing else to do until data arrives (approval gates, human-in-the-loop), use `.wait()` to free compute resources. Use `.once()` only for short waits or when doing concurrent work. -2. **Listeners auto-cleanup**: `.on()` handlers are automatically cleaned up when the task run completes. Call `.off()` only if you need to stop listening early -3. **Use timeouts**: Both `.wait()` and `.once()` support timeouts — always set one to avoid indefinite hangs -4. **Use idempotency keys with `.wait()`**: If your task has retries enabled, pass an `idempotencyKey` to `.wait()` so retries resume the same wait instead of creating a new one -5. **Define streams in shared files**: Keep your `streams.input()` definitions in a shared location (like `trigger/streams.ts`) so both task code and backend/frontend can import them with full type safety -6. **Combine with output streams**: Input streams pair naturally with [output streams](/tasks/streams) for full bidirectional communication — stream AI output to the frontend while accepting cancel signals from it -7. **Use descriptive stream IDs**: Choose clear IDs like `"cancel"`, `"approval"`, or `"user-response"` instead of generic names diff --git a/docs/tasks/streams.mdx b/docs/tasks/streams.mdx index 2d494977a32..563ccee2639 100644 --- a/docs/tasks/streams.mdx +++ b/docs/tasks/streams.mdx @@ -4,17 +4,17 @@ sidebarTitle: "Streams" description: "Stream data in realtime from your Trigger.dev tasks to your frontend or backend applications." --- -Realtime Streams allow you to pipe streaming data from your Trigger.dev tasks to your frontend or backend applications in real-time. This is perfect for use cases like streaming AI completions, progress updates, or any continuous data flow. +Realtime Streams allow you to pipe streaming data from your Trigger.dev tasks to your frontend or backend applications in real-time. This is perfect for use cases like streaming AI completions, progress updates, or any continuous data flow. You can also **send data into** running tasks with [Input Streams](#input-streams) for bidirectional flows (e.g. cancel buttons, approvals). - Streams v2 requires SDK version **4.1.0 or later**. Make sure to upgrade your `@trigger.dev/sdk` - and `@trigger.dev/react-hooks` packages to use these features. If you're on an earlier version, - see the [metadata.stream()](/runs/metadata#stream) documentation. + Streams require SDK version **4.1.0 or later** (`@trigger.dev/sdk` and `@trigger.dev/react-hooks`). + This doc describes the current streams behavior (v2 is the default). For pre-4.1.0 streams, see + [Pre-4.1.0 streams (legacy)](#pre-410-streams-legacy) below. ## Overview -Streams v2 is a major upgrade that provides: +Realtime Streams provide: - **Unlimited stream length** (previously capped at 2000 chunks) - **Unlimited active streams per run** (previously 5) @@ -23,47 +23,27 @@ Streams v2 is a major upgrade that provides: - **Multiple client streams** can pipe to a single stream - **Enhanced dashboard visibility** for viewing stream data in real-time -## Enabling Streams v2 - -Streams v2 is **automatically enabled** when triggering runs from the SDK using 4.1.0 or later. If you aren't triggering via the SDK, you'll need to explicitly enable v2 streams via setting the `x-trigger-realtime-streams-version=v2` header when triggering the task. - -If you'd like to **opt-out** of the v2 streams, you can see so in one of the following two ways: - -### Option 1: Configure the SDK - -```ts -import { auth } from "@trigger.dev/sdk"; - -auth.configure({ - future: { - v2RealtimeStreams: false, - }, -}); -``` - -### Option 2: Environment Variable - -Set the `TRIGGER_V2_REALTIME_STREAMS=0` environment variable in your backend code (where you trigger tasks). +Streams v2 is the **default** when using SDK 4.1.0 or later. If you trigger tasks outside the SDK, set the `x-trigger-realtime-streams-version=v2` header. To opt out, use `auth.configure({ future: { v2RealtimeStreams: false } })` or `TRIGGER_V2_REALTIME_STREAMS=0`. ## Limits Comparison -| Limit | Streams v1 | Streams v2 | -| -------------------------------- | ---------- | ---------- | -| Maximum stream length | 2000 | Unlimited | -| Number of active streams per run | 5 | Unlimited | -| Maximum streams per run | 10 | Unlimited | -| Maximum stream TTL | 1 day | 28 days | -| Maximum stream size | 10MB | 300 MiB | +| Limit | Legacy (pre-4.1.0) | Current | +| -------------------------------- | ------------------ | --------- | +| Maximum stream length | 2000 | Unlimited | +| Number of active streams per run | 5 | Unlimited | +| Maximum streams per run | 10 | Unlimited | +| Maximum stream TTL | 1 day | 28 days | +| Maximum stream size | 10MB | 300 MiB | ## Quick Start -The recommended workflow for using Realtime Streams v2: +The recommended workflow for **output** streams (data from task to client): 1. **Define your streams** in a shared location using `streams.define()` 2. **Use the defined stream** in your tasks with `.pipe()`, `.append()`, or `.writer()` 3. **Read from the stream** using `.read()` or the `useRealtimeStream` hook in React -This approach gives you full type safety, better code organization, and easier maintenance as your application grows. +This approach gives you full type safety, better code organization, and easier maintenance as your application grows. For **input** streams (sending data into a running task), see [Input Streams](#input-streams) below. ## Defining Typed Streams (Recommended) @@ -517,6 +497,161 @@ const { parts, error } = useRealtimeStream(streamDef, runId, { }); ``` +## Input Streams + +Input Streams let you send data **into** a running task from your backend or frontend. While output streams (above) send data out of tasks, input streams complete the loop — enabling bidirectional communication. + + + Input Streams require SDK version **4.4.2 or later** and use the same streams infrastructure (v2 is the default). If you're on an older SDK, calling `.on()` or `.once()` will throw with instructions to enable v2 streams. See [Pre-4.1.0 streams (legacy)](#pre-410-streams-legacy) for the older metadata-based API. + + +### Input Streams overview + +Input Streams solve three common problems: + +- **Cancelling AI streams mid-generation.** When you use AI SDK's `streamText` inside a task, the LLM keeps generating until it's done — even if the user clicked "Stop." With input streams, your frontend sends a cancel signal and the task aborts the LLM call immediately. +- **Human-in-the-loop workflows.** A task generates a draft, then pauses and waits for the user to approve or edit it before continuing. +- **Interactive agents.** An AI agent running as a task needs follow-up information from the user mid-execution — clarifying a question, choosing between options, or providing additional context. + +### Quick Start (Input Streams) + +1. **Define** input streams in a shared file with `streams.input({ id: "..." })`. +2. **Receive** in your task with `.wait()`, `.once()`, `.on()`, or `.peek()`. +3. **Send** from your backend with `.send(runId, data)` or from the frontend with the `useInputStreamSend` hook (see [Realtime React hooks](/realtime/react-hooks/streams#useinputstreamsend)). + +### Defining Input Streams + +Use `streams.input()` to define a typed input stream. The generic parameter controls the shape of data that can be sent: + +```ts +import { streams } from "@trigger.dev/sdk"; + +export const cancelSignal = streams.input<{ reason?: string }>({ + id: "cancel", +}); + +export const approval = streams.input<{ approved: boolean; reviewer: string }>({ + id: "approval", +}); + +export const userResponse = streams.input<{ + action: "approve" | "reject" | "edit"; + message?: string; + edits?: Record; +}>({ + id: "user-response", +}); +``` + +Type safety is enforced through the generic — both `.send()` and the receiving methods (`.wait()`, `.once()`, `.on()`, `.peek()`) share the same type. + +### Receiving data inside a task + +| Method | Task suspended? | Compute cost while waiting | Best for | +|--------|-----------------|----------------------------|-----------| +| `.wait()` | **Yes** | **None** — process freed | Approval gates, human-in-the-loop, long waits | +| `.once()` | No | Full — process stays alive | Short waits, concurrent work; returns result object with `.unwrap()` | +| `.on(handler)` | No | Full — process stays alive | Continuous listening (cancel signals, live updates) | +| `.peek()` | No | None | Non-blocking check for latest buffered value | + +#### `wait()` — Suspend until data arrives + +Suspends the task entirely, freeing compute resources. The task resumes when data arrives via `.send()`. Returns a [`ManualWaitpointPromise`](/wait-for-token) — the same type as `wait.forToken()`. + +```ts +import { task } from "@trigger.dev/sdk"; +import { approval } from "./streams"; + +export const publishPost = task({ + id: "publish-post", + run: async (payload: { postId: string }) => { + const draft = await prepareDraft(payload.postId); + await notifyReviewer(draft); + + const result = await approval.wait({ timeout: "7d" }); + + if (result.ok) { + if (result.output.approved) { + await publish(draft); + return { published: true, reviewer: result.output.reviewer }; + } + return { published: false, reviewer: result.output.reviewer }; + } + return { published: false, timedOut: true }; + }, +}); +``` + +Use `.unwrap()` to throw on timeout: `const data = await approval.wait({ timeout: "24h" }).unwrap();` + +**Options:** `timeout` (e.g. `"30s"`, `"5m"`, `"24h"`, `"7d"`), `idempotencyKey`, `idempotencyKeyTTL`, `tags`. Use `idempotencyKey` when your task has retries so the same waitpoint is resumed across retries. + +#### `once()` — Wait for the next value (non-suspending) + +Blocks until data arrives but keeps the task process alive. Returns a result object; use `.unwrap()` to get the data or throw on timeout. + +```ts +const result = await approval.once({ timeoutMs: 300_000 }); +if (result.ok) { + console.log(result.output.approved); +} +// Or: const data = await approval.once({ timeoutMs: 300_000 }).unwrap(); +``` + +`once()` also accepts a `signal` (e.g. `AbortController.signal`) for cancellation. + +#### `on()` — Listen for every value + +Registers a persistent handler that fires on every piece of data. Handlers are automatically cleaned up when the task run completes. Call `.off()` on the returned subscription to stop listening early. + +```ts +const controller = new AbortController(); +cancelSignal.on((data) => { + console.log("Cancelled:", data.reason); + controller.abort(); +}); +const result = await streamText({ ..., abortSignal: controller.signal }); +``` + +#### `peek()` — Non-blocking check + +Returns the most recent buffered value without waiting, or `undefined` if nothing has been received yet. + +```ts +const latest = cancelSignal.peek(); +if (latest) { + // A cancel was already sent before we checked +} +``` + +### Sending data to a running task + +Use `.send(runId, data)` from your backend to push data into a running task. See the [backend input streams guide](/realtime/backend/input-streams) for API route patterns. + +```ts +import { cancelSignal, approval } from "./trigger/streams"; + +await cancelSignal.send(runId, { reason: "User clicked stop" }); +await approval.send(runId, { approved: true, reviewer: "alice@example.com" }); +``` + +### Complete example: Cancellable AI streaming + +Stream an AI response while allowing the user to cancel mid-generation. + +**Define the streams:** + +```ts +import { streams } from "@trigger.dev/sdk"; + +export const aiOutput = streams.define({ id: "ai" }); +export const cancelStream = streams.input<{ reason?: string }>({ id: "cancel" }); +``` + +**Task:** Register `cancelStream.on()` to abort an `AbortController`, then pipe `streamText(...).textStream` to `aiOutput`. **Backend:** POST to an API route that calls `cancelStream.send(runId, { reason: "User clicked stop" })`. **Frontend:** Use `useRealtimeStream(aiOutput, runId, { accessToken })` and a button that calls your cancel API (or use the `useInputStreamSend` hook; see [Realtime React hooks](/realtime/react-hooks/streams#useinputstreamsend)). + +**Important notes (input streams):** You cannot send to a completed, failed, or canceled run. Max payload per `.send()` is 1MB. Data sent before a listener is registered is buffered and delivered when a listener attaches; `.wait()` handles the buffering race automatically. Use `.wait()` for long waits to free compute; use `.once()` for short waits or concurrent work. Define input streams in a shared location and combine with output streams for full bidirectional communication. + ## Complete Example: AI Streaming ### Define the stream @@ -709,11 +844,14 @@ Streams are now visible in the Trigger.dev dashboard, allowing you to: 6. **Throttle frontend updates**: Use `throttleInMs` in `useRealtimeStream` to prevent excessive re-renders 7. **Use descriptive stream IDs**: Choose clear, descriptive IDs like `"ai-output"` or `"progress"` instead of generic names +## Pre-4.1.0 streams (legacy) + +Prior to SDK 4.1.0, streams used the older metadata-based API. If you're on an earlier version, see [metadata.stream()](/runs/metadata#stream) for legacy usage. With 4.4.2+, [Input Streams](#input-streams) are available and documented in this page. + ## Troubleshooting ### Stream not appearing in dashboard -- Ensure you've enabled Streams v2 via the future flag or environment variable - Verify your task is actually writing to the stream - Check that the stream key matches between writing and reading @@ -725,6 +863,11 @@ Streams are now visible in the Trigger.dev dashboard, allowing you to: ### Missing chunks -- With v2, chunks should never be lost due to automatic resumption +- With the current streams implementation, chunks should not be lost due to automatic resumption - Verify you're reading from the correct stream key - Check the `startIndex` option if you're not seeing expected chunks + +### Input streams not working + +- Input streams require SDK **4.4.2 or later** and the default streams (v2) infrastructure. Ensure you're on a recent SDK and not using the legacy metadata.stream() API. +- If `.on()` or `.once()` throw, follow the error message to enable v2 streams (they are default in 4.1.0+). diff --git a/docs/wait-for-token.mdx b/docs/wait-for-token.mdx index 2b8f56a003e..9ac13af2c20 100644 --- a/docs/wait-for-token.mdx +++ b/docs/wait-for-token.mdx @@ -8,7 +8,7 @@ Waitpoint tokens pause task runs until you complete the token. They're commonly You can complete a token using the SDK or by making a POST request to the token's URL. - If you're waiting for data from an [input stream](/tasks/input-streams), use [`inputStream.wait()`](/tasks/input-streams#wait--suspend-until-data-arrives) instead — it uses waitpoint tokens internally but provides a simpler API with full type safety from your stream definition. + If you're waiting for data from an [input stream](/tasks/streams#input-streams), use [`inputStream.wait()`](/tasks/streams#wait--suspend-until-data-arrives) instead — it uses waitpoint tokens internally but provides a simpler API with full type safety from your stream definition. ## Usage diff --git a/docs/wait.mdx b/docs/wait.mdx index 06956e0f660..e67fbad9e6e 100644 --- a/docs/wait.mdx +++ b/docs/wait.mdx @@ -15,4 +15,4 @@ Waiting allows you to write complex tasks as a set of async code, without having | [wait.for()](/wait-for) | Waits for a specific period of time, e.g. 1 day. | | [wait.until()](/wait-until) | Waits until the provided `Date`. | | [wait.forToken()](/wait-for-token) | Pauses runs until a token is completed. | -| [inputStream.wait()](/tasks/input-streams#wait--suspend-until-data-arrives) | Pauses runs until data arrives on an input stream. | +| [inputStream.wait()](/tasks/streams#wait--suspend-until-data-arrives) | Pauses runs until data arrives on an input stream. |