diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6d17551bc..b38e0b6a4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -39,8 +39,8 @@ jobs: run: | pytest tests/ -v --cov=src/adcp --cov-report=term-missing - pg-replay-store: - name: PgReplayStore tests (Postgres 16) + pg-conformance: + name: Postgres conformance tests (Postgres 16) runs-on: ubuntu-latest services: postgres: @@ -73,12 +73,13 @@ jobs: python -m pip install --upgrade pip pip install -e ".[dev,pg]" - - name: Run PgReplayStore tests (unit + full-wire e2e) + - name: Run Postgres conformance tests env: ADCP_PG_TEST_URL: postgresql://postgres@localhost:5432/adcp_test run: | pytest tests/conformance/signing/test_pg_replay_store.py \ tests/conformance/signing/test_pg_replay_store_e2e.py \ + tests/conformance/decisioning/test_pg_buyer_agent_registry.py \ -v conventional-commits: diff --git a/src/adcp/decisioning/pg/__init__.py b/src/adcp/decisioning/pg/__init__.py new file mode 100644 index 000000000..ce43e7b4f --- /dev/null +++ b/src/adcp/decisioning/pg/__init__.py @@ -0,0 +1,32 @@ +"""PostgreSQL-backed implementations for the decisioning module. + +Ships durable backends behind the ``[pg]`` optional extra so the +base ``adcp.decisioning`` import path stays free of SQL dependencies +for adopters who only need the in-memory primitives. + +Available when ``adcp[pg]`` is installed: + +* :class:`PgBuyerAgentRegistry` — durable Tier 2 commercial-identity + layer for v3 sellers. The framework calls the registry on every + request to gate dispatch on the seller's commercial relationship + with the buyer agent (allowlist + onboarding state + billing + capabilities). + +The schema DDL ships alongside the Python code (e.g. +``adcp/decisioning/pg/buyer_agent_registry.sql``) so adopters can run +it through whatever migration tool they use (Alembic, Flyway, psql). +""" + +from __future__ import annotations + +from adcp.decisioning.pg.buyer_agent_registry import ( + DEFAULT_TABLE_NAME, + PG_AVAILABLE, + PgBuyerAgentRegistry, +) + +__all__ = [ + "DEFAULT_TABLE_NAME", + "PG_AVAILABLE", + "PgBuyerAgentRegistry", +] diff --git a/src/adcp/decisioning/pg/buyer_agent_registry.py b/src/adcp/decisioning/pg/buyer_agent_registry.py new file mode 100644 index 000000000..6463871e1 --- /dev/null +++ b/src/adcp/decisioning/pg/buyer_agent_registry.py @@ -0,0 +1,419 @@ +"""PostgreSQL-backed :class:`~adcp.decisioning.BuyerAgentRegistry`. + +Durable commercial-identity storage for AdCP v3 sellers — the Tier 2 +counterparty allowlist + onboarding state + billing capabilities. The +framework calls the registry on every request to gate dispatch on +the seller's commercial relationship with the agent (recognized? +suspended? billable agent-direct?) BEFORE +:meth:`AccountStore.resolve` runs. + +Mirrors the design of :class:`adcp.signing.pg.PgReplayStore`: caller +supplies a :class:`psycopg_pool.ConnectionPool`, the framework runs +short-lived statements per call (no long-lived transactions, no +cross-operation state), and a separate ``.sql`` file ships the DDL +for adopters using a migration tool (Alembic, Flyway, psql). + +End-to-end example +------------------ + +:: + + from psycopg_pool import ConnectionPool + from adcp.decisioning import serve, signing_only_registry + from adcp.decisioning.pg import PgBuyerAgentRegistry + + pool = ConnectionPool("postgresql://...", min_size=4, max_size=20) + registry = PgBuyerAgentRegistry(pool=pool) + registry.create_schema() # idempotent; safe on every boot + + # Seed the allowlist — typically driven by an admin UI / API. + registry.upsert( + BuyerAgent( + agent_url="https://agent.example/", + display_name="Acme", + status="active", + ) + ) + + serve( + platform=MySalesPlatform(), + buyer_agent_registry=registry, + ..., + ) + +Async-from-sync bridging +------------------------ + +The :class:`BuyerAgentRegistry` Protocol is async (called from inside +the framework's dispatch event loop), but psycopg-pool's +:class:`~psycopg_pool.ConnectionPool` is sync. Each ``resolve_*`` +method wraps its sync DB call with :func:`asyncio.to_thread` so the +event loop stays responsive — at the cost of a thread-pool hop per +request. + +Adopters needing higher throughput swap to a custom Protocol impl +backed by :class:`psycopg_pool.AsyncConnectionPool`. The framework +keeps the simpler sync-pool shape as the bundled default; it matches +:class:`PgReplayStore` and lets adopters share a single sync pool +across replay-store, registry, and (future) audit-sink. + +Concurrency +----------- + +Safe to share across threads and processes. PostgreSQL provides the +cross-instance locking via PK conflict resolution on +``INSERT ... ON CONFLICT``. + +Failure mode +------------ + +Transport / connection errors propagate from psycopg unchanged +(:class:`OperationalError`, :class:`PoolTimeout`, etc.). The +framework's dispatch layer treats unexpected exceptions as +``INTERNAL_ERROR`` so the wire response stays opaque to the buyer +while the original exception lands in server logs via the +observability hooks. +""" + +from __future__ import annotations + +import asyncio +import json +import re +from typing import TYPE_CHECKING, Any + +from adcp.decisioning.registry import ( + ApiKeyCredential, + BuyerAgent, + BuyerAgentDefaultTerms, + BuyerAgentStatus, + OAuthCredential, +) + +if TYPE_CHECKING: + from psycopg_pool import ConnectionPool + +try: + import psycopg # noqa: F401 + import psycopg_pool # noqa: F401 + + PG_AVAILABLE = True +except ImportError: + PG_AVAILABLE = False + + +DEFAULT_TABLE_NAME = "adcp_buyer_agents" + +# Byte-level ASCII identifier match — same posture as PgReplayStore. +# Non-ASCII Unicode letters would format verbatim into SQL as a +# DIFFERENT table, which under attacker-influenced configuration is +# a silent table-substitution vector. ASCII-byte-exact only. +_SAFE_IDENTIFIER_RE = re.compile(r"^[a-z_][a-z0-9_]{0,62}$") + +_INSTALL_HINT = ( + "PgBuyerAgentRegistry requires psycopg3 and psycopg-pool. Install " + "the 'pg' extra: `pip install 'adcp[pg]'` (Poetry: " + "`poetry add 'adcp[pg]'`)." +) + +_VALID_STATUSES = frozenset({"active", "suspended", "blocked"}) + + +class PgBuyerAgentRegistry: + """PostgreSQL-backed :class:`~adcp.decisioning.BuyerAgentRegistry`. + + Parameters + ---------- + pool: + A :class:`psycopg_pool.ConnectionPool` owned by the caller. + Each operation acquires a short-lived connection, runs a + single statement, and returns the connection. + table_name: + Override the default ``adcp_buyer_agents`` table when two + tenants share a database and need separate registries. Must + be an ASCII-byte-clean identifier — the constructor validates. + + Concurrency + ----------- + + Safe to share across threads and processes. The + :meth:`resolve_by_agent_url` / :meth:`resolve_by_credential` + methods bridge the async Protocol to the sync pool via + :func:`asyncio.to_thread`; concurrent dispatches each get their + own thread + connection. + """ + + def __init__( + self, + *, + pool: ConnectionPool, + table_name: str = DEFAULT_TABLE_NAME, + ) -> None: + if not PG_AVAILABLE: + raise ImportError(_INSTALL_HINT) + if not _is_safe_identifier(table_name): + raise ValueError( + "table_name must match [a-z_][a-z0-9_]* (ASCII only), " f"got {table_name!r}" + ) + self._pool = pool + self._table = table_name + + # Pre-format queries so the hot path doesn't f-string per call. + # All identifier substitutions are validated at __init__; row + # values flow through psycopg's parameter binding. + cols = ( + "agent_url, display_name, status, billing_capabilities, " + "api_key_id, default_terms, allowed_brands, ext" + ) + self._sql_select_by_agent_url = ( + f"SELECT {cols} FROM {self._table} " # noqa: S608 — table name validated + f"WHERE agent_url = %s" + ) + self._sql_select_by_api_key_id = ( + f"SELECT {cols} FROM {self._table} " # noqa: S608 + f"WHERE api_key_id = %s" + ) + self._sql_upsert = ( + f"INSERT INTO {self._table} (" # noqa: S608 + f" agent_url, display_name, status, billing_capabilities, " + f" api_key_id, default_terms, allowed_brands, ext, updated_at" + f") VALUES (%s, %s, %s, %s::jsonb, %s, %s::jsonb, %s::jsonb, " + f" %s::jsonb, now()) " + f"ON CONFLICT (agent_url) DO UPDATE SET " + f" display_name = EXCLUDED.display_name, " + f" status = EXCLUDED.status, " + f" billing_capabilities = EXCLUDED.billing_capabilities, " + f" api_key_id = EXCLUDED.api_key_id, " + f" default_terms = EXCLUDED.default_terms, " + f" allowed_brands = EXCLUDED.allowed_brands, " + f" ext = EXCLUDED.ext, " + f" updated_at = now()" + ) + self._sql_set_status = ( + f"UPDATE {self._table} " # noqa: S608 + f"SET status = %s, updated_at = now() " + f"WHERE agent_url = %s" + ) + self._sql_delete = f"DELETE FROM {self._table} WHERE agent_url = %s" # noqa: S608 + + # ----- schema bootstrap --------------------------------------------- + + def create_schema(self) -> None: + """Create the registry table + indexes for this store's + ``table_name``. Idempotent via ``CREATE ... IF NOT EXISTS``; + safe to call on every app boot. + + The equivalent raw DDL ships at + :file:`src/adcp/decisioning/pg/buyer_agent_registry.sql` for + adopters using a migration tool (Alembic, Flyway, psql) — + that file uses the canonical ``adcp_buyer_agents`` name. + """ + table = self._table # already validated at __init__ + ddl = ( + f"CREATE TABLE IF NOT EXISTS {table} (" # noqa: S608 — validated + f' agent_url TEXT COLLATE "C" PRIMARY KEY,' + f" display_name TEXT NOT NULL," + f" status TEXT NOT NULL DEFAULT 'active'" + f" CHECK (status IN ('active', 'suspended', 'blocked'))," + f" billing_capabilities JSONB NOT NULL DEFAULT '[\"operator\"]'::jsonb," + f' api_key_id TEXT COLLATE "C",' + f" default_terms JSONB," + f" allowed_brands JSONB," + f" ext JSONB NOT NULL DEFAULT '{{}}'::jsonb," + f" created_at TIMESTAMPTZ NOT NULL DEFAULT now()," + f" updated_at TIMESTAMPTZ NOT NULL DEFAULT now()" + f");" + f"CREATE INDEX IF NOT EXISTS {table}_api_key_id_idx " # noqa: S608 + f" ON {table} (api_key_id) WHERE api_key_id IS NOT NULL;" + f"CREATE INDEX IF NOT EXISTS {table}_status_idx " # noqa: S608 + f" ON {table} (status) WHERE status <> 'active';" + ) + with self._pool.connection() as conn, conn.cursor() as cur: + cur.execute(ddl) + + # ----- BuyerAgentRegistry Protocol -------------------------------- + + async def resolve_by_agent_url(self, agent_url: str) -> BuyerAgent | None: + """Resolve a verified ``agent_url`` against the allowlist. + + The framework has already validated the RFC 9421 signature + before this point — the registry's only job is the commercial + lookup. Returns ``None`` when the agent isn't recognized; + the framework converts that to ``REQUEST_AUTH_UNRECOGNIZED_AGENT``. + """ + return await asyncio.to_thread(self._sync_lookup_by_agent_url, agent_url) + + async def resolve_by_credential( + self, + credential: ApiKeyCredential | OAuthCredential, + ) -> BuyerAgent | None: + """Resolve a bearer / API-key / OAuth credential. + + Looks up against the ``api_key_id`` column. For + :class:`OAuthCredential`, the ``client_id`` is used as the + lookup key — adopters with separate OAuth-client tables fork + this registry impl and split the column. The MVP shape + treats both bearer and OAuth as the same column for the + common case (one identifier per agent). + """ + if isinstance(credential, ApiKeyCredential): + key = credential.key_id + elif isinstance(credential, OAuthCredential): + key = credential.client_id + else: # defensive: future Credential variants the registry can't dispatch + return None + return await asyncio.to_thread(self._sync_lookup_by_api_key_id, key) + + # ----- admin CRUD -------------------------------------------------- + + def upsert(self, agent: BuyerAgent, *, api_key_id: str | None = None) -> None: + """Insert or update a :class:`BuyerAgent` row. + + ``api_key_id`` is separate from the :class:`BuyerAgent` shape + because the framework's typed model doesn't carry the + bearer-table FK. Adopters running bearer auth populate this; + signing-only adopters leave it ``None``. + """ + if agent.status not in _VALID_STATUSES: + raise ValueError( + f"BuyerAgent.status must be one of {sorted(_VALID_STATUSES)!r}, " + f"got {agent.status!r}" + ) + terms_json = ( + json.dumps(_terms_to_dict(agent.default_account_terms)) + if agent.default_account_terms is not None + else None + ) + allowed_brands_json = ( + json.dumps(sorted(agent.allowed_brands)) if agent.allowed_brands is not None else None + ) + params = ( + agent.agent_url, + agent.display_name, + agent.status, + json.dumps(sorted(agent.billing_capabilities)), + api_key_id, + terms_json, + allowed_brands_json, + json.dumps(dict(agent.ext)), + ) + with self._pool.connection() as conn, conn.cursor() as cur: + cur.execute(self._sql_upsert, params) + + def set_status(self, agent_url: str, status: BuyerAgentStatus) -> None: + """Update an agent's lifecycle status. Use to suspend / block + / reactivate without rewriting the full row.""" + if status not in _VALID_STATUSES: + raise ValueError(f"status must be one of {sorted(_VALID_STATUSES)!r}, got {status!r}") + with self._pool.connection() as conn, conn.cursor() as cur: + cur.execute(self._sql_set_status, (status, agent_url)) + + def delete(self, agent_url: str) -> None: + """Remove an agent from the registry. + + Hard delete — no row history. Adopters needing audit retention + keep the row and set ``status='blocked'`` instead. + """ + with self._pool.connection() as conn, conn.cursor() as cur: + cur.execute(self._sql_delete, (agent_url,)) + + # ----- sync helpers (called via asyncio.to_thread) ---------------- + + def _sync_lookup_by_agent_url(self, agent_url: str) -> BuyerAgent | None: + with self._pool.connection() as conn, conn.cursor() as cur: + cur.execute(self._sql_select_by_agent_url, (agent_url,)) + row = cur.fetchone() + return _row_to_agent(row) if row else None + + def _sync_lookup_by_api_key_id(self, key: str) -> BuyerAgent | None: + with self._pool.connection() as conn, conn.cursor() as cur: + cur.execute(self._sql_select_by_api_key_id, (key,)) + row = cur.fetchone() + return _row_to_agent(row) if row else None + + +def _row_to_agent(row: Any) -> BuyerAgent: + """Project a DB row to the typed :class:`BuyerAgent`. + + Row tuple shape mirrors the SELECT column order in + :class:`PgBuyerAgentRegistry`: ``(agent_url, display_name, status, + billing_capabilities, api_key_id, default_terms, allowed_brands, + ext)``. ``api_key_id`` is read but not surfaced on + :class:`BuyerAgent` — the framework's typed model doesn't carry + the bearer-table FK; adopters reading it use admin queries. + """ + ( + agent_url, + display_name, + status, + billing_capabilities, + _api_key_id, + default_terms, + allowed_brands, + ext, + ) = row + + # billing_capabilities arrives as a Python list (psycopg auto-decodes + # JSONB). Defensive parse if a string slips through. + if isinstance(billing_capabilities, str): + billing_capabilities = json.loads(billing_capabilities) + + terms: BuyerAgentDefaultTerms | None = None + if default_terms is not None: + terms_dict = json.loads(default_terms) if isinstance(default_terms, str) else default_terms + terms = BuyerAgentDefaultTerms( + rate_card=terms_dict.get("rate_card"), + payment_terms=terms_dict.get("payment_terms"), + credit_limit=terms_dict.get("credit_limit"), + billing_entity=terms_dict.get("billing_entity"), + ) + + brands: frozenset[str] | None = None + if allowed_brands is not None: + brands_list = ( + json.loads(allowed_brands) if isinstance(allowed_brands, str) else allowed_brands + ) + brands = frozenset(brands_list) + + if isinstance(ext, str): + ext = json.loads(ext) + + return BuyerAgent( + agent_url=agent_url, + display_name=display_name, + status=status, + billing_capabilities=frozenset(billing_capabilities), + default_account_terms=terms, + allowed_brands=brands, + ext=ext or {}, + ) + + +def _terms_to_dict(terms: BuyerAgentDefaultTerms) -> dict[str, Any]: + """Project :class:`BuyerAgentDefaultTerms` to a JSONB-friendly dict.""" + out: dict[str, Any] = {} + if terms.rate_card is not None: + out["rate_card"] = terms.rate_card + if terms.payment_terms is not None: + out["payment_terms"] = terms.payment_terms + if terms.credit_limit is not None: + out["credit_limit"] = dict(terms.credit_limit) + if terms.billing_entity is not None: + out["billing_entity"] = dict(terms.billing_entity) + return out + + +def _is_safe_identifier(name: str) -> bool: + """Allow only byte-ASCII lowercase identifiers for the table-name + kwarg. Same posture as :class:`PgReplayStore` — the table name + is static-formatted into SQL at construction; this validator is + the sole guard against injection or homoglyph table substitution. + """ + return _SAFE_IDENTIFIER_RE.fullmatch(name) is not None + + +__all__ = [ + "DEFAULT_TABLE_NAME", + "PG_AVAILABLE", + "PgBuyerAgentRegistry", +] diff --git a/src/adcp/decisioning/pg/buyer_agent_registry.sql b/src/adcp/decisioning/pg/buyer_agent_registry.sql new file mode 100644 index 000000000..db8a4f54d --- /dev/null +++ b/src/adcp/decisioning/pg/buyer_agent_registry.sql @@ -0,0 +1,72 @@ +-- AdCP Tier 2 commercial-identity layer — durable registry storage. +-- +-- Run this once per deployment. Tracked by the adcp-client-python +-- PgBuyerAgentRegistry; see +-- src/adcp/decisioning/pg/buyer_agent_registry.py for the query +-- shapes the Python code executes. +-- +-- COLLATE "C" on the identifier columns avoids locale-dependent case +-- folding — on some locales "https://Acme/" and "https://acme/" compare +-- equal, which would conflate distinct buyer agents. "C" is the +-- byte-for-byte comparison the framework's lookup expects. + +CREATE TABLE IF NOT EXISTS adcp_buyer_agents ( + -- AdCP v3 canonical identifier. The framework looks up by this + -- string for HTTP-Signature traffic after the verifier validates + -- the signature. + agent_url TEXT COLLATE "C" PRIMARY KEY, + + display_name TEXT NOT NULL, + + -- Lifecycle: active / suspended / blocked. Adopters update + -- in-place to suspend / unblock; the framework rejects suspended + -- and blocked agents with structured error codes before the + -- platform method runs. + status TEXT NOT NULL DEFAULT 'active' + CHECK (status IN ('active', 'suspended', 'blocked')), + + -- Set of permitted BillingMode values for accounts under this + -- agent. Stored as JSON array; the registry projects to + -- frozenset[BillingMode] inside the wrapper. Default is + -- passthrough-only — agent has no payments relationship. + billing_capabilities JSONB NOT NULL DEFAULT '["operator"]'::jsonb, + + -- Bearer-token id for pre-trust beta auth. NULL for signing-only + -- adopters. Indexed (partial) for the bearer-credential lookup + -- path; the index excludes NULL rows so pre-trust adopters who + -- never populate it pay nothing. + api_key_id TEXT COLLATE "C", + + -- Default account terms (rate card, payment terms, credit limit, + -- billing entity). JSONB blob mirroring BuyerAgentDefaultTerms + -- shape. Adopters with structured validation can swap to a + -- domain-specific table joined via FK. + default_terms JSONB, + + -- Pre-RFC allowlist of brand domains this agent can transact + -- for. Static fallback; once Tier 3 BrandAuthorizationResolver + -- lands (gated on ADCP #3690), this layers on top of per-request + -- brand.json authz. + allowed_brands JSONB, + + -- Adopter passthrough for internal ids, audit metadata, anything + -- the SDK doesn't model. + ext JSONB NOT NULL DEFAULT '{}'::jsonb, + + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +-- Bearer-credential lookup index. Partial — only rows with an +-- api_key_id pay the index cost. Signing-only adopters store no +-- bearer keys and the index stays empty. +CREATE INDEX IF NOT EXISTS adcp_buyer_agents_api_key_id_idx + ON adcp_buyer_agents (api_key_id) + WHERE api_key_id IS NOT NULL; + +-- Suspension / blocking sweep helper — admin tools that list +-- agents-needing-attention can scan by status efficiently without +-- a sequential scan. +CREATE INDEX IF NOT EXISTS adcp_buyer_agents_status_idx + ON adcp_buyer_agents (status) + WHERE status <> 'active'; diff --git a/tests/conformance/decisioning/__init__.py b/tests/conformance/decisioning/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/conformance/decisioning/test_pg_buyer_agent_registry.py b/tests/conformance/decisioning/test_pg_buyer_agent_registry.py new file mode 100644 index 000000000..5219df2b2 --- /dev/null +++ b/tests/conformance/decisioning/test_pg_buyer_agent_registry.py @@ -0,0 +1,303 @@ +"""Conformance tests for :class:`adcp.decisioning.pg.PgBuyerAgentRegistry`. + +Requires a real PostgreSQL instance. To run locally:: + + docker run --rm -d -p 5432:5432 -e POSTGRES_PASSWORD=pg postgres:16 + export ADCP_PG_TEST_URL=postgresql://postgres:pg@localhost:5432/postgres + pytest tests/conformance/decisioning/test_pg_buyer_agent_registry.py -v + +The entire module skips when ``ADCP_PG_TEST_URL`` is unset, so the +default test matrix stays green without a database dependency. CI +runs this in the same Postgres-16 job as the PgReplayStore tests. + +Each test runs in an isolated table (``test_adcp_buyer_agents_``) +so parallel runs and rerun-after-crash don't collide. +""" + +from __future__ import annotations + +import asyncio +import os +import secrets +from collections.abc import Iterator + +import pytest + +psycopg = pytest.importorskip("psycopg") +psycopg_pool = pytest.importorskip("psycopg_pool") + +TEST_URL = os.environ.get("ADCP_PG_TEST_URL") +if not TEST_URL: + pytest.skip( + "ADCP_PG_TEST_URL not set — skipping PgBuyerAgentRegistry tests", + allow_module_level=True, + ) + +from adcp.decisioning import ( # noqa: E402 + ApiKeyCredential, + BuyerAgent, + BuyerAgentDefaultTerms, + OAuthCredential, +) +from adcp.decisioning.pg import PgBuyerAgentRegistry # noqa: E402 + + +@pytest.fixture() +def isolated_pool() -> Iterator[tuple[psycopg_pool.ConnectionPool, str]]: + """Fresh pool + isolated table per test. Drops on teardown.""" + table = f"test_adcp_buyer_agents_{secrets.token_hex(6)}" + with psycopg_pool.ConnectionPool(TEST_URL, min_size=2, max_size=8) as pool: + registry = PgBuyerAgentRegistry(pool=pool, table_name=table) + registry.create_schema() + try: + yield pool, table + finally: + with pool.connection() as conn, conn.cursor() as cur: + cur.execute(f"DROP TABLE IF EXISTS {table}") + + +def _registry(fixture: tuple[psycopg_pool.ConnectionPool, str]) -> PgBuyerAgentRegistry: + pool, table = fixture + return PgBuyerAgentRegistry(pool=pool, table_name=table) + + +# ----- create_schema bootstrap ------------------------------------------- + + +def test_create_schema_is_idempotent(isolated_pool) -> None: + """``create_schema`` is safe to call multiple times — uses + ``CREATE TABLE IF NOT EXISTS`` so a second call after the + fixture's bootstrap is a no-op.""" + registry = _registry(isolated_pool) + registry.create_schema() + registry.create_schema() # must not raise + + +# ----- resolve_by_agent_url ---------------------------------------------- + + +def test_resolve_by_agent_url_returns_none_for_unknown(isolated_pool) -> None: + registry = _registry(isolated_pool) + result = asyncio.run(registry.resolve_by_agent_url("https://unknown/")) + assert result is None + + +def test_resolve_by_agent_url_returns_typed_buyer_agent(isolated_pool) -> None: + registry = _registry(isolated_pool) + expected = BuyerAgent( + agent_url="https://acme/", + display_name="Acme", + status="active", + billing_capabilities=frozenset({"operator", "agent"}), + default_account_terms=BuyerAgentDefaultTerms( + rate_card="acme-2026", + payment_terms="NET30", + ), + allowed_brands=frozenset({"example.com", "acme.example"}), + ext={"internal_id": "tenant-42"}, + ) + registry.upsert(expected) + + result = asyncio.run(registry.resolve_by_agent_url("https://acme/")) + assert result is not None + assert result.agent_url == "https://acme/" + assert result.display_name == "Acme" + assert result.status == "active" + assert result.billing_capabilities == frozenset({"operator", "agent"}) + assert result.default_account_terms is not None + assert result.default_account_terms.rate_card == "acme-2026" + assert result.default_account_terms.payment_terms == "NET30" + assert result.allowed_brands == frozenset({"example.com", "acme.example"}) + assert result.ext == {"internal_id": "tenant-42"} + + +# ----- resolve_by_credential --------------------------------------------- + + +def test_resolve_by_api_key_credential(isolated_pool) -> None: + registry = _registry(isolated_pool) + agent = BuyerAgent( + agent_url="https://bearer-buyer/", + display_name="Bearer Buyer", + status="active", + ) + registry.upsert(agent, api_key_id="bearer-key-1") + + result = asyncio.run( + registry.resolve_by_credential( + ApiKeyCredential(kind="api_key", key_id="bearer-key-1"), + ) + ) + assert result is not None + assert result.agent_url == "https://bearer-buyer/" + + +def test_resolve_by_oauth_credential_uses_client_id(isolated_pool) -> None: + registry = _registry(isolated_pool) + agent = BuyerAgent( + agent_url="https://oauth-buyer/", + display_name="OAuth Buyer", + status="active", + ) + registry.upsert(agent, api_key_id="oauth-client-1") + + result = asyncio.run( + registry.resolve_by_credential( + OAuthCredential(kind="oauth", client_id="oauth-client-1"), + ) + ) + assert result is not None + assert result.agent_url == "https://oauth-buyer/" + + +def test_resolve_by_credential_returns_none_for_unknown_key(isolated_pool) -> None: + registry = _registry(isolated_pool) + result = asyncio.run( + registry.resolve_by_credential( + ApiKeyCredential(kind="api_key", key_id="never-seeded"), + ) + ) + assert result is None + + +# ----- upsert (admin path) ----------------------------------------------- + + +def test_upsert_overwrites_existing_row(isolated_pool) -> None: + """Re-upserting under the same agent_url updates the display_name, + status, billing_capabilities, etc. — the framework uses this + path to project admin-UI edits into the registry.""" + registry = _registry(isolated_pool) + registry.upsert( + BuyerAgent( + agent_url="https://acme/", + display_name="Acme (old)", + status="active", + ) + ) + registry.upsert( + BuyerAgent( + agent_url="https://acme/", + display_name="Acme (renamed)", + status="active", + billing_capabilities=frozenset({"operator", "agent"}), + ) + ) + + result = asyncio.run(registry.resolve_by_agent_url("https://acme/")) + assert result is not None + assert result.display_name == "Acme (renamed)" + assert result.billing_capabilities == frozenset({"operator", "agent"}) + + +def test_upsert_rejects_invalid_status(isolated_pool) -> None: + registry = _registry(isolated_pool) + with pytest.raises(ValueError, match="status"): + registry.upsert( + BuyerAgent( + agent_url="https://x/", + display_name="x", + status="bogus", # type: ignore[arg-type] + ) + ) + + +# ----- set_status (admin lifecycle) -------------------------------------- + + +def test_set_status_suspends_agent(isolated_pool) -> None: + """Admin path: flipping status to ``suspended`` cuts the agent + off on the next request — no cache invalidation, no restart. + The framework reads status fresh on every dispatch.""" + registry = _registry(isolated_pool) + registry.upsert( + BuyerAgent( + agent_url="https://acme/", + display_name="Acme", + status="active", + ) + ) + registry.set_status("https://acme/", "suspended") + + result = asyncio.run(registry.resolve_by_agent_url("https://acme/")) + assert result is not None + assert result.status == "suspended" + + +def test_set_status_rejects_invalid_status(isolated_pool) -> None: + registry = _registry(isolated_pool) + with pytest.raises(ValueError, match="status"): + registry.set_status("https://acme/", "bogus") # type: ignore[arg-type] + + +# ----- delete ------------------------------------------------------------- + + +def test_delete_removes_agent(isolated_pool) -> None: + registry = _registry(isolated_pool) + registry.upsert(BuyerAgent(agent_url="https://x/", display_name="x", status="active")) + registry.delete("https://x/") + + result = asyncio.run(registry.resolve_by_agent_url("https://x/")) + assert result is None + + +# ----- security: table-name validation ----------------------------------- + + +def test_constructor_rejects_unsafe_table_name(isolated_pool) -> None: + """Attacker-influenced table_name with a SQL fragment must not + format into the dynamic SQL. The constructor validates against + the same regex the PgReplayStore uses.""" + pool, _ = isolated_pool + with pytest.raises(ValueError, match="table_name"): + PgBuyerAgentRegistry(pool=pool, table_name="dangerous; DROP TABLE foo--") + + +def test_constructor_rejects_unicode_homoglyph_table_name(isolated_pool) -> None: + """Unicode homoglyphs (e.g. fullwidth Latin) format verbatim and + would silently address a different table than the operator + intended. Reject.""" + pool, _ = isolated_pool + with pytest.raises(ValueError, match="table_name"): + # Fullwidth Latin "table" — looks like ASCII to a reader, + # different bytes to Postgres. + PgBuyerAgentRegistry(pool=pool, table_name="table") + + +# ----- defaults / edge cases --------------------------------------------- + + +def test_default_billing_capabilities_is_operator_only(isolated_pool) -> None: + """Pre-trust beta default: agents seeded with no explicit + capabilities project to passthrough-only.""" + registry = _registry(isolated_pool) + registry.upsert( + BuyerAgent( + agent_url="https://x/", + display_name="x", + status="active", + # billing_capabilities defaults to frozenset({"operator"}). + ) + ) + result = asyncio.run(registry.resolve_by_agent_url("https://x/")) + assert result is not None + assert result.billing_capabilities == frozenset({"operator"}) + + +def test_round_trip_with_no_optional_fields(isolated_pool) -> None: + """Minimal seed (no terms, no allowed_brands, default ext) round + trips without losing field presence.""" + registry = _registry(isolated_pool) + registry.upsert( + BuyerAgent( + agent_url="https://minimal/", + display_name="Minimal", + status="active", + ) + ) + result = asyncio.run(registry.resolve_by_agent_url("https://minimal/")) + assert result is not None + assert result.default_account_terms is None + assert result.allowed_brands is None + assert result.ext == {}