Merged
5 changes: 5 additions & 0 deletions .changeset/durable-chat-cancellation.md
@@ -0,0 +1,5 @@
---
"@cloudflare/ai-chat": minor
---

Add `cancelOnClientAbort` to `useAgentChat`. Generic browser/client stream cleanup is now local-only by default so server turns can continue and resume; explicit `stop()` still cancels the server turn. Set `cancelOnClientAbort: true` to make generic client aborts cancel the server turn.
13 changes: 12 additions & 1 deletion docs/chat-agents.md
@@ -781,6 +781,7 @@ function Chat() {
| `onToolCall` | `({ toolCall, addToolOutput }) => void` | — | Handle client-side tool execution |
| `autoContinueAfterToolResult` | `boolean` | `true` | Auto-continue conversation after client tool results and approvals |
| `resume` | `boolean` | `true` | Enable automatic stream resumption on reconnect |
| `cancelOnClientAbort` | `boolean` | `false` | Cancel the server turn when generic client stream abort/cleanup occurs. Explicit `stop()` always cancels the server turn |
| `body` | `object \| () => object` | — | Custom data sent with every request |
| `prepareSendMessagesRequest` | `(options) => { body?, headers? }` | — | Advanced per-request customization |
| `tools` | `Record<string, AITool>` | — | Dynamic client-defined tools for SDK/platform use cases. Schemas are sent to the server automatically |
@@ -1169,7 +1170,17 @@ When streaming is active:
2. If the client disconnects, the server continues streaming and buffering
3. When the client reconnects, it receives all buffered chunks and resumes live streaming

Disable with `resume: false`:
Generic client stream abort/cleanup stays local to the browser by default, so the server turn keeps running and can be resumed later. An explicit `stop()` still sends server cancellation:

```tsx
const { messages, stop } = useAgentChat({ agent });
```

If your app intentionally wants the client lifecycle to own the server lifecycle, set `cancelOnClientAbort: true`. This is useful for request-lifetime or token-saving flows; explicit `stop()` always cancels the server turn regardless of this option.
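
The resulting behavior can be pictured as a tiny predicate (a conceptual sketch only; `AbortReason` and `shouldCancelServerTurn` are hypothetical names, not part of the library's API):

```ts
// Hypothetical sketch of the cancellation decision, not the transport's
// actual internals. "explicit-stop" models a user calling stop();
// "client-abort" models generic cleanup (unmount, navigation, abort signal).
type AbortReason = "explicit-stop" | "client-abort";

function shouldCancelServerTurn(
  reason: AbortReason,
  cancelOnClientAbort: boolean
): boolean {
  // stop() always cancels the server turn.
  if (reason === "explicit-stop") return true;
  // Generic client aborts cancel only when explicitly opted in.
  return cancelOnClientAbort;
}

console.log(shouldCancelServerTurn("explicit-stop", false)); // true
console.log(shouldCancelServerTurn("client-abort", false)); // false
console.log(shouldCancelServerTurn("client-abort", true)); // true
```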

Disable resume with `resume: false`:

```tsx
const { messages } = useAgentChat({ agent, resume: false });
26 changes: 25 additions & 1 deletion docs/resumable-streaming.md
@@ -10,7 +10,7 @@ When you use `AIChatAgent` with `useAgentChat`:
2. **On disconnect**: The stream continues server-side, buffering chunks
3. **On reconnect**: Client requests a resume, receives all buffered chunks, and continues streaming

No extra code is needed -- it just works.
No extra code is needed -- it just works. Generic client stream abort/cleanup is local-only by default, so browser navigation or React cleanup does not stop the server turn. An explicit `stop()` still cancels the server turn.

## Example

@@ -51,6 +51,8 @@ function Chat() {
const { messages, sendMessage, status } = useAgentChat({
agent
// resume: true is the default - streams automatically resume on reconnect
// cancelOnClientAbort: false is the default - browser cleanup does not
// cancel the server turn
});

// ... render your chat UI
@@ -80,6 +82,28 @@ function Chat() {

Replayed chunks include `replay: true` to distinguish them from live chunks. The client uses this to batch-apply all replayed chunks before rendering, which prevents intermediate states (like reasoning "Thinking..." indicators) from flashing briefly during replay. During a live stream, chunks arrive gradually and React renders each intermediate state naturally.
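
The batching idea can be sketched as follows (the `StreamChunk` shape here is an assumed simplification for illustration, not the actual protocol type):

```ts
// Simplified sketch of replay batching. Replayed chunks carry
// `replay: true`; they are applied in one batch before live rendering
// resumes, so intermediate states never flash on screen.
interface StreamChunk {
  text: string;
  replay?: boolean;
}

function splitReplay(chunks: StreamChunk[]): {
  replayed: string; // applied all at once
  live: StreamChunk[]; // rendered incrementally
} {
  const replayed = chunks
    .filter((c) => c.replay)
    .map((c) => c.text)
    .join("");
  const live = chunks.filter((c) => !c.replay);
  return { replayed, live };
}

const { replayed, live } = splitReplay([
  { text: "Hel", replay: true },
  { text: "lo ", replay: true },
  { text: "world" }
]);
console.log(replayed); // "Hello "
console.log(live.length); // 1
```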

## Durable client cleanup

`resume: true` controls whether the client tries to reconnect to an active stream. `cancelOnClientAbort` controls cancellation: by default (`false`), generic client stream abort/cleanup is local-only, while explicit `stop()` still cancels the server turn.

```tsx
const { messages, stop } = useAgentChat({
agent
});
```

If your app intentionally wants the client lifecycle to own the server lifecycle, opt in:

```tsx
const { messages } = useAgentChat({
agent,
cancelOnClientAbort: true
});
```

Use this for request-lifetime or token-saving flows. Explicit `stop()` always cancels the server turn regardless of `cancelOnClientAbort`.

## Disabling Resume

If you do not want automatic resume (for example, for short responses), disable it:
39 changes: 29 additions & 10 deletions packages/ai-chat/README.md
@@ -248,7 +248,25 @@ Streams automatically resume on disconnect/reconnect. No configuration needed.

When a client disconnects mid-stream, chunks are buffered in SQLite. On reconnect, the client receives all buffered chunks and continues receiving the live stream.
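
The buffer-then-replay flow can be sketched with a hypothetical in-memory buffer (the real implementation persists chunks to SQLite and interleaves the replay with the live stream; `StreamBuffer` is an illustrative name only):

```ts
// Minimal in-memory sketch of server-side chunk buffering per request id.
class StreamBuffer {
  private chunks = new Map<string, string[]>();

  // Called as each chunk is produced, whether or not a client is connected.
  append(requestId: string, chunk: string): void {
    const buf = this.chunks.get(requestId) ?? [];
    buf.push(chunk);
    this.chunks.set(requestId, buf);
  }

  // On reconnect, replay everything buffered so far for this request.
  replay(requestId: string): string[] {
    return this.chunks.get(requestId) ?? [];
  }
}

const buffer = new StreamBuffer();
buffer.append("req-1", "Hello");
buffer.append("req-1", " world");
console.log(buffer.replay("req-1")); // ["Hello", " world"]
```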

Disable with `resume: false`:
Generic client stream abort/cleanup is local-only by default: the server turn continues and can be resumed later. An explicit `stop()` still cancels the server turn:

```tsx
const { messages, stop } = useAgentChat({ agent });
```

If your app intentionally wants the client lifecycle to own the server lifecycle, opt in to cancellation on client abort:

```tsx
const { messages } = useAgentChat({
agent,
cancelOnClientAbort: true
});
```

Use this for request-lifetime or token-saving flows. Explicit `stop()` always cancels the server turn regardless of `cancelOnClientAbort`.

Disable resume with `resume: false`:

```tsx
const { messages } = useAgentChat({ agent, resume: false });
@@ -429,15 +447,16 @@ React hook for chat interactions. Wraps the AI SDK's `useChat` with WebSocket tr

**Options:**

| Option | Type | Description |
| ----------------------------- | --------------------------------------- | -------------------------------------------------------- |
| `agent` | `ReturnType<typeof useAgent>` | Agent connection (required) |
| `onToolCall` | `({ toolCall, addToolOutput }) => void` | Handle client-side tool execution |
| `autoContinueAfterToolResult` | `boolean` | Auto-continue after client tool results. Default: `true` |
| `resume` | `boolean` | Enable stream resumption. Default: `true` |
| `body` | `object \| () => object` | Custom data sent with every request (see below) |
| `prepareSendMessagesRequest` | `(options) => { body?, headers? }` | Advanced per-request customization |
| `getInitialMessages` | `(options) => Promise<ChatMessage[]>` | Custom initial message loader |
| Option | Type | Description |
| ----------------------------- | --------------------------------------- | -------------------------------------------------------------------------------------------------------------------------- |
| `agent` | `ReturnType<typeof useAgent>` | Agent connection (required) |
| `onToolCall` | `({ toolCall, addToolOutput }) => void` | Handle client-side tool execution |
| `autoContinueAfterToolResult` | `boolean` | Auto-continue after client tool results. Default: `true` |
| `resume` | `boolean` | Enable stream resumption. Default: `true` |
| `cancelOnClientAbort` | `boolean` | Cancel the server turn when generic client stream abort/cleanup occurs. Explicit `stop()` always cancels. Default: `false` |
| `body` | `object \| () => object` | Custom data sent with every request (see below) |
| `prepareSendMessagesRequest` | `(options) => { body?, headers? }` | Advanced per-request customization |
| `getInitialMessages` | `(options) => Promise<ChatMessage[]>` | Custom initial message loader |

**Returns:**

72 changes: 72 additions & 0 deletions packages/ai-chat/src/react-tests/use-agent-chat.test.tsx
@@ -4452,6 +4452,78 @@ describe("useAgentChat tool approval continuations (issue #1108)", () => {
});
});

describe("useAgentChat client abort cancellation", () => {
function createAgentWithTarget({ name, url }: { name: string; url: string }) {
const target = new EventTarget();
const sentMessages: string[] = [];
const agent = createAgent({
name,
url,
send: (data: string) => sentMessages.push(data)
});

(agent as unknown as Record<string, unknown>).addEventListener =
target.addEventListener.bind(target);
(agent as unknown as Record<string, unknown>).removeEventListener =
target.removeEventListener.bind(target);

return { agent, target, sentMessages };
}

it("keeps explicit stop() as server cancellation by default", async () => {
const { agent, sentMessages } = createAgentWithTarget({
name: "explicit-stop",
url: "ws://localhost:3000/agents/chat/explicit-stop?_pk=abc"
});

let chatInstance: ReturnType<typeof useAgentChat> | null = null;

const TestComponent = () => {
const chat = useAgentChat({
agent,
getInitialMessages: null,
messages: [] as UIMessage[]
});
chatInstance = chat;
return <div data-testid="status">{chat.status}</div>;
};

await act(async () => {
render(<TestComponent />, {
wrapper: ({ children }) => (
<StrictMode>
<Suspense fallback="Loading...">{children}</Suspense>
</StrictMode>
)
});
await sleep(10);
});

await act(async () => {
void chatInstance!.sendMessage({ text: "Hello" });
await sleep(10);
});

const requestId = sentMessages
.map((message) => JSON.parse(message) as Record<string, unknown>)
.find((message) => message.type === "cf_agent_use_chat_request")?.id;
expect(requestId).toBeDefined();

await act(async () => {
await chatInstance!.stop();
await sleep(10);
});

const cancelMessage = sentMessages
.map((message) => JSON.parse(message) as Record<string, unknown>)
.find((message) => message.type === "cf_agent_chat_request_cancel");
expect(cancelMessage).toEqual({
type: "cf_agent_chat_request_cancel",
id: requestId
});
});
});

describe("useAgentChat overlapping submits (issue #1231)", () => {
function createAgentWithTarget({ name, url }: { name: string; url: string }) {
const target = new EventTarget();
18 changes: 18 additions & 0 deletions packages/ai-chat/src/react.tsx
@@ -460,6 +460,15 @@ type UseAgentChatOptions<
* @default true
*/
resume?: boolean;
/**
* Whether generic client-side stream abort/cleanup should cancel the server
* turn. By default, client cleanup is local-only so the server turn can
* continue and be resumed on reconnect. Explicit stop() always cancels the
* server turn.
*
* @default false
*/
cancelOnClientAbort?: boolean;
/**
* Custom data to include in every chat request body.
* Accepts a static object or a function that returns one (for dynamic values).
@@ -615,6 +624,7 @@ export function useAgentChat<
autoContinueAfterToolResult = true, // Server auto-continues after tool results/approvals
autoSendAfterAllConfirmationsResolved = true, // Legacy option for client-side batching
resume = true, // Enable stream resumption by default
cancelOnClientAbort = false,
body: bodyOption,
prepareSendMessagesRequest,
...rest
@@ -887,6 +897,7 @@
customTransportRef.current = new WebSocketChatTransport<ChatMessage>({
agent: agentRef.current,
activeRequestIds: localRequestIdsRef.current,
cancelOnClientAbort,
prepareBody: async ({ messages: msgs, trigger, messageId }) => {
// Start with the top-level body option (static or dynamic)
let extraBody: Record<string, unknown> = {};
@@ -928,6 +939,7 @@
// Always point the transport at the latest socket so sends/listeners
// go through the current connection after _pk changes.
customTransportRef.current.agent = agentRef.current;
customTransportRef.current.setCancelOnClientAbort(cancelOnClientAbort);
const customTransport = customTransportRef.current;

// Use a stable Chat ID that doesn't change when _pk changes.
@@ -1020,6 +1032,7 @@

const stopWithToolContinuationAbort: typeof stop = useCallback(async () => {
try {
customTransport.cancelActiveServerTurn();
await stop();
} finally {
customTransport.abortActiveToolContinuation();
@@ -1722,6 +1735,7 @@
streamId: data.id,
messageId: nanoid()
}).state;
customTransport.observeServerTurn(data.id);
setIsServerStreaming(true);
agentRef.current.send(
JSON.stringify({
@@ -1764,6 +1778,7 @@
}

if (data.done) {
customTransport.handleServerTurnCompleted(data.id);
restoreProtectedStreamingAssistant(localResponseIds.get(data.id));
localResponseIds.delete(data.id);
localRequestIdsRef.current.delete(data.id);
@@ -1820,6 +1835,9 @@
if (data.done || data.replayComplete) {
pendingReplayResumeRequestIdsRef.current.delete(data.id);
}
if (data.done) {
customTransport.handleServerTurnCompleted(data.id);
}

const result = broadcastTransition(streamStateRef.current, {
type: "response",
67 changes: 61 additions & 6 deletions packages/ai-chat/src/tests/ws-transport-abort.test.ts
@@ -2,6 +2,7 @@ import { describe, it, expect } from "vitest";
import type { UIMessage as ChatMessage, UIMessageChunk } from "ai";
import { connectChatWS } from "./test-utils";
import { WebSocketChatTransport } from "../ws-chat-transport";
import { MessageType, type OutgoingMessage } from "../types";

function connectSlowStream(room: string) {
return connectChatWS(`/agents/slow-stream-agent/${room}`);
@@ -134,7 +135,7 @@ describe("WebSocketChatTransport abort", () => {
}
});

it("stream.cancel() sends cancel to server and terminates", async () => {
it("stream.cancel() terminates the local stream", async () => {
const room = crypto.randomUUID();
const { ws } = await connectSlowStream(room);
await new Promise((r) => setTimeout(r, 50));
@@ -159,15 +160,15 @@
// Let a few chunks arrive
await new Promise((r) => setTimeout(r, 200));

// cancel() should not hang — it sends CF_AGENT_CHAT_REQUEST_CANCEL
// and terminates the stream
// cancel() should not hang — generic stream cleanup is local-only by
// default, but it still terminates the client stream.
await expect(withTimeout(stream.cancel(), 2000)).resolves.toBeUndefined();
} finally {
ws.close(1000);
}
});

it("keeps requestId in activeRequestIds after abort until server sends done", async () => {
it("leaves the server stream resumable after default client abort", async () => {
const room = crypto.randomUUID();
const { ws } = await connectSlowStream(room);
await new Promise((r) => setTimeout(r, 50));
@@ -178,6 +179,60 @@
agent: ws,
activeRequestIds
});
ws.addEventListener("message", (event) => {
const data = JSON.parse(
event.data as string
) as OutgoingMessage<ChatMessage>;
if (data.type === MessageType.CF_AGENT_STREAM_RESUMING) {
transport.handleStreamResuming(data);
} else if (data.type === MessageType.CF_AGENT_STREAM_RESUME_NONE) {
transport.handleStreamResumeNone();
}
});

const abortController = new AbortController();
const stream = await transport.sendMessages({
chatId: "chat",
messages: [userMessage],
abortSignal: abortController.signal,
trigger: "submit-message",
body: {
format: "plaintext",
chunkCount: 20,
chunkDelayMs: 50
}
});

const reader = stream.getReader();
await new Promise((r) => setTimeout(r, 200));
abortController.abort();
await readUntilDoneOrAbort(reader, 5000);

expect(activeRequestIds.size).toBe(0);

const resumed = await transport.reconnectToStream({ chatId: "chat" });
expect(resumed).not.toBeNull();
if (!resumed) throw new Error("Expected stream to resume");

const chunks = await collectChunks(resumed, 5000);
expect(chunks.length).toBeGreaterThan(0);
} finally {
ws.close(1000);
}
});

it("keeps requestId in activeRequestIds after abort when cancelOnClientAbort is true", async () => {
const room = crypto.randomUUID();
const { ws } = await connectSlowStream(room);
await new Promise((r) => setTimeout(r, 50));

try {
const activeRequestIds = new Set<string>();
const transport = new WebSocketChatTransport<ChatMessage>({
agent: ws,
activeRequestIds,
cancelOnClientAbort: true
});
const abortController = new AbortController();

const stream = await transport.sendMessages({
@@ -204,8 +259,8 @@
// Abort mid-stream
abortController.abort();

// After abort, requestId must still be in activeRequestIds so that
// onAgentMessage skips in-flight server chunks (issue #1100).
// After server cancellation, requestId must still be in activeRequestIds
// so that onAgentMessage skips in-flight server chunks (issue #1100).
expect(activeRequestIds.has(requestId)).toBe(true);

// Drain the stream (it errors with AbortError)