diff --git a/src/adcp/decisioning/__init__.py b/src/adcp/decisioning/__init__.py index 221902db..bc8dcbc3 100644 --- a/src/adcp/decisioning/__init__.py +++ b/src/adcp/decisioning/__init__.py @@ -114,7 +114,7 @@ def create_media_buy( DecisioningCapabilities, DecisioningPlatform, ) -from adcp.decisioning.platform_router import PlatformRouter +from adcp.decisioning.platform_router import LazyPlatformRouter, PlatformRouter from adcp.decisioning.property_list import ( PropertyListFetcher, filter_products_by_property_list, @@ -326,6 +326,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "OAuthCredential", "PermissionDeniedError", "PgTaskRegistry", + "LazyPlatformRouter", "PlatformRouter", "PostgresTaskRegistry", "Proposal", diff --git a/src/adcp/decisioning/platform_router.py b/src/adcp/decisioning/platform_router.py index 1e06ee14..dbcbdf4c 100644 --- a/src/adcp/decisioning/platform_router.py +++ b/src/adcp/decisioning/platform_router.py @@ -96,7 +96,9 @@ import asyncio import inspect -from collections.abc import Mapping +import time +from collections import OrderedDict +from collections.abc import Awaitable, Callable, Mapping from typing import TYPE_CHECKING, Any from adcp.decisioning.platform import ( @@ -458,7 +460,7 @@ async def get_products(self, *args: Any, **kwargs: Any) -> Any: manager = self._proposal_managers.get(tenant_id) if manager is not None: - method_name = self._select_proposal_method(manager, args, kwargs) + method_name = _select_proposal_method(manager, args, kwargs) method = getattr(manager, method_name) if inspect.iscoroutinefunction(method): return await method(*args, **kwargs) @@ -473,49 +475,6 @@ async def get_products(self, *args: Any, **kwargs: Any) -> Any: return await method(*args, **kwargs) return await asyncio.to_thread(method, *args, **kwargs) - def _select_proposal_method( - self, - manager: ProposalManager, - args: tuple[Any, ...], - kwargs: Mapping[str, Any], - ) -> str: - """Choose between ``get_products`` and ``refine_products`` on - the wired :class:`ProposalManager`. - - Refine is dispatched only when all three conditions hold: - - 1. The request's ``buying_mode`` is ``'refine'``. - 2. The manager's ``capabilities.refine`` flag is True. - 3. The manager subclass implements ``refine_products`` - (``hasattr`` covers the Protocol's "present-or-absent" - semantics). - - Otherwise routes to ``get_products``. Adopters whose - ``get_products`` handler also handles refine internally keep - working without declaring the refine capability. - - ``buying_mode`` is read off the request — conventionally the - first positional argument of ``get_products(req, ctx)``, or - the ``req=`` kwarg. - """ - req: Any = kwargs.get("req") - if req is None and args: - req = args[0] - buying_mode = getattr(req, "buying_mode", None) - # ``buying_mode`` may be a string or a generated enum (the - # Pydantic model coerces). Normalize via ``getattr(.., 'value', - # buying_mode)`` so both shapes compare cleanly. - buying_mode_str = getattr(buying_mode, "value", buying_mode) - if buying_mode_str != "refine": - return "get_products" - caps = getattr(manager, "capabilities", None) - refine_supported = bool(getattr(caps, "refine", False)) - if not refine_supported: - return "get_products" - if not hasattr(manager, "refine_products"): - return "get_products" - return "refine_products" - def _make_delegate(self, method_name: str) -> Any: """Create a delegating callable for ``method_name``. @@ -586,4 +545,449 @@ def proposal_manager_for_tenant(self, tenant_id: str) -> ProposalManager | None: return self._proposal_managers.get(tenant_id) -__all__ = ["PlatformRouter"] +#: Type alias for a :class:`LazyPlatformRouter` factory. +#: +#: The factory takes a ``tenant_id`` string and returns a +#: :class:`DecisioningPlatform`. It may be sync or async — both shapes +#: work because the router awaits the result via +#: :func:`inspect.isawaitable`. +#: +#: Example (sync):: +#: +#: def factory(tenant_id: str) -> DecisioningPlatform: +#: return MockSellerPlatform(tenant_id) +#: +#: Example (async — typical for SDK auth handshakes):: +#: +#: async def factory(tenant_id: str) -> DecisioningPlatform: +#: cfg = await load_tenant_config(tenant_id) +#: return GamPlatform(cfg) +PlatformFactory = Callable[[str], DecisioningPlatform | Awaitable[DecisioningPlatform]] + + +# Sentinel for cache miss — distinct from any value the cache might +# legitimately hold (None is rejected upstream). Same shape as +# :data:`adcp.server.tenant_router._CACHE_MISS`. +_LAZY_CACHE_MISS: object = object() + + +class LazyPlatformRouter(DecisioningPlatform): + """Per-tenant ``DecisioningPlatform`` constructed on first request. + + A :class:`PlatformRouter` variant that defers building per-tenant + platforms to first-request lookup, with a bounded LRU + TTL cache. + Drop-in replacement: ``isinstance(router, DecisioningPlatform)`` is + true, ``serve()`` accepts it identically, and it shares the same + ``ctx.account.metadata['tenant_id']`` resolution path. + + **When to reach for this over** :class:`PlatformRouter`: + + * **N tenants × per-tenant SDK auth handshake.** Eagerly building + every platform at boot scales O(N) and the auth handshake (e.g., + Google Ad Manager service-account, Kevel API key) typically does + network I/O — so 50-500 tenants means the boot path either takes + minutes or you write your own lazy layer. + * **Hot-add / hot-rotate of tenants.** Today's eager router pins + every platform at construction; adding a new tenant requires a + restart. The lazy router builds on first request, with + :meth:`invalidate` for explicit eviction when a tenant rotates. + * **Memory profile under tenant churn.** The eager router holds + every platform for the process lifetime. The lazy router's + bounded cache evicts platforms for inactive tenants — strictly + safer for long-lived processes. + + **Async factory.** Building per-tenant adapters typically does I/O. + The factory may be sync or async; the router awaits at call time + (matches :class:`adcp.server.CallableSubdomainTenantRouter`'s + convention). Sync factories that block the event loop should be + refactored to async or routed through :func:`asyncio.to_thread` + inside the factory. + + Example:: + + from adcp.decisioning import LazyPlatformRouter, DecisioningCapabilities, serve + + async def build_platform(tenant_id: str) -> DecisioningPlatform: + cfg = await load_tenant_config(tenant_id) + if cfg.adapter == "google_ad_manager": + return WonderstruckGamPlatform(cfg) + elif cfg.adapter == "kevel": + return KevelPlatform(cfg) + return MockSellerPlatform(cfg) + + router = LazyPlatformRouter( + accounts=tenant_routing_account_store, + factory=build_platform, + capabilities=DecisioningCapabilities(specialisms=[...]), + cache_size=256, # default + cache_ttl_seconds=3600.0, # default; 0 = size-only eviction + ) + + serve(router, ...) + + **Invalidation.** Adopters call :meth:`invalidate` from tenant + rotation / deactivation paths:: + + router.invalidate("tenant-a") # specific tenant + router.invalidate() # all platforms — ops "drop everything" + + Behavior on ``invalidate`` of an in-flight request: the request + that already grabbed the platform reference completes normally + (caller holds the ref); the next request gets a fresh build. No + request cancellation. Mirrors + :class:`adcp.server.CallableSubdomainTenantRouter`'s contract. + + **Thundering herd.** If two concurrent requests for the same cold + tenant hit, both await the factory; asyncio cooperative scheduling + means no corruption (last-write wins, both refs are equivalent), + but the auth handshake runs 2x. Singleflight (one-build-per-tenant + under contention) is intentionally NOT in v1 — adopters reporting + DB pressure / API rate-limit spikes is the trigger to add it. + + **Capabilities** are adopter-supplied (the union of what every + tenant's platform serves). The router can't introspect children at + boot — it doesn't know what tenants exist yet — so the adopter is + the source of truth here. + + :param accounts: The adopter's :class:`AccountStore`. Same role as + :class:`PlatformRouter`: resolves every request to an + :class:`Account` whose ``metadata['tenant_id']`` keys the + factory. + :param factory: Callable taking a ``tenant_id`` string and + returning a :class:`DecisioningPlatform` (sync or async). Must + not return ``None`` — return a typed + :class:`adcp.decisioning.types.AdcpError` raise from inside the + factory if the tenant is invalid. + :param capabilities: The router's wire-shape capability declaration + — should be the union of every child platform's specialisms. + :param proposal_managers: Optional ``{tenant_id: ProposalManager}`` + — eager (managers are dict-cheap to hold). Per-tenant + ``get_products`` routes to the manager when wired; otherwise + falls through to the lazily-resolved platform's + ``get_products``. + :param cache_size: Maximum number of cached :class:`DecisioningPlatform` + instances. Bounded LRU eviction past this size. Default 256. + Adopters with more concurrent active tenants override. + :param cache_ttl_seconds: Per-entry TTL in seconds. ``0`` means + size-only eviction (no time-based expiry). Default 3600.0 + (one hour). Distinct from + :class:`adcp.server.CallableSubdomainTenantRouter` which + rejects ``0`` — there, *tenants* go stale; here, *platform + adapters* don't, *unless* your factory reads mutable config + (rotating API keys, adapter selection driven by a config + store). In that case, override to a value that bounds your + rotation lag, or call :meth:`invalidate` from your rotation + path. + + :raises ValueError: when ``cache_size <= 0`` or + ``cache_ttl_seconds < 0``. + """ + + def __init__( + self, + *, + accounts: AccountStore[Any], + factory: PlatformFactory, + capabilities: DecisioningCapabilities, + proposal_managers: Mapping[str, ProposalManager] | None = None, + cache_size: int = 256, + cache_ttl_seconds: float = 3600.0, + ) -> None: + if cache_size <= 0: + raise ValueError( + f"cache_size must be > 0, got {cache_size}. The whole point of " + "LazyPlatformRouter is to bound resident memory; an unbounded " + "cache would re-introduce the eager-router's leak profile." + ) + if cache_ttl_seconds < 0: + raise ValueError( + f"cache_ttl_seconds must be >= 0, got {cache_ttl_seconds}. " + "Pass 0 for size-only eviction (no time-based expiry)." + ) + + self.accounts = accounts + self.capabilities = capabilities + self._factory = factory + self._proposal_managers: dict[str, ProposalManager] = dict(proposal_managers or {}) + + self._cache_size = cache_size + self._cache_ttl = cache_ttl_seconds + # OrderedDict gives LRU-by-move-to-end and bounded popitem(last=False). + # Entry: (DecisioningPlatform, expires_at_monotonic). When ttl=0, + # expires_at = math.inf so the time check never trips. + self._cache: OrderedDict[str, tuple[DecisioningPlatform, float]] = OrderedDict() + # Per-tenant generation counter. Bumped by :meth:`invalidate` + # (specific tenant) and the global counter is bumped by the + # ``invalidate(None)`` flush. :meth:`_resolve_platform` + # snapshots the (tenant, global) generation pair before + # awaiting the factory and refuses to cache a build that lost + # the rollover race — otherwise a slow build outracing an + # ``invalidate`` call would silently resurrect the stale + # platform after operators thought it was gone. + self._tenant_generations: dict[str, int] = {} + self._global_generation: int = 0 + + # Synthesize delegating methods, mirroring PlatformRouter's + # construction. ``get_products`` is special-cased below for + # proposal_managers routing. + for method_name in sorted(_all_specialism_methods()): + if method_name in _ACCOUNT_STORE_METHODS: + continue + if method_name == "get_products": + continue + self.__dict__[method_name] = self._make_delegate(method_name) + + # ----- public introspection / control -------------------------------- + + @property + def cached_tenants(self) -> frozenset[str]: + """The set of tenant ids currently in the cache. + + Read-only snapshot; mutations to the cache after this property + is read are not reflected. + """ + return frozenset(self._cache) + + def invalidate(self, tenant_id: str | None = None) -> None: + """Drop a cached platform (or every cached platform). + + Adopters call this from tenant rotation / deactivation paths to + evict before the TTL fires. Safe to call when the tenant isn't + currently cached (no-op). + + :param tenant_id: Specific tenant to evict. ``None`` clears the + entire cache. + """ + if tenant_id is None: + self._cache.clear() + self._global_generation += 1 + return + self._cache.pop(tenant_id, None) + self._tenant_generations[tenant_id] = self._tenant_generations.get(tenant_id, 0) + 1 + + # ----- per-tenant dispatch ------------------------------------------- + + async def _resolve_platform(self, tenant_id: str) -> DecisioningPlatform: + """Return the platform for ``tenant_id``, building via the factory on miss. + + The factory may be sync or async; the result is awaited if + awaitable. Cache writes happen after a successful build — a + factory that raises is NOT cached, so the next request retries. + + **Invalidate-during-build race.** :meth:`invalidate` bumps a + per-tenant or global generation counter. This method snapshots + both before awaiting the factory and refuses to cache the + result if either advanced — the freshly-built platform is + returned to the in-flight caller (which already paid for it), + but the cache slot stays empty so the next request rebuilds. + Without this, a slow factory racing an ``invalidate(tenant_id)`` + would silently resurrect the platform after operators thought + it was gone. + """ + cached = self._cache_get(tenant_id) + if cached is not _LAZY_CACHE_MISS: + return cached # type: ignore[return-value] + + tenant_gen_at_start = self._tenant_generations.get(tenant_id, 0) + global_gen_at_start = self._global_generation + + result = self._factory(tenant_id) + if inspect.isawaitable(result): + result = await result + + if result is None: + raise AdcpError( + "ACCOUNT_NOT_FOUND", + message=( + f"LazyPlatformRouter factory returned None for " + f"tenant_id={tenant_id!r}. The factory must return a " + "DecisioningPlatform (the lazy-router equivalent of an " + "unknown tenant in PlatformRouter.platforms). Raise " + "AdcpError from inside the factory if the tenant should " + "be rejected." + ), + recovery="terminal", + field="account.metadata.tenant_id", + ) + + # Type guard: the factory's return type union is checked at + # static analysis; runtime mis-typed returns surface here as + # validation rather than silent corruption. + if not isinstance(result, DecisioningPlatform): + raise AdcpError( + "INTERNAL_ERROR", + message=( + f"LazyPlatformRouter factory returned a " + f"{type(result).__name__!r} for tenant_id={tenant_id!r}; " + "expected a DecisioningPlatform instance." + ), + recovery="terminal", + ) + + # Skip the cache write if invalidation raced past us. + invalidated = ( + self._tenant_generations.get(tenant_id, 0) != tenant_gen_at_start + or self._global_generation != global_gen_at_start + ) + if not invalidated: + self._cache_put(tenant_id, result) + return result + + def _cache_get(self, tenant_id: str) -> DecisioningPlatform | object: + entry = self._cache.get(tenant_id) + if entry is None: + return _LAZY_CACHE_MISS + platform, expires_at = entry + if self._cache_ttl > 0 and time.monotonic() > expires_at: + self._cache.pop(tenant_id, None) + return _LAZY_CACHE_MISS + # LRU touch — most-recently used to the end. + self._cache.move_to_end(tenant_id) + return platform + + def _cache_put(self, tenant_id: str, platform: DecisioningPlatform) -> None: + # ttl=0 → never expire; sort everything by LRU only. Use +inf + # so the same time-check branch above stays trivial. + if self._cache_ttl > 0: + expires_at = time.monotonic() + self._cache_ttl + else: + expires_at = float("inf") + self._cache[tenant_id] = (platform, expires_at) + self._cache.move_to_end(tenant_id) + while len(self._cache) > self._cache_size: + self._cache.popitem(last=False) + + async def _platform_for_method( + self, + ctx: RequestContext[Any], + method_name: str, + ) -> DecisioningPlatform: + """Async equivalent of :meth:`PlatformRouter._platform_for`. + + :raises AdcpError: ``ACCOUNT_NOT_FOUND`` when the factory rejects + the tenant (returns None / raises). ``UNSUPPORTED_FEATURE`` + when the platform exists but doesn't implement the method. + """ + tenant_id = _tenant_id_from_ctx(ctx) + platform = await self._resolve_platform(tenant_id) + method = getattr(platform, method_name, None) + if method is None or not callable(method): + raise AdcpError( + "UNSUPPORTED_FEATURE", + message=( + f"Tenant {tenant_id!r}'s platform " + f"({type(platform).__name__}) does not implement " + f"{method_name!r}. The router advertises this method " + "because at least one tenant supports it, but this " + "tenant's platform doesn't." + ), + recovery="terminal", + ) + return platform + + def _make_delegate(self, method_name: str) -> Any: + """Create a delegating callable for ``method_name``. + + Async closure that resolves the tenant, awaits the factory if + cold, looks up the method, and delegates with sync/async + handling matching :meth:`PlatformRouter._make_delegate`. + """ + router = self + + async def _delegate(*args: Any, **kwargs: Any) -> Any: + ctx = _resolve_ctx_from_args(args, kwargs) + platform = await router._platform_for_method(ctx, method_name) + method = getattr(platform, method_name) + if inspect.iscoroutinefunction(method): + return await method(*args, **kwargs) + return await asyncio.to_thread(method, *args, **kwargs) + + _delegate.__name__ = method_name + _delegate.__qualname__ = f"LazyPlatformRouter.{method_name}" + return _delegate + + async def refine_get_products(self, *args: Any, **kwargs: Any) -> Any: + """Refine entry point — delegates to :meth:`get_products`. + + Mirrors :meth:`PlatformRouter.refine_get_products`; the handler's + refine pathway dispatches via + ``_invoke_platform_method(platform, "refine_get_products", ...)`` + when the platform's :func:`has_refine_support` returns True. + """ + return await self.get_products(*args, **kwargs) + + async def get_products(self, *args: Any, **kwargs: Any) -> Any: + """Per-tenant ``get_products`` dispatch. + + Resolves the tenant, then routes to the wired + :class:`ProposalManager` when one exists for the tenant + (refine vs. plain selection mirrors PlatformRouter); otherwise + falls through to the lazily-resolved platform's + ``get_products``. + """ + ctx = _resolve_ctx_from_args(args, kwargs) + tenant_id = _tenant_id_from_ctx(ctx) + manager = self._proposal_managers.get(tenant_id) + + if manager is not None: + method_name = _select_proposal_method(manager, args, kwargs) + method = getattr(manager, method_name) + if inspect.iscoroutinefunction(method): + return await method(*args, **kwargs) + return await asyncio.to_thread(method, *args, **kwargs) + + platform = await self._platform_for_method(ctx, "get_products") + method = getattr(platform, "get_products") + if inspect.iscoroutinefunction(method): + return await method(*args, **kwargs) + return await asyncio.to_thread(method, *args, **kwargs) + + def proposal_manager_for_tenant(self, tenant_id: str) -> ProposalManager | None: + """Return the :class:`ProposalManager` for ``tenant_id``, or ``None``.""" + return self._proposal_managers.get(tenant_id) + + async def platform_for_tenant(self, tenant_id: str) -> DecisioningPlatform: + """Return the platform for ``tenant_id``, building via the factory if needed. + + Sibling-API parity with :meth:`PlatformRouter.platform_for_tenant` + for adopters with admin / health endpoints that need direct + access to a tenant's platform. Triggers the factory on cache + miss (this is a write-path call, not just a getter). + + :raises AdcpError: ``ACCOUNT_NOT_FOUND`` when the factory + rejects the tenant. ``INTERNAL_ERROR`` when the factory + returns a non-:class:`DecisioningPlatform` value. + """ + return await self._resolve_platform(tenant_id) + + +def _select_proposal_method( + manager: ProposalManager, + args: tuple[Any, ...], + kwargs: Mapping[str, Any], +) -> str: + """Choose between ``get_products`` and ``refine_products`` on a manager. + + Extracted from :class:`PlatformRouter` so :class:`LazyPlatformRouter` + reuses the same routing logic without a class-method on the eager + router. The three conditions for refine: ``buying_mode == 'refine'``, + manager declares ``capabilities.refine``, and + ``hasattr(manager, "refine_products")``. + """ + req: Any = kwargs.get("req") + if req is None and args: + req = args[0] + buying_mode = getattr(req, "buying_mode", None) + buying_mode_str = getattr(buying_mode, "value", buying_mode) + if buying_mode_str != "refine": + return "get_products" + caps = getattr(manager, "capabilities", None) + refine_supported = bool(getattr(caps, "refine", False)) + if not refine_supported: + return "get_products" + if not hasattr(manager, "refine_products"): + return "get_products" + return "refine_products" + + +__all__ = ["LazyPlatformRouter", "PlatformRouter"] diff --git a/tests/test_lazy_platform_router.py b/tests/test_lazy_platform_router.py new file mode 100644 index 00000000..f34ca32a --- /dev/null +++ b/tests/test_lazy_platform_router.py @@ -0,0 +1,712 @@ +"""Tests for :class:`adcp.decisioning.LazyPlatformRouter`. + +The lazy variant of :class:`PlatformRouter`: defers per-tenant +platform construction to first request, with a bounded LRU + TTL +cache. These tests cover: + +* Drop-in compatibility — ``isinstance(router, DecisioningPlatform)``. +* Lazy construction — factory called once per cold tenant. +* Cache semantics — second call hits cache; LRU eviction past + ``cache_size``; TTL expiry; ``cache_ttl_seconds=0`` size-only mode. +* Async + sync factory; async + sync child platform methods. +* ``invalidate(tenant_id)`` and ``invalidate()``. +* Construction validation — ``cache_size <= 0`` and + ``cache_ttl_seconds < 0`` rejected. +* Factory rejection paths — ``None`` return, wrong type return. +* ``proposal_managers`` routing through the lazy resolver. +""" + +from __future__ import annotations + +import asyncio +from typing import Any + +import pytest + +from adcp.decisioning import ( + AdcpError, + DecisioningCapabilities, + DecisioningPlatform, + ExplicitAccounts, + LazyPlatformRouter, + RequestContext, + SalesPlatform, +) +from adcp.decisioning.types import Account + + +def _capabilities(specialisms: list[str]) -> DecisioningCapabilities: + from adcp.decisioning.capabilities import ( + Adcp, + IdempotencyUnsupported, + SupportedProtocol, + ) + + return DecisioningCapabilities( + specialisms=specialisms, + adcp=Adcp( + major_versions=[3], + idempotency=IdempotencyUnsupported(supported=False), + ), + supported_protocols=[SupportedProtocol.media_buy], + ) + + +class _SyncSalesPlatform(DecisioningPlatform, SalesPlatform): + """Sync child platform — minimum sales-non-guaranteed surface.""" + + def __init__(self, tag: str) -> None: + self.tag = tag + self.calls: list[tuple[str, Any]] = [] + + capabilities = _capabilities(["sales-non-guaranteed"]) + accounts = ExplicitAccounts(loader=lambda _id: Account(id=_id)) + + def get_products(self, req: Any, ctx: RequestContext[Any]) -> dict[str, Any]: + self.calls.append(("get_products", ctx.account.id)) + return {"products": [{"product_id": f"prod-{self.tag}"}]} + + def create_media_buy(self, req: Any, ctx: RequestContext[Any]) -> dict[str, Any]: + self.calls.append(("create_media_buy", ctx.account.id)) + return {"media_buy_id": f"mb-{self.tag}", "status": "active"} + + def update_media_buy( + self, media_buy_id: str, patch: Any, ctx: RequestContext[Any] + ) -> dict[str, Any]: + self.calls.append(("update_media_buy", ctx.account.id)) + return {"media_buy_id": media_buy_id, "status": "active"} + + def sync_creatives(self, req: Any, ctx: RequestContext[Any]) -> dict[str, Any]: + self.calls.append(("sync_creatives", ctx.account.id)) + return {"creatives": []} + + def get_media_buy_delivery(self, req: Any, ctx: RequestContext[Any]) -> dict[str, Any]: + self.calls.append(("get_media_buy_delivery", ctx.account.id)) + return {"media_buy_deliveries": []} + + +class _AsyncSalesPlatform(_SyncSalesPlatform): + async def get_products( # type: ignore[override] + self, req: Any, ctx: RequestContext[Any] + ) -> dict[str, Any]: + self.calls.append(("get_products", ctx.account.id)) + return {"products": [{"product_id": f"prod-async-{self.tag}"}]} + + async def create_media_buy( # type: ignore[override] + self, req: Any, ctx: RequestContext[Any] + ) -> dict[str, Any]: + self.calls.append(("create_media_buy", ctx.account.id)) + return {"media_buy_id": f"mb-async-{self.tag}", "status": "active"} + + +def _make_routing_account_store( + account_to_tenant: dict[str, str], +) -> ExplicitAccounts[Any]: + def _load(account_id: str) -> Account[Any]: + if account_id not in account_to_tenant: + raise AdcpError( + "ACCOUNT_NOT_FOUND", + message=f"unknown account {account_id!r}", + recovery="terminal", + ) + return Account( + id=account_id, + metadata={"tenant_id": account_to_tenant[account_id]}, + ) + + return ExplicitAccounts(loader=_load) + + +def _make_ctx(account: Account[Any]) -> RequestContext[Any]: + return RequestContext(account=account) + + +# --------------------------------------------------------------------------- +# Drop-in compatibility +# --------------------------------------------------------------------------- + + +def test_lazy_router_is_decisioning_platform() -> None: + accounts = _make_routing_account_store({"acct_a": "tenant-a"}) + router = LazyPlatformRouter( + accounts=accounts, + factory=lambda tid: _SyncSalesPlatform(tag=tid), + capabilities=_capabilities(["sales-non-guaranteed"]), + ) + assert isinstance(router, DecisioningPlatform) + + +# --------------------------------------------------------------------------- +# Lazy construction + cache hit/miss +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_factory_called_once_per_cold_tenant() -> None: + """First request for tenant-a builds; second hits cache.""" + accounts = _make_routing_account_store({"a1": "tenant-a"}) + build_count = {"a": 0} + + def factory(tid: str) -> DecisioningPlatform: + build_count["a"] += 1 + return _SyncSalesPlatform(tag=tid) + + router = LazyPlatformRouter( + accounts=accounts, + factory=factory, + capabilities=_capabilities(["sales-non-guaranteed"]), + ) + ctx = _make_ctx(Account(id="a1", metadata={"tenant_id": "tenant-a"})) + + await router.create_media_buy({}, ctx) + await router.create_media_buy({}, ctx) + await router.create_media_buy({}, ctx) + + assert build_count["a"] == 1 + assert "tenant-a" in router.cached_tenants + + +@pytest.mark.asyncio +async def test_async_factory_awaited() -> None: + """Factory may be async — router awaits at call time.""" + accounts = _make_routing_account_store({"a1": "tenant-a"}) + + async def factory(tid: str) -> DecisioningPlatform: + await asyncio.sleep(0) + return _AsyncSalesPlatform(tag=tid) + + router = LazyPlatformRouter( + accounts=accounts, + factory=factory, + capabilities=_capabilities(["sales-non-guaranteed"]), + ) + ctx = _make_ctx(Account(id="a1", metadata={"tenant_id": "tenant-a"})) + + result = await router.create_media_buy({}, ctx) + assert result["media_buy_id"] == "mb-async-tenant-a" + + +@pytest.mark.asyncio +async def test_sync_child_via_to_thread() -> None: + """Sync child platform method runs through asyncio.to_thread — + matches PlatformRouter's behaviour.""" + accounts = _make_routing_account_store({"a1": "tenant-a"}) + router = LazyPlatformRouter( + accounts=accounts, + factory=lambda tid: _SyncSalesPlatform(tag=tid), + capabilities=_capabilities(["sales-non-guaranteed"]), + ) + ctx = _make_ctx(Account(id="a1", metadata={"tenant_id": "tenant-a"})) + + result = await router.get_products({}, ctx) + assert result["products"][0]["product_id"] == "prod-tenant-a" + + +# --------------------------------------------------------------------------- +# Cache eviction: size-bounded LRU +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_cache_size_bound_evicts_lru() -> None: + """With cache_size=2, building a third tenant evicts the LRU one.""" + accounts = _make_routing_account_store({"a1": "tenant-a", "b1": "tenant-b", "c1": "tenant-c"}) + builds: list[str] = [] + + def factory(tid: str) -> DecisioningPlatform: + builds.append(tid) + return _SyncSalesPlatform(tag=tid) + + router = LazyPlatformRouter( + accounts=accounts, + factory=factory, + capabilities=_capabilities(["sales-non-guaranteed"]), + cache_size=2, + cache_ttl_seconds=0.0, # no TTL — exercise size-only eviction + ) + + ctx_a = _make_ctx(Account(id="a1", metadata={"tenant_id": "tenant-a"})) + ctx_b = _make_ctx(Account(id="b1", metadata={"tenant_id": "tenant-b"})) + ctx_c = _make_ctx(Account(id="c1", metadata={"tenant_id": "tenant-c"})) + + await router.create_media_buy({}, ctx_a) + await router.create_media_buy({}, ctx_b) + # tenant-a was the LRU — adding tenant-c evicts it. + await router.create_media_buy({}, ctx_c) + assert router.cached_tenants == {"tenant-b", "tenant-c"} + # Re-request tenant-a — factory rebuilds. + await router.create_media_buy({}, ctx_a) + assert builds == ["tenant-a", "tenant-b", "tenant-c", "tenant-a"] + + +@pytest.mark.asyncio +async def test_cache_ttl_zero_disables_time_expiry() -> None: + """``cache_ttl_seconds=0`` keeps platforms forever (size-only).""" + accounts = _make_routing_account_store({"a1": "tenant-a"}) + builds = {"n": 0} + + def factory(tid: str) -> DecisioningPlatform: + builds["n"] += 1 + return _SyncSalesPlatform(tag=tid) + + router = LazyPlatformRouter( + accounts=accounts, + factory=factory, + capabilities=_capabilities(["sales-non-guaranteed"]), + cache_size=10, + cache_ttl_seconds=0.0, + ) + ctx = _make_ctx(Account(id="a1", metadata={"tenant_id": "tenant-a"})) + + for _ in range(5): + await router.create_media_buy({}, ctx) + assert builds["n"] == 1 + + +@pytest.mark.asyncio +async def test_cache_ttl_expiry_evicts(monkeypatch: pytest.MonkeyPatch) -> None: + """An entry past its TTL is rebuilt on next access.""" + import adcp.decisioning.platform_router as pr_mod + + accounts = _make_routing_account_store({"a1": "tenant-a"}) + builds = {"n": 0} + + def factory(tid: str) -> DecisioningPlatform: + builds["n"] += 1 + return _SyncSalesPlatform(tag=tid) + + fake_clock = {"t": 1000.0} + monkeypatch.setattr(pr_mod.time, "monotonic", lambda: fake_clock["t"]) + + router = LazyPlatformRouter( + accounts=accounts, + factory=factory, + capabilities=_capabilities(["sales-non-guaranteed"]), + cache_size=10, + cache_ttl_seconds=60.0, + ) + ctx = _make_ctx(Account(id="a1", metadata={"tenant_id": "tenant-a"})) + + await router.create_media_buy({}, ctx) + assert builds["n"] == 1 + + # Advance past TTL + fake_clock["t"] = 1100.0 + await router.create_media_buy({}, ctx) + assert builds["n"] == 2 + + +# --------------------------------------------------------------------------- +# invalidate() +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_invalidate_specific_tenant_forces_rebuild() -> None: + accounts = _make_routing_account_store({"a1": "tenant-a", "b1": "tenant-b"}) + builds: list[str] = [] + + def factory(tid: str) -> DecisioningPlatform: + builds.append(tid) + return _SyncSalesPlatform(tag=tid) + + router = LazyPlatformRouter( + accounts=accounts, + factory=factory, + capabilities=_capabilities(["sales-non-guaranteed"]), + ) + ctx_a = _make_ctx(Account(id="a1", metadata={"tenant_id": "tenant-a"})) + ctx_b = _make_ctx(Account(id="b1", metadata={"tenant_id": "tenant-b"})) + + await router.create_media_buy({}, ctx_a) + await router.create_media_buy({}, ctx_b) + assert builds == ["tenant-a", "tenant-b"] + + router.invalidate("tenant-a") + assert "tenant-a" not in router.cached_tenants + assert "tenant-b" in router.cached_tenants + + await router.create_media_buy({}, ctx_a) + assert builds == ["tenant-a", "tenant-b", "tenant-a"] + + +@pytest.mark.asyncio +async def test_invalidate_all_clears_cache() -> None: + accounts = _make_routing_account_store({"a1": "tenant-a", "b1": "tenant-b"}) + + def factory(tid: str) -> DecisioningPlatform: + return _SyncSalesPlatform(tag=tid) + + router = LazyPlatformRouter( + accounts=accounts, + factory=factory, + capabilities=_capabilities(["sales-non-guaranteed"]), + ) + ctx_a = _make_ctx(Account(id="a1", metadata={"tenant_id": "tenant-a"})) + ctx_b = _make_ctx(Account(id="b1", metadata={"tenant_id": "tenant-b"})) + + await router.create_media_buy({}, ctx_a) + await router.create_media_buy({}, ctx_b) + assert router.cached_tenants == {"tenant-a", "tenant-b"} + + router.invalidate() + assert router.cached_tenants == frozenset() + + +def test_invalidate_unknown_tenant_is_noop() -> None: + accounts = _make_routing_account_store({}) + router = LazyPlatformRouter( + accounts=accounts, + factory=lambda tid: _SyncSalesPlatform(tag=tid), + capabilities=_capabilities(["sales-non-guaranteed"]), + ) + router.invalidate("never-cached") # no raise + + +# --------------------------------------------------------------------------- +# Construction validation +# --------------------------------------------------------------------------- + + +class TestConstructionValidation: + def test_cache_size_zero_rejected(self) -> None: + with pytest.raises(ValueError, match="cache_size"): + LazyPlatformRouter( + accounts=_make_routing_account_store({}), + factory=lambda _t: _SyncSalesPlatform(tag="x"), + capabilities=_capabilities(["sales-non-guaranteed"]), + cache_size=0, + ) + + def test_cache_size_negative_rejected(self) -> None: + with pytest.raises(ValueError, match="cache_size"): + LazyPlatformRouter( + accounts=_make_routing_account_store({}), + factory=lambda _t: _SyncSalesPlatform(tag="x"), + capabilities=_capabilities(["sales-non-guaranteed"]), + cache_size=-1, + ) + + def test_cache_ttl_negative_rejected(self) -> None: + with pytest.raises(ValueError, match="cache_ttl_seconds"): + LazyPlatformRouter( + accounts=_make_routing_account_store({}), + factory=lambda _t: _SyncSalesPlatform(tag="x"), + capabilities=_capabilities(["sales-non-guaranteed"]), + cache_ttl_seconds=-1.0, + ) + + def test_cache_ttl_zero_accepted(self) -> None: + """Distinct from CallableSubdomainTenantRouter — platform + adapters don't go stale, so TTL=0 (size-only eviction) is a + valid mode.""" + LazyPlatformRouter( + accounts=_make_routing_account_store({}), + factory=lambda _t: _SyncSalesPlatform(tag="x"), + capabilities=_capabilities(["sales-non-guaranteed"]), + cache_ttl_seconds=0.0, + ) + + +# --------------------------------------------------------------------------- +# Factory rejection paths +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_factory_returning_none_raises_account_not_found() -> None: + accounts = _make_routing_account_store({"a1": "tenant-a"}) + router = LazyPlatformRouter( + accounts=accounts, + factory=lambda _tid: None, # type: ignore[arg-type,return-value] + capabilities=_capabilities(["sales-non-guaranteed"]), + ) + ctx = _make_ctx(Account(id="a1", metadata={"tenant_id": "tenant-a"})) + + with pytest.raises(AdcpError) as exc_info: + await router.create_media_buy({}, ctx) + err = exc_info.value + assert err.code == "ACCOUNT_NOT_FOUND" + assert err.recovery == "terminal" + + +@pytest.mark.asyncio +async def test_factory_returning_wrong_type_raises_internal_error() -> None: + accounts = _make_routing_account_store({"a1": "tenant-a"}) + router = LazyPlatformRouter( + accounts=accounts, + factory=lambda _tid: "not a platform", # type: ignore[arg-type,return-value] + capabilities=_capabilities(["sales-non-guaranteed"]), + ) + ctx = _make_ctx(Account(id="a1", metadata={"tenant_id": "tenant-a"})) + + with pytest.raises(AdcpError) as exc_info: + await router.create_media_buy({}, ctx) + err = exc_info.value + assert err.code == "INTERNAL_ERROR" + assert err.recovery == "terminal" + + +@pytest.mark.asyncio +async def test_factory_raise_not_cached() -> None: + """A factory that raises does not cache; next request retries.""" + accounts = _make_routing_account_store({"a1": "tenant-a"}) + attempts = {"n": 0} + + def factory(tid: str) -> DecisioningPlatform: + attempts["n"] += 1 + if attempts["n"] == 1: + raise RuntimeError("transient") + return _SyncSalesPlatform(tag=tid) + + router = LazyPlatformRouter( + accounts=accounts, + factory=factory, + capabilities=_capabilities(["sales-non-guaranteed"]), + ) + ctx = _make_ctx(Account(id="a1", metadata={"tenant_id": "tenant-a"})) + + with pytest.raises(RuntimeError): + await router.create_media_buy({}, ctx) + assert "tenant-a" not in router.cached_tenants + + result = await router.create_media_buy({}, ctx) + assert result["media_buy_id"] == "mb-tenant-a" + assert attempts["n"] == 2 + + +# --------------------------------------------------------------------------- +# Unknown tenant + unsupported method paths +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_unsupported_method_raises_unsupported_feature() -> None: + """The platform doesn't implement audience methods; calling one + raises ``UNSUPPORTED_FEATURE``.""" + accounts = _make_routing_account_store({"a1": "tenant-a"}) + router = LazyPlatformRouter( + accounts=accounts, + factory=lambda tid: _SyncSalesPlatform(tag=tid), + capabilities=_capabilities(["sales-non-guaranteed", "audience-sync"]), + ) + ctx = _make_ctx(Account(id="a1", metadata={"tenant_id": "tenant-a"})) + + with pytest.raises(AdcpError) as exc_info: + await router.sync_audiences({}, ctx) # type: ignore[attr-defined] + assert exc_info.value.code == "UNSUPPORTED_FEATURE" + + +@pytest.mark.asyncio +async def test_invalidate_during_build_does_not_resurrect() -> None: + """Race contract: ``invalidate(tenant_id)`` while the factory is + in-flight must not resurrect the just-evicted slot when the build + completes. The in-flight caller still gets the platform it paid + for; the cache stays empty so the next request rebuilds.""" + accounts = _make_routing_account_store({"a1": "tenant-a"}) + started = asyncio.Event() + finish = asyncio.Event() + + async def slow_factory(tid: str) -> DecisioningPlatform: + started.set() + await finish.wait() + return _SyncSalesPlatform(tag=tid) + + router = LazyPlatformRouter( + accounts=accounts, + factory=slow_factory, + capabilities=_capabilities(["sales-non-guaranteed"]), + ) + ctx = _make_ctx(Account(id="a1", metadata={"tenant_id": "tenant-a"})) + + task = asyncio.create_task(router.create_media_buy({}, ctx)) + await started.wait() + router.invalidate("tenant-a") + finish.set() + + result = await task # in-flight build completes + assert result["media_buy_id"] == "mb-tenant-a" + # The cache must NOT have resurrected the platform. + assert "tenant-a" not in router.cached_tenants + + +@pytest.mark.asyncio +async def test_invalidate_all_during_build_does_not_resurrect() -> None: + """``invalidate()`` (no arg) must also bump generation so an + in-flight build can't slip back into the cache.""" + accounts = _make_routing_account_store({"a1": "tenant-a"}) + started = asyncio.Event() + finish = asyncio.Event() + + async def slow_factory(tid: str) -> DecisioningPlatform: + started.set() + await finish.wait() + return _SyncSalesPlatform(tag=tid) + + router = LazyPlatformRouter( + accounts=accounts, + factory=slow_factory, + capabilities=_capabilities(["sales-non-guaranteed"]), + ) + ctx = _make_ctx(Account(id="a1", metadata={"tenant_id": "tenant-a"})) + + task = asyncio.create_task(router.create_media_buy({}, ctx)) + await started.wait() + router.invalidate() # global flush mid-build + finish.set() + await task + assert "tenant-a" not in router.cached_tenants + + +@pytest.mark.asyncio +async def test_concurrent_cold_requests_each_build_v1_contract() -> None: + """v1 contract — no singleflight. Two concurrent requests for the + same cold tenant each invoke the factory. Locks the contract; if a + future change adds singleflight, this test fails and the change is + intentional.""" + accounts = _make_routing_account_store({"a1": "tenant-a"}) + builds = {"n": 0} + + async def factory(tid: str) -> DecisioningPlatform: + builds["n"] += 1 + await asyncio.sleep(0.01) # let the second request enter resolve_platform + return _SyncSalesPlatform(tag=tid) + + router = LazyPlatformRouter( + accounts=accounts, + factory=factory, + capabilities=_capabilities(["sales-non-guaranteed"]), + ) + ctx = _make_ctx(Account(id="a1", metadata={"tenant_id": "tenant-a"})) + + results = await asyncio.gather( + router.create_media_buy({}, ctx), + router.create_media_buy({}, ctx), + ) + assert all(r["media_buy_id"] == "mb-tenant-a" for r in results) + assert builds["n"] == 2 + + +# --------------------------------------------------------------------------- +# proposal_managers routing +# --------------------------------------------------------------------------- + + +class _StubProposalManager: + """Minimal :class:`ProposalManager`-shaped stub for routing tests. + Records which method the router invoked.""" + + def __init__(self) -> None: + from adcp.decisioning import ProposalCapabilities + + self.capabilities = ProposalCapabilities( + sales_specialism="sales-non-guaranteed", + refine=False, + ) + self.calls: list[str] = [] + + async def get_products(self, req: Any, ctx: RequestContext[Any]) -> dict[str, Any]: + self.calls.append("get_products") + return {"products": [{"product_id": "manager-prod"}]} + + +@pytest.mark.asyncio +async def test_proposal_manager_routed_for_tenant_with_manager() -> None: + """Tenant with a wired manager → router calls manager.get_products, + not the lazily-built platform.""" + accounts = _make_routing_account_store({"a1": "tenant-a"}) + builds = {"n": 0} + + def factory(tid: str) -> DecisioningPlatform: + builds["n"] += 1 + return _SyncSalesPlatform(tag=tid) + + manager = _StubProposalManager() + router = LazyPlatformRouter( + accounts=accounts, + factory=factory, + capabilities=_capabilities(["sales-non-guaranteed"]), + proposal_managers={"tenant-a": manager}, # type: ignore[dict-item] + ) + ctx = _make_ctx(Account(id="a1", metadata={"tenant_id": "tenant-a"})) + + result = await router.get_products({}, ctx) + assert result["products"][0]["product_id"] == "manager-prod" + assert manager.calls == ["get_products"] + # Manager handled it — factory was never called. + assert builds["n"] == 0 + + +@pytest.mark.asyncio +async def test_proposal_manager_fall_through_when_unwired() -> None: + """Tenant without a manager → router falls through to the + lazily-built platform's get_products, identical to the no-manager + case.""" + accounts = _make_routing_account_store({"b1": "tenant-b"}) + manager = _StubProposalManager() + router = LazyPlatformRouter( + accounts=accounts, + factory=lambda tid: _SyncSalesPlatform(tag=tid), + capabilities=_capabilities(["sales-non-guaranteed"]), + proposal_managers={"tenant-a": manager}, # type: ignore[dict-item] + ) + ctx = _make_ctx(Account(id="b1", metadata={"tenant_id": "tenant-b"})) + + result = await router.get_products({}, ctx) + assert result["products"][0]["product_id"] == "prod-tenant-b" + assert manager.calls == [] + + +def test_proposal_manager_for_tenant_lookup() -> None: + """``proposal_manager_for_tenant`` returns the wired manager + (or None) without touching the platform factory.""" + manager = _StubProposalManager() + router = LazyPlatformRouter( + accounts=_make_routing_account_store({}), + factory=lambda tid: _SyncSalesPlatform(tag=tid), + capabilities=_capabilities(["sales-non-guaranteed"]), + proposal_managers={"tenant-a": manager}, # type: ignore[dict-item] + ) + assert router.proposal_manager_for_tenant("tenant-a") is manager + assert router.proposal_manager_for_tenant("tenant-x") is None + + +# --------------------------------------------------------------------------- +# platform_for_tenant introspection +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_platform_for_tenant_builds_via_factory() -> None: + """Sibling-API parity: ``platform_for_tenant`` triggers the + factory and returns the same instance the cache would serve to + request-path delegations.""" + accounts = _make_routing_account_store({"a1": "tenant-a"}) + router = LazyPlatformRouter( + accounts=accounts, + factory=lambda tid: _SyncSalesPlatform(tag=tid), + capabilities=_capabilities(["sales-non-guaranteed"]), + ) + + platform = await router.platform_for_tenant("tenant-a") + assert isinstance(platform, _SyncSalesPlatform) + assert "tenant-a" in router.cached_tenants + + # Subsequent call returns the same cached instance. + again = await router.platform_for_tenant("tenant-a") + assert again is platform + + +@pytest.mark.asyncio +async def test_missing_tenant_metadata_raises_account_not_found() -> None: + """Account without ``metadata['tenant_id']`` → ACCOUNT_NOT_FOUND.""" + router = LazyPlatformRouter( + accounts=_make_routing_account_store({}), + factory=lambda tid: _SyncSalesPlatform(tag=tid), + capabilities=_capabilities(["sales-non-guaranteed"]), + ) + ctx = _make_ctx(Account(id="acct")) # no metadata + + with pytest.raises(AdcpError) as exc_info: + await router.create_media_buy({}, ctx) + assert exc_info.value.code == "ACCOUNT_NOT_FOUND"