Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions src/adcp/server/a2a_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,19 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non
structured_error_types = (ADCPError, _DecisioningAdcpError)
try:
result = await self._dispatch_with_middleware(skill_name, params, tool_context)
await self._send_result(event_queue, context, skill_name, result)
# ``params`` carries the parsed wire request including any
# ``context`` extension. Both success and error paths thread
# it through to the result builder so the context-passthrough
# contract holds across the dispatch outcome.
await self._send_result(event_queue, context, skill_name, result, params)
except structured_error_types as exc:
# Application-layer AdCP error. Emit a failed task with the
# adcp_error in a DataPart per transport-errors.mdx §A2A
# Binding, plus a human-readable text part. The JSON-RPC
# channel is reserved for transport-level errors (auth
# rejected, rate-limited pre-dispatch).
logger.info("AdCP application error for skill %s: %s", skill_name, exc)
await self._send_adcp_error(event_queue, context, exc)
await self._send_adcp_error(event_queue, context, exc, params)
except Exception:
logger.exception("Error executing skill %s", skill_name)
await self._send_error(event_queue, context, f"Skill execution failed: {skill_name}")
Expand Down Expand Up @@ -391,8 +395,17 @@ async def _send_result(
context: RequestContext,
skill_name: str,
result: Any,
params: dict[str, Any] | None = None,
) -> None:
"""Publish a completed task with the skill result."""
"""Publish a completed task with the skill result.

When ``params`` is supplied and carries a wire ``context`` field,
echo it onto the result DataPart per the AdCP context-passthrough
contract. This mirrors the MCP success path's
:func:`adcp.server.helpers.inject_context` call in
:mod:`adcp.server.mcp_tools` and keeps the error path's echo
(see :meth:`_send_adcp_error`) symmetric on A2A.
"""
# Normalize result to a JSON-safe dict
if hasattr(result, "model_dump"):
data = result.model_dump(mode="json", exclude_none=True)
Expand All @@ -401,6 +414,11 @@ async def _send_result(
else:
data = result

if params is not None and isinstance(data, dict):
from adcp.server.helpers import inject_context

inject_context(params, data)

task = _make_task(
context,
state=pb.TaskState.TASK_STATE_COMPLETED,
Expand Down Expand Up @@ -428,6 +446,7 @@ async def _send_adcp_error(
event_queue: EventQueue,
context: RequestContext,
exc: Any,
params: dict[str, Any] | None = None,
) -> None:
"""Publish a failed task carrying an AdCP ``adcp_error`` payload.

Expand All @@ -442,9 +461,18 @@ async def _send_adcp_error(
is shared with the MCP path via
:func:`adcp.server.translate._extract_structured_fields`, so
both transports project off the same source-of-truth shape.

When ``params`` is supplied and carries a wire ``context`` field,
that field is echoed alongside ``adcp_error`` in the DataPart —
symmetric with the success path's
:func:`adcp.server.helpers.inject_context` call. Without this
echo, error responses violate the AdCP context-passthrough
contract and buyers lose correlation IDs across the
raise-AdcpError boundary.
"""
# Lazy import — ``translate.py`` pulls in heavier server deps
# (mcp.types) which the A2A module doesn't otherwise need.
from adcp.server.helpers import inject_context
from adcp.server.translate import _extract_structured_fields

code, message, recovery, field, suggestion, details, _errors = _extract_structured_fields(
Expand All @@ -467,10 +495,14 @@ async def _send_adcp_error(
if details:
adcp_error["details"] = dict(details)

data: dict[str, Any] = {"adcp_error": adcp_error}
if params is not None:
inject_context(params, data)

task = _make_task(
context,
state=pb.TaskState.TASK_STATE_FAILED,
data={"adcp_error": adcp_error},
data=data,
message=message,
)
await event_queue.enqueue_event(task)
Expand Down
9 changes: 7 additions & 2 deletions src/adcp/server/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -1794,14 +1794,19 @@ async def _call_handler() -> Any:
# ``-> dict[str, Any]`` annotation drives FastMCP's output_schema
# derivation; the actual return type is broader (CallToolResult
# is a valid return per the lowlevel handler's contract).
return build_mcp_error_result(exc) # type: ignore[return-value]
#
# ``kwargs`` is the raw request dict — passing it lets the
# builder echo the request's ``context`` extension into the
# error envelope, symmetric with the success path's
# ``inject_context`` call (mcp_tools.py).
return build_mcp_error_result(exc, params=kwargs) # type: ignore[return-value]
except Exception as exc:
# Decisioning ``AdcpError`` is NOT a subclass of
# ``adcp.exceptions.ADCPError`` (different class hierarchy
# — ``adcp.decisioning.types.AdcpError``). Catch it explicitly
# and project the same structured envelope.
if DecisioningAdcpError is not None and isinstance(exc, DecisioningAdcpError):
return build_mcp_error_result(exc) # type: ignore[return-value]
return build_mcp_error_result(exc, params=kwargs) # type: ignore[return-value]
raise
# Pre-built CallToolResult (error envelope from build_mcp_error_result)
# passes through FastMCP's convert_result and the lowlevel handler
Expand Down
22 changes: 20 additions & 2 deletions src/adcp/server/translate.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,11 @@ def _extract_structured_fields(
return code, message, recovery, field, suggestion, details, errors


def build_mcp_error_result(exc: ADCPError | Error | Any) -> CallToolResult:
def build_mcp_error_result(
exc: ADCPError | Error | Any,
*,
params: dict[str, Any] | None = None,
) -> CallToolResult:
"""Build an MCP ``CallToolResult`` carrying the structured ``adcp_error`` envelope.

The framework dispatcher returns this when a platform method raises a
Expand All @@ -181,7 +185,17 @@ def build_mcp_error_result(exc: ADCPError | Error | Any) -> CallToolResult:
Buyer agents read the structured envelope first; the text fallback
is only consulted when ``structuredContent`` is absent, per the
spec's structured-error precedence rules.

When ``params`` is supplied and carries a ``context`` field, that
field is echoed onto the structuredContent envelope alongside
``adcp_error`` — symmetric with the success path's
:func:`adcp.server.helpers.inject_context` call. Without this echo,
error responses violate the AdCP context-passthrough contract and
buyers lose correlation IDs and idempotency hints across the
raise-AdcpError boundary.
"""
from adcp.server.helpers import inject_context

code, message, recovery, field, suggestion, details, _errors = _extract_structured_fields(exc)

adcp_error: dict[str, Any] = {
Expand All @@ -208,9 +222,13 @@ def build_mcp_error_result(exc: ADCPError | Error | Any) -> CallToolResult:
if suggestion:
text += f"\nSuggestion: {suggestion}"

structured: dict[str, Any] = {"adcp_error": adcp_error}
if params is not None:
inject_context(params, structured)

return CallToolResult(
content=[TextContent(type="text", text=text)],
structuredContent={"adcp_error": adcp_error},
structuredContent=structured,
isError=True,
)

Expand Down
157 changes: 155 additions & 2 deletions tests/test_a2a_structured_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ def _empty_call_context() -> Any:
return ServerCallContext(user=UnauthenticatedUser())


def _request_context(skill: str) -> _RealRequestContext:
def _request_context(skill: str, parameters: dict[str, Any] | None = None) -> _RealRequestContext:
return _RealRequestContext(
call_context=_empty_call_context(),
request=pb.SendMessageRequest(message=_make_datapart_msg(skill)),
request=pb.SendMessageRequest(message=_make_datapart_msg(skill, parameters)),
)


Expand Down Expand Up @@ -330,3 +330,156 @@ async def test_adcp_task_error_with_field_projects_field() -> None:
payload = _adcp_error_data_part(event)
assert payload["code"] == "VALIDATION_ERROR"
assert payload["field"] == "packages[0].budget"


# ============================================================================
# Issue #557: AdCP context-passthrough on the error path
# ============================================================================
#
# The success path emits the request's ``context`` extension back into
# the response. The error path must do the same so buyers retain
# correlation IDs and idempotency hints across the raise-AdcpError
# boundary.


def _adcp_error_full_payload(task: pb.Task) -> dict[str, Any]:
"""Pull the full DataPart payload (sibling fields incl. ``context``)."""
assert task.artifacts, "expected at least one artifact on failed task"
for part in task.artifacts[0].parts:
if part.WhichOneof("content") != "data":
continue
payload = _MessageToDict(part.data)
if isinstance(payload, dict) and "adcp_error" in payload:
return payload
raise AssertionError("no adcp_error DataPart found on task artifacts")


@pytest.mark.asyncio
async def test_request_context_echoes_into_error_envelope() -> None:
"""A request with a ``context`` field that triggers an AdcpError raise
produces a failed-task DataPart with that ``context`` echoed alongside
``adcp_error``."""
handler = _DecisioningRaiser(
lambda: DecisioningAdcpError("MEDIA_BUY_NOT_FOUND", message="no such buy")
)
executor = _executor(handler)
queue = EventQueue()

request_context = {"correlation_id": "buyer-req-42", "trace_id": "abc"}
await executor.execute(_request_context("get_products", {"context": request_context}), queue)

event = await queue.dequeue_event()
payload = _adcp_error_full_payload(event)
assert payload["adcp_error"]["code"] == "MEDIA_BUY_NOT_FOUND"
assert payload.get("context") == request_context


@pytest.mark.asyncio
async def test_no_request_context_omits_context_from_error_envelope() -> None:
"""When the request carries no ``context`` field, the error DataPart
MUST NOT synthesise one — only echo what the buyer sent."""
handler = _DecisioningRaiser(
lambda: DecisioningAdcpError("MEDIA_BUY_NOT_FOUND", message="no such buy")
)
executor = _executor(handler)
queue = EventQueue()

await executor.execute(_request_context("get_products"), queue)

event = await queue.dequeue_event()
payload = _adcp_error_full_payload(event)
assert "context" not in payload


@pytest.mark.asyncio
async def test_echoed_context_is_sibling_of_adcp_error_not_inside() -> None:
"""``context`` lands at the DataPart top level, not inside ``adcp_error``."""
handler = _DecisioningRaiser(lambda: DecisioningAdcpError("INTERNAL_ERROR", message="oops"))
executor = _executor(handler)
queue = EventQueue()

request_context = {"correlation_id": "abc-123"}
await executor.execute(_request_context("get_products", {"context": request_context}), queue)

event = await queue.dequeue_event()
payload = _adcp_error_full_payload(event)
assert payload.get("context") == request_context
assert "context" not in payload["adcp_error"]


@pytest.mark.asyncio
async def test_oversized_request_context_dropped_on_error_path() -> None:
"""An oversized ``context`` (>64KB) is silently dropped per the
inject_context size cap — buyers cannot use the error envelope to
amplify response size by stuffing the request context."""
handler = _DecisioningRaiser(lambda: DecisioningAdcpError("INTERNAL_ERROR", message="oops"))
executor = _executor(handler)
queue = EventQueue()

huge_context = {"junk": "A" * (65 * 1024)}
await executor.execute(_request_context("get_products", {"context": huge_context}), queue)

event = await queue.dequeue_event()
payload = _adcp_error_full_payload(event)
assert "context" not in payload


# ============================================================================
# A2A success path also echoes context (parity with MCP success path)
# ============================================================================


def _success_data_payload(task: pb.Task) -> dict[str, Any]:
"""Pull the DataPart payload from a completed task."""
assert task.artifacts, "expected at least one artifact on completed task"
for part in task.artifacts[0].parts:
if part.WhichOneof("content") != "data":
continue
payload = _MessageToDict(part.data)
if isinstance(payload, dict):
return payload
raise AssertionError("no DataPart found on task artifacts")


@pytest.mark.asyncio
async def test_a2a_success_path_echoes_request_context() -> None:
"""A successful A2A skill response echoes the request's ``context``
extension, matching the MCP success path's ``inject_context`` call.
Without this the AdCP context-passthrough contract holds on errors
but not on successes — a strange asymmetry this PR closes."""

class _OkHandler(_AdcpCapsBase):
async def get_products(self, _params: Any, _context: Any = None) -> Any:
return {"products": []}

executor = _executor(_OkHandler())
queue = EventQueue()

request_context = {"correlation_id": "buyer-req-7"}
await executor.execute(_request_context("get_products", {"context": request_context}), queue)

event = await queue.dequeue_event()
assert isinstance(event, pb.Task)
assert event.status.state == pb.TaskState.TASK_STATE_COMPLETED
payload = _success_data_payload(event)
assert payload.get("context") == request_context
assert payload.get("products") == []


@pytest.mark.asyncio
async def test_a2a_success_path_no_request_context_omits_echo() -> None:
"""No request-side ``context`` → no synthesized one on the success
response either."""

class _OkHandler(_AdcpCapsBase):
async def get_products(self, _params: Any, _context: Any = None) -> Any:
return {"products": []}

executor = _executor(_OkHandler())
queue = EventQueue()

await executor.execute(_request_context("get_products"), queue)

event = await queue.dequeue_event()
payload = _success_data_payload(event)
assert "context" not in payload
Loading
Loading