Commit f7876f7

notes about off() not being necessary anymore
1 parent 216cc17 commit f7876f7

File tree

1 file changed (+20, -32 lines)

docs/tasks/input-streams.mdx

Lines changed: 20 additions & 32 deletions
@@ -236,7 +236,7 @@ const result = await approval.once({ signal: controller.signal });
 
 ### `on()` — Listen for every value
 
-Registers a persistent handler that fires on every piece of data. Returns a subscription with an `.off()` method.
+Registers a persistent handler that fires on every piece of data. Handlers are automatically cleaned up when the task run completes, so you don't need to manually unsubscribe. If you need to stop listening early (before the run ends), call `.off()` on the returned subscription.
 
 ```ts
 import { task } from "@trigger.dev/sdk";
@@ -247,30 +247,22 @@ export const streamingTask = task({
   run: async (payload: { prompt: string }) => {
     const controller = new AbortController();
 
-    // Listen for cancel signals
-    const sub = cancelSignal.on((data) => {
+    // Listen for cancel signals — automatically cleaned up when run completes
+    cancelSignal.on((data) => {
       console.log("Cancelled:", data.reason);
       controller.abort();
     });
 
-    try {
-      const result = await streamText({
-        model: openai("gpt-4o"),
-        prompt: payload.prompt,
-        abortSignal: controller.signal,
-      });
-      return result;
-    } finally {
-      sub.off(); // Always clean up the listener
-    }
+    const result = await streamText({
+      model: openai("gpt-4o"),
+      prompt: payload.prompt,
+      abortSignal: controller.signal,
+    });
+    return result;
   },
 });
 ```
 
-<Warning>
-Always call `.off()` in a `finally` block to avoid memory leaks. The subscription stays active for the lifetime of the run if not cleaned up.
-</Warning>
-
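The auto-cleanup behavior this change describes can be illustrated with a minimal stand-alone sketch. `MiniStream` below is a hypothetical emitter written for this example, not the SDK's actual implementation:

```typescript
// Hypothetical emitter illustrating the listener lifecycle (not the SDK).
type Handler<T> = (data: T) => void;

class MiniStream<T> {
  private handlers = new Set<Handler<T>>();

  // Like `.on()`: register a handler and return a subscription whose
  // `.off()` stops listening early.
  on(handler: Handler<T>) {
    this.handlers.add(handler);
    return { off: () => this.handlers.delete(handler) };
  }

  emit(data: T) {
    for (const h of this.handlers) h(data);
  }

  // Like the run completing: remaining handlers are dropped for you,
  // so callers never have to unsubscribe manually.
  complete() {
    this.handlers.clear();
  }
}

const stream = new MiniStream<string>();
const seen: string[] = [];
const sub = stream.on((d) => seen.push(d));
stream.emit("a"); // delivered
sub.off();        // optional early unsubscribe
stream.emit("b"); // not delivered
stream.complete(); // any remaining handlers would be cleaned up here
```

In this toy model, `complete()` plays the role of the run ending: listeners that were never explicitly removed still get released.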
 ### `peek()` — Non-blocking check
 
 Returns the most recent buffered value without waiting, or `undefined` if nothing has been received yet.
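A rough sketch of those `peek()` semantics, using a toy single-value buffer (an illustration, not the SDK's internals):

```typescript
// Toy buffer illustrating peek() semantics (not the SDK implementation).
class LatestValueBuffer<T> {
  private latest: T | undefined;
  private received = false;

  push(value: T) {
    this.latest = value;
    this.received = true;
  }

  // Non-blocking: return the most recent value, or undefined if
  // nothing has arrived yet.
  peek(): T | undefined {
    return this.received ? this.latest : undefined;
  }
}

const buf = new LatestValueBuffer<number>();
const before = buf.peek(); // nothing received yet
buf.push(1);
buf.push(2);
const after = buf.peek();  // most recent value, with no waiting
```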
@@ -323,25 +315,21 @@ export const aiTask = task({
     const controller = new AbortController();
 
     // If the user cancels, abort the LLM call
-    const sub = cancelStream.on(() => {
+    cancelStream.on(() => {
       controller.abort();
     });
 
-    try {
-      const result = streamText({
-        model: openai("gpt-4o"),
-        prompt: payload.prompt,
-        abortSignal: controller.signal,
-      });
+    const result = streamText({
+      model: openai("gpt-4o"),
+      prompt: payload.prompt,
+      abortSignal: controller.signal,
+    });
 
-      // Stream output to the frontend in real-time
-      const { waitUntilComplete } = aiOutput.pipe(result.textStream);
-      await waitUntilComplete();
+    // Stream output to the frontend in real-time
+    const { waitUntilComplete } = aiOutput.pipe(result.textStream);
+    await waitUntilComplete();
 
-      return { text: await result.text };
-    } finally {
-      sub.off();
-    }
+    return { text: await result.text };
   },
 });
 ```
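The `pipe(...)` / `waitUntilComplete()` pattern in this example can be approximated with a small helper over an async iterable. The `pipe` function below is a stand-in written for illustration, not the SDK's implementation:

```typescript
// Illustrative stand-in for `stream.pipe(source)`: forwards each chunk
// and exposes a promise that settles once the source is exhausted.
function pipe<T>(source: AsyncIterable<T>, forward: (chunk: T) => void) {
  const done = (async () => {
    for await (const chunk of source) forward(chunk);
  })();
  return { waitUntilComplete: () => done };
}

// Toy source standing in for something like `result.textStream`.
async function* words() {
  yield "hello";
  yield "world";
}

const chunks: string[] = [];
const { waitUntilComplete } = pipe(words(), (c) => chunks.push(c));
// Callers await waitUntilComplete() before returning, mirroring the task above.
```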
@@ -409,7 +397,7 @@ export function AIChat({
 ## Best Practices
 
 1. **Use `.wait()` for long waits**: If the task has nothing else to do until data arrives (approval gates, human-in-the-loop), use `.wait()` to free compute resources. Use `.once()` only for short waits or when doing concurrent work.
-2. **Always clean up listeners**: Call `.off()` in a `finally` block when using `.on()` to prevent memory leaks
+2. **Listeners auto-cleanup**: `.on()` handlers are automatically cleaned up when the task run completes. Call `.off()` only if you need to stop listening early
 3. **Use timeouts**: Both `.wait()` and `.once()` support timeouts — always set one to avoid indefinite hangs
 4. **Use idempotency keys with `.wait()`**: If your task has retries enabled, pass an `idempotencyKey` to `.wait()` so retries resume the same wait instead of creating a new one
 5. **Define streams in shared files**: Keep your `streams.input()` definitions in a shared location (like `trigger/streams.ts`) so both task code and backend/frontend can import them with full type safety
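Best practice 3 can be sketched generically: bound any wait with a deadline so it cannot hang forever. The `withTimeout` helper below is a plain `Promise.race` illustration of the idea; prefer the SDK's own timeout and signal options where they exist:

```typescript
// Generic deadline wrapper (illustration only; not the SDK's timeout option).
function withTimeout<T>(promise: Promise<T>, ms: number, fallback: T): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<T>((resolve) => {
    timer = setTimeout(() => resolve(fallback), ms);
  });
  return Promise.race([promise, deadline]).finally(() => clearTimeout(timer));
}

// A value that arrives before the deadline wins the race...
const fast = withTimeout(Promise.resolve("data"), 50, "timed out");
// ...while a value that never arrives falls back once the deadline passes.
const slow = withTimeout(new Promise<string>(() => {}), 50, "timed out");
```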
