From b1dbadd3d773c30c0a4d3b2202e07986b5061c7a Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Thu, 30 Apr 2026 20:13:24 -0400 Subject: [PATCH 1/2] =?UTF-8?q?feat(decisioning):=20F12=20=E2=80=94=20auto?= =?UTF-8?q?-emit=20completion=20webhook=20on=20sync=20mutating=20responses?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sync ``create_media_buy``, ``update_media_buy``, ``sync_creatives`` responses now auto-fire a completion webhook when the buyer supplied ``push_notification_config.url``. Previously only the HITL TaskHandoff path emitted, so sync responses left buyers polling. Mirrors the JS-side ``emitSyncCompletionWebhook`` implementation (commits ``8dc427f9`` and ``7a887dfa`` on ``src/lib/server/decisioning/runtime/from-platform.ts``). Wire-format is identical: ``task_type``, ``status: 'completed'``, ``result`` field carrying the projected sync response, echoed ``token`` via ``X-AdCP-Push-Token`` header. ``task_id`` is synthesized as ``f"sync-{uuid4()}"`` since sync responses don't allocate a registry task; buyers correlate via the resource ids embedded in ``result``. New module ``adcp.decisioning.webhook_emit``: * ``SPEC_WEBHOOK_TASK_TYPES`` — closed 20-value set mirroring the on-disk spec enum at ``schemas/cache/enums/task-type.json``. The ``test_spec_webhook_task_types_matches_schema_cache`` test pins the constant so out-of-band drift surfaces in CI. * ``maybe_emit_sync_completion`` — fire-and-forget gate. Skips when disabled, no sender wired, no push URL on the request, or the tool isn't in the spec enum (logged warning so adopters notice they extended the surface beyond spec). * ``_BACKGROUND_WEBHOOK_TASKS`` — module-level strong-ref pin so the asyncio loop's weak-ref behavior doesn't garbage-collect in-flight emissions mid-flight. Mirrors the same pattern in ``dispatch._BACKGROUND_HANDOFF_TASKS``. **Fire-and-forget posture (DoS defense).** Webhook delivery runs in a background asyncio task; the sync response returns inline immediately. A buyer-supplied slowloris webhook URL must not be able to hold the seller's request worker for the full retry budget — the JS round-2 fix at ``7a887dfa`` documented this DoS vector and Python preserves the same posture from the start. **TaskHandoff path doesn't double-fire.** The ``_maybe_auto_emit_sync_completion`` helper detects the projected Submitted envelope (``status == 'submitted'`` shape) and skips delivery. The HITL path's registry completion emits its own webhook on terminal state. Configuration on ``create_adcp_server_from_platform`` and ``serve``: * ``webhook_sender: WebhookSender | None = None`` — BYO emitter. ``None`` silently disables auto-emit. * ``auto_emit_completion_webhooks: bool = True`` — default-on. Adopters who emit webhooks manually inside their handlers pass ``False`` to avoid duplicate delivery. 21 new tests cover: drift-guard against the on-disk schema cache, URL+token extraction (incl. dict-params test fixtures), gate skips (disabled, no sender, no URL, tool outside spec enum, no running loop), happy-path delivery via ``WebhookSender.send_mcp``, token echo via ``X-AdCP-Push-Token`` header, delivery-failure swallow, sync-success fires, TaskHandoff doesn't double-fire, opt-out suppresses, default-on, no-sender silent, sync_creatives fires too. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/adcp/decisioning/handler.py | 96 +++-- src/adcp/decisioning/serve.py | 35 ++ src/adcp/decisioning/webhook_emit.py | 226 ++++++++++ tests/test_decisioning_webhook_emit.py | 553 +++++++++++++++++++++++++ 4 files changed, 879 insertions(+), 31 deletions(-) create mode 100644 src/adcp/decisioning/webhook_emit.py create mode 100644 tests/test_decisioning_webhook_emit.py diff --git a/src/adcp/decisioning/handler.py b/src/adcp/decisioning/handler.py index 1554ab956..8c54aaf3d 100644 --- a/src/adcp/decisioning/handler.py +++ b/src/adcp/decisioning/handler.py @@ -39,6 +39,7 @@ _build_request_context, _invoke_platform_method, ) +from adcp.decisioning.webhook_emit import maybe_emit_sync_completion from adcp.server.base import ADCPHandler, ToolContext if TYPE_CHECKING: @@ -70,6 +71,7 @@ UpdateMediaBuyRequest, UpdateMediaBuySuccessResponse, ) + from adcp.webhook_sender import WebhookSender # --------------------------------------------------------------------------- @@ -141,6 +143,8 @@ def __init__( registry: TaskRegistry, state_reader: StateReader | None = None, resource_resolver: ResourceResolver | None = None, + webhook_sender: WebhookSender | None = None, + auto_emit_completion_webhooks: bool = True, ) -> None: super().__init__() self._platform = platform @@ -148,6 +152,8 @@ def __init__( self._registry = registry self._state_reader = state_reader self._resource_resolver = resource_resolver + self._webhook_sender = webhook_sender + self._auto_emit_completion_webhooks = auto_emit_completion_webhooks # ----- account resolution helper ----- @@ -211,6 +217,37 @@ def _extract_auth_info(ctx: ToolContext) -> AuthInfo | None: ) return None + def _maybe_auto_emit_sync_completion( + self, + method_name: str, + params: Any, + result: Any, + ) -> None: + """Fire the F12 sync-completion webhook if applicable. + + Skips TaskHandoff projections — those go through the registry + completion path which emits its own webhook on terminal state. + The auto-emit fires on the sync-success arm only, mirroring the + JS-side ``routeIfHandoff`` logic at + ``src/lib/server/decisioning/runtime/from-platform.ts``. + + TaskHandoff projection returns ``{"task_id": ..., "status": + "submitted"}`` from ``_project_handoff``; sync success returns + a Pydantic response or a dict matching the wire shape. We + distinguish on the ``status == "submitted"`` shape. + """ + if isinstance(result, dict) and result.get("status") == "submitted": + # TaskHandoff projection — registry completion path emits + # its own webhook on terminal state. + return + maybe_emit_sync_completion( + sender=self._webhook_sender, + enabled=self._auto_emit_completion_webhooks, + method_name=method_name, + params=params, + result=result, + ) + def _build_ctx( self, tool_ctx: ToolContext, @@ -260,17 +297,16 @@ async def create_media_buy( # type: ignore[override] tool_ctx = context or ToolContext() account = await self._resolve_account(params.account, tool_ctx) ctx = self._build_ctx(tool_ctx, account) - return cast( - "CreateMediaBuySuccessResponse", - await _invoke_platform_method( - self._platform, - "create_media_buy", - params, - ctx, - executor=self._executor, - registry=self._registry, - ), + result = await _invoke_platform_method( + self._platform, + "create_media_buy", + params, + ctx, + executor=self._executor, + registry=self._registry, ) + self._maybe_auto_emit_sync_completion("create_media_buy", params, result) + return cast("CreateMediaBuySuccessResponse", result) async def update_media_buy( # type: ignore[override] self, @@ -285,18 +321,17 @@ async def update_media_buy( # type: ignore[override] tool_ctx = context or ToolContext() account = await self._resolve_account(params.account, tool_ctx) ctx = self._build_ctx(tool_ctx, account) - return cast( - "UpdateMediaBuySuccessResponse", - await _invoke_platform_method( - self._platform, - "update_media_buy", - params, - ctx, - executor=self._executor, - registry=self._registry, - arg_projector={"media_buy_id": params.media_buy_id, "patch": params}, - ), + result = await _invoke_platform_method( + self._platform, + "update_media_buy", + params, + ctx, + executor=self._executor, + registry=self._registry, + arg_projector={"media_buy_id": params.media_buy_id, "patch": params}, ) + self._maybe_auto_emit_sync_completion("update_media_buy", params, result) + return cast("UpdateMediaBuySuccessResponse", result) async def sync_creatives( # type: ignore[override] self, @@ -306,17 +341,16 @@ async def sync_creatives( # type: ignore[override] tool_ctx = context or ToolContext() account = await self._resolve_account(params.account, tool_ctx) ctx = self._build_ctx(tool_ctx, account) - return cast( - "SyncCreativesSuccessResponse", - await _invoke_platform_method( - self._platform, - "sync_creatives", - params, - ctx, - executor=self._executor, - registry=self._registry, - ), + result = await _invoke_platform_method( + self._platform, + "sync_creatives", + params, + ctx, + executor=self._executor, + registry=self._registry, ) + self._maybe_auto_emit_sync_completion("sync_creatives", params, result) + return cast("SyncCreativesSuccessResponse", result) async def get_media_buy_delivery( # type: ignore[override] self, diff --git a/src/adcp/decisioning/serve.py b/src/adcp/decisioning/serve.py index ddf998085..fb86afb4e 100644 --- a/src/adcp/decisioning/serve.py +++ b/src/adcp/decisioning/serve.py @@ -43,6 +43,7 @@ from adcp.decisioning.resolve import ResourceResolver from adcp.decisioning.state import StateReader from adcp.decisioning.task_registry import TaskRegistry + from adcp.webhook_sender import WebhookSender def _is_production_env() -> bool: @@ -75,6 +76,8 @@ def create_adcp_server_from_platform( registry: TaskRegistry | None = None, state_reader: StateReader | None = None, resource_resolver: ResourceResolver | None = None, + webhook_sender: WebhookSender | None = None, + auto_emit_completion_webhooks: bool = True, ) -> tuple[PlatformHandler, ThreadPoolExecutor, TaskRegistry]: """Build the :class:`PlatformHandler` + supporting wiring from a :class:`DecisioningPlatform`. @@ -117,6 +120,24 @@ def create_adcp_server_from_platform( (D15 — async framework-mediated fetches). Default is the v6.0 stub (raises ``NotImplementedError`` with a pointer to v6.1). + :param webhook_sender: Bring-your-own + :class:`adcp.webhook_sender.WebhookSender` for sync-completion + and HITL-completion webhook delivery. Default ``None`` — when + unset, sync-completion auto-emit is a silent no-op (no URL to + deliver to, framework can't synthesize a sender). Adopters + wiring webhook delivery pass a configured sender (with their + signing key, IP-pinned transport, etc.). + :param auto_emit_completion_webhooks: F12 feature gate. When + ``True`` (default), the framework auto-fires a completion + webhook on the sync-success arm of mutating tools whenever the + request supplied ``push_notification_config.url`` AND the tool + is in :data:`adcp.decisioning.webhook_emit.SPEC_WEBHOOK_TASK_TYPES`. + Buyers passing the URL expect notification regardless of + whether the seller routed sync vs HITL. Set ``False`` for + adopters who emit webhooks manually inside their handlers + (avoid duplicate delivery; idempotency-key dedup at the + receiver would handle it but explicit suppression matches the + v5 manual-emit posture for adopters mid-migration). :raises ValueError: when ``executor`` and ``thread_pool_size`` are both supplied (D5 mutually-exclusive validation). @@ -213,6 +234,8 @@ def create_adcp_server_from_platform( registry=registry, state_reader=state_reader, resource_resolver=resource_resolver, + webhook_sender=webhook_sender, + auto_emit_completion_webhooks=auto_emit_completion_webhooks, ) return handler, executor, registry @@ -226,6 +249,8 @@ def serve( registry: TaskRegistry | None = None, state_reader: StateReader | None = None, resource_resolver: ResourceResolver | None = None, + webhook_sender: WebhookSender | None = None, + auto_emit_completion_webhooks: bool = True, advertise_all: bool = False, **serve_kwargs: Any, ) -> None: @@ -246,6 +271,14 @@ def serve( :class:`InMemoryTaskRegistry` (gated for production). :param state_reader: Custom :class:`StateReader` impl (D15). :param resource_resolver: Custom :class:`ResourceResolver` impl (D15). + :param webhook_sender: BYO :class:`adcp.webhook_sender.WebhookSender` + for completion webhook delivery (sync auto-emit + HITL terminal). + ``None`` disables auto-emit silently. + :param auto_emit_completion_webhooks: F12 — auto-fire a completion + webhook on the sync-success arm of mutating tools when the + request supplied ``push_notification_config.url``. Default + ``True``. Set ``False`` for adopters who emit webhooks + manually inside their handlers. :param advertise_all: Forwarded to :func:`adcp.server.serve`. When ``True``, ``tools/list`` advertises every method on the handler regardless of override status. Default ``False`` — @@ -267,6 +300,8 @@ def serve( registry=registry, state_reader=state_reader, resource_resolver=resource_resolver, + webhook_sender=webhook_sender, + auto_emit_completion_webhooks=auto_emit_completion_webhooks, ) server_name = name or type(platform).__name__ diff --git a/src/adcp/decisioning/webhook_emit.py b/src/adcp/decisioning/webhook_emit.py new file mode 100644 index 000000000..2c969770c --- /dev/null +++ b/src/adcp/decisioning/webhook_emit.py @@ -0,0 +1,226 @@ +"""Auto-emit completion webhook on sync-success arm of mutating tools. + +When a buyer supplies ``push_notification_config.url`` on a request and +the seller answers via the sync fast path (NOT a :class:`TaskHandoff`), +the framework fires a completion webhook to that URL after the response +so buyers get consistent notification regardless of how the seller +routed the call. Without this, a buyer registering a webhook URL would +get notifications only on the HITL path — sync responses would leave +them polling. + +Mirrors the JS-side ``emitSyncCompletionWebhook`` at +``src/lib/server/decisioning/runtime/from-platform.ts`` (commits +``8dc427f9`` and ``7a887dfa``). Wire-format is identical: same +``task_type``, ``status: 'completed'``, ``result`` field carrying the +projected sync response, and an echoed ``token`` if the buyer +registered one. ``task_id`` is synthesized as ``f"sync-{uuid4()}"`` +since sync responses don't allocate a registry task; buyers correlate +via the resource ids embedded in ``result``. + +**Fire-and-forget.** Webhook delivery runs in a background asyncio +task; the sync response returns inline immediately. A buyer-supplied +slowloris webhook URL must not be able to hold the seller's request +worker for the full retry budget — the JS round-2 fix (``7a887dfa``) +addressed this DoS vector and Python preserves the same posture. +``_BACKGROUND_WEBHOOK_TASKS`` strong-refs in-flight emissions so the +asyncio loop's weak-ref behavior doesn't garbage-collect them +mid-flight. + +**Spec gate.** Only tools in :data:`SPEC_WEBHOOK_TASK_TYPES` (the +closed 20-value enum from ``schemas/cache/enums/task-type.json``) +emit. Spec-validating webhook receivers reject envelopes with +non-spec ``task_type`` values; tools the framework dispatches that +aren't in the spec enum (adopter-only specialism methods) skip +delivery and rely on ``publishStatusChange`` for state updates. + +Adopters who emit webhooks manually inside their handlers pass +``auto_emit_completion_webhooks=False`` to +:func:`adcp.decisioning.serve` to avoid duplicate delivery. +""" + +from __future__ import annotations + +import asyncio +import logging +import uuid +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from adcp.webhook_sender import WebhookSender + +logger = logging.getLogger(__name__) + + +#: Tools eligible for sync-completion webhook auto-emit. Mirrors the +#: closed enum in ``schemas/cache/enums/task-type.json`` verbatim. The +#: framework dispatches a wider tool surface than this set; the JS side +#: maintains the same set at +#: ``src/lib/server/decisioning/runtime/protocol-for-tool.ts``. +#: +#: Drift policy: bump this constant AND the JS +#: ``SPEC_WEBHOOK_TASK_TYPES`` in lockstep when the spec enum widens. +#: A unit test pins this to the on-disk enum so out-of-band drift +#: surfaces in CI. +SPEC_WEBHOOK_TASK_TYPES: frozenset[str] = frozenset( + { + "create_media_buy", + "update_media_buy", + "sync_creatives", + "activate_signal", + "get_signals", + "create_property_list", + "update_property_list", + "get_property_list", + "list_property_lists", + "delete_property_list", + "sync_accounts", + "get_account_financials", + "get_creative_delivery", + "sync_event_sources", + "sync_audiences", + "sync_catalogs", + "log_event", + "get_brand_identity", + "get_rights", + "acquire_rights", + } +) + + +#: Strong-ref the in-flight auto-emit tasks so the asyncio loop's +#: weak-ref behavior doesn't garbage-collect them mid-flight. +#: Module-level so the set survives across requests; framework-internal, +#: never exported. Mirrors ``_BACKGROUND_HANDOFF_TASKS`` in +#: ``dispatch.py``. +_BACKGROUND_WEBHOOK_TASKS: set[asyncio.Task[None]] = set() + + +def _extract_push_notification_url_and_token( + params: Any, +) -> tuple[str, str | None] | None: + """Pull ``(url, token)`` from ``params.push_notification_config``. + + Returns ``None`` when the request didn't carry the field, the field + is None, or the URL is empty. Tolerates both Pydantic models and + plain dicts on ``params`` since handler shims and test fixtures + both call in. The URL is unwrapped via ``str()`` so the webhook + sender sees a plain string (Pydantic AnyUrl stringifies to canonical + form). + """ + config = getattr(params, "push_notification_config", None) + if config is None and isinstance(params, dict): + config = params.get("push_notification_config") + if config is None: + return None + url = getattr(config, "url", None) + if url is None and isinstance(config, dict): + url = config.get("url") + if not url: + return None + token = getattr(config, "token", None) + if token is None and isinstance(config, dict): + token = config.get("token") + return (str(url), token) + + +async def _emit_sync_completion_webhook( + *, + sender: WebhookSender, + url: str, + token: str | None, + method_name: str, + result: Any, +) -> None: + """Fire one sync-completion webhook. Logged-and-swallowed on failure. + + Wrapped by the caller in :func:`asyncio.create_task` so the sync + response returns to the buyer immediately. + """ + task_id = f"sync-{uuid.uuid4().hex[:16]}" + extra_headers = {"X-AdCP-Push-Token": token} if token else None + try: + await sender.send_mcp( + url=url, + task_id=task_id, + status="completed", + task_type=method_name, + result=result, + extra_headers=extra_headers, + ) + except Exception: + # Logged-and-swallowed: the sync response has already returned + # to the buyer with the result inline. + logger.warning( + "[adcp.decisioning] sync completion webhook for %s " + "task_id=%s failed; sync response already returned to buyer", + method_name, + task_id, + exc_info=True, + ) + + +def maybe_emit_sync_completion( + *, + sender: WebhookSender | None, + enabled: bool, + method_name: str, + params: Any, + result: Any, +) -> None: + """Fire-and-forget auto-emit gate. Called by handler shims after + the sync-success arm of mutating tools. + + Skips silently when: + + * ``enabled`` is False (operator opted out). + * ``sender`` is None (no emitter wired). + * The request didn't carry ``push_notification_config.url``. + * ``method_name`` isn't in :data:`SPEC_WEBHOOK_TASK_TYPES` (logged + as a warning so adopters notice they extended the tool surface + beyond the spec enum). + + Schedules the actual delivery via the running event loop's + ``create_task`` so the sync response path is non-blocking. + """ + if not enabled or sender is None: + return + extracted = _extract_push_notification_url_and_token(params) + if extracted is None: + return + url, token = extracted + if method_name not in SPEC_WEBHOOK_TASK_TYPES: + logger.warning( + "[adcp.decisioning] sync completion webhook for %s skipped — " + "tool not in spec task-type enum (closed 20-value set per " + "schemas/cache/enums/task-type.json). Use " + "publishStatusChange for long-running %s state.", + method_name, + method_name, + ) + return + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # No running loop — typically a sync-test path. The auto-emit + # is best-effort; surfacing this to the sync response path + # would be strictly worse than silent skip. + logger.debug("[adcp.decisioning] sync completion webhook skipped — no running loop") + return + bg = loop.create_task( + _emit_sync_completion_webhook( + sender=sender, + url=url, + token=token, + method_name=method_name, + result=result, + ), + name=f"adcp-sync-completion-{method_name}", + ) + _BACKGROUND_WEBHOOK_TASKS.add(bg) + bg.add_done_callback(_BACKGROUND_WEBHOOK_TASKS.discard) + + +__all__ = [ + "SPEC_WEBHOOK_TASK_TYPES", + "maybe_emit_sync_completion", +] diff --git a/tests/test_decisioning_webhook_emit.py b/tests/test_decisioning_webhook_emit.py new file mode 100644 index 000000000..33b524fdf --- /dev/null +++ b/tests/test_decisioning_webhook_emit.py @@ -0,0 +1,553 @@ +"""F12: auto-emit completion webhook on sync-success arm. + +Mirrors the JS test file +``test/server-decisioning-auto-emit-completion.test.js`` (commits +``8dc427f9`` + ``7a887dfa``) plus Python-specific concerns: + +* TaskHandoff projection path doesn't double-fire (registry completion + emits its own webhook on terminal state). +* Fire-and-forget non-blocking — sync response returns before webhook + delivery. +* Tools outside ``SPEC_WEBHOOK_TASK_TYPES`` skip with a warning. +* No-running-loop branch is silent (sync test paths). +* SPEC_WEBHOOK_TASK_TYPES drift-guard against the on-disk schema cache. +""" + +from __future__ import annotations + +import asyncio +import json +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Any +from unittest.mock import AsyncMock + +import pytest + +from adcp.decisioning import ( + DecisioningCapabilities, + DecisioningPlatform, + SingletonAccounts, +) +from adcp.decisioning.handler import PlatformHandler +from adcp.decisioning.task_registry import InMemoryTaskRegistry +from adcp.decisioning.webhook_emit import ( + _BACKGROUND_WEBHOOK_TASKS, + SPEC_WEBHOOK_TASK_TYPES, + _extract_push_notification_url_and_token, + maybe_emit_sync_completion, +) +from adcp.server.base import ToolContext +from adcp.types import ( + CreateMediaBuyRequest, + CreateMediaBuySuccessResponse, + SyncCreativesRequest, +) + +# ---- SPEC_WEBHOOK_TASK_TYPES drift-guard ---- + + +def test_spec_webhook_task_types_matches_schema_cache() -> None: + """Pin the constant to the on-disk spec enum. CI catches + out-of-band drift when the schema cache refreshes from upstream.""" + schema_path = Path(__file__).parent.parent / "schemas" / "cache" / "enums" / "task-type.json" + with schema_path.open() as f: + on_disk = frozenset(json.load(f)["enum"]) + assert SPEC_WEBHOOK_TASK_TYPES == on_disk, ( + f"SPEC_WEBHOOK_TASK_TYPES drifted from on-disk task-type enum. " + f"Missing from constant: {sorted(on_disk - SPEC_WEBHOOK_TASK_TYPES)}; " + f"extra in constant: {sorted(SPEC_WEBHOOK_TASK_TYPES - on_disk)}." + ) + + +# ---- _extract_push_notification_url_and_token ---- + + +def test_extract_returns_none_when_config_missing() -> None: + """No ``push_notification_config`` field → no auto-emit.""" + + class _Bare: + pass + + assert _extract_push_notification_url_and_token(_Bare()) is None + + +def test_extract_returns_none_when_config_is_none() -> None: + """Field present but ``None`` → no auto-emit.""" + + class _NullConfig: + push_notification_config = None + + assert _extract_push_notification_url_and_token(_NullConfig()) is None + + +def test_extract_returns_url_and_token_when_present() -> None: + """Field with URL + token → both pulled out.""" + + class _Config: + url = "https://buyer.example.com/webhooks" + token = "echo-back-this-token" + + class _Params: + push_notification_config = _Config() + + extracted = _extract_push_notification_url_and_token(_Params()) + assert extracted == ("https://buyer.example.com/webhooks", "echo-back-this-token") + + +def test_extract_returns_url_with_none_token() -> None: + """Field with URL only → token is None.""" + + class _Config: + url = "https://buyer.example.com/webhooks" + token = None + + class _Params: + push_notification_config = _Config() + + extracted = _extract_push_notification_url_and_token(_Params()) + assert extracted == ("https://buyer.example.com/webhooks", None) + + +def test_extract_handles_dict_params() -> None: + """Test fixtures using plain-dict params still work.""" + params = { + "push_notification_config": { + "url": "https://buyer.example.com/webhooks", + "token": "tok", + } + } + extracted = _extract_push_notification_url_and_token(params) + assert extracted == ("https://buyer.example.com/webhooks", "tok") + + +# ---- maybe_emit_sync_completion gate ---- + + +@pytest.mark.asyncio +async def test_maybe_emit_skips_when_disabled() -> None: + """``enabled=False`` → no delivery, no background task.""" + sender = AsyncMock() + + class _Config: + url = "https://buyer.example.com/wh" + token = None + + class _Params: + push_notification_config = _Config() + + maybe_emit_sync_completion( + sender=sender, + enabled=False, + method_name="create_media_buy", + params=_Params(), + result={"media_buy_id": "mb_1"}, + ) + await asyncio.sleep(0) + sender.send_mcp.assert_not_called() + + +@pytest.mark.asyncio +async def test_maybe_emit_skips_when_sender_none() -> None: + """``sender=None`` → silent skip (no emitter wired).""" + + class _Config: + url = "https://buyer.example.com/wh" + token = None + + class _Params: + push_notification_config = _Config() + + # Smoke — must not raise. + maybe_emit_sync_completion( + sender=None, + enabled=True, + method_name="create_media_buy", + params=_Params(), + result={"media_buy_id": "mb_1"}, + ) + + +@pytest.mark.asyncio +async def test_maybe_emit_skips_when_no_push_url() -> None: + """Request without ``push_notification_config.url`` → no delivery.""" + sender = AsyncMock() + + class _Params: + push_notification_config = None + + maybe_emit_sync_completion( + sender=sender, + enabled=True, + method_name="create_media_buy", + params=_Params(), + result={"media_buy_id": "mb_1"}, + ) + await asyncio.sleep(0) + sender.send_mcp.assert_not_called() + + +@pytest.mark.asyncio +async def test_maybe_emit_skips_tool_outside_spec_enum(caplog) -> None: + """Tool not in ``SPEC_WEBHOOK_TASK_TYPES`` → skip + warn. + + Spec-validating receivers reject envelopes with non-spec + ``task_type`` values; the framework logs once per skip so adopters + notice they extended the tool surface beyond the spec enum.""" + import logging + + sender = AsyncMock() + + class _Config: + url = "https://buyer.example.com/wh" + token = None + + class _Params: + push_notification_config = _Config() + + with caplog.at_level(logging.WARNING, logger="adcp.decisioning.webhook_emit"): + maybe_emit_sync_completion( + sender=sender, + enabled=True, + method_name="custom_adopter_tool", # Not in spec enum + params=_Params(), + result={"x": 1}, + ) + await asyncio.sleep(0) + sender.send_mcp.assert_not_called() + assert any("not in spec task-type enum" in rec.message for rec in caplog.records) + + +@pytest.mark.asyncio +async def test_maybe_emit_fires_when_url_set() -> None: + """Happy path — URL set + tool in enum + enabled → background + delivery via ``WebhookSender.send_mcp``.""" + sender = AsyncMock() + + class _Config: + url = "https://buyer.example.com/wh" + token = None + + class _Params: + push_notification_config = _Config() + + maybe_emit_sync_completion( + sender=sender, + enabled=True, + method_name="create_media_buy", + params=_Params(), + result={"media_buy_id": "mb_1"}, + ) + # Drain background task. + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + sender.send_mcp.assert_awaited_once() + call_kwargs = sender.send_mcp.await_args.kwargs + assert call_kwargs["url"] == "https://buyer.example.com/wh" + assert call_kwargs["task_type"] == "create_media_buy" + assert call_kwargs["status"] == "completed" + assert call_kwargs["result"] == {"media_buy_id": "mb_1"} + assert call_kwargs["task_id"].startswith("sync-") + + +@pytest.mark.asyncio +async def test_maybe_emit_echoes_token_via_extra_headers() -> None: + """Buyer-supplied token round-trips on the ``X-AdCP-Push-Token`` + extra header. Receivers verify the token without parsing URL paths. + + Note: the spec wire field is ``payload.token``; the SDK's + ``WebhookSender.send_mcp`` doesn't currently expose ``token`` on + the payload directly, so we round-trip via header. F12 round-3 may + add a payload-level wiring.""" + sender = AsyncMock() + + class _Config: + url = "https://buyer.example.com/wh" + token = "echo-this-back-1234567890" + + class _Params: + push_notification_config = _Config() + + maybe_emit_sync_completion( + sender=sender, + enabled=True, + method_name="create_media_buy", + params=_Params(), + result={"media_buy_id": "mb_1"}, + ) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + extra_headers = sender.send_mcp.await_args.kwargs["extra_headers"] + assert extra_headers == {"X-AdCP-Push-Token": "echo-this-back-1234567890"} + + +@pytest.mark.asyncio +async def test_maybe_emit_swallows_delivery_failure(caplog) -> None: + """Webhook delivery failure must NOT propagate — sync response + has already returned to the buyer.""" + import logging + + sender = AsyncMock() + sender.send_mcp.side_effect = RuntimeError("receiver down") + + class _Config: + url = "https://buyer.example.com/wh" + token = None + + class _Params: + push_notification_config = _Config() + + with caplog.at_level(logging.WARNING, logger="adcp.decisioning.webhook_emit"): + maybe_emit_sync_completion( + sender=sender, + enabled=True, + method_name="create_media_buy", + params=_Params(), + result={"media_buy_id": "mb_1"}, + ) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + sender.send_mcp.assert_awaited_once() + assert any( + "sync completion webhook" in rec.message and "failed" in rec.message + for rec in caplog.records + ) + + +def test_maybe_emit_skips_silently_with_no_running_loop() -> None: + """Sync test paths that call the gate outside an event loop get a + silent skip — surfacing this would be strictly worse than the + quiet best-effort behavior.""" + sender = AsyncMock() + + class _Config: + url = "https://buyer.example.com/wh" + token = None + + class _Params: + push_notification_config = _Config() + + # No asyncio.run wrapping this — must not raise. + maybe_emit_sync_completion( + sender=sender, + enabled=True, + method_name="create_media_buy", + params=_Params(), + result={"media_buy_id": "mb_1"}, + ) + sender.send_mcp.assert_not_called() + + +# ---- PlatformHandler integration: sync-success fires, handoff doesn't ---- + + +@pytest.fixture +def executor(): + pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test-f12-") + yield pool + pool.shutdown(wait=True) + + +def _make_request(*, with_url: bool = True, idem_suffix: str = "x") -> CreateMediaBuyRequest: + """Build a minimal CreateMediaBuyRequest with optional push config.""" + payload: dict[str, Any] = { + "account": {"account_id": "acct_a"}, + "brand": {"domain": "example.com"}, + "idempotency_key": f"idem_aaaa12345678{idem_suffix}", + "start_time": "2026-05-01T00:00:00Z", + "end_time": "2026-05-31T23:59:59Z", + } + if with_url: + payload["push_notification_config"] = { + "url": "https://buyer.example.com/wh", + "token": "echo-back-xxxxxxxxxxxxx", + } + return CreateMediaBuyRequest(**payload) + + +class _SyncSuccessPlatform(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["sales-non-guaranteed"]) + accounts = SingletonAccounts(account_id="hello") + + def get_products(self, req, ctx): + return {"products": []} + + def create_media_buy(self, req, ctx): + return CreateMediaBuySuccessResponse(media_buy_id="mb_1", packages=[], status="active") + + def update_media_buy(self, media_buy_id, patch, ctx): + return {"media_buy_id": media_buy_id, "status": "active"} + + def sync_creatives(self, req, ctx): + return {"creatives": []} + + def get_media_buy_delivery(self, req, ctx): + return {"deliveries": []} + + +class _HandoffPlatform(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["sales-non-guaranteed"]) + accounts = SingletonAccounts(account_id="hello") + + def get_products(self, req, ctx): + return {"products": []} + + def create_media_buy(self, req, ctx): + async def _review(task_ctx): + return CreateMediaBuySuccessResponse( + media_buy_id="mb_after_review", packages=[], status="active" + ) + + return ctx.handoff_to_task(_review) + + def update_media_buy(self, media_buy_id, patch, ctx): + return {"media_buy_id": media_buy_id, "status": "active"} + + def sync_creatives(self, req, ctx): + return {"creatives": []} + + def get_media_buy_delivery(self, req, ctx): + return {"deliveries": []} + + +@pytest.mark.asyncio +async def test_handler_fires_auto_emit_on_sync_success(executor) -> None: + """End-to-end: sync mutating tool with push URL → auto-emit fires.""" + sender = AsyncMock() + handler = PlatformHandler( + _SyncSuccessPlatform(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + auto_emit_completion_webhooks=True, + ) + await handler.create_media_buy(_make_request(with_url=True), ToolContext()) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + sender.send_mcp.assert_awaited_once() + assert sender.send_mcp.await_args.kwargs["task_type"] == "create_media_buy" + + +@pytest.mark.asyncio +async def test_handler_does_not_double_fire_on_handoff_path(executor) -> None: + """TaskHandoff projection returns the Submitted envelope; the + registry completion path emits its own webhook on terminal state. + The auto-emit MUST NOT fire on this arm — buyer would receive + duplicate webhooks.""" + sender = AsyncMock() + handler = PlatformHandler( + _HandoffPlatform(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + auto_emit_completion_webhooks=True, + ) + result = await handler.create_media_buy(_make_request(with_url=True), ToolContext()) + # Drain any background tasks (handoff fn runs in background). + for _ in range(20): + await asyncio.sleep(0.05) + # The auto-emit must NOT have fired — handoff path is responsible + # for its own webhook. + sender.send_mcp.assert_not_called() + # Sanity check: result is the Submitted envelope. + assert isinstance(result, dict) + assert result["status"] == "submitted" + + +@pytest.mark.asyncio +async def test_handler_opt_out_suppresses_auto_emit(executor) -> None: + """``auto_emit_completion_webhooks=False`` → no delivery on sync + success, even with URL set. Adopter middleware emits manually.""" + sender = AsyncMock() + handler = PlatformHandler( + _SyncSuccessPlatform(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + auto_emit_completion_webhooks=False, + ) + await handler.create_media_buy(_make_request(with_url=True), ToolContext()) + await asyncio.sleep(0.05) + sender.send_mcp.assert_not_called() + + +@pytest.mark.asyncio +async def test_handler_no_url_no_emit(executor) -> None: + """Request without ``push_notification_config`` → no auto-emit.""" + sender = AsyncMock() + handler = PlatformHandler( + _SyncSuccessPlatform(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + auto_emit_completion_webhooks=True, + ) + await handler.create_media_buy(_make_request(with_url=False), ToolContext()) + await asyncio.sleep(0.05) + sender.send_mcp.assert_not_called() + + +@pytest.mark.asyncio +async def test_handler_default_is_enabled(executor) -> None: + """``auto_emit_completion_webhooks`` defaults to True — adopter + not setting the flag still gets webhook delivery.""" + sender = AsyncMock() + handler = PlatformHandler( + _SyncSuccessPlatform(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + # NOT passing auto_emit_completion_webhooks — testing default. + ) + await handler.create_media_buy(_make_request(with_url=True), ToolContext()) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + sender.send_mcp.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_handler_no_sender_no_emit(executor) -> None: + """No webhook_sender wired (the default for ``serve()``) → silent + skip. Adopters who don't want webhooks just don't pass one.""" + handler = PlatformHandler( + _SyncSuccessPlatform(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=None, + auto_emit_completion_webhooks=True, + ) + # Smoke — must not raise. + await handler.create_media_buy(_make_request(with_url=True), ToolContext()) + + +@pytest.mark.asyncio +async def test_handler_sync_creatives_also_fires(executor) -> None: + """The auto-emit isn't create_media_buy-only — sync_creatives is + also a mutating tool in the spec enum and triggers identically. + + Uses ``model_construct`` to bypass creative-payload validation + (the F12 behavior is what's under test, not the request shape).""" + sender = AsyncMock() + handler = PlatformHandler( + _SyncSuccessPlatform(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + auto_emit_completion_webhooks=True, + ) + from adcp.types import PushNotificationConfig + + req = SyncCreativesRequest.model_construct( + account={"account_id": "acct_a"}, # type: ignore[arg-type] + creatives=[], + idempotency_key="idem_aaaa1234567890", + push_notification_config=PushNotificationConfig.model_construct( + url="https://buyer.example.com/wh", + token="echo-back-xxxxxxxxxxxxx", + ), + ) + await handler.sync_creatives(req, ToolContext()) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + sender.send_mcp.assert_awaited_once() + assert sender.send_mcp.await_args.kwargs["task_type"] == "sync_creatives" From 906369bd021cf59d6d8eac8fa06cab28421f84e6 Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Thu, 30 Apr 2026 20:22:15 -0400 Subject: [PATCH 2/2] =?UTF-8?q?fix(decisioning,webhooks):=20F12=20round-2?= =?UTF-8?q?=20=E2=80=94=20token=20via=20payload=20+=20exception=20isolatio?= =?UTF-8?q?n?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two P0 expert-review findings on PR #331: P0-1 (cross-language wire divergence): the buyer's ``push_notification_config.token`` was being echoed via ``X-AdCP-Push-Token`` HTTP header. Per ``schemas/cache/core/push_notification_config.json`` ("Echoed back in webhook payload to validate request authenticity") AND the JS reference impl (``buildTaskWebhookPayload`` in ``src/lib/server/decisioning/runtime/from-platform.ts``), the token belongs on ``payload.token``. Buyers validating against the spec read ``body.token``, not custom headers — header echo would silently fail their auth check. Fix: extend ``create_mcp_webhook_payload`` and ``WebhookSender.send_mcp`` to accept ``token`` and write it onto the payload. Update F12's ``_emit_sync_completion_webhook`` to pass ``token=`` through instead of building ``extra_headers``. Cross-language wire-parity restored. P0-2 (exception isolation): ``maybe_emit_sync_completion`` runs AFTER the platform method's successful return. ANY exception in the gate body — extraction quirk on a weird ``params`` shape, ``loop.create_task`` failure — would propagate to the handler shim and lose the buyer's sync response. Fix: wrap the entire gate body in ``try/except Exception``; logged-and-swallowed. Last-line defense ensures the post-success path can never poison the buyer's response. P1 fixes folded in: * Submitted-shape detection tightened to the EXACT 2-key dict ``{"task_id", "status"}`` (not the loose ``status == "submitted"`` predicate). An adopter who legitimately returns a sync ``{"status": "submitted", ...}`` with extra metadata (queue acceptance) now correctly gets the auto-emit fired. * No-running-loop branch bumped from ``logger.debug`` to ``logger.warning`` — production code landing here is mis-wired and should be visible. Round-2 tests added: * ``test_handler_returns_before_webhook_delivers`` — pins the non-blocking invariant (sync response returns before webhook delivery completes). * ``test_concurrent_emissions_dont_corrupt_strong_ref_set`` — 100 concurrent emissions exercising the ``_BACKGROUND_WEBHOOK_TASKS`` add/discard pattern. * ``test_handler_does_not_skip_loose_submitted_shape`` — pins the tightened submitted-shape detection. * ``test_gate_swallows_unexpected_exceptions`` — pins the exception-isolation invariant via a sender that raises on attribute access. 25 F12 tests pass total (up from 21); 2208 total tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/adcp/decisioning/handler.py | 16 ++- src/adcp/decisioning/webhook_emit.py | 102 ++++++++----- src/adcp/webhook_sender.py | 9 ++ src/adcp/webhooks.py | 10 ++ tests/test_decisioning_webhook_emit.py | 192 +++++++++++++++++++++++-- 5 files changed, 278 insertions(+), 51 deletions(-) diff --git a/src/adcp/decisioning/handler.py b/src/adcp/decisioning/handler.py index 8c54aaf3d..90d8dc72c 100644 --- a/src/adcp/decisioning/handler.py +++ b/src/adcp/decisioning/handler.py @@ -231,12 +231,18 @@ def _maybe_auto_emit_sync_completion( JS-side ``routeIfHandoff`` logic at ``src/lib/server/decisioning/runtime/from-platform.ts``. - TaskHandoff projection returns ``{"task_id": ..., "status": - "submitted"}`` from ``_project_handoff``; sync success returns - a Pydantic response or a dict matching the wire shape. We - distinguish on the ``status == "submitted"`` shape. + TaskHandoff projection returns the exact 2-key dict ``{"task_id": + ..., "status": "submitted"}`` from ``_project_handoff``; we + match the full key set rather than the loose ``status == + "submitted"`` predicate so an adopter who legitimately returns a + sync ``{"status": "submitted", ...}`` (e.g., synchronous queue + acceptance with extra metadata) still gets the auto-emit. """ - if isinstance(result, dict) and result.get("status") == "submitted": + if ( + isinstance(result, dict) + and set(result.keys()) == {"task_id", "status"} + and result.get("status") == "submitted" + ): # TaskHandoff projection — registry completion path emits # its own webhook on terminal state. return diff --git a/src/adcp/decisioning/webhook_emit.py b/src/adcp/decisioning/webhook_emit.py index 2c969770c..3cedda76f 100644 --- a/src/adcp/decisioning/webhook_emit.py +++ b/src/adcp/decisioning/webhook_emit.py @@ -134,10 +134,13 @@ async def _emit_sync_completion_webhook( """Fire one sync-completion webhook. Logged-and-swallowed on failure. Wrapped by the caller in :func:`asyncio.create_task` so the sync - response returns to the buyer immediately. + response returns to the buyer immediately. Truncated to 16 hex + chars (~64 bits) — adequate for buyer correlation. Buyers + correlate primarily via resource ids on ``result`` + (``media_buy_id``, ``creative_id``, etc.); ``task_id`` here is + informational for the spec's required-field shape. """ task_id = f"sync-{uuid.uuid4().hex[:16]}" - extra_headers = {"X-AdCP-Push-Token": token} if token else None try: await sender.send_mcp( url=url, @@ -145,7 +148,7 @@ async def _emit_sync_completion_webhook( status="completed", task_type=method_name, result=result, - extra_headers=extra_headers, + token=token, ) except Exception: # Logged-and-swallowed: the sync response has already returned @@ -181,43 +184,70 @@ def maybe_emit_sync_completion( Schedules the actual delivery via the running event loop's ``create_task`` so the sync response path is non-blocking. + + **Exception isolation.** The gate runs AFTER the platform method's + successful return. ANY exception in here — extraction quirk on a + weird ``params`` shape, ``loop.create_task`` failure — must NOT + propagate to the handler shim, which would lose the buyer's sync + response. The whole body is wrapped in ``try/except Exception``; + logged-and-swallowed. """ - if not enabled or sender is None: - return - extracted = _extract_push_notification_url_and_token(params) - if extracted is None: - return - url, token = extracted - if method_name not in SPEC_WEBHOOK_TASK_TYPES: + try: + if not enabled or sender is None: + return + extracted = _extract_push_notification_url_and_token(params) + if extracted is None: + return + url, token = extracted + if method_name not in SPEC_WEBHOOK_TASK_TYPES: + logger.warning( + "[adcp.decisioning] sync completion webhook for %s skipped — " + "tool not in spec task-type enum (closed 20-value set per " + "schemas/cache/enums/task-type.json). Use " + "publishStatusChange for long-running %s state.", + method_name, + method_name, + ) + return + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # Production code that lands here is mis-wired (handler + # shim called outside an event loop); bump to warning so + # it's visible. Cost of one warning per misuse is + # negligible vs. the cost of silent webhook loss. + logger.warning( + "[adcp.decisioning] sync completion webhook for %s " + "skipped — no running event loop. The handler shim is " + "expected to run inside an asyncio task; this branch " + "fires when sync test code calls into the handler " + "outside ``asyncio.run`` or ``pytest.mark.asyncio``.", + method_name, + ) + return + bg = loop.create_task( + _emit_sync_completion_webhook( + sender=sender, + url=url, + token=token, + method_name=method_name, + result=result, + ), + name=f"adcp-sync-completion-{method_name}", + ) + _BACKGROUND_WEBHOOK_TASKS.add(bg) + bg.add_done_callback(_BACKGROUND_WEBHOOK_TASKS.discard) + except Exception: + # Last-line defense: an unexpected exception in the gate + # itself (extraction quirk, scheduler error) must never + # propagate to the handler shim, which has already produced + # a successful sync response for the buyer. logger.warning( - "[adcp.decisioning] sync completion webhook for %s skipped — " - "tool not in spec task-type enum (closed 20-value set per " - "schemas/cache/enums/task-type.json). Use " - "publishStatusChange for long-running %s state.", - method_name, + "[adcp.decisioning] sync completion webhook gate raised " + "for %s; sync response unaffected", method_name, + exc_info=True, ) - return - try: - loop = asyncio.get_running_loop() - except RuntimeError: - # No running loop — typically a sync-test path. The auto-emit - # is best-effort; surfacing this to the sync response path - # would be strictly worse than silent skip. - logger.debug("[adcp.decisioning] sync completion webhook skipped — no running loop") - return - bg = loop.create_task( - _emit_sync_completion_webhook( - sender=sender, - url=url, - token=token, - method_name=method_name, - result=result, - ), - name=f"adcp-sync-completion-{method_name}", - ) - _BACKGROUND_WEBHOOK_TASKS.add(bg) - bg.add_done_callback(_BACKGROUND_WEBHOOK_TASKS.discard) __all__ = [ diff --git a/src/adcp/webhook_sender.py b/src/adcp/webhook_sender.py index 9d4f0b6b8..77bac692c 100644 --- a/src/adcp/webhook_sender.py +++ b/src/adcp/webhook_sender.py @@ -300,6 +300,7 @@ async def send_mcp( context_id: str | None = None, domain: str | None = None, idempotency_key: str | None = None, + token: str | None = None, extra_headers: Mapping[str, str] | None = None, ) -> WebhookDeliveryResult: """POST a signed MCP-style task-status webhook. @@ -309,6 +310,13 @@ async def send_mcp( the "same" args would produce a fresh ``timestamp`` and potentially a different serialized body, which the receiver would dedupe but with different observed payload data. + + :param token: Buyer-supplied token from + ``push_notification_config.token`` echoed back on the + payload's ``token`` field per spec + (``schemas/cache/core/push_notification_config.json``: "Echoed + back in webhook payload to validate request authenticity"). + Cross-language wire-parity with the JS implementation. """ payload = create_mcp_webhook_payload( task_id=task_id, @@ -321,6 +329,7 @@ async def send_mcp( context_id=context_id, domain=domain, idempotency_key=idempotency_key, + token=token, ) return await self.send_raw( url=url, diff --git a/src/adcp/webhooks.py b/src/adcp/webhooks.py index cf7993406..47e71a81b 100644 --- a/src/adcp/webhooks.py +++ b/src/adcp/webhooks.py @@ -94,6 +94,7 @@ def create_mcp_webhook_payload( context_id: str | None = None, domain: str | None = None, idempotency_key: str | None = None, + token: str | None = None, ) -> dict[str, Any]: """ Create MCP webhook payload dictionary. @@ -191,6 +192,15 @@ def create_mcp_webhook_payload( if domain is not None: payload["domain"] = domain + if token is not None: + # Buyer-supplied token from push_notification_config.token, + # echoed back per push-notification-config.json spec text: + # "Echoed back in webhook payload to validate request authenticity." + # Cross-language wire-parity with the JS implementation + # (``buildTaskWebhookPayload`` in ``from-platform.ts``) — buyers + # validating against the spec read body.token, not headers. + payload["token"] = token + return payload diff --git a/tests/test_decisioning_webhook_emit.py b/tests/test_decisioning_webhook_emit.py index 33b524fdf..8f5a04e17 100644 --- a/tests/test_decisioning_webhook_emit.py +++ b/tests/test_decisioning_webhook_emit.py @@ -251,14 +251,13 @@ class _Params: @pytest.mark.asyncio -async def test_maybe_emit_echoes_token_via_extra_headers() -> None: - """Buyer-supplied token round-trips on the ``X-AdCP-Push-Token`` - extra header. Receivers verify the token without parsing URL paths. - - Note: the spec wire field is ``payload.token``; the SDK's - ``WebhookSender.send_mcp`` doesn't currently expose ``token`` on - the payload directly, so we round-trip via header. F12 round-3 may - add a payload-level wiring.""" +async def test_maybe_emit_echoes_token_via_payload_field() -> None: + """Buyer-supplied ``push_notification_config.token`` round-trips + on the payload's ``token`` field per spec + (``schemas/cache/core/push_notification_config.json``: "Echoed + back in webhook payload to validate request authenticity"). + Cross-language wire-parity with the JS reference impl + (``buildTaskWebhookPayload`` in ``from-platform.ts``).""" sender = AsyncMock() class _Config: @@ -277,8 +276,10 @@ class _Params: ) while _BACKGROUND_WEBHOOK_TASKS: await asyncio.sleep(0) - extra_headers = sender.send_mcp.await_args.kwargs["extra_headers"] - assert extra_headers == {"X-AdCP-Push-Token": "echo-this-back-1234567890"} + # Token is on the payload via the ``token`` kwarg, NOT on a + # custom header. Receivers reading body.token per spec find it. + call_kwargs = sender.send_mcp.await_args.kwargs + assert call_kwargs["token"] == "echo-this-back-1234567890" @pytest.mark.asyncio @@ -551,3 +552,174 @@ async def test_handler_sync_creatives_also_fires(executor) -> None: await asyncio.sleep(0) sender.send_mcp.assert_awaited_once() assert sender.send_mcp.await_args.kwargs["task_type"] == "sync_creatives" + + +# ---- Round-2 expert review: non-blocking + concurrency + adopter-loose-shape ---- + + +@pytest.mark.asyncio +async def test_handler_returns_before_webhook_delivers(executor) -> None: + """The PR's load-bearing invariant: sync response returns BEFORE + webhook delivery completes. A future refactor that awaits the + webhook before returning would be a documented DoS vector + (slowloris webhook receiver holds the seller's request worker). + Block ``send_mcp`` on an asyncio.Event and assert the handler's + ``create_media_buy`` returns first.""" + webhook_started = asyncio.Event() + webhook_can_finish = asyncio.Event() + + async def _slow_send_mcp(*args, **kwargs): + webhook_started.set() + await webhook_can_finish.wait() + + sender = AsyncMock() + sender.send_mcp.side_effect = _slow_send_mcp + + handler = PlatformHandler( + _SyncSuccessPlatform(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + auto_emit_completion_webhooks=True, + ) + + # Sync response returns even though the webhook is still blocked. + response = await handler.create_media_buy( + _make_request(with_url=True, idem_suffix="nb"), ToolContext() + ) + # Handler returned its sync result. + assert response.media_buy_id == "mb_1" + + # Background task started but is blocked. The handler already + # returned its sync response above; the webhook receiver is still + # holding the delivery, proving the response path is non-blocking. + await asyncio.wait_for(webhook_started.wait(), timeout=1.0) + assert len(_BACKGROUND_WEBHOOK_TASKS) >= 1 + + # Release the webhook receiver and let the background task drain. + webhook_can_finish.set() + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + sender.send_mcp.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_concurrent_emissions_dont_corrupt_strong_ref_set(executor) -> None: + """100 concurrent ``maybe_emit_sync_completion`` calls — each + schedules a background task; ``_BACKGROUND_WEBHOOK_TASKS`` add / + discard pattern must remain consistent. A future regression + swapping ``set`` for a list, or using ``clear()`` instead of + ``discard``, would break this test.""" + + class _Config: + url = "https://buyer.example.com/wh" + token = None + + class _Params: + push_notification_config = _Config() + + sender = AsyncMock() + sender.send_mcp.return_value = None + + for _ in range(100): + maybe_emit_sync_completion( + sender=sender, + enabled=True, + method_name="create_media_buy", + params=_Params(), + result={"media_buy_id": "mb"}, + ) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + assert sender.send_mcp.await_count == 100 + # Set drained completely — done callbacks discarded each task. + assert len(_BACKGROUND_WEBHOOK_TASKS) == 0 + + +@pytest.mark.asyncio +async def test_handler_does_not_skip_loose_submitted_shape(executor) -> None: + """Round-2 expert review (P1): an adopter that legitimately returns + a sync ``{"status": "submitted", ...}`` (queue-acceptance with + extra metadata) must NOT have the auto-emit suppressed. The + framework's TaskHandoff projection is the EXACT 2-key shape + ``{"task_id", "status"}``; only that exact shape skips.""" + + class _LooseSubmittedPlatform(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["sales-non-guaranteed"]) + accounts = SingletonAccounts(account_id="hello") + + def get_products(self, req, ctx): + return {"products": []} + + def create_media_buy(self, req, ctx): + # Adopter returns a dict with "status": "submitted" PLUS + # extra fields — NOT a TaskHandoff projection. + return { + "task_id": "mb_xyz", + "status": "submitted", + "media_buy_id": "mb_xyz", + "queued_at": "2026-04-30T23:00:00Z", + } + + def update_media_buy(self, media_buy_id, patch, ctx): + return {} + + def sync_creatives(self, req, ctx): + return {} + + def get_media_buy_delivery(self, req, ctx): + return {} + + sender = AsyncMock() + handler = PlatformHandler( + _LooseSubmittedPlatform(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + auto_emit_completion_webhooks=True, + ) + await handler.create_media_buy(_make_request(with_url=True, idem_suffix="ls"), ToolContext()) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + # Auto-emit MUST fire — the response had extra fields, so it's + # not a TaskHandoff projection. + sender.send_mcp.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_gate_swallows_unexpected_exceptions(caplog) -> None: + """Round-2 expert review (P0): the gate's body MUST never propagate + an exception to the handler shim. Test by passing a sender whose + method-resolution raises (simulating a broken duck-typed sender). + The handler returns successfully and the gate logs the failure.""" + import logging + + # Sender that raises on attribute access — simulates a misconfigured + # duck-typed object that passes the ``sender is None`` check but + # explodes inside ``send_mcp`` lookup. + class _ExplodingSender: + @property + def send_mcp(self): + raise RuntimeError("synthetic sender access failure") + + class _Config: + url = "https://buyer.example.com/wh" + token = None + + class _Params: + push_notification_config = _Config() + + with caplog.at_level(logging.WARNING, logger="adcp.decisioning.webhook_emit"): + # Must NOT raise — the gate's outer try/except swallows. + maybe_emit_sync_completion( + sender=_ExplodingSender(), # type: ignore[arg-type] + enabled=True, + method_name="create_media_buy", + params=_Params(), + result={"media_buy_id": "mb_1"}, + ) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + # The logged failure surfaces via the framework logger so + # operators see it without breaking the buyer's sync response. + assert any("sync completion webhook" in rec.message for rec in caplog.records)