Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,15 @@ Outcome semantics:
- **`CAUGHT`** — at least one claim disagreed with the indexed value or a proof failed.
- **`ERROR`** — the evaluator could not run (missing config, Provably backend unreachable, transient server error). Not evidence of tampering — the system was unhealthy, not the agent.

#### Getting `CAUGHT` and you don't expect to be?

`CAUGHT` means the indexed value the evaluator pulled from `provably_intercepts` doesn't match the claim. In practice when this surprises you, it's almost always one of:

1. **The tool body never ran.** `@function_tool` (or any agent-framework decorator) only registers the function — you still need an agent loop (e.g. `Runner.run(...)`) to invoke it. Bare LLM calls don't execute tools.
2. **`intercept_context(...)` was called without `with`.** It's a context manager; a bare call is a no-op (see the function's docstring).
3. **`agent_id` mismatch.** The `agent_id` you pass to `intercept_context(...)` inside the tool must match the `intercept_agent_id` you pass to `build_handoff_payload(...)` (default `"fetch_and_claim"`). Mismatch → the lookup misses → empty `request_payload`.
4. **Wrong row-id helper.** Use `get_intercept_row_id(agent_id, action_name)` to pick the row tagged with your action. `take_last_intercept_row_id()` returns the **globally** last insert (typically the final LLM POST), which is rarely what you want.

Comparison modes (the `VerificationMode` type):

| Mode | Comparison |
Expand Down
4 changes: 2 additions & 2 deletions src/provably/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
disable,
enable,
init_interceptor,
intercept_context,
is_enabled,
set_intercept_body_hook,
set_intercept_url_allowlist,
set_interceptor_context,
take_last_intercept_row_id,
)
from provably.runtime import configure_indexing
Expand Down Expand Up @@ -59,6 +59,7 @@
"field_descriptions",
"init_interceptor",
"initialize_runtime",
"intercept_context",
"is_enabled",
"is_trusted_endpoint",
"list_trusted_endpoints",
Expand All @@ -67,6 +68,5 @@
"post_handoff",
"set_intercept_body_hook",
"set_intercept_url_allowlist",
"set_interceptor_context",
"take_last_intercept_row_id",
]
4 changes: 2 additions & 2 deletions src/provably/intercept/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
disable,
enable,
init_interceptor,
intercept_context,
is_enabled,
set_intercept_body_hook,
set_intercept_url_allowlist,
set_interceptor_context,
take_last_intercept_row_id,
)
from .interceptor import (
Expand All @@ -22,9 +22,9 @@
"disable",
"enable",
"init_interceptor",
"intercept_context",
"is_enabled",
"set_intercept_body_hook",
"set_intercept_url_allowlist",
"set_interceptor_context",
"take_last_intercept_row_id",
]
47 changes: 39 additions & 8 deletions src/provably/intercept/interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from __future__ import annotations

import threading
from collections.abc import Callable
from collections.abc import Callable, Generator
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Any

Expand Down Expand Up @@ -71,21 +72,51 @@ def set_intercept_url_allowlist(urls: list[str] | None) -> None:
_url_allowlist.discard("")


def set_interceptor_context(*, agent_id: str, action_name: str, intercept_index: int = 0) -> None:
"""Bind per-call context that subsequent intercepts will tag onto their inserted rows.
@contextmanager
def intercept_context(
*, agent_id: str, action_name: str, intercept_index: int = 0
) -> Generator[None, None, None]:
"""Scoped tagging for HTTP traffic emitted inside the ``with`` block.

Uses :class:`contextvars.ContextVar` so concurrent tasks/threads each see their own values.
Call this immediately before invoking the agent action whose HTTP traffic should be tagged.
Sets the underlying :class:`contextvars.ContextVar` values on enter and resets them
on exit, so the tag does not leak into surrounding LLM calls running in the same
:class:`asyncio.Task`.

.. important::
**Must be used as a ``with`` statement.** A bare call like
``intercept_context(agent_id="demo", action_name="get_weather")`` is a no-op
(returns a context-manager object that is immediately discarded; the body never
runs and no ContextVar is set). Subsequent intercepts will be tagged
``("unknown", "unknown")``.

Use this for any HTTP emitted from inside an agent framework's tool function::

@function_tool
def get_temperature():
with intercept_context(agent_id="demo", action_name="get_weather"):
return requests.get(...).json()

Nesting is supported: prior values are restored on exit, not cleared.

Args:
agent_id: Logical agent identifier; recorded in ``provably_intercepts.agent_id``.
**Must match** the ``intercept_agent_id`` you later pass to
:func:`provably.build_handoff_payload` (default ``"fetch_and_claim"``);
otherwise the (agent_id, action_name) lookup misses and the claim ends up
with no recorded request payload.
action_name: Action name; recorded in ``provably_intercepts.action_name``.
intercept_index: Per-action sequence number used by the simulation hook to address a
specific intercept (e.g. "mutate the second response of action X"). Default ``0``.
"""
_ctx_agent_id.set(agent_id)
_ctx_action_name.set(action_name)
_ctx_intercept_index.set(intercept_index)
t_agent = _ctx_agent_id.set(agent_id)
t_action = _ctx_action_name.set(action_name)
t_index = _ctx_intercept_index.set(intercept_index)
try:
yield
finally:
_ctx_intercept_index.reset(t_index)
_ctx_action_name.reset(t_action)
_ctx_agent_id.reset(t_agent)


def take_last_intercept_row_id() -> int | None:
Expand Down
145 changes: 145 additions & 0 deletions tests/unit/test_intercept_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
"""Tests for ``intercept_context`` and the ContextVar-leak it prevents.

The leak: a naked ``ContextVar.set()`` inside an agent loop's tool function persists past
the tool boundary into subsequent LLM calls running in the same ``asyncio.Task``, because
async tasks share their ContextVar cell unless an explicit ``reset(token)`` is paired with
the original ``set``. ``intercept_context`` is the scoped fix.
"""

from __future__ import annotations

import asyncio

import provably.intercept.interceptor as interceptor
from provably.intercept import intercept_context


def _reset_ctx_for_test_isolation() -> None:
"""Bring the ContextVars back to their declared defaults between tests."""
interceptor._ctx_agent_id.set("")
interceptor._ctx_action_name.set("")
interceptor._ctx_intercept_index.set(0)


def test_intercept_context_sets_values_inside_block() -> None:
_reset_ctx_for_test_isolation()
try:
with intercept_context(agent_id="ag-1", action_name="get_weather", intercept_index=2):
assert interceptor._ctx_agent_id.get() == "ag-1"
assert interceptor._ctx_action_name.get() == "get_weather"
assert interceptor._ctx_intercept_index.get() == 2
finally:
_reset_ctx_for_test_isolation()


def test_intercept_context_resets_to_default_on_exit() -> None:
"""When called with no prior values, exit restores the default empty/zero state."""
_reset_ctx_for_test_isolation()
try:
with intercept_context(agent_id="ag-1", action_name="get_weather"):
pass
assert interceptor._ctx_agent_id.get() == ""
assert interceptor._ctx_action_name.get() == ""
assert interceptor._ctx_intercept_index.get() == 0
finally:
_reset_ctx_for_test_isolation()


def test_intercept_context_restores_prior_values_on_exit() -> None:
"""Nesting: exit restores whatever values were set BEFORE the context manager entered."""
_reset_ctx_for_test_isolation()
try:
with intercept_context(agent_id="outer", action_name="outer-action", intercept_index=7):
with intercept_context(agent_id="inner", action_name="inner-action", intercept_index=99):
assert interceptor._ctx_action_name.get() == "inner-action"
assert interceptor._ctx_agent_id.get() == "outer"
assert interceptor._ctx_action_name.get() == "outer-action"
assert interceptor._ctx_intercept_index.get() == 7
finally:
_reset_ctx_for_test_isolation()


def test_intercept_context_resets_even_on_exception() -> None:
_reset_ctx_for_test_isolation()
try:
try:
with intercept_context(agent_id="x", action_name="y"):
raise RuntimeError("boom")
except RuntimeError:
pass
assert interceptor._ctx_agent_id.get() == ""
assert interceptor._ctx_action_name.get() == ""
finally:
_reset_ctx_for_test_isolation()


# ---------------------------------------------------------------------------
# Regression: agent-loop scenario (LLM → tool → LLM in one asyncio.Task).
#
# We simulate three "HTTP calls" by reading the ContextVars (which is exactly what
# ``_insert_row`` does at intercept time). The "tool" body sets the tag using
# ``intercept_context``; the second LLM call must NOT inherit the tool's tag.
# ---------------------------------------------------------------------------


def _read_what_insert_row_would_record() -> tuple[str, str]:
return (
interceptor._ctx_agent_id.get() or "unknown",
interceptor._ctx_action_name.get() or "unknown",
)


async def _agent_loop_with_naked_set() -> list[tuple[str, str]]:
"""Demonstrates WHY the context manager exists: a fire-and-forget ``ContextVar.set``
inside the tool persists into the subsequent LLM call in the same Task."""
rows: list[tuple[str, str]] = []
rows.append(_read_what_insert_row_would_record()) # LLM turn 1
# tool body uses naked .set() (the buggy pattern):
interceptor._ctx_agent_id.set("demo")
interceptor._ctx_action_name.set("get_weather")
rows.append(_read_what_insert_row_would_record()) # tool GET
# tool returns; agent continues with another LLM call in the SAME task:
rows.append(_read_what_insert_row_would_record()) # LLM turn 2
return rows


async def _agent_loop_with_intercept_context() -> list[tuple[str, str]]:
"""The fix: scoping with ``intercept_context`` resets the tag on tool exit."""
rows: list[tuple[str, str]] = []
rows.append(_read_what_insert_row_would_record()) # LLM turn 1
with intercept_context(agent_id="demo", action_name="get_weather"):
rows.append(_read_what_insert_row_would_record()) # tool GET
rows.append(_read_what_insert_row_would_record()) # LLM turn 2
return rows


def test_naked_ctx_var_set_leaks_into_subsequent_calls() -> None:
"""Documents WHY ``intercept_context`` must reset on exit. If an agent loop sets a
ContextVar directly inside a tool, the tag persists into the next LLM call in the
same asyncio.Task — producing wrong ``claim_urls`` and always-CAUGHT outcomes
downstream."""
_reset_ctx_for_test_isolation()
try:
rows = asyncio.run(_agent_loop_with_naked_set())
assert rows[0] == ("unknown", "unknown") # turn 1
assert rows[1] == ("demo", "get_weather") # tool
assert rows[2] == ("demo", "get_weather"), ( # turn 2 — leaks
"Naked ContextVar.set should leak into subsequent calls in the same Task. "
"If this assertion ever starts failing it means asyncio's ContextVar "
"semantics changed, in which case revisit the rationale for "
"intercept_context."
)
finally:
_reset_ctx_for_test_isolation()


def test_intercept_context_does_not_leak_into_subsequent_calls() -> None:
"""The fix: turn-2 LLM call goes back to ``"unknown"`` after the tool's ``with`` block."""
_reset_ctx_for_test_isolation()
try:
rows = asyncio.run(_agent_loop_with_intercept_context())
assert rows[0] == ("unknown", "unknown") # turn 1
assert rows[1] == ("demo", "get_weather") # tool
assert rows[2] == ("unknown", "unknown") # turn 2 — fixed
finally:
_reset_ctx_for_test_isolation()
Loading