Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/input-stream-wait.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/core": patch
"@trigger.dev/sdk": patch
---

Add `.wait()` method to input streams for suspending tasks while waiting for data. Unlike `.once()` which keeps the task process alive, `.wait()` suspends the task entirely, freeing compute resources. The task resumes when data arrives via `.send()`.
175 changes: 175 additions & 0 deletions .claude/skills/trigger-dev-tasks/realtime.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Realtime allows you to:
- Subscribe to run status changes, metadata updates, and streams
- Build real-time dashboards and UI updates
- Monitor task progress from frontend and backend
- Send data into running tasks with input streams

## Authentication

Expand Down Expand Up @@ -103,6 +104,178 @@ for await (const chunk of stream) {
}
```

## Input Streams

Input streams let you send data **into** a running task from your backend or frontend. Output streams send data out of tasks; input streams complete the loop.

### Problems Input Streams Solve

**Cancelling AI streams mid-generation.** When you use AI SDK's `streamText` inside a task, the LLM keeps generating tokens until it's done — even if the user has navigated away or clicked "Stop generating." Without input streams, there's no way to tell the running task to abort. With input streams, your frontend sends a cancel signal and the task aborts the LLM call immediately.

**Human-in-the-loop workflows.** A task generates a draft, then pauses and waits for user approval before proceeding.

**Interactive agents.** An AI agent running as a task needs follow-up information from the user mid-execution.

### Defining Input Streams

```ts
// trigger/streams.ts
import { streams } from "@trigger.dev/sdk";

export const cancelSignal = streams.input<{ reason?: string }>({ id: "cancel" });
export const approval = streams.input<{ approved: boolean; reviewer: string }>({ id: "approval" });
```

### Receiving Data Inside a Task

#### `wait()` — Suspend until data arrives (recommended for long waits)

Suspends the task entirely, freeing compute. Returns `ManualWaitpointPromise<TData>` (same as `wait.forToken()`).

```ts
import { task } from "@trigger.dev/sdk";
import { approval } from "./streams";

export const publishPost = task({
id: "publish-post",
run: async (payload: { postId: string }) => {
const draft = await prepareDraft(payload.postId);
await notifyReviewer(draft);

// Suspend — no compute cost while waiting
const result = await approval.wait({ timeout: "7d" });

if (result.ok) {
return { published: result.output.approved };
}
return { published: false, timedOut: true };
},
});
```

Options: `timeout` (period string), `idempotencyKey`, `idempotencyKeyTTL`, `tags`.

Use `.unwrap()` to throw on timeout: `const data = await approval.wait({ timeout: "24h" }).unwrap();`

**Use `.wait()` when:** nothing to do until data arrives, wait could be long, want zero compute cost.

#### `once()` — Wait for the next value (non-suspending)

Keeps the task process alive. Use for short waits or when doing concurrent work.

```ts
import { task } from "@trigger.dev/sdk";
import { approval } from "./streams";

export const draftEmailTask = task({
id: "draft-email",
run: async (payload: { to: string; subject: string }) => {
const draft = await generateDraft(payload);
const result = await approval.once(); // Blocks until data arrives

if (result.approved) {
await sendEmail(draft);
}
return { sent: result.approved, reviewer: result.reviewer };
},
});
```

Options: `once({ timeoutMs: 300_000 })` or `once({ signal: controller.signal })`.

**Use `.once()` when:** wait is short, doing concurrent work, need AbortSignal support.

#### `on()` — Listen for every value

```ts
import { task } from "@trigger.dev/sdk";
import { cancelSignal } from "./streams";

export const streamingTask = task({
id: "streaming-task",
run: async (payload: { prompt: string }) => {
const controller = new AbortController();

const sub = cancelSignal.on(() => {
controller.abort();
});

try {
const result = await streamText({
model: openai("gpt-4o"),
prompt: payload.prompt,
abortSignal: controller.signal,
});
return result;
} finally {
sub.off();
}
},
});
```

#### `peek()` — Non-blocking check

```ts
const latest = cancelSignal.peek(); // undefined if nothing received yet
```

### Sending Data to a Running Task

```ts
import { cancelSignal, approval } from "./trigger/streams";

await cancelSignal.send(runId, { reason: "User clicked stop" });
await approval.send(runId, { approved: true, reviewer: "alice@example.com" });
```

### Full Example: Cancellable AI Streaming

```ts
// trigger/streams.ts
import { streams } from "@trigger.dev/sdk";

export const aiOutput = streams.define<string>({ id: "ai" });
export const cancelStream = streams.input<{ reason?: string }>({ id: "cancel" });
```

```ts
// trigger/ai-task.ts
import { task } from "@trigger.dev/sdk";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";
import { aiOutput, cancelStream } from "./streams";

export const aiTask = task({
id: "ai-chat",
run: async (payload: { prompt: string }) => {
const controller = new AbortController();
const sub = cancelStream.on(() => controller.abort());

try {
const result = streamText({
model: openai("gpt-4o"),
prompt: payload.prompt,
abortSignal: controller.signal,
});

const { waitUntilComplete } = aiOutput.pipe(result.textStream);
await waitUntilComplete();
return { text: await result.text };
} finally {
sub.off();
}
},
});
```

### Important Notes

- Input streams require v2 realtime streams (SDK 4.1.0+). Calling `.on()` or `.once()` without v2 throws an error.
- Cannot send data to completed/failed/canceled runs.
- Max 1MB per `.send()` call.
- Data sent before a listener is registered gets buffered and delivered when a listener attaches.

## React Frontend Usage

### Installation
Expand Down Expand Up @@ -242,3 +415,5 @@ Key properties available in run subscriptions:
- **Handle errors**: Always check for errors in hooks and subscriptions
- **Type safety**: Use task types for proper payload/output typing
- **Cleanup subscriptions**: Backend subscriptions auto-complete, frontend hooks auto-cleanup
- **Clean up input stream listeners**: Always call `.off()` in a `finally` block to avoid leaks
- **Use timeouts with `once()`**: Avoid hanging indefinitely if the signal never arrives
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,5 @@ apps/**/public/build
**/.claude/settings.local.json
.mcp.log
.mcp.json
.cursor/debug.log
.cursor/debug.log
ailogger-output.log
6 changes: 6 additions & 0 deletions .server-changes/input-stream-wait.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Add input stream `.wait()` support: new API route for creating input-stream-linked waitpoints, Redis cache for fast waitpoint lookup from `.send()`, and waitpoint completion bridging in the send route.
2 changes: 2 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1344,6 +1344,8 @@ const EnvironmentSchema = z

REALTIME_STREAMS_S2_BASIN: z.string().optional(),
REALTIME_STREAMS_S2_ACCESS_TOKEN: z.string().optional(),
REALTIME_STREAMS_S2_ENDPOINT: z.string().optional(),
REALTIME_STREAMS_S2_SKIP_ACCESS_TOKENS: z.enum(["true", "false"]).default("false"),
REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS: z.coerce
.number()
.int()
Expand Down
35 changes: 35 additions & 0 deletions apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,41 @@ export class SpanPresenter extends BasePresenter {
},
};
}
case "input-stream": {
if (!span.entity.id) {
logger.error(`SpanPresenter: No input stream id`, {
spanId,
inputStreamId: span.entity.id,
});
return { ...data, entity: null };
}

const [runId, streamId] = span.entity.id.split(":");

if (!runId || !streamId) {
logger.error(`SpanPresenter: Invalid input stream id`, {
spanId,
inputStreamId: span.entity.id,
});
return { ...data, entity: null };
}

// Translate user-facing stream ID to internal S2 stream name
const s2StreamKey = `$trigger.input:${streamId}`;

return {
...data,
entity: {
type: "realtime-stream" as const,
object: {
runId,
streamKey: s2StreamKey,
displayName: streamId,
metadata: undefined,
},
},
};
}
default:
return { ...data, entity: null };
}
Expand Down
Loading
Loading