From 37b768fb64e5a99331e44717f2405abd8fc7110c Mon Sep 17 00:00:00 2001 From: ndycode Date: Sun, 22 Mar 2026 16:02:34 +0800 Subject: [PATCH 01/14] enhance responses parser for semantic SSE events --- lib/request/response-handler.ts | 438 ++++++++++++++++++++++++++++++-- test/fetch-helpers.test.ts | 19 ++ test/response-handler.test.ts | 82 ++++++ 3 files changed, 515 insertions(+), 24 deletions(-) diff --git a/lib/request/response-handler.ts b/lib/request/response-handler.ts index 861be8ab..a36f89ae 100644 --- a/lib/request/response-handler.ts +++ b/lib/request/response-handler.ts @@ -1,5 +1,6 @@ import { createLogger, logRequest, LOGGING_ENABLED } from "../logger.js"; import { PLUGIN_NAME } from "../constants.js"; +import { isRecord } from "../utils.js"; import type { SSEEventData } from "../types.js"; @@ -8,6 +9,322 @@ const log = createLogger("response-handler"); const MAX_SSE_SIZE = 10 * 1024 * 1024; // 10MB limit to prevent memory exhaustion const DEFAULT_STREAM_STALL_TIMEOUT_MS = 45_000; +type MutableRecord = Record; + +interface ParsedResponseState { + finalResponse: MutableRecord | null; + lastPhase: string | null; + outputItems: Map; + outputText: Map; + phaseText: Map; + reasoningSummaryText: Map; +} + +function createParsedResponseState(): ParsedResponseState { + return { + finalResponse: null, + lastPhase: null, + outputItems: new Map(), + outputText: new Map(), + phaseText: new Map(), + reasoningSummaryText: new Map(), + }; +} + +function toMutableRecord(value: unknown): MutableRecord | null { + return isRecord(value) ? { ...value } : null; +} + +function getNumberField(record: MutableRecord, key: string): number | null { + const value = record[key]; + return typeof value === "number" && Number.isFinite(value) ? value : null; +} + +function getStringField(record: MutableRecord, key: string): string | null { + const value = record[key]; + return typeof value === "string" && value.trim().length > 0 ? value : null; +} + +function cloneContentArray(content: unknown): MutableRecord[] { + if (!Array.isArray(content)) return []; + return content.filter(isRecord).map((part) => ({ ...part })); +} + +function mergeRecord(base: MutableRecord | null, update: MutableRecord): MutableRecord { + if (!base) return { ...update }; + const merged: MutableRecord = { ...base, ...update }; + if ("content" in update || "content" in base) { + merged.content = cloneContentArray(update.content ?? base.content); + } + return merged; +} + +function makeOutputTextKey(outputIndex: number | null, contentIndex: number | null): string | null { + if (outputIndex === null || contentIndex === null) return null; + return `${outputIndex}:${contentIndex}`; +} + +function makeSummaryKey(outputIndex: number | null, summaryIndex: number | null): string | null { + if (outputIndex === null || summaryIndex === null) return null; + return `${outputIndex}:${summaryIndex}`; +} + +function getPartText(part: unknown): string | null { + if (!isRecord(part)) return null; + const text = getStringField(part, "text"); + if (text) return text; + return null; +} + +function capturePhase( + state: ParsedResponseState, + phase: unknown, + text: string | null = null, +): void { + if (typeof phase !== "string" || phase.trim().length === 0) return; + const normalizedPhase = phase.trim(); + state.lastPhase = normalizedPhase; + if (text && text.length > 0) { + const existing = state.phaseText.get(normalizedPhase) ?? ""; + state.phaseText.set(normalizedPhase, `${existing}${text}`); + } +} + +function upsertOutputItem(state: ParsedResponseState, outputIndex: number | null, item: unknown): void { + if (outputIndex === null || !isRecord(item)) return; + const current = state.outputItems.get(outputIndex) ?? null; + const merged = mergeRecord(current, item); + state.outputItems.set(outputIndex, merged); + capturePhase(state, merged.phase); +} + +function setOutputTextValue( + state: ParsedResponseState, + outputIndex: number | null, + contentIndex: number | null, + text: string | null, + phase: unknown = undefined, +): void { + if (!text) return; + const key = makeOutputTextKey(outputIndex, contentIndex); + if (!key) return; + const existing = state.outputText.get(key) ?? ""; + state.outputText.set(key, text); + const phaseDelta = existing.length > 0 && text.startsWith(existing) + ? text.slice(existing.length) + : existing === text + ? "" + : text; + capturePhase(state, phase, phaseDelta); +} + +function appendOutputTextValue( + state: ParsedResponseState, + outputIndex: number | null, + contentIndex: number | null, + delta: string | null, + phase: unknown = undefined, +): void { + if (!delta) return; + const key = makeOutputTextKey(outputIndex, contentIndex); + if (!key) return; + const existing = state.outputText.get(key) ?? ""; + state.outputText.set(key, `${existing}${delta}`); + capturePhase(state, phase, delta); +} + +function setReasoningSummaryValue( + state: ParsedResponseState, + outputIndex: number | null, + summaryIndex: number | null, + text: string | null, +): void { + if (!text) return; + const key = makeSummaryKey(outputIndex, summaryIndex); + if (!key) return; + state.reasoningSummaryText.set(key, text); +} + +function appendReasoningSummaryValue( + state: ParsedResponseState, + outputIndex: number | null, + summaryIndex: number | null, + delta: string | null, +): void { + if (!delta) return; + const key = makeSummaryKey(outputIndex, summaryIndex); + if (!key) return; + const existing = state.reasoningSummaryText.get(key) ?? ""; + state.reasoningSummaryText.set(key, `${existing}${delta}`); +} + +function ensureOutputItemAtIndex(output: unknown[], index: number): MutableRecord | null { + while (output.length <= index) { + output.push({}); + } + const current = output[index]; + if (!isRecord(current)) { + output[index] = {}; + } + return isRecord(output[index]) ? (output[index] as MutableRecord) : null; +} + +function ensureContentPartAtIndex(item: MutableRecord, index: number): MutableRecord | null { + const content = Array.isArray(item.content) ? [...item.content] : []; + while (content.length <= index) { + content.push({}); + } + const current = content[index]; + if (!isRecord(current)) { + content[index] = {}; + } + item.content = content; + return isRecord(content[index]) ? (content[index] as MutableRecord) : null; +} + +function applyAccumulatedOutputText(response: MutableRecord, state: ParsedResponseState): void { + if (state.outputText.size === 0) return; + const output = Array.isArray(response.output) ? [...response.output] : []; + + for (const [key, text] of state.outputText.entries()) { + const [outputIndexText, contentIndexText] = key.split(":"); + const outputIndex = Number.parseInt(outputIndexText ?? "", 10); + const contentIndex = Number.parseInt(contentIndexText ?? "", 10); + if (!Number.isFinite(outputIndex) || !Number.isFinite(contentIndex)) continue; + const item = ensureOutputItemAtIndex(output, outputIndex); + if (!item) continue; + const part = ensureContentPartAtIndex(item, contentIndex); + if (!part) continue; + if (!getStringField(part, "type")) { + part.type = "output_text"; + } + part.text = text; + } + + if (output.length > 0) { + response.output = output; + } +} + +function mergeOutputItemsIntoResponse(response: MutableRecord, state: ParsedResponseState): void { + if (state.outputItems.size === 0) return; + const output = Array.isArray(response.output) ? [...response.output] : []; + + for (const [outputIndex, item] of state.outputItems.entries()) { + while (output.length <= outputIndex) { + output.push({}); + } + output[outputIndex] = mergeRecord(toMutableRecord(output[outputIndex]), item); + } + + response.output = output; +} + +function collectMessageOutputText(output: unknown[]): string { + return output + .filter(isRecord) + .map((item) => { + if (item.type !== "message") return ""; + const content = Array.isArray(item.content) ? item.content : []; + return content + .filter(isRecord) + .map((part) => { + if (part.type !== "output_text") return ""; + return typeof part.text === "string" ? part.text : ""; + }) + .join(""); + }) + .filter((text) => text.length > 0) + .join(""); +} + +function collectReasoningSummaryText(output: unknown[]): string { + return output + .filter(isRecord) + .map((item) => { + if (item.type !== "reasoning") return ""; + const summary = Array.isArray(item.summary) ? item.summary : []; + return summary + .filter(isRecord) + .map((part) => (typeof part.text === "string" ? part.text : "")) + .filter((text) => text.length > 0) + .join("\n\n"); + }) + .filter((text) => text.length > 0) + .join("\n\n"); +} + +function applyReasoningSummaries(response: MutableRecord, state: ParsedResponseState): void { + if (state.reasoningSummaryText.size === 0) return; + const output = Array.isArray(response.output) ? [...response.output] : []; + + for (const [key, text] of state.reasoningSummaryText.entries()) { + const [outputIndexText, summaryIndexText] = key.split(":"); + const outputIndex = Number.parseInt(outputIndexText ?? "", 10); + const summaryIndex = Number.parseInt(summaryIndexText ?? "", 10); + if (!Number.isFinite(outputIndex) || !Number.isFinite(summaryIndex)) continue; + const item = ensureOutputItemAtIndex(output, outputIndex); + if (!item) continue; + const summary = Array.isArray(item.summary) ? [...item.summary] : []; + while (summary.length <= summaryIndex) { + summary.push({}); + } + const current = summary[summaryIndex]; + const nextPart = isRecord(current) ? { ...current } : {}; + if (!getStringField(nextPart, "type")) { + nextPart.type = "summary_text"; + } + nextPart.text = text; + summary[summaryIndex] = nextPart; + item.summary = summary; + if (!getStringField(item, "type")) { + item.type = "reasoning"; + } + } + + if (output.length > 0) { + response.output = output; + } +} + +function finalizeParsedResponse(state: ParsedResponseState): MutableRecord | null { + const response = state.finalResponse ? { ...state.finalResponse } : null; + if (!response) return null; + + mergeOutputItemsIntoResponse(response, state); + applyAccumulatedOutputText(response, state); + applyReasoningSummaries(response, state); + + const output = Array.isArray(response.output) ? response.output : []; + if (typeof response.output_text !== "string") { + const outputText = collectMessageOutputText(output); + if (outputText.length > 0) { + response.output_text = outputText; + } + } + + const reasoningSummaryText = collectReasoningSummaryText(output); + if (reasoningSummaryText.length > 0) { + response.reasoning_summary_text = reasoningSummaryText; + } + + if (state.lastPhase && typeof response.phase !== "string") { + response.phase = state.lastPhase; + } + + if (state.phaseText.size > 0) { + const phaseText: MutableRecord = {}; + for (const [phase, text] of state.phaseText.entries()) { + phaseText[phase] = text; + if (phase === "commentary") response.commentary_text = text; + if (phase === "final_answer") response.final_answer_text = text; + } + response.phase_text = phaseText; + } + + return response; +} + function extractResponseId(response: unknown): string | null { if (!response || typeof response !== "object") return null; const candidate = (response as { id?: unknown }).id; @@ -32,28 +349,106 @@ function notifyResponseId( } } -type CapturedResponseEvent = - | { kind: "error" } - | { kind: "response"; response: unknown } - | null; - function maybeCaptureResponseEvent( + state: ParsedResponseState, data: SSEEventData, onResponseId?: (responseId: string) => void, -): CapturedResponseEvent { +): void { if (data.type === "error") { log.error("SSE error event received", { error: data }); - return { kind: "error" }; + return; } - if (data.type === "response.done" || data.type === "response.completed") { + if (isRecord(data.response)) { + state.finalResponse = { ...data.response }; notifyResponseId(onResponseId, data.response); - if (data.response !== undefined && data.response !== null) { - return { kind: "response", response: data.response }; + } + + if (data.type === "response.done" || data.type === "response.completed") { + return; + } + + const eventRecord = toMutableRecord(data); + if (!eventRecord) return; + const outputIndex = getNumberField(eventRecord, "output_index"); + + if (data.type === "response.output_item.added" || data.type === "response.output_item.done") { + upsertOutputItem(state, outputIndex, eventRecord.item); + return; + } + + if (data.type === "response.output_text.delta") { + appendOutputTextValue( + state, + outputIndex, + getNumberField(eventRecord, "content_index"), + getStringField(eventRecord, "delta"), + eventRecord.phase, + ); + return; + } + + if (data.type === "response.output_text.done") { + setOutputTextValue( + state, + outputIndex, + getNumberField(eventRecord, "content_index"), + getStringField(eventRecord, "text"), + eventRecord.phase, + ); + return; + } + + if (data.type === "response.content_part.added" || data.type === "response.content_part.done") { + const part = toMutableRecord(eventRecord.part); + if (!part || getStringField(part, "type") !== "output_text") { + capturePhase(state, part?.phase); + return; } + setOutputTextValue( + state, + outputIndex, + getNumberField(eventRecord, "content_index"), + getPartText(part), + part.phase, + ); + return; } - return null; + if (data.type === "response.reasoning_summary_text.delta") { + appendReasoningSummaryValue( + state, + outputIndex, + getNumberField(eventRecord, "summary_index"), + getStringField(eventRecord, "delta"), + ); + return; + } + + if (data.type === "response.reasoning_summary_text.done") { + setReasoningSummaryValue( + state, + outputIndex, + getNumberField(eventRecord, "summary_index"), + getStringField(eventRecord, "text"), + ); + return; + } + + if ( + data.type === "response.reasoning_summary_part.added" || + data.type === "response.reasoning_summary_part.done" + ) { + setReasoningSummaryValue( + state, + outputIndex, + getNumberField(eventRecord, "summary_index"), + getPartText(eventRecord.part), + ); + return; + } + + capturePhase(state, eventRecord.phase); } /** @@ -67,6 +462,7 @@ function parseSseStream( onResponseId?: (responseId: string) => void, ): unknown | null { const lines = sseText.split(/\r?\n/); + const state = createParsedResponseState(); for (const line of lines) { const trimmedLine = line.trim(); @@ -75,16 +471,14 @@ function parseSseStream( if (!payload || payload === '[DONE]') continue; try { const data = JSON.parse(payload) as SSEEventData; - const capturedEvent = maybeCaptureResponseEvent(data, onResponseId); - if (capturedEvent?.kind === "error") return null; - if (capturedEvent?.kind === "response") return capturedEvent.response; + maybeCaptureResponseEvent(state, data, onResponseId); } catch { // Skip malformed JSON } } } - return null; + return finalizeParsedResponse(state); } /** @@ -133,7 +527,9 @@ export async function convertSseToJson( if (!finalResponse) { log.warn("Could not find final response in SSE stream"); - logRequest("stream-error", { error: "No response.done event found" }); + logRequest("stream-error", { + error: "No terminal response event found in SSE stream", + }); // Return original stream if we can't parse return new Response(fullText, { @@ -173,10 +569,8 @@ function createResponseIdCapturingStream( ): ReadableStream { const decoder = new TextDecoder(); let bufferedText = ""; - let sawErrorEvent = false; const processBufferedLines = (flush = false): void => { - if (sawErrorEvent) return; const lines = bufferedText.split(/\r?\n/); if (!flush) { bufferedText = lines.pop() ?? ""; @@ -191,11 +585,7 @@ function createResponseIdCapturingStream( if (!payload || payload === "[DONE]") continue; try { const data = JSON.parse(payload) as SSEEventData; - const capturedEvent = maybeCaptureResponseEvent(data, onResponseId); - if (capturedEvent?.kind === "error") { - sawErrorEvent = true; - break; - } + maybeCaptureResponseEvent(createParsedResponseState(), data, onResponseId); } catch { // Ignore malformed SSE lines and keep forwarding the raw stream. } @@ -244,7 +634,7 @@ async function readWithTimeout( timeoutId = setTimeout(() => { reject( new Error( - `SSE stream stalled for ${timeoutMs}ms while waiting for response.done`, + `SSE stream stalled for ${timeoutMs}ms while waiting for a terminal response event`, ), ); }, timeoutMs); diff --git a/test/fetch-helpers.test.ts b/test/fetch-helpers.test.ts index 518a725c..f90108e7 100644 --- a/test/fetch-helpers.test.ts +++ b/test/fetch-helpers.test.ts @@ -743,6 +743,25 @@ describe('createEntitlementErrorResponse', () => { const text = await result.text(); expect(text).toBe('stream body'); }); + + it('captures response ids from streaming semantic SSE without rewriting the stream', async () => { + const onResponseId = vi.fn(); + const response = new Response( + [ + 'data: {"type":"response.created","response":{"id":"resp_stream_123"}}', + '', + 'data: {"type":"response.done","response":{"id":"resp_stream_123"}}', + '', + ].join('\n'), + { status: 200, headers: new Headers({ 'content-type': 'text/event-stream' }) }, + ); + + const result = await handleSuccessResponse(response, true, { onResponseId }); + const text = await result.text(); + + expect(text).toContain('"resp_stream_123"'); + expect(onResponseId).toHaveBeenCalledWith('resp_stream_123'); + }); }); describe('handleErrorResponse error normalization', () => { diff --git a/test/response-handler.test.ts b/test/response-handler.test.ts index 3e0fb0dd..14c4d9a7 100644 --- a/test/response-handler.test.ts +++ b/test/response-handler.test.ts @@ -66,6 +66,88 @@ data: {"type":"response.completed","response":{"id":"resp_456","output":"done"}} expect(body).toEqual({ id: 'resp_456', output: 'done' }); }); + it('synthesizes output_text and reasoning summaries from semantic SSE events', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_semantic_123","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_123","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"Hello ","phase":"final_answer"}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"world","phase":"final_answer"}', + 'data: {"type":"response.output_text.done","output_index":0,"content_index":0,"text":"Hello world","phase":"final_answer"}', + 'data: {"type":"response.output_item.added","output_index":1,"item":{"id":"rs_123","type":"reasoning"}}', + 'data: {"type":"response.reasoning_summary_text.delta","output_index":1,"summary_index":0,"delta":"Need more context."}', + 'data: {"type":"response.reasoning_summary_text.done","output_index":1,"summary_index":0,"text":"Need more context."}', + 'data: {"type":"response.completed","response":{"id":"resp_semantic_123","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + id: string; + output?: Array<{ + type?: string; + role?: string; + phase?: string; + content?: Array<{ type?: string; text?: string }>; + summary?: Array<{ type?: string; text?: string }>; + }>; + output_text?: string; + reasoning_summary_text?: string; + phase?: string; + final_answer_text?: string; + phase_text?: Record; + }; + + expect(body.id).toBe('resp_semantic_123'); + expect(body.output_text).toBe('Hello world'); + expect(body.reasoning_summary_text).toBe('Need more context.'); + expect(body.phase).toBe('final_answer'); + expect(body.final_answer_text).toBe('Hello world'); + expect(body.phase_text).toEqual({ final_answer: 'Hello world' }); + expect(body.output?.[0]?.content?.[0]).toEqual({ + type: 'output_text', + text: 'Hello world', + }); + expect(body.output?.[1]?.summary?.[0]).toEqual({ + type: 'summary_text', + text: 'Need more context.', + }); + }); + + it('tracks commentary and final_answer phase text separately when phase labels are present', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_phase_123","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_123","type":"message","role":"assistant","phase":"commentary"}}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"Thinking...","phase":"commentary"}', + 'data: {"type":"response.output_text.done","output_index":0,"content_index":0,"text":"Thinking...","phase":"commentary"}', + 'data: {"type":"response.output_item.done","output_index":0,"item":{"id":"msg_123","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.output_text.done","output_index":0,"content_index":1,"text":"Done.","phase":"final_answer"}', + 'data: {"type":"response.done","response":{"id":"resp_phase_123","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + phase?: string; + commentary_text?: string; + final_answer_text?: string; + phase_text?: Record; + output_text?: string; + }; + + expect(body.phase).toBe('final_answer'); + expect(body.commentary_text).toBe('Thinking...'); + expect(body.final_answer_text).toBe('Done.'); + expect(body.phase_text).toEqual({ + commentary: 'Thinking...', + final_answer: 'Done.', + }); + expect(body.output_text).toBe('Thinking...Done.'); + }); + it('should return original text if no final response found', async () => { const sseContent = `data: {"type":"response.started"} data: {"type":"chunk","delta":"text"} From ad0443ba9d66b367a5b8a405d59c618aec8786ef Mon Sep 17 00:00:00 2001 From: ndycode <405533+ndycode@users.noreply.github.com> Date: Sun, 22 Mar 2026 18:41:04 +0800 Subject: [PATCH 02/14] Fix semantic response id capture --- lib/request/response-handler.ts | 17 +++++++++++++---- test/fetch-helpers.test.ts | 1 + test/response-handler.test.ts | 32 +++++++++++++++++++++++++++++++- 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/lib/request/response-handler.ts b/lib/request/response-handler.ts index a36f89ae..82219267 100644 --- a/lib/request/response-handler.ts +++ b/lib/request/response-handler.ts @@ -18,6 +18,7 @@ interface ParsedResponseState { outputText: Map; phaseText: Map; reasoningSummaryText: Map; + seenResponseIds: Set; } function createParsedResponseState(): ParsedResponseState { @@ -28,6 +29,7 @@ function createParsedResponseState(): ParsedResponseState { outputText: new Map(), phaseText: new Map(), reasoningSummaryText: new Map(), + seenResponseIds: new Set(), }; } @@ -54,7 +56,11 @@ function mergeRecord(base: MutableRecord | null, update: MutableRecord): Mutable if (!base) return { ...update }; const merged: MutableRecord = { ...base, ...update }; if ("content" in update || "content" in base) { - merged.content = cloneContentArray(update.content ?? base.content); + const updateContent = cloneContentArray(update.content); + merged.content = + updateContent.length > 0 || !("content" in base) + ? updateContent + : cloneContentArray(base.content); } return merged; } @@ -334,11 +340,13 @@ function extractResponseId(response: unknown): string | null { } function notifyResponseId( + state: ParsedResponseState, onResponseId: ((responseId: string) => void) | undefined, response: unknown, ): void { const responseId = extractResponseId(response); - if (!responseId || !onResponseId) return; + if (!responseId || !onResponseId || state.seenResponseIds.has(responseId)) return; + state.seenResponseIds.add(responseId); try { onResponseId(responseId); } catch (error) { @@ -361,7 +369,7 @@ function maybeCaptureResponseEvent( if (isRecord(data.response)) { state.finalResponse = { ...data.response }; - notifyResponseId(onResponseId, data.response); + notifyResponseId(state, onResponseId, data.response); } if (data.type === "response.done" || data.type === "response.completed") { @@ -569,6 +577,7 @@ function createResponseIdCapturingStream( ): ReadableStream { const decoder = new TextDecoder(); let bufferedText = ""; + const state = createParsedResponseState(); const processBufferedLines = (flush = false): void => { const lines = bufferedText.split(/\r?\n/); @@ -585,7 +594,7 @@ function createResponseIdCapturingStream( if (!payload || payload === "[DONE]") continue; try { const data = JSON.parse(payload) as SSEEventData; - maybeCaptureResponseEvent(createParsedResponseState(), data, onResponseId); + maybeCaptureResponseEvent(state, data, onResponseId); } catch { // Ignore malformed SSE lines and keep forwarding the raw stream. } diff --git a/test/fetch-helpers.test.ts b/test/fetch-helpers.test.ts index f90108e7..8efc73c1 100644 --- a/test/fetch-helpers.test.ts +++ b/test/fetch-helpers.test.ts @@ -761,6 +761,7 @@ describe('createEntitlementErrorResponse', () => { expect(text).toContain('"resp_stream_123"'); expect(onResponseId).toHaveBeenCalledWith('resp_stream_123'); + expect(onResponseId).toHaveBeenCalledTimes(1); }); }); diff --git a/test/response-handler.test.ts b/test/response-handler.test.ts index 14c4d9a7..d740e0f3 100644 --- a/test/response-handler.test.ts +++ b/test/response-handler.test.ts @@ -115,6 +115,31 @@ data: {"type":"response.completed","response":{"id":"resp_456","output":"done"}} }); }); + it('preserves richer terminal output when semantic items arrive with empty content arrays', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_rich_123","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_123","type":"message","role":"assistant","content":[]}}', + 'data: {"type":"response.completed","response":{"id":"resp_rich_123","object":"response","output":[{"id":"msg_123","type":"message","role":"assistant","content":[{"type":"output_text","text":"Hello rich world"},{"type":"annotation","label":"kept"}]}]}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + id: string; + output?: Array<{ + content?: Array<{ type?: string; text?: string; label?: string }>; + }>; + }; + + expect(body.id).toBe('resp_rich_123'); + expect(body.output?.[0]?.content).toEqual([ + { type: 'output_text', text: 'Hello rich world' }, + { type: 'annotation', label: 'kept' }, + ]); + }); + it('tracks commentary and final_answer phase text separately when phase labels are present', async () => { const sseContent = [ 'data: {"type":"response.created","response":{"id":"resp_phase_123","object":"response"}}', @@ -200,7 +225,11 @@ data: {"type":"response.done","response":{"id":"resp_789"}} it('should report the final response id while converting SSE to JSON', async () => { const onResponseId = vi.fn(); - const sseContent = `data: {"type":"response.done","response":{"id":"resp_123","output":"test"}}`; + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_123","object":"response"}}', + 'data: {"type":"response.done","response":{"id":"resp_123","output":"test"}}', + '', + ].join('\n'); const response = new Response(sseContent); const headers = new Headers(); @@ -209,6 +238,7 @@ data: {"type":"response.done","response":{"id":"resp_789"}} expect(body).toEqual({ id: 'resp_123', output: 'test' }); expect(onResponseId).toHaveBeenCalledWith('resp_123'); + expect(onResponseId).toHaveBeenCalledTimes(1); }); it('should return the raw SSE text when an error event arrives before response.done', async () => { From eb3d7a339d3e2eca5cd5bdb79eac41c47566a489 Mon Sep 17 00:00:00 2001 From: ndycode <405533+ndycode@users.noreply.github.com> Date: Sun, 22 Mar 2026 18:42:52 +0800 Subject: [PATCH 03/14] fix: tighten semantic SSE response handling --- lib/request/response-handler.ts | 67 +++++++++++++++++++++++++++------ test/response-handler.test.ts | 27 +++++++++++++ 2 files changed, 82 insertions(+), 12 deletions(-) diff --git a/lib/request/response-handler.ts b/lib/request/response-handler.ts index 82219267..3f5a485c 100644 --- a/lib/request/response-handler.ts +++ b/lib/request/response-handler.ts @@ -16,6 +16,7 @@ interface ParsedResponseState { lastPhase: string | null; outputItems: Map; outputText: Map; + phaseTextSegments: Map; phaseText: Map; reasoningSummaryText: Map; seenResponseIds: Set; @@ -27,6 +28,7 @@ function createParsedResponseState(): ParsedResponseState { lastPhase: null, outputItems: new Map(), outputText: new Map(), + phaseTextSegments: new Map(), phaseText: new Map(), reasoningSummaryText: new Map(), seenResponseIds: new Set(), @@ -70,6 +72,10 @@ function makeOutputTextKey(outputIndex: number | null, contentIndex: number | nu return `${outputIndex}:${contentIndex}`; } +function makePhaseTextSegmentKey(phase: string, outputTextKey: string): string { + return `${phase}\u0000${outputTextKey}`; +} + function makeSummaryKey(outputIndex: number | null, summaryIndex: number | null): string | null { if (outputIndex === null || summaryIndex === null) return null; return `${outputIndex}:${summaryIndex}`; @@ -85,15 +91,58 @@ function getPartText(part: unknown): string | null { function capturePhase( state: ParsedResponseState, phase: unknown, - text: string | null = null, +): void { + if (typeof phase !== "string" || phase.trim().length === 0) return; + state.lastPhase = phase.trim(); +} + +function syncPhaseText(state: ParsedResponseState, phase: string): void { + const prefix = `${phase}\u0000`; + const text = [...state.phaseTextSegments.entries()] + .filter(([key]) => key.startsWith(prefix)) + .map(([, value]) => value) + .join(""); + if (text.length === 0) { + state.phaseText.delete(phase); + return; + } + state.phaseText.set(phase, text); +} + +function setPhaseTextSegment( + state: ParsedResponseState, + phase: unknown, + outputTextKey: string, + text: string | null, ): void { if (typeof phase !== "string" || phase.trim().length === 0) return; const normalizedPhase = phase.trim(); state.lastPhase = normalizedPhase; - if (text && text.length > 0) { - const existing = state.phaseText.get(normalizedPhase) ?? ""; - state.phaseText.set(normalizedPhase, `${existing}${text}`); + const segmentKey = makePhaseTextSegmentKey(normalizedPhase, outputTextKey); + if (!text || text.length === 0) { + state.phaseTextSegments.delete(segmentKey); + syncPhaseText(state, normalizedPhase); + return; } + state.phaseTextSegments.set(segmentKey, text); + syncPhaseText(state, normalizedPhase); +} + +function appendPhaseTextSegment( + state: ParsedResponseState, + phase: unknown, + outputTextKey: string, + delta: string | null, +): void { + if (!delta || delta.length === 0 || typeof phase !== "string" || phase.trim().length === 0) { + return; + } + const normalizedPhase = phase.trim(); + state.lastPhase = normalizedPhase; + const segmentKey = makePhaseTextSegmentKey(normalizedPhase, outputTextKey); + const existing = state.phaseTextSegments.get(segmentKey) ?? ""; + state.phaseTextSegments.set(segmentKey, `${existing}${delta}`); + syncPhaseText(state, normalizedPhase); } function upsertOutputItem(state: ParsedResponseState, outputIndex: number | null, item: unknown): void { @@ -114,14 +163,8 @@ function setOutputTextValue( if (!text) return; const key = makeOutputTextKey(outputIndex, contentIndex); if (!key) return; - const existing = state.outputText.get(key) ?? ""; state.outputText.set(key, text); - const phaseDelta = existing.length > 0 && text.startsWith(existing) - ? text.slice(existing.length) - : existing === text - ? "" - : text; - capturePhase(state, phase, phaseDelta); + setPhaseTextSegment(state, phase, key, text); } function appendOutputTextValue( @@ -136,7 +179,7 @@ function appendOutputTextValue( if (!key) return; const existing = state.outputText.get(key) ?? ""; state.outputText.set(key, `${existing}${delta}`); - capturePhase(state, phase, delta); + appendPhaseTextSegment(state, phase, key, delta); } function setReasoningSummaryValue( diff --git a/test/response-handler.test.ts b/test/response-handler.test.ts index d740e0f3..a25386de 100644 --- a/test/response-handler.test.ts +++ b/test/response-handler.test.ts @@ -173,6 +173,32 @@ data: {"type":"response.completed","response":{"id":"resp_456","output":"done"}} expect(body.output_text).toBe('Thinking...Done.'); }); + it('replaces phase text when output_text.done corrects earlier deltas', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_phase_fix","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_fix","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"Hellp","phase":"final_answer"}', + 'data: {"type":"response.output_text.done","output_index":0,"content_index":0,"text":"Hello","phase":"final_answer"}', + 'data: {"type":"response.done","response":{"id":"resp_phase_fix","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + output_text?: string; + final_answer_text?: string; + phase_text?: Record; + output?: Array<{ content?: Array<{ text?: string }> }>; + }; + + expect(body.output?.[0]?.content?.[0]?.text).toBe('Hello'); + expect(body.output_text).toBe('Hello'); + expect(body.final_answer_text).toBe('Hello'); + expect(body.phase_text).toEqual({ final_answer: 'Hello' }); + }); + it('should return original text if no final response found', async () => { const sseContent = `data: {"type":"response.started"} data: {"type":"chunk","delta":"text"} @@ -329,6 +355,7 @@ data: {"type":"response.done","response":{"id":"resp_789"}} const text = await captured.text(); expect(text).toBe(sseContent); + expect(onResponseId).toHaveBeenCalledTimes(1); expect(onResponseId).toHaveBeenCalledWith('resp_stream_123'); expect(captured.headers.get('content-type')).toBe('text/event-stream'); }); From 19edb998d41d719f453cf4ab5e6d33a09c8aec45 Mon Sep 17 00:00:00 2001 From: ndycode <405533+ndycode@users.noreply.github.com> Date: Sun, 22 Mar 2026 18:56:16 +0800 Subject: [PATCH 04/14] Abort response-id capture after SSE errors --- lib/request/response-handler.ts | 31 ++++++++++++++++---- test/response-handler.test.ts | 50 +++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 5 deletions(-) diff --git a/lib/request/response-handler.ts b/lib/request/response-handler.ts index 3f5a485c..77e1ff59 100644 --- a/lib/request/response-handler.ts +++ b/lib/request/response-handler.ts @@ -16,10 +16,12 @@ interface ParsedResponseState { lastPhase: string | null; outputItems: Map; outputText: Map; + outputTextPhases: Map; phaseTextSegments: Map; phaseText: Map; reasoningSummaryText: Map; seenResponseIds: Set; + encounteredError: boolean; } function createParsedResponseState(): ParsedResponseState { @@ -28,10 +30,12 @@ function createParsedResponseState(): ParsedResponseState { lastPhase: null, outputItems: new Map(), outputText: new Map(), + outputTextPhases: new Map(), phaseTextSegments: new Map(), phaseText: new Map(), reasoningSummaryText: new Map(), seenResponseIds: new Set(), + encounteredError: false, }; } @@ -115,8 +119,12 @@ function setPhaseTextSegment( outputTextKey: string, text: string | null, ): void { - if (typeof phase !== "string" || phase.trim().length === 0) return; - const normalizedPhase = phase.trim(); + const normalizedPhase = + typeof phase === "string" && phase.trim().length > 0 + ? phase.trim() + : state.outputTextPhases.get(outputTextKey) ?? null; + if (!normalizedPhase) return; + state.outputTextPhases.set(outputTextKey, normalizedPhase); state.lastPhase = normalizedPhase; const segmentKey = makePhaseTextSegmentKey(normalizedPhase, outputTextKey); if (!text || text.length === 0) { @@ -134,10 +142,15 @@ function appendPhaseTextSegment( outputTextKey: string, delta: string | null, ): void { - if (!delta || delta.length === 0 || typeof phase !== "string" || phase.trim().length === 0) { + if (!delta || delta.length === 0) { return; } - const normalizedPhase = phase.trim(); + const normalizedPhase = + typeof phase === "string" && phase.trim().length > 0 + ? phase.trim() + : state.outputTextPhases.get(outputTextKey) ?? null; + if (!normalizedPhase) return; + state.outputTextPhases.set(outputTextKey, normalizedPhase); state.lastPhase = normalizedPhase; const segmentKey = makePhaseTextSegmentKey(normalizedPhase, outputTextKey); const existing = state.phaseTextSegments.get(segmentKey) ?? ""; @@ -339,6 +352,7 @@ function applyReasoningSummaries(response: MutableRecord, state: ParsedResponseS function finalizeParsedResponse(state: ParsedResponseState): MutableRecord | null { const response = state.finalResponse ? { ...state.finalResponse } : null; if (!response) return null; + if (state.encounteredError) return null; mergeOutputItemsIntoResponse(response, state); applyAccumulatedOutputText(response, state); @@ -353,7 +367,10 @@ function finalizeParsedResponse(state: ParsedResponseState): MutableRecord | nul } const reasoningSummaryText = collectReasoningSummaryText(output); - if (reasoningSummaryText.length > 0) { + if ( + reasoningSummaryText.length > 0 && + typeof response.reasoning_summary_text !== "string" + ) { response.reasoning_summary_text = reasoningSummaryText; } @@ -407,6 +424,7 @@ function maybeCaptureResponseEvent( ): void { if (data.type === "error") { log.error("SSE error event received", { error: data }); + state.encounteredError = true; return; } @@ -523,6 +541,7 @@ function parseSseStream( try { const data = JSON.parse(payload) as SSEEventData; maybeCaptureResponseEvent(state, data, onResponseId); + if (state.encounteredError) return null; } catch { // Skip malformed JSON } @@ -623,6 +642,7 @@ function createResponseIdCapturingStream( const state = createParsedResponseState(); const processBufferedLines = (flush = false): void => { + if (state.encounteredError) return; const lines = bufferedText.split(/\r?\n/); if (!flush) { bufferedText = lines.pop() ?? ""; @@ -638,6 +658,7 @@ function createResponseIdCapturingStream( try { const data = JSON.parse(payload) as SSEEventData; maybeCaptureResponseEvent(state, data, onResponseId); + if (state.encounteredError) break; } catch { // Ignore malformed SSE lines and keep forwarding the raw stream. } diff --git a/test/response-handler.test.ts b/test/response-handler.test.ts index a25386de..8a3f2114 100644 --- a/test/response-handler.test.ts +++ b/test/response-handler.test.ts @@ -115,6 +115,28 @@ data: {"type":"response.completed","response":{"id":"resp_456","output":"done"}} }); }); + it('preserves canonical terminal reasoning_summary_text over synthesized semantic text', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_semantic_canonical","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":1,"item":{"id":"rs_456","type":"reasoning"}}', + 'data: {"type":"response.reasoning_summary_text.delta","output_index":1,"summary_index":0,"delta":"Draft summary"}', + 'data: {"type":"response.reasoning_summary_text.done","output_index":1,"summary_index":0,"text":"Draft summary"}', + 'data: {"type":"response.completed","response":{"id":"resp_semantic_canonical","object":"response","reasoning_summary_text":"Canonical summary"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + reasoning_summary_text?: string; + output?: Array<{ summary?: Array<{ text?: string }> }>; + }; + + expect(body.reasoning_summary_text).toBe('Canonical summary'); + expect(body.output?.[0]?.summary?.[0]?.text).toBe('Draft summary'); + }); + it('preserves richer terminal output when semantic items arrive with empty content arrays', async () => { const sseContent = [ 'data: {"type":"response.created","response":{"id":"resp_rich_123","object":"response"}}', @@ -199,6 +221,32 @@ data: {"type":"response.completed","response":{"id":"resp_456","output":"done"}} expect(body.phase_text).toEqual({ final_answer: 'Hello' }); }); + it('replaces phase text when output_text.done omits phase after earlier deltas set it', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_phase_fix_missing","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_fix_missing","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"Hellp","phase":"final_answer"}', + 'data: {"type":"response.output_text.done","output_index":0,"content_index":0,"text":"Hello"}', + 'data: {"type":"response.done","response":{"id":"resp_phase_fix_missing","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + output_text?: string; + final_answer_text?: string; + phase_text?: Record; + output?: Array<{ content?: Array<{ text?: string }> }>; + }; + + expect(body.output?.[0]?.content?.[0]?.text).toBe('Hello'); + expect(body.output_text).toBe('Hello'); + expect(body.final_answer_text).toBe('Hello'); + expect(body.phase_text).toEqual({ final_answer: 'Hello' }); + }); + it('should return original text if no final response found', async () => { const sseContent = `data: {"type":"response.started"} data: {"type":"chunk","delta":"text"} @@ -270,6 +318,8 @@ data: {"type":"response.done","response":{"id":"resp_789"}} it('should return the raw SSE text when an error event arrives before response.done', async () => { const onResponseId = vi.fn(); const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_bad_123","object":"response"}}', + '', 'data: {"type":"error","message":"quota exceeded"}', '', 'data: {"type":"response.done","response":{"id":"resp_bad_123","output":"bad"}}', From b5eba0a3f6a2eadc5e7195fc5fdcf16e0c745e8a Mon Sep 17 00:00:00 2001 From: ndycode <405533+ndycode@users.noreply.github.com> Date: Sun, 22 Mar 2026 19:01:59 +0800 Subject: [PATCH 05/14] Tighten semantic response parser fallbacks --- lib/request/response-handler.ts | 2 +- test/response-handler.test.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/request/response-handler.ts b/lib/request/response-handler.ts index 77e1ff59..115451ff 100644 --- a/lib/request/response-handler.ts +++ b/lib/request/response-handler.ts @@ -430,10 +430,10 @@ function maybeCaptureResponseEvent( if (isRecord(data.response)) { state.finalResponse = { ...data.response }; - notifyResponseId(state, onResponseId, data.response); } if (data.type === "response.done" || data.type === "response.completed") { + notifyResponseId(state, onResponseId, data.response); return; } diff --git a/test/response-handler.test.ts b/test/response-handler.test.ts index 8a3f2114..92b010d6 100644 --- a/test/response-handler.test.ts +++ b/test/response-handler.test.ts @@ -134,7 +134,7 @@ data: {"type":"response.completed","response":{"id":"resp_456","output":"done"}} }; expect(body.reasoning_summary_text).toBe('Canonical summary'); - expect(body.output?.[0]?.summary?.[0]?.text).toBe('Draft summary'); + expect(body.output?.[1]?.summary?.[0]?.text).toBe('Draft summary'); }); it('preserves richer terminal output when semantic items arrive with empty content arrays', async () => { From e0fec5a97de6364362efefa33e25f6d66b3a3fa6 Mon Sep 17 00:00:00 2001 From: ndycode <405533+ndycode@users.noreply.github.com> Date: Sun, 22 Mar 2026 19:11:03 +0800 Subject: [PATCH 06/14] Preserve whitespace-only response deltas --- lib/request/response-handler.ts | 9 +++++++-- test/response-handler.test.ts | 35 +++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/lib/request/response-handler.ts b/lib/request/response-handler.ts index 115451ff..ba056106 100644 --- a/lib/request/response-handler.ts +++ b/lib/request/response-handler.ts @@ -53,6 +53,11 @@ function getStringField(record: MutableRecord, key: string): string | null { return typeof value === "string" && value.trim().length > 0 ? value : null; } +function getDeltaField(record: MutableRecord, key: string): string | null { + const value = record[key]; + return typeof value === "string" && value.length > 0 ? value : null; +} + function cloneContentArray(content: unknown): MutableRecord[] { if (!Array.isArray(content)) return []; return content.filter(isRecord).map((part) => ({ ...part })); @@ -451,7 +456,7 @@ function maybeCaptureResponseEvent( state, outputIndex, getNumberField(eventRecord, "content_index"), - getStringField(eventRecord, "delta"), + getDeltaField(eventRecord, "delta"), eventRecord.phase, ); return; @@ -489,7 +494,7 @@ function maybeCaptureResponseEvent( state, outputIndex, getNumberField(eventRecord, "summary_index"), - getStringField(eventRecord, "delta"), + getDeltaField(eventRecord, "delta"), ); return; } diff --git a/test/response-handler.test.ts b/test/response-handler.test.ts index 92b010d6..36981f6d 100644 --- a/test/response-handler.test.ts +++ b/test/response-handler.test.ts @@ -137,6 +137,41 @@ data: {"type":"response.completed","response":{"id":"resp_456","output":"done"}} expect(body.output?.[1]?.summary?.[0]?.text).toBe('Draft summary'); }); + it('preserves whitespace-only semantic deltas when no done events override them', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_whitespace_delta","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_space","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"Hello","phase":"final_answer"}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":" ","phase":"final_answer"}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"world","phase":"final_answer"}', + 'data: {"type":"response.output_item.added","output_index":1,"item":{"id":"rs_space","type":"reasoning"}}', + 'data: {"type":"response.reasoning_summary_text.delta","output_index":1,"summary_index":0,"delta":"Need"}', + 'data: {"type":"response.reasoning_summary_text.delta","output_index":1,"summary_index":0,"delta":" "}', + 'data: {"type":"response.reasoning_summary_text.delta","output_index":1,"summary_index":0,"delta":"context."}', + 'data: {"type":"response.done","response":{"id":"resp_whitespace_delta","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + output_text?: string; + final_answer_text?: string; + reasoning_summary_text?: string; + output?: Array<{ + content?: Array<{ text?: string }>; + summary?: Array<{ text?: string }>; + }>; + }; + + expect(body.output?.[0]?.content?.[0]?.text).toBe('Hello world'); + expect(body.output_text).toBe('Hello world'); + expect(body.final_answer_text).toBe('Hello world'); + expect(body.output?.[1]?.summary?.[0]?.text).toBe('Need context.'); + expect(body.reasoning_summary_text).toBe('Need context.'); + }); + it('preserves richer terminal output when semantic items arrive with empty content arrays', async () => { const sseContent = [ 'data: {"type":"response.created","response":{"id":"resp_rich_123","object":"response"}}', From acc6b2b8478a889bea427d6f97e3e071d1130302 Mon Sep 17 00:00:00 2001 From: ndycode <405533+ndycode@users.noreply.github.com> Date: Sun, 22 Mar 2026 19:20:41 +0800 Subject: [PATCH 07/14] Require terminal response events for SSE JSON --- lib/request/response-handler.ts | 7 +++---- test/response-handler.test.ts | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/lib/request/response-handler.ts b/lib/request/response-handler.ts index ba056106..01089f8f 100644 --- a/lib/request/response-handler.ts +++ b/lib/request/response-handler.ts @@ -433,11 +433,10 @@ function maybeCaptureResponseEvent( return; } - if (isRecord(data.response)) { - state.finalResponse = { ...data.response }; - } - if (data.type === "response.done" || data.type === "response.completed") { + if (isRecord(data.response)) { + state.finalResponse = { ...data.response }; + } notifyResponseId(state, onResponseId, data.response); return; } diff --git a/test/response-handler.test.ts b/test/response-handler.test.ts index 36981f6d..7453a230 100644 --- a/test/response-handler.test.ts +++ b/test/response-handler.test.ts @@ -370,6 +370,24 @@ data: {"type":"response.done","response":{"id":"resp_789"}} expect(onResponseId).not.toHaveBeenCalled(); }); + it('should return the raw SSE text when a stream ends after response.created without a terminal event', async () => { + const onResponseId = vi.fn(); + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_partial_123","object":"response"}}', + '', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"partial"}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers, { onResponseId }); + const text = await result.text(); + + expect(text).toBe(sseContent); + expect(onResponseId).not.toHaveBeenCalled(); + }); + it('should throw error if SSE stream exceeds size limit', async () => { const largeContent = 'a'.repeat(20 * 1024 * 1024 + 1); const response = new Response(largeContent); From da2e5a43a5d9e32d5f1d04c141beeef90a084982 Mon Sep 17 00:00:00 2001 From: ndycode <405533+ndycode@users.noreply.github.com> Date: Sun, 22 Mar 2026 19:29:19 +0800 Subject: [PATCH 08/14] fix: harden semantic sse parsing --- lib/request/response-handler.ts | 43 +++++++++++++++++++++++--- test/response-handler.test.ts | 55 +++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 5 deletions(-) diff --git a/lib/request/response-handler.ts b/lib/request/response-handler.ts index 01089f8f..a413d03e 100644 --- a/lib/request/response-handler.ts +++ b/lib/request/response-handler.ts @@ -8,6 +8,7 @@ const log = createLogger("response-handler"); const MAX_SSE_SIZE = 10 * 1024 * 1024; // 10MB limit to prevent memory exhaustion const DEFAULT_STREAM_STALL_TIMEOUT_MS = 45_000; +const MAX_SYNTHESIZED_EVENT_INDEX = 255; type MutableRecord = Record; @@ -58,6 +59,15 @@ function getDeltaField(record: MutableRecord, key: string): string | null { return typeof value === "string" && value.length > 0 ? value : null; } +function isValidSynthesizedIndex(index: number | null): index is number { + return ( + index !== null && + Number.isInteger(index) && + index >= 0 && + index <= MAX_SYNTHESIZED_EVENT_INDEX + ); +} + function cloneContentArray(content: unknown): MutableRecord[] { if (!Array.isArray(content)) return []; return content.filter(isRecord).map((part) => ({ ...part })); @@ -77,7 +87,12 @@ function mergeRecord(base: MutableRecord | null, update: MutableRecord): Mutable } function makeOutputTextKey(outputIndex: number | null, contentIndex: number | null): string | null { - if (outputIndex === null || contentIndex === null) return null; + if ( + !isValidSynthesizedIndex(outputIndex) || + !isValidSynthesizedIndex(contentIndex) + ) { + return null; + } return `${outputIndex}:${contentIndex}`; } @@ -86,7 +101,12 @@ function makePhaseTextSegmentKey(phase: string, outputTextKey: string): string { } function makeSummaryKey(outputIndex: number | null, summaryIndex: number | null): string | null { - if (outputIndex === null || summaryIndex === null) return null; + if ( + !isValidSynthesizedIndex(outputIndex) || + !isValidSynthesizedIndex(summaryIndex) + ) { + return null; + } return `${outputIndex}:${summaryIndex}`; } @@ -164,7 +184,7 @@ function appendPhaseTextSegment( } function upsertOutputItem(state: ParsedResponseState, outputIndex: number | null, item: unknown): void { - if (outputIndex === null || !isRecord(item)) return; + if (!isValidSynthesizedIndex(outputIndex) || !isRecord(item)) return; const current = state.outputItems.get(outputIndex) ?? null; const merged = mergeRecord(current, item); state.outputItems.set(outputIndex, merged); @@ -226,6 +246,7 @@ function appendReasoningSummaryValue( } function ensureOutputItemAtIndex(output: unknown[], index: number): MutableRecord | null { + if (!isValidSynthesizedIndex(index)) return null; while (output.length <= index) { output.push({}); } @@ -237,6 +258,7 @@ function ensureOutputItemAtIndex(output: unknown[], index: number): MutableRecor } function ensureContentPartAtIndex(item: MutableRecord, index: number): MutableRecord | null { + if (!isValidSynthesizedIndex(index)) return null; const content = Array.isArray(item.content) ? [...item.content] : []; while (content.length <= index) { content.push({}); @@ -257,7 +279,12 @@ function applyAccumulatedOutputText(response: MutableRecord, state: ParsedRespon const [outputIndexText, contentIndexText] = key.split(":"); const outputIndex = Number.parseInt(outputIndexText ?? "", 10); const contentIndex = Number.parseInt(contentIndexText ?? "", 10); - if (!Number.isFinite(outputIndex) || !Number.isFinite(contentIndex)) continue; + if ( + !isValidSynthesizedIndex(outputIndex) || + !isValidSynthesizedIndex(contentIndex) + ) { + continue; + } const item = ensureOutputItemAtIndex(output, outputIndex); if (!item) continue; const part = ensureContentPartAtIndex(item, contentIndex); @@ -278,6 +305,7 @@ function mergeOutputItemsIntoResponse(response: MutableRecord, state: ParsedResp const output = Array.isArray(response.output) ? [...response.output] : []; for (const [outputIndex, item] of state.outputItems.entries()) { + if (!isValidSynthesizedIndex(outputIndex)) continue; while (output.length <= outputIndex) { output.push({}); } @@ -329,7 +357,12 @@ function applyReasoningSummaries(response: MutableRecord, state: ParsedResponseS const [outputIndexText, summaryIndexText] = key.split(":"); const outputIndex = Number.parseInt(outputIndexText ?? "", 10); const summaryIndex = Number.parseInt(summaryIndexText ?? "", 10); - if (!Number.isFinite(outputIndex) || !Number.isFinite(summaryIndex)) continue; + if ( + !isValidSynthesizedIndex(outputIndex) || + !isValidSynthesizedIndex(summaryIndex) + ) { + continue; + } const item = ensureOutputItemAtIndex(output, outputIndex); if (!item) continue; const summary = Array.isArray(item.summary) ? [...item.summary] : []; diff --git a/test/response-handler.test.ts b/test/response-handler.test.ts index 7453a230..9c180edb 100644 --- a/test/response-handler.test.ts +++ b/test/response-handler.test.ts @@ -137,6 +137,31 @@ data: {"type":"response.completed","response":{"id":"resp_456","output":"done"}} expect(body.output?.[1]?.summary?.[0]?.text).toBe('Draft summary'); }); + it('synthesizes reasoning summaries from part events', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_summary_part","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":1,"item":{"id":"rs_part","type":"reasoning"}}', + 'data: {"type":"response.reasoning_summary_part.added","output_index":1,"summary_index":0,"part":{"text":"Draft summary"}}', + 'data: {"type":"response.reasoning_summary_part.done","output_index":1,"summary_index":0,"part":{"text":"Need more context."}}', + 'data: {"type":"response.done","response":{"id":"resp_summary_part","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + reasoning_summary_text?: string; + output?: Array<{ summary?: Array<{ type?: string; text?: string }> }>; + }; + + expect(body.reasoning_summary_text).toBe('Need more context.'); + expect(body.output?.[1]?.summary?.[0]).toEqual({ + type: 'summary_text', + text: 'Need more context.', + }); + }); + it('preserves whitespace-only semantic deltas when no done events override them', async () => { const sseContent = [ 'data: {"type":"response.created","response":{"id":"resp_whitespace_delta","object":"response"}}', @@ -388,6 +413,36 @@ data: {"type":"response.done","response":{"id":"resp_789"}} expect(onResponseId).not.toHaveBeenCalled(); }); + it('ignores oversized semantic indices instead of building sparse output arrays', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_guarded_indices","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":1000000,"item":{"id":"msg_big","type":"message","role":"assistant"}}', + 'data: {"type":"response.output_text.done","output_index":1000000,"content_index":1000000,"text":"ignored"}', + 'data: {"type":"response.reasoning_summary_part.done","output_index":1000000,"summary_index":1000000,"part":{"text":"ignored"}}', + 'data: {"type":"response.done","response":{"id":"resp_guarded_indices","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + id: string; + object: string; + output?: unknown[]; + output_text?: string; + reasoning_summary_text?: string; + }; + + expect(body).toEqual({ + id: 'resp_guarded_indices', + object: 'response', + }); + expect(body.output).toBeUndefined(); + expect(body.output_text).toBeUndefined(); + expect(body.reasoning_summary_text).toBeUndefined(); + }); + it('should throw error if SSE stream exceeds size limit', async () => { const largeContent = 'a'.repeat(20 * 1024 * 1024 + 1); const response = new Response(largeContent); From 65f2fa063703cfeb54361670772e2449b5e95bf4 Mon Sep 17 00:00:00 2001 From: ndycode <405533+ndycode@users.noreply.github.com> Date: Sun, 22 Mar 2026 19:52:41 +0800 Subject: [PATCH 09/14] fix: preserve canonical semantic response text --- lib/request/response-handler.ts | 10 ++++++- test/response-handler.test.ts | 51 +++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/lib/request/response-handler.ts b/lib/request/response-handler.ts index a413d03e..87805135 100644 --- a/lib/request/response-handler.ts +++ b/lib/request/response-handler.ts @@ -198,9 +198,13 @@ function setOutputTextValue( text: string | null, phase: unknown = undefined, ): void { - if (!text) return; const key = makeOutputTextKey(outputIndex, contentIndex); if (!key) return; + if (!text) { + state.outputText.delete(key); + setPhaseTextSegment(state, phase, key, null); + return; + } state.outputText.set(key, text); setPhaseTextSegment(state, phase, key, text); } @@ -292,6 +296,10 @@ function applyAccumulatedOutputText(response: MutableRecord, state: ParsedRespon if (!getStringField(part, "type")) { part.type = "output_text"; } + if (typeof part.text === "string") { + setPhaseTextSegment(state, part.phase, key, part.text); + continue; + } part.text = text; } diff --git a/test/response-handler.test.ts b/test/response-handler.test.ts index 9c180edb..e2c4572c 100644 --- a/test/response-handler.test.ts +++ b/test/response-handler.test.ts @@ -222,6 +222,57 @@ data: {"type":"response.completed","response":{"id":"resp_456","output":"done"}} ]); }); + it('preserves canonical terminal content over accumulated deltas for the same slot', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_canonical_slot","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_canonical","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"Draft answer","phase":"final_answer"}', + 'data: {"type":"response.completed","response":{"id":"resp_canonical_slot","object":"response","output":[{"id":"msg_canonical","type":"message","role":"assistant","content":[{"type":"output_text","text":"Canonical answer"}]}]}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + output_text?: string; + final_answer_text?: string; + phase_text?: Record; + output?: Array<{ content?: Array<{ text?: string }> }>; + }; + + expect(body.output?.[0]?.content?.[0]?.text).toBe('Canonical answer'); + expect(body.output_text).toBe('Canonical answer'); + expect(body.final_answer_text).toBe('Canonical answer'); + expect(body.phase_text).toEqual({ final_answer: 'Canonical answer' }); + }); + + it('clears stale output_text deltas when done events omit canonical text', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_stale_delta","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_stale","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"Hello ","phase":"final_answer"}', + 'data: {"type":"response.output_text.done","output_index":0,"content_index":0,"text":" ","phase":"final_answer"}', + 'data: {"type":"response.done","response":{"id":"resp_stale_delta","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + output_text?: string; + final_answer_text?: string; + phase_text?: Record; + output?: Array<{ content?: Array<{ text?: string }> }>; + }; + + expect(body.output?.[0]?.content).toBeUndefined(); + expect(body.output_text).toBeUndefined(); + expect(body.final_answer_text).toBeUndefined(); + expect(body.phase_text).toBeUndefined(); + }); + it('tracks commentary and final_answer phase text separately when phase labels are present', async () => { const sseContent = [ 'data: {"type":"response.created","response":{"id":"resp_phase_123","object":"response"}}', From 729cf2d856b256500bbf823009a142d33e3a3244 Mon Sep 17 00:00:00 2001 From: ndycode <405533+ndycode@users.noreply.github.com> Date: Sun, 22 Mar 2026 20:26:20 +0800 Subject: [PATCH 10/14] fix: clear stale reasoning summary deltas --- lib/request/response-handler.ts | 5 ++++- test/response-handler.test.ts | 22 ++++++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/lib/request/response-handler.ts b/lib/request/response-handler.ts index 87805135..f9648ab7 100644 --- a/lib/request/response-handler.ts +++ b/lib/request/response-handler.ts @@ -230,9 +230,12 @@ function setReasoningSummaryValue( summaryIndex: number | null, text: string | null, ): void { - if (!text) return; const key = makeSummaryKey(outputIndex, summaryIndex); if (!key) return; + if (!text) { + state.reasoningSummaryText.delete(key); + return; + } state.reasoningSummaryText.set(key, text); } diff --git a/test/response-handler.test.ts b/test/response-handler.test.ts index e2c4572c..17f0ae65 100644 --- a/test/response-handler.test.ts +++ b/test/response-handler.test.ts @@ -273,6 +273,28 @@ data: {"type":"response.completed","response":{"id":"resp_456","output":"done"}} expect(body.phase_text).toBeUndefined(); }); + it('clears stale reasoning_summary_text deltas when done events omit canonical text', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_stale_reasoning","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":1,"item":{"id":"rs_stale","type":"reasoning"}}', + 'data: {"type":"response.reasoning_summary_text.delta","output_index":1,"summary_index":0,"delta":"Need more context"}', + 'data: {"type":"response.reasoning_summary_text.done","output_index":1,"summary_index":0,"text":" "}', + 'data: {"type":"response.done","response":{"id":"resp_stale_reasoning","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + reasoning_summary_text?: string; + output?: Array<{ summary?: Array<{ text?: string }> }>; + }; + + expect(body.output?.[1]?.summary).toBeUndefined(); + expect(body.reasoning_summary_text).toBeUndefined(); + }); + it('tracks commentary and final_answer phase text separately when phase labels are present', async () => { const sseContent = [ 'data: {"type":"response.created","response":{"id":"resp_phase_123","object":"response"}}', From 5acb8ad643ea62e84ac41e9e732729c22058cf57 Mon Sep 17 00:00:00 2001 From: ndycode <405533+ndycode@users.noreply.github.com> Date: Sun, 22 Mar 2026 20:46:22 +0800 Subject: [PATCH 11/14] fix: preserve canonical response summaries --- lib/request/response-handler.ts | 3 ++ test/response-handler.test.ts | 54 +++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/lib/request/response-handler.ts b/lib/request/response-handler.ts index f9648ab7..3691ad5f 100644 --- a/lib/request/response-handler.ts +++ b/lib/request/response-handler.ts @@ -385,6 +385,9 @@ function applyReasoningSummaries(response: MutableRecord, state: ParsedResponseS if (!getStringField(nextPart, "type")) { nextPart.type = "summary_text"; } + if (typeof nextPart.text === "string") { + continue; + } nextPart.text = text; summary[summaryIndex] = nextPart; item.summary = summary; diff --git a/test/response-handler.test.ts b/test/response-handler.test.ts index 17f0ae65..92837943 100644 --- a/test/response-handler.test.ts +++ b/test/response-handler.test.ts @@ -137,6 +137,31 @@ data: {"type":"response.completed","response":{"id":"resp_456","output":"done"}} expect(body.output?.[1]?.summary?.[0]?.text).toBe('Draft summary'); }); + it('preserves canonical terminal reasoning summary parts over synthesized semantic text', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_semantic_part_canonical","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":1,"item":{"id":"rs_789","type":"reasoning"}}', + 'data: {"type":"response.reasoning_summary_part.added","output_index":1,"summary_index":0,"part":{"text":"Draft summary"}}', + 'data: {"type":"response.reasoning_summary_part.done","output_index":1,"summary_index":0,"part":{"text":"Draft summary"}}', + 'data: {"type":"response.completed","response":{"id":"resp_semantic_part_canonical","object":"response","output":[{},{"type":"reasoning","summary":[{"type":"summary_text","text":"Canonical summary part"}]}]}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + reasoning_summary_text?: string; + output?: Array<{ summary?: Array<{ type?: string; text?: string }> }>; + }; + + expect(body.reasoning_summary_text).toBe('Canonical summary part'); + expect(body.output?.[1]?.summary?.[0]).toEqual({ + type: 'summary_text', + text: 'Canonical summary part', + }); + }); + it('synthesizes reasoning summaries from part events', async () => { const sseContent = [ 'data: {"type":"response.created","response":{"id":"resp_summary_part","object":"response"}}', @@ -162,6 +187,35 @@ data: {"type":"response.completed","response":{"id":"resp_456","output":"done"}} }); }); + it('synthesizes output text from content_part events', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_content_part","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_part","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.content_part.added","output_index":0,"content_index":0,"part":{"type":"output_text","text":"Hello ","phase":"final_answer"}}', + 'data: {"type":"response.content_part.done","output_index":0,"content_index":0,"part":{"type":"output_text","text":"Hello world","phase":"final_answer"}}', + 'data: {"type":"response.done","response":{"id":"resp_content_part","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + output_text?: string; + final_answer_text?: string; + phase_text?: Record; + output?: Array<{ content?: Array<{ type?: string; text?: string }> }>; + }; + + expect(body.output?.[0]?.content?.[0]).toEqual({ + type: 'output_text', + text: 'Hello world', + }); + expect(body.output_text).toBe('Hello world'); + expect(body.final_answer_text).toBe('Hello world'); + expect(body.phase_text).toEqual({ final_answer: 'Hello world' }); + }); + it('preserves whitespace-only semantic deltas when no done events override them', async () => { const sseContent = [ 'data: {"type":"response.created","response":{"id":"resp_whitespace_delta","object":"response"}}', From 8d07f803e1b31f5ec10c26d3bcd9d73ea582e327 Mon Sep 17 00:00:00 2001 From: ndycode <405533+ndycode@users.noreply.github.com> Date: Sun, 22 Mar 2026 20:49:25 +0800 Subject: [PATCH 12/14] preserve canonical reasoning summary parts --- lib/request/response-handler.ts | 55 ++++++++++++++--- test/response-handler.test.ts | 103 ++++++++++++++++++++++++++++++++ 2 files changed, 150 insertions(+), 8 deletions(-) diff --git a/lib/request/response-handler.ts b/lib/request/response-handler.ts index 3691ad5f..e5a0f628 100644 --- a/lib/request/response-handler.ts +++ b/lib/request/response-handler.ts @@ -19,6 +19,7 @@ interface ParsedResponseState { outputText: Map; outputTextPhases: Map; phaseTextSegments: Map; + phaseSegmentOrder: Map; phaseText: Map; reasoningSummaryText: Map; seenResponseIds: Set; @@ -33,6 +34,7 @@ function createParsedResponseState(): ParsedResponseState { outputText: new Map(), outputTextPhases: new Map(), phaseTextSegments: new Map(), + phaseSegmentOrder: new Map(), phaseText: new Map(), reasoningSummaryText: new Map(), seenResponseIds: new Set(), @@ -125,11 +127,40 @@ function capturePhase( state.lastPhase = phase.trim(); } -function syncPhaseText(state: ParsedResponseState, phase: string): void { - const prefix = `${phase}\u0000`; - const text = [...state.phaseTextSegments.entries()] - .filter(([key]) => key.startsWith(prefix)) - .map(([, value]) => value) +function rememberPhaseSegmentOrder( + state: ParsedResponseState, + phase: string, + segmentKey: string, +): string[] { + const existingOrder = state.phaseSegmentOrder.get(phase); + if (existingOrder?.includes(segmentKey)) { + return existingOrder; + } + const nextOrder = [...(existingOrder ?? []), segmentKey]; + state.phaseSegmentOrder.set(phase, nextOrder); + return nextOrder; +} + +function removePhaseSegmentOrder( + state: ParsedResponseState, + phase: string, + segmentKey: string, +): void { + const existingOrder = state.phaseSegmentOrder.get(phase); + if (!existingOrder) return; + const nextOrder = existingOrder.filter((key) => key !== segmentKey); + if (nextOrder.length === 0) { + state.phaseSegmentOrder.delete(phase); + return; + } + state.phaseSegmentOrder.set(phase, nextOrder); +} + +function rebuildPhaseText(state: ParsedResponseState, phase: string): void { + const orderedKeys = state.phaseSegmentOrder.get(phase) ?? []; + const text = orderedKeys + .map((key) => state.phaseTextSegments.get(key) ?? "") + .filter((value) => value.length > 0) .join(""); if (text.length === 0) { state.phaseText.delete(phase); @@ -154,11 +185,13 @@ function setPhaseTextSegment( const segmentKey = makePhaseTextSegmentKey(normalizedPhase, outputTextKey); if (!text || text.length === 0) { state.phaseTextSegments.delete(segmentKey); - syncPhaseText(state, normalizedPhase); + removePhaseSegmentOrder(state, normalizedPhase, segmentKey); + rebuildPhaseText(state, normalizedPhase); return; } + rememberPhaseSegmentOrder(state, normalizedPhase, segmentKey); state.phaseTextSegments.set(segmentKey, text); - syncPhaseText(state, normalizedPhase); + rebuildPhaseText(state, normalizedPhase); } function appendPhaseTextSegment( @@ -178,9 +211,15 @@ function appendPhaseTextSegment( state.outputTextPhases.set(outputTextKey, normalizedPhase); state.lastPhase = normalizedPhase; const segmentKey = makePhaseTextSegmentKey(normalizedPhase, outputTextKey); + const phaseOrder = rememberPhaseSegmentOrder(state, normalizedPhase, segmentKey); const existing = state.phaseTextSegments.get(segmentKey) ?? ""; state.phaseTextSegments.set(segmentKey, `${existing}${delta}`); - syncPhaseText(state, normalizedPhase); + if (phaseOrder[phaseOrder.length - 1] === segmentKey) { + const existingPhaseText = state.phaseText.get(normalizedPhase) ?? ""; + state.phaseText.set(normalizedPhase, `${existingPhaseText}${delta}`); + return; + } + rebuildPhaseText(state, normalizedPhase); } function upsertOutputItem(state: ParsedResponseState, outputIndex: number | null, item: unknown): void { diff --git a/test/response-handler.test.ts b/test/response-handler.test.ts index 92837943..60c41bdf 100644 --- a/test/response-handler.test.ts +++ b/test/response-handler.test.ts @@ -187,6 +187,28 @@ data: {"type":"response.completed","response":{"id":"resp_456","output":"done"}} }); }); + it('preserves canonical terminal reasoning summary text over synthesized summary deltas', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_semantic_nested","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":1,"item":{"id":"rs_nested","type":"reasoning"}}', + 'data: {"type":"response.reasoning_summary_text.delta","output_index":1,"summary_index":0,"delta":"Draft nested summary"}', + 'data: {"type":"response.reasoning_summary_text.done","output_index":1,"summary_index":0,"text":"Draft nested summary"}', + 'data: {"type":"response.completed","response":{"id":"resp_semantic_nested","object":"response","output":[{},{"id":"rs_nested","type":"reasoning","summary":[{"type":"summary_text","text":"Canonical nested summary"}]}]}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + reasoning_summary_text?: string; + output?: Array<{ summary?: Array<{ text?: string }> }>; + }; + + expect(body.output?.[1]?.summary?.[0]?.text).toBe('Canonical nested summary'); + expect(body.reasoning_summary_text).toBe('Canonical nested summary'); + }); + it('synthesizes output text from content_part events', async () => { const sseContent = [ 'data: {"type":"response.created","response":{"id":"resp_content_part","object":"response"}}', @@ -434,6 +456,87 @@ data: {"type":"response.completed","response":{"id":"resp_456","output":"done"}} expect(body.phase_text).toEqual({ final_answer: 'Hello' }); }); + it('handles response.content_part.added for output_text parts', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_content_part_added","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_part_added","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.content_part.added","output_index":0,"content_index":0,"part":{"type":"output_text","text":"Hello from added part","phase":"final_answer"}}', + 'data: {"type":"response.done","response":{"id":"resp_content_part_added","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + output_text?: string; + final_answer_text?: string; + phase_text?: Record; + output?: Array<{ content?: Array<{ type?: string; text?: string }> }>; + }; + + expect(body.output?.[0]?.content?.[0]).toEqual({ + type: 'output_text', + text: 'Hello from added part', + }); + expect(body.output_text).toBe('Hello from added part'); + expect(body.final_answer_text).toBe('Hello from added part'); + expect(body.phase_text).toEqual({ final_answer: 'Hello from added part' }); + }); + + it('handles response.content_part.done for output_text parts', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_content_part_done","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_part_done","type":"message","role":"assistant","phase":"final_answer"}}', + 'data: {"type":"response.content_part.done","output_index":0,"content_index":0,"part":{"type":"output_text","text":"Hello from done part","phase":"final_answer"}}', + 'data: {"type":"response.done","response":{"id":"resp_content_part_done","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + output_text?: string; + final_answer_text?: string; + phase_text?: Record; + output?: Array<{ content?: Array<{ type?: string; text?: string }> }>; + }; + + expect(body.output?.[0]?.content?.[0]).toEqual({ + type: 'output_text', + text: 'Hello from done part', + }); + expect(body.output_text).toBe('Hello from done part'); + expect(body.final_answer_text).toBe('Hello from done part'); + expect(body.phase_text).toEqual({ final_answer: 'Hello from done part' }); + }); + + it('captures phase from non-output_text content parts without mutating output text', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_content_part_annotation","object":"response"}}', + 'data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_part_annotation","type":"message","role":"assistant"}}', + 'data: {"type":"response.content_part.added","output_index":0,"content_index":0,"part":{"type":"annotation","text":"ignored","phase":"commentary"}}', + 'data: {"type":"response.done","response":{"id":"resp_content_part_annotation","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + phase?: string; + output_text?: string; + phase_text?: Record; + output?: Array<{ content?: Array<{ type?: string; text?: string }> }>; + }; + + expect(body.phase).toBe('commentary'); + expect(body.output?.[0]?.content).toBeUndefined(); + expect(body.output_text).toBeUndefined(); + expect(body.phase_text).toBeUndefined(); + }); + it('should return original text if no final response found', async () => { const sseContent = `data: {"type":"response.started"} data: {"type":"chunk","delta":"text"} From 26c00d8b12a8c5cc593a4bebc3fa5ee6c3ed672d Mon Sep 17 00:00:00 2001 From: ndycode <405533+ndycode@users.noreply.github.com> Date: Sun, 22 Mar 2026 21:00:55 +0800 Subject: [PATCH 13/14] test: cover missing semantic output indices --- lib/request/response-handler.ts | 6 ++++++ test/response-handler.test.ts | 23 +++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/lib/request/response-handler.ts b/lib/request/response-handler.ts index e5a0f628..ee5a5bec 100644 --- a/lib/request/response-handler.ts +++ b/lib/request/response-handler.ts @@ -51,6 +51,12 @@ function getNumberField(record: MutableRecord, key: string): number | null { return typeof value === "number" && Number.isFinite(value) ? value : null; } +/** + * Read a trimmed, non-empty string field for identifier-like values. + * + * For textual payloads where whitespace is meaningful, use a field-specific + * accessor such as `getDeltaField` instead of reusing this helper. + */ function getStringField(record: MutableRecord, key: string): string | null { const value = record[key]; return typeof value === "string" && value.trim().length > 0 ? value : null; diff --git a/test/response-handler.test.ts b/test/response-handler.test.ts index 60c41bdf..f372f7c3 100644 --- a/test/response-handler.test.ts +++ b/test/response-handler.test.ts @@ -673,6 +673,29 @@ data: {"type":"response.done","response":{"id":"resp_789"}} expect(body.reasoning_summary_text).toBeUndefined(); }); + it('ignores delta events with missing output_index', async () => { + const sseContent = [ + 'data: {"type":"response.created","response":{"id":"resp_no_index","object":"response"}}', + 'data: {"type":"response.output_text.delta","content_index":0,"delta":"orphan"}', + 'data: {"type":"response.reasoning_summary_text.delta","summary_index":0,"delta":"orphan"}', + 'data: {"type":"response.done","response":{"id":"resp_no_index","object":"response"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers); + const body = await result.json() as { + id: string; + output_text?: string; + reasoning_summary_text?: string; + }; + + expect(body.id).toBe('resp_no_index'); + expect(body.output_text).toBeUndefined(); + expect(body.reasoning_summary_text).toBeUndefined(); + }); + it('should throw error if SSE stream exceeds size limit', async () => { const largeContent = 'a'.repeat(20 * 1024 * 1024 + 1); const response = new Response(largeContent); From 6a273fe4cd5db1636386e06c0a963d1d8a95ee93 Mon Sep 17 00:00:00 2001 From: ndycode <405533+ndycode@users.noreply.github.com> Date: Sun, 22 Mar 2026 21:43:00 +0800 Subject: [PATCH 14/14] fix session-affinity response id compatibility --- lib/session-affinity.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/session-affinity.ts b/lib/session-affinity.ts index 9a90950f..1ce27e30 100644 --- a/lib/session-affinity.ts +++ b/lib/session-affinity.ts @@ -98,6 +98,14 @@ export class SessionAffinityStore { * This method does not create a new affinity entry; callers that need to * upsert continuation state should use `rememberWithResponseId`. */ + rememberLastResponseId( + sessionKey: string | null | undefined, + responseId: string | null | undefined, + now = Date.now(), + ): void { + this.updateLastResponseId(sessionKey, responseId, now); + } + updateLastResponseId( sessionKey: string | null | undefined, responseId: string | null | undefined,