diff --git a/.changeset/tall-mice-sink.md b/.changeset/tall-mice-sink.md
new file mode 100644
index 0000000000..5ffda6547a
--- /dev/null
+++ b/.changeset/tall-mice-sink.md
@@ -0,0 +1,5 @@
+---
+'@workflow/ai': patch
+---
+
+Reduce the DurableAgent step boundary payload by reconstructing `StepResult` outside the step instead of inside it.
diff --git a/packages/ai/src/agent/do-stream-step.test.ts b/packages/ai/src/agent/do-stream-step.test.ts
index 28f6122881..acb2839cfa 100644
--- a/packages/ai/src/agent/do-stream-step.test.ts
+++ b/packages/ai/src/agent/do-stream-step.test.ts
@@ -187,8 +187,8 @@ describe('doStreamStep', () => {
       { sendStart: false }
     );
 
-    expect(result.step.toolCalls).toHaveLength(1);
-    expect(result.step.toolCalls[0]?.input).toBe('{"city":"San Francisco"');
+    expect(result.toolCalls).toHaveLength(1);
+    expect(result.toolCalls[0]?.input).toBe('{"city":"San Francisco"');
     expect(writtenChunks).toContainEqual(
       expect.objectContaining({
         type: 'tool-input-available',
diff --git a/packages/ai/src/agent/do-stream-step.ts b/packages/ai/src/agent/do-stream-step.ts
index 15ba779dad..8a064df220 100644
--- a/packages/ai/src/agent/do-stream-step.ts
+++ b/packages/ai/src/agent/do-stream-step.ts
@@ -10,18 +10,17 @@ import {
   type FinishReason,
   gateway,
   generateId,
-  type StepResult,
   type StopCondition,
   type ToolChoice,
   type ToolSet,
   type UIMessageChunk,
 } from 'ai';
+import { getErrorMessage } from '../get-error-message.js';
 import type {
   ProviderOptions,
   StreamTextTransform,
   TelemetrySettings,
 } from './durable-agent.js';
-import { getErrorMessage } from '../get-error-message.js';
 import { safeParseToolCallInput } from './safe-parse-tool-call-input.js';
 import { recordSpan } from './telemetry.js';
 import type { CompatibleLanguageModel } from './types.js';
@@ -52,6 +51,68 @@ function uint8ArrayToBase64(data: Uint8Array): string {
   return btoa(binary);
 }
 
+/**
+ * Reasoning part captured during streaming, in source order.
+ */
+export interface RawReasoningPart {
+  text: string;
+  providerMetadata?: SharedV3ProviderOptions;
+}
+
+/**
+ * File chunk captured during streaming. The `data` field is the raw value
+ * emitted by the model — base64/URL string, URL object, or Uint8Array.
+ */
+export interface RawFile {
+  mediaType: string;
+  data: Uint8Array | string | URL;
+}
+
+/**
+ * Response metadata extracted from the model's `response-metadata` chunk.
+ */
+export interface RawResponseMetadata {
+  id?: string;
+  modelId?: string;
+  timestamp?: Date | string;
+}
+
+/**
+ * Minimal aggregates needed to reconstruct a `StepResult` outside the step
+ * boundary. By returning only these fields (instead of a fully populated
+ * StepResult), we avoid serializing the redundant copies the AI SDK keeps
+ * in StepResult — `toolCalls`/`dynamicToolCalls`/`staticToolCalls`,
+ * `content`, `reasoningText`, the always-empty `*ToolResults` arrays, the
+ * dual base64+uint8Array file encoding, and `request.body` (a JSON dump of
+ * the input prompt). The caller reconstructs the full StepResult from
+ * these fields plus the conversation prompt it already holds.
+ */
+export interface DoStreamStepRawResult {
+  text: string;
+  reasoning: RawReasoningPart[];
+  files: RawFile[];
+  sources: Array<Extract<LanguageModelV3StreamPart, { type: 'source' }>>;
+  warnings?: Extract<
+    LanguageModelV3StreamPart,
+    { type: 'stream-start' }
+  >['warnings'];
+  responseMetadata?: RawResponseMetadata;
+  /** Raw finish reason as emitted by the model (V3 may emit object or string). */
+  rawFinishReason: unknown;
+  usage?: FinishPart['usage'];
+  providerMetadata?: SharedV3ProviderOptions;
+}
+
+/**
+ * Result returned across the `doStreamStep` step boundary.
+ */
+export interface DoStreamStepResult {
+  toolCalls: LanguageModelV3ToolCall[];
+  raw: DoStreamStepRawResult;
+  uiChunks: UIMessageChunk[] | undefined;
+  providerExecutedToolResults: Map<string, ProviderExecutedToolResult>;
+}
+
 /**
  * Options for the doStreamStep function.
  */
@@ -111,7 +172,7 @@ export async function doStreamStep(
   writable: WritableStream,
   tools?: LanguageModelV3CallOptions['tools'],
   options?: DoStreamStepOptions
-) {
+): Promise<DoStreamStepResult> {
   'use step';
 
   let model: CompatibleLanguageModel | undefined;
@@ -218,7 +279,29 @@
     string,
     ProviderExecutedToolResult
  >();
-  const chunks: LanguageModelV3StreamPart[] = [];
+
+  // Raw aggregates collected as the chunks stream through. We keep these
+  // here so we don't have to retain the full V3 chunk array, and so callers
+  // outside the step boundary can rebuild a StepResult without paying for
+  // StepResult's redundant fields across the boundary.
+  let textBuffer = '';
+  const reasoningById = new Map<
+    string,
+    { text: string; providerMetadata?: SharedV3ProviderOptions }
+  >();
+  const reasoningOrder: string[] = [];
+  const files: RawFile[] = [];
+  const sources: Array<
+    Extract<LanguageModelV3StreamPart, { type: 'source' }>
+  > = [];
+  let warnings:
+    | Extract<
+        LanguageModelV3StreamPart,
+        { type: 'stream-start' }
+      >['warnings']
+    | undefined;
+  let responseMetadata: RawResponseMetadata | undefined;
+
   const includeRawChunks = options?.includeRawChunks ?? false;
   const collectUIChunks = options?.collectUIChunks ?? false;
   const uiChunks: UIMessageChunk[] = [];
@@ -253,23 +336,86 @@
           if (msToFirstChunk === undefined) {
             msToFirstChunk = Date.now() - startTime;
           }
-          if (chunk.type === 'tool-call') {
-            toolCalls.push({
-              ...chunk,
-              input: chunk.input || '{}',
-            });
-          } else if (chunk.type === 'tool-result') {
-            // In V3, all tool-result stream parts are provider-executed by definition
-            providerExecutedToolResults.set(chunk.toolCallId, {
-              toolCallId: chunk.toolCallId,
-              toolName: chunk.toolName,
-              result: chunk.result,
-              isError: chunk.isError,
-            });
-          } else if (chunk.type === 'finish') {
-            finish = chunk;
+          switch (chunk.type) {
+            case 'tool-call':
+              toolCalls.push({
+                ...chunk,
+                input: chunk.input || '{}',
+              });
+              break;
+            case 'tool-result':
+              // In V3, all tool-result stream parts are provider-executed by definition
+              providerExecutedToolResults.set(chunk.toolCallId, {
+                toolCallId: chunk.toolCallId,
+                toolName: chunk.toolName,
+                result: chunk.result,
+                isError: chunk.isError,
+              });
+              break;
+            case 'finish':
+              finish = chunk;
+              break;
+            case 'text-delta':
+              textBuffer += chunk.delta;
+              break;
+            case 'reasoning-start':
+              reasoningById.set(chunk.id, {
+                text: '',
+                providerMetadata: chunk.providerMetadata as
+                  | SharedV3ProviderOptions
+                  | undefined,
+              });
+              reasoningOrder.push(chunk.id);
+              break;
+            case 'reasoning-delta': {
+              const entry = reasoningById.get(chunk.id);
+              if (entry) {
+                entry.text += chunk.delta;
+                if (chunk.providerMetadata != null) {
+                  entry.providerMetadata =
+                    chunk.providerMetadata as SharedV3ProviderOptions;
+                }
+              } else {
+                // Delta without a preceding start — still collect it
+                reasoningById.set(chunk.id, {
+                  text: chunk.delta,
+                  providerMetadata: chunk.providerMetadata as
+                    | SharedV3ProviderOptions
+                    | undefined,
+                });
+                reasoningOrder.push(chunk.id);
+              }
+              break;
+            }
+            case 'reasoning-end': {
+              // Mirror the AI SDK's behavior: reasoning-end can carry final providerMetadata.
+              const entry = reasoningById.get(chunk.id);
+              if (entry && chunk.providerMetadata != null) {
+                entry.providerMetadata =
+                  chunk.providerMetadata as SharedV3ProviderOptions;
+              }
+              break;
+            }
+            case 'file':
+              files.push({
+                mediaType: chunk.mediaType,
+                data: chunk.data,
+              });
+              break;
+            case 'source':
+              sources.push(chunk);
+              break;
+            case 'stream-start':
+              warnings = chunk.warnings;
+              break;
+            case 'response-metadata':
+              responseMetadata = {
+                id: chunk.id,
+                modelId: chunk.modelId,
+                timestamp: chunk.timestamp,
+              };
+              break;
           }
-          chunks.push(chunk);
           controller.enqueue(chunk);
         },
       })
@@ -532,19 +678,27 @@
       )
       .pipeTo(writable, { preventClose: true });
 
-  const step = chunksToStep(chunks, toolCalls, conversationPrompt, finish);
+  // Materialize the reasoning aggregate in source order. Captured here
+  // so we can both compute telemetry and ship it across the boundary.
+  const reasoningParts: RawReasoningPart[] = reasoningOrder.map((id) => {
+    const entry = reasoningById.get(id)!;
+    return {
+      text: entry.text,
+      ...(entry.providerMetadata != null
+        ? { providerMetadata: entry.providerMetadata }
+        : {}),
+    };
+  });
+
+  const reasoningTextForTelemetry = reasoningParts
+    .map((r) => r.text)
+    .join('');
 
   // ── Record response-time telemetry attributes on the span ──
   if (span) {
     const msToFinish = Date.now() - startTime;
     const finishReason = normalizeFinishReason(finish?.finishReason);
 
-    // Extract response metadata from collected chunks
-    const responseMetadata = chunks.find(
-      (c): c is Extract<LanguageModelV3StreamPart, { type: 'response-metadata' }> =>
-        c.type === 'response-metadata'
-    );
-
     // Usage attributes (not gated)
     const inputTokens = finish?.usage?.inputTokens?.total ?? 0;
     const outputTokens = finish?.usage?.outputTokens?.total ?? 0;
@@ -598,14 +752,12 @@
       'gen_ai.usage.output_tokens': outputTokens,
     };
 
-    // Output-gated response attributes — reuse aggregated values
-    // from chunksToStep to avoid redundant iteration over chunks.
     if (telemetry?.recordOutputs !== false) {
-      if (step.text) {
-        responseAttrs['ai.response.text'] = step.text;
+      if (textBuffer) {
+        responseAttrs['ai.response.text'] = textBuffer;
       }
-      if (step.reasoningText) {
-        responseAttrs['ai.response.reasoning'] = step.reasoningText;
+      if (reasoningTextForTelemetry) {
+        responseAttrs['ai.response.reasoning'] = reasoningTextForTelemetry;
       }
       if (toolCalls.length > 0) {
         responseAttrs['ai.response.toolCalls'] = JSON.stringify(toolCalls);
@@ -615,10 +767,23 @@
     span.setAttributes(responseAttrs);
   }
 
+  const raw: DoStreamStepRawResult = {
+    text: textBuffer,
+    reasoning: reasoningParts,
+    files,
+    sources,
+    ...(warnings !== undefined ? { warnings } : {}),
+    ...(responseMetadata !== undefined ? { responseMetadata } : {}),
+    rawFinishReason: finish?.finishReason,
+    ...(finish?.usage !== undefined ? { usage: finish.usage } : {}),
+    ...(finish?.providerMetadata !== undefined
+      ? { providerMetadata: finish.providerMetadata }
+      : {}),
+  };
+
   return {
     toolCalls,
-    finish,
-    step,
+    raw,
     uiChunks: collectUIChunks ? uiChunks : undefined,
     providerExecutedToolResults,
   };
@@ -657,215 +822,3 @@ export function normalizeFinishReason(rawFinishReason: unknown): FinishReason {
   }
   return 'other';
 }
-
-// This is a stand-in for logic in the AI-SDK streamText code which aggregates
-// chunks into a single step result.
-function chunksToStep(
-  chunks: LanguageModelV3StreamPart[],
-  toolCalls: LanguageModelV3ToolCall[],
-  conversationPrompt: LanguageModelV3Prompt,
-  finish?: FinishPart
-): StepResult<ToolSet> {
-  // Transform chunks to a single step result
-  const text = chunks
-    .filter(
-      (chunk): chunk is Extract<LanguageModelV3StreamPart, { type: 'text-delta' }> =>
-        chunk.type === 'text-delta'
-    )
-    .map((chunk) => chunk.delta)
-    .join('');
-
-  // Collect reasoning parts by ID, mirroring how the AI SDK aggregates them:
-  // reasoning-start creates the part (with providerMetadata), reasoning-delta
-  // appends text, reasoning-end finalizes. For encrypted reasoning (e.g. OpenAI
-  // o-series), there may be no deltas — only start+end with providerMetadata
-  // carrying the itemId needed for Responses API item references.
-  const reasoningById = new Map<
-    string,
-    { text: string; providerMetadata?: unknown }
-  >();
-  for (const chunk of chunks) {
-    if (chunk.type === 'reasoning-start') {
-      reasoningById.set(chunk.id, {
-        text: '',
-        providerMetadata: chunk.providerMetadata,
-      });
-    } else if (chunk.type === 'reasoning-delta') {
-      const entry = reasoningById.get(chunk.id);
-      if (entry) {
-        entry.text += chunk.delta;
-        if (chunk.providerMetadata != null) {
-          entry.providerMetadata = chunk.providerMetadata;
-        }
-      } else {
-        // Delta without a preceding start — still collect it
-        reasoningById.set(chunk.id, {
-          text: chunk.delta,
-          providerMetadata: chunk.providerMetadata,
-        });
-      }
-    } else if (chunk.type === 'reasoning-end') {
-      // Merge reasoning-end metadata, mirroring the AI SDK's behavior
-      // where reasoning-end can carry final providerMetadata.
-      const entry = reasoningById.get(chunk.id);
-      if (entry && chunk.providerMetadata != null) {
-        entry.providerMetadata = chunk.providerMetadata;
-      }
-    }
-  }
-  const reasoning = Array.from(reasoningById.values());
-
-  const reasoningText = reasoning.map((r) => r.text).join('');
-
-  // Extract warnings from stream-start chunk
-  const streamStart = chunks.find(
-    (chunk): chunk is Extract<LanguageModelV3StreamPart, { type: 'stream-start' }> =>
-      chunk.type === 'stream-start'
-  );
-
-  // Extract response metadata from response-metadata chunk
-  const responseMetadata = chunks.find(
-    (chunk): chunk is Extract<LanguageModelV3StreamPart, { type: 'response-metadata' }> =>
-      chunk.type === 'response-metadata'
-  );
-
-  // Extract files from file chunks
-  // File chunks contain mediaType and data (base64 string or Uint8Array)
-  // GeneratedFile requires both base64 and uint8Array properties
-  const files = chunks
-    .filter(
-      (chunk): chunk is Extract<LanguageModelV3StreamPart, { type: 'file' }> =>
-        chunk.type === 'file'
-    )
-    .map((chunk) => {
-      const data = chunk.data;
-      // If data is already a Uint8Array, convert to base64; otherwise use as-is
-      if (data instanceof Uint8Array) {
-        // Convert Uint8Array to base64 string
-        const base64 = uint8ArrayToBase64(data);
-        return {
-          mediaType: chunk.mediaType,
-          base64,
-          uint8Array: data,
-        };
-      } else {
-        // Data is base64 string, decode to Uint8Array
-        const binaryString = atob(data);
-        const bytes = new Uint8Array(binaryString.length);
-        for (let i = 0; i < binaryString.length; i++) {
-          bytes[i] = binaryString.charCodeAt(i);
-        }
-        return {
-          mediaType: chunk.mediaType,
-          base64: data,
-          uint8Array: bytes,
-        };
-      }
-    });
-
-  // Extract sources from source chunks
-  const sources = chunks
-    .filter(
-      (chunk): chunk is Extract<LanguageModelV3StreamPart, { type: 'source' }> =>
-        chunk.type === 'source'
-    )
-    .map((chunk) => chunk);
-
-  // Extract the raw finish reason from the V3 finish reason object
-  const v3FinishReason = finish?.finishReason;
-  const rawFinishReason =
-    typeof v3FinishReason === 'object' && v3FinishReason !== null
-      ? (v3FinishReason as { raw?: string }).raw
-      : typeof v3FinishReason === 'string'
-        ? v3FinishReason
-        : undefined;
-
-  const mapToolCall = (toolCall: LanguageModelV3ToolCall) => ({
-    type: 'tool-call' as const,
-    toolCallId: toolCall.toolCallId,
-    toolName: toolCall.toolName,
-    input: safeParseToolCallInput(toolCall.input),
-    dynamic: true as const,
-  });
-
-  const stepResult: StepResult<ToolSet> = {
-    stepNumber: 0, // Will be overridden by the caller
-    model: {
-      provider: responseMetadata?.modelId?.split(':')[0] ?? 'unknown',
-      modelId: responseMetadata?.modelId ?? 'unknown',
-    },
-    functionId: undefined,
-    metadata: undefined,
-    experimental_context: undefined,
-    content: [
-      ...(text ? [{ type: 'text' as const, text }] : []),
-      ...toolCalls.map(mapToolCall),
-    ],
-    text,
-    reasoning: reasoning.map((r) => ({
-      type: 'reasoning' as const,
-      text: r.text,
-      ...(r.providerMetadata != null
-        ? { providerOptions: r.providerMetadata as SharedV3ProviderOptions }
-        : {}),
-    })),
-    reasoningText: reasoningText || undefined,
-    files,
-    sources,
-    toolCalls: toolCalls.map(mapToolCall),
-    staticToolCalls: [],
-    dynamicToolCalls: toolCalls.map(mapToolCall),
-    toolResults: [],
-    staticToolResults: [],
-    dynamicToolResults: [],
-    finishReason: normalizeFinishReason(finish?.finishReason),
-    rawFinishReason,
-    usage: finish?.usage
-      ? {
-          inputTokens: finish.usage.inputTokens?.total ?? 0,
-          inputTokenDetails: {
-            noCacheTokens: finish.usage.inputTokens?.noCache,
-            cacheReadTokens: finish.usage.inputTokens?.cacheRead,
-            cacheWriteTokens: finish.usage.inputTokens?.cacheWrite,
-          },
-          outputTokens: finish.usage.outputTokens?.total ?? 0,
-          outputTokenDetails: {
-            textTokens: finish.usage.outputTokens?.text,
-            reasoningTokens: finish.usage.outputTokens?.reasoning,
-          },
-          totalTokens:
-            (finish.usage.inputTokens?.total ?? 0) +
-            (finish.usage.outputTokens?.total ?? 0),
-        }
-      : {
-          inputTokens: 0,
-          inputTokenDetails: {
-            noCacheTokens: undefined,
-            cacheReadTokens: undefined,
-            cacheWriteTokens: undefined,
-          },
-          outputTokens: 0,
-          outputTokenDetails: {
-            textTokens: undefined,
-            reasoningTokens: undefined,
-          },
-          totalTokens: 0,
-        },
-    warnings: streamStart?.warnings,
-    request: {
-      body: JSON.stringify({
-        prompt: conversationPrompt,
-        tools: toolCalls.map(mapToolCall),
-      }),
-    },
-    response: {
-      id: responseMetadata?.id ?? 'unknown',
-      timestamp: responseMetadata?.timestamp ?? new Date(),
-      modelId: responseMetadata?.modelId ?? 'unknown',
-      messages: [],
-    },
-    providerMetadata: finish?.providerMetadata || {},
-  };
-
-  return stepResult;
-}
diff --git a/packages/ai/src/agent/stream-text-iterator.test.ts b/packages/ai/src/agent/stream-text-iterator.test.ts
index 90eabc1446..697fc86943 100644
--- a/packages/ai/src/agent/stream-text-iterator.test.ts
+++ b/packages/ai/src/agent/stream-text-iterator.test.ts
@@ -11,14 +11,20 @@ import type {
   LanguageModelV3ToolCall,
   LanguageModelV3ToolResult,
   LanguageModelV3ToolResultPart,
+  SharedV3ProviderOptions,
 } from '@ai-sdk/provider';
-import type { StepResult, ToolSet, UIMessageChunk } from 'ai';
+import type { ToolSet, UIMessageChunk } from 'ai';
 import { beforeEach, describe, expect, it, vi } from 'vitest';
 
-// Mock doStreamStep
-vi.mock('./do-stream-step.js', () => ({
-  doStreamStep: vi.fn(),
-}));
+// Mock doStreamStep but keep the rest of the module (notably
+// `normalizeFinishReason`, which buildStepResult uses).
+vi.mock(import('./do-stream-step.js'), async (importOriginal) => {
+  const actual = await importOriginal();
+  return {
+    ...actual,
+    doStreamStep: vi.fn(),
+  };
+});
 
 // Import after mocking
 const { streamTextIterator } = await import('./stream-text-iterator.js');
@@ -35,36 +41,38 @@ function createMockWritable(): WritableStream {
 }
 
 /**
- * Helper to create a minimal step result for testing
+ * Helper to create a mock `doStreamStep` result. Inputs are described in
+ * StepResult-style terms (finishReason, reasoning) and translated to the
+ * raw aggregate shape that `doStreamStep` actually returns; the iterator
+ * reconstructs the StepResult via buildStepResult().
 */
-function createMockStepResult(
-  overrides: Partial<StepResult<ToolSet>> = {}
-): StepResult<ToolSet> {
+function createMockResult(
+  overrides: {
+    toolCalls?: LanguageModelV3ToolCall[];
+    finishReason?: string;
+    reasoning?: Array<{
+      text: string;
+      providerOptions?: SharedV3ProviderOptions;
+    }>;
+    text?: string;
+  } = {}
+) {
   return {
-    content: [],
-    text: '',
-    reasoning: [],
-    reasoningText: undefined,
-    files: [],
-    sources: [],
-    toolCalls: [],
-    staticToolCalls: [],
-    dynamicToolCalls: [],
-    toolResults: [],
-    staticToolResults: [],
-    dynamicToolResults: [],
-    finishReason: 'stop',
-    usage: { inputTokens: 0, outputTokens: 0, totalTokens: 0 },
-    warnings: [],
-    request: { body: '' },
-    response: {
-      id: 'test',
-      timestamp: new Date(),
-      modelId: 'test',
-      messages: [],
+    toolCalls: overrides.toolCalls ?? [],
+    raw: {
+      text: overrides.text ?? '',
+      reasoning: (overrides.reasoning ?? []).map((r) => ({
+        text: r.text,
+        ...(r.providerOptions != null
+          ? { providerMetadata: r.providerOptions }
+          : {}),
+      })),
+      files: [],
+      sources: [],
+      rawFinishReason: overrides.finishReason,
     },
-    providerMetadata: {},
-    ...overrides,
+    uiChunks: undefined,
+    providerExecutedToolResults: new Map(),
   };
 }
@@ -96,19 +104,19 @@
     // First call returns tool-calls with providerMetadata
     // Second call (after tool results) should receive the updated prompt
     vi.mocked(doStreamStep)
-      .mockResolvedValueOnce({
-        toolCalls: [toolCallWithMetadata],
-        finish: { finishReason: 'tool-calls' },
-        step: createMockStepResult({ finishReason: 'tool-calls' }),
-      })
+      .mockResolvedValueOnce(
+        createMockResult({
+          toolCalls: [toolCallWithMetadata],
+          finishReason: 'tool-calls',
+        })
+      )
       .mockImplementationOnce(async (prompt) => {
         // Capture the prompt on the second call to verify providerOptions
         capturedPrompt = prompt;
-        return {
+        return createMockResult({
           toolCalls: [],
-          finish: { finishReason: 'stop' },
-          step: createMockStepResult({ finishReason: 'stop' }),
-        };
+          finishReason: 'stop',
+        });
       });
 
     const iterator = streamTextIterator({
@@ -177,18 +185,18 @@
     };
 
     vi.mocked(doStreamStep)
-      .mockResolvedValueOnce({
-        toolCalls: [toolCallWithoutMetadata],
-        finish: { finishReason: 'tool-calls' },
-        step: createMockStepResult({ finishReason: 'tool-calls' }),
-      })
+      .mockResolvedValueOnce(
+        createMockResult({
+          toolCalls: [toolCallWithoutMetadata],
+          finishReason: 'tool-calls',
+        })
+      )
      .mockImplementationOnce(async (prompt) => {
         capturedPrompt = prompt;
-        return {
+        return createMockResult({
           toolCalls: [],
-          finish: { finishReason: 'stop' },
-          step: createMockStepResult({ finishReason: 'stop' }),
-        };
+          finishReason: 'stop',
+        });
       });
 
     const iterator = streamTextIterator({
@@ -256,18 +264,18 @@
     ];
 
     vi.mocked(doStreamStep)
-      .mockResolvedValueOnce({
-        toolCalls,
-        finish: { finishReason: 'tool-calls' },
-        step: createMockStepResult({ finishReason: 'tool-calls' }),
-      })
+      .mockResolvedValueOnce(
+        createMockResult({
+          toolCalls,
+          finishReason: 'tool-calls',
+        })
+      )
       .mockImplementationOnce(async (prompt) => {
         capturedPrompt = prompt;
-        return {
+        return createMockResult({
           toolCalls: [],
-          finish: { finishReason: 'stop' },
-          step: createMockStepResult({ finishReason: 'stop' }),
-        };
+          finishReason: 'stop',
+        });
       });
 
     const iterator = streamTextIterator({
@@ -358,18 +366,18 @@
     ];
 
     vi.mocked(doStreamStep)
-      .mockResolvedValueOnce({
-        toolCalls,
-        finish: { finishReason: 'tool-calls' },
-        step: createMockStepResult({ finishReason: 'tool-calls' }),
-      })
+      .mockResolvedValueOnce(
+        createMockResult({
+          toolCalls,
+          finishReason: 'tool-calls',
+        })
+      )
       .mockImplementationOnce(async (prompt) => {
         capturedPrompt = prompt;
-        return {
+        return createMockResult({
          toolCalls: [],
-          finish: { finishReason: 'stop' },
-          step: createMockStepResult({ finishReason: 'stop' }),
-        };
+          finishReason: 'stop',
+        });
       });
 
     const iterator = streamTextIterator({
@@ -448,18 +456,18 @@
     };
 
     vi.mocked(doStreamStep)
-      .mockResolvedValueOnce({
-        toolCalls: [toolCallWithOpenAIMetadata],
-        finish: { finishReason: 'tool-calls' },
-        step: createMockStepResult({ finishReason: 'tool-calls' }),
-      })
+      .mockResolvedValueOnce(
+        createMockResult({
+          toolCalls: [toolCallWithOpenAIMetadata],
+          finishReason: 'tool-calls',
+        })
+      )
       .mockImplementationOnce(async (prompt) => {
         capturedPrompt = prompt;
-        return {
+        return createMockResult({
           toolCalls: [],
-          finish: { finishReason: 'stop' },
-          step: createMockStepResult({ finishReason: 'stop' }),
-        };
+          finishReason: 'stop',
+        });
       });
 
     const iterator = streamTextIterator({
@@ -523,18 +531,18 @@
     };
 
     vi.mocked(doStreamStep)
-      .mockResolvedValueOnce({
-        toolCalls: [toolCallWithMixedOpenAIMetadata],
-        finish: { finishReason: 'tool-calls' },
-        step: createMockStepResult({ finishReason: 'tool-calls' }),
-      })
+      .mockResolvedValueOnce(
+        createMockResult({
+          toolCalls: [toolCallWithMixedOpenAIMetadata],
+          finishReason: 'tool-calls',
+        })
+      )
       .mockImplementationOnce(async (prompt) => {
         capturedPrompt = prompt;
-        return {
+        return createMockResult({
           toolCalls: [],
-          finish: { finishReason: 'stop' },
-          step: createMockStepResult({ finishReason: 'stop' }),
-        };
+          finishReason: 'stop',
+        });
       });
 
     const iterator = streamTextIterator({
@@ -600,18 +608,18 @@
     };
 
     vi.mocked(doStreamStep)
-      .mockResolvedValueOnce({
-        toolCalls: [toolCallWithMixedProviders],
-        finish: { finishReason: 'tool-calls' },
-        step: createMockStepResult({ finishReason: 'tool-calls' }),
-      })
+      .mockResolvedValueOnce(
+        createMockResult({
+          toolCalls: [toolCallWithMixedProviders],
+          finishReason: 'tool-calls',
+        })
+      )
       .mockImplementationOnce(async (prompt) => {
         capturedPrompt = prompt;
-        return {
+        return createMockResult({
           toolCalls: [],
-          finish: { finishReason: 'stop' },
-          step: createMockStepResult({ finishReason: 'stop' }),
-        };
+          finishReason: 'stop',
+        });
       });
 
     const iterator = streamTextIterator({
@@ -673,24 +681,22 @@
     };
 
     vi.mocked(doStreamStep)
-      .mockResolvedValueOnce({
-        toolCalls: [toolCall],
-        finish: { finishReason: 'tool-calls' },
-        step: createMockStepResult({
+      .mockResolvedValueOnce(
+        createMockResult({
+          toolCalls: [toolCall],
           finishReason: 'tool-calls',
           reasoning: [
            { type: 'reasoning', text: 'Let me think about this...' },
             { type: 'reasoning', text: 'I should use the test tool.' },
           ],
-        }),
-      })
+        })
+      )
       .mockImplementationOnce(async (prompt) => {
         capturedPrompt = prompt;
-        return {
+        return createMockResult({
           toolCalls: [],
-          finish: { finishReason: 'stop' },
-          step: createMockStepResult({ finishReason: 'stop' }),
-        };
+          finishReason: 'stop',
+        });
       });
 
     const iterator = streamTextIterator({
@@ -751,10 +757,9 @@
     };
 
     vi.mocked(doStreamStep)
-      .mockResolvedValueOnce({
-        toolCalls: [toolCall],
-        finish: { finishReason: 'tool-calls' },
-        step: createMockStepResult({
+      .mockResolvedValueOnce(
+        createMockResult({
+          toolCalls: [toolCall],
           finishReason: 'tool-calls',
           reasoning: [
             {
               text: 'Encrypted reasoning content',
               providerOptions: {
                 openai: {
                   reasoningEncryptedContent: 'encrypted-blob-abc123',
                 },
               },
             },
           ],
-        }),
-      })
+        })
+      )
       .mockImplementationOnce(async (prompt) => {
         capturedPrompt = prompt;
-        return {
+        return createMockResult({
           toolCalls: [],
-          finish: { finishReason: 'stop' },
-          step: createMockStepResult({ finishReason: 'stop' }),
-        };
+          finishReason: 'stop',
+        });
       });
 
     const iterator = streamTextIterator({
@@ -829,21 +833,19 @@
     };
 
     vi.mocked(doStreamStep)
-      .mockResolvedValueOnce({
-        toolCalls: [toolCall],
-        finish: { finishReason: 'tool-calls' },
-        step: createMockStepResult({
+      .mockResolvedValueOnce(
+        createMockResult({
+          toolCalls: [toolCall],
           finishReason: 'tool-calls',
           reasoning: [],
-        }),
-      })
+        })
+      )
       .mockImplementationOnce(async (prompt) => {
         capturedPrompt = prompt;
-        return {
+        return createMockResult({
           toolCalls: [],
-          finish: { finishReason: 'stop' },
-          step: createMockStepResult({ finishReason: 'stop' }),
-        };
+          finishReason: 'stop',
+        });
       });
 
     const iterator = streamTextIterator({
@@ -891,11 +893,10 @@
 
     vi.mocked(doStreamStep).mockImplementationOnce(async (prompt) => {
       capturedPrompt = prompt;
-      return {
+      return createMockResult({
        toolCalls: [],
-        finish: { finishReason: 'stop' },
-        step: createMockStepResult({ finishReason: 'stop' }),
-      };
+        finishReason: 'stop',
+      });
     });
 
     const iterator = streamTextIterator({
@@ -929,11 +930,10 @@
 
     vi.mocked(doStreamStep).mockImplementationOnce(async (prompt) => {
       capturedPrompt = prompt;
-      return {
+      return createMockResult({
        toolCalls: [],
-        finish: { finishReason: 'stop' },
-        step: createMockStepResult({ finishReason: 'stop' }),
-      };
+        finishReason: 'stop',
+      });
     });
 
     // prepareStep returns both system and messages — system should NOT be lost
@@ -980,11 +980,10 @@
 
     vi.mocked(doStreamStep).mockImplementationOnce(async (prompt) => {
       capturedPrompt = prompt;
-      return {
+      return createMockResult({
        toolCalls: [],
-        finish: { finishReason: 'stop' },
-        step: createMockStepResult({ finishReason: 'stop' }),
-      };
+        finishReason: 'stop',
+      });
     });
 
     // Messages already include a system message — prepareStep's system should replace it
@@ -1036,19 +1035,17 @@
     vi.mocked(doStreamStep)
       .mockImplementationOnce(async (prompt) => {
         capturedPrompts.push([...prompt]);
-        return {
+        return createMockResult({
           toolCalls: [toolCall],
-          finish: { finishReason: 'tool-calls' },
-          step: createMockStepResult({ finishReason: 'tool-calls' }),
-        };
+          finishReason: 'tool-calls',
+        });
       })
       .mockImplementationOnce(async (prompt) => {
         capturedPrompts.push([...prompt]);
-        return {
+        return createMockResult({
           toolCalls: [],
-          finish: { finishReason: 'stop' },
-          step: createMockStepResult({ finishReason: 'stop' }),
-        };
+          finishReason: 'stop',
+        });
       });
 
     const iterator = streamTextIterator({
@@ -1111,38 +1108,18 @@
     };
 
     vi.mocked(doStreamStep)
-      .mockResolvedValueOnce({
-        toolCalls: [malformedToolCall],
-        finish: { finishReason: 'tool-calls' },
-        step: createMockStepResult({
+      .mockResolvedValueOnce(
+        createMockResult({
+          toolCalls: [malformedToolCall],
           finishReason: 'tool-calls',
-          toolCalls: [
-            {
-              type: 'tool-call',
-              toolCallId: 'call-1',
-              toolName: 'testTool',
-              input: '{"query":"test"',
-              dynamic: true as const,
-            },
-          ],
-          dynamicToolCalls: [
-            {
-              type: 'tool-call',
-              toolCallId: 'call-1',
-              toolName: 'testTool',
-              input: '{"query":"test"',
-              dynamic: true as const,
-            },
-          ],
-        }),
-      })
+        })
+      )
       .mockImplementationOnce(async (prompt) => {
         capturedPrompt = prompt;
-        return {
+        return createMockResult({
           toolCalls: [],
-          finish: { finishReason: 'stop' },
-          step: createMockStepResult({ finishReason: 'stop' }),
-        };
+          finishReason: 'stop',
+        });
       });
 
     const iterator = streamTextIterator({
diff --git a/packages/ai/src/agent/stream-text-iterator.ts b/packages/ai/src/agent/stream-text-iterator.ts
index 07b7e87bd5..10a3f2aaa2 100644
--- a/packages/ai/src/agent/stream-text-iterator.ts
+++ b/packages/ai/src/agent/stream-text-iterator.ts
@@ -3,6 +3,7 @@ import type {
   LanguageModelV3Prompt,
   LanguageModelV3ToolCall,
   LanguageModelV3ToolResultPart,
+  SharedV3ProviderOptions,
 } from '@ai-sdk/provider';
 import type {
   FinishReason,
@@ -13,8 +14,10 @@ import type {
   UIMessageChunk,
 } from 'ai';
 import {
+  type DoStreamStepRawResult,
   doStreamStep,
   type ModelStopCondition,
+  normalizeFinishReason as normalizeFinishReasonStrict,
   type ProviderExecutedToolResult,
 } from './do-stream-step.js';
 import type {
@@ -24,13 +27,13 @@ import type {
   StreamTextTransform,
   TelemetrySettings,
 } from './durable-agent.js';
+import { safeParseToolCallInput } from './safe-parse-tool-call-input.js';
 import {
   createSpan,
   endSpan,
   runInContext,
   type SpanHandle,
 } from './telemetry.js';
-import { safeParseToolCallInput } from './safe-parse-tool-call-input.js';
 import { toolsToModelTools } from './tools-to-model-tools.js';
 import type { CompatibleLanguageModel } from './types.js';
@@ -295,8 +298,7 @@ export async function* streamTextIterator({
     const modelTools = await toolsToModelTools(effectiveTools);
     const {
       toolCalls,
-      finish,
-      step,
+      raw,
       uiChunks: stepUIChunks,
       providerExecutedToolResults,
     } = await runInContext(outerSpanHandle, () =>
@@ -311,6 +313,9 @@
         collectUIChunks,
       })
     );
+    // Reconstruct the full StepResult outside the step boundary so the
+    // event log doesn't carry StepResult's redundant copies.
+    const step = buildStepResult(raw, toolCalls, conversationPrompt);
     isFirstIteration = false;
     stepNumber++;
     steps.push(step);
@@ -325,7 +330,7 @@
     ];
 
     // Normalize finishReason - AI SDK v6 returns { unified, raw }, v5 returns a string
-    const finishReason = normalizeFinishReason(finish?.finishReason);
+    const finishReason = normalizeFinishReason(raw.rawFinishReason);
 
     if (finishReason === 'tool-calls') {
       lastStepWasToolCalls = true;
@@ -444,7 +449,7 @@
       done = true;
     } else {
       throw new Error(
-        `Unexpected finish reason: ${typeof finish?.finishReason === 'object' ? JSON.stringify(finish?.finishReason) : finish?.finishReason}`
+        `Unexpected finish reason: ${typeof raw.rawFinishReason === 'object' ? JSON.stringify(raw.rawFinishReason) : raw.rawFinishReason}`
       );
     }
 
@@ -567,3 +572,169 @@ function normalizeFinishReason(raw: unknown): FinishReason | undefined {
   }
   return undefined;
 }
+
+/**
+ * Convert a Uint8Array to a base64 string safely.
+ * Uses a loop instead of spread operator to avoid stack overflow on large arrays.
+ */
+function uint8ArrayToBase64(data: Uint8Array): string {
+  let binary = '';
+  for (let i = 0; i < data.length; i++) {
+    binary += String.fromCharCode(data[i]);
+  }
+  return btoa(binary);
+}
+
+/**
+ * Reconstruct a full `StepResult` from the minimal raw aggregates returned
+ * by `doStreamStep`. Runs outside the step boundary so StepResult's
+ * redundant fields (duplicate tool-call lists, content, reasoningText,
+ * dual base64+uint8Array file encoding, request body) don't cross it.
+ *
+ * The shape returned matches what the AI SDK's `streamText` would expose
+ * to user callbacks (`onStepFinish`, the `steps` array).
+ */
+function buildStepResult(
+  raw: DoStreamStepRawResult,
+  toolCalls: LanguageModelV3ToolCall[],
+  conversationPrompt: LanguageModelV3Prompt
+): StepResult<ToolSet> {
+  const reasoning = raw.reasoning.map((r) => ({
+    type: 'reasoning' as const,
+    text: r.text,
+    ...(r.providerMetadata != null
+      ? { providerOptions: r.providerMetadata }
+      : {}),
+  }));
+
+  const reasoningText = raw.reasoning.map((r) => r.text).join('');
+
+  // Expand each file to the AI SDK's GeneratedFile shape (base64 + uint8Array).
+  // The dual encoding doubles the file payload, so we only do it here, after
+  // crossing the step boundary.
+  const files = raw.files.map((file) => {
+    const data = file.data;
+    if (data instanceof Uint8Array) {
+      const base64 = uint8ArrayToBase64(data);
+      return {
+        mediaType: file.mediaType,
+        base64,
+        uint8Array: data,
+      };
+    } else {
+      // Data is a base64 string. (URL is not currently supported here —
+      // matches prior behavior in chunksToStep.)
+      const binaryString = atob(data as string);
+      const bytes = new Uint8Array(binaryString.length);
+      for (let i = 0; i < binaryString.length; i++) {
+        bytes[i] = binaryString.charCodeAt(i);
+      }
+      return {
+        mediaType: file.mediaType,
+        base64: data as string,
+        uint8Array: bytes,
+      };
+    }
+  });
+
+  // Extract the raw finish reason from the V3 finish reason object/string.
+  const rawFinish = raw.rawFinishReason;
+  const rawFinishReason =
+    typeof rawFinish === 'object' && rawFinish !== null
+      ? (rawFinish as { raw?: string }).raw
+      : typeof rawFinish === 'string'
+        ? rawFinish
+        : undefined;
+
+  const mapToolCall = (toolCall: LanguageModelV3ToolCall) => ({
+    type: 'tool-call' as const,
+    toolCallId: toolCall.toolCallId,
+    toolName: toolCall.toolName,
+    input: safeParseToolCallInput(toolCall.input),
+    dynamic: true as const,
+  });
+
+  return {
+    stepNumber: 0, // Will be overridden by the caller
+    model: {
+      provider: raw.responseMetadata?.modelId?.split(':')[0] ?? 'unknown',
+      modelId: raw.responseMetadata?.modelId ?? 'unknown',
+    },
+    functionId: undefined,
+    metadata: undefined,
+    experimental_context: undefined,
+    content: [
+      ...(raw.text ? [{ type: 'text' as const, text: raw.text }] : []),
+      ...toolCalls.map(mapToolCall),
+    ],
+    text: raw.text,
+    reasoning: reasoning.map((r) => ({
+      type: 'reasoning' as const,
+      text: r.text,
+      ...(r.providerOptions != null
+        ? { providerOptions: r.providerOptions as SharedV3ProviderOptions }
+        : {}),
+    })),
+    reasoningText: reasoningText || undefined,
+    files,
+    sources: raw.sources,
+    toolCalls: toolCalls.map(mapToolCall),
+    staticToolCalls: [],
+    dynamicToolCalls: toolCalls.map(mapToolCall),
+    toolResults: [],
+    staticToolResults: [],
+    dynamicToolResults: [],
+    finishReason: normalizeFinishReasonStrict(raw.rawFinishReason),
+    rawFinishReason,
+    usage: raw.usage
+      ? {
+          inputTokens: raw.usage.inputTokens?.total ?? 0,
+          inputTokenDetails: {
+            noCacheTokens: raw.usage.inputTokens?.noCache,
+            cacheReadTokens: raw.usage.inputTokens?.cacheRead,
+            cacheWriteTokens: raw.usage.inputTokens?.cacheWrite,
+          },
+          outputTokens: raw.usage.outputTokens?.total ?? 0,
+          outputTokenDetails: {
+            textTokens: raw.usage.outputTokens?.text,
+            reasoningTokens: raw.usage.outputTokens?.reasoning,
+          },
+          totalTokens:
+            (raw.usage.inputTokens?.total ?? 0) +
+            (raw.usage.outputTokens?.total ?? 0),
+        }
+      : {
+          inputTokens: 0,
+          inputTokenDetails: {
+            noCacheTokens: undefined,
+            cacheReadTokens: undefined,
+            cacheWriteTokens: undefined,
+          },
+          outputTokens: 0,
+          outputTokenDetails: {
+            textTokens: undefined,
+            reasoningTokens: undefined,
+          },
+          totalTokens: 0,
+        },
+    warnings: raw.warnings,
+    request: {
+      body: JSON.stringify({
+        prompt: conversationPrompt,
+        tools: toolCalls.map(mapToolCall),
+      }),
+    },
+    response: {
+      id: raw.responseMetadata?.id ?? 'unknown',
+      timestamp:
+        raw.responseMetadata?.timestamp instanceof Date
+          ? raw.responseMetadata.timestamp
+          : raw.responseMetadata?.timestamp != null
+            ? new Date(raw.responseMetadata.timestamp)
+            : new Date(),
+      modelId: raw.responseMetadata?.modelId ?? 'unknown',
+      messages: [],
+    },
+    providerMetadata: raw.providerMetadata || {},
+  };
+}
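
A minimal sketch (illustrative, not part of the diff) of what the change means for the serialized step payload, assuming the `DoStreamStepRawResult` export above; `estimateBoundaryPayloadBytes` is a hypothetical helper, and JSON length is only a crude proxy for the event-log entry size:

import type { DoStreamStepRawResult } from './do-stream-step.js';

// Hypothetical: approximate the bytes that now cross the 'use step' boundary.
// `files` may carry Uint8Array data, which JSON.stringify would not represent
// faithfully, so measure those by byte length and stringify the rest.
function estimateBoundaryPayloadBytes(raw: DoStreamStepRawResult): number {
  const { files, ...rest } = raw;
  const fileBytes = files.reduce(
    (sum, f) =>
      sum +
      (f.data instanceof Uint8Array
        ? f.data.byteLength
        : String(f.data).length),
    0
  );
  return JSON.stringify(rest).length + fileBytes;
}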