Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 44 additions & 23 deletions src/adcp/server/a2a_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from google.protobuf.struct_pb2 import Value
from starlette.applications import Starlette

from adcp.exceptions import ADCPError, ADCPTaskError
from adcp.exceptions import ADCPError
from adcp.server.base import ADCPHandler, ToolContext

if TYPE_CHECKING:
Expand Down Expand Up @@ -73,7 +73,6 @@
"""


from adcp.server.helpers import STANDARD_ERROR_CODES # noqa: E402
from adcp.server.mcp_tools import create_tool_caller, get_tools_for_handler
from adcp.server.test_controller import TestControllerStore, _handle_test_controller

Expand Down Expand Up @@ -214,9 +213,23 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non
# errors (auth rejected, rate-limited pre-dispatch).
logger.info("AdCP application error for skill %s: %s", skill_name, exc)
await self._send_adcp_error(event_queue, context, exc)
except Exception:
logger.exception("Error executing skill %s", skill_name)
await self._send_error(event_queue, context, f"Skill execution failed: {skill_name}")
except Exception as exc:
# DecisioningPlatform raises adcp.decisioning.types.AdcpError which is
# a separate class hierarchy from adcp.exceptions.ADCPError. Catch it
# here to preserve the structured shape instead of collapsing it into
# a generic "Skill execution failed" text message.
try:
from adcp.decisioning.types import AdcpError as _DecAdcpError # noqa: N813
except Exception:
_DecAdcpError = None # type: ignore[assignment,misc] # noqa: N806
if _DecAdcpError is not None and isinstance(exc, _DecAdcpError):
logger.info("AdCP decisioning error for skill %s: %s", skill_name, exc)
await self._send_adcp_error(event_queue, context, exc)
else:
logger.exception("Error executing skill %s", skill_name)
await self._send_error(
event_queue, context, f"Skill execution failed: {skill_name}"
)

async def _dispatch_with_middleware(
self,
Expand Down Expand Up @@ -406,37 +419,45 @@ async def _send_adcp_error(
self,
event_queue: EventQueue,
context: RequestContext,
exc: ADCPError,
exc: Any,
) -> None:
"""Publish a failed task carrying an AdCP ``adcp_error`` payload.

Follows transport-errors.mdx §A2A Binding: failed task with artifact
containing a ``DataPart`` keyed under ``adcp_error`` plus a terse
``TextPart`` for human/LLM consumption.

Accepts both ``adcp.exceptions.ADCPError`` and the decisioning-layer
``adcp.decisioning.types.AdcpError`` — both are routed here so the
full structured shape (code, message, recovery, field, details,
retry_after) reaches A2A buyers on the same wire path as MCP.
"""
# Derive the spec error code. ADCPTaskError carries a list of codes
# (e.g. IdempotencyConflictError → IDEMPOTENCY_CONFLICT); fall back
# to a generic INTERNAL_ERROR when the exception doesn't supply one.
code = "INTERNAL_ERROR"
if isinstance(exc, ADCPTaskError) and exc.error_codes:
code = str(exc.error_codes[0])

adcp_error: dict[str, Any] = {
from adcp.server.translate import _extract_structured_fields

code, message, recovery, field, suggestion, details, _errors = _extract_structured_fields(
exc
)

adcp_error_payload: dict[str, Any] = {
"code": code,
"message": exc.message,
"message": message,
"recovery": recovery,
}
recovery = STANDARD_ERROR_CODES.get(code, {}).get("recovery")
if recovery:
adcp_error["recovery"] = recovery
suggestion = getattr(exc, "suggestion", None)
if suggestion:
adcp_error["suggestion"] = suggestion
if field is not None:
adcp_error_payload["field"] = field
if suggestion is not None:
adcp_error_payload["suggestion"] = suggestion
retry_after = getattr(exc, "retry_after", None)
if retry_after is not None:
adcp_error_payload["retry_after"] = retry_after
if details:
adcp_error_payload["details"] = dict(details)

task = _make_task(
context,
state=pb.TaskState.TASK_STATE_FAILED,
data={"adcp_error": adcp_error},
message=exc.message,
data={"adcp_error": adcp_error_payload},
message=message,
)
await event_queue.enqueue_event(task)

Expand Down
172 changes: 172 additions & 0 deletions tests/test_a2a_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,178 @@ async def test_cancel():
assert event.status.state == pb.TaskState.TASK_STATE_CANCELED


# ---------------------------------------------------------------------------
# Structured A2A error envelope — issue #530 parity with MCP (#525)
# ---------------------------------------------------------------------------


def _make_fake_queue() -> tuple[Any, list[Any]]:
"""Return (queue, captured) — enqueue_event appends to captured."""
captured: list[Any] = []

class _FakeQueue:
async def enqueue_event(self, event: Any) -> None:
captured.append(event)

return _FakeQueue(), captured


def _make_context_stub() -> Any:
"""Minimal RequestContext stub for direct _send_adcp_error calls."""
from types import SimpleNamespace

return SimpleNamespace(task_id=None, context_id=None, message=None)


def _extract_adcp_error_from_task(task: Any) -> dict[str, Any]:
"""Pull the adcp_error dict out of the first DataPart in a failed task."""
data_parts = [
_MessageToDict(p.data)
for p in task.artifacts[0].parts
if p.WhichOneof("content") == "data"
]
assert data_parts, "failed task missing DataPart"
err = data_parts[0].get("adcp_error")
assert err is not None, f"DataPart missing adcp_error key; got {data_parts[0]}"
return err # type: ignore[return-value]


@pytest.mark.parametrize(
"code,recovery",
[
("MEDIA_BUY_NOT_FOUND", "correctable"),
("PACKAGE_NOT_FOUND", "correctable"),
("BUDGET_TOO_LOW", "correctable"),
("TERMS_REJECTED", "terminal"), # not in STANDARD_ERROR_CODES → default terminal
],
)
async def test_send_adcp_error_full_envelope_from_adcp_task_error(
code: str, recovery: str
) -> None:
"""_send_adcp_error populates field/details from ADCPTaskError with Error model."""
from adcp.exceptions import ADCPTaskError
from adcp.types import Error

detail_err = Error(
code=code,
message=f"{code} raised",
field="packages[0].budget",
details={"minimum": 500},
)
exc = ADCPTaskError("create_media_buy", [detail_err])

executor = ADCPAgentExecutor(_TestHandler())
queue, captured = _make_fake_queue()
await executor._send_adcp_error(queue, _make_context_stub(), exc)

assert captured, "no event emitted"
task = captured[0]
assert task.status.state == pb.TaskState.TASK_STATE_FAILED
err = _extract_adcp_error_from_task(task)

assert err["code"] == code
assert err["recovery"] == recovery
assert err.get("field") == "packages[0].budget"
assert err.get("details") == {"minimum": 500}


async def test_send_adcp_error_retry_after_from_decisioning_error() -> None:
"""Decisioning AdcpError with retry_after projects onto the wire envelope."""
from adcp.decisioning.types import AdcpError as DecisioningAdcpError

exc = DecisioningAdcpError(
"RATE_LIMITED",
message="Too many requests",
recovery="transient",
retry_after=30,
suggestion="Wait 30 seconds",
)

executor = ADCPAgentExecutor(_TestHandler())
queue, captured = _make_fake_queue()
await executor._send_adcp_error(queue, _make_context_stub(), exc)

assert captured
task = captured[0]
assert task.status.state == pb.TaskState.TASK_STATE_FAILED
err = _extract_adcp_error_from_task(task)

assert err["code"] == "RATE_LIMITED"
assert err["recovery"] == "transient"
assert err.get("retry_after") == 30
assert err.get("suggestion") == "Wait 30 seconds"


async def test_execute_catches_decisioning_adcp_error() -> None:
"""execute() routes decisioning AdcpError through _send_adcp_error (not generic failed)."""
from adcp.decisioning.types import AdcpError as DecisioningAdcpError

class _SellerRaisesDecisioningError(ADCPHandler):
async def get_adcp_capabilities(self, params: Any, context: Any = None) -> Any:
return {"adcp": {"major_versions": [3]}}

async def get_products(self, params: Any, context: Any = None) -> Any:
raise DecisioningAdcpError(
"BUDGET_TOO_LOW",
message="Budget $50 is below minimum",
recovery="correctable",
field="packages[0].budget",
suggestion="Increase budget to at least $500",
details={"minimum": 500, "actual": 50},
)

executor = ADCPAgentExecutor(_SellerRaisesDecisioningError())
ctx = RequestContext(request=MessageSendParams(message=_make_datapart_msg("get_products")))
queue = EventQueue()

await executor.execute(ctx, queue)

event = await queue.dequeue_event()
assert event.status.state == pb.TaskState.TASK_STATE_FAILED

err = _extract_adcp_error_from_task(event)
assert err["code"] == "BUDGET_TOO_LOW"
assert err["recovery"] == "correctable"
assert err.get("field") == "packages[0].budget"
assert err.get("suggestion") == "Increase budget to at least $500"
assert err.get("details") == {"minimum": 500, "actual": 50}

# Must not surface as a generic "Skill execution failed" text
text_parts = [
p.text for p in event.artifacts[0].parts if p.WhichOneof("content") == "text"
]
assert text_parts, "expected text fallback in failed task"
assert "Skill execution failed" not in text_parts[0]


async def test_execute_decisioning_error_terms_rejected() -> None:
"""Decisioning AdcpError without optional fields still emits structured code."""
from adcp.decisioning.types import AdcpError as DecisioningAdcpError

class _SellerTermsRejected(ADCPHandler):
async def get_adcp_capabilities(self, params: Any, context: Any = None) -> Any:
return {"adcp": {"major_versions": [3]}}

async def get_products(self, params: Any, context: Any = None) -> Any:
raise DecisioningAdcpError(
"TERMS_REJECTED",
message="Terms must be accepted before buying",
recovery="correctable",
)

executor = ADCPAgentExecutor(_SellerTermsRejected())
ctx = RequestContext(request=MessageSendParams(message=_make_datapart_msg("get_products")))
queue = EventQueue()

await executor.execute(ctx, queue)

event = await queue.dequeue_event()
assert event.status.state == pb.TaskState.TASK_STATE_FAILED
err = _extract_adcp_error_from_task(event)
assert err["code"] == "TERMS_REJECTED"
assert "Skill execution failed" not in str(event.artifacts)


# ---------------------------------------------------------------------------
# Agent card builder
# ---------------------------------------------------------------------------
Expand Down
Loading