From b36188c20bb629ba532c4778e6bacf7b098ec454 Mon Sep 17 00:00:00 2001 From: rimkusaurimas Date: Wed, 6 May 2026 15:02:24 +0200 Subject: [PATCH] feat(trusted_endpoints): support {id} and {path:path} placeholders in registered URLs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A registered URL may now contain FastAPI/Express-style path placeholders so a single entry covers a family of concrete URLs: {name} - matches exactly one path segment (no '/'). e.g. https://api.example.com/customers/{id} matches /customers/42 but NOT /customers/42/orders. {name:path} - matches any subtree, including '/' separators. e.g. https://api.example.com/customers/{rest:path} matches both /customers/42 and /customers/42/orders. Closes #14. Why: customer-support-sdk-demo had to enumerate ~70 concrete URLs at startup for templated routes (/customers/{id}). Runtime-generated ids (e.g. POST /tickets returning a fresh id) couldn't be trusted until manually registered. A single placeholder entry replaces the enumeration. Implementation: - Plain URLs without '{' keep exact-match semantics. No schema change. No migration needed for existing rows. Existing exact-match tests unchanged. - Pattern matching is auto-detected from URL content. Pattern compilation is LRU-cached so repeated lookups don't recompile the regex. - is_trusted_endpoint uses a two-phase lookup: exact match first (single indexed query, fast path), then a pattern-only scan (LIKE '%{%' filter) for rows containing placeholders. Plain registries see no perf regression. - The snapshot tamper-check inside check_claim_endpoints_are_trusted honors the same syntax — a payload built against a pattern entry verifies cleanly on the receiver side. Tests: 12 new (94 total). Ruff clean. 
--- CHANGELOG.md | 4 + README.md | 25 +++++ src/provably/trusted_endpoints.py | 89 +++++++++++++++++- tests/unit/test_trusted_endpoints.py | 135 +++++++++++++++++++++++++++ 4 files changed, 250 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d17bd1a..199bfed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Unreleased + +- `trusted_endpoints`: registered URLs may now contain FastAPI/Express-style path placeholders. `{id}` matches exactly one path segment, `{rest:path}` matches any subtree. Plain URLs without `{` keep exact-match semantics — no migration needed for existing rows. Both `is_trusted_endpoint` and the snapshot tamper-check inside `evaluate_handoff` honor the new syntax. Closes #14. + ## 0.2.0 - Added `provably.configure_indexing(enable_indexing: bool)`: one-call bootstrap (`initialize_runtime` + `init_interceptor` + `enable` / `disable`) for sender agents. diff --git a/README.md b/README.md index 389c962..3209975 100644 --- a/README.md +++ b/README.md @@ -346,6 +346,31 @@ URLs are normalized (lowercase scheme + host, default ports collapsed, trailing slash dropped) before any read or write so that `https://API.EXAMPLE.COM/x/` and `https://api.example.com/x` collide on the same row. +#### Path-pattern entries + +Concrete URLs match exactly. To authorize a family of URLs with a single entry — +useful for templated routes like `/customers/{id}` or runtime-generated ids — +register the URL with FastAPI/Express-style placeholders: + +| Placeholder | Matches | Example | +|---|---|---| +| `{name}` | exactly one path segment (no `/`) | `https://api.example.com/customers/{id}` matches `…/customers/42` but **not** `…/customers/42/orders` | +| `{name:path}` | any subtree (including `/` separators) | `https://api.example.com/customers/{rest:path}` matches both `…/customers/42` and `…/customers/42/orders` | + +The placeholder name (`id`, `rest`, …) is purely descriptive and does not affect +matching. 
Plain URLs without `{` characters keep exact-match semantics — no +behavior change for existing entries. + +```sql +-- Register a templated route once instead of enumerating every concrete id +INSERT INTO trusted_endpoints (org_id, normalized_url, display_label, entry_type) +VALUES ('my-org', 'https://api.example.com/customers/{id}', 'Customers (by id)', 'endpoint'); +``` + +`is_trusted_endpoint` and the snapshot tamper-check inside `evaluate_handoff` +both honor the same matching rules, so a claim against `…/customers/42` will +pass both gates when only the templated entry is registered. + ## Public API All public symbols are re-exported from the top-level `provably` namespace. See diff --git a/src/provably/trusted_endpoints.py b/src/provably/trusted_endpoints.py index e687a15..51851f4 100644 --- a/src/provably/trusted_endpoints.py +++ b/src/provably/trusted_endpoints.py @@ -2,6 +2,8 @@ from __future__ import annotations +import re +from functools import lru_cache from typing import TYPE_CHECKING from urllib.parse import urlparse @@ -12,6 +14,58 @@ _DDL_DONE = False +# --------------------------------------------------------------------------- +# Pattern matching +# +# A registered URL may contain FastAPI/Express-style path placeholders so a single +# entry can authorize a family of concrete URLs: +# +# {name} — matches one path segment (no '/'). E.g. /customers/{id} matches +# /customers/123 but NOT /customers/123/orders. +# {name:path} — matches any subtree, including '/' separators. E.g. +# /customers/{rest:path} matches both /customers/123 and +# /customers/123/orders. +# +# Plain URLs (no '{' character) keep exact-match semantics — no behavior change for +# existing entries. 
# ---------------------------------------------------------------------------

# Matches one ``{name}`` or ``{name:path}`` placeholder inside a registered URL.
_PLACEHOLDER_RE = re.compile(r"\{[^}/]+(?::path)?\}")

# Regex fragments substituted for each placeholder kind. Neither may match a
# newline, so a claim URL with an embedded or trailing line break never matches.
_SEGMENT_FRAGMENT = r"[^/\n]+?"  # {name}: exactly one path segment (no '/')
_SUBTREE_FRAGMENT = r".+?"  # {name:path}: one or more segments, '/' allowed


@lru_cache(maxsize=512)
def _compile_pattern(registered: str) -> re.Pattern[str] | None:
    """Compile a registered URL into an anchored regex if it has placeholders.

    Literal text between placeholders is regex-escaped, so characters such as
    ``.`` in the host only ever match themselves.

    Args:
        registered: A registered URL, possibly containing ``{name}`` or
            ``{name:path}`` placeholders.

    Returns:
        The compiled pattern, or ``None`` when ``registered`` contains no
        well-formed placeholder (callers then rely on exact comparison only).

    Results are memoized (LRU, 512 entries) so repeated look-ups against the
    same registered URL do not rebuild the regex.
    """
    if "{" not in registered:
        return None
    parts: list[str] = []
    cursor = 0
    for match in _PLACEHOLDER_RE.finditer(registered):
        parts.append(re.escape(registered[cursor : match.start()]))
        # Test the suffix, not a substring: ``{x:pathology}`` is an ordinary
        # single-segment placeholder and must not widen to a subtree match.
        is_subtree = match.group(0).endswith(":path}")
        parts.append(_SUBTREE_FRAGMENT if is_subtree else _SEGMENT_FRAGMENT)
        cursor = match.end()
    if not parts:
        # A '{' was present but no well-formed placeholder: treat as a plain URL.
        return None
    parts.append(re.escape(registered[cursor:]))
    try:
        # ``\Z`` rather than ``$``: ``$`` would also accept a trailing newline.
        return re.compile("^" + "".join(parts) + r"\Z")
    except re.error:
        # Fail closed: an entry that cannot be compiled matches nothing.
        return None


def _matches_registered(claim_url: str, registered: str) -> bool:
    """Return ``True`` when ``claim_url`` equals ``registered`` or matches its pattern.

    Args:
        claim_url: The concrete URL being checked.
        registered: A registered URL, plain or containing placeholders.
    """
    if claim_url == registered:
        return True
    pattern = _compile_pattern(registered)
    return pattern is not None and pattern.match(claim_url) is not None
+ """ if not url or not org_id: return False norm = normalize_url_for_trust(url) @@ -82,6 +142,7 @@ def is_trusted_endpoint(url: str, org_id: str, conn: psycopg2.extensions.connect return False _ensure_trusted_table(conn) with conn.cursor() as cur: + # Fast path: exact match. cur.execute( """ SELECT 1 FROM trusted_endpoints @@ -90,7 +151,21 @@ def is_trusted_endpoint(url: str, org_id: str, conn: psycopg2.extensions.connect """, (org_id, norm), ) - return cur.fetchone() is not None + if cur.fetchone() is not None: + return True + # Slow path: pattern entries only. + cur.execute( + """ + SELECT normalized_url FROM trusted_endpoints + WHERE org_id = %s AND entry_type = 'endpoint' AND revoked_at IS NULL + AND normalized_url LIKE '%%{%%' + """, + (org_id,), + ) + for (registered,) in cur.fetchall(): + if _matches_registered(norm, str(registered or "")): + return True + return False def list_trusted_endpoints( @@ -208,7 +283,15 @@ def check_claim_endpoints_are_trusted( registry = {n for url in hp.trusted_endpoint_registry if (n := normalize_url_for_trust(str(url)))} if registry: - missing = list(dict.fromkeys(u for u in claim_urls if u not in registry)) + pattern_entries = [r for r in registry if "{" in r] + missing: list[str] = [] + for claim_url in claim_urls: + if claim_url in registry: + continue + if any(_matches_registered(claim_url, entry) for entry in pattern_entries): + continue + missing.append(claim_url) + missing = list(dict.fromkeys(missing)) if missing: raise ValueError(f"handoff has endpoints missing from trusted snapshot: {', '.join(missing)}") diff --git a/tests/unit/test_trusted_endpoints.py b/tests/unit/test_trusted_endpoints.py index 0f914e5..b740722 100644 --- a/tests/unit/test_trusted_endpoints.py +++ b/tests/unit/test_trusted_endpoints.py @@ -5,6 +5,8 @@ import pytest from provably.trusted_endpoints import ( + _compile_pattern, + _matches_registered, is_trusted_endpoint, list_trusted_endpoints, normalize_url_for_trust, @@ -46,6 +48,139 @@ def 
test_is_trusted_queries_normalized_row(monkeypatch: pytest.MonkeyPatch) -> N assert args[1][1] == "https://x.com/a" +# --------------------------------------------------------------------------- +# Pattern matching ({name} and {name:path} placeholders) +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "registered", + [ + "https://api.example.com/customers", + "https://api.example.com/customers/123", + "https://example.com", + ], +) +def test_compile_pattern_returns_none_for_plain_urls(registered: str) -> None: + assert _compile_pattern(registered) is None + + +def test_pattern_single_segment_matches_one_path_segment() -> None: + pattern = _compile_pattern("https://api.example.com/customers/{id}") + assert pattern is not None + assert pattern.match("https://api.example.com/customers/123") is not None + assert pattern.match("https://api.example.com/customers/abc-DEF") is not None + # Must NOT swallow additional path segments + assert pattern.match("https://api.example.com/customers/123/orders") is None + # Must NOT match a different prefix + assert pattern.match("https://api.example.com/clients/123") is None + # Must NOT match the bare prefix without an id segment + assert pattern.match("https://api.example.com/customers/") is None + + +def test_pattern_path_placeholder_matches_subtree() -> None: + pattern = _compile_pattern("https://api.example.com/customers/{rest:path}") + assert pattern is not None + assert pattern.match("https://api.example.com/customers/123") is not None + assert pattern.match("https://api.example.com/customers/123/orders/456") is not None + # Still anchored at the prefix + assert pattern.match("https://api.example.com/clients/123") is None + + +def test_pattern_multiple_placeholders() -> None: + pattern = _compile_pattern("https://api.example.com/customers/{cust}/orders/{order}") + assert pattern is not None + assert pattern.match("https://api.example.com/customers/c1/orders/o9") is 
not None + assert pattern.match("https://api.example.com/customers/c1/orders/o9/items/x") is None + + +def test_matches_registered_falls_back_to_exact() -> None: + assert _matches_registered("https://x.com/a", "https://x.com/a") is True + assert _matches_registered("https://x.com/a", "https://x.com/b") is False + + +def test_matches_registered_uses_pattern_when_present() -> None: + assert _matches_registered("https://x.com/customers/9", "https://x.com/customers/{id}") is True + assert _matches_registered("https://x.com/customers/9/orders", "https://x.com/customers/{id}") is False + + +def test_is_trusted_endpoint_matches_pattern_entry(monkeypatch: pytest.MonkeyPatch) -> None: + """A claim URL matching a registered ``{id}`` pattern is trusted via the slow path.""" + monkeypatch.setattr("provably.trusted_endpoints._ensure_trusted_table", lambda _c: None) + conn = MagicMock() + cur = MagicMock() + conn.cursor.return_value.__enter__ = lambda *_: cur + conn.cursor.return_value.__exit__ = lambda *_: None + # First query (exact match) misses; second query (pattern entries) returns one row. + cur.fetchone.return_value = None + cur.fetchall.return_value = [("https://api.example.com/customers/{id}",)] + + assert is_trusted_endpoint("https://api.example.com/customers/42", "org-1", conn) is True + # Exact-then-pattern: two execute calls. + assert cur.execute.call_count == 2 + + +def test_is_trusted_endpoint_rejects_nonmatching_pattern(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr("provably.trusted_endpoints._ensure_trusted_table", lambda _c: None) + conn = MagicMock() + cur = MagicMock() + conn.cursor.return_value.__enter__ = lambda *_: cur + conn.cursor.return_value.__exit__ = lambda *_: None + cur.fetchone.return_value = None + # Registered pattern allows /customers/{id} only — claim hits a deeper path. 
+ cur.fetchall.return_value = [("https://api.example.com/customers/{id}",)] + + assert is_trusted_endpoint("https://api.example.com/customers/42/orders", "org-1", conn) is False + + +def test_snapshot_check_accepts_pattern_match(monkeypatch: pytest.MonkeyPatch) -> None: + """The snapshot tamper-check must honor pattern entries the same way the live DB check does.""" + from provably.handoff.types import HandoffClaim, HandoffPayload + from provably.trusted_endpoints import check_claim_endpoints_are_trusted + + # Live DB check is exercised separately; stub it as trusting whatever made it past + # the snapshot check (returns True). + monkeypatch.setattr("provably.trusted_endpoints.is_trusted_endpoint", lambda *_a, **_kw: True) + monkeypatch.setattr("psycopg2.connect", lambda *_a, **_kw: MagicMock()) + + payload = HandoffPayload( + provably_org_id="org-1", + trusted_endpoint_registry=["https://api.example.com/customers/{id}"], + claims=[ + HandoffClaim( + action_name="get_customer", + request_payload={"url": "https://api.example.com/customers/42", "method": "GET"}, + ) + ], + ) + + # Should NOT raise — pattern entry covers the concrete URL. + check_claim_endpoints_are_trusted(payload, postgres_url="postgresql://x") + + +def test_snapshot_check_rejects_url_outside_pattern(monkeypatch: pytest.MonkeyPatch) -> None: + from provably.handoff.types import HandoffClaim, HandoffPayload + from provably.trusted_endpoints import check_claim_endpoints_are_trusted + + monkeypatch.setattr("provably.trusted_endpoints.is_trusted_endpoint", lambda *_a, **_kw: True) + monkeypatch.setattr("psycopg2.connect", lambda *_a, **_kw: MagicMock()) + + payload = HandoffPayload( + provably_org_id="org-1", + trusted_endpoint_registry=["https://api.example.com/customers/{id}"], + claims=[ + HandoffClaim( + action_name="get_orders", + # Goes one segment deeper than {id} permits. 
+ request_payload={"url": "https://api.example.com/customers/42/orders", "method": "GET"}, + ) + ], + ) + + with pytest.raises(ValueError, match="missing from trusted snapshot"): + check_claim_endpoints_are_trusted(payload, postgres_url="postgresql://x") + + def test_list_trusted_endpoints_excludes_given_urls(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr("provably.trusted_endpoints._ensure_trusted_table", lambda _c: None) conn = MagicMock()