diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index 3fb83803c4..9893024ee7 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -213,6 +213,14 @@ def run( ) return self._run_impl(messages, response_id, session, checkpoint_id, checkpoint_storage, **kwargs) + def _filter_messages(self, chat_messages: list[Message]) -> list[Message]: + """From a list[Message] output, return only the last meaningful assistant message.""" + for msg in reversed(chat_messages): + if msg.role != "user" and msg.text and msg.text.strip(): + return [msg] + # fallback: last non-user message + return [m for m in reversed(chat_messages) if m.role != "user"][:1] + async def _run_impl( self, messages: AgentRunInputs, @@ -476,7 +484,7 @@ def _convert_workflow_events_to_agent_response( messages.append(data) raw_representations.append(data.raw_representation) elif is_instance_of(data, list[Message]): - chat_messages = cast(list[Message], data) + chat_messages = self._filter_messages(cast(list[Message], data)) messages.extend(chat_messages) raw_representations.append(data) else: @@ -593,7 +601,7 @@ def _convert_workflow_event_to_agent_response_updates( ] if is_instance_of(data, list[Message]): # Convert each Message to an AgentResponseUpdate - chat_messages = cast(list[Message], data) + chat_messages = self._filter_messages(cast(list[Message], data)) updates = [] for msg in chat_messages: updates.append( diff --git a/python/packages/core/tests/workflow/test_workflow_agent.py b/python/packages/core/tests/workflow/test_workflow_agent.py index b2fbded39b..d0d96c659d 100644 --- a/python/packages/core/tests/workflow/test_workflow_agent.py +++ b/python/packages/core/tests/workflow/test_workflow_agent.py @@ -469,15 +469,20 @@ async def raw_yielding_executor( assert updates[2].raw_representation.value == 42 async def test_workflow_as_agent_yield_output_with_list_of_chat_messages(self) -> None: - """Test that yield_output with list[Message] extracts contents from all messages. + """Test that yield_output with list[Message] surfaces only the last assistant message. - Note: Content items are coalesced by _finalize_response, so multiple text contents - become a single merged Content in the final response. + When a workflow executor yields a list[Message] (as GroupChat orchestrators + do with self._full_conversation on termination), _filter_messages returns + only the last meaningful assistant message to avoid re-emitting user input + and replaying the full conversation history across turns. See #4261. + + Users who need intermediate agent responses can opt in via + intermediate_outputs=True in GroupChatBuilder. """ @executor async def list_yielding_executor(messages: list[Message], ctx: WorkflowContext[Never, list[Message]]) -> None: - # Yield a list of Messages (as SequentialBuilder does) + # Yield a list of Messages (as GroupChat orchestrator does with _full_conversation) msg_list = [ Message(role="user", text="first message"), Message(role="assistant", text="second message"), @@ -491,25 +496,24 @@ async def list_yielding_executor(messages: list[Message], ctx: WorkflowContext[N workflow = WorkflowBuilder(start_executor=list_yielding_executor).build() agent = workflow.as_agent("list-msg-agent") - # Verify streaming returns the update with all 4 contents before coalescing + # Streaming: _filter_messages returns only the last meaningful assistant message updates: list[AgentResponseUpdate] = [] async for update in agent.run("test", stream=True): updates.append(update) - assert len(updates) == 3 + # Only the last assistant message should be surfaced (user messages filtered, + # earlier assistant messages treated as conversation history replay) + assert len(updates) == 1 full_response = AgentResponse.from_updates(updates) - assert len(full_response.messages) == 3 - texts = [message.text for message in full_response.messages] - # Note: `from_agent_run_response_updates` coalesces multiple text contents into one content - assert texts == ["first message", "second message", "thirdfourth"] + assert len(full_response.messages) == 1 + assert full_response.messages[0].text == "thirdfourth" - # Verify run() + # Non-streaming: same filtering applies result = await agent.run("test") assert isinstance(result, AgentResponse) - assert len(result.messages) == 3 - texts = [message.text for message in result.messages] - assert texts == ["first message", "second message", "third fourth"] + assert len(result.messages) == 1 + assert result.messages[0].text == "third fourth" async def test_session_conversation_history_included_in_workflow_run(self) -> None: """Test that messages provided to agent.run() are passed through to the workflow.""" @@ -1296,3 +1300,96 @@ def test_merge_updates_function_result_no_matching_call(self): # Order: text (user), text (assistant), function_result (orphan at end) assert content_types == ["text", "text", "function_result"] + + +class TestWorkflowAgentUserInputFilteringRegression: + """Regression tests for #4261: user input must not compound across successive turns. + + When a GroupChat orchestrator terminates, it yields self._full_conversation + (a list[Message]) containing the entire conversation history. Without filtering, + user inputs and earlier assistant messages accumulate in the response across turns. + + The _filter_messages fix returns only the last meaningful assistant message from + list[Message] output, which aligns with the default GroupChatBuilder behavior + (intermediate_outputs=False) where only the orchestrator's final summary is surfaced. + + Users who need intermediate agent responses can opt in via + intermediate_outputs=True in GroupChatBuilder. + """ + + async def test_streaming_compounding_not_observed_across_turns(self): + """Regression: turn 1's user input must not appear in turn 2's streamed response. + + Reproduces the escalating symptom from #4261 where each new turn accumulated + all previous user messages in the streamed response text. + """ + + @executor + async def groupchat_like_executor(messages: list[Message], ctx: WorkflowContext[Never, AgentResponse]) -> None: + input_text = messages[-1].text or "" + response = AgentResponse( + messages=[ + Message(role="user", text=input_text), + Message( + role="assistant", + contents=[Content.from_text(text=f"Answer to: {input_text}")], + author_name="Principal", + ), + ], + ) + await ctx.yield_output(response) + + workflow = WorkflowBuilder(start_executor=groupchat_like_executor).build() + agent = workflow.as_agent("groupchat-agent") + + # Turn 1 + updates1: list[AgentResponseUpdate] = [] + async for chunk in agent.run("first_question", stream=True): + updates1.append(chunk) + + # Turn 2: "first_question" must NOT bleed into turn 2's streamed output + updates2: list[AgentResponseUpdate] = [] + async for chunk in agent.run("second_question", stream=True): + updates2.append(chunk) + + text2 = " ".join(u.text or "" for u in updates2 if u.text) + assert "first_question" not in text2, ( + "Turn 1 user input should not appear in turn 2 streaming output (compounding regression)" + ) + assert "Answer to: second_question" in text2 + + async def test_nonstreaming_compounding_not_observed_across_turns(self): + """Regression: turn 1's user input must not appear in turn 2's response. + + Reproduces the escalating symptom from #4261 where each new turn accumulated + all previous user messages in the response text. + """ + + @executor + async def groupchat_like_executor(messages: list[Message], ctx: WorkflowContext[Never, AgentResponse]) -> None: + input_text = messages[-1].text or "" + response = AgentResponse( + messages=[ + Message(role="user", text=input_text), + Message( + role="assistant", + contents=[Content.from_text(text=f"Answer to: {input_text}")], + author_name="Principal", + ), + ], + ) + await ctx.yield_output(response) + + workflow = WorkflowBuilder(start_executor=groupchat_like_executor).build() + agent = workflow.as_agent("groupchat-agent") + + # Turn 1 + await agent.run("first_question") + + # Turn 2: "first_question" must NOT bleed into turn 2's response + result2 = await agent.run("second_question") + text2 = " ".join(m.text or "" for m in result2.messages) + assert "first_question" not in text2, ( + "Turn 1 user input should not appear in turn 2 response (compounding regression)" + ) + assert "Answer to: second_question" in text2