Commit 8b094e3
fix(codex): resolve mid-task stalling and garbled tool calls
Root cause: missing text.verbosity parameter in the Responses API payload. Without it, the model defaulted to a verbose text mode that caused:

1. Tool calls emitted as garbled text content (+#+#, ♀♀♀♀, etc.)
2. The model announcing actions in text and then stopping (finish_reason=stop)

Changes:
- Add text.verbosity: 'medium' to the Codex Responses API payload (matches pi)
- Fix garbled tool call detection: delta is a dict, not an object (getattr → dict.get)
- Add multi-marker detection: +#+#, to=functions., ♀♀♀♀
- Add an accumulated-text check for markers split across SSE chunks
- Buffer-then-flush retry: buffer all chunks before yielding to allow a clean retry
- Add diagnostic logging for response.completed events (debug level)
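To make the failure mode concrete: a healthy turn delivers the tool call as a structured function_call output item, while the buggy turns leaked it into ordinary text deltas. The event shapes below are a sketch reconstructed from the Responses API streaming format and the markers documented in this commit, not a verbatim capture:

    # Expected: the tool call arrives as a structured output item
    {"type": "response.output_item.added",
     "item": {"type": "function_call", "name": "exec", "arguments": "..."}}

    # Observed: the tool call leaks into plain text deltas
    {"type": "response.output_text.delta",
     "delta": "+#+#+#+#+#+assistant to=functions.exec ..."}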
1 parent 0988e76 commit 8b094e3

1 file changed: src/rotator_library/providers/codex_provider.py (196 additions, 2 deletions)
@@ -141,6 +141,32 @@ def _build_available_models() -> list:
 EMPTY_RESPONSE_MAX_ATTEMPTS = max(1, env_int("CODEX_EMPTY_RESPONSE_ATTEMPTS", 3))
 EMPTY_RESPONSE_RETRY_DELAY = env_int("CODEX_EMPTY_RESPONSE_RETRY_DELAY", 2)
 
+# Garbled tool call retry configuration
+# When the Responses API model emits tool calls as garbled text content
+# instead of structured function_call output items, automatically retry.
+# The garbled output takes multiple forms but always contains the ChatML-era
+# tool call format "to=functions.<name>" in the text content. Known prefixes:
+#   - "+#+#+#+#+#+assistant to=functions.exec ..."
+#   - "♀♀♀♀assistant to=functions.exec մelon..."
+#   - Various Unicode noise + "assistant to=functions.<name>"
+# This is an intermittent issue where the model reverts to ChatGPT's internal
+# chat completion format instead of the Responses API's structured output.
+GARBLED_TOOL_CALL_MAX_RETRIES = max(1, env_int("CODEX_GARBLED_TOOL_CALL_RETRIES", 3))
+GARBLED_TOOL_CALL_RETRY_DELAY = env_int("CODEX_GARBLED_TOOL_CALL_RETRY_DELAY", 1)
+
+# Multiple detection markers — if ANY match, the stream is considered garbled.
+# The "to=functions." pattern is the universal signature across all variants.
+GARBLED_TOOL_CALL_MARKERS = [
+    "+#+#",           # Original marker
+    "to=functions.",  # ChatML tool call format (universal across all garble variants)
+    "♀♀♀♀",           # Unicode variant seen in production
+]
+
+
+def _is_garbled_tool_call(text: str) -> bool:
+    """Check if text content contains garbled tool call markers."""
+    return any(marker in text for marker in GARBLED_TOOL_CALL_MARKERS)
+
 # System instruction for Codex models - loaded from file to preserve exact bytes
 # The ChatGPT backend API validates this instruction matches exactly
 def _load_codex_prompt() -> str:
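A quick sanity check of the new helper against the documented variants (sample strings adapted from the comment above; this snippet is illustrative and not part of the commit):

    samples = [
        "+#+#+#+#+#+assistant to=functions.exec ...",  # original marker
        "♀♀♀♀assistant to=functions.exec",             # Unicode variant
        "noise assistant to=functions.apply_patch",    # universal signature only
        "Ordinary assistant prose with no markers.",
    ]
    print([_is_garbled_tool_call(s) for s in samples])  # [True, True, True, False]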
@@ -653,6 +679,7 @@ async def acompletion(
             "input": input_items,
             "stream": True,  # Always use streaming internally
             "store": False,
+            "text": {"verbosity": "medium"},  # Match pi's default; controls output structure
         }
 
         if instructions:
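The fix itself is one field. The set of accepted values ("low", "medium", "high") comes from the public Responses API documentation and is an assumption here; the guard below is a hypothetical helper, not code from the commit:

    VALID_VERBOSITY = {"low", "medium", "high"}  # assumed API-accepted values

    def text_block(verbosity: str = "medium") -> dict:
        # Fail fast on a typo rather than silently triggering the verbose
        # fallback mode described in the commit message.
        if verbosity not in VALID_VERBOSITY:
            raise ValueError(f"unsupported verbosity: {verbosity!r}")
        return {"verbosity": verbosity}

    payload["text"] = text_block()  # -> {"verbosity": "medium"}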
@@ -672,16 +699,169 @@ async def acompletion(
         lib_logger.debug(f"Codex request to {normalized_model}: {json.dumps(payload, default=str)[:500]}...")
 
         if stream:
-            return self._stream_response(
+            return self._stream_with_retry(
                 client, headers, payload, requested_model, kwargs.get("reasoning_compat", DEFAULT_REASONING_COMPAT),
                 credential_path
             )
         else:
-            return await self._non_stream_response(
+            return await self._non_stream_with_retry(
                 client, headers, payload, requested_model, kwargs.get("reasoning_compat", DEFAULT_REASONING_COMPAT),
                 credential_path
             )
 
+    async def _stream_with_retry(
+        self,
+        client: httpx.AsyncClient,
+        headers: Dict[str, str],
+        payload: Dict[str, Any],
+        model: str,
+        reasoning_compat: str,
+        credential_path: str = "",
+    ) -> AsyncGenerator[litellm.ModelResponse, None]:
+        """
+        Wrapper around _stream_response that retries on garbled tool calls.
+
+        When the Responses API model intermittently emits tool calls as garbled
+        text content (containing markers like +#+# or to=functions.), this
+        wrapper detects the pattern and retries the entire request.
+
+        Uses a buffer-then-flush approach: all chunks are collected first,
+        then checked for the garbled marker. Only if the stream is clean
+        are chunks yielded to the caller. This allows true retry since
+        no chunks have been sent to the HTTP client yet.
+
+        Detection is done both per-chunk (for early abort) AND on the
+        accumulated text after stream completion (to catch markers that
+        are split across multiple SSE chunks).
+        """
+        for attempt in range(GARBLED_TOOL_CALL_MAX_RETRIES):
+            garbled_detected = False
+            buffered_chunks: list = []
+            accumulated_text = ""  # Track all text content across chunks
+
+            try:
+                async for chunk in self._stream_response(
+                    client, headers, payload, model, reasoning_compat, credential_path
+                ):
+                    # Extract content from this chunk for garble detection
+                    # NOTE: delta is a dict (not an object), so use dict access
+                    chunk_content = ""
+                    if hasattr(chunk, "choices") and chunk.choices:
+                        choice = chunk.choices[0]
+                        delta = getattr(choice, "delta", None)
+                        if delta:
+                            if isinstance(delta, dict):
+                                chunk_content = delta.get("content") or ""
+                            else:
+                                chunk_content = getattr(delta, "content", None) or ""
+
+                    # Accumulate text for cross-chunk detection
+                    if chunk_content:
+                        accumulated_text += chunk_content
+
+                    # Per-chunk check (catches garble within a single chunk)
+                    if chunk_content and _is_garbled_tool_call(chunk_content):
+                        garbled_detected = True
+                        lib_logger.warning(
+                            f"[Codex] Garbled tool call detected (per-chunk) in stream for {model}, "
+                            f"attempt {attempt + 1}/{GARBLED_TOOL_CALL_MAX_RETRIES}. "
+                            f"Content snippet: {chunk_content[:200]!r}"
+                        )
+                        break  # Stop consuming this stream
+
+                    buffered_chunks.append(chunk)
+
+                # Post-stream check: inspect accumulated text for markers split across chunks
+                if not garbled_detected and _is_garbled_tool_call(accumulated_text):
+                    garbled_detected = True
+                    # Find the garbled portion for logging
+                    snippet_start = max(0, len(accumulated_text) - 200)
+                    lib_logger.warning(
+                        f"[Codex] Garbled tool call detected (accumulated) in stream for {model}, "
+                        f"attempt {attempt + 1}/{GARBLED_TOOL_CALL_MAX_RETRIES}. "
+                        f"Tail of accumulated text: {accumulated_text[snippet_start:]!r}"
+                    )
+
+                if not garbled_detected:
+                    # Stream was clean — flush all buffered chunks to caller
+                    for chunk in buffered_chunks:
+                        yield chunk
+                    return  # Done
+
+            except Exception:
+                if garbled_detected:
+                    # Exception during stream teardown after garble detected - continue to retry
+                    pass
+                else:
+                    raise  # Non-garble exception - propagate
+
+            # Garbled stream detected — discard buffer and retry if we have attempts left
+            if attempt < GARBLED_TOOL_CALL_MAX_RETRIES - 1:
+                lib_logger.info(
+                    f"[Codex] Retrying request for {model} after garbled tool call "
+                    f"(attempt {attempt + 2}/{GARBLED_TOOL_CALL_MAX_RETRIES}). "
+                    f"Discarding {len(buffered_chunks)} buffered chunks, "
+                    f"{len(accumulated_text)} chars of accumulated text."
+                )
+                await asyncio.sleep(GARBLED_TOOL_CALL_RETRY_DELAY)
+            else:
+                lib_logger.error(
+                    f"[Codex] Garbled tool call persisted after {GARBLED_TOOL_CALL_MAX_RETRIES} "
+                    f"attempts for {model}. Flushing last attempt's buffer."
+                )
+                # Flush the last attempt's buffer (garbled but better than nothing)
+                for chunk in buffered_chunks:
+                    yield chunk
+                return
+
+    async def _non_stream_with_retry(
+        self,
+        client: httpx.AsyncClient,
+        headers: Dict[str, str],
+        payload: Dict[str, Any],
+        model: str,
+        reasoning_compat: str,
+        credential_path: str = "",
+    ) -> litellm.ModelResponse:
+        """
+        Wrapper around _non_stream_response that retries on garbled tool calls.
+
+        For non-streaming responses, the entire response is collected before
+        returning, so we can inspect the accumulated text and retry if the
+        garbled tool call marker is found.
+        """
+        for attempt in range(GARBLED_TOOL_CALL_MAX_RETRIES):
+            response = await self._non_stream_response(
+                client, headers, payload, model, reasoning_compat, credential_path
+            )
+
+            # Check accumulated content for garbled marker
+            content = None
+            if hasattr(response, "choices") and response.choices:
+                message = getattr(response.choices[0], "message", None)
+                if message:
+                    content = getattr(message, "content", None)
+
+            if content and _is_garbled_tool_call(content):
+                if attempt < GARBLED_TOOL_CALL_MAX_RETRIES - 1:
+                    lib_logger.warning(
+                        f"[Codex] Garbled tool call detected in non-stream response for {model}, "
+                        f"attempt {attempt + 1}/{GARBLED_TOOL_CALL_MAX_RETRIES}. "
+                        f"Content snippet: {content[:100]!r}. Retrying..."
+                    )
+                    await asyncio.sleep(GARBLED_TOOL_CALL_RETRY_DELAY)
+                    continue
+                else:
+                    lib_logger.error(
+                        f"[Codex] Garbled tool call persisted after {GARBLED_TOOL_CALL_MAX_RETRIES} "
+                        f"attempts for {model} (non-stream). Returning last response."
+                    )
+
+            return response
+
+        # Should not reach here, but return the last response as fallback
+        return response
 
     async def _stream_response(
         self,
         client: httpx.AsyncClient,
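The buffer-then-flush pattern is easy to exercise in isolation. The sketch below uses a fake stream that garbles on the first attempt and succeeds on the second; every name in it is hypothetical test scaffolding, not code from the commit:

    import asyncio

    MARKERS = ["+#+#", "to=functions.", "♀♀♀♀"]  # mirrors GARBLED_TOOL_CALL_MARKERS

    def is_garbled(text: str) -> bool:
        return any(m in text for m in MARKERS)

    async def fake_stream(calls: list):
        # Stand-in for _stream_response: garbled on the first call, clean after.
        calls.append(None)
        if len(calls) == 1:
            yield "+#+#assistant to=functions.exec"
        else:
            yield "Hello, "
            yield "world."

    async def stream_with_retry(max_retries: int = 3):
        calls: list = []
        for attempt in range(max_retries):
            buffered, accumulated = [], ""
            async for text in fake_stream(calls):
                accumulated += text
                if is_garbled(text):
                    break  # abort this attempt; nothing has been yielded yet
                buffered.append(text)
            if not is_garbled(accumulated):
                for t in buffered:  # flush only once the stream is known clean
                    yield t
                return

    async def collect():
        return [t async for t in stream_with_retry()]

    print(asyncio.run(collect()))  # ['Hello, ', 'world.']

The tradeoff of this design is that the caller receives nothing until the upstream stream finishes, so time-to-first-token grows by a full stream duration per attempt; in exchange, retries are always clean because no partial output has escaped.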
@@ -875,6 +1055,20 @@ async def _stream_response(
 
                 # Handle completion
                 elif kind == "response.completed":
+                    # Log the raw completion event for diagnostics
+                    resp_data_debug = evt.get("response", {})
+                    resp_status = resp_data_debug.get("status", "unknown")
+                    resp_stop_reason = resp_data_debug.get("stop_reason", "N/A")
+                    resp_output = resp_data_debug.get("output", [])
+                    output_types = [item.get("type", "?") for item in resp_output] if isinstance(resp_output, list) else []
+                    has_tool_calls_in_output = any(t in ("function_call", "tool_call") for t in output_types)
+                    lib_logger.info(
+                        f"[Codex] response.completed: status={resp_status}, "
+                        f"stop_reason={resp_stop_reason}, output_types={output_types}, "
+                        f"tracked_tool_calls={list(current_tool_calls.keys())}, "
+                        f"has_tc_in_output={has_tool_calls_in_output}"
+                    )
+
                     # Determine finish reason
                     finish_reason = "stop"
                     if current_tool_calls:
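With the diagnostic line in place, a completed turn that produced a structured tool call would log something like the following (all values illustrative):

    [Codex] response.completed: status=completed, stop_reason=N/A, output_types=['reasoning', 'function_call'], tracked_tool_calls=['call_0'], has_tc_in_output=True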
