diff --git a/.changeset/input-stream-wait.md b/.changeset/input-stream-wait.md new file mode 100644 index 00000000000..96aeba1cdf2 --- /dev/null +++ b/.changeset/input-stream-wait.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/core": patch +"@trigger.dev/sdk": patch +--- + +Add `.wait()` method to input streams for suspending tasks while waiting for data. Unlike `.once()` which keeps the task process alive, `.wait()` suspends the task entirely, freeing compute resources. The task resumes when data arrives via `.send()`. diff --git a/.claude/skills/trigger-dev-tasks/realtime.md b/.claude/skills/trigger-dev-tasks/realtime.md index c1c4c5821a9..811f13ebba0 100644 --- a/.claude/skills/trigger-dev-tasks/realtime.md +++ b/.claude/skills/trigger-dev-tasks/realtime.md @@ -9,6 +9,7 @@ Realtime allows you to: - Subscribe to run status changes, metadata updates, and streams - Build real-time dashboards and UI updates - Monitor task progress from frontend and backend +- Send data into running tasks with input streams ## Authentication @@ -103,6 +104,178 @@ for await (const chunk of stream) { } ``` +## Input Streams + +Input streams let you send data **into** a running task from your backend or frontend. Output streams send data out of tasks; input streams complete the loop. + +### Problems Input Streams Solve + +**Cancelling AI streams mid-generation.** When you use AI SDK's `streamText` inside a task, the LLM keeps generating tokens until it's done — even if the user has navigated away or clicked "Stop generating." Without input streams, there's no way to tell the running task to abort. With input streams, your frontend sends a cancel signal and the task aborts the LLM call immediately. + +**Human-in-the-loop workflows.** A task generates a draft, then pauses and waits for user approval before proceeding. + +**Interactive agents.** An AI agent running as a task needs follow-up information from the user mid-execution. 
+ +### Defining Input Streams + +```ts +// trigger/streams.ts +import { streams } from "@trigger.dev/sdk"; + +export const cancelSignal = streams.input<{ reason?: string }>({ id: "cancel" }); +export const approval = streams.input<{ approved: boolean; reviewer: string }>({ id: "approval" }); +``` + +### Receiving Data Inside a Task + +#### `wait()` — Suspend until data arrives (recommended for long waits) + +Suspends the task entirely, freeing compute. Returns `ManualWaitpointPromise` (same as `wait.forToken()`). + +```ts +import { task } from "@trigger.dev/sdk"; +import { approval } from "./streams"; + +export const publishPost = task({ + id: "publish-post", + run: async (payload: { postId: string }) => { + const draft = await prepareDraft(payload.postId); + await notifyReviewer(draft); + + // Suspend — no compute cost while waiting + const result = await approval.wait({ timeout: "7d" }); + + if (result.ok) { + return { published: result.output.approved }; + } + return { published: false, timedOut: true }; + }, +}); +``` + +Options: `timeout` (period string), `idempotencyKey`, `idempotencyKeyTTL`, `tags`. + +Use `.unwrap()` to throw on timeout: `const data = await approval.wait({ timeout: "24h" }).unwrap();` + +**Use `.wait()` when:** nothing to do until data arrives, wait could be long, want zero compute cost. + +#### `once()` — Wait for the next value (non-suspending) + +Keeps the task process alive. Use for short waits or when doing concurrent work. 
+ +```ts +import { task } from "@trigger.dev/sdk"; +import { approval } from "./streams"; + +export const draftEmailTask = task({ + id: "draft-email", + run: async (payload: { to: string; subject: string }) => { + const draft = await generateDraft(payload); + const result = await approval.once(); // Blocks until data arrives + + if (result.approved) { + await sendEmail(draft); + } + return { sent: result.approved, reviewer: result.reviewer }; + }, +}); +``` + +Options: `once({ timeoutMs: 300_000 })` or `once({ signal: controller.signal })`. + +**Use `.once()` when:** wait is short, doing concurrent work, need AbortSignal support. + +#### `on()` — Listen for every value + +```ts +import { task } from "@trigger.dev/sdk"; +import { cancelSignal } from "./streams"; + +export const streamingTask = task({ + id: "streaming-task", + run: async (payload: { prompt: string }) => { + const controller = new AbortController(); + + const sub = cancelSignal.on(() => { + controller.abort(); + }); + + try { + const result = await streamText({ + model: openai("gpt-4o"), + prompt: payload.prompt, + abortSignal: controller.signal, + }); + return result; + } finally { + sub.off(); + } + }, +}); +``` + +#### `peek()` — Non-blocking check + +```ts +const latest = cancelSignal.peek(); // undefined if nothing received yet +``` + +### Sending Data to a Running Task + +```ts +import { cancelSignal, approval } from "./trigger/streams"; + +await cancelSignal.send(runId, { reason: "User clicked stop" }); +await approval.send(runId, { approved: true, reviewer: "alice@example.com" }); +``` + +### Full Example: Cancellable AI Streaming + +```ts +// trigger/streams.ts +import { streams } from "@trigger.dev/sdk"; + +export const aiOutput = streams.define({ id: "ai" }); +export const cancelStream = streams.input<{ reason?: string }>({ id: "cancel" }); +``` + +```ts +// trigger/ai-task.ts +import { task } from "@trigger.dev/sdk"; +import { streamText } from "ai"; +import { openai } from 
"@ai-sdk/openai"; +import { aiOutput, cancelStream } from "./streams"; + +export const aiTask = task({ + id: "ai-chat", + run: async (payload: { prompt: string }) => { + const controller = new AbortController(); + const sub = cancelStream.on(() => controller.abort()); + + try { + const result = streamText({ + model: openai("gpt-4o"), + prompt: payload.prompt, + abortSignal: controller.signal, + }); + + const { waitUntilComplete } = aiOutput.pipe(result.textStream); + await waitUntilComplete(); + return { text: await result.text }; + } finally { + sub.off(); + } + }, +}); +``` + +### Important Notes + +- Input streams require v2 realtime streams (SDK 4.1.0+). Calling `.on()` or `.once()` without v2 throws an error. +- Cannot send data to completed/failed/canceled runs. +- Max 1MB per `.send()` call. +- Data sent before a listener is registered gets buffered and delivered when a listener attaches. + ## React Frontend Usage ### Installation @@ -242,3 +415,5 @@ Key properties available in run subscriptions: - **Handle errors**: Always check for errors in hooks and subscriptions - **Type safety**: Use task types for proper payload/output typing - **Cleanup subscriptions**: Backend subscriptions auto-complete, frontend hooks auto-cleanup +- **Clean up input stream listeners**: Always call `.off()` in a `finally` block to avoid leaks +- **Use timeouts with `once()`**: Avoid hanging indefinitely if the signal never arrives diff --git a/.gitignore b/.gitignore index 071b9b59035..5f6adddba0a 100644 --- a/.gitignore +++ b/.gitignore @@ -67,4 +67,5 @@ apps/**/public/build **/.claude/settings.local.json .mcp.log .mcp.json -.cursor/debug.log \ No newline at end of file +.cursor/debug.log +ailogger-output.log \ No newline at end of file diff --git a/.server-changes/input-stream-wait.md b/.server-changes/input-stream-wait.md new file mode 100644 index 00000000000..dc3f05b2821 --- /dev/null +++ b/.server-changes/input-stream-wait.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature 
+--- + +Add input stream `.wait()` support: new API route for creating input-stream-linked waitpoints, Redis cache for fast waitpoint lookup from `.send()`, and waitpoint completion bridging in the send route. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index fa019f2f75e..97f702c7444 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1344,6 +1344,8 @@ const EnvironmentSchema = z REALTIME_STREAMS_S2_BASIN: z.string().optional(), REALTIME_STREAMS_S2_ACCESS_TOKEN: z.string().optional(), + REALTIME_STREAMS_S2_ENDPOINT: z.string().optional(), + REALTIME_STREAMS_S2_SKIP_ACCESS_TOKENS: z.enum(["true", "false"]).default("false"), REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS: z.coerce .number() .int() diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index a85d8b20dd2..ce83c2e242b 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -629,6 +629,41 @@ export class SpanPresenter extends BasePresenter { }, }; } + case "input-stream": { + if (!span.entity.id) { + logger.error(`SpanPresenter: No input stream id`, { + spanId, + inputStreamId: span.entity.id, + }); + return { ...data, entity: null }; + } + + const [runId, streamId] = span.entity.id.split(":"); + + if (!runId || !streamId) { + logger.error(`SpanPresenter: Invalid input stream id`, { + spanId, + inputStreamId: span.entity.id, + }); + return { ...data, entity: null }; + } + + // Translate user-facing stream ID to internal S2 stream name + const s2StreamKey = `$trigger.input:${streamId}`; + + return { + ...data, + entity: { + type: "realtime-stream" as const, + object: { + runId, + streamKey: s2StreamKey, + displayName: streamId, + metadata: undefined, + }, + }, + }; + } default: return { ...data, entity: null }; } diff --git a/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts 
b/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts new file mode 100644 index 00000000000..eeb6673a43f --- /dev/null +++ b/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts @@ -0,0 +1,150 @@ +import { json } from "@remix-run/server-runtime"; +import { z } from "zod"; +import { + CreateInputStreamWaitpointRequestBody, + type CreateInputStreamWaitpointResponseBody, +} from "@trigger.dev/core/v3"; +import { WaitpointId } from "@trigger.dev/core/v3/isomorphic"; +import { $replica } from "~/db.server"; +import { createWaitpointTag, MAX_TAGS_PER_WAITPOINT } from "~/models/waitpointTag.server"; +import { + deleteInputStreamWaitpoint, + setInputStreamWaitpoint, +} from "~/services/inputStreamWaitpointCache.server"; +import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; +import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; +import { parseDelay } from "~/utils/delays"; +import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; +import { engine } from "~/v3/runEngine.server"; +import { ServiceValidationError } from "~/v3/services/baseService.server"; + +const ParamsSchema = z.object({ + runFriendlyId: z.string(), +}); + +const { action, loader } = createActionApiRoute( + { + params: ParamsSchema, + body: CreateInputStreamWaitpointRequestBody, + maxContentLength: 1024 * 10, // 10KB + method: "POST", + }, + async ({ authentication, body, params }) => { + try { + const run = await $replica.taskRun.findFirst({ + where: { + friendlyId: params.runFriendlyId, + runtimeEnvironmentId: authentication.environment.id, + }, + select: { + id: true, + friendlyId: true, + realtimeStreamsVersion: true, + }, + }); + + if (!run) { + return json({ error: "Run not found" }, { status: 404 }); + } + + const idempotencyKeyExpiresAt = body.idempotencyKeyTTL + ? 
resolveIdempotencyKeyTTL(body.idempotencyKeyTTL) + : undefined; + + const timeout = await parseDelay(body.timeout); + + // Process tags (same pattern as api.v1.waitpoints.tokens.ts) + const bodyTags = typeof body.tags === "string" ? [body.tags] : body.tags; + + if (bodyTags && bodyTags.length > MAX_TAGS_PER_WAITPOINT) { + throw new ServiceValidationError( + `Waitpoints can only have ${MAX_TAGS_PER_WAITPOINT} tags, you're trying to set ${bodyTags.length}.` + ); + } + + if (bodyTags && bodyTags.length > 0) { + for (const tag of bodyTags) { + await createWaitpointTag({ + tag, + environmentId: authentication.environment.id, + projectId: authentication.environment.projectId, + }); + } + } + + // Step 1: Create the waitpoint + const result = await engine.createManualWaitpoint({ + environmentId: authentication.environment.id, + projectId: authentication.environment.projectId, + idempotencyKey: body.idempotencyKey, + idempotencyKeyExpiresAt, + timeout, + tags: bodyTags, + }); + + // Step 2: Cache the mapping in Redis for fast lookup from .send() + const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined; + await setInputStreamWaitpoint( + run.friendlyId, + body.streamId, + result.waitpoint.id, + ttlMs && ttlMs > 0 ? ttlMs : undefined + ); + + // Step 3: Check if data was already sent to this input stream (race condition handling). + // If .send() landed before .wait(), the data is in the S2 stream but no waitpoint + // existed to complete. We check from the client's last known position. 
+ if (!result.isCached) { + try { + const realtimeStream = getRealtimeStreamInstance( + authentication.environment, + run.realtimeStreamsVersion + ); + + if (realtimeStream.readRecords) { + const records = await realtimeStream.readRecords( + run.friendlyId, + `$trigger.input:${body.streamId}`, + body.lastSeqNum + ); + + if (records.length > 0) { + const record = records[0]!; + + // Record data is the raw user payload — no wrapper to unwrap + await engine.completeWaitpoint({ + id: result.waitpoint.id, + output: { + value: record.data, + type: "application/json", + isError: false, + }, + }); + + // Clean up the Redis cache since we completed it ourselves + await deleteInputStreamWaitpoint(run.friendlyId, body.streamId); + } + } + } catch { + // Non-fatal: if the S2 check fails, the waitpoint is still PENDING. + // The next .send() will complete it via the Redis cache path. + } + } + + return json({ + waitpointId: WaitpointId.toFriendlyId(result.waitpoint.id), + isCached: result.isCached, + }); + } catch (error) { + if (error instanceof ServiceValidationError) { + return json({ error: error.message }, { status: 422 }); + } else if (error instanceof Error) { + return json({ error: error.message }, { status: 500 }); + } + + return json({ error: "Something went wrong" }, { status: 500 }); + } + } +); + +export { action, loader }; diff --git a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts index 0c188c17768..d55c3659eea 100644 --- a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts +++ b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts @@ -166,7 +166,7 @@ async function responseHeaders( const claims = { sub: environment.id, pub: true, - scopes: [`read:runs:${run.friendlyId}`], + scopes: [`read:runs:${run.friendlyId}`, `write:inputStreams:${run.friendlyId}`], realtime, }; diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts 
b/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts new file mode 100644 index 00000000000..c33ad89f350 --- /dev/null +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts @@ -0,0 +1,168 @@ +import { json } from "@remix-run/server-runtime"; +import { z } from "zod"; +import { $replica } from "~/db.server"; +import { getAndDeleteInputStreamWaitpoint } from "~/services/inputStreamWaitpointCache.server"; +import { + createActionApiRoute, + createLoaderApiRoute, +} from "~/services/routeBuilders/apiBuilder.server"; +import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; +import { engine } from "~/v3/runEngine.server"; + +const ParamsSchema = z.object({ + runId: z.string(), + streamId: z.string(), +}); + +const BodySchema = z.object({ + data: z.unknown(), +}); + +// POST: Send data to an input stream +const { action } = createActionApiRoute( + { + params: ParamsSchema, + maxContentLength: 1024 * 1024, // 1MB max + allowJWT: true, + corsStrategy: "all", + authorization: { + action: "write", + resource: (params) => ({ inputStreams: params.runId }), + superScopes: ["write:inputStreams", "write:all", "admin"], + }, + }, + async ({ request, params, authentication }) => { + const run = await $replica.taskRun.findFirst({ + where: { + friendlyId: params.runId, + runtimeEnvironmentId: authentication.environment.id, + }, + select: { + id: true, + friendlyId: true, + completedAt: true, + realtimeStreamsVersion: true, + }, + }); + + if (!run) { + return json({ ok: false, error: "Run not found" }, { status: 404 }); + } + + if (run.completedAt) { + return json( + { ok: false, error: "Cannot send to input stream on a completed run" }, + { status: 400 } + ); + } + + const body = BodySchema.safeParse(await request.json()); + + if (!body.success) { + return json({ ok: false, error: "Invalid request body" }, { status: 400 }); + } + + const realtimeStream = getRealtimeStreamInstance( + authentication.environment, + 
run.realtimeStreamsVersion + ); + + // Build the input stream record (raw user data, no wrapper) + const recordId = `inp_${crypto.randomUUID().replace(/-/g, "").slice(0, 12)}`; + const record = JSON.stringify(body.data.data); + + // Append the record to the per-stream S2 stream (auto-creates on first write) + await realtimeStream.appendPart( + record, + recordId, + run.friendlyId, + `$trigger.input:${params.streamId}` + ); + + // Check Redis cache for a linked .wait() waitpoint (fast, no DB hit if none) + const waitpointId = await getAndDeleteInputStreamWaitpoint(params.runId, params.streamId); + if (waitpointId) { + await engine.completeWaitpoint({ + id: waitpointId, + output: { + value: JSON.stringify(body.data.data), + type: "application/json", + isError: false, + }, + }); + } + + return json({ ok: true }); + } +); + +// GET: SSE stream for reading input stream data (used by the in-task SSE tail) +const loader = createLoaderApiRoute( + { + params: ParamsSchema, + allowJWT: true, + corsStrategy: "all", + findResource: async (params, auth) => { + return $replica.taskRun.findFirst({ + where: { + friendlyId: params.runId, + runtimeEnvironmentId: auth.environment.id, + }, + include: { + batch: { + select: { + friendlyId: true, + }, + }, + }, + }); + }, + authorization: { + action: "read", + resource: (run) => ({ + runs: run.friendlyId, + tags: run.runTags, + batch: run.batch?.friendlyId, + tasks: run.taskIdentifier, + }), + superScopes: ["read:runs", "read:all", "admin"], + }, + }, + async ({ params, request, resource: run, authentication }) => { + const lastEventId = request.headers.get("Last-Event-ID") || undefined; + + const timeoutInSecondsRaw = request.headers.get("Timeout-Seconds") ?? undefined; + const timeoutInSeconds = timeoutInSecondsRaw ? 
parseInt(timeoutInSecondsRaw) : undefined; + + if (timeoutInSeconds && isNaN(timeoutInSeconds)) { + return new Response("Invalid timeout seconds", { status: 400 }); + } + + if (timeoutInSeconds && timeoutInSeconds < 1) { + return new Response("Timeout seconds must be greater than 0", { status: 400 }); + } + + if (timeoutInSeconds && timeoutInSeconds > 600) { + return new Response("Timeout seconds must be less than 600", { status: 400 }); + } + + const realtimeStream = getRealtimeStreamInstance( + authentication.environment, + run.realtimeStreamsVersion + ); + + // Read from the internal S2 stream name (prefixed to avoid user stream collisions) + return realtimeStream.streamResponse( + request, + run.friendlyId, + `$trigger.input:${params.streamId}`, + request.signal, + { + lastEventId, + timeoutInSeconds, + } + ); + } +); + +export { action, loader }; diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx index a78c95d6036..e46eaa5148f 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx @@ -1348,6 +1348,7 @@ function SpanEntity({ span }: { span: Span }) { runId={span.entity.object.runId} streamKey={span.entity.object.streamKey} metadata={span.entity.object.metadata} + displayName={span.entity.object.displayName} /> ); } diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx 
index a1d94159700..b5763bb4e9c 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx @@ -98,10 +98,12 @@ export function RealtimeStreamViewer({ runId, streamKey, metadata, + displayName, }: { runId: string; streamKey: string; metadata: Record | undefined; + displayName?: string; }) { const organization = useOrganization(); const project = useProject(); @@ -244,8 +246,8 @@ export function RealtimeStreamViewer({ variant="small/bright" className="mb-0 flex min-w-0 items-center gap-1 truncate whitespace-nowrap" > - Stream: - {streamKey} + {displayName ? "Input stream:" : "Stream:"} + {displayName ?? streamKey}
@@ -487,6 +489,9 @@ function useRealtimeStream(resourcePath: string, startIndex?: number) { const [isConnected, setIsConnected] = useState(false); useEffect(() => { + setChunks([]); + setError(null); + const abortController = new AbortController(); let reader: ReadableStreamDefaultReader> | null = null; diff --git a/apps/webapp/app/services/authorization.server.ts b/apps/webapp/app/services/authorization.server.ts index 15f85cc3278..2ea410e2c1d 100644 --- a/apps/webapp/app/services/authorization.server.ts +++ b/apps/webapp/app/services/authorization.server.ts @@ -1,6 +1,6 @@ export type AuthorizationAction = "read" | "write" | string; // Add more actions as needed -const ResourceTypes = ["tasks", "tags", "runs", "batch", "waitpoints", "deployments"] as const; +const ResourceTypes = ["tasks", "tags", "runs", "batch", "waitpoints", "deployments", "inputStreams"] as const; export type AuthorizationResources = { [key in (typeof ResourceTypes)[number]]?: string | string[]; diff --git a/apps/webapp/app/services/inputStreamWaitpointCache.server.ts b/apps/webapp/app/services/inputStreamWaitpointCache.server.ts new file mode 100644 index 00000000000..e272b117b09 --- /dev/null +++ b/apps/webapp/app/services/inputStreamWaitpointCache.server.ts @@ -0,0 +1,101 @@ +import { Redis } from "ioredis"; +import { env } from "~/env.server"; +import { singleton } from "~/utils/singleton"; +import { logger } from "./logger.server"; + +const KEY_PREFIX = "isw:"; +const DEFAULT_TTL_MS = 7 * 24 * 60 * 60 * 1000; // 7 days + +function buildKey(runFriendlyId: string, streamId: string): string { + return `${KEY_PREFIX}${runFriendlyId}:${streamId}`; +} + +function initializeRedis(): Redis | undefined { + const host = env.CACHE_REDIS_HOST; + if (!host) { + return undefined; + } + + return new Redis({ + connectionName: "inputStreamWaitpointCache", + host, + port: env.CACHE_REDIS_PORT, + username: env.CACHE_REDIS_USERNAME, + password: env.CACHE_REDIS_PASSWORD, + keyPrefix: "tr:", + 
enableAutoPipelining: true, + ...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + }); +} + +const redis = singleton("inputStreamWaitpointCache", initializeRedis); + +/** + * Store a mapping from input stream to waitpoint ID in Redis. + * Called when `.wait()` creates a new waitpoint. + */ +export async function setInputStreamWaitpoint( + runFriendlyId: string, + streamId: string, + waitpointId: string, + ttlMs?: number +): Promise { + if (!redis) return; + + try { + const key = buildKey(runFriendlyId, streamId); + await redis.set(key, waitpointId, "PX", ttlMs ?? DEFAULT_TTL_MS); + } catch (error) { + logger.error("Failed to set input stream waitpoint cache", { + runFriendlyId, + streamId, + error, + }); + } +} + +/** + * Atomically get and delete the waitpoint ID for an input stream. + * Uses GETDEL for atomicity — only one concurrent `.send()` call will get the ID. + * Called from the `.send()` route. + */ +export async function getAndDeleteInputStreamWaitpoint( + runFriendlyId: string, + streamId: string +): Promise { + if (!redis) return null; + + try { + const key = buildKey(runFriendlyId, streamId); + return await redis.getdel(key); + } catch (error) { + logger.error("Failed to get input stream waitpoint cache", { + runFriendlyId, + streamId, + error, + }); + return null; + } +} + +/** + * Delete the cache entry for an input stream waitpoint. + * Called when a waitpoint is completed or timed out. 
+ */ +export async function deleteInputStreamWaitpoint( + runFriendlyId: string, + streamId: string +): Promise { + if (!redis) return; + + try { + const key = buildKey(runFriendlyId, streamId); + await redis.del(key); + } catch (error) { + logger.error("Failed to delete input stream waitpoint cache", { + runFriendlyId, + streamId, + error, + }); + } +} diff --git a/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts b/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts index 1d03116ff57..4a7acb60606 100644 --- a/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts +++ b/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts @@ -1,6 +1,6 @@ // app/realtime/S2RealtimeStreams.ts import type { UnkeyCache } from "@internal/cache"; -import { StreamIngestor, StreamResponder, StreamResponseOptions } from "./types"; +import { StreamIngestor, StreamRecord, StreamResponder, StreamResponseOptions } from "./types"; import { Logger, LogLevel } from "@trigger.dev/core/logger"; import { randomUUID } from "node:crypto"; @@ -10,6 +10,12 @@ export type S2RealtimeStreamsOptions = { accessToken: string; // "Bearer" token issued in S2 console streamPrefix?: string; // defaults to "" + // Custom endpoint for s2-lite (self-hosted) + endpoint?: string; // e.g., "http://localhost:4566/v1" + + // Skip access token issuance (s2-lite doesn't support /access-tokens) + skipAccessTokens?: boolean; + // Read behavior s2WaitSeconds?: number; @@ -37,8 +43,11 @@ type S2AppendAck = { export class S2RealtimeStreams implements StreamResponder, StreamIngestor { private readonly basin: string; private readonly baseUrl: string; + private readonly accountUrl: string; + private readonly endpoint?: string; private readonly token: string; private readonly streamPrefix: string; + private readonly skipAccessTokens: boolean; private readonly s2WaitSeconds: number; @@ -56,9 +65,12 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { constructor(opts: 
S2RealtimeStreamsOptions) { this.basin = opts.basin; - this.baseUrl = `https://${this.basin}.b.aws.s2.dev/v1`; + this.baseUrl = opts.endpoint ?? `https://${this.basin}.b.aws.s2.dev/v1`; + this.accountUrl = opts.endpoint ?? `https://aws.s2.dev/v1`; + this.endpoint = opts.endpoint; this.token = opts.accessToken; this.streamPrefix = opts.streamPrefix ?? ""; + this.skipAccessTokens = opts.skipAccessTokens ?? false; this.s2WaitSeconds = opts.s2WaitSeconds ?? 60; @@ -80,17 +92,20 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { runId: string, streamId: string ): Promise<{ responseHeaders?: Record }> { - const id = randomUUID(); - - const accessToken = await this.getS2AccessToken(id); + const accessToken = this.skipAccessTokens + ? this.token + : await this.getS2AccessToken(randomUUID()); return { responseHeaders: { "X-S2-Access-Token": accessToken, - "X-S2-Stream-Name": `/runs/${runId}/${streamId}`, + "X-S2-Stream-Name": this.skipAccessTokens + ? this.toStreamName(runId, streamId) + : `/runs/${runId}/${streamId}`, "X-S2-Basin": this.basin, "X-S2-Flush-Interval-Ms": this.flushIntervalMs.toString(), "X-S2-Max-Retries": this.maxRetries.toString(), + ...(this.endpoint ? { "X-S2-Endpoint": this.endpoint } : {}), }, }; } @@ -121,6 +136,88 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { throw new Error("S2 streams are written to S2 via the client, not from the server"); } + async readRecords( + runId: string, + streamId: string, + afterSeqNum?: number + ): Promise { + const s2Stream = this.toStreamName(runId, streamId); + const startSeq = afterSeqNum != null ? 
afterSeqNum + 1 : 0; + + const qs = new URLSearchParams(); + qs.set("seq_num", String(startSeq)); + qs.set("clamp", "true"); + qs.set("wait", "0"); // Non-blocking: return immediately with existing records + + const res = await fetch( + `${this.baseUrl}/streams/${encodeURIComponent(s2Stream)}/records?${qs}`, + { + method: "GET", + headers: { + Authorization: `Bearer ${this.token}`, + Accept: "text/event-stream", + "S2-Format": "raw", + "S2-Basin": this.basin, + }, + } + ); + + if (!res.ok) { + // Stream may not exist yet (no data sent) + if (res.status === 404) { + return []; + } + const text = await res.text().catch(() => ""); + throw new Error(`S2 readRecords failed: ${res.status} ${res.statusText} ${text}`); + } + + // Parse the SSE response body to extract records + const body = await res.text(); + return this.parseSSEBatchRecords(body); + } + + private parseSSEBatchRecords(sseText: string): StreamRecord[] { + const records: StreamRecord[] = []; + + // SSE events are separated by double newlines + const events = sseText.split("\n\n").filter((e) => e.trim()); + + for (const event of events) { + const lines = event.split("\n"); + let eventType: string | undefined; + let data: string | undefined; + + for (const line of lines) { + if (line.startsWith("event:")) { + eventType = line.slice(6).trim(); + } else if (line.startsWith("data:")) { + data = line.slice(5).trim(); + } + } + + if (eventType === "batch" && data) { + try { + const parsed = JSON.parse(data) as { + records: Array<{ body: string; seq_num: number; timestamp: number }>; + }; + + for (const record of parsed.records) { + const parsedBody = JSON.parse(record.body) as { data: string; id: string }; + records.push({ + data: parsedBody.data, + id: parsedBody.id, + seqNum: record.seq_num, + }); + } + } catch { + // Skip malformed events + } + } + } + + return records; + } + // ---------- Serve SSE from S2 ---------- async streamResponse( @@ -155,7 +252,8 @@ export class S2RealtimeStreams implements 
StreamResponder, StreamIngestor { headers: { Authorization: `Bearer ${this.token}`, "Content-Type": "application/json", - "S2-Format": "raw", // UTF-8 JSON encoding (no base64 overhead) when your data is text. :contentReference[oaicite:8]{index=8} + "S2-Format": "raw", + "S2-Basin": this.basin, }, body: JSON.stringify(body), }); @@ -184,7 +282,7 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { private async s2IssueAccessToken(id: string): Promise { // POST /v1/access-tokens - const res = await fetch(`https://aws.s2.dev/v1/access-tokens`, { + const res = await fetch(`${this.accountUrl}/access-tokens`, { method: "POST", headers: { Authorization: `Bearer ${this.token}`, @@ -235,6 +333,7 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { Authorization: `Bearer ${this.token}`, Accept: "text/event-stream", "S2-Format": "raw", + "S2-Basin": this.basin, }, signal: opts.signal, }); diff --git a/apps/webapp/app/services/realtime/types.ts b/apps/webapp/app/services/realtime/types.ts index 912711019ab..208e9f52401 100644 --- a/apps/webapp/app/services/realtime/types.ts +++ b/apps/webapp/app/services/realtime/types.ts @@ -1,3 +1,9 @@ +export type StreamRecord = { + data: string; + id: string; + seqNum: number; +}; + // Interface for stream ingestion export interface StreamIngestor { initializeStream( @@ -16,6 +22,17 @@ export interface StreamIngestor { appendPart(part: string, partId: string, runId: string, streamId: string): Promise; getLastChunkIndex(runId: string, streamId: string, clientId: string): Promise; + + /** + * Read records from a stream starting after a given sequence number. + * Returns immediately with whatever records exist (non-blocking). + * Not all backends support this — returns undefined if unsupported. 
+ */ + readRecords?( + runId: string, + streamId: string, + afterSeqNum?: number + ): Promise<StreamRecord[] | undefined>; +} export type StreamResponseOptions = { diff --git a/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts b/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts index 7cc21101bf2..4e7b8b41206 100644 --- a/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts +++ b/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts @@ -40,6 +40,8 @@ export function getRealtimeStreamInstance( return new S2RealtimeStreams({ basin: env.REALTIME_STREAMS_S2_BASIN, accessToken: env.REALTIME_STREAMS_S2_ACCESS_TOKEN, + endpoint: env.REALTIME_STREAMS_S2_ENDPOINT, + skipAccessTokens: env.REALTIME_STREAMS_S2_SKIP_ACCESS_TOKENS === "true", streamPrefix: [ "org", environment.organization.id, diff --git a/apps/webapp/app/v3/services/deployment.server.ts b/apps/webapp/app/v3/services/deployment.server.ts index 985e0e47543..13d9d43cfc6 100644 --- a/apps/webapp/app/v3/services/deployment.server.ts +++ b/apps/webapp/app/v3/services/deployment.server.ts @@ -368,7 +368,7 @@ export class DeploymentService extends BaseService { ); return fromPromise( - stream.append(events.map((event) => AppendRecord.make(JSON.stringify(event)))), + stream.append(events.map((event) => AppendRecord.string({ body: JSON.stringify(event) }))), (error) => ({ type: "failed_to_append_to_event_log" as const, cause: error, diff --git a/apps/webapp/package.json b/apps/webapp/package.json index 42a1741d2eb..80c7552d766 100644 --- a/apps/webapp/package.json +++ b/apps/webapp/package.json @@ -107,7 +107,7 @@ "@remix-run/serve": "2.1.0", "@remix-run/server-runtime": "2.1.0", "@remix-run/v1-meta": "^0.1.3", - "@s2-dev/streamstore": "^0.17.2", + "@s2-dev/streamstore": "^0.22.5", "@sentry/remix": "9.46.0", "@slack/web-api": "7.9.1", "@socket.io/redis-adapter": "^8.3.0", diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index c80648d710c..0410013edd4 100644 --- a/docker/docker-compose.yml +++
b/docker/docker-compose.yml @@ -143,6 +143,30 @@ services: networks: - app_network + s2: + image: ghcr.io/s2-streamstore/s2 + command: lite + ports: + - "4566:80" + networks: + - app_network + healthcheck: + test: ["CMD-SHELL", "wget -qO- http://localhost:80/v1/basins?limit=1 || exit 1"] + interval: 2s + timeout: 3s + retries: 5 + start_period: 3s + + s2-init: + image: curlimages/curl:latest + depends_on: + s2: + condition: service_healthy + networks: + - app_network + restart: "no" + entrypoint: ["sh", "-c", "curl -sf -X PUT http://s2:80/v1/basins/trigger-local -H 'Content-Type: application/json' -d '{\"config\":{\"create_stream_on_append\":true,\"create_stream_on_read\":true}}' && echo ' Basin trigger-local ready' || echo ' Basin trigger-local already exists'"] + toxiproxy: container_name: toxiproxy image: ghcr.io/shopify/toxiproxy:latest diff --git a/docs/designs/input-stream-wait.md b/docs/designs/input-stream-wait.md new file mode 100644 index 00000000000..943c06eaa3d --- /dev/null +++ b/docs/designs/input-stream-wait.md @@ -0,0 +1,370 @@ +# Input Stream `.wait()` — SDK Design + +## Problem + +The existing input stream methods (`.on()`, `.once()`, `.peek()`) are all **non-suspending**. When a task calls `await approval.once()`, the task process stays alive with an open SSE tail connection, consuming compute the entire time it waits for data. + +This is fine for short-lived waits or cases where the task is doing other work concurrently (like streaming AI output while listening for a cancel signal). But for use cases where the task genuinely has nothing to do until data arrives — approval gates, human-in-the-loop decisions, waiting for external webhooks — keeping the process alive wastes compute and money. + +`wait.forToken()` already solves this for arbitrary waitpoints: the task suspends, the process is freed, and execution resumes when the token is completed. 
Input stream `.wait()` brings that same suspension behavior to input streams, so you get the ergonomics of typed input streams with the efficiency of waitpoint-based suspension. + +## API Surface + +### `.wait()` method on input streams + +```ts +const approval = streams.input<{ approved: boolean; reviewer: string }>({ + id: "approval", +}); + +// Inside a task — suspends execution until data arrives +const result = await approval.wait(); +``` + +#### Signature + +```ts +type RealtimeDefinedInputStream<TData> = { + // ... existing methods ... + + /** + * Suspend the task until data arrives on this input stream. + * + * Unlike `.once()` which keeps the task process alive while waiting, + * `.wait()` suspends the task entirely — freeing compute resources. + * The task resumes when data is sent via `.send()`. + * + * Uses a waitpoint token internally. Can only be called inside a task.run(). + */ + wait: (options?: InputStreamWaitOptions) => ManualWaitpointPromise<TData>; +}; +``` + +#### Options + +```ts +type InputStreamWaitOptions = { + /** + * Maximum time to wait before the waitpoint times out. + * Uses the same period format as `wait.createToken()`. + * If the timeout is reached, the result will be `{ ok: false, error }`. + * + * @example "30s", "5m", "1h", "24h", "7d" + */ + timeout?: string; + + /** + * Idempotency key for the underlying waitpoint token. + * If the same key is used again (and hasn't expired), the existing + * waitpoint is reused. This means if the task retries, it will + * resume waiting on the same waitpoint rather than creating a new one. + */ + idempotencyKey?: string; + + /** + * TTL for the idempotency key. After this period, the same key + * will create a new waitpoint. + */ + idempotencyKeyTTL?: string; + + /** + * Tags for the underlying waitpoint token, useful for querying + * and filtering waitpoints via `wait.listTokens()`.
+ */ + tags?: string[]; +}; +``` + +#### Return type + +Returns `ManualWaitpointPromise` — the same type returned by `wait.forToken()`. This gives you two ways to handle the result: + +**Check `ok` explicitly:** + +```ts +const result = await approval.wait({ timeout: "24h" }); + +if (result.ok) { + console.log(result.output.approved); // TData, fully typed +} else { + console.log("Timed out:", result.error.message); +} +``` + +**Use `.unwrap()` to throw on timeout:** + +```ts +// Throws WaitpointTimeoutError if the timeout is reached +const data = await approval.wait({ timeout: "24h" }).unwrap(); +console.log(data.approved); // TData directly +``` + +## When to Use `.wait()` vs `.once()` vs `.on()` + +| Method | Task suspended? | Compute cost while waiting | Best for | +|--------|----------------|---------------------------|----------| +| `.on(handler)` | No | Full — process stays alive | Continuous listening (cancel signals, live updates) | +| `.once()` | No | Full — process stays alive | Short waits, or when doing concurrent work | +| `.wait()` | **Yes** | **None** — process freed | Approval gates, human-in-the-loop, long waits | + +### Use `.wait()` when: + +- The task has **nothing else to do** until data arrives +- The wait could be **long** (minutes, hours, days) — e.g., waiting for a human to review something +- You want to **minimize compute cost** — the task suspends and doesn't burn resources +- You want **timeout behavior** matching `wait.forToken()` (auto-timeout with `ok: false`) +- You need **idempotency** for retries — the same idempotency key resumes the same wait + +### Use `.once()` when: + +- The wait is **short** (seconds) and suspending would add unnecessary overhead +- You're doing **concurrent work** while waiting — e.g., streaming AI output and waiting for the next user message at the same time +- You want **AbortSignal support** — `.once()` accepts a signal for cancellation from within the task +- You need to **check a buffer** — `.once()` 
resolves immediately if data was already sent before the call + +### Use `.on()` when: + +- You need to **react to every value**, not just the first one +- You're implementing **event-driven patterns** like cancel signals +- The handler runs **alongside other task work** (e.g., abort an AI stream when cancel arrives) + +## Examples + +### Approval gate — the core use case + +The simplest case: a task does some work, then waits for human approval before continuing. With `.once()` this burns compute for the entire review period. With `.wait()` the task suspends. + +```ts trigger/streams.ts +import { streams } from "@trigger.dev/sdk"; + +export const approval = streams.input<{ + approved: boolean; + reviewer: string; + comment?: string; +}>({ id: "approval" }); +``` + +```ts trigger/publish-post.ts +import { task } from "@trigger.dev/sdk"; +import { approval } from "./streams"; + +export const publishPost = task({ + id: "publish-post", + run: async (payload: { postId: string }) => { + const draft = await prepareDraft(payload.postId); + + // Notify reviewer (email, Slack, etc.) + await notifyReviewer(draft); + + // Suspend until reviewer responds — no compute cost while waiting + const result = await approval.wait({ timeout: "7d" }).unwrap(); + + if (result.approved) { + await publish(draft); + return { published: true, reviewer: result.reviewer }; + } + + return { published: false, reason: result.comment }; + }, +}); +``` + +```ts app/api/review/route.ts +import { approval } from "@/trigger/streams"; + +export async function POST(req: Request) { + const { runId, approved, comment } = await req.json(); + + await approval.send(runId, { + approved, + reviewer: "alice@example.com", + comment, + }); + + return Response.json({ ok: true }); +} +``` + +### Idempotent waits for retries + +If a task retries after a failure, you don't want to create a duplicate waitpoint. Use `idempotencyKey` to ensure the retry resumes the same wait. 
+ +```ts +export const processOrder = task({ + id: "process-order", + retry: { maxAttempts: 3 }, + run: async (payload: { orderId: string }) => { + await prepareOrder(payload.orderId); + + // Same idempotency key across retries — won't create duplicate waitpoints + const result = await approval.wait({ + timeout: "48h", + idempotencyKey: `order-approval-${payload.orderId}`, + tags: [`order:${payload.orderId}`], + }); + + if (!result.ok) { + throw new Error("Approval timed out after 48 hours"); + } + + await fulfillOrder(payload.orderId, result.output); + }, +}); +``` + +### Multi-step conversation with an AI agent + +An AI agent that pauses to ask the user for clarification. Each step suspends until the user responds. + +```ts trigger/streams.ts +import { streams } from "@trigger.dev/sdk"; + +export const userMessage = streams.input<{ + text: string; + attachments?: string[]; +}>({ id: "user-message" }); + +export const agentOutput = streams.define({ id: "agent" }); +``` + +```ts trigger/agent.ts +import { task } from "@trigger.dev/sdk"; +import { streamText } from "ai"; +import { openai } from "@ai-sdk/openai"; +import { userMessage, agentOutput } from "./streams"; + +export const agentTask = task({ + id: "ai-agent", + run: async (payload: { initialPrompt: string }) => { + const messages: Array<{ role: string; content: string }> = [ + { role: "user", content: payload.initialPrompt }, + ]; + + for (let turn = 0; turn < 10; turn++) { + // Generate a response + const result = streamText({ + model: openai("gpt-4o"), + messages, + }); + + const { waitUntilComplete } = agentOutput.pipe(result.textStream); + await waitUntilComplete(); + + const text = await result.text; + messages.push({ role: "assistant", content: text }); + + // Check if the agent wants to ask the user something + if (!needsUserInput(text)) { + break; + } + + // Suspend and wait for the user to respond — zero compute cost + const reply = await userMessage.wait({ timeout: "1h" }).unwrap(); + 
messages.push({ role: "user", content: reply.text }); + } + + return { messages }; + }, +}); +``` + +### Timeout handling + +When a `.wait()` times out, you get `{ ok: false, error }` — just like `wait.forToken()`. + +```ts +const result = await approval.wait({ timeout: "24h" }); + +if (!result.ok) { + // WaitpointTimeoutError — the 24 hours elapsed + await escalate(payload.ticketId); + return { escalated: true }; +} + +// result.output is the typed data +await processApproval(result.output); +``` + +Or let it throw with `.unwrap()`: + +```ts +try { + const data = await approval.wait({ timeout: "24h" }).unwrap(); + await processApproval(data); +} catch (error) { + if (error instanceof WaitpointTimeoutError) { + await escalate(payload.ticketId); + } + throw error; +} +``` + +### Combining `.wait()` and `.on()` in the same task + +A task that waits for structured user input (suspending) but also listens for a cancel signal (non-suspending) during the active work phases. + +```ts trigger/streams.ts +import { streams } from "@trigger.dev/sdk"; + +export const cancelSignal = streams.input<{ reason?: string }>({ id: "cancel" }); +export const userInput = streams.input<{ choice: "a" | "b" | "c" }>({ id: "user-input" }); +``` + +```ts trigger/interactive-task.ts +import { task } from "@trigger.dev/sdk"; +import { cancelSignal, userInput } from "./streams"; + +export const interactiveTask = task({ + id: "interactive", + run: async (payload: { question: string }) => { + // Phase 1: Suspend and wait for user choice (no compute cost) + const { choice } = await userInput.wait({ timeout: "1h" }).unwrap(); + + // Phase 2: Do expensive work with cancel support (compute is running) + const controller = new AbortController(); + const sub = cancelSignal.on(() => controller.abort()); + + try { + const result = await doExpensiveWork(choice, controller.signal); + return result; + } finally { + sub.off(); + } + }, +}); +``` + +## Sending data — no changes + +`.send()` works exactly the 
same whether the task is waiting via `.wait()`, `.once()`, or `.on()`. The caller doesn't need to know how the task is listening: + +```ts +// This works regardless of whether the task used .wait(), .once(), or .on() +await approval.send(runId, { approved: true, reviewer: "alice" }); +``` + +## Behavioral differences from `.once()` + +| Behavior | `.once()` | `.wait()` | +|----------|-----------|-----------| +| Task process | Stays alive | Suspended | +| Buffered data | Resolves immediately from buffer | N/A — creates waitpoint before checking buffer | +| AbortSignal | Supported via `options.signal` | Not supported — use `timeout` instead | +| Timeout format | `timeoutMs` (milliseconds) | `timeout` (period string: `"24h"`, `"7d"`) | +| Timeout result | Rejects with `Error` | Resolves with `{ ok: false, error }` (or throws via `.unwrap()`) | +| Return type | `Promise` | `ManualWaitpointPromise` | +| Idempotency | None | Supported via `idempotencyKey` | +| Tags | None | Supported via `tags` | +| Multiple calls | Each `.once()` waits for the next value | Each `.wait()` creates a new waitpoint | +| Can use outside task | No (needs SSE tail) | No (needs `runtime.waitUntil()`) | + +## How it works (conceptual) + +Under the hood, `.wait()` bridges input streams with the waitpoint token system: + +1. **Task calls `approval.wait()`** — creates a waitpoint token internally and tells the platform to link it to this input stream +2. **Task suspends** via `runtime.waitUntil(tokenId)` — process is freed, zero compute cost +3. **Caller sends data** via `approval.send(runId, data)` — the platform sees the linked waitpoint and completes it with the sent data +4. **Task resumes** — the waitpoint resolves and `.wait()` returns the typed data + +The key insight is that the platform handles the bridging: it knows which waitpoint token is associated with which input stream, so when data arrives on the stream, it completes the corresponding waitpoint. 
The task doesn't need a running process to receive the data. diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index e28b951f05d..89325996c80 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -778,6 +778,7 @@ model TaskRun { /// Store the stream keys that are being used by the run realtimeStreams String[] @default([]) + @@unique([oneTimeUseToken]) @@unique([runtimeEnvironmentId, taskIdentifier, idempotencyKey]) // Finding child runs diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index 0da61bfba67..ae9a2667f51 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -33,6 +33,7 @@ import { traceContext, heartbeats, realtimeStreams, + inputStreams, } from "@trigger.dev/core/v3"; import { TriggerTracer } from "@trigger.dev/core/v3/tracer"; import { @@ -59,6 +60,7 @@ import { StandardTraceContextManager, StandardHeartbeatsManager, StandardRealtimeStreamsManager, + StandardInputStreamManager, } from "@trigger.dev/core/v3/workers"; import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc"; import { readFile } from "node:fs/promises"; @@ -160,6 +162,14 @@ const standardRealtimeStreamsManager = new StandardRealtimeStreamsManager( ); realtimeStreams.setGlobalManager(standardRealtimeStreamsManager); +const standardInputStreamManager = new StandardInputStreamManager( + apiClientManager.clientOrThrow(), + getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev", + (getEnvVar("TRIGGER_STREAMS_DEBUG") === "1" || getEnvVar("TRIGGER_STREAMS_DEBUG") === "true") ?? 
+ false +); +inputStreams.setGlobalManager(standardInputStreamManager); + const waitUntilTimeoutInMs = getNumberEnvVar("TRIGGER_WAIT_UNTIL_TIMEOUT_MS", 60_000); const waitUntilManager = new StandardWaitUntilManager(waitUntilTimeoutInMs); waitUntil.setGlobalManager(waitUntilManager); @@ -333,6 +343,7 @@ function resetExecutionEnvironment() { usageTimeoutManager.reset(); runMetadataManager.reset(); standardRealtimeStreamsManager.reset(); + standardInputStreamManager.reset(); waitUntilManager.reset(); _sharedWorkerRuntime?.reset(); durableClock.reset(); @@ -380,6 +391,7 @@ const zodIpc = new ZodIpcConnection({ } resetExecutionEnvironment(); + standardInputStreamManager.setRunId(execution.run.id, execution.run.realtimeStreamsVersion); standardTraceContextManager.traceContext = traceContext; standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics, isWarmStart); diff --git a/packages/cli-v3/src/entryPoints/managed-run-worker.ts b/packages/cli-v3/src/entryPoints/managed-run-worker.ts index 922accfa4a5..f8234aa68c4 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-worker.ts @@ -32,6 +32,7 @@ import { traceContext, heartbeats, realtimeStreams, + inputStreams, } from "@trigger.dev/core/v3"; import { TriggerTracer } from "@trigger.dev/core/v3/tracer"; import { @@ -59,6 +60,7 @@ import { StandardTraceContextManager, StandardHeartbeatsManager, StandardRealtimeStreamsManager, + StandardInputStreamManager, } from "@trigger.dev/core/v3/workers"; import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc"; import { readFile } from "node:fs/promises"; @@ -140,6 +142,14 @@ const standardRealtimeStreamsManager = new StandardRealtimeStreamsManager( ); realtimeStreams.setGlobalManager(standardRealtimeStreamsManager); +const standardInputStreamManager = new StandardInputStreamManager( + apiClientManager.clientOrThrow(), + getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? 
"https://api.trigger.dev", + (getEnvVar("TRIGGER_STREAMS_DEBUG") === "1" || getEnvVar("TRIGGER_STREAMS_DEBUG") === "true") ?? + false +); +inputStreams.setGlobalManager(standardInputStreamManager); + const waitUntilTimeoutInMs = getNumberEnvVar("TRIGGER_WAIT_UNTIL_TIMEOUT_MS", 60_000); const waitUntilManager = new StandardWaitUntilManager(waitUntilTimeoutInMs); waitUntil.setGlobalManager(waitUntilManager); @@ -313,6 +323,7 @@ function resetExecutionEnvironment() { runMetadataManager.reset(); waitUntilManager.reset(); standardRealtimeStreamsManager.reset(); + standardInputStreamManager.reset(); _sharedWorkerRuntime?.reset(); durableClock.reset(); taskContext.disable(); @@ -364,6 +375,7 @@ const zodIpc = new ZodIpcConnection({ } resetExecutionEnvironment(); + standardInputStreamManager.setRunId(execution.run.id, execution.run.realtimeStreamsVersion); standardTraceContextManager.traceContext = traceContext; diff --git a/packages/core/package.json b/packages/core/package.json index 369b3266c7c..52d3b877483 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -177,8 +177,8 @@ "@opentelemetry/core": "2.0.1", "@opentelemetry/exporter-logs-otlp-http": "0.203.0", "@opentelemetry/exporter-metrics-otlp-http": "0.203.0", - "@opentelemetry/host-metrics": "^0.37.0", "@opentelemetry/exporter-trace-otlp-http": "0.203.0", + "@opentelemetry/host-metrics": "^0.37.0", "@opentelemetry/instrumentation": "0.203.0", "@opentelemetry/resources": "2.0.1", "@opentelemetry/sdk-logs": "0.203.0", @@ -186,7 +186,7 @@ "@opentelemetry/sdk-trace-base": "2.0.1", "@opentelemetry/sdk-trace-node": "2.0.1", "@opentelemetry/semantic-conventions": "1.36.0", - "@s2-dev/streamstore": "0.17.3", + "@s2-dev/streamstore": "0.22.5", "dequal": "^2.0.3", "eventsource": "^3.0.5", "eventsource-parser": "^3.0.0", diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index 428493b71e2..7de6e275fc4 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ 
b/packages/core/src/v3/apiClient/index.ts @@ -17,6 +17,8 @@ import { CreateBatchRequestBody, CreateBatchResponse, CreateEnvironmentVariableRequestBody, + CreateInputStreamWaitpointRequestBody, + CreateInputStreamWaitpointResponseBody, CreateScheduleOptions, CreateStreamResponseBody, CreateUploadPayloadUrlResponseBody, @@ -41,6 +43,7 @@ import { RetrieveRunResponse, RetrieveRunTraceResponseBody, ScheduleObject, + SendInputStreamResponseBody, StreamBatchItemsResponse, TaskRunExecutionResult, TriggerTaskRequestBody, @@ -1317,6 +1320,8 @@ export class ApiClient { onComplete?: () => void; onError?: (error: Error) => void; lastEventId?: string; + /** Called for each SSE event with the full event metadata (id, timestamp). */ + onPart?: (part: SSEStreamPart) => void; } ): Promise> { const streamFactory = new SSEStreamSubscriptionFactory(options?.baseUrl ?? this.baseUrl, { @@ -1333,10 +1338,14 @@ export class ApiClient { const stream = await subscription.subscribe(); + const onPart = options?.onPart; + return stream.pipeThrough( new TransformStream({ transform(chunk, controller) { - controller.enqueue(chunk.chunk as T); + const data = chunk.chunk as T; + onPart?.(chunk as SSEStreamPart); + controller.enqueue(data); }, }) ); @@ -1385,6 +1394,41 @@ export class ApiClient { ); } + async sendInputStream( + runId: string, + streamId: string, + data: unknown, + requestOptions?: ZodFetchOptions + ) { + return zodfetch( + SendInputStreamResponseBody, + `${this.baseUrl}/realtime/v1/streams/${runId}/input/${streamId}`, + { + method: "POST", + headers: this.#getHeaders(false), + body: JSON.stringify({ data }), + }, + mergeRequestOptions(this.defaultRequestOptions, requestOptions) + ); + } + + async createInputStreamWaitpoint( + runFriendlyId: string, + body: CreateInputStreamWaitpointRequestBody, + requestOptions?: ZodFetchOptions + ) { + return zodfetch( + CreateInputStreamWaitpointResponseBody, + `${this.baseUrl}/api/v1/runs/${runFriendlyId}/input-streams/wait`, + { + method: 
"POST", + headers: this.#getHeaders(false), + body: JSON.stringify(body), + }, + mergeRequestOptions(this.defaultRequestOptions, requestOptions) + ); + } + async generateJWTClaims(requestOptions?: ZodFetchOptions): Promise> { return zodfetch( z.record(z.any()), diff --git a/packages/core/src/v3/index.ts b/packages/core/src/v3/index.ts index b714d8cb933..2757363f4be 100644 --- a/packages/core/src/v3/index.ts +++ b/packages/core/src/v3/index.ts @@ -20,6 +20,8 @@ export * from "./lifecycle-hooks-api.js"; export * from "./locals-api.js"; export * from "./heartbeats-api.js"; export * from "./realtime-streams-api.js"; +export * from "./input-streams-api.js"; +export * from "./waitpoints/index.js"; export * from "./schemas/index.js"; export { SemanticInternalAttributes } from "./semanticInternalAttributes.js"; export * from "./resource-catalog-api.js"; diff --git a/packages/core/src/v3/input-streams-api.ts b/packages/core/src/v3/input-streams-api.ts new file mode 100644 index 00000000000..17875db5053 --- /dev/null +++ b/packages/core/src/v3/input-streams-api.ts @@ -0,0 +1,7 @@ +// Split module-level variable definition into separate files to allow +// tree-shaking on each api instance. 
+import { InputStreamsAPI } from "./inputStreams/index.js"; + +export const inputStreams = InputStreamsAPI.getInstance(); + +export * from "./inputStreams/types.js"; diff --git a/packages/core/src/v3/inputStreams/index.ts b/packages/core/src/v3/inputStreams/index.ts new file mode 100644 index 00000000000..3246a39c87b --- /dev/null +++ b/packages/core/src/v3/inputStreams/index.ts @@ -0,0 +1,65 @@ +import { getGlobal, registerGlobal } from "../utils/globals.js"; +import { NoopInputStreamManager } from "./noopManager.js"; +import { InputStreamManager } from "./types.js"; +import { InputStreamOnceOptions } from "../realtimeStreams/types.js"; + +const API_NAME = "input-streams"; + +const NOOP_MANAGER = new NoopInputStreamManager(); + +export class InputStreamsAPI implements InputStreamManager { + private static _instance?: InputStreamsAPI; + + private constructor() {} + + public static getInstance(): InputStreamsAPI { + if (!this._instance) { + this._instance = new InputStreamsAPI(); + } + + return this._instance; + } + + setGlobalManager(manager: InputStreamManager): boolean { + return registerGlobal(API_NAME, manager); + } + + #getManager(): InputStreamManager { + return getGlobal(API_NAME) ?? 
NOOP_MANAGER; + } + + public setRunId(runId: string, streamsVersion?: string): void { + this.#getManager().setRunId(runId, streamsVersion); + } + + public on( + streamId: string, + handler: (data: unknown) => void | Promise<void> + ): { off: () => void } { + return this.#getManager().on(streamId, handler); + } + + public once(streamId: string, options?: InputStreamOnceOptions): Promise<unknown> { + return this.#getManager().once(streamId, options); + } + + public peek(streamId: string): unknown | undefined { + return this.#getManager().peek(streamId); + } + + public lastSeqNum(streamId: string): number | undefined { + return this.#getManager().lastSeqNum(streamId); + } + + public reset(): void { + this.#getManager().reset(); + } + + public disconnect(): void { + this.#getManager().disconnect(); + } + + public connectTail(runId: string, fromSeq?: number): void { + this.#getManager().connectTail(runId, fromSeq); + } +} diff --git a/packages/core/src/v3/inputStreams/manager.ts b/packages/core/src/v3/inputStreams/manager.ts new file mode 100644 index 00000000000..7347f76d1f1 --- /dev/null +++ b/packages/core/src/v3/inputStreams/manager.ts @@ -0,0 +1,313 @@ +import { ApiClient } from "../apiClient/index.js"; +import { InputStreamManager } from "./types.js"; +import { InputStreamOnceOptions } from "../realtimeStreams/types.js"; + +type InputStreamHandler = (data: unknown) => void | Promise<void>; + +type OnceWaiter = { + resolve: (data: unknown) => void; + reject: (error: Error) => void; + timeoutHandle?: ReturnType<typeof setTimeout>; +}; + + +type TailState = { + abortController: AbortController; + promise: Promise<void>; +}; + +export class StandardInputStreamManager implements InputStreamManager { + private handlers = new Map<string, Set<InputStreamHandler>>(); + private onceWaiters = new Map<string, OnceWaiter[]>(); + private buffer = new Map<string, unknown[]>(); + private tails = new Map<string, TailState>(); + private seqNums = new Map<string, number>(); + private currentRunId: string | null = null; + private streamsVersion: string | undefined; + + constructor( + private apiClient: ApiClient, + private baseUrl:
string, + private debug: boolean = false + ) {} + + lastSeqNum(streamId: string): number | undefined { + return this.seqNums.get(streamId); + } + + setRunId(runId: string, streamsVersion?: string): void { + this.currentRunId = runId; + this.streamsVersion = streamsVersion; + } + + on(streamId: string, handler: InputStreamHandler): { off: () => void } { + this.#requireV2Streams(); + + let handlerSet = this.handlers.get(streamId); + if (!handlerSet) { + handlerSet = new Set(); + this.handlers.set(streamId, handlerSet); + } + handlerSet.add(handler); + + // Lazily connect a tail for this stream + this.#ensureStreamTailConnected(streamId); + + // Flush any buffered data for this stream + const buffered = this.buffer.get(streamId); + if (buffered && buffered.length > 0) { + for (const data of buffered) { + this.#invokeHandler(handler, data); + } + this.buffer.delete(streamId); + } + + return { + off: () => { + handlerSet?.delete(handler); + if (handlerSet?.size === 0) { + this.handlers.delete(streamId); + } + }, + }; + } + + once(streamId: string, options?: InputStreamOnceOptions): Promise<unknown> { + this.#requireV2Streams(); + + // Lazily connect a tail for this stream + this.#ensureStreamTailConnected(streamId); + + // Check buffer first + const buffered = this.buffer.get(streamId); + if (buffered && buffered.length > 0) { + const data = buffered.shift()!; + if (buffered.length === 0) { + this.buffer.delete(streamId); + } + return Promise.resolve(data); + } + + return new Promise((resolve, reject) => { + const waiter: OnceWaiter = { resolve, reject }; + + // Handle abort signal + if (options?.signal) { + if (options.signal.aborted) { + reject(new Error("Aborted")); + return; + } + options.signal.addEventListener( + "abort", + () => { + this.#removeOnceWaiter(streamId, waiter); + reject(new Error("Aborted")); + }, + { once: true } + ); + } + + // Handle timeout + if (options?.timeoutMs) { + waiter.timeoutHandle = setTimeout(() => { + this.#removeOnceWaiter(streamId, waiter);
+ reject(new Error(`Timeout waiting for input stream "${streamId}" after ${options.timeoutMs}ms`)); + }, options.timeoutMs); + } + + let waiters = this.onceWaiters.get(streamId); + if (!waiters) { + waiters = []; + this.onceWaiters.set(streamId, waiters); + } + waiters.push(waiter); + }); + } + + peek(streamId: string): unknown | undefined { + const buffered = this.buffer.get(streamId); + if (buffered && buffered.length > 0) { + return buffered[0]; + } + return undefined; + } + + connectTail(runId: string, _fromSeq?: number): void { + // No-op: tails are now created per-stream lazily + } + + disconnect(): void { + for (const [, tail] of this.tails) { + tail.abortController.abort(); + } + this.tails.clear(); + } + + reset(): void { + this.disconnect(); + this.currentRunId = null; + this.streamsVersion = undefined; + this.seqNums.clear(); + this.handlers.clear(); + + // Reject all pending once waiters + for (const [, waiters] of this.onceWaiters) { + for (const waiter of waiters) { + if (waiter.timeoutHandle) { + clearTimeout(waiter.timeoutHandle); + } + waiter.reject(new Error("Input stream manager reset")); + } + } + this.onceWaiters.clear(); + this.buffer.clear(); + } + + #requireV2Streams(): void { + if (this.currentRunId && this.streamsVersion !== "v2") { + throw new Error( + "Input streams require v2 realtime streams. 
Enable them with: { future: { v2RealtimeStreams: true } }" + ); + } + } + + #ensureStreamTailConnected(streamId: string): void { + if (!this.tails.has(streamId) && this.currentRunId) { + const abortController = new AbortController(); + const promise = this.#runTail(this.currentRunId, streamId, abortController.signal).catch( + (error) => { + if (this.debug) { + console.error(`[InputStreamManager] Tail error for "${streamId}":`, error); + } + } + ); + this.tails.set(streamId, { abortController, promise }); + } + } + + async #runTail(runId: string, streamId: string, signal: AbortSignal): Promise<void> { + try { + const stream = await this.apiClient.fetchStream( + runId, + `input/${streamId}`, + { + signal, + baseUrl: this.baseUrl, + // Max allowed by the SSE endpoint is 600s; the tail will reconnect on close + timeoutInSeconds: 600, + onPart: (part) => { + const seqNum = parseInt(part.id, 10); + if (Number.isFinite(seqNum)) { + this.seqNums.set(streamId, seqNum); + } + }, + onComplete: () => { + if (this.debug) { + console.log(`[InputStreamManager] Tail stream completed for "${streamId}"`); + } + }, + onError: (error) => { + if (this.debug) { + console.error(`[InputStreamManager] Tail stream error for "${streamId}":`, error); + } + }, + } + ); + + for await (const record of stream) { + if (signal.aborted) break; + + // S2 SSE returns record bodies as JSON strings; parse if needed + let data: unknown; + if (typeof record === "string") { + try { + data = JSON.parse(record); + } catch { + data = record; + } + } else { + data = record; + } + + this.#dispatch(streamId, data); + } + } catch (error) { + // AbortError is expected when disconnecting + if (error instanceof Error && error.name === "AbortError") { + return; + } + throw error; + } + } + + #dispatch(streamId: string, data: unknown): void { + // First try to resolve a once waiter + const waiters = this.onceWaiters.get(streamId); + if (waiters && waiters.length > 0) { + const waiter = waiters.shift()!; + if (waiters.length
=== 0) { + this.onceWaiters.delete(streamId); + } + if (waiter.timeoutHandle) { + clearTimeout(waiter.timeoutHandle); + } + waiter.resolve(data); + // Also invoke persistent handlers + this.#invokeHandlers(streamId, data); + return; + } + + // Invoke persistent handlers + const handlers = this.handlers.get(streamId); + if (handlers && handlers.size > 0) { + this.#invokeHandlers(streamId, data); + return; + } + + // No handlers, buffer the data + let buffered = this.buffer.get(streamId); + if (!buffered) { + buffered = []; + this.buffer.set(streamId, buffered); + } + buffered.push(data); + } + + #invokeHandlers(streamId: string, data: unknown): void { + const handlers = this.handlers.get(streamId); + if (!handlers) return; + for (const handler of handlers) { + this.#invokeHandler(handler, data); + } + } + + #invokeHandler(handler: InputStreamHandler, data: unknown): void { + try { + const result = handler(data); + // If the handler returns a promise, catch errors silently + if (result && typeof result === "object" && "catch" in result) { + (result as Promise).catch((error) => { + if (this.debug) { + console.error("[InputStreamManager] Handler error:", error); + } + }); + } + } catch (error) { + if (this.debug) { + console.error("[InputStreamManager] Handler error:", error); + } + } + } + + #removeOnceWaiter(streamId: string, waiter: OnceWaiter): void { + const waiters = this.onceWaiters.get(streamId); + if (!waiters) return; + const index = waiters.indexOf(waiter); + if (index !== -1) { + waiters.splice(index, 1); + } + if (waiters.length === 0) { + this.onceWaiters.delete(streamId); + } + } +} diff --git a/packages/core/src/v3/inputStreams/noopManager.ts b/packages/core/src/v3/inputStreams/noopManager.ts new file mode 100644 index 00000000000..34025468614 --- /dev/null +++ b/packages/core/src/v3/inputStreams/noopManager.ts @@ -0,0 +1,28 @@ +import { InputStreamManager } from "./types.js"; +import { InputStreamOnceOptions } from "../realtimeStreams/types.js"; + 
+export class NoopInputStreamManager implements InputStreamManager { + setRunId(_runId: string, _streamsVersion?: string): void {} + + on(_streamId: string, _handler: (data: unknown) => void | Promise): { off: () => void } { + return { off: () => {} }; + } + + once(_streamId: string, _options?: InputStreamOnceOptions): Promise { + return new Promise(() => { + // Never resolves in noop mode + }); + } + + peek(_streamId: string): unknown | undefined { + return undefined; + } + + lastSeqNum(_streamId: string): number | undefined { + return undefined; + } + + reset(): void {} + disconnect(): void {} + connectTail(_runId: string, _fromSeq?: number): void {} +} diff --git a/packages/core/src/v3/inputStreams/types.ts b/packages/core/src/v3/inputStreams/types.ts new file mode 100644 index 00000000000..522e239d47d --- /dev/null +++ b/packages/core/src/v3/inputStreams/types.ts @@ -0,0 +1,46 @@ +import { InputStreamOnceOptions } from "../realtimeStreams/types.js"; + +export interface InputStreamManager { + /** + * Set the current run ID and streams version. The tail connection will be + * established lazily when `on()` or `once()` is first called, but only + * for v2 (S2-backed) realtime streams. + */ + setRunId(runId: string, streamsVersion?: string): void; + + /** + * Register a handler that fires every time data arrives on the given input stream. + */ + on(streamId: string, handler: (data: unknown) => void | Promise): { off: () => void }; + + /** + * Wait for the next piece of data on the given input stream. + */ + once(streamId: string, options?: InputStreamOnceOptions): Promise; + + /** + * Non-blocking peek at the most recent data on the given input stream. + */ + peek(streamId: string): unknown | undefined; + + /** + * The last S2 sequence number seen for the given input stream. + * Used by `.wait()` to tell the server where to check for existing data. + */ + lastSeqNum(streamId: string): number | undefined; + + /** + * Reset state between task executions. 
+ */ + reset(): void; + + /** + * Disconnect any active tails / connections. + */ + disconnect(): void; + + /** + * Connect a tail to receive input stream records for the given run. + */ + connectTail(runId: string, fromSeq?: number): void; +} diff --git a/packages/core/src/v3/realtimeStreams/streamInstance.ts b/packages/core/src/v3/realtimeStreams/streamInstance.ts index 5efbcb225a3..6d8106ffe6c 100644 --- a/packages/core/src/v3/realtimeStreams/streamInstance.ts +++ b/packages/core/src/v3/realtimeStreams/streamInstance.ts @@ -52,6 +52,7 @@ export class StreamInstance implements StreamsWriter { basin: parsedResponse.basin, stream: parsedResponse.streamName ?? this.options.key, accessToken: parsedResponse.accessToken, + endpoint: parsedResponse.endpoint, source: this.options.source, signal: this.options.signal, debug: this.options.debug, @@ -103,6 +104,7 @@ type ParsedStreamResponse = version: "v2"; accessToken: string; basin: string; + endpoint?: string; flushIntervalMs?: number; maxRetries?: number; streamName?: string; @@ -123,6 +125,7 @@ function parseCreateStreamResponse( return { version: "v1" }; } + const endpoint = headers?.["x-s2-endpoint"]; const flushIntervalMs = headers?.["x-s2-flush-interval-ms"]; const maxRetries = headers?.["x-s2-max-retries"]; const streamName = headers?.["x-s2-stream-name"]; @@ -131,6 +134,7 @@ function parseCreateStreamResponse( version: "v2", accessToken, basin, + endpoint, flushIntervalMs: flushIntervalMs ? parseInt(flushIntervalMs) : undefined, maxRetries: maxRetries ? 
parseInt(maxRetries) : undefined, streamName, diff --git a/packages/core/src/v3/realtimeStreams/streamsWriterV2.ts b/packages/core/src/v3/realtimeStreams/streamsWriterV2.ts index 568ff5574e6..91713630dbe 100644 --- a/packages/core/src/v3/realtimeStreams/streamsWriterV2.ts +++ b/packages/core/src/v3/realtimeStreams/streamsWriterV2.ts @@ -6,12 +6,13 @@ export type StreamsWriterV2Options = { basin: string; stream: string; accessToken: string; + endpoint?: string; // Custom S2 endpoint (for s2-lite) source: ReadableStream; signal?: AbortSignal; flushIntervalMs?: number; // Used as lingerDuration for BatchTransform (default 200ms) maxRetries?: number; // Not used with appendSession, kept for compatibility debug?: boolean; // Enable debug logging (default false) - maxQueuedBytes?: number; // Max queued bytes for appendSession (default 10MB) + maxInflightBytes?: number; // Max queued bytes for appendSession (default 10MB) }; /** @@ -50,18 +51,28 @@ export class StreamsWriterV2 implements StreamsWriter { private streamPromise: Promise; private readonly flushIntervalMs: number; private readonly debug: boolean; - private readonly maxQueuedBytes: number; + private readonly maxInflightBytes: number; private aborted = false; private sessionWritable: WritableStream | null = null; constructor(private options: StreamsWriterV2Options) { this.debug = options.debug ?? false; - this.s2Client = new S2({ accessToken: options.accessToken }); + this.s2Client = new S2({ + accessToken: options.accessToken, + ...(options.endpoint + ? { + endpoints: { + account: options.endpoint, + basin: options.endpoint, + }, + } + : {}), + }); this.flushIntervalMs = options.flushIntervalMs ?? 200; - this.maxQueuedBytes = options.maxQueuedBytes ?? 1024 * 1024 * 10; // 10MB default + this.maxInflightBytes = options.maxInflightBytes ?? 
1024 * 1024 * 10; // 10MB default this.log( - `[S2MetadataStream] Initializing: basin=${options.basin}, stream=${options.stream}, flushIntervalMs=${this.flushIntervalMs}, maxQueuedBytes=${this.maxQueuedBytes}` + `[S2MetadataStream] Initializing: basin=${options.basin}, stream=${options.stream}, flushIntervalMs=${this.flushIntervalMs}, maxInflightBytes=${this.maxInflightBytes}` ); // Check if already aborted @@ -124,7 +135,7 @@ export class StreamsWriterV2 implements StreamsWriter { const stream = basin.stream(this.options.stream); const session = await stream.appendSession({ - maxQueuedBytes: this.maxQueuedBytes, + maxInflightBytes: this.maxInflightBytes, }); this.sessionWritable = session.writable; @@ -141,7 +152,7 @@ export class StreamsWriterV2 implements StreamsWriter { return; } // Convert each chunk to JSON string and wrap in AppendRecord - controller.enqueue(AppendRecord.make(JSON.stringify({ data: chunk, id: nanoid(7) }))); + controller.enqueue(AppendRecord.string({ body: JSON.stringify({ data: chunk, id: nanoid(7) }) })); }, }) ) @@ -158,9 +169,9 @@ export class StreamsWriterV2 implements StreamsWriter { const lastAcked = session.lastAckedPosition(); if (lastAcked?.end) { - const recordsWritten = lastAcked.end.seq_num; + const recordsWritten = lastAcked.end.seqNum; this.log( - `[S2MetadataStream] Written ${recordsWritten} records, ending at seq_num=${lastAcked.end.seq_num}` + `[S2MetadataStream] Written ${recordsWritten} records, ending at seqNum=${lastAcked.end.seqNum}` ); } } catch (error) { diff --git a/packages/core/src/v3/realtimeStreams/types.ts b/packages/core/src/v3/realtimeStreams/types.ts index 25f116d5d96..33b77c9af8e 100644 --- a/packages/core/src/v3/realtimeStreams/types.ts +++ b/packages/core/src/v3/realtimeStreams/types.ts @@ -1,6 +1,7 @@ import { AnyZodFetchOptions, ApiRequestOptions } from "../apiClient/core.js"; import { AsyncIterableStream } from "../streams/asyncIterableStream.js"; import { Prettify } from "../types/utils.js"; +import 
type { ManualWaitpointPromise } from "../waitpoints/index.js"; export type RealtimeStreamOperationOptions = { signal?: AbortSignal; @@ -143,3 +144,104 @@ export type WriterStreamOptions = Prettify< }) => Promise | void; } >; + +// --- Input streams (inbound data to running tasks) --- + +/** + * A defined input stream that can receive typed data from external callers. + * + * Inside a task, use `.on()`, `.once()`, or `.peek()` to receive data. + * Outside a task, use `.send()` to send data to a running task. + */ +export type RealtimeDefinedInputStream = { + id: string; + /** + * Register a handler that fires every time data arrives on this input stream. + * Returns a subscription object with an `.off()` method to unsubscribe. + */ + on: (handler: (data: TData) => void | Promise) => InputStreamSubscription; + /** + * Wait for the next piece of data on this input stream. + * Resolves with the data when it arrives. + */ + once: (options?: InputStreamOnceOptions) => Promise; + /** + * Non-blocking peek at the most recent data received on this input stream. + * Returns `undefined` if no data has been received yet. + */ + peek: () => TData | undefined; + /** + * Suspend the task until data arrives on this input stream. + * + * Unlike `.once()` which keeps the task process alive while waiting, + * `.wait()` suspends the task entirely — freeing compute resources. + * The task resumes when data is sent via `.send()`. + * + * Uses a waitpoint token internally. Can only be called inside a task.run(). + */ + wait: (options?: InputStreamWaitOptions) => ManualWaitpointPromise; + /** + * Send data to this input stream on a specific run. + * This is used from outside the task (e.g., from your backend or another task). 
+ */ + send: (runId: string, data: TData, options?: SendInputStreamOptions) => Promise; +}; + +export type InputStreamSubscription = { + off: () => void; +}; + +export type InputStreamOnceOptions = { + signal?: AbortSignal; + timeoutMs?: number; +}; + +export type SendInputStreamOptions = { + requestOptions?: ApiRequestOptions; +}; + +export type InputStreamWaitOptions = { + /** + * Maximum time to wait before the waitpoint times out. + * Uses the same period format as `wait.createToken()`. + * If the timeout is reached, the result will be `{ ok: false, error }`. + * + * @example "30s", "5m", "1h", "24h", "7d" + */ + timeout?: string; + + /** + * Idempotency key for the underlying waitpoint token. + * If the same key is used again (and hasn't expired), the existing + * waitpoint is reused. This means if the task retries, it will + * resume waiting on the same waitpoint rather than creating a new one. + */ + idempotencyKey?: string; + + /** + * TTL for the idempotency key. After this period, the same key + * will create a new waitpoint. + */ + idempotencyKeyTTL?: string; + + /** + * Tags for the underlying waitpoint token, useful for querying + * and filtering waitpoints via `wait.listTokens()`. + */ + tags?: string[]; +}; + +export type InferInputStreamType = T extends RealtimeDefinedInputStream + ? TData + : unknown; + +/** + * Internal record format for multiplexed input stream data on S2. + * All input streams for a run share a single S2 stream, demuxed by `stream` field. 
+ */ +export type InputStreamRecord = { + stream: string; + data: unknown; + ts: number; + id: string; +}; diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index 2a7bcb96502..d42e6158096 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -1370,6 +1370,31 @@ export const CreateWaitpointTokenResponseBody = z.object({ }); export type CreateWaitpointTokenResponseBody = z.infer; +export const CreateInputStreamWaitpointRequestBody = z.object({ + streamId: z.string(), + timeout: z.string().optional(), + idempotencyKey: z.string().optional(), + idempotencyKeyTTL: z.string().optional(), + tags: z.union([z.string(), z.array(z.string())]).optional(), + /** + * The last S2 sequence number the client has seen on this input stream. + * Used to check for data that arrived before .wait() was called. + * If undefined, the server checks from the beginning of the stream. + */ + lastSeqNum: z.number().optional(), +}); +export type CreateInputStreamWaitpointRequestBody = z.infer< + typeof CreateInputStreamWaitpointRequestBody +>; + +export const CreateInputStreamWaitpointResponseBody = z.object({ + waitpointId: z.string(), + isCached: z.boolean(), +}); +export type CreateInputStreamWaitpointResponseBody = z.infer< + typeof CreateInputStreamWaitpointResponseBody +>; + export const waitpointTokenStatuses = ["WAITING", "COMPLETED", "TIMED_OUT"] as const; export const WaitpointTokenStatus = z.enum(waitpointTokenStatuses); export type WaitpointTokenStatus = z.infer; @@ -1598,3 +1623,8 @@ export const AppendToStreamResponseBody = z.object({ message: z.string().optional(), }); export type AppendToStreamResponseBody = z.infer; + +export const SendInputStreamResponseBody = z.object({ + ok: z.boolean(), +}); +export type SendInputStreamResponseBody = z.infer; diff --git a/packages/core/src/v3/utils/globals.ts b/packages/core/src/v3/utils/globals.ts index 47cabd60808..08b62d379b2 100644 --- 
a/packages/core/src/v3/utils/globals.ts +++ b/packages/core/src/v3/utils/globals.ts @@ -2,6 +2,7 @@ import { ApiClientConfiguration } from "../apiClientManager/types.js"; import { Clock } from "../clock/clock.js"; import { HeartbeatsManager } from "../heartbeats/types.js"; import type { IdempotencyKeyCatalog } from "../idempotency-key-catalog/catalog.js"; +import { InputStreamManager } from "../inputStreams/types.js"; import { LifecycleHooksManager } from "../lifecycleHooks/types.js"; import { LocalsManager } from "../locals/types.js"; import { RealtimeStreamsManager } from "../realtimeStreams/types.js"; @@ -74,4 +75,5 @@ type TriggerDotDevGlobalAPI = { ["trace-context"]?: TraceContextManager; ["heartbeats"]?: HeartbeatsManager; ["realtime-streams"]?: RealtimeStreamsManager; + ["input-streams"]?: InputStreamManager; }; diff --git a/packages/core/src/v3/waitpoints/index.ts b/packages/core/src/v3/waitpoints/index.ts new file mode 100644 index 00000000000..fc70ff57f21 --- /dev/null +++ b/packages/core/src/v3/waitpoints/index.ts @@ -0,0 +1,35 @@ +import type { WaitpointTokenTypedResult } from "../schemas/common.js"; + +export class WaitpointTimeoutError extends Error { + constructor(message: string) { + super(message); + this.name = "WaitpointTimeoutError"; + } +} + +export class ManualWaitpointPromise extends Promise< + WaitpointTokenTypedResult +> { + constructor( + executor: ( + resolve: ( + value: + | WaitpointTokenTypedResult + | PromiseLike> + ) => void, + reject: (reason?: any) => void + ) => void + ) { + super(executor); + } + + unwrap(): Promise { + return this.then((result) => { + if (result.ok) { + return result.output; + } else { + throw new WaitpointTimeoutError(result.error.message); + } + }); + } +} diff --git a/packages/core/src/v3/workers/index.ts b/packages/core/src/v3/workers/index.ts index 4ca301fcdc7..e5f8eecff98 100644 --- a/packages/core/src/v3/workers/index.ts +++ b/packages/core/src/v3/workers/index.ts @@ -32,3 +32,4 @@ export { populateEnv } 
from "./populateEnv.js"; export { StandardTraceContextManager } from "../traceContext/manager.js"; export { StandardHeartbeatsManager } from "../heartbeats/manager.js"; export { StandardRealtimeStreamsManager } from "../realtimeStreams/manager.js"; +export { StandardInputStreamManager } from "../inputStreams/manager.js"; diff --git a/packages/react-hooks/src/hooks/useInputStreamSend.ts b/packages/react-hooks/src/hooks/useInputStreamSend.ts new file mode 100644 index 00000000000..d3f816f30e8 --- /dev/null +++ b/packages/react-hooks/src/hooks/useInputStreamSend.ts @@ -0,0 +1,60 @@ +"use client"; + +import useSWRMutation from "swr/mutation"; +import { useApiClient, UseApiClientOptions } from "./useApiClient.js"; + +export interface InputStreamSendInstance { + /** Send data to the input stream */ + send: (data: TData) => void; + /** Whether a send is currently in progress */ + isLoading: boolean; + /** Any error that occurred during the last send */ + error?: Error; + /** Whether the hook is ready to send (has runId and access token) */ + isReady: boolean; +} + +/** + * Hook to send data to an input stream on a running task. + * + * @template TData - The type of data to send + * @param streamId - The input stream identifier + * @param runId - The run to send input stream data to + * @param options - API client options (e.g. 
accessToken) + * + * @example + * ```tsx + * const { send, isLoading } = useInputStreamSend("my-stream", runId, { accessToken }); + * send({ message: "hello" }); + * ``` + */ +export function useInputStreamSend( + streamId: string, + runId?: string, + options?: UseApiClientOptions +): InputStreamSendInstance { + const apiClient = useApiClient(options); + + async function sendToStream(key: string, { arg }: { arg: { data: TData } }) { + if (!apiClient) { + throw new Error("Could not send to input stream: Missing access token"); + } + + if (!runId) { + throw new Error("Could not send to input stream: Missing run ID"); + } + + return await apiClient.sendInputStream(runId, streamId, arg.data); + } + + const mutation = useSWRMutation(runId ? `input-stream:${runId}:${streamId}` : null, sendToStream); + + return { + send: (data) => { + mutation.trigger({ data }); + }, + isLoading: mutation.isMutating, + isReady: !!runId && !!apiClient, + error: mutation.error, + }; +} diff --git a/packages/react-hooks/src/index.ts b/packages/react-hooks/src/index.ts index bc20cf837d4..23c8ca947d5 100644 --- a/packages/react-hooks/src/index.ts +++ b/packages/react-hooks/src/index.ts @@ -4,3 +4,4 @@ export * from "./hooks/useRun.js"; export * from "./hooks/useRealtime.js"; export * from "./hooks/useTaskTrigger.js"; export * from "./hooks/useWaitToken.js"; +export * from "./hooks/useInputStreamSend.js"; diff --git a/packages/trigger-sdk/src/v3/auth.ts b/packages/trigger-sdk/src/v3/auth.ts index ddcf92569af..1f2df463b6f 100644 --- a/packages/trigger-sdk/src/v3/auth.ts +++ b/packages/trigger-sdk/src/v3/auth.ts @@ -62,6 +62,11 @@ type PublicTokenPermissionProperties = { * Grant access to specific waitpoints */ waitpoints?: string | string[]; + + /** + * Grant access to send data to input streams on specific runs + */ + inputStreams?: string | string[]; }; export type PublicTokenPermissions = { diff --git a/packages/trigger-sdk/src/v3/streams.ts b/packages/trigger-sdk/src/v3/streams.ts index 
3bdde70bea1..2cfbfaa4041 100644 --- a/packages/trigger-sdk/src/v3/streams.ts +++ b/packages/trigger-sdk/src/v3/streams.ts @@ -1,6 +1,7 @@ import { type ApiRequestOptions, realtimeStreams, + inputStreams, taskContext, type RealtimeStreamOperationOptions, mergeRequestOptions, @@ -15,7 +16,17 @@ import { AppendStreamOptions, RealtimeDefinedStream, InferStreamType, + ManualWaitpointPromise, + WaitpointTimeoutError, + runtime, + type RealtimeDefinedInputStream, + type InputStreamSubscription, + type InputStreamOnceOptions, + type InputStreamWaitOptions, + type SendInputStreamOptions, + type InferInputStreamType, } from "@trigger.dev/core/v3"; +import { conditionallyImportAndParsePacket } from "@trigger.dev/core/v3/utils/ioSerialization"; import { tracer } from "./tracer.js"; import { SpanStatusCode } from "@opentelemetry/api"; @@ -652,7 +663,183 @@ function define(opts: RealtimeDefineStreamOptions): RealtimeDefinedStream }; } -export type { InferStreamType }; +export type { InferStreamType, InferInputStreamType }; + +/** + * Define an input stream that can receive typed data from external callers. + * + * Inside a task, use `.on()`, `.once()`, or `.peek()` to receive data. + * Outside a task (e.g., from your backend), use `.send(runId, data)` to send data. 
+ * + * @template TData - The type of data this input stream receives + * @param opts - Options including a unique `id` for this input stream + * + * @example + * ```ts + * import { streams, task } from "@trigger.dev/sdk"; + * + * const approval = streams.input<{ approved: boolean; reviewer: string }>({ id: "approval" }); + * + * export const myTask = task({ + * id: "my-task", + * run: async (payload) => { + * // Wait for the next approval + * const data = await approval.once(); + * console.log(data.approved, data.reviewer); + * }, + * }); + * + * // From your backend: + * // await approval.send(runId, { approved: true, reviewer: "alice" }); + * ``` + */ +function input(opts: { id: string }): RealtimeDefinedInputStream { + return { + id: opts.id, + on(handler) { + return inputStreams.on( + opts.id, + handler as (data: unknown) => void | Promise + ); + }, + once(options) { + const ctx = taskContext.ctx; + const runId = ctx?.run.id; + + return tracer.startActiveSpan( + `inputStream.once()`, + async () => { + return inputStreams.once(opts.id, options) as Promise; + }, + { + attributes: { + [SemanticInternalAttributes.STYLE_ICON]: "streams", + [SemanticInternalAttributes.ENTITY_TYPE]: "input-stream", + ...(runId + ? { [SemanticInternalAttributes.ENTITY_ID]: `${runId}:${opts.id}` } + : {}), + streamId: opts.id, + ...accessoryAttributes({ + items: [{ text: opts.id, variant: "normal" }], + style: "codepath", + }), + }, + } + ); + }, + peek() { + return inputStreams.peek(opts.id) as TData | undefined; + }, + wait(options) { + return new ManualWaitpointPromise(async (resolve, reject) => { + try { + const ctx = taskContext.ctx; + + if (!ctx) { + throw new Error("inputStream.wait() can only be used from inside a task.run()"); + } + + const apiClient = apiClientManager.clientOrThrow(); + + const result = await tracer.startActiveSpan( + `inputStream.wait()`, + async (span) => { + // 1. 
Create a waitpoint linked to this input stream + const response = await apiClient.createInputStreamWaitpoint(ctx.run.id, { + streamId: opts.id, + timeout: options?.timeout, + idempotencyKey: options?.idempotencyKey, + idempotencyKeyTTL: options?.idempotencyKeyTTL, + tags: options?.tags, + lastSeqNum: inputStreams.lastSeqNum(opts.id), + }); + + // Set the entity ID now that we have the waitpoint ID + span.setAttribute(SemanticInternalAttributes.ENTITY_ID, response.waitpointId); + + // 2. Block the run on the waitpoint + const waitResponse = await apiClient.waitForWaitpointToken({ + runFriendlyId: ctx.run.id, + waitpointFriendlyId: response.waitpointId, + }); + + if (!waitResponse.success) { + throw new Error("Failed to block on input stream waitpoint"); + } + + // 3. Suspend the task + const waitResult = await runtime.waitUntil(response.waitpointId); + + // 4. Parse the output + const data = waitResult.output + ? await conditionallyImportAndParsePacket( + { + data: waitResult.output, + dataType: waitResult.outputType ?? "application/json", + }, + apiClient + ) + : undefined; + + if (waitResult.ok) { + return { ok: true as const, output: data as TData }; + } else { + const error = new WaitpointTimeoutError(data?.message ?? 
"Timed out"); + + span.recordException(error); + span.setStatus({ code: SpanStatusCode.ERROR }); + + return { ok: false as const, error }; + } + }, + { + attributes: { + [SemanticInternalAttributes.STYLE_ICON]: "wait", + [SemanticInternalAttributes.ENTITY_TYPE]: "waitpoint", + streamId: opts.id, + ...accessoryAttributes({ + items: [ + { + text: opts.id, + variant: "normal", + }, + ], + style: "codepath", + }), + }, + } + ); + + resolve(result); + } catch (error) { + reject(error); + } + }); + }, + async send(runId, data, options) { + return tracer.startActiveSpan( + `inputStream.send()`, + async () => { + const apiClient = apiClientManager.clientOrThrow(); + await apiClient.sendInputStream(runId, opts.id, data, options?.requestOptions); + }, + { + attributes: { + [SemanticInternalAttributes.STYLE_ICON]: "streams", + [SemanticInternalAttributes.ENTITY_TYPE]: "input-stream", + [SemanticInternalAttributes.ENTITY_ID]: `${runId}:${opts.id}`, + streamId: opts.id, + runId, + ...accessoryAttributes({ + items: [{ text: opts.id, variant: "normal" }], + style: "codepath", + }), + }, + } + ); + }, + }; +} export const streams = { pipe, @@ -660,6 +847,7 @@ export const streams = { append, writer, define, + input, }; function getRunIdForOptions(options?: RealtimeStreamOperationOptions): string | undefined { diff --git a/packages/trigger-sdk/src/v3/wait.ts b/packages/trigger-sdk/src/v3/wait.ts index 7ac85b03429..aab0797f4f3 100644 --- a/packages/trigger-sdk/src/v3/wait.ts +++ b/packages/trigger-sdk/src/v3/wait.ts @@ -11,6 +11,7 @@ import { CursorPagePromise, flattenAttributes, ListWaitpointTokensQueryParams, + ManualWaitpointPromise, mergeRequestOptions, runtime, SemanticInternalAttributes, @@ -19,6 +20,7 @@ import { WaitpointRetrieveTokenResponse, WaitpointTokenStatus, WaitpointTokenTypedResult, + WaitpointTimeoutError, } from "@trigger.dev/core/v3"; import { conditionallyImportAndParsePacket } from "@trigger.dev/core/v3/utils/ioSerialization"; import { tracer } from 
"./tracer.js"; @@ -378,12 +380,7 @@ type WaitPeriod = years: number; }; -export class WaitpointTimeoutError extends Error { - constructor(message: string) { - super(message); - this.name = "WaitpointTimeoutError"; - } -} +export { WaitpointTimeoutError, ManualWaitpointPromise } from "@trigger.dev/core/v3"; const DURATION_WAIT_CHARGE_THRESHOLD_MS = 5000; @@ -393,29 +390,6 @@ function printWaitBelowThreshold() { ); } -class ManualWaitpointPromise extends Promise> { - constructor( - executor: ( - resolve: ( - value: WaitpointTokenTypedResult | PromiseLike> - ) => void, - reject: (reason?: any) => void - ) => void - ) { - super(executor); - } - - unwrap(): Promise { - return this.then((result) => { - if (result.ok) { - return result.output; - } else { - throw new WaitpointTimeoutError(result.error.message); - } - }); - } -} - export const wait = { for: async (options: WaitForOptions) => { const ctx = taskContext.ctx; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 48e6dd6ec01..24c692ab2ed 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -456,8 +456,8 @@ importers: specifier: ^0.1.3 version: 0.1.3(@remix-run/react@2.1.0(react-dom@18.2.0(react@18.2.0))(react@18.2.0)(typescript@5.5.4))(@remix-run/server-runtime@2.1.0(typescript@5.5.4)) '@s2-dev/streamstore': - specifier: ^0.17.2 - version: 0.17.3(typescript@5.5.4) + specifier: ^0.22.5 + version: 0.22.5 '@sentry/remix': specifier: 9.46.0 version: 9.46.0(patch_hash=146126b032581925294aaed63ab53ce3f5e0356a755f1763d7a9a76b9846943b)(@remix-run/node@2.1.0(typescript@5.5.4))(@remix-run/react@2.1.0(react-dom@18.2.0(react@18.2.0))(react@18.2.0)(typescript@5.5.4))(@remix-run/server-runtime@2.1.0(typescript@5.5.4))(encoding@0.1.13)(react@18.2.0) @@ -1729,8 +1729,8 @@ importers: specifier: 1.36.0 version: 1.36.0 '@s2-dev/streamstore': - specifier: 0.17.3 - version: 0.17.3(typescript@5.5.4) + specifier: 0.22.5 + version: 0.22.5 dequal: specifier: ^2.0.3 version: 2.0.3 @@ -9412,14 +9412,12 @@ packages: 
'@rushstack/eslint-patch@1.2.0': resolution: {integrity: sha512-sXo/qW2/pAcmT43VoRKOJbDOfV3cYpq3szSVfIThQXNt+E4DfKj361vaAt3c88U5tPUxzEswam7GW48PJqtKAg==} - '@s2-dev/streamstore@0.17.3': - resolution: {integrity: sha512-UeXL5+MgZQfNkbhCgEDVm7PrV5B3bxh6Zp4C5pUzQQwaoA+iGh2QiiIptRZynWgayzRv4vh0PYfnKpTzJEXegQ==} - peerDependencies: - typescript: 5.5.4 - '@s2-dev/streamstore@0.17.6': resolution: {integrity: sha512-ocjZfKaPKmo2yhudM58zVNHv3rBLSbTKkabVoLFn9nAxU6iLrR2CO3QmSo7/waohI3EZHAWxF/Pw8kA8d6QH2g==} + '@s2-dev/streamstore@0.22.5': + resolution: {integrity: sha512-GqdOKIbIoIxT+40fnKzHbrsHB6gBqKdECmFe7D3Ojk4FoN1Hu0LhFzZv6ZmVMjoHHU+55debS1xSWjZwQmbIyQ==} + '@sec-ant/readable-stream@0.4.1': resolution: {integrity: sha512-831qok9r2t8AlxLko40y2ebgSDhenenCatLVeW/uBtnHPyhHOvG0C7TvfgecV+wHzIm5KUICgzmVpWS+IMEAeg==} @@ -22393,7 +22391,7 @@ snapshots: '@babel/traverse': 7.24.7 '@babel/types': 7.24.0 convert-source-map: 1.9.0 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) gensync: 1.0.0-beta.2 json5: 2.2.3 semver: 6.3.1 @@ -22681,7 +22679,7 @@ snapshots: '@babel/helper-split-export-declaration': 7.24.7 '@babel/parser': 7.27.5 '@babel/types': 7.27.3 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) globals: 11.12.0 transitivePeerDependencies: - supports-color @@ -23130,7 +23128,7 @@ snapshots: '@epic-web/test-server@0.1.0(bufferutil@4.0.9)': dependencies: '@hono/node-server': 1.12.2(hono@4.5.11) - '@hono/node-ws': 1.0.4(@hono/node-server@1.12.2(hono@4.11.8))(bufferutil@4.0.9) + '@hono/node-ws': 1.0.4(@hono/node-server@1.12.2(hono@4.5.11))(bufferutil@4.0.9) '@open-draft/deferred-promise': 2.2.0 '@types/ws': 8.5.12 hono: 4.5.11 @@ -23662,7 +23660,7 @@ snapshots: '@eslint/eslintrc@1.4.1': dependencies: ajv: 6.12.6 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) espree: 9.6.0 globals: 13.19.0 ignore: 5.2.4 @@ -23885,7 +23883,7 @@ snapshots: dependencies: hono: 4.11.8 - 
'@hono/node-ws@1.0.4(@hono/node-server@1.12.2(hono@4.11.8))(bufferutil@4.0.9)': + '@hono/node-ws@1.0.4(@hono/node-server@1.12.2(hono@4.5.11))(bufferutil@4.0.9)': dependencies: '@hono/node-server': 1.12.2(hono@4.5.11) ws: 8.18.3(bufferutil@4.0.9) @@ -23896,7 +23894,7 @@ snapshots: '@humanwhocodes/config-array@0.11.8': dependencies: '@humanwhocodes/object-schema': 1.2.1 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) minimatch: 3.1.2 transitivePeerDependencies: - supports-color @@ -23912,7 +23910,7 @@ snapshots: '@antfu/install-pkg': 1.1.0 '@antfu/utils': 9.3.0 '@iconify/types': 2.0.0 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) globals: 15.15.0 kolorist: 1.8.0 local-pkg: 1.1.2 @@ -25645,7 +25643,7 @@ snapshots: '@puppeteer/browsers@2.10.6': dependencies: - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) extract-zip: 2.0.1 progress: 2.0.3 proxy-agent: 6.5.0 @@ -29208,14 +29206,16 @@ snapshots: '@rushstack/eslint-patch@1.2.0': {} - '@s2-dev/streamstore@0.17.3(typescript@5.5.4)': + '@s2-dev/streamstore@0.17.6': dependencies: '@protobuf-ts/runtime': 2.11.1 - typescript: 5.5.4 - '@s2-dev/streamstore@0.17.6': + '@s2-dev/streamstore@0.22.5': dependencies: '@protobuf-ts/runtime': 2.11.1 + debug: 4.4.3(supports-color@10.0.0) + transitivePeerDependencies: + - supports-color '@sec-ant/readable-stream@0.4.1': {} @@ -31296,7 +31296,7 @@ snapshots: dependencies: '@typescript-eslint/typescript-estree': 5.59.6(typescript@5.5.4) '@typescript-eslint/utils': 5.59.6(eslint@8.31.0)(typescript@5.5.4) - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) eslint: 8.31.0 tsutils: 3.21.0(typescript@5.5.4) optionalDependencies: @@ -31310,7 +31310,7 @@ snapshots: dependencies: '@typescript-eslint/types': 5.59.6 '@typescript-eslint/visitor-keys': 5.59.6 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) globby: 11.1.0 is-glob: 4.0.3 semver: 7.7.3 @@ -31819,7 
+31819,7 @@ snapshots: agent-base@6.0.2: dependencies: - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) transitivePeerDependencies: - supports-color @@ -32315,7 +32315,7 @@ snapshots: dependencies: bytes: 3.1.2 content-type: 1.0.5 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) http-errors: 2.0.0 iconv-lite: 0.6.3 on-finished: 2.4.1 @@ -32329,7 +32329,7 @@ snapshots: dependencies: bytes: 3.1.2 content-type: 1.0.5 - debug: 4.4.3 + debug: 4.4.3(supports-color@10.0.0) http-errors: 2.0.0 iconv-lite: 0.7.2 on-finished: 2.4.1 @@ -33321,15 +33321,15 @@ snapshots: dependencies: ms: 2.1.3 - debug@4.4.1(supports-color@10.0.0): + debug@4.4.1: dependencies: ms: 2.1.3 - optionalDependencies: - supports-color: 10.0.0 - debug@4.4.3: + debug@4.4.3(supports-color@10.0.0): dependencies: ms: 2.1.3 + optionalDependencies: + supports-color: 10.0.0 decamelize-keys@1.1.1: dependencies: @@ -33473,7 +33473,7 @@ snapshots: docker-modem@5.0.6: dependencies: - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) readable-stream: 3.6.0 split-ca: 1.0.1 ssh2: 1.16.0 @@ -34170,7 +34170,7 @@ snapshots: eslint-import-resolver-typescript@3.5.5(@typescript-eslint/parser@5.59.6(eslint@8.31.0)(typescript@5.5.4))(eslint-import-resolver-node@0.3.7)(eslint-plugin-import@2.29.1)(eslint@8.31.0): dependencies: - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) enhanced-resolve: 5.15.0 eslint: 8.31.0 eslint-module-utils: 2.7.4(@typescript-eslint/parser@5.59.6(eslint@8.31.0)(typescript@5.5.4))(eslint-import-resolver-node@0.3.7)(eslint-import-resolver-typescript@3.5.5)(eslint@8.31.0) @@ -34652,7 +34652,7 @@ snapshots: content-type: 1.0.5 cookie: 0.7.1 cookie-signature: 1.2.2 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) depd: 2.0.0 encodeurl: 2.0.0 escape-html: 1.0.3 @@ -34691,7 +34691,7 @@ snapshots: extract-zip@2.0.1: dependencies: - debug: 4.4.1(supports-color@10.0.0) + debug: 
4.4.3(supports-color@10.0.0) get-stream: 5.2.0 yauzl: 2.10.0 optionalDependencies: @@ -34865,7 +34865,7 @@ snapshots: finalhandler@2.1.0(supports-color@10.0.0): dependencies: - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) encodeurl: 2.0.0 escape-html: 1.0.3 on-finished: 2.4.1 @@ -35125,7 +35125,7 @@ snapshots: dependencies: basic-ftp: 5.0.3 data-uri-to-buffer: 5.0.1 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) fs-extra: 8.1.0 transitivePeerDependencies: - supports-color @@ -35284,7 +35284,7 @@ snapshots: '@types/node': 20.14.14 '@types/semver': 7.5.1 chalk: 4.1.2 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) interpret: 3.1.1 semver: 7.7.3 tslib: 2.8.1 @@ -35568,7 +35568,7 @@ snapshots: http-proxy-agent@7.0.2: dependencies: agent-base: 7.1.4 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) transitivePeerDependencies: - supports-color @@ -35581,14 +35581,14 @@ snapshots: https-proxy-agent@5.0.1: dependencies: agent-base: 6.0.2 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) transitivePeerDependencies: - supports-color https-proxy-agent@7.0.6: dependencies: agent-base: 7.1.4 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) transitivePeerDependencies: - supports-color @@ -35953,7 +35953,7 @@ snapshots: istanbul-lib-source-maps@5.0.6: dependencies: '@jridgewell/trace-mapping': 0.3.25 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) istanbul-lib-coverage: 3.2.2 transitivePeerDependencies: - supports-color @@ -37249,7 +37249,7 @@ snapshots: micromark@3.1.0: dependencies: '@types/debug': 4.1.12 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) decode-named-character-reference: 1.0.2 micromark-core-commonmark: 1.0.6 micromark-factory-space: 1.0.0 @@ -37271,7 +37271,7 @@ snapshots: micromark@4.0.2: dependencies: '@types/debug': 4.1.12 - debug: 
4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) decode-named-character-reference: 1.0.2 devlop: 1.1.0 micromark-core-commonmark: 2.0.3 @@ -38159,7 +38159,7 @@ snapshots: dependencies: '@tootallnate/quickjs-emscripten': 0.23.0 agent-base: 7.1.4 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) get-uri: 6.0.1 http-proxy-agent: 7.0.2 https-proxy-agent: 7.0.6 @@ -38918,7 +38918,7 @@ snapshots: proxy-agent@6.5.0: dependencies: agent-base: 7.1.4 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) http-proxy-agent: 7.0.2 https-proxy-agent: 7.0.6 lru-cache: 7.18.3 @@ -38958,7 +38958,7 @@ snapshots: dependencies: '@puppeteer/browsers': 2.10.6 chromium-bidi: 7.2.0(devtools-protocol@0.0.1464554) - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.1 devtools-protocol: 0.0.1464554 typed-query-selector: 2.12.0 ws: 8.18.3(bufferutil@4.0.9) @@ -39779,7 +39779,7 @@ snapshots: remix-auth-oauth2@1.11.0(@remix-run/server-runtime@2.1.0(typescript@5.5.4))(remix-auth@3.6.0(@remix-run/react@2.1.0(react-dom@18.2.0(react@18.2.0))(react@18.2.0)(typescript@5.5.4))(@remix-run/server-runtime@2.1.0(typescript@5.5.4))): dependencies: '@remix-run/server-runtime': 2.1.0(typescript@5.5.4) - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) remix-auth: 3.6.0(@remix-run/react@2.1.0(react-dom@18.2.0(react@18.2.0))(react@18.2.0)(typescript@5.5.4))(@remix-run/server-runtime@2.1.0(typescript@5.5.4)) transitivePeerDependencies: - supports-color @@ -39843,7 +39843,7 @@ snapshots: require-in-the-middle@7.1.1(supports-color@10.0.0): dependencies: - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) module-details-from-path: 1.0.3 resolve: 1.22.8 transitivePeerDependencies: @@ -39974,7 +39974,7 @@ snapshots: router@2.2.0: dependencies: - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) depd: 2.0.0 is-promise: 4.0.0 parseurl: 1.3.3 @@ -40128,7 +40128,7 @@ snapshots: 
send@1.1.0(supports-color@10.0.0): dependencies: - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) destroy: 1.2.0 encodeurl: 2.0.0 escape-html: 1.0.3 @@ -40145,7 +40145,7 @@ snapshots: send@1.2.1: dependencies: - debug: 4.4.3 + debug: 4.4.3(supports-color@10.0.0) encodeurl: 2.0.0 escape-html: 1.0.3 etag: 1.8.1 @@ -40471,7 +40471,7 @@ snapshots: socks-proxy-agent@8.0.5: dependencies: agent-base: 7.1.4 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) socks: 2.8.3 transitivePeerDependencies: - supports-color @@ -40843,7 +40843,7 @@ snapshots: dependencies: component-emitter: 1.3.1 cookiejar: 2.1.4 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) fast-safe-stringify: 2.1.1 form-data: 4.0.4 formidable: 3.5.1 @@ -41963,7 +41963,7 @@ snapshots: vite-node@0.28.5(@types/node@20.14.14)(lightningcss@1.29.2)(terser@5.44.1): dependencies: cac: 6.7.14 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) mlly: 1.7.4 pathe: 1.1.2 picocolors: 1.1.1 @@ -41983,7 +41983,7 @@ snapshots: vite-node@3.1.4(@types/node@20.14.14)(lightningcss@1.29.2)(terser@5.44.1): dependencies: cac: 6.7.14 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) es-module-lexer: 1.7.0 pathe: 2.0.3 vite: 5.4.21(@types/node@20.14.14)(lightningcss@1.29.2)(terser@5.44.1) @@ -42039,7 +42039,7 @@ snapshots: '@vitest/spy': 3.1.4 '@vitest/utils': 3.1.4 chai: 5.2.0 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.1 expect-type: 1.2.1 magic-string: 0.30.21 pathe: 2.0.3 diff --git a/references/hello-world/src/trigger/inputStreams.ts b/references/hello-world/src/trigger/inputStreams.ts new file mode 100644 index 00000000000..8d9ff4261c0 --- /dev/null +++ b/references/hello-world/src/trigger/inputStreams.ts @@ -0,0 +1,118 @@ +import { logger, runs, streams, task, wait } from "@trigger.dev/sdk/v3"; + +// Define typed input streams +const approvalStream = streams.input<{ approved: boolean; reviewer: 
string }>({ + id: "approval", +}); + +const messageStream = streams.input<{ text: string }>({ id: "messages" }); + +/** + * Coordinator task that exercises all input stream patterns end-to-end. + * + * 1. .once() — trigger a child, send it data via SSE tail, poll until complete + * 2. .on() — trigger a child, send it multiple messages, poll until complete + * 3. .wait() — trigger a child, send it data (completes its waitpoint), poll until complete + * 4. .wait() race — send data before child calls .wait(), verify race handling + */ +export const inputStreamCoordinator = task({ + id: "input-stream-coordinator", + run: async () => { + const results: Record = {}; + + // --- Test 1: .once() --- + logger.info("Test 1: .once()"); + const onceHandle = await inputStreamOnce.trigger({}); + await wait.for({ seconds: 5 }); + await approvalStream.send(onceHandle.id, { approved: true, reviewer: "coordinator-once" }); + const onceRun = await runs.poll(onceHandle, { pollIntervalMs: 1000 }); + results.once = onceRun.output; + logger.info("Test 1 passed", { output: onceRun.output }); + + // --- Test 2: .on() with multiple messages --- + logger.info("Test 2: .on()"); + const onHandle = await inputStreamOn.trigger({ messageCount: 3 }); + await wait.for({ seconds: 5 }); + for (let i = 0; i < 3; i++) { + await messageStream.send(onHandle.id, { text: `message-${i}` }); + await wait.for({ seconds: 1 }); + } + const onRun = await runs.poll(onHandle, { pollIntervalMs: 1000 }); + results.on = onRun.output; + logger.info("Test 2 passed", { output: onRun.output }); + + // --- Test 3: .wait() (waitpoint-based) --- + logger.info("Test 3: .wait()"); + const waitHandle = await inputStreamWait.trigger({ timeout: "1m" }); + await wait.for({ seconds: 5 }); + await approvalStream.send(waitHandle.id, { approved: true, reviewer: "coordinator-wait" }); + const waitRun = await runs.poll(waitHandle, { pollIntervalMs: 1000 }); + results.wait = waitRun.output; + logger.info("Test 3 passed", { output: 
waitRun.output }); + + // --- Test 4: .wait() race condition (send before child calls .wait()) --- + logger.info("Test 4: .wait() race"); + const raceHandle = await inputStreamWait.trigger({ timeout: "1m" }); + await approvalStream.send(raceHandle.id, { approved: false, reviewer: "race-test" }); + const raceRun = await runs.poll(raceHandle, { pollIntervalMs: 1000 }); + results.race = raceRun.output; + logger.info("Test 4 passed", { output: raceRun.output }); + + logger.info("All input stream tests passed", { results }); + return results; + }, +}); + +/** + * Uses .once() to wait for a single input stream message. + */ +export const inputStreamOnce = task({ + id: "input-stream-once", + run: async (_payload: Record) => { + logger.info("Waiting for approval via .once()"); + const approval = await approvalStream.once(); + logger.info("Received approval", { approval }); + return { approval }; + }, +}); + +/** + * Uses .on() to subscribe and collect multiple messages. + */ +export const inputStreamOn = task({ + id: "input-stream-on", + run: async (payload: { messageCount?: number }) => { + const expected = payload.messageCount ?? 3; + const received: { text: string }[] = []; + + logger.info("Subscribing to messages via .on()", { expected }); + + const { off } = messageStream.on((data) => { + logger.info("Received message", { data }); + received.push(data); + }); + + while (received.length < expected) { + await wait.for({ seconds: 1 }); + } + + off(); + logger.info("Done receiving messages", { count: received.length }); + return { messages: received }; + }, +}); + +/** + * Uses .wait() to suspend the task via a waitpoint until data arrives. + */ +export const inputStreamWait = task({ + id: "input-stream-wait", + run: async (payload: { timeout?: string }) => { + logger.info("Waiting for approval via .wait()"); + const approval = await approvalStream.wait({ + timeout: payload.timeout ?? 
"5m", + }); + logger.info("Received approval via .wait()", { approval }); + return { approval }; + }, +}); diff --git a/references/hello-world/src/trigger/realtime.ts b/references/hello-world/src/trigger/realtime.ts index c53bb2f16ad..3bd051f6a95 100644 --- a/references/hello-world/src/trigger/realtime.ts +++ b/references/hello-world/src/trigger/realtime.ts @@ -80,13 +80,13 @@ export const realtimeStreamsTask = task({ export const realtimeStreamsV2Task = task({ id: "realtime-streams-v2", run: async () => { - const mockStream1 = createStreamFromGenerator(generateMockData(5 * 60 * 1000)); + const mockStream1 = createStreamFromGenerator(generateMockData(10_000)); await metadata.stream("mock-data", mockStream1); - await setTimeout(10000); // Offset by 10 seconds + await setTimeout(5000); // Offset by 5 seconds - const mockStream2 = createStreamFromGenerator(generateMockData(5 * 60 * 1000)); + const mockStream2 = createStreamFromGenerator(generateMockData(10_000)); const stream2 = await metadata.stream("mock-data", mockStream2); for await (const chunk of stream2) { diff --git a/references/hello-world/src/trigger/streams.ts b/references/hello-world/src/trigger/streams.ts new file mode 100644 index 00000000000..9a10888b532 --- /dev/null +++ b/references/hello-world/src/trigger/streams.ts @@ -0,0 +1,17 @@ +import { streams, InferStreamType } from "@trigger.dev/sdk"; + +export const textStream = streams.define({ + id: "text", +}); + +export const progressStream = streams.define<{ step: string; percent: number }>({ + id: "progress", +}); + +export const logStream = streams.define({ + id: "logs", +}); + +export type TextStreamPart = InferStreamType; +export type ProgressStreamPart = InferStreamType; +export type LogStreamPart = InferStreamType; diff --git a/references/hello-world/src/trigger/streamsV2.ts b/references/hello-world/src/trigger/streamsV2.ts new file mode 100644 index 00000000000..cf51e40e5d3 --- /dev/null +++ b/references/hello-world/src/trigger/streamsV2.ts @@ 
-0,0 +1,229 @@ +import { logger, streams, task } from "@trigger.dev/sdk"; +import { setTimeout } from "timers/promises"; +import { textStream, progressStream, logStream } from "./streams.js"; + +// Test 1: .pipe() then read back from S2 via a coordinator +export const streamsPipeTask = task({ + id: "streams-pipe", + run: async () => { + const source = ReadableStream.from(generateChunks(5)); + const { waitUntilComplete } = textStream.pipe(source); + await waitUntilComplete(); + + return { written: 5 }; + }, +}); + +export const streamsPipeReadTask = task({ + id: "streams-pipe-read", + run: async () => { + const handle = await streamsPipeTask.trigger({}); + + const stream = await textStream.read(handle.id); + const chunks: string[] = []; + for await (const chunk of stream) { + logger.info("read chunk from pipe", { chunk }); + chunks.push(chunk); + } + + return { chunks }; + }, +}); + +// Test 2: .append() then read back from S2 +export const streamsAppendTask = task({ + id: "streams-append", + run: async () => { + await logStream.append("Starting processing"); + await progressStream.append({ step: "init", percent: 0 }); + + await setTimeout(500); + await logStream.append("Step 1 complete"); + await progressStream.append({ step: "step-1", percent: 33 }); + + await setTimeout(500); + await logStream.append("Step 2 complete"); + await progressStream.append({ step: "step-2", percent: 66 }); + + await setTimeout(500); + await logStream.append("All done"); + await progressStream.append({ step: "done", percent: 100 }); + + return { success: true }; + }, +}); + +export const streamsAppendReadTask = task({ + id: "streams-append-read", + run: async () => { + const handle = await streamsAppendTask.trigger({}); + + // Read both log and progress streams from the child + const logStreamReader = await logStream.read(handle.id); + const logs: string[] = []; + for await (const chunk of logStreamReader) { + logger.info("read log", { chunk }); + logs.push(chunk); + } + + const 
progressStreamReader = await progressStream.read(handle.id); + const steps: Array<{ step: string; percent: number }> = []; + for await (const chunk of progressStreamReader) { + logger.info("read progress", { chunk }); + steps.push(chunk); + } + + return { logs, steps }; + }, +}); + +// Test 3: .writer() then read back +export const streamsWriterTask = task({ + id: "streams-writer", + run: async () => { + const { waitUntilComplete } = logStream.writer({ + execute: ({ write, merge }) => { + write("Line 1 from write()"); + write("Line 2 from write()"); + + const moreLines = ReadableStream.from(["Line 3 from merge()", "Line 4 from merge()"]); + merge(moreLines); + }, + }); + + await waitUntilComplete(); + + return { written: 4 }; + }, +}); + +export const streamsWriterReadTask = task({ + id: "streams-writer-read", + run: async () => { + const handle = await streamsWriterTask.trigger({}); + + const stream = await logStream.read(handle.id); + const lines: string[] = []; + for await (const chunk of stream) { + logger.info("read writer line", { chunk }); + lines.push(chunk); + } + + return { lines }; + }, +}); + +// Test 4: Direct streams.pipe() then read back with streams.read() +export const streamsDirectPipeTask = task({ + id: "streams-direct-pipe", + run: async () => { + const source = ReadableStream.from(generateChunks(3)); + const { waitUntilComplete } = streams.pipe("direct-output", source); + await waitUntilComplete(); + + return { written: 3 }; + }, +}); + +export const streamsDirectPipeReadTask = task({ + id: "streams-direct-pipe-read", + run: async () => { + const handle = await streamsDirectPipeTask.trigger({}); + + const stream = await streams.read(handle.id, "direct-output"); + const chunks: string[] = []; + for await (const chunk of stream) { + logger.info("read direct pipe chunk", { chunk }); + chunks.push(chunk as string); + } + + return { chunks }; + }, +}); + +// Test 5: Direct streams.append() then read back +export const streamsDirectAppendTask = 
task({ + id: "streams-direct-append", + run: async () => { + await streams.append("direct-logs", "Log entry 1"); + await setTimeout(300); + await streams.append("direct-logs", "Log entry 2"); + await setTimeout(300); + await streams.append("direct-logs", "Log entry 3"); + + return { written: 3 }; + }, +}); + +export const streamsDirectAppendReadTask = task({ + id: "streams-direct-append-read", + run: async () => { + const handle = await streamsDirectAppendTask.trigger({}); + + const stream = await streams.read(handle.id, "direct-logs"); + const entries: string[] = []; + for await (const chunk of stream) { + logger.info("read direct append entry", { chunk }); + entries.push(chunk as string); + } + + return { entries }; + }, +}); + +// Test 6: Multiple streams in one task, read all back +export const streamsMultiTask = task({ + id: "streams-multi", + run: async () => { + await logStream.append("Starting multi-stream test"); + await progressStream.append({ step: "start", percent: 0 }); + + const source = ReadableStream.from(generateChunks(3)); + const { waitUntilComplete } = textStream.pipe(source); + + await setTimeout(500); + await logStream.append("Text stream piped"); + await progressStream.append({ step: "piped", percent: 50 }); + + await waitUntilComplete(); + + await logStream.append("Complete"); + await progressStream.append({ step: "done", percent: 100 }); + + return { success: true }; + }, +}); + +export const streamsMultiReadTask = task({ + id: "streams-multi-read", + run: async () => { + const handle = await streamsMultiTask.trigger({}); + + const logReader = await logStream.read(handle.id); + const logs: string[] = []; + for await (const chunk of logReader) { + logs.push(chunk); + } + + const progressReader = await progressStream.read(handle.id); + const steps: Array<{ step: string; percent: number }> = []; + for await (const chunk of progressReader) { + steps.push(chunk); + } + + const textReader = await textStream.read(handle.id); + const texts: 
string[] = []; + for await (const chunk of textReader) { + texts.push(chunk); + } + + return { logs, steps, texts }; + }, +}); + +async function* generateChunks(count: number) { + for (let i = 0; i < count; i++) { + await setTimeout(200); + yield `chunk-${i}`; + } +} diff --git a/rules/4.3.0/realtime.md b/rules/4.3.0/realtime.md new file mode 100644 index 00000000000..af1e6398324 --- /dev/null +++ b/rules/4.3.0/realtime.md @@ -0,0 +1,463 @@ +# Trigger.dev Realtime (v4) + +**Real-time monitoring and updates for runs** + +## Core Concepts + +Realtime allows you to: + +- Subscribe to run status changes, metadata updates, and streams +- Build real-time dashboards and UI updates +- Monitor task progress from frontend and backend +- Send data into running tasks with input streams + +## Authentication + +### Public Access Tokens + +```ts +import { auth } from "@trigger.dev/sdk"; + +// Read-only token for specific runs +const publicToken = await auth.createPublicToken({ + scopes: { + read: { + runs: ["run_123", "run_456"], + tasks: ["my-task-1", "my-task-2"], + }, + }, + expirationTime: "1h", // Default: 15 minutes +}); +``` + +### Trigger Tokens (Frontend only) + +```ts +// Single-use token for triggering tasks +const triggerToken = await auth.createTriggerPublicToken("my-task", { + expirationTime: "30m", +}); +``` + +## Backend Usage + +### Subscribe to Runs + +```ts +import { runs, tasks } from "@trigger.dev/sdk"; + +// Trigger and subscribe +const handle = await tasks.trigger("my-task", { data: "value" }); + +// Subscribe to specific run +for await (const run of runs.subscribeToRun(handle.id)) { + console.log(`Status: ${run.status}, Progress: ${run.metadata?.progress}`); + if (run.status === "COMPLETED") break; +} + +// Subscribe to runs with tag +for await (const run of runs.subscribeToRunsWithTag("user-123")) { + console.log(`Tagged run ${run.id}: ${run.status}`); +} + +// Subscribe to batch +for await (const run of runs.subscribeToBatch(batchId)) { + 
console.log(`Batch run ${run.id}: ${run.status}`); +} +``` + +### Realtime Streams v2 (Recommended) + +```ts +import { streams, InferStreamType } from "@trigger.dev/sdk"; + +// 1. Define streams (shared location) +export const aiStream = streams.define({ + id: "ai-output", +}); + +export type AIStreamPart = InferStreamType; + +// 2. Pipe from task +export const streamingTask = task({ + id: "streaming-task", + run: async (payload) => { + const completion = await openai.chat.completions.create({ + model: "gpt-4", + messages: [{ role: "user", content: payload.prompt }], + stream: true, + }); + + const { waitUntilComplete } = aiStream.pipe(completion); + await waitUntilComplete(); + }, +}); + +// 3. Read from backend +const stream = await aiStream.read(runId, { + timeoutInSeconds: 300, + startIndex: 0, // Resume from specific chunk +}); + +for await (const chunk of stream) { + console.log("Chunk:", chunk); // Fully typed +} +``` + +Enable v2 by upgrading to 4.1.0 or later. + +## Input Streams + +Input streams let you send data **into** a running task from your backend or frontend. This enables bidirectional communication — output streams send data out of tasks, input streams complete the loop. + +### Problems Input Streams Solve + +**Cancelling AI streams mid-generation.** When you use AI SDK's `streamText` inside a task, the LLM keeps generating tokens until it's done — even if the user has navigated away or clicked "Stop generating." Without input streams, there's no way to tell the running task to abort. The task burns through tokens and compute for a response nobody will read. With input streams, your frontend sends a cancel signal and the task aborts the LLM call immediately. + +**Human-in-the-loop workflows.** A task generates a draft email, then pauses and waits for the user to approve or edit it before sending. Input streams let the task block on `approval.once()` until the user responds. 
+ +**Interactive agents.** An AI agent running as a task needs follow-up information from the user mid-execution — clarifying a question, choosing between options, or providing additional context. + +### Defining Input Streams + +Define input streams in a shared file so both your task code and your backend/frontend can import them: + +```ts +// trigger/streams.ts +import { streams } from "@trigger.dev/sdk"; + +// Typed input stream — the generic parameter defines the shape of data sent in +export const cancelSignal = streams.input<{ reason?: string }>({ + id: "cancel", +}); + +export const approval = streams.input<{ approved: boolean; reviewer: string }>({ + id: "approval", +}); +``` + +### Receiving Data Inside a Task + +#### `once()` — Wait for the next value + +Blocks until data arrives. Useful for approval gates and one-shot signals. + +```ts +import { task } from "@trigger.dev/sdk"; +import { approval } from "./streams"; + +export const draftEmailTask = task({ + id: "draft-email", + run: async (payload: { to: string; subject: string }) => { + const draft = await generateDraft(payload); + + // Task pauses here until someone sends approval + const result = await approval.once(); + + if (result.approved) { + await sendEmail(draft); + return { sent: true, reviewer: result.reviewer }; + } + + return { sent: false, reviewer: result.reviewer }; + }, +}); +``` + +`once()` accepts options for timeouts and abort signals: + +```ts +// With a timeout — rejects if no data arrives within 5 minutes +const result = await approval.once({ timeoutMs: 300_000 }); + +// With an abort signal +const controller = new AbortController(); +const result = await approval.once({ signal: controller.signal }); +``` + +#### `on()` — Listen for every value + +Registers a persistent handler that fires on every piece of data. Returns a subscription with an `.off()` method. 
+ +```ts +import { task } from "@trigger.dev/sdk"; +import { cancelSignal } from "./streams"; + +export const streamingTask = task({ + id: "streaming-task", + run: async (payload: { prompt: string }) => { + const controller = new AbortController(); + + // Listen for cancel signals + const sub = cancelSignal.on((data) => { + console.log("Cancelled:", data.reason); + controller.abort(); + }); + + try { + const result = await streamText({ + model: openai("gpt-4o"), + prompt: payload.prompt, + abortSignal: controller.signal, + }); + return result; + } finally { + sub.off(); // Clean up the listener + } + }, +}); +``` + +#### `peek()` — Non-blocking check + +Returns the most recent buffered value without waiting, or `undefined` if nothing has been received yet. + +```ts +const latest = cancelSignal.peek(); +if (latest) { + // A cancel was already sent before we checked +} +``` + +### Sending Data to a Running Task + +Use `.send()` from your backend to push data into a running task: + +```ts +import { cancelSignal, approval } from "./trigger/streams"; + +// Cancel a running AI stream +await cancelSignal.send(runId, { reason: "User clicked stop" }); + +// Approve a draft +await approval.send(runId, { approved: true, reviewer: "alice@example.com" }); +``` + +### Cancelling AI SDK `streamText` Mid-Stream + +This is the most common use case. 
Here's a complete example: + +```ts +// trigger/streams.ts +import { streams } from "@trigger.dev/sdk"; + +export const aiOutput = streams.define({ id: "ai" }); +export const cancelStream = streams.input<{ reason?: string }>({ id: "cancel" }); +``` + +```ts +// trigger/ai-task.ts +import { task } from "@trigger.dev/sdk"; +import { streamText } from "ai"; +import { openai } from "@ai-sdk/openai"; +import { aiOutput, cancelStream } from "./streams"; + +export const aiTask = task({ + id: "ai-chat", + run: async (payload: { prompt: string }) => { + const controller = new AbortController(); + + // If the user cancels, abort the LLM call + const sub = cancelStream.on(() => { + controller.abort(); + }); + + try { + const result = streamText({ + model: openai("gpt-4o"), + prompt: payload.prompt, + abortSignal: controller.signal, + }); + + // Stream output to the frontend in real-time + const { waitUntilComplete } = aiOutput.pipe(result.textStream); + await waitUntilComplete(); + + return { text: await result.text }; + } finally { + sub.off(); + } + }, +}); +``` + +```ts +// Backend: cancel from an API route +import { cancelStream } from "./trigger/streams"; + +export async function POST(req: Request) { + const { runId } = await req.json(); + await cancelStream.send(runId, { reason: "User clicked stop" }); + return Response.json({ cancelled: true }); +} +``` + +### Important Notes + +- Input streams require v2 realtime streams (enabled by default in SDK 4.1.0+). If you're on an older version, calling `.on()` or `.once()` will throw with instructions to enable it. +- You cannot send data to a completed, failed, or canceled run. +- Maximum payload size per `.send()` call is 1MB. +- Data sent before any listener is registered is buffered and delivered when a listener attaches. +- Type safety is enforced through the generic parameter on `streams.input()`. 
+ +## React Frontend Usage + +### Installation + +```bash +npm add @trigger.dev/react-hooks +``` + +### Triggering Tasks + +```tsx +"use client"; +import { useTaskTrigger, useRealtimeTaskTrigger } from "@trigger.dev/react-hooks"; +import type { myTask } from "../trigger/tasks"; + +function TriggerComponent({ accessToken }: { accessToken: string }) { + // Basic trigger + const { submit, handle, isLoading } = useTaskTrigger("my-task", { + accessToken, + }); + + // Trigger with realtime updates + const { + submit: realtimeSubmit, + run, + isLoading: isRealtimeLoading, + } = useRealtimeTaskTrigger("my-task", { accessToken }); + + return ( +
+ + + + + {run &&
Status: {run.status}
} +
+ ); +} +``` + +### Subscribing to Runs + +```tsx +"use client"; +import { useRealtimeRun, useRealtimeRunsWithTag } from "@trigger.dev/react-hooks"; +import type { myTask } from "../trigger/tasks"; + +function SubscribeComponent({ runId, accessToken }: { runId: string; accessToken: string }) { + // Subscribe to specific run + const { run, error } = useRealtimeRun(runId, { + accessToken, + onComplete: (run) => { + console.log("Task completed:", run.output); + }, + }); + + // Subscribe to tagged runs + const { runs } = useRealtimeRunsWithTag("user-123", { accessToken }); + + if (error) return
Error: {error.message}
; + if (!run) return
Loading...
; + + return ( +
+
Status: {run.status}
+
Progress: {run.metadata?.progress || 0}%
+ {run.output &&
Result: {JSON.stringify(run.output)}
} + +

Tagged Runs:

+ {runs.map((r) => ( +
+ {r.id}: {r.status} +
+ ))} +
+ ); +} +``` + +### Realtime Streams with React + +```tsx +"use client"; +import { useRealtimeStream } from "@trigger.dev/react-hooks"; +import { aiStream } from "../trigger/streams"; + +function StreamComponent({ runId, accessToken }: { runId: string; accessToken: string }) { + // Pass defined stream directly for type safety + const { parts, error } = useRealtimeStream(aiStream, runId, { + accessToken, + timeoutInSeconds: 300, + throttleInMs: 50, // Control re-render frequency + }); + + if (error) return
Error: {error.message}
; + if (!parts) return
Loading...
; + + const text = parts.join(""); // parts is typed as AIStreamPart[] + + return
Streamed Text: {text}
; +} +``` + +### Wait Tokens + +```tsx +"use client"; +import { useWaitToken } from "@trigger.dev/react-hooks"; + +function WaitTokenComponent({ tokenId, accessToken }: { tokenId: string; accessToken: string }) { + const { complete } = useWaitToken(tokenId, { accessToken }); + + return ; +} +``` + +### SWR Hooks (Fetch Once) + +```tsx +"use client"; +import { useRun } from "@trigger.dev/react-hooks"; +import type { myTask } from "../trigger/tasks"; + +function SWRComponent({ runId, accessToken }: { runId: string; accessToken: string }) { + const { run, error, isLoading } = useRun(runId, { + accessToken, + refreshInterval: 0, // Disable polling (recommended) + }); + + if (isLoading) return
Loading...
; + if (error) return
Error: {error.message}
; + + return
Run: {run?.status}
; +} +``` + +## Run Object Properties + +Key properties available in run subscriptions: + +- `id`: Unique run identifier +- `status`: `QUEUED`, `EXECUTING`, `COMPLETED`, `FAILED`, `CANCELED`, etc. +- `payload`: Task input data (typed) +- `output`: Task result (typed, when completed) +- `metadata`: Real-time updatable data +- `createdAt`, `updatedAt`: Timestamps +- `costInCents`: Execution cost + +## Best Practices + +- **Use Realtime over SWR**: Recommended for most use cases due to rate limits +- **Scope tokens properly**: Only grant necessary read/trigger permissions +- **Handle errors**: Always check for errors in hooks and subscriptions +- **Type safety**: Use task types for proper payload/output typing +- **Cleanup subscriptions**: Backend subscriptions auto-complete, frontend hooks auto-cleanup +- **Clean up input stream listeners**: Always call `.off()` in a `finally` block to avoid leaks +- **Use timeouts with `once()`**: Avoid hanging indefinitely if the signal never arrives