diff --git a/CHANGELOG.md b/CHANGELOG.md index d17bd1a..d377699 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,60 @@ # Changelog +## 0.3.0 + +### aiohttp interception (soft dependency) + +- `aiohttp.ClientSession._request` is now patched when `aiohttp` is importable. + The intercept SDK does **not** add `aiohttp` as a hard dependency — the patch + installs only when the user's environment already has it. +- This unlocks **LiteLLM** (which uses `aiohttp` as its default transport since + v1.71+) and any framework that opts into an `aiohttp` extra (Google GenAI, + Google ADK, etc.). +- Body override (the simulation tamper hook) is not supported for + `aiohttp.ClientResponse` — recording fires in full but the response is + returned unchanged. + +### OpenAI Agents SDK integration + +- Added `examples/openai_agents/` — a runnable end-to-end demo that drives a + real OpenRouter model call through the full Provably intercept → handoff → + evaluate pipeline. Model: `openai/gpt-4o-mini` (~$0.001/run); data API: + Open-Meteo (no auth required). +- Added `tests/e2e/test_openai_agents_e2e.py` — six deterministic scenarios + (A–F) using in-process `FakeHttpServer`s; zero network egress in CI. + +### Broader HTTP interception surface + +- `httpx.Client.send`, `httpx.AsyncClient.send`, and `requests.Session.send` + are now patched in addition to the existing module-level shortcuts + (`httpx.get`, `httpx.post`, `requests.get`, `requests.post`). This means + every outbound HTTP call from any framework — including the async agent loops + used by the OpenAI Agents SDK — is intercepted without any user-side changes. +- A re-entry contextvar guard (`_reentry.already_recording`) prevents + double-recording when a module-level call (e.g. `httpx.get`) internally + delegates to the newly-patched `Client.send`. + +### Trust gate fires on all HTTP methods (BREAKING-ISH) + +- **Before this release** `_require_trusted_endpoint` was only called for GET + requests. It now fires unconditionally for every method (POST, PUT, PATCH, + DELETE, etc.). +- **Migration:** register every outbound URL — including your LLM provider + (e.g. `https://openrouter.ai/api/v1/chat/completions`) — in `trusted_endpoints` + before running an agent. Use `INSERT ... ON CONFLICT DO NOTHING` or the + Provably dashboard to add rows. See `examples/openai_agents/agent_run.py` + for the pattern. + +### New `provably_self_egress()` context manager + +- Added `provably.intercept.provably_self_egress()` — a context manager that + marks a block of code as SDK-internal egress. Inside it, the trust gate is + bypassed and no intercept rows are written. All SDK self-egress sites + (`handoff.transport`, `handoff.evaluator`, `handoff._bootstrap`) already wrap + their own HTTP calls in this context, so the SDK never trips its own gate. + Advanced users who make their own Provably API calls from within an agent loop + can use this to avoid BLOCKED errors. + ## 0.2.0 - Added `provably.configure_indexing(enable_indexing: bool)`: one-call bootstrap (`initialize_runtime` + `init_interceptor` + `enable` / `disable`) for sender agents. diff --git a/Dockerfile b/Dockerfile index a95406f..a59d30d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -56,7 +56,7 @@ COPY --from=builder /dist /dist RUN pip install --upgrade pip \ && pip install /dist/*.whl \ - && pip install "pytest>=8.0" "ruff>=0.3" "build>=1.2" + && pip install "pytest>=8.0" "pytest-asyncio>=0.23" "ruff>=0.3" "build>=1.2" "openai-agents" "aiohttp>=3.9" COPY pyproject.toml ./ COPY tests ./tests diff --git a/README.md b/README.md index 389c962..1e9ed9a 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ to at all — is enforced before the request leaves the process. ## Contents - [What it does](#what-it-does) +- [Framework coverage](#framework-coverage) - [Install](#install) - [Quick start](#quick-start) - [Configuration](#configuration) @@ -60,8 +61,8 @@ flowchart LR The flow, in order: -1. **Intercept + Police** — every outbound `requests` / `httpx` call goes - through the SDK's monkey-patched HTTP path. _Inside_ the interceptor, before +1. **Intercept + Police** — every outbound `requests` / `httpx` / `aiohttp` + call goes through the SDK's monkey-patched HTTP path. _Inside_ the interceptor, before the request leaves the process, the URL is checked against the `trusted_endpoints` table. If the URL is not registered the call is killed with `RuntimeError("BLOCKED: ...")` and never reaches the network. @@ -91,6 +92,46 @@ Nothing in this loop relies on a model self-evaluating its own output. | Eval service | **You** — any HTTP service that calls `provably.evaluate_handoff(...)` on the incoming payload. | The SDK gives you the function; you decide where to host it. | | Provably query record | **Provably** — fetched over HTTPS by the eval service using the `integration_api_key` from the handoff payload. | This is the source of truth the evaluator compares each claim against. | +## Framework coverage + +The interceptor patches the central HTTP transport choke points, so coverage of +agent frameworks follows automatically from which library a framework uses +under the hood. As of v0.3.0: + +**Transport patches** + +| Transport | Patched at | +| --- | --- | +| `requests` | module-level `get`/`post` + `Session.send` | +| `httpx` | module-level `get`/`post` + `Client.send` + `AsyncClient.send` | +| `aiohttp` | `ClientSession._request` (soft dep — patches only when `aiohttp` is importable) | +| `botocore` / `urllib3` | _pending_ — see [issue #10](https://github.com/ProvablyAI/provably-python-sdk/issues/10) | + +**Agent / LLM frameworks** + +| Framework | Status | Notes | +| --- | --- | --- | +| OpenAI SDK | ✅ | httpx | +| Anthropic SDK | ✅ | httpx | +| Pydantic AI | ✅ | delegates to AsyncOpenAI / AsyncAnthropic | +| LangChain | ✅ | delegates to provider SDKs | +| LangGraph | ✅ | same | +| LlamaIndex | ✅ | httpx via OpenAI SDK | +| AutoGen | ✅ | AsyncOpenAI | +| Haystack | ✅ | migrated to httpx (2024–25) | +| Phidata / Agno | ✅ | AsyncOpenAI / `httpx[http2]` | +| OpenAI Agents SDK | ✅ | httpx; e2e suite at [tests/e2e/test_openai_agents_e2e.py](tests/e2e/test_openai_agents_e2e.py); demo at [examples/openai_agents/](examples/openai_agents/) | +| Google GenAI | ✅ | httpx default + optional `aiohttp` extra | +| LiteLLM | ✅ | aiohttp transport (default since v1.71) | +| DSPy | ✅ | LiteLLM only | +| smolagents | ✅ | OpenAI SDK / HF / LiteLLM paths covered | +| CrewAI | ⚠️ | OpenAI/Anthropic ✅, LiteLLM fallback ✅, **Bedrock provider ❌** (boto3) | +| AWS Strands | ❌ | boto3/botocore → urllib3; tracked in [issue #10](https://github.com/ProvablyAI/provably-python-sdk/issues/10) | + +**Out of scope for the HTTP interception layer** (separate shipping units): +MCP servers, in-process LLMs (`transformers`, `mlx_lm`), gRPC (Google ADK +A2A), websockets, raw sockets. + ## Install > **Status:** v0.2 — not yet published to PyPI. Install from source. diff --git a/examples/openai_agents/README.md b/examples/openai_agents/README.md new file mode 100644 index 0000000..5501cc4 --- /dev/null +++ b/examples/openai_agents/README.md @@ -0,0 +1,100 @@ +# OpenAI Agents SDK + Provably — Runnable Demo + +This demo shows an end-to-end run of the Provably SDK integrated with the +[OpenAI Agents SDK](https://github.com/openai/openai-agents-python) (>=0.0.3). +It exercises every pillar of the SDK in a single script: + +1. **Intercept** — `configure_indexing(True)` installs monkey-patches on + `httpx.AsyncClient.send` and `requests.Session.send` so every outbound HTTP + request from the agent loop is captured and stored in `provably_intercepts`. +2. **Trust gate** — before storing a request the SDK checks that its URL is + registered in `trusted_endpoints`. The demo seeds both the OpenRouter + chat-completions URL and the Open-Meteo weather URL before running the agent. +3. **Tool call** — the agent uses a `@function_tool` that calls the free + [Open-Meteo API](https://open-meteo.com/) (no API key required) to fetch the + current temperature in London. +4. **Handoff** — the captured intercept row id is wrapped in a `HandoffPayload` + with one `HandoffClaim` asserting the tool output. +5. **Evaluate** — `evaluate_handoff()` fetches the stored query record from the + Provably backend, compares it to the claimed value, and prints the verdict. + +Expected output (abbreviated): + +```json +{ + "outcome": "PASS", + "per_claim": [ + { + "action_name": "get_weather", + "result": "PASS", + "proof_time_ms": 42, + "verify_time_ms": 137 + } + ], + "errors": [] +} +``` + +## Required environment variables + +| Variable | Required | Notes | +|---|---|---| +| `OPENROUTER_API_KEY` | yes | API key for [OpenRouter](https://openrouter.ai/). Used for the model call (`openai/gpt-4o-mini`). | +| `PROVABLY_API_KEY` | yes | Provably integration API key. | +| `PROVABLY_ORG_ID` | yes | Provably organisation id. Scopes trusted-endpoint and query-record lookups. | +| `PROVABLY_RUST_BE_URL` | yes | Base URL of the Provably Rust backend (e.g. `https://api.provably.ai`). | +| `POSTGRES_URL` | yes | PostgreSQL DSN (e.g. `postgresql://user:pass@host/db`). Used for intercept storage and trusted-endpoint registry. | + +## How to run + +```bash +# 1. Install the SDK in editable mode with dev extras (includes openai-agents) +pip install -e .[dev] + +# 2. Export the required env vars +export OPENROUTER_API_KEY="sk-or-..." +export PROVABLY_API_KEY="prov_..." +export PROVABLY_ORG_ID="org_..." +export PROVABLY_RUST_BE_URL="https://api.provably.ai" +export POSTGRES_URL="postgresql://user:pass@localhost/provably" + +# 3. Run the demo +python examples/openai_agents/agent_run.py +``` + +## Model and cost + +The demo uses **`openai/gpt-4o-mini`** on OpenRouter — a cheap, capable model +that reliably follows tool-calling instructions. Estimated cost is approximately +**$0.001 per run** (one tool call + one summary turn). + +## How the trust gate works — and what happens when you forget to seed it + +The Provably SDK now enforces trust on **all HTTP methods** (GET, POST, etc.), +not only GET. This means the LLM provider call (a POST to OpenRouter) *and* the +weather API call (a GET to Open-Meteo) both need to be registered in +`trusted_endpoints` before the agent runs. + +If you forget to seed an endpoint, the SDK raises: + +``` +RuntimeError: BLOCKED: endpoint https://openrouter.ai/api/v1/chat/completions not in trusted index for org +``` + +When this error occurs inside `httpx.AsyncClient.send` (the async LLM call), the +OpenAI SDK wraps it in an `APIConnectionError`. You can inspect the full +exception chain to find the original `BLOCKED: ...` message. + +**Migration note for existing users:** if you were previously relying on the SDK +only trust-checking GET requests, you must now register *all* outbound URLs +including your LLM provider URL. Use the `seed_trusted_endpoints` helper pattern +shown in this demo (raw psycopg2 `INSERT ... ON CONFLICT DO NOTHING`), or add +rows via the Provably dashboard. + +## How `provably_self_egress()` relates to this demo + +The Provably SDK's own HTTP calls (fetching query records, posting verify +requests, bootstrap handshakes) are **never** blocked by the trust gate. They +run inside `with provably_self_egress():` context managers that mark them as +SDK-internal egress, so the trust gate is bypassed automatically. You do not +need to add Provably's own backend URL to `trusted_endpoints`. diff --git a/examples/openai_agents/agent_run.py b/examples/openai_agents/agent_run.py new file mode 100644 index 0000000..c2ca86b --- /dev/null +++ b/examples/openai_agents/agent_run.py @@ -0,0 +1,204 @@ +""" +Runnable demo: OpenAI Agents SDK + Provably interception → handoff → evaluate. + +Prerequisites +------------- +Set the following environment variables before running: + + OPENROUTER_API_KEY – OpenRouter API key (model call) + PROVABLY_API_KEY – Provably integration key + PROVABLY_ORG_ID – Provably organisation id + PROVABLY_RUST_BE_URL – Provably Rust backend base URL + POSTGRES_URL – PostgreSQL DSN for intercept storage + +Run: + pip install -e .[dev] + python examples/openai_agents/agent_run.py + +Cost: ~$0.001 per run using openai/gpt-4o-mini on OpenRouter. +""" + +from __future__ import annotations + +import asyncio +import json +import os + +import psycopg2 +import requests +from agents import Agent, Runner, function_tool, set_default_openai_api, set_default_openai_client +from openai import AsyncOpenAI + +import provably.runtime as _prt +from provably.handoff.client import cached_integration_api_key +from provably.handoff.evaluator import evaluate_handoff +from provably.handoff.types import HandoffClaim, HandoffPayload +from provably.intercept import set_interceptor_context, take_last_intercept_row_id +from provably.trusted_endpoints import ( + ensure_trusted_endpoints_table, + load_trusted_endpoint_urls, + normalize_url_for_trust, +) + +# --------------------------------------------------------------------------- +# Trusted endpoint URLs for this demo +# --------------------------------------------------------------------------- +_OPENROUTER_COMPLETIONS_URL = "https://openrouter.ai/api/v1/chat/completions" +_OPEN_METEO_BASE_URL = "https://api.open-meteo.com/v1/forecast" + +_TRUSTED_URLS = [ + _OPENROUTER_COMPLETIONS_URL, + _OPEN_METEO_BASE_URL, +] + + +def _seed_trusted_endpoints() -> None: + """Insert the demo URLs into trusted_endpoints (idempotent ON CONFLICT DO NOTHING).""" + postgres_url = os.environ["POSTGRES_URL"] + org_id = os.environ["PROVABLY_ORG_ID"] + + conn = psycopg2.connect(postgres_url) + try: + ensure_trusted_endpoints_table(conn) + with conn.cursor() as cur: + for url in _TRUSTED_URLS: + norm = normalize_url_for_trust(url) + cur.execute( + """ + INSERT INTO trusted_endpoints (org_id, normalized_url, display_label, entry_type) + VALUES (%s, %s, %s, 'endpoint') + ON CONFLICT (org_id, normalized_url) WHERE revoked_at IS NULL DO NOTHING + """, + (org_id, norm, url), + ) + conn.commit() + finally: + conn.close() + + +# --------------------------------------------------------------------------- +# Tool definition — the @function_tool decorator registers the schema at +# import time but does NOT make HTTP calls, so the interceptor doesn't need +# to be active yet at decoration time. +# --------------------------------------------------------------------------- +@function_tool +def get_current_temperature_london() -> dict: + """Fetch the current temperature in London (51.5074 N, 0.1278 W) from Open-Meteo. + + Returns a dict with a ``temperature_2m`` key (Celsius, float). + """ + set_interceptor_context(agent_id="demo", action_name="get_weather") + + response = requests.get( + _OPEN_METEO_BASE_URL, + params={ + "latitude": 51.5074, + "longitude": -0.1278, + "current": "temperature_2m", + }, + timeout=30, + ) + response.raise_for_status() + data = response.json() + current = data.get("current", {}) + return {"temperature_2m": current.get("temperature_2m")} + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + + +async def main() -> None: + # Step 1 — activate Provably indexing (interceptor + storage) + # Must happen before Runner.run() so all HTTP calls are recorded. + _prt.configure_indexing(enable_indexing=True) + + # Step 2 — configure the Agents SDK to use OpenRouter (Chat Completions API) + openrouter_client = AsyncOpenAI( + base_url="https://openrouter.ai/api/v1", + api_key=os.environ["OPENROUTER_API_KEY"], + ) + set_default_openai_client(openrouter_client, use_for_tracing=False) + # OpenRouter speaks Chat Completions, not the Responses API the SDK defaults to. + set_default_openai_api("chat_completions") + + # Step 3 — seed trusted endpoints so the trust gate allows these URLs + print("Seeding trusted endpoints…") + _seed_trusted_endpoints() + + # Step 4 — run the agent + agent = Agent( + name="weather-demo", + instructions=( + "You are a helpful assistant. When asked about the current temperature in London, " + "use the get_current_temperature_london tool and report the result clearly." + ), + tools=[get_current_temperature_london], + model="openai/gpt-4o-mini", + ) + + print("Running agent…") + result = await Runner.run(agent, "What is the current temperature in London?") + print(f"\nAgent response: {result.final_output}\n") + + # Step 5 — capture the intercept row id for the weather tool call + intercept_row_id = take_last_intercept_row_id() + if intercept_row_id is None: + print("WARNING: No intercept row captured. Check POSTGRES_URL and that trusted_endpoints are seeded correctly.") + + # Step 6 — extract tool output for the claim + tool_output_value: dict = {} + for item in result.new_items: + if hasattr(item, "output"): + raw_out = item.output + if isinstance(raw_out, str): + try: + tool_output_value = json.loads(raw_out) + except Exception: # noqa: BLE001 + pass + elif isinstance(raw_out, dict): + tool_output_value = raw_out + if tool_output_value: + break + + print(f"Tool output captured for claim: {tool_output_value}") + + # Step 7 — build the HandoffPayload and evaluate + org_id = os.environ["PROVABLY_ORG_ID"] + provably_base_url = os.environ.get("PROVABLY_RUST_BE_URL", "").rstrip("/") + postgres_url = os.environ["POSTGRES_URL"] + + # NOTE: in production use build_handoff_payload() to obtain a real Provably + # query record UUID. This demo passes the raw intercepts PK as a stand-in. + query_record_id = str(intercept_row_id) if intercept_row_id else "" + trusted_urls = load_trusted_endpoint_urls(postgres_url, org_id) + + payload = HandoffPayload( + provably_org_id=org_id, + integration_api_key=cached_integration_api_key(), + trusted_endpoint_registry=trusted_urls, + claims=[ + HandoffClaim( + action_name="get_weather", + claimed_value=tool_output_value, + query_record_id=query_record_id, + verification_mode="verbatim", + ) + ], + ) + + print("Evaluating handoff…") + eval_result = evaluate_handoff( + payload, + provably_base_url=provably_base_url, + postgres_url=postgres_url, + org_id_fallback=org_id, + ) + + print("\nEvaluation result:") + print(json.dumps(eval_result, indent=2)) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index fe9dc49..84121bb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,7 @@ dependencies = [ ] [project.optional-dependencies] -dev = ["pytest>=8.0", "ruff>=0.3", "build>=1.2"] +dev = ["pytest>=8.0", "pytest-asyncio>=0.23", "ruff>=0.3", "build>=1.2", "openai-agents>=0.0.3", "aiohttp>=3.9"] [project.urls] Homepage = "https://github.com/ProvablyAI/provably-python-sdk" @@ -51,6 +51,7 @@ known-first-party = ["provably"] [tool.pytest.ini_options] testpaths = ["tests"] pythonpath = ["src", "."] +asyncio_mode = "auto" markers = [ "e2e: end-to-end tests that drive the SDK against a real loopback HTTP server", ] diff --git a/src/provably/handoff/_http.py b/src/provably/handoff/_http.py index 8d0a846..6333bc5 100644 --- a/src/provably/handoff/_http.py +++ b/src/provably/handoff/_http.py @@ -14,10 +14,24 @@ _log = get_logger(__name__) _SESSION = requests.Session() - _TRANSIENT_STATUS = {429, 502, 503, 504} +def _request(method: str, path: str, **kwargs: Any) -> requests.Response: + """All outbound HTTP from this module funnels through here. + + Wraps every call in ``provably_self_egress()`` so the SDK's own backend traffic + bypasses the trust gate and the intercept recorder. The ``provably_self_egress`` + import is deferred to avoid the circular import: + ``provably.intercept`` → ``interceptor`` → ``_storage`` → ``handoff._preprocess`` + → ``handoff._http``. + """ + from provably.intercept._self_egress import provably_self_egress # noqa: PLC0415 + + with provably_self_egress(): + return _SESSION.request(method, f"{base_url()}{path}", headers=headers(), **kwargs) + + def base_url() -> str: env_url = os.getenv("PROVABLY_RUST_BE_URL", "").strip() if not env_url: @@ -129,7 +143,7 @@ def log_failed_response(resp: requests.Response) -> None: def get_json(path: str) -> Any: - resp = _SESSION.get(f"{base_url()}{path}", headers=headers(), timeout=60) + resp = _request("GET", path, timeout=60) if not resp.ok: log_failed_response(resp) resp.raise_for_status() @@ -138,25 +152,15 @@ def get_json(path: str) -> Any: def get_json_params(path: str, params: dict[str, Any], *, timeout_s: float = 60.0) -> Any: """GET with query-string parameters (e.g. list queries filtered by ``collection_ids``).""" - resp = _SESSION.get( - f"{base_url()}{path}", headers=headers(), params=params, timeout=timeout_s - ) + resp = _request("GET", path, params=params, timeout=timeout_s) if not resp.ok: log_failed_response(resp) resp.raise_for_status() - if not resp.text: - return [] - data = resp.json() - return data + return resp.json() if resp.text else [] def post_json(path: str, payload: dict[str, Any] | None = None) -> dict[str, Any]: - resp = _SESSION.post( - f"{base_url()}{path}", - headers=headers(), - json=payload or {}, - timeout=60, - ) + resp = _request("POST", path, json=payload or {}, timeout=60) if not resp.ok: log_failed_response(resp) resp.raise_for_status() @@ -165,7 +169,7 @@ def post_json(path: str, payload: dict[str, Any] | None = None) -> dict[str, Any def post_raw(path: str, payload: dict[str, Any]) -> requests.Response: """Post without raising so callers can inspect error bodies (e.g. 'already exists').""" - return _SESSION.post(f"{base_url()}{path}", headers=headers(), json=payload, timeout=60) + return _request("POST", path, json=payload, timeout=60) def post_json_with_transient_retry( @@ -178,12 +182,7 @@ def post_json_with_transient_retry( payload = payload or {} last: requests.Response | None = None for attempt in range(max_attempts): - last = _SESSION.post( - f"{base_url()}{path}", - headers=headers(), - json=payload, - timeout=120, - ) + last = _request("POST", path, json=payload, timeout=120) if last.status_code in _TRANSIENT_STATUS: log_failed_response(last) time.sleep(min(3.0 * (attempt + 1), 45)) diff --git a/src/provably/handoff/evaluator.py b/src/provably/handoff/evaluator.py index 138a86b..efc2110 100644 --- a/src/provably/handoff/evaluator.py +++ b/src/provably/handoff/evaluator.py @@ -26,6 +26,7 @@ from provably.handoff.eval_modes import evaluate_claim from provably.handoff.json_utils import canonical_json from provably.handoff.types import HandoffClaim, HandoffPayload +from provably.intercept._self_egress import provably_self_egress from provably.log import get_logger from provably.trusted_endpoints import check_claim_endpoints_are_trusted @@ -106,40 +107,41 @@ def evaluate_handoff( per_claim: list[dict[str, Any]] = [] errors: list[str] = [] - with httpx.Client(timeout=timeout_s) as client: - # Phase 1+2: per-claim compare; capture TPT (TVT comes after verify in phase 3). - for claim in payload.claims: - query_record_id = (claim.query_record_id or "").strip() - if not query_record_id: - err = "missing query_record_id" - errors.append(f"{claim.action_name}: {err}") - per_claim.append(_error(claim, err)) - continue - try: - record = _fetch_query_record(client, base_url, org, query_record_id, api_key) - except Exception as exc: # noqa: BLE001 - # Record exists (we have a query_record_id) but can't be fetched — treat as - # unverifiable (suspicious), not a plain infra/setup error. - errors.append(f"{claim.action_name}: fetch failed: {exc}") - per_claim.append(_caught_verdict(claim, f"fetch failed: {exc}")) - continue - verdict = evaluate_claim(claim, extract_indexed_from_query_record(record)) - verdict["query_record_id"] = query_record_id - timing = _timing_from_query_record(record) - if timing: - verdict = {**verdict, **timing} - per_claim.append(verdict) - - # Phase 3 (final): /verify each unique query_record_id and refresh TVT timing. - _verify_and_refresh_timings( - client=client, - base_url=base_url, - org=org, - api_key=api_key, - payload=payload, - per_claim=per_claim, - errors=errors, - ) + with provably_self_egress(): + with httpx.Client(timeout=timeout_s) as client: + # Phase 1+2: per-claim compare; capture TPT (TVT comes after verify in phase 3). + for claim in payload.claims: + query_record_id = (claim.query_record_id or "").strip() + if not query_record_id: + err = "missing query_record_id" + errors.append(f"{claim.action_name}: {err}") + per_claim.append(_error(claim, err)) + continue + try: + record = _fetch_query_record(client, base_url, org, query_record_id, api_key) + except Exception as exc: # noqa: BLE001 + # Record exists (we have a query_record_id) but can't be fetched — treat as + # unverifiable (suspicious), not a plain infra/setup error. + errors.append(f"{claim.action_name}: fetch failed: {exc}") + per_claim.append(_caught_verdict(claim, f"fetch failed: {exc}")) + continue + verdict = evaluate_claim(claim, extract_indexed_from_query_record(record)) + verdict["query_record_id"] = query_record_id + timing = _timing_from_query_record(record) + if timing: + verdict = {**verdict, **timing} + per_claim.append(verdict) + + # Phase 3 (final): /verify each unique query_record_id and refresh TVT timing. + _verify_and_refresh_timings( + client=client, + base_url=base_url, + org=org, + api_key=api_key, + payload=payload, + per_claim=per_claim, + errors=errors, + ) return {"outcome": _resolve_outcome(per_claim), "per_claim": per_claim, "errors": errors} diff --git a/src/provably/handoff/transport.py b/src/provably/handoff/transport.py index 85dcfec..6eddf3f 100644 --- a/src/provably/handoff/transport.py +++ b/src/provably/handoff/transport.py @@ -5,6 +5,7 @@ import httpx from provably.handoff.types import HandoffPayload +from provably.intercept._self_egress import provably_self_egress from provably.log import get_logger _log = get_logger(__name__) @@ -24,7 +25,8 @@ def post_handoff( body = handoff_payload.model_dump(mode="json") hdrs = {"Content-Type": "application/json", **(headers or {})} try: - resp = httpx.post(url, json=body, headers=hdrs, timeout=timeout_s) + with provably_self_egress(): + resp = httpx.post(url, json=body, headers=hdrs, timeout=timeout_s) resp.raise_for_status() except Exception as e: _log.error("post_handoff_failed", url=url, error=str(e)) diff --git a/src/provably/intercept/__init__.py b/src/provably/intercept/__init__.py index ac1d063..f301962 100644 --- a/src/provably/intercept/__init__.py +++ b/src/provably/intercept/__init__.py @@ -1,6 +1,7 @@ """Intercept phase: monkey-patch ``requests`` + ``httpx`` and record responses into Postgres.""" from ._loader import load_latest_intercept_payload as load_latest_intercept_payload +from ._self_egress import provably_self_egress as provably_self_egress from .interceptor import ( clear_intercept_row_ids as clear_intercept_row_ids, ) @@ -23,6 +24,7 @@ "enable", "init_interceptor", "is_enabled", + "provably_self_egress", "set_intercept_body_hook", "set_intercept_url_allowlist", "set_interceptor_context", diff --git a/src/provably/intercept/_reentry.py b/src/provably/intercept/_reentry.py new file mode 100644 index 0000000..848c436 --- /dev/null +++ b/src/provably/intercept/_reentry.py @@ -0,0 +1,25 @@ +"""Re-entry guard: prevent double-recording when module-level calls delegate to Client.send.""" + +import contextvars + +_in_intercept: contextvars.ContextVar[bool] = contextvars.ContextVar( + "provably_in_intercept", default=False +) + + +def already_recording() -> bool: + """Return True if the current task/thread is already inside an intercept wrapper.""" + return _in_intercept.get() + + +class recording_scope: + """Context manager that marks the current task/thread as 'currently inside an intercept + wrapper', so deeper layers (e.g. Client.send when called from httpx.get) can skip + duplicate recording.""" + + def __enter__(self) -> "recording_scope": + self._token = _in_intercept.set(True) + return self + + def __exit__(self, *exc: object) -> None: + _in_intercept.reset(self._token) diff --git a/src/provably/intercept/_self_egress.py b/src/provably/intercept/_self_egress.py new file mode 100644 index 0000000..5783c7a --- /dev/null +++ b/src/provably/intercept/_self_egress.py @@ -0,0 +1,28 @@ +"""Self-egress exemption: SDK-internal HTTP calls bypass intercept recording and trust checks.""" + +import contextvars +from collections.abc import Generator +from contextlib import contextmanager + +_self_egress: contextvars.ContextVar[bool] = contextvars.ContextVar( + "provably_self_egress", default=False +) + + +def is_self_egress() -> bool: + """Return True if the current task/thread is inside a provably_self_egress() block.""" + return _self_egress.get() + + +@contextmanager +def provably_self_egress() -> Generator[None, None, None]: + """Mark a block of code as SDK-internal egress: skip trust check AND skip recording. + + Used by handoff.transport, handoff.evaluator, handoff._bootstrap (via handoff._http) + when they make their own httpx / requests calls so the SDK doesn't trip its own gate. + """ + token = _self_egress.set(True) + try: + yield + finally: + _self_egress.reset(token) diff --git a/src/provably/intercept/_storage.py b/src/provably/intercept/_storage.py index edac669..d8d6faa 100644 --- a/src/provably/intercept/_storage.py +++ b/src/provably/intercept/_storage.py @@ -10,6 +10,7 @@ import psycopg2 from provably.handoff._preprocess import preprocess_after_intercept_write +from provably.intercept._self_egress import is_self_egress from provably.log import get_logger from provably.trusted_endpoints import ensure_trusted_endpoints_table, is_trusted_endpoint @@ -112,12 +113,13 @@ def insert_intercept_row( agent_id: str, action_name: str, ) -> int | None: - """Insert a row and return its id; enforce trust on GET before storing.""" + """Insert a row and return its id; enforce trust on all methods before storing.""" + if is_self_egress(): + return None postgres_url = os.getenv("POSTGRES_URL", "").strip() if not postgres_url: return None - if method.upper() == "GET": - _require_trusted_endpoint(postgres_url, url) + _require_trusted_endpoint(postgres_url, url) return _write_row(postgres_url, url, method, request_payload, raw, agent_id, action_name) diff --git a/src/provably/intercept/interceptor.py b/src/provably/intercept/interceptor.py index 01afa0f..225c232 100644 --- a/src/provably/intercept/interceptor.py +++ b/src/provably/intercept/interceptor.py @@ -2,28 +2,34 @@ from __future__ import annotations +import json import threading from collections.abc import Callable from contextvars import ContextVar from typing import Any +from urllib.parse import parse_qs, urlsplit import httpx import requests +try: + import aiohttp as _aiohttp +except ImportError: + _aiohttp = None # type: ignore[assignment] + +from provably.intercept._reentry import already_recording, recording_scope from provably.intercept._responses import ( HttpxJsonOverride, RequestsJsonOverride, extract_raw, ) +from provably.intercept._self_egress import is_self_egress from provably.intercept._storage import ( insert_intercept_row, request_payload_dict, ) from provably.trusted_endpoints import normalize_url_for_trust -_RequestsJsonOverride = RequestsJsonOverride -_HttpxJsonOverride = HttpxJsonOverride - _ctx_agent_id: ContextVar[str] = ContextVar("provably_agent_id", default="") _ctx_action_name: ContextVar[str] = ContextVar("provably_action_name", default="") _ctx_intercept_index: ContextVar[int] = ContextVar("provably_intercept_index", default=0) @@ -120,7 +126,9 @@ def init_interceptor() -> None: Replaces ``requests.get``/``requests.post`` and ``httpx.get``/``httpx.post`` with wrapped versions that record the response into ``provably_intercepts`` and optionally pass it - through the simulation hook. The patch is one-way (use :func:`disable` to short-circuit + through the simulation hook. Also patches ``httpx.Client.send``, ``httpx.AsyncClient.send``, + and ``requests.Session.send`` as lower-level choke points so instance-based and async + usage is also intercepted. The patch is one-way (use :func:`disable` to short-circuit rather than uninstall) and flips ``enabled=True``. """ global _initialized, _enabled @@ -130,10 +138,24 @@ def init_interceptor() -> None: _orig["requests_post"] = requests.post _orig["httpx_get"] = httpx.get _orig["httpx_post"] = httpx.post + _orig["httpx_client_send"] = httpx.Client.send + _orig["httpx_async_client_send"] = httpx.AsyncClient.send + _orig["requests_session_send"] = requests.Session.send + # Module-level convenience patches (kept for backward compatibility) requests.get = _wrap_call(_orig["requests_get"], "GET") requests.post = _wrap_call(_orig["requests_post"], "POST") httpx.get = _wrap_call(_orig["httpx_get"], "GET") httpx.post = _wrap_call(_orig["httpx_post"], "POST") + # Lower-level instance method patches (cover async, Client(), Session()) + requests.Session.send = _wrap_session_send(_orig["requests_session_send"]) + httpx.Client.send = _wrap_client_send(_orig["httpx_client_send"]) + httpx.AsyncClient.send = _wrap_async_client_send(_orig["httpx_async_client_send"]) + # Soft-dep: aiohttp is not a hard dependency of this SDK. When present, patch the + # central ClientSession._request choke point that every aiohttp call routes through + # (LiteLLM's default transport, optional Google GenAI / Google ADK paths, etc.). + if _aiohttp is not None: + _orig["aiohttp_session_request"] = _aiohttp.ClientSession._request + _aiohttp.ClientSession._request = _wrap_aiohttp_request(_orig["aiohttp_session_request"]) _initialized = True _enabled = True @@ -160,19 +182,21 @@ def is_enabled() -> bool: def _insert_row(url: str, request_payload: dict[str, Any], raw: Any, *, method: str = "GET") -> None: if not _enabled: return + agent_id = _ctx_agent_id.get() or "unknown" + action_name = _ctx_action_name.get() or "unknown" row_id = insert_intercept_row( url=url, method=method, request_payload=request_payload, raw=raw, - agent_id=_ctx_agent_id.get() or "unknown", - action_name=_ctx_action_name.get() or "unknown", + agent_id=agent_id, + action_name=action_name, ) if row_id is not None: global _last_intercept_row_id with _intercept_lock: _last_intercept_row_id = row_id - _action_row_ids[(_ctx_agent_id.get() or "unknown", _ctx_action_name.get() or "unknown")] = row_id + _action_row_ids[(agent_id, action_name)] = row_id def _maybe_transform_body(raw: Any) -> Any: @@ -183,31 +207,180 @@ def _maybe_transform_body(raw: Any) -> Any: def _attach(response: Any, url: str, method: str, req_kwargs: dict[str, Any]) -> Any: - raw = extract_raw(response) + """Record an httpx/requests response and optionally mutate it via the simulation hook. + + Short-circuits immediately when inside a self-egress block or already in the middle of + recording (re-entry guard prevents double-recording when module-level calls like + ``httpx.get`` internally delegate to a patched ``Client.send``). + + For aiohttp responses, see :func:`_wrap_aiohttp_request` which awaits the body + asynchronously before calling :func:`_record_and_maybe_tamper` directly. + """ + if is_self_egress() or already_recording(): + return response + return _record_and_maybe_tamper(response, url, method, req_kwargs, extract_raw(response)) + + +def _record_and_maybe_tamper( + response: Any, url: str, method: str, req_kwargs: dict[str, Any], raw: Any +) -> Any: + """Inside-the-gate recording path shared by sync (_attach) and async (aiohttp) callers. + + Callers must have already cleared self-egress / re-entry guards. ``raw`` is the + pre-extracted body (sync from ``extract_raw``, or awaited for aiohttp). + """ req = request_payload_dict(url, method, req_kwargs) nurl = normalize_url_for_trust(str(url)) if _url_allowlist is not None and nurl not in _url_allowlist: return response - # Recording: any request that passed the allowlist gate (including legacy mode when - # allowlist is None — all outbound traffic). if _enabled: _insert_row(url, req, raw, method=method) - # Simulation tamper hook: only for explicit run endpoints, never for OpenRouter, - # Provably API, cluster handoff posts, etc. (those run with allowlist cleared or off-list). + # Tamper hook fires only for explicit run endpoints; never for OpenRouter, Provably API, + # cluster handoff posts (those run with allowlist cleared or off-list). tamper = _url_allowlist is not None and nurl in _url_allowlist mutated = _maybe_transform_body(raw) if tamper else raw if mutated is raw: return response if isinstance(response, requests.Response): - return _RequestsJsonOverride(response, mutated) + return RequestsJsonOverride(response, mutated) if isinstance(response, httpx.Response): - return _HttpxJsonOverride(response, mutated) + return HttpxJsonOverride(response, mutated) + # aiohttp.ClientResponse and other libraries: body override not supported, return as-is. return response def _wrap_call(orig_fn, method: str): + """Wrap a module-level convenience function (e.g. httpx.get, requests.post). + + Sets ``recording_scope`` BEFORE calling the original function so that any + lower-level ``Client.send`` / ``Session.send`` wrapper that fires during the + same call sees ``already_recording() == True`` and skips duplicate recording. + After the original returns, ``_attach`` runs in the outer scope (where + ``already_recording()`` is now False again) to do the actual recording. + """ + def wrapped(url, *args, **kwargs): - response = orig_fn(url, *args, **kwargs) + with recording_scope(): + response = orig_fn(url, *args, **kwargs) + # recording_scope has exited; _attach will record this call return _attach(response, str(url), method, dict(kwargs)) return wrapped + + +def _decode_body_into_kwargs(kwargs: dict[str, Any], body: bytes | str | None, content_type: str) -> None: + """Decode a raw request body into the kwargs shape ``request_payload_dict`` understands. + + JSON bodies (when Content-Type contains ``application/json``) are parsed and stored under + ``json``; everything else is decoded to text and stored under ``data``. Mutates ``kwargs``. + """ + if body is None or body == b"": + return + text = body if isinstance(body, str) else body.decode(errors="replace") + if "application/json" in content_type: + try: + kwargs["json"] = json.loads(text) + return + except Exception: # noqa: BLE001 + pass + kwargs["data"] = text + + +def _httpx_request_to_kwargs(req: httpx.Request) -> dict[str, Any]: + """Extract httpx.Request metadata into the kwargs shape request_payload_dict understands.""" + kwargs: dict[str, Any] = {} + params = dict(req.url.params) + if params: + kwargs["params"] = params + _decode_body_into_kwargs(kwargs, req.content, req.headers.get("content-type", "")) + return kwargs + + +def _requests_prepared_to_kwargs(req: requests.PreparedRequest) -> dict[str, Any]: + """Extract PreparedRequest metadata into the kwargs shape request_payload_dict understands.""" + kwargs: dict[str, Any] = {} + parsed = urlsplit(str(req.url or "")) + if parsed.query: + raw_params = parse_qs(parsed.query, keep_blank_values=True) + # Flatten single-value lists to scalars (matches typical kwargs["params"] shape) + kwargs["params"] = {k: (v[0] if len(v) == 1 else v) for k, v in raw_params.items()} + _decode_body_into_kwargs(kwargs, req.body, (req.headers or {}).get("Content-Type", "") or "") + return kwargs + + +def _wrap_client_send(orig_send): + """Wrap httpx.Client.send to record responses via _attach.""" + + def wrapped(self, request: httpx.Request, **kwargs: Any) -> httpx.Response: + response = orig_send(self, request, **kwargs) + return _attach(response, str(request.url), request.method, _httpx_request_to_kwargs(request)) + + return wrapped + + +def _wrap_async_client_send(orig_send): + """Wrap httpx.AsyncClient.send to record responses via _attach.""" + + async def wrapped(self, request: httpx.Request, **kwargs: Any) -> httpx.Response: + response = await orig_send(self, request, **kwargs) + return _attach(response, str(request.url), request.method, _httpx_request_to_kwargs(request)) + + return wrapped + + +def _wrap_session_send(orig_send): + """Wrap requests.Session.send to record responses via _attach.""" + + def wrapped(self, request: requests.PreparedRequest, **kwargs: Any) -> requests.Response: + response = orig_send(self, request, **kwargs) + return _attach(response, str(request.url), request.method or "GET", _requests_prepared_to_kwargs(request)) + + return wrapped + + +_AIOHTTP_KWARG_KEYS = ("params", "json", "data") + + +def _aiohttp_kwargs_to_kwargs(kwargs: dict[str, Any]) -> dict[str, Any]: + """Filter aiohttp _request kwargs down to the subset request_payload_dict cares about.""" + return {k: v for k, v in kwargs.items() if k in _AIOHTTP_KWARG_KEYS and v is not None} + + +def _wrap_aiohttp_request(orig_request): + """Wrap aiohttp.ClientSession._request — the central method every aiohttp call routes through. + + Async wrapper: awaits the original to get a ClientResponse, then awaits ``response.read()`` + to populate aiohttp's body cache (so user code calling ``.json()`` / ``.text()`` later + still works), parses the body, and routes through ``_record_and_maybe_tamper``. + + Body override (the simulation tamper hook) is not supported for aiohttp responses — the + response is returned as-is. Recording works in full. + """ + + async def wrapped(self, method: str, str_or_url: Any, **kwargs: Any) -> Any: + response = await orig_request(self, method, str_or_url, **kwargs) + if is_self_egress() or already_recording(): + return response + with recording_scope(): + try: + body_bytes = await response.read() + except Exception: # noqa: BLE001 + body_bytes = b"" + content_type = response.headers.get("Content-Type", "") if response.headers else "" + text = body_bytes.decode(errors="replace") if body_bytes else "" + if "application/json" in content_type and text: + try: + raw = json.loads(text) + except Exception: # noqa: BLE001 + raw = {"text": text} + else: + raw = {"text": text} + return _record_and_maybe_tamper( + response, + str(str_or_url), + method, + _aiohttp_kwargs_to_kwargs(kwargs), + raw, + ) + + return wrapped diff --git a/tests/conftest.py b/tests/conftest.py index 2d79b1c..081d8f6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -18,3 +18,24 @@ """ from __future__ import annotations + +from collections.abc import Iterator + +import pytest + +from tests.e2e.conftest import FakeHttpServer + + +@pytest.fixture +def fake_server() -> Iterator[FakeHttpServer]: + """Loopback HTTP server fixture available to both unit and e2e tests. + + Provides a real in-process loopback server so tests can drive SDK HTTP patches + without any external network access. + """ + server = FakeHttpServer() + server.start() + try: + yield server + finally: + server.stop() diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 1b0e3de..3678d15 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -16,6 +16,8 @@ import pytest +import provably.intercept.interceptor as _interceptor_module + Handler = Callable[["RecordedRequest"], "FakeResponse"] @@ -117,10 +119,39 @@ def stop(self) -> None: @pytest.fixture -def fake_server() -> Iterator[FakeHttpServer]: - server = FakeHttpServer() - server.start() - try: - yield server - finally: - server.stop() +def fake_server_factory() -> Iterator[Callable[[], FakeHttpServer]]: + """Factory fixture that creates and tracks multiple FakeHttpServer instances. + + Each call to the returned factory function starts a new server; all servers + are shut down after the test finishes. + """ + servers: list[FakeHttpServer] = [] + + def _make() -> FakeHttpServer: + s = FakeHttpServer() + s.start() + servers.append(s) + return s + + yield _make + + for s in servers: + s.stop() + + +@pytest.fixture +def patched_interceptor(monkeypatch: pytest.MonkeyPatch) -> list[dict[str, Any]]: + """Install the real interceptor with the storage layer redirected to an in-memory list. + + Yields the list of recorded rows so tests can assert against it. + This fixture is shared between test_interceptor_e2e.py and test_openai_agents_e2e.py. + """ + rows: list[dict[str, Any]] = [] + + def fake_insert(_url: str, request_payload: dict, raw: Any, *, method: str = "GET") -> None: + rows.append({"url": _url, "method": method, "request": request_payload, "raw": raw}) + + monkeypatch.setattr(_interceptor_module, "_insert_row", fake_insert) + _interceptor_module.init_interceptor() + monkeypatch.setattr(_interceptor_module, "_enabled", True) + return rows diff --git a/tests/e2e/test_interceptor_e2e.py b/tests/e2e/test_interceptor_e2e.py index bf99547..8d4f860 100644 --- a/tests/e2e/test_interceptor_e2e.py +++ b/tests/e2e/test_interceptor_e2e.py @@ -24,23 +24,6 @@ from tests.e2e.conftest import FakeHttpServer -@pytest.fixture -def patched_interceptor(monkeypatch: pytest.MonkeyPatch) -> list[dict[str, Any]]: - """Install the real interceptor with the storage layer redirected to an in-memory list. - - Yields the list of recorded rows so tests can assert against it. - """ - rows: list[dict[str, Any]] = [] - - def fake_insert(_url: str, request_payload: dict, raw: Any, *, method: str = "GET") -> None: - rows.append({"url": _url, "method": method, "request": request_payload, "raw": raw}) - - monkeypatch.setattr(interceptor, "_insert_row", fake_insert) - interceptor.init_interceptor() - monkeypatch.setattr(interceptor, "_enabled", True) - return rows - - @pytest.mark.e2e def test_requests_get_records_raw_response( fake_server: FakeHttpServer, patched_interceptor: list[dict[str, Any]] diff --git a/tests/e2e/test_openai_agents_e2e.py b/tests/e2e/test_openai_agents_e2e.py new file mode 100644 index 0000000..d3c48aa --- /dev/null +++ b/tests/e2e/test_openai_agents_e2e.py @@ -0,0 +1,391 @@ +"""End-to-end tests for OpenAI Agents SDK integration. + +Six scenarios drive a fake LLM server + fake data server through the full +intercept → handoff → evaluate flow: + + A – happy path: all URLs trusted, evaluate_handoff → PASS + B – tampered claim: claimed_value wrong → CAUGHT + C – untrusted GET: data URL not in allowlist → tool call raises BLOCKED + D – untrusted POST: LLM URL not in allowlist → LLM POST raises BLOCKED + E – self-egress exemption: Provably backend NOT in allowlist; evaluate_handoff still completes + F – async LLM coverage: at least one recorded POST row from the LLM server + +Trust-gate testing notes (scenarios C and D): + ``patched_interceptor`` replaces ``_insert_row`` entirely, bypassing the + ``_storage.insert_intercept_row`` path (and therefore the trust check). For C and D the + trust check must fire while still avoiding a real DB write, so those scenarios use + ``patched_interceptor_with_trust`` which patches ``_require_trusted_endpoint`` + + ``_write_row`` instead and sets a sentinel ``POSTGRES_URL``. +""" + +from __future__ import annotations + +from typing import Any + +import pytest +import requests as requests_lib +from agents import Agent, Runner, function_tool, set_default_openai_api, set_default_openai_client +from openai import AsyncOpenAI + +import provably.intercept._storage as _storage_module +import provably.intercept.interceptor as _interceptor_module +from provably.handoff.evaluator import evaluate_handoff +from provably.handoff.types import HandoffClaim, HandoffPayload +from provably.trusted_endpoints import normalize_url_for_trust +from tests.e2e.conftest import FakeHttpServer, FakeResponse + +# --------------------------------------------------------------------------- +# Fake LLM ChatCompletion JSON responses +# --------------------------------------------------------------------------- + + +def _tool_call_response(tool_name: str, call_id: str = "call_001", arguments: str = "{}") -> dict: + """First LLM turn: respond with a tool_call.""" + return { + "id": "chatcmpl-test-turn1", + "object": "chat.completion", + "created": 1700000000, + "model": "gpt-4o-mini", + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": None, + "tool_calls": [ + { + "id": call_id, + "type": "function", + "function": {"name": tool_name, "arguments": arguments}, + } + ], + }, + "finish_reason": "tool_calls", + "logprobs": None, + } + ], + "usage": {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15}, + } + + +def _final_response(content: str = "The temperature is 21 degrees Celsius.") -> dict: + """Second LLM turn: final assistant message.""" + return { + "id": "chatcmpl-test-turn2", + "object": "chat.completion", + "created": 1700000001, + "model": "gpt-4o-mini", + "choices": [ + { + "index": 0, + "message": {"role": "assistant", "content": content, "tool_calls": None}, + "finish_reason": "stop", + "logprobs": None, + } + ], + "usage": {"prompt_tokens": 20, "completion_tokens": 10, "total_tokens": 30}, + } + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def fake_llm_server(fake_server_factory) -> FakeHttpServer: + """Fake LLM: tool_call response on the first POST, final assistant message after that.""" + server = fake_server_factory() + call_count = [0] + + def _handle_chat(_req): + call_count[0] += 1 + body = _tool_call_response("get_temperature") if call_count[0] == 1 else _final_response() + return FakeResponse(status=200, body=body) + + server.route("POST", "/v1/chat/completions", _handle_chat) + return server + + +@pytest.fixture +def fake_data_server(fake_server_factory) -> FakeHttpServer: + """Fake data API: GET /v1/temperature → {'celsius': 21}.""" + server = fake_server_factory() + server.respond("GET", "/v1/temperature", status=200, body={"celsius": 21}) + return server + + +def _trusted_check(allowlist: set[str], url: str) -> None: + """Prefix-aware trust check. + + Mirrors real-world usage where users register a service base URL (e.g. + ``http://127.0.0.1:9000``) and any path under that base is allowed. + """ + norm = normalize_url_for_trust(url) + for trusted in allowlist: + if norm == trusted or norm.startswith(trusted + "/") or norm.startswith(trusted + "?"): + return + raise RuntimeError(f"BLOCKED: {url} not in trusted_endpoints") + + +@pytest.fixture +def fake_trusted_endpoints(monkeypatch: pytest.MonkeyPatch) -> set[str]: + """In-memory allowlist for ``_require_trusted_endpoint``. + + Only sufficient when paired with ``patched_interceptor`` (which bypasses + ``insert_intercept_row``). For scenarios needing the real trust gate to fire (C, D), + use ``patched_interceptor_with_trust``. + """ + allowlist: set[str] = set() + monkeypatch.setattr( + "provably.intercept._storage._require_trusted_endpoint", + lambda _pg, url: _trusted_check(allowlist, url), + ) + return allowlist + + +@pytest.fixture +def patched_interceptor_with_trust(monkeypatch: pytest.MonkeyPatch) -> tuple[list[dict[str, Any]], set[str]]: + """Like ``patched_interceptor`` but with the trust gate active. + + Lets ``insert_intercept_row`` actually run so ``_require_trusted_endpoint`` fires; avoids + a real Postgres connection by: + 1. Setting a sentinel ``POSTGRES_URL`` so the early-return guard is skipped. + 2. Replacing ``_require_trusted_endpoint`` with the in-memory prefix check. + 3. Replacing ``_write_row`` with a no-op that records to an in-memory list. + """ + rows: list[dict[str, Any]] = [] + allowlist: set[str] = set() + + monkeypatch.setenv("POSTGRES_URL", "postgresql://fake-host/fake-db") + monkeypatch.setattr( + _storage_module, "_require_trusted_endpoint", lambda _pg, url: _trusted_check(allowlist, url) + ) + monkeypatch.setattr( + _storage_module, + "_write_row", + lambda *_a, **_kw: rows.append({"url": _a[1], "method": _a[2], "request": _a[3], "raw": _a[4]}), + ) + + _interceptor_module.init_interceptor() + monkeypatch.setattr(_interceptor_module, "_enabled", True) + return rows, allowlist + + +# --------------------------------------------------------------------------- +# Test helpers +# --------------------------------------------------------------------------- + + +def _seed_allowlist(allowlist: set[str], *servers: FakeHttpServer) -> None: + """Seed an allowlist with the normalized base URLs of the given fake servers.""" + for s in servers: + allowlist.add(normalize_url_for_trust(s.base_url)) + + +def _configure_agent_client(fake_llm_server: FakeHttpServer) -> None: + """Point the agents SDK at the fake LLM server and force chat completions mode.""" + client = AsyncOpenAI(base_url=f"{fake_llm_server.base_url}/v1", api_key="test-key") + set_default_openai_client(client, use_for_tracing=False) + set_default_openai_api("chat_completions") + + +def _make_weather_agent( + fake_data_server: FakeHttpServer, *, fail_loudly: bool = False +) -> Agent: + """Build the standard weather agent that GETs ``/v1/temperature`` from the fake data server. + + ``fail_loudly=True`` sets ``failure_error_function=None`` on the tool so that exceptions + raised inside the tool propagate up through ``Runner.run`` instead of being converted + into LLM-visible error strings (used by scenario C). + """ + data_url = f"{fake_data_server.base_url}/v1/temperature" + decorator = function_tool(failure_error_function=None) if fail_loudly else function_tool + + @decorator + def get_temperature() -> dict: + """Get the current temperature.""" + return requests_lib.get(data_url).json() + + return Agent( + name="weather-agent", + instructions="Use the get_temperature tool and report the result.", + tools=[get_temperature], + model="gpt-4o-mini", + ) + + +def _make_provably_backend( + fake_server_factory, query_record_id: str, indexed_value: dict +) -> FakeHttpServer: + """Spin up a fake Provably backend that resolves one query record + one verify call.""" + server = fake_server_factory() + base = f"/api/v1/organizations/org-1/queries/{query_record_id}" + server.respond("GET", base, status=200, body={"result": indexed_value}) + server.respond("POST", f"{base}/verify", status=200, body={"verified": True}) + return server + + +def _make_payload(query_record_id: str, claimed_value: dict) -> HandoffPayload: + return HandoffPayload( + provably_org_id="org-1", + integration_api_key="key-abc", + claims=[ + HandoffClaim( + action_name="get_temperature", + claimed_value=claimed_value, + query_record_id=query_record_id, + ) + ], + ) + + +def _exception_chain_contains(exc: BaseException, pattern: str) -> bool: + """Walk ``__cause__``/``__context__`` looking for ``pattern`` (the agents SDK wraps + ``RuntimeError`` from ``AsyncClient.send`` as ``APIConnectionError``).""" + seen: set[int] = set() + curr: BaseException | None = exc + while curr is not None and id(curr) not in seen: + seen.add(id(curr)) + if pattern in str(curr): + return True + curr = curr.__cause__ if curr.__cause__ is not None else curr.__context__ + return False + + +# --------------------------------------------------------------------------- +# Scenarios +# --------------------------------------------------------------------------- + + +@pytest.mark.e2e +async def test_openai_agent_intercepts_and_handoff_passes( + fake_llm_server: FakeHttpServer, + fake_data_server: FakeHttpServer, + fake_server_factory, + patched_interceptor: list[dict[str, Any]], + fake_trusted_endpoints: set[str], +) -> None: + """A — both URLs trusted; agent runs; evaluate_handoff → PASS.""" + _seed_allowlist(fake_trusted_endpoints, fake_llm_server, fake_data_server) + _configure_agent_client(fake_llm_server) + + result = await Runner.run(_make_weather_agent(fake_data_server), "What's the temp?") + assert result is not None + + methods = [r["method"] for r in patched_interceptor] + assert methods.count("POST") >= 1, f"Expected ≥1 LLM POST, got {methods}" + assert methods.count("GET") >= 1, f"Expected ≥1 data GET, got {methods}" + + fake_provably = _make_provably_backend(fake_server_factory, "q1", {"celsius": 21}) + eval_result = evaluate_handoff( + _make_payload("q1", {"celsius": 21}), provably_base_url=fake_provably.base_url + ) + assert eval_result["outcome"] == "PASS", f"Expected PASS, got: {eval_result}" + assert eval_result["per_claim"][0]["result"] == "PASS" + + +@pytest.mark.e2e +async def test_evaluate_catches_wrong_claim( + fake_llm_server: FakeHttpServer, + fake_data_server: FakeHttpServer, + fake_server_factory, + patched_interceptor: list[dict[str, Any]], + fake_trusted_endpoints: set[str], +) -> None: + """B — happy run, but the claim's value disagrees with the indexed record → CAUGHT.""" + _seed_allowlist(fake_trusted_endpoints, fake_llm_server, fake_data_server) + _configure_agent_client(fake_llm_server) + + await Runner.run(_make_weather_agent(fake_data_server), "What's the temp?") + + fake_provably = _make_provably_backend(fake_server_factory, "q1", {"celsius": 21}) + eval_result = evaluate_handoff( + _make_payload("q1", {"celsius": 99}), # tampered claim + provably_base_url=fake_provably.base_url, + ) + assert eval_result["outcome"] == "CAUGHT", f"Expected CAUGHT, got: {eval_result}" + assert eval_result["per_claim"][0]["result"] == "CAUGHT" + + +@pytest.mark.e2e +async def test_untrusted_get_blocks_request( + fake_llm_server: FakeHttpServer, + fake_data_server: FakeHttpServer, + patched_interceptor_with_trust: tuple[list[dict[str, Any]], set[str]], +) -> None: + """C — data URL not in allowlist → tool GET raises BLOCKED.""" + _, allowlist = patched_interceptor_with_trust + _seed_allowlist(allowlist, fake_llm_server) # data server omitted on purpose + _configure_agent_client(fake_llm_server) + + with pytest.raises((RuntimeError, Exception), match="BLOCKED"): + await Runner.run(_make_weather_agent(fake_data_server, fail_loudly=True), "What's the temp?") + + +@pytest.mark.e2e +async def test_untrusted_post_blocks_llm_call( + fake_llm_server: FakeHttpServer, + fake_data_server: FakeHttpServer, + patched_interceptor_with_trust: tuple[list[dict[str, Any]], set[str]], +) -> None: + """D — LLM URL not in allowlist → LLM POST raises BLOCKED (validates POST trust gate). + + The BLOCKED RuntimeError is raised inside ``_attach()`` during ``AsyncClient.send``, + which the agents SDK wraps as ``APIConnectionError`` — we walk the exception chain. + """ + _, allowlist = patched_interceptor_with_trust + _seed_allowlist(allowlist, fake_data_server) # LLM server omitted on purpose + _configure_agent_client(fake_llm_server) + + with pytest.raises(Exception) as exc_info: + await Runner.run(_make_weather_agent(fake_data_server), "What's the temp?") + + assert _exception_chain_contains(exc_info.value, "BLOCKED"), ( + f"Expected 'BLOCKED' in exception chain. Got: {exc_info.value!r}" + ) + + +@pytest.mark.e2e +def test_self_egress_completes_without_trust( + fake_server_factory, + patched_interceptor: list[dict[str, Any]], + fake_trusted_endpoints: set[str], +) -> None: + """E — Provably backend NOT in trusted_endpoints; evaluate_handoff still completes. + + The SDK wraps its own egress in ``provably_self_egress()`` so the trust gate never fires + on internal calls. + """ + assert len(fake_trusted_endpoints) == 0 # nothing trusted + fake_provably = _make_provably_backend(fake_server_factory, "q1", {"x": 1}) + + eval_result = evaluate_handoff( + _make_payload("q1", {"x": 1}), provably_base_url=fake_provably.base_url + ) + assert eval_result["outcome"] == "PASS", f"Expected PASS, got: {eval_result}" + + +@pytest.mark.e2e +async def test_async_llm_call_intercepted( + fake_llm_server: FakeHttpServer, + fake_data_server: FakeHttpServer, + patched_interceptor: list[dict[str, Any]], + fake_trusted_endpoints: set[str], +) -> None: + """F — at least one recorded POST row points at the fake LLM server (proves + ``AsyncClient.send`` patch fires).""" + _seed_allowlist(fake_trusted_endpoints, fake_llm_server, fake_data_server) + _configure_agent_client(fake_llm_server) + + await Runner.run(_make_weather_agent(fake_data_server), "What's the temp?") + + llm_base = normalize_url_for_trust(fake_llm_server.base_url) + post_llm_rows = [ + r for r in patched_interceptor + if r["method"] == "POST" and llm_base in normalize_url_for_trust(r["url"]) + ] + assert post_llm_rows, ( + f"Expected ≥1 POST row to LLM server {llm_base}. " + f"Recorded: {[(r['method'], r['url']) for r in patched_interceptor]}" + ) diff --git a/tests/unit/test_interceptor.py b/tests/unit/test_interceptor.py index 5825e4b..9ab09f5 100644 --- a/tests/unit/test_interceptor.py +++ b/tests/unit/test_interceptor.py @@ -2,9 +2,14 @@ from typing import Any +import httpx +import pytest import requests +import provably.intercept._storage as storage import provably.intercept.interceptor as interceptor +from provably.intercept._responses import RequestsJsonOverride +from provably.intercept._self_egress import provably_self_egress def test_insert_row_receives_raw_before_mutation(monkeypatch: Any) -> None: @@ -29,7 +34,7 @@ def fake_mutate(raw: Any) -> Any: out = interceptor._attach(resp, "https://example.com/x", "GET", {}) assert captured == [{"original": True}] - assert isinstance(out, interceptor._RequestsJsonOverride) + assert isinstance(out, RequestsJsonOverride) assert out.json() == {"user_edited": True} finally: interceptor.set_intercept_url_allowlist(None) @@ -79,3 +84,237 @@ def fake_insert(url: str, *_a: Any, **_k: Any) -> None: assert out is resp finally: interceptor.set_intercept_url_allowlist(None) + + +# --------------------------------------------------------------------------- +# Phase 1 additions: Client.send / AsyncClient.send / Session.send coverage, +# re-entry guard, and self-egress exemption. +# --------------------------------------------------------------------------- + + +def _make_fake_insert(rows: list[dict[str, Any]]): + """Return a fake _insert_row that appends to ``rows``.""" + + def fake_insert(url: str, request_payload: dict, raw: Any, *, method: str = "GET") -> None: + rows.append({"url": url, "method": method, "request": request_payload, "raw": raw}) + + return fake_insert + + +def _setup_interceptor(monkeypatch: Any, rows: list[dict[str, Any]]) -> None: + """Patch _insert_row and enable interceptor; call init_interceptor to ensure patches are installed.""" + monkeypatch.setattr(interceptor, "_insert_row", _make_fake_insert(rows)) + interceptor.init_interceptor() + monkeypatch.setattr(interceptor, "_enabled", True) + + +def test_httpx_client_send_intercepted(monkeypatch: Any, fake_server: Any) -> None: + """httpx.Client(...).get(url) records exactly one row via the Client.send patch.""" + fake_server.respond("GET", "/data", status=200, body={"from_client": True}) + rows: list[dict[str, Any]] = [] + _setup_interceptor(monkeypatch, rows) + + client = httpx.Client() + response = client.get(f"{fake_server.base_url}/data") + client.close() + + assert response.status_code == 200 + assert len(rows) == 1 + assert rows[0]["method"] == "GET" + assert rows[0]["raw"] == {"from_client": True} + + +async def test_httpx_async_client_send_intercepted(monkeypatch: Any, fake_server: Any) -> None: + """await httpx.AsyncClient().get(url) records exactly one row via AsyncClient.send patch.""" + fake_server.respond("GET", "/async-data", status=200, body={"async": True}) + rows: list[dict[str, Any]] = [] + _setup_interceptor(monkeypatch, rows) + + async with httpx.AsyncClient() as client: + response = await client.get(f"{fake_server.base_url}/async-data") + + assert response.status_code == 200 + assert len(rows) == 1 + assert rows[0]["method"] == "GET" + assert rows[0]["raw"] == {"async": True} + + +def test_requests_session_send_intercepted(monkeypatch: Any, fake_server: Any) -> None: + """requests.Session().get(url) records exactly one row via Session.send patch.""" + fake_server.respond("GET", "/session-data", status=200, body={"from_session": True}) + rows: list[dict[str, Any]] = [] + _setup_interceptor(monkeypatch, rows) + + session = requests.Session() + response = session.get(f"{fake_server.base_url}/session-data") + + assert response.status_code == 200 + assert len(rows) == 1 + assert rows[0]["method"] == "GET" + assert rows[0]["raw"] == {"from_session": True} + + +def test_no_double_record_when_module_httpx_get_called(monkeypatch: Any, fake_server: Any) -> None: + """httpx.get(url) records exactly one row even though both module-level and Client.send patches are active.""" + fake_server.respond("GET", "/data", status=200, body={"v": 1}) + rows: list[dict[str, Any]] = [] + _setup_interceptor(monkeypatch, rows) + + httpx.get(f"{fake_server.base_url}/data") + + assert len(rows) == 1, f"Expected 1 row but got {len(rows)}: {rows}" + + +def test_no_double_record_when_module_requests_get_called(monkeypatch: Any, fake_server: Any) -> None: + """requests.get(url) records exactly one row even though both module-level and Session.send patches are active.""" + fake_server.respond("GET", "/data", status=200, body={"v": 2}) + rows: list[dict[str, Any]] = [] + _setup_interceptor(monkeypatch, rows) + + requests.get(f"{fake_server.base_url}/data") + + assert len(rows) == 1, f"Expected 1 row but got {len(rows)}: {rows}" + + +def test_self_egress_skips_recording(monkeypatch: Any, fake_server: Any) -> None: + """Inside with provably_self_egress(): no row is inserted.""" + fake_server.respond("GET", "/data", status=200, body={"v": 3}) + rows: list[dict[str, Any]] = [] + _setup_interceptor(monkeypatch, rows) + + with provably_self_egress(): + requests.get(f"{fake_server.base_url}/data") + + assert rows == [], f"Expected no rows but got: {rows}" + + +def test_self_egress_skips_trust_check(monkeypatch: Any, fake_server: Any) -> None: + """Inside with provably_self_egress(): an untrusted URL does NOT raise (storage layer short-circuits).""" + fake_server.respond("POST", "/untrusted", status=200, body={"ok": True}) + + trust_calls: list[str] = [] + + def fake_require(postgres_url: str, url: str) -> None: + trust_calls.append(url) + raise RuntimeError(f"BLOCKED: {url}") + + monkeypatch.setattr(storage, "_require_trusted_endpoint", fake_require) + monkeypatch.setenv("POSTGRES_URL", "postgresql://fake/db") + monkeypatch.setattr(interceptor, "_insert_row", _make_fake_insert([])) + interceptor.init_interceptor() + monkeypatch.setattr(interceptor, "_enabled", True) + + with provably_self_egress(): + # This should NOT raise even though _require_trusted_endpoint would block it + response = requests.post(f"{fake_server.base_url}/untrusted", json={"x": 1}) + + # Trust check was never called (self-egress bypassed storage entirely) + assert trust_calls == [] + assert response.status_code == 200 + + +# --------------------------------------------------------------------------- +# Storage-level trust gate tests (POST / DELETE coverage) +# --------------------------------------------------------------------------- + + +def test_trust_gate_fires_on_post(monkeypatch: Any) -> None: + """POST to an untrusted URL raises RuntimeError('BLOCKED: ...').""" + trust_calls: list[tuple[str, str]] = [] + + def fake_require(postgres_url: str, url: str) -> None: + trust_calls.append((postgres_url, url)) + raise RuntimeError(f"BLOCKED: {url} not in trusted index") + + monkeypatch.setattr(storage, "_require_trusted_endpoint", fake_require) + monkeypatch.setenv("POSTGRES_URL", "postgresql://fake/db") + + with pytest.raises(RuntimeError, match="BLOCKED"): + storage.insert_intercept_row( + url="https://untrusted.example/api", + method="POST", + request_payload={"url": "https://untrusted.example/api", "method": "POST"}, + raw={"data": 1}, + agent_id="ag", + action_name="act", + ) + + assert len(trust_calls) == 1 + assert trust_calls[0][1] == "https://untrusted.example/api" + + +# --------------------------------------------------------------------------- +# aiohttp coverage (soft dependency — only installs when aiohttp is importable) +# --------------------------------------------------------------------------- + +aiohttp = pytest.importorskip("aiohttp") + + +async def test_aiohttp_session_request_intercepted(monkeypatch: Any, fake_server: Any) -> None: + """aiohttp.ClientSession().get(url) records exactly one row via the _request patch.""" + fake_server.respond("GET", "/aiohttp-data", status=200, body={"from_aiohttp": True}) + rows: list[dict[str, Any]] = [] + _setup_interceptor(monkeypatch, rows) + + async with aiohttp.ClientSession() as session: + async with session.get(f"{fake_server.base_url}/aiohttp-data") as response: + assert response.status == 200 + data = await response.json() + assert data == {"from_aiohttp": True} + + assert len(rows) == 1, f"Expected 1 row, got {len(rows)}: {rows}" + assert rows[0]["method"] == "GET" + assert rows[0]["raw"] == {"from_aiohttp": True} + + +async def test_aiohttp_post_with_json_body_intercepted(monkeypatch: Any, fake_server: Any) -> None: + """POSTing JSON via aiohttp records request payload correctly.""" + fake_server.respond("POST", "/echo", status=200, body={"ok": True}) + rows: list[dict[str, Any]] = [] + _setup_interceptor(monkeypatch, rows) + + async with aiohttp.ClientSession() as session: + async with session.post(f"{fake_server.base_url}/echo", json={"k": "v"}) as response: + assert response.status == 200 + + assert len(rows) == 1 + assert rows[0]["method"] == "POST" + assert rows[0]["request"]["json"] == {"k": "v"} + + +async def test_aiohttp_self_egress_skips_recording(monkeypatch: Any, fake_server: Any) -> None: + """Inside provably_self_egress(): aiohttp call records nothing.""" + fake_server.respond("GET", "/data", status=200, body={"v": 9}) + rows: list[dict[str, Any]] = [] + _setup_interceptor(monkeypatch, rows) + + with provably_self_egress(): + async with aiohttp.ClientSession() as session: + async with session.get(f"{fake_server.base_url}/data") as _resp: + pass + + assert rows == [] + + +def test_trust_gate_fires_on_delete(monkeypatch: Any) -> None: + """DELETE to an untrusted URL raises RuntimeError('BLOCKED: ...') — all-methods coverage.""" + trust_calls: list[str] = [] + + def fake_require(postgres_url: str, url: str) -> None: + trust_calls.append(url) + raise RuntimeError(f"BLOCKED: {url}") + + monkeypatch.setattr(storage, "_require_trusted_endpoint", fake_require) + monkeypatch.setenv("POSTGRES_URL", "postgresql://fake/db") + + with pytest.raises(RuntimeError, match="BLOCKED"): + storage.insert_intercept_row( + url="https://untrusted.example/resource/1", + method="DELETE", + request_payload={"url": "https://untrusted.example/resource/1", "method": "DELETE"}, + raw={}, + agent_id="ag", + action_name="act", + ) + + assert trust_calls == ["https://untrusted.example/resource/1"]