diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_run_common.py b/python/packages/ag-ui/agent_framework_ag_ui/_run_common.py index cec86bdcf3..f573db3418 100644 --- a/python/packages/ag-ui/agent_framework_ag_ui/_run_common.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_run_common.py @@ -355,6 +355,134 @@ def _emit_usage(content: Content) -> list[BaseEvent]: return [CustomEvent(name="usage", value=usage_details)] +def _emit_mcp_tool_call(content: Content, flow: FlowState) -> list[BaseEvent]: + """Emit ToolCall start/args events for MCP server tool call content. + + MCP tool calls arrive as complete items (not streamed deltas), so we emit a + ``ToolCallStartEvent`` (and, when arguments are present, a ``ToolCallArgsEvent``) + immediately. This maps MCP-specific fields (tool_name, server_name) to the + same AG-UI ToolCall* events used by regular function calls, making MCP tool + execution visible to AG-UI consumers. Completion/end events are handled + separately by ``_emit_mcp_tool_result``. + """ + events: list[BaseEvent] = [] + + tool_call_id = content.call_id or generate_event_id() + tool_name = content.tool_name or "mcp_tool" + + # Prefix with server name for disambiguation when available + display_name = f"{content.server_name}/{tool_name}" if content.server_name else tool_name + + events.append( + ToolCallStartEvent( + tool_call_id=tool_call_id, + tool_call_name=display_name, + parent_message_id=flow.message_id, + ) + ) + + # Serialize arguments + args_str = "" + if content.arguments: + args_str = ( + content.arguments + if isinstance(content.arguments, str) + else json.dumps(make_json_safe(content.arguments)) + ) + events.append(ToolCallArgsEvent(tool_call_id=tool_call_id, delta=args_str)) + + # Track in flow state for MESSAGES_SNAPSHOT + tool_entry = { + "id": tool_call_id, + "type": "function", + "function": {"name": display_name, "arguments": args_str}, + } + flow.pending_tool_calls.append(tool_entry) + flow.tool_calls_by_id[tool_call_id] = tool_entry + + return events + + +def _emit_mcp_tool_result(content: Content, flow: FlowState) -> list[BaseEvent]: + """Emit ToolCallResult events for MCP server tool result content. + + Maps MCP tool results to the same AG-UI ToolCallEnd + ToolCallResult events + used by regular function results. Uses ``content.output`` (the MCP-specific + result field) instead of ``content.result``. + + Mirrors the FlowState cleanup performed by ``_emit_tool_result`` (resetting + tool_call_id/tool_call_name, closing any open text message) so MCP results + behave consistently with standard tool results. + """ + events: list[BaseEvent] = [] + + if not content.call_id: + logger.warning("MCP tool result content missing call_id, skipping") + return events + + events.append(ToolCallEndEvent(tool_call_id=content.call_id)) + flow.tool_calls_ended.add(content.call_id) + + raw_output = content.output if content.output is not None else "" + result_content = raw_output if isinstance(raw_output, str) else json.dumps(make_json_safe(raw_output)) + message_id = generate_event_id() + events.append( + ToolCallResultEvent( + message_id=message_id, + tool_call_id=content.call_id, + content=result_content, + role="tool", + ) + ) + + flow.tool_results.append( + { + "id": message_id, + "role": "tool", + "toolCallId": content.call_id, + "content": result_content, + } + ) + + # Mirror _emit_tool_result cleanup so MCP results behave consistently + flow.tool_call_id = None + flow.tool_call_name = None + + if flow.message_id: + logger.debug("Closing text message for MCP tool result: message_id=%s", flow.message_id) + events.append(TextMessageEndEvent(message_id=flow.message_id)) + flow.message_id = None + flow.accumulated_text = "" + + return events + + +def _emit_text_reasoning(content: Content) -> list[BaseEvent]: + """Emit a custom event for text_reasoning content. + + AG-UI protocol does not define a dedicated reasoning event type, so we emit + a ``CustomEvent`` with ``name="text_reasoning"``. This makes reasoning / + chain-of-thought progress visible to frontends that listen for custom events, + following the same pattern used by ``_emit_usage``. + """ + # Only emit user-visible text from content.text. Do not fall back to + # protected_data as text, since protected_data may contain non-display + # payloads such as provider-specific reasoning metadata. + text = content.text or "" + if not text and content.protected_data is None: + return [] + + value: dict[str, Any] = {"text": text} + # Expose protected_data under a separate key so consumers can decide + # whether/how to render it, without conflating it with display text. + if content.protected_data is not None: + value["protected_data"] = content.protected_data + if content.id: + value["id"] = content.id + + return [CustomEvent(name="text_reasoning", value=value)] + + def _emit_content( content: Any, flow: FlowState, @@ -374,5 +502,11 @@ def _emit_content( return _emit_approval_request(content, flow, predictive_handler, require_confirmation) if content_type == "usage": return _emit_usage(content) + if content_type == "mcp_server_tool_call": + return _emit_mcp_tool_call(content, flow) + if content_type == "mcp_server_tool_result": + return _emit_mcp_tool_result(content, flow) + if content_type == "text_reasoning": + return _emit_text_reasoning(content) logger.debug("Skipping unsupported content type in AG-UI emitter: %s", content_type) return [] diff --git a/python/packages/ag-ui/tests/ag_ui/test_run.py b/python/packages/ag-ui/tests/ag_ui/test_run.py index 4abd63c799..62d4ea8a61 100644 --- a/python/packages/ag-ui/tests/ag_ui/test_run.py +++ b/python/packages/ag-ui/tests/ag_ui/test_run.py @@ -23,7 +23,10 @@ _build_run_finished_event, _emit_approval_request, _emit_content, + _emit_mcp_tool_call, + _emit_mcp_tool_result, _emit_text, + _emit_text_reasoning, _emit_tool_call, _emit_tool_result, _extract_resume_payload, @@ -834,3 +837,294 @@ def test_text_then_tool_flow(self): assert len(start_events) == 2 assert len(end_events) == 2 + + +# ============================================================================ +# Tests for MCP tool call, MCP tool result, and text reasoning event emission +# ============================================================================ + + +class TestEmitMcpToolCall: + """Tests for _emit_mcp_tool_call function.""" + + def test_produces_start_and_args_events(self): + """MCP tool call emits ToolCallStart + ToolCallArgs events.""" + flow = FlowState() + content = Content.from_mcp_server_tool_call( + call_id="mcp_call_1", + tool_name="search", + server_name="brave", + arguments={"query": "weather"}, + ) + + events = _emit_mcp_tool_call(content, flow) + + assert len(events) == 2 # ToolCallStartEvent + ToolCallArgsEvent + assert events[0].type == "TOOL_CALL_START" + assert events[0].tool_call_id == "mcp_call_1" + assert events[0].tool_call_name == "brave/search" + assert events[1].type == "TOOL_CALL_ARGS" + assert events[1].tool_call_id == "mcp_call_1" + assert "weather" in events[1].delta + + def test_tracks_in_flow_state(self): + """MCP tool call is tracked in flow.pending_tool_calls and tool_calls_by_id.""" + flow = FlowState() + content = Content.from_mcp_server_tool_call( + call_id="mcp_call_2", + tool_name="get_file", + arguments='{"path": "/tmp/test.txt"}', + ) + + _emit_mcp_tool_call(content, flow) + + assert len(flow.pending_tool_calls) == 1 + assert flow.pending_tool_calls[0]["id"] == "mcp_call_2" + assert "mcp_call_2" in flow.tool_calls_by_id + assert flow.tool_calls_by_id["mcp_call_2"]["function"]["name"] == "get_file" + + def test_no_server_name_uses_tool_name_only(self): + """Without server_name, display name is just tool_name.""" + flow = FlowState() + content = Content.from_mcp_server_tool_call( + call_id="mcp_call_3", + tool_name="list_files", + ) + + events = _emit_mcp_tool_call(content, flow) + + assert events[0].tool_call_name == "list_files" + + def test_no_arguments_skips_args_event(self): + """No arguments produces only ToolCallStart, no ToolCallArgs.""" + flow = FlowState() + content = Content.from_mcp_server_tool_call( + call_id="mcp_call_4", + tool_name="ping", + ) + + events = _emit_mcp_tool_call(content, flow) + + assert len(events) == 1 # Only ToolCallStartEvent + assert events[0].type == "TOOL_CALL_START" + + def test_generates_id_when_missing(self): + """A tool_call_id is generated when call_id is None.""" + flow = FlowState() + # Create content manually to bypass required call_id in from_mcp_server_tool_call + content = Content(type="mcp_server_tool_call", tool_name="test_tool") + + events = _emit_mcp_tool_call(content, flow) + + assert len(events) >= 1 + assert events[0].tool_call_id is not None + assert events[0].tool_call_id != "" + + +class TestEmitMcpToolResult: + """Tests for _emit_mcp_tool_result function.""" + + def test_produces_end_and_result_events(self): + """MCP tool result emits ToolCallEnd + ToolCallResult events.""" + flow = FlowState() + content = Content.from_mcp_server_tool_result( + call_id="mcp_call_1", + output={"results": [{"title": "Weather", "url": "https://example.com"}]}, + ) + + events = _emit_mcp_tool_result(content, flow) + + assert len(events) == 2 # ToolCallEndEvent + ToolCallResultEvent + assert events[0].type == "TOOL_CALL_END" + assert events[0].tool_call_id == "mcp_call_1" + assert events[1].type == "TOOL_CALL_RESULT" + assert events[1].tool_call_id == "mcp_call_1" + assert "Weather" in events[1].content + + def test_tracks_in_flow_state(self): + """MCP tool result is tracked in flow.tool_results and tool_calls_ended.""" + flow = FlowState() + content = Content.from_mcp_server_tool_result( + call_id="mcp_call_5", + output="Success", + ) + + _emit_mcp_tool_result(content, flow) + + assert "mcp_call_5" in flow.tool_calls_ended + assert len(flow.tool_results) == 1 + assert flow.tool_results[0]["toolCallId"] == "mcp_call_5" + assert flow.tool_results[0]["content"] == "Success" + + def test_no_call_id_returns_empty(self): + """Missing call_id returns empty events list.""" + flow = FlowState() + content = Content(type="mcp_server_tool_result", output="data") + + events = _emit_mcp_tool_result(content, flow) + + assert events == [] + + def test_serializes_non_string_output(self): + """Non-string output is serialized to JSON.""" + flow = FlowState() + content = Content.from_mcp_server_tool_result( + call_id="mcp_call_6", + output={"key": "value", "count": 42}, + ) + + events = _emit_mcp_tool_result(content, flow) + + result_event = events[1] + assert isinstance(result_event.content, str) + assert '"key": "value"' in result_event.content + + def test_resets_flow_state_like_emit_tool_result(self): + """MCP tool result performs same FlowState cleanup as _emit_tool_result. + + Mirrors _emit_tool_result behavior: resets tool_call_id, tool_call_name, + closes any open text message, and resets accumulated_text. + """ + flow = FlowState() + flow.tool_call_id = "mcp_call_7" + flow.tool_call_name = "brave/search" + flow.message_id = "open-msg-456" + flow.accumulated_text = "Let me search for that..." + + content = Content.from_mcp_server_tool_result( + call_id="mcp_call_7", + output="search results", + ) + + events = _emit_mcp_tool_result(content, flow) + + # Verify FlowState cleanup + assert flow.tool_call_id is None + assert flow.tool_call_name is None + assert flow.message_id is None + assert flow.accumulated_text == "" + + # Verify TextMessageEndEvent was emitted for the open message + text_end_events = [e for e in events if isinstance(e, TextMessageEndEvent)] + assert len(text_end_events) == 1 + assert text_end_events[0].message_id == "open-msg-456" + + def test_no_open_message_skips_text_end(self): + """MCP tool result without open text message skips TextMessageEndEvent.""" + flow = FlowState() + flow.message_id = None + + content = Content.from_mcp_server_tool_result( + call_id="mcp_call_8", + output="result", + ) + + events = _emit_mcp_tool_result(content, flow) + + text_end_events = [e for e in events if isinstance(e, TextMessageEndEvent)] + assert len(text_end_events) == 0 + + +class TestEmitTextReasoning: + """Tests for _emit_text_reasoning function.""" + + def test_produces_custom_event(self): + """Text reasoning emits a CustomEvent with name='text_reasoning'.""" + content = Content.from_text_reasoning( + id="reason_1", + text="The user is asking about weather, so I should call the weather tool.", + ) + + events = _emit_text_reasoning(content) + + assert len(events) == 1 + assert events[0].type == "CUSTOM" + assert events[0].name == "text_reasoning" + assert events[0].value["text"] == "The user is asking about weather, so I should call the weather tool." + assert events[0].value["id"] == "reason_1" + + def test_protected_data_as_separate_key(self): + """protected_data is exposed under its own key, not conflated with text.""" + content = Content.from_text_reasoning( + text="visible reasoning", + protected_data="encrypted metadata", + ) + + events = _emit_text_reasoning(content) + + assert len(events) == 1 + assert events[0].value["text"] == "visible reasoning" + assert events[0].value["protected_data"] == "encrypted metadata" + + def test_protected_data_only_emits_event(self): + """Content with only protected_data (no text) still emits an event.""" + content = Content.from_text_reasoning( + protected_data="encrypted reasoning content", + ) + + events = _emit_text_reasoning(content) + + assert len(events) == 1 + assert events[0].value["text"] == "" + assert events[0].value["protected_data"] == "encrypted reasoning content" + + def test_empty_text_and_no_protected_data_returns_empty(self): + """Empty text and no protected_data returns no events.""" + content = Content.from_text_reasoning() + + events = _emit_text_reasoning(content) + + assert events == [] + + def test_no_id_omits_id_field(self): + """When id is None, the value dict should not include 'id'.""" + content = Content.from_text_reasoning(text="thinking...") + + events = _emit_text_reasoning(content) + + assert len(events) == 1 + assert "id" not in events[0].value + + +class TestEmitContentMcpRouting: + """Tests that _emit_content correctly routes MCP and reasoning types.""" + + def test_routes_mcp_server_tool_call(self): + """_emit_content dispatches mcp_server_tool_call to _emit_mcp_tool_call.""" + flow = FlowState() + content = Content.from_mcp_server_tool_call( + call_id="route_test_1", + tool_name="test_tool", + server_name="test_server", + ) + + events = _emit_content(content, flow) + + assert len(events) >= 1 + assert events[0].type == "TOOL_CALL_START" + assert events[0].tool_call_name == "test_server/test_tool" + + def test_routes_mcp_server_tool_result(self): + """_emit_content dispatches mcp_server_tool_result to _emit_mcp_tool_result.""" + flow = FlowState() + content = Content.from_mcp_server_tool_result( + call_id="route_test_2", + output="result data", + ) + + events = _emit_content(content, flow) + + assert len(events) == 2 + assert events[0].type == "TOOL_CALL_END" + assert events[1].type == "TOOL_CALL_RESULT" + + def test_routes_text_reasoning(self): + """_emit_content dispatches text_reasoning to _emit_text_reasoning.""" + flow = FlowState() + content = Content.from_text_reasoning(text="I need to think about this...") + + events = _emit_content(content, flow) + + assert len(events) == 1 + assert events[0].type == "CUSTOM" + assert events[0].name == "text_reasoning"