Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/fix-think-agent-tool-non-chat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@cloudflare/ai-chat": patch
"@cloudflare/think": patch
---

Allow Think agent-tool children to complete without emitting assistant text. Non-chat tool-step agents can now provide structured output through `getAgentToolOutput`, with summaries derived from assistant text, string output, structured output, or an empty string.

Fix `useAgentChat().isServerStreaming` cleanup when a resumed stream first enters the fallback observer path and later becomes transport-owned.
11 changes: 11 additions & 0 deletions design/agent-tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ Durable Object RPC. If a parent restarts while a run is non-terminal, V1 replays
stored chunks and marks the parent row `interrupted`; live-tail reattach is
deferred.

Think child completion is not tied to assistant text. Assistant text is only the
default summary source for chat-like helper agents. Workflow-style Think
children can complete without text chunks and can expose durable structured
output through `getAgentToolOutput()` plus an optional `getAgentToolSummary()`
override. This keeps execution, observation, and result synthesis separate:
finishing the turn determines terminal status, child chunks are retained for UI
observation, and output/summary hooks determine what the parent receives. The
output hook is evaluated immediately after the child turn resolves, so workflow
children should commit durable output before the turn finishes and keep summaries
small enough for display.

## Tradeoffs

- Runs and facets are retained by default so refresh, drill-in, and debugging
Expand Down
24 changes: 24 additions & 0 deletions docs/agent-tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,30 @@ The generated tool calls `this.runAgentTool(ChildAgent, ...)`, streams
summary to the parent model. If the run fails, aborts, or is interrupted, the
tool returns a structured failure instead of an empty success value.

For `Think` children that do workflow-style work without user-facing assistant
text, override `getAgentToolOutput()` and, if needed, `getAgentToolSummary()`.
Assistant text remains the default summary when present, but a Think agent-tool
run can complete successfully without emitting text chunks.
Persist any structured output before the child turn finishes, because
`getAgentToolOutput()` is read as soon as `saveMessages()` resolves. Keep
`getAgentToolSummary()` concise for display; the full structured value is stored
separately as the tool output.

```ts
export class Extractor extends Think<Env> {
protected override getAgentToolOutput(runId: string) {
const rows = this.sql<{ result_json: string }>`
SELECT result_json FROM extraction_runs WHERE id = ${runId}
`;
return rows[0] ? JSON.parse(rows[0].result_json) : undefined;
}

protected override getAgentToolSummary(_runId: string, output: unknown) {
return output ? "Extraction complete" : "";
}
}
```

## Run an Agent tool imperatively

Use `runAgentTool()` for deterministic workflows, scheduled work, HTTP
Expand Down
109 changes: 109 additions & 0 deletions packages/ai-chat/src/react-tests/use-agent-chat.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3899,6 +3899,115 @@ describe("useAgentChat isServerStreaming / isStreaming (issue #1226)", () => {
.toHaveTextContent("false");
});

it("isServerStreaming resets when a fallback-observed stream later becomes transport-owned", async () => {
const { agent, target } = createAgentWithTarget({
name: "server-stream-fallback-to-transport",
url: "ws://localhost:3000/agents/chat/server-stream-fallback-to-transport?_pk=abc"
});

let chatInstance: ReturnType<typeof useAgentChat> | null = null;

const TestComponent = () => {
const chat = useAgentChat({
agent,
getInitialMessages: null,
messages: [] as UIMessage[]
});
chatInstance = chat;
return (
<div data-testid="isServerStreaming">
{String(chat.isServerStreaming)}
</div>
);
};

const screen = await act(async () => {
const screen = render(<TestComponent />, {
wrapper: ({ children }) => (
<StrictMode>
<Suspense fallback="Loading...">{children}</Suspense>
</StrictMode>
)
});
await sleep(10);
return screen;
});

await act(async () => {
dispatch(target, { type: "cf_agent_stream_resume_none" });
await sleep(10);
});

await act(async () => {
dispatch(target, {
type: "cf_agent_stream_resuming",
id: "fallback-to-transport-1"
});
await sleep(10);
});

await expect
.element(screen.getByTestId("isServerStreaming"))
.toHaveTextContent("true");

await act(async () => {
void chatInstance!.resumeStream();
await sleep(10);
});

await act(async () => {
dispatch(target, {
type: "cf_agent_stream_resuming",
id: "fallback-to-transport-1"
});
await sleep(10);
});

await act(async () => {
dispatch(target, {
type: "cf_agent_use_chat_response",
id: "fallback-to-transport-1",
body: "",
done: false,
replay: true,
replayComplete: true
});
await sleep(10);
});

await expect
.element(screen.getByTestId("isServerStreaming"))
.toHaveTextContent("true");

await act(async () => {
dispatch(target, {
type: "cf_agent_use_chat_response",
id: "fallback-to-transport-1",
body: '{"type":"text-delta","id":"t1","delta":"live"}',
done: false
});
await sleep(10);
});

await expect
.element(screen.getByTestId("isServerStreaming"))
.toHaveTextContent("true");

await act(async () => {
dispatch(target, {
type: "cf_agent_use_chat_response",
id: "fallback-to-transport-1",
body: "",
done: true
});
await sleep(10);
});

await expect
.element(screen.getByTestId("isServerStreaming"))
.toHaveTextContent("false");
});

it("isServerStreaming works with continuation broadcasts", async () => {
const { agent, target } = createAgentWithTarget({
name: "server-stream-continuation",
Expand Down
8 changes: 7 additions & 1 deletion packages/ai-chat/src/react.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -1776,8 +1776,14 @@ export function useAgentChat<
if (data.done || data.replayComplete) {
pendingReplayResumeRequestIdsRef.current.delete(data.id);
}

if (data.done) {
if (
streamStateRef.current.status === "observing" &&
streamStateRef.current.streamId === data.id
) {
streamStateRef.current = { status: "idle" };
setIsServerStreaming(false);
}
customTransport.handleServerTurnCompleted(data.id);
restoreProtectedStreamingAssistant(localResponseIds.get(data.id));
localResponseIds.delete(data.id);
Expand Down
82 changes: 82 additions & 0 deletions packages/think/src/tests/agent-tools.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ type AgentToolInspection = Awaited<
type ThinkAgentToolTestStub = {
inspectAgentToolRun(runId: string): Promise<AgentToolInspection>;
seedAgentToolLastErrorForTest(runId: string, error: string): Promise<void>;
setAgentToolOutputForTest(runId: string, output: unknown): Promise<void>;
clearAgentToolOutputForTest(runId: string): Promise<void>;
setStripTextResponseForTest(strip: boolean): Promise<void>;
setBeforeStepAsyncDelay(ms: number): Promise<void>;
resetTurnStateForTest(): Promise<void>;
startAgentToolRun(
input: unknown,
options: { runId: string }
Expand Down Expand Up @@ -48,6 +53,83 @@ async function waitForAgentToolRun(
}

describe("Think agent tools", () => {
it("uses assistant text as the default agent-tool summary", async () => {
const agent = await freshAgent();
const runId = crypto.randomUUID();

await agent.startAgentToolRun("chat-like probe", { runId });
const inspection = await waitForAgentToolRun(agent, runId);

expect(inspection).toMatchObject({
runId,
status: "completed",
summary: "Hello from the assistant!"
});
expect(inspection?.error).toBeUndefined();
});

it("completes when a non-chat agent-tool run emits no assistant text", async () => {
const agent = await freshAgent();
const runId = crypto.randomUUID();

await agent.setStripTextResponseForTest(true);
await agent.startAgentToolRun("non-chat probe", { runId });
const inspection = await waitForAgentToolRun(agent, runId);

expect(inspection).toMatchObject({
runId,
status: "completed",
summary: ""
});
expect(inspection?.error).toBeUndefined();
});

it("returns structured output for a non-chat agent-tool run", async () => {
const agent = await freshAgent();
const runId = crypto.randomUUID();

await agent.setStripTextResponseForTest(true);
await agent.setAgentToolOutputForTest(runId, {
ok: true,
value: "workflow-result"
});
await agent.startAgentToolRun("structured non-chat probe", { runId });
const inspection = await waitForAgentToolRun(agent, runId);

expect(inspection).toMatchObject({
runId,
status: "completed",
output: { ok: true, value: "workflow-result" },
summary: '{"ok":true,"value":"workflow-result"}'
});

await agent.clearAgentToolOutputForTest(runId);
await expect(agent.inspectAgentToolRun(runId)).resolves.toMatchObject({
runId,
status: "completed",
output: { ok: true, value: "workflow-result" },
summary: '{"ok":true,"value":"workflow-result"}'
});
});

it("marks skipped agent-tool turns as errors", async () => {
const agent = await freshAgent();
const runId = crypto.randomUUID();

await agent.setBeforeStepAsyncDelay(50);
await agent.startAgentToolRun("skipped probe", { runId });
await new Promise((resolve) => setTimeout(resolve, 5));
await agent.resetTurnStateForTest();

const inspection = await waitForAgentToolRun(agent, runId);

expect(inspection).toMatchObject({
runId,
status: "error",
error: "Agent tool run was skipped before the child could finish."
});
});

it("cleans in-memory agent-tool bookkeeping after a run completes", async () => {
const agent = await freshAgent();
const runId = crypto.randomUUID();
Expand Down
Loading
Loading