Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public class Workflow
internal Dictionary<string, HashSet<Edge>> Edges { get; init; } = [];
internal Dictionary<string, HashSet<OutputTag>> OutputExecutors { get; init; } = new(StringComparer.Ordinal);

internal bool IsTerminalOutput(string executorId)
=> this.OutputExecutors.TryGetValue(executorId, out HashSet<OutputTag>? tags)
&& !tags.Contains(OutputTag.Intermediate);

/// <summary>
/// Gets the collection of edges grouped by their source node identifier.
/// </summary>
Expand Down
49 changes: 43 additions & 6 deletions dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowHostAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,16 @@ Task<AgentResponse> RunCoreAsync(
await this.ValidateWorkflowAsync().ConfigureAwait(false);

WorkflowSession workflowSession = await this.UpdateSessionAsync(messages, session, cancellationToken).ConfigureAwait(false);
MessageMerger merger = new();
ResponseMergeState mergeState = new();

await foreach (AgentResponseUpdate update in workflowSession.InvokeStageAsync(cancellationToken)
.ConfigureAwait(false)
.WithCancellation(cancellationToken))
{
merger.AddUpdate(update);
mergeState.AddUpdate(update, this.IsTerminalWorkflowOutputUpdate(update));
}

AgentResponse response = merger.ComputeMerged(workflowSession.LastResponseId!, this.Id, this.Name);
AgentResponse response = mergeState.ComputeMerged(workflowSession.LastResponseId!, this.Id, this.Name);
workflowSession.ChatHistoryProvider.AddMessages(workflowSession, response.Messages);
workflowSession.ChatHistoryProvider.UpdateBookmark(workflowSession);

Expand All @@ -142,18 +142,55 @@ IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingAsync(
await this.ValidateWorkflowAsync().ConfigureAwait(false);

WorkflowSession workflowSession = await this.UpdateSessionAsync(messages, session, cancellationToken).ConfigureAwait(false);
MessageMerger merger = new();
ResponseMergeState mergeState = new();

await foreach (AgentResponseUpdate update in workflowSession.InvokeStageAsync(cancellationToken)
.ConfigureAwait(false)
.WithCancellation(cancellationToken))
{
merger.AddUpdate(update);
mergeState.AddUpdate(update, this.IsTerminalWorkflowOutputUpdate(update));
yield return update;
}

AgentResponse response = merger.ComputeMerged(workflowSession.LastResponseId!, this.Id, this.Name);
AgentResponse response = mergeState.ComputeMerged(workflowSession.LastResponseId!, this.Id, this.Name);
workflowSession.ChatHistoryProvider.AddMessages(workflowSession, response.Messages);
workflowSession.ChatHistoryProvider.UpdateBookmark(workflowSession);
}

private sealed class ResponseMergeState
{
private readonly MessageMerger _allUpdates = new();
private readonly MessageMerger _terminalWorkflowOutputs = new();
private bool _hasTerminalWorkflowOutputs;

public void AddUpdate(AgentResponseUpdate update, bool isTerminalWorkflowOutput)
{
this._allUpdates.AddUpdate(update);
if (isTerminalWorkflowOutput)
{
this._terminalWorkflowOutputs.AddUpdate(update);
this._hasTerminalWorkflowOutputs = true;
}
}

public AgentResponse ComputeMerged(string responseId, string? agentId, string? agentName)
{
MessageMerger merger = this._hasTerminalWorkflowOutputs
? this._terminalWorkflowOutputs
: this._allUpdates;
return merger.ComputeMerged(responseId, agentId, agentName);
}
}

private bool IsTerminalWorkflowOutputUpdate(AgentResponseUpdate update)
{
if (update.RawRepresentation is not WorkflowOutputEvent output
|| output is AgentResponseUpdateEvent
|| output is AgentResponseEvent)
{
return false;
}

return this._workflow.IsTerminalOutput(output.ExecutorId);
}
Comment on lines +185 to +195
}
69 changes: 51 additions & 18 deletions dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ public AgentResponseUpdate CreateUpdate(string responseId, object raw, ChatMessa

return new(message.Role, message.Contents)
{
AuthorName = message.AuthorName,
CreatedAt = message.CreatedAt ?? DateTimeOffset.UtcNow,
MessageId = message.MessageId ?? Guid.NewGuid().ToString("N"),
ResponseId = responseId,
Expand Down Expand Up @@ -456,6 +457,17 @@ IAsyncEnumerable<AgentResponseUpdate> InvokeStageAsync(
{
await run.TrySendMessageAsync(new TurnToken(emitEvents: true)).ConfigureAwait(false);
}

AgentResponseUpdate CreateObservabilityUpdate(WorkflowEvent evt)
=> new(ChatRole.Assistant, [])
{
CreatedAt = DateTimeOffset.UtcNow,
MessageId = Guid.NewGuid().ToString("N"),
Role = ChatRole.Assistant,
ResponseId = this.LastResponseId,
RawRepresentation = evt
};

await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false, cancellationToken)
.ConfigureAwait(false)
.WithCancellation(cancellationToken))
Expand Down Expand Up @@ -512,12 +524,14 @@ IAsyncEnumerable<AgentResponseUpdate> InvokeStageAsync(
? executorException.Message
: "An error occurred while executing the workflow.";

yield return this.CreateUpdate(this.LastResponseId, evt, new ErrorContent(executorMessage));
AgentResponseUpdate executorUpdate = this.CreateUpdate(this.LastResponseId, evt, new ErrorContent(executorMessage));
yield return executorUpdate;
break;

case SuperStepCompletedEvent stepCompleted:
this.LastCheckpoint = stepCompleted.CompletionInfo?.Checkpoint;
goto default;
yield return CreateObservabilityUpdate(evt);
break;

case AgentResponseEvent agentResponse:
// Under Futures.EnableAgentResponseOutputTaggingAndFiltering=true, mirror
Expand All @@ -526,7 +540,8 @@ IAsyncEnumerable<AgentResponseUpdate> InvokeStageAsync(
// the legacy default, keep today's behavior — gated by the include flag.
if (!Futures.EnableAgentResponseOutputTaggingAndFiltering && !this._includeWorkflowOutputsInResponse)
{
goto default;
yield return CreateObservabilityUpdate(evt);
break;
}

// Either EnableAgentResponseOutputTaggingAndFiltering -- so yield the Response
Expand All @@ -547,32 +562,50 @@ IAsyncEnumerable<AgentResponseUpdate> InvokeStageAsync(
ChatMessage chatMessage => [chatMessage],
_ => null
};
IEnumerable<AIContent>? updateContents = output.Data switch
{
string text => [new TextContent(text)],
AIContent content => [content],
IEnumerable<AIContent> contents => contents,
_ => null
};

// Same assymetry as with AgentResponseEvent, but there is no EnableFiltering flag
// to consider. If this made it here (and since it is not an AgentResponse[Update]),
// it means it is already been selected as an Output() from the user. Intermediate
// is irrelevant here.
if (updateMessages == null || !this._includeWorkflowOutputsInResponse)
// Workflow outputs with response-compatible payloads are forwarded when the
// host requests all workflow outputs, or when this executor is an explicit
// output source for the workflow.
if (updateMessages == null
&& updateContents == null)
{
goto default;
yield return CreateObservabilityUpdate(evt);
break;
}
Comment on lines +576 to +581

foreach (ChatMessage message in updateMessages)
bool includeTerminalOutput = this._workflow.IsTerminalOutput(output.ExecutorId);
if (!this._includeWorkflowOutputsInResponse
&& !includeTerminalOutput)
{
yield return CreateObservabilityUpdate(evt);
break;
}

foreach (ChatMessage message in this._includeWorkflowOutputsInResponse ? updateMessages ?? [] : [])
{
yield return this.CreateUpdate(this.LastResponseId, evt, message);
}
if (updateContents is not null
&& (this._includeWorkflowOutputsInResponse || includeTerminalOutput))
{
AIContent[] contents = [.. updateContents];
if (contents.Length > 0)
{
yield return this.CreateUpdate(this.LastResponseId, evt, contents);
}
}
break;

default:
// Emit all other workflow events for observability (DevUI, logging, etc.)
yield return new AgentResponseUpdate(ChatRole.Assistant, [])
{
CreatedAt = DateTimeOffset.UtcNow,
MessageId = Guid.NewGuid().ToString("N"),
Role = ChatRole.Assistant,
ResponseId = this.LastResponseId,
RawRepresentation = evt
};
yield return CreateObservabilityUpdate(evt);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,20 @@ public override ValueTask HandleAsync(string message, IWorkflowContext context,
}
}

internal sealed class UppercaseStringExecutor(string name = "UppercaseStringExecutor") : Executor<IList<ChatMessage>, string>(name)
{
public override ValueTask<string> HandleAsync(
IList<ChatMessage> message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
string text = string.Join(
"\n",
message.Select(chatMessage => chatMessage.Text).Where(text => !string.IsNullOrWhiteSpace(text)));
return new(text.ToUpperInvariant());
}
}

public class WorkflowHostSmokeTests : AIAgentHostingExecutorTestsBase
{
private sealed class AlwaysFailsAIAgent(bool failByThrowing) : AIAgent
Expand Down Expand Up @@ -825,6 +839,30 @@ public Task Test_Handoffs_AsAgent_OutgoingMessagesInHistoryAsync(bool runAsync)
return this.Run_AsAgent_OutgoingMessagesInHistoryAsync(handoffWorkflow, runAsync);
}

[Fact]
public async Task Test_AsAgent_UsesDesignatedWorkflowOutputInsteadOfIntermediateAgentResponsesAsync()
{
TestReplayAgent firstAgent = new(TestReplayAgent.ToChatMessages("first answer"), "first-agent", "First Agent");
TestReplayAgent secondAgent = new(TestReplayAgent.ToChatMessages("second answer"), "second-agent", "Second Agent");
ExecutorBinding first = firstAgent.BindAsExecutor(new AIAgentHostOptions { ForwardIncomingMessages = false });
ExecutorBinding second = secondAgent.BindAsExecutor(new AIAgentHostOptions { ForwardIncomingMessages = false });
UppercaseStringExecutor uppercase = new();

Workflow workflow = new WorkflowBuilder(first)
.AddEdge(first, second)
.AddEdge(second, uppercase)
.WithOutputFrom(uppercase)
.Build();

AgentResponse response = await workflow
.AsAIAgent("WorkflowAgent")
.RunAsync(new ChatMessage(ChatRole.User, "hello"));

response.Text.Should().Be("SECOND ANSWER");
response.Messages.Should().ContainSingle()
.Which.Text.Should().Be("SECOND ANSWER");
}

// ----- Phase 5: Workflow-as-Agent intermediate forwarding -----------------

[Collection(Futures.FuturesSerialCollection.Name)]
Expand Down