Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions python/packages/core/agent_framework/_workflows/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,14 @@ def run(
)
return self._run_impl(messages, response_id, session, checkpoint_id, checkpoint_storage, **kwargs)

def _filter_messages(self, chat_messages: list[Message]) -> list[Message] | None:
"""From a list[Message] output, return only the last meaningful assistant message."""
for msg in reversed(chat_messages):
if msg.role != "user" and msg.text and msg.text.strip():
return [msg]
# fallback: last non-user message
return [m for m in reversed(chat_messages) if m.role != "user"][:1]

async def _run_impl(
self,
messages: AgentRunInputs,
Expand Down Expand Up @@ -476,7 +484,7 @@ def _convert_workflow_events_to_agent_response(
messages.append(data)
raw_representations.append(data.raw_representation)
elif is_instance_of(data, list[Message]):
chat_messages = cast(list[Message], data)
chat_messages = self._filter_messages(cast(list[Message], data))
messages.extend(chat_messages)
raw_representations.append(data)
else:
Expand Down Expand Up @@ -593,7 +601,7 @@ def _convert_workflow_event_to_agent_response_updates(
]
if is_instance_of(data, list[Message]):
# Convert each Message to an AgentResponseUpdate
chat_messages = cast(list[Message], data)
chat_messages = self._filter_messages(cast(list[Message], data))
updates = []
for msg in chat_messages:
updates.append(
Expand Down
125 changes: 111 additions & 14 deletions python/packages/core/tests/workflow/test_workflow_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,15 +469,20 @@ async def raw_yielding_executor(
assert updates[2].raw_representation.value == 42

async def test_workflow_as_agent_yield_output_with_list_of_chat_messages(self) -> None:
"""Test that yield_output with list[Message] extracts contents from all messages.
"""Test that yield_output with list[Message] surfaces only the last assistant message.

Note: Content items are coalesced by _finalize_response, so multiple text contents
become a single merged Content in the final response.
When a workflow executor yields a list[Message] (as GroupChat orchestrators
do with self._full_conversation on termination), _filter_messages returns
only the last meaningful assistant message to avoid re-emitting user input
and replaying the full conversation history across turns. See #4261.

Users who need intermediate agent responses can opt in via
intermediate_outputs=True in GroupChatBuilder.
"""

@executor
async def list_yielding_executor(messages: list[Message], ctx: WorkflowContext[Never, list[Message]]) -> None:
# Yield a list of Messages (as SequentialBuilder does)
# Yield a list of Messages (as GroupChat orchestrator does with _full_conversation)
msg_list = [
Message(role="user", text="first message"),
Message(role="assistant", text="second message"),
Expand All @@ -491,25 +496,24 @@ async def list_yielding_executor(messages: list[Message], ctx: WorkflowContext[N
workflow = WorkflowBuilder(start_executor=list_yielding_executor).build()
agent = workflow.as_agent("list-msg-agent")

# Verify streaming returns the update with all 4 contents before coalescing
# Streaming: _filter_messages returns only the last meaningful assistant message
updates: list[AgentResponseUpdate] = []
async for update in agent.run("test", stream=True):
updates.append(update)

assert len(updates) == 3
# Only the last assistant message should be surfaced (user messages filtered,
# earlier assistant messages treated as conversation history replay)
assert len(updates) == 1
full_response = AgentResponse.from_updates(updates)
assert len(full_response.messages) == 3
texts = [message.text for message in full_response.messages]
# Note: `from_agent_run_response_updates` coalesces multiple text contents into one content
assert texts == ["first message", "second message", "thirdfourth"]
assert len(full_response.messages) == 1
assert full_response.messages[0].text == "thirdfourth"

# Verify run()
# Non-streaming: same filtering applies
result = await agent.run("test")

assert isinstance(result, AgentResponse)
assert len(result.messages) == 3
texts = [message.text for message in result.messages]
assert texts == ["first message", "second message", "third fourth"]
assert len(result.messages) == 1
assert result.messages[0].text == "third fourth"

async def test_session_conversation_history_included_in_workflow_run(self) -> None:
"""Test that messages provided to agent.run() are passed through to the workflow."""
Expand Down Expand Up @@ -1296,3 +1300,96 @@ def test_merge_updates_function_result_no_matching_call(self):

# Order: text (user), text (assistant), function_result (orphan at end)
assert content_types == ["text", "text", "function_result"]


class TestWorkflowAgentUserInputFilteringRegression:
    """Regression tests for #4261: user input must not compound across successive turns.

    When a GroupChat orchestrator terminates, it yields self._full_conversation
    (a list[Message]) containing the entire conversation history. Without filtering,
    user inputs and earlier assistant messages accumulate in the response across turns.

    The _filter_messages fix returns only the last meaningful assistant message from
    list[Message] output, which aligns with the default GroupChatBuilder behavior
    (intermediate_outputs=False) where only the orchestrator's final summary is surfaced.

    Users who need intermediate agent responses can opt in via
    intermediate_outputs=True in GroupChatBuilder.

    The executor used here yields a ``list[Message]`` (user echo followed by the
    assistant answer) so that the output goes through the ``list[Message]``
    conversion branch, which is the branch that applies ``_filter_messages``.
    """

    @staticmethod
    def _build_groupchat_like_agent():
        """Build a workflow agent whose only executor yields ``[user echo, assistant answer]``."""

        @executor
        async def groupchat_like_executor(
            messages: list[Message], ctx: WorkflowContext[Never, list[Message]]
        ) -> None:
            input_text = messages[-1].text or ""
            # Yield a conversation-shaped list, as a GroupChat orchestrator does
            # with _full_conversation: the user's input plus the final answer.
            conversation = [
                Message(role="user", text=input_text),
                Message(
                    role="assistant",
                    contents=[Content.from_text(text=f"Answer to: {input_text}")],
                    author_name="Principal",
                ),
            ]
            await ctx.yield_output(conversation)

        workflow = WorkflowBuilder(start_executor=groupchat_like_executor).build()
        return workflow.as_agent("groupchat-agent")

    async def test_streaming_compounding_not_observed_across_turns(self):
        """Regression: turn 1's user input must not appear in turn 2's streamed response.

        Reproduces the escalating symptom from #4261 where each new turn accumulated
        all previous user messages in the streamed response text.
        """
        agent = self._build_groupchat_like_agent()

        # Turn 1
        updates1: list[AgentResponseUpdate] = []
        async for chunk in agent.run("first_question", stream=True):
            updates1.append(chunk)

        # Turn 2: "first_question" must NOT bleed into turn 2's streamed output
        updates2: list[AgentResponseUpdate] = []
        async for chunk in agent.run("second_question", stream=True):
            updates2.append(chunk)

        # Only the assistant answer is surfaced; the echoed user message is filtered out.
        assert len(updates2) == 1
        text2 = " ".join(u.text or "" for u in updates2 if u.text)
        assert "first_question" not in text2, (
            "Turn 1 user input should not appear in turn 2 streaming output (compounding regression)"
        )
        assert "Answer to: second_question" in text2

    async def test_nonstreaming_compounding_not_observed_across_turns(self):
        """Regression: turn 1's user input must not appear in turn 2's response.

        Reproduces the escalating symptom from #4261 where each new turn accumulated
        all previous user messages in the response text.
        """
        agent = self._build_groupchat_like_agent()

        # Turn 1
        await agent.run("first_question")

        # Turn 2: "first_question" must NOT bleed into turn 2's response
        result2 = await agent.run("second_question")

        # Only the assistant answer is surfaced; the echoed user message is filtered out.
        assert len(result2.messages) == 1
        text2 = " ".join(m.text or "" for m in result2.messages)
        assert "first_question" not in text2, (
            "Turn 1 user input should not appear in turn 2 response (compounding regression)"
        )
        assert "Answer to: second_question" in text2