Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/quiet-tools-stream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/ai": minor
---

Add `suppressToolInputDeltas` to `DurableAgent.stream()` to avoid writing incremental tool input deltas to UI streams while preserving final tool inputs and execution state.
106 changes: 106 additions & 0 deletions packages/ai/src/agent/do-stream-step.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,73 @@ import { describe, expect, it } from 'vitest';
import { doStreamStep, normalizeFinishReason } from './do-stream-step.js';
import { safeParseToolCallInput } from './safe-parse-tool-call-input.js';

function createToolInputDeltaModel() {
return async () =>
({
provider: 'mock',
modelId: 'mock-tool-input-deltas',
doStream: async () => ({
stream: new ReadableStream({
start(controller) {
for (const part of [
{ type: 'stream-start', warnings: [] },
{
type: 'response-metadata',
id: 'response-1',
modelId: 'mock-tool-input-deltas',
timestamp: new Date(),
},
{
type: 'tool-input-start',
id: 'call-1',
toolName: 'writeFile',
},
{
type: 'tool-input-delta',
id: 'call-1',
delta: '{"path":"src/App.tsx"',
},
{
type: 'tool-input-delta',
id: 'call-1',
delta: ',"content":"x"}',
},
{ type: 'tool-input-end', id: 'call-1' },
{
type: 'tool-call',
toolCallId: 'call-1',
toolName: 'writeFile',
input: '{"path":"src/App.tsx","content":"x"}',
},
{
type: 'finish',
finishReason: { type: 'tool-calls' },
usage: {
inputTokens: { total: 1 },
outputTokens: { total: 1 },
},
},
]) {
controller.enqueue(part);
}
controller.close();
},
}),
}),
}) as any;
}

function createChunkCollector() {
const chunks: any[] = [];
const writable = new WritableStream({
write(chunk) {
chunks.push(chunk);
},
});

return { chunks, writable };
}

describe('normalizeFinishReason', () => {
describe('string finish reasons', () => {
it('should pass through "stop"', () => {
Expand Down Expand Up @@ -198,4 +265,43 @@ describe('doStreamStep', () => {
})
);
});

it('can suppress tool input deltas without dropping final tool input', async () => {
const { chunks, writable } = createChunkCollector();

const result = await doStreamStep(
[{ role: 'user', content: [{ type: 'text', text: 'write a file' }] }],
createToolInputDeltaModel(),
writable,
[],
{ suppressToolInputDeltas: true }
);

expect(chunks.map((chunk) => chunk.type)).toEqual([
'start-step',
'tool-input-start',
'tool-input-available',
'finish-step',
]);
expect(result.toolCalls[0]?.input).toBe(
'{"path":"src/App.tsx","content":"x"}'
);
expect(result.step.toolCalls[0]?.input).toEqual({
path: 'src/App.tsx',
content: 'x',
});
});

it('streams tool input deltas by default', async () => {
const { chunks, writable } = createChunkCollector();

await doStreamStep(
[{ role: 'user', content: [{ type: 'text', text: 'write a file' }] }],
createToolInputDeltaModel(),
writable,
[]
);

expect(chunks.map((chunk) => chunk.type)).toContain('tool-input-delta');
});
});
12 changes: 11 additions & 1 deletion packages/ai/src/agent/do-stream-step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import {
type ToolSet,
type UIMessageChunk,
} from 'ai';
import { getErrorMessage } from '../get-error-message.js';
import type {
ProviderOptions,
StreamTextTransform,
TelemetrySettings,
} from './durable-agent.js';
import { getErrorMessage } from '../get-error-message.js';
import { safeParseToolCallInput } from './safe-parse-tool-call-input.js';
import { recordSpan } from './telemetry.js';
import type { CompatibleLanguageModel } from './types.js';
Expand Down Expand Up @@ -71,6 +71,11 @@ export interface DoStreamStepOptions {
providerOptions?: ProviderOptions;
toolChoice?: ToolChoice<ToolSet>;
includeRawChunks?: boolean;
/**
* If true, suppresses incremental tool-input-delta UIMessageChunks while
* preserving raw stream processing and final tool input availability.
*/
suppressToolInputDeltas?: boolean;
experimental_telemetry?: TelemetrySettings;
transforms?: Array<StreamTextTransform<ToolSet>>;
responseFormat?: LanguageModelV3CallOptions['responseFormat'];
Expand Down Expand Up @@ -221,6 +226,7 @@ export async function doStreamStep(
const chunks: LanguageModelV3StreamPart[] = [];
const includeRawChunks = options?.includeRawChunks ?? false;
const collectUIChunks = options?.collectUIChunks ?? false;
const suppressToolInputDeltas = options?.suppressToolInputDeltas ?? false;
const uiChunks: UIMessageChunk[] = [];
let msToFirstChunk: number | undefined;

Expand Down Expand Up @@ -437,6 +443,10 @@ export async function doStreamStep(
}

case 'tool-input-delta': {
if (suppressToolInputDeltas) {
break;
}

controller.enqueue({
type: 'tool-input-delta',
toolCallId: part.id,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import type { LanguageModelV3 } from '@ai-sdk/provider';
import { describe, expect, it, vi } from 'vitest';

vi.mock('./stream-text-iterator.js', () => ({
streamTextIterator: vi.fn(),
}));

const { DurableAgent } = await import('./durable-agent.js');

function createMockModel(): LanguageModelV3 {
return {
specificationVersion: 'v3' as const,
provider: 'test',
modelId: 'test-model',
doGenerate: vi.fn(),
doStream: vi.fn(),
supportedUrls: {},
};
}

describe('DurableAgent suppressToolInputDeltas', () => {
it('passes suppressToolInputDeltas to streamTextIterator', async () => {
const mockModel = createMockModel();
const agent = new DurableAgent({
model: async () => mockModel,
tools: {},
});
const mockWritable = new WritableStream({
write: vi.fn(),
close: vi.fn(),
});

const { streamTextIterator } = await import('./stream-text-iterator.js');
const mockIterator = {
next: vi.fn().mockResolvedValueOnce({ done: true, value: [] }),
};
vi.mocked(streamTextIterator).mockReturnValue(mockIterator as any);

await agent.stream({
messages: [{ role: 'user', content: 'test' }],
writable: mockWritable,
suppressToolInputDeltas: true,
});

expect(streamTextIterator).toHaveBeenCalledWith(
expect.objectContaining({
suppressToolInputDeltas: true,
})
);
});
});
10 changes: 10 additions & 0 deletions packages/ai/src/agent/durable-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,15 @@ export interface DurableAgentStreamOptions<
*/
includeRawChunks?: boolean;

/**
* If true, suppresses incremental `tool-input-delta` UI stream chunks.
*
* This only affects UI stream persistence/transport. The raw model stream,
* final parsed tool input, tool execution, and conversation state are
* preserved.
*/
suppressToolInputDeltas?: boolean;

/**
* A function that attempts to repair a tool call that failed to parse.
*/
Expand Down Expand Up @@ -1007,6 +1016,7 @@ export class DurableAgent<TBaseTools extends ToolSet = ToolSet> {
experimental_context: experimentalContext,
experimental_telemetry: effectiveTelemetry,
includeRawChunks: options.includeRawChunks ?? false,
suppressToolInputDeltas: options.suppressToolInputDeltas ?? false,
experimental_transform: options.experimental_transform as
| StreamTextTransform<ToolSet>
| Array<StreamTextTransform<ToolSet>>,
Expand Down
7 changes: 7 additions & 0 deletions packages/ai/src/agent/stream-text-iterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export async function* streamTextIterator({
experimental_context,
experimental_telemetry,
includeRawChunks = false,
suppressToolInputDeltas = false,
experimental_transform,
responseFormat,
collectUIChunks = false,
Expand All @@ -99,6 +100,11 @@ export async function* streamTextIterator({
experimental_context?: unknown;
experimental_telemetry?: TelemetrySettings;
includeRawChunks?: boolean;
/**
* If true, suppresses incremental tool-input-delta UIMessageChunks while
* preserving raw stream processing and final tool input availability.
*/
suppressToolInputDeltas?: boolean;
experimental_transform?:
| StreamTextTransform<ToolSet>
| Array<StreamTextTransform<ToolSet>>;
Expand Down Expand Up @@ -305,6 +311,7 @@ export async function* streamTextIterator({
...currentGenerationSettings,
toolChoice: currentToolChoice,
includeRawChunks,
suppressToolInputDeltas,
experimental_telemetry,
transforms,
responseFormat,
Expand Down
Loading