diff --git a/src/DurableTask.SqlServer/SqlUtils.cs b/src/DurableTask.SqlServer/SqlUtils.cs index 84983f4..6c37c9d 100644 --- a/src/DurableTask.SqlServer/SqlUtils.cs +++ b/src/DurableTask.SqlServer/SqlUtils.cs @@ -24,6 +24,10 @@ static class SqlUtils { static readonly Random random = new Random(); static readonly char[] TraceContextSeparators = new char[] { '\n' }; + const string TraceContextTraceStatePrefix = "@tracestate="; + const string TraceContextIdPrefix = "@id="; + const string TraceContextSpanIdPrefix = "@spanid="; + const string TraceContextClientSpanIdPrefix = "@clientspanid="; const int MaxTagsPayloadSize = 8000; public static string? GetStringOrNull(this DbDataReader reader, int columnIndex) @@ -135,6 +139,7 @@ public static HistoryEvent GetHistoryEvent(this DbDataReader reader, bool isOrch { Input = GetPayloadText(reader), InstanceId = "", // Placeholder - shouldn't technically be needed (adding it requires a SQL schema change) + ClientSpanId = GetSubOrchestrationClientSpanId(reader), Name = GetName(reader), Version = null, }; @@ -441,6 +446,19 @@ static DateTime GetUtcDateTime(DbDataReader reader, int ordinal) internal static SqlString GetTraceContext(HistoryEvent e) { + if (e is SubOrchestrationInstanceCreatedEvent subOrchestrationEvent) + { + if (string.IsNullOrEmpty(subOrchestrationEvent.ClientSpanId)) + { + return SqlString.Null; + } + + // Reserve line 1 for traceparent (empty here) so all TraceContext payloads share + // a single parsing contract: parts[0] is always traceparent (possibly empty), + // and subsequent lines carry typed @key=value fields like @clientspanid=. + return new SqlString($"\n{TraceContextClientSpanIdPrefix}{subOrchestrationEvent.ClientSpanId}"); + } + if (e is not ISupportsDurableTraceContext eventWithTraceContext || eventWithTraceContext.ParentTraceContext == null) { @@ -452,7 +470,24 @@ internal static SqlString GetTraceContext(HistoryEvent e) // We prefer a simple format instead of JSON because external callers may interact with this // data and we don't want to expose them to some internal JSON serialization format. var sb = new StringBuilder(traceContext.TraceParent, capacity: 800); - if (!string.IsNullOrEmpty(traceContext.TraceState)) + if (!string.IsNullOrEmpty(traceContext.Id) || !string.IsNullOrEmpty(traceContext.SpanId)) + { + if (!string.IsNullOrEmpty(traceContext.TraceState)) + { + sb.Append('\n').Append(TraceContextTraceStatePrefix).Append(traceContext.TraceState); + } + + if (!string.IsNullOrEmpty(traceContext.Id)) + { + sb.Append('\n').Append(TraceContextIdPrefix).Append(traceContext.Id); + } + + if (!string.IsNullOrEmpty(traceContext.SpanId)) + { + sb.Append('\n').Append(TraceContextSpanIdPrefix).Append(traceContext.SpanId); + } + } + else if (!string.IsNullOrEmpty(traceContext.TraceState)) { sb.Append('\n').Append(traceContext.TraceState); } @@ -460,7 +495,21 @@ internal static SqlString GetTraceContext(HistoryEvent e) return sb.ToString(); } - static DistributedTraceContext? GetTraceContext(DbDataReader reader) + /// + /// Parsed result of a TraceContext column payload. Centralizes the on-the-wire format + /// (line 1 = traceparent, subsequent lines = typed @key=value fields) so all callers + /// share the same parsing contract. + /// + struct ParsedTraceContext + { + public string? TraceParent { get; set; } + public string? TraceState { get; set; } + public string? Id { get; set; } + public string? SpanId { get; set; } + public string? ClientSpanId { get; set; } + } + + static ParsedTraceContext? ParseTraceContext(DbDataReader reader) { int ordinal = reader.GetOrdinal("TraceContext"); if (reader.IsDBNull(ordinal)) @@ -474,18 +523,95 @@ internal static SqlString GetTraceContext(HistoryEvent e) return null; } - string[] parts = text.Split(TraceContextSeparators, count: 2, StringSplitOptions.RemoveEmptyEntries); - var traceContext = new DistributedTraceContext(traceParent: parts[0]); + string[] parts = text.Split(TraceContextSeparators, StringSplitOptions.None); + + string? traceParent = null; + string? traceState = null; + string? id = null; + string? spanId = null; + string? clientSpanId = null; - if (parts.Length > 1) + // Line 1 is reserved for traceparent. Older histories may have written a typed + // "@key=value" prefix on line 1 (legacy sub-orchestration payload). Detect that + // case by checking for the @ sentinel; otherwise treat parts[0] as traceparent. + int startIndex; + if (!string.IsNullOrEmpty(parts[0]) && parts[0][0] != '@') { - traceContext.TraceState = parts[1]; + traceParent = parts[0]; + startIndex = 1; } + else + { + startIndex = 0; + } + + for (int i = startIndex; i < parts.Length; i++) + { + string part = parts[i]; + if (string.IsNullOrEmpty(part)) + { + continue; + } - traceContext.ActivityStartTime = GetTimestamp(reader); + if (part.StartsWith(TraceContextTraceStatePrefix, StringComparison.Ordinal)) + { + traceState = part.Substring(TraceContextTraceStatePrefix.Length); + } + else if (part.StartsWith(TraceContextIdPrefix, StringComparison.Ordinal)) + { + id = part.Substring(TraceContextIdPrefix.Length); + } + else if (part.StartsWith(TraceContextSpanIdPrefix, StringComparison.Ordinal)) + { + spanId = part.Substring(TraceContextSpanIdPrefix.Length); + } + else if (part.StartsWith(TraceContextClientSpanIdPrefix, StringComparison.Ordinal)) + { + clientSpanId = part.Substring(TraceContextClientSpanIdPrefix.Length); + } + else if (traceState == null && i > 0) + { + // Preserve the legacy format, where the optional second line stored only tracestate. + traceState = part; + } + } + + return new ParsedTraceContext + { + TraceParent = traceParent, + TraceState = traceState, + Id = id, + SpanId = spanId, + ClientSpanId = clientSpanId, + }; + } + + static DistributedTraceContext? GetTraceContext(DbDataReader reader) + { + ParsedTraceContext? parsed = ParseTraceContext(reader); + if (parsed == null || string.IsNullOrEmpty(parsed.Value.TraceParent)) + { + // No traceparent means this row carries only sub-orchestration-specific data + // (e.g. @clientspanid=...) which is not a DistributedTraceContext. + return null; + } + + ParsedTraceContext value = parsed.Value; + var traceContext = new DistributedTraceContext(traceParent: value.TraceParent!) + { + TraceState = value.TraceState, + Id = value.Id, + SpanId = value.SpanId, + ActivityStartTime = GetTimestamp(reader), + }; return traceContext; } + static string? GetSubOrchestrationClientSpanId(DbDataReader reader) + { + return ParseTraceContext(reader)?.ClientSpanId; + } + internal static IDictionary? GetTags(DbDataReader reader) { int ordinal = reader.GetOrdinal("Tags"); diff --git a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs index e80f8b5..6a43061 100644 --- a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs +++ b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs @@ -690,6 +690,12 @@ public async Task RetryFailedSubOrchestration(bool userSpecifiedInstanceId) await parentInstance.WaitForCompletion(expectedOutput: true); } + // This regression locks down the expected DurableTask span shape for a simple + // parent orchestration -> sub-orchestration -> activity flow. The important + // nuance is that an activity under a sub-orchestration is nested via an + // activity client span, so the hierarchy is: + // parent orchestration server -> sub-orchestration client -> sub-orchestration server + // -> activity client -> activity server. [Fact] public async Task TraceContextFlowCorrectly() { @@ -800,6 +806,18 @@ public async Task TraceContextFlowCorrectly() Assert.True(subOrchestratorSpan.Duration > delay, $"Unexpected duration: {subOrchestratorSpan.Duration}"); Assert.True(subOrchestratorSpan.Duration < delay * 2, $"Unexpected duration: {subOrchestratorSpan.Duration}"); + // The test schedules exactly one sub-orchestration, so there must be exactly one + // matching client span. Asserting uniqueness (instead of LastOrDefault) makes this + // regression fail loudly if a future change ever causes a duplicate emission. + Activity subOrchestratorClientSpan = Assert.Single(exportedItems.Where( + span => span.OperationName == $"orchestration:{subOrchestrationName}" && span.Kind == ActivityKind.Client)); + Assert.NotNull(subOrchestratorClientSpan); + + // The sub-orchestration execution span hangs off the sub-orchestration client span, + // which in turn hangs off the parent orchestration execution span. + Assert.Equal(orchestratorSpan.SpanId, subOrchestratorClientSpan.ParentSpanId); + Assert.Equal(subOrchestratorClientSpan.SpanId, subOrchestratorSpan.ParentSpanId); + // Validate the activity span, which should be a subset of the sub-orchestration span Activity activitySpan = exportedItems.LastOrDefault( span => span.OperationName == $"activity:{activityName}" && span.Kind == ActivityKind.Server); @@ -811,6 +829,301 @@ public async Task TraceContextFlowCorrectly() Assert.True(activitySpan.Duration < subOrchestratorSpan.Duration); Assert.True(activitySpan.Duration > delay); Assert.True(activitySpan.Duration < delay * 2); + + // Same uniqueness rationale as the sub-orchestration client span above: the test + // schedules a single activity, so any duplicate client span would be a regression. + Activity activityClientSpan = Assert.Single(exportedItems.Where( + span => span.OperationName == $"activity:{activityName}" && span.Kind == ActivityKind.Client)); + Assert.NotNull(activityClientSpan); + + // This is the key shape for selling the trace fix: an activity scheduled by a + // sub-orchestrator must remain inside that sub-orchestrator's execution span + // hierarchy rather than floating at the root with a dangling parent. + Assert.Equal(subOrchestratorSpan.SpanId, activityClientSpan.ParentSpanId); + Assert.Equal(activityClientSpan.SpanId, activitySpan.ParentSpanId); + } + + // This regression protects the SQL-specific bug where an orchestration reload could + // generate a fresh execution span ID, leaving later activity completion and timer + // spans pointing at a parent that no longer exists in the exported trace. + [Fact] + public async Task TraceContextMaintainsStableOrchestrationSpanAcrossContinuations() + { + string traceSourceName = "MyTraceSource"; + string orchestrationName = "ContinuationTraceContextOrchestration"; + string[] activityNames = { "FirstActivity", "SecondActivity" }; + TimeSpan delay = TimeSpan.FromMilliseconds(250); + + var exportedItems = new List(); + using TracerProvider tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource(traceSourceName, "DurableTask.Core") + .ConfigureResource(r => r.AddService("Test")) + .AddInMemoryExporter(exportedItems) + .Build(); + + using var traceSource = new ActivitySource(traceSourceName); + using var clientSpan = traceSource.StartActivity("TestSpan"); + clientSpan.TraceStateString = "TestTraceState"; + + TestInstance instance = await this.testService.RunOrchestration( + input: "input", + orchestrationName: orchestrationName, + implementation: async (ctx, input) => + { + string first = await ctx.ScheduleTask(activityNames[0], "", input); + await ctx.CreateTimer(ctx.CurrentUtcDateTime.Add(delay), ""); + string second = await ctx.ScheduleTask(activityNames[1], "", input); + await ctx.CreateTimer(ctx.CurrentUtcDateTime.Add(delay), ""); + return string.Join(",", first, second); + }, + activities: new[] + { + (activityNames[0], TestService.MakeActivity((TaskContext ctx, string input) => $"first-{input}")), + (activityNames[1], TestService.MakeActivity((TaskContext ctx, string input) => $"second-{input}")), + }); + + await instance.WaitForCompletion(expectedOutput: "first-input,second-input"); + + clientSpan.Stop(); + tracerProvider.ForceFlush(); + + Activity[] orchestrationSpans = exportedItems + .Where(span => span.OperationName == $"orchestration:{orchestrationName}" && span.Kind == ActivityKind.Server) + .ToArray(); + Assert.NotEmpty(orchestrationSpans); + + ActivitySpanId[] orchestrationSpanIds = orchestrationSpans + .Select(span => span.SpanId) + .Distinct() + .ToArray(); + Assert.Single(orchestrationSpanIds); + + ActivitySpanId orchestrationSpanId = orchestrationSpanIds[0]; + var activityOperationNames = new HashSet(activityNames.Select(name => $"activity:{name}")); + + Activity[] completionSpans = exportedItems + .Where(span => span.Kind == ActivityKind.Client && activityOperationNames.Contains(span.OperationName)) + .ToArray(); + Assert.Equal(2, completionSpans.Length); + Assert.All(completionSpans, span => Assert.Equal(orchestrationSpanId, span.ParentSpanId)); + + Activity[] timerSpans = exportedItems + .Where(span => span.Kind == ActivityKind.Internal && span.OperationName == $"orchestration:{orchestrationName}:timer") + .ToArray(); + Assert.Equal(2, timerSpans.Length); + Assert.All(timerSpans, span => Assert.Equal(orchestrationSpanId, span.ParentSpanId)); + } + + // This test observes raw Activity objects directly, without relying on exporter + // behavior, so it can fail fast if any emitted ParentSpanId refers to a span that + // was never actually produced. The hierarchy intentionally exercises nested + // sub-orchestrations plus an activity at the deepest level. + [Fact] + public async Task ActivityListenerCapturesNestedSubOrchestrationHierarchyWithoutMissingParents() + { + string traceSourceName = "ActivityListenerNestedTraceSource"; + string parentOrchestrationName = "ActivityListenerParentOrchestration"; + string childOrchestrationName = "ActivityListenerChildOrchestration"; + string grandchildOrchestrationName = "ActivityListenerGrandchildOrchestration"; + string[] activityNames = + { + "ActivityListenerParentActivity", + "ActivityListenerChildActivity", + "ActivityListenerGrandchildActivity", + "ActivityListenerAfterSubOrchestrationActivity", + }; + TimeSpan delay = TimeSpan.FromMilliseconds(150); + + using var listener = new ActivityCaptureListener(traceSourceName, "DurableTask.Core"); + using var traceSource = new ActivitySource(traceSourceName); + using Activity incomingRequest = traceSource.StartActivity("IncomingRequest") ?? + throw new InvalidOperationException("Failed to start the incoming request activity."); + + this.testService.RegisterInlineOrchestration( + grandchildOrchestrationName, + implementation: async (ctx, input) => + { + await ctx.CreateTimer(ctx.CurrentUtcDateTime.Add(delay), string.Empty); + return await ctx.ScheduleTask(activityNames[2], version: string.Empty, input); + }); + + this.testService.RegisterInlineOrchestration( + childOrchestrationName, + implementation: async (ctx, input) => + { + string childActivity = await ctx.ScheduleTask(activityNames[1], version: string.Empty, input); + await ctx.CreateTimer(ctx.CurrentUtcDateTime.Add(delay), string.Empty); + string grandchild = await ctx.CreateSubOrchestrationInstance(grandchildOrchestrationName, version: string.Empty, input); + return string.Join("|", childActivity, grandchild); + }); + + TestInstance instance = await this.testService.RunOrchestration( + input: "payload", + orchestrationName: parentOrchestrationName, + implementation: async (ctx, input) => + { + string parentActivity = await ctx.ScheduleTask(activityNames[0], version: string.Empty, input); + await ctx.CreateTimer(ctx.CurrentUtcDateTime.Add(delay), string.Empty); + string child = await ctx.CreateSubOrchestrationInstance(childOrchestrationName, version: string.Empty, input); + await ctx.CreateTimer(ctx.CurrentUtcDateTime.Add(delay), string.Empty); + string afterChild = await ctx.ScheduleTask(activityNames[3], version: string.Empty, input); + return string.Join("|", parentActivity, child, afterChild); + }, + activities: new[] + { + (activityNames[0], TestService.MakeActivity((TaskContext ctx, string input) => $"parent-{input}")), + (activityNames[1], TestService.MakeActivity((TaskContext ctx, string input) => $"child-{input}")), + (activityNames[2], TestService.MakeActivity((TaskContext ctx, string input) => $"grandchild-{input}")), + (activityNames[3], TestService.MakeActivity((TaskContext ctx, string input) => $"after-{input}")), + }); + + await instance.WaitForCompletion(expectedOutput: "parent-payload|child-payload|grandchild-payload|after-payload"); + + incomingRequest.Stop(); + + CapturedActivity[] traceActivities = listener.GetActivities(incomingRequest.TraceId); + this.WriteCapturedActivities(traceActivities); + + // The bug we are guarding against is "parentSpanId emitted, but the parent span + // does not exist". This assertion checks exactly that against the raw listener feed. + AssertNoMissingParents(traceActivities); + + CapturedActivity parentOrchestration = GetUniqueSpan( + traceActivities, + $"orchestration:{parentOrchestrationName}", + ActivityKind.Server); + CapturedActivity childOrchestrationClient = GetUniqueSpan( + traceActivities, + $"orchestration:{childOrchestrationName}", + ActivityKind.Client); + CapturedActivity childOrchestrationServer = GetUniqueSpan( + traceActivities, + $"orchestration:{childOrchestrationName}", + ActivityKind.Server); + CapturedActivity grandchildOrchestrationClient = GetUniqueSpan( + traceActivities, + $"orchestration:{grandchildOrchestrationName}", + ActivityKind.Client); + CapturedActivity grandchildOrchestrationServer = GetUniqueSpan( + traceActivities, + $"orchestration:{grandchildOrchestrationName}", + ActivityKind.Server); + CapturedActivity grandchildActivityClient = GetUniqueSpan( + traceActivities, + $"activity:{activityNames[2]}", + ActivityKind.Client); + CapturedActivity grandchildActivityServer = GetUniqueSpan( + traceActivities, + $"activity:{activityNames[2]}", + ActivityKind.Server); + + // Expected nesting: + // parent orchestration server + // -> child sub-orchestration client + // -> child sub-orchestration server + // -> grandchild sub-orchestration client + // -> grandchild sub-orchestration server + // -> grandchild activity client + // -> grandchild activity server + Assert.Equal(parentOrchestration.SpanId, childOrchestrationClient.ParentSpanId); + Assert.Equal(childOrchestrationClient.SpanId, childOrchestrationServer.ParentSpanId); + Assert.Equal(childOrchestrationServer.SpanId, grandchildOrchestrationClient.ParentSpanId); + Assert.Equal(grandchildOrchestrationClient.SpanId, grandchildOrchestrationServer.ParentSpanId); + Assert.Equal(grandchildOrchestrationServer.SpanId, grandchildActivityClient.ParentSpanId); + Assert.Equal(grandchildActivityClient.SpanId, grandchildActivityServer.ParentSpanId); + } + + // This test covers sibling sub-orchestrations that run under the same parent. It + // ensures each invocation gets its own client/server pair and that the activities + // inside each child orchestration remain attached to the corresponding child span + // instead of pointing to a missing or reused parent. + [Fact] + public async Task ActivityListenerCapturesRepeatedSubOrchestrationsWithoutMissingParents() + { + string traceSourceName = "ActivityListenerRepeatedTraceSource"; + string parentOrchestrationName = "ActivityListenerRepeatedParentOrchestration"; + string childOrchestrationName = "ActivityListenerRepeatedChildOrchestration"; + string childActivityName = "ActivityListenerRepeatedChildActivity"; + TimeSpan delay = TimeSpan.FromMilliseconds(125); + + using var listener = new ActivityCaptureListener(traceSourceName, "DurableTask.Core"); + using var traceSource = new ActivitySource(traceSourceName); + using Activity incomingRequest = traceSource.StartActivity("IncomingRequest") ?? + throw new InvalidOperationException("Failed to start the incoming request activity."); + + this.testService.RegisterInlineOrchestration( + childOrchestrationName, + implementation: async (ctx, input) => + { + string activityResult = await ctx.ScheduleTask(childActivityName, version: string.Empty, input); + await ctx.CreateTimer(ctx.CurrentUtcDateTime.Add(delay), string.Empty); + return activityResult; + }); + + TestInstance instance = await this.testService.RunOrchestration( + input: "payload", + orchestrationName: parentOrchestrationName, + implementation: async (ctx, input) => + { + string first = await ctx.CreateSubOrchestrationInstance(childOrchestrationName, version: string.Empty, input: $"{input}-1"); + await ctx.CreateTimer(ctx.CurrentUtcDateTime.Add(delay), string.Empty); + string second = await ctx.CreateSubOrchestrationInstance(childOrchestrationName, version: string.Empty, input: $"{input}-2"); + return string.Join("|", first, second); + }, + activities: new[] + { + (childActivityName, TestService.MakeActivity((TaskContext ctx, string input) => $"child-{input}")), + }); + + await instance.WaitForCompletion(expectedOutput: "child-payload-1|child-payload-2"); + + incomingRequest.Stop(); + + CapturedActivity[] traceActivities = listener.GetActivities(incomingRequest.TraceId); + this.WriteCapturedActivities(traceActivities); + + // Every non-root ParentSpanId emitted for this trace must resolve to another + // captured span, otherwise Geneva can show the child span at the root. + AssertNoMissingParents(traceActivities); + + CapturedActivity parentOrchestration = GetUniqueSpan( + traceActivities, + $"orchestration:{parentOrchestrationName}", + ActivityKind.Server); + CapturedActivity[] childOrchestrationClientSpans = GetDistinctSpans( + traceActivities, + $"orchestration:{childOrchestrationName}", + ActivityKind.Client); + CapturedActivity[] childOrchestrationServerSpans = GetDistinctSpans( + traceActivities, + $"orchestration:{childOrchestrationName}", + ActivityKind.Server); + CapturedActivity[] childActivityClientSpans = GetDistinctSpans( + traceActivities, + $"activity:{childActivityName}", + ActivityKind.Client); + CapturedActivity[] childActivityServerSpans = GetDistinctSpans( + traceActivities, + $"activity:{childActivityName}", + ActivityKind.Server); + + Assert.Equal(2, childOrchestrationClientSpans.Length); + Assert.Equal(2, childOrchestrationServerSpans.Length); + Assert.Equal(2, childActivityClientSpans.Length); + Assert.Equal(2, childActivityServerSpans.Length); + + // Each sibling sub-orchestration should attach to the same parent orchestration, + // but keep its own distinct client/server/activity chain below that parent. + Assert.All(childOrchestrationClientSpans, span => Assert.Equal(parentOrchestration.SpanId, span.ParentSpanId)); + + var childOrchestrationClientIds = childOrchestrationClientSpans.Select(span => span.SpanId).ToHashSet(); + Assert.All(childOrchestrationServerSpans, span => Assert.Contains(span.ParentSpanId, childOrchestrationClientIds)); + + var childOrchestrationServerIds = childOrchestrationServerSpans.Select(span => span.SpanId).ToHashSet(); + Assert.All(childActivityClientSpans, span => Assert.Contains(span.ParentSpanId, childOrchestrationServerIds)); + + var childActivityClientIds = childActivityClientSpans.Select(span => span.SpanId).ToHashSet(); + Assert.All(childActivityServerSpans, span => Assert.Contains(span.ParentSpanId, childActivityClientIds)); } [Fact] @@ -1372,5 +1685,114 @@ public async Task ActivityTagsMergedWithOrchestrationTags() Assert.Equal("platform", capturedTags["team"]); Assert.Equal("high", capturedTags["priority"]); } + + void WriteCapturedActivities(IEnumerable activities) + { + foreach (CapturedActivity activity in activities.OrderBy(a => a.StartTimeUtc)) + { + this.outputHelper.WriteLine( + $"{activity.TraceId}/{activity.SpanId} parent={activity.ParentSpanId} kind={activity.Kind} source={activity.SourceName} op={activity.OperationName}"); + } + } + + static void AssertNoMissingParents(IReadOnlyCollection activities) + { + var knownSpanIds = activities.Select(activity => activity.SpanId).ToHashSet(); + CapturedActivity[] missingParents = activities + .Where(activity => activity.ParentSpanId != default && !knownSpanIds.Contains(activity.ParentSpanId)) + .ToArray(); + + Assert.True( + missingParents.Length == 0, + "Captured activities have missing parents:" + Environment.NewLine + + string.Join( + Environment.NewLine, + missingParents.Select(activity => + $"{activity.OperationName} [{activity.Kind}] span={activity.SpanId} parent={activity.ParentSpanId}"))); + } + + static CapturedActivity GetUniqueSpan( + IEnumerable activities, + string operationName, + ActivityKind kind) + { + CapturedActivity[] matchingSpans = GetDistinctSpans(activities, operationName, kind); + Assert.Single(matchingSpans); + return matchingSpans[0]; + } + + static CapturedActivity[] GetDistinctSpans( + IEnumerable activities, + string operationName, + ActivityKind kind) + { + return activities + .Where(activity => activity.OperationName == operationName && activity.Kind == kind) + .GroupBy(activity => activity.SpanId) + .Select(group => group.OrderByDescending(activity => activity.Duration).First()) + .ToArray(); + } + + sealed class ActivityCaptureListener : IDisposable + { + readonly object sync = new object(); + readonly HashSet sourceNames; + readonly List activities = new List(); + readonly ActivityListener listener; + + public ActivityCaptureListener(params string[] sourceNames) + { + this.sourceNames = new HashSet(sourceNames, StringComparer.Ordinal); + this.listener = new ActivityListener + { + ShouldListenTo = source => this.sourceNames.Contains(source.Name), + Sample = static (ref ActivityCreationOptions options) => ActivitySamplingResult.AllDataAndRecorded, + SampleUsingParentId = static (ref ActivityCreationOptions options) => ActivitySamplingResult.AllDataAndRecorded, + ActivityStopped = activity => + { + lock (this.sync) + { + this.activities.Add(CapturedActivity.From(activity)); + } + }, + }; + + ActivitySource.AddActivityListener(this.listener); + } + + public CapturedActivity[] GetActivities(ActivityTraceId traceId) + { + lock (this.sync) + { + return this.activities.Where(activity => activity.TraceId == traceId).ToArray(); + } + } + + public void Dispose() => this.listener.Dispose(); + } + + sealed record CapturedActivity( + string SourceName, + string OperationName, + ActivityKind Kind, + ActivityTraceId TraceId, + ActivitySpanId SpanId, + ActivitySpanId ParentSpanId, + DateTime StartTimeUtc, + TimeSpan Duration) + { + public static CapturedActivity From(Activity activity) + { + return new CapturedActivity( + activity.Source.Name, + activity.OperationName, + activity.Kind, + activity.TraceId, + activity.SpanId, + activity.ParentSpanId, + activity.StartTimeUtc, + activity.Duration); + } + } } } diff --git a/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTraceContextTests.cs b/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTraceContextTests.cs new file mode 100644 index 0000000..668322b --- /dev/null +++ b/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTraceContextTests.cs @@ -0,0 +1,180 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask.SqlServer.Tests.Unit +{ + using System; + using System.Data; + using System.Data.SqlTypes; + using DurableTask.Core; + using DurableTask.Core.History; + using DurableTask.Core.Tracing; + using Xunit; + + public class SqlUtilsTraceContextTests + { + [Fact] + public void GetTraceContext_RoundTripsExtendedTraceContextFields() + { + DateTime timestamp = new DateTime(2026, 04, 15, 22, 30, 00, DateTimeKind.Utc); + var traceContext = new DistributedTraceContext( + traceParent: "00-0123456789abcdef0123456789abcdef-0123456789abcdef-01", + traceState: "vendor=value") + { + Id = "00-0123456789abcdef0123456789abcdef-fedcba9876543210-01", + SpanId = "fedcba9876543210", + }; + + var startedEvent = new ExecutionStartedEvent(-1, input: null) + { + Name = "TestOrchestration", + OrchestrationInstance = new OrchestrationInstance + { + InstanceId = "instance", + ExecutionId = "execution", + }, + ParentTraceContext = traceContext, + Timestamp = timestamp, + }; + + SqlString serialized = SqlUtils.GetTraceContext(startedEvent); + Assert.False(serialized.IsNull); + Assert.Equal( + "00-0123456789abcdef0123456789abcdef-0123456789abcdef-01\n@tracestate=vendor=value\n@id=00-0123456789abcdef0123456789abcdef-fedcba9876543210-01\n@spanid=fedcba9876543210", + serialized.Value); + + using var reader = CreateExecutionStartedReader(serialized.Value, timestamp); + Assert.True(reader.Read()); + + var roundTrippedEvent = Assert.IsType(reader.GetHistoryEvent()); + Assert.NotNull(roundTrippedEvent.ParentTraceContext); + Assert.Equal(traceContext.TraceParent, roundTrippedEvent.ParentTraceContext.TraceParent); + Assert.Equal(traceContext.TraceState, roundTrippedEvent.ParentTraceContext.TraceState); + Assert.Equal(traceContext.Id, roundTrippedEvent.ParentTraceContext.Id); + Assert.Equal(traceContext.SpanId, roundTrippedEvent.ParentTraceContext.SpanId); + Assert.Equal(new DateTimeOffset(timestamp), roundTrippedEvent.ParentTraceContext.ActivityStartTime); + } + + [Fact] + public void GetHistoryEvent_PreservesLegacyTraceContextFormat() + { + DateTime timestamp = new DateTime(2026, 04, 15, 22, 45, 00, DateTimeKind.Utc); + const string traceParent = "00-0123456789abcdef0123456789abcdef-0123456789abcdef-01"; + const string traceState = "rojo=00f067aa0ba902b7,congo=t61rcWkgMzE"; + + using var reader = CreateExecutionStartedReader($"{traceParent}\n{traceState}", timestamp); + Assert.True(reader.Read()); + + var roundTrippedEvent = Assert.IsType(reader.GetHistoryEvent()); + Assert.NotNull(roundTrippedEvent.ParentTraceContext); + Assert.Equal(traceParent, roundTrippedEvent.ParentTraceContext.TraceParent); + Assert.Equal(traceState, roundTrippedEvent.ParentTraceContext.TraceState); + Assert.Null(roundTrippedEvent.ParentTraceContext.Id); + Assert.Null(roundTrippedEvent.ParentTraceContext.SpanId); + Assert.Equal(new DateTimeOffset(timestamp), roundTrippedEvent.ParentTraceContext.ActivityStartTime); + } + + [Fact] + public void GetHistoryEvent_RoundTripsSubOrchestrationClientSpanId() + { + DateTime timestamp = new DateTime(2026, 04, 15, 23, 00, 00, DateTimeKind.Utc); + const string clientSpanId = "fedcba9876543210"; + + var createdEvent = new SubOrchestrationInstanceCreatedEvent(eventId: 7) + { + ClientSpanId = clientSpanId, + Input = "{}", + Name = "MySubOrchestration", + Timestamp = timestamp, + }; + + SqlString serialized = SqlUtils.GetTraceContext(createdEvent); + Assert.False(serialized.IsNull); + // Line 1 is reserved for traceparent (empty for sub-orchestration payloads) + // so all TraceContext payloads share the same parsing contract. + Assert.Equal("\n@clientspanid=fedcba9876543210", serialized.Value); + + using var reader = CreateSubOrchestrationCreatedReader(serialized.Value, timestamp); + Assert.True(reader.Read()); + + var roundTrippedEvent = Assert.IsType(reader.GetHistoryEvent()); + Assert.Equal(clientSpanId, roundTrippedEvent.ClientSpanId); + } + + // Older histories may still contain the legacy single-line "@clientspanid=..." payload + // that was written before line 1 was reserved for traceparent. This regression makes sure + // the reader keeps parsing such rows correctly, so an upgrade does not break running + // orchestrations whose history was persisted by an older build. + [Fact] + public void GetHistoryEvent_ParsesLegacyClientSpanIdPayloadWithoutLeadingNewline() + { + DateTime timestamp = new DateTime(2026, 04, 15, 23, 00, 00, DateTimeKind.Utc); + const string clientSpanId = "abcdef0123456789"; + + // Legacy on-the-wire format: the @clientspanid= prefix sits on line 1. + string legacyPayload = "@clientspanid=" + clientSpanId; + + using var reader = CreateSubOrchestrationCreatedReader(legacyPayload, timestamp); + Assert.True(reader.Read()); + + var roundTrippedEvent = Assert.IsType(reader.GetHistoryEvent()); + Assert.Equal(clientSpanId, roundTrippedEvent.ClientSpanId); + } + + static DataTableReader CreateExecutionStartedReader(string traceContext, DateTime timestamp) + { + var table = new DataTable(); + table.Columns.Add("EventType", typeof(string)); + table.Columns.Add("TaskID", typeof(int)); + table.Columns.Add("PayloadText", typeof(string)); + table.Columns.Add("Name", typeof(string)); + table.Columns.Add("InstanceID", typeof(string)); + table.Columns.Add("ExecutionID", typeof(string)); + table.Columns.Add("Tags", typeof(string)); + table.Columns.Add("Version", typeof(string)); + table.Columns.Add("TraceContext", typeof(string)); + table.Columns.Add("ParentInstanceID", typeof(string)); + table.Columns.Add("Timestamp", typeof(DateTime)); + table.Columns.Add("RuntimeStatus", typeof(string)); + + DataRow row = table.NewRow(); + row["EventType"] = EventType.ExecutionStarted.ToString(); + row["TaskID"] = -1; + row["PayloadText"] = DBNull.Value; + row["Name"] = "TestOrchestration"; + row["InstanceID"] = "instance"; + row["ExecutionID"] = "execution"; + row["Tags"] = DBNull.Value; + row["Version"] = DBNull.Value; + row["TraceContext"] = traceContext; + row["ParentInstanceID"] = DBNull.Value; + row["Timestamp"] = timestamp; + row["RuntimeStatus"] = OrchestrationStatus.Running.ToString(); + table.Rows.Add(row); + + return table.CreateDataReader(); + } + + static DataTableReader CreateSubOrchestrationCreatedReader(string traceContext, DateTime timestamp) + { + var table = new DataTable(); + table.Columns.Add("EventType", typeof(string)); + table.Columns.Add("TaskID", typeof(int)); + table.Columns.Add("PayloadText", typeof(string)); + table.Columns.Add("Name", typeof(string)); + table.Columns.Add("TraceContext", typeof(string)); + table.Columns.Add("Timestamp", typeof(DateTime)); + + DataRow row = table.NewRow(); + row["EventType"] = EventType.SubOrchestrationInstanceCreated.ToString(); + row["TaskID"] = 7; + row["PayloadText"] = "{}"; + row["Name"] = "MySubOrchestration"; + row["TraceContext"] = traceContext; + row["Timestamp"] = timestamp; + table.Rows.Add(row); + + return table.CreateDataReader(); + } + } +}