feat: Native OpenAI adapter with retry and AIMD throttle infrastructure #402
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…provements

- Wrap all LiteLLM router calls in try/except to normalize raw exceptions into canonical ProviderError at the bridge boundary (blocking review item)
- Extract reusable response-parsing helpers into clients/parsing.py for shared use across future native adapters
- Add async image parsing path using httpx.AsyncClient to avoid blocking the event loop in agenerate_image
- Add retry_after field to ProviderError for future retry engine support
- Fix _to_int_or_none to parse numeric strings from providers
- Create test conftest.py with shared mock_router/bridge_client fixtures
- Parametrize duplicate image generation and error mapping tests
- Add tests for exception wrapping across all bridge methods
…larity

- Parse RFC 7231 HTTP-date strings in Retry-After header (used by Azure and Anthropic during rate-limiting) in addition to numeric delay-seconds
- Clarify collect_non_none_optional_fields docstring explaining why f.default is None is the correct check for optional field forwarding
- Add tests for HTTP-date and garbage Retry-After values
- Fix misleading comment about prompt field defaults in _IMAGE_EXCLUDE
- Handle list-format detail arrays in _extract_structured_message for FastAPI/Pydantic validation errors
- Document scope boundary for vision content in collect_raw_image_candidates
…el-facade-guts-pr2
Greptile Summary

This PR is the third in the model facade overhaul series and introduces the native OpenAICompatibleClient adapter. Key changes and findings:
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/models/clients/adapters/openai_compatible.py | New native httpx-based adapter implementing the ModelClient protocol. Well-structured with lazy double-checked locking for client initialization, correct error wrapping, and proper image route selection. Minor: close() and aclose() are not lock-guarded, so a concurrent lazy-init and close can race, but this is an acceptable lifecycle tradeoff for a shutdown-only operation. |
| packages/data-designer-engine/src/data_designer/engine/models/clients/throttle.py | New AIMD ThrottleManager with solid thread safety via a single threading.Lock. Timeout-overshoot fix is correctly applied (min(wait, remaining)). success_streak resets correctly on every window regardless of whether the limit was increased. Ordering invariant for register-before-acquire is clearly documented. |
| packages/data-designer-engine/src/data_designer/engine/models/clients/types.py | Adding "timeout" to _META_FIELDS correctly strips it from the HTTP body for the native adapter, but introduces a behavioral regression for LiteLLMBridgeClient: per-request timeout values previously forwarded via **transport.body to LiteLLM's router are now silently dropped because the bridge never reads transport.timeout. |
| packages/data-designer-engine/src/data_designer/engine/models/clients/retry.py | Clean frozen dataclass with sensible defaults mirroring the existing LiteLLM router settings. TODO comment about eventual 429 removal is appropriately placed. No issues found. |
| packages/data-designer-engine/src/data_designer/engine/models/clients/factory.py | Provider routing is clean: env-var override → openai → bridge fallback. Case-insensitive comparison for provider_type is correctly applied. _resolve_api_key helper properly handles None keys without passing placeholder strings to the native adapter. |
| packages/data-designer-engine/src/data_designer/engine/models/clients/parsing.py | New extract_reasoning_content correctly prioritises the vLLM canonical reasoning field with proper type guard on both the primary and fallback values. Consistent use of get_value_from across all parsing functions is a good improvement. |
| packages/data-designer-engine/src/data_designer/engine/models/registry.py | ThrottleManager and RetryConfig are correctly held as shared resources and exposed as read-only properties. _get_model correctly calls throttle_manager.register() on lazy facade creation, ensuring registration before any acquire. ModelFacadeFactory type alias cleanly captures the updated 4-arg signature. |
| packages/data-designer-engine/tests/engine/models/clients/test_openai_compatible.py | Thorough test coverage for chat, embeddings, image routing, auth headers, error mapping, lifecycle, and lazy init. One minor issue: the async lazy-init test creates a real httpx.AsyncClient without closing it, which may emit ResourceWarning. |
| packages/data-designer-engine/tests/engine/models/clients/test_throttle.py | Comprehensive AIMD lifecycle test using synthetic now timestamps avoids real sleeps. Thread-safety test with 8 concurrent workers is valuable. All AIMD edge cases (no registration, global cap clamping, domain isolation, recovery windows) are covered. |
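The AIMD behaviour the table attributes to `throttle.py` (additive increase after a success window, multiplicative decrease on throttling, clamping to a global cap, unconditional streak reset per window) can be illustrated with a minimal sketch. This is a simplified stand-in, not the actual ThrottleManager API; the class and parameter names below are illustrative:

```python
# Minimal AIMD (additive-increase / multiplicative-decrease) concurrency
# limiter, illustrating the scheme described in the review. All state
# mutations happen under a single lock, mirroring the thread-safety
# approach noted for ThrottleManager.
import threading


class AimdLimiter:
    def __init__(self, initial: int = 8, cap: int = 64,
                 additive_increase: int = 1, decrease_factor: float = 0.5,
                 success_window: int = 20) -> None:
        self._lock = threading.Lock()
        self.limit = initial
        self.cap = cap
        self.additive_increase = additive_increase
        self.decrease_factor = decrease_factor
        self.success_window = success_window
        self.success_streak = 0

    def on_success(self) -> None:
        with self._lock:
            self.success_streak += 1
            if self.success_streak >= self.success_window:
                # Additive increase, clamped to the global cap.
                self.limit = min(self.cap, self.limit + self.additive_increase)
                # Streak resets on every completed window, whether or not
                # the limit actually increased (it may be cap-clamped).
                self.success_streak = 0

    def on_throttle(self) -> None:  # e.g. HTTP 429 from the provider
        with self._lock:
            # Multiplicative decrease, never below one in-flight request.
            self.limit = max(1, int(self.limit * self.decrease_factor))
            self.success_streak = 0
```

The real ThrottleManager additionally keys state two ways (a global `(provider, model)` cap plus per-domain state), which this sketch omits.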
Sequence Diagram
```mermaid
sequenceDiagram
    participant Caller
    participant Factory as create_model_client
    participant Native as OpenAICompatibleClient
    participant Bridge as LiteLLMBridgeClient
    participant Transport as RetryTransport
    participant API as Provider API
    Caller->>Factory: create_model_client(model_config, ...)
    alt DATA_DESIGNER_MODEL_BACKEND=litellm_bridge
        Factory->>Bridge: _create_bridge_client(...)
    else provider_type == "openai"
        Factory->>Native: OpenAICompatibleClient(endpoint, retry_config, ...)
    else other provider
        Factory->>Bridge: _create_bridge_client(...)
    end
    Note over Caller,Native: Native adapter request flow
    Caller->>Native: completion(ChatCompletionRequest)
    Native->>Native: TransportKwargs.from_request()<br/>(timeout → transport.timeout, not body)
    Native->>Native: _get_sync_client() [lazy init w/ lock]
    Native->>Transport: POST /chat/completions
    loop Retry on {429,502,503,504}
        Transport->>API: HTTP POST
        API-->>Transport: response
    end
    Transport-->>Native: final response
    alt status >= 400
        Native-->>Caller: raise ProviderError
    else JSON decode error
        Native-->>Caller: raise ProviderError(API_ERROR)
    else success
        Native->>Native: parse_chat_completion_response(dict)
        Native-->>Caller: ChatCompletionResponse
    end
```
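The final branch of the diagram (HTTP error status and JSON decode error both raised as a canonical ProviderError) can be sketched as below. The `ProviderError`/`ProviderErrorKind` shapes here are illustrative stand-ins for the project's real types, and `response` is any object with a `status_code` attribute and a `json()` method (such as an `httpx.Response`):

```python
# Sketch of the response-handling branch: HTTP error statuses and JSON
# decode failures are both normalized into a canonical ProviderError at
# the adapter boundary, so callers handle one exception type.

class ProviderErrorKind:
    API_ERROR = "api_error"


class ProviderError(Exception):
    def __init__(self, kind, message, status_code=None, retry_after=None):
        super().__init__(message)
        self.kind = kind
        self.status_code = status_code
        self.retry_after = retry_after  # surfaced for a future retry engine


def parse_or_raise(response):
    if response.status_code >= 400:
        raise ProviderError(ProviderErrorKind.API_ERROR,
                            f"provider returned HTTP {response.status_code}",
                            status_code=response.status_code)
    try:
        return response.json()
    except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
        raise ProviderError(ProviderErrorKind.API_ERROR,
                            "provider returned a non-JSON body") from exc
```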
Comments Outside Diff (2)
- **packages/data-designer-engine/src/data_designer/engine/models/clients/types.py, line 127** (link)

  **`timeout` silently dropped for `LiteLLMBridgeClient`**

  Adding `"timeout"` to `_META_FIELDS` means `timeout` is no longer collected into `transport.body` — it is extracted into `transport.timeout` instead. This is correct for `OpenAICompatibleClient`, which reads `transport.timeout` in every `_post_sync`/`_apost` call.

  However, `LiteLLMBridgeClient` forwards only `**transport.body` to its router calls (e.g., `self._router.completion(model=..., messages=..., extra_headers=..., **transport.body)`) and never reads `transport.timeout`. Before this PR, a caller passing `ChatCompletionRequest(timeout=120.0)` had that value forwarded to LiteLLM as a per-request timeout kwarg (LiteLLM's `Router.completion` accepts `timeout`). After this PR the value is silently discarded for the bridge path — the same applies to `EmbeddingRequest.timeout` and `ImageGenerationRequest.timeout`.

  The bridge will need to be updated to pass `transport.timeout` explicitly to the router, e.g.:

  ```python
  response = self._router.completion(
      model=request.model,
      messages=request.messages,
      extra_headers=transport.headers or None,
      **({"timeout": transport.timeout} if transport.timeout is not None else {}),
      **transport.body,
  )
  ```

  (Or add a `timeout` keyword directly to each router call site in `LiteLLMBridgeClient`.)

- **packages/data-designer-engine/tests/engine/models/clients/test_openai_compatible.py, lines 1857-1866** (link)

  **Unclosed `httpx.AsyncClient` in sync test**

  `_get_async_client()` creates a real `httpx.AsyncClient` (complete with an internal connection pool), but this sync test never closes it. The companion sync-client test calls `client.close()` at the end; this one has no cleanup. This will emit `ResourceWarning: Unclosed client` / `Unclosed connector` depending on how the test runner surfaces asyncio warnings.

  Suggested fix — close the underlying client at the end of the test:

  ```python
  def test_lazy_async_client_creates_real_httpx_async_client() -> None:
      client = _make_client()
      assert client._aclient is None
      async_client = client._get_async_client()
      assert async_client is not None
      assert client._aclient is async_client
      assert client._get_async_client() is async_client
      # clean up to avoid ResourceWarning: Unclosed client
      import asyncio
      asyncio.get_event_loop().run_until_complete(client.aclose())
  ```

  Alternatively convert to `@pytest.mark.asyncio` and `await client.aclose()`.
Last reviewed commit: 3b491c9
…ent timeout overshoot

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Guard reasoning_content fallback with isinstance(str) check to prevent non-string provider values from violating the return type contract
- Normalize provider_type comparison to lowercase so mixed-case configs (e.g. "OpenAI") route to the native adapter instead of silently falling through to the bridge
- Document register-before-acquire ordering invariant on ThrottleManager
- Add TODO to remove 429 from retryable_status_codes once ThrottleManager is wired via AsyncTaskScheduler (plan 346)

Made-with: Cursor
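The routing order these commits describe (env-var override first, then case-insensitive native-adapter match, then bridge fallback) can be sketched as below. `choose_backend` is a hypothetical stand-in for the factory's internal decision, not the project's actual function:

```python
# Sketch of the client-factory routing order:
#   1. DATA_DESIGNER_MODEL_BACKEND=litellm_bridge forces the bridge
#      (rollback safety), regardless of provider.
#   2. provider_type "openai" (case-insensitive) gets the native adapter.
#   3. Everything else falls back to the bridge.
import os


def choose_backend(provider_type: str) -> str:
    if os.environ.get("DATA_DESIGNER_MODEL_BACKEND") == "litellm_bridge":
        return "bridge"
    if provider_type.lower() == "openai":  # mixed-case configs still match
        return "native"
    return "bridge"
```

Lowercasing before comparison is the fix from the commit above: a config with `provider_type="OpenAI"` now routes to the native adapter instead of silently falling through to the bridge.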
/review
(AR) Suggestion: Add test for non-JSON response error path
The test suite doesn't exercise the path where a successful HTTP response carries a non-JSON body. A test along these lines would cover it:

```python
def test_non_json_response_raises_provider_error() -> None:
    resp = MagicMock()
    resp.status_code = 200
    resp.json.side_effect = ValueError("not json")
    sync_mock = MagicMock()
    sync_mock.post = MagicMock(return_value=resp)
    client = _make_client(sync_client=sync_mock)
    request = ChatCompletionRequest(model=MODEL, messages=[{"role": "user", "content": "Hi"}])
    with pytest.raises(ProviderError) as exc_info:
        client.completion(request)
    assert exc_info.value.kind == ProviderErrorKind.API_ERROR
```
(AR) This PR adds a native httpx-based adapter (OpenAICompatibleClient) with shared retry and AIMD throttle infrastructure.

The implementation is clean and well-structured. The adapter correctly delegates parsing to shared helpers (ensuring behavioral parity with the LiteLLM bridge), the AIMD algorithm is sound with thorough test coverage (including a full lifecycle scenario and thread safety verification), and the factory routing logic with env-var override provides good rollback safety.

Two findings are worth addressing: the per-request timeout that is now silently dropped on the LiteLLMBridgeClient path, and the unclosed httpx.AsyncClient in the sync lazy-init test.

The existing review feedback from the previous round has been substantially addressed — the timeout overshoot fix, reasoning_content type guard, case-insensitive provider routing, and ordering invariant documentation are all in good shape. Test coverage is comprehensive across all new modules. The plan compliance is solid — this PR delivers the core PR-3 scope with throttle wiring correctly deferred to the AsyncTaskScheduler integration.
📋 Summary
Third PR in the model facade overhaul series (plan, architecture notes). Introduces the first native HTTP adapter (`OpenAICompatibleClient`) with shared retry infrastructure and an adaptive AIMD throttle resource, moving toward LiteLLM removal for OpenAI-compatible providers.

Previous PRs:
🔄 Changes
✨ Added
- `clients/adapters/openai_compatible.py` — native `httpx`-based adapter implementing the `ModelClient` protocol for OpenAI-compatible APIs (chat, embeddings, image generation)
- `clients/retry.py` — `RetryConfig` (frozen dataclass) + `create_retry_transport()` using `httpx_retries.RetryTransport`. Defaults mirror current LiteLLM router settings
- `clients/throttle.py` — `ThrottleManager` with AIMD concurrency control. Two-level keying: global cap `(provider, model)` + per-domain state (chat, embedding, image). Configurable `additive_increase`, `success_window`, and acquire `timeout` (default 300s)
- `plans/343/model-facade-overhaul-pr-3-architecture-notes.md` — architecture notes documenting design decisions and ownership rationale

🔧 Changed

- `clients/factory.py` — routes `provider_type="openai"` to the native adapter; `DATA_DESIGNER_MODEL_BACKEND=litellm_bridge` forces bridge fallback for rollback safety
- `clients/parsing.py` — `extract_reasoning_content()` checks `message.reasoning` (vLLM >= 0.16.0) before `message.reasoning_content` (legacy). Parsing functions now use `get_value_from()` consistently for dict+object compatibility (#374)
- `clients/errors.py` — extracted `infer_error_kind_from_exception()` from the LiteLLM bridge as a shared utility
- `models/registry.py` — holds shared `ThrottleManager` and `RetryConfig`; registers aliases on lazy facade creation
- `models/factory.py` — creates shared `ThrottleManager` and `RetryConfig`, passes through to client factory

🧪 Tests

- `test_openai_compatible.py` — 20 tests: chat/embedding/image routing, auth headers, error mapping, transport errors, lifecycle, capabilities
- `test_throttle.py` — 22 tests: AIMD lifecycle scenario (429 → backoff → cooldown → recovery), acquire timeout, configurable additive increase, global cap clamping, domain isolation, thread safety
- `test_retry.py` — config defaults, frozen immutability, transport propagation
- `test_factory.py` — provider routing, bridge env override
- `test_parsing.py` — `extract_reasoning_content` with vLLM canonical field, legacy fallback, both-present precedence

🔍 Attention Areas

- `throttle.py` — AIMD algorithm correctness, thread safety via single lock, recovery cost trade-offs. Note: `ThrottleManager` is a standalone resource — acquire/release will be wired by the `AsyncTaskScheduler` (plan 346), not by the adapter
- `openai_compatible.py` — lazy client initialization with double-checked locking, connection pool sizing heuristics
- `factory.py` — routing logic: env var override → provider type → bridge fallback

🤖 Generated with AI