diff --git a/src/adcp/server/a2a_server.py b/src/adcp/server/a2a_server.py index 9e4f1126..b178fb5c 100644 --- a/src/adcp/server/a2a_server.py +++ b/src/adcp/server/a2a_server.py @@ -226,7 +226,11 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non structured_error_types = (ADCPError, _DecisioningAdcpError) try: result = await self._dispatch_with_middleware(skill_name, params, tool_context) - await self._send_result(event_queue, context, skill_name, result) + # ``params`` carries the parsed wire request including any + # ``context`` extension. Both success and error paths thread + # it through to the result builder so the context-passthrough + # contract holds across the dispatch outcome. + await self._send_result(event_queue, context, skill_name, result, params) except structured_error_types as exc: # Application-layer AdCP error. Emit a failed task with the # adcp_error in a DataPart per transport-errors.mdx §A2A @@ -234,7 +238,7 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non # channel is reserved for transport-level errors (auth # rejected, rate-limited pre-dispatch). logger.info("AdCP application error for skill %s: %s", skill_name, exc) - await self._send_adcp_error(event_queue, context, exc) + await self._send_adcp_error(event_queue, context, exc, params) except Exception: logger.exception("Error executing skill %s", skill_name) await self._send_error(event_queue, context, f"Skill execution failed: {skill_name}") @@ -391,8 +395,17 @@ async def _send_result( context: RequestContext, skill_name: str, result: Any, + params: dict[str, Any] | None = None, ) -> None: - """Publish a completed task with the skill result.""" + """Publish a completed task with the skill result. + + When ``params`` is supplied and carries a wire ``context`` field, + echo it onto the result DataPart per the AdCP context-passthrough + contract. This mirrors the MCP success path's + :func:`adcp.server.helpers.inject_context` call in + :mod:`adcp.server.mcp_tools` and keeps the error path's echo + (see :meth:`_send_adcp_error`) symmetric on A2A. + """ # Normalize result to a JSON-safe dict if hasattr(result, "model_dump"): data = result.model_dump(mode="json", exclude_none=True) @@ -401,6 +414,11 @@ async def _send_result( else: data = result + if params is not None and isinstance(data, dict): + from adcp.server.helpers import inject_context + + inject_context(params, data) + task = _make_task( context, state=pb.TaskState.TASK_STATE_COMPLETED, @@ -428,6 +446,7 @@ async def _send_adcp_error( event_queue: EventQueue, context: RequestContext, exc: Any, + params: dict[str, Any] | None = None, ) -> None: """Publish a failed task carrying an AdCP ``adcp_error`` payload. @@ -442,9 +461,18 @@ async def _send_adcp_error( is shared with the MCP path via :func:`adcp.server.translate._extract_structured_fields`, so both transports project off the same source-of-truth shape. + + When ``params`` is supplied and carries a wire ``context`` field, + that field is echoed alongside ``adcp_error`` in the DataPart — + symmetric with the success path's + :func:`adcp.server.helpers.inject_context` call. Without this + echo, error responses violate the AdCP context-passthrough + contract and buyers lose correlation IDs across the + raise-AdcpError boundary. """ # Lazy import — ``translate.py`` pulls in heavier server deps # (mcp.types) which the A2A module doesn't otherwise need. + from adcp.server.helpers import inject_context from adcp.server.translate import _extract_structured_fields code, message, recovery, field, suggestion, details, _errors = _extract_structured_fields( @@ -467,10 +495,14 @@ async def _send_adcp_error( if details: adcp_error["details"] = dict(details) + data: dict[str, Any] = {"adcp_error": adcp_error} + if params is not None: + inject_context(params, data) + task = _make_task( context, state=pb.TaskState.TASK_STATE_FAILED, - data={"adcp_error": adcp_error}, + data=data, message=message, ) await event_queue.enqueue_event(task) diff --git a/src/adcp/server/serve.py b/src/adcp/server/serve.py index 4216378f..f5baf9eb 100644 --- a/src/adcp/server/serve.py +++ b/src/adcp/server/serve.py @@ -1794,14 +1794,19 @@ async def _call_handler() -> Any: # ``-> dict[str, Any]`` annotation drives FastMCP's output_schema # derivation; the actual return type is broader (CallToolResult # is a valid return per the lowlevel handler's contract). - return build_mcp_error_result(exc) # type: ignore[return-value] + # + # ``kwargs`` is the raw request dict — passing it lets the + # builder echo the request's ``context`` extension into the + # error envelope, symmetric with the success path's + # ``inject_context`` call (mcp_tools.py). + return build_mcp_error_result(exc, params=kwargs) # type: ignore[return-value] except Exception as exc: # Decisioning ``AdcpError`` is NOT a subclass of # ``adcp.exceptions.ADCPError`` (different class hierarchy # — ``adcp.decisioning.types.AdcpError``). Catch it explicitly # and project the same structured envelope. if DecisioningAdcpError is not None and isinstance(exc, DecisioningAdcpError): - return build_mcp_error_result(exc) # type: ignore[return-value] + return build_mcp_error_result(exc, params=kwargs) # type: ignore[return-value] raise # Pre-built CallToolResult (error envelope from build_mcp_error_result) # passes through FastMCP's convert_result and the lowlevel handler diff --git a/src/adcp/server/translate.py b/src/adcp/server/translate.py index b86d1a0c..0280e2a0 100644 --- a/src/adcp/server/translate.py +++ b/src/adcp/server/translate.py @@ -164,7 +164,11 @@ def _extract_structured_fields( return code, message, recovery, field, suggestion, details, errors -def build_mcp_error_result(exc: ADCPError | Error | Any) -> CallToolResult: +def build_mcp_error_result( + exc: ADCPError | Error | Any, + *, + params: dict[str, Any] | None = None, +) -> CallToolResult: """Build an MCP ``CallToolResult`` carrying the structured ``adcp_error`` envelope. The framework dispatcher returns this when a platform method raises a @@ -181,7 +185,17 @@ def build_mcp_error_result(exc: ADCPError | Error | Any) -> CallToolResult: Buyer agents read the structured envelope first; the text fallback is only consulted when ``structuredContent`` is absent, per the spec's structured-error precedence rules. + + When ``params`` is supplied and carries a ``context`` field, that + field is echoed onto the structuredContent envelope alongside + ``adcp_error`` — symmetric with the success path's + :func:`adcp.server.helpers.inject_context` call. Without this echo, + error responses violate the AdCP context-passthrough contract and + buyers lose correlation IDs and idempotency hints across the + raise-AdcpError boundary. """ + from adcp.server.helpers import inject_context + code, message, recovery, field, suggestion, details, _errors = _extract_structured_fields(exc) adcp_error: dict[str, Any] = { @@ -208,9 +222,13 @@ def build_mcp_error_result(exc: ADCPError | Error | Any) -> CallToolResult: if suggestion: text += f"\nSuggestion: {suggestion}" + structured: dict[str, Any] = {"adcp_error": adcp_error} + if params is not None: + inject_context(params, structured) + return CallToolResult( content=[TextContent(type="text", text=text)], - structuredContent={"adcp_error": adcp_error}, + structuredContent=structured, isError=True, ) diff --git a/tests/test_a2a_structured_error.py b/tests/test_a2a_structured_error.py index e2ce14f6..b46abfbc 100644 --- a/tests/test_a2a_structured_error.py +++ b/tests/test_a2a_structured_error.py @@ -69,10 +69,10 @@ def _empty_call_context() -> Any: return ServerCallContext(user=UnauthenticatedUser()) -def _request_context(skill: str) -> _RealRequestContext: +def _request_context(skill: str, parameters: dict[str, Any] | None = None) -> _RealRequestContext: return _RealRequestContext( call_context=_empty_call_context(), - request=pb.SendMessageRequest(message=_make_datapart_msg(skill)), + request=pb.SendMessageRequest(message=_make_datapart_msg(skill, parameters)), ) @@ -330,3 +330,156 @@ async def test_adcp_task_error_with_field_projects_field() -> None: payload = _adcp_error_data_part(event) assert payload["code"] == "VALIDATION_ERROR" assert payload["field"] == "packages[0].budget" + + +# ============================================================================ +# Issue #557: AdCP context-passthrough on the error path +# ============================================================================ +# +# The success path emits the request's ``context`` extension back into +# the response. The error path must do the same so buyers retain +# correlation IDs and idempotency hints across the raise-AdcpError +# boundary. + + +def _adcp_error_full_payload(task: pb.Task) -> dict[str, Any]: + """Pull the full DataPart payload (sibling fields incl. ``context``).""" + assert task.artifacts, "expected at least one artifact on failed task" + for part in task.artifacts[0].parts: + if part.WhichOneof("content") != "data": + continue + payload = _MessageToDict(part.data) + if isinstance(payload, dict) and "adcp_error" in payload: + return payload + raise AssertionError("no adcp_error DataPart found on task artifacts") + + +@pytest.mark.asyncio +async def test_request_context_echoes_into_error_envelope() -> None: + """A request with a ``context`` field that triggers an AdcpError raise + produces a failed-task DataPart with that ``context`` echoed alongside + ``adcp_error``.""" + handler = _DecisioningRaiser( + lambda: DecisioningAdcpError("MEDIA_BUY_NOT_FOUND", message="no such buy") + ) + executor = _executor(handler) + queue = EventQueue() + + request_context = {"correlation_id": "buyer-req-42", "trace_id": "abc"} + await executor.execute(_request_context("get_products", {"context": request_context}), queue) + + event = await queue.dequeue_event() + payload = _adcp_error_full_payload(event) + assert payload["adcp_error"]["code"] == "MEDIA_BUY_NOT_FOUND" + assert payload.get("context") == request_context + + +@pytest.mark.asyncio +async def test_no_request_context_omits_context_from_error_envelope() -> None: + """When the request carries no ``context`` field, the error DataPart + MUST NOT synthesise one — only echo what the buyer sent.""" + handler = _DecisioningRaiser( + lambda: DecisioningAdcpError("MEDIA_BUY_NOT_FOUND", message="no such buy") + ) + executor = _executor(handler) + queue = EventQueue() + + await executor.execute(_request_context("get_products"), queue) + + event = await queue.dequeue_event() + payload = _adcp_error_full_payload(event) + assert "context" not in payload + + +@pytest.mark.asyncio +async def test_echoed_context_is_sibling_of_adcp_error_not_inside() -> None: + """``context`` lands at the DataPart top level, not inside ``adcp_error``.""" + handler = _DecisioningRaiser(lambda: DecisioningAdcpError("INTERNAL_ERROR", message="oops")) + executor = _executor(handler) + queue = EventQueue() + + request_context = {"correlation_id": "abc-123"} + await executor.execute(_request_context("get_products", {"context": request_context}), queue) + + event = await queue.dequeue_event() + payload = _adcp_error_full_payload(event) + assert payload.get("context") == request_context + assert "context" not in payload["adcp_error"] + + +@pytest.mark.asyncio +async def test_oversized_request_context_dropped_on_error_path() -> None: + """An oversized ``context`` (>64KB) is silently dropped per the + inject_context size cap — buyers cannot use the error envelope to + amplify response size by stuffing the request context.""" + handler = _DecisioningRaiser(lambda: DecisioningAdcpError("INTERNAL_ERROR", message="oops")) + executor = _executor(handler) + queue = EventQueue() + + huge_context = {"junk": "A" * (65 * 1024)} + await executor.execute(_request_context("get_products", {"context": huge_context}), queue) + + event = await queue.dequeue_event() + payload = _adcp_error_full_payload(event) + assert "context" not in payload + + +# ============================================================================ +# A2A success path also echoes context (parity with MCP success path) +# ============================================================================ + + +def _success_data_payload(task: pb.Task) -> dict[str, Any]: + """Pull the DataPart payload from a completed task.""" + assert task.artifacts, "expected at least one artifact on completed task" + for part in task.artifacts[0].parts: + if part.WhichOneof("content") != "data": + continue + payload = _MessageToDict(part.data) + if isinstance(payload, dict): + return payload + raise AssertionError("no DataPart found on task artifacts") + + +@pytest.mark.asyncio +async def test_a2a_success_path_echoes_request_context() -> None: + """A successful A2A skill response echoes the request's ``context`` + extension, matching the MCP success path's ``inject_context`` call. + Without this the AdCP context-passthrough contract holds on errors + but not on successes — a strange asymmetry this PR closes.""" + + class _OkHandler(_AdcpCapsBase): + async def get_products(self, _params: Any, _context: Any = None) -> Any: + return {"products": []} + + executor = _executor(_OkHandler()) + queue = EventQueue() + + request_context = {"correlation_id": "buyer-req-7"} + await executor.execute(_request_context("get_products", {"context": request_context}), queue) + + event = await queue.dequeue_event() + assert isinstance(event, pb.Task) + assert event.status.state == pb.TaskState.TASK_STATE_COMPLETED + payload = _success_data_payload(event) + assert payload.get("context") == request_context + assert payload.get("products") == [] + + +@pytest.mark.asyncio +async def test_a2a_success_path_no_request_context_omits_echo() -> None: + """No request-side ``context`` → no synthesized one on the success + response either.""" + + class _OkHandler(_AdcpCapsBase): + async def get_products(self, _params: Any, _context: Any = None) -> Any: + return {"products": []} + + executor = _executor(_OkHandler()) + queue = EventQueue() + + await executor.execute(_request_context("get_products"), queue) + + event = await queue.dequeue_event() + payload = _success_data_payload(event) + assert "context" not in payload diff --git a/tests/test_mcp_structured_error.py b/tests/test_mcp_structured_error.py index 6096b820..782b4625 100644 --- a/tests/test_mcp_structured_error.py +++ b/tests/test_mcp_structured_error.py @@ -264,6 +264,85 @@ async def caller(_kwargs: dict[str, Any], *, context: Any = None) -> Any: assert result.structuredContent["adcp_error"]["code"] == "IDEMPOTENCY_CONFLICT" +class TestBuildMcpErrorResultContextEcho: + """Issue #557: AdCP context-passthrough contract on the error path. + + The success path runs ``inject_context(raw_params, response)`` so a + request's ``context`` extension echoes back to the buyer. The error + path must do the same — without it, buyers lose correlation IDs and + idempotency hints across the raise-AdcpError boundary. + """ + + def test_no_params_omits_context_from_envelope(self): + exc = DecisioningAdcpError("INTERNAL_ERROR", message="oops") + result = build_mcp_error_result(exc) + assert "context" not in result.structuredContent + + def test_params_without_context_omits_context_from_envelope(self): + exc = DecisioningAdcpError("INTERNAL_ERROR", message="oops") + result = build_mcp_error_result(exc, params={"media_buy_id": "mb-1"}) + assert "context" not in result.structuredContent + + def test_params_with_context_echoes_into_envelope(self): + exc = DecisioningAdcpError("INTERNAL_ERROR", message="oops") + ctx = {"correlation_id": "abc-123", "buyer_trace": "trace-xyz"} + result = build_mcp_error_result(exc, params={"media_buy_id": "mb-1", "context": ctx}) + assert result.structuredContent.get("context") == ctx + + def test_echoed_context_is_sibling_of_adcp_error_not_inside_it(self): + exc = DecisioningAdcpError("INTERNAL_ERROR", message="oops") + ctx = {"correlation_id": "abc-123"} + result = build_mcp_error_result(exc, params={"context": ctx}) + assert "context" in result.structuredContent + assert "context" not in result.structuredContent["adcp_error"] + + def test_oversized_context_silently_dropped(self): + """``inject_context``'s 64KB cap applies on the error path too — + prevents response-size amplification via buyer-controlled context.""" + exc = DecisioningAdcpError("INTERNAL_ERROR", message="oops") + huge = {"junk": "A" * (65 * 1024)} + result = build_mcp_error_result(exc, params={"context": huge}) + assert "context" not in result.structuredContent + + +@pytest.mark.asyncio +async def test_context_echo_round_trips_through_register_tool(): + """End-to-end: a request with a ``context`` field that triggers an + AdcpError raise produces a wire response with that same ``context`` + echoed alongside ``adcp_error`` in structuredContent. + """ + from mcp.server.fastmcp import FastMCP + + from adcp.server.serve import _register_tool + + async def caller(_kwargs: dict[str, Any], *, context: Any = None) -> Any: + raise DecisioningAdcpError( + "MEDIA_BUY_NOT_FOUND", + message="No media buy with id mb-404", + recovery="terminal", + ) + + mcp = FastMCP("test-context-echo") + _register_tool( + mcp, + "get_media_buy_delivery", + "test description", + {"type": "object"}, + caller, + ) + + request_context = {"correlation_id": "buyer-req-42"} + result = await mcp.call_tool( + "get_media_buy_delivery", + {"media_buy_id": "mb-404", "context": request_context}, + ) + + assert isinstance(result, CallToolResult) + assert result.isError is True + assert result.structuredContent["adcp_error"]["code"] == "MEDIA_BUY_NOT_FOUND" + assert result.structuredContent.get("context") == request_context + + @pytest.mark.asyncio async def test_success_path_unchanged(): """Regression: success-path responses still validate against the