diff --git a/src/workerd/io/trace-test.c++ b/src/workerd/io/trace-test.c++ index 25e7a9ce32b..a73d26f1ba9 100644 --- a/src/workerd/io/trace-test.c++ +++ b/src/workerd/io/trace-test.c++ @@ -442,7 +442,7 @@ KJ_TEST("Read/Write SpanOpen works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); - SpanOpen info(0x2a2a2a2a2a2a2a2a, "foo"_kjc, kj::none); + SpanOpen info(0x2a2a2a2a2a2a2a2a, "foo"_kjc); info.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); @@ -455,6 +455,21 @@ KJ_TEST("Read/Write SpanOpen works") { KJ_ASSERT(info3.info == kj::none); } +KJ_TEST("SpanOpen preserves SpanKind through serialization and clone") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + SpanOpen info(0x2a2a2a2a2a2a2a2a, "foo"_kjc, SpanKind::CLIENT); + KJ_ASSERT(info.spanKind == SpanKind::CLIENT); + info.copyTo(infoBuilder); + + SpanOpen info2(infoBuilder.asReader()); + KJ_ASSERT(info2.spanKind == SpanKind::CLIENT); + + SpanOpen info3 = info.clone(); + KJ_ASSERT(info3.spanKind == SpanKind::CLIENT); +} + KJ_TEST("Read/Write SpanClose works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); @@ -499,6 +514,25 @@ KJ_TEST("Read/Write Onset works") { KJ_ASSERT(info3.workerInfo.executionModel == ExecutionModel::STATELESS); } +KJ_TEST("Onset preserves SpanKind through serialization and clone") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + FetchEventInfo fetchInfo( + kj::HttpMethod::GET, kj::str("https://example.com"), kj::str("{}"), nullptr); + + Onset info(staticSpanId, Onset::Info(kj::mv(fetchInfo)), {.scriptName = kj::str("foo")}, nullptr, + SpanKind::SERVER); + KJ_ASSERT(info.spanKind == SpanKind::SERVER); + info.copyTo(infoBuilder); + + Onset info2(infoBuilder.asReader()); + KJ_ASSERT(info2.spanKind == SpanKind::SERVER); + + Onset info3 = info.clone(); + KJ_ASSERT(info3.spanKind == SpanKind::SERVER); +} + KJ_TEST("Read/Write Outcome works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index 4e2f48fb128..8f5385aa42a 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -1038,10 +1038,12 @@ Return Return::clone() const { return Return(); } -SpanOpen::SpanOpen(SpanId spanId, kj::ConstString operationName, kj::Maybe info) +SpanOpen::SpanOpen( + SpanId spanId, kj::ConstString operationName, SpanKind spanKind, kj::Maybe info) : operationName(kj::mv(operationName)), info(kj::mv(info)), - spanId(spanId) {} + spanId(spanId), + spanKind(spanKind) {} namespace { kj::Maybe readSpanOpenInfo(rpc::Trace::SpanOpen::Reader& reader) { @@ -1067,11 +1069,13 @@ kj::Maybe readSpanOpenInfo(rpc::Trace::SpanOpen::Reader& reader) SpanOpen::SpanOpen(rpc::Trace::SpanOpen::Reader reader) : operationName(kj::str(reader.getOperationName())), info(readSpanOpenInfo(reader)), - spanId(reader.getSpanId()) {} + spanId(reader.getSpanId()), + spanKind(reader.getSpanKind()) {} void SpanOpen::copyTo(rpc::Trace::SpanOpen::Builder builder) const { builder.setOperationName(operationName.asPtr()); builder.setSpanId(spanId); + builder.setSpanKind(spanKind); KJ_IF_SOME(i, info) { auto infoBuilder = builder.initInfo(); KJ_SWITCH_ONEOF(i) { @@ -1108,7 +1112,7 @@ SpanOpen SpanOpen::clone() const { KJ_UNREACHABLE; }); }; - return SpanOpen(spanId, operationName.clone(), cloneInfo(info)); + return SpanOpen(spanId, operationName.clone(), spanKind, cloneInfo(info)); } kj::String KJ_STRINGIFY(const SpanOpen::Info& info) { @@ -1278,18 +1282,23 @@ Onset::WorkerInfo getWorkerInfoFromReader(const rpc::Trace::Onset::Reader& reade } } // namespace -Onset::Onset( - SpanId spanId, Onset::Info&& info, Onset::WorkerInfo&& workerInfo, CustomInfo attributes) +Onset::Onset(SpanId spanId, + Onset::Info&& info, + Onset::WorkerInfo&& workerInfo, + CustomInfo attributes, + SpanKind spanKind) : spanId(spanId), info(kj::mv(info)), workerInfo(kj::mv(workerInfo)), - attributes(kj::mv(attributes)) {} + attributes(kj::mv(attributes)), + spanKind(spanKind) {} Onset::Onset(rpc::Trace::Onset::Reader reader) : spanId(reader.getSpanId()), info(readOnsetInfo(reader.getInfo())), workerInfo(getWorkerInfoFromReader(reader)), - attributes(KJ_MAP(attr, reader.getAttributes()) { return Attribute(attr); }) {} + attributes(KJ_MAP(attr, reader.getAttributes()) { return Attribute(attr); }), + spanKind(reader.getSpanKind()) {} void Onset::copyTo(rpc::Trace::Onset::Builder builder) const { builder.setExecutionModel(workerInfo.executionModel); @@ -1322,6 +1331,7 @@ void Onset::copyTo(rpc::Trace::Onset::Builder builder) const { for (size_t n = 0; n < attributes.size(); n++) { attributes[n].copyTo(attributeBuilder[n]); } + builder.setSpanKind(spanKind); } Onset::WorkerInfo Onset::WorkerInfo::clone() const { @@ -1375,7 +1385,7 @@ EventInfo cloneEventInfo(const EventInfo& info) { Onset Onset::clone() const { return Onset(spanId, cloneEventInfo(info), workerInfo.clone(), - KJ_MAP(attr, attributes) { return attr.clone(); }); + KJ_MAP(attr, attributes) { return attr.clone(); }, spanKind); } Outcome::Outcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime) @@ -1587,13 +1597,15 @@ SpanOpenData::SpanOpenData(rpc::SpanOpenData::Reader reader) : spanId(reader.getSpanId()), parentSpanId(reader.getParentSpanId()), operationName(kj::str(reader.getOperationName())), - startTime(kj::UNIX_EPOCH + reader.getStartTimeNs() * kj::NANOSECONDS) {} + startTime(kj::UNIX_EPOCH + reader.getStartTimeNs() * kj::NANOSECONDS), + spanKind(reader.getSpanKind()) {} void SpanOpenData::copyTo(rpc::SpanOpenData::Builder builder) const { builder.setOperationName(operationName.asPtr()); builder.setStartTimeNs((startTime - kj::UNIX_EPOCH) / kj::NANOSECONDS); builder.setSpanId(spanId); builder.setParentSpanId(parentSpanId); + builder.setSpanKind(spanKind); } SpanEndData::SpanEndData(rpc::SpanEndData::Reader reader) @@ -1713,6 +1725,12 @@ void SpanBuilder::setTag(kj::ConstString key, TagInitValue value) { } } +void SpanBuilder::setSpanKind(SpanKind kind) { + KJ_IF_SOME(s, span) { + s.spanKind = kind; + } +} + void SpanBuilder::addLog(kj::Date timestamp, kj::ConstString key, TagValue value) { KJ_IF_SOME(s, span) { if (s.logs.size() >= Span::MAX_LOGS) { @@ -1775,6 +1793,11 @@ void TraceContext::setTag(kj::ConstString key, SpanBuilder::TagInitValue value) } } +void TraceContext::setSpanKind(SpanKind kind) { + span.setSpanKind(kind); + userSpan.setSpanKind(kind); +} + Span::TagValue spanTagClone(const Span::TagValue& tag) { KJ_SWITCH_ONEOF(tag) { KJ_CASE_ONEOF(str, kj::ConstString) { diff --git a/src/workerd/io/trace.capnp b/src/workerd/io/trace.capnp index e72615d1eaa..de909bb7e8c 100644 --- a/src/workerd/io/trace.capnp +++ b/src/workerd/io/trace.capnp @@ -42,6 +42,15 @@ struct UserSpanData { parentSpanId @5 :UInt64; } +enum SpanKind { + # The semantic kind of a span, as per https://opentelemetry.io/docs/specs/otel/trace/api/#spankind + internal @0; + server @1; + client @2; + producer @3; + consumer @4; +} + struct SpanOpenData { # Representation of a SpanOpen event, created when a user span is opened. operationName @0 :Text; @@ -51,6 +60,7 @@ struct SpanOpenData { spanId @2 :UInt64; parentSpanId @3 :UInt64; + spanKind @4 :SpanKind; } struct SpanEndData { diff --git a/src/workerd/io/trace.h b/src/workerd/io/trace.h index 69493566085..b8b37ac7b44 100644 --- a/src/workerd/io/trace.h +++ b/src/workerd/io/trace.h @@ -32,6 +32,7 @@ using kj::uint; using LogLevel = rpc::Trace::Log::Level; using ExecutionModel = rpc::Trace::ExecutionModel; +using SpanKind = rpc::SpanKind; class Trace; @@ -679,17 +680,20 @@ struct SpanOpenData { kj::ConstString operationName; kj::Date startTime; + SpanKind spanKind = SpanKind::INTERNAL; SpanOpenData(rpc::SpanOpenData::Reader reader); void copyTo(rpc::SpanOpenData::Builder builder) const; explicit SpanOpenData(tracing::SpanId spanId, tracing::SpanId parentSpanId, kj::ConstString operationName, - kj::Date startTime) + kj::Date startTime, + SpanKind spanKind = SpanKind::INTERNAL) : spanId(spanId), parentSpanId(parentSpanId), operationName(kj::mv(operationName)), - startTime(startTime) {} + startTime(startTime), + spanKind(spanKind) {} }; struct SpanEndData { @@ -738,7 +742,10 @@ struct SpanOpen final { // details of that subrequest. using Info = kj::OneOf; - explicit SpanOpen(SpanId spanId, kj::ConstString operationName, kj::Maybe info = kj::none); + explicit SpanOpen(SpanId spanId, + kj::ConstString operationName, + SpanKind spanKind = SpanKind::INTERNAL, + kj::Maybe info = kj::none); SpanOpen(rpc::Trace::SpanOpen::Reader reader); SpanOpen(SpanOpen&&) = default; SpanOpen& operator=(SpanOpen&&) = default; @@ -747,6 +754,7 @@ struct SpanOpen final { kj::ConstString operationName; kj::Maybe info = kj::none; SpanId spanId; + SpanKind spanKind = SpanKind::INTERNAL; void copyTo(rpc::Trace::SpanOpen::Builder builder) const; SpanOpen clone() const; @@ -790,8 +798,11 @@ struct Onset final { WorkerInfo clone() const; }; - explicit Onset( - tracing::SpanId spanId, Info&& info, WorkerInfo&& workerInfo, CustomInfo attributes); + explicit Onset(tracing::SpanId spanId, + Info&& info, + WorkerInfo&& workerInfo, + CustomInfo attributes, + SpanKind spanKind = SpanKind::INTERNAL); Onset(rpc::Trace::Onset::Reader reader); Onset(Onset&&) = default; @@ -802,6 +813,7 @@ struct Onset final { Info info; WorkerInfo workerInfo; CustomInfo attributes; + SpanKind spanKind = SpanKind::INTERNAL; void copyTo(rpc::Trace::Onset::Builder builder) const; Onset clone() const; @@ -1008,6 +1020,8 @@ struct Span { TagMap tags; kj::Vector logs; + SpanKind spanKind = SpanKind::INTERNAL; + // We set an arbitrary (-ish) cap on log messages for safety. If we drop logs because of this, // we report how many in a final "dropped_logs" log. // @@ -1147,6 +1161,8 @@ class SpanBuilder { void setTag(kj::ConstString key, TagInitValue value); + void setSpanKind(SpanKind kind); + // `key` must point to memory that will remain valid all the way until this span's data is // serialized. // @@ -1180,7 +1196,9 @@ class SpanObserver: public kj::Refcounted { // This should always be called exactly once per observer at span completion time. virtual void report(const Span& span) = 0; // Report information about the span onset. - virtual void reportStart(kj::ConstString operationName, kj::Date startTime) = 0; + virtual void reportStart(kj::ConstString operationName, + kj::Date startTime, + SpanKind spanKind = SpanKind::INTERNAL) = 0; // The current time to be provided for the span. For user tracing, we will override this to // provide I/O time. This *requires* that spans are only created when an IOContext is available @@ -1223,6 +1241,8 @@ class TraceContext { // Set a tag on both the internal span and user span. void setTag(kj::ConstString key, SpanBuilder::TagInitValue value); + // Set the semantic kind on both the internal span and user span. + void setSpanKind(SpanKind kind); bool isObserved() { return span.isObserved() || userSpan.isObserved(); } diff --git a/src/workerd/io/tracer.c++ b/src/workerd/io/tracer.c++ index eb4d9a56228..0d280d57b3d 100644 --- a/src/workerd/io/tracer.c++ +++ b/src/workerd/io/tracer.c++ @@ -157,7 +157,8 @@ void WorkerTracer::addLog(const tracing::InvocationSpanContext& context, void WorkerTracer::addSpan(tracing::CompleteSpan&& span) { // The span information is not transmitted via RPC at this point, we can decompose the span into // spanOpen/spanEnd. - addSpanOpen(span.spanId, span.parentSpanId, kj::mv(span.operationName), span.startTime); + addSpanOpen(span.spanId, span.parentSpanId, kj::mv(span.operationName), span.startTime, + SpanKind::INTERNAL); tracing::SpanEndData spanEnd(span.spanId, span.endTime, kj::mv(span.tags)); addSpanEnd(kj::mv(spanEnd), span.startTime); } @@ -165,7 +166,8 @@ void WorkerTracer::addSpan(tracing::CompleteSpan&& span) { void WorkerTracer::addSpanOpen(tracing::SpanId spanId, tracing::SpanId parentSpanId, kj::ConstString operationName, - kj::Date startTime) { + kj::Date startTime, + SpanKind spanKind) { if (pipelineLogLevel == PipelineLogLevel::NONE) { return; } @@ -181,8 +183,8 @@ void WorkerTracer::addSpanOpen(tracing::SpanId spanId, size_t spanNameSize = operationName.size(); auto spanOpenContext = tracing::InvocationSpanContext( topLevelContext.getTraceId(), topLevelContext.getInvocationId(), parentSpanId); - tailStreamWriter->report( - spanOpenContext, tracing::SpanOpen(spanId, kj::mv(operationName)), startTime, spanNameSize); + tailStreamWriter->report(spanOpenContext, + tracing::SpanOpen(spanId, kj::mv(operationName), spanKind), startTime, spanNameSize); } void WorkerTracer::addSpanEnd(tracing::SpanEndData&& span, kj::Maybe maybeStartTime) { @@ -618,8 +620,9 @@ void UserSpanObserver::report(const Span& span) { submitter->submitSpan(spanId, parentSpanId, span); } -void UserSpanObserver::reportStart(kj::ConstString operationName, kj::Date startTime) { - submitter->submitSpanOpen(spanId, parentSpanId, kj::mv(operationName), startTime); +void UserSpanObserver::reportStart( + kj::ConstString operationName, kj::Date startTime, SpanKind spanKind) { + submitter->submitSpanOpen(spanId, parentSpanId, kj::mv(operationName), startTime, spanKind); } // Provide I/O time to the tracing system for user spans. diff --git a/src/workerd/io/tracer.h b/src/workerd/io/tracer.h index 061b7af3338..f776377060f 100644 --- a/src/workerd/io/tracer.h +++ b/src/workerd/io/tracer.h @@ -37,7 +37,8 @@ class BaseTracer: public kj::Refcounted { virtual void addSpanOpen(tracing::SpanId spanId, tracing::SpanId parentSpanId, kj::ConstString operationName, - kj::Date startTime) = 0; + kj::Date startTime, + SpanKind spanKind) = 0; // Add span events when the span is complete (Attributes and SpanClose). virtual void addSpanEnd(tracing::SpanEndData&& span, kj::Maybe maybeStartTime) = 0; @@ -138,7 +139,8 @@ class WorkerTracer final: public BaseTracer { void addSpanOpen(tracing::SpanId spanId, tracing::SpanId parentSpanId, kj::ConstString operationName, - kj::Date startTime) override; + kj::Date startTime, + SpanKind spanKind) override; void addSpanEnd(tracing::SpanEndData&& span, kj::Maybe maybeStartTime) override; void addException(const tracing::InvocationSpanContext& context, kj::Date timestamp, @@ -198,7 +200,8 @@ class SpanSubmitter: public kj::Refcounted { virtual void submitSpanOpen(tracing::SpanId spanId, tracing::SpanId parentSpanId, kj::ConstString operationName, - kj::Date startTime) = 0; + kj::Date startTime, + SpanKind spanKind) = 0; virtual void submitSpan(tracing::SpanId context, tracing::SpanId spanId, const Span& span) = 0; virtual tracing::SpanId makeSpanId() = 0; @@ -221,7 +224,9 @@ class UserSpanObserver final: public SpanObserver { kj::Own newChild() override; void report(const Span& span) override; - void reportStart(kj::ConstString operationName, kj::Date startTime) override; + void reportStart(kj::ConstString operationName, + kj::Date startTime, + SpanKind spanKind = SpanKind::INTERNAL) override; kj::Date getTime() override; private: diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index 885db96225f..1d0853cf131 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -16,6 +16,7 @@ using import "/workerd/io/outcome.capnp".EventOutcome; using import "/workerd/io/script-version.capnp".ScriptVersion; using import "/workerd/io/trace.capnp".TagValue; using import "/workerd/io/trace.capnp".UserSpanData; +using import "/workerd/io/trace.capnp".SpanKind; using import "/workerd/io/frankenvalue.capnp".Frankenvalue; # A 128-bit trace ID used to identify traces. @@ -211,6 +212,7 @@ struct Trace @0x8e8d911203762d34 { workflow @2; } executionModel @25 :ExecutionModel; + # the execution model of the worker being traced. Can be stateless for a regular worker, # durableObject for a DO worker or workflow for the upcoming Workflows feature. @@ -250,6 +252,7 @@ struct Trace @0x8e8d911203762d34 { fetch @4 :FetchEventInfo; jsRpc @5 :JsRpcEventInfo; } + spanKind @6 :SpanKind; } struct SpanClose { @@ -289,6 +292,7 @@ struct Trace @0x8e8d911203762d34 { spanId @8: UInt64; # id for the span being opened by this Onset event. attributes @9 :List(Attribute); + spanKind @10 :SpanKind; } struct Outcome { diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index c1c9581b566..32bee4fbe72 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -1704,7 +1704,7 @@ class SequentialSpanSubmitter final: public SpanSubmitter { SequentialSpanSubmitter(kj::Own workerTracer): workerTracer(kj::mv(workerTracer)) {} void submitSpan(tracing::SpanId spanId, tracing::SpanId parentSpanId, const Span& span) override { // This code path is workerd-only, we can safely utilize submitSpanOpen here. - submitSpanOpen(spanId, parentSpanId, span.operationName.clone(), span.startTime); + submitSpanOpen(spanId, parentSpanId, span.operationName.clone(), span.startTime, span.spanKind); kj::Date startTime = span.startTime; tracing::SpanEndData span2(spanId, span.endTime); span2.tags.reserve(span.tags.size()); @@ -1721,11 +1721,12 @@ class SequentialSpanSubmitter final: public SpanSubmitter { void submitSpanOpen(tracing::SpanId spanId, tracing::SpanId parentSpanId, kj::ConstString operationName, - kj::Date startTime) override { + kj::Date startTime, + SpanKind spanKind) override { if (isPredictableModeForTest()) { startTime = kj::UNIX_EPOCH; } - workerTracer->addSpanOpen(spanId, parentSpanId, kj::mv(operationName), startTime); + workerTracer->addSpanOpen(spanId, parentSpanId, kj::mv(operationName), startTime, spanKind); } tracing::SpanId makeSpanId() override {