Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/workerd/io/io-channels.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ class IoChannelFactory {
// Specifies the parent span for the subrequest for tracing purposes.
SpanParent parentSpan = SpanParent(nullptr);

// User Span Context for trace propagation.
kj::Maybe<tracing::SpanContext> userSpanContext;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field is set in io-context.c++:1015 but I don't see any code that reads it from SubrequestMetadata. The downstream consumers (e.g., server.c++:3272 startSubrequest, the various startRequest implementations) pass the metadata through but never access userSpanContext.

Is consumption happening in the internal repo, or is this scaffolding for a follow-up? A brief comment here would help future readers understand the intent — e.g.:

Suggested change
kj::Maybe<tracing::SpanContext> userSpanContext;
// User Span Context for trace propagation. Consumed by [describe consumer].
kj::Maybe<tracing::SpanContext> userSpanContext;


// Serialized JSON value to pass in ew_compat field of control header to FL. If this subrequest
// does not go directly to FL, this value is ignored. Flags marked with `$neededByFl` in
// `compatibility-date.capnp` end up here.
Expand Down
8 changes: 8 additions & 0 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1008,9 +1008,17 @@ kj::Own<WorkerInterface> IoContext::getSubrequestChannelImpl(uint channel,
kj::Maybe<kj::String> cfBlobJson,
TraceContext& tracing,
IoChannelFactory& channelFactory) {
const auto& invCtx = getInvocationSpanContext();
auto traceId = invCtx.getTraceId();
auto spanId = invCtx.getSpanId();
KJ_IF_SOME(observer, tracing.getUserSpanParent().getObserver()) {
auto& userObs = kj::downcast<UserSpanObserver>(observer);
spanId = userObs.getSpanId();
}
IoChannelFactory::SubrequestMetadata metadata{
.cfBlobJson = kj::mv(cfBlobJson),
.parentSpan = tracing.getInternalSpanParent(),
.userSpanContext = tracing::SpanContext(traceId, spanId),
.featureFlagsForFl = mapCopyString(worker->getIsolate().getFeatureFlagsForFl()),
};

Expand Down
4 changes: 2 additions & 2 deletions src/workerd/io/trace-stream.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1076,8 +1076,8 @@ bool TailStreamWriter::reportImpl(TailEvent&& event, size_t sizeHint) {
// event indicating how many events were dropped if applicable.
if (event.event.is<Outcome>() && active->droppedEvents > 0) {
StreamDiagnosticsEvent diag(active->droppedEvents);
TailEvent diagTailEvent(event.spanContext.clone(), event.invocationId, event.timestamp,
event.sequence, kj::mv(diag));
TailEvent diagTailEvent(SpanContext::clone(event.spanContext), event.invocationId,
event.timestamp, event.sequence, kj::mv(diag));
active->queue.push(kj::mv(diagTailEvent));
// Increment the outcome sequence number to keep things consistent.
event.sequence++;
Expand Down
4 changes: 0 additions & 4 deletions src/workerd/io/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,6 @@ void SpanContext::toCapnp(rpc::SpanContext::Builder writer) const {
}
}

SpanContext SpanContext::clone() const {
return SpanContext(traceId, spanId);
}

kj::String KJ_STRINGIFY(const SpanContext& context) {
return kj::str(context.getTraceId(), "-", context.getSpanId());
}
Expand Down
8 changes: 7 additions & 1 deletion src/workerd/io/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,9 @@ struct SpanContext {

static SpanContext fromCapnp(rpc::SpanContext::Reader reader);
void toCapnp(rpc::SpanContext::Builder writer) const;
SpanContext clone() const;
static SpanContext clone(const SpanContext& ctx) {
return SpanContext(ctx.traceId, ctx.spanId);
}

private:
TraceId traceId;
Expand Down Expand Up @@ -1213,6 +1215,10 @@ class TraceContext {
return SpanParent(span);
}

SpanParent getUserSpanParent() {
return SpanParent(userSpan);
}

private:
SpanBuilder span;
SpanBuilder userSpan;
Expand Down
12 changes: 10 additions & 2 deletions src/workerd/io/tracer.c++
Original file line number Diff line number Diff line change
Expand Up @@ -369,13 +369,17 @@ void WorkerTracer::setEventInfoInternal(
.entrypoint = mapCopyString(trace->entrypoint),
};

tracing::SpanId parentSpanId = tracing::SpanId::nullId;
KJ_IF_SOME(trigger, context.getParent()) {
parentSpanId = trigger.getSpanId();
}
// Onset needs special handling for spanId: The top-level spanId is zero unless a trigger
// context is available (not yet implemented). The inner spanId is taken from the invocation
// context is available. The inner spanId is taken from the invocation
// span context, that span is being "opened" with the onset event. All other tail events have it
// as its parent span ID, except for recursive SpanOpens (which have the parent span instead)
// and Attribute/SpanClose events (which have the spanId opened in the corresponding SpanOpen).
auto onsetContext = tracing::InvocationSpanContext(
context.getTraceId(), context.getInvocationId(), tracing::SpanId::nullId);
context.getTraceId(), context.getInvocationId(), parentSpanId);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes the onset event's top-level span ID from always nullId to the trigger's spanId when a parent context exists. This is a behavioral change to the tail event wire format — streaming tail workers will now see a non-zero span ID in onset events for triggered invocations.

Is this change covered by existing tests? I didn't find test coverage for the onset spanId value. Consider adding a test case (in the trace/tracer tests) that verifies the onset context carries the parent's span ID when a trigger is present, and nullId when it isn't.


// Not applying size accounting for Onset since it is sent separately
writer->report(onsetContext,
Expand Down Expand Up @@ -627,4 +631,8 @@ kj::Date UserSpanObserver::getTime() {
return IoContext::current().now();
}

tracing::SpanId UserSpanObserver::getSpanId() {
return spanId;
}

} // namespace workerd
1 change: 1 addition & 0 deletions src/workerd/io/tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ class UserSpanObserver final: public SpanObserver {
void report(const Span& span) override;
void reportStart(kj::ConstString operationName, kj::Date startTime) override;
kj::Date getTime() override;
tracing::SpanId getSpanId();

private:
kj::Own<SpanSubmitter> submitter;
Expand Down
18 changes: 13 additions & 5 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2199,16 +2199,24 @@ class Server::WorkerService final: public Service,
kj::Own<RequestObserver> observer =
kj::refcounted<RequestObserverWithTracer>(mapAddRef(workerTracer), waitUntilTasks);

return newWorkerEntrypoint(
threadContext, kj::atomicAddRef(*worker), entrypointName, kj::mv(props), kj::mv(actor),
kj::Own<LimitEnforcer>(this, kj::NullDisposer::instance), {}, // ioContextDependency
kj::Maybe<tracing::InvocationSpanContext> triggerContext;
KJ_IF_SOME(ctx, metadata.userSpanContext) {
KJ_IF_SOME(spanId, ctx.getSpanId()) {
triggerContext =
tracing::InvocationSpanContext(ctx.getTraceId(), tracing::TraceId::nullId, spanId);
}
}

return newWorkerEntrypoint(threadContext, kj::atomicAddRef(*worker), entrypointName,
kj::mv(props), kj::mv(actor), kj::Own<LimitEnforcer>(this, kj::NullDisposer::instance),
{}, // ioContextDependency
kj::Own<IoChannelFactory>(this, kj::NullDisposer::instance), kj::mv(observer),
waitUntilTasks,
true, // tunnelExceptions
kj::mv(workerTracer), // workerTracer
kj::mv(metadata.cfBlobJson),
kj::none // versionInfo
);
kj::none, // versionInfo
kj::mv(triggerContext));
}

class ActorNamespace final {
Expand Down
Loading