From 5c0947d1c46c9983c6fb509a66730d77b338ace6 Mon Sep 17 00:00:00 2001 From: Atila Fassina Date: Wed, 6 May 2026 10:58:37 +0200 Subject: [PATCH 1/2] fix(appkit-ui): ignore late SSE events for an already-aborted controller in useAnalyticsQuery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit useAnalyticsQuery's `onError` callback bails silently when `abortController.signal.aborted` is true, but `onMessage` had no matching guard. Under React StrictMode's double-mount, the first mount's cleanup aborted controller A, but the server-side SSE writer still emitted a final `event: error` envelope on the already-open stream (cancellation hand-off). That envelope arrived at the hook's onMessage, sailed past the missing guard, hit the `parsed.type === "error"` branch, and surfaced a user-visible error before the second mount's data arrived. Production builds (StrictMode no-op) didn't see it; dev did. Mirror the `onError` guard at the top of `onMessage`. Once the controller is aborted, drop ALL events from that stream uniformly (`result`, `arrow`, `error`) — the next mount/refetch creates a fresh controller and runs cleanly. The `arrow` branch matters specifically because it triggers a side-effectful `ArrowClient.fetchArrow` network call, which the consumer has already given up on. Port of d8a3bc30 from analytics-exp, which applied the same fix to useMetricView. The remaining cross-mount race (cache singleflight propagating mount-1's AbortError to mount-2's awaited promise), server-side AbortError mis-classification as UPSTREAM_ERROR, and the analogous gap in useGenieChat are tracked as follow-up PRs. Tests: new file use-analytics-query.test.ts covering the regression (late `error`, `result`, `arrow` envelopes after abort all dropped), the happy paths (normal `error` and `result` envelopes still set state), URL/payload assertions, and the unmount-aborts-controller contract. 8 tests; 276 appkit-ui tests pass; build, biome, typecheck green. Co-authored-by: Isaac Signed-off-by: Atila Fassina --- .../__tests__/use-analytics-query.test.ts | 187 ++++++++++++++++++ .../src/react/hooks/use-analytics-query.ts | 9 + 2 files changed, 196 insertions(+) create mode 100644 packages/appkit-ui/src/react/hooks/__tests__/use-analytics-query.test.ts diff --git a/packages/appkit-ui/src/react/hooks/__tests__/use-analytics-query.test.ts b/packages/appkit-ui/src/react/hooks/__tests__/use-analytics-query.test.ts new file mode 100644 index 000000000..6554292c4 --- /dev/null +++ b/packages/appkit-ui/src/react/hooks/__tests__/use-analytics-query.test.ts @@ -0,0 +1,187 @@ +import { act, renderHook, waitFor } from "@testing-library/react"; +import { afterEach, describe, expect, test, vi } from "vitest"; + +// Mock connectSSE — capture callbacks so we can simulate SSE events +let capturedCallbacks: { + onMessage?: (msg: { data: string }) => void; + onError?: (err: Error) => void; + signal?: AbortSignal; +} = {}; + +const mockConnectSSE = vi.fn().mockImplementation((opts: any) => { + capturedCallbacks = { + onMessage: opts.onMessage, + onError: opts.onError, + signal: opts.signal, + }; + return new Promise((resolve) => { + setTimeout(resolve, 0); + }); +}); + +const mockFetchArrow = vi.fn(); +const mockProcessArrowBuffer = vi.fn(); + +vi.mock("@/js", () => ({ + connectSSE: (...args: unknown[]) => mockConnectSSE(...args), + ArrowClient: { + fetchArrow: (...args: unknown[]) => mockFetchArrow(...args), + processArrowBuffer: (...args: unknown[]) => mockProcessArrowBuffer(...args), + }, +})); + +import { useAnalyticsQuery } from "../use-analytics-query"; + +/** Force `signal.aborted` to true on the captured signal — simulates the + * cleanup phase of the first StrictMode mount. */ +function markAborted() { + const sig = capturedCallbacks.signal; + if (!sig) throw new Error("signal not captured yet"); + Object.defineProperty(sig, "aborted", { value: true, configurable: true }); +} + +describe("useAnalyticsQuery", () => { + afterEach(() => { + capturedCallbacks = {}; + vi.clearAllMocks(); + }); + + test("initial state is idle", () => { + const { result } = renderHook(() => + useAnalyticsQuery("revenue", null, { autoStart: false }), + ); + + expect(result.current.data).toBeNull(); + expect(result.current.loading).toBe(false); + expect(result.current.error).toBeNull(); + }); + + test("autoStart calls connectSSE with the query URL on mount", async () => { + renderHook(() => useAnalyticsQuery("revenue", { region: "us" })); + + await waitFor(() => { + expect(mockConnectSSE).toHaveBeenCalledWith( + expect.objectContaining({ + url: "/api/analytics/query/revenue", + payload: JSON.stringify({ + parameters: { region: "us" }, + format: "JSON", + }), + }), + ); + }); + }); + + test("normal `result` envelope sets data on the happy path", async () => { + const { result } = renderHook(() => useAnalyticsQuery("revenue", null)); + + await waitFor(() => expect(capturedCallbacks.onMessage).toBeDefined()); + + await act(async () => { + await capturedCallbacks.onMessage?.({ + data: JSON.stringify({ type: "result", data: [{ id: 1 }] }), + }); + }); + + expect(result.current.data).toEqual([{ id: 1 }]); + expect(result.current.error).toBeNull(); + expect(result.current.loading).toBe(false); + }); + + test("normal `error` envelope sets error on the unhappy path", async () => { + const { result } = renderHook(() => useAnalyticsQuery("revenue", null)); + + await waitFor(() => expect(capturedCallbacks.onMessage).toBeDefined()); + + await act(async () => { + await capturedCallbacks.onMessage?.({ + data: JSON.stringify({ + type: "error", + error: "Statement failed", + code: "EXECUTION_ERROR", + }), + }); + }); + + expect(result.current.error).toBe("Statement failed"); + expect(result.current.loading).toBe(false); + }); + + test("ignores late `error` envelope arriving after the controller was aborted", async () => { + // Regression: under React StrictMode the first mount's cleanup aborts + // the controller it owns, but the server-side SSE writer can still + // emit a final `event: error` envelope on the already-open stream + // (cancellation hand-off). Without an early `aborted` guard in + // onMessage, that envelope hit the error branch and surfaced a + // transient user-visible error before the second mount's data arrived. + // The fix mirrors the guard already present at the top of `onError`. + const { result } = renderHook(() => useAnalyticsQuery("revenue", null)); + + await waitFor(() => expect(capturedCallbacks.signal).toBeDefined()); + + markAborted(); + + await act(async () => { + await capturedCallbacks.onMessage?.({ + data: JSON.stringify({ + type: "error", + error: "Statement was canceled", + code: "STREAM_ABORTED", + }), + }); + }); + + expect(result.current.error).toBeNull(); + }); + + test("ignores late `result` envelope arriving after the controller was aborted", async () => { + const { result } = renderHook(() => useAnalyticsQuery("revenue", null)); + + await waitFor(() => expect(capturedCallbacks.signal).toBeDefined()); + + markAborted(); + + await act(async () => { + await capturedCallbacks.onMessage?.({ + data: JSON.stringify({ type: "result", data: [{ id: 99 }] }), + }); + }); + + expect(result.current.data).toBeNull(); + }); + + test("ignores late `arrow` envelope arriving after the controller was aborted", async () => { + const { result } = renderHook(() => + useAnalyticsQuery("revenue", null, { format: "ARROW" }), + ); + + await waitFor(() => expect(capturedCallbacks.signal).toBeDefined()); + + markAborted(); + + await act(async () => { + await capturedCallbacks.onMessage?.({ + data: JSON.stringify({ type: "arrow", statement_id: "stmt-123" }), + }); + }); + + // Critical: an aborted controller must NOT trigger the side-effectful + // arrow fetch — that would hit the network for a result the consumer + // has already given up on. + expect(mockFetchArrow).not.toHaveBeenCalled(); + expect(result.current.data).toBeNull(); + expect(result.current.error).toBeNull(); + }); + + test("aborts the controller on unmount", async () => { + const { unmount } = renderHook(() => useAnalyticsQuery("revenue", null)); + + await waitFor(() => expect(capturedCallbacks.signal).toBeDefined()); + + expect(capturedCallbacks.signal?.aborted).toBe(false); + + unmount(); + + expect(capturedCallbacks.signal?.aborted).toBe(true); + }); +}); diff --git a/packages/appkit-ui/src/react/hooks/use-analytics-query.ts b/packages/appkit-ui/src/react/hooks/use-analytics-query.ts index 24e03ea3b..ce07cdb03 100644 --- a/packages/appkit-ui/src/react/hooks/use-analytics-query.ts +++ b/packages/appkit-ui/src/react/hooks/use-analytics-query.ts @@ -119,6 +119,15 @@ export function useAnalyticsQuery< payload: payload, signal: abortController.signal, onMessage: async (message) => { + // Bail silently if the controller we own is already aborted. Mirrors + // the guard at the top of `onError` below. Without this, the server- + // side SSE writer's final error envelope (emitted in response to our + // own cleanup-driven abort) lands on this handler, hits the + // `parsed.type === "error"` branch, and surfaces a transient error to + // the user — visible in dev under React StrictMode's double-mount, + // hidden in prod where StrictMode is a no-op. The next mount/refetch + // creates a fresh controller and runs cleanly. + if (abortController.signal.aborted) return; try { const parsed = JSON.parse(message.data); From a4b6d9c36d6ede2e2c93f91a61ef5d9b396c09d3 Mon Sep 17 00:00:00 2001 From: Atila Fassina Date: Thu, 7 May 2026 12:01:05 +0200 Subject: [PATCH 2/2] fix: apply strict-mode abort guard to all SSE message handlers --- packages/appkit-ui/src/react/genie/use-genie-chat.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/appkit-ui/src/react/genie/use-genie-chat.ts b/packages/appkit-ui/src/react/genie/use-genie-chat.ts index e9bfabde5..9b1520040 100644 --- a/packages/appkit-ui/src/react/genie/use-genie-chat.ts +++ b/packages/appkit-ui/src/react/genie/use-genie-chat.ts @@ -122,6 +122,7 @@ function fetchConversationPage( url: `${basePath}/${encodeURIComponent(alias)}/conversations/${encodeURIComponent(convId)}?${params}`, signal: options.signal, onMessage: async (message) => { + if (options.signal?.aborted) return; try { const event = JSON.parse(message.data) as GenieStreamEvent; switch (event.type) { @@ -318,6 +319,7 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { }, signal: abortController.signal, onMessage: async (message) => { + if (abortController.signal.aborted) return; try { processStreamEventRef.current( JSON.parse(message.data) as GenieStreamEvent, @@ -408,6 +410,7 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { url, signal: parentAbortController.signal, onMessage: async (message) => { + if (parentAbortController.signal.aborted) return; try { processStreamEventRef.current( JSON.parse(message.data) as GenieStreamEvent,