Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/appkit-ui/src/react/genie/use-genie-chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ function fetchConversationPage(
url: `${basePath}/${encodeURIComponent(alias)}/conversations/${encodeURIComponent(convId)}?${params}`,
signal: options.signal,
onMessage: async (message) => {
if (options.signal?.aborted) return;
try {
const event = JSON.parse(message.data) as GenieStreamEvent;
switch (event.type) {
Expand Down Expand Up @@ -318,6 +319,7 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn {
},
signal: abortController.signal,
onMessage: async (message) => {
if (abortController.signal.aborted) return;
try {
processStreamEventRef.current(
JSON.parse(message.data) as GenieStreamEvent,
Expand Down Expand Up @@ -408,6 +410,7 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn {
url,
signal: parentAbortController.signal,
onMessage: async (message) => {
if (parentAbortController.signal.aborted) return;
try {
processStreamEventRef.current(
JSON.parse(message.data) as GenieStreamEvent,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
import { act, renderHook, waitFor } from "@testing-library/react";
import { afterEach, describe, expect, test, vi } from "vitest";

// Mock connectSSE — capture callbacks so we can simulate SSE events
let capturedCallbacks: {
onMessage?: (msg: { data: string }) => void;
onError?: (err: Error) => void;
signal?: AbortSignal;
} = {};

const mockConnectSSE = vi.fn().mockImplementation((opts: any) => {
capturedCallbacks = {
onMessage: opts.onMessage,
onError: opts.onError,
signal: opts.signal,
};
return new Promise<void>((resolve) => {
setTimeout(resolve, 0);
});
});

const mockFetchArrow = vi.fn();
const mockProcessArrowBuffer = vi.fn();

vi.mock("@/js", () => ({
connectSSE: (...args: unknown[]) => mockConnectSSE(...args),
ArrowClient: {
fetchArrow: (...args: unknown[]) => mockFetchArrow(...args),
processArrowBuffer: (...args: unknown[]) => mockProcessArrowBuffer(...args),
},
}));

import { useAnalyticsQuery } from "../use-analytics-query";

/** Force `signal.aborted` to true on the captured signal — simulates the
* cleanup phase of the first StrictMode mount. */
function markAborted() {
const sig = capturedCallbacks.signal;
if (!sig) throw new Error("signal not captured yet");
Object.defineProperty(sig, "aborted", { value: true, configurable: true });
}

describe("useAnalyticsQuery", () => {
afterEach(() => {
capturedCallbacks = {};
vi.clearAllMocks();
});

test("initial state is idle", () => {
const { result } = renderHook(() =>
useAnalyticsQuery("revenue", null, { autoStart: false }),
);

expect(result.current.data).toBeNull();
expect(result.current.loading).toBe(false);
expect(result.current.error).toBeNull();
});

test("autoStart calls connectSSE with the query URL on mount", async () => {
renderHook(() => useAnalyticsQuery("revenue", { region: "us" }));

await waitFor(() => {
expect(mockConnectSSE).toHaveBeenCalledWith(
expect.objectContaining({
url: "/api/analytics/query/revenue",
payload: JSON.stringify({
parameters: { region: "us" },
format: "JSON",
}),
}),
);
});
});

test("normal `result` envelope sets data on the happy path", async () => {
const { result } = renderHook(() => useAnalyticsQuery("revenue", null));

await waitFor(() => expect(capturedCallbacks.onMessage).toBeDefined());

await act(async () => {
await capturedCallbacks.onMessage?.({
data: JSON.stringify({ type: "result", data: [{ id: 1 }] }),
});
});

expect(result.current.data).toEqual([{ id: 1 }]);
expect(result.current.error).toBeNull();
expect(result.current.loading).toBe(false);
});

test("normal `error` envelope sets error on the unhappy path", async () => {
const { result } = renderHook(() => useAnalyticsQuery("revenue", null));

await waitFor(() => expect(capturedCallbacks.onMessage).toBeDefined());

await act(async () => {
await capturedCallbacks.onMessage?.({
data: JSON.stringify({
type: "error",
error: "Statement failed",
code: "EXECUTION_ERROR",
}),
});
});

expect(result.current.error).toBe("Statement failed");
expect(result.current.loading).toBe(false);
});

test("ignores late `error` envelope arriving after the controller was aborted", async () => {
// Regression: under React StrictMode the first mount's cleanup aborts
// the controller it owns, but the server-side SSE writer can still
// emit a final `event: error` envelope on the already-open stream
// (cancellation hand-off). Without an early `aborted` guard in
// onMessage, that envelope hit the error branch and surfaced a
// transient user-visible error before the second mount's data arrived.
// The fix mirrors the guard already present at the top of `onError`.
const { result } = renderHook(() => useAnalyticsQuery("revenue", null));

await waitFor(() => expect(capturedCallbacks.signal).toBeDefined());

markAborted();

await act(async () => {
await capturedCallbacks.onMessage?.({
data: JSON.stringify({
type: "error",
error: "Statement was canceled",
code: "STREAM_ABORTED",
}),
});
});

expect(result.current.error).toBeNull();
});

test("ignores late `result` envelope arriving after the controller was aborted", async () => {
const { result } = renderHook(() => useAnalyticsQuery("revenue", null));

await waitFor(() => expect(capturedCallbacks.signal).toBeDefined());

markAborted();

await act(async () => {
await capturedCallbacks.onMessage?.({
data: JSON.stringify({ type: "result", data: [{ id: 99 }] }),
});
});

expect(result.current.data).toBeNull();
});

test("ignores late `arrow` envelope arriving after the controller was aborted", async () => {
const { result } = renderHook(() =>
useAnalyticsQuery("revenue", null, { format: "ARROW" }),
);

await waitFor(() => expect(capturedCallbacks.signal).toBeDefined());

markAborted();

await act(async () => {
await capturedCallbacks.onMessage?.({
data: JSON.stringify({ type: "arrow", statement_id: "stmt-123" }),
});
});

// Critical: an aborted controller must NOT trigger the side-effectful
// arrow fetch — that would hit the network for a result the consumer
// has already given up on.
expect(mockFetchArrow).not.toHaveBeenCalled();
expect(result.current.data).toBeNull();
expect(result.current.error).toBeNull();
});

test("aborts the controller on unmount", async () => {
const { unmount } = renderHook(() => useAnalyticsQuery("revenue", null));

await waitFor(() => expect(capturedCallbacks.signal).toBeDefined());

expect(capturedCallbacks.signal?.aborted).toBe(false);

unmount();

expect(capturedCallbacks.signal?.aborted).toBe(true);
});
});
9 changes: 9 additions & 0 deletions packages/appkit-ui/src/react/hooks/use-analytics-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ export function useAnalyticsQuery<
payload: payload,
signal: abortController.signal,
onMessage: async (message) => {
// Bail silently if the controller we own is already aborted. Mirrors
// the guard at the top of `onError` below. Without this, the server-
// side SSE writer's final error envelope (emitted in response to our
// own cleanup-driven abort) lands on this handler, hits the
// `parsed.type === "error"` branch, and surfaces a transient error to
// the user — visible in dev under React StrictMode's double-mount,
// hidden in prod where StrictMode is a no-op. The next mount/refetch
// creates a fresh controller and runs cleanly.
if (abortController.signal.aborted) return;
try {
const parsed = JSON.parse(message.data);

Expand Down
Loading