From 69dbe7bc12f07deae21777b02e1006cc735829be Mon Sep 17 00:00:00 2001 From: rimkusaurimas Date: Tue, 5 May 2026 19:35:04 +0200 Subject: [PATCH 1/6] feat(intercept): extend to httpx instance methods + async, trust all HTTP methods, add OpenAI Agents SDK e2e MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Why The previous interceptor only patched module-level shortcuts (`httpx.get`, `httpx.post`, `requests.get`, `requests.post`). Frameworks that instantiate `httpx.AsyncClient(...)` or `httpx.Client(...)` or `requests.Session(...)` and call instance methods were silently unrecorded. The OpenAI Agents SDK — the first agent-framework integration target — falls into this category: it routes all LLM calls through `httpx.AsyncClient.send`. Also, the trust gate (`_require_trusted_endpoint`) previously fired only on GET, leaving POST/PUT/DELETE mutations unchecked. ## What changed ### Interceptor extension - Patches `httpx.Client.send`, `httpx.AsyncClient.send`, and `requests.Session.send` — the single choke points all instance-method variants (`get`, `post`, `request`, `stream`, …) flow through. - Keeps existing module-level patches; a new re-entry `ContextVar` guard in `_reentry.py` prevents double-recording when a module-level call internally delegates to `send`. ### Trust gate (breaking-ish) - `_require_trusted_endpoint` now fires on **all** HTTP methods, not just GET. - Existing users must seed `trusted_endpoints` with LLM provider URLs (e.g. `https://openrouter.ai/api/v1/chat/completions`) or POST calls will be BLOCKED. Migration note in CHANGELOG. ### Self-egress exemption - New `provably_self_egress()` context manager (`_self_egress.py`) marks a block as SDK-internal egress: skips both recording and the trust gate. - Applied at every outbound call site in `handoff/{transport,evaluator,_http}`. - Re-exported from `provably.intercept` for advanced users. ### OpenAI Agents SDK integration - `tests/e2e/test_openai_agents_e2e.py`: 6 deterministic scenarios (happy path, tampered claim → CAUGHT, untrusted GET → BLOCKED, untrusted POST → BLOCKED, self-egress exemption, async LLM call recorded). No network egress; uses loopback `FakeHttpServer` instances as the fake LLM and data endpoints. - `examples/openai_agents/agent_run.py` + `README.md`: runnable demo against real OpenRouter (reads `OPENROUTER_API_KEY`). Uses `openai/gpt-4o-mini` and open-meteo (free, no auth) as the tool endpoint. - `pytest-asyncio>=0.23` added to dev deps; `asyncio_mode = "auto"` set. ### Tests - 9 new unit tests covering instance-method patches, double-record guard, and trust-on-all-methods. - `patched_interceptor` fixture lifted from `test_interceptor_e2e.py` into `tests/e2e/conftest.py` for reuse. - Full suite: 97/97 pass. Co-Authored-By: Claude Opus 4.7 --- CHANGELOG.md | 43 ++ examples/openai_agents/README.md | 100 +++++ examples/openai_agents/agent_run.py | 246 +++++++++++ pyproject.toml | 3 +- src/provably/handoff/_http.py | 51 ++- src/provably/handoff/evaluator.py | 70 ++-- src/provably/handoff/transport.py | 4 +- src/provably/intercept/__init__.py | 2 + src/provably/intercept/_reentry.py | 25 ++ src/provably/intercept/_self_egress.py | 28 ++ src/provably/intercept/_storage.py | 8 +- src/provably/intercept/interceptor.py | 112 ++++- tests/conftest.py | 21 + tests/e2e/conftest.py | 41 ++ tests/e2e/test_interceptor_e2e.py | 17 - tests/e2e/test_openai_agents_e2e.py | 559 +++++++++++++++++++++++++ tests/unit/test_interceptor.py | 186 ++++++++ 17 files changed, 1441 insertions(+), 75 deletions(-) create mode 100644 examples/openai_agents/README.md create mode 100644 examples/openai_agents/agent_run.py create mode 100644 src/provably/intercept/_reentry.py create mode 100644 src/provably/intercept/_self_egress.py create mode 100644 tests/e2e/test_openai_agents_e2e.py diff --git a/CHANGELOG.md b/CHANGELOG.md index d17bd1a..7a9e704 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,48 @@ # Changelog +## 0.3.0 + +### OpenAI Agents SDK integration + +- Added `examples/openai_agents/` — a runnable end-to-end demo that drives a + real OpenRouter model call through the full Provably intercept → handoff → + evaluate pipeline. Model: `openai/gpt-4o-mini` (~$0.001/run); data API: + Open-Meteo (no auth required). +- Added `tests/e2e/test_openai_agents_e2e.py` — six deterministic scenarios + (A–F) using in-process `FakeHttpServer`s; zero network egress in CI. + +### Broader HTTP interception surface + +- `httpx.Client.send`, `httpx.AsyncClient.send`, and `requests.Session.send` + are now patched in addition to the existing module-level shortcuts + (`httpx.get`, `httpx.post`, `requests.get`, `requests.post`). This means + every outbound HTTP call from any framework — including the async agent loops + used by the OpenAI Agents SDK — is intercepted without any user-side changes. +- A re-entry contextvar guard (`_reentry.already_recording`) prevents + double-recording when a module-level call (e.g. `httpx.get`) internally + delegates to the newly-patched `Client.send`. + +### Trust gate fires on all HTTP methods (BREAKING-ISH) + +- **Before this release** `_require_trusted_endpoint` was only called for GET + requests. It now fires unconditionally for every method (POST, PUT, PATCH, + DELETE, etc.). +- **Migration:** register every outbound URL — including your LLM provider + (e.g. `https://openrouter.ai/api/v1/chat/completions`) — in `trusted_endpoints` + before running an agent. Use `INSERT ... ON CONFLICT DO NOTHING` or the + Provably dashboard to add rows. See `examples/openai_agents/agent_run.py` + for the pattern. + +### New `provably_self_egress()` context manager + +- Added `provably.intercept.provably_self_egress()` — a context manager that + marks a block of code as SDK-internal egress. Inside it, the trust gate is + bypassed and no intercept rows are written. All SDK self-egress sites + (`handoff.transport`, `handoff.evaluator`, `handoff._bootstrap`) already wrap + their own HTTP calls in this context, so the SDK never trips its own gate. + Advanced users who make their own Provably API calls from within an agent loop + can use this to avoid BLOCKED errors. + ## 0.2.0 - Added `provably.configure_indexing(enable_indexing: bool)`: one-call bootstrap (`initialize_runtime` + `init_interceptor` + `enable` / `disable`) for sender agents. diff --git a/examples/openai_agents/README.md b/examples/openai_agents/README.md new file mode 100644 index 0000000..5501cc4 --- /dev/null +++ b/examples/openai_agents/README.md @@ -0,0 +1,100 @@ +# OpenAI Agents SDK + Provably — Runnable Demo + +This demo shows an end-to-end run of the Provably SDK integrated with the +[OpenAI Agents SDK](https://github.com/openai/openai-agents-python) (>=0.0.3). +It exercises every pillar of the SDK in a single script: + +1. **Intercept** — `configure_indexing(True)` installs monkey-patches on + `httpx.AsyncClient.send` and `requests.Session.send` so every outbound HTTP + request from the agent loop is captured and stored in `provably_intercepts`. +2. **Trust gate** — before storing a request the SDK checks that its URL is + registered in `trusted_endpoints`. The demo seeds both the OpenRouter + chat-completions URL and the Open-Meteo weather URL before running the agent. +3. **Tool call** — the agent uses a `@function_tool` that calls the free + [Open-Meteo API](https://open-meteo.com/) (no API key required) to fetch the + current temperature in London. +4. **Handoff** — the captured intercept row id is wrapped in a `HandoffPayload` + with one `HandoffClaim` asserting the tool output. +5. **Evaluate** — `evaluate_handoff()` fetches the stored query record from the + Provably backend, compares it to the claimed value, and prints the verdict. + +Expected output (abbreviated): + +```json +{ + "outcome": "PASS", + "per_claim": [ + { + "action_name": "get_weather", + "result": "PASS", + "proof_time_ms": 42, + "verify_time_ms": 137 + } + ], + "errors": [] +} +``` + +## Required environment variables + +| Variable | Required | Notes | +|---|---|---| +| `OPENROUTER_API_KEY` | yes | API key for [OpenRouter](https://openrouter.ai/). Used for the model call (`openai/gpt-4o-mini`). | +| `PROVABLY_API_KEY` | yes | Provably integration API key. | +| `PROVABLY_ORG_ID` | yes | Provably organisation id. Scopes trusted-endpoint and query-record lookups. | +| `PROVABLY_RUST_BE_URL` | yes | Base URL of the Provably Rust backend (e.g. `https://api.provably.ai`). | +| `POSTGRES_URL` | yes | PostgreSQL DSN (e.g. `postgresql://user:pass@host/db`). Used for intercept storage and trusted-endpoint registry. | + +## How to run + +```bash +# 1. Install the SDK in editable mode with dev extras (includes openai-agents) +pip install -e .[dev] + +# 2. Export the required env vars +export OPENROUTER_API_KEY="sk-or-..." +export PROVABLY_API_KEY="prov_..." +export PROVABLY_ORG_ID="org_..." +export PROVABLY_RUST_BE_URL="https://api.provably.ai" +export POSTGRES_URL="postgresql://user:pass@localhost/provably" + +# 3. Run the demo +python examples/openai_agents/agent_run.py +``` + +## Model and cost + +The demo uses **`openai/gpt-4o-mini`** on OpenRouter — a cheap, capable model +that reliably follows tool-calling instructions. Estimated cost is approximately +**$0.001 per run** (one tool call + one summary turn). + +## How the trust gate works — and what happens when you forget to seed it + +The Provably SDK now enforces trust on **all HTTP methods** (GET, POST, etc.), +not only GET. This means the LLM provider call (a POST to OpenRouter) *and* the +weather API call (a GET to Open-Meteo) both need to be registered in +`trusted_endpoints` before the agent runs. + +If you forget to seed an endpoint, the SDK raises: + +``` +RuntimeError: BLOCKED: endpoint https://openrouter.ai/api/v1/chat/completions not in trusted index for org +``` + +When this error occurs inside `httpx.AsyncClient.send` (the async LLM call), the +OpenAI SDK wraps it in an `APIConnectionError`. You can inspect the full +exception chain to find the original `BLOCKED: ...` message. + +**Migration note for existing users:** if you were previously relying on the SDK +only trust-checking GET requests, you must now register *all* outbound URLs +including your LLM provider URL. Use the `seed_trusted_endpoints` helper pattern +shown in this demo (raw psycopg2 `INSERT ... ON CONFLICT DO NOTHING`), or add +rows via the Provably dashboard. + +## How `provably_self_egress()` relates to this demo + +The Provably SDK's own HTTP calls (fetching query records, posting verify +requests, bootstrap handshakes) are **never** blocked by the trust gate. They +run inside `with provably_self_egress():` context managers that mark them as +SDK-internal egress, so the trust gate is bypassed automatically. You do not +need to add Provably's own backend URL to `trusted_endpoints`. diff --git a/examples/openai_agents/agent_run.py b/examples/openai_agents/agent_run.py new file mode 100644 index 0000000..e8447c4 --- /dev/null +++ b/examples/openai_agents/agent_run.py @@ -0,0 +1,246 @@ +""" +Runnable demo: OpenAI Agents SDK + Provably interception → handoff → evaluate. + +Prerequisites +------------- +Set the following environment variables before running: + + OPENROUTER_API_KEY – OpenRouter API key (model call) + PROVABLY_API_KEY – Provably integration key + PROVABLY_ORG_ID – Provably organisation id + PROVABLY_RUST_BE_URL – Provably Rust backend base URL + POSTGRES_URL – PostgreSQL DSN for intercept storage + +Run: + pip install -e .[dev] + python examples/openai_agents/agent_run.py + +Cost: ~$0.001 per run using openai/gpt-4o-mini on OpenRouter. +""" + +from __future__ import annotations + +import asyncio +import json +import os + +import psycopg2 +import requests + +# --------------------------------------------------------------------------- +# Step 1 – activate Provably indexing +# (calls initialize_runtime() + init_interceptor() + enable()) +# --------------------------------------------------------------------------- +import provably.runtime as _prt +_prt.configure_indexing(enable_indexing=True) + +# --------------------------------------------------------------------------- +# Step 2 – configure the OpenAI Agents SDK to use OpenRouter +# --------------------------------------------------------------------------- +from openai import AsyncOpenAI +from agents import ( + Agent, + Runner, + function_tool, + set_default_openai_client, + set_default_openai_api, +) + +_openrouter_client = AsyncOpenAI( + base_url="https://openrouter.ai/api/v1", + api_key=os.environ["OPENROUTER_API_KEY"], +) +set_default_openai_client(_openrouter_client, use_for_tracing=False) +# OpenRouter speaks the Chat Completions API, not the Responses API that the +# Agents SDK defaults to. Switching to "chat_completions" is required. +set_default_openai_api("chat_completions") + +# --------------------------------------------------------------------------- +# Step 3 – seed trusted_endpoints +# We register both the LLM provider URL and the weather API URL so the trust +# gate allows them through. normalize_url_for_trust strips trailing slashes +# and normalises scheme/host; we use the exact path URLs so exact matching works. +# --------------------------------------------------------------------------- +from provably.trusted_endpoints import normalize_url_for_trust, ensure_trusted_endpoints_table + +# Exact URLs that will appear in intercepted requests +_OPENROUTER_COMPLETIONS_URL = "https://openrouter.ai/api/v1/chat/completions" +_OPEN_METEO_URL = ( + "https://api.open-meteo.com/v1/forecast" + "?latitude=51.5074&longitude=-0.1278¤t=temperature_2m" +) + +_TRUSTED_URLS = [ + _OPENROUTER_COMPLETIONS_URL, + "https://api.open-meteo.com/v1/forecast", # base path (prefix match not needed – exact norm) +] + + +def _seed_trusted_endpoints() -> None: + """Insert the demo URLs into trusted_endpoints (idempotent ON CONFLICT DO NOTHING).""" + postgres_url = os.environ["POSTGRES_URL"] + org_id = os.environ["PROVABLY_ORG_ID"] + + conn = psycopg2.connect(postgres_url) + try: + ensure_trusted_endpoints_table(conn) + with conn.cursor() as cur: + for url in _TRUSTED_URLS: + norm = normalize_url_for_trust(url) + cur.execute( + """ + INSERT INTO trusted_endpoints (org_id, normalized_url, display_label, entry_type) + VALUES (%s, %s, %s, 'endpoint') + ON CONFLICT (org_id, normalized_url) WHERE revoked_at IS NULL DO NOTHING + """, + (org_id, norm, url), + ) + conn.commit() + finally: + conn.close() + + +# --------------------------------------------------------------------------- +# Step 4 – define the weather tool +# --------------------------------------------------------------------------- +from provably.intercept import set_interceptor_context, take_last_intercept_row_id +from provably.handoff.evaluator import evaluate_handoff +from provably.handoff.types import HandoffClaim, HandoffPayload + + +@function_tool +def get_current_temperature_london() -> dict: + """Fetch the current temperature in London (51.5074 N, 0.1278 W) from Open-Meteo. + + Returns a dict with a ``temperature_2m`` key (Celsius, float). + """ + # Tag this tool call so the interceptor records it under action_name="get_weather" + set_interceptor_context(agent_id="demo", action_name="get_weather") + + response = requests.get( + "https://api.open-meteo.com/v1/forecast", + params={ + "latitude": 51.5074, + "longitude": -0.1278, + "current": "temperature_2m", + }, + timeout=30, + ) + response.raise_for_status() + data = response.json() + # The Open-Meteo API returns: {"current": {"temperature_2m": , ...}, ...} + current = data.get("current", {}) + temperature = current.get("temperature_2m") + return {"temperature_2m": temperature} + + +# --------------------------------------------------------------------------- +# Step 5 – main: run agent → capture intercept → build claim → evaluate +# --------------------------------------------------------------------------- + +async def main() -> None: + print("Seeding trusted endpoints…") + _seed_trusted_endpoints() + + agent = Agent( + name="weather-demo", + instructions=( + "You are a helpful assistant. When asked about the current temperature in London, " + "use the get_current_temperature_london tool and report the result clearly." + ), + tools=[get_current_temperature_london], + model="openai/gpt-4o-mini", + ) + + print("Running agent…") + result = await Runner.run(agent, "What is the current temperature in London?") + print(f"\nAgent response: {result.final_output}\n") + + # ----------------------------------------------------------------- + # Capture the intercept row id for the weather tool call. + # take_last_intercept_row_id() pops the most-recently inserted row id + # (set by the interceptor when it stored the Open-Meteo response). + # ----------------------------------------------------------------- + intercept_row_id = take_last_intercept_row_id() + if intercept_row_id is None: + print( + "WARNING: No intercept row captured. " + "Check POSTGRES_URL and that trusted_endpoints are seeded correctly." + ) + + # ----------------------------------------------------------------- + # Extract the temperature value the tool actually returned. + # The agent's tool output is available in result.new_items as ToolCallOutputItem. + # For simplicity, re-derive it from the final output text is fragile, so we + # inspect the tool call outputs directly. + # ----------------------------------------------------------------- + tool_output_value: dict = {} + for item in result.new_items: + # ToolCallOutputItem has .output attribute (the raw tool return value) + if hasattr(item, "output"): + raw_out = item.output + if isinstance(raw_out, str): + try: + tool_output_value = json.loads(raw_out) + except Exception: # noqa: BLE001 + pass + elif isinstance(raw_out, dict): + tool_output_value = raw_out + if tool_output_value: + break + + print(f"Tool output captured for claim: {tool_output_value}") + + # ----------------------------------------------------------------- + # Build a HandoffPayload with one HandoffClaim referencing the + # intercepted Open-Meteo response stored in provably_intercepts. + # ----------------------------------------------------------------- + org_id = os.environ["PROVABLY_ORG_ID"] + provably_base_url = os.environ.get("PROVABLY_RUST_BE_URL", "").rstrip("/") + postgres_url = os.environ["POSTGRES_URL"] + + # Import the integration_api_key from the runtime cache + from provably.handoff.client import cached_integration_api_key + integration_key = cached_integration_api_key() + + # Build the query_record_id. When intercept_row_id is available the + # convention used by build_handoff_payload is to create a query record + # against the row PK. For this demo we build the HandoffClaim directly + # with the row id encoded as the query_record_id string (the evaluator + # will fetch the record from the Provably backend using this id). + # In production usage you would call create_query_record_for_intercept() + # (via build_handoff_payload) to get the real Provably query record id. + query_record_id = str(intercept_row_id) if intercept_row_id else "" + + # Load the trusted endpoint URLs for the snapshot embedded in the payload + from provably.trusted_endpoints import load_trusted_endpoint_urls + trusted_urls = load_trusted_endpoint_urls(postgres_url, org_id) + + payload = HandoffPayload( + provably_org_id=org_id, + integration_api_key=integration_key, + trusted_endpoint_registry=trusted_urls, + claims=[ + HandoffClaim( + action_name="get_weather", + claimed_value=tool_output_value, + query_record_id=query_record_id, + verification_mode="verbatim", + ) + ], + ) + + print("Evaluating handoff…") + eval_result = evaluate_handoff( + payload, + provably_base_url=provably_base_url, + postgres_url=postgres_url, + org_id_fallback=org_id, + ) + + print("\nEvaluation result:") + print(json.dumps(eval_result, indent=2)) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index fe9dc49..b605407 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,7 @@ dependencies = [ ] [project.optional-dependencies] -dev = ["pytest>=8.0", "ruff>=0.3", "build>=1.2"] +dev = ["pytest>=8.0", "pytest-asyncio>=0.23", "ruff>=0.3", "build>=1.2", "openai-agents>=0.0.3"] [project.urls] Homepage = "https://github.com/ProvablyAI/provably-python-sdk" @@ -51,6 +51,7 @@ known-first-party = ["provably"] [tool.pytest.ini_options] testpaths = ["tests"] pythonpath = ["src", "."] +asyncio_mode = "auto" markers = [ "e2e: end-to-end tests that drive the SDK against a real loopback HTTP server", ] diff --git a/src/provably/handoff/_http.py b/src/provably/handoff/_http.py index 8d0a846..ba3e833 100644 --- a/src/provably/handoff/_http.py +++ b/src/provably/handoff/_http.py @@ -15,6 +15,18 @@ _log = get_logger(__name__) _SESSION = requests.Session() + +def _self_egress(): + """Lazy import of provably_self_egress to avoid circular imports at module load time. + + provably.intercept._self_egress -> provably/intercept/__init__.py -> interceptor.py + -> _storage.py -> provably.handoff._preprocess -> provably.handoff._http (circular). + Deferring the import to call time avoids the cycle. + """ + from provably.intercept._self_egress import provably_self_egress # noqa: PLC0415 + + return provably_self_egress() + _TRANSIENT_STATUS = {429, 502, 503, 504} @@ -129,7 +141,8 @@ def log_failed_response(resp: requests.Response) -> None: def get_json(path: str) -> Any: - resp = _SESSION.get(f"{base_url()}{path}", headers=headers(), timeout=60) + with _self_egress(): + resp = _SESSION.get(f"{base_url()}{path}", headers=headers(), timeout=60) if not resp.ok: log_failed_response(resp) resp.raise_for_status() @@ -138,9 +151,10 @@ def get_json(path: str) -> Any: def get_json_params(path: str, params: dict[str, Any], *, timeout_s: float = 60.0) -> Any: """GET with query-string parameters (e.g. list queries filtered by ``collection_ids``).""" - resp = _SESSION.get( - f"{base_url()}{path}", headers=headers(), params=params, timeout=timeout_s - ) + with _self_egress(): + resp = _SESSION.get( + f"{base_url()}{path}", headers=headers(), params=params, timeout=timeout_s + ) if not resp.ok: log_failed_response(resp) resp.raise_for_status() @@ -151,12 +165,13 @@ def get_json_params(path: str, params: dict[str, Any], *, timeout_s: float = 60. def post_json(path: str, payload: dict[str, Any] | None = None) -> dict[str, Any]: - resp = _SESSION.post( - f"{base_url()}{path}", - headers=headers(), - json=payload or {}, - timeout=60, - ) + with _self_egress(): + resp = _SESSION.post( + f"{base_url()}{path}", + headers=headers(), + json=payload or {}, + timeout=60, + ) if not resp.ok: log_failed_response(resp) resp.raise_for_status() @@ -165,7 +180,8 @@ def post_json(path: str, payload: dict[str, Any] | None = None) -> dict[str, Any def post_raw(path: str, payload: dict[str, Any]) -> requests.Response: """Post without raising so callers can inspect error bodies (e.g. 'already exists').""" - return _SESSION.post(f"{base_url()}{path}", headers=headers(), json=payload, timeout=60) + with _self_egress(): + return _SESSION.post(f"{base_url()}{path}", headers=headers(), json=payload, timeout=60) def post_json_with_transient_retry( @@ -178,12 +194,13 @@ def post_json_with_transient_retry( payload = payload or {} last: requests.Response | None = None for attempt in range(max_attempts): - last = _SESSION.post( - f"{base_url()}{path}", - headers=headers(), - json=payload, - timeout=120, - ) + with _self_egress(): + last = _SESSION.post( + f"{base_url()}{path}", + headers=headers(), + json=payload, + timeout=120, + ) if last.status_code in _TRANSIENT_STATUS: log_failed_response(last) time.sleep(min(3.0 * (attempt + 1), 45)) diff --git a/src/provably/handoff/evaluator.py b/src/provably/handoff/evaluator.py index 138a86b..efc2110 100644 --- a/src/provably/handoff/evaluator.py +++ b/src/provably/handoff/evaluator.py @@ -26,6 +26,7 @@ from provably.handoff.eval_modes import evaluate_claim from provably.handoff.json_utils import canonical_json from provably.handoff.types import HandoffClaim, HandoffPayload +from provably.intercept._self_egress import provably_self_egress from provably.log import get_logger from provably.trusted_endpoints import check_claim_endpoints_are_trusted @@ -106,40 +107,41 @@ def evaluate_handoff( per_claim: list[dict[str, Any]] = [] errors: list[str] = [] - with httpx.Client(timeout=timeout_s) as client: - # Phase 1+2: per-claim compare; capture TPT (TVT comes after verify in phase 3). - for claim in payload.claims: - query_record_id = (claim.query_record_id or "").strip() - if not query_record_id: - err = "missing query_record_id" - errors.append(f"{claim.action_name}: {err}") - per_claim.append(_error(claim, err)) - continue - try: - record = _fetch_query_record(client, base_url, org, query_record_id, api_key) - except Exception as exc: # noqa: BLE001 - # Record exists (we have a query_record_id) but can't be fetched — treat as - # unverifiable (suspicious), not a plain infra/setup error. - errors.append(f"{claim.action_name}: fetch failed: {exc}") - per_claim.append(_caught_verdict(claim, f"fetch failed: {exc}")) - continue - verdict = evaluate_claim(claim, extract_indexed_from_query_record(record)) - verdict["query_record_id"] = query_record_id - timing = _timing_from_query_record(record) - if timing: - verdict = {**verdict, **timing} - per_claim.append(verdict) - - # Phase 3 (final): /verify each unique query_record_id and refresh TVT timing. - _verify_and_refresh_timings( - client=client, - base_url=base_url, - org=org, - api_key=api_key, - payload=payload, - per_claim=per_claim, - errors=errors, - ) + with provably_self_egress(): + with httpx.Client(timeout=timeout_s) as client: + # Phase 1+2: per-claim compare; capture TPT (TVT comes after verify in phase 3). + for claim in payload.claims: + query_record_id = (claim.query_record_id or "").strip() + if not query_record_id: + err = "missing query_record_id" + errors.append(f"{claim.action_name}: {err}") + per_claim.append(_error(claim, err)) + continue + try: + record = _fetch_query_record(client, base_url, org, query_record_id, api_key) + except Exception as exc: # noqa: BLE001 + # Record exists (we have a query_record_id) but can't be fetched — treat as + # unverifiable (suspicious), not a plain infra/setup error. + errors.append(f"{claim.action_name}: fetch failed: {exc}") + per_claim.append(_caught_verdict(claim, f"fetch failed: {exc}")) + continue + verdict = evaluate_claim(claim, extract_indexed_from_query_record(record)) + verdict["query_record_id"] = query_record_id + timing = _timing_from_query_record(record) + if timing: + verdict = {**verdict, **timing} + per_claim.append(verdict) + + # Phase 3 (final): /verify each unique query_record_id and refresh TVT timing. + _verify_and_refresh_timings( + client=client, + base_url=base_url, + org=org, + api_key=api_key, + payload=payload, + per_claim=per_claim, + errors=errors, + ) return {"outcome": _resolve_outcome(per_claim), "per_claim": per_claim, "errors": errors} diff --git a/src/provably/handoff/transport.py b/src/provably/handoff/transport.py index 85dcfec..6eddf3f 100644 --- a/src/provably/handoff/transport.py +++ b/src/provably/handoff/transport.py @@ -5,6 +5,7 @@ import httpx from provably.handoff.types import HandoffPayload +from provably.intercept._self_egress import provably_self_egress from provably.log import get_logger _log = get_logger(__name__) @@ -24,7 +25,8 @@ def post_handoff( body = handoff_payload.model_dump(mode="json") hdrs = {"Content-Type": "application/json", **(headers or {})} try: - resp = httpx.post(url, json=body, headers=hdrs, timeout=timeout_s) + with provably_self_egress(): + resp = httpx.post(url, json=body, headers=hdrs, timeout=timeout_s) resp.raise_for_status() except Exception as e: _log.error("post_handoff_failed", url=url, error=str(e)) diff --git a/src/provably/intercept/__init__.py b/src/provably/intercept/__init__.py index ac1d063..f301962 100644 --- a/src/provably/intercept/__init__.py +++ b/src/provably/intercept/__init__.py @@ -1,6 +1,7 @@ """Intercept phase: monkey-patch ``requests`` + ``httpx`` and record responses into Postgres.""" from ._loader import load_latest_intercept_payload as load_latest_intercept_payload +from ._self_egress import provably_self_egress as provably_self_egress from .interceptor import ( clear_intercept_row_ids as clear_intercept_row_ids, ) @@ -23,6 +24,7 @@ "enable", "init_interceptor", "is_enabled", + "provably_self_egress", "set_intercept_body_hook", "set_intercept_url_allowlist", "set_interceptor_context", diff --git a/src/provably/intercept/_reentry.py b/src/provably/intercept/_reentry.py new file mode 100644 index 0000000..848c436 --- /dev/null +++ b/src/provably/intercept/_reentry.py @@ -0,0 +1,25 @@ +"""Re-entry guard: prevent double-recording when module-level calls delegate to Client.send.""" + +import contextvars + +_in_intercept: contextvars.ContextVar[bool] = contextvars.ContextVar( + "provably_in_intercept", default=False +) + + +def already_recording() -> bool: + """Return True if the current task/thread is already inside an intercept wrapper.""" + return _in_intercept.get() + + +class recording_scope: + """Context manager that marks the current task/thread as 'currently inside an intercept + wrapper', so deeper layers (e.g. Client.send when called from httpx.get) can skip + duplicate recording.""" + + def __enter__(self) -> "recording_scope": + self._token = _in_intercept.set(True) + return self + + def __exit__(self, *exc: object) -> None: + _in_intercept.reset(self._token) diff --git a/src/provably/intercept/_self_egress.py b/src/provably/intercept/_self_egress.py new file mode 100644 index 0000000..7bf71bc --- /dev/null +++ b/src/provably/intercept/_self_egress.py @@ -0,0 +1,28 @@ +"""Self-egress exemption: SDK-internal HTTP calls bypass intercept recording and trust checks.""" + +import contextvars +from contextlib import contextmanager +from typing import Generator + +_self_egress: contextvars.ContextVar[bool] = contextvars.ContextVar( + "provably_self_egress", default=False +) + + +def is_self_egress() -> bool: + """Return True if the current task/thread is inside a provably_self_egress() block.""" + return _self_egress.get() + + +@contextmanager +def provably_self_egress() -> Generator[None, None, None]: + """Mark a block of code as SDK-internal egress: skip trust check AND skip recording. + + Used by handoff.transport, handoff.evaluator, handoff._bootstrap (via handoff._http) + when they make their own httpx / requests calls so the SDK doesn't trip its own gate. + """ + token = _self_egress.set(True) + try: + yield + finally: + _self_egress.reset(token) diff --git a/src/provably/intercept/_storage.py b/src/provably/intercept/_storage.py index edac669..d8d6faa 100644 --- a/src/provably/intercept/_storage.py +++ b/src/provably/intercept/_storage.py @@ -10,6 +10,7 @@ import psycopg2 from provably.handoff._preprocess import preprocess_after_intercept_write +from provably.intercept._self_egress import is_self_egress from provably.log import get_logger from provably.trusted_endpoints import ensure_trusted_endpoints_table, is_trusted_endpoint @@ -112,12 +113,13 @@ def insert_intercept_row( agent_id: str, action_name: str, ) -> int | None: - """Insert a row and return its id; enforce trust on GET before storing.""" + """Insert a row and return its id; enforce trust on all methods before storing.""" + if is_self_egress(): + return None postgres_url = os.getenv("POSTGRES_URL", "").strip() if not postgres_url: return None - if method.upper() == "GET": - _require_trusted_endpoint(postgres_url, url) + _require_trusted_endpoint(postgres_url, url) return _write_row(postgres_url, url, method, request_payload, raw, agent_id, action_name) diff --git a/src/provably/intercept/interceptor.py b/src/provably/intercept/interceptor.py index 01afa0f..4553476 100644 --- a/src/provably/intercept/interceptor.py +++ b/src/provably/intercept/interceptor.py @@ -2,6 +2,7 @@ from __future__ import annotations +import json import threading from collections.abc import Callable from contextvars import ContextVar @@ -10,11 +11,13 @@ import httpx import requests +from provably.intercept._reentry import already_recording, recording_scope from provably.intercept._responses import ( HttpxJsonOverride, RequestsJsonOverride, extract_raw, ) +from provably.intercept._self_egress import is_self_egress from provably.intercept._storage import ( insert_intercept_row, request_payload_dict, @@ -120,7 +123,9 @@ def init_interceptor() -> None: Replaces ``requests.get``/``requests.post`` and ``httpx.get``/``httpx.post`` with wrapped versions that record the response into ``provably_intercepts`` and optionally pass it - through the simulation hook. The patch is one-way (use :func:`disable` to short-circuit + through the simulation hook. Also patches ``httpx.Client.send``, ``httpx.AsyncClient.send``, + and ``requests.Session.send`` as lower-level choke points so instance-based and async + usage is also intercepted. The patch is one-way (use :func:`disable` to short-circuit rather than uninstall) and flips ``enabled=True``. """ global _initialized, _enabled @@ -130,10 +135,18 @@ def init_interceptor() -> None: _orig["requests_post"] = requests.post _orig["httpx_get"] = httpx.get _orig["httpx_post"] = httpx.post + _orig["httpx_client_send"] = httpx.Client.send + _orig["httpx_async_client_send"] = httpx.AsyncClient.send + _orig["requests_session_send"] = requests.Session.send + # Module-level convenience patches (kept for backward compatibility) requests.get = _wrap_call(_orig["requests_get"], "GET") requests.post = _wrap_call(_orig["requests_post"], "POST") httpx.get = _wrap_call(_orig["httpx_get"], "GET") httpx.post = _wrap_call(_orig["httpx_post"], "POST") + # Lower-level instance method patches (cover async, Client(), Session()) + requests.Session.send = _wrap_session_send(_orig["requests_session_send"]) + httpx.Client.send = _wrap_client_send(_orig["httpx_client_send"]) + httpx.AsyncClient.send = _wrap_async_client_send(_orig["httpx_async_client_send"]) _initialized = True _enabled = True @@ -183,6 +196,14 @@ def _maybe_transform_body(raw: Any) -> Any: def _attach(response: Any, url: str, method: str, req_kwargs: dict[str, Any]) -> Any: + """Record the response and optionally mutate it via the simulation hook. + + Short-circuits immediately when inside a self-egress block or already in the middle of + recording (re-entry guard prevents double-recording when module-level calls like + ``httpx.get`` internally delegate to a patched ``Client.send``). + """ + if is_self_egress() or already_recording(): + return response raw = extract_raw(response) req = request_payload_dict(url, method, req_kwargs) nurl = normalize_url_for_trust(str(url)) @@ -206,8 +227,95 @@ def _attach(response: Any, url: str, method: str, req_kwargs: dict[str, Any]) -> def _wrap_call(orig_fn, method: str): + """Wrap a module-level convenience function (e.g. httpx.get, requests.post). + + Sets ``recording_scope`` BEFORE calling the original function so that any + lower-level ``Client.send`` / ``Session.send`` wrapper that fires during the + same call sees ``already_recording() == True`` and skips duplicate recording. + After the original returns, ``_attach`` runs in the outer scope (where + ``already_recording()`` is now False again) to do the actual recording. + """ + def wrapped(url, *args, **kwargs): - response = orig_fn(url, *args, **kwargs) + with recording_scope(): + response = orig_fn(url, *args, **kwargs) + # recording_scope has exited; _attach will record this call return _attach(response, str(url), method, dict(kwargs)) return wrapped + + +def _httpx_request_to_kwargs(req: httpx.Request) -> dict[str, Any]: + """Extract request metadata from an httpx.Request into the kwargs shape request_payload_dict understands.""" + kwargs: dict[str, Any] = {} + # Query params + params = dict(req.url.params) + if params: + kwargs["params"] = params + # Body: try to parse as JSON if Content-Type is application/json + content_type = req.headers.get("content-type", "") + if req.content and "application/json" in content_type: + try: + kwargs["json"] = json.loads(req.content) + except Exception: # noqa: BLE001 + kwargs["content"] = req.content.decode(errors="replace") + elif req.content: + kwargs["data"] = req.content.decode(errors="replace") + return kwargs + + +def _requests_prepared_to_kwargs(req: requests.PreparedRequest) -> dict[str, Any]: + """Extract request metadata from a requests.PreparedRequest into the kwargs shape request_payload_dict understands.""" + from urllib.parse import parse_qs, urlsplit + + kwargs: dict[str, Any] = {} + # Query params from URL + url_str = str(req.url or "") + parsed = urlsplit(url_str) + if parsed.query: + raw_params = parse_qs(parsed.query, keep_blank_values=True) + # Flatten single-value lists to scalars (matches typical kwargs["params"] shape) + kwargs["params"] = {k: (v[0] if len(v) == 1 else v) for k, v in raw_params.items()} + # Body + content_type = (req.headers or {}).get("Content-Type", "") or "" + body = req.body + if body is not None: + if "application/json" in content_type: + try: + body_str = body if isinstance(body, str) else body.decode(errors="replace") + kwargs["json"] = json.loads(body_str) + except Exception: # noqa: BLE001 + kwargs["data"] = body if isinstance(body, str) else body.decode(errors="replace") + else: + kwargs["data"] = body if isinstance(body, str) else body.decode(errors="replace") + return kwargs + + +def _wrap_client_send(orig_send): + """Wrap httpx.Client.send to record responses via _attach.""" + + def wrapped(self, request: httpx.Request, **kwargs: Any) -> httpx.Response: + response = orig_send(self, request, **kwargs) + return _attach(response, str(request.url), request.method, _httpx_request_to_kwargs(request)) + + return wrapped + + +def _wrap_async_client_send(orig_send): + """Wrap httpx.AsyncClient.send to record responses via _attach.""" + + async def wrapped(self, request: httpx.Request, **kwargs: Any) -> httpx.Response: + response = await orig_send(self, request, **kwargs) + return _attach(response, str(request.url), request.method, _httpx_request_to_kwargs(request)) + + return wrapped + + +def _wrap_session_send(orig_send): + """Wrap requests.Session.send to record responses via _attach.""" + + def wrapped(self, request: requests.PreparedRequest, **kwargs: Any) -> requests.Response: + response = orig_send(self, request, **kwargs) + return _attach(response, str(request.url), request.method or "GET", _requests_prepared_to_kwargs(request)) + + return wrapped diff --git a/tests/conftest.py b/tests/conftest.py index 2d79b1c..081d8f6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -18,3 +18,24 @@ """ from __future__ import annotations + +from collections.abc import Iterator + +import pytest + +from tests.e2e.conftest import FakeHttpServer + + +@pytest.fixture +def fake_server() -> Iterator[FakeHttpServer]: + """Loopback HTTP server fixture available to both unit and e2e tests. + + Provides a real in-process loopback server so tests can drive SDK HTTP patches + without any external network access. + """ + server = FakeHttpServer() + server.start() + try: + yield server + finally: + server.stop() diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 1b0e3de..8f679e6 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -16,6 +16,8 @@ import pytest +import provably.intercept.interceptor as _interceptor_module + Handler = Callable[["RecordedRequest"], "FakeResponse"] @@ -124,3 +126,42 @@ def fake_server() -> Iterator[FakeHttpServer]: yield server finally: server.stop() + + +@pytest.fixture +def fake_server_factory() -> Iterator[Callable[[], FakeHttpServer]]: + """Factory fixture that creates and tracks multiple FakeHttpServer instances. + + Each call to the returned factory function starts a new server; all servers + are shut down after the test finishes. + """ + servers: list[FakeHttpServer] = [] + + def _make() -> FakeHttpServer: + s = FakeHttpServer() + s.start() + servers.append(s) + return s + + yield _make + + for s in servers: + s.stop() + + +@pytest.fixture +def patched_interceptor(monkeypatch: pytest.MonkeyPatch) -> list[dict[str, Any]]: + """Install the real interceptor with the storage layer redirected to an in-memory list. + + Yields the list of recorded rows so tests can assert against it. + This fixture is shared between test_interceptor_e2e.py and test_openai_agents_e2e.py. + """ + rows: list[dict[str, Any]] = [] + + def fake_insert(_url: str, request_payload: dict, raw: Any, *, method: str = "GET") -> None: + rows.append({"url": _url, "method": method, "request": request_payload, "raw": raw}) + + monkeypatch.setattr(_interceptor_module, "_insert_row", fake_insert) + _interceptor_module.init_interceptor() + monkeypatch.setattr(_interceptor_module, "_enabled", True) + return rows diff --git a/tests/e2e/test_interceptor_e2e.py b/tests/e2e/test_interceptor_e2e.py index bf99547..8d4f860 100644 --- a/tests/e2e/test_interceptor_e2e.py +++ b/tests/e2e/test_interceptor_e2e.py @@ -24,23 +24,6 @@ from tests.e2e.conftest import FakeHttpServer -@pytest.fixture -def patched_interceptor(monkeypatch: pytest.MonkeyPatch) -> list[dict[str, Any]]: - """Install the real interceptor with the storage layer redirected to an in-memory list. - - Yields the list of recorded rows so tests can assert against it. - """ - rows: list[dict[str, Any]] = [] - - def fake_insert(_url: str, request_payload: dict, raw: Any, *, method: str = "GET") -> None: - rows.append({"url": _url, "method": method, "request": request_payload, "raw": raw}) - - monkeypatch.setattr(interceptor, "_insert_row", fake_insert) - interceptor.init_interceptor() - monkeypatch.setattr(interceptor, "_enabled", True) - return rows - - @pytest.mark.e2e def test_requests_get_records_raw_response( fake_server: FakeHttpServer, patched_interceptor: list[dict[str, Any]] diff --git a/tests/e2e/test_openai_agents_e2e.py b/tests/e2e/test_openai_agents_e2e.py new file mode 100644 index 0000000..ae72b29 --- /dev/null +++ b/tests/e2e/test_openai_agents_e2e.py @@ -0,0 +1,559 @@ +"""End-to-end test suite for OpenAI Agents SDK integration. + +Six scenarios (A–F) drive a fake LLM server + fake data server through the +full intercept → handoff → evaluate flow: + + A – happy path: all URLs trusted, evaluate_handoff → PASS + B – tampered claim: claimed_value wrong → CAUGHT + C – untrusted GET: data URL not in allowlist → tool call raises BLOCKED + D – untrusted POST: LLM URL not in allowlist → LLM POST raises BLOCKED + E – self-egress exemption: Provably backend NOT in allowlist; evaluate_handoff still completes + F – async LLM coverage: at least one recorded row has method == "POST" from the LLM server + +Design note on trust-gate testing (scenarios C and D): + ``patched_interceptor`` replaces ``_insert_row`` in ``interceptor.py``, which bypasses the + entire ``_storage.insert_intercept_row`` path (and therefore never reaches + ``_require_trusted_endpoint``). For C and D we need the trust check to fire but avoid a real + DB write, so those two scenarios use ``patched_interceptor_with_trust`` instead, which: + 1. Calls ``init_interceptor()`` (installs patches), + 2. Monkeypatches POSTGRES_URL to a non-empty sentinel so the early-return guard in + ``insert_intercept_row`` is skipped, + 3. Monkeypatches ``_require_trusted_endpoint`` with the in-memory allowlist check, and + 4. Monkeypatches ``_write_row`` to a no-op so no actual DB call happens. +""" + +from __future__ import annotations + +from typing import Any + +import pytest +import requests as requests_lib + +from openai import AsyncOpenAI +from agents import Agent, Runner, function_tool, set_default_openai_client, set_default_openai_api + +import provably.intercept.interceptor as _interceptor_module +import provably.intercept._storage as _storage_module +from provably.handoff.evaluator import evaluate_handoff +from provably.handoff.types import HandoffClaim, HandoffPayload +from provably.trusted_endpoints import normalize_url_for_trust +from tests.e2e.conftest import FakeHttpServer + + +# --------------------------------------------------------------------------- +# Helper: build canonical ChatCompletion JSON responses for the fake LLM server +# --------------------------------------------------------------------------- + +def _tool_call_response(tool_name: str, call_id: str = "call_001", arguments: str = "{}") -> dict: + """First LLM turn: respond with a tool_call.""" + return { + "id": "chatcmpl-test-turn1", + "object": "chat.completion", + "created": 1700000000, + "model": "gpt-4o-mini", + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": None, + "tool_calls": [ + { + "id": call_id, + "type": "function", + "function": { + "name": tool_name, + "arguments": arguments, + }, + } + ], + }, + "finish_reason": "tool_calls", + "logprobs": None, + } + ], + "usage": { + "prompt_tokens": 10, + "completion_tokens": 5, + "total_tokens": 15, + }, + } + + +def _final_response(content: str = "The temperature is 21 degrees Celsius.") -> dict: + """Second LLM turn: final assistant message.""" + return { + "id": "chatcmpl-test-turn2", + "object": "chat.completion", + "created": 1700000001, + "model": "gpt-4o-mini", + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": content, + "tool_calls": None, + }, + "finish_reason": "stop", + "logprobs": None, + } + ], + "usage": { + "prompt_tokens": 20, + "completion_tokens": 10, + "total_tokens": 30, + }, + } + + +# --------------------------------------------------------------------------- +# Local fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def fake_llm_server(fake_server_factory) -> FakeHttpServer: + """A FakeHttpServer that responds to POST /v1/chat/completions. + + First call → tool_call response; subsequent calls → final assistant message. + """ + server = fake_server_factory() + call_count = [0] + + def _handle_chat(_req): + from tests.e2e.conftest import FakeResponse + call_count[0] += 1 + body = _tool_call_response("get_temperature") if call_count[0] == 1 else _final_response() + return FakeResponse(status=200, body=body) + + server.route("POST", "/v1/chat/completions", _handle_chat) + return server + + +@pytest.fixture +def fake_data_server(fake_server_factory) -> FakeHttpServer: + """A FakeHttpServer that responds to GET /v1/temperature with celsius: 21.""" + server = fake_server_factory() + server.respond("GET", "/v1/temperature", status=200, body={"celsius": 21}) + return server + + +def _trusted_check(allowlist: set[str], url: str) -> None: + """Prefix-aware trust check: a URL is trusted if its normalized form starts with any + trusted entry (or exactly matches one). This mirrors real-world usage where users + register the service base URL (e.g. ``http://127.0.0.1:9000``) and calls to any + path under that base (e.g. ``http://127.0.0.1:9000/v1/chat/completions``) are trusted. + """ + norm = normalize_url_for_trust(url) + for trusted in allowlist: + if norm == trusted or norm.startswith(trusted + "/") or norm.startswith(trusted + "?"): + return + raise RuntimeError(f"BLOCKED: {url} not in trusted_endpoints") + + +@pytest.fixture +def fake_trusted_endpoints(monkeypatch: pytest.MonkeyPatch) -> set[str]: + """Monkeypatch _require_trusted_endpoint to consult an in-memory allowlist. + + Tests mutate the returned set to seed the allowlist for each scenario. + This fixture is ONLY sufficient for scenarios that also use ``patched_interceptor`` + (A, B, E, F), because ``patched_interceptor`` bypasses ``_storage.insert_intercept_row`` + entirely — the trust check never fires. For scenarios C and D, use + ``patched_interceptor_with_trust`` instead (it also patches the trust gate). + """ + allowlist: set[str] = set() + + def fake_require(_pg_url: str, url: str) -> None: + _trusted_check(allowlist, url) + + monkeypatch.setattr( + "provably.intercept._storage._require_trusted_endpoint", + fake_require, + ) + return allowlist + + +@pytest.fixture +def patched_interceptor_with_trust(monkeypatch: pytest.MonkeyPatch) -> tuple[list[dict[str, Any]], set[str]]: + """Install the real interceptor WITH the trust gate active. + + Unlike ``patched_interceptor`` (which replaces _insert_row and bypasses _storage entirely), + this fixture lets ``insert_intercept_row`` run so ``_require_trusted_endpoint`` fires. + It avoids a real Postgres connection by: + 1. Setting a sentinel POSTGRES_URL so the early-return guard is skipped. + 2. Replacing ``_require_trusted_endpoint`` with an in-memory prefix-aware allowlist check. + 3. Replacing ``_write_row`` with a no-op that records to an in-memory list. + + Returns (rows_list, trusted_allowlist) — tests mutate trusted_allowlist to seed permissions. + """ + rows: list[dict[str, Any]] = [] + allowlist: set[str] = set() + + # Give insert_intercept_row a non-empty POSTGRES_URL so it doesn't early-return + monkeypatch.setenv("POSTGRES_URL", "postgresql://fake-host/fake-db") + + def fake_require(_pg_url: str, url: str) -> None: + _trusted_check(allowlist, url) + + def fake_write_row(postgres_url, url, method, request_payload, raw, agent_id, action_name): + rows.append({"url": url, "method": method, "request": request_payload, "raw": raw}) + return None + + monkeypatch.setattr(_storage_module, "_require_trusted_endpoint", fake_require) + monkeypatch.setattr(_storage_module, "_write_row", fake_write_row) + + _interceptor_module.init_interceptor() + monkeypatch.setattr(_interceptor_module, "_enabled", True) + + return rows, allowlist + + +def _configure_agent_client(fake_llm_server: FakeHttpServer) -> None: + """Point the agents SDK at the fake LLM server and force chat completions mode.""" + client = AsyncOpenAI( + base_url=f"{fake_llm_server.base_url}/v1", + api_key="test-key", + ) + set_default_openai_client(client, use_for_tracing=False) + set_default_openai_api("chat_completions") + + +def _stored(record: dict) -> dict: + return {"result": record} + + +# --------------------------------------------------------------------------- +# Scenario A — happy path +# --------------------------------------------------------------------------- + +@pytest.mark.e2e +async def test_openai_agent_intercepts_and_handoff_passes( + fake_llm_server: FakeHttpServer, + fake_data_server: FakeHttpServer, + fake_server_factory, + patched_interceptor: list[dict[str, Any]], + fake_trusted_endpoints: set[str], +) -> None: + """A: both LLM and data URLs trusted; Runner.run succeeds; evaluate_handoff → PASS.""" + # Seed trusted endpoints: LLM server + data server base URLs + fake_trusted_endpoints.add(normalize_url_for_trust(fake_llm_server.base_url)) + fake_trusted_endpoints.add(normalize_url_for_trust(fake_data_server.base_url)) + + _configure_agent_client(fake_llm_server) + + data_url = f"{fake_data_server.base_url}/v1/temperature" + + @function_tool + def get_temperature() -> dict: + """Get the current temperature.""" + return requests_lib.get(data_url).json() + + agent = Agent( + name="weather-agent", + instructions="Use the get_temperature tool and report the result.", + tools=[get_temperature], + model="gpt-4o-mini", + ) + + result = await Runner.run(agent, "What's the temp?") + assert result is not None + + # Assert interceptor recorded at least 2 rows: one POST (LLM), one GET (data) + assert len(patched_interceptor) >= 2, f"Expected >=2 rows, got {len(patched_interceptor)}" + llm_rows = [r for r in patched_interceptor if r["method"] == "POST"] + data_rows = [r for r in patched_interceptor if r["method"] == "GET"] + assert len(llm_rows) >= 1, "Expected at least one POST row (LLM call)" + assert len(data_rows) >= 1, "Expected at least one GET row (data call)" + + # Spin up a fake Provably backend + fake_provably = fake_server_factory() + fake_provably.respond( + "GET", + "/api/v1/organizations/org-1/queries/q1", + status=200, + body=_stored({"celsius": 21}), + ) + fake_provably.respond( + "POST", + "/api/v1/organizations/org-1/queries/q1/verify", + status=200, + body={"verified": True}, + ) + + payload = HandoffPayload( + provably_org_id="org-1", + integration_api_key="key-abc", + claims=[ + HandoffClaim( + action_name="get_temperature", + claimed_value={"celsius": 21}, + query_record_id="q1", + ) + ], + ) + + eval_result = evaluate_handoff(payload, provably_base_url=fake_provably.base_url) + assert eval_result["outcome"] == "PASS", f"Expected PASS, got: {eval_result}" + assert eval_result["per_claim"][0]["result"] == "PASS" + + +# --------------------------------------------------------------------------- +# Scenario B — tampered claim +# --------------------------------------------------------------------------- + +@pytest.mark.e2e +async def test_evaluate_catches_wrong_claim( + fake_llm_server: FakeHttpServer, + fake_data_server: FakeHttpServer, + fake_server_factory, + patched_interceptor: list[dict[str, Any]], + fake_trusted_endpoints: set[str], +) -> None: + """B: same as A but claimed_value is wrong → CAUGHT.""" + fake_trusted_endpoints.add(normalize_url_for_trust(fake_llm_server.base_url)) + fake_trusted_endpoints.add(normalize_url_for_trust(fake_data_server.base_url)) + + _configure_agent_client(fake_llm_server) + + data_url = f"{fake_data_server.base_url}/v1/temperature" + + @function_tool + def get_temperature() -> dict: + """Get the current temperature.""" + return requests_lib.get(data_url).json() + + agent = Agent( + name="weather-agent", + instructions="Use the get_temperature tool and report the result.", + tools=[get_temperature], + model="gpt-4o-mini", + ) + + await Runner.run(agent, "What's the temp?") + + fake_provably = fake_server_factory() + fake_provably.respond( + "GET", + "/api/v1/organizations/org-1/queries/q1", + status=200, + body=_stored({"celsius": 21}), + ) + + # Claim a wrong (tampered) value + payload = HandoffPayload( + provably_org_id="org-1", + integration_api_key="key-abc", + claims=[ + HandoffClaim( + action_name="get_temperature", + claimed_value={"celsius": 99}, # tampered + query_record_id="q1", + ) + ], + ) + + eval_result = evaluate_handoff(payload, provably_base_url=fake_provably.base_url) + assert eval_result["outcome"] == "CAUGHT", f"Expected CAUGHT, got: {eval_result}" + assert eval_result["per_claim"][0]["result"] == "CAUGHT" + + +# --------------------------------------------------------------------------- +# Scenario C — untrusted GET blocks the tool call +# --------------------------------------------------------------------------- + +@pytest.mark.e2e +async def test_untrusted_get_blocks_request( + fake_llm_server: FakeHttpServer, + fake_data_server: FakeHttpServer, + patched_interceptor_with_trust, +) -> None: + """C: data server URL NOT in trusted_endpoints → tool GET raises BLOCKED. + + Uses patched_interceptor_with_trust so the real trust gate fires inside + insert_intercept_row (patched_interceptor bypasses it entirely). + """ + rows, allowlist = patched_interceptor_with_trust + + # Only trust the LLM server; omit the data server + allowlist.add(normalize_url_for_trust(fake_llm_server.base_url)) + # data server deliberately omitted + + _configure_agent_client(fake_llm_server) + + data_url = f"{fake_data_server.base_url}/v1/temperature" + + @function_tool(failure_error_function=None) + def get_temperature() -> dict: + """Get the current temperature.""" + return requests_lib.get(data_url).json() + + agent = Agent( + name="weather-agent", + instructions="Use the get_temperature tool and report the result.", + tools=[get_temperature], + model="gpt-4o-mini", + ) + + with pytest.raises((RuntimeError, Exception), match="BLOCKED"): + await Runner.run(agent, "What's the temp?") + + +# --------------------------------------------------------------------------- +# Scenario D — untrusted POST blocks the LLM call +# --------------------------------------------------------------------------- + +def _exception_chain_contains(exc: BaseException, pattern: str) -> bool: + """Walk the full exception chain (__cause__, __context__) looking for ``pattern``.""" + seen: set[int] = set() + curr: BaseException | None = exc + while curr is not None and id(curr) not in seen: + seen.add(id(curr)) + if pattern in str(curr): + return True + # Check cause before context + next_exc = curr.__cause__ if curr.__cause__ is not None else curr.__context__ + curr = next_exc + return False + + +@pytest.mark.e2e +async def test_untrusted_post_blocks_llm_call( + fake_llm_server: FakeHttpServer, + fake_data_server: FakeHttpServer, + patched_interceptor_with_trust, +) -> None: + """D: LLM server URL NOT in trusted_endpoints → LLM POST raises BLOCKED. + + Validates that the trust gate now fires on POST (Phase 1 Prereq B). + Uses patched_interceptor_with_trust so the real trust gate fires. + + The BLOCKED RuntimeError is raised inside _attach() during AsyncClient.send, + which the openai SDK wraps in an APIConnectionError; we therefore inspect + the full exception chain rather than matching the top-level message. + """ + rows, allowlist = patched_interceptor_with_trust + + # Only trust the data server; omit the LLM server + allowlist.add(normalize_url_for_trust(fake_data_server.base_url)) + # LLM server deliberately omitted + + _configure_agent_client(fake_llm_server) + + data_url = f"{fake_data_server.base_url}/v1/temperature" + + @function_tool + def get_temperature() -> dict: + """Get the current temperature.""" + return requests_lib.get(data_url).json() + + agent = Agent( + name="weather-agent", + instructions="Use the get_temperature tool and report the result.", + tools=[get_temperature], + model="gpt-4o-mini", + ) + + with pytest.raises(Exception) as exc_info: + await Runner.run(agent, "What's the temp?") + + assert _exception_chain_contains(exc_info.value, "BLOCKED"), ( + f"Expected 'BLOCKED' somewhere in the exception chain. Got: {exc_info.value!r}" + ) + + +# --------------------------------------------------------------------------- +# Scenario E — self-egress exemption: evaluate_handoff completes even when +# the Provably backend URL is not in trusted_endpoints +# --------------------------------------------------------------------------- + +@pytest.mark.e2e +def test_self_egress_completes_without_trust( + fake_server_factory, + patched_interceptor: list[dict[str, Any]], + fake_trusted_endpoints: set[str], +) -> None: + """E: Provably backend NOT in trusted_endpoints; evaluate_handoff still completes. + + The SDK wraps its own egress with provably_self_egress() so the trust gate + never fires on internal SDK calls. + """ + # Leave trusted_endpoints completely empty — the Provably backend is not trusted + assert len(fake_trusted_endpoints) == 0 + + fake_provably = fake_server_factory() + fake_provably.respond( + "GET", + "/api/v1/organizations/org-1/queries/q1", + status=200, + body=_stored({"x": 1}), + ) + fake_provably.respond( + "POST", + "/api/v1/organizations/org-1/queries/q1/verify", + status=200, + body={"verified": True}, + ) + + payload = HandoffPayload( + provably_org_id="org-1", + integration_api_key="key-abc", + claims=[ + HandoffClaim( + action_name="get_data", + claimed_value={"x": 1}, + query_record_id="q1", + ) + ], + ) + + # This must NOT raise BLOCKED — SDK egress is exempt from trust gate + eval_result = evaluate_handoff(payload, provably_base_url=fake_provably.base_url) + assert eval_result["outcome"] == "PASS", f"Expected PASS, got: {eval_result}" + + +# --------------------------------------------------------------------------- +# Scenario F — async LLM call is intercepted +# --------------------------------------------------------------------------- + +@pytest.mark.e2e +async def test_async_llm_call_intercepted( + fake_llm_server: FakeHttpServer, + fake_data_server: FakeHttpServer, + patched_interceptor: list[dict[str, Any]], + fake_trusted_endpoints: set[str], +) -> None: + """F: assert at least one recorded row has method==POST from the fake LLM server. + + Validates that httpx.AsyncClient.send is patched and recording fires. + """ + fake_trusted_endpoints.add(normalize_url_for_trust(fake_llm_server.base_url)) + fake_trusted_endpoints.add(normalize_url_for_trust(fake_data_server.base_url)) + + _configure_agent_client(fake_llm_server) + + data_url = f"{fake_data_server.base_url}/v1/temperature" + + @function_tool + def get_temperature() -> dict: + """Get the current temperature.""" + return requests_lib.get(data_url).json() + + agent = Agent( + name="weather-agent", + instructions="Use the get_temperature tool and report the result.", + tools=[get_temperature], + model="gpt-4o-mini", + ) + + await Runner.run(agent, "What's the temp?") + + # Assert at least one POST row exists and its URL points to the fake LLM server + llm_base = normalize_url_for_trust(fake_llm_server.base_url) + post_llm_rows = [ + r for r in patched_interceptor + if r["method"] == "POST" and llm_base in normalize_url_for_trust(r["url"]) + ] + assert len(post_llm_rows) >= 1, ( + f"Expected at least one POST row to LLM server {llm_base}. " + f"Recorded rows: {[(r['method'], r['url']) for r in patched_interceptor]}" + ) diff --git a/tests/unit/test_interceptor.py b/tests/unit/test_interceptor.py index 5825e4b..3f4d117 100644 --- a/tests/unit/test_interceptor.py +++ b/tests/unit/test_interceptor.py @@ -1,10 +1,15 @@ from __future__ import annotations from typing import Any +from unittest.mock import MagicMock +import httpx +import pytest import requests +import provably.intercept._storage as storage import provably.intercept.interceptor as interceptor +from provably.intercept._self_egress import provably_self_egress def test_insert_row_receives_raw_before_mutation(monkeypatch: Any) -> None: @@ -79,3 +84,184 @@ def fake_insert(url: str, *_a: Any, **_k: Any) -> None: assert out is resp finally: interceptor.set_intercept_url_allowlist(None) + + +# --------------------------------------------------------------------------- +# Phase 1 additions: Client.send / AsyncClient.send / Session.send coverage, +# re-entry guard, and self-egress exemption. +# --------------------------------------------------------------------------- + + +def _make_fake_insert(rows: list[dict[str, Any]]): + """Return a fake _insert_row that appends to ``rows``.""" + + def fake_insert(url: str, request_payload: dict, raw: Any, *, method: str = "GET") -> None: + rows.append({"url": url, "method": method, "request": request_payload, "raw": raw}) + + return fake_insert + + +def _setup_interceptor(monkeypatch: Any, rows: list[dict[str, Any]]) -> None: + """Patch _insert_row and enable interceptor; call init_interceptor to ensure patches are installed.""" + monkeypatch.setattr(interceptor, "_insert_row", _make_fake_insert(rows)) + interceptor.init_interceptor() + monkeypatch.setattr(interceptor, "_enabled", True) + + +def test_httpx_client_send_intercepted(monkeypatch: Any, fake_server: Any) -> None: + """httpx.Client(...).get(url) records exactly one row via the Client.send patch.""" + fake_server.respond("GET", "/data", status=200, body={"from_client": True}) + rows: list[dict[str, Any]] = [] + _setup_interceptor(monkeypatch, rows) + + client = httpx.Client() + response = client.get(f"{fake_server.base_url}/data") + client.close() + + assert response.status_code == 200 + assert len(rows) == 1 + assert rows[0]["method"] == "GET" + assert rows[0]["raw"] == {"from_client": True} + + +async def test_httpx_async_client_send_intercepted(monkeypatch: Any, fake_server: Any) -> None: + """await httpx.AsyncClient().get(url) records exactly one row via AsyncClient.send patch.""" + fake_server.respond("GET", "/async-data", status=200, body={"async": True}) + rows: list[dict[str, Any]] = [] + _setup_interceptor(monkeypatch, rows) + + async with httpx.AsyncClient() as client: + response = await client.get(f"{fake_server.base_url}/async-data") + + assert response.status_code == 200 + assert len(rows) == 1 + assert rows[0]["method"] == "GET" + assert rows[0]["raw"] == {"async": True} + + +def test_requests_session_send_intercepted(monkeypatch: Any, fake_server: Any) -> None: + """requests.Session().get(url) records exactly one row via Session.send patch.""" + fake_server.respond("GET", "/session-data", status=200, body={"from_session": True}) + rows: list[dict[str, Any]] = [] + _setup_interceptor(monkeypatch, rows) + + session = requests.Session() + response = session.get(f"{fake_server.base_url}/session-data") + + assert response.status_code == 200 + assert len(rows) == 1 + assert rows[0]["method"] == "GET" + assert rows[0]["raw"] == {"from_session": True} + + +def test_no_double_record_when_module_httpx_get_called(monkeypatch: Any, fake_server: Any) -> None: + """httpx.get(url) records exactly one row even though both module-level and Client.send patches are active.""" + fake_server.respond("GET", "/data", status=200, body={"v": 1}) + rows: list[dict[str, Any]] = [] + _setup_interceptor(monkeypatch, rows) + + httpx.get(f"{fake_server.base_url}/data") + + assert len(rows) == 1, f"Expected 1 row but got {len(rows)}: {rows}" + + +def test_no_double_record_when_module_requests_get_called(monkeypatch: Any, fake_server: Any) -> None: + """requests.get(url) records exactly one row even though both module-level and Session.send patches are active.""" + fake_server.respond("GET", "/data", status=200, body={"v": 2}) + rows: list[dict[str, Any]] = [] + _setup_interceptor(monkeypatch, rows) + + requests.get(f"{fake_server.base_url}/data") + + assert len(rows) == 1, f"Expected 1 row but got {len(rows)}: {rows}" + + +def test_self_egress_skips_recording(monkeypatch: Any, fake_server: Any) -> None: + """Inside with provably_self_egress(): no row is inserted.""" + fake_server.respond("GET", "/data", status=200, body={"v": 3}) + rows: list[dict[str, Any]] = [] + _setup_interceptor(monkeypatch, rows) + + with provably_self_egress(): + requests.get(f"{fake_server.base_url}/data") + + assert rows == [], f"Expected no rows but got: {rows}" + + +def test_self_egress_skips_trust_check(monkeypatch: Any, fake_server: Any) -> None: + """Inside with provably_self_egress(): an untrusted URL does NOT raise (storage layer short-circuits).""" + fake_server.respond("POST", "/untrusted", status=200, body={"ok": True}) + + trust_calls: list[str] = [] + + def fake_require(postgres_url: str, url: str) -> None: + trust_calls.append(url) + raise RuntimeError(f"BLOCKED: {url}") + + monkeypatch.setattr(storage, "_require_trusted_endpoint", fake_require) + monkeypatch.setenv("POSTGRES_URL", "postgresql://fake/db") + monkeypatch.setattr(interceptor, "_insert_row", _make_fake_insert([])) + interceptor.init_interceptor() + monkeypatch.setattr(interceptor, "_enabled", True) + + with provably_self_egress(): + # This should NOT raise even though _require_trusted_endpoint would block it + response = requests.post(f"{fake_server.base_url}/untrusted", json={"x": 1}) + + # Trust check was never called (self-egress bypassed storage entirely) + assert trust_calls == [] + assert response.status_code == 200 + + +# --------------------------------------------------------------------------- +# Storage-level trust gate tests (POST / DELETE coverage) +# --------------------------------------------------------------------------- + + +def test_trust_gate_fires_on_post(monkeypatch: Any) -> None: + """POST to an untrusted URL raises RuntimeError('BLOCKED: ...').""" + trust_calls: list[tuple[str, str]] = [] + + def fake_require(postgres_url: str, url: str) -> None: + trust_calls.append((postgres_url, url)) + raise RuntimeError(f"BLOCKED: {url} not in trusted index") + + monkeypatch.setattr(storage, "_require_trusted_endpoint", fake_require) + monkeypatch.setenv("POSTGRES_URL", "postgresql://fake/db") + + with pytest.raises(RuntimeError, match="BLOCKED"): + storage.insert_intercept_row( + url="https://untrusted.example/api", + method="POST", + request_payload={"url": "https://untrusted.example/api", "method": "POST"}, + raw={"data": 1}, + agent_id="ag", + action_name="act", + ) + + assert len(trust_calls) == 1 + assert trust_calls[0][1] == "https://untrusted.example/api" + + +def test_trust_gate_fires_on_delete(monkeypatch: Any) -> None: + """DELETE to an untrusted URL raises RuntimeError('BLOCKED: ...') — all-methods coverage.""" + trust_calls: list[str] = [] + + def fake_require(postgres_url: str, url: str) -> None: + trust_calls.append(url) + raise RuntimeError(f"BLOCKED: {url}") + + monkeypatch.setattr(storage, "_require_trusted_endpoint", fake_require) + monkeypatch.setenv("POSTGRES_URL", "postgresql://fake/db") + + with pytest.raises(RuntimeError, match="BLOCKED"): + storage.insert_intercept_row( + url="https://untrusted.example/resource/1", + method="DELETE", + request_payload={"url": "https://untrusted.example/resource/1", "method": "DELETE"}, + raw={}, + agent_id="ag", + action_name="act", + ) + + assert trust_calls == ["https://untrusted.example/resource/1"] From 5d316d1bc21cce2faa0b8cd8a2f7fc438ba41342 Mon Sep 17 00:00:00 2001 From: rimkusaurimas Date: Tue, 5 May 2026 19:51:27 +0200 Subject: [PATCH 2/6] fix(examples): move imports to top and configure_indexing into main() to satisfy ruff E402/I001 Co-Authored-By: Claude Opus 4.7 --- examples/openai_agents/agent_run.py | 128 ++++++++++------------------ 1 file changed, 43 insertions(+), 85 deletions(-) diff --git a/examples/openai_agents/agent_run.py b/examples/openai_agents/agent_run.py index e8447c4..c2ca86b 100644 --- a/examples/openai_agents/agent_run.py +++ b/examples/openai_agents/agent_run.py @@ -26,53 +26,29 @@ import psycopg2 import requests - -# --------------------------------------------------------------------------- -# Step 1 – activate Provably indexing -# (calls initialize_runtime() + init_interceptor() + enable()) -# --------------------------------------------------------------------------- -import provably.runtime as _prt -_prt.configure_indexing(enable_indexing=True) - -# --------------------------------------------------------------------------- -# Step 2 – configure the OpenAI Agents SDK to use OpenRouter -# --------------------------------------------------------------------------- +from agents import Agent, Runner, function_tool, set_default_openai_api, set_default_openai_client from openai import AsyncOpenAI -from agents import ( - Agent, - Runner, - function_tool, - set_default_openai_client, - set_default_openai_api, -) -_openrouter_client = AsyncOpenAI( - base_url="https://openrouter.ai/api/v1", - api_key=os.environ["OPENROUTER_API_KEY"], +import provably.runtime as _prt +from provably.handoff.client import cached_integration_api_key +from provably.handoff.evaluator import evaluate_handoff +from provably.handoff.types import HandoffClaim, HandoffPayload +from provably.intercept import set_interceptor_context, take_last_intercept_row_id +from provably.trusted_endpoints import ( + ensure_trusted_endpoints_table, + load_trusted_endpoint_urls, + normalize_url_for_trust, ) -set_default_openai_client(_openrouter_client, use_for_tracing=False) -# OpenRouter speaks the Chat Completions API, not the Responses API that the -# Agents SDK defaults to. Switching to "chat_completions" is required. -set_default_openai_api("chat_completions") # --------------------------------------------------------------------------- -# Step 3 – seed trusted_endpoints -# We register both the LLM provider URL and the weather API URL so the trust -# gate allows them through. normalize_url_for_trust strips trailing slashes -# and normalises scheme/host; we use the exact path URLs so exact matching works. +# Trusted endpoint URLs for this demo # --------------------------------------------------------------------------- -from provably.trusted_endpoints import normalize_url_for_trust, ensure_trusted_endpoints_table - -# Exact URLs that will appear in intercepted requests _OPENROUTER_COMPLETIONS_URL = "https://openrouter.ai/api/v1/chat/completions" -_OPEN_METEO_URL = ( - "https://api.open-meteo.com/v1/forecast" - "?latitude=51.5074&longitude=-0.1278¤t=temperature_2m" -) +_OPEN_METEO_BASE_URL = "https://api.open-meteo.com/v1/forecast" _TRUSTED_URLS = [ _OPENROUTER_COMPLETIONS_URL, - "https://api.open-meteo.com/v1/forecast", # base path (prefix match not needed – exact norm) + _OPEN_METEO_BASE_URL, ] @@ -101,24 +77,20 @@ def _seed_trusted_endpoints() -> None: # --------------------------------------------------------------------------- -# Step 4 – define the weather tool +# Tool definition — the @function_tool decorator registers the schema at +# import time but does NOT make HTTP calls, so the interceptor doesn't need +# to be active yet at decoration time. # --------------------------------------------------------------------------- -from provably.intercept import set_interceptor_context, take_last_intercept_row_id -from provably.handoff.evaluator import evaluate_handoff -from provably.handoff.types import HandoffClaim, HandoffPayload - - @function_tool def get_current_temperature_london() -> dict: """Fetch the current temperature in London (51.5074 N, 0.1278 W) from Open-Meteo. Returns a dict with a ``temperature_2m`` key (Celsius, float). """ - # Tag this tool call so the interceptor records it under action_name="get_weather" set_interceptor_context(agent_id="demo", action_name="get_weather") response = requests.get( - "https://api.open-meteo.com/v1/forecast", + _OPEN_METEO_BASE_URL, params={ "latitude": 51.5074, "longitude": -0.1278, @@ -128,20 +100,34 @@ def get_current_temperature_london() -> dict: ) response.raise_for_status() data = response.json() - # The Open-Meteo API returns: {"current": {"temperature_2m": , ...}, ...} current = data.get("current", {}) - temperature = current.get("temperature_2m") - return {"temperature_2m": temperature} + return {"temperature_2m": current.get("temperature_2m")} # --------------------------------------------------------------------------- -# Step 5 – main: run agent → capture intercept → build claim → evaluate +# Main # --------------------------------------------------------------------------- + async def main() -> None: + # Step 1 — activate Provably indexing (interceptor + storage) + # Must happen before Runner.run() so all HTTP calls are recorded. + _prt.configure_indexing(enable_indexing=True) + + # Step 2 — configure the Agents SDK to use OpenRouter (Chat Completions API) + openrouter_client = AsyncOpenAI( + base_url="https://openrouter.ai/api/v1", + api_key=os.environ["OPENROUTER_API_KEY"], + ) + set_default_openai_client(openrouter_client, use_for_tracing=False) + # OpenRouter speaks Chat Completions, not the Responses API the SDK defaults to. + set_default_openai_api("chat_completions") + + # Step 3 — seed trusted endpoints so the trust gate allows these URLs print("Seeding trusted endpoints…") _seed_trusted_endpoints() + # Step 4 — run the agent agent = Agent( name="weather-demo", instructions=( @@ -156,27 +142,14 @@ async def main() -> None: result = await Runner.run(agent, "What is the current temperature in London?") print(f"\nAgent response: {result.final_output}\n") - # ----------------------------------------------------------------- - # Capture the intercept row id for the weather tool call. - # take_last_intercept_row_id() pops the most-recently inserted row id - # (set by the interceptor when it stored the Open-Meteo response). - # ----------------------------------------------------------------- + # Step 5 — capture the intercept row id for the weather tool call intercept_row_id = take_last_intercept_row_id() if intercept_row_id is None: - print( - "WARNING: No intercept row captured. " - "Check POSTGRES_URL and that trusted_endpoints are seeded correctly." - ) - - # ----------------------------------------------------------------- - # Extract the temperature value the tool actually returned. - # The agent's tool output is available in result.new_items as ToolCallOutputItem. - # For simplicity, re-derive it from the final output text is fragile, so we - # inspect the tool call outputs directly. - # ----------------------------------------------------------------- + print("WARNING: No intercept row captured. Check POSTGRES_URL and that trusted_endpoints are seeded correctly.") + + # Step 6 — extract tool output for the claim tool_output_value: dict = {} for item in result.new_items: - # ToolCallOutputItem has .output attribute (the raw tool return value) if hasattr(item, "output"): raw_out = item.output if isinstance(raw_out, str): @@ -191,34 +164,19 @@ async def main() -> None: print(f"Tool output captured for claim: {tool_output_value}") - # ----------------------------------------------------------------- - # Build a HandoffPayload with one HandoffClaim referencing the - # intercepted Open-Meteo response stored in provably_intercepts. - # ----------------------------------------------------------------- + # Step 7 — build the HandoffPayload and evaluate org_id = os.environ["PROVABLY_ORG_ID"] provably_base_url = os.environ.get("PROVABLY_RUST_BE_URL", "").rstrip("/") postgres_url = os.environ["POSTGRES_URL"] - # Import the integration_api_key from the runtime cache - from provably.handoff.client import cached_integration_api_key - integration_key = cached_integration_api_key() - - # Build the query_record_id. When intercept_row_id is available the - # convention used by build_handoff_payload is to create a query record - # against the row PK. For this demo we build the HandoffClaim directly - # with the row id encoded as the query_record_id string (the evaluator - # will fetch the record from the Provably backend using this id). - # In production usage you would call create_query_record_for_intercept() - # (via build_handoff_payload) to get the real Provably query record id. + # NOTE: in production use build_handoff_payload() to obtain a real Provably + # query record UUID. This demo passes the raw intercepts PK as a stand-in. query_record_id = str(intercept_row_id) if intercept_row_id else "" - - # Load the trusted endpoint URLs for the snapshot embedded in the payload - from provably.trusted_endpoints import load_trusted_endpoint_urls trusted_urls = load_trusted_endpoint_urls(postgres_url, org_id) payload = HandoffPayload( provably_org_id=org_id, - integration_api_key=integration_key, + integration_api_key=cached_integration_api_key(), trusted_endpoint_registry=trusted_urls, claims=[ HandoffClaim( From 0232b10e48c3fbafd3b7e8edc42f94fa5821dd7f Mon Sep 17 00:00:00 2001 From: rimkusaurimas Date: Tue, 5 May 2026 19:56:23 +0200 Subject: [PATCH 3/6] fix(lint): resolve repo-wide ruff errors - _self_egress.py: import Generator from collections.abc (UP035, py3.9+) - interceptor.py: shorten oversized docstring line (E501) - test_openai_agents_e2e.py: sort imports (I001) - test_interceptor.py: remove unused MagicMock import (F401) Co-Authored-By: Claude Opus 4.7 --- src/provably/intercept/_self_egress.py | 2 +- src/provably/intercept/interceptor.py | 2 +- tests/e2e/test_openai_agents_e2e.py | 6 ++---- tests/unit/test_interceptor.py | 1 - 4 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/provably/intercept/_self_egress.py b/src/provably/intercept/_self_egress.py index 7bf71bc..5783c7a 100644 --- a/src/provably/intercept/_self_egress.py +++ b/src/provably/intercept/_self_egress.py @@ -1,8 +1,8 @@ """Self-egress exemption: SDK-internal HTTP calls bypass intercept recording and trust checks.""" import contextvars +from collections.abc import Generator from contextlib import contextmanager -from typing import Generator _self_egress: contextvars.ContextVar[bool] = contextvars.ContextVar( "provably_self_egress", default=False diff --git a/src/provably/intercept/interceptor.py b/src/provably/intercept/interceptor.py index 4553476..3e12d20 100644 --- a/src/provably/intercept/interceptor.py +++ b/src/provably/intercept/interceptor.py @@ -265,7 +265,7 @@ def _httpx_request_to_kwargs(req: httpx.Request) -> dict[str, Any]: def _requests_prepared_to_kwargs(req: requests.PreparedRequest) -> dict[str, Any]: - """Extract request metadata from a requests.PreparedRequest into the kwargs shape request_payload_dict understands.""" + """Extract request metadata from a PreparedRequest into the kwargs shape request_payload_dict understands.""" from urllib.parse import parse_qs, urlsplit kwargs: dict[str, Any] = {} diff --git a/tests/e2e/test_openai_agents_e2e.py b/tests/e2e/test_openai_agents_e2e.py index ae72b29..0a49d57 100644 --- a/tests/e2e/test_openai_agents_e2e.py +++ b/tests/e2e/test_openai_agents_e2e.py @@ -28,18 +28,16 @@ import pytest import requests as requests_lib - +from agents import Agent, Runner, function_tool, set_default_openai_api, set_default_openai_client from openai import AsyncOpenAI -from agents import Agent, Runner, function_tool, set_default_openai_client, set_default_openai_api -import provably.intercept.interceptor as _interceptor_module import provably.intercept._storage as _storage_module +import provably.intercept.interceptor as _interceptor_module from provably.handoff.evaluator import evaluate_handoff from provably.handoff.types import HandoffClaim, HandoffPayload from provably.trusted_endpoints import normalize_url_for_trust from tests.e2e.conftest import FakeHttpServer - # --------------------------------------------------------------------------- # Helper: build canonical ChatCompletion JSON responses for the fake LLM server # --------------------------------------------------------------------------- diff --git a/tests/unit/test_interceptor.py b/tests/unit/test_interceptor.py index 3f4d117..d87c434 100644 --- a/tests/unit/test_interceptor.py +++ b/tests/unit/test_interceptor.py @@ -1,7 +1,6 @@ from __future__ import annotations from typing import Any -from unittest.mock import MagicMock import httpx import pytest From 31b37891006bba477dee28a457d2312b0fc89cfb Mon Sep 17 00:00:00 2001 From: rimkusaurimas Date: Tue, 5 May 2026 20:14:05 +0200 Subject: [PATCH 4/6] refactor(qa): DRY interceptor + handoff HTTP + e2e tests; fix Docker image deps QA pass after Sonnet's initial Phase 1-3 work: - Dockerfile: add pytest-asyncio + openai-agents to test image so the new e2e suite can collect inside the container (CI docker job was failing). - handoff/_http.py: collapse 6 scattered `with _self_egress():` blocks into a single `_request(method, path, **kwargs)` helper. -28 lines. - intercept/interceptor.py: * Drop dead `_RequestsJsonOverride`/`_HttpxJsonOverride` aliases; use the public names directly. * Hoist `agent_id`/`action_name` locals in `_insert_row` (was reading each ContextVar three times). * Move `urllib.parse` import to module scope (was buried in a function). * Extract shared `_decode_body_into_kwargs` helper for the two request-to-kwargs converters; same JSON-vs-data logic was inlined twice. - tests/e2e/test_openai_agents_e2e.py: -267 lines (-48%). Six scenarios were copy-pasting the same 15-line agent setup; lifted into `_make_weather_agent`, `_seed_allowlist`, `_make_provably_backend`, `_make_payload` helpers. - tests/e2e/conftest.py: remove duplicate `fake_server` fixture (tests/conftest.py already provides it; pytest inherits from parent conftests). 97/97 tests still green. Ruff clean. Co-Authored-By: Claude Opus 4.7 --- Dockerfile | 2 +- src/provably/handoff/_http.py | 50 +-- src/provably/intercept/interceptor.py | 68 ++-- tests/e2e/conftest.py | 10 - tests/e2e/test_openai_agents_e2e.py | 468 +++++++++----------------- tests/unit/test_interceptor.py | 3 +- 6 files changed, 201 insertions(+), 400 deletions(-) diff --git a/Dockerfile b/Dockerfile index a95406f..ff4dfa8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -56,7 +56,7 @@ COPY --from=builder /dist /dist RUN pip install --upgrade pip \ && pip install /dist/*.whl \ - && pip install "pytest>=8.0" "ruff>=0.3" "build>=1.2" + && pip install "pytest>=8.0" "pytest-asyncio>=0.23" "ruff>=0.3" "build>=1.2" "openai-agents" COPY pyproject.toml ./ COPY tests ./tests diff --git a/src/provably/handoff/_http.py b/src/provably/handoff/_http.py index ba3e833..6333bc5 100644 --- a/src/provably/handoff/_http.py +++ b/src/provably/handoff/_http.py @@ -14,20 +14,22 @@ _log = get_logger(__name__) _SESSION = requests.Session() +_TRANSIENT_STATUS = {429, 502, 503, 504} -def _self_egress(): - """Lazy import of provably_self_egress to avoid circular imports at module load time. +def _request(method: str, path: str, **kwargs: Any) -> requests.Response: + """All outbound HTTP from this module funnels through here. - provably.intercept._self_egress -> provably/intercept/__init__.py -> interceptor.py - -> _storage.py -> provably.handoff._preprocess -> provably.handoff._http (circular). - Deferring the import to call time avoids the cycle. + Wraps every call in ``provably_self_egress()`` so the SDK's own backend traffic + bypasses the trust gate and the intercept recorder. The ``provably_self_egress`` + import is deferred to avoid the circular import: + ``provably.intercept`` → ``interceptor`` → ``_storage`` → ``handoff._preprocess`` + → ``handoff._http``. """ from provably.intercept._self_egress import provably_self_egress # noqa: PLC0415 - return provably_self_egress() - -_TRANSIENT_STATUS = {429, 502, 503, 504} + with provably_self_egress(): + return _SESSION.request(method, f"{base_url()}{path}", headers=headers(), **kwargs) def base_url() -> str: @@ -141,8 +143,7 @@ def log_failed_response(resp: requests.Response) -> None: def get_json(path: str) -> Any: - with _self_egress(): - resp = _SESSION.get(f"{base_url()}{path}", headers=headers(), timeout=60) + resp = _request("GET", path, timeout=60) if not resp.ok: log_failed_response(resp) resp.raise_for_status() @@ -151,27 +152,15 @@ def get_json(path: str) -> Any: def get_json_params(path: str, params: dict[str, Any], *, timeout_s: float = 60.0) -> Any: """GET with query-string parameters (e.g. list queries filtered by ``collection_ids``).""" - with _self_egress(): - resp = _SESSION.get( - f"{base_url()}{path}", headers=headers(), params=params, timeout=timeout_s - ) + resp = _request("GET", path, params=params, timeout=timeout_s) if not resp.ok: log_failed_response(resp) resp.raise_for_status() - if not resp.text: - return [] - data = resp.json() - return data + return resp.json() if resp.text else [] def post_json(path: str, payload: dict[str, Any] | None = None) -> dict[str, Any]: - with _self_egress(): - resp = _SESSION.post( - f"{base_url()}{path}", - headers=headers(), - json=payload or {}, - timeout=60, - ) + resp = _request("POST", path, json=payload or {}, timeout=60) if not resp.ok: log_failed_response(resp) resp.raise_for_status() @@ -180,8 +169,7 @@ def post_json(path: str, payload: dict[str, Any] | None = None) -> dict[str, Any def post_raw(path: str, payload: dict[str, Any]) -> requests.Response: """Post without raising so callers can inspect error bodies (e.g. 'already exists').""" - with _self_egress(): - return _SESSION.post(f"{base_url()}{path}", headers=headers(), json=payload, timeout=60) + return _request("POST", path, json=payload, timeout=60) def post_json_with_transient_retry( @@ -194,13 +182,7 @@ def post_json_with_transient_retry( payload = payload or {} last: requests.Response | None = None for attempt in range(max_attempts): - with _self_egress(): - last = _SESSION.post( - f"{base_url()}{path}", - headers=headers(), - json=payload, - timeout=120, - ) + last = _request("POST", path, json=payload, timeout=120) if last.status_code in _TRANSIENT_STATUS: log_failed_response(last) time.sleep(min(3.0 * (attempt + 1), 45)) diff --git a/src/provably/intercept/interceptor.py b/src/provably/intercept/interceptor.py index 3e12d20..5b9497c 100644 --- a/src/provably/intercept/interceptor.py +++ b/src/provably/intercept/interceptor.py @@ -7,6 +7,7 @@ from collections.abc import Callable from contextvars import ContextVar from typing import Any +from urllib.parse import parse_qs, urlsplit import httpx import requests @@ -24,9 +25,6 @@ ) from provably.trusted_endpoints import normalize_url_for_trust -_RequestsJsonOverride = RequestsJsonOverride -_HttpxJsonOverride = HttpxJsonOverride - _ctx_agent_id: ContextVar[str] = ContextVar("provably_agent_id", default="") _ctx_action_name: ContextVar[str] = ContextVar("provably_action_name", default="") _ctx_intercept_index: ContextVar[int] = ContextVar("provably_intercept_index", default=0) @@ -173,19 +171,21 @@ def is_enabled() -> bool: def _insert_row(url: str, request_payload: dict[str, Any], raw: Any, *, method: str = "GET") -> None: if not _enabled: return + agent_id = _ctx_agent_id.get() or "unknown" + action_name = _ctx_action_name.get() or "unknown" row_id = insert_intercept_row( url=url, method=method, request_payload=request_payload, raw=raw, - agent_id=_ctx_agent_id.get() or "unknown", - action_name=_ctx_action_name.get() or "unknown", + agent_id=agent_id, + action_name=action_name, ) if row_id is not None: global _last_intercept_row_id with _intercept_lock: _last_intercept_row_id = row_id - _action_row_ids[(_ctx_agent_id.get() or "unknown", _ctx_action_name.get() or "unknown")] = row_id + _action_row_ids[(agent_id, action_name)] = row_id def _maybe_transform_body(raw: Any) -> Any: @@ -220,9 +220,9 @@ def _attach(response: Any, url: str, method: str, req_kwargs: dict[str, Any]) -> if mutated is raw: return response if isinstance(response, requests.Response): - return _RequestsJsonOverride(response, mutated) + return RequestsJsonOverride(response, mutated) if isinstance(response, httpx.Response): - return _HttpxJsonOverride(response, mutated) + return HttpxJsonOverride(response, mutated) return response @@ -245,49 +245,43 @@ def wrapped(url, *args, **kwargs): return wrapped +def _decode_body_into_kwargs(kwargs: dict[str, Any], body: bytes | str | None, content_type: str) -> None: + """Decode a raw request body into the kwargs shape ``request_payload_dict`` understands. + + JSON bodies (when Content-Type contains ``application/json``) are parsed and stored under + ``json``; everything else is decoded to text and stored under ``data``. Mutates ``kwargs``. + """ + if body is None or body == b"": + return + text = body if isinstance(body, str) else body.decode(errors="replace") + if "application/json" in content_type: + try: + kwargs["json"] = json.loads(text) + return + except Exception: # noqa: BLE001 + pass + kwargs["data"] = text + + def _httpx_request_to_kwargs(req: httpx.Request) -> dict[str, Any]: - """Extract request metadata from an httpx.Request into the kwargs shape request_payload_dict understands.""" + """Extract httpx.Request metadata into the kwargs shape request_payload_dict understands.""" kwargs: dict[str, Any] = {} - # Query params params = dict(req.url.params) if params: kwargs["params"] = params - # Body: try to parse as JSON if Content-Type is application/json - content_type = req.headers.get("content-type", "") - if req.content and "application/json" in content_type: - try: - kwargs["json"] = json.loads(req.content) - except Exception: # noqa: BLE001 - kwargs["content"] = req.content.decode(errors="replace") - elif req.content: - kwargs["data"] = req.content.decode(errors="replace") + _decode_body_into_kwargs(kwargs, req.content, req.headers.get("content-type", "")) return kwargs def _requests_prepared_to_kwargs(req: requests.PreparedRequest) -> dict[str, Any]: - """Extract request metadata from a PreparedRequest into the kwargs shape request_payload_dict understands.""" - from urllib.parse import parse_qs, urlsplit - + """Extract PreparedRequest metadata into the kwargs shape request_payload_dict understands.""" kwargs: dict[str, Any] = {} - # Query params from URL - url_str = str(req.url or "") - parsed = urlsplit(url_str) + parsed = urlsplit(str(req.url or "")) if parsed.query: raw_params = parse_qs(parsed.query, keep_blank_values=True) # Flatten single-value lists to scalars (matches typical kwargs["params"] shape) kwargs["params"] = {k: (v[0] if len(v) == 1 else v) for k, v in raw_params.items()} - # Body - content_type = (req.headers or {}).get("Content-Type", "") or "" - body = req.body - if body is not None: - if "application/json" in content_type: - try: - body_str = body if isinstance(body, str) else body.decode(errors="replace") - kwargs["json"] = json.loads(body_str) - except Exception: # noqa: BLE001 - kwargs["data"] = body if isinstance(body, str) else body.decode(errors="replace") - else: - kwargs["data"] = body if isinstance(body, str) else body.decode(errors="replace") + _decode_body_into_kwargs(kwargs, req.body, (req.headers or {}).get("Content-Type", "") or "") return kwargs diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 8f679e6..3678d15 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -118,16 +118,6 @@ def stop(self) -> None: self._thread.join(timeout=2.0) -@pytest.fixture -def fake_server() -> Iterator[FakeHttpServer]: - server = FakeHttpServer() - server.start() - try: - yield server - finally: - server.stop() - - @pytest.fixture def fake_server_factory() -> Iterator[Callable[[], FakeHttpServer]]: """Factory fixture that creates and tracks multiple FakeHttpServer instances. diff --git a/tests/e2e/test_openai_agents_e2e.py b/tests/e2e/test_openai_agents_e2e.py index 0a49d57..d3c48aa 100644 --- a/tests/e2e/test_openai_agents_e2e.py +++ b/tests/e2e/test_openai_agents_e2e.py @@ -1,25 +1,21 @@ -"""End-to-end test suite for OpenAI Agents SDK integration. +"""End-to-end tests for OpenAI Agents SDK integration. -Six scenarios (A–F) drive a fake LLM server + fake data server through the -full intercept → handoff → evaluate flow: +Six scenarios drive a fake LLM server + fake data server through the full +intercept → handoff → evaluate flow: A – happy path: all URLs trusted, evaluate_handoff → PASS B – tampered claim: claimed_value wrong → CAUGHT C – untrusted GET: data URL not in allowlist → tool call raises BLOCKED D – untrusted POST: LLM URL not in allowlist → LLM POST raises BLOCKED E – self-egress exemption: Provably backend NOT in allowlist; evaluate_handoff still completes - F – async LLM coverage: at least one recorded row has method == "POST" from the LLM server - -Design note on trust-gate testing (scenarios C and D): - ``patched_interceptor`` replaces ``_insert_row`` in ``interceptor.py``, which bypasses the - entire ``_storage.insert_intercept_row`` path (and therefore never reaches - ``_require_trusted_endpoint``). For C and D we need the trust check to fire but avoid a real - DB write, so those two scenarios use ``patched_interceptor_with_trust`` instead, which: - 1. Calls ``init_interceptor()`` (installs patches), - 2. Monkeypatches POSTGRES_URL to a non-empty sentinel so the early-return guard in - ``insert_intercept_row`` is skipped, - 3. Monkeypatches ``_require_trusted_endpoint`` with the in-memory allowlist check, and - 4. Monkeypatches ``_write_row`` to a no-op so no actual DB call happens. + F – async LLM coverage: at least one recorded POST row from the LLM server + +Trust-gate testing notes (scenarios C and D): + ``patched_interceptor`` replaces ``_insert_row`` entirely, bypassing the + ``_storage.insert_intercept_row`` path (and therefore the trust check). For C and D the + trust check must fire while still avoiding a real DB write, so those scenarios use + ``patched_interceptor_with_trust`` which patches ``_require_trusted_endpoint`` + + ``_write_row`` instead and sets a sentinel ``POSTGRES_URL``. """ from __future__ import annotations @@ -36,12 +32,13 @@ from provably.handoff.evaluator import evaluate_handoff from provably.handoff.types import HandoffClaim, HandoffPayload from provably.trusted_endpoints import normalize_url_for_trust -from tests.e2e.conftest import FakeHttpServer +from tests.e2e.conftest import FakeHttpServer, FakeResponse # --------------------------------------------------------------------------- -# Helper: build canonical ChatCompletion JSON responses for the fake LLM server +# Fake LLM ChatCompletion JSON responses # --------------------------------------------------------------------------- + def _tool_call_response(tool_name: str, call_id: str = "call_001", arguments: str = "{}") -> dict: """First LLM turn: respond with a tool_call.""" return { @@ -59,10 +56,7 @@ def _tool_call_response(tool_name: str, call_id: str = "call_001", arguments: st { "id": call_id, "type": "function", - "function": { - "name": tool_name, - "arguments": arguments, - }, + "function": {"name": tool_name, "arguments": arguments}, } ], }, @@ -70,11 +64,7 @@ def _tool_call_response(tool_name: str, call_id: str = "call_001", arguments: st "logprobs": None, } ], - "usage": { - "prompt_tokens": 10, - "completion_tokens": 5, - "total_tokens": 15, - }, + "usage": {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15}, } @@ -88,38 +78,27 @@ def _final_response(content: str = "The temperature is 21 degrees Celsius.") -> "choices": [ { "index": 0, - "message": { - "role": "assistant", - "content": content, - "tool_calls": None, - }, + "message": {"role": "assistant", "content": content, "tool_calls": None}, "finish_reason": "stop", "logprobs": None, } ], - "usage": { - "prompt_tokens": 20, - "completion_tokens": 10, - "total_tokens": 30, - }, + "usage": {"prompt_tokens": 20, "completion_tokens": 10, "total_tokens": 30}, } # --------------------------------------------------------------------------- -# Local fixtures +# Fixtures # --------------------------------------------------------------------------- + @pytest.fixture def fake_llm_server(fake_server_factory) -> FakeHttpServer: - """A FakeHttpServer that responds to POST /v1/chat/completions. - - First call → tool_call response; subsequent calls → final assistant message. - """ + """Fake LLM: tool_call response on the first POST, final assistant message after that.""" server = fake_server_factory() call_count = [0] def _handle_chat(_req): - from tests.e2e.conftest import FakeResponse call_count[0] += 1 body = _tool_call_response("get_temperature") if call_count[0] == 1 else _final_response() return FakeResponse(status=200, body=body) @@ -130,17 +109,17 @@ def _handle_chat(_req): @pytest.fixture def fake_data_server(fake_server_factory) -> FakeHttpServer: - """A FakeHttpServer that responds to GET /v1/temperature with celsius: 21.""" + """Fake data API: GET /v1/temperature → {'celsius': 21}.""" server = fake_server_factory() server.respond("GET", "/v1/temperature", status=200, body={"celsius": 21}) return server def _trusted_check(allowlist: set[str], url: str) -> None: - """Prefix-aware trust check: a URL is trusted if its normalized form starts with any - trusted entry (or exactly matches one). This mirrors real-world usage where users - register the service base URL (e.g. ``http://127.0.0.1:9000``) and calls to any - path under that base (e.g. ``http://127.0.0.1:9000/v1/chat/completions``) are trusted. + """Prefix-aware trust check. + + Mirrors real-world usage where users register a service base URL (e.g. + ``http://127.0.0.1:9000``) and any path under that base is allowed. """ norm = normalize_url_for_trust(url) for trusted in allowlist: @@ -151,368 +130,242 @@ def _trusted_check(allowlist: set[str], url: str) -> None: @pytest.fixture def fake_trusted_endpoints(monkeypatch: pytest.MonkeyPatch) -> set[str]: - """Monkeypatch _require_trusted_endpoint to consult an in-memory allowlist. + """In-memory allowlist for ``_require_trusted_endpoint``. - Tests mutate the returned set to seed the allowlist for each scenario. - This fixture is ONLY sufficient for scenarios that also use ``patched_interceptor`` - (A, B, E, F), because ``patched_interceptor`` bypasses ``_storage.insert_intercept_row`` - entirely — the trust check never fires. For scenarios C and D, use - ``patched_interceptor_with_trust`` instead (it also patches the trust gate). + Only sufficient when paired with ``patched_interceptor`` (which bypasses + ``insert_intercept_row``). For scenarios needing the real trust gate to fire (C, D), + use ``patched_interceptor_with_trust``. """ allowlist: set[str] = set() - - def fake_require(_pg_url: str, url: str) -> None: - _trusted_check(allowlist, url) - monkeypatch.setattr( "provably.intercept._storage._require_trusted_endpoint", - fake_require, + lambda _pg, url: _trusted_check(allowlist, url), ) return allowlist @pytest.fixture def patched_interceptor_with_trust(monkeypatch: pytest.MonkeyPatch) -> tuple[list[dict[str, Any]], set[str]]: - """Install the real interceptor WITH the trust gate active. + """Like ``patched_interceptor`` but with the trust gate active. - Unlike ``patched_interceptor`` (which replaces _insert_row and bypasses _storage entirely), - this fixture lets ``insert_intercept_row`` run so ``_require_trusted_endpoint`` fires. - It avoids a real Postgres connection by: - 1. Setting a sentinel POSTGRES_URL so the early-return guard is skipped. - 2. Replacing ``_require_trusted_endpoint`` with an in-memory prefix-aware allowlist check. + Lets ``insert_intercept_row`` actually run so ``_require_trusted_endpoint`` fires; avoids + a real Postgres connection by: + 1. Setting a sentinel ``POSTGRES_URL`` so the early-return guard is skipped. + 2. Replacing ``_require_trusted_endpoint`` with the in-memory prefix check. 3. Replacing ``_write_row`` with a no-op that records to an in-memory list. - - Returns (rows_list, trusted_allowlist) — tests mutate trusted_allowlist to seed permissions. """ rows: list[dict[str, Any]] = [] allowlist: set[str] = set() - # Give insert_intercept_row a non-empty POSTGRES_URL so it doesn't early-return monkeypatch.setenv("POSTGRES_URL", "postgresql://fake-host/fake-db") + monkeypatch.setattr( + _storage_module, "_require_trusted_endpoint", lambda _pg, url: _trusted_check(allowlist, url) + ) + monkeypatch.setattr( + _storage_module, + "_write_row", + lambda *_a, **_kw: rows.append({"url": _a[1], "method": _a[2], "request": _a[3], "raw": _a[4]}), + ) - def fake_require(_pg_url: str, url: str) -> None: - _trusted_check(allowlist, url) + _interceptor_module.init_interceptor() + monkeypatch.setattr(_interceptor_module, "_enabled", True) + return rows, allowlist - def fake_write_row(postgres_url, url, method, request_payload, raw, agent_id, action_name): - rows.append({"url": url, "method": method, "request": request_payload, "raw": raw}) - return None - monkeypatch.setattr(_storage_module, "_require_trusted_endpoint", fake_require) - monkeypatch.setattr(_storage_module, "_write_row", fake_write_row) +# --------------------------------------------------------------------------- +# Test helpers +# --------------------------------------------------------------------------- - _interceptor_module.init_interceptor() - monkeypatch.setattr(_interceptor_module, "_enabled", True) - return rows, allowlist +def _seed_allowlist(allowlist: set[str], *servers: FakeHttpServer) -> None: + """Seed an allowlist with the normalized base URLs of the given fake servers.""" + for s in servers: + allowlist.add(normalize_url_for_trust(s.base_url)) def _configure_agent_client(fake_llm_server: FakeHttpServer) -> None: """Point the agents SDK at the fake LLM server and force chat completions mode.""" - client = AsyncOpenAI( - base_url=f"{fake_llm_server.base_url}/v1", - api_key="test-key", - ) + client = AsyncOpenAI(base_url=f"{fake_llm_server.base_url}/v1", api_key="test-key") set_default_openai_client(client, use_for_tracing=False) set_default_openai_api("chat_completions") -def _stored(record: dict) -> dict: - return {"result": record} - - -# --------------------------------------------------------------------------- -# Scenario A — happy path -# --------------------------------------------------------------------------- - -@pytest.mark.e2e -async def test_openai_agent_intercepts_and_handoff_passes( - fake_llm_server: FakeHttpServer, - fake_data_server: FakeHttpServer, - fake_server_factory, - patched_interceptor: list[dict[str, Any]], - fake_trusted_endpoints: set[str], -) -> None: - """A: both LLM and data URLs trusted; Runner.run succeeds; evaluate_handoff → PASS.""" - # Seed trusted endpoints: LLM server + data server base URLs - fake_trusted_endpoints.add(normalize_url_for_trust(fake_llm_server.base_url)) - fake_trusted_endpoints.add(normalize_url_for_trust(fake_data_server.base_url)) - - _configure_agent_client(fake_llm_server) +def _make_weather_agent( + fake_data_server: FakeHttpServer, *, fail_loudly: bool = False +) -> Agent: + """Build the standard weather agent that GETs ``/v1/temperature`` from the fake data server. + ``fail_loudly=True`` sets ``failure_error_function=None`` on the tool so that exceptions + raised inside the tool propagate up through ``Runner.run`` instead of being converted + into LLM-visible error strings (used by scenario C). + """ data_url = f"{fake_data_server.base_url}/v1/temperature" + decorator = function_tool(failure_error_function=None) if fail_loudly else function_tool - @function_tool + @decorator def get_temperature() -> dict: """Get the current temperature.""" return requests_lib.get(data_url).json() - agent = Agent( + return Agent( name="weather-agent", instructions="Use the get_temperature tool and report the result.", tools=[get_temperature], model="gpt-4o-mini", ) - result = await Runner.run(agent, "What's the temp?") - assert result is not None - # Assert interceptor recorded at least 2 rows: one POST (LLM), one GET (data) - assert len(patched_interceptor) >= 2, f"Expected >=2 rows, got {len(patched_interceptor)}" - llm_rows = [r for r in patched_interceptor if r["method"] == "POST"] - data_rows = [r for r in patched_interceptor if r["method"] == "GET"] - assert len(llm_rows) >= 1, "Expected at least one POST row (LLM call)" - assert len(data_rows) >= 1, "Expected at least one GET row (data call)" - - # Spin up a fake Provably backend - fake_provably = fake_server_factory() - fake_provably.respond( - "GET", - "/api/v1/organizations/org-1/queries/q1", - status=200, - body=_stored({"celsius": 21}), - ) - fake_provably.respond( - "POST", - "/api/v1/organizations/org-1/queries/q1/verify", - status=200, - body={"verified": True}, - ) +def _make_provably_backend( + fake_server_factory, query_record_id: str, indexed_value: dict +) -> FakeHttpServer: + """Spin up a fake Provably backend that resolves one query record + one verify call.""" + server = fake_server_factory() + base = f"/api/v1/organizations/org-1/queries/{query_record_id}" + server.respond("GET", base, status=200, body={"result": indexed_value}) + server.respond("POST", f"{base}/verify", status=200, body={"verified": True}) + return server - payload = HandoffPayload( + +def _make_payload(query_record_id: str, claimed_value: dict) -> HandoffPayload: + return HandoffPayload( provably_org_id="org-1", integration_api_key="key-abc", claims=[ HandoffClaim( action_name="get_temperature", - claimed_value={"celsius": 21}, - query_record_id="q1", + claimed_value=claimed_value, + query_record_id=query_record_id, ) ], ) - eval_result = evaluate_handoff(payload, provably_base_url=fake_provably.base_url) - assert eval_result["outcome"] == "PASS", f"Expected PASS, got: {eval_result}" - assert eval_result["per_claim"][0]["result"] == "PASS" + +def _exception_chain_contains(exc: BaseException, pattern: str) -> bool: + """Walk ``__cause__``/``__context__`` looking for ``pattern`` (the agents SDK wraps + ``RuntimeError`` from ``AsyncClient.send`` as ``APIConnectionError``).""" + seen: set[int] = set() + curr: BaseException | None = exc + while curr is not None and id(curr) not in seen: + seen.add(id(curr)) + if pattern in str(curr): + return True + curr = curr.__cause__ if curr.__cause__ is not None else curr.__context__ + return False # --------------------------------------------------------------------------- -# Scenario B — tampered claim +# Scenarios # --------------------------------------------------------------------------- + @pytest.mark.e2e -async def test_evaluate_catches_wrong_claim( +async def test_openai_agent_intercepts_and_handoff_passes( fake_llm_server: FakeHttpServer, fake_data_server: FakeHttpServer, fake_server_factory, patched_interceptor: list[dict[str, Any]], fake_trusted_endpoints: set[str], ) -> None: - """B: same as A but claimed_value is wrong → CAUGHT.""" - fake_trusted_endpoints.add(normalize_url_for_trust(fake_llm_server.base_url)) - fake_trusted_endpoints.add(normalize_url_for_trust(fake_data_server.base_url)) - + """A — both URLs trusted; agent runs; evaluate_handoff → PASS.""" + _seed_allowlist(fake_trusted_endpoints, fake_llm_server, fake_data_server) _configure_agent_client(fake_llm_server) - data_url = f"{fake_data_server.base_url}/v1/temperature" + result = await Runner.run(_make_weather_agent(fake_data_server), "What's the temp?") + assert result is not None - @function_tool - def get_temperature() -> dict: - """Get the current temperature.""" - return requests_lib.get(data_url).json() + methods = [r["method"] for r in patched_interceptor] + assert methods.count("POST") >= 1, f"Expected ≥1 LLM POST, got {methods}" + assert methods.count("GET") >= 1, f"Expected ≥1 data GET, got {methods}" - agent = Agent( - name="weather-agent", - instructions="Use the get_temperature tool and report the result.", - tools=[get_temperature], - model="gpt-4o-mini", + fake_provably = _make_provably_backend(fake_server_factory, "q1", {"celsius": 21}) + eval_result = evaluate_handoff( + _make_payload("q1", {"celsius": 21}), provably_base_url=fake_provably.base_url ) + assert eval_result["outcome"] == "PASS", f"Expected PASS, got: {eval_result}" + assert eval_result["per_claim"][0]["result"] == "PASS" - await Runner.run(agent, "What's the temp?") - fake_provably = fake_server_factory() - fake_provably.respond( - "GET", - "/api/v1/organizations/org-1/queries/q1", - status=200, - body=_stored({"celsius": 21}), - ) +@pytest.mark.e2e +async def test_evaluate_catches_wrong_claim( + fake_llm_server: FakeHttpServer, + fake_data_server: FakeHttpServer, + fake_server_factory, + patched_interceptor: list[dict[str, Any]], + fake_trusted_endpoints: set[str], +) -> None: + """B — happy run, but the claim's value disagrees with the indexed record → CAUGHT.""" + _seed_allowlist(fake_trusted_endpoints, fake_llm_server, fake_data_server) + _configure_agent_client(fake_llm_server) - # Claim a wrong (tampered) value - payload = HandoffPayload( - provably_org_id="org-1", - integration_api_key="key-abc", - claims=[ - HandoffClaim( - action_name="get_temperature", - claimed_value={"celsius": 99}, # tampered - query_record_id="q1", - ) - ], - ) + await Runner.run(_make_weather_agent(fake_data_server), "What's the temp?") - eval_result = evaluate_handoff(payload, provably_base_url=fake_provably.base_url) + fake_provably = _make_provably_backend(fake_server_factory, "q1", {"celsius": 21}) + eval_result = evaluate_handoff( + _make_payload("q1", {"celsius": 99}), # tampered claim + provably_base_url=fake_provably.base_url, + ) assert eval_result["outcome"] == "CAUGHT", f"Expected CAUGHT, got: {eval_result}" assert eval_result["per_claim"][0]["result"] == "CAUGHT" -# --------------------------------------------------------------------------- -# Scenario C — untrusted GET blocks the tool call -# --------------------------------------------------------------------------- - @pytest.mark.e2e async def test_untrusted_get_blocks_request( fake_llm_server: FakeHttpServer, fake_data_server: FakeHttpServer, - patched_interceptor_with_trust, + patched_interceptor_with_trust: tuple[list[dict[str, Any]], set[str]], ) -> None: - """C: data server URL NOT in trusted_endpoints → tool GET raises BLOCKED. - - Uses patched_interceptor_with_trust so the real trust gate fires inside - insert_intercept_row (patched_interceptor bypasses it entirely). - """ - rows, allowlist = patched_interceptor_with_trust - - # Only trust the LLM server; omit the data server - allowlist.add(normalize_url_for_trust(fake_llm_server.base_url)) - # data server deliberately omitted - + """C — data URL not in allowlist → tool GET raises BLOCKED.""" + _, allowlist = patched_interceptor_with_trust + _seed_allowlist(allowlist, fake_llm_server) # data server omitted on purpose _configure_agent_client(fake_llm_server) - data_url = f"{fake_data_server.base_url}/v1/temperature" - - @function_tool(failure_error_function=None) - def get_temperature() -> dict: - """Get the current temperature.""" - return requests_lib.get(data_url).json() - - agent = Agent( - name="weather-agent", - instructions="Use the get_temperature tool and report the result.", - tools=[get_temperature], - model="gpt-4o-mini", - ) - with pytest.raises((RuntimeError, Exception), match="BLOCKED"): - await Runner.run(agent, "What's the temp?") - - -# --------------------------------------------------------------------------- -# Scenario D — untrusted POST blocks the LLM call -# --------------------------------------------------------------------------- - -def _exception_chain_contains(exc: BaseException, pattern: str) -> bool: - """Walk the full exception chain (__cause__, __context__) looking for ``pattern``.""" - seen: set[int] = set() - curr: BaseException | None = exc - while curr is not None and id(curr) not in seen: - seen.add(id(curr)) - if pattern in str(curr): - return True - # Check cause before context - next_exc = curr.__cause__ if curr.__cause__ is not None else curr.__context__ - curr = next_exc - return False + await Runner.run(_make_weather_agent(fake_data_server, fail_loudly=True), "What's the temp?") @pytest.mark.e2e async def test_untrusted_post_blocks_llm_call( fake_llm_server: FakeHttpServer, fake_data_server: FakeHttpServer, - patched_interceptor_with_trust, + patched_interceptor_with_trust: tuple[list[dict[str, Any]], set[str]], ) -> None: - """D: LLM server URL NOT in trusted_endpoints → LLM POST raises BLOCKED. - - Validates that the trust gate now fires on POST (Phase 1 Prereq B). - Uses patched_interceptor_with_trust so the real trust gate fires. + """D — LLM URL not in allowlist → LLM POST raises BLOCKED (validates POST trust gate). - The BLOCKED RuntimeError is raised inside _attach() during AsyncClient.send, - which the openai SDK wraps in an APIConnectionError; we therefore inspect - the full exception chain rather than matching the top-level message. + The BLOCKED RuntimeError is raised inside ``_attach()`` during ``AsyncClient.send``, + which the agents SDK wraps as ``APIConnectionError`` — we walk the exception chain. """ - rows, allowlist = patched_interceptor_with_trust - - # Only trust the data server; omit the LLM server - allowlist.add(normalize_url_for_trust(fake_data_server.base_url)) - # LLM server deliberately omitted - + _, allowlist = patched_interceptor_with_trust + _seed_allowlist(allowlist, fake_data_server) # LLM server omitted on purpose _configure_agent_client(fake_llm_server) - data_url = f"{fake_data_server.base_url}/v1/temperature" - - @function_tool - def get_temperature() -> dict: - """Get the current temperature.""" - return requests_lib.get(data_url).json() - - agent = Agent( - name="weather-agent", - instructions="Use the get_temperature tool and report the result.", - tools=[get_temperature], - model="gpt-4o-mini", - ) - with pytest.raises(Exception) as exc_info: - await Runner.run(agent, "What's the temp?") + await Runner.run(_make_weather_agent(fake_data_server), "What's the temp?") assert _exception_chain_contains(exc_info.value, "BLOCKED"), ( - f"Expected 'BLOCKED' somewhere in the exception chain. Got: {exc_info.value!r}" + f"Expected 'BLOCKED' in exception chain. Got: {exc_info.value!r}" ) -# --------------------------------------------------------------------------- -# Scenario E — self-egress exemption: evaluate_handoff completes even when -# the Provably backend URL is not in trusted_endpoints -# --------------------------------------------------------------------------- - @pytest.mark.e2e def test_self_egress_completes_without_trust( fake_server_factory, patched_interceptor: list[dict[str, Any]], fake_trusted_endpoints: set[str], ) -> None: - """E: Provably backend NOT in trusted_endpoints; evaluate_handoff still completes. + """E — Provably backend NOT in trusted_endpoints; evaluate_handoff still completes. - The SDK wraps its own egress with provably_self_egress() so the trust gate - never fires on internal SDK calls. + The SDK wraps its own egress in ``provably_self_egress()`` so the trust gate never fires + on internal calls. """ - # Leave trusted_endpoints completely empty — the Provably backend is not trusted - assert len(fake_trusted_endpoints) == 0 - - fake_provably = fake_server_factory() - fake_provably.respond( - "GET", - "/api/v1/organizations/org-1/queries/q1", - status=200, - body=_stored({"x": 1}), - ) - fake_provably.respond( - "POST", - "/api/v1/organizations/org-1/queries/q1/verify", - status=200, - body={"verified": True}, - ) + assert len(fake_trusted_endpoints) == 0 # nothing trusted + fake_provably = _make_provably_backend(fake_server_factory, "q1", {"x": 1}) - payload = HandoffPayload( - provably_org_id="org-1", - integration_api_key="key-abc", - claims=[ - HandoffClaim( - action_name="get_data", - claimed_value={"x": 1}, - query_record_id="q1", - ) - ], + eval_result = evaluate_handoff( + _make_payload("q1", {"x": 1}), provably_base_url=fake_provably.base_url ) - - # This must NOT raise BLOCKED — SDK egress is exempt from trust gate - eval_result = evaluate_handoff(payload, provably_base_url=fake_provably.base_url) assert eval_result["outcome"] == "PASS", f"Expected PASS, got: {eval_result}" -# --------------------------------------------------------------------------- -# Scenario F — async LLM call is intercepted -# --------------------------------------------------------------------------- - @pytest.mark.e2e async def test_async_llm_call_intercepted( fake_llm_server: FakeHttpServer, @@ -520,38 +373,19 @@ async def test_async_llm_call_intercepted( patched_interceptor: list[dict[str, Any]], fake_trusted_endpoints: set[str], ) -> None: - """F: assert at least one recorded row has method==POST from the fake LLM server. - - Validates that httpx.AsyncClient.send is patched and recording fires. - """ - fake_trusted_endpoints.add(normalize_url_for_trust(fake_llm_server.base_url)) - fake_trusted_endpoints.add(normalize_url_for_trust(fake_data_server.base_url)) - + """F — at least one recorded POST row points at the fake LLM server (proves + ``AsyncClient.send`` patch fires).""" + _seed_allowlist(fake_trusted_endpoints, fake_llm_server, fake_data_server) _configure_agent_client(fake_llm_server) - data_url = f"{fake_data_server.base_url}/v1/temperature" - - @function_tool - def get_temperature() -> dict: - """Get the current temperature.""" - return requests_lib.get(data_url).json() - - agent = Agent( - name="weather-agent", - instructions="Use the get_temperature tool and report the result.", - tools=[get_temperature], - model="gpt-4o-mini", - ) - - await Runner.run(agent, "What's the temp?") + await Runner.run(_make_weather_agent(fake_data_server), "What's the temp?") - # Assert at least one POST row exists and its URL points to the fake LLM server llm_base = normalize_url_for_trust(fake_llm_server.base_url) post_llm_rows = [ r for r in patched_interceptor if r["method"] == "POST" and llm_base in normalize_url_for_trust(r["url"]) ] - assert len(post_llm_rows) >= 1, ( - f"Expected at least one POST row to LLM server {llm_base}. " - f"Recorded rows: {[(r['method'], r['url']) for r in patched_interceptor]}" + assert post_llm_rows, ( + f"Expected ≥1 POST row to LLM server {llm_base}. " + f"Recorded: {[(r['method'], r['url']) for r in patched_interceptor]}" ) diff --git a/tests/unit/test_interceptor.py b/tests/unit/test_interceptor.py index d87c434..b575d42 100644 --- a/tests/unit/test_interceptor.py +++ b/tests/unit/test_interceptor.py @@ -8,6 +8,7 @@ import provably.intercept._storage as storage import provably.intercept.interceptor as interceptor +from provably.intercept._responses import RequestsJsonOverride from provably.intercept._self_egress import provably_self_egress @@ -33,7 +34,7 @@ def fake_mutate(raw: Any) -> Any: out = interceptor._attach(resp, "https://example.com/x", "GET", {}) assert captured == [{"original": True}] - assert isinstance(out, interceptor._RequestsJsonOverride) + assert isinstance(out, RequestsJsonOverride) assert out.json() == {"user_edited": True} finally: interceptor.set_intercept_url_allowlist(None) From f841b25a70cc5874eba2d40b4368f8e98e0c4ad2 Mon Sep 17 00:00:00 2001 From: rimkusaurimas Date: Wed, 6 May 2026 08:46:48 +0200 Subject: [PATCH 5/6] feat(intercept): patch aiohttp.ClientSession._request (soft dep) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds aiohttp coverage as a third HTTP transport, alongside requests and httpx. aiohttp is NOT promoted to a hard dependency — the patch installs only if `import aiohttp` succeeds at module load time. This unlocks LiteLLM (default aiohttp transport since v1.71), DSPy (LiteLLM-only), CrewAI's LiteLLM fallback path, smolagents' LiteLLM path, and the optional aiohttp extras for Google GenAI / Google ADK. Implementation notes: - New `_record_and_maybe_tamper` helper extracted from `_attach` so the async aiohttp wrapper can reuse the gating + recording + tamper path without duplicating it. `_attach` still owns the sync extract_raw call. - `_wrap_aiohttp_request` awaits `response.read()` to populate aiohttp's body cache (so user code calling .json()/.text() later still works), parses JSON when Content-Type matches, and routes through `_record_and_maybe_tamper`. - Body override (the simulation tamper hook) is not supported for ClientResponse — _record_and_maybe_tamper falls through to return the original response. Recording works in full. - Soft-dep is enforced at module level: `try: import aiohttp` followed by an `if _aiohttp is not None` guard in `init_interceptor`. SDK installs and runs fine without aiohttp present. 3 new unit tests (skipped via `pytest.importorskip` if aiohttp absent): - aiohttp GET via ClientSession records one row - aiohttp POST with json body records request payload - provably_self_egress() block skips recording Tests: 100/100 (was 97). Dockerfile + dev deps updated to install aiohttp. Closes the gap surfaced by https://www.notion.so/356d0532b54e81a2b783c6964da57934 which lists aiohttp in the intercept scope ("httpx + requests + aiohttp"). Related: #10 (botocore patch for AWS Strands — separate follow-up). Co-Authored-By: Claude Opus 4.7 --- CHANGELOG.md | 12 ++++ Dockerfile | 2 +- pyproject.toml | 2 +- src/provably/intercept/interceptor.py | 83 +++++++++++++++++++++++++-- tests/unit/test_interceptor.py | 53 +++++++++++++++++ 5 files changed, 144 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a9e704..d377699 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,18 @@ ## 0.3.0 +### aiohttp interception (soft dependency) + +- `aiohttp.ClientSession._request` is now patched when `aiohttp` is importable. + The intercept SDK does **not** add `aiohttp` as a hard dependency — the patch + installs only when the user's environment already has it. +- This unlocks **LiteLLM** (which uses `aiohttp` as its default transport since + v1.71+) and any framework that opts into an `aiohttp` extra (Google GenAI, + Google ADK, etc.). +- Body override (the simulation tamper hook) is not supported for + `aiohttp.ClientResponse` — recording fires in full but the response is + returned unchanged. + ### OpenAI Agents SDK integration - Added `examples/openai_agents/` — a runnable end-to-end demo that drives a diff --git a/Dockerfile b/Dockerfile index ff4dfa8..a59d30d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -56,7 +56,7 @@ COPY --from=builder /dist /dist RUN pip install --upgrade pip \ && pip install /dist/*.whl \ - && pip install "pytest>=8.0" "pytest-asyncio>=0.23" "ruff>=0.3" "build>=1.2" "openai-agents" + && pip install "pytest>=8.0" "pytest-asyncio>=0.23" "ruff>=0.3" "build>=1.2" "openai-agents" "aiohttp>=3.9" COPY pyproject.toml ./ COPY tests ./tests diff --git a/pyproject.toml b/pyproject.toml index b605407..84121bb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,7 @@ dependencies = [ ] [project.optional-dependencies] -dev = ["pytest>=8.0", "pytest-asyncio>=0.23", "ruff>=0.3", "build>=1.2", "openai-agents>=0.0.3"] +dev = ["pytest>=8.0", "pytest-asyncio>=0.23", "ruff>=0.3", "build>=1.2", "openai-agents>=0.0.3", "aiohttp>=3.9"] [project.urls] Homepage = "https://github.com/ProvablyAI/provably-python-sdk" diff --git a/src/provably/intercept/interceptor.py b/src/provably/intercept/interceptor.py index 5b9497c..225c232 100644 --- a/src/provably/intercept/interceptor.py +++ b/src/provably/intercept/interceptor.py @@ -12,6 +12,11 @@ import httpx import requests +try: + import aiohttp as _aiohttp +except ImportError: + _aiohttp = None # type: ignore[assignment] + from provably.intercept._reentry import already_recording, recording_scope from provably.intercept._responses import ( HttpxJsonOverride, @@ -145,6 +150,12 @@ def init_interceptor() -> None: requests.Session.send = _wrap_session_send(_orig["requests_session_send"]) httpx.Client.send = _wrap_client_send(_orig["httpx_client_send"]) httpx.AsyncClient.send = _wrap_async_client_send(_orig["httpx_async_client_send"]) + # Soft-dep: aiohttp is not a hard dependency of this SDK. When present, patch the + # central ClientSession._request choke point that every aiohttp call routes through + # (LiteLLM's default transport, optional Google GenAI / Google ADK paths, etc.). + if _aiohttp is not None: + _orig["aiohttp_session_request"] = _aiohttp.ClientSession._request + _aiohttp.ClientSession._request = _wrap_aiohttp_request(_orig["aiohttp_session_request"]) _initialized = True _enabled = True @@ -196,25 +207,36 @@ def _maybe_transform_body(raw: Any) -> Any: def _attach(response: Any, url: str, method: str, req_kwargs: dict[str, Any]) -> Any: - """Record the response and optionally mutate it via the simulation hook. + """Record an httpx/requests response and optionally mutate it via the simulation hook. Short-circuits immediately when inside a self-egress block or already in the middle of recording (re-entry guard prevents double-recording when module-level calls like ``httpx.get`` internally delegate to a patched ``Client.send``). + + For aiohttp responses, see :func:`_wrap_aiohttp_request` which awaits the body + asynchronously before calling :func:`_record_and_maybe_tamper` directly. """ if is_self_egress() or already_recording(): return response - raw = extract_raw(response) + return _record_and_maybe_tamper(response, url, method, req_kwargs, extract_raw(response)) + + +def _record_and_maybe_tamper( + response: Any, url: str, method: str, req_kwargs: dict[str, Any], raw: Any +) -> Any: + """Inside-the-gate recording path shared by sync (_attach) and async (aiohttp) callers. + + Callers must have already cleared self-egress / re-entry guards. ``raw`` is the + pre-extracted body (sync from ``extract_raw``, or awaited for aiohttp). + """ req = request_payload_dict(url, method, req_kwargs) nurl = normalize_url_for_trust(str(url)) if _url_allowlist is not None and nurl not in _url_allowlist: return response - # Recording: any request that passed the allowlist gate (including legacy mode when - # allowlist is None — all outbound traffic). if _enabled: _insert_row(url, req, raw, method=method) - # Simulation tamper hook: only for explicit run endpoints, never for OpenRouter, - # Provably API, cluster handoff posts, etc. (those run with allowlist cleared or off-list). + # Tamper hook fires only for explicit run endpoints; never for OpenRouter, Provably API, + # cluster handoff posts (those run with allowlist cleared or off-list). tamper = _url_allowlist is not None and nurl in _url_allowlist mutated = _maybe_transform_body(raw) if tamper else raw if mutated is raw: @@ -223,6 +245,7 @@ def _attach(response: Any, url: str, method: str, req_kwargs: dict[str, Any]) -> return RequestsJsonOverride(response, mutated) if isinstance(response, httpx.Response): return HttpxJsonOverride(response, mutated) + # aiohttp.ClientResponse and other libraries: body override not supported, return as-is. return response @@ -313,3 +336,51 @@ def wrapped(self, request: requests.PreparedRequest, **kwargs: Any) -> requests. return _attach(response, str(request.url), request.method or "GET", _requests_prepared_to_kwargs(request)) return wrapped + + +_AIOHTTP_KWARG_KEYS = ("params", "json", "data") + + +def _aiohttp_kwargs_to_kwargs(kwargs: dict[str, Any]) -> dict[str, Any]: + """Filter aiohttp _request kwargs down to the subset request_payload_dict cares about.""" + return {k: v for k, v in kwargs.items() if k in _AIOHTTP_KWARG_KEYS and v is not None} + + +def _wrap_aiohttp_request(orig_request): + """Wrap aiohttp.ClientSession._request — the central method every aiohttp call routes through. + + Async wrapper: awaits the original to get a ClientResponse, then awaits ``response.read()`` + to populate aiohttp's body cache (so user code calling ``.json()`` / ``.text()`` later + still works), parses the body, and routes through ``_record_and_maybe_tamper``. + + Body override (the simulation tamper hook) is not supported for aiohttp responses — the + response is returned as-is. Recording works in full. + """ + + async def wrapped(self, method: str, str_or_url: Any, **kwargs: Any) -> Any: + response = await orig_request(self, method, str_or_url, **kwargs) + if is_self_egress() or already_recording(): + return response + with recording_scope(): + try: + body_bytes = await response.read() + except Exception: # noqa: BLE001 + body_bytes = b"" + content_type = response.headers.get("Content-Type", "") if response.headers else "" + text = body_bytes.decode(errors="replace") if body_bytes else "" + if "application/json" in content_type and text: + try: + raw = json.loads(text) + except Exception: # noqa: BLE001 + raw = {"text": text} + else: + raw = {"text": text} + return _record_and_maybe_tamper( + response, + str(str_or_url), + method, + _aiohttp_kwargs_to_kwargs(kwargs), + raw, + ) + + return wrapped diff --git a/tests/unit/test_interceptor.py b/tests/unit/test_interceptor.py index b575d42..9ab09f5 100644 --- a/tests/unit/test_interceptor.py +++ b/tests/unit/test_interceptor.py @@ -243,6 +243,59 @@ def fake_require(postgres_url: str, url: str) -> None: assert trust_calls[0][1] == "https://untrusted.example/api" +# --------------------------------------------------------------------------- +# aiohttp coverage (soft dependency — only installs when aiohttp is importable) +# --------------------------------------------------------------------------- + +aiohttp = pytest.importorskip("aiohttp") + + +async def test_aiohttp_session_request_intercepted(monkeypatch: Any, fake_server: Any) -> None: + """aiohttp.ClientSession().get(url) records exactly one row via the _request patch.""" + fake_server.respond("GET", "/aiohttp-data", status=200, body={"from_aiohttp": True}) + rows: list[dict[str, Any]] = [] + _setup_interceptor(monkeypatch, rows) + + async with aiohttp.ClientSession() as session: + async with session.get(f"{fake_server.base_url}/aiohttp-data") as response: + assert response.status == 200 + data = await response.json() + assert data == {"from_aiohttp": True} + + assert len(rows) == 1, f"Expected 1 row, got {len(rows)}: {rows}" + assert rows[0]["method"] == "GET" + assert rows[0]["raw"] == {"from_aiohttp": True} + + +async def test_aiohttp_post_with_json_body_intercepted(monkeypatch: Any, fake_server: Any) -> None: + """POSTing JSON via aiohttp records request payload correctly.""" + fake_server.respond("POST", "/echo", status=200, body={"ok": True}) + rows: list[dict[str, Any]] = [] + _setup_interceptor(monkeypatch, rows) + + async with aiohttp.ClientSession() as session: + async with session.post(f"{fake_server.base_url}/echo", json={"k": "v"}) as response: + assert response.status == 200 + + assert len(rows) == 1 + assert rows[0]["method"] == "POST" + assert rows[0]["request"]["json"] == {"k": "v"} + + +async def test_aiohttp_self_egress_skips_recording(monkeypatch: Any, fake_server: Any) -> None: + """Inside provably_self_egress(): aiohttp call records nothing.""" + fake_server.respond("GET", "/data", status=200, body={"v": 9}) + rows: list[dict[str, Any]] = [] + _setup_interceptor(monkeypatch, rows) + + with provably_self_egress(): + async with aiohttp.ClientSession() as session: + async with session.get(f"{fake_server.base_url}/data") as _resp: + pass + + assert rows == [] + + def test_trust_gate_fires_on_delete(monkeypatch: Any) -> None: """DELETE to an untrusted URL raises RuntimeError('BLOCKED: ...') — all-methods coverage.""" trust_calls: list[str] = [] From 3d001a8e99384b97326833225487bfebfe06a140 Mon Sep 17 00:00:00 2001 From: rimkusaurimas Date: Wed, 6 May 2026 08:48:41 +0200 Subject: [PATCH 6/6] docs(readme): add framework coverage section MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a new 'Framework coverage' section listing transport patches and per-framework status. Also updates the intercept description to mention aiohttp alongside requests/httpx. Pulls the audit + post-aiohttp coverage matrix into the README so users can see at a glance which frameworks work today vs. what is gapped (CrewAI Bedrock path, AWS Strands — both tracked in #10). Co-Authored-By: Claude Opus 4.7 --- README.md | 45 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 389c962..1e9ed9a 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ to at all — is enforced before the request leaves the process. ## Contents - [What it does](#what-it-does) +- [Framework coverage](#framework-coverage) - [Install](#install) - [Quick start](#quick-start) - [Configuration](#configuration) @@ -60,8 +61,8 @@ flowchart LR The flow, in order: -1. **Intercept + Police** — every outbound `requests` / `httpx` call goes - through the SDK's monkey-patched HTTP path. _Inside_ the interceptor, before +1. **Intercept + Police** — every outbound `requests` / `httpx` / `aiohttp` + call goes through the SDK's monkey-patched HTTP path. _Inside_ the interceptor, before the request leaves the process, the URL is checked against the `trusted_endpoints` table. If the URL is not registered the call is killed with `RuntimeError("BLOCKED: ...")` and never reaches the network. @@ -91,6 +92,46 @@ Nothing in this loop relies on a model self-evaluating its own output. | Eval service | **You** — any HTTP service that calls `provably.evaluate_handoff(...)` on the incoming payload. | The SDK gives you the function; you decide where to host it. | | Provably query record | **Provably** — fetched over HTTPS by the eval service using the `integration_api_key` from the handoff payload. | This is the source of truth the evaluator compares each claim against. | +## Framework coverage + +The interceptor patches the central HTTP transport choke points, so coverage of +agent frameworks follows automatically from which library a framework uses +under the hood. As of v0.3.0: + +**Transport patches** + +| Transport | Patched at | +| --- | --- | +| `requests` | module-level `get`/`post` + `Session.send` | +| `httpx` | module-level `get`/`post` + `Client.send` + `AsyncClient.send` | +| `aiohttp` | `ClientSession._request` (soft dep — patches only when `aiohttp` is importable) | +| `botocore` / `urllib3` | _pending_ — see [issue #10](https://github.com/ProvablyAI/provably-python-sdk/issues/10) | + +**Agent / LLM frameworks** + +| Framework | Status | Notes | +| --- | --- | --- | +| OpenAI SDK | ✅ | httpx | +| Anthropic SDK | ✅ | httpx | +| Pydantic AI | ✅ | delegates to AsyncOpenAI / AsyncAnthropic | +| LangChain | ✅ | delegates to provider SDKs | +| LangGraph | ✅ | same | +| LlamaIndex | ✅ | httpx via OpenAI SDK | +| AutoGen | ✅ | AsyncOpenAI | +| Haystack | ✅ | migrated to httpx (2024–25) | +| Phidata / Agno | ✅ | AsyncOpenAI / `httpx[http2]` | +| OpenAI Agents SDK | ✅ | httpx; e2e suite at [tests/e2e/test_openai_agents_e2e.py](tests/e2e/test_openai_agents_e2e.py); demo at [examples/openai_agents/](examples/openai_agents/) | +| Google GenAI | ✅ | httpx default + optional `aiohttp` extra | +| LiteLLM | ✅ | aiohttp transport (default since v1.71) | +| DSPy | ✅ | LiteLLM only | +| smolagents | ✅ | OpenAI SDK / HF / LiteLLM paths covered | +| CrewAI | ⚠️ | OpenAI/Anthropic ✅, LiteLLM fallback ✅, **Bedrock provider ❌** (boto3) | +| AWS Strands | ❌ | boto3/botocore → urllib3; tracked in [issue #10](https://github.com/ProvablyAI/provably-python-sdk/issues/10) | + +**Out of scope for the HTTP interception layer** (separate shipping units): +MCP servers, in-process LLMs (`transformers`, `mlx_lm`), gRPC (Google ADK +A2A), websockets, raw sockets. + ## Install > **Status:** v0.2 — not yet published to PyPI. Install from source.