From 068eb43e914f7da2bb43e3d2f967579ad6443bcd Mon Sep 17 00:00:00 2001 From: alliscode Date: Mon, 23 Feb 2026 12:15:12 -0800 Subject: [PATCH 1/7] 1. Add reproduction test for issue #4155: workflow.run Activity never stopped in streaming OffThread path The WorkflowRunActivity_IsStopped_Streaming_OffThread test demonstrates that the workflow.run OpenTelemetry Activity created in StreamingRunEventStream.RunLoopAsync is started but never stopped when using the OffThread/Default streaming execution. The background run loop keeps running after event consumption completes, so the using Activity? declaration never disposes until explicit StopAsync() is called. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> 2. Fix workflow.run Activity never stopped in streaming OffThread execution (#4155) The workflow.run OpenTelemetry Activity in StreamingRunEventStream.RunLoopAsync was scoped to the method lifetime via 'using'. Since the run loop only exits on cancellation, the Activity was never stopped/exported until explicit disposal. Fix: Remove 'using' and explicitly dispose the Activity when the workflow reaches Idle status (all supersteps complete). A safety-net disposal in the finally block handles cancellation and error paths. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Execution/StreamingRunEventStream.cs | 20 +- .../WorkflowRunActivityStopTests.cs | 204 ++++++++++++++++++ 2 files changed, 222 insertions(+), 2 deletions(-) create mode 100644 dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index a6c34f2b9f..1b991c711c 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -60,7 +60,7 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) // Subscribe to events - they will flow directly to the channel as they're raised this._stepRunner.OutgoingEvents.EventRaised += OnEventRaisedAsync; - using Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity(); + Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity(); activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.SessionId, this._stepRunner.SessionId); try @@ -93,6 +93,16 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) RunStatus capturedStatus = this._runStatus; await this._eventChannel.Writer.WriteAsync(new InternalHaltSignal(currentEpoch, capturedStatus), linkedSource.Token).ConfigureAwait(false); + // Stop the workflow.run Activity when the workflow reaches Idle so the span is + // exported to telemetry backends immediately, rather than waiting for the run loop + // to be cancelled/disposed. + if (activity is not null && capturedStatus == RunStatus.Idle) + { + activity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); + activity.Dispose(); + activity = null; + } + // Wait for next input from the consumer // Works for both Idle (no work) and PendingRequests (waiting for responses) await this._inputWaiter.WaitForInputAsync(TimeSpan.FromSeconds(1), linkedSource.Token).ConfigureAwait(false); @@ -124,7 +134,13 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) // Mark as ended when run loop exits this._runStatus = RunStatus.Ended; - activity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); + + // Safety net: stop the activity if not already stopped (e.g. on cancellation or error) + if (activity is not null) + { + activity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); + activity.Dispose(); + } } async ValueTask OnEventRaisedAsync(object? sender, WorkflowEvent e) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs new file mode 100644 index 0000000000..ea314d2312 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs @@ -0,0 +1,204 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading.Tasks; +using FluentAssertions; +using Microsoft.Agents.AI.Workflows.InProc; +using Microsoft.Agents.AI.Workflows.Observability; + +namespace Microsoft.Agents.AI.Workflows.UnitTests; + +/// +/// Regression test for https://github.com/microsoft/agent-framework/issues/4155 +/// Verifies that the workflow.run Activity is properly stopped/disposed so it gets exported +/// to telemetry backends. The ActivityStopped callback must fire for the workflow.run span. +/// +[Collection("ObservabilityTests")] +public sealed class WorkflowRunActivityStopTests : IDisposable +{ + private readonly ActivityListener _activityListener; + private readonly ConcurrentBag _startedActivities = []; + private readonly ConcurrentBag _stoppedActivities = []; + private bool _isDisposed; + + public WorkflowRunActivityStopTests() + { + this._activityListener = new ActivityListener + { + ShouldListenTo = source => source.Name.Contains(typeof(Workflow).Namespace!), + Sample = (ref ActivityCreationOptions options) => ActivitySamplingResult.AllData, + ActivityStarted = activity => this._startedActivities.Add(activity), + ActivityStopped = activity => this._stoppedActivities.Add(activity), + }; + ActivitySource.AddActivityListener(this._activityListener); + } + + public void Dispose() + { + if (!this._isDisposed) + { + this._activityListener?.Dispose(); + this._isDisposed = true; + } + } + + /// + /// Creates a simple sequential workflow with OpenTelemetry enabled. + /// + private static Workflow CreateWorkflow() + { + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + Func reverseFunc = s => new string(s.Reverse().ToArray()); + var reverse = reverseFunc.BindAsExecutor("ReverseTextExecutor"); + + WorkflowBuilder builder = new(uppercase); + builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse); + + return builder.WithOpenTelemetry().Build(); + } + + /// + /// Verifies that the workflow.run Activity is stopped (and thus exportable) when + /// using the Lockstep execution environment. + /// Bug: The Activity created by LockstepRunEventStream.TakeEventStreamAsync is never + /// disposed because yield break in async iterators does not trigger using disposal. + /// + [Fact] + public async Task WorkflowRunActivity_IsStopped_Lockstep() + { + // Arrange + using var testActivity = new Activity("WorkflowRunStopTest_Lockstep").Start(); + + // Act + var workflow = CreateWorkflow(); + Run run = await InProcessExecution.Lockstep.RunAsync(workflow, "Hello, World!"); + await run.DisposeAsync(); + + // Assert - workflow.run should have been started + var startedWorkflowRuns = this._startedActivities + .Where(a => a.RootId == testActivity.RootId && + a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) + .ToList(); + startedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be started"); + + // Assert - workflow.run should have been stopped (i.e., Dispose/Stop was called) + // This is the core assertion for issue #4155: the ActivityStopped callback must fire + var stoppedWorkflowRuns = this._stoppedActivities + .Where(a => a.RootId == testActivity.RootId && + a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) + .ToList(); + stoppedWorkflowRuns.Should().HaveCount(1, + "workflow.run Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)"); + } + + /// + /// Verifies that the workflow.run Activity is stopped when using the OffThread (Default) + /// execution environment (StreamingRunEventStream). + /// + [Fact] + public async Task WorkflowRunActivity_IsStopped_OffThread() + { + // Arrange + using var testActivity = new Activity("WorkflowRunStopTest_OffThread").Start(); + + // Act + var workflow = CreateWorkflow(); + Run run = await InProcessExecution.OffThread.RunAsync(workflow, "Hello, World!"); + await run.DisposeAsync(); + + // Assert - workflow.run should have been started + var startedWorkflowRuns = this._startedActivities + .Where(a => a.RootId == testActivity.RootId && + a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) + .ToList(); + startedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be started"); + + // Assert - workflow.run should have been stopped + var stoppedWorkflowRuns = this._stoppedActivities + .Where(a => a.RootId == testActivity.RootId && + a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) + .ToList(); + stoppedWorkflowRuns.Should().HaveCount(1, + "workflow.run Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)"); + } + + /// + /// Verifies that the workflow.run Activity is stopped when using the streaming API + /// (StreamingRun.WatchStreamAsync) with the OffThread execution environment. + /// This matches the exact usage pattern described in the issue. + /// + [Fact] + public async Task WorkflowRunActivity_IsStopped_Streaming_OffThread() + { + // Arrange + using var testActivity = new Activity("WorkflowRunStopTest_Streaming_OffThread").Start(); + + // Act - use streaming path (WatchStreamAsync), which is the pattern from the issue + var workflow = CreateWorkflow(); + await using StreamingRun run = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Hello, World!"); + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + // Consume all events + } + + // Assert - workflow.run should have been started + var startedWorkflowRuns = this._startedActivities + .Where(a => a.RootId == testActivity.RootId && + a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) + .ToList(); + startedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be started"); + + // Assert - workflow.run should have been stopped + var stoppedWorkflowRuns = this._stoppedActivities + .Where(a => a.RootId == testActivity.RootId && + a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) + .ToList(); + stoppedWorkflowRuns.Should().HaveCount(1, + "workflow.run Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)"); + } + + /// + /// Verifies that all started activities (not just workflow.run) are properly stopped. + /// This ensures no spans are "leaked" without being exported. + /// + [Fact] + public async Task AllActivities_AreStopped_AfterWorkflowCompletion() + { + // Arrange + using var testActivity = new Activity("AllActivitiesStopTest").Start(); + + // Act + var workflow = CreateWorkflow(); + Run run = await InProcessExecution.Lockstep.RunAsync(workflow, "Hello, World!"); + await run.DisposeAsync(); + + // Assert - every started activity should also be stopped + var started = this._startedActivities + .Where(a => a.RootId == testActivity.RootId) + .Select(a => a.Id) + .ToHashSet(); + + var stopped = this._stoppedActivities + .Where(a => a.RootId == testActivity.RootId) + .Select(a => a.Id) + .ToHashSet(); + + var neverStopped = started.Except(stopped).ToList(); + if (neverStopped.Count > 0) + { + var neverStoppedNames = this._startedActivities + .Where(a => neverStopped.Contains(a.Id)) + .Select(a => a.OperationName) + .ToList(); + neverStoppedNames.Should().BeEmpty( + "all started activities should be stopped so they are exported. " + + $"Activities started but never stopped: [{string.Join(", ", neverStoppedNames)}]"); + } + } +} From ae2e9ae8144d3130c347ef6050c9ac62d7ffbf94 Mon Sep 17 00:00:00 2001 From: alliscode Date: Tue, 24 Feb 2026 11:19:44 -0800 Subject: [PATCH 2/7] Add root-level workflow.session activity spanning run loop lifetime\n\nImplements two-level telemetry hierarchy per PR feedback from lokitoth:\n- workflow.session: spans the entire run loop / stream lifetime\n- workflow_invoke: per input-to-halt cycle, nested within the session\n\nThis ensures the session activity stays open across multiple turns,\nwhile individual run activities are created and disposed per cycle.\n\nAlso fixes linkedSource CancellationTokenSource disposal leak in\nStreamingRunEventStream (added using declaration)." --- .../Execution/LockstepRunEventStream.cs | 16 ++- .../Execution/StreamingRunEventStream.cs | 65 +++++++--- .../Observability/ActivityNames.cs | 1 + .../Observability/EventNames.cs | 3 + .../Observability/WorkflowTelemetryContext.cs | 19 ++- .../ObservabilityTests.cs | 20 ++-- .../WorkflowRunActivityStopTests.cs | 112 ++++++++++++++++-- 7 files changed, 198 insertions(+), 38 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs index f26f27ad96..978278ebfa 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs @@ -18,6 +18,7 @@ internal sealed class LockstepRunEventStream : IRunEventStream private int _isDisposed; private readonly ISuperStepRunner _stepRunner; + private Activity? _sessionActivity; public ValueTask GetStatusAsync(CancellationToken cancellationToken = default) => new(this.RunStatus); @@ -30,7 +31,12 @@ public LockstepRunEventStream(ISuperStepRunner stepRunner) public void Start() { - // No-op for lockstep execution + // Start the session-level activity that spans the entire lockstep execution lifetime. + // Individual run-stage activities are nested within this session activity. + this._sessionActivity = this._stepRunner.TelemetryContext.StartWorkflowSessionActivity(); + this._sessionActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId) + .SetTag(Tags.SessionId, this._stepRunner.SessionId); + this._sessionActivity?.AddEvent(new ActivityEvent(EventNames.SessionStarted)); } public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPendingRequest, [EnumeratorCancellation] CancellationToken cancellationToken = default) @@ -172,6 +178,14 @@ public ValueTask DisposeAsync() { this._stopCancellation.Cancel(); + // Stop the session activity — the session ends when the stream is disposed + if (this._sessionActivity is not null) + { + this._sessionActivity.AddEvent(new ActivityEvent(EventNames.SessionCompleted)); + this._sessionActivity.Dispose(); + this._sessionActivity = null; + } + this._stopCancellation.Dispose(); this._inputWaiter.Dispose(); } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index 1b991c711c..ddf871e57e 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -55,13 +55,18 @@ public void Start() private async Task RunLoopAsync(CancellationToken cancellationToken) { using CancellationTokenSource errorSource = new(); - CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(errorSource.Token, cancellationToken); + using CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(errorSource.Token, cancellationToken); // Subscribe to events - they will flow directly to the channel as they're raised this._stepRunner.OutgoingEvents.EventRaised += OnEventRaisedAsync; - Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity(); - activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.SessionId, this._stepRunner.SessionId); + // Start the session-level activity that spans the entire run loop lifetime. + // Individual run-stage activities are nested within this session activity. + Activity? sessionActivity = this._stepRunner.TelemetryContext.StartWorkflowSessionActivity(); + sessionActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId) + .SetTag(Tags.SessionId, this._stepRunner.SessionId); + + Activity? runActivity = null; try { @@ -70,10 +75,16 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) await this._inputWaiter.WaitForInputAsync(cancellationToken: linkedSource.Token).ConfigureAwait(false); this._runStatus = RunStatus.Running; - activity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted)); + sessionActivity?.AddEvent(new ActivityEvent(EventNames.SessionStarted)); while (!linkedSource.Token.IsCancellationRequested) { + // Start a new run-stage activity for this input→processing→halt cycle + runActivity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity(); + runActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId) + .SetTag(Tags.SessionId, this._stepRunner.SessionId); + runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted)); + // Run all available supersteps continuously // Events are streamed out in real-time as they happen via the event handler while (this._stepRunner.HasUnprocessedMessages && !linkedSource.Token.IsCancellationRequested) @@ -93,14 +104,13 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) RunStatus capturedStatus = this._runStatus; await this._eventChannel.Writer.WriteAsync(new InternalHaltSignal(currentEpoch, capturedStatus), linkedSource.Token).ConfigureAwait(false); - // Stop the workflow.run Activity when the workflow reaches Idle so the span is - // exported to telemetry backends immediately, rather than waiting for the run loop - // to be cancelled/disposed. - if (activity is not null && capturedStatus == RunStatus.Idle) + // Close the run-stage activity when processing halts. + // A new run activity will be created when the next input arrives. + if (runActivity is not null) { - activity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); - activity.Dispose(); - activity = null; + runActivity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); + runActivity.Dispose(); + runActivity = null; } // Wait for next input from the consumer @@ -117,14 +127,26 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) } catch (Exception ex) { - if (activity != null) + // Record error on the run-stage activity if one is active + if (runActivity is not null) + { + runActivity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() { + { Tags.ErrorType, ex.GetType().FullName }, + { Tags.BuildErrorMessage, ex.Message }, + })); + runActivity.CaptureException(ex); + } + + // Record error on the session activity + if (sessionActivity is not null) { - activity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() { + sessionActivity.AddEvent(new ActivityEvent(EventNames.SessionError, tags: new() { { Tags.ErrorType, ex.GetType().FullName }, { Tags.BuildErrorMessage, ex.Message }, })); - activity.CaptureException(ex); + sessionActivity.CaptureException(ex); } + await this._eventChannel.Writer.WriteAsync(new WorkflowErrorEvent(ex), linkedSource.Token).ConfigureAwait(false); } finally @@ -135,11 +157,18 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) // Mark as ended when run loop exits this._runStatus = RunStatus.Ended; - // Safety net: stop the activity if not already stopped (e.g. on cancellation or error) - if (activity is not null) + // Safety net: stop the run-stage activity if not already stopped (e.g. on cancellation or error) + if (runActivity is not null) + { + runActivity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); + runActivity.Dispose(); + } + + // Stop the session activity — the session always ends when the run loop exits + if (sessionActivity is not null) { - activity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); - activity.Dispose(); + sessionActivity.AddEvent(new ActivityEvent(EventNames.SessionCompleted)); + sessionActivity.Dispose(); } } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs index 9d2912ecd3..a89bddc00b 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs @@ -5,6 +5,7 @@ namespace Microsoft.Agents.AI.Workflows.Observability; internal static class ActivityNames { public const string WorkflowBuild = "workflow.build"; + public const string WorkflowSession = "workflow.session"; public const string WorkflowRun = "workflow_invoke"; public const string MessageSend = "message.send"; public const string ExecutorProcess = "executor.process"; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/EventNames.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/EventNames.cs index 8b9f5bbde8..84540efdc8 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/EventNames.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/EventNames.cs @@ -8,6 +8,9 @@ internal static class EventNames public const string BuildValidationCompleted = "build.validation_completed"; public const string BuildCompleted = "build.completed"; public const string BuildError = "build.error"; + public const string SessionStarted = "session.started"; + public const string SessionCompleted = "session.completed"; + public const string SessionError = "session.error"; public const string WorkflowStarted = "workflow.started"; public const string WorkflowCompleted = "workflow.completed"; public const string WorkflowError = "workflow.error"; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs index e4b8d7a851..ad3e071424 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs @@ -88,7 +88,24 @@ public WorkflowTelemetryContext(WorkflowTelemetryOptions options, ActivitySource } /// - /// Starts a workflow run activity if enabled. + /// Starts a workflow session activity if enabled. This is a root-level span + /// that represents the entire lifetime of a workflow execution (from start + /// until stop, cancellation, or error). Individual run stages are nested within it. + /// + /// An activity if workflow run telemetry is enabled, otherwise null. + public Activity? StartWorkflowSessionActivity() + { + if (!this.IsEnabled || this.Options.DisableWorkflowRun) + { + return null; + } + + return this.ActivitySource.StartActivity(ActivityNames.WorkflowSession); + } + + /// + /// Starts a workflow run activity if enabled. This represents a single + /// input-to-halt cycle within a workflow session. /// /// An activity if workflow run telemetry is enabled, otherwise null. public Activity? StartWorkflowRunActivity() diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs index e7a99d5ca2..b24cb479a1 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs @@ -43,15 +43,16 @@ public ObservabilityTests() /// Create a sample workflow for testing. /// /// - /// This workflow is expected to create 8 activities that will be captured by the tests + /// This workflow is expected to create 9 activities that will be captured by the tests /// - ActivityNames.WorkflowBuild - /// - ActivityNames.WorkflowRun - /// -- ActivityNames.EdgeGroupProcess - /// -- ActivityNames.ExecutorProcess (UppercaseExecutor) - /// --- ActivityNames.MessageSend - /// ---- ActivityNames.EdgeGroupProcess - /// -- ActivityNames.ExecutorProcess (ReverseTextExecutor) - /// --- ActivityNames.MessageSend + /// - ActivityNames.WorkflowSession + /// -- ActivityNames.WorkflowRun + /// --- ActivityNames.EdgeGroupProcess + /// --- ActivityNames.ExecutorProcess (UppercaseExecutor) + /// ---- ActivityNames.MessageSend + /// ----- ActivityNames.EdgeGroupProcess + /// --- ActivityNames.ExecutorProcess (ReverseTextExecutor) + /// ---- ActivityNames.MessageSend /// /// The created workflow. private static Workflow CreateWorkflow() @@ -74,6 +75,7 @@ private static Dictionary GetExpectedActivityNameCounts() => new() { { ActivityNames.WorkflowBuild, 1 }, + { ActivityNames.WorkflowSession, 1 }, { ActivityNames.WorkflowRun, 1 }, { ActivityNames.EdgeGroupProcess, 2 }, { ActivityNames.ExecutorProcess, 2 }, @@ -113,7 +115,7 @@ private async Task TestWorkflowEndToEndActivitiesAsync(string executionEnvironme // Assert var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); - capturedActivities.Should().HaveCount(8, "Exactly 8 activities should be created."); + capturedActivities.Should().HaveCount(9, "Exactly 9 activities should be created."); // Make sure all expected activities exist and have the correct count foreach (var kvp in GetExpectedActivityNameCounts()) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs index ea314d2312..20777c04a0 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs @@ -70,7 +70,7 @@ private static Workflow CreateWorkflow() /// disposed because yield break in async iterators does not trigger using disposal. /// [Fact] - public async Task WorkflowRunActivity_IsStopped_Lockstep() + public async Task WorkflowRunActivity_IsStopped_LockstepAsync() { // Arrange using var testActivity = new Activity("WorkflowRunStopTest_Lockstep").Start(); @@ -80,15 +80,27 @@ public async Task WorkflowRunActivity_IsStopped_Lockstep() Run run = await InProcessExecution.Lockstep.RunAsync(workflow, "Hello, World!"); await run.DisposeAsync(); - // Assert - workflow.run should have been started + // Assert - workflow.session should have been started and stopped + var startedSessions = this._startedActivities + .Where(a => a.RootId == testActivity.RootId && + a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal)) + .ToList(); + startedSessions.Should().HaveCount(1, "workflow.session Activity should be started"); + + var stoppedSessions = this._stoppedActivities + .Where(a => a.RootId == testActivity.RootId && + a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal)) + .ToList(); + stoppedSessions.Should().HaveCount(1, + "workflow.session Activity should be stopped/disposed so it is exported to telemetry backends"); + + // Assert - workflow.run should have been started and stopped var startedWorkflowRuns = this._startedActivities .Where(a => a.RootId == testActivity.RootId && a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) .ToList(); startedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be started"); - // Assert - workflow.run should have been stopped (i.e., Dispose/Stop was called) - // This is the core assertion for issue #4155: the ActivityStopped callback must fire var stoppedWorkflowRuns = this._stoppedActivities .Where(a => a.RootId == testActivity.RootId && a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) @@ -102,7 +114,7 @@ public async Task WorkflowRunActivity_IsStopped_Lockstep() /// execution environment (StreamingRunEventStream). /// [Fact] - public async Task WorkflowRunActivity_IsStopped_OffThread() + public async Task WorkflowRunActivity_IsStopped_OffThreadAsync() { // Arrange using var testActivity = new Activity("WorkflowRunStopTest_OffThread").Start(); @@ -112,14 +124,27 @@ public async Task WorkflowRunActivity_IsStopped_OffThread() Run run = await InProcessExecution.OffThread.RunAsync(workflow, "Hello, World!"); await run.DisposeAsync(); - // Assert - workflow.run should have been started + // Assert - workflow.session should have been started and stopped + var startedSessions = this._startedActivities + .Where(a => a.RootId == testActivity.RootId && + a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal)) + .ToList(); + startedSessions.Should().HaveCount(1, "workflow.session Activity should be started"); + + var stoppedSessions = this._stoppedActivities + .Where(a => a.RootId == testActivity.RootId && + a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal)) + .ToList(); + stoppedSessions.Should().HaveCount(1, + "workflow.session Activity should be stopped/disposed so it is exported to telemetry backends"); + + // Assert - workflow.run should have been started and stopped var startedWorkflowRuns = this._startedActivities .Where(a => a.RootId == testActivity.RootId && a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) .ToList(); startedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be started"); - // Assert - workflow.run should have been stopped var stoppedWorkflowRuns = this._stoppedActivities .Where(a => a.RootId == testActivity.RootId && a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) @@ -134,7 +159,7 @@ public async Task WorkflowRunActivity_IsStopped_OffThread() /// This matches the exact usage pattern described in the issue. /// [Fact] - public async Task WorkflowRunActivity_IsStopped_Streaming_OffThread() + public async Task WorkflowRunActivity_IsStopped_Streaming_OffThreadAsync() { // Arrange using var testActivity = new Activity("WorkflowRunStopTest_Streaming_OffThread").Start(); @@ -147,6 +172,13 @@ public async Task WorkflowRunActivity_IsStopped_Streaming_OffThread() // Consume all events } + // Assert - workflow.session should have been started + var startedSessions = this._startedActivities + .Where(a => a.RootId == testActivity.RootId && + a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal)) + .ToList(); + startedSessions.Should().HaveCount(1, "workflow.session Activity should be started"); + // Assert - workflow.run should have been started var startedWorkflowRuns = this._startedActivities .Where(a => a.RootId == testActivity.RootId && @@ -163,12 +195,74 @@ public async Task WorkflowRunActivity_IsStopped_Streaming_OffThread() "workflow.run Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)"); } + /// + /// Verifies that a new workflow.run activity is started and stopped for each + /// streaming invocation, even when using the same workflow in a multi-turn pattern, + /// and that each session gets its own session activity. + /// + [Fact] + public async Task WorkflowRunActivity_IsStopped_Streaming_OffThread_MultiTurnAsync() + { + // Arrange + using var testActivity = new Activity("WorkflowRunStopTest_Streaming_OffThread_MultiTurn").Start(); + + var workflow = CreateWorkflow(); + + // Act - first streaming run + await using (StreamingRun run1 = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Hello, World!")) + { + await foreach (WorkflowEvent evt in run1.WatchStreamAsync()) + { + // Consume all events from first turn + } + } + + // Act - second streaming run (multi-turn scenario with same workflow) + await using (StreamingRun run2 = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Second turn!")) + { + await foreach (WorkflowEvent evt in run2.WatchStreamAsync()) + { + // Consume all events from second turn + } + } + + // Assert - two workflow.session activities should have been started and stopped + var startedSessions = this._startedActivities + .Where(a => a.RootId == testActivity.RootId && + a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal)) + .ToList(); + startedSessions.Should().HaveCount(2, + "each streaming invocation should start its own workflow.session Activity"); + + var stoppedSessions = this._stoppedActivities + .Where(a => a.RootId == testActivity.RootId && + a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal)) + .ToList(); + stoppedSessions.Should().HaveCount(2, + "each workflow.session Activity should be stopped/disposed so it is exported to telemetry backends"); + + // Assert - two workflow.run activities should have been started and stopped + var startedWorkflowRuns = this._startedActivities + .Where(a => a.RootId == testActivity.RootId && + a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) + .ToList(); + startedWorkflowRuns.Should().HaveCount(2, + "each streaming invocation should start its own workflow.run Activity"); + + var stoppedWorkflowRuns = this._stoppedActivities + .Where(a => a.RootId == testActivity.RootId && + a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) + .ToList(); + stoppedWorkflowRuns.Should().HaveCount(2, + "each workflow.run Activity should be stopped/disposed so it is exported to telemetry backends in multi-turn scenarios"); + } + /// /// Verifies that all started activities (not just workflow.run) are properly stopped. /// This ensures no spans are "leaked" without being exported. /// [Fact] - public async Task AllActivities_AreStopped_AfterWorkflowCompletion() + public async Task AllActivities_AreStopped_AfterWorkflowCompletionAsync() { // Arrange using var testActivity = new Activity("AllActivitiesStopTest").Start(); From ae784bf854135e848efcf3e7a8be65206300911f Mon Sep 17 00:00:00 2001 From: alliscode Date: Tue, 24 Feb 2026 14:11:16 -0800 Subject: [PATCH 3/7] Address Copilot review: fix Activity/CTS disposal, rename activity, add error tag\n\n1. LockstepRunEventStream: Remove 'using' from Activity in async iterator\n and manually dispose in finally block (fixes #4155 pattern). Also dispose\n linkedSource CTS in finally to prevent leak.\n2. Tags.cs: Add ErrorMessage (\"error.message\") tag for runtime errors,\n distinct from BuildErrorMessage (\"build.error.message\").\n3. ActivityNames: Rename WorkflowRun from \"workflow_invoke\" to \"workflow.run\"\n for cross-language consistency.\n4. WorkflowTelemetryContext: Fix XML doc to say \"outer/parent span\" instead\n of \"root-level span\".\n5. ObservabilityTests: Assert WorkflowSession absence when DisableWorkflowRun\n is true.\n6. WorkflowRunActivityStopTests: Fix streaming test race by disposing\n StreamingRun before asserting activities are stopped.\n7. StreamingRunEventStream/LockstepRunEventStream: Use Tags.ErrorMessage\n instead of Tags.BuildErrorMessage for runtime error events." --- .../Execution/LockstepRunEventStream.cs | 13 +++++++++++-- .../Execution/StreamingRunEventStream.cs | 6 +++--- .../Observability/ActivityNames.cs | 2 +- .../Observability/Tags.cs | 1 + .../Observability/WorkflowTelemetryContext.cs | 5 +++-- .../ObservabilityTests.cs | 3 +++ .../WorkflowRunActivityStopTests.cs | 9 ++++++--- 7 files changed, 28 insertions(+), 11 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs index 978278ebfa..f6cdb4f375 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs @@ -56,7 +56,11 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe this._stepRunner.OutgoingEvents.EventRaised += OnWorkflowEventAsync; - using Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity(); + // Do NOT use a 'using' declaration for the Activity inside an async iterator. + // In compiled async iterator state machines, 'using' locals are only disposed when + // DisposeAsync() is called on the enumerator. Activity.Stop (fired by Dispose) is + // time-sensitive for OpenTelemetry export, so we dispose explicitly in the finally block. + Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity(); activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.SessionId, this._stepRunner.SessionId); try @@ -85,7 +89,7 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe { activity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() { { Tags.ErrorType, ex.GetType().FullName }, - { Tags.BuildErrorMessage, ex.Message }, + { Tags.ErrorMessage, ex.Message }, })); activity.CaptureException(ex); throw; @@ -141,6 +145,11 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe { this.RunStatus = this._stepRunner.HasUnservicedRequests ? RunStatus.PendingRequests : RunStatus.Idle; this._stepRunner.OutgoingEvents.EventRaised -= OnWorkflowEventAsync; + + // Explicitly dispose the Activity so Activity.Stop fires deterministically, + // regardless of how the async iterator enumerator is disposed. + activity?.Dispose(); + linkedSource.Dispose(); } ValueTask OnWorkflowEventAsync(object? sender, WorkflowEvent e) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index ddf871e57e..c91dc7998e 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -132,7 +132,7 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) { runActivity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() { { Tags.ErrorType, ex.GetType().FullName }, - { Tags.BuildErrorMessage, ex.Message }, + { Tags.ErrorMessage, ex.Message }, })); runActivity.CaptureException(ex); } @@ -142,7 +142,7 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) { sessionActivity.AddEvent(new ActivityEvent(EventNames.SessionError, tags: new() { { Tags.ErrorType, ex.GetType().FullName }, - { Tags.BuildErrorMessage, ex.Message }, + { Tags.ErrorMessage, ex.Message }, })); sessionActivity.CaptureException(ex); } @@ -157,7 +157,7 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) // Mark as ended when run loop exits this._runStatus = RunStatus.Ended; - // Safety net: stop the run-stage activity if not already stopped (e.g. on cancellation or error) + // Stop the run-stage activity if not already stopped (e.g. on cancellation or error) if (runActivity is not null) { runActivity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs index a89bddc00b..fc5da75755 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs @@ -6,7 +6,7 @@ internal static class ActivityNames { public const string WorkflowBuild = "workflow.build"; public const string WorkflowSession = "workflow.session"; - public const string WorkflowRun = "workflow_invoke"; + public const string WorkflowRun = "workflow.run"; public const string MessageSend = "message.send"; public const string ExecutorProcess = "executor.process"; public const string EdgeGroupProcess = "edge_group.process"; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs index 88c68eceb9..47ce701794 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs @@ -11,6 +11,7 @@ internal static class Tags public const string BuildErrorMessage = "build.error.message"; public const string BuildErrorType = "build.error.type"; public const string ErrorType = "error.type"; + public const string ErrorMessage = "error.message"; public const string SessionId = "session.id"; public const string ExecutorId = "executor.id"; public const string ExecutorType = "executor.type"; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs index ad3e071424..69dd8eb849 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs @@ -88,9 +88,10 @@ public WorkflowTelemetryContext(WorkflowTelemetryOptions options, ActivitySource } /// - /// Starts a workflow session activity if enabled. This is a root-level span + /// Starts a workflow session activity if enabled. This is the outer/parent span /// that represents the entire lifetime of a workflow execution (from start - /// until stop, cancellation, or error). Individual run stages are nested within it. + /// until stop, cancellation, or error) within the current trace. + /// Individual run stages are typically nested within it. /// /// An activity if workflow run telemetry is enabled, otherwise null. public Activity? StartWorkflowSessionActivity() diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs index b24cb479a1..6166a7abc6 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs @@ -277,6 +277,9 @@ public async Task DisableWorkflowRun_PreventsWorkflowRunActivityAsync() capturedActivities.Should().NotContain( a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal), "WorkflowRun activity should be disabled."); + capturedActivities.Should().NotContain( + a => a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal), + "WorkflowSession activity should also be disabled when DisableWorkflowRun is true."); capturedActivities.Should().Contain( a => a.OperationName.StartsWith(ActivityNames.WorkflowBuild, StringComparison.Ordinal), "Other activities should still be created."); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs index 20777c04a0..0ecb2d975a 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs @@ -2,12 +2,10 @@ using System; using System.Collections.Concurrent; -using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading.Tasks; using FluentAssertions; -using Microsoft.Agents.AI.Workflows.InProc; using Microsoft.Agents.AI.Workflows.Observability; namespace Microsoft.Agents.AI.Workflows.UnitTests; @@ -166,12 +164,17 @@ public async Task WorkflowRunActivity_IsStopped_Streaming_OffThreadAsync() // Act - use streaming path (WatchStreamAsync), which is the pattern from the issue var workflow = CreateWorkflow(); - await using StreamingRun run = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Hello, World!"); + StreamingRun run = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Hello, World!"); await foreach (WorkflowEvent evt in run.WatchStreamAsync()) { // Consume all events } + // Dispose the run before asserting — the run Activity is disposed when the + // run loop exits, which happens during DisposeAsync. Without this, assertions + // can race against the background run loop's finally block. + await run.DisposeAsync(); + // Assert - workflow.session should have been started var startedSessions = this._startedActivities .Where(a => a.RootId == testActivity.RootId && From 1bd0a2a85e62a9dd7702bf5a8cfff656f75c2e34 Mon Sep 17 00:00:00 2001 From: alliscode Date: Tue, 24 Feb 2026 14:53:42 -0800 Subject: [PATCH 4/7] Review fixes: revert workflow_invoke rename, use 'using' for linkedSource, move SessionStarted earlier\n\n- Revert ActivityNames.WorkflowRun back to \"workflow_invoke\" (OTEL semantic convention contract)\n- Use 'using' declaration for linkedSource CTS in LockstepRunEventStream (no timing sensitivity)\n- Move SessionStarted event before WaitForInputAsync in StreamingRunEventStream to match Lockstep behavior" --- .../Execution/LockstepRunEventStream.cs | 11 ++++------- .../Execution/StreamingRunEventStream.cs | 3 ++- .../Observability/ActivityNames.cs | 2 +- .../Observability/WorkflowTelemetryContext.cs | 2 +- .../ObservabilityTests.cs | 8 ++++---- .../WorkflowRunActivityStopTests.cs | 16 ++++++++-------- 6 files changed, 20 insertions(+), 22 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs index f6cdb4f375..8db06d942e 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs @@ -50,16 +50,14 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe } #endif - CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(this._stopCancellation.Token, cancellationToken); + using CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(this._stopCancellation.Token, cancellationToken); ConcurrentQueue eventSink = []; this._stepRunner.OutgoingEvents.EventRaised += OnWorkflowEventAsync; - // Do NOT use a 'using' declaration for the Activity inside an async iterator. - // In compiled async iterator state machines, 'using' locals are only disposed when - // DisposeAsync() is called on the enumerator. Activity.Stop (fired by Dispose) is - // time-sensitive for OpenTelemetry export, so we dispose explicitly in the finally block. + // Not 'using' — Activity.Stop must fire deterministically in the finally block + // for OpenTelemetry export, not deferred to the enumerator's DisposeAsync. Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity(); activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.SessionId, this._stepRunner.SessionId); @@ -149,7 +147,6 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe // Explicitly dispose the Activity so Activity.Stop fires deterministically, // regardless of how the async iterator enumerator is disposed. activity?.Dispose(); - linkedSource.Dispose(); } ValueTask OnWorkflowEventAsync(object? sender, WorkflowEvent e) @@ -187,7 +184,7 @@ public ValueTask DisposeAsync() { this._stopCancellation.Cancel(); - // Stop the session activity — the session ends when the stream is disposed + // Stop the session activity if (this._sessionActivity is not null) { this._sessionActivity.AddEvent(new ActivityEvent(EventNames.SessionCompleted)); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index c91dc7998e..a09dedd8ad 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -68,6 +68,8 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) Activity? runActivity = null; + sessionActivity?.AddEvent(new ActivityEvent(EventNames.SessionStarted)); + try { // Wait for the first input before starting @@ -75,7 +77,6 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) await this._inputWaiter.WaitForInputAsync(cancellationToken: linkedSource.Token).ConfigureAwait(false); this._runStatus = RunStatus.Running; - sessionActivity?.AddEvent(new ActivityEvent(EventNames.SessionStarted)); while (!linkedSource.Token.IsCancellationRequested) { diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs index fc5da75755..1639fc3c3c 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs @@ -6,7 +6,7 @@ internal static class ActivityNames { public const string WorkflowBuild = "workflow.build"; public const string WorkflowSession = "workflow.session"; - public const string WorkflowRun = "workflow.run"; + public const string WorkflowInvoke = "workflow_invoke"; public const string MessageSend = "message.send"; public const string ExecutorProcess = "executor.process"; public const string EdgeGroupProcess = "edge_group.process"; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs index 69dd8eb849..974ffce5c5 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs @@ -116,7 +116,7 @@ public WorkflowTelemetryContext(WorkflowTelemetryOptions options, ActivitySource return null; } - return this.ActivitySource.StartActivity(ActivityNames.WorkflowRun); + return this.ActivitySource.StartActivity(ActivityNames.WorkflowInvoke); } /// diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs index 6166a7abc6..62768dea4b 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs @@ -76,7 +76,7 @@ private static Dictionary GetExpectedActivityNameCounts() => { { ActivityNames.WorkflowBuild, 1 }, { ActivityNames.WorkflowSession, 1 }, - { ActivityNames.WorkflowRun, 1 }, + { ActivityNames.WorkflowInvoke, 1 }, { ActivityNames.EdgeGroupProcess, 2 }, { ActivityNames.ExecutorProcess, 2 }, { ActivityNames.MessageSend, 2 } @@ -127,7 +127,7 @@ private async Task TestWorkflowEndToEndActivitiesAsync(string executionEnvironme } // Verify WorkflowRun activity events include workflow lifecycle events - var workflowRunActivity = capturedActivities.First(a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)); + var workflowRunActivity = capturedActivities.First(a => a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal)); var activityEvents = workflowRunActivity.Events.ToList(); activityEvents.Should().Contain(e => e.Name == EventNames.WorkflowStarted, "activity should have workflow started event"); activityEvents.Should().Contain(e => e.Name == EventNames.WorkflowCompleted, "activity should have workflow completed event"); @@ -275,7 +275,7 @@ public async Task DisableWorkflowRun_PreventsWorkflowRunActivityAsync() // Assert var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); capturedActivities.Should().NotContain( - a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal), + a => a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal), "WorkflowRun activity should be disabled."); capturedActivities.Should().NotContain( a => a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal), @@ -308,7 +308,7 @@ public async Task DisableExecutorProcess_PreventsExecutorProcessActivityAsync() a => a.OperationName.StartsWith(ActivityNames.ExecutorProcess, StringComparison.Ordinal), "ExecutorProcess activity should be disabled."); capturedActivities.Should().Contain( - a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal), + a => a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal), "Other activities should still be created."); } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs index 0ecb2d975a..ef458a83eb 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs @@ -95,13 +95,13 @@ public async Task WorkflowRunActivity_IsStopped_LockstepAsync() // Assert - workflow.run should have been started and stopped var startedWorkflowRuns = this._startedActivities .Where(a => a.RootId == testActivity.RootId && - a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) + a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal)) .ToList(); startedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be started"); var stoppedWorkflowRuns = this._stoppedActivities .Where(a => a.RootId == testActivity.RootId && - a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) + a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal)) .ToList(); stoppedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)"); @@ -139,13 +139,13 @@ public async Task WorkflowRunActivity_IsStopped_OffThreadAsync() // Assert - workflow.run should have been started and stopped var startedWorkflowRuns = this._startedActivities .Where(a => a.RootId == testActivity.RootId && - a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) + a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal)) .ToList(); startedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be started"); var stoppedWorkflowRuns = this._stoppedActivities .Where(a => a.RootId == testActivity.RootId && - a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) + a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal)) .ToList(); stoppedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)"); @@ -185,14 +185,14 @@ public async Task WorkflowRunActivity_IsStopped_Streaming_OffThreadAsync() // Assert - workflow.run should have been started var startedWorkflowRuns = this._startedActivities .Where(a => a.RootId == testActivity.RootId && - a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) + a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal)) .ToList(); startedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be started"); // Assert - workflow.run should have been stopped var stoppedWorkflowRuns = this._stoppedActivities .Where(a => a.RootId == testActivity.RootId && - a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) + a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal)) .ToList(); stoppedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)"); @@ -247,14 +247,14 @@ public async Task WorkflowRunActivity_IsStopped_Streaming_OffThread_MultiTurnAsy // Assert - two workflow.run activities should have been started and stopped var startedWorkflowRuns = this._startedActivities .Where(a => a.RootId == testActivity.RootId && - a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) + a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal)) .ToList(); startedWorkflowRuns.Should().HaveCount(2, "each streaming invocation should start its own workflow.run Activity"); var stoppedWorkflowRuns = this._stoppedActivities .Where(a => a.RootId == testActivity.RootId && - a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) + a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal)) .ToList(); stoppedWorkflowRuns.Should().HaveCount(2, "each workflow.run Activity should be stopped/disposed so it is exported to telemetry backends in multi-turn scenarios"); From b28dacea6c9739ab020cb9132f3c96ad6ae2828a Mon Sep 17 00:00:00 2001 From: alliscode Date: Tue, 24 Feb 2026 15:07:14 -0800 Subject: [PATCH 5/7] Improve naming and comments in WorkflowRunActivityStopTests" --- .../WorkflowRunActivityStopTests.cs | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs index ef458a83eb..2e93ac834f 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs @@ -12,8 +12,8 @@ namespace Microsoft.Agents.AI.Workflows.UnitTests; /// /// Regression test for https://github.com/microsoft/agent-framework/issues/4155 -/// Verifies that the workflow.run Activity is properly stopped/disposed so it gets exported -/// to telemetry backends. The ActivityStopped callback must fire for the workflow.run span. +/// Verifies that the workflow_invoke Activity is properly stopped/disposed so it gets exported +/// to telemetry backends. The ActivityStopped callback must fire for the workflow_invoke span. /// [Collection("ObservabilityTests")] public sealed class WorkflowRunActivityStopTests : IDisposable @@ -62,7 +62,7 @@ private static Workflow CreateWorkflow() } /// - /// Verifies that the workflow.run Activity is stopped (and thus exportable) when + /// Verifies that the workflow_invoke Activity is stopped (and thus exportable) when /// using the Lockstep execution environment. /// Bug: The Activity created by LockstepRunEventStream.TakeEventStreamAsync is never /// disposed because yield break in async iterators does not trigger using disposal. @@ -92,23 +92,23 @@ public async Task WorkflowRunActivity_IsStopped_LockstepAsync() stoppedSessions.Should().HaveCount(1, "workflow.session Activity should be stopped/disposed so it is exported to telemetry backends"); - // Assert - workflow.run should have been started and stopped + // Assert - workflow_invoke should have been started and stopped var startedWorkflowRuns = this._startedActivities .Where(a => a.RootId == testActivity.RootId && a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal)) .ToList(); - startedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be started"); + startedWorkflowRuns.Should().HaveCount(1, "workflow_invoke Activity should be started"); var stoppedWorkflowRuns = this._stoppedActivities .Where(a => a.RootId == testActivity.RootId && a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal)) .ToList(); stoppedWorkflowRuns.Should().HaveCount(1, - "workflow.run Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)"); + "workflow_invoke Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)"); } /// - /// Verifies that the workflow.run Activity is stopped when using the OffThread (Default) + /// Verifies that the workflow_invoke Activity is stopped when using the OffThread (Default) /// execution environment (StreamingRunEventStream). /// [Fact] @@ -136,23 +136,23 @@ public async Task WorkflowRunActivity_IsStopped_OffThreadAsync() stoppedSessions.Should().HaveCount(1, "workflow.session Activity should be stopped/disposed so it is exported to telemetry backends"); - // Assert - workflow.run should have been started and stopped + // Assert - workflow_invoke should have been started and stopped var startedWorkflowRuns = this._startedActivities .Where(a => a.RootId == testActivity.RootId && a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal)) .ToList(); - startedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be started"); + startedWorkflowRuns.Should().HaveCount(1, "workflow_invoke Activity should be started"); var stoppedWorkflowRuns = this._stoppedActivities .Where(a => a.RootId == testActivity.RootId && a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal)) .ToList(); stoppedWorkflowRuns.Should().HaveCount(1, - "workflow.run Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)"); + "workflow_invoke Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)"); } /// - /// Verifies that the workflow.run Activity is stopped when using the streaming API + /// Verifies that the workflow_invoke Activity is stopped when using the streaming API /// (StreamingRun.WatchStreamAsync) with the OffThread execution environment. /// This matches the exact usage pattern described in the issue. /// @@ -182,24 +182,24 @@ public async Task WorkflowRunActivity_IsStopped_Streaming_OffThreadAsync() .ToList(); startedSessions.Should().HaveCount(1, "workflow.session Activity should be started"); - // Assert - workflow.run should have been started + // Assert - workflow_invoke should have been started var startedWorkflowRuns = this._startedActivities .Where(a => a.RootId == testActivity.RootId && a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal)) .ToList(); - startedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be started"); + startedWorkflowRuns.Should().HaveCount(1, "workflow_invoke Activity should be started"); - // Assert - workflow.run should have been stopped + // Assert - workflow_invoke should have been stopped var stoppedWorkflowRuns = this._stoppedActivities .Where(a => a.RootId == testActivity.RootId && a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal)) .ToList(); stoppedWorkflowRuns.Should().HaveCount(1, - "workflow.run Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)"); + "workflow_invoke Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)"); } /// - /// Verifies that a new workflow.run activity is started and stopped for each + /// Verifies that a new workflow_invoke activity is started and stopped for each /// streaming invocation, even when using the same workflow in a multi-turn pattern, /// and that each session gets its own session activity. /// @@ -244,24 +244,24 @@ public async Task WorkflowRunActivity_IsStopped_Streaming_OffThread_MultiTurnAsy stoppedSessions.Should().HaveCount(2, "each workflow.session Activity should be stopped/disposed so it is exported to telemetry backends"); - // Assert - two workflow.run activities should have been started and stopped + // Assert - two workflow_invoke activities should have been started and stopped var startedWorkflowRuns = this._startedActivities .Where(a => a.RootId == testActivity.RootId && a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal)) .ToList(); startedWorkflowRuns.Should().HaveCount(2, - "each streaming invocation should start its own workflow.run Activity"); + "each streaming invocation should start its own workflow_invoke Activity"); var stoppedWorkflowRuns = this._stoppedActivities .Where(a => a.RootId == testActivity.RootId && a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal)) .ToList(); stoppedWorkflowRuns.Should().HaveCount(2, - "each workflow.run Activity should be stopped/disposed so it is exported to telemetry backends in multi-turn scenarios"); + "each workflow_invoke Activity should be stopped/disposed so it is exported to telemetry backends in multi-turn scenarios"); } /// - /// Verifies that all started activities (not just workflow.run) are properly stopped. + /// Verifies that all started activities (not just workflow_invoke) are properly stopped. /// This ensures no spans are "leaked" without being exported. /// [Fact] From c53e3316d8ead4206fd0a2a73eea82758ef86a0a Mon Sep 17 00:00:00 2001 From: alliscode Date: Tue, 24 Feb 2026 16:09:17 -0800 Subject: [PATCH 6/7] Prevent session Activity.Current leak in lockstep mode, add nesting test Save and restore Activity.Current in LockstepRunEventStream.Start() so the session activity doesn't leak into caller code via AsyncLocal. Re-establish Activity.Current = sessionActivity before creating the run activity in TakeEventStreamAsync to preserve parent-child nesting. Add test verifying app activities after RunAsync are not parented under the session, and that the workflow_invoke activity nests under the session." --- .../Execution/LockstepRunEventStream.cs | 32 ++++++++------ .../WorkflowRunActivityStopTests.cs | 43 +++++++++++++++++++ 2 files changed, 62 insertions(+), 13 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs index 8db06d942e..506a0d1039 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs @@ -31,12 +31,16 @@ public LockstepRunEventStream(ISuperStepRunner stepRunner) public void Start() { - // Start the session-level activity that spans the entire lockstep execution lifetime. - // Individual run-stage activities are nested within this session activity. + // Save and restore Activity.Current so the long-lived session activity + // doesn't leak into caller code via AsyncLocal. + Activity? previousActivity = Activity.Current; + this._sessionActivity = this._stepRunner.TelemetryContext.StartWorkflowSessionActivity(); this._sessionActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId) .SetTag(Tags.SessionId, this._stepRunner.SessionId); this._sessionActivity?.AddEvent(new ActivityEvent(EventNames.SessionStarted)); + + Activity.Current = previousActivity; } public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPendingRequest, [EnumeratorCancellation] CancellationToken cancellationToken = default) @@ -56,15 +60,17 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe this._stepRunner.OutgoingEvents.EventRaised += OnWorkflowEventAsync; - // Not 'using' — Activity.Stop must fire deterministically in the finally block - // for OpenTelemetry export, not deferred to the enumerator's DisposeAsync. - Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity(); - activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.SessionId, this._stepRunner.SessionId); + // Re-establish session as parent so the run activity nests correctly. + Activity.Current = this._sessionActivity; + + // Not 'using' — must dispose explicitly in finally for deterministic export. + Activity? runActivity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity(); + runActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.SessionId, this._stepRunner.SessionId); try { this.RunStatus = RunStatus.Running; - activity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted)); + runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted)); do { @@ -73,7 +79,7 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe { // Because we may be yielding out of this function, we need to ensure that the Activity.Current // is set to our activity for the duration of this loop iteration. - Activity.Current = activity; + Activity.Current = runActivity; // Drain SuperSteps while there are steps to run try @@ -83,13 +89,13 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe catch (OperationCanceledException) { } - catch (Exception ex) when (activity is not null) + catch (Exception ex) when (runActivity is not null) { - activity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() { + runActivity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() { { Tags.ErrorType, ex.GetType().FullName }, { Tags.ErrorMessage, ex.Message }, })); - activity.CaptureException(ex); + runActivity.CaptureException(ex); throw; } @@ -137,7 +143,7 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe } } while (!ShouldBreak()); - activity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); + runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); } finally { @@ -146,7 +152,7 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe // Explicitly dispose the Activity so Activity.Stop fires deterministically, // regardless of how the async iterator enumerator is disposed. - activity?.Dispose(); + runActivity?.Dispose(); } ValueTask OnWorkflowEventAsync(object? sender, WorkflowEvent e) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs index 2e93ac834f..f35910f26b 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs @@ -298,4 +298,47 @@ public async Task AllActivities_AreStopped_AfterWorkflowCompletionAsync() $"Activities started but never stopped: [{string.Join(", ", neverStoppedNames)}]"); } } + + /// + /// Verifies that Activity.Current is not leaked after lockstep RunAsync. + /// Application code creating activities after RunAsync returns should not + /// be parented under the workflow session span. The run activity should + /// still nest correctly under the session. + /// + [Fact] + public async Task Lockstep_SessionActivity_DoesNotLeak_IntoCaller_ActivityCurrentAsync() + { + // Arrange + using var testActivity = new Activity("SessionLeakTest").Start(); + var workflow = CreateWorkflow(); + + // Act — run the workflow via lockstep (Start + drain happen inside RunAsync) + Run run = await InProcessExecution.Lockstep.RunAsync(workflow, "Hello, World!"); + + // Create an application activity after RunAsync returns. + // If the session leaked into Activity.Current, this would be parented under it. + using var appActivity = new Activity("AppWork").Start(); + appActivity.Stop(); + + await run.DisposeAsync(); + + // Assert — the app activity should be parented under the test root, not the session + var sessionActivities = this._startedActivities + .Where(a => a.RootId == testActivity.RootId && + a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal)) + .ToList(); + sessionActivities.Should().HaveCount(1, "one session activity should exist"); + + appActivity.ParentId.Should().Be(testActivity.Id, + "application activity should be parented under the test root, not the workflow session"); + + // Assert — the run activity should still be parented under the session + var invokeActivities = this._startedActivities + .Where(a => a.RootId == testActivity.RootId && + a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal)) + .ToList(); + invokeActivities.Should().HaveCount(1, "one workflow_invoke activity should exist"); + invokeActivities[0].ParentId.Should().Be(sessionActivities[0].Id, + "workflow_invoke activity should be nested under the session activity"); + } } From 6468ee50051078fefd753d47aa676daa949d9e41 Mon Sep 17 00:00:00 2001 From: alliscode Date: Tue, 24 Feb 2026 16:38:13 -0800 Subject: [PATCH 7/7] Fix stale XML doc: WorkflowRun -> WorkflowInvoke in ObservabilityTests --- .../ObservabilityTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs index 62768dea4b..af8a9d8e0d 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs @@ -46,7 +46,7 @@ public ObservabilityTests() /// This workflow is expected to create 9 activities that will be captured by the tests /// - ActivityNames.WorkflowBuild /// - ActivityNames.WorkflowSession - /// -- ActivityNames.WorkflowRun + /// -- ActivityNames.WorkflowInvoke /// --- ActivityNames.EdgeGroupProcess /// --- ActivityNames.ExecutorProcess (UppercaseExecutor) /// ---- ActivityNames.MessageSend