Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 70 additions & 3 deletions python/packages/ag-ui/agent_framework_ag_ui/_agent_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
get_conversation_id_from_update,
get_role_value,
make_json_safe,
normalize_agui_role,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -450,7 +451,7 @@ async def _resolve_approval_responses(
_convert_approval_results_to_tool_messages(messages)


def _convert_approval_results_to_tool_messages(messages: list[Any]) -> None:
def _convert_approval_results_to_tool_messages(messages: list[Message]) -> None:
"""Convert function_result content in user messages to proper tool messages.

After approval processing, tool results end up in user messages. OpenAI and other
Expand All @@ -462,14 +463,14 @@ def _convert_approval_results_to_tool_messages(messages: list[Any]) -> None:
Args:
messages: List of Message objects to process
"""
result: list[Any] = []
result: list[Message] = []

for msg in messages:
if get_role_value(msg) != "user":
result.append(msg)
continue

msg_contents = cast(list[Content], getattr(msg, "contents", None) or [])
msg_contents = msg.contents or []
function_results: list[Content] = [content for content in msg_contents if content.type == "function_result"]
other_contents: list[Content] = [content for content in msg_contents if content.type != "function_result"]

Expand All @@ -492,6 +493,68 @@ def _convert_approval_results_to_tool_messages(messages: list[Any]) -> None:
messages[:] = result


def _clean_resolved_approvals_from_snapshot(
snapshot_messages: list[dict[str, Any]],
resolved_messages: list[Message],
) -> None:
"""Replace approval payloads in snapshot messages with actual tool results.

After _resolve_approval_responses executes approved tools, the snapshot still
contains the raw approval payload (e.g. ``{"accepted": true}``). When this
snapshot is sent back to CopilotKit via ``MessagesSnapshotEvent``, the approval
payload persists in the conversation history. On the next turn CopilotKit
re-sends the full history and the adapter re-detects the approval, causing the
tool to be re-executed.

This function replaces approval tool-message content in ``snapshot_messages``
with the real tool result so the approval payload no longer appears in the
history sent to the client.

Args:
snapshot_messages: Raw AG-UI snapshot messages (mutated in place).
resolved_messages: Provider messages after approval resolution.
"""
# Build call_id → result text from resolved tool messages
result_by_call_id: dict[str, str] = {}
for msg in resolved_messages:
if get_role_value(msg) != "tool":
continue
for content in msg.contents or []:
if content.type == "function_result" and content.call_id:
result_text = (
content.result if isinstance(content.result, str) else json.dumps(make_json_safe(content.result))
)
result_by_call_id[str(content.call_id)] = result_text

if not result_by_call_id:
return

for snap_msg in snapshot_messages:
if normalize_agui_role(snap_msg.get("role", "")) != "tool":
continue
raw_content = snap_msg.get("content")
if not isinstance(raw_content, str):
continue

# Check if this is an approval payload
try:
parsed = json.loads(raw_content)
except (json.JSONDecodeError, TypeError):
continue
if not isinstance(parsed, dict) or "accepted" not in parsed:
continue

# Find matching tool result by toolCallId
tool_call_id = snap_msg.get("toolCallId") or snap_msg.get("tool_call_id") or ""
replacement = result_by_call_id.get(str(tool_call_id))
if replacement is not None:
snap_msg["content"] = replacement
logger.info(
"Replaced approval payload in snapshot for tool_call_id=%s with actual result",
tool_call_id,
)


def _build_messages_snapshot(
flow: FlowState,
snapshot_messages: list[dict[str, Any]],
Expand Down Expand Up @@ -646,6 +709,10 @@ async def run_agent_stream(
tools_for_execution = tools if tools is not None else server_tools
await _resolve_approval_responses(messages, tools_for_execution, agent, run_kwargs)

# Defense-in-depth: replace approval payloads in snapshot with actual tool results
# so CopilotKit does not re-send stale approval content on subsequent turns.
_clean_resolved_approvals_from_snapshot(snapshot_messages, messages)

# Feature #3: Emit StateSnapshotEvent for approved state-changing tools before agent runs
approved_state_updates = _extract_approved_state_updates(messages, predictive_handler)
approved_state_snapshot_emitted = False
Expand Down
42 changes: 42 additions & 0 deletions python/packages/ag-ui/tests/ag_ui/test_message_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -866,3 +866,45 @@ def test_agui_messages_to_snapshot_format_basic():
assert result[0]["content"] == "Hello"
assert result[1]["role"] == "assistant"
assert result[1]["content"] == "Hi there"


def test_agui_fresh_approval_is_still_processed():
"""A fresh approval (no assistant response after it) must still produce function_approval_response.
On Turn 2, the approval is fresh (no subsequent assistant message), so it
must be processed normally to execute the tool.
"""
messages_input = [
# Turn 1: user asks something
{"role": "user", "content": "What time is it?", "id": "msg_1"},
# Turn 1: assistant calls a tool
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_456",
"type": "function",
"function": {"name": "get_datetime", "arguments": "{}"},
}
],
"id": "msg_2",
},
# Turn 2: user approves (no assistant message after this)
{
"role": "tool",
"content": json.dumps({"accepted": True}),
"toolCallId": "call_456",
"id": "msg_3",
},
]

messages = agui_messages_to_agent_framework(messages_input)

# The fresh approval SHOULD produce a function_approval_response
approval_contents = [
content for msg in messages for content in (msg.contents or []) if content.type == "function_approval_response"
]
assert len(approval_contents) == 1, "Fresh approval should produce function_approval_response"
assert approval_contents[0].approved is True
assert approval_contents[0].function_call.name == "get_datetime"
138 changes: 138 additions & 0 deletions python/packages/ag-ui/tests/ag_ui/test_message_hygiene.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,141 @@ def test_sanitize_tool_history_filters_confirm_changes_from_assistant_messages()
# (the approval response is handled separately by the framework)
tool_call_ids = {str(msg.contents[0].call_id) for msg in tool_messages}
assert "call_c1" not in tool_call_ids # No synthetic result for confirm_changes


# ---------------------------------------------------------------------------
# Tests for _clean_resolved_approvals_from_snapshot
# ---------------------------------------------------------------------------


def test_clean_resolved_approvals_from_snapshot() -> None:
"""Approval payload in snapshot should be replaced with the actual tool result."""
import json

from agent_framework_ag_ui._agent_run import _clean_resolved_approvals_from_snapshot

# Snapshot still has the approval payload
snapshot_messages = [
{"role": "user", "content": "What time is it?", "id": "msg_1"},
{
"role": "assistant",
"content": "",
"tool_calls": [
{"id": "call_123", "type": "function", "function": {"name": "get_datetime", "arguments": "{}"}}
],
"id": "msg_2",
},
{
"role": "tool",
"content": json.dumps({"accepted": True}),
"toolCallId": "call_123",
"id": "msg_3",
},
]

# Resolved provider messages have the actual tool result
resolved_messages = [
Message(role="user", contents=[Content.from_text(text="What time is it?")]),
Message(
role="assistant",
contents=[Content.from_function_call(call_id="call_123", name="get_datetime", arguments="{}")],
),
Message(
role="tool",
contents=[Content.from_function_result(call_id="call_123", result="2024-01-01 12:00:00")],
),
]

_clean_resolved_approvals_from_snapshot(snapshot_messages, resolved_messages)

# The approval payload should now be replaced with the tool result
tool_snap = snapshot_messages[2]
assert tool_snap["content"] == "2024-01-01 12:00:00"


def test_clean_resolved_approvals_from_snapshot_no_approvals() -> None:
"""When there are no approval payloads, snapshot should be unchanged."""
from agent_framework_ag_ui._agent_run import _clean_resolved_approvals_from_snapshot # type: ignore

snapshot_messages = [
{"role": "user", "content": "Hello", "id": "msg_1"},
{"role": "assistant", "content": "Hi there", "id": "msg_2"},
]
original = [dict(m) for m in snapshot_messages]

resolved_messages = [
Message(role="user", contents=[Content.from_text(text="Hello")]),
Message(role="assistant", contents=[Content.from_text(text="Hi there")]),
]

_clean_resolved_approvals_from_snapshot(snapshot_messages, resolved_messages)

# Nothing should have changed
assert snapshot_messages == original


def test_cleaned_snapshot_prevents_approval_reprocessing() -> None:
"""After snapshot cleaning, approval payload is replaced so it won't re-trigger on next turn.

Simulates what happens on Turn 2: the approval is processed, the tool executes,
and _clean_resolved_approvals_from_snapshot replaces the approval payload with the
real tool result. On Turn 3, CopilotKit re-sends the cleaned snapshot, which no
longer contains an approval payload — so no function_approval_response is produced.
"""
import json

from agent_framework_ag_ui._agent_run import _clean_resolved_approvals_from_snapshot
from agent_framework_ag_ui._message_adapters import normalize_agui_input_messages

# Turn 2 snapshot: still has the raw approval payload
snapshot_messages = [
{"role": "user", "content": "What time is it?", "id": "msg_1"},
{
"role": "assistant",
"content": "",
"tool_calls": [
{"id": "call_789", "type": "function", "function": {"name": "get_datetime", "arguments": "{}"}}
],
"id": "msg_2",
},
{
"role": "tool",
"content": json.dumps({"accepted": True}),
"toolCallId": "call_789",
"id": "msg_3",
},
]

# Resolved provider messages after tool execution
resolved_messages = [
Message(role="user", contents=[Content.from_text(text="What time is it?")]),
Message(
role="assistant",
contents=[Content.from_function_call(call_id="call_789", name="get_datetime", arguments="{}")],
),
Message(
role="tool",
contents=[Content.from_function_result(call_id="call_789", result="2024-01-01 12:00:00")],
),
]

# Fix B: clean the snapshot
_clean_resolved_approvals_from_snapshot(snapshot_messages, resolved_messages)

# Snapshot should now have the real tool result
assert snapshot_messages[2]["content"] == "2024-01-01 12:00:00"

# Simulate Turn 3: CopilotKit re-sends the cleaned snapshot + new messages
turn3_messages = list(snapshot_messages) + [
{"role": "assistant", "content": "It is 12:00 PM.", "id": "msg_4"},
{"role": "user", "content": "Thanks!", "id": "msg_5"},
]

provider_messages, _ = normalize_agui_input_messages(turn3_messages)

# No function_approval_response should exist — the approval payload is gone
for msg in provider_messages:
for content in msg.contents or []:
assert content.type != "function_approval_response", (
f"Stale approval was re-processed on subsequent turn: {content}"
)