diff --git a/index.ts b/index.ts index 368daaf3..13c03abb 100644 --- a/index.ts +++ b/index.ts @@ -65,6 +65,7 @@ import { getLiveAccountSync, getLiveAccountSyncDebounceMs, getLiveAccountSyncPollMs, + getResponseContinuation, getSessionAffinity, getSessionAffinityTtlMs, getSessionAffinityMaxEntries, @@ -977,7 +978,10 @@ export const OpenAIOAuthPlugin: Plugin = async ({ client }: PluginInput) => { const ensureSessionAffinity = ( pluginConfig: ReturnType, ): void => { - if (!getSessionAffinity(pluginConfig)) { + if ( + !getSessionAffinity(pluginConfig) && + !getResponseContinuation(pluginConfig) + ) { sessionAffinityStore = null; sessionAffinityConfigKey = null; return; @@ -1336,8 +1340,7 @@ export const OpenAIOAuthPlugin: Plugin = async ({ client }: PluginInput) => { const originalBody = await parseRequestBodyFromInit(baseInit?.body); const isStreaming = originalBody.stream === true; const parsedBody = - Object.keys(originalBody).length > 0 ? originalBody : undefined; - + Object.keys(originalBody).length > 0 ? { ...originalBody } : undefined; const transformation = await transformRequestForCodex( baseInit, url, @@ -1356,6 +1359,8 @@ export const OpenAIOAuthPlugin: Plugin = async ({ client }: PluginInput) => { let model = transformedBody?.model; let modelFamily = model ? getModelFamily(model) : "gpt-5.1"; let quotaKey = model ? `${modelFamily}:${model}` : modelFamily; + const responseContinuationEnabled = + getResponseContinuation(pluginConfig); const threadIdCandidate = (process.env.CODEX_THREAD_ID ?? promptCacheKey ?? "") .toString() @@ -1363,6 +1368,24 @@ export const OpenAIOAuthPlugin: Plugin = async ({ client }: PluginInput) => { const sessionAffinityKey = threadIdCandidate ?? promptCacheKey ?? null; const effectivePromptCacheKey = (sessionAffinityKey ?? promptCacheKey ?? "").toString().trim() || undefined; + const shouldUseResponseContinuation = + Boolean(transformedBody) && + responseContinuationEnabled && + !transformedBody?.previous_response_id; + if (shouldUseResponseContinuation && transformedBody) { + const lastResponseId = + sessionAffinityStore?.getLastResponseId(sessionAffinityKey); + if (lastResponseId) { + transformedBody = { + ...transformedBody, + previous_response_id: lastResponseId, + }; + requestInit = { + ...requestInit, + body: JSON.stringify(transformedBody), + }; + } + } const preferredSessionAccountIndex = sessionAffinityStore?.getPreferredAccountIndex( sessionAffinityKey, ); @@ -2397,7 +2420,10 @@ accountAttemptLoop: while (attempted.size < Math.max(1, accountCount)) { successAccountForResponse = fallbackAccount; successEntitlementAccountKey = fallbackEntitlementAccountKey; runtimeMetrics.streamFailoverRecoveries += 1; - if (fallbackAccount.index !== account.index) { + if ( + fallbackAccount.index !== account.index && + !responseContinuationEnabled + ) { runtimeMetrics.streamFailoverCrossAccountRecoveries += 1; runtimeMetrics.accountRotations += 1; sessionAffinityStore?.remember( @@ -2447,7 +2473,20 @@ accountAttemptLoop: while (attempted.size < Math.max(1, accountCount)) { }, ); } + let storedResponseIdForSuccess = false; const successResponse = await handleSuccessResponse(responseForSuccess, isStreaming, { + onResponseId: (responseId) => { + if (!responseContinuationEnabled) return; + sessionAffinityStore?.remember( + sessionAffinityKey, + successAccountForResponse.index, + ); + sessionAffinityStore?.rememberLastResponseId( + sessionAffinityKey, + responseId, + ); + storedResponseIdForSuccess = true; + }, streamStallTimeoutMs, }); @@ -2512,10 +2551,15 @@ accountAttemptLoop: while (attempted.size < Math.max(1, accountCount)) { capabilityModelKey, ); entitlementCache.clear(successAccountKey, capabilityModelKey); - sessionAffinityStore?.remember( - sessionAffinityKey, - successAccountForResponse.index, - ); + if ( + !responseContinuationEnabled || + (!isStreaming && !storedResponseIdForSuccess) + ) { + sessionAffinityStore?.remember( + sessionAffinityKey, + successAccountForResponse.index, + ); + } runtimeMetrics.successfulRequests++; runtimeMetrics.lastError = null; if (lastCodexCliActiveSyncIndex !== successAccountForResponse.index) { diff --git a/lib/config.ts b/lib/config.ts index f9e7ecf8..910ed14b 100644 --- a/lib/config.ts +++ b/lib/config.ts @@ -148,6 +148,7 @@ export const DEFAULT_PLUGIN_CONFIG: PluginConfig = { sessionAffinity: true, sessionAffinityTtlMs: 20 * 60_000, sessionAffinityMaxEntries: 512, + responseContinuation: false, proactiveRefreshGuardian: true, proactiveRefreshIntervalMs: 60_000, proactiveRefreshBufferMs: 5 * 60_000, @@ -917,6 +918,24 @@ export function getSessionAffinityMaxEntries(pluginConfig: PluginConfig): number ); } +/** + * Controls whether the plugin should automatically continue Responses API turns + * with the last known `previous_response_id` for the active session key. + * + * Reads the `responseContinuation` value from `pluginConfig` and allows an + * environment override via `CODEX_AUTH_RESPONSE_CONTINUATION`. + * + * @param pluginConfig - The plugin configuration to consult for the setting + * @returns `true` if automatic response continuation is enabled, `false` otherwise + */ +export function getResponseContinuation(pluginConfig: PluginConfig): boolean { + return resolveBooleanSetting( + "CODEX_AUTH_RESPONSE_CONTINUATION", + pluginConfig.responseContinuation, + false, + ); +} + /** * Controls whether the proactive refresh guardian is enabled. * diff --git a/lib/request/fetch-helpers.ts b/lib/request/fetch-helpers.ts index 4581b618..37043418 100644 --- a/lib/request/fetch-helpers.ts +++ b/lib/request/fetch-helpers.ts @@ -9,7 +9,11 @@ import { queuedRefresh } from "../refresh-queue.js"; import { logRequest, logError, logWarn } from "../logger.js"; import { getCodexInstructions, getModelFamily } from "../prompts/codex.js"; import { transformRequestBody, normalizeModel } from "./request-transformer.js"; -import { convertSseToJson, ensureContentType } from "./response-handler.js"; +import { + attachResponseIdCapture, + convertSseToJson, + ensureContentType, +} from "./response-handler.js"; import type { UserConfig, RequestBody } from "../types.js"; import { registerCleanup } from "../shutdown.js"; import { CodexAuthError } from "../errors.js"; @@ -841,7 +845,10 @@ export async function handleErrorResponse( export async function handleSuccessResponse( response: Response, isStreaming: boolean, - options?: { streamStallTimeoutMs?: number }, + options?: { + onResponseId?: (responseId: string) => void; + streamStallTimeoutMs?: number; + }, ): Promise { // Check for deprecation headers (RFC 8594) const deprecation = response.headers.get("Deprecation"); @@ -858,11 +865,7 @@ export async function handleSuccessResponse( } // For streaming requests (streamText), return stream as-is - return new Response(response.body, { - status: response.status, - statusText: response.statusText, - headers: responseHeaders, - }); + return attachResponseIdCapture(response, responseHeaders, options?.onResponseId); } async function safeReadBody(response: Response): Promise { diff --git a/lib/request/response-handler.ts b/lib/request/response-handler.ts index 76de00f6..861be8ab 100644 --- a/lib/request/response-handler.ts +++ b/lib/request/response-handler.ts @@ -8,13 +8,64 @@ const log = createLogger("response-handler"); const MAX_SSE_SIZE = 10 * 1024 * 1024; // 10MB limit to prevent memory exhaustion const DEFAULT_STREAM_STALL_TIMEOUT_MS = 45_000; +function extractResponseId(response: unknown): string | null { + if (!response || typeof response !== "object") return null; + const candidate = (response as { id?: unknown }).id; + return typeof candidate === "string" && candidate.trim().length > 0 + ? candidate.trim() + : null; +} + +function notifyResponseId( + onResponseId: ((responseId: string) => void) | undefined, + response: unknown, +): void { + const responseId = extractResponseId(response); + if (!responseId || !onResponseId) return; + try { + onResponseId(responseId); + } catch (error) { + log.warn("Failed to persist response id from upstream event", { + error: String(error), + responseId, + }); + } +} + +type CapturedResponseEvent = + | { kind: "error" } + | { kind: "response"; response: unknown } + | null; + +function maybeCaptureResponseEvent( + data: SSEEventData, + onResponseId?: (responseId: string) => void, +): CapturedResponseEvent { + if (data.type === "error") { + log.error("SSE error event received", { error: data }); + return { kind: "error" }; + } + + if (data.type === "response.done" || data.type === "response.completed") { + notifyResponseId(onResponseId, data.response); + if (data.response !== undefined && data.response !== null) { + return { kind: "response", response: data.response }; + } + } + + return null; +} + /** * Parse SSE stream to extract final response * @param sseText - Complete SSE stream text * @returns Final response object or null if not found */ -function parseSseStream(sseText: string): unknown | null { +function parseSseStream( + sseText: string, + onResponseId?: (responseId: string) => void, +): unknown | null { const lines = sseText.split(/\r?\n/); for (const line of lines) { @@ -24,15 +75,9 @@ function parseSseStream(sseText: string): unknown | null { if (!payload || payload === '[DONE]') continue; try { const data = JSON.parse(payload) as SSEEventData; - - if (data.type === 'error') { - log.error("SSE error event received", { error: data }); - return null; - } - - if (data.type === 'response.done' || data.type === 'response.completed') { - return data.response; - } + const capturedEvent = maybeCaptureResponseEvent(data, onResponseId); + if (capturedEvent?.kind === "error") return null; + if (capturedEvent?.kind === "response") return capturedEvent.response; } catch { // Skip malformed JSON } @@ -51,7 +96,10 @@ function parseSseStream(sseText: string): unknown | null { export async function convertSseToJson( response: Response, headers: Headers, - options?: { streamStallTimeoutMs?: number }, + options?: { + onResponseId?: (responseId: string) => void; + streamStallTimeoutMs?: number; + }, ): Promise { if (!response.body) { throw new Error(`[${PLUGIN_NAME}] Response has no body`); @@ -80,7 +128,7 @@ export async function convertSseToJson( } // Parse SSE events to extract the final response - const finalResponse = parseSseStream(fullText); + const finalResponse = parseSseStream(fullText, options?.onResponseId); if (!finalResponse) { log.warn("Could not find final response in SSE stream"); @@ -119,6 +167,56 @@ export async function convertSseToJson( } +function createResponseIdCapturingStream( + body: ReadableStream, + onResponseId: (responseId: string) => void, +): ReadableStream { + const decoder = new TextDecoder(); + let bufferedText = ""; + let sawErrorEvent = false; + + const processBufferedLines = (flush = false): void => { + if (sawErrorEvent) return; + const lines = bufferedText.split(/\r?\n/); + if (!flush) { + bufferedText = lines.pop() ?? ""; + } else { + bufferedText = ""; + } + + for (const rawLine of lines) { + const trimmedLine = rawLine.trim(); + if (!trimmedLine.startsWith("data: ")) continue; + const payload = trimmedLine.slice(6).trim(); + if (!payload || payload === "[DONE]") continue; + try { + const data = JSON.parse(payload) as SSEEventData; + const capturedEvent = maybeCaptureResponseEvent(data, onResponseId); + if (capturedEvent?.kind === "error") { + sawErrorEvent = true; + break; + } + } catch { + // Ignore malformed SSE lines and keep forwarding the raw stream. + } + } + }; + + return body.pipeThrough( + new TransformStream({ + transform(chunk, controller) { + bufferedText += decoder.decode(chunk, { stream: true }); + processBufferedLines(); + controller.enqueue(chunk); + }, + flush() { + bufferedText += decoder.decode(); + processBufferedLines(true); + }, + }), + ); +} + /** * Ensure response has content-type header * @param headers - Response headers @@ -186,3 +284,23 @@ export function isEmptyResponse(body: unknown): boolean { return false; } + +export function attachResponseIdCapture( + response: Response, + headers: Headers, + onResponseId?: (responseId: string) => void, +): Response { + if (!response.body || !onResponseId) { + return new Response(response.body, { + status: response.status, + statusText: response.statusText, + headers, + }); + } + + return new Response(createResponseIdCapturingStream(response.body, onResponseId), { + status: response.status, + statusText: response.statusText, + headers, + }); +} diff --git a/lib/schemas.ts b/lib/schemas.ts index 1ab18caa..41585678 100644 --- a/lib/schemas.ts +++ b/lib/schemas.ts @@ -47,6 +47,7 @@ export const PluginConfigSchema = z.object({ sessionAffinity: z.boolean().optional(), sessionAffinityTtlMs: z.number().min(1_000).optional(), sessionAffinityMaxEntries: z.number().min(8).optional(), + responseContinuation: z.boolean().optional(), proactiveRefreshGuardian: z.boolean().optional(), proactiveRefreshIntervalMs: z.number().min(5_000).optional(), proactiveRefreshBufferMs: z.number().min(30_000).optional(), diff --git a/lib/session-affinity.ts b/lib/session-affinity.ts index 5cfcdafa..9a90950f 100644 --- a/lib/session-affinity.ts +++ b/lib/session-affinity.ts @@ -10,6 +10,7 @@ export interface SessionAffinityOptions { interface SessionAffinityEntry { accountIndex: number; expiresAt: number; + lastResponseId?: string; updatedAt: number; } @@ -65,14 +66,58 @@ export class SessionAffinityStore { if (!key) return; if (!Number.isFinite(accountIndex) || accountIndex < 0) return; - if (this.entries.size >= this.maxEntries && !this.entries.has(key)) { - const oldest = this.findOldestKey(); - if (oldest) this.entries.delete(oldest); - } + const existingEntry = this.entries.get(key); - this.entries.set(key, { + this.setEntry(key, { accountIndex, expiresAt: now + this.ttlMs, + lastResponseId: existingEntry?.lastResponseId, + updatedAt: now, + }); + } + + getLastResponseId(sessionKey: string | null | undefined, now = Date.now()): string | null { + const key = normalizeSessionKey(sessionKey); + if (!key) return null; + + const entry = this.entries.get(key); + if (!entry) return null; + if (entry.expiresAt <= now) { + this.entries.delete(key); + return null; + } + + const lastResponseId = + typeof entry.lastResponseId === "string" ? entry.lastResponseId.trim() : ""; + return lastResponseId || null; + } + + /** + * Update the last response id for an existing live session. + * + * This method does not create a new affinity entry; callers that need to + * upsert continuation state should use `rememberWithResponseId`. + */ + updateLastResponseId( + sessionKey: string | null | undefined, + responseId: string | null | undefined, + now = Date.now(), + ): void { + const key = normalizeSessionKey(sessionKey); + const normalizedResponseId = typeof responseId === "string" ? responseId.trim() : ""; + if (!key || !normalizedResponseId) return; + + const entry = this.entries.get(key); + if (!entry) return; + if (entry.expiresAt <= now) { + this.entries.delete(key); + return; + } + + this.setEntry(key, { + ...entry, + expiresAt: now + this.ttlMs, + lastResponseId: normalizedResponseId, updatedAt: now, }); } @@ -128,6 +173,15 @@ export class SessionAffinityStore { return this.entries.size; } + private setEntry(key: string, entry: SessionAffinityEntry): void { + if (this.entries.size >= this.maxEntries && !this.entries.has(key)) { + const oldest = this.findOldestKey(); + if (oldest) this.entries.delete(oldest); + } + + this.entries.set(key, entry); + } + private findOldestKey(): string | null { let oldestKey: string | null = null; let oldestTimestamp = Number.POSITIVE_INFINITY; diff --git a/lib/types.ts b/lib/types.ts index fbe8e667..1feeb8a9 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -32,6 +32,28 @@ export interface ReasoningConfig { summary: "auto" | "concise" | "detailed"; } +export type TextFormatConfig = + | { + type: "text"; + [key: string]: unknown; + } + | { + type: "json_object"; + [key: string]: unknown; + } + | { + type: "json_schema"; + name?: string; + description?: string; + schema?: Record; + strict?: boolean; + [key: string]: unknown; + } + | { + type: string; + [key: string]: unknown; + }; + export interface OAuthServerInfo { port: number; ready: boolean; @@ -99,6 +121,7 @@ export interface RequestBody { reasoning?: Partial; text?: { verbosity?: "low" | "medium" | "high"; + format?: TextFormatConfig; }; include?: string[]; providerOptions?: { @@ -107,6 +130,10 @@ export interface RequestBody { }; /** Stable key to enable prompt-token caching on Codex backend */ prompt_cache_key?: string; + /** Retention mode for server-side prompt cache entries */ + prompt_cache_retention?: string; + /** Resume a prior Responses API turn without resending the full transcript */ + previous_response_id?: string; max_output_tokens?: number; max_completion_tokens?: number; [key: string]: unknown; diff --git a/test/codex-prompts.test.ts b/test/codex-prompts.test.ts index 60a190e4..5bb9e334 100644 --- a/test/codex-prompts.test.ts +++ b/test/codex-prompts.test.ts @@ -160,6 +160,15 @@ describe("Codex Prompts Module", () => { const result = await getCodexInstructions("codex-max"); expect(result).toBe("new instructions from github"); expect(mockFetch).toHaveBeenCalledTimes(2); + const rawGitHubUrls = mockFetch.mock.calls + .map((call) => call[0]) + .filter( + (url): url is string => + typeof url === "string" && url.includes("raw.githubusercontent.com"), + ); + expect( + rawGitHubUrls.some((url) => url.includes("gpt-5.1-codex-max_prompt.md")), + ).toBe(true); }); it("should handle 304 Not Modified response", async () => { @@ -243,6 +252,15 @@ describe("Codex Prompts Module", () => { const result = await getCodexInstructions("gpt-5.2-codex"); expect(result).toBe("fallback instructions"); + const rawGitHubUrls = mockFetch.mock.calls + .map((call) => call[0]) + .filter( + (url): url is string => + typeof url === "string" && url.includes("raw.githubusercontent.com"), + ); + expect(rawGitHubUrls.some((url) => url.includes("gpt_5_codex_prompt.md"))).toBe( + true, + ); }); it("should parse tag from HTML content if URL parsing fails", async () => { diff --git a/test/index.test.ts b/test/index.test.ts index fa56f49d..954e6621 100644 --- a/test/index.test.ts +++ b/test/index.test.ts @@ -96,9 +96,10 @@ vi.mock("../lib/config.js", () => ({ getLiveAccountSync: vi.fn(() => false), getLiveAccountSyncDebounceMs: () => 250, getLiveAccountSyncPollMs: () => 2000, - getSessionAffinity: () => false, - getSessionAffinityTtlMs: () => 1_200_000, - getSessionAffinityMaxEntries: () => 512, + getSessionAffinity: vi.fn(() => false), + getSessionAffinityTtlMs: vi.fn(() => 1_200_000), + getSessionAffinityMaxEntries: vi.fn(() => 512), + getResponseContinuation: vi.fn(() => false), getProactiveRefreshGuardian: () => false, getProactiveRefreshIntervalMs: () => 60000, getProactiveRefreshBufferMs: () => 300000, @@ -1300,6 +1301,22 @@ describe("OpenAIOAuthPlugin fetch handler", () => { }; }; + const buildStableRoutingManager = ( + account: { + index: number; + accountId?: string; + accountIdSource?: string; + email?: string; + refreshToken: string; + accessToken?: string; + idToken?: string; + }, + ) => ({ + ...buildRoutingManager([account]), + getCurrentOrNextForFamilyHybrid: () => account, + getCurrentOrNextForFamily: () => account, + }); + it("returns success response for successful fetch", async () => { globalThis.fetch = vi.fn().mockResolvedValue( new Response(JSON.stringify({ content: "test" }), { status: 200 }), @@ -1315,6 +1332,310 @@ describe("OpenAIOAuthPlugin fetch handler", () => { expect(syncCodexCliSelectionMock).toHaveBeenCalledWith(0); }); + it("injects stored previous_response_id on follow-up requests when continuation is enabled", async () => { + const { AccountManager } = await import("../lib/accounts.js"); + const configModule = await import("../lib/config.js"); + const fetchHelpers = await import("../lib/request/fetch-helpers.js"); + const stableAccount = { + index: 0, + accountId: "acc-1", + email: "user@example.com", + refreshToken: "refresh-1", + }; + + vi.spyOn(AccountManager, "loadFromDisk").mockResolvedValueOnce( + buildStableRoutingManager(stableAccount) as never, + ); + vi.mocked(configModule.getSessionAffinity).mockReturnValue(true); + vi.mocked(configModule.getResponseContinuation).mockReturnValue(true); + vi.mocked(fetchHelpers.transformRequestForCodex).mockImplementation( + async (init, _url, _userConfig, _codexMode, body) => { + const normalizedPromptCacheKey = + typeof body?.prompt_cache_key === "string" + ? `${body.prompt_cache_key.trim()}:normalized` + : undefined; + return { + updatedInit: { + ...(init as RequestInit), + body: JSON.stringify({ + ...(body ?? {}), + ...(normalizedPromptCacheKey + ? { prompt_cache_key: normalizedPromptCacheKey } + : {}), + }), + }, + body: { + ...(body ?? { model: "gpt-5.4" }), + ...(normalizedPromptCacheKey + ? { prompt_cache_key: normalizedPromptCacheKey } + : {}), + } as { model: string; prompt_cache_key?: string; previous_response_id?: string }, + }; + }, + ); + vi.mocked(fetchHelpers.handleSuccessResponse) + .mockImplementationOnce(async (response, _isStreaming, options) => { + options?.onResponseId?.("resp_prev_123"); + return response; + }) + .mockImplementationOnce(async (response) => response); + + globalThis.fetch = vi + .fn() + .mockImplementation( + async () => new Response(JSON.stringify({ content: "ok" }), { status: 200 }), + ); + + const { sdk } = await setupPlugin(); + await sdk.fetch!("https://api.openai.com/v1/chat", { + method: "POST", + body: JSON.stringify({ model: "gpt-5.4", prompt_cache_key: "ses_contract" }), + }); + await sdk.fetch!("https://api.openai.com/v1/chat", { + method: "POST", + body: JSON.stringify({ model: "gpt-5.4", prompt_cache_key: "ses_contract" }), + }); + + const firstInit = vi.mocked(globalThis.fetch).mock.calls[0]?.[1] as RequestInit; + const secondInit = vi.mocked(globalThis.fetch).mock.calls[1]?.[1] as RequestInit; + const firstBody = JSON.parse(String(firstInit.body)) as { + previous_response_id?: string; + prompt_cache_key?: string; + }; + const secondBody = JSON.parse(String(secondInit.body)) as { + previous_response_id?: string; + prompt_cache_key?: string; + }; + + expect(firstBody?.previous_response_id).toBeUndefined(); + expect(secondBody?.previous_response_id).toBe("resp_prev_123"); + expect(secondBody?.prompt_cache_key).toBe("ses_contract:normalized"); + }); + + it("preserves explicit previous_response_id over stored continuation state", async () => { + const { AccountManager } = await import("../lib/accounts.js"); + const configModule = await import("../lib/config.js"); + const fetchHelpers = await import("../lib/request/fetch-helpers.js"); + const stableAccount = { + index: 0, + accountId: "acc-1", + email: "user@example.com", + refreshToken: "refresh-1", + }; + + vi.spyOn(AccountManager, "loadFromDisk").mockResolvedValueOnce( + buildStableRoutingManager(stableAccount) as never, + ); + vi.mocked(configModule.getSessionAffinity).mockReturnValue(true); + vi.mocked(configModule.getResponseContinuation).mockReturnValue(true); + vi.mocked(fetchHelpers.transformRequestForCodex).mockImplementation( + async (init, _url, _userConfig, _codexMode, body) => ({ + updatedInit: { + ...(init as RequestInit), + body: JSON.stringify(body ?? {}), + }, + body: (body ?? { model: "gpt-5.4" }) as { model: string }, + }), + ); + vi.mocked(fetchHelpers.handleSuccessResponse) + .mockImplementationOnce(async (response, _isStreaming, options) => { + options?.onResponseId?.("resp_prev_123"); + return response; + }) + .mockImplementationOnce(async (response) => response); + + globalThis.fetch = vi + .fn() + .mockResolvedValue(new Response(JSON.stringify({ content: "ok" }), { status: 200 })); + + const { sdk } = await setupPlugin(); + await sdk.fetch!("https://api.openai.com/v1/chat", { + method: "POST", + body: JSON.stringify({ model: "gpt-5.4", prompt_cache_key: "ses_contract" }), + }); + await sdk.fetch!("https://api.openai.com/v1/chat", { + method: "POST", + body: JSON.stringify({ + model: "gpt-5.4", + prompt_cache_key: "ses_contract", + previous_response_id: "resp_explicit_456", + }), + }); + + const secondInit = vi.mocked(globalThis.fetch).mock.calls[1]?.[1] as RequestInit; + const secondBody = JSON.parse(String(secondInit.body)) as { + previous_response_id?: string; + }; + expect(secondBody?.previous_response_id).toBe("resp_explicit_456"); + }); + + it("injects stored previous_response_id when continuation is enabled without session affinity", async () => { + const { AccountManager } = await import("../lib/accounts.js"); + const configModule = await import("../lib/config.js"); + const fetchHelpers = await import("../lib/request/fetch-helpers.js"); + const stableAccount = { + index: 0, + accountId: "acc-1", + email: "user@example.com", + refreshToken: "refresh-1", + }; + + vi.spyOn(AccountManager, "loadFromDisk").mockResolvedValueOnce( + buildStableRoutingManager(stableAccount) as never, + ); + vi.mocked(configModule.getSessionAffinity).mockReturnValue(false); + vi.mocked(configModule.getResponseContinuation).mockReturnValue(true); + vi.mocked(fetchHelpers.transformRequestForCodex).mockImplementation( + async (init, _url, _userConfig, _codexMode, body) => ({ + updatedInit: { + ...(init as RequestInit), + body: JSON.stringify(body ?? {}), + }, + body: (body ?? { model: "gpt-5.4" }) as { + model: string; + previous_response_id?: string; + }, + }), + ); + vi.mocked(fetchHelpers.handleSuccessResponse) + .mockImplementationOnce(async (response, _isStreaming, options) => { + options?.onResponseId?.("resp_standalone_789"); + return response; + }) + .mockImplementationOnce(async (response) => response); + + globalThis.fetch = vi + .fn() + .mockResolvedValue(new Response(JSON.stringify({ content: "ok" }), { status: 200 })); + + const { sdk } = await setupPlugin(); + await sdk.fetch!("https://api.openai.com/v1/chat", { + method: "POST", + body: JSON.stringify({ model: "gpt-5.4", prompt_cache_key: "ses_contract" }), + }); + await sdk.fetch!("https://api.openai.com/v1/chat", { + method: "POST", + body: JSON.stringify({ model: "gpt-5.4", prompt_cache_key: "ses_contract" }), + }); + + const secondInit = vi.mocked(globalThis.fetch).mock.calls[1]?.[1] as RequestInit; + const secondBody = JSON.parse(String(secondInit.body)) as { + previous_response_id?: string; + }; + expect(secondBody?.previous_response_id).toBe("resp_standalone_789"); + }); + + it("keeps account and previous_response_id aligned across overlapping same-session streams", async () => { + const { AccountManager } = await import("../lib/accounts.js"); + const configModule = await import("../lib/config.js"); + const fetchHelpers = await import("../lib/request/fetch-helpers.js"); + const streamFailoverModule = await import("../lib/request/stream-failover.js"); + const accounts = [ + { + index: 0, + accountId: "acc-1", + email: "alpha@example.com", + refreshToken: "refresh-1", + accessToken: "access-alpha", + }, + { + index: 1, + accountId: "acc-2", + email: "beta@example.com", + refreshToken: "refresh-2", + accessToken: "access-beta", + }, + ]; + let selection = 0; + const manager = { + ...buildRoutingManager(accounts), + getCurrentOrNextForFamilyHybrid: () => accounts[selection++] ?? accounts[1] ?? null, + getCurrentOrNextForFamily: () => accounts[selection++] ?? accounts[1] ?? null, + }; + const responseIdCallbacks: Array<(responseId: string) => void> = []; + + vi.spyOn(AccountManager, "loadFromDisk").mockResolvedValueOnce(manager as never); + vi.mocked(configModule.getSessionAffinity).mockReturnValue(true); + vi.mocked(configModule.getResponseContinuation).mockReturnValue(true); + extractAccountIdMock.mockImplementation((accessToken: unknown) => { + if (accessToken === "access-alpha") return "acc-1"; + if (accessToken === "access-beta") return "acc-2"; + return "account-1"; + }); + vi.spyOn(streamFailoverModule, "withStreamingFailover").mockImplementation( + (initialResponse) => initialResponse, + ); + vi.mocked(fetchHelpers.createCodexHeaders).mockImplementation( + (_init, accountId, accessToken) => + new Headers({ + "x-test-account-id": String(accountId), + "x-test-access-token": String(accessToken), + }), + ); + vi.mocked(fetchHelpers.transformRequestForCodex).mockImplementation( + async (init, _url, _userConfig, _codexMode, body) => ({ + updatedInit: { + ...(init as RequestInit), + body: JSON.stringify(body ?? {}), + }, + body: (body ?? { model: "gpt-5.4" }) as { + model: string; + prompt_cache_key?: string; + previous_response_id?: string; + }, + }), + ); + vi.mocked(fetchHelpers.handleSuccessResponse).mockImplementation( + async (response, _isStreaming, options) => { + if (options?.onResponseId) { + responseIdCallbacks.push(options.onResponseId); + } + return response; + }, + ); + + globalThis.fetch = vi + .fn() + .mockResolvedValue(new Response(JSON.stringify({ content: "ok" }), { status: 200 })); + + const { sdk } = await setupPlugin(); + const firstRequest = sdk.fetch!("https://api.openai.com/v1/chat", { + method: "POST", + body: JSON.stringify({ + model: "gpt-5.4", + prompt_cache_key: "ses_contract", + stream: true, + }), + }); + const secondRequest = sdk.fetch!("https://api.openai.com/v1/chat", { + method: "POST", + body: JSON.stringify({ + model: "gpt-5.4", + prompt_cache_key: "ses_contract", + stream: true, + }), + }); + + await Promise.all([firstRequest, secondRequest]); + expect(responseIdCallbacks).toHaveLength(2); + responseIdCallbacks[1]?.("resp_second_456"); + responseIdCallbacks[0]?.("resp_first_123"); + + await sdk.fetch!("https://api.openai.com/v1/chat", { + method: "POST", + body: JSON.stringify({ model: "gpt-5.4", prompt_cache_key: "ses_contract" }), + }); + + const thirdInit = vi.mocked(globalThis.fetch).mock.calls[2]?.[1] as RequestInit; + const thirdBody = JSON.parse(String(thirdInit.body)) as { + previous_response_id?: string; + }; + const thirdHeaders = new Headers(thirdInit.headers); + expect(thirdBody.previous_response_id).toBe("resp_first_123"); + expect(thirdHeaders.get("x-test-account-id")).toBe("acc-1"); + expect(thirdHeaders.get("x-test-access-token")).toBe("access-alpha"); + }); + it("uses the refreshed token email when checking entitlement blocks", async () => { const { AccountManager } = await import("../lib/accounts.js"); const manager = buildRoutingManager([ diff --git a/test/plugin-config.test.ts b/test/plugin-config.test.ts index 9caebf96..e34e650f 100644 --- a/test/plugin-config.test.ts +++ b/test/plugin-config.test.ts @@ -21,6 +21,7 @@ import { getPreemptiveQuotaRemainingPercent5h, getPreemptiveQuotaRemainingPercent7d, getPreemptiveQuotaMaxDeferralMs, + getResponseContinuation, } from '../lib/config.js'; import type { PluginConfig } from '../lib/types.js'; import * as fs from 'node:fs'; @@ -63,6 +64,7 @@ describe('Plugin Configuration', () => { 'CODEX_AUTH_UNSUPPORTED_MODEL_POLICY', 'CODEX_AUTH_FALLBACK_UNSUPPORTED_MODEL', 'CODEX_AUTH_FALLBACK_GPT53_TO_GPT52', + 'CODEX_AUTH_RESPONSE_CONTINUATION', 'CODEX_AUTH_PREEMPTIVE_QUOTA_ENABLED', 'CODEX_AUTH_PREEMPTIVE_QUOTA_5H_REMAINING_PCT', 'CODEX_AUTH_PREEMPTIVE_QUOTA_7D_REMAINING_PCT', @@ -129,6 +131,7 @@ describe('Plugin Configuration', () => { sessionAffinity: true, sessionAffinityTtlMs: 20 * 60_000, sessionAffinityMaxEntries: 512, + responseContinuation: false, proactiveRefreshGuardian: true, proactiveRefreshIntervalMs: 60_000, proactiveRefreshBufferMs: 5 * 60_000, @@ -187,6 +190,7 @@ describe('Plugin Configuration', () => { sessionAffinity: true, sessionAffinityTtlMs: 20 * 60_000, sessionAffinityMaxEntries: 512, + responseContinuation: false, proactiveRefreshGuardian: true, proactiveRefreshIntervalMs: 60_000, proactiveRefreshBufferMs: 5 * 60_000, @@ -200,6 +204,15 @@ describe('Plugin Configuration', () => { }); }); + it('loads responseContinuation from disk config', () => { + mockExistsSync.mockReturnValue(true); + mockReadFileSync.mockReturnValue( + JSON.stringify({ responseContinuation: true }), + ); + + expect(loadPluginConfig().responseContinuation).toBe(true); + }); + it('should detect CODEX_HOME legacy auth config path before global legacy path', async () => { const runWithCodexHome = async (codexHomePath: string) => { vi.resetModules(); @@ -442,6 +455,7 @@ describe('Plugin Configuration', () => { sessionAffinity: true, sessionAffinityTtlMs: 20 * 60_000, sessionAffinityMaxEntries: 512, + responseContinuation: false, proactiveRefreshGuardian: true, proactiveRefreshIntervalMs: 60_000, proactiveRefreshBufferMs: 5 * 60_000, @@ -506,6 +520,7 @@ describe('Plugin Configuration', () => { sessionAffinity: true, sessionAffinityTtlMs: 20 * 60_000, sessionAffinityMaxEntries: 512, + responseContinuation: false, proactiveRefreshGuardian: true, proactiveRefreshIntervalMs: 60_000, proactiveRefreshBufferMs: 5 * 60_000, @@ -564,6 +579,7 @@ describe('Plugin Configuration', () => { sessionAffinity: true, sessionAffinityTtlMs: 20 * 60_000, sessionAffinityMaxEntries: 512, + responseContinuation: false, proactiveRefreshGuardian: true, proactiveRefreshIntervalMs: 60_000, proactiveRefreshBufferMs: 5 * 60_000, @@ -657,6 +673,25 @@ describe('Plugin Configuration', () => { }); }); + describe('getResponseContinuation', () => { + it('should default to false', () => { + delete process.env.CODEX_AUTH_RESPONSE_CONTINUATION; + expect(getResponseContinuation({})).toBe(false); + }); + + it('should use config value when env var not set', () => { + delete process.env.CODEX_AUTH_RESPONSE_CONTINUATION; + expect(getResponseContinuation({ responseContinuation: true })).toBe(true); + }); + + it('should prioritize env override', () => { + process.env.CODEX_AUTH_RESPONSE_CONTINUATION = '1'; + expect(getResponseContinuation({ responseContinuation: false })).toBe(true); + process.env.CODEX_AUTH_RESPONSE_CONTINUATION = '0'; + expect(getResponseContinuation({ responseContinuation: true })).toBe(false); + }); + }); + describe('getCodexTuiV2', () => { it('should default to true', () => { delete process.env.CODEX_TUI_V2; diff --git a/test/request-transformer.test.ts b/test/request-transformer.test.ts index 0dd01959..51eb1214 100644 --- a/test/request-transformer.test.ts +++ b/test/request-transformer.test.ts @@ -612,6 +612,52 @@ describe('Request Transformer Module', () => { expect(result.prompt_cache_key).toBeUndefined(); }); + it('preserves host-provided previous_response_id', async () => { + const body: RequestBody = { + model: 'gpt-5.4', + input: [], + previous_response_id: 'resp_prior_123', + }; + const result = await transformRequestBody(body, codexInstructions); + expect(result.previous_response_id).toBe('resp_prior_123'); + }); + + it('preserves prompt_cache_retention settings', async () => { + const body: RequestBody = { + model: 'gpt-5.4', + input: [], + prompt_cache_key: 'ses_cache_key_123', + prompt_cache_retention: '24h', + }; + const result = await transformRequestBody(body, codexInstructions); + expect(result.prompt_cache_key).toBe('ses_cache_key_123'); + expect(result.prompt_cache_retention).toBe('24h'); + }); + + it('preserves text.format when applying text verbosity defaults', async () => { + const body: RequestBody = { + model: 'gpt-5.4', + input: [], + text: { + format: { + type: 'json_schema', + name: 'contract_response', + schema: { + type: 'object', + properties: { + answer: { type: 'string' }, + }, + required: ['answer'], + }, + strict: true, + }, + }, + }; + const result = await transformRequestBody(body, codexInstructions); + expect(result.text?.verbosity).toBe('medium'); + expect(result.text?.format).toEqual(body.text?.format); + }); + it('should set required Codex fields', async () => { const body: RequestBody = { model: 'gpt-5', diff --git a/test/response-handler.test.ts b/test/response-handler.test.ts index 2da04e9d..3e0fb0dd 100644 --- a/test/response-handler.test.ts +++ b/test/response-handler.test.ts @@ -1,5 +1,10 @@ import { describe, it, expect, vi } from 'vitest'; -import { ensureContentType, convertSseToJson, isEmptyResponse } from '../lib/request/response-handler.js'; +import { + attachResponseIdCapture, + ensureContentType, + convertSseToJson, + isEmptyResponse, +} from '../lib/request/response-handler.js'; describe('Response Handler Module', () => { describe('ensureContentType', () => { @@ -111,6 +116,37 @@ data: {"type":"response.done","response":{"id":"resp_789"}} expect(result.statusText).toBe('OK'); }); + it('should report the final response id while converting SSE to JSON', async () => { + const onResponseId = vi.fn(); + const sseContent = `data: {"type":"response.done","response":{"id":"resp_123","output":"test"}}`; + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers, { onResponseId }); + const body = await result.json(); + + expect(body).toEqual({ id: 'resp_123', output: 'test' }); + expect(onResponseId).toHaveBeenCalledWith('resp_123'); + }); + + it('should return the raw SSE text when an error event arrives before response.done', async () => { + const onResponseId = vi.fn(); + const sseContent = [ + 'data: {"type":"error","message":"quota exceeded"}', + '', + 'data: {"type":"response.done","response":{"id":"resp_bad_123","output":"bad"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers(); + + const result = await convertSseToJson(response, headers, { onResponseId }); + const text = await result.text(); + + expect(text).toBe(sseContent); + expect(onResponseId).not.toHaveBeenCalled(); + }); + it('should throw error if SSE stream exceeds size limit', async () => { const largeContent = 'a'.repeat(20 * 1024 * 1024 + 1); const response = new Response(largeContent); @@ -165,6 +201,45 @@ data: {"type":"response.done","response":{"id":"resp_789"}} }); }); + describe('attachResponseIdCapture', () => { + it('should capture response ids while preserving the SSE stream', async () => { + const onResponseId = vi.fn(); + const sseContent = [ + 'data: {"type":"response.started"}', + '', + 'data: {"type":"response.done","response":{"id":"resp_stream_123","output":"done"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers({ 'content-type': 'text/event-stream' }); + + const captured = attachResponseIdCapture(response, headers, onResponseId); + const text = await captured.text(); + + expect(text).toBe(sseContent); + expect(onResponseId).toHaveBeenCalledWith('resp_stream_123'); + expect(captured.headers.get('content-type')).toBe('text/event-stream'); + }); + + it('should stop capturing response ids after an SSE error event', async () => { + const onResponseId = vi.fn(); + const sseContent = [ + 'data: {"type":"error","message":"quota exceeded"}', + '', + 'data: {"type":"response.done","response":{"id":"resp_bad_123","output":"done"}}', + '', + ].join('\n'); + const response = new Response(sseContent); + const headers = new Headers({ 'content-type': 'text/event-stream' }); + + const captured = attachResponseIdCapture(response, headers, onResponseId); + const text = await captured.text(); + + expect(text).toBe(sseContent); + expect(onResponseId).not.toHaveBeenCalled(); + }); + }); + describe('isEmptyResponse', () => { it('should return true for null', () => { expect(isEmptyResponse(null)).toBe(true); diff --git a/test/session-affinity.test.ts b/test/session-affinity.test.ts index a8268e20..1bf1ae73 100644 --- a/test/session-affinity.test.ts +++ b/test/session-affinity.test.ts @@ -113,4 +113,34 @@ describe("SessionAffinityStore", () => { expect(store.getPreferredAccountIndex("s1", 2_001)).toBeNull(); expect(store.getPreferredAccountIndex("s2", 2_001)).toBe(1); }); + + it("updates and retrieves the last response id for a live session", () => { + const store = new SessionAffinityStore({ ttlMs: 10_000, maxEntries: 4 }); + store.remember("session-a", 1, 1_000); + store.updateLastResponseId("session-a", "resp_123", 2_000); + + expect(store.getLastResponseId("session-a", 2_500)).toBe("resp_123"); + expect(store.getPreferredAccountIndex("session-a", 2_500)).toBe(1); + }); + + it("does not persist response ids for missing or expired sessions", () => { + const store = new SessionAffinityStore({ ttlMs: 1_000, maxEntries: 4 }); + store.updateLastResponseId("missing", "resp_missing", 1_000); + expect(store.getLastResponseId("missing", 1_500)).toBeNull(); + + store.remember("session-a", 1, 1_000); + store.updateLastResponseId("session-a", "resp_123", 2_500); + expect(store.getLastResponseId("session-a", 2_500)).toBeNull(); + expect(store.size()).toBe(0); + }); + + it("preserves response id when account index is updated via remember()", () => { + const store = new SessionAffinityStore({ ttlMs: 10_000, maxEntries: 4 }); + store.remember("session-a", 1, 1_000); + store.updateLastResponseId("session-a", "resp_123", 2_000); + store.remember("session-a", 2, 3_000); + + expect(store.getLastResponseId("session-a", 3_500)).toBe("resp_123"); + expect(store.getPreferredAccountIndex("session-a", 3_500)).toBe(2); + }); });