Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ internal sealed class LockstepRunEventStream : IRunEventStream
private int _isDisposed;

private readonly ISuperStepRunner _stepRunner;
private Activity? _sessionActivity;

public ValueTask<RunStatus> GetStatusAsync(CancellationToken cancellationToken = default) => new(this.RunStatus);

Expand All @@ -30,7 +31,16 @@ public LockstepRunEventStream(ISuperStepRunner stepRunner)

public void Start()
{
// No-op for lockstep execution
// Save and restore Activity.Current so the long-lived session activity
// doesn't leak into caller code via AsyncLocal.
Activity? previousActivity = Activity.Current;

this._sessionActivity = this._stepRunner.TelemetryContext.StartWorkflowSessionActivity();
this._sessionActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId)
.SetTag(Tags.SessionId, this._stepRunner.SessionId);
this._sessionActivity?.AddEvent(new ActivityEvent(EventNames.SessionStarted));

Activity.Current = previousActivity;
}

public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPendingRequest, [EnumeratorCancellation] CancellationToken cancellationToken = default)
Expand All @@ -44,19 +54,23 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
}
#endif

CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(this._stopCancellation.Token, cancellationToken);
using CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(this._stopCancellation.Token, cancellationToken);

ConcurrentQueue<WorkflowEvent> eventSink = [];

this._stepRunner.OutgoingEvents.EventRaised += OnWorkflowEventAsync;

using Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity();
activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.SessionId, this._stepRunner.SessionId);
// Re-establish session as parent so the run activity nests correctly.
Activity.Current = this._sessionActivity;

// Not 'using' — must dispose explicitly in finally for deterministic export.
Activity? runActivity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity();
runActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.SessionId, this._stepRunner.SessionId);

try
{
this.RunStatus = RunStatus.Running;
activity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted));
runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted));

do
{
Expand All @@ -65,7 +79,7 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
{
// Because we may be yielding out of this function, we need to ensure that the Activity.Current
// is set to our activity for the duration of this loop iteration.
Activity.Current = activity;
Activity.Current = runActivity;

// Drain SuperSteps while there are steps to run
try
Expand All @@ -75,13 +89,13 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
catch (OperationCanceledException)
{
}
catch (Exception ex) when (activity is not null)
catch (Exception ex) when (runActivity is not null)
{
activity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() {
runActivity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() {
{ Tags.ErrorType, ex.GetType().FullName },
{ Tags.BuildErrorMessage, ex.Message },
{ Tags.ErrorMessage, ex.Message },
}));
activity.CaptureException(ex);
runActivity.CaptureException(ex);
throw;
}

Expand Down Expand Up @@ -129,12 +143,16 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
}
} while (!ShouldBreak());

activity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
}
finally
{
this.RunStatus = this._stepRunner.HasUnservicedRequests ? RunStatus.PendingRequests : RunStatus.Idle;
this._stepRunner.OutgoingEvents.EventRaised -= OnWorkflowEventAsync;

// Explicitly dispose the Activity so Activity.Stop fires deterministically,
// regardless of how the async iterator enumerator is disposed.
runActivity?.Dispose();
}

ValueTask OnWorkflowEventAsync(object? sender, WorkflowEvent e)
Expand Down Expand Up @@ -172,6 +190,14 @@ public ValueTask DisposeAsync()
{
this._stopCancellation.Cancel();

// Stop the session activity
if (this._sessionActivity is not null)
{
this._sessionActivity.AddEvent(new ActivityEvent(EventNames.SessionCompleted));
this._sessionActivity.Dispose();
this._sessionActivity = null;
}

this._stopCancellation.Dispose();
this._inputWaiter.Dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,20 @@ public void Start()
private async Task RunLoopAsync(CancellationToken cancellationToken)
{
using CancellationTokenSource errorSource = new();
CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(errorSource.Token, cancellationToken);
using CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(errorSource.Token, cancellationToken);

// Subscribe to events - they will flow directly to the channel as they're raised
this._stepRunner.OutgoingEvents.EventRaised += OnEventRaisedAsync;

using Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity();
activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.SessionId, this._stepRunner.SessionId);
// Start the session-level activity that spans the entire run loop lifetime.
// Individual run-stage activities are nested within this session activity.
Activity? sessionActivity = this._stepRunner.TelemetryContext.StartWorkflowSessionActivity();
sessionActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId)
.SetTag(Tags.SessionId, this._stepRunner.SessionId);

Activity? runActivity = null;

sessionActivity?.AddEvent(new ActivityEvent(EventNames.SessionStarted));

try
{
Expand All @@ -70,10 +77,15 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)
await this._inputWaiter.WaitForInputAsync(cancellationToken: linkedSource.Token).ConfigureAwait(false);

this._runStatus = RunStatus.Running;
activity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted));

while (!linkedSource.Token.IsCancellationRequested)
{
// Start a new run-stage activity for this input→processing→halt cycle
runActivity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity();
runActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId)
.SetTag(Tags.SessionId, this._stepRunner.SessionId);
runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted));

// Run all available supersteps continuously
// Events are streamed out in real-time as they happen via the event handler
while (this._stepRunner.HasUnprocessedMessages && !linkedSource.Token.IsCancellationRequested)
Expand All @@ -93,6 +105,15 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)
RunStatus capturedStatus = this._runStatus;
await this._eventChannel.Writer.WriteAsync(new InternalHaltSignal(currentEpoch, capturedStatus), linkedSource.Token).ConfigureAwait(false);

// Close the run-stage activity when processing halts.
// A new run activity will be created when the next input arrives.
if (runActivity is not null)
{
runActivity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
runActivity.Dispose();
runActivity = null;
}

// Wait for next input from the consumer
// Works for both Idle (no work) and PendingRequests (waiting for responses)
await this._inputWaiter.WaitForInputAsync(TimeSpan.FromSeconds(1), linkedSource.Token).ConfigureAwait(false);
Expand All @@ -107,14 +128,26 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)
}
catch (Exception ex)
{
if (activity != null)
// Record error on the run-stage activity if one is active
if (runActivity is not null)
{
runActivity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() {
{ Tags.ErrorType, ex.GetType().FullName },
{ Tags.ErrorMessage, ex.Message },
}));
runActivity.CaptureException(ex);
}

// Record error on the session activity
if (sessionActivity is not null)
{
activity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() {
sessionActivity.AddEvent(new ActivityEvent(EventNames.SessionError, tags: new() {
{ Tags.ErrorType, ex.GetType().FullName },
{ Tags.BuildErrorMessage, ex.Message },
{ Tags.ErrorMessage, ex.Message },
}));
activity.CaptureException(ex);
sessionActivity.CaptureException(ex);
}

await this._eventChannel.Writer.WriteAsync(new WorkflowErrorEvent(ex), linkedSource.Token).ConfigureAwait(false);
}
finally
Expand All @@ -124,7 +157,20 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)

// Mark as ended when run loop exits
this._runStatus = RunStatus.Ended;
activity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));

// Stop the run-stage activity if not already stopped (e.g. on cancellation or error)
if (runActivity is not null)
{
runActivity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
runActivity.Dispose();
}

// Stop the session activity — the session always ends when the run loop exits
if (sessionActivity is not null)
{
sessionActivity.AddEvent(new ActivityEvent(EventNames.SessionCompleted));
sessionActivity.Dispose();
}
}

async ValueTask OnEventRaisedAsync(object? sender, WorkflowEvent e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ namespace Microsoft.Agents.AI.Workflows.Observability;
internal static class ActivityNames
{
public const string WorkflowBuild = "workflow.build";
public const string WorkflowRun = "workflow_invoke";
public const string WorkflowSession = "workflow.session";
public const string WorkflowInvoke = "workflow_invoke";
public const string MessageSend = "message.send";
public const string ExecutorProcess = "executor.process";
public const string EdgeGroupProcess = "edge_group.process";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ internal static class EventNames
public const string BuildValidationCompleted = "build.validation_completed";
public const string BuildCompleted = "build.completed";
public const string BuildError = "build.error";
public const string SessionStarted = "session.started";
public const string SessionCompleted = "session.completed";
public const string SessionError = "session.error";
public const string WorkflowStarted = "workflow.started";
public const string WorkflowCompleted = "workflow.completed";
public const string WorkflowError = "workflow.error";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ internal static class Tags
public const string BuildErrorMessage = "build.error.message";
public const string BuildErrorType = "build.error.type";
public const string ErrorType = "error.type";
public const string ErrorMessage = "error.message";
public const string SessionId = "session.id";
public const string ExecutorId = "executor.id";
public const string ExecutorType = "executor.type";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,25 @@ public WorkflowTelemetryContext(WorkflowTelemetryOptions options, ActivitySource
}

/// <summary>
/// Starts a workflow run activity if enabled.
/// Starts a workflow session activity if enabled. This is the outer/parent span
/// that represents the entire lifetime of a workflow execution (from start
/// until stop, cancellation, or error) within the current trace.
/// Individual run stages are typically nested within it.
/// </summary>
/// <returns>An activity if workflow run telemetry is enabled, otherwise null.</returns>
public Activity? StartWorkflowSessionActivity()
{
if (!this.IsEnabled || this.Options.DisableWorkflowRun)
{
return null;
}

return this.ActivitySource.StartActivity(ActivityNames.WorkflowSession);
}

/// <summary>
/// Starts a workflow run activity if enabled. This represents a single
/// input-to-halt cycle within a workflow session.
/// </summary>
/// <returns>An activity if workflow run telemetry is enabled, otherwise null.</returns>
public Activity? StartWorkflowRunActivity()
Expand All @@ -98,7 +116,7 @@ public WorkflowTelemetryContext(WorkflowTelemetryOptions options, ActivitySource
return null;
}

return this.ActivitySource.StartActivity(ActivityNames.WorkflowRun);
return this.ActivitySource.StartActivity(ActivityNames.WorkflowInvoke);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,16 @@ public ObservabilityTests()
/// Create a sample workflow for testing.
/// </summary>
/// <remarks>
/// This workflow is expected to create 8 activities that will be captured by the tests
/// This workflow is expected to create 9 activities that will be captured by the tests
/// - ActivityNames.WorkflowBuild
/// - ActivityNames.WorkflowRun
/// -- ActivityNames.EdgeGroupProcess
/// -- ActivityNames.ExecutorProcess (UppercaseExecutor)
/// --- ActivityNames.MessageSend
/// ---- ActivityNames.EdgeGroupProcess
/// -- ActivityNames.ExecutorProcess (ReverseTextExecutor)
/// --- ActivityNames.MessageSend
/// - ActivityNames.WorkflowSession
/// -- ActivityNames.WorkflowInvoke
/// --- ActivityNames.EdgeGroupProcess
/// --- ActivityNames.ExecutorProcess (UppercaseExecutor)
/// ---- ActivityNames.MessageSend
/// ----- ActivityNames.EdgeGroupProcess
/// --- ActivityNames.ExecutorProcess (ReverseTextExecutor)
/// ---- ActivityNames.MessageSend
/// </remarks>
/// <returns>The created workflow.</returns>
private static Workflow CreateWorkflow()
Expand All @@ -74,7 +75,8 @@ private static Dictionary<string, int> GetExpectedActivityNameCounts() =>
new()
{
{ ActivityNames.WorkflowBuild, 1 },
{ ActivityNames.WorkflowRun, 1 },
{ ActivityNames.WorkflowSession, 1 },
{ ActivityNames.WorkflowInvoke, 1 },
{ ActivityNames.EdgeGroupProcess, 2 },
{ ActivityNames.ExecutorProcess, 2 },
{ ActivityNames.MessageSend, 2 }
Expand Down Expand Up @@ -113,7 +115,7 @@ private async Task TestWorkflowEndToEndActivitiesAsync(string executionEnvironme

// Assert
var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList();
capturedActivities.Should().HaveCount(8, "Exactly 8 activities should be created.");
capturedActivities.Should().HaveCount(9, "Exactly 9 activities should be created.");

// Make sure all expected activities exist and have the correct count
foreach (var kvp in GetExpectedActivityNameCounts())
Expand All @@ -125,7 +127,7 @@ private async Task TestWorkflowEndToEndActivitiesAsync(string executionEnvironme
}

// Verify WorkflowRun activity events include workflow lifecycle events
var workflowRunActivity = capturedActivities.First(a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal));
var workflowRunActivity = capturedActivities.First(a => a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal));
var activityEvents = workflowRunActivity.Events.ToList();
activityEvents.Should().Contain(e => e.Name == EventNames.WorkflowStarted, "activity should have workflow started event");
activityEvents.Should().Contain(e => e.Name == EventNames.WorkflowCompleted, "activity should have workflow completed event");
Expand Down Expand Up @@ -273,8 +275,11 @@ public async Task DisableWorkflowRun_PreventsWorkflowRunActivityAsync()
// Assert
var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList();
capturedActivities.Should().NotContain(
a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal),
a => a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal),
"WorkflowRun activity should be disabled.");
capturedActivities.Should().NotContain(
a => a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal),
"WorkflowSession activity should also be disabled when DisableWorkflowRun is true.");
capturedActivities.Should().Contain(
a => a.OperationName.StartsWith(ActivityNames.WorkflowBuild, StringComparison.Ordinal),
"Other activities should still be created.");
Expand Down Expand Up @@ -303,7 +308,7 @@ public async Task DisableExecutorProcess_PreventsExecutorProcessActivityAsync()
a => a.OperationName.StartsWith(ActivityNames.ExecutorProcess, StringComparison.Ordinal),
"ExecutorProcess activity should be disabled.");
capturedActivities.Should().Contain(
a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal),
a => a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal),
"Other activities should still be created.");
}

Expand Down
Loading
Loading