Skip to content

Commit 3eb16f5

Browse files
committed
automatically cleanup input stream on handlers and tail SSE requests when a task execution cleansup
1 parent 8bd3c69 commit 3eb16f5

File tree

7 files changed

+31
-3
lines changed

7 files changed

+31
-3
lines changed

packages/core/src/v3/inputStreams/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ export class InputStreamsAPI implements InputStreamManager {
5151
return this.#getManager().lastSeqNum(streamId);
5252
}
5353

54+
public clearHandlers(): void {
55+
this.#getManager().clearHandlers();
56+
}
57+
5458
public reset(): void {
5559
this.#getManager().reset();
5660
}

packages/core/src/v3/inputStreams/manager.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,19 @@ export class StandardInputStreamManager implements InputStreamManager {
145145
return undefined;
146146
}
147147

148+
clearHandlers(): void {
149+
this.handlers.clear();
150+
151+
// Abort tails that no longer have any once waiters either
152+
for (const [streamId, tail] of this.tails) {
153+
const hasWaiters = this.onceWaiters.has(streamId) && this.onceWaiters.get(streamId)!.length > 0;
154+
if (!hasWaiters) {
155+
tail.abortController.abort();
156+
this.tails.delete(streamId);
157+
}
158+
}
159+
}
160+
148161
connectTail(runId: string, _fromSeq?: number): void {
149162
// No-op: tails are now created per-stream lazily
150163
}

packages/core/src/v3/inputStreams/noopManager.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ export class NoopInputStreamManager implements InputStreamManager {
2222
return undefined;
2323
}
2424

25+
clearHandlers(): void {}
2526
reset(): void {}
2627
disconnect(): void {}
2728
connectTail(_runId: string, _fromSeq?: number): void {}

packages/core/src/v3/inputStreams/types.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ export interface InputStreamManager {
4747

4848
/**
4949
* Register a handler that fires every time data arrives on the given input stream.
50+
* Handlers are automatically cleaned up when the task run completes.
51+
* Returns `{ off }` for early unsubscription if needed.
5052
*/
5153
on(streamId: string, handler: (data: unknown) => void | Promise<void>): { off: () => void };
5254

@@ -68,6 +70,12 @@ export interface InputStreamManager {
6870
*/
6971
lastSeqNum(streamId: string): number | undefined;
7072

73+
/**
74+
* Clear all persistent `.on()` handlers and abort tails that have no remaining once waiters.
75+
* Called automatically when a task run completes.
76+
*/
77+
clearHandlers(): void;
78+
7179
/**
7280
* Reset state between task executions.
7381
*/

packages/core/src/v3/realtimeStreams/types.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,8 @@ export type RealtimeDefinedInputStream<TData> = {
160160
id: string;
161161
/**
162162
* Register a handler that fires every time data arrives on this input stream.
163-
* Returns a subscription object with an `.off()` method to unsubscribe.
163+
* Handlers are automatically cleaned up when the task run completes, so calling
164+
* `.off()` is optional. Returns a subscription with `.off()` for early unsubscription.
164165
*/
165166
on: (handler: (data: TData) => void | Promise<void>) => InputStreamSubscription;
166167
/**

packages/core/src/v3/workers/taskExecutor.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
accessoryAttributes,
1414
attemptKey,
1515
flattenAttributes,
16+
inputStreams,
1617
lifecycleHooks,
1718
OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT,
1819
runMetadata,
@@ -1046,6 +1047,7 @@ export class TaskExecutor {
10461047
signal: AbortSignal
10471048
) {
10481049
await this.#callCleanupFunctions(payload, ctx, initOutput, signal);
1050+
inputStreams.clearHandlers();
10491051
await this.#blockForWaitUntil();
10501052
}
10511053

references/hello-world/src/trigger/inputStreams.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ export const inputStreamOn = task({
119119

120120
logger.info("Subscribing to messages via .on()", { expected });
121121

122-
const { off } = messageStream.on((data) => {
122+
messageStream.on((data) => {
123123
logger.info("Received message", { data });
124124
received.push(data);
125125
});
@@ -128,7 +128,6 @@ export const inputStreamOn = task({
128128
await wait.for({ seconds: 1 });
129129
}
130130

131-
off();
132131
logger.info("Done receiving messages", { count: received.length });
133132
return { messages: received };
134133
},

0 commit comments

Comments
 (0)