diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_agent_run.py b/python/packages/ag-ui/agent_framework_ag_ui/_agent_run.py index 5afed1ed87..e35f3e4062 100644 --- a/python/packages/ag-ui/agent_framework_ag_ui/_agent_run.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_agent_run.py @@ -56,6 +56,7 @@ get_conversation_id_from_update, get_role_value, make_json_safe, + normalize_agui_role, ) if TYPE_CHECKING: @@ -450,7 +451,7 @@ async def _resolve_approval_responses( _convert_approval_results_to_tool_messages(messages) -def _convert_approval_results_to_tool_messages(messages: list[Any]) -> None: +def _convert_approval_results_to_tool_messages(messages: list[Message]) -> None: """Convert function_result content in user messages to proper tool messages. After approval processing, tool results end up in user messages. OpenAI and other @@ -462,14 +463,14 @@ def _convert_approval_results_to_tool_messages(messages: list[Any]) -> None: Args: messages: List of Message objects to process """ - result: list[Any] = [] + result: list[Message] = [] for msg in messages: if get_role_value(msg) != "user": result.append(msg) continue - msg_contents = cast(list[Content], getattr(msg, "contents", None) or []) + msg_contents = msg.contents or [] function_results: list[Content] = [content for content in msg_contents if content.type == "function_result"] other_contents: list[Content] = [content for content in msg_contents if content.type != "function_result"] @@ -492,6 +493,68 @@ def _convert_approval_results_to_tool_messages(messages: list[Any]) -> None: messages[:] = result +def _clean_resolved_approvals_from_snapshot( + snapshot_messages: list[dict[str, Any]], + resolved_messages: list[Message], +) -> None: + """Replace approval payloads in snapshot messages with actual tool results. + + After _resolve_approval_responses executes approved tools, the snapshot still + contains the raw approval payload (e.g. ``{"accepted": true}``). When this + snapshot is sent back to CopilotKit via ``MessagesSnapshotEvent``, the approval + payload persists in the conversation history. On the next turn CopilotKit + re-sends the full history and the adapter re-detects the approval, causing the + tool to be re-executed. + + This function replaces approval tool-message content in ``snapshot_messages`` + with the real tool result so the approval payload no longer appears in the + history sent to the client. + + Args: + snapshot_messages: Raw AG-UI snapshot messages (mutated in place). + resolved_messages: Provider messages after approval resolution. + """ + # Build call_id → result text from resolved tool messages + result_by_call_id: dict[str, str] = {} + for msg in resolved_messages: + if get_role_value(msg) != "tool": + continue + for content in msg.contents or []: + if content.type == "function_result" and content.call_id: + result_text = ( + content.result if isinstance(content.result, str) else json.dumps(make_json_safe(content.result)) + ) + result_by_call_id[str(content.call_id)] = result_text + + if not result_by_call_id: + return + + for snap_msg in snapshot_messages: + if normalize_agui_role(snap_msg.get("role", "")) != "tool": + continue + raw_content = snap_msg.get("content") + if not isinstance(raw_content, str): + continue + + # Check if this is an approval payload + try: + parsed = json.loads(raw_content) + except (json.JSONDecodeError, TypeError): + continue + if not isinstance(parsed, dict) or "accepted" not in parsed: + continue + + # Find matching tool result by toolCallId + tool_call_id = snap_msg.get("toolCallId") or snap_msg.get("tool_call_id") or "" + replacement = result_by_call_id.get(str(tool_call_id)) + if replacement is not None: + snap_msg["content"] = replacement + logger.info( + "Replaced approval payload in snapshot for tool_call_id=%s with actual result", + tool_call_id, + ) + + def _build_messages_snapshot( flow: FlowState, snapshot_messages: list[dict[str, Any]], @@ -646,6 +709,10 @@ async def run_agent_stream( tools_for_execution = tools if tools is not None else server_tools await _resolve_approval_responses(messages, tools_for_execution, agent, run_kwargs) + # Defense-in-depth: replace approval payloads in snapshot with actual tool results + # so CopilotKit does not re-send stale approval content on subsequent turns. + _clean_resolved_approvals_from_snapshot(snapshot_messages, messages) + # Feature #3: Emit StateSnapshotEvent for approved state-changing tools before agent runs approved_state_updates = _extract_approved_state_updates(messages, predictive_handler) approved_state_snapshot_emitted = False diff --git a/python/packages/ag-ui/tests/ag_ui/test_message_adapters.py b/python/packages/ag-ui/tests/ag_ui/test_message_adapters.py index 43bbf48fb2..bc1b95ad7d 100644 --- a/python/packages/ag-ui/tests/ag_ui/test_message_adapters.py +++ b/python/packages/ag-ui/tests/ag_ui/test_message_adapters.py @@ -866,3 +866,45 @@ def test_agui_messages_to_snapshot_format_basic(): assert result[0]["content"] == "Hello" assert result[1]["role"] == "assistant" assert result[1]["content"] == "Hi there" + + +def test_agui_fresh_approval_is_still_processed(): + """A fresh approval (no assistant response after it) must still produce function_approval_response. + + On Turn 2, the approval is fresh (no subsequent assistant message), so it + must be processed normally to execute the tool. + """ + messages_input = [ + # Turn 1: user asks something + {"role": "user", "content": "What time is it?", "id": "msg_1"}, + # Turn 1: assistant calls a tool + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_456", + "type": "function", + "function": {"name": "get_datetime", "arguments": "{}"}, + } + ], + "id": "msg_2", + }, + # Turn 2: user approves (no assistant message after this) + { + "role": "tool", + "content": json.dumps({"accepted": True}), + "toolCallId": "call_456", + "id": "msg_3", + }, + ] + + messages = agui_messages_to_agent_framework(messages_input) + + # The fresh approval SHOULD produce a function_approval_response + approval_contents = [ + content for msg in messages for content in (msg.contents or []) if content.type == "function_approval_response" + ] + assert len(approval_contents) == 1, "Fresh approval should produce function_approval_response" + assert approval_contents[0].approved is True + assert approval_contents[0].function_call.name == "get_datetime" diff --git a/python/packages/ag-ui/tests/ag_ui/test_message_hygiene.py b/python/packages/ag-ui/tests/ag_ui/test_message_hygiene.py index 90e7e35767..a3ccf26d1a 100644 --- a/python/packages/ag-ui/tests/ag_ui/test_message_hygiene.py +++ b/python/packages/ag-ui/tests/ag_ui/test_message_hygiene.py @@ -262,3 +262,141 @@ def test_sanitize_tool_history_filters_confirm_changes_from_assistant_messages() # (the approval response is handled separately by the framework) tool_call_ids = {str(msg.contents[0].call_id) for msg in tool_messages} assert "call_c1" not in tool_call_ids # No synthetic result for confirm_changes + + +# --------------------------------------------------------------------------- +# Tests for _clean_resolved_approvals_from_snapshot +# --------------------------------------------------------------------------- + + +def test_clean_resolved_approvals_from_snapshot() -> None: + """Approval payload in snapshot should be replaced with the actual tool result.""" + import json + + from agent_framework_ag_ui._agent_run import _clean_resolved_approvals_from_snapshot + + # Snapshot still has the approval payload + snapshot_messages = [ + {"role": "user", "content": "What time is it?", "id": "msg_1"}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + {"id": "call_123", "type": "function", "function": {"name": "get_datetime", "arguments": "{}"}} + ], + "id": "msg_2", + }, + { + "role": "tool", + "content": json.dumps({"accepted": True}), + "toolCallId": "call_123", + "id": "msg_3", + }, + ] + + # Resolved provider messages have the actual tool result + resolved_messages = [ + Message(role="user", contents=[Content.from_text(text="What time is it?")]), + Message( + role="assistant", + contents=[Content.from_function_call(call_id="call_123", name="get_datetime", arguments="{}")], + ), + Message( + role="tool", + contents=[Content.from_function_result(call_id="call_123", result="2024-01-01 12:00:00")], + ), + ] + + _clean_resolved_approvals_from_snapshot(snapshot_messages, resolved_messages) + + # The approval payload should now be replaced with the tool result + tool_snap = snapshot_messages[2] + assert tool_snap["content"] == "2024-01-01 12:00:00" + + +def test_clean_resolved_approvals_from_snapshot_no_approvals() -> None: + """When there are no approval payloads, snapshot should be unchanged.""" + from agent_framework_ag_ui._agent_run import _clean_resolved_approvals_from_snapshot # type: ignore + + snapshot_messages = [ + {"role": "user", "content": "Hello", "id": "msg_1"}, + {"role": "assistant", "content": "Hi there", "id": "msg_2"}, + ] + original = [dict(m) for m in snapshot_messages] + + resolved_messages = [ + Message(role="user", contents=[Content.from_text(text="Hello")]), + Message(role="assistant", contents=[Content.from_text(text="Hi there")]), + ] + + _clean_resolved_approvals_from_snapshot(snapshot_messages, resolved_messages) + + # Nothing should have changed + assert snapshot_messages == original + + +def test_cleaned_snapshot_prevents_approval_reprocessing() -> None: + """After snapshot cleaning, approval payload is replaced so it won't re-trigger on next turn. + + Simulates what happens on Turn 2: the approval is processed, the tool executes, + and _clean_resolved_approvals_from_snapshot replaces the approval payload with the + real tool result. On Turn 3, CopilotKit re-sends the cleaned snapshot, which no + longer contains an approval payload — so no function_approval_response is produced. + """ + import json + + from agent_framework_ag_ui._agent_run import _clean_resolved_approvals_from_snapshot + from agent_framework_ag_ui._message_adapters import normalize_agui_input_messages + + # Turn 2 snapshot: still has the raw approval payload + snapshot_messages = [ + {"role": "user", "content": "What time is it?", "id": "msg_1"}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + {"id": "call_789", "type": "function", "function": {"name": "get_datetime", "arguments": "{}"}} + ], + "id": "msg_2", + }, + { + "role": "tool", + "content": json.dumps({"accepted": True}), + "toolCallId": "call_789", + "id": "msg_3", + }, + ] + + # Resolved provider messages after tool execution + resolved_messages = [ + Message(role="user", contents=[Content.from_text(text="What time is it?")]), + Message( + role="assistant", + contents=[Content.from_function_call(call_id="call_789", name="get_datetime", arguments="{}")], + ), + Message( + role="tool", + contents=[Content.from_function_result(call_id="call_789", result="2024-01-01 12:00:00")], + ), + ] + + # Fix B: clean the snapshot + _clean_resolved_approvals_from_snapshot(snapshot_messages, resolved_messages) + + # Snapshot should now have the real tool result + assert snapshot_messages[2]["content"] == "2024-01-01 12:00:00" + + # Simulate Turn 3: CopilotKit re-sends the cleaned snapshot + new messages + turn3_messages = list(snapshot_messages) + [ + {"role": "assistant", "content": "It is 12:00 PM.", "id": "msg_4"}, + {"role": "user", "content": "Thanks!", "id": "msg_5"}, + ] + + provider_messages, _ = normalize_agui_input_messages(turn3_messages) + + # No function_approval_response should exist — the approval payload is gone + for msg in provider_messages: + for content in msg.contents or []: + assert content.type != "function_approval_response", ( + f"Stale approval was re-processed on subsequent turn: {content}" + )