From b88cbd3b43d020db137f80cf974355b85ab7dc27 Mon Sep 17 00:00:00 2001 From: Chandramouleswaran Ravichandran Date: Wed, 15 Apr 2026 18:33:46 -0700 Subject: [PATCH 1/3] Preserve durable trace context in SQL history Keep orchestration replay span identity and sub-orchestration client span IDs when round-tripping SQL trace context so exported spans stitch correctly across continuations and nested sub-orchestrations. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/DurableTask.SqlServer/SqlUtils.cs | 88 +++- .../Integration/Orchestrations.cs | 417 ++++++++++++++++++ .../Unit/SqlUtilsTraceContextTests.cs | 158 +++++++ 3 files changed, 659 insertions(+), 4 deletions(-) create mode 100644 test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTraceContextTests.cs diff --git a/src/DurableTask.SqlServer/SqlUtils.cs b/src/DurableTask.SqlServer/SqlUtils.cs index 84983f4..34ce3db 100644 --- a/src/DurableTask.SqlServer/SqlUtils.cs +++ b/src/DurableTask.SqlServer/SqlUtils.cs @@ -24,6 +24,10 @@ static class SqlUtils { static readonly Random random = new Random(); static readonly char[] TraceContextSeparators = new char[] { '\n' }; + const string TraceContextTraceStatePrefix = "@tracestate="; + const string TraceContextIdPrefix = "@id="; + const string TraceContextSpanIdPrefix = "@spanid="; + const string TraceContextClientSpanIdPrefix = "@clientspanid="; const int MaxTagsPayloadSize = 8000; public static string? GetStringOrNull(this DbDataReader reader, int columnIndex) @@ -135,6 +139,7 @@ public static HistoryEvent GetHistoryEvent(this DbDataReader reader, bool isOrch { Input = GetPayloadText(reader), InstanceId = "", // Placeholder - shouldn't technically be needed (adding it requires a SQL schema change) + ClientSpanId = GetSubOrchestrationClientSpanId(reader), Name = GetName(reader), Version = null, }; @@ -441,6 +446,16 @@ static DateTime GetUtcDateTime(DbDataReader reader, int ordinal) internal static SqlString GetTraceContext(HistoryEvent e) { + if (e is SubOrchestrationInstanceCreatedEvent subOrchestrationEvent) + { + if (string.IsNullOrEmpty(subOrchestrationEvent.ClientSpanId)) + { + return SqlString.Null; + } + + return new SqlString($"{TraceContextClientSpanIdPrefix}{subOrchestrationEvent.ClientSpanId}"); + } + if (e is not ISupportsDurableTraceContext eventWithTraceContext || eventWithTraceContext.ParentTraceContext == null) { @@ -452,7 +467,24 @@ internal static SqlString GetTraceContext(HistoryEvent e) // We prefer a simple format instead of JSON because external callers may interact with this // data and we don't want to expose them to some internal JSON serialization format. var sb = new StringBuilder(traceContext.TraceParent, capacity: 800); - if (!string.IsNullOrEmpty(traceContext.TraceState)) + if (!string.IsNullOrEmpty(traceContext.Id) || !string.IsNullOrEmpty(traceContext.SpanId)) + { + if (!string.IsNullOrEmpty(traceContext.TraceState)) + { + sb.Append('\n').Append(TraceContextTraceStatePrefix).Append(traceContext.TraceState); + } + + if (!string.IsNullOrEmpty(traceContext.Id)) + { + sb.Append('\n').Append(TraceContextIdPrefix).Append(traceContext.Id); + } + + if (!string.IsNullOrEmpty(traceContext.SpanId)) + { + sb.Append('\n').Append(TraceContextSpanIdPrefix).Append(traceContext.SpanId); + } + } + else if (!string.IsNullOrEmpty(traceContext.TraceState)) { sb.Append('\n').Append(traceContext.TraceState); } @@ -474,18 +506,66 @@ internal static SqlString GetTraceContext(HistoryEvent e) return null; } - string[] parts = text.Split(TraceContextSeparators, count: 2, StringSplitOptions.RemoveEmptyEntries); + string[] parts = text.Split(TraceContextSeparators, StringSplitOptions.None); var traceContext = new DistributedTraceContext(traceParent: parts[0]); - if (parts.Length > 1) + for (int i = 1; i < parts.Length; i++) { - traceContext.TraceState = parts[1]; + string part = parts[i]; + if (string.IsNullOrEmpty(part)) + { + continue; + } + + if (part.StartsWith(TraceContextTraceStatePrefix, StringComparison.Ordinal)) + { + traceContext.TraceState = part.Substring(TraceContextTraceStatePrefix.Length); + } + else if (part.StartsWith(TraceContextIdPrefix, StringComparison.Ordinal)) + { + traceContext.Id = part.Substring(TraceContextIdPrefix.Length); + } + else if (part.StartsWith(TraceContextSpanIdPrefix, StringComparison.Ordinal)) + { + traceContext.SpanId = part.Substring(TraceContextSpanIdPrefix.Length); + } + else if (traceContext.TraceState == null) + { + // Preserve the legacy format, where the optional second line stored only tracestate. + traceContext.TraceState = part; + } } traceContext.ActivityStartTime = GetTimestamp(reader); return traceContext; } + static string? GetSubOrchestrationClientSpanId(DbDataReader reader) + { + int ordinal = reader.GetOrdinal("TraceContext"); + if (reader.IsDBNull(ordinal)) + { + return null; + } + + string text = reader.GetString(ordinal); + if (string.IsNullOrEmpty(text)) + { + return null; + } + + string[] parts = text.Split(TraceContextSeparators, StringSplitOptions.None); + foreach (string part in parts) + { + if (part.StartsWith(TraceContextClientSpanIdPrefix, StringComparison.Ordinal)) + { + return part.Substring(TraceContextClientSpanIdPrefix.Length); + } + } + + return null; + } + internal static IDictionary? GetTags(DbDataReader reader) { int ordinal = reader.GetOrdinal("Tags"); diff --git a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs index e80f8b5..31e0113 100644 --- a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs +++ b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs @@ -690,6 +690,12 @@ public async Task RetryFailedSubOrchestration(bool userSpecifiedInstanceId) await parentInstance.WaitForCompletion(expectedOutput: true); } + // This regression locks down the expected DurableTask span shape for a simple + // parent orchestration -> sub-orchestration -> activity flow. The important + // nuance is that an activity under a sub-orchestration is nested via an + // activity client span, so the hierarchy is: + // parent orchestration server -> sub-orchestration client -> sub-orchestration server + // -> activity client -> activity server. [Fact] public async Task TraceContextFlowCorrectly() { @@ -800,6 +806,15 @@ public async Task TraceContextFlowCorrectly() Assert.True(subOrchestratorSpan.Duration > delay, $"Unexpected duration: {subOrchestratorSpan.Duration}"); Assert.True(subOrchestratorSpan.Duration < delay * 2, $"Unexpected duration: {subOrchestratorSpan.Duration}"); + Activity subOrchestratorClientSpan = exportedItems.LastOrDefault( + span => span.OperationName == $"orchestration:{subOrchestrationName}" && span.Kind == ActivityKind.Client); + Assert.NotNull(subOrchestratorClientSpan); + + // The sub-orchestration execution span hangs off the sub-orchestration client span, + // which in turn hangs off the parent orchestration execution span. + Assert.Equal(orchestratorSpan.SpanId, subOrchestratorClientSpan.ParentSpanId); + Assert.Equal(subOrchestratorClientSpan.SpanId, subOrchestratorSpan.ParentSpanId); + // Validate the activity span, which should be a subset of the sub-orchestration span Activity activitySpan = exportedItems.LastOrDefault( span => span.OperationName == $"activity:{activityName}" && span.Kind == ActivityKind.Server); @@ -811,6 +826,299 @@ public async Task TraceContextFlowCorrectly() Assert.True(activitySpan.Duration < subOrchestratorSpan.Duration); Assert.True(activitySpan.Duration > delay); Assert.True(activitySpan.Duration < delay * 2); + + Activity activityClientSpan = exportedItems.LastOrDefault( + span => span.OperationName == $"activity:{activityName}" && span.Kind == ActivityKind.Client); + Assert.NotNull(activityClientSpan); + + // This is the key shape for selling the trace fix: an activity scheduled by a + // sub-orchestrator must remain inside that sub-orchestrator's execution span + // hierarchy rather than floating at the root with a dangling parent. + Assert.Equal(subOrchestratorSpan.SpanId, activityClientSpan.ParentSpanId); + Assert.Equal(activityClientSpan.SpanId, activitySpan.ParentSpanId); + } + + // This regression protects the SQL-specific bug where an orchestration reload could + // generate a fresh execution span ID, leaving later activity completion and timer + // spans pointing at a parent that no longer exists in the exported trace. + [Fact] + public async Task TraceContextMaintainsStableOrchestrationSpanAcrossContinuations() + { + string traceSourceName = "MyTraceSource"; + string orchestrationName = "ContinuationTraceContextOrchestration"; + string[] activityNames = { "FirstActivity", "SecondActivity" }; + TimeSpan delay = TimeSpan.FromMilliseconds(250); + + var exportedItems = new List(); + using TracerProvider tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource(traceSourceName, "DurableTask.Core") + .ConfigureResource(r => r.AddService("Test")) + .AddInMemoryExporter(exportedItems) + .Build(); + + using var traceSource = new ActivitySource(traceSourceName); + using var clientSpan = traceSource.StartActivity("TestSpan"); + clientSpan.TraceStateString = "TestTraceState"; + + TestInstance instance = await this.testService.RunOrchestration( + input: "input", + orchestrationName: orchestrationName, + implementation: async (ctx, input) => + { + string first = await ctx.ScheduleTask(activityNames[0], "", input); + await ctx.CreateTimer(ctx.CurrentUtcDateTime.Add(delay), ""); + string second = await ctx.ScheduleTask(activityNames[1], "", input); + await ctx.CreateTimer(ctx.CurrentUtcDateTime.Add(delay), ""); + return string.Join(",", first, second); + }, + activities: new[] + { + (activityNames[0], TestService.MakeActivity((TaskContext ctx, string input) => $"first-{input}")), + (activityNames[1], TestService.MakeActivity((TaskContext ctx, string input) => $"second-{input}")), + }); + + await instance.WaitForCompletion(expectedOutput: "first-input,second-input"); + + clientSpan.Stop(); + tracerProvider.ForceFlush(); + + Activity[] orchestrationSpans = exportedItems + .Where(span => span.OperationName == $"orchestration:{orchestrationName}" && span.Kind == ActivityKind.Server) + .ToArray(); + Assert.NotEmpty(orchestrationSpans); + + ActivitySpanId[] orchestrationSpanIds = orchestrationSpans + .Select(span => span.SpanId) + .Distinct() + .ToArray(); + Assert.Single(orchestrationSpanIds); + + ActivitySpanId orchestrationSpanId = orchestrationSpanIds[0]; + var activityOperationNames = new HashSet(activityNames.Select(name => $"activity:{name}")); + + Activity[] completionSpans = exportedItems + .Where(span => span.Kind == ActivityKind.Client && activityOperationNames.Contains(span.OperationName)) + .ToArray(); + Assert.Equal(2, completionSpans.Length); + Assert.All(completionSpans, span => Assert.Equal(orchestrationSpanId, span.ParentSpanId)); + + Activity[] timerSpans = exportedItems + .Where(span => span.Kind == ActivityKind.Internal && span.OperationName == $"orchestration:{orchestrationName}:timer") + .ToArray(); + Assert.Equal(2, timerSpans.Length); + Assert.All(timerSpans, span => Assert.Equal(orchestrationSpanId, span.ParentSpanId)); + } + + // This test observes raw Activity objects directly, without relying on exporter + // behavior, so it can fail fast if any emitted ParentSpanId refers to a span that + // was never actually produced. The hierarchy intentionally exercises nested + // sub-orchestrations plus an activity at the deepest level. + [Fact] + public async Task ActivityListenerCapturesNestedSubOrchestrationHierarchyWithoutMissingParents() + { + string traceSourceName = "ActivityListenerNestedTraceSource"; + string parentOrchestrationName = "ActivityListenerParentOrchestration"; + string childOrchestrationName = "ActivityListenerChildOrchestration"; + string grandchildOrchestrationName = "ActivityListenerGrandchildOrchestration"; + string[] activityNames = + { + "ActivityListenerParentActivity", + "ActivityListenerChildActivity", + "ActivityListenerGrandchildActivity", + "ActivityListenerAfterSubOrchestrationActivity", + }; + TimeSpan delay = TimeSpan.FromMilliseconds(150); + + using var listener = new ActivityCaptureListener(traceSourceName, "DurableTask.Core"); + using var traceSource = new ActivitySource(traceSourceName); + using Activity incomingRequest = traceSource.StartActivity("IncomingRequest") ?? + throw new InvalidOperationException("Failed to start the incoming request activity."); + + this.testService.RegisterInlineOrchestration( + grandchildOrchestrationName, + implementation: async (ctx, input) => + { + await ctx.CreateTimer(ctx.CurrentUtcDateTime.Add(delay), string.Empty); + return await ctx.ScheduleTask(activityNames[2], version: string.Empty, input); + }); + + this.testService.RegisterInlineOrchestration( + childOrchestrationName, + implementation: async (ctx, input) => + { + string childActivity = await ctx.ScheduleTask(activityNames[1], version: string.Empty, input); + await ctx.CreateTimer(ctx.CurrentUtcDateTime.Add(delay), string.Empty); + string grandchild = await ctx.CreateSubOrchestrationInstance(grandchildOrchestrationName, version: string.Empty, input); + return string.Join("|", childActivity, grandchild); + }); + + TestInstance instance = await this.testService.RunOrchestration( + input: "payload", + orchestrationName: parentOrchestrationName, + implementation: async (ctx, input) => + { + string parentActivity = await ctx.ScheduleTask(activityNames[0], version: string.Empty, input); + await ctx.CreateTimer(ctx.CurrentUtcDateTime.Add(delay), string.Empty); + string child = await ctx.CreateSubOrchestrationInstance(childOrchestrationName, version: string.Empty, input); + await ctx.CreateTimer(ctx.CurrentUtcDateTime.Add(delay), string.Empty); + string afterChild = await ctx.ScheduleTask(activityNames[3], version: string.Empty, input); + return string.Join("|", parentActivity, child, afterChild); + }, + activities: new[] + { + (activityNames[0], TestService.MakeActivity((TaskContext ctx, string input) => $"parent-{input}")), + (activityNames[1], TestService.MakeActivity((TaskContext ctx, string input) => $"child-{input}")), + (activityNames[2], TestService.MakeActivity((TaskContext ctx, string input) => $"grandchild-{input}")), + (activityNames[3], TestService.MakeActivity((TaskContext ctx, string input) => $"after-{input}")), + }); + + await instance.WaitForCompletion(expectedOutput: "parent-payload|child-payload|grandchild-payload|after-payload"); + + incomingRequest.Stop(); + + CapturedActivity[] traceActivities = listener.GetActivities(incomingRequest.TraceId); + this.WriteCapturedActivities(traceActivities); + + // The bug we are guarding against is "parentSpanId emitted, but the parent span + // does not exist". This assertion checks exactly that against the raw listener feed. + AssertNoMissingParents(traceActivities); + + CapturedActivity parentOrchestration = GetUniqueSpan( + traceActivities, + $"orchestration:{parentOrchestrationName}", + ActivityKind.Server); + CapturedActivity childOrchestrationClient = GetUniqueSpan( + traceActivities, + $"orchestration:{childOrchestrationName}", + ActivityKind.Client); + CapturedActivity childOrchestrationServer = GetUniqueSpan( + traceActivities, + $"orchestration:{childOrchestrationName}", + ActivityKind.Server); + CapturedActivity grandchildOrchestrationClient = GetUniqueSpan( + traceActivities, + $"orchestration:{grandchildOrchestrationName}", + ActivityKind.Client); + CapturedActivity grandchildOrchestrationServer = GetUniqueSpan( + traceActivities, + $"orchestration:{grandchildOrchestrationName}", + ActivityKind.Server); + CapturedActivity grandchildActivityClient = GetUniqueSpan( + traceActivities, + $"activity:{activityNames[2]}", + ActivityKind.Client); + CapturedActivity grandchildActivityServer = GetUniqueSpan( + traceActivities, + $"activity:{activityNames[2]}", + ActivityKind.Server); + + // Expected nesting: + // parent orchestration server + // -> child sub-orchestration client + // -> child sub-orchestration server + // -> grandchild sub-orchestration client + // -> grandchild sub-orchestration server + // -> grandchild activity client + // -> grandchild activity server + Assert.Equal(parentOrchestration.SpanId, childOrchestrationClient.ParentSpanId); + Assert.Equal(childOrchestrationClient.SpanId, childOrchestrationServer.ParentSpanId); + Assert.Equal(childOrchestrationServer.SpanId, grandchildOrchestrationClient.ParentSpanId); + Assert.Equal(grandchildOrchestrationClient.SpanId, grandchildOrchestrationServer.ParentSpanId); + Assert.Equal(grandchildOrchestrationServer.SpanId, grandchildActivityClient.ParentSpanId); + Assert.Equal(grandchildActivityClient.SpanId, grandchildActivityServer.ParentSpanId); + } + + // This test covers sibling sub-orchestrations that run under the same parent. It + // ensures each invocation gets its own client/server pair and that the activities + // inside each child orchestration remain attached to the corresponding child span + // instead of pointing to a missing or reused parent. + [Fact] + public async Task ActivityListenerCapturesRepeatedSubOrchestrationsWithoutMissingParents() + { + string traceSourceName = "ActivityListenerRepeatedTraceSource"; + string parentOrchestrationName = "ActivityListenerRepeatedParentOrchestration"; + string childOrchestrationName = "ActivityListenerRepeatedChildOrchestration"; + string childActivityName = "ActivityListenerRepeatedChildActivity"; + TimeSpan delay = TimeSpan.FromMilliseconds(125); + + using var listener = new ActivityCaptureListener(traceSourceName, "DurableTask.Core"); + using var traceSource = new ActivitySource(traceSourceName); + using Activity incomingRequest = traceSource.StartActivity("IncomingRequest") ?? + throw new InvalidOperationException("Failed to start the incoming request activity."); + + this.testService.RegisterInlineOrchestration( + childOrchestrationName, + implementation: async (ctx, input) => + { + string activityResult = await ctx.ScheduleTask(childActivityName, version: string.Empty, input); + await ctx.CreateTimer(ctx.CurrentUtcDateTime.Add(delay), string.Empty); + return activityResult; + }); + + TestInstance instance = await this.testService.RunOrchestration( + input: "payload", + orchestrationName: parentOrchestrationName, + implementation: async (ctx, input) => + { + string first = await ctx.CreateSubOrchestrationInstance(childOrchestrationName, version: string.Empty, input: $"{input}-1"); + await ctx.CreateTimer(ctx.CurrentUtcDateTime.Add(delay), string.Empty); + string second = await ctx.CreateSubOrchestrationInstance(childOrchestrationName, version: string.Empty, input: $"{input}-2"); + return string.Join("|", first, second); + }, + activities: new[] + { + (childActivityName, TestService.MakeActivity((TaskContext ctx, string input) => $"child-{input}")), + }); + + await instance.WaitForCompletion(expectedOutput: "child-payload-1|child-payload-2"); + + incomingRequest.Stop(); + + CapturedActivity[] traceActivities = listener.GetActivities(incomingRequest.TraceId); + this.WriteCapturedActivities(traceActivities); + + // Every non-root ParentSpanId emitted for this trace must resolve to another + // captured span, otherwise Geneva can show the child span at the root. + AssertNoMissingParents(traceActivities); + + CapturedActivity parentOrchestration = GetUniqueSpan( + traceActivities, + $"orchestration:{parentOrchestrationName}", + ActivityKind.Server); + CapturedActivity[] childOrchestrationClientSpans = GetDistinctSpans( + traceActivities, + $"orchestration:{childOrchestrationName}", + ActivityKind.Client); + CapturedActivity[] childOrchestrationServerSpans = GetDistinctSpans( + traceActivities, + $"orchestration:{childOrchestrationName}", + ActivityKind.Server); + CapturedActivity[] childActivityClientSpans = GetDistinctSpans( + traceActivities, + $"activity:{childActivityName}", + ActivityKind.Client); + CapturedActivity[] childActivityServerSpans = GetDistinctSpans( + traceActivities, + $"activity:{childActivityName}", + ActivityKind.Server); + + Assert.Equal(2, childOrchestrationClientSpans.Length); + Assert.Equal(2, childOrchestrationServerSpans.Length); + Assert.Equal(2, childActivityClientSpans.Length); + Assert.Equal(2, childActivityServerSpans.Length); + + // Each sibling sub-orchestration should attach to the same parent orchestration, + // but keep its own distinct client/server/activity chain below that parent. + Assert.All(childOrchestrationClientSpans, span => Assert.Equal(parentOrchestration.SpanId, span.ParentSpanId)); + + var childOrchestrationClientIds = childOrchestrationClientSpans.Select(span => span.SpanId).ToHashSet(); + Assert.All(childOrchestrationServerSpans, span => Assert.Contains(span.ParentSpanId, childOrchestrationClientIds)); + + var childOrchestrationServerIds = childOrchestrationServerSpans.Select(span => span.SpanId).ToHashSet(); + Assert.All(childActivityClientSpans, span => Assert.Contains(span.ParentSpanId, childOrchestrationServerIds)); + + var childActivityClientIds = childActivityClientSpans.Select(span => span.SpanId).ToHashSet(); + Assert.All(childActivityServerSpans, span => Assert.Contains(span.ParentSpanId, childActivityClientIds)); } [Fact] @@ -1372,5 +1680,114 @@ public async Task ActivityTagsMergedWithOrchestrationTags() Assert.Equal("platform", capturedTags["team"]); Assert.Equal("high", capturedTags["priority"]); } + + void WriteCapturedActivities(IEnumerable activities) + { + foreach (CapturedActivity activity in activities.OrderBy(a => a.StartTimeUtc)) + { + this.outputHelper.WriteLine( + $"{activity.TraceId}/{activity.SpanId} parent={activity.ParentSpanId} kind={activity.Kind} source={activity.SourceName} op={activity.OperationName}"); + } + } + + static void AssertNoMissingParents(IReadOnlyCollection activities) + { + var knownSpanIds = activities.Select(activity => activity.SpanId).ToHashSet(); + CapturedActivity[] missingParents = activities + .Where(activity => activity.ParentSpanId != default && !knownSpanIds.Contains(activity.ParentSpanId)) + .ToArray(); + + Assert.True( + missingParents.Length == 0, + "Captured activities have missing parents:" + Environment.NewLine + + string.Join( + Environment.NewLine, + missingParents.Select(activity => + $"{activity.OperationName} [{activity.Kind}] span={activity.SpanId} parent={activity.ParentSpanId}"))); + } + + static CapturedActivity GetUniqueSpan( + IEnumerable activities, + string operationName, + ActivityKind kind) + { + CapturedActivity[] matchingSpans = GetDistinctSpans(activities, operationName, kind); + Assert.Single(matchingSpans); + return matchingSpans[0]; + } + + static CapturedActivity[] GetDistinctSpans( + IEnumerable activities, + string operationName, + ActivityKind kind) + { + return activities + .Where(activity => activity.OperationName == operationName && activity.Kind == kind) + .GroupBy(activity => activity.SpanId) + .Select(group => group.OrderByDescending(activity => activity.Duration).First()) + .ToArray(); + } + + sealed class ActivityCaptureListener : IDisposable + { + readonly object sync = new object(); + readonly HashSet sourceNames; + readonly List activities = new List(); + readonly ActivityListener listener; + + public ActivityCaptureListener(params string[] sourceNames) + { + this.sourceNames = new HashSet(sourceNames, StringComparer.Ordinal); + this.listener = new ActivityListener + { + ShouldListenTo = source => this.sourceNames.Contains(source.Name), + Sample = static (ref ActivityCreationOptions options) => ActivitySamplingResult.AllDataAndRecorded, + SampleUsingParentId = static (ref ActivityCreationOptions options) => ActivitySamplingResult.AllDataAndRecorded, + ActivityStopped = activity => + { + lock (this.sync) + { + this.activities.Add(CapturedActivity.From(activity)); + } + }, + }; + + ActivitySource.AddActivityListener(this.listener); + } + + public CapturedActivity[] GetActivities(ActivityTraceId traceId) + { + lock (this.sync) + { + return this.activities.Where(activity => activity.TraceId == traceId).ToArray(); + } + } + + public void Dispose() => this.listener.Dispose(); + } + + sealed record CapturedActivity( + string SourceName, + string OperationName, + ActivityKind Kind, + ActivityTraceId TraceId, + ActivitySpanId SpanId, + ActivitySpanId ParentSpanId, + DateTime StartTimeUtc, + TimeSpan Duration) + { + public static CapturedActivity From(Activity activity) + { + return new CapturedActivity( + activity.Source.Name, + activity.OperationName, + activity.Kind, + activity.TraceId, + activity.SpanId, + activity.ParentSpanId, + activity.StartTimeUtc, + activity.Duration); + } + } } } diff --git a/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTraceContextTests.cs b/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTraceContextTests.cs new file mode 100644 index 0000000..3cfdc6f --- /dev/null +++ b/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTraceContextTests.cs @@ -0,0 +1,158 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask.SqlServer.Tests.Unit +{ + using System; + using System.Data; + using System.Data.SqlTypes; + using DurableTask.Core; + using DurableTask.Core.History; + using DurableTask.Core.Tracing; + using Xunit; + + public class SqlUtilsTraceContextTests + { + [Fact] + public void GetTraceContext_RoundTripsExtendedTraceContextFields() + { + DateTime timestamp = new DateTime(2026, 04, 15, 22, 30, 00, DateTimeKind.Utc); + var traceContext = new DistributedTraceContext( + traceParent: "00-0123456789abcdef0123456789abcdef-0123456789abcdef-01", + traceState: "vendor=value") + { + Id = "00-0123456789abcdef0123456789abcdef-fedcba9876543210-01", + SpanId = "fedcba9876543210", + }; + + var startedEvent = new ExecutionStartedEvent(-1, input: null) + { + Name = "TestOrchestration", + OrchestrationInstance = new OrchestrationInstance + { + InstanceId = "instance", + ExecutionId = "execution", + }, + ParentTraceContext = traceContext, + Timestamp = timestamp, + }; + + SqlString serialized = SqlUtils.GetTraceContext(startedEvent); + Assert.False(serialized.IsNull); + Assert.Equal( + "00-0123456789abcdef0123456789abcdef-0123456789abcdef-01\n@tracestate=vendor=value\n@id=00-0123456789abcdef0123456789abcdef-fedcba9876543210-01\n@spanid=fedcba9876543210", + serialized.Value); + + using var reader = CreateExecutionStartedReader(serialized.Value, timestamp); + Assert.True(reader.Read()); + + var roundTrippedEvent = Assert.IsType(reader.GetHistoryEvent()); + Assert.NotNull(roundTrippedEvent.ParentTraceContext); + Assert.Equal(traceContext.TraceParent, roundTrippedEvent.ParentTraceContext.TraceParent); + Assert.Equal(traceContext.TraceState, roundTrippedEvent.ParentTraceContext.TraceState); + Assert.Equal(traceContext.Id, roundTrippedEvent.ParentTraceContext.Id); + Assert.Equal(traceContext.SpanId, roundTrippedEvent.ParentTraceContext.SpanId); + Assert.Equal(new DateTimeOffset(timestamp), roundTrippedEvent.ParentTraceContext.ActivityStartTime); + } + + [Fact] + public void GetHistoryEvent_PreservesLegacyTraceContextFormat() + { + DateTime timestamp = new DateTime(2026, 04, 15, 22, 45, 00, DateTimeKind.Utc); + const string traceParent = "00-0123456789abcdef0123456789abcdef-0123456789abcdef-01"; + const string traceState = "rojo=00f067aa0ba902b7,congo=t61rcWkgMzE"; + + using var reader = CreateExecutionStartedReader($"{traceParent}\n{traceState}", timestamp); + Assert.True(reader.Read()); + + var roundTrippedEvent = Assert.IsType(reader.GetHistoryEvent()); + Assert.NotNull(roundTrippedEvent.ParentTraceContext); + Assert.Equal(traceParent, roundTrippedEvent.ParentTraceContext.TraceParent); + Assert.Equal(traceState, roundTrippedEvent.ParentTraceContext.TraceState); + Assert.Null(roundTrippedEvent.ParentTraceContext.Id); + Assert.Null(roundTrippedEvent.ParentTraceContext.SpanId); + Assert.Equal(new DateTimeOffset(timestamp), roundTrippedEvent.ParentTraceContext.ActivityStartTime); + } + + [Fact] + public void GetHistoryEvent_RoundTripsSubOrchestrationClientSpanId() + { + DateTime timestamp = new DateTime(2026, 04, 15, 23, 00, 00, DateTimeKind.Utc); + const string clientSpanId = "fedcba9876543210"; + + var createdEvent = new SubOrchestrationInstanceCreatedEvent(eventId: 7) + { + ClientSpanId = clientSpanId, + Input = "{}", + Name = "MySubOrchestration", + Timestamp = timestamp, + }; + + SqlString serialized = SqlUtils.GetTraceContext(createdEvent); + Assert.False(serialized.IsNull); + Assert.Equal("@clientspanid=fedcba9876543210", serialized.Value); + + using var reader = CreateSubOrchestrationCreatedReader(serialized.Value, timestamp); + Assert.True(reader.Read()); + + var roundTrippedEvent = Assert.IsType(reader.GetHistoryEvent()); + Assert.Equal(clientSpanId, roundTrippedEvent.ClientSpanId); + } + + static DataTableReader CreateExecutionStartedReader(string traceContext, DateTime timestamp) + { + var table = new DataTable(); + table.Columns.Add("EventType", typeof(string)); + table.Columns.Add("TaskID", typeof(int)); + table.Columns.Add("PayloadText", typeof(string)); + table.Columns.Add("Name", typeof(string)); + table.Columns.Add("InstanceID", typeof(string)); + table.Columns.Add("ExecutionID", typeof(string)); + table.Columns.Add("Tags", typeof(string)); + table.Columns.Add("Version", typeof(string)); + table.Columns.Add("TraceContext", typeof(string)); + table.Columns.Add("ParentInstanceID", typeof(string)); + table.Columns.Add("Timestamp", typeof(DateTime)); + table.Columns.Add("RuntimeStatus", typeof(string)); + + DataRow row = table.NewRow(); + row["EventType"] = EventType.ExecutionStarted.ToString(); + row["TaskID"] = -1; + row["PayloadText"] = DBNull.Value; + row["Name"] = "TestOrchestration"; + row["InstanceID"] = "instance"; + row["ExecutionID"] = "execution"; + row["Tags"] = DBNull.Value; + row["Version"] = DBNull.Value; + row["TraceContext"] = traceContext; + row["ParentInstanceID"] = DBNull.Value; + row["Timestamp"] = timestamp; + row["RuntimeStatus"] = OrchestrationStatus.Running.ToString(); + table.Rows.Add(row); + + return table.CreateDataReader(); + } + + static DataTableReader CreateSubOrchestrationCreatedReader(string traceContext, DateTime timestamp) + { + var table = new DataTable(); + table.Columns.Add("EventType", typeof(string)); + table.Columns.Add("TaskID", typeof(int)); + table.Columns.Add("PayloadText", typeof(string)); + table.Columns.Add("Name", typeof(string)); + table.Columns.Add("TraceContext", typeof(string)); + table.Columns.Add("Timestamp", typeof(DateTime)); + + DataRow row = table.NewRow(); + row["EventType"] = EventType.SubOrchestrationInstanceCreated.ToString(); + row["TaskID"] = 7; + row["PayloadText"] = "{}"; + row["Name"] = "MySubOrchestration"; + row["TraceContext"] = traceContext; + row["Timestamp"] = timestamp; + table.Rows.Add(row); + + return table.CreateDataReader(); + } + } +} From 36b62ff5184600ec46f0281e7de64f9505e78270 Mon Sep 17 00:00:00 2001 From: Chandramouleswaran Ravichandran Date: Mon, 27 Apr 2026 17:10:51 -0700 Subject: [PATCH 2/3] Retrigger CI: TerminateScheduledOrchestration is a pre-existing flaky test Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> From 4765527b3172cf7782d2f53299a230cf651bf537 Mon Sep 17 00:00:00 2001 From: Chandramouleswaran Ravichandran Date: Thu, 7 May 2026 12:38:26 -0700 Subject: [PATCH 3/3] Address Copilot review: unify TraceContext parser, tighten test assertions - Reserve line 1 of every TraceContext payload for traceparent (empty for sub-orchestration-only payloads) so all callers share a single parsing contract. Avoids the risk of treating '@clientspanid=...' as a malformed W3C traceparent if a sub-orchestration row is ever read through the general DistributedTraceContext path. - Centralize TraceContext parsing into a single ParseTraceContext helper. GetDistributedTraceContextFromReader and GetSubOrchestrationClientSpanId now share one parser, which extracts traceparent, tracestate, id, spanid, and clientspanid in one pass. Reduces drift over time. - Backward compatibility: the parser still accepts the legacy single-line '@clientspanid=...' format that was written by earlier builds, so an upgrade does not break histories already in production databases. Added a dedicated regression test for the legacy payload shape. - Tighten the sub-orchestration / activity client-span assertions in the TraceContextFlowCorrectly integration test to use Assert.Single rather than LastOrDefault. The test schedules exactly one of each, so any duplicate emission should now fail the regression deterministically. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/DurableTask.SqlServer/SqlUtils.cs | 102 +++++++++++++----- .../Integration/Orchestrations.cs | 13 ++- .../Unit/SqlUtilsTraceContextTests.cs | 24 ++++- 3 files changed, 106 insertions(+), 33 deletions(-) diff --git a/src/DurableTask.SqlServer/SqlUtils.cs b/src/DurableTask.SqlServer/SqlUtils.cs index 34ce3db..6c37c9d 100644 --- a/src/DurableTask.SqlServer/SqlUtils.cs +++ b/src/DurableTask.SqlServer/SqlUtils.cs @@ -453,7 +453,10 @@ internal static SqlString GetTraceContext(HistoryEvent e) return SqlString.Null; } - return new SqlString($"{TraceContextClientSpanIdPrefix}{subOrchestrationEvent.ClientSpanId}"); + // Reserve line 1 for traceparent (empty here) so all TraceContext payloads share + // a single parsing contract: parts[0] is always traceparent (possibly empty), + // and subsequent lines carry typed @key=value fields like @clientspanid=. + return new SqlString($"\n{TraceContextClientSpanIdPrefix}{subOrchestrationEvent.ClientSpanId}"); } if (e is not ISupportsDurableTraceContext eventWithTraceContext || @@ -492,7 +495,21 @@ internal static SqlString GetTraceContext(HistoryEvent e) return sb.ToString(); } - static DistributedTraceContext? GetTraceContext(DbDataReader reader) + /// + /// Parsed result of a TraceContext column payload. Centralizes the on-the-wire format + /// (line 1 = traceparent, subsequent lines = typed @key=value fields) so all callers + /// share the same parsing contract. + /// + struct ParsedTraceContext + { + public string? TraceParent { get; set; } + public string? TraceState { get; set; } + public string? Id { get; set; } + public string? SpanId { get; set; } + public string? ClientSpanId { get; set; } + } + + static ParsedTraceContext? ParseTraceContext(DbDataReader reader) { int ordinal = reader.GetOrdinal("TraceContext"); if (reader.IsDBNull(ordinal)) @@ -507,9 +524,28 @@ internal static SqlString GetTraceContext(HistoryEvent e) } string[] parts = text.Split(TraceContextSeparators, StringSplitOptions.None); - var traceContext = new DistributedTraceContext(traceParent: parts[0]); - for (int i = 1; i < parts.Length; i++) + string? traceParent = null; + string? traceState = null; + string? id = null; + string? spanId = null; + string? clientSpanId = null; + + // Line 1 is reserved for traceparent. Older histories may have written a typed + // "@key=value" prefix on line 1 (legacy sub-orchestration payload). Detect that + // case by checking for the @ sentinel; otherwise treat parts[0] as traceparent. + int startIndex; + if (!string.IsNullOrEmpty(parts[0]) && parts[0][0] != '@') + { + traceParent = parts[0]; + startIndex = 1; + } + else + { + startIndex = 0; + } + + for (int i = startIndex; i < parts.Length; i++) { string part = parts[i]; if (string.IsNullOrEmpty(part)) @@ -519,51 +555,61 @@ internal static SqlString GetTraceContext(HistoryEvent e) if (part.StartsWith(TraceContextTraceStatePrefix, StringComparison.Ordinal)) { - traceContext.TraceState = part.Substring(TraceContextTraceStatePrefix.Length); + traceState = part.Substring(TraceContextTraceStatePrefix.Length); } else if (part.StartsWith(TraceContextIdPrefix, StringComparison.Ordinal)) { - traceContext.Id = part.Substring(TraceContextIdPrefix.Length); + id = part.Substring(TraceContextIdPrefix.Length); } else if (part.StartsWith(TraceContextSpanIdPrefix, StringComparison.Ordinal)) { - traceContext.SpanId = part.Substring(TraceContextSpanIdPrefix.Length); + spanId = part.Substring(TraceContextSpanIdPrefix.Length); } - else if (traceContext.TraceState == null) + else if (part.StartsWith(TraceContextClientSpanIdPrefix, StringComparison.Ordinal)) + { + clientSpanId = part.Substring(TraceContextClientSpanIdPrefix.Length); + } + else if (traceState == null && i > 0) { // Preserve the legacy format, where the optional second line stored only tracestate. - traceContext.TraceState = part; + traceState = part; } } - traceContext.ActivityStartTime = GetTimestamp(reader); - return traceContext; + return new ParsedTraceContext + { + TraceParent = traceParent, + TraceState = traceState, + Id = id, + SpanId = spanId, + ClientSpanId = clientSpanId, + }; } - static string? GetSubOrchestrationClientSpanId(DbDataReader reader) + static DistributedTraceContext? GetTraceContext(DbDataReader reader) { - int ordinal = reader.GetOrdinal("TraceContext"); - if (reader.IsDBNull(ordinal)) + ParsedTraceContext? parsed = ParseTraceContext(reader); + if (parsed == null || string.IsNullOrEmpty(parsed.Value.TraceParent)) { + // No traceparent means this row carries only sub-orchestration-specific data + // (e.g. @clientspanid=...) which is not a DistributedTraceContext. return null; } - string text = reader.GetString(ordinal); - if (string.IsNullOrEmpty(text)) - { - return null; - } - - string[] parts = text.Split(TraceContextSeparators, StringSplitOptions.None); - foreach (string part in parts) + ParsedTraceContext value = parsed.Value; + var traceContext = new DistributedTraceContext(traceParent: value.TraceParent!) { - if (part.StartsWith(TraceContextClientSpanIdPrefix, StringComparison.Ordinal)) - { - return part.Substring(TraceContextClientSpanIdPrefix.Length); - } - } + TraceState = value.TraceState, + Id = value.Id, + SpanId = value.SpanId, + ActivityStartTime = GetTimestamp(reader), + }; + return traceContext; + } - return null; + static string? GetSubOrchestrationClientSpanId(DbDataReader reader) + { + return ParseTraceContext(reader)?.ClientSpanId; } internal static IDictionary? GetTags(DbDataReader reader) diff --git a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs index 31e0113..6a43061 100644 --- a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs +++ b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs @@ -806,8 +806,11 @@ public async Task TraceContextFlowCorrectly() Assert.True(subOrchestratorSpan.Duration > delay, $"Unexpected duration: {subOrchestratorSpan.Duration}"); Assert.True(subOrchestratorSpan.Duration < delay * 2, $"Unexpected duration: {subOrchestratorSpan.Duration}"); - Activity subOrchestratorClientSpan = exportedItems.LastOrDefault( - span => span.OperationName == $"orchestration:{subOrchestrationName}" && span.Kind == ActivityKind.Client); + // The test schedules exactly one sub-orchestration, so there must be exactly one + // matching client span. Asserting uniqueness (instead of LastOrDefault) makes this + // regression fail loudly if a future change ever causes a duplicate emission. + Activity subOrchestratorClientSpan = Assert.Single(exportedItems.Where( + span => span.OperationName == $"orchestration:{subOrchestrationName}" && span.Kind == ActivityKind.Client)); Assert.NotNull(subOrchestratorClientSpan); // The sub-orchestration execution span hangs off the sub-orchestration client span, @@ -827,8 +830,10 @@ public async Task TraceContextFlowCorrectly() Assert.True(activitySpan.Duration > delay); Assert.True(activitySpan.Duration < delay * 2); - Activity activityClientSpan = exportedItems.LastOrDefault( - span => span.OperationName == $"activity:{activityName}" && span.Kind == ActivityKind.Client); + // Same uniqueness rationale as the sub-orchestration client span above: the test + // schedules a single activity, so any duplicate client span would be a regression. + Activity activityClientSpan = Assert.Single(exportedItems.Where( + span => span.OperationName == $"activity:{activityName}" && span.Kind == ActivityKind.Client)); Assert.NotNull(activityClientSpan); // This is the key shape for selling the trace fix: an activity scheduled by a diff --git a/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTraceContextTests.cs b/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTraceContextTests.cs index 3cfdc6f..668322b 100644 --- a/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTraceContextTests.cs +++ b/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTraceContextTests.cs @@ -90,7 +90,9 @@ public void GetHistoryEvent_RoundTripsSubOrchestrationClientSpanId() SqlString serialized = SqlUtils.GetTraceContext(createdEvent); Assert.False(serialized.IsNull); - Assert.Equal("@clientspanid=fedcba9876543210", serialized.Value); + // Line 1 is reserved for traceparent (empty for sub-orchestration payloads) + // so all TraceContext payloads share the same parsing contract. + Assert.Equal("\n@clientspanid=fedcba9876543210", serialized.Value); using var reader = CreateSubOrchestrationCreatedReader(serialized.Value, timestamp); Assert.True(reader.Read()); @@ -99,6 +101,26 @@ public void GetHistoryEvent_RoundTripsSubOrchestrationClientSpanId() Assert.Equal(clientSpanId, roundTrippedEvent.ClientSpanId); } + // Older histories may still contain the legacy single-line "@clientspanid=..." payload + // that was written before line 1 was reserved for traceparent. This regression makes sure + // the reader keeps parsing such rows correctly, so an upgrade does not break running + // orchestrations whose history was persisted by an older build. + [Fact] + public void GetHistoryEvent_ParsesLegacyClientSpanIdPayloadWithoutLeadingNewline() + { + DateTime timestamp = new DateTime(2026, 04, 15, 23, 00, 00, DateTimeKind.Utc); + const string clientSpanId = "abcdef0123456789"; + + // Legacy on-the-wire format: the @clientspanid= prefix sits on line 1. + string legacyPayload = "@clientspanid=" + clientSpanId; + + using var reader = CreateSubOrchestrationCreatedReader(legacyPayload, timestamp); + Assert.True(reader.Read()); + + var roundTrippedEvent = Assert.IsType(reader.GetHistoryEvent()); + Assert.Equal(clientSpanId, roundTrippedEvent.ClientSpanId); + } + static DataTableReader CreateExecutionStartedReader(string traceContext, DateTime timestamp) { var table = new DataTable();