diff --git a/src/workerd/io/io-channels.h b/src/workerd/io/io-channels.h index 179209b10a4..4274b0ed1c9 100644 --- a/src/workerd/io/io-channels.h +++ b/src/workerd/io/io-channels.h @@ -107,6 +107,9 @@ class IoChannelFactory { // Specifies the parent span for the subrequest for tracing purposes. SpanParent parentSpan = SpanParent(nullptr); + // User Span Context for trace propagation. + kj::Maybe userSpanContext; + // Serialized JSON value to pass in ew_compat field of control header to FL. If this subrequest // does not go directly to FL, this value is ignored. Flags marked with `$neededByFl` in // `compatibility-date.capnp` end up here. diff --git a/src/workerd/io/io-context.c++ b/src/workerd/io/io-context.c++ index 82582a0c2cc..cc6d2c78528 100644 --- a/src/workerd/io/io-context.c++ +++ b/src/workerd/io/io-context.c++ @@ -1008,9 +1008,17 @@ kj::Own IoContext::getSubrequestChannelImpl(uint channel, kj::Maybe cfBlobJson, TraceContext& tracing, IoChannelFactory& channelFactory) { + const auto& invCtx = getInvocationSpanContext(); + auto traceId = invCtx.getTraceId(); + auto spanId = invCtx.getSpanId(); + KJ_IF_SOME(observer, tracing.getUserSpanParent().getObserver()) { + auto& userObs = kj::downcast(observer); + spanId = userObs.getSpanId(); + } IoChannelFactory::SubrequestMetadata metadata{ .cfBlobJson = kj::mv(cfBlobJson), .parentSpan = tracing.getInternalSpanParent(), + .userSpanContext = tracing::SpanContext(traceId, spanId), .featureFlagsForFl = mapCopyString(worker->getIsolate().getFeatureFlagsForFl()), }; diff --git a/src/workerd/io/trace-stream.c++ b/src/workerd/io/trace-stream.c++ index cb7dcae0dae..6a1e6d46736 100644 --- a/src/workerd/io/trace-stream.c++ +++ b/src/workerd/io/trace-stream.c++ @@ -1076,8 +1076,8 @@ bool TailStreamWriter::reportImpl(TailEvent&& event, size_t sizeHint) { // event indicating how many events were dropped if applicable. if (event.event.is() && active->droppedEvents > 0) { StreamDiagnosticsEvent diag(active->droppedEvents); - TailEvent diagTailEvent(event.spanContext.clone(), event.invocationId, event.timestamp, - event.sequence, kj::mv(diag)); + TailEvent diagTailEvent(SpanContext::clone(event.spanContext), event.invocationId, + event.timestamp, event.sequence, kj::mv(diag)); active->queue.push(kj::mv(diagTailEvent)); // Increment the outcome sequence number to keep things consistent. event.sequence++; diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index 32f72e026b6..0ff5c22c7f1 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -309,10 +309,6 @@ void SpanContext::toCapnp(rpc::SpanContext::Builder writer) const { } } -SpanContext SpanContext::clone() const { - return SpanContext(traceId, spanId); -} - kj::String KJ_STRINGIFY(const SpanContext& context) { return kj::str(context.getTraceId(), "-", context.getSpanId()); } diff --git a/src/workerd/io/trace.h b/src/workerd/io/trace.h index e6377dc6e70..3fb4e43dde6 100644 --- a/src/workerd/io/trace.h +++ b/src/workerd/io/trace.h @@ -290,7 +290,9 @@ struct SpanContext { static SpanContext fromCapnp(rpc::SpanContext::Reader reader); void toCapnp(rpc::SpanContext::Builder writer) const; - SpanContext clone() const; + static SpanContext clone(const SpanContext& ctx) { + return SpanContext(ctx.traceId, ctx.spanId); + } private: TraceId traceId; @@ -1213,6 +1215,10 @@ class TraceContext { return SpanParent(span); } + SpanParent getUserSpanParent() { + return SpanParent(userSpan); + } + private: SpanBuilder span; SpanBuilder userSpan; diff --git a/src/workerd/io/tracer.c++ b/src/workerd/io/tracer.c++ index eb4d9a56228..513cd29f5a7 100644 --- a/src/workerd/io/tracer.c++ +++ b/src/workerd/io/tracer.c++ @@ -369,13 +369,17 @@ void WorkerTracer::setEventInfoInternal( .entrypoint = mapCopyString(trace->entrypoint), }; + tracing::SpanId parentSpanId = tracing::SpanId::nullId; + KJ_IF_SOME(trigger, context.getParent()) { + parentSpanId = trigger.getSpanId(); + } // Onset needs special handling for spanId: The top-level spanId is zero unless a trigger - // context is available (not yet implemented). The inner spanId is taken from the invocation + // context is available. The inner spanId is taken from the invocation // span context, that span is being "opened" with the onset event. All other tail events have it // as its parent span ID, except for recursive SpanOpens (which have the parent span instead) // and Attribute/SpanClose events (which have the spanId opened in the corresponding SpanOpen). auto onsetContext = tracing::InvocationSpanContext( - context.getTraceId(), context.getInvocationId(), tracing::SpanId::nullId); + context.getTraceId(), context.getInvocationId(), parentSpanId); // Not applying size accounting for Onset since it is sent separately writer->report(onsetContext, @@ -627,4 +631,8 @@ kj::Date UserSpanObserver::getTime() { return IoContext::current().now(); } +tracing::SpanId UserSpanObserver::getSpanId() { + return spanId; +} + } // namespace workerd diff --git a/src/workerd/io/tracer.h b/src/workerd/io/tracer.h index 061b7af3338..1284109806e 100644 --- a/src/workerd/io/tracer.h +++ b/src/workerd/io/tracer.h @@ -223,6 +223,7 @@ class UserSpanObserver final: public SpanObserver { void report(const Span& span) override; void reportStart(kj::ConstString operationName, kj::Date startTime) override; kj::Date getTime() override; + tracing::SpanId getSpanId(); private: kj::Own submitter; diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 3108eb56c52..1fe9ec3bc61 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -2199,16 +2199,24 @@ class Server::WorkerService final: public Service, kj::Own observer = kj::refcounted(mapAddRef(workerTracer), waitUntilTasks); - return newWorkerEntrypoint( - threadContext, kj::atomicAddRef(*worker), entrypointName, kj::mv(props), kj::mv(actor), - kj::Own(this, kj::NullDisposer::instance), {}, // ioContextDependency + kj::Maybe triggerContext; + KJ_IF_SOME(ctx, metadata.userSpanContext) { + KJ_IF_SOME(spanId, ctx.getSpanId()) { + triggerContext = + tracing::InvocationSpanContext(ctx.getTraceId(), tracing::TraceId::nullId, spanId); + } + } + + return newWorkerEntrypoint(threadContext, kj::atomicAddRef(*worker), entrypointName, + kj::mv(props), kj::mv(actor), kj::Own(this, kj::NullDisposer::instance), + {}, // ioContextDependency kj::Own(this, kj::NullDisposer::instance), kj::mv(observer), waitUntilTasks, true, // tunnelExceptions kj::mv(workerTracer), // workerTracer kj::mv(metadata.cfBlobJson), - kj::none // versionInfo - ); + kj::none, // versionInfo + kj::mv(triggerContext)); } class ActorNamespace final {