Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 3 additions & 22 deletions examples/multi_platform_seller/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,23 +114,6 @@ def build_subdomain_middleware() -> tuple[type, dict[str, object]]:
return SubdomainTenantMiddleware, {"router": subdomain_router}


def _allowed_hosts() -> list[str]:
"""The TransportSecurityMiddleware host allowlist.

FastMCP's DNS-rebinding-protection default only accepts loopback
patterns; the tenant subdomains have to be added explicitly. Both
bare hosts and ``host:*`` (any-port) wildcards work; using
``host:*`` keeps the example port-agnostic for adopters who change
``ADCP_PORT``.
"""
return [
"tenant-a.localhost",
"tenant-a.localhost:*",
"tenant-b.localhost",
"tenant-b.localhost:*",
]


if __name__ == "__main__":
router = build_router()
middleware_class, middleware_kwargs = build_subdomain_middleware()
Expand All @@ -142,10 +125,8 @@ def _allowed_hosts() -> list[str]:
auto_emit_completion_webhooks=False,
# ``serve()`` forwards extra kwargs to ``adcp.server.serve``;
# the underlying transport accepts a Starlette middleware list.
# serve() auto-synthesizes bare + :* allowed_hosts entries from
# the SubdomainTenantMiddleware router, so no separate host list
# is needed here.
asgi_middleware=[(middleware_class, middleware_kwargs)],
# Extend FastMCP's host allowlist to include the tenant
# subdomains. Without this the transport returns 421 on every
# ``Host: tenant-x.localhost`` request before the subdomain
# router gets a chance to resolve.
allowed_hosts=_allowed_hosts(),
)
79 changes: 79 additions & 0 deletions src/adcp/server/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,16 @@ async def force_account_status(self, account_id, status):
debug_traffic_source=debug_traffic_source,
)

# Auto-synthesize bare+:* host pairs from any SubdomainTenantMiddleware
# in asgi_middleware so the router's host list is the single source of
# truth for both lookup and the FastMCP DNS-rebinding allowlist.
allowed_hosts = _synthesize_allowed_hosts(asgi_middleware, allowed_hosts)

if transport == "a2a":
# allowed_hosts / allowed_origins are FastMCP / MCP-layer knobs and
# are not forwarded to the A2A path — the A2A transport doesn't use
# FastMCP's TransportSecuritySettings. Host validation on the A2A
# path is handled by the outer ASGI stack (e.g. SubdomainTenantMiddleware).
_serve_a2a(
handler,
name=name,
Expand Down Expand Up @@ -769,6 +778,76 @@ def _prepend_debug_endpoint(
return [debug_entry, *asgi_middleware]


def _synthesize_allowed_hosts(
asgi_middleware: Sequence[ASGIMiddlewareEntry] | None,
allowed_hosts: Sequence[str] | None,
) -> Sequence[str] | None:
"""Auto-expand ``allowed_hosts`` from :class:`SubdomainTenantMiddleware` routers.

When the middleware list contains a :class:`SubdomainTenantMiddleware`
entry (or a subclass) whose ``router`` exposes a ``hosts()`` method (as
:class:`InMemorySubdomainTenantRouter` does), this synthesizes both the
bare host and the ``:*`` port-wildcard variant for each registered host.
That makes the FastMCP DNS-rebinding allowlist symmetric with the
router's port-agnostic ``_normalize_host`` resolution — adopters no
longer have to maintain a separate ``_allowed_hosts()`` helper.

Synthesis is a one-time snapshot at ``serve()`` startup. Production
adopters with dynamically provisioned tenants (SQL-backed routers) should
pass ``enable_dns_rebinding_protection=False`` and rely on
:class:`SubdomainTenantMiddleware` alone for host validation — it already
enforces the allowlist per-request.

Existing explicit ``allowed_hosts`` entries are preserved and
deduplicated against the synthesized set. Custom router implementations
that want to benefit from auto-synthesis must expose a ``hosts() ->
list[str]`` method; see :class:`InMemorySubdomainTenantRouter` as the
reference.
"""
import warnings

from adcp.server.tenant_router import SubdomainTenantMiddleware as _SubdomainTenantMw

synthesized: list[str] = []
for entry in asgi_middleware or []:
if not (isinstance(entry, tuple) and len(entry) == 2):
continue
cls, kwargs = entry
# Use issubclass (with a TypeError guard for non-class callables such
# as functools.partial) so adopters who subclass SubdomainTenantMiddleware
# to add logging or extra exclusion logic also trigger synthesis.
try:
is_subdomain_mw = isinstance(cls, type) and issubclass(cls, _SubdomainTenantMw)
except TypeError:
is_subdomain_mw = False
if not is_subdomain_mw:
continue
router = kwargs.get("router")
if router is None or not hasattr(router, "hosts"):
warnings.warn(
"SubdomainTenantMiddleware router does not expose hosts() — "
"auto-synthesis of allowed_hosts skipped. Multi-tenant subdomain "
"hosts may be rejected with 421 Misdirected Request unless listed "
"in allowed_hosts explicitly. Implement hosts() on your router or "
"pass enable_dns_rebinding_protection=False if the middleware "
"already handles all host validation.",
stacklevel=3,
)
continue
for bare_host in router.hosts():
if bare_host not in synthesized:
synthesized.append(bare_host)
port_wild = f"{bare_host}:*"
if port_wild not in synthesized:
synthesized.append(port_wild)

if not synthesized:
return allowed_hosts

existing = list(allowed_hosts or [])
return existing + [h for h in synthesized if h not in existing]


def _apply_asgi_middleware(
app: Any,
asgi_middleware: Sequence[ASGIMiddlewareEntry] | None,
Expand Down
22 changes: 22 additions & 0 deletions src/adcp/server/tenant_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,17 @@ async def resolve(self, host: str) -> Tenant | None:
"""
...

# Optional convention (not part of the formal Protocol contract):
# Implementing ``hosts(self) -> list[str]`` on a custom router lets
# :func:`adcp.server.serve.serve` auto-synthesize the FastMCP
# ``allowed_hosts`` allowlist (bare + ``:*`` variants) from the
# router's registered host list, eliminating the need for a separate
# ``_allowed_hosts()`` helper in adopter code. See
# :class:`InMemorySubdomainTenantRouter.hosts` for the reference
# implementation. Routers without ``hosts()`` are skipped with a
# startup warning — the adopter must pass ``allowed_hosts`` explicitly
# or set ``enable_dns_rebinding_protection=False``.


class InMemorySubdomainTenantRouter:
"""Reference :class:`SubdomainTenantRouter` for dev / test.
Expand All @@ -159,6 +170,17 @@ def __init__(self, tenants: Mapping[str, Tenant]) -> None:
async def resolve(self, host: str) -> Tenant | None:
return self._tenants.get(_normalize_host(host))

def hosts(self) -> list[str]:
"""Return registered host names (normalized: lower-cased, port-stripped).

Called by :func:`adcp.server.serve.serve` to auto-synthesize the
FastMCP ``allowed_hosts`` allowlist — so the host list passed to
the router is the single source of truth for both lookup and
DNS-rebinding-protection. Custom :class:`SubdomainTenantRouter`
implementations can expose the same method to get the same benefit.
"""
return list(self._tenants.keys())


# Module-level contextvar — request-scoped via the ASGI middleware's
# per-call `set()`. ASGI guarantees per-task context isolation, so
Expand Down
111 changes: 110 additions & 1 deletion tests/test_serve_transport_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@

from __future__ import annotations

import warnings
from typing import Any

from adcp.server.base import ADCPHandler
from adcp.server.serve import create_mcp_server
from adcp.server.serve import _synthesize_allowed_hosts, create_mcp_server
from adcp.server.tenant_router import (
InMemorySubdomainTenantRouter,
SubdomainTenantMiddleware,
Tenant,
)


class _StubHandler(ADCPHandler[Any]):
Expand Down Expand Up @@ -69,6 +75,109 @@ def test_allowed_origins_extends_default_list() -> None:
assert "http://acme.localhost:*" in ts.allowed_origins


# ----- _synthesize_allowed_hosts -----------------------------------------


def test_synthesize_produces_bare_and_port_wildcard() -> None:
"""Each registered host gets both the bare form and ``:*`` so the
FastMCP allowlist is port-agnostic, matching the router's
``_normalize_host`` port-stripping at lookup time."""
router = InMemorySubdomainTenantRouter(
tenants={
"acme.localhost": Tenant(id="acme", display_name="Acme"),
"beta.localhost": Tenant(id="beta", display_name="Beta"),
}
)
result = _synthesize_allowed_hosts(
[(SubdomainTenantMiddleware, {"router": router})],
allowed_hosts=None,
)
assert result is not None
assert "acme.localhost" in result
assert "acme.localhost:*" in result
assert "beta.localhost" in result
assert "beta.localhost:*" in result


def test_synthesize_merges_with_explicit_allowed_hosts() -> None:
"""Explicit ``allowed_hosts`` are preserved; synthesized entries are
appended without duplicates."""
router = InMemorySubdomainTenantRouter(
tenants={"acme.localhost": Tenant(id="acme", display_name="Acme")}
)
result = _synthesize_allowed_hosts(
[(SubdomainTenantMiddleware, {"router": router})],
allowed_hosts=["extra.example.com"],
)
assert result is not None
assert "extra.example.com" in result
assert "acme.localhost" in result
assert "acme.localhost:*" in result
# No duplicate bare host
assert list(result).count("acme.localhost") == 1


def test_synthesize_noop_when_no_subdomain_middleware() -> None:
"""No SubdomainTenantMiddleware in the list → returned value is
unchanged (None stays None, explicit list stays unchanged)."""
assert _synthesize_allowed_hosts(None, None) is None
assert _synthesize_allowed_hosts([], None) is None
explicit: list[str] = ["host.example.com"]
result = _synthesize_allowed_hosts([], explicit)
assert result is explicit


def test_synthesize_noop_for_router_without_hosts_method() -> None:
"""Custom routers that don't expose ``hosts()`` are skipped and emit a
startup warning — no AttributeError, no silent 421."""

class _CustomRouter:
async def resolve(self, host: str) -> None:
return None

with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always")
result = _synthesize_allowed_hosts(
[(SubdomainTenantMiddleware, {"router": _CustomRouter()})],
allowed_hosts=None,
)

assert result is None
assert len(caught) == 1
assert "hosts()" in str(caught[0].message)
assert "421" in str(caught[0].message)


def test_synthesize_matches_subclass_of_subdomain_middleware() -> None:
"""Adopters who subclass SubdomainTenantMiddleware (e.g. to add request
logging) still trigger synthesis — the check uses issubclass, not identity."""

class _LoggingTenantMiddleware(SubdomainTenantMiddleware):
pass

router = InMemorySubdomainTenantRouter(
tenants={"acme.localhost": Tenant(id="acme", display_name="Acme")}
)
result = _synthesize_allowed_hosts(
[(_LoggingTenantMiddleware, {"router": router})],
allowed_hosts=None,
)
assert result is not None
assert "acme.localhost" in result
assert "acme.localhost:*" in result


def test_synthesize_skips_callable_factory_entries() -> None:
"""Callable-factory middleware entries (not tuples) are ignored."""
router = InMemorySubdomainTenantRouter(tenants={})

def _factory(app: Any) -> Any:
return app

result = _synthesize_allowed_hosts([_factory], allowed_hosts=None)
assert result is None


def test_enable_dns_rebinding_protection_false_disables_check() -> None:
"""Adopters with their own tenant-aware host validation pass
``enable_dns_rebinding_protection=False`` so the MCP-layer check
Expand Down
18 changes: 18 additions & 0 deletions tests/test_subdomain_tenant_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,24 @@ def test_in_memory_router_satisfies_protocol() -> None:
assert isinstance(router, SubdomainTenantRouter)


def test_in_memory_router_hosts_returns_normalized_keys() -> None:
"""hosts() returns the normalized (lower-cased, port-stripped) keys so
serve() can synthesize the FastMCP allowlist from the same source."""
router = InMemorySubdomainTenantRouter(
tenants={
"Acme.Localhost": Tenant(id="acme", display_name="Acme"),
"beta.localhost:8080": Tenant(id="beta", display_name="Beta"),
}
)
result = sorted(router.hosts())
assert result == ["acme.localhost", "beta.localhost"]


def test_in_memory_router_hosts_empty() -> None:
router = InMemorySubdomainTenantRouter(tenants={})
assert router.hosts() == []


# ----- middleware: known host happy path ------------------------------


Expand Down
Loading