Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/adcp/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,12 @@ async def get_products(params, context=None):
)
from adcp.server.sponsored_intelligence import SponsoredIntelligenceHandler
from adcp.server.tenant_router import (
CallableSubdomainTenantRouter,
InMemorySubdomainTenantRouter,
SubdomainTenantMiddleware,
SubdomainTenantRouter,
Tenant,
TenantResolver,
current_tenant,
)
from adcp.server.test_controller import (
Expand Down Expand Up @@ -204,10 +206,12 @@ async def get_products(params, context=None):
"IdempotencyStore",
"MemoryBackend",
# Subdomain tenant routing
"CallableSubdomainTenantRouter",
"InMemorySubdomainTenantRouter",
"SubdomainTenantMiddleware",
"SubdomainTenantRouter",
"Tenant",
"TenantResolver",
"current_tenant",
# Multi-agent discovery manifest (/.well-known/adcp-agents.json)
"DISCOVERY_PATH",
Expand Down
196 changes: 194 additions & 2 deletions src/adcp/server/tenant_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
* :class:`SubdomainTenantRouter` — runtime-checkable Protocol with
one async ``resolve(host: str) -> Tenant | None`` method.
* :class:`InMemorySubdomainTenantRouter` — reference impl for
dev/test backed by a static ``host → Tenant`` dict. Production
adopters back the Protocol with their tenant table.
dev/test backed by a static ``host → Tenant`` dict.
* :class:`CallableSubdomainTenantRouter` — adopter-callable router
for DB-backed lookups. Adopter writes a single sync-or-async
callable mapping a normalized host to a :class:`Tenant`; the
framework owns host normalization. Optional bounded TTL cache
for hot-path lookups. **Recommended for production multi-tenant
deployments** — replaces ~25 LOC of adopter glue with ~5.
* :class:`SubdomainTenantMiddleware` — Starlette ASGI middleware
that calls the router, stashes the result in a
:class:`contextvars.ContextVar`, and ``404`` s on unknown hosts.
Expand Down Expand Up @@ -84,6 +89,10 @@ def build_context(meta):
from __future__ import annotations

import contextvars
import inspect
import time
from collections import OrderedDict
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable

Expand Down Expand Up @@ -160,6 +169,187 @@ async def resolve(self, host: str) -> Tenant | None:
return self._tenants.get(_normalize_host(host))


# Type alias for adopter-supplied lookup callables. Either sync (returns
# Tenant | None) or async (returns Awaitable[Tenant | None]) is accepted —
# CallableSubdomainTenantRouter awaits at call time. Receives the
# already-normalized (lower-cased + port-stripped) host so adopters don't
# reimplement the parser.
TenantResolver = Callable[[str], "Tenant | None | Awaitable[Tenant | None]"]


class CallableSubdomainTenantRouter:
"""Adopter-callable :class:`SubdomainTenantRouter` for DB-backed lookups.

The adopter passes a single callable mapping a normalized host to a
:class:`Tenant` (or ``None`` for 404). The framework owns host
normalization (lower-case + port-strip), so adopters write only the
lookup itself — typically a single SQL query against their tenant
table.

The callable may be sync or async; the router awaits at call time.

Example::

from sqlalchemy import select
from adcp.server import CallableSubdomainTenantRouter, Tenant

async def lookup(host: str) -> Tenant | None:
subdomain = host.split(".", 1)[0] # 'acme.example.com' -> 'acme'
async with my_db.session() as s:
row = await s.scalar(
select(TenantRow).filter_by(subdomain=subdomain, is_active=True)
)
return Tenant(id=row.tenant_id, display_name=row.name) if row else None

router = CallableSubdomainTenantRouter(lookup)

Optional bounded TTL cache absorbs hot-path lookups without adopters
reimplementing — useful when the resolver hits a remote DB on every
request. Defaults to **no caching** (``cache_size=0``); adopters opt
in with explicit bounds:

::

router = CallableSubdomainTenantRouter(
lookup,
cache_size=1024, # bounded LRU; never grows beyond this
cache_ttl_seconds=60.0, # expire entries after 60s
)

Cache bounds are mandatory when caching is enabled — there is no
"cache forever, unbounded size" mode by design. Tenants come and go
(suspension, deactivation); long-lived caches without TTL hand
adopters a stale-cache footgun. The ``cache_ttl_seconds`` ceiling is
the explicit knob.

**Negative-cache + tenant onboarding race.** When caching is enabled,
``None`` results are cached too (to absorb probing for unknown hosts).
This creates a race on tenant creation: if a probe for
``acme.example.com`` hits at T=0 (host doesn't exist yet) and the
tenant is provisioned at T=1, the cached ``None`` causes 404s for up
to ``cache_ttl_seconds`` afterward. Call ``invalidate(host)`` from
your tenant *creation* path — not only deactivation — to clear the
negative entry immediately::

# on tenant create / re-activate
router.invalidate("acme.example.com")

Memory profile
--------------
Without caching: zero state held by the router. Each ``resolve()``
call awaits the adopter callable directly.

With caching: bounded by ``cache_size`` entries. Maximum memory is
``cache_size × (sizeof(host_str) + sizeof(your_Tenant) + 16)``
where ``sizeof(your_Tenant)`` depends on what you store in
:attr:`Tenant.ext` — the router can't predict it. The cache never
grows beyond ``cache_size`` entries regardless of payload size.
"""

def __init__(
self,
resolver: TenantResolver,
*,
cache_size: int = 0,
cache_ttl_seconds: float = 0.0,
) -> None:
"""Construct the router.

:param resolver: Callable taking a normalized host string and
returning ``Tenant | None`` (sync or async). Receives
already-normalized hosts — lower-cased with any
``:port`` suffix stripped.
:param cache_size: Maximum number of cached lookups. ``0``
disables caching entirely (the adopter callable is awaited
on every request). Must be ``>= 0``.
:param cache_ttl_seconds: Per-entry TTL in seconds. Must be
``> 0`` when ``cache_size > 0``. There is no "cache forever"
mode — see the class docstring for rationale.
:raises ValueError: If ``cache_size > 0`` and
``cache_ttl_seconds <= 0`` (cache requires explicit TTL).
"""
if cache_size < 0:
raise ValueError(f"cache_size must be >= 0, got {cache_size}")
if cache_size > 0 and cache_ttl_seconds <= 0:
raise ValueError(
"cache_ttl_seconds must be > 0 when cache_size > 0; "
"explicit TTL prevents stale-tenant footguns. Pass a "
"value like 60.0 (one-minute cache) to opt in."
)
self._resolver = resolver
self._cache_size = cache_size
self._cache_ttl = cache_ttl_seconds
# OrderedDict gives us LRU-by-move-to-end for free; bounded by
# popitem(last=False) when over cache_size. Each entry is
# (Tenant | None, expires_at_monotonic). Negative results are
# cached too so DOS-style probing doesn't bypass the cache.
self._cache: OrderedDict[str, tuple[Tenant | None, float]] = OrderedDict()

async def resolve(self, host: str) -> Tenant | None:
normalized = _normalize_host(host)

if self._cache_size > 0:
cached = self._cache_get(normalized)
if cached is not _CACHE_MISS:
return cached # type: ignore[return-value]

result = self._resolver(normalized)
if inspect.isawaitable(result):
result = await result

if self._cache_size > 0:
self._cache_put(normalized, result)

return result

# ----- cache internals (request-path; keep tight) ---------------------

def _cache_get(self, host: str) -> Tenant | None | object:
entry = self._cache.get(host)
if entry is None:
return _CACHE_MISS
tenant, expires_at = entry
if time.monotonic() > expires_at:
# Expired — drop and miss. Don't await a fresh resolve here;
# the caller does that. Avoids holding the entry through the
# adopter callable's network round-trip.
self._cache.pop(host, None)
return _CACHE_MISS
# LRU touch
self._cache.move_to_end(host)
return tenant

def _cache_put(self, host: str, tenant: Tenant | None) -> None:
expires_at = time.monotonic() + self._cache_ttl
self._cache[host] = (tenant, expires_at)
self._cache.move_to_end(host)
# Bound size — evict oldest until under limit.
while len(self._cache) > self._cache_size:
self._cache.popitem(last=False)

def invalidate(self, host: str | None = None) -> None:
"""Drop a cached entry (or all entries when ``host`` is ``None``).

Adopters call this from their tenant-creation, -deactivation, and
-modification flows to evict stale entries before the TTL fires.
Creation matters because negative results (``None``) are cached —
see the class docstring for details. Safe to call even when caching
is disabled (no-op).

:param host: Specific host to evict (raw or normalized — the
method normalizes internally). ``None`` clears the entire
cache.
"""
if host is None:
self._cache.clear()
return
self._cache.pop(_normalize_host(host), None)


# Sentinel for cache miss vs. cached-None (negative result)
_CACHE_MISS: object = object()


# Module-level contextvar — request-scoped via the ASGI middleware's
# per-call `set()`. ASGI guarantees per-task context isolation, so
# concurrent requests on the same process see only their own tenant.
Expand Down Expand Up @@ -303,9 +493,11 @@ async def _send_404(send: Send, *, reason: str) -> None:


__all__ = [
"CallableSubdomainTenantRouter",
"InMemorySubdomainTenantRouter",
"SubdomainTenantMiddleware",
"SubdomainTenantRouter",
"Tenant",
"TenantResolver",
"current_tenant",
]
Loading
Loading