From 8dbc11aa9edcde1dc4d6ce331a758aefe54751ce Mon Sep 17 00:00:00 2001
From: Peter Wielander
Date: Tue, 5 May 2026 11:49:48 +0900
Subject: [PATCH] [ai] Reduce doStreamStep step boundary payload
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Move chunksToStep out of the step boundary, returning minimal raw
aggregates (text, reasoning, files, sources, warnings, response
metadata, finishReason, usage, providerMetadata) and reconstructing
the full StepResult in streamTextIterator (workflow context). Also
drops the separate `finish` field from the return — its data was
fully duplicated by the StepResult fields the caller already needed.

The user-facing StepResult exposed via `onStepFinish` and `steps[]`
is unchanged. The event-log payload no longer carries:

- StepResult's redundant tool-call lists (`dynamicToolCalls`,
  `staticToolCalls`, `staticToolResults`, `dynamicToolResults`)
- The always-empty `toolResults` and `response.messages` arrays
- The duplicate `content` and `reasoningText` aggregates
- The dual base64 + Uint8Array file encoding (now expanded only
  after crossing the boundary)
- `request.body` (a JSON dump of the input prompt the caller
  already has) and `finish` (its fields all appear on `step`)

Closes #1929.

Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/tall-mice-sink.md | 5 + packages/ai/src/agent/do-stream-step.test.ts | 4 +- packages/ai/src/agent/do-stream-step.ts | 447 ++++++++---------- .../ai/src/agent/stream-text-iterator.test.ts | 323 ++++++------- packages/ai/src/agent/stream-text-iterator.ts | 181 ++++++- 5 files changed, 533 insertions(+), 427 deletions(-) create mode 100644 .changeset/tall-mice-sink.md diff --git a/.changeset/tall-mice-sink.md b/.changeset/tall-mice-sink.md new file mode 100644 index 0000000000..5ffda6547a --- /dev/null +++ b/.changeset/tall-mice-sink.md @@ -0,0 +1,5 @@ +--- +'@workflow/ai': patch +--- + +Reduce DurableAgent step boundary payload by reconstructing `StepResult` outside the step instead of inside. diff --git a/packages/ai/src/agent/do-stream-step.test.ts b/packages/ai/src/agent/do-stream-step.test.ts index 28f6122881..acb2839cfa 100644 --- a/packages/ai/src/agent/do-stream-step.test.ts +++ b/packages/ai/src/agent/do-stream-step.test.ts @@ -187,8 +187,8 @@ describe('doStreamStep', () => { { sendStart: false } ); - expect(result.step.toolCalls).toHaveLength(1); - expect(result.step.toolCalls[0]?.input).toBe('{"city":"San Francisco"'); + expect(result.toolCalls).toHaveLength(1); + expect(result.toolCalls[0]?.input).toBe('{"city":"San Francisco"'); expect(writtenChunks).toContainEqual( expect.objectContaining({ type: 'tool-input-available', diff --git a/packages/ai/src/agent/do-stream-step.ts b/packages/ai/src/agent/do-stream-step.ts index 15ba779dad..8a064df220 100644 --- a/packages/ai/src/agent/do-stream-step.ts +++ b/packages/ai/src/agent/do-stream-step.ts @@ -10,18 +10,17 @@ import { type FinishReason, gateway, generateId, - type StepResult, type StopCondition, type ToolChoice, type ToolSet, type UIMessageChunk, } from 'ai'; +import { getErrorMessage } from '../get-error-message.js'; import type { ProviderOptions, StreamTextTransform, TelemetrySettings, } from './durable-agent.js'; -import { getErrorMessage } 
from '../get-error-message.js'; import { safeParseToolCallInput } from './safe-parse-tool-call-input.js'; import { recordSpan } from './telemetry.js'; import type { CompatibleLanguageModel } from './types.js'; @@ -52,6 +51,68 @@ function uint8ArrayToBase64(data: Uint8Array): string { return btoa(binary); } +/** + * Reasoning part captured during streaming, in source order. + */ +export interface RawReasoningPart { + text: string; + providerMetadata?: SharedV3ProviderOptions; +} + +/** + * File chunk captured during streaming. The `data` field is the raw value + * emitted by the model — base64/URL string, URL object, or Uint8Array. + */ +export interface RawFile { + mediaType: string; + data: Uint8Array | string | URL; +} + +/** + * Response metadata extracted from the model's `response-metadata` chunk. + */ +export interface RawResponseMetadata { + id?: string; + modelId?: string; + timestamp?: Date | string; +} + +/** + * Minimal aggregates needed to reconstruct a `StepResult` outside the step + * boundary. By returning only these fields (instead of a fully-populated + * StepResult), we avoid serializing the redundant copies the AI SDK keeps + * in StepResult — `toolCalls`/`dynamicToolCalls`/`staticToolCalls`, + * `content`, `reasoningText`, the always-empty `*ToolResults` arrays, the + * dual base64+uint8Array file encoding, and `request.body` (a JSON dump of + * the input prompt). The caller reconstructs the full StepResult from + * these fields plus the conversation prompt it already holds. + */ +export interface DoStreamStepRawResult { + text: string; + reasoning: RawReasoningPart[]; + files: RawFile[]; + sources: Array>; + warnings?: Extract< + LanguageModelV3StreamPart, + { type: 'stream-start' } + >['warnings']; + responseMetadata?: RawResponseMetadata; + /** Raw finish reason as emitted by the model (V3 may emit object or string). 
*/ + rawFinishReason: unknown; + usage?: FinishPart['usage']; + providerMetadata?: SharedV3ProviderOptions; +} + +/** + * Result returned across the `doStreamStep` step boundary. + */ +export interface DoStreamStepResult { + toolCalls: LanguageModelV3ToolCall[]; + raw: DoStreamStepRawResult; + uiChunks: UIMessageChunk[] | undefined; + providerExecutedToolResults: Map; +} + /** * Options for the doStreamStep function. */ @@ -111,7 +172,7 @@ export async function doStreamStep( writable: WritableStream, tools?: LanguageModelV3CallOptions['tools'], options?: DoStreamStepOptions -) { +): Promise { 'use step'; let model: CompatibleLanguageModel | undefined; @@ -218,7 +279,29 @@ export async function doStreamStep( string, ProviderExecutedToolResult >(); - const chunks: LanguageModelV3StreamPart[] = []; + + // Raw aggregates streamed in alongside chunks. We collect these here + // so we don't have to retain the full V3 chunk array, and so callers + // outside the step boundary can rebuild a StepResult without paying + // for StepResult's redundant fields across the boundary. + let textBuffer = ''; + const reasoningById = new Map< + string, + { text: string; providerMetadata?: SharedV3ProviderOptions } + >(); + const reasoningOrder: string[] = []; + const files: RawFile[] = []; + const sources: Array< + Extract + > = []; + let warnings: + | Extract< + LanguageModelV3StreamPart, + { type: 'stream-start' } + >['warnings'] + | undefined; + let responseMetadata: RawResponseMetadata | undefined; + const includeRawChunks = options?.includeRawChunks ?? false; const collectUIChunks = options?.collectUIChunks ?? 
false; const uiChunks: UIMessageChunk[] = []; @@ -253,23 +336,86 @@ export async function doStreamStep( if (msToFirstChunk === undefined) { msToFirstChunk = Date.now() - startTime; } - if (chunk.type === 'tool-call') { - toolCalls.push({ - ...chunk, - input: chunk.input || '{}', - }); - } else if (chunk.type === 'tool-result') { - // In V3, all tool-result stream parts are provider-executed by definition - providerExecutedToolResults.set(chunk.toolCallId, { - toolCallId: chunk.toolCallId, - toolName: chunk.toolName, - result: chunk.result, - isError: chunk.isError, - }); - } else if (chunk.type === 'finish') { - finish = chunk; + switch (chunk.type) { + case 'tool-call': + toolCalls.push({ + ...chunk, + input: chunk.input || '{}', + }); + break; + case 'tool-result': + // In V3, all tool-result stream parts are provider-executed by definition + providerExecutedToolResults.set(chunk.toolCallId, { + toolCallId: chunk.toolCallId, + toolName: chunk.toolName, + result: chunk.result, + isError: chunk.isError, + }); + break; + case 'finish': + finish = chunk; + break; + case 'text-delta': + textBuffer += chunk.delta; + break; + case 'reasoning-start': + reasoningById.set(chunk.id, { + text: '', + providerMetadata: chunk.providerMetadata as + | SharedV3ProviderOptions + | undefined, + }); + reasoningOrder.push(chunk.id); + break; + case 'reasoning-delta': { + const entry = reasoningById.get(chunk.id); + if (entry) { + entry.text += chunk.delta; + if (chunk.providerMetadata != null) { + entry.providerMetadata = + chunk.providerMetadata as SharedV3ProviderOptions; + } + } else { + // Delta without a preceding start — still collect it + reasoningById.set(chunk.id, { + text: chunk.delta, + providerMetadata: chunk.providerMetadata as + | SharedV3ProviderOptions + | undefined, + }); + reasoningOrder.push(chunk.id); + } + break; + } + case 'reasoning-end': { + // Mirror the AI SDK's behavior: reasoning-end can carry final providerMetadata. 
+ const entry = reasoningById.get(chunk.id); + if (entry && chunk.providerMetadata != null) { + entry.providerMetadata = + chunk.providerMetadata as SharedV3ProviderOptions; + } + break; + } + case 'file': + files.push({ + mediaType: chunk.mediaType, + data: chunk.data, + }); + break; + case 'source': + sources.push(chunk); + break; + case 'stream-start': + warnings = chunk.warnings; + break; + case 'response-metadata': + responseMetadata = { + id: chunk.id, + modelId: chunk.modelId, + timestamp: chunk.timestamp, + }; + break; } - chunks.push(chunk); controller.enqueue(chunk); }, }) @@ -532,19 +678,27 @@ export async function doStreamStep( ) .pipeTo(writable, { preventClose: true }); - const step = chunksToStep(chunks, toolCalls, conversationPrompt, finish); + // Materialize the reasoning aggregate in source order. Captured here + // so we can both compute telemetry and ship it across the boundary. + const reasoningParts: RawReasoningPart[] = reasoningOrder.map((id) => { + const entry = reasoningById.get(id)!; + return { + text: entry.text, + ...(entry.providerMetadata != null + ? { providerMetadata: entry.providerMetadata } + : {}), + }; + }); + + const reasoningTextForTelemetry = reasoningParts + .map((r) => r.text) + .join(''); // ── Record response-time telemetry attributes on the span ── if (span) { const msToFinish = Date.now() - startTime; const finishReason = normalizeFinishReason(finish?.finishReason); - // Extract response metadata from collected chunks - const responseMetadata = chunks.find( - (c): c is Extract => - c.type === 'response-metadata' - ); - // Usage attributes (not gated) const inputTokens = finish?.usage?.inputTokens?.total ?? 0; const outputTokens = finish?.usage?.outputTokens?.total ?? 0; @@ -598,14 +752,12 @@ export async function doStreamStep( 'gen_ai.usage.output_tokens': outputTokens, }; - // Output-gated response attributes — reuse aggregated values - // from chunksToStep to avoid redundant iteration over chunks. 
if (telemetry?.recordOutputs !== false) { - if (step.text) { - responseAttrs['ai.response.text'] = step.text; + if (textBuffer) { + responseAttrs['ai.response.text'] = textBuffer; } - if (step.reasoningText) { - responseAttrs['ai.response.reasoning'] = step.reasoningText; + if (reasoningTextForTelemetry) { + responseAttrs['ai.response.reasoning'] = reasoningTextForTelemetry; } if (toolCalls.length > 0) { responseAttrs['ai.response.toolCalls'] = JSON.stringify(toolCalls); @@ -615,10 +767,23 @@ export async function doStreamStep( span.setAttributes(responseAttrs); } + const raw: DoStreamStepRawResult = { + text: textBuffer, + reasoning: reasoningParts, + files, + sources, + ...(warnings !== undefined ? { warnings } : {}), + ...(responseMetadata !== undefined ? { responseMetadata } : {}), + rawFinishReason: finish?.finishReason, + ...(finish?.usage !== undefined ? { usage: finish.usage } : {}), + ...(finish?.providerMetadata !== undefined + ? { providerMetadata: finish.providerMetadata } + : {}), + }; + return { toolCalls, - finish, - step, + raw, uiChunks: collectUIChunks ? uiChunks : undefined, providerExecutedToolResults, }; @@ -657,215 +822,3 @@ export function normalizeFinishReason(rawFinishReason: unknown): FinishReason { } return 'other'; } - -// This is a stand-in for logic in the AI-SDK streamText code which aggregates -// chunks into a single step result. -function chunksToStep( - chunks: LanguageModelV3StreamPart[], - toolCalls: LanguageModelV3ToolCall[], - conversationPrompt: LanguageModelV3Prompt, - finish?: FinishPart -): StepResult { - // Transform chunks to a single step result - const text = chunks - .filter( - (chunk): chunk is Extract => - chunk.type === 'text-delta' - ) - .map((chunk) => chunk.delta) - .join(''); - - // Collect reasoning parts by ID, mirroring how the AI SDK aggregates them: - // reasoning-start creates the part (with providerMetadata), reasoning-delta - // appends text, reasoning-end finalizes. For encrypted reasoning (e.g. 
OpenAI - // o-series), there may be no deltas — only start+end with providerMetadata - // carrying the itemId needed for Responses API item references. - const reasoningById = new Map< - string, - { text: string; providerMetadata?: unknown } - >(); - for (const chunk of chunks) { - if (chunk.type === 'reasoning-start') { - reasoningById.set(chunk.id, { - text: '', - providerMetadata: chunk.providerMetadata, - }); - } else if (chunk.type === 'reasoning-delta') { - const entry = reasoningById.get(chunk.id); - if (entry) { - entry.text += chunk.delta; - if (chunk.providerMetadata != null) { - entry.providerMetadata = chunk.providerMetadata; - } - } else { - // Delta without a preceding start — still collect it - reasoningById.set(chunk.id, { - text: chunk.delta, - providerMetadata: chunk.providerMetadata, - }); - } - } else if (chunk.type === 'reasoning-end') { - // Merge reasoning-end metadata, mirroring the AI SDK's behavior - // where reasoning-end can carry final providerMetadata. - const entry = reasoningById.get(chunk.id); - if (entry && chunk.providerMetadata != null) { - entry.providerMetadata = chunk.providerMetadata; - } - } - } - const reasoning = Array.from(reasoningById.values()); - - const reasoningText = reasoning.map((r) => r.text).join(''); - - // Extract warnings from stream-start chunk - const streamStart = chunks.find( - (chunk): chunk is Extract => - chunk.type === 'stream-start' - ); - - // Extract response metadata from response-metadata chunk - const responseMetadata = chunks.find( - (chunk): chunk is Extract => - chunk.type === 'response-metadata' - ); - - // Extract files from file chunks - // File chunks contain mediaType and data (base64 string or Uint8Array) - // GeneratedFile requires both base64 and uint8Array properties - const files = chunks - .filter( - (chunk): chunk is Extract => - chunk.type === 'file' - ) - .map((chunk) => { - const data = chunk.data; - // If data is already a Uint8Array, convert to base64; otherwise use as-is - 
if (data instanceof Uint8Array) { - // Convert Uint8Array to base64 string - const base64 = uint8ArrayToBase64(data); - return { - mediaType: chunk.mediaType, - base64, - uint8Array: data, - }; - } else { - // Data is base64 string, decode to Uint8Array - const binaryString = atob(data); - const bytes = new Uint8Array(binaryString.length); - for (let i = 0; i < binaryString.length; i++) { - bytes[i] = binaryString.charCodeAt(i); - } - return { - mediaType: chunk.mediaType, - base64: data, - uint8Array: bytes, - }; - } - }); - - // Extract sources from source chunks - const sources = chunks - .filter( - (chunk): chunk is Extract => - chunk.type === 'source' - ) - .map((chunk) => chunk); - - // Extract the raw finish reason from the V3 finish reason object - const v3FinishReason = finish?.finishReason; - const rawFinishReason = - typeof v3FinishReason === 'object' && v3FinishReason !== null - ? (v3FinishReason as { raw?: string }).raw - : typeof v3FinishReason === 'string' - ? v3FinishReason - : undefined; - - const mapToolCall = (toolCall: LanguageModelV3ToolCall) => ({ - type: 'tool-call' as const, - toolCallId: toolCall.toolCallId, - toolName: toolCall.toolName, - input: safeParseToolCallInput(toolCall.input), - dynamic: true as const, - }); - - const stepResult: StepResult = { - stepNumber: 0, // Will be overridden by the caller - model: { - provider: responseMetadata?.modelId?.split(':')[0] ?? 'unknown', - modelId: responseMetadata?.modelId ?? 'unknown', - }, - functionId: undefined, - metadata: undefined, - experimental_context: undefined, - content: [ - ...(text ? [{ type: 'text' as const, text }] : []), - ...toolCalls.map(mapToolCall), - ], - text, - reasoning: reasoning.map((r) => ({ - type: 'reasoning' as const, - text: r.text, - ...(r.providerMetadata != null - ? 
{ providerOptions: r.providerMetadata as SharedV3ProviderOptions } - : {}), - })), - reasoningText: reasoningText || undefined, - files, - sources, - toolCalls: toolCalls.map(mapToolCall), - staticToolCalls: [], - dynamicToolCalls: toolCalls.map(mapToolCall), - toolResults: [], - staticToolResults: [], - dynamicToolResults: [], - finishReason: normalizeFinishReason(finish?.finishReason), - rawFinishReason, - usage: finish?.usage - ? { - inputTokens: finish.usage.inputTokens?.total ?? 0, - inputTokenDetails: { - noCacheTokens: finish.usage.inputTokens?.noCache, - cacheReadTokens: finish.usage.inputTokens?.cacheRead, - cacheWriteTokens: finish.usage.inputTokens?.cacheWrite, - }, - outputTokens: finish.usage.outputTokens?.total ?? 0, - outputTokenDetails: { - textTokens: finish.usage.outputTokens?.text, - reasoningTokens: finish.usage.outputTokens?.reasoning, - }, - totalTokens: - (finish.usage.inputTokens?.total ?? 0) + - (finish.usage.outputTokens?.total ?? 0), - } - : { - inputTokens: 0, - inputTokenDetails: { - noCacheTokens: undefined, - cacheReadTokens: undefined, - cacheWriteTokens: undefined, - }, - outputTokens: 0, - outputTokenDetails: { - textTokens: undefined, - reasoningTokens: undefined, - }, - totalTokens: 0, - }, - warnings: streamStart?.warnings, - request: { - body: JSON.stringify({ - prompt: conversationPrompt, - tools: toolCalls.map(mapToolCall), - }), - }, - response: { - id: responseMetadata?.id ?? 'unknown', - timestamp: responseMetadata?.timestamp ?? new Date(), - modelId: responseMetadata?.modelId ?? 
'unknown', - messages: [], - }, - providerMetadata: finish?.providerMetadata || {}, - }; - - return stepResult; -} diff --git a/packages/ai/src/agent/stream-text-iterator.test.ts b/packages/ai/src/agent/stream-text-iterator.test.ts index 90eabc1446..697fc86943 100644 --- a/packages/ai/src/agent/stream-text-iterator.test.ts +++ b/packages/ai/src/agent/stream-text-iterator.test.ts @@ -11,14 +11,20 @@ import type { LanguageModelV3ToolCall, LanguageModelV3ToolResult, LanguageModelV3ToolResultPart, + SharedV3ProviderOptions, } from '@ai-sdk/provider'; -import type { StepResult, ToolSet, UIMessageChunk } from 'ai'; +import type { ToolSet, UIMessageChunk } from 'ai'; import { beforeEach, describe, expect, it, vi } from 'vitest'; -// Mock doStreamStep -vi.mock('./do-stream-step.js', () => ({ - doStreamStep: vi.fn(), -})); +// Mock doStreamStep but keep the rest of the module (notably +// `normalizeFinishReason`, which buildStepResult uses). +vi.mock(import('./do-stream-step.js'), async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + doStreamStep: vi.fn(), + }; +}); // Import after mocking const { streamTextIterator } = await import('./stream-text-iterator.js'); @@ -35,36 +41,38 @@ function createMockWritable(): WritableStream { } /** - * Helper to create a minimal step result for testing + * Helper to create a mock `doStreamStep` result. Inputs are described in + * StepResult-style terms (finishReason, reasoning) and translated to the + * raw aggregate shape that `doStreamStep` actually returns; the iterator + * reconstructs the StepResult via buildStepResult(). 
*/ -function createMockStepResult( - overrides: Partial> = {} -): StepResult { +function createMockResult( + overrides: { + toolCalls?: LanguageModelV3ToolCall[]; + finishReason?: string; + reasoning?: Array<{ + text: string; + providerOptions?: SharedV3ProviderOptions; + }>; + text?: string; + } = {} +) { return { - content: [], - text: '', - reasoning: [], - reasoningText: undefined, - files: [], - sources: [], - toolCalls: [], - staticToolCalls: [], - dynamicToolCalls: [], - toolResults: [], - staticToolResults: [], - dynamicToolResults: [], - finishReason: 'stop', - usage: { inputTokens: 0, outputTokens: 0, totalTokens: 0 }, - warnings: [], - request: { body: '' }, - response: { - id: 'test', - timestamp: new Date(), - modelId: 'test', - messages: [], + toolCalls: overrides.toolCalls ?? [], + raw: { + text: overrides.text ?? '', + reasoning: (overrides.reasoning ?? []).map((r) => ({ + text: r.text, + ...(r.providerOptions != null + ? { providerMetadata: r.providerOptions } + : {}), + })), + files: [], + sources: [], + rawFinishReason: overrides.finishReason, }, - providerMetadata: {}, - ...overrides, + uiChunks: undefined, + providerExecutedToolResults: new Map(), }; } @@ -96,19 +104,19 @@ describe('streamTextIterator', () => { // First call returns tool-calls with providerMetadata // Second call (after tool results) should receive the updated prompt vi.mocked(doStreamStep) - .mockResolvedValueOnce({ - toolCalls: [toolCallWithMetadata], - finish: { finishReason: 'tool-calls' }, - step: createMockStepResult({ finishReason: 'tool-calls' }), - }) + .mockResolvedValueOnce( + createMockResult({ + toolCalls: [toolCallWithMetadata], + finishReason: 'tool-calls', + }) + ) .mockImplementationOnce(async (prompt) => { // Capture the prompt on the second call to verify providerOptions capturedPrompt = prompt; - return { + return createMockResult({ toolCalls: [], - finish: { finishReason: 'stop' }, - step: createMockStepResult({ finishReason: 'stop' }), - }; + finishReason: 
'stop', + }); }); const iterator = streamTextIterator({ @@ -177,18 +185,18 @@ describe('streamTextIterator', () => { }; vi.mocked(doStreamStep) - .mockResolvedValueOnce({ - toolCalls: [toolCallWithoutMetadata], - finish: { finishReason: 'tool-calls' }, - step: createMockStepResult({ finishReason: 'tool-calls' }), - }) + .mockResolvedValueOnce( + createMockResult({ + toolCalls: [toolCallWithoutMetadata], + finishReason: 'tool-calls', + }) + ) .mockImplementationOnce(async (prompt) => { capturedPrompt = prompt; - return { + return createMockResult({ toolCalls: [], - finish: { finishReason: 'stop' }, - step: createMockStepResult({ finishReason: 'stop' }), - }; + finishReason: 'stop', + }); }); const iterator = streamTextIterator({ @@ -256,18 +264,18 @@ describe('streamTextIterator', () => { ]; vi.mocked(doStreamStep) - .mockResolvedValueOnce({ - toolCalls, - finish: { finishReason: 'tool-calls' }, - step: createMockStepResult({ finishReason: 'tool-calls' }), - }) + .mockResolvedValueOnce( + createMockResult({ + toolCalls, + finishReason: 'tool-calls', + }) + ) .mockImplementationOnce(async (prompt) => { capturedPrompt = prompt; - return { + return createMockResult({ toolCalls: [], - finish: { finishReason: 'stop' }, - step: createMockStepResult({ finishReason: 'stop' }), - }; + finishReason: 'stop', + }); }); const iterator = streamTextIterator({ @@ -358,18 +366,18 @@ describe('streamTextIterator', () => { ]; vi.mocked(doStreamStep) - .mockResolvedValueOnce({ - toolCalls, - finish: { finishReason: 'tool-calls' }, - step: createMockStepResult({ finishReason: 'tool-calls' }), - }) + .mockResolvedValueOnce( + createMockResult({ + toolCalls, + finishReason: 'tool-calls', + }) + ) .mockImplementationOnce(async (prompt) => { capturedPrompt = prompt; - return { + return createMockResult({ toolCalls: [], - finish: { finishReason: 'stop' }, - step: createMockStepResult({ finishReason: 'stop' }), - }; + finishReason: 'stop', + }); }); const iterator = streamTextIterator({ @@ 
-448,18 +456,18 @@ describe('streamTextIterator', () => { }; vi.mocked(doStreamStep) - .mockResolvedValueOnce({ - toolCalls: [toolCallWithOpenAIMetadata], - finish: { finishReason: 'tool-calls' }, - step: createMockStepResult({ finishReason: 'tool-calls' }), - }) + .mockResolvedValueOnce( + createMockResult({ + toolCalls: [toolCallWithOpenAIMetadata], + finishReason: 'tool-calls', + }) + ) .mockImplementationOnce(async (prompt) => { capturedPrompt = prompt; - return { + return createMockResult({ toolCalls: [], - finish: { finishReason: 'stop' }, - step: createMockStepResult({ finishReason: 'stop' }), - }; + finishReason: 'stop', + }); }); const iterator = streamTextIterator({ @@ -523,18 +531,18 @@ describe('streamTextIterator', () => { }; vi.mocked(doStreamStep) - .mockResolvedValueOnce({ - toolCalls: [toolCallWithMixedOpenAIMetadata], - finish: { finishReason: 'tool-calls' }, - step: createMockStepResult({ finishReason: 'tool-calls' }), - }) + .mockResolvedValueOnce( + createMockResult({ + toolCalls: [toolCallWithMixedOpenAIMetadata], + finishReason: 'tool-calls', + }) + ) .mockImplementationOnce(async (prompt) => { capturedPrompt = prompt; - return { + return createMockResult({ toolCalls: [], - finish: { finishReason: 'stop' }, - step: createMockStepResult({ finishReason: 'stop' }), - }; + finishReason: 'stop', + }); }); const iterator = streamTextIterator({ @@ -600,18 +608,18 @@ describe('streamTextIterator', () => { }; vi.mocked(doStreamStep) - .mockResolvedValueOnce({ - toolCalls: [toolCallWithMixedProviders], - finish: { finishReason: 'tool-calls' }, - step: createMockStepResult({ finishReason: 'tool-calls' }), - }) + .mockResolvedValueOnce( + createMockResult({ + toolCalls: [toolCallWithMixedProviders], + finishReason: 'tool-calls', + }) + ) .mockImplementationOnce(async (prompt) => { capturedPrompt = prompt; - return { + return createMockResult({ toolCalls: [], - finish: { finishReason: 'stop' }, - step: createMockStepResult({ finishReason: 'stop' }), - }; 
+ finishReason: 'stop', + }); }); const iterator = streamTextIterator({ @@ -673,24 +681,22 @@ describe('streamTextIterator', () => { }; vi.mocked(doStreamStep) - .mockResolvedValueOnce({ - toolCalls: [toolCall], - finish: { finishReason: 'tool-calls' }, - step: createMockStepResult({ + .mockResolvedValueOnce( + createMockResult({ + toolCalls: [toolCall], finishReason: 'tool-calls', reasoning: [ { type: 'reasoning', text: 'Let me think about this...' }, { type: 'reasoning', text: 'I should use the test tool.' }, ], - }), - }) + }) + ) .mockImplementationOnce(async (prompt) => { capturedPrompt = prompt; - return { + return createMockResult({ toolCalls: [], - finish: { finishReason: 'stop' }, - step: createMockStepResult({ finishReason: 'stop' }), - }; + finishReason: 'stop', + }); }); const iterator = streamTextIterator({ @@ -751,10 +757,9 @@ describe('streamTextIterator', () => { }; vi.mocked(doStreamStep) - .mockResolvedValueOnce({ - toolCalls: [toolCall], - finish: { finishReason: 'tool-calls' }, - step: createMockStepResult({ + .mockResolvedValueOnce( + createMockResult({ + toolCalls: [toolCall], finishReason: 'tool-calls', reasoning: [ { @@ -765,15 +770,14 @@ describe('streamTextIterator', () => { }, }, ], - }), - }) + }) + ) .mockImplementationOnce(async (prompt) => { capturedPrompt = prompt; - return { + return createMockResult({ toolCalls: [], - finish: { finishReason: 'stop' }, - step: createMockStepResult({ finishReason: 'stop' }), - }; + finishReason: 'stop', + }); }); const iterator = streamTextIterator({ @@ -829,21 +833,19 @@ describe('streamTextIterator', () => { }; vi.mocked(doStreamStep) - .mockResolvedValueOnce({ - toolCalls: [toolCall], - finish: { finishReason: 'tool-calls' }, - step: createMockStepResult({ + .mockResolvedValueOnce( + createMockResult({ + toolCalls: [toolCall], finishReason: 'tool-calls', reasoning: [], - }), - }) + }) + ) .mockImplementationOnce(async (prompt) => { capturedPrompt = prompt; - return { + return createMockResult({ 
toolCalls: [], - finish: { finishReason: 'stop' }, - step: createMockStepResult({ finishReason: 'stop' }), - }; + finishReason: 'stop', + }); }); const iterator = streamTextIterator({ @@ -891,11 +893,10 @@ describe('streamTextIterator', () => { vi.mocked(doStreamStep).mockImplementationOnce(async (prompt) => { capturedPrompt = prompt; - return { + return createMockResult({ toolCalls: [], - finish: { finishReason: 'stop' }, - step: createMockStepResult({ finishReason: 'stop' }), - }; + finishReason: 'stop', + }); }); const iterator = streamTextIterator({ @@ -929,11 +930,10 @@ describe('streamTextIterator', () => { vi.mocked(doStreamStep).mockImplementationOnce(async (prompt) => { capturedPrompt = prompt; - return { + return createMockResult({ toolCalls: [], - finish: { finishReason: 'stop' }, - step: createMockStepResult({ finishReason: 'stop' }), - }; + finishReason: 'stop', + }); }); // prepareStep returns both system and messages — system should NOT be lost @@ -980,11 +980,10 @@ describe('streamTextIterator', () => { vi.mocked(doStreamStep).mockImplementationOnce(async (prompt) => { capturedPrompt = prompt; - return { + return createMockResult({ toolCalls: [], - finish: { finishReason: 'stop' }, - step: createMockStepResult({ finishReason: 'stop' }), - }; + finishReason: 'stop', + }); }); // Messages already include a system message — prepareStep's system should replace it @@ -1036,19 +1035,17 @@ describe('streamTextIterator', () => { vi.mocked(doStreamStep) .mockImplementationOnce(async (prompt) => { capturedPrompts.push([...prompt]); - return { + return createMockResult({ toolCalls: [toolCall], - finish: { finishReason: 'tool-calls' }, - step: createMockStepResult({ finishReason: 'tool-calls' }), - }; + finishReason: 'tool-calls', + }); }) .mockImplementationOnce(async (prompt) => { capturedPrompts.push([...prompt]); - return { + return createMockResult({ toolCalls: [], - finish: { finishReason: 'stop' }, - step: createMockStepResult({ finishReason: 'stop' }), 
- }; + finishReason: 'stop', + }); }); const iterator = streamTextIterator({ @@ -1111,38 +1108,18 @@ describe('streamTextIterator', () => { }; vi.mocked(doStreamStep) - .mockResolvedValueOnce({ - toolCalls: [malformedToolCall], - finish: { finishReason: 'tool-calls' }, - step: createMockStepResult({ + .mockResolvedValueOnce( + createMockResult({ + toolCalls: [malformedToolCall], finishReason: 'tool-calls', - toolCalls: [ - { - type: 'tool-call', - toolCallId: 'call-1', - toolName: 'testTool', - input: '{"query":"test"', - dynamic: true as const, - }, - ], - dynamicToolCalls: [ - { - type: 'tool-call', - toolCallId: 'call-1', - toolName: 'testTool', - input: '{"query":"test"', - dynamic: true as const, - }, - ], - }), - }) + }) + ) .mockImplementationOnce(async (prompt) => { capturedPrompt = prompt; - return { + return createMockResult({ toolCalls: [], - finish: { finishReason: 'stop' }, - step: createMockStepResult({ finishReason: 'stop' }), - }; + finishReason: 'stop', + }); }); const iterator = streamTextIterator({ diff --git a/packages/ai/src/agent/stream-text-iterator.ts b/packages/ai/src/agent/stream-text-iterator.ts index 07b7e87bd5..10a3f2aaa2 100644 --- a/packages/ai/src/agent/stream-text-iterator.ts +++ b/packages/ai/src/agent/stream-text-iterator.ts @@ -3,6 +3,7 @@ import type { LanguageModelV3Prompt, LanguageModelV3ToolCall, LanguageModelV3ToolResultPart, + SharedV3ProviderOptions, } from '@ai-sdk/provider'; import type { FinishReason, @@ -13,8 +14,10 @@ import type { UIMessageChunk, } from 'ai'; import { + type DoStreamStepRawResult, doStreamStep, type ModelStopCondition, + normalizeFinishReason as normalizeFinishReasonStrict, type ProviderExecutedToolResult, } from './do-stream-step.js'; import type { @@ -24,13 +27,13 @@ import type { StreamTextTransform, TelemetrySettings, } from './durable-agent.js'; +import { safeParseToolCallInput } from './safe-parse-tool-call-input.js'; import { createSpan, endSpan, runInContext, type SpanHandle, } from 
'./telemetry.js'; -import { safeParseToolCallInput } from './safe-parse-tool-call-input.js'; import { toolsToModelTools } from './tools-to-model-tools.js'; import type { CompatibleLanguageModel } from './types.js'; @@ -295,8 +298,7 @@ export async function* streamTextIterator({ const modelTools = await toolsToModelTools(effectiveTools); const { toolCalls, - finish, - step, + raw, uiChunks: stepUIChunks, providerExecutedToolResults, } = await runInContext(outerSpanHandle, () => @@ -311,6 +313,9 @@ export async function* streamTextIterator({ collectUIChunks, }) ); + // Reconstruct the full StepResult outside the step boundary so the + // event log doesn't carry StepResult's redundant copies. + const step = buildStepResult(raw, toolCalls, conversationPrompt); isFirstIteration = false; stepNumber++; steps.push(step); @@ -325,7 +330,7 @@ export async function* streamTextIterator({ ]; // Normalize finishReason - AI SDK v6 returns { unified, raw }, v5 returns a string - const finishReason = normalizeFinishReason(finish?.finishReason); + const finishReason = normalizeFinishReason(raw.rawFinishReason); if (finishReason === 'tool-calls') { lastStepWasToolCalls = true; @@ -444,7 +449,7 @@ export async function* streamTextIterator({ done = true; } else { throw new Error( - `Unexpected finish reason: ${typeof finish?.finishReason === 'object' ? JSON.stringify(finish?.finishReason) : finish?.finishReason}` + `Unexpected finish reason: ${typeof raw.rawFinishReason === 'object' ? JSON.stringify(raw.rawFinishReason) : raw.rawFinishReason}` ); } @@ -567,3 +572,169 @@ function normalizeFinishReason(raw: unknown): FinishReason | undefined { } return undefined; } + +/** + * Convert a Uint8Array to a base64 string safely. + * Uses a loop instead of spread operator to avoid stack overflow on large arrays. 
+ */
+function uint8ArrayToBase64(data: Uint8Array): string {
+  let binary = '';
+  for (let i = 0; i < data.length; i++) {
+    binary += String.fromCharCode(data[i]);
+  }
+  return btoa(binary);
+}
+
+/**
+ * Reconstruct a full `StepResult` from the minimal raw aggregates returned
+ * by `doStreamStep`. Runs outside the step boundary so StepResult's
+ * redundant fields (duplicate tool-call lists, content, reasoningText,
+ * dual base64+uint8Array file encoding, request body) don't cross it.
+ *
+ * The shape returned matches what the AI SDK's `streamText` would expose
+ * to user callbacks (`onStepFinish`, the `steps` array).
+ *
+ * Each tool-call input is parsed once; `content`, `toolCalls` and
+ * `dynamicToolCalls` are separate arrays that share those parsed entries.
+ *
+ * @param raw - Raw aggregates returned by `doStreamStep`.
+ * @param toolCalls - Tool calls emitted by the model during this step.
+ * @param conversationPrompt - Prompt for this step; echoed in `request.body`.
+ * @returns The reconstructed step; `stepNumber` is a placeholder `0`.
+ */
+function buildStepResult(
+  raw: DoStreamStepRawResult,
+  toolCalls: LanguageModelV3ToolCall[],
+  conversationPrompt: LanguageModelV3Prompt
+): StepResult {
+  // Reasoning parts in the shape StepResult expects; provider metadata is
+  // attached as `providerOptions` only when it is present.
+  const reasoning = raw.reasoning.map((part) => ({
+    type: 'reasoning' as const,
+    text: part.text,
+    ...(part.providerMetadata != null
+      ? { providerOptions: part.providerMetadata as SharedV3ProviderOptions }
+      : {}),
+  }));
+
+  const reasoningText = raw.reasoning.map((part) => part.text).join('');
+
+  // Expand each file to the AI SDK's GeneratedFile shape (base64 + uint8Array).
+  // The dual encoding doubles the file payload, so we only do it here, after
+  // crossing the step boundary.
+  const files = raw.files.map((file) => {
+    const data = file.data;
+    if (data instanceof Uint8Array) {
+      return {
+        mediaType: file.mediaType,
+        base64: uint8ArrayToBase64(data),
+        uint8Array: data,
+      };
+    }
+    // Data is a base64 string. (URL is not currently supported here —
+    // matches prior behavior in chunksToStep.)
+    const base64 = data as string;
+    const decoded = atob(base64);
+    const bytes = new Uint8Array(decoded.length);
+    for (let i = 0; i < decoded.length; i++) {
+      bytes[i] = decoded.charCodeAt(i);
+    }
+    return { mediaType: file.mediaType, base64, uint8Array: bytes };
+  });
+
+  // Extract the raw finish reason from the V3 finish reason object/string.
+  const rawFinish = raw.rawFinishReason;
+  const rawFinishReason =
+    typeof rawFinish === 'object' && rawFinish !== null
+      ? (rawFinish as { raw?: string }).raw
+      : typeof rawFinish === 'string'
+        ? rawFinish
+        : undefined;
+
+  // Parse every tool-call input once and reuse the entries below instead of
+  // re-running safeParseToolCallInput for each derived list.
+  const mappedToolCalls = toolCalls.map((toolCall) => ({
+    type: 'tool-call' as const,
+    toolCallId: toolCall.toolCallId,
+    toolName: toolCall.toolName,
+    input: safeParseToolCallInput(toolCall.input),
+    dynamic: true as const,
+  }));
+
+  // Locals for response/usage fields that are read more than once below.
+  const modelId = raw.responseMetadata?.modelId;
+  const timestamp = raw.responseMetadata?.timestamp;
+  const inputTokens = raw.usage?.inputTokens;
+  const outputTokens = raw.usage?.outputTokens;
+
+  return {
+    // Placeholder; expected to be overridden by the caller.
+    // NOTE(review): confirm the caller assigns the real step number before
+    // the step is pushed to `steps` or passed to `onStepFinish`.
+    stepNumber: 0,
+    model: {
+      // Text before the first ':' of the model id (the whole id when it
+      // contains no ':').
+      provider: modelId?.split(':')[0] ?? 'unknown',
+      modelId: modelId ?? 'unknown',
+    },
+    functionId: undefined,
+    metadata: undefined,
+    experimental_context: undefined,
+    content: [
+      ...(raw.text ? [{ type: 'text' as const, text: raw.text }] : []),
+      ...mappedToolCalls,
+    ],
+    text: raw.text,
+    reasoning,
+    reasoningText: reasoningText || undefined,
+    files,
+    sources: raw.sources,
+    // Every tool call is reported as dynamic; the static lists stay empty.
+    toolCalls: mappedToolCalls,
+    staticToolCalls: [],
+    dynamicToolCalls: [...mappedToolCalls],
+    toolResults: [],
+    staticToolResults: [],
+    dynamicToolResults: [],
+    finishReason: normalizeFinishReasonStrict(raw.rawFinishReason),
+    rawFinishReason,
+    // Absent usage or totals are reported as 0; the optional detail
+    // counters are left undefined.
+    usage: {
+      inputTokens: inputTokens?.total ?? 0,
+      inputTokenDetails: {
+        noCacheTokens: inputTokens?.noCache,
+        cacheReadTokens: inputTokens?.cacheRead,
+        cacheWriteTokens: inputTokens?.cacheWrite,
+      },
+      outputTokens: outputTokens?.total ?? 0,
+      outputTokenDetails: {
+        textTokens: outputTokens?.text,
+        reasoningTokens: outputTokens?.reasoning,
+      },
+      totalTokens: (inputTokens?.total ?? 0) + (outputTokens?.total ?? 0),
+    },
+    warnings: raw.warnings,
+    request: {
+      body: JSON.stringify({
+        prompt: conversationPrompt,
+        tools: mappedToolCalls,
+      }),
+    },
+    response: {
+      id: raw.responseMetadata?.id ?? 'unknown',
+      // Keep a Date as-is, wrap any other non-null value with `new Date()`,
+      // and fall back to the current time when it is absent.
+      timestamp:
+        timestamp instanceof Date
+          ? timestamp
+          : timestamp != null
+            ? new Date(timestamp)
+            : new Date(),
+      modelId: modelId ?? 'unknown',
+      messages: [],
+    },
+    providerMetadata: raw.providerMetadata ?? {},
+  };
+}