diff --git a/lib/request/response-handler.ts b/lib/request/response-handler.ts index 861be8ab..ee5a5bec 100644 --- a/lib/request/response-handler.ts +++ b/lib/request/response-handler.ts @@ -1,5 +1,6 @@ import { createLogger, logRequest, LOGGING_ENABLED } from "../logger.js"; import { PLUGIN_NAME } from "../constants.js"; +import { isRecord } from "../utils.js"; import type { SSEEventData } from "../types.js"; @@ -7,6 +8,485 @@ const log = createLogger("response-handler"); const MAX_SSE_SIZE = 10 * 1024 * 1024; // 10MB limit to prevent memory exhaustion const DEFAULT_STREAM_STALL_TIMEOUT_MS = 45_000; +const MAX_SYNTHESIZED_EVENT_INDEX = 255; + +type MutableRecord = Record; + +interface ParsedResponseState { + finalResponse: MutableRecord | null; + lastPhase: string | null; + outputItems: Map; + outputText: Map; + outputTextPhases: Map; + phaseTextSegments: Map; + phaseSegmentOrder: Map; + phaseText: Map; + reasoningSummaryText: Map; + seenResponseIds: Set; + encounteredError: boolean; +} + +function createParsedResponseState(): ParsedResponseState { + return { + finalResponse: null, + lastPhase: null, + outputItems: new Map(), + outputText: new Map(), + outputTextPhases: new Map(), + phaseTextSegments: new Map(), + phaseSegmentOrder: new Map(), + phaseText: new Map(), + reasoningSummaryText: new Map(), + seenResponseIds: new Set(), + encounteredError: false, + }; +} + +function toMutableRecord(value: unknown): MutableRecord | null { + return isRecord(value) ? { ...value } : null; +} + +function getNumberField(record: MutableRecord, key: string): number | null { + const value = record[key]; + return typeof value === "number" && Number.isFinite(value) ? value : null; +} + +/** + * Read a trimmed, non-empty string field for identifier-like values. + * + * For textual payloads where whitespace is meaningful, use a field-specific + * accessor such as `getDeltaField` instead of reusing this helper. + */ +function getStringField(record: MutableRecord, key: string): string | null { + const value = record[key]; + return typeof value === "string" && value.trim().length > 0 ? value : null; +} + +function getDeltaField(record: MutableRecord, key: string): string | null { + const value = record[key]; + return typeof value === "string" && value.length > 0 ? value : null; +} + +function isValidSynthesizedIndex(index: number | null): index is number { + return ( + index !== null && + Number.isInteger(index) && + index >= 0 && + index <= MAX_SYNTHESIZED_EVENT_INDEX + ); +} + +function cloneContentArray(content: unknown): MutableRecord[] { + if (!Array.isArray(content)) return []; + return content.filter(isRecord).map((part) => ({ ...part })); +} + +function mergeRecord(base: MutableRecord | null, update: MutableRecord): MutableRecord { + if (!base) return { ...update }; + const merged: MutableRecord = { ...base, ...update }; + if ("content" in update || "content" in base) { + const updateContent = cloneContentArray(update.content); + merged.content = + updateContent.length > 0 || !("content" in base) + ? updateContent + : cloneContentArray(base.content); + } + return merged; +} + +function makeOutputTextKey(outputIndex: number | null, contentIndex: number | null): string | null { + if ( + !isValidSynthesizedIndex(outputIndex) || + !isValidSynthesizedIndex(contentIndex) + ) { + return null; + } + return `${outputIndex}:${contentIndex}`; +} + +function makePhaseTextSegmentKey(phase: string, outputTextKey: string): string { + return `${phase}\u0000${outputTextKey}`; +} + +function makeSummaryKey(outputIndex: number | null, summaryIndex: number | null): string | null { + if ( + !isValidSynthesizedIndex(outputIndex) || + !isValidSynthesizedIndex(summaryIndex) + ) { + return null; + } + return `${outputIndex}:${summaryIndex}`; +} + +function getPartText(part: unknown): string | null { + if (!isRecord(part)) return null; + const text = getStringField(part, "text"); + if (text) return text; + return null; +} + +function capturePhase( + state: ParsedResponseState, + phase: unknown, +): void { + if (typeof phase !== "string" || phase.trim().length === 0) return; + state.lastPhase = phase.trim(); +} + +function rememberPhaseSegmentOrder( + state: ParsedResponseState, + phase: string, + segmentKey: string, +): string[] { + const existingOrder = state.phaseSegmentOrder.get(phase); + if (existingOrder?.includes(segmentKey)) { + return existingOrder; + } + const nextOrder = [...(existingOrder ?? []), segmentKey]; + state.phaseSegmentOrder.set(phase, nextOrder); + return nextOrder; +} + +function removePhaseSegmentOrder( + state: ParsedResponseState, + phase: string, + segmentKey: string, +): void { + const existingOrder = state.phaseSegmentOrder.get(phase); + if (!existingOrder) return; + const nextOrder = existingOrder.filter((key) => key !== segmentKey); + if (nextOrder.length === 0) { + state.phaseSegmentOrder.delete(phase); + return; + } + state.phaseSegmentOrder.set(phase, nextOrder); +} + +function rebuildPhaseText(state: ParsedResponseState, phase: string): void { + const orderedKeys = state.phaseSegmentOrder.get(phase) ?? []; + const text = orderedKeys + .map((key) => state.phaseTextSegments.get(key) ?? "") + .filter((value) => value.length > 0) + .join(""); + if (text.length === 0) { + state.phaseText.delete(phase); + return; + } + state.phaseText.set(phase, text); +} + +function setPhaseTextSegment( + state: ParsedResponseState, + phase: unknown, + outputTextKey: string, + text: string | null, +): void { + const normalizedPhase = + typeof phase === "string" && phase.trim().length > 0 + ? phase.trim() + : state.outputTextPhases.get(outputTextKey) ?? null; + if (!normalizedPhase) return; + state.outputTextPhases.set(outputTextKey, normalizedPhase); + state.lastPhase = normalizedPhase; + const segmentKey = makePhaseTextSegmentKey(normalizedPhase, outputTextKey); + if (!text || text.length === 0) { + state.phaseTextSegments.delete(segmentKey); + removePhaseSegmentOrder(state, normalizedPhase, segmentKey); + rebuildPhaseText(state, normalizedPhase); + return; + } + rememberPhaseSegmentOrder(state, normalizedPhase, segmentKey); + state.phaseTextSegments.set(segmentKey, text); + rebuildPhaseText(state, normalizedPhase); +} + +function appendPhaseTextSegment( + state: ParsedResponseState, + phase: unknown, + outputTextKey: string, + delta: string | null, +): void { + if (!delta || delta.length === 0) { + return; + } + const normalizedPhase = + typeof phase === "string" && phase.trim().length > 0 + ? phase.trim() + : state.outputTextPhases.get(outputTextKey) ?? null; + if (!normalizedPhase) return; + state.outputTextPhases.set(outputTextKey, normalizedPhase); + state.lastPhase = normalizedPhase; + const segmentKey = makePhaseTextSegmentKey(normalizedPhase, outputTextKey); + const phaseOrder = rememberPhaseSegmentOrder(state, normalizedPhase, segmentKey); + const existing = state.phaseTextSegments.get(segmentKey) ?? ""; + state.phaseTextSegments.set(segmentKey, `${existing}${delta}`); + if (phaseOrder[phaseOrder.length - 1] === segmentKey) { + const existingPhaseText = state.phaseText.get(normalizedPhase) ?? ""; + state.phaseText.set(normalizedPhase, `${existingPhaseText}${delta}`); + return; + } + rebuildPhaseText(state, normalizedPhase); +} + +function upsertOutputItem(state: ParsedResponseState, outputIndex: number | null, item: unknown): void { + if (!isValidSynthesizedIndex(outputIndex) || !isRecord(item)) return; + const current = state.outputItems.get(outputIndex) ?? null; + const merged = mergeRecord(current, item); + state.outputItems.set(outputIndex, merged); + capturePhase(state, merged.phase); +} + +function setOutputTextValue( + state: ParsedResponseState, + outputIndex: number | null, + contentIndex: number | null, + text: string | null, + phase: unknown = undefined, +): void { + const key = makeOutputTextKey(outputIndex, contentIndex); + if (!key) return; + if (!text) { + state.outputText.delete(key); + setPhaseTextSegment(state, phase, key, null); + return; + } + state.outputText.set(key, text); + setPhaseTextSegment(state, phase, key, text); +} + +function appendOutputTextValue( + state: ParsedResponseState, + outputIndex: number | null, + contentIndex: number | null, + delta: string | null, + phase: unknown = undefined, +): void { + if (!delta) return; + const key = makeOutputTextKey(outputIndex, contentIndex); + if (!key) return; + const existing = state.outputText.get(key) ?? ""; + state.outputText.set(key, `${existing}${delta}`); + appendPhaseTextSegment(state, phase, key, delta); +} + +function setReasoningSummaryValue( + state: ParsedResponseState, + outputIndex: number | null, + summaryIndex: number | null, + text: string | null, +): void { + const key = makeSummaryKey(outputIndex, summaryIndex); + if (!key) return; + if (!text) { + state.reasoningSummaryText.delete(key); + return; + } + state.reasoningSummaryText.set(key, text); +} + +function appendReasoningSummaryValue( + state: ParsedResponseState, + outputIndex: number | null, + summaryIndex: number | null, + delta: string | null, +): void { + if (!delta) return; + const key = makeSummaryKey(outputIndex, summaryIndex); + if (!key) return; + const existing = state.reasoningSummaryText.get(key) ?? ""; + state.reasoningSummaryText.set(key, `${existing}${delta}`); +} + +function ensureOutputItemAtIndex(output: unknown[], index: number): MutableRecord | null { + if (!isValidSynthesizedIndex(index)) return null; + while (output.length <= index) { + output.push({}); + } + const current = output[index]; + if (!isRecord(current)) { + output[index] = {}; + } + return isRecord(output[index]) ? (output[index] as MutableRecord) : null; +} + +function ensureContentPartAtIndex(item: MutableRecord, index: number): MutableRecord | null { + if (!isValidSynthesizedIndex(index)) return null; + const content = Array.isArray(item.content) ? [...item.content] : []; + while (content.length <= index) { + content.push({}); + } + const current = content[index]; + if (!isRecord(current)) { + content[index] = {}; + } + item.content = content; + return isRecord(content[index]) ? (content[index] as MutableRecord) : null; +} + +function applyAccumulatedOutputText(response: MutableRecord, state: ParsedResponseState): void { + if (state.outputText.size === 0) return; + const output = Array.isArray(response.output) ? [...response.output] : []; + + for (const [key, text] of state.outputText.entries()) { + const [outputIndexText, contentIndexText] = key.split(":"); + const outputIndex = Number.parseInt(outputIndexText ?? "", 10); + const contentIndex = Number.parseInt(contentIndexText ?? "", 10); + if ( + !isValidSynthesizedIndex(outputIndex) || + !isValidSynthesizedIndex(contentIndex) + ) { + continue; + } + const item = ensureOutputItemAtIndex(output, outputIndex); + if (!item) continue; + const part = ensureContentPartAtIndex(item, contentIndex); + if (!part) continue; + if (!getStringField(part, "type")) { + part.type = "output_text"; + } + if (typeof part.text === "string") { + setPhaseTextSegment(state, part.phase, key, part.text); + continue; + } + part.text = text; + } + + if (output.length > 0) { + response.output = output; + } +} + +function mergeOutputItemsIntoResponse(response: MutableRecord, state: ParsedResponseState): void { + if (state.outputItems.size === 0) return; + const output = Array.isArray(response.output) ? [...response.output] : []; + + for (const [outputIndex, item] of state.outputItems.entries()) { + if (!isValidSynthesizedIndex(outputIndex)) continue; + while (output.length <= outputIndex) { + output.push({}); + } + output[outputIndex] = mergeRecord(toMutableRecord(output[outputIndex]), item); + } + + response.output = output; +} + +function collectMessageOutputText(output: unknown[]): string { + return output + .filter(isRecord) + .map((item) => { + if (item.type !== "message") return ""; + const content = Array.isArray(item.content) ? item.content : []; + return content + .filter(isRecord) + .map((part) => { + if (part.type !== "output_text") return ""; + return typeof part.text === "string" ? part.text : ""; + }) + .join(""); + }) + .filter((text) => text.length > 0) + .join(""); +} + +function collectReasoningSummaryText(output: unknown[]): string { + return output + .filter(isRecord) + .map((item) => { + if (item.type !== "reasoning") return ""; + const summary = Array.isArray(item.summary) ? item.summary : []; + return summary + .filter(isRecord) + .map((part) => (typeof part.text === "string" ? part.text : "")) + .filter((text) => text.length > 0) + .join("\n\n"); + }) + .filter((text) => text.length > 0) + .join("\n\n"); +} + +function applyReasoningSummaries(response: MutableRecord, state: ParsedResponseState): void { + if (state.reasoningSummaryText.size === 0) return; + const output = Array.isArray(response.output) ? [...response.output] : []; + + for (const [key, text] of state.reasoningSummaryText.entries()) { + const [outputIndexText, summaryIndexText] = key.split(":"); + const outputIndex = Number.parseInt(outputIndexText ?? "", 10); + const summaryIndex = Number.parseInt(summaryIndexText ?? "", 10); + if ( + !isValidSynthesizedIndex(outputIndex) || + !isValidSynthesizedIndex(summaryIndex) + ) { + continue; + } + const item = ensureOutputItemAtIndex(output, outputIndex); + if (!item) continue; + const summary = Array.isArray(item.summary) ? [...item.summary] : []; + while (summary.length <= summaryIndex) { + summary.push({}); + } + const current = summary[summaryIndex]; + const nextPart = isRecord(current) ? { ...current } : {}; + if (!getStringField(nextPart, "type")) { + nextPart.type = "summary_text"; + } + if (typeof nextPart.text === "string") { + continue; + } + nextPart.text = text; + summary[summaryIndex] = nextPart; + item.summary = summary; + if (!getStringField(item, "type")) { + item.type = "reasoning"; + } + } + + if (output.length > 0) { + response.output = output; + } +} + +function finalizeParsedResponse(state: ParsedResponseState): MutableRecord | null { + const response = state.finalResponse ? { ...state.finalResponse } : null; + if (!response) return null; + if (state.encounteredError) return null; + + mergeOutputItemsIntoResponse(response, state); + applyAccumulatedOutputText(response, state); + applyReasoningSummaries(response, state); + + const output = Array.isArray(response.output) ? response.output : []; + if (typeof response.output_text !== "string") { + const outputText = collectMessageOutputText(output); + if (outputText.length > 0) { + response.output_text = outputText; + } + } + + const reasoningSummaryText = collectReasoningSummaryText(output); + if ( + reasoningSummaryText.length > 0 && + typeof response.reasoning_summary_text !== "string" + ) { + response.reasoning_summary_text = reasoningSummaryText; + } + + if (state.lastPhase && typeof response.phase !== "string") { + response.phase = state.lastPhase; + } + + if (state.phaseText.size > 0) { + const phaseText: MutableRecord = {}; + for (const [phase, text] of state.phaseText.entries()) { + phaseText[phase] = text; + if (phase === "commentary") response.commentary_text = text; + if (phase === "final_answer") response.final_answer_text = text; + } + response.phase_text = phaseText; + } + + return response; +} function extractResponseId(response: unknown): string | null { if (!response || typeof response !== "object") return null; @@ -17,11 +497,13 @@ function extractResponseId(response: unknown): string | null { } function notifyResponseId( + state: ParsedResponseState, onResponseId: ((responseId: string) => void) | undefined, response: unknown, ): void { const responseId = extractResponseId(response); - if (!responseId || !onResponseId) return; + if (!responseId || !onResponseId || state.seenResponseIds.has(responseId)) return; + state.seenResponseIds.add(responseId); try { onResponseId(responseId); } catch (error) { @@ -32,28 +514,106 @@ function notifyResponseId( } } -type CapturedResponseEvent = - | { kind: "error" } - | { kind: "response"; response: unknown } - | null; - function maybeCaptureResponseEvent( + state: ParsedResponseState, data: SSEEventData, onResponseId?: (responseId: string) => void, -): CapturedResponseEvent { +): void { if (data.type === "error") { log.error("SSE error event received", { error: data }); - return { kind: "error" }; + state.encounteredError = true; + return; } if (data.type === "response.done" || data.type === "response.completed") { - notifyResponseId(onResponseId, data.response); - if (data.response !== undefined && data.response !== null) { - return { kind: "response", response: data.response }; + if (isRecord(data.response)) { + state.finalResponse = { ...data.response }; } + notifyResponseId(state, onResponseId, data.response); + return; } - return null; + const eventRecord = toMutableRecord(data); + if (!eventRecord) return; + const outputIndex = getNumberField(eventRecord, "output_index"); + + if (data.type === "response.output_item.added" || data.type === "response.output_item.done") { + upsertOutputItem(state, outputIndex, eventRecord.item); + return; + } + + if (data.type === "response.output_text.delta") { + appendOutputTextValue( + state, + outputIndex, + getNumberField(eventRecord, "content_index"), + getDeltaField(eventRecord, "delta"), + eventRecord.phase, + ); + return; + } + + if (data.type === "response.output_text.done") { + setOutputTextValue( + state, + outputIndex, + getNumberField(eventRecord, "content_index"), + getStringField(eventRecord, "text"), + eventRecord.phase, + ); + return; + } + + if (data.type === "response.content_part.added" || data.type === "response.content_part.done") { + const part = toMutableRecord(eventRecord.part); + if (!part || getStringField(part, "type") !== "output_text") { + capturePhase(state, part?.phase); + return; + } + setOutputTextValue( + state, + outputIndex, + getNumberField(eventRecord, "content_index"), + getPartText(part), + part.phase, + ); + return; + } + + if (data.type === "response.reasoning_summary_text.delta") { + appendReasoningSummaryValue( + state, + outputIndex, + getNumberField(eventRecord, "summary_index"), + getDeltaField(eventRecord, "delta"), + ); + return; + } + + if (data.type === "response.reasoning_summary_text.done") { + setReasoningSummaryValue( + state, + outputIndex, + getNumberField(eventRecord, "summary_index"), + getStringField(eventRecord, "text"), + ); + return; + } + + if ( + data.type === "response.reasoning_summary_part.added" || + data.type === "response.reasoning_summary_part.done" + ) { + setReasoningSummaryValue( + state, + outputIndex, + getNumberField(eventRecord, "summary_index"), + getPartText(eventRecord.part), + ); + return; + } + + capturePhase(state, eventRecord.phase); } /** @@ -67,6 +627,7 @@ function parseSseStream( onResponseId?: (responseId: string) => void, ): unknown | null { const lines = sseText.split(/\r?\n/); + const state = createParsedResponseState(); for (const line of lines) { const trimmedLine = line.trim(); @@ -75,16 +636,15 @@ function parseSseStream( if (!payload || payload === '[DONE]') continue; try { const data = JSON.parse(payload) as SSEEventData; - const capturedEvent = maybeCaptureResponseEvent(data, onResponseId); - if (capturedEvent?.kind === "error") return null; - if (capturedEvent?.kind === "response") return capturedEvent.response; + maybeCaptureResponseEvent(state, data, onResponseId); + if (state.encounteredError) return null; } catch { // Skip malformed JSON } } } - return null; + return finalizeParsedResponse(state); } /** @@ -133,7 +693,9 @@ export async function convertSseToJson( if (!finalResponse) { log.warn("Could not find final response in SSE stream"); - logRequest("stream-error", { error: "No response.done event found" }); + logRequest("stream-error", { + error: "No terminal response event found in SSE stream", + }); // Return original stream if we can't parse return new Response(fullText, { @@ -173,10 +735,10 @@ function createResponseIdCapturingStream( ): ReadableStream { const decoder = new TextDecoder(); let bufferedText = ""; - let sawErrorEvent = false; + const state = createParsedResponseState(); const processBufferedLines = (flush = false): void => { - if (sawErrorEvent) return; + if (state.encounteredError) return; const lines = bufferedText.split(/\r?\n/); if (!flush) { bufferedText = lines.pop() ?? ""; @@ -191,11 +753,8 @@ function createResponseIdCapturingStream( if (!payload || payload === "[DONE]") continue; try { const data = JSON.parse(payload) as SSEEventData; - const capturedEvent = maybeCaptureResponseEvent(data, onResponseId); - if (capturedEvent?.kind === "error") { - sawErrorEvent = true; - break; - } + maybeCaptureResponseEvent(state, data, onResponseId); + if (state.encounteredError) break; } catch { // Ignore malformed SSE lines and keep forwarding the raw stream. } @@ -244,7 +803,7 @@ async function readWithTimeout( timeoutId = setTimeout(() => { reject( new Error( - `SSE stream stalled for ${timeoutMs}ms while waiting for response.done`, + `SSE stream stalled for ${timeoutMs}ms while waiting for a terminal response event`, ), ); }, timeoutMs); diff --git a/lib/session-affinity.ts b/lib/session-affinity.ts index 9a90950f..1ce27e30 100644 --- a/lib/session-affinity.ts +++ b/lib/session-affinity.ts @@ -98,6 +98,14 @@ export class SessionAffinityStore { * This method does not create a new affinity entry; callers that need to * upsert continuation state should use `rememberWithResponseId`. */ + rememberLastResponseId( + sessionKey: string | null | undefined, + responseId: string | null | undefined, + now = Date.now(), + ): void { + this.updateLastResponseId(sessionKey, responseId, now); + } + updateLastResponseId( sessionKey: string | null | undefined, responseId: string | null | undefined, diff --git a/test/fetch-helpers.test.ts b/test/fetch-helpers.test.ts index 518a725c..8efc73c1 100644 --- a/test/fetch-helpers.test.ts +++ b/test/fetch-helpers.test.ts @@ -743,6 +743,26 @@ describe('createEntitlementErrorResponse', () => { const text = await result.text(); expect(text).toBe('stream body'); }); + + it('captures response ids from streaming semantic SSE without rewriting the stream', async () => { + const onResponseId = vi.fn(); + const response = new Response( + [ + 'data: {"type":"response.created","response":{"id":"resp_stream_123"}}', + '', + 'data: {"type":"response.done","response":{"id":"resp_stream_123"}}', + '', + ].join('\n'), + { status: 200, headers: new Headers({ 'content-type': 'text/event-stream' }) }, + ); + + const result = await handleSuccessResponse(response, true, { onResponseId }); + const text = await result.text(); + + expect(text).toContain('"resp_stream_123"'); + expect(onResponseId).toHaveBeenCalledWith('resp_stream_123'); + expect(onResponseId).toHaveBeenCalledTimes(1); + }); }); describe('handleErrorResponse error normalization', () => { diff --git a/test/response-handler.test.ts b/test/response-handler.test.ts index 3e0fb0dd..f372f7c3 100644 --- a/test/response-handler.test.ts +++ b/test/response-handler.test.ts @@ -66,6 +66,477 @@ data: {"type":"response.completed","response":{"id":"resp_456","output":"done"}} expect(body).toEqual({ id: 'resp_456', output: 'done' }); }); + it('synthesizes output_text and reasoning summaries from semantic SSE events', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_semantic_123","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_123","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"Hello ","phase":"final_answer"}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"world","phase":"final_answer"}', + 'data: {"type":"response.output_text.done","output_index":0,"content_index":0,"text":"Hello world","phase":"final_answer"}', + 'data: {"type":"response.output_item.added","output_index":1,"item":{"id":"rs_123","type":"reasoning"}}', + 'data: {"type":"response.reasoning_summary_text.delta","output_index":1,"summary_index":0,"delta":"Need more context."}', + 'data: {"type":"response.reasoning_summary_text.done","output_index":1,"summary_index":0,"text":"Need more context."}', + 'data: {"type":"response.completed","response":{"id":"resp_semantic_123","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + id: string; + output?: Array<{ + type?: string; + role?: string; + phase?: string; + content?: Array<{ type?: string; text?: string }>; + summary?: Array<{ type?: string; text?: string }>; + }>; + output_text?: string; + reasoning_summary_text?: string; + phase?: string; + final_answer_text?: string; + phase_text?: Record; + }; + + expect(body.id).toBe('resp_semantic_123'); + expect(body.output_text).toBe('Hello world'); + expect(body.reasoning_summary_text).toBe('Need more context.'); + expect(body.phase).toBe('final_answer'); + expect(body.final_answer_text).toBe('Hello world'); + expect(body.phase_text).toEqual({ final_answer: 'Hello world' }); + expect(body.output?.[0]?.content?.[0]).toEqual({ + type: 'output_text', + text: 'Hello world', + }); + expect(body.output?.[1]?.summary?.[0]).toEqual({ + type: 'summary_text', + text: 'Need more context.', + }); + }); + + it('preserves canonical terminal reasoning_summary_text over synthesized semantic text', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_semantic_canonical","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":1,"item":{"id":"rs_456","type":"reasoning"}}', + 'data: {"type":"response.reasoning_summary_text.delta","output_index":1,"summary_index":0,"delta":"Draft summary"}', + 'data: {"type":"response.reasoning_summary_text.done","output_index":1,"summary_index":0,"text":"Draft summary"}', + 'data: {"type":"response.completed","response":{"id":"resp_semantic_canonical","object":"response","reasoning_summary_text":"Canonical summary"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + reasoning_summary_text?: string; + output?: Array<{ summary?: Array<{ text?: string }> }>; + }; + + expect(body.reasoning_summary_text).toBe('Canonical summary'); + expect(body.output?.[1]?.summary?.[0]?.text).toBe('Draft summary'); + }); + + it('preserves canonical terminal reasoning summary parts over synthesized semantic text', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_semantic_part_canonical","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":1,"item":{"id":"rs_789","type":"reasoning"}}', + 'data: {"type":"response.reasoning_summary_part.added","output_index":1,"summary_index":0,"part":{"text":"Draft summary"}}', + 'data: {"type":"response.reasoning_summary_part.done","output_index":1,"summary_index":0,"part":{"text":"Draft summary"}}', + 'data: {"type":"response.completed","response":{"id":"resp_semantic_part_canonical","object":"response","output":[{},{"type":"reasoning","summary":[{"type":"summary_text","text":"Canonical summary part"}]}]}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + reasoning_summary_text?: string; + output?: Array<{ summary?: Array<{ type?: string; text?: string }> }>; + }; + + expect(body.reasoning_summary_text).toBe('Canonical summary part'); + expect(body.output?.[1]?.summary?.[0]).toEqual({ + type: 'summary_text', + text: 'Canonical summary part', + }); + }); + + it('synthesizes reasoning summaries from part events', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_summary_part","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":1,"item":{"id":"rs_part","type":"reasoning"}}', + 'data: {"type":"response.reasoning_summary_part.added","output_index":1,"summary_index":0,"part":{"text":"Draft summary"}}', + 'data: {"type":"response.reasoning_summary_part.done","output_index":1,"summary_index":0,"part":{"text":"Need more context."}}', + 'data: {"type":"response.done","response":{"id":"resp_summary_part","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + reasoning_summary_text?: string; + output?: Array<{ summary?: Array<{ type?: string; text?: string }> }>; + }; + + expect(body.reasoning_summary_text).toBe('Need more context.'); + expect(body.output?.[1]?.summary?.[0]).toEqual({ + type: 'summary_text', + text: 'Need more context.', + }); + }); + + it('preserves canonical terminal reasoning summary text over synthesized summary deltas', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_semantic_nested","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":1,"item":{"id":"rs_nested","type":"reasoning"}}', + 'data: {"type":"response.reasoning_summary_text.delta","output_index":1,"summary_index":0,"delta":"Draft nested summary"}', + 'data: {"type":"response.reasoning_summary_text.done","output_index":1,"summary_index":0,"text":"Draft nested summary"}', + 'data: {"type":"response.completed","response":{"id":"resp_semantic_nested","object":"response","output":[{},{"id":"rs_nested","type":"reasoning","summary":[{"type":"summary_text","text":"Canonical nested summary"}]}]}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + reasoning_summary_text?: string; + output?: Array<{ summary?: Array<{ text?: string }> }>; + }; + + expect(body.output?.[1]?.summary?.[0]?.text).toBe('Canonical nested summary'); + expect(body.reasoning_summary_text).toBe('Canonical nested summary'); + }); + + it('synthesizes output text from content_part events', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_content_part","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_part","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.content_part.added","output_index":0,"content_index":0,"part":{"type":"output_text","text":"Hello ","phase":"final_answer"}}', + 'data: {"type":"response.content_part.done","output_index":0,"content_index":0,"part":{"type":"output_text","text":"Hello world","phase":"final_answer"}}', + 'data: {"type":"response.done","response":{"id":"resp_content_part","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + output_text?: string; + final_answer_text?: string; + phase_text?: Record; + output?: Array<{ content?: Array<{ type?: string; text?: string }> }>; + }; + + expect(body.output?.[0]?.content?.[0]).toEqual({ + type: 'output_text', + text: 'Hello world', + }); + expect(body.output_text).toBe('Hello world'); + expect(body.final_answer_text).toBe('Hello world'); + expect(body.phase_text).toEqual({ final_answer: 'Hello world' }); + }); + + it('preserves whitespace-only semantic deltas when no done events override them', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_whitespace_delta","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_space","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"Hello","phase":"final_answer"}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":" ","phase":"final_answer"}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"world","phase":"final_answer"}', + 'data: {"type":"response.output_item.added","output_index":1,"item":{"id":"rs_space","type":"reasoning"}}', + 'data: {"type":"response.reasoning_summary_text.delta","output_index":1,"summary_index":0,"delta":"Need"}', + 'data: {"type":"response.reasoning_summary_text.delta","output_index":1,"summary_index":0,"delta":" "}', + 'data: {"type":"response.reasoning_summary_text.delta","output_index":1,"summary_index":0,"delta":"context."}', + 'data: {"type":"response.done","response":{"id":"resp_whitespace_delta","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + output_text?: string; + final_answer_text?: string; + reasoning_summary_text?: string; + output?: Array<{ + content?: Array<{ text?: string }>; + summary?: Array<{ text?: string }>; + }>; + }; + + expect(body.output?.[0]?.content?.[0]?.text).toBe('Hello world'); + expect(body.output_text).toBe('Hello world'); + expect(body.final_answer_text).toBe('Hello world'); + expect(body.output?.[1]?.summary?.[0]?.text).toBe('Need context.'); + expect(body.reasoning_summary_text).toBe('Need context.'); + }); + + it('preserves richer terminal output when semantic items arrive with empty content arrays', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_rich_123","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_123","type":"message","role":"assistant","content":[]}}', + 'data: {"type":"response.completed","response":{"id":"resp_rich_123","object":"response","output":[{"id":"msg_123","type":"message","role":"assistant","content":[{"type":"output_text","text":"Hello rich world"},{"type":"annotation","label":"kept"}]}]}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + id: string; + output?: Array<{ + content?: Array<{ type?: string; text?: string; label?: string }>; + }>; + }; + + expect(body.id).toBe('resp_rich_123'); + expect(body.output?.[0]?.content).toEqual([ + { type: 'output_text', text: 'Hello rich world' }, + { type: 'annotation', label: 'kept' }, + ]); + }); + + it('preserves canonical terminal content over accumulated deltas for the same slot', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_canonical_slot","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_canonical","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"Draft answer","phase":"final_answer"}', + 'data: {"type":"response.completed","response":{"id":"resp_canonical_slot","object":"response","output":[{"id":"msg_canonical","type":"message","role":"assistant","content":[{"type":"output_text","text":"Canonical answer"}]}]}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + output_text?: string; + final_answer_text?: string; + phase_text?: Record; + output?: Array<{ content?: Array<{ text?: string }> }>; + }; + + expect(body.output?.[0]?.content?.[0]?.text).toBe('Canonical answer'); + expect(body.output_text).toBe('Canonical answer'); + expect(body.final_answer_text).toBe('Canonical answer'); + expect(body.phase_text).toEqual({ final_answer: 'Canonical answer' }); + }); + + it('clears stale output_text deltas when done events omit canonical text', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_stale_delta","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_stale","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"Hello ","phase":"final_answer"}', + 'data: {"type":"response.output_text.done","output_index":0,"content_index":0,"text":" ","phase":"final_answer"}', + 'data: {"type":"response.done","response":{"id":"resp_stale_delta","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + output_text?: string; + final_answer_text?: string; + phase_text?: Record; + output?: Array<{ content?: Array<{ text?: string }> }>; + }; + + expect(body.output?.[0]?.content).toBeUndefined(); + expect(body.output_text).toBeUndefined(); + expect(body.final_answer_text).toBeUndefined(); + expect(body.phase_text).toBeUndefined(); + }); + + it('clears stale reasoning_summary_text deltas when done events omit canonical text', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_stale_reasoning","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":1,"item":{"id":"rs_stale","type":"reasoning"}}', + 'data: {"type":"response.reasoning_summary_text.delta","output_index":1,"summary_index":0,"delta":"Need more context"}', + 'data: {"type":"response.reasoning_summary_text.done","output_index":1,"summary_index":0,"text":" "}', + 'data: {"type":"response.done","response":{"id":"resp_stale_reasoning","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + reasoning_summary_text?: string; + output?: Array<{ summary?: Array<{ text?: string }> }>; + }; + + expect(body.output?.[1]?.summary).toBeUndefined(); + expect(body.reasoning_summary_text).toBeUndefined(); + }); + + it('tracks commentary and final_answer phase text separately when phase labels are present', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_phase_123","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_123","type":"message","role":"assistant","phase":"commentary"}}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"Thinking...","phase":"commentary"}', + 'data: {"type":"response.output_text.done","output_index":0,"content_index":0,"text":"Thinking...","phase":"commentary"}', + 'data: {"type":"response.output_item.done","output_index":0,"item":{"id":"msg_123","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.output_text.done","output_index":0,"content_index":1,"text":"Done.","phase":"final_answer"}', + 'data: {"type":"response.done","response":{"id":"resp_phase_123","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + phase?: string; + commentary_text?: string; + final_answer_text?: string; + phase_text?: Record; + output_text?: string; + }; + + expect(body.phase).toBe('final_answer'); + expect(body.commentary_text).toBe('Thinking...'); + expect(body.final_answer_text).toBe('Done.'); + expect(body.phase_text).toEqual({ + commentary: 'Thinking...', + final_answer: 'Done.', + }); + expect(body.output_text).toBe('Thinking...Done.'); + }); + + it('replaces phase text when output_text.done corrects earlier deltas', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_phase_fix","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_fix","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"Hellp","phase":"final_answer"}', + 'data: {"type":"response.output_text.done","output_index":0,"content_index":0,"text":"Hello","phase":"final_answer"}', + 'data: {"type":"response.done","response":{"id":"resp_phase_fix","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + output_text?: string; + final_answer_text?: string; + phase_text?: Record; + output?: Array<{ content?: Array<{ text?: string }> }>; + }; + + expect(body.output?.[0]?.content?.[0]?.text).toBe('Hello'); + expect(body.output_text).toBe('Hello'); + expect(body.final_answer_text).toBe('Hello'); + expect(body.phase_text).toEqual({ final_answer: 'Hello' }); + }); + + it('replaces phase text when output_text.done omits phase after earlier deltas set it', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_phase_fix_missing","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_fix_missing","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"Hellp","phase":"final_answer"}', + 'data: {"type":"response.output_text.done","output_index":0,"content_index":0,"text":"Hello"}', + 'data: {"type":"response.done","response":{"id":"resp_phase_fix_missing","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + output_text?: string; + final_answer_text?: string; + phase_text?: Record; + output?: Array<{ content?: Array<{ text?: string }> }>; + }; + + expect(body.output?.[0]?.content?.[0]?.text).toBe('Hello'); + expect(body.output_text).toBe('Hello'); + expect(body.final_answer_text).toBe('Hello'); + expect(body.phase_text).toEqual({ final_answer: 'Hello' }); + }); + + it('handles response.content_part.added for output_text parts', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_content_part_added","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_part_added","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.content_part.added","output_index":0,"content_index":0,"part":{"type":"output_text","text":"Hello from added part","phase":"final_answer"}}', + 'data: {"type":"response.done","response":{"id":"resp_content_part_added","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + output_text?: string; + final_answer_text?: string; + phase_text?: Record; + output?: Array<{ content?: Array<{ type?: string; text?: string }> }>; + }; + + expect(body.output?.[0]?.content?.[0]).toEqual({ + type: 'output_text', + text: 'Hello from added part', + }); + expect(body.output_text).toBe('Hello from added part'); + expect(body.final_answer_text).toBe('Hello from added part'); + expect(body.phase_text).toEqual({ final_answer: 'Hello from added part' }); + }); + + it('handles response.content_part.done for output_text parts', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_content_part_done","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_part_done","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.content_part.done","output_index":0,"content_index":0,"part":{"type":"output_text","text":"Hello from done part","phase":"final_answer"}}', + 'data: {"type":"response.done","response":{"id":"resp_content_part_done","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + output_text?: string; + final_answer_text?: string; + phase_text?: Record; + output?: Array<{ content?: Array<{ type?: string; text?: string }> }>; + }; + + expect(body.output?.[0]?.content?.[0]).toEqual({ + type: 'output_text', + text: 'Hello from done part', + }); + expect(body.output_text).toBe('Hello from done part'); + expect(body.final_answer_text).toBe('Hello from done part'); + expect(body.phase_text).toEqual({ final_answer: 'Hello from done part' }); + }); + + it('captures phase from non-output_text content parts without mutating output text', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_content_part_annotation","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_part_annotation","type":"message","role":"assistant"}}', + 'data: {"type":"response.content_part.added","output_index":0,"content_index":0,"part":{"type":"annotation","text":"ignored","phase":"commentary"}}', + 'data: {"type":"response.done","response":{"id":"resp_content_part_annotation","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + phase?: string; + output_text?: string; + phase_text?: Record; + output?: Array<{ content?: Array<{ type?: string; text?: string }> }>; + }; + + expect(body.phase).toBe('commentary'); + expect(body.output?.[0]?.content).toBeUndefined(); + expect(body.output_text).toBeUndefined(); + expect(body.phase_text).toBeUndefined(); + }); + it('should return original text if no final response found', async () => { const sseContent = `data: {"type":"response.started"} data: {"type":"chunk","delta":"text"} @@ -118,7 +589,11 @@ data: {"type":"response.done","response":{"id":"resp_789"}} it('should report the final response id while converting SSE to JSON', async () => { const onResponseId = vi.fn(); - const sseContent = `data: {"type":"response.done","response":{"id":"resp_123","output":"test"}}`; + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_123","object":"response"}}', + 'data: {"type":"response.done","response":{"id":"resp_123","output":"test"}}', + '', + ].join('\n'); const response = new Response(sseContent); const headers = new Headers(); @@ -127,11 +602,14 @@ data: {"type":"response.done","response":{"id":"resp_789"}} expect(body).toEqual({ id: 'resp_123', output: 'test' }); expect(onResponseId).toHaveBeenCalledWith('resp_123'); + expect(onResponseId).toHaveBeenCalledTimes(1); }); it('should return the raw SSE text when an error event arrives before response.done', async () => { const onResponseId = vi.fn(); const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_bad_123","object":"response"}}', + '', 'data: {"type":"error","message":"quota exceeded"}', '', 'data: {"type":"response.done","response":{"id":"resp_bad_123","output":"bad"}}', @@ -147,6 +625,77 @@ data: {"type":"response.done","response":{"id":"resp_789"}} expect(onResponseId).not.toHaveBeenCalled(); }); + it('should return the raw SSE text when a stream ends after response.created without a terminal event', async () => { + const onResponseId = vi.fn(); + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_partial_123","object":"response"}}', + '', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"partial"}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers, { onResponseId }); + const text = await result.text(); + + expect(text).toBe(sseContent); + expect(onResponseId).not.toHaveBeenCalled(); + }); + + it('ignores oversized semantic indices instead of building sparse output arrays', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_guarded_indices","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":1000000,"item":{"id":"msg_big","type":"message","role":"assistant"}}', + 'data: {"type":"response.output_text.done","output_index":1000000,"content_index":1000000,"text":"ignored"}', + 'data: {"type":"response.reasoning_summary_part.done","output_index":1000000,"summary_index":1000000,"part":{"text":"ignored"}}', + 'data: {"type":"response.done","response":{"id":"resp_guarded_indices","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + id: string; + object: string; + output?: unknown[]; + output_text?: string; + reasoning_summary_text?: string; + }; + + expect(body).toEqual({ + id: 'resp_guarded_indices', + object: 'response', + }); + expect(body.output).toBeUndefined(); + expect(body.output_text).toBeUndefined(); + expect(body.reasoning_summary_text).toBeUndefined(); + }); + + it('ignores delta events with missing output_index', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_no_index","object":"response"}}', + 'data: {"type":"response.output_text.delta","content_index":0,"delta":"orphan"}', + 'data: {"type":"response.reasoning_summary_text.delta","summary_index":0,"delta":"orphan"}', + 'data: {"type":"response.done","response":{"id":"resp_no_index","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + id: string; + output_text?: string; + reasoning_summary_text?: string; + }; + + expect(body.id).toBe('resp_no_index'); + expect(body.output_text).toBeUndefined(); + expect(body.reasoning_summary_text).toBeUndefined(); + }); + it('should throw error if SSE stream exceeds size limit', async () => { const largeContent = 'a'.repeat(20 * 1024 * 1024 + 1); const response = new Response(largeContent); @@ -217,6 +766,7 @@ data: {"type":"response.done","response":{"id":"resp_789"}} const text = await captured.text(); expect(text).toBe(sseContent); + expect(onResponseId).toHaveBeenCalledTimes(1); expect(onResponseId).toHaveBeenCalledWith('resp_stream_123'); expect(captured.headers.get('content-type')).toBe('text/event-stream'); });