Skip to content

feat: Realtime input streams for bidirectional task communication#3145

Closed
ericallam wants to merge 12 commits intomainfrom
claude/temporal-signals-versioning-xzlHf
Closed

feat: Realtime input streams for bidirectional task communication#3145
ericallam wants to merge 12 commits intomainfrom
claude/temporal-signals-versioning-xzlHf

Conversation

@ericallam
Copy link
Member

No description provided.

@changeset-bot
Copy link

changeset-bot bot commented Feb 27, 2026

🦋 Changeset detected

Latest commit: f0f7315

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 28 packages
Name Type
@trigger.dev/core Patch
@trigger.dev/sdk Patch
@trigger.dev/build Patch
trigger.dev Patch
@trigger.dev/python Patch
@trigger.dev/redis-worker Patch
@trigger.dev/schema-to-json Patch
@internal/cache Patch
@internal/clickhouse Patch
@internal/redis Patch
@internal/replication Patch
@internal/run-engine Patch
@internal/schedule-engine Patch
@internal/testcontainers Patch
@internal/tracing Patch
@internal/tsql Patch
@internal/zod-worker Patch
@internal/sdk-compat-tests Patch
d3-chat Patch
references-d3-openai-agents Patch
references-nextjs-realtime Patch
references-realtime-hooks-test Patch
references-realtime-streams Patch
references-telemetry Patch
@trigger.dev/react-hooks Patch
@trigger.dev/rsc Patch
@trigger.dev/database Patch
@trigger.dev/otlp-importer Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 27, 2026

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between db7bdea and f0f7315.

⛔ Files ignored due to path filters (5)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
  • references/hello-world/src/trigger/inputStreams.ts is excluded by !references/**
  • references/hello-world/src/trigger/realtime.ts is excluded by !references/**
  • references/hello-world/src/trigger/streams.ts is excluded by !references/**
  • references/hello-world/src/trigger/streamsV2.ts is excluded by !references/**
📒 Files selected for processing (44)
  • .changeset/input-stream-wait.md
  • .claude/skills/trigger-dev-tasks/realtime.md
  • .gitignore
  • .server-changes/input-stream-wait.md
  • apps/webapp/app/env.server.ts
  • apps/webapp/app/presenters/v3/SpanPresenter.server.ts
  • apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts
  • apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
  • apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts
  • apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx
  • apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx
  • apps/webapp/app/services/authorization.server.ts
  • apps/webapp/app/services/inputStreamWaitpointCache.server.ts
  • apps/webapp/app/services/realtime/s2realtimeStreams.server.ts
  • apps/webapp/app/services/realtime/types.ts
  • apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts
  • apps/webapp/app/v3/services/deployment.server.ts
  • apps/webapp/package.json
  • docker/docker-compose.yml
  • docs/designs/input-stream-wait.md
  • internal-packages/database/prisma/schema.prisma
  • packages/cli-v3/src/entryPoints/dev-run-worker.ts
  • packages/cli-v3/src/entryPoints/managed-run-worker.ts
  • packages/core/package.json
  • packages/core/src/v3/apiClient/index.ts
  • packages/core/src/v3/index.ts
  • packages/core/src/v3/input-streams-api.ts
  • packages/core/src/v3/inputStreams/index.ts
  • packages/core/src/v3/inputStreams/manager.ts
  • packages/core/src/v3/inputStreams/noopManager.ts
  • packages/core/src/v3/inputStreams/types.ts
  • packages/core/src/v3/realtimeStreams/streamInstance.ts
  • packages/core/src/v3/realtimeStreams/streamsWriterV2.ts
  • packages/core/src/v3/realtimeStreams/types.ts
  • packages/core/src/v3/schemas/api.ts
  • packages/core/src/v3/utils/globals.ts
  • packages/core/src/v3/waitpoints/index.ts
  • packages/core/src/v3/workers/index.ts
  • packages/react-hooks/src/hooks/useInputStreamSend.ts
  • packages/react-hooks/src/index.ts
  • packages/trigger-sdk/src/v3/auth.ts
  • packages/trigger-sdk/src/v3/streams.ts
  • packages/trigger-sdk/src/v3/wait.ts
  • rules/4.3.0/realtime.md

Walkthrough

This pull request implements a .wait() method for input streams, enabling tasks to suspend execution while waiting for real-time data. It introduces new API routes to create input stream waitpoints, adds a Redis-backed cache for fast waitpoint lookup, extends the S2 realtime streams service with endpoint configuration and record-reading capabilities, defines comprehensive type schemas for input stream operations, implements React hooks for sending data to input streams, updates worker runtimes to initialize input stream managers, and provides extensive design documentation. Database schema changes add a unique constraint on oneTimeUseToken, and environment configuration is extended with S2 endpoint and token-skipping options.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (2 warnings)

Check name Status Explanation Resolution
Description check ⚠️ Warning The pull request description is completely empty, missing all required template sections including testing details, changelog, and the PR checklist. Add a complete PR description following the template, including testing steps, a changelog summary, and verification of the contributing guide and title convention.
Docstring Coverage ⚠️ Warning Docstring coverage is 31.58% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and specifically describes the main feature being added: input streams for bidirectional task communication in the Realtime system.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch claude/temporal-signals-versioning-xzlHf

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 Trivy (0.69.1)

Failed to read Trivy output file: ENOENT: no such file or directory, open '/inmem/1255/nsjail-d42ba2d6-68e9-4168-82ba-25f6c7edfff8/merged/.trivy-output.json'


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

claude and others added 12 commits February 27, 2026 14:21
Adds a new `streams.input<T>()` API that enables external callers to send
typed data into running tasks. Inside tasks, data is received via `.on()`,
`.once()`, and `.peek()` methods. Outside tasks, `.send(runId, data)` pushes
data through a new platform API route.

Architecture:
- All input streams for a run multiplex onto a single `__input` stream
- Worker demuxes by stream ID and routes to registered handlers
- Lazy stream creation on first `.send()` call
- New IPC message `INPUT_STREAM_CREATED` triggers worker tail connection

New files:
- packages/core/src/v3/inputStreams/ (manager, types, noop, singleton)
- apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts
- DB migration for hasInputStream field on TaskRun

https://claude.ai/code/session_01SJHJts7r2yAxmoKLLz8vpc
Instead of relying on an INPUT_STREAM_CREATED IPC message from the
platform (which was never wired up), the worker now connects the SSE
tail automatically when task code first calls .on() or .once() on an
input stream. The worker receives the run ID via setRunId() at
execution start, and #ensureTailConnected() opens the tail on demand.

This eliminates the need for platform→coordinator→executor→worker IPC
signaling and ensures no race conditions since the SSE tail reads from
the beginning of the __input stream.

https://claude.ai/code/session_01SJHJts7r2yAxmoKLLz8vpc
Input streams are backed by S2 which is only available with v2 realtime
streams. The lazy tail connection now checks that streamsVersion is "v2"
before connecting, so runs using v1 (Redis-backed) streams won't
attempt to open an S2 tail.

https://claude.ai/code/session_01SJHJts7r2yAxmoKLLz8vpc
Instead of silently hanging, on() and once() now throw immediately with
a clear error message telling the developer to enable v2RealtimeStreams.

https://claude.ai/code/session_01SJHJts7r2yAxmoKLLz8vpc
Cover the problems input streams solve (cancelling AI SDK streamText
mid-stream, human-in-the-loop workflows, interactive agents), the full
API surface (on, once, peek, send), and a complete cancellable AI
streaming example.

Updates rules/4.3.0/realtime.md, manifest.json, and the Claude Code
skill docs.

https://claude.ai/code/session_01SJHJts7r2yAxmoKLLz8vpc
Proposes version-aware delivery for input streams and wait tokens,
enabling senders to guard against schema mismatches across deploys.

https://claude.ai/code/session_01SJHJts7r2yAxmoKLLz8vpc
Not the direction we're going — focusing on input stream .wait() using waitpoint tokens instead.

https://claude.ai/code/session_01SJHJts7r2yAxmoKLLz8vpc
Documents the proposed .wait() method on input streams that uses waitpoint
tokens internally to suspend task execution. Covers API surface, options,
return types, examples, and behavioral differences from .once().

https://claude.ai/code/session_01SJHJts7r2yAxmoKLLz8vpc
@ericallam ericallam force-pushed the claude/temporal-signals-versioning-xzlHf branch from db7bdea to 77553cd Compare February 27, 2026 14:25
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 10

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx (1)

491-546: ⚠️ Potential issue | 🟡 Minor

Guard isConnected updates against stale effect instances.

When stream params change quickly, an older effect can run finally -> setIsConnected(false) after a newer connection is already active, producing a false disconnected state.

Proposed fix
   useEffect(() => {
+    let active = true;
     setChunks([]);
     setError(null);
+    setIsConnected(false);
 
     const abortController = new AbortController();
     let reader: ReadableStreamDefaultReader<SSEStreamPart<unknown>> | null = null;
@@
         const stream = await sseSubscription.subscribe();
-        setIsConnected(true);
+        if (active) setIsConnected(true);
@@
-        if (!abortController.signal.aborted) {
+        if (active && !abortController.signal.aborted) {
           setError(err instanceof Error ? err : new Error(String(err)));
         }
       } finally {
-        setIsConnected(false);
+        if (active) setIsConnected(false);
       }
     }
@@
     return () => {
+      active = false;
       abortController.abort();
-      reader?.cancel();
+      void reader?.cancel();
     };
   }, [resourcePath, startIndex]);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/webapp/app/routes/resources.orgs`.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx
around lines 491 - 546, Guard updates to isConnected by tracking the current
effect/connection instance: when starting connectAndConsume generate a unique
token (store it in a ref like currentConnectionRef) and set that token before
calling setIsConnected(true); in the catch/finally and in the cleanup only call
setIsConnected(false) (and cancel reader) if the token in currentConnectionRef
still matches this effect's token (also clear the ref on cleanup or when
aborting). Use the existing abortController, reader, connectAndConsume and
setIsConnected names to locate where to add the token/ref checks so older effect
instances no longer flip the connected state for newer connections.
🧹 Nitpick comments (7)
apps/webapp/app/services/inputStreamWaitpointCache.server.ts (1)

13-31: Split configuration wiring from cache logic for testability.

This module currently combines Redis/env singleton initialization with business operations. Consider extracting pure cache operations into a testable service and keeping singleton/env wiring in a separate global config module.

As per coding guidelines "Separate testable services from configuration files; follow the pattern of realtimeClient.server.ts (testable service) and realtimeClientGlobal.server.ts (configuration) in the webapp".

Also applies to: 37-101

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/webapp/app/services/inputStreamWaitpointCache.server.ts` around lines 13
- 31, This file currently mixes Redis/env wiring (initializeRedis and the
singleton call that creates redis) with business cache logic; refactor by moving
the environment-dependent initialization and singleton binding (initializeRedis
and the const redis = singleton("inputStreamWaitpointCache", initializeRedis))
into a new global configuration module (mirror pattern in
realtimeClientGlobal.server.ts) that exports a lazily-created Redis client, and
transform this module into a pure, testable cache service that exposes functions
or a class (e.g., InputStreamWaitpointCache) which accept a Redis instance as a
constructor arg or parameter; keep all cache operation functions in
inputStreamWaitpointCache.server.ts but remove direct use of env/singleton so
tests can inject a mock Redis client.
packages/core/src/v3/inputStreams/types.ts (1)

1-1: Use a type alias for this exported contract (and make the import type-only).

This keeps the file aligned with repo TypeScript conventions and avoids a value import where only types are used.

♻️ Proposed refactor
-import { InputStreamOnceOptions } from "../realtimeStreams/types.js";
+import type { InputStreamOnceOptions } from "../realtimeStreams/types.js";

-export interface InputStreamManager {
+export type InputStreamManager = {
   /**
    * Set the current run ID and streams version. The tail connection will be
    * established lazily when `on()` or `once()` is first called, but only
    * for v2 (S2-backed) realtime streams.
    */
   setRunId(runId: string, streamsVersion?: string): void;
@@
   /**
    * Connect a tail to receive input stream records for the given run.
    */
   connectTail(runId: string, fromSeq?: number): void;
-}
+};

As per coding guidelines: **/*.{ts,tsx}: Use types over interfaces for TypeScript.

Also applies to: 3-46

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/core/src/v3/inputStreams/types.ts` at line 1, Change the value
import to a type-only import ("import type { InputStreamOnceOptions } ...") and
replace the exported contract in this file with a type alias instead of an
interface/value export; reference the existing InputStreamOnceOptions where
needed and declare the exported symbol as "export type <ContractName> = { ... }"
so the file uses type-only imports and a type alias per project conventions.
docs/designs/input-stream-wait.md (1)

339-339: Optional wording trim.

At Line 339, “exactly the same” can be shortened to “the same” for tighter phrasing.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/designs/input-stream-wait.md` at line 339, Edit the sentence containing
`.send()` and the listener methods `.wait()`, `.once()`, and `.on()` to replace
“exactly the same” with the shorter phrase “the same” so it reads that `.send()`
works the same whether the task is waiting via `.wait()`, `.once()`, or `.on()`;
update that single sentence in the paragraph referencing `.send()` to tighten
the wording.
packages/core/src/v3/utils/globals.ts (1)

5-5: Prefer a type-only import for InputStreamManager.

This prevents an unnecessary runtime import in TS transpilation paths.

♻️ Proposed tweak
-import { InputStreamManager } from "../inputStreams/types.js";
+import type { InputStreamManager } from "../inputStreams/types.js";
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/core/src/v3/utils/globals.ts` at line 5, Change the runtime import
of InputStreamManager to a type-only import to avoid emitting a needless require
at runtime: replace the current import statement that brings in
InputStreamManager from "../inputStreams/types.js" with a type-only import (e.g.
use "import type { InputStreamManager } from '../inputStreams/types.js'") so the
symbol is used only for typing and no runtime module is imported.
packages/react-hooks/src/hooks/useInputStreamSend.ts (1)

6-15: Use a type alias instead of an interface for exported TS shape.

This should follow the repository rule to prefer type over interface in TypeScript.

🔧 Proposed fix
-export interface InputStreamSendInstance<TData> {
+export type InputStreamSendInstance<TData> = {
   /** Send data to the input stream */
   send: (data: TData) => void;
   /** Whether a send is currently in progress */
   isLoading: boolean;
   /** Any error that occurred during the last send */
   error?: Error;
   /** Whether the hook is ready to send (has runId and access token) */
   isReady: boolean;
-}
+};

As per coding guidelines, **/*.{ts,tsx}: Use types over interfaces for TypeScript.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/react-hooks/src/hooks/useInputStreamSend.ts` around lines 6 - 15,
Replace the exported interface InputStreamSendInstance<TData> with an exported
type alias of the same name and shape; update the declaration from "export
interface InputStreamSendInstance<TData> { ... }" to "export type
InputStreamSendInstance<TData> = { ... }" preserving all properties (send,
isLoading, error?, isReady) and their types so callers remain unchanged.
apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts (1)

128-131: Avoid fully silent failure in the race-handling catch block.

This path intentionally keeps the request successful, but swallowing all errors without logging makes production diagnosis much harder when pre-completion fails.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/webapp/app/routes/api.v1.runs`.$runFriendlyId.input-streams.wait.ts
around lines 128 - 131, The empty catch is swallowing errors and hindering
diagnostics; change it to catch the error (e.g., catch (err)) and log a
non-fatal message with context before continuing so behavior stays the same. In
the function handling the S2 pre-completion check (the try/catch around the
"waitpoint is still PENDING" logic), call the existing logger (e.g.,
processLogger or logger) to emit a warning or debug including runFriendlyId,
waitpoint id/state and the caught error, then allow execution to continue so the
Redis-cache path still completes the waitpoint.
apps/webapp/app/services/realtime/types.ts (1)

26-35: Convert realtime contracts from interface to type aliases.

This update extends StreamIngestor via an interface, but repo rules for TypeScript prefer type over interface.

♻️ Suggested refactor
-export interface StreamIngestor {
+export type StreamIngestor = {
   initializeStream(
     runId: string,
     streamId: string
   ): Promise<{ responseHeaders?: Record<string, string> }>;
@@
   readRecords?(
     runId: string,
     streamId: string,
     afterSeqNum?: number
   ): Promise<StreamRecord[] | undefined>;
-}
+};

@@
-export interface StreamResponder {
+export type StreamResponder = {
   streamResponse(
     request: Request,
     runId: string,
     streamId: string,
     signal: AbortSignal,
     options?: StreamResponseOptions
   ): Promise<Response>;
-}
+};

As per coding guidelines: **/*.{ts,tsx}: Use types over interfaces for TypeScript.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/webapp/app/services/realtime/types.ts` around lines 26 - 35, The
realtime contract declared as an interface should be converted to a type alias
per repo rules: replace the interface declaration for StreamIngestor (and any
other interfaces in this file) with a type alias, and if the interface used
extends another interface/class, express that via an intersection type (e.g.,
type StreamIngestor = BaseType & { ... }). Ensure method signatures (like
readRecords(runId, streamId, afterSeqNum?) => Promise<StreamRecord[] |
undefined>) and exported names remain identical and update any local
references/imports if necessary.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/webapp/app/services/realtime/s2realtimeStreams.server.ts`:
- Around line 179-216: parseSSEBatchRecords currently splits on "\n\n" and only
reads a single "data:" line which breaks on CRLF and multi-line SSE data,
causing valid batch events to be dropped; update parseSSEBatchRecords to (1)
normalize line endings (replace "\r\n" with "\n") and split events on the SSE
separator pattern (two consecutive newlines) and (2) when iterating event lines,
accumulate all lines that start with "data:" by concatenating their payloads
with "\n" (do not trim inner newlines) before JSON.parse so multi-line JSON
payloads are preserved; ensure you still handle optional "event:" lines and
robustly catch JSON errors so readRecords() stops silently dropping valid
records.

In `@docker/docker-compose.yml`:
- Line 168: The current entrypoint for s2-init runs a curl with "&& ... || ..."
which masks curl/network/API failures; update the entrypoint command in
docker-compose (the entrypoint that runs the curl to
http://s2:80/v1/basins/trigger-local) so it explicitly checks the HTTP result
and only treats a 2xx (or specifically created status) as success, treats a 409
(already exists) as the benign case, and exits non‑zero for any other status or
network error; implement this by capturing curl's HTTP status (using curl
options to write out the status or using --fail and checking exit code) and
branching (if/elif/else) to echo the appropriate message and exit with failure
when the call truly failed.

In `@docs/tasks/input-streams.mdx`:
- Around line 88-90: The approval input type declared in the streams.input call
is missing the comment field referenced later (result.output.comment), so update
the generic type on the approval constant (the streams.input<{ ... }> in the
approval declaration) to include comment (e.g., add comment: string or comment?:
string depending on whether it can be absent) so the shape matches usages of
result.output.comment.

In `@packages/core/src/v3/apiClient/index.ts`:
- Around line 1397-1423: The URL path segments are built by directly
interpolating unescaped values (e.g., runId, streamId, runFriendlyId) which can
break routes if they contain reserved characters; update the callers in
sendInputStream and createInputStreamWaitpoint to encode dynamic path segments
with encodeURIComponent (or a shared helper) before concatenating into the URL
so each path segment is safely percent-encoded; ensure you apply this to any
other similar methods in this file that interpolate run or stream identifiers.

In `@packages/core/src/v3/inputStreams/manager.ts`:
- Around line 174-185: The current `#ensureStreamTailConnected` adds
this.tails.set(streamId, { abortController, promise }) but if `#runTail` rejects
the catch only logs the error and leaves a stale entry, preventing future
reconnects; update the promise.catch handler in `#ensureStreamTailConnected` (for
the promise returned by `#runTail`) to remove the stale this.tails entry (e.g.,
this.tails.delete(streamId)) and perform any necessary abortController cleanup
so subsequent calls to `#ensureStreamTailConnected` can recreate the tail; ensure
you reference the same streamId, abortController, promise variables already used
in that method.
- Around line 38-41: setRunId can leave prior run state (tails, buffers,
seqNumbers) live and cause cross-run contamination; update setRunId(runId:
string, streamsVersion?: string) to detect a runId change and clear/reset
internal per-run state (call or inline the existing reset() logic) before
assigning this.currentRunId and this.streamsVersion, or add an explicit
parameter (e.g., reset=true) to force a reset; target the setRunId method and
ensure it clears tails, buffers, sequence tracking and any per-run
timers/handlers so no previous-run state persists.
- Around line 267-274: The buffered array stored via this.buffer.get(streamId)
can grow unbounded when no consumer is attached; add a fixed, configurable cap
(e.g. MAX_BUFFER_ITEMS or a per-stream max in the manager) and check before
buffered.push(data) to prevent unlimited growth: if the buffer length >= max,
either drop oldest items (shift) or drop the incoming item and emit/log a
warning/error (or signal backpressure) so memory usage is bounded; update the
buffering logic around this.buffer.get/this.buffer.set/ buffered.push(data) and
surface the config constant to callers.

In `@packages/core/src/v3/inputStreams/noopManager.ts`:
- Around line 11-15: The once() implementation currently never settles; change
it to immediately handle InputStreamOnceOptions: if options?.signal?.aborted
reject with an AbortError (or DOMException with name 'AbortError'); if
options?.signal is provided attach an 'abort' listener that rejects; if
options?.timeoutMs is set start a timeout that rejects after timeoutMs (cleanup
listeners/timeout on settle); if neither abort nor timeout is present resolve
the Promise immediately (return Promise.resolve(undefined)). Update the once
method in noopManager.ts and reference InputStreamOnceOptions, options.signal
and options.timeoutMs when implementing.

In `@packages/core/src/v3/schemas/api.ts`:
- Around line 1373-1385: The CreateInputStreamWaitpointRequestBody schema
currently allows arbitrary `tags` and non-integer/negative `lastSeqNum`; update
validation on the CreateInputStreamWaitpointRequestBody object so that `tags`
reuses the same constraints used for run tags (e.g., non-empty string pattern
and an array variant of that same constrained string) instead of a plain
z.string(), and tighten `lastSeqNum` to only accept non-negative integers (use
integer validation with min(0)). Modify the `tags` and `lastSeqNum` validators
in CreateInputStreamWaitpointRequestBody accordingly to reference the existing
run-tag constraint and integer >= 0 rule so downstream filtering remains
consistent.

In `@packages/react-hooks/src/index.ts`:
- Line 7: The public API of `@trigger.dev/react-hooks` was expanded by exporting
useInputStreamSend from packages/react-hooks/src/index.ts (export * from
"./hooks/useInputStreamSend.js"); add a changeset documenting this public API
change by running the repository's changeset workflow (e.g., run pnpm run
changeset:add) and create a changeset entry that notes the package
"@trigger.dev/react-hooks" and the addition of useInputStreamSend so the release
tooling picks up the public API update.

---

Outside diff comments:
In
`@apps/webapp/app/routes/resources.orgs`.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx:
- Around line 491-546: Guard updates to isConnected by tracking the current
effect/connection instance: when starting connectAndConsume generate a unique
token (store it in a ref like currentConnectionRef) and set that token before
calling setIsConnected(true); in the catch/finally and in the cleanup only call
setIsConnected(false) (and cancel reader) if the token in currentConnectionRef
still matches this effect's token (also clear the ref on cleanup or when
aborting). Use the existing abortController, reader, connectAndConsume and
setIsConnected names to locate where to add the token/ref checks so older effect
instances no longer flip the connected state for newer connections.

---

Nitpick comments:
In `@apps/webapp/app/routes/api.v1.runs`.$runFriendlyId.input-streams.wait.ts:
- Around line 128-131: The empty catch is swallowing errors and hindering
diagnostics; change it to catch the error (e.g., catch (err)) and log a
non-fatal message with context before continuing so behavior stays the same. In
the function handling the S2 pre-completion check (the try/catch around the
"waitpoint is still PENDING" logic), call the existing logger (e.g.,
processLogger or logger) to emit a warning or debug including runFriendlyId,
waitpoint id/state and the caught error, then allow execution to continue so the
Redis-cache path still completes the waitpoint.

In `@apps/webapp/app/services/inputStreamWaitpointCache.server.ts`:
- Around line 13-31: This file currently mixes Redis/env wiring (initializeRedis
and the singleton call that creates redis) with business cache logic; refactor
by moving the environment-dependent initialization and singleton binding
(initializeRedis and the const redis = singleton("inputStreamWaitpointCache",
initializeRedis)) into a new global configuration module (mirror pattern in
realtimeClientGlobal.server.ts) that exports a lazily-created Redis client, and
transform this module into a pure, testable cache service that exposes functions
or a class (e.g., InputStreamWaitpointCache) which accept a Redis instance as a
constructor arg or parameter; keep all cache operation functions in
inputStreamWaitpointCache.server.ts but remove direct use of env/singleton so
tests can inject a mock Redis client.

In `@apps/webapp/app/services/realtime/types.ts`:
- Around line 26-35: The realtime contract declared as an interface should be
converted to a type alias per repo rules: replace the interface declaration for
StreamIngestor (and any other interfaces in this file) with a type alias, and if
the interface used extends another interface/class, express that via an
intersection type (e.g., type StreamIngestor = BaseType & { ... }). Ensure
method signatures (like readRecords(runId, streamId, afterSeqNum?) =>
Promise<StreamRecord[] | undefined>) and exported names remain identical and
update any local references/imports if necessary.

In `@docs/designs/input-stream-wait.md`:
- Line 339: Edit the sentence containing `.send()` and the listener methods
`.wait()`, `.once()`, and `.on()` to replace “exactly the same” with the shorter
phrase “the same” so it reads that `.send()` works the same whether the task is
waiting via `.wait()`, `.once()`, or `.on()`; update that single sentence in the
paragraph referencing `.send()` to tighten the wording.

In `@packages/core/src/v3/inputStreams/types.ts`:
- Line 1: Change the value import to a type-only import ("import type {
InputStreamOnceOptions } ...") and replace the exported contract in this file
with a type alias instead of an interface/value export; reference the existing
InputStreamOnceOptions where needed and declare the exported symbol as "export
type <ContractName> = { ... }" so the file uses type-only imports and a type
alias per project conventions.

In `@packages/core/src/v3/utils/globals.ts`:
- Line 5: Change the runtime import of InputStreamManager to a type-only import
to avoid emitting a needless require at runtime: replace the current import
statement that brings in InputStreamManager from "../inputStreams/types.js" with
a type-only import (e.g. use "import type { InputStreamManager } from
'../inputStreams/types.js'") so the symbol is used only for typing and no
runtime module is imported.

In `@packages/react-hooks/src/hooks/useInputStreamSend.ts`:
- Around line 6-15: Replace the exported interface
InputStreamSendInstance<TData> with an exported type alias of the same name and
shape; update the declaration from "export interface
InputStreamSendInstance<TData> { ... }" to "export type
InputStreamSendInstance<TData> = { ... }" preserving all properties (send,
isLoading, error?, isReady) and their types so callers remain unchanged.

ℹ️ Review info

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8003923 and db7bdea.

⛔ Files ignored due to path filters (5)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
  • references/hello-world/src/trigger/inputStreams.ts is excluded by !references/**
  • references/hello-world/src/trigger/realtime.ts is excluded by !references/**
  • references/hello-world/src/trigger/streams.ts is excluded by !references/**
  • references/hello-world/src/trigger/streamsV2.ts is excluded by !references/**
📒 Files selected for processing (54)
  • .changeset/input-stream-wait.md
  • .claude/skills/trigger-dev-tasks/realtime.md
  • .gitignore
  • .server-changes/input-stream-wait.md
  • apps/webapp/app/env.server.ts
  • apps/webapp/app/presenters/v3/SpanPresenter.server.ts
  • apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts
  • apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
  • apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts
  • apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx
  • apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx
  • apps/webapp/app/services/authorization.server.ts
  • apps/webapp/app/services/inputStreamWaitpointCache.server.ts
  • apps/webapp/app/services/realtime/s2realtimeStreams.server.ts
  • apps/webapp/app/services/realtime/types.ts
  • apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts
  • apps/webapp/app/v3/services/deployment.server.ts
  • apps/webapp/package.json
  • docker/docker-compose.yml
  • docs/designs/input-stream-wait.md
  • docs/docs.json
  • docs/realtime/backend/input-streams.mdx
  • docs/realtime/backend/overview.mdx
  • docs/realtime/overview.mdx
  • docs/realtime/react-hooks/input-streams.mdx
  • docs/tasks/input-streams.mdx
  • docs/tasks/streams.mdx
  • docs/wait-for-token.mdx
  • docs/wait.mdx
  • internal-packages/database/prisma/schema.prisma
  • packages/cli-v3/src/entryPoints/dev-run-worker.ts
  • packages/cli-v3/src/entryPoints/managed-run-worker.ts
  • packages/core/package.json
  • packages/core/src/v3/apiClient/index.ts
  • packages/core/src/v3/index.ts
  • packages/core/src/v3/input-streams-api.ts
  • packages/core/src/v3/inputStreams/index.ts
  • packages/core/src/v3/inputStreams/manager.ts
  • packages/core/src/v3/inputStreams/noopManager.ts
  • packages/core/src/v3/inputStreams/types.ts
  • packages/core/src/v3/realtimeStreams/streamInstance.ts
  • packages/core/src/v3/realtimeStreams/streamsWriterV2.ts
  • packages/core/src/v3/realtimeStreams/types.ts
  • packages/core/src/v3/schemas/api.ts
  • packages/core/src/v3/utils/globals.ts
  • packages/core/src/v3/waitpoints/index.ts
  • packages/core/src/v3/workers/index.ts
  • packages/react-hooks/src/hooks/useInputStreamSend.ts
  • packages/react-hooks/src/index.ts
  • packages/trigger-sdk/src/v3/auth.ts
  • packages/trigger-sdk/src/v3/streams.ts
  • packages/trigger-sdk/src/v3/wait.ts
  • rules/4.4.2/realtime.md
  • rules/manifest.json

Comment on lines +179 to +216
private parseSSEBatchRecords(sseText: string): StreamRecord[] {
const records: StreamRecord[] = [];

// SSE events are separated by double newlines
const events = sseText.split("\n\n").filter((e) => e.trim());

for (const event of events) {
const lines = event.split("\n");
let eventType: string | undefined;
let data: string | undefined;

for (const line of lines) {
if (line.startsWith("event:")) {
eventType = line.slice(6).trim();
} else if (line.startsWith("data:")) {
data = line.slice(5).trim();
}
}

if (eventType === "batch" && data) {
try {
const parsed = JSON.parse(data) as {
records: Array<{ body: string; seq_num: number; timestamp: number }>;
};

for (const record of parsed.records) {
const parsedBody = JSON.parse(record.body) as { data: string; id: string };
records.push({
data: parsedBody.data,
id: parsedBody.id,
seqNum: record.seq_num,
});
}
} catch {
// Skip malformed events
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

SSE parsing is fragile and can silently drop records.

parseSSEBatchRecords() assumes \n\n separators and a single data: line. If S2/proxies emit CRLF (\r\n) or multi-line data: payloads, valid batch events can be missed and readRecords() may return empty unexpectedly.

🐛 Suggested fix
   private parseSSEBatchRecords(sseText: string): StreamRecord[] {
     const records: StreamRecord[] = [];

-    // SSE events are separated by double newlines
-    const events = sseText.split("\n\n").filter((e) => e.trim());
+    // Normalize newlines, then split SSE events on blank lines
+    const normalized = sseText.replace(/\r\n/g, "\n");
+    const events = normalized.split("\n\n").filter((e) => e.trim());

     for (const event of events) {
       const lines = event.split("\n");
       let eventType: string | undefined;
-      let data: string | undefined;
+      const dataLines: string[] = [];

       for (const line of lines) {
         if (line.startsWith("event:")) {
           eventType = line.slice(6).trim();
         } else if (line.startsWith("data:")) {
-          data = line.slice(5).trim();
+          dataLines.push(line.slice(5).trimStart());
         }
       }

-      if (eventType === "batch" && data) {
+      if (eventType === "batch" && dataLines.length > 0) {
         try {
-          const parsed = JSON.parse(data) as {
+          const parsed = JSON.parse(dataLines.join("\n")) as {
             records: Array<{ body: string; seq_num: number; timestamp: number }>;
           };

           for (const record of parsed.records) {
             const parsedBody = JSON.parse(record.body) as { data: string; id: string };
             records.push({
               data: parsedBody.data,
               id: parsedBody.id,
               seqNum: record.seq_num,
             });
           }
         } catch {
           // Skip malformed events
         }
       }
     }

     return records;
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/webapp/app/services/realtime/s2realtimeStreams.server.ts` around lines
179 - 216, parseSSEBatchRecords currently splits on "\n\n" and only reads a
single "data:" line which breaks on CRLF and multi-line SSE data, causing valid
batch events to be dropped; update parseSSEBatchRecords to (1) normalize line
endings (replace "\r\n" with "\n") and split events on the SSE separator pattern
(two consecutive newlines) and (2) when iterating event lines, accumulate all
lines that start with "data:" by concatenating their payloads with "\n" (do not
trim inner newlines) before JSON.parse so multi-line JSON payloads are
preserved; ensure you still handle optional "event:" lines and robustly catch
JSON errors so readRecords() stops silently dropping valid records.

networks:
- app_network
restart: "no"
entrypoint: ["sh", "-c", "curl -sf -X PUT http://s2:80/v1/basins/trigger-local -H 'Content-Type: application/json' -d '{\"config\":{\"create_stream_on_append\":true,\"create_stream_on_read\":true}}' && echo ' Basin trigger-local ready' || echo ' Basin trigger-local already exists'"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

s2-init currently hides real initialization failures.

curl ... && echo "ready" || echo "already exists" exits 0 on any curl failure, so network/API errors are misreported as success.

Proposed fix
-    entrypoint: ["sh", "-c", "curl -sf -X PUT http://s2:80/v1/basins/trigger-local -H 'Content-Type: application/json' -d '{\"config\":{\"create_stream_on_append\":true,\"create_stream_on_read\":true}}' && echo ' Basin trigger-local ready' || echo ' Basin trigger-local already exists'"]
+    entrypoint:
+      - sh
+      - -c
+      - |
+        status="$(curl -sS -o /tmp/s2-init.out -w '%{http_code}' -X PUT http://s2:80/v1/basins/trigger-local -H 'Content-Type: application/json' -d '{"config":{"create_stream_on_append":true,"create_stream_on_read":true}}' || true)"
+        if [ "$status" = "200" ] || [ "$status" = "201" ]; then
+          echo "Basin trigger-local ready"
+        elif [ "$status" = "409" ]; then
+          echo "Basin trigger-local already exists"
+        else
+          echo "Failed to initialize basin (status: $status)"
+          cat /tmp/s2-init.out
+          exit 1
+        fi
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
entrypoint: ["sh", "-c", "curl -sf -X PUT http://s2:80/v1/basins/trigger-local -H 'Content-Type: application/json' -d '{\"config\":{\"create_stream_on_append\":true,\"create_stream_on_read\":true}}' && echo ' Basin trigger-local ready' || echo ' Basin trigger-local already exists'"]
entrypoint:
- sh
- -c
- |
status="$(curl -sS -o /tmp/s2-init.out -w '%{http_code}' -X PUT http://s2:80/v1/basins/trigger-local -H 'Content-Type: application/json' -d '{"config":{"create_stream_on_append":true,"create_stream_on_read":true}}' || true)"
if [ "$status" = "200" ] || [ "$status" = "201" ]; then
echo "Basin trigger-local ready"
elif [ "$status" = "409" ]; then
echo "Basin trigger-local already exists"
else
echo "Failed to initialize basin (status: $status)"
cat /tmp/s2-init.out
exit 1
fi
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docker/docker-compose.yml` at line 168, The current entrypoint for s2-init
runs a curl with "&& ... || ..." which masks curl/network/API failures; update
the entrypoint command in docker-compose (the entrypoint that runs the curl to
http://s2:80/v1/basins/trigger-local) so it explicitly checks the HTTP result
and only treats a 2xx (or specifically created status) as success, treats a 409
(already exists) as the benign case, and exits non‑zero for any other status or
network error; implement this by capturing curl's HTTP status (using curl
options to write out the status or using --fail and checking exit code) and
branching (if/elif/else) to echo the appropriate message and exit with failure
when the call truly failed.

Comment on lines 88 to 90
export const approval = streams.input<{ approved: boolean; reviewer: string }>({
id: "approval",
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Type mismatch in approval example causes a TS error.

approval is typed without comment (Lines 88-90), but result.output.comment is read at Line 138.

🛠️ Proposed doc fix
-export const approval = streams.input<{ approved: boolean; reviewer: string }>({
+export const approval = streams.input<{
+  approved: boolean;
+  reviewer: string;
+  comment?: string;
+}>({
   id: "approval",
 });

Also applies to: 138-139

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/tasks/input-streams.mdx` around lines 88 - 90, The approval input type
declared in the streams.input call is missing the comment field referenced later
(result.output.comment), so update the generic type on the approval constant
(the streams.input<{ ... }> in the approval declaration) to include comment
(e.g., add comment: string or comment?: string depending on whether it can be
absent) so the shape matches usages of result.output.comment.

Comment on lines +1397 to +1423
async sendInputStream(
runId: string,
streamId: string,
data: unknown,
requestOptions?: ZodFetchOptions
) {
return zodfetch(
SendInputStreamResponseBody,
`${this.baseUrl}/realtime/v1/streams/${runId}/input/${streamId}`,
{
method: "POST",
headers: this.#getHeaders(false),
body: JSON.stringify({ data }),
},
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
);
}

async createInputStreamWaitpoint(
runFriendlyId: string,
body: CreateInputStreamWaitpointRequestBody,
requestOptions?: ZodFetchOptions
) {
return zodfetch(
CreateInputStreamWaitpointResponseBody,
`${this.baseUrl}/api/v1/runs/${runFriendlyId}/input-streams/wait`,
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Encode dynamic path segments before building input-stream URLs.

runId, streamId, and runFriendlyId are interpolated directly into URL path segments. If they contain reserved characters (e.g. /, ?, %), requests can hit the wrong endpoint.

🔧 Proposed fix
   async sendInputStream(
     runId: string,
     streamId: string,
     data: unknown,
     requestOptions?: ZodFetchOptions
   ) {
+    const encodedRunId = encodeURIComponent(runId);
+    const encodedStreamId = encodeURIComponent(streamId);
+
     return zodfetch(
       SendInputStreamResponseBody,
-      `${this.baseUrl}/realtime/v1/streams/${runId}/input/${streamId}`,
+      `${this.baseUrl}/realtime/v1/streams/${encodedRunId}/input/${encodedStreamId}`,
       {
         method: "POST",
         headers: this.#getHeaders(false),
         body: JSON.stringify({ data }),
       },
       mergeRequestOptions(this.defaultRequestOptions, requestOptions)
     );
   }

   async createInputStreamWaitpoint(
     runFriendlyId: string,
     body: CreateInputStreamWaitpointRequestBody,
     requestOptions?: ZodFetchOptions
   ) {
+    const encodedRunFriendlyId = encodeURIComponent(runFriendlyId);
+
     return zodfetch(
       CreateInputStreamWaitpointResponseBody,
-      `${this.baseUrl}/api/v1/runs/${runFriendlyId}/input-streams/wait`,
+      `${this.baseUrl}/api/v1/runs/${encodedRunFriendlyId}/input-streams/wait`,
       {
         method: "POST",
         headers: this.#getHeaders(false),
         body: JSON.stringify(body),
       },
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async sendInputStream(
runId: string,
streamId: string,
data: unknown,
requestOptions?: ZodFetchOptions
) {
return zodfetch(
SendInputStreamResponseBody,
`${this.baseUrl}/realtime/v1/streams/${runId}/input/${streamId}`,
{
method: "POST",
headers: this.#getHeaders(false),
body: JSON.stringify({ data }),
},
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
);
}
async createInputStreamWaitpoint(
runFriendlyId: string,
body: CreateInputStreamWaitpointRequestBody,
requestOptions?: ZodFetchOptions
) {
return zodfetch(
CreateInputStreamWaitpointResponseBody,
`${this.baseUrl}/api/v1/runs/${runFriendlyId}/input-streams/wait`,
{
async sendInputStream(
runId: string,
streamId: string,
data: unknown,
requestOptions?: ZodFetchOptions
) {
const encodedRunId = encodeURIComponent(runId);
const encodedStreamId = encodeURIComponent(streamId);
return zodfetch(
SendInputStreamResponseBody,
`${this.baseUrl}/realtime/v1/streams/${encodedRunId}/input/${encodedStreamId}`,
{
method: "POST",
headers: this.#getHeaders(false),
body: JSON.stringify({ data }),
},
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
);
}
async createInputStreamWaitpoint(
runFriendlyId: string,
body: CreateInputStreamWaitpointRequestBody,
requestOptions?: ZodFetchOptions
) {
const encodedRunFriendlyId = encodeURIComponent(runFriendlyId);
return zodfetch(
CreateInputStreamWaitpointResponseBody,
`${this.baseUrl}/api/v1/runs/${encodedRunFriendlyId}/input-streams/wait`,
{
method: "POST",
headers: this.#getHeaders(false),
body: JSON.stringify(body),
},
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/core/src/v3/apiClient/index.ts` around lines 1397 - 1423, The URL
path segments are built by directly interpolating unescaped values (e.g., runId,
streamId, runFriendlyId) which can break routes if they contain reserved
characters; update the callers in sendInputStream and createInputStreamWaitpoint
to encode dynamic path segments with encodeURIComponent (or a shared helper)
before concatenating into the URL so each path segment is safely
percent-encoded; ensure you apply this to any other similar methods in this file
that interpolate run or stream identifiers.

Comment on lines +38 to +41
setRunId(runId: string, streamsVersion?: string): void {
this.currentRunId = runId;
this.streamsVersion = streamsVersion;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

setRunId() can leak previous run state into the next run.

If setRunId() is called with a new run ID without an explicit reset(), existing tails/buffers/seq numbers remain live and can mix cross-run data.

🔧 Proposed fix
   setRunId(runId: string, streamsVersion?: string): void {
+    if (this.currentRunId && this.currentRunId !== runId) {
+      this.reset();
+    }
     this.currentRunId = runId;
     this.streamsVersion = streamsVersion;
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
setRunId(runId: string, streamsVersion?: string): void {
this.currentRunId = runId;
this.streamsVersion = streamsVersion;
}
setRunId(runId: string, streamsVersion?: string): void {
if (this.currentRunId && this.currentRunId !== runId) {
this.reset();
}
this.currentRunId = runId;
this.streamsVersion = streamsVersion;
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/core/src/v3/inputStreams/manager.ts` around lines 38 - 41, setRunId
can leave prior run state (tails, buffers, seqNumbers) live and cause cross-run
contamination; update setRunId(runId: string, streamsVersion?: string) to detect
a runId change and clear/reset internal per-run state (call or inline the
existing reset() logic) before assigning this.currentRunId and
this.streamsVersion, or add an explicit parameter (e.g., reset=true) to force a
reset; target the setRunId method and ensure it clears tails, buffers, sequence
tracking and any per-run timers/handlers so no previous-run state persists.

Comment on lines +174 to +185
#ensureStreamTailConnected(streamId: string): void {
if (!this.tails.has(streamId) && this.currentRunId) {
const abortController = new AbortController();
const promise = this.#runTail(this.currentRunId, streamId, abortController.signal).catch(
(error) => {
if (this.debug) {
console.error(`[InputStreamManager] Tail error for "${streamId}":`, error);
}
}
);
this.tails.set(streamId, { abortController, promise });
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Tail failure currently leaves a stale tail entry and prevents reconnect.

When #runTail() fails, the error is caught, but this.tails still retains streamId. Future calls to #ensureStreamTailConnected() won't recreate the tail.

🔧 Proposed fix
   `#ensureStreamTailConnected`(streamId: string): void {
     if (!this.tails.has(streamId) && this.currentRunId) {
       const abortController = new AbortController();
       const promise = this.#runTail(this.currentRunId, streamId, abortController.signal).catch(
         (error) => {
           if (this.debug) {
             console.error(`[InputStreamManager] Tail error for "${streamId}":`, error);
           }
         }
-      );
+      ).finally(() => {
+        this.tails.delete(streamId);
+
+        const hasHandlers = (this.handlers.get(streamId)?.size ?? 0) > 0;
+        const hasWaiters = (this.onceWaiters.get(streamId)?.length ?? 0) > 0;
+
+        if (!abortController.signal.aborted && this.currentRunId && (hasHandlers || hasWaiters)) {
+          this.#ensureStreamTailConnected(streamId);
+        }
+      });
       this.tails.set(streamId, { abortController, promise });
     }
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/core/src/v3/inputStreams/manager.ts` around lines 174 - 185, The
current `#ensureStreamTailConnected` adds this.tails.set(streamId, {
abortController, promise }) but if `#runTail` rejects the catch only logs the
error and leaves a stale entry, preventing future reconnects; update the
promise.catch handler in `#ensureStreamTailConnected` (for the promise returned by
`#runTail`) to remove the stale this.tails entry (e.g.,
this.tails.delete(streamId)) and perform any necessary abortController cleanup
so subsequent calls to `#ensureStreamTailConnected` can recreate the tail; ensure
you reference the same streamId, abortController, promise variables already used
in that method.

Comment on lines +267 to +274
// No handlers, buffer the data
let buffered = this.buffer.get(streamId);
if (!buffered) {
buffered = [];
this.buffer.set(streamId, buffered);
}
buffered.push(data);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Buffer growth is unbounded when no consumer is attached.

If data keeps arriving with no handlers/waiters, this array can grow without limit and cause memory pressure for long-lived workers.

🔧 Proposed fix
+const MAX_BUFFERED_ITEMS_PER_STREAM = 1000;
+
   `#dispatch`(streamId: string, data: unknown): void {
@@
     let buffered = this.buffer.get(streamId);
     if (!buffered) {
       buffered = [];
       this.buffer.set(streamId, buffered);
     }
+    if (buffered.length >= MAX_BUFFERED_ITEMS_PER_STREAM) {
+      buffered.shift();
+    }
     buffered.push(data);
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// No handlers, buffer the data
let buffered = this.buffer.get(streamId);
if (!buffered) {
buffered = [];
this.buffer.set(streamId, buffered);
}
buffered.push(data);
}
// No handlers, buffer the data
let buffered = this.buffer.get(streamId);
if (!buffered) {
buffered = [];
this.buffer.set(streamId, buffered);
}
if (buffered.length >= MAX_BUFFERED_ITEMS_PER_STREAM) {
buffered.shift();
}
buffered.push(data);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/core/src/v3/inputStreams/manager.ts` around lines 267 - 274, The
buffered array stored via this.buffer.get(streamId) can grow unbounded when no
consumer is attached; add a fixed, configurable cap (e.g. MAX_BUFFER_ITEMS or a
per-stream max in the manager) and check before buffered.push(data) to prevent
unlimited growth: if the buffer length >= max, either drop oldest items (shift)
or drop the incoming item and emit/log a warning/error (or signal backpressure)
so memory usage is bounded; update the buffering logic around
this.buffer.get/this.buffer.set/ buffered.push(data) and surface the config
constant to callers.

Comment on lines +11 to +15
once(_streamId: string, _options?: InputStreamOnceOptions): Promise<unknown> {
return new Promise(() => {
// Never resolves in noop mode
});
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

once() should respect abort/timeout options instead of hanging forever.

The current implementation never settles, even when options.signal aborts or options.timeoutMs is set. That can strand callers waiting indefinitely in noop mode.

Proposed fix
   once(_streamId: string, _options?: InputStreamOnceOptions): Promise<unknown> {
-    return new Promise(() => {
-      // Never resolves in noop mode
-    });
+    return new Promise((_resolve, reject) => {
+      const options = _options;
+
+      if (options?.signal?.aborted) {
+        reject(options.signal.reason ?? new Error("Aborted"));
+        return;
+      }
+
+      let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
+      const onAbort = () => {
+        if (timeoutHandle) clearTimeout(timeoutHandle);
+        reject(options?.signal?.reason ?? new Error("Aborted"));
+      };
+
+      options?.signal?.addEventListener("abort", onAbort, { once: true });
+
+      if (typeof options?.timeoutMs === "number") {
+        timeoutHandle = setTimeout(() => {
+          options?.signal?.removeEventListener("abort", onAbort);
+          reject(new Error(`Timed out waiting for input stream data after ${options.timeoutMs}ms`));
+        }, options.timeoutMs);
+      }
+    });
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/core/src/v3/inputStreams/noopManager.ts` around lines 11 - 15, The
once() implementation currently never settles; change it to immediately handle
InputStreamOnceOptions: if options?.signal?.aborted reject with an AbortError
(or DOMException with name 'AbortError'); if options?.signal is provided attach
an 'abort' listener that rejects; if options?.timeoutMs is set start a timeout
that rejects after timeoutMs (cleanup listeners/timeout on settle); if neither
abort nor timeout is present resolve the Promise immediately (return
Promise.resolve(undefined)). Update the once method in noopManager.ts and
reference InputStreamOnceOptions, options.signal and options.timeoutMs when
implementing.

Comment on lines +1373 to +1385
export const CreateInputStreamWaitpointRequestBody = z.object({
streamId: z.string(),
timeout: z.string().optional(),
idempotencyKey: z.string().optional(),
idempotencyKeyTTL: z.string().optional(),
tags: z.union([z.string(), z.array(z.string())]).optional(),
/**
* The last S2 sequence number the client has seen on this input stream.
* Used to check for data that arrived before .wait() was called.
* If undefined, the server checks from the beginning of the stream.
*/
lastSeqNum: z.number().optional(),
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Strengthen input-stream waitpoint validation constraints.

tags currently bypasses existing run-tag constraints, and lastSeqNum accepts floats/negative values. This can produce invalid filtering windows and inconsistent tag handling downstream.

🔧 Proposed fix
 export const CreateInputStreamWaitpointRequestBody = z.object({
   streamId: z.string(),
-  timeout: z.string().optional(),
+  timeout: TimePeriod.optional(),
   idempotencyKey: z.string().optional(),
   idempotencyKeyTTL: z.string().optional(),
-  tags: z.union([z.string(), z.array(z.string())]).optional(),
+  tags: RunTags.optional(),
   /**
    * The last S2 sequence number the client has seen on this input stream.
    * Used to check for data that arrived before .wait() was called.
    * If undefined, the server checks from the beginning of the stream.
    */
-  lastSeqNum: z.number().optional(),
+  lastSeqNum: z.number().int().nonnegative().optional(),
 });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/core/src/v3/schemas/api.ts` around lines 1373 - 1385, The
CreateInputStreamWaitpointRequestBody schema currently allows arbitrary `tags`
and non-integer/negative `lastSeqNum`; update validation on the
CreateInputStreamWaitpointRequestBody object so that `tags` reuses the same
constraints used for run tags (e.g., non-empty string pattern and an array
variant of that same constrained string) instead of a plain z.string(), and
tighten `lastSeqNum` to only accept non-negative integers (use integer
validation with min(0)). Modify the `tags` and `lastSeqNum` validators in
CreateInputStreamWaitpointRequestBody accordingly to reference the existing
run-tag constraint and integer >= 0 rule so downstream filtering remains
consistent.

export * from "./hooks/useRealtime.js";
export * from "./hooks/useTaskTrigger.js";
export * from "./hooks/useWaitToken.js";
export * from "./hooks/useInputStreamSend.js";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check if .changeset directory exists
if [ ! -d .changeset ]; then
  echo "Missing .changeset directory"
  exit 1
fi

echo "=== Searching changesets for react-hooks package mentions ==="
rg -n --glob '*.md' '@trigger\.dev/react-hooks|react-hooks' .changeset || echo "No matches found"

echo ""
echo "=== Listing all changeset files ==="
fd -e 'md' . .changeset | head -20

echo ""
echo "=== Checking content of recent changesets ==="
ls -lt .changeset/*.md 2>/dev/null | head -10

Repository: triggerdotdev/trigger.dev

Length of output: 787


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Content of input-stream-wait.md ==="
cat .changeset/input-stream-wait.md

echo ""
echo "=== Content of modern-boxes-watch.md ==="
cat .changeset/modern-boxes-watch.md

echo ""
echo "=== Content of metal-steaks-try.md ==="
cat .changeset/metal-steaks-try.md

echo ""
echo "=== Content of fix-batch-queue-processing.md ==="
cat .changeset/fix-batch-queue-processing.md

Repository: triggerdotdev/trigger.dev

Length of output: 1093


Add a changeset for the public API expansion of @trigger.dev/react-hooks.

Line 7 adds useInputStreamSend to the public export surface. Per the coding guidelines, modifications to public packages under packages/* require a changeset entry. Run pnpm run changeset:add to document this change.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/react-hooks/src/index.ts` at line 7, The public API of
`@trigger.dev/react-hooks` was expanded by exporting useInputStreamSend from
packages/react-hooks/src/index.ts (export * from
"./hooks/useInputStreamSend.js"); add a changeset documenting this public API
change by running the repository's changeset workflow (e.g., run pnpm run
changeset:add) and create a changeset entry that notes the package
"@trigger.dev/react-hooks" and the addition of useInputStreamSend so the release
tooling picks up the public API update.

@ericallam ericallam force-pushed the claude/temporal-signals-versioning-xzlHf branch from 77553cd to f0f7315 Compare February 27, 2026 14:28
@ericallam ericallam closed this Feb 27, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants