Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
434 changes: 434 additions & 0 deletions examples/a2a_sqlalchemy_tasks.py

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions src/adcp/decisioning/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@
from adcp.decisioning.task_registry import TaskRegistry
from adcp.decisioning.types import Account
from adcp.webhook_sender import WebhookSender
from adcp.webhook_supervisor import WebhookDeliverySupervisor


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -465,6 +466,7 @@ def __init__(
state_reader: StateReader | None = None,
resource_resolver: ResourceResolver | None = None,
webhook_sender: WebhookSender | None = None,
webhook_supervisor: WebhookDeliverySupervisor | None = None,
auto_emit_completion_webhooks: bool = True,
) -> None:
super().__init__()
Expand All @@ -474,6 +476,7 @@ def __init__(
self._state_reader = state_reader
self._resource_resolver = resource_resolver
self._webhook_sender = webhook_sender
self._webhook_supervisor = webhook_supervisor
self._auto_emit_completion_webhooks = auto_emit_completion_webhooks

# ----- account resolution helper -----
Expand Down Expand Up @@ -569,6 +572,7 @@ def _maybe_auto_emit_sync_completion(
return
maybe_emit_sync_completion(
sender=self._webhook_sender,
supervisor=self._webhook_supervisor,
enabled=self._auto_emit_completion_webhooks,
method_name=method_name,
params=params,
Expand Down
37 changes: 31 additions & 6 deletions src/adcp/decisioning/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from adcp.decisioning.state import StateReader
from adcp.decisioning.task_registry import TaskRegistry
from adcp.webhook_sender import WebhookSender
from adcp.webhook_supervisor import WebhookDeliverySupervisor


def _is_production_env() -> bool:
Expand Down Expand Up @@ -77,6 +78,7 @@ def create_adcp_server_from_platform(
state_reader: StateReader | None = None,
resource_resolver: ResourceResolver | None = None,
webhook_sender: WebhookSender | None = None,
webhook_supervisor: WebhookDeliverySupervisor | None = None,
auto_emit_completion_webhooks: bool = True,
) -> tuple[PlatformHandler, ThreadPoolExecutor, TaskRegistry]:
"""Build the :class:`PlatformHandler` + supporting wiring from a
Expand Down Expand Up @@ -122,11 +124,22 @@ def create_adcp_server_from_platform(
v6.1).
:param webhook_sender: Bring-your-own
:class:`adcp.webhook_sender.WebhookSender` for sync-completion
and HITL-completion webhook delivery. Default ``None`` — when
unset, sync-completion auto-emit is a silent no-op (no URL to
deliver to, framework can't synthesize a sender). Adopters
wiring webhook delivery pass a configured sender (with their
signing key, IP-pinned transport, etc.).
and HITL-completion webhook delivery. Default ``None``. The
sender is the *transport* — one HTTP-Signatures POST per call,
no retry, no breaker. Production sellers typically wrap the
sender in a :class:`~adcp.webhook_supervisor.WebhookDeliverySupervisor`
and pass that via ``webhook_supervisor=`` instead.
:param webhook_supervisor: Bring-your-own
:class:`~adcp.webhook_supervisor.WebhookDeliverySupervisor` for
reliable delivery (retry, circuit breaker, attempt audit). When
passed, the F12 auto-emit path routes through it instead of
``webhook_sender``. The reference
:class:`~adcp.webhook_supervisor.InMemoryWebhookDeliverySupervisor`
wraps a sender; adopters with infra-side retry (Celery, Kafka,
durable outbox) implement the Protocol against their queue.
Mutually optional with ``webhook_sender``; passing both is
valid (supervisor wins for auto-emit, sender remains available
for direct calls inside platform methods).
:param auto_emit_completion_webhooks: F12 feature gate. When
``True`` (default), the framework auto-fires a completion
webhook on the sync-success arm of mutating tools whenever the
Expand Down Expand Up @@ -235,6 +248,7 @@ def create_adcp_server_from_platform(
state_reader=state_reader,
resource_resolver=resource_resolver,
webhook_sender=webhook_sender,
webhook_supervisor=webhook_supervisor,
auto_emit_completion_webhooks=auto_emit_completion_webhooks,
)

Expand All @@ -255,6 +269,7 @@ def create_adcp_server_from_platform(
validate_webhook_sender_for_platform(
advertised_tools=handler.advertised_tools_for_instance(),
sender=webhook_sender,
supervisor=webhook_supervisor,
auto_emit=auto_emit_completion_webhooks,
)

Expand All @@ -271,6 +286,7 @@ def serve(
state_reader: StateReader | None = None,
resource_resolver: ResourceResolver | None = None,
webhook_sender: WebhookSender | None = None,
webhook_supervisor: WebhookDeliverySupervisor | None = None,
auto_emit_completion_webhooks: bool = True,
advertise_all: bool = False,
**serve_kwargs: Any,
Expand All @@ -294,7 +310,15 @@ def serve(
:param resource_resolver: Custom :class:`ResourceResolver` impl (D15).
:param webhook_sender: BYO :class:`adcp.webhook_sender.WebhookSender`
for completion webhook delivery (sync auto-emit + HITL terminal).
``None`` disables auto-emit silently.
Transport only — one attempt, no retry. ``None`` disables
auto-emit silently.
:param webhook_supervisor: BYO
:class:`~adcp.webhook_supervisor.WebhookDeliverySupervisor` for
reliable delivery (retry, circuit breaker, attempt audit).
Takes precedence over ``webhook_sender`` for F12 auto-emit
when both are passed. Production sellers typically pass an
:class:`~adcp.webhook_supervisor.InMemoryWebhookDeliverySupervisor`
wrapping their sender.
:param auto_emit_completion_webhooks: F12 — auto-fire a completion
webhook on the sync-success arm of mutating tools when the
request supplied ``push_notification_config.url``. Default
Expand Down Expand Up @@ -322,6 +346,7 @@ def serve(
state_reader=state_reader,
resource_resolver=resource_resolver,
webhook_sender=webhook_sender,
webhook_supervisor=webhook_supervisor,
auto_emit_completion_webhooks=auto_emit_completion_webhooks,
)

Expand Down
50 changes: 31 additions & 19 deletions src/adcp/decisioning/webhook_emit.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@

if TYPE_CHECKING:
from adcp.webhook_sender import WebhookSender
from adcp.webhook_supervisor import WebhookDeliverySupervisor

DeliveryTarget = WebhookSender | WebhookDeliverySupervisor

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -125,7 +128,7 @@ def _extract_push_notification_url_and_token(

async def _emit_sync_completion_webhook(
*,
sender: WebhookSender,
target: DeliveryTarget,
url: str,
token: str | None,
method_name: str,
Expand All @@ -139,10 +142,15 @@ async def _emit_sync_completion_webhook(
correlate primarily via resource ids on ``result``
(``media_buy_id``, ``creative_id``, etc.); ``task_id`` here is
informational for the spec's required-field shape.

``target`` is either a bare :class:`WebhookSender` (one attempt,
no breaker) or a :class:`WebhookDeliverySupervisor` (retry, breaker,
optional delivery log). Both expose ``send_mcp(...)`` with
compatible kwargs; the call site is polymorphic.
"""
task_id = f"sync-{uuid.uuid4().hex[:16]}"
try:
await sender.send_mcp(
await target.send_mcp(
url=url,
task_id=task_id,
status="completed",
Expand All @@ -169,6 +177,7 @@ def maybe_emit_sync_completion(
method_name: str,
params: Any,
result: Any,
supervisor: WebhookDeliverySupervisor | None = None,
) -> None:
"""Fire-and-forget auto-emit gate. Called by handler shims after
the sync-success arm of mutating tools.
Expand Down Expand Up @@ -217,7 +226,8 @@ def maybe_emit_sync_completion(
if config is None:
return # buyer didn't register — nothing to do

if sender is None:
target = supervisor or sender
if target is None:
# Buyer registered a webhook config but the adopter didn't
# wire a sender. Without this branch, the buyer's
# notification quietly disappears — they think they
Expand All @@ -237,8 +247,8 @@ def maybe_emit_sync_completion(
logger.warning(
"[adcp.decisioning] buyer registered "
"push_notification_config (url=%s) for %s but auto-emit "
"webhook_sender is None — webhook silently dropped. "
"Pass webhook_sender to "
"has neither webhook_sender nor webhook_supervisor — "
"webhook silently dropped. Pass one to "
"adcp.decisioning.serve.create_adcp_server_from_platform, "
"or set auto_emit_completion_webhooks=False to silence "
"this warning.",
Expand Down Expand Up @@ -279,7 +289,7 @@ def maybe_emit_sync_completion(
return
bg = loop.create_task(
_emit_sync_completion_webhook(
sender=sender,
target=target,
url=url,
token=token,
method_name=method_name,
Expand Down Expand Up @@ -307,16 +317,17 @@ def validate_webhook_sender_for_platform(
advertised_tools: frozenset[str] | set[str],
sender: Any,
auto_emit: bool,
supervisor: Any = None,
) -> None:
"""Server-boot fail-fast for the F12 misconfig (Emma sales-direct
P0 root cause).

When an adopter claims a specialism whose tool surface includes
any spec-eligible webhook task type (e.g., ``create_media_buy``,
``activate_signal``, ``acquire_rights``) AND auto-emit is on AND
no ``webhook_sender`` is wired, every buyer who registers
``push_notification_config.url`` would have their notification
silently dropped. The runtime gate at
neither ``webhook_sender`` nor ``webhook_supervisor`` is wired,
every buyer who registers ``push_notification_config.url`` would
have their notification silently dropped. The runtime gate at
:func:`maybe_emit_sync_completion` warns on the FIRST call, but
by then the buyer has already burned a request and the adopter
has shipped without webhook wiring.
Expand All @@ -335,7 +346,7 @@ def validate_webhook_sender_for_platform(
"""
if not auto_emit:
return
if sender is not None:
if sender is not None or supervisor is not None:
return
eligible = SPEC_WEBHOOK_TASK_TYPES & set(advertised_tools)
if not eligible:
Expand All @@ -347,18 +358,19 @@ def validate_webhook_sender_for_platform(
message=(
"auto_emit_completion_webhooks is enabled and the platform's "
"claimed specialisms expose webhook-eligible tools "
f"{sorted(eligible)!r}, but no webhook_sender was wired. "
"Buyers who register push_notification_config.url on these "
"tools would have their notifications silently dropped. "
"Either pass a configured WebhookSender via "
"adcp.decisioning.serve.create_adcp_server_from_platform("
"..., webhook_sender=...), or set "
"auto_emit_completion_webhooks=False if you handle webhooks "
"manually inside your platform methods."
f"{sorted(eligible)!r}, but neither webhook_sender nor "
"webhook_supervisor was wired. Buyers who register "
"push_notification_config.url on these tools would have their "
"notifications silently dropped. Pass a configured "
"WebhookSender (transport only) or InMemoryWebhookDeliverySupervisor "
"(retry + circuit breaker) to "
"adcp.decisioning.serve.create_adcp_server_from_platform, "
"or set auto_emit_completion_webhooks=False if you handle "
"webhooks manually inside your platform methods."
),
recovery="terminal",
details={
"missing": "webhook_sender",
"missing": "webhook_sender_or_supervisor",
"webhook_eligible_tools": sorted(eligible),
},
)
Expand Down
Loading
Loading