From d6a7dfc3e46de23d60b248178a11435c8ea3e026 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 4 May 2026 01:39:45 +0000 Subject: [PATCH 1/2] =?UTF-8?q?fix(server/a2a):=20structured=20error=20par?= =?UTF-8?q?ity=20with=20MCP=20=E2=80=94=20emit=20field/details/retry=5Faft?= =?UTF-8?q?er,=20catch=20decisioning=20AdcpError=20(closes=20#530)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - `_send_adcp_error` now uses `_extract_structured_fields` from translate.py (introduced in #525) to populate the full adcp_error envelope: code, message, recovery, field, details, retry_after, suggestion. - `execute()` adds a second `except Exception` branch that catches decisioning-layer `AdcpError` (separate hierarchy from `adcp.exceptions.ADCPError`) and routes it through `_send_adcp_error` instead of the generic "Skill execution failed" text path. - Drop now-unused `ADCPTaskError` and `STANDARD_ERROR_CODES` imports. - Tests: parametrized over MEDIA_BUY_NOT_FOUND, PACKAGE_NOT_FOUND, BUDGET_TOO_LOW, TERMS_REJECTED; decisioning AdcpError with retry_after; end-to-end execute() → DataPart round-trip for both paths. https://claude.ai/code/session_019DMGxsA43mtT7QTRiNipBM --- src/adcp/server/a2a_server.py | 66 ++++++++----- tests/test_a2a_server.py | 172 ++++++++++++++++++++++++++++++++++ 2 files changed, 216 insertions(+), 22 deletions(-) diff --git a/src/adcp/server/a2a_server.py b/src/adcp/server/a2a_server.py index 7d482c3e1..0c89a62c2 100644 --- a/src/adcp/server/a2a_server.py +++ b/src/adcp/server/a2a_server.py @@ -35,7 +35,7 @@ from google.protobuf.struct_pb2 import Value from starlette.applications import Starlette -from adcp.exceptions import ADCPError, ADCPTaskError +from adcp.exceptions import ADCPError from adcp.server.base import ADCPHandler, ToolContext if TYPE_CHECKING: @@ -73,7 +73,6 @@ """ -from adcp.server.helpers import STANDARD_ERROR_CODES # noqa: E402 from adcp.server.mcp_tools import create_tool_caller, get_tools_for_handler from adcp.server.test_controller import TestControllerStore, _handle_test_controller @@ -214,9 +213,23 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non # errors (auth rejected, rate-limited pre-dispatch). logger.info("AdCP application error for skill %s: %s", skill_name, exc) await self._send_adcp_error(event_queue, context, exc) - except Exception: - logger.exception("Error executing skill %s", skill_name) - await self._send_error(event_queue, context, f"Skill execution failed: {skill_name}") + except Exception as exc: + # DecisioningPlatform raises adcp.decisioning.types.AdcpError which is + # a separate class hierarchy from adcp.exceptions.ADCPError. Catch it + # here to preserve the structured shape instead of collapsing it into + # a generic "Skill execution failed" text message. + try: + from adcp.decisioning.types import AdcpError as _DecAdcpError # noqa: N813 + except Exception: + _DecAdcpError = None # type: ignore[assignment,misc] # noqa: N806 + if _DecAdcpError is not None and isinstance(exc, _DecAdcpError): + logger.info("AdCP decisioning error for skill %s: %s", skill_name, exc) + await self._send_adcp_error(event_queue, context, exc) + else: + logger.exception("Error executing skill %s", skill_name) + await self._send_error( + event_queue, context, f"Skill execution failed: {skill_name}" + ) async def _dispatch_with_middleware( self, @@ -406,37 +419,46 @@ async def _send_adcp_error( self, event_queue: EventQueue, context: RequestContext, - exc: ADCPError, + exc: Any, ) -> None: """Publish a failed task carrying an AdCP ``adcp_error`` payload. Follows transport-errors.mdx §A2A Binding: failed task with artifact containing a ``DataPart`` keyed under ``adcp_error`` plus a terse ``TextPart`` for human/LLM consumption. + + Accepts both ``adcp.exceptions.ADCPError`` and the decisioning-layer + ``adcp.decisioning.types.AdcpError`` — both are routed here so the + full structured shape (code, message, recovery, field, details, + retry_after) reaches A2A buyers on the same wire path as MCP. """ - # Derive the spec error code. ADCPTaskError carries a list of codes - # (e.g. IdempotencyConflictError → IDEMPOTENCY_CONFLICT); fall back - # to a generic INTERNAL_ERROR when the exception doesn't supply one. - code = "INTERNAL_ERROR" - if isinstance(exc, ADCPTaskError) and exc.error_codes: - code = str(exc.error_codes[0]) - - adcp_error: dict[str, Any] = { + from adcp.server.translate import _extract_structured_fields + + code, message, recovery, field, suggestion, details, _errors = _extract_structured_fields( + exc + ) + + adcp_error_payload: dict[str, Any] = { "code": code, - "message": exc.message, + "message": message, } - recovery = STANDARD_ERROR_CODES.get(code, {}).get("recovery") if recovery: - adcp_error["recovery"] = recovery - suggestion = getattr(exc, "suggestion", None) - if suggestion: - adcp_error["suggestion"] = suggestion + adcp_error_payload["recovery"] = recovery + if field is not None: + adcp_error_payload["field"] = field + if suggestion is not None: + adcp_error_payload["suggestion"] = suggestion + retry_after = getattr(exc, "retry_after", None) + if retry_after is not None: + adcp_error_payload["retry_after"] = retry_after + if details: + adcp_error_payload["details"] = dict(details) task = _make_task( context, state=pb.TaskState.TASK_STATE_FAILED, - data={"adcp_error": adcp_error}, - message=exc.message, + data={"adcp_error": adcp_error_payload}, + message=message, ) await event_queue.enqueue_event(task) diff --git a/tests/test_a2a_server.py b/tests/test_a2a_server.py index 7d19bce8b..ecf72ae7f 100644 --- a/tests/test_a2a_server.py +++ b/tests/test_a2a_server.py @@ -311,6 +311,178 @@ async def test_cancel(): assert event.status.state == pb.TaskState.TASK_STATE_CANCELED +# --------------------------------------------------------------------------- +# Structured A2A error envelope — issue #530 parity with MCP (#525) +# --------------------------------------------------------------------------- + + +def _make_fake_queue() -> tuple[Any, list[Any]]: + """Return (queue, captured) — enqueue_event appends to captured.""" + captured: list[Any] = [] + + class _FakeQueue: + async def enqueue_event(self, event: Any) -> None: + captured.append(event) + + return _FakeQueue(), captured + + +def _make_context_stub() -> Any: + """Minimal RequestContext stub for direct _send_adcp_error calls.""" + from types import SimpleNamespace + + return SimpleNamespace(task_id=None, context_id=None, message=None) + + +def _extract_adcp_error_from_task(task: Any) -> dict[str, Any]: + """Pull the adcp_error dict out of the first DataPart in a failed task.""" + data_parts = [ + _MessageToDict(p.data) + for p in task.artifacts[0].parts + if p.WhichOneof("content") == "data" + ] + assert data_parts, "failed task missing DataPart" + err = data_parts[0].get("adcp_error") + assert err is not None, f"DataPart missing adcp_error key; got {data_parts[0]}" + return err # type: ignore[return-value] + + +@pytest.mark.parametrize( + "code,recovery", + [ + ("MEDIA_BUY_NOT_FOUND", "correctable"), + ("PACKAGE_NOT_FOUND", "correctable"), + ("BUDGET_TOO_LOW", "correctable"), + ("TERMS_REJECTED", "terminal"), # not in STANDARD_ERROR_CODES → default terminal + ], +) +async def test_send_adcp_error_full_envelope_from_adcp_task_error( + code: str, recovery: str +) -> None: + """_send_adcp_error populates field/details from ADCPTaskError with Error model.""" + from adcp.exceptions import ADCPTaskError + from adcp.types import Error + + detail_err = Error( + code=code, + message=f"{code} raised", + field="packages[0].budget", + details={"minimum": 500}, + ) + exc = ADCPTaskError("create_media_buy", [detail_err]) + + executor = ADCPAgentExecutor(_TestHandler()) + queue, captured = _make_fake_queue() + await executor._send_adcp_error(queue, _make_context_stub(), exc) + + assert captured, "no event emitted" + task = captured[0] + assert task.status.state == pb.TaskState.TASK_STATE_FAILED + err = _extract_adcp_error_from_task(task) + + assert err["code"] == code + assert err["recovery"] == recovery + assert err.get("field") == "packages[0].budget" + assert err.get("details") == {"minimum": 500} + + +async def test_send_adcp_error_retry_after_from_decisioning_error() -> None: + """Decisioning AdcpError with retry_after projects onto the wire envelope.""" + from adcp.decisioning.types import AdcpError as DecisioningAdcpError + + exc = DecisioningAdcpError( + "RATE_LIMITED", + message="Too many requests", + recovery="transient", + retry_after=30, + suggestion="Wait 30 seconds", + ) + + executor = ADCPAgentExecutor(_TestHandler()) + queue, captured = _make_fake_queue() + await executor._send_adcp_error(queue, _make_context_stub(), exc) + + assert captured + task = captured[0] + assert task.status.state == pb.TaskState.TASK_STATE_FAILED + err = _extract_adcp_error_from_task(task) + + assert err["code"] == "RATE_LIMITED" + assert err["recovery"] == "transient" + assert err.get("retry_after") == 30 + assert err.get("suggestion") == "Wait 30 seconds" + + +async def test_execute_catches_decisioning_adcp_error() -> None: + """execute() routes decisioning AdcpError through _send_adcp_error (not generic failed).""" + from adcp.decisioning.types import AdcpError as DecisioningAdcpError + + class _SellerRaisesDecisioningError(ADCPHandler): + async def get_adcp_capabilities(self, params: Any, context: Any = None) -> Any: + return {"adcp": {"major_versions": [3]}} + + async def get_products(self, params: Any, context: Any = None) -> Any: + raise DecisioningAdcpError( + "BUDGET_TOO_LOW", + message="Budget $50 is below minimum", + recovery="correctable", + field="packages[0].budget", + suggestion="Increase budget to at least $500", + details={"minimum": 500, "actual": 50}, + ) + + executor = ADCPAgentExecutor(_SellerRaisesDecisioningError()) + ctx = RequestContext(request=MessageSendParams(message=_make_datapart_msg("get_products"))) + queue = EventQueue() + + await executor.execute(ctx, queue) + + event = await queue.dequeue_event() + assert event.status.state == pb.TaskState.TASK_STATE_FAILED + + err = _extract_adcp_error_from_task(event) + assert err["code"] == "BUDGET_TOO_LOW" + assert err["recovery"] == "correctable" + assert err.get("field") == "packages[0].budget" + assert err.get("suggestion") == "Increase budget to at least $500" + assert err.get("details") == {"minimum": 500, "actual": 50} + + # Must not surface as a generic "Skill execution failed" text + text_parts = [ + p.text for p in event.artifacts[0].parts if p.WhichOneof("content") == "text" + ] + assert text_parts, "expected text fallback in failed task" + assert "Skill execution failed" not in text_parts[0] + + +async def test_execute_decisioning_error_terms_rejected() -> None: + """Decisioning AdcpError without optional fields still emits structured code.""" + from adcp.decisioning.types import AdcpError as DecisioningAdcpError + + class _SellerTermsRejected(ADCPHandler): + async def get_adcp_capabilities(self, params: Any, context: Any = None) -> Any: + return {"adcp": {"major_versions": [3]}} + + async def get_products(self, params: Any, context: Any = None) -> Any: + raise DecisioningAdcpError( + "TERMS_REJECTED", + message="Terms must be accepted before buying", + recovery="correctable", + ) + + executor = ADCPAgentExecutor(_SellerTermsRejected()) + ctx = RequestContext(request=MessageSendParams(message=_make_datapart_msg("get_products"))) + queue = EventQueue() + + await executor.execute(ctx, queue) + + event = await queue.dequeue_event() + assert event.status.state == pb.TaskState.TASK_STATE_FAILED + err = _extract_adcp_error_from_task(event) + assert err["code"] == "TERMS_REJECTED" + assert "Skill execution failed" not in str(event.artifacts) + + # --------------------------------------------------------------------------- # Agent card builder # --------------------------------------------------------------------------- From c4f01a27e75f4db4defd68a9124a3e62ac4d35dc Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 4 May 2026 01:43:14 +0000 Subject: [PATCH 2/2] fix(server/a2a): always emit recovery in adcp_error envelope for transport parity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirror build_mcp_error_result: recovery is always included in the A2A DataPart adcp_error dict (no falsy guard), matching the MCP path where recovery is a required key per transport-errors.mdx §A2A Binding. https://claude.ai/code/session_019DMGxsA43mtT7QTRiNipBM --- src/adcp/server/a2a_server.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/adcp/server/a2a_server.py b/src/adcp/server/a2a_server.py index 0c89a62c2..c49bc7b8f 100644 --- a/src/adcp/server/a2a_server.py +++ b/src/adcp/server/a2a_server.py @@ -441,9 +441,8 @@ async def _send_adcp_error( adcp_error_payload: dict[str, Any] = { "code": code, "message": message, + "recovery": recovery, } - if recovery: - adcp_error_payload["recovery"] = recovery if field is not None: adcp_error_payload["field"] = field if suggestion is not None: