Skip to content

Commit aa5a8cd

Browse files
committed
fix: address CodeRabbit review feedback on input streams PR
- Prevent waitpoint linkage loss: get before complete, then delete - Fix timeout header validation for 0 and NaN values - Allow skipAccessTokens as alternative to access token for v2 gating - Clear timeout timer on abort in once() to prevent leaks - Clean up tail state after error/completion for reconnection - Use strict undefined check for falsy input payloads in wait()
1 parent 7f1ce0d commit aa5a8cd

File tree

6 files changed

+61
-22
lines changed

6 files changed

+61
-22
lines changed

apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import { json } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { $replica } from "~/db.server";
4-
import { getAndDeleteInputStreamWaitpoint } from "~/services/inputStreamWaitpointCache.server";
4+
import {
5+
getInputStreamWaitpoint,
6+
deleteInputStreamWaitpoint,
7+
} from "~/services/inputStreamWaitpointCache.server";
58
import {
69
createActionApiRoute,
710
createLoaderApiRoute,
@@ -80,7 +83,8 @@ const { action } = createActionApiRoute(
8083
);
8184

8285
// Check Redis cache for a linked .wait() waitpoint (fast, no DB hit if none)
83-
const waitpointId = await getAndDeleteInputStreamWaitpoint(params.runId, params.streamId);
86+
// Get first, complete, then delete — so the mapping survives if completeWaitpoint throws
87+
const waitpointId = await getInputStreamWaitpoint(params.runId, params.streamId);
8488
if (waitpointId) {
8589
await engine.completeWaitpoint({
8690
id: waitpointId,
@@ -90,6 +94,7 @@ const { action } = createActionApiRoute(
9094
isError: false,
9195
},
9296
});
97+
await deleteInputStreamWaitpoint(params.runId, params.streamId);
9398
}
9499

95100
return json({ ok: true });
@@ -132,17 +137,18 @@ const loader = createLoaderApiRoute(
132137
const lastEventId = request.headers.get("Last-Event-ID") || undefined;
133138

134139
const timeoutInSecondsRaw = request.headers.get("Timeout-Seconds") ?? undefined;
135-
const timeoutInSeconds = timeoutInSecondsRaw ? parseInt(timeoutInSecondsRaw) : undefined;
140+
const timeoutInSeconds =
141+
timeoutInSecondsRaw !== undefined ? parseInt(timeoutInSecondsRaw, 10) : undefined;
136142

137-
if (timeoutInSeconds && isNaN(timeoutInSeconds)) {
143+
if (timeoutInSeconds !== undefined && isNaN(timeoutInSeconds)) {
138144
return new Response("Invalid timeout seconds", { status: 400 });
139145
}
140146

141-
if (timeoutInSeconds && timeoutInSeconds < 1) {
147+
if (timeoutInSeconds !== undefined && timeoutInSeconds < 1) {
142148
return new Response("Timeout seconds must be greater than 0", { status: 400 });
143149
}
144150

145-
if (timeoutInSeconds && timeoutInSeconds > 600) {
151+
if (timeoutInSeconds !== undefined && timeoutInSeconds > 600) {
146152
return new Response("Timeout seconds must be less than 600", { status: 400 });
147153
}
148154

apps/webapp/app/services/inputStreamWaitpointCache.server.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,29 @@ export async function setInputStreamWaitpoint(
5454
}
5555
}
5656

57+
/**
58+
* Get the waitpoint ID for an input stream without deleting it.
59+
* Called from the `.send()` route before completing the waitpoint.
60+
*/
61+
export async function getInputStreamWaitpoint(
62+
runFriendlyId: string,
63+
streamId: string
64+
): Promise<string | null> {
65+
if (!redis) return null;
66+
67+
try {
68+
const key = buildKey(runFriendlyId, streamId);
69+
return await redis.get(key);
70+
} catch (error) {
71+
logger.error("Failed to get input stream waitpoint cache", {
72+
runFriendlyId,
73+
streamId,
74+
error,
75+
});
76+
return null;
77+
}
78+
}
79+
5780
/**
5881
* Atomically get and delete the waitpoint ID for an input stream.
5982
* Uses GETDEL for atomicity — only one concurrent `.send()` call will get the ID.

apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@ export function getRealtimeStreamInstance(
3636
if (streamVersion === "v1") {
3737
return v1RealtimeStreams;
3838
} else {
39-
if (env.REALTIME_STREAMS_S2_BASIN && env.REALTIME_STREAMS_S2_ACCESS_TOKEN) {
39+
if (
40+
env.REALTIME_STREAMS_S2_BASIN &&
41+
(env.REALTIME_STREAMS_S2_ACCESS_TOKEN ||
42+
env.REALTIME_STREAMS_S2_SKIP_ACCESS_TOKENS === "true")
43+
) {
4044
return new S2RealtimeStreams({
4145
basin: env.REALTIME_STREAMS_S2_BASIN,
4246
accessToken: env.REALTIME_STREAMS_S2_ACCESS_TOKEN,
@@ -70,7 +74,7 @@ export function determineRealtimeStreamsVersion(streamVersion?: string): "v1" |
7074
if (
7175
streamVersion === "v2" &&
7276
env.REALTIME_STREAMS_S2_BASIN &&
73-
env.REALTIME_STREAMS_S2_ACCESS_TOKEN
77+
(env.REALTIME_STREAMS_S2_ACCESS_TOKEN || env.REALTIME_STREAMS_S2_SKIP_ACCESS_TOKENS === "true")
7478
) {
7579
return "v2";
7680
}

packages/core/src/v3/inputStreams/manager.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ export class StandardInputStreamManager implements InputStreamManager {
100100
options.signal.addEventListener(
101101
"abort",
102102
() => {
103+
if (waiter.timeoutHandle) {
104+
clearTimeout(waiter.timeoutHandle);
105+
}
103106
this.#removeOnceWaiter(streamId, waiter);
104107
reject(new Error("Aborted"));
105108
},
@@ -174,13 +177,15 @@ export class StandardInputStreamManager implements InputStreamManager {
174177
#ensureStreamTailConnected(streamId: string): void {
175178
if (!this.tails.has(streamId) && this.currentRunId) {
176179
const abortController = new AbortController();
177-
const promise = this.#runTail(this.currentRunId, streamId, abortController.signal).catch(
178-
(error) => {
180+
const promise = this.#runTail(this.currentRunId, streamId, abortController.signal)
181+
.catch((error) => {
179182
if (this.debug) {
180183
console.error(`[InputStreamManager] Tail error for "${streamId}":`, error);
181184
}
182-
}
183-
);
185+
})
186+
.finally(() => {
187+
this.tails.delete(streamId);
188+
});
184189
this.tails.set(streamId, { abortController, promise });
185190
}
186191
}

packages/trigger-sdk/src/v3/streams.ts

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -771,15 +771,16 @@ function input<TData>(opts: { id: string }): RealtimeDefinedInputStream<TData> {
771771
const waitResult = await runtime.waitUntil(response.waitpointId);
772772

773773
// 4. Parse the output
774-
const data = waitResult.output
775-
? await conditionallyImportAndParsePacket(
776-
{
777-
data: waitResult.output,
778-
dataType: waitResult.outputType ?? "application/json",
779-
},
780-
apiClient
781-
)
782-
: undefined;
774+
const data =
775+
waitResult.output !== undefined
776+
? await conditionallyImportAndParsePacket(
777+
{
778+
data: waitResult.output,
779+
dataType: waitResult.outputType ?? "application/json",
780+
},
781+
apiClient
782+
)
783+
: undefined;
783784

784785
if (waitResult.ok) {
785786
return { ok: true as const, output: data as TData };

references/hello-world/src/trigger/inputStreams.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ export const inputStreamCoordinator = task({
2020
run: async () => {
2121
const results: Record<string, unknown> = {};
2222

23-
// --- Test 1: .once() ---
23+
// --- Test 1: .once() ----
2424
logger.info("Test 1: .once()");
2525
const onceHandle = await inputStreamOnce.trigger({});
2626
await wait.for({ seconds: 5 });

0 commit comments

Comments
 (0)