diff --git a/.changeset/stream-ping-timeout-noise.md b/.changeset/stream-ping-timeout-noise.md new file mode 100644 index 00000000..51b2f8c7 --- /dev/null +++ b/.changeset/stream-ping-timeout-noise.md @@ -0,0 +1,12 @@ +--- +"@vercel/flags-core": patch +--- + +Fix stream reconnect path producing noisy `AbortError` spans on instrumented +fetches. + +When the ping watchdog fires (no ping received within the timeout), the SDK now +cancels the body reader instead of aborting the fetch signal. The read loop +exits via `{ done: true }` and the fetch span closes cleanly, so APM/error +tracking (e.g. `@vercel/otel/fetch`) no longer reports the expected reconnect +as `AbortError: This operation was aborted`. Reconnect behavior is unchanged. diff --git a/packages/vercel-flags-core/src/controller/stream-connection.test.ts b/packages/vercel-flags-core/src/controller/stream-connection.test.ts index 5093984e..116e84fa 100644 --- a/packages/vercel-flags-core/src/controller/stream-connection.test.ts +++ b/packages/vercel-flags-core/src/controller/stream-connection.test.ts @@ -583,6 +583,57 @@ describe('connectStream', () => { abortController.abort(); }); + it('should not abort the fetch signal when ping timeout fires', async () => { + // Regression: aborting the fetch signal surfaced as AbortError on + // instrumented fetch spans (e.g. @vercel/otel/fetch) and got reported + // by APM/error tracking even though this is an expected reconnect. + // The ping timeout must instead cancel the body reader, leaving the + // fetch span to close cleanly. + let requestCount = 0; + const signals: AbortSignal[] = []; + + fetchMock.mockImplementation((_input, init) => { + requestCount++; + if (init?.signal) signals.push(init.signal); + return streamResponse( + new ReadableStream({ + start(controller) { + controller.enqueue( + new TextEncoder().encode(`${JSON.stringify(datafileMsg())}\n`), + ); + // Simulate a zombie connection (server stops sending pings). + init?.signal?.addEventListener('abort', () => { + controller.close(); + }); + }, + }), + ); + }); + + const abortController = new AbortController(); + + await connectStream( + { host: HOST, sdkKey: 'vf_test', abortController, fetch: fetchMock }, + { onDatafile: vi.fn() }, + ); + + expect(requestCount).toBe(1); + + // Trigger the ping timeout. + await vi.advanceTimersByTimeAsync(90_000); + await vi.advanceTimersByTimeAsync(0); + // Advance past the reconnection backoff (min 1s gap). + await vi.advanceTimersByTimeAsync(1_000); + await vi.advanceTimersByTimeAsync(0); + + // Reconnected, but the first request's signal was never aborted — + // only the body reader was cancelled. + expect(requestCount).toBeGreaterThanOrEqual(2); + expect(signals[0]?.aborted).toBe(false); + + abortController.abort(); + }); + it('should reset timeout on each ping', async () => { let streamController: ReadableStreamDefaultController; diff --git a/packages/vercel-flags-core/src/controller/stream-connection.ts b/packages/vercel-flags-core/src/controller/stream-connection.ts index fcf4c156..012eba0b 100644 --- a/packages/vercel-flags-core/src/controller/stream-connection.ts +++ b/packages/vercel-flags-core/src/controller/stream-connection.ts @@ -91,24 +91,29 @@ export async function connectStream( break; } - // Per-connection abort controller — allows ping timeout to abort a single - // connection without stopping the entire retry loop. - const connectionAbort = new AbortController(); - const onMainAbort = (): void => connectionAbort.abort(); + // Per-connection abort controller for the fetch signal. Forwarded from + // the external main abort so a hung fetch (e.g. before headers arrive) + // can still be cancelled on shutdown. + const fetchAbort = new AbortController(); + const onMainAbort = (): void => fetchAbort.abort(); abortController.signal.addEventListener('abort', onMainAbort, { once: true, }); + // Tracks the in-flight body reader so the ping timeout can cancel reads + // gracefully. Cancelling the reader breaks the read loop via + // `{ done: true }` without aborting the fetch signal — the fetch span + // ends cleanly. Aborting the signal would surface as AbortError on + // instrumented fetch spans (e.g. via @vercel/otel/fetch) and get + // reported by APM/error tracking even though this is an expected + // reconnect path. + let reader: ReadableStreamDefaultReader | undefined; let pingTimeoutId: ReturnType | undefined; - // Reference to the response body so the ping timeout can cancel it - // to break out of the for-await loop. - let responseBody: ReadableStream | undefined; const resetPingTimeout = (): void => { if (pingTimeoutId !== undefined) clearTimeout(pingTimeoutId); if (!initialDataReceived) return; pingTimeoutId = setTimeout(() => { - responseBody?.cancel().catch(() => {}); - connectionAbort.abort(); + reader?.cancel().catch(() => {}); }, PING_TIMEOUT_MS); }; @@ -129,7 +134,7 @@ export async function connectStream( } const response = await fetchFn(`${host}/v1/stream`, { headers, - signal: connectionAbort.signal, + signal: fetchAbort.signal, }); if (!response.ok) { @@ -148,17 +153,18 @@ export async function connectStream( throw new Error('stream body was not present'); } - responseBody = response.body; - const reader = response.body.getReader(); + reader = response.body.getReader(); const decoder = new TextDecoder(); const bufferChunks: string[] = []; - // Allow the ping timeout (or main abort) to cancel the reader, - // which breaks the read loop immediately even on a zombie connection. - const onConnectionAbort = (): void => { - reader.cancel().catch(() => {}); + // Ensure the read loop exits promptly when the fetch signal is + // aborted, even on zombie connections where the body doesn't observe + // the abort. Registered after getReader() so any listeners attached + // by the underlying source (e.g. test mocks) fire first. + const onFetchAbort = (): void => { + reader?.cancel().catch(() => {}); }; - connectionAbort.signal.addEventListener('abort', onConnectionAbort, { + fetchAbort.signal.addEventListener('abort', onFetchAbort, { once: true, }); @@ -218,13 +224,11 @@ export async function connectStream( } } } finally { - connectionAbort.signal.removeEventListener( - 'abort', - onConnectionAbort, - ); + fetchAbort.signal.removeEventListener('abort', onFetchAbort); } - // Stream ended normally (server closed connection) - reconnect + // Stream ended (server closed, ping timeout cancelled the reader, + // or external abort). Either reconnect or exit the outer loop. clearTimeout(pingTimeoutId); abortController.signal.removeEventListener('abort', onMainAbort); if (!abortController.signal.aborted) { @@ -241,11 +245,7 @@ export async function connectStream( if (abortController.signal.aborted) { break; } - // Ping timeout aborts only the per-connection controller; this is - // an expected reconnect, not a real error — skip the noisy log. - if (!connectionAbort.signal.aborted) { - console.error('@vercel/flags-core: Stream error', error); - } + console.error('@vercel/flags-core: Stream error', error); onDisconnect?.(); retryCount++; const elapsed = Date.now() - lastAttemptTime;