Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion src/workerd/io/trace-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ KJ_TEST("Read/Write SpanOpen works") {
capnp::MallocMessageBuilder builder;
auto infoBuilder = builder.initRoot<rpc::Trace::SpanOpen>();

SpanOpen info(0x2a2a2a2a2a2a2a2a, "foo"_kjc, kj::none);
SpanOpen info(0x2a2a2a2a2a2a2a2a, "foo"_kjc);
info.copyTo(infoBuilder);

auto reader = infoBuilder.asReader();
Expand All @@ -455,6 +455,21 @@ KJ_TEST("Read/Write SpanOpen works") {
KJ_ASSERT(info3.info == kj::none);
}

KJ_TEST("SpanOpen preserves SpanKind through serialization and clone") {
capnp::MallocMessageBuilder builder;
auto infoBuilder = builder.initRoot<rpc::Trace::SpanOpen>();

SpanOpen info(0x2a2a2a2a2a2a2a2a, "foo"_kjc, SpanKind::CLIENT);
KJ_ASSERT(info.spanKind == SpanKind::CLIENT);
info.copyTo(infoBuilder);

SpanOpen info2(infoBuilder.asReader());
KJ_ASSERT(info2.spanKind == SpanKind::CLIENT);

SpanOpen info3 = info.clone();
KJ_ASSERT(info3.spanKind == SpanKind::CLIENT);
}

KJ_TEST("Read/Write SpanClose works") {
capnp::MallocMessageBuilder builder;
auto infoBuilder = builder.initRoot<rpc::Trace::SpanClose>();
Expand Down Expand Up @@ -499,6 +514,25 @@ KJ_TEST("Read/Write Onset works") {
KJ_ASSERT(info3.workerInfo.executionModel == ExecutionModel::STATELESS);
}

KJ_TEST("Onset preserves SpanKind through serialization and clone") {
capnp::MallocMessageBuilder builder;
auto infoBuilder = builder.initRoot<rpc::Trace::Onset>();

FetchEventInfo fetchInfo(
kj::HttpMethod::GET, kj::str("https://example.com"), kj::str("{}"), nullptr);

Onset info(staticSpanId, Onset::Info(kj::mv(fetchInfo)), {.scriptName = kj::str("foo")}, nullptr,
SpanKind::SERVER);
KJ_ASSERT(info.spanKind == SpanKind::SERVER);
info.copyTo(infoBuilder);

Onset info2(infoBuilder.asReader());
KJ_ASSERT(info2.spanKind == SpanKind::SERVER);

Onset info3 = info.clone();
KJ_ASSERT(info3.spanKind == SpanKind::SERVER);
}

KJ_TEST("Read/Write Outcome works") {
capnp::MallocMessageBuilder builder;
auto infoBuilder = builder.initRoot<rpc::Trace::Outcome>();
Expand Down
43 changes: 33 additions & 10 deletions src/workerd/io/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1038,10 +1038,12 @@ Return Return::clone() const {
return Return();
}

SpanOpen::SpanOpen(SpanId spanId, kj::ConstString operationName, kj::Maybe<Info> info)
SpanOpen::SpanOpen(
SpanId spanId, kj::ConstString operationName, SpanKind spanKind, kj::Maybe<Info> info)
: operationName(kj::mv(operationName)),
info(kj::mv(info)),
spanId(spanId) {}
spanId(spanId),
spanKind(spanKind) {}

namespace {
kj::Maybe<SpanOpen::Info> readSpanOpenInfo(rpc::Trace::SpanOpen::Reader& reader) {
Expand All @@ -1067,11 +1069,13 @@ kj::Maybe<SpanOpen::Info> readSpanOpenInfo(rpc::Trace::SpanOpen::Reader& reader)
SpanOpen::SpanOpen(rpc::Trace::SpanOpen::Reader reader)
: operationName(kj::str(reader.getOperationName())),
info(readSpanOpenInfo(reader)),
spanId(reader.getSpanId()) {}
spanId(reader.getSpanId()),
spanKind(reader.getSpanKind()) {}

void SpanOpen::copyTo(rpc::Trace::SpanOpen::Builder builder) const {
builder.setOperationName(operationName.asPtr());
builder.setSpanId(spanId);
builder.setSpanKind(spanKind);
KJ_IF_SOME(i, info) {
auto infoBuilder = builder.initInfo();
KJ_SWITCH_ONEOF(i) {
Expand Down Expand Up @@ -1108,7 +1112,7 @@ SpanOpen SpanOpen::clone() const {
KJ_UNREACHABLE;
});
};
return SpanOpen(spanId, operationName.clone(), cloneInfo(info));
return SpanOpen(spanId, operationName.clone(), spanKind, cloneInfo(info));
}

kj::String KJ_STRINGIFY(const SpanOpen::Info& info) {
Expand Down Expand Up @@ -1278,18 +1282,23 @@ Onset::WorkerInfo getWorkerInfoFromReader(const rpc::Trace::Onset::Reader& reade
}
} // namespace

Onset::Onset(
SpanId spanId, Onset::Info&& info, Onset::WorkerInfo&& workerInfo, CustomInfo attributes)
Onset::Onset(SpanId spanId,
Onset::Info&& info,
Onset::WorkerInfo&& workerInfo,
CustomInfo attributes,
SpanKind spanKind)
: spanId(spanId),
info(kj::mv(info)),
workerInfo(kj::mv(workerInfo)),
attributes(kj::mv(attributes)) {}
attributes(kj::mv(attributes)),
spanKind(spanKind) {}

Onset::Onset(rpc::Trace::Onset::Reader reader)
: spanId(reader.getSpanId()),
info(readOnsetInfo(reader.getInfo())),
workerInfo(getWorkerInfoFromReader(reader)),
attributes(KJ_MAP(attr, reader.getAttributes()) { return Attribute(attr); }) {}
attributes(KJ_MAP(attr, reader.getAttributes()) { return Attribute(attr); }),
spanKind(reader.getSpanKind()) {}

void Onset::copyTo(rpc::Trace::Onset::Builder builder) const {
builder.setExecutionModel(workerInfo.executionModel);
Expand Down Expand Up @@ -1322,6 +1331,7 @@ void Onset::copyTo(rpc::Trace::Onset::Builder builder) const {
for (size_t n = 0; n < attributes.size(); n++) {
attributes[n].copyTo(attributeBuilder[n]);
}
builder.setSpanKind(spanKind);
}

Onset::WorkerInfo Onset::WorkerInfo::clone() const {
Expand Down Expand Up @@ -1375,7 +1385,7 @@ EventInfo cloneEventInfo(const EventInfo& info) {

Onset Onset::clone() const {
return Onset(spanId, cloneEventInfo(info), workerInfo.clone(),
KJ_MAP(attr, attributes) { return attr.clone(); });
KJ_MAP(attr, attributes) { return attr.clone(); }, spanKind);
}

Outcome::Outcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime)
Expand Down Expand Up @@ -1587,13 +1597,15 @@ SpanOpenData::SpanOpenData(rpc::SpanOpenData::Reader reader)
: spanId(reader.getSpanId()),
parentSpanId(reader.getParentSpanId()),
operationName(kj::str(reader.getOperationName())),
startTime(kj::UNIX_EPOCH + reader.getStartTimeNs() * kj::NANOSECONDS) {}
startTime(kj::UNIX_EPOCH + reader.getStartTimeNs() * kj::NANOSECONDS),
spanKind(reader.getSpanKind()) {}

void SpanOpenData::copyTo(rpc::SpanOpenData::Builder builder) const {
builder.setOperationName(operationName.asPtr());
builder.setStartTimeNs((startTime - kj::UNIX_EPOCH) / kj::NANOSECONDS);
builder.setSpanId(spanId);
builder.setParentSpanId(parentSpanId);
builder.setSpanKind(spanKind);
}

SpanEndData::SpanEndData(rpc::SpanEndData::Reader reader)
Expand Down Expand Up @@ -1713,6 +1725,12 @@ void SpanBuilder::setTag(kj::ConstString key, TagInitValue value) {
}
}

void SpanBuilder::setSpanKind(SpanKind kind) {
KJ_IF_SOME(s, span) {
s.spanKind = kind;
}
}

void SpanBuilder::addLog(kj::Date timestamp, kj::ConstString key, TagValue value) {
KJ_IF_SOME(s, span) {
if (s.logs.size() >= Span::MAX_LOGS) {
Expand Down Expand Up @@ -1775,6 +1793,11 @@ void TraceContext::setTag(kj::ConstString key, SpanBuilder::TagInitValue value)
}
}

void TraceContext::setSpanKind(SpanKind kind) {
span.setSpanKind(kind);
userSpan.setSpanKind(kind);
}

Span::TagValue spanTagClone(const Span::TagValue& tag) {
KJ_SWITCH_ONEOF(tag) {
KJ_CASE_ONEOF(str, kj::ConstString) {
Expand Down
10 changes: 10 additions & 0 deletions src/workerd/io/trace.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ struct UserSpanData {
parentSpanId @5 :UInt64;
}

enum SpanKind {
# The semantic kind of a span, as per https://opentelemetry.io/docs/specs/otel/trace/api/#spankind
internal @0;
server @1;
client @2;
producer @3;
consumer @4;
}

struct SpanOpenData {
# Representation of a SpanOpen event, created when a user span is opened.
operationName @0 :Text;
Expand All @@ -51,6 +60,7 @@ struct SpanOpenData {

spanId @2 :UInt64;
parentSpanId @3 :UInt64;
spanKind @4 :SpanKind;
}

struct SpanEndData {
Expand Down
32 changes: 26 additions & 6 deletions src/workerd/io/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ using kj::uint;

using LogLevel = rpc::Trace::Log::Level;
using ExecutionModel = rpc::Trace::ExecutionModel;
using SpanKind = rpc::SpanKind;

class Trace;

Expand Down Expand Up @@ -679,17 +680,20 @@ struct SpanOpenData {

kj::ConstString operationName;
kj::Date startTime;
SpanKind spanKind = SpanKind::INTERNAL;

SpanOpenData(rpc::SpanOpenData::Reader reader);
void copyTo(rpc::SpanOpenData::Builder builder) const;
explicit SpanOpenData(tracing::SpanId spanId,
tracing::SpanId parentSpanId,
kj::ConstString operationName,
kj::Date startTime)
kj::Date startTime,
SpanKind spanKind = SpanKind::INTERNAL)
: spanId(spanId),
parentSpanId(parentSpanId),
operationName(kj::mv(operationName)),
startTime(startTime) {}
startTime(startTime),
spanKind(spanKind) {}
};

struct SpanEndData {
Expand Down Expand Up @@ -738,7 +742,10 @@ struct SpanOpen final {
// details of that subrequest.
using Info = kj::OneOf<FetchEventInfo, JsRpcEventInfo, CustomInfo>;

explicit SpanOpen(SpanId spanId, kj::ConstString operationName, kj::Maybe<Info> info = kj::none);
explicit SpanOpen(SpanId spanId,
kj::ConstString operationName,
SpanKind spanKind = SpanKind::INTERNAL,
kj::Maybe<Info> info = kj::none);
SpanOpen(rpc::Trace::SpanOpen::Reader reader);
SpanOpen(SpanOpen&&) = default;
SpanOpen& operator=(SpanOpen&&) = default;
Expand All @@ -747,6 +754,7 @@ struct SpanOpen final {
kj::ConstString operationName;
kj::Maybe<Info> info = kj::none;
SpanId spanId;
SpanKind spanKind = SpanKind::INTERNAL;

void copyTo(rpc::Trace::SpanOpen::Builder builder) const;
SpanOpen clone() const;
Expand Down Expand Up @@ -790,8 +798,11 @@ struct Onset final {
WorkerInfo clone() const;
};

explicit Onset(
tracing::SpanId spanId, Info&& info, WorkerInfo&& workerInfo, CustomInfo attributes);
explicit Onset(tracing::SpanId spanId,
Info&& info,
WorkerInfo&& workerInfo,
CustomInfo attributes,
SpanKind spanKind = SpanKind::INTERNAL);

Onset(rpc::Trace::Onset::Reader reader);
Onset(Onset&&) = default;
Expand All @@ -802,6 +813,7 @@ struct Onset final {
Info info;
WorkerInfo workerInfo;
CustomInfo attributes;
SpanKind spanKind = SpanKind::INTERNAL;

void copyTo(rpc::Trace::Onset::Builder builder) const;
Onset clone() const;
Expand Down Expand Up @@ -1008,6 +1020,8 @@ struct Span {
TagMap tags;
kj::Vector<Log> logs;

SpanKind spanKind = SpanKind::INTERNAL;

// We set an arbitrary (-ish) cap on log messages for safety. If we drop logs because of this,
// we report how many in a final "dropped_logs" log.
//
Expand Down Expand Up @@ -1147,6 +1161,8 @@ class SpanBuilder {

void setTag(kj::ConstString key, TagInitValue value);

void setSpanKind(SpanKind kind);

// `key` must point to memory that will remain valid all the way until this span's data is
// serialized.
//
Expand Down Expand Up @@ -1180,7 +1196,9 @@ class SpanObserver: public kj::Refcounted {
// This should always be called exactly once per observer at span completion time.
virtual void report(const Span& span) = 0;
// Report information about the span onset.
virtual void reportStart(kj::ConstString operationName, kj::Date startTime) = 0;
virtual void reportStart(kj::ConstString operationName,
kj::Date startTime,
SpanKind spanKind = SpanKind::INTERNAL) = 0;

// The current time to be provided for the span. For user tracing, we will override this to
// provide I/O time. This *requires* that spans are only created when an IOContext is available
Expand Down Expand Up @@ -1223,6 +1241,8 @@ class TraceContext {

// Set a tag on both the internal span and user span.
void setTag(kj::ConstString key, SpanBuilder::TagInitValue value);
// Set the semantic kind on both the internal span and user span.
void setSpanKind(SpanKind kind);
bool isObserved() {
return span.isObserved() || userSpan.isObserved();
}
Expand Down
15 changes: 9 additions & 6 deletions src/workerd/io/tracer.c++
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,17 @@ void WorkerTracer::addLog(const tracing::InvocationSpanContext& context,
void WorkerTracer::addSpan(tracing::CompleteSpan&& span) {
// The span information is not transmitted via RPC at this point, we can decompose the span into
// spanOpen/spanEnd.
addSpanOpen(span.spanId, span.parentSpanId, kj::mv(span.operationName), span.startTime);
addSpanOpen(span.spanId, span.parentSpanId, kj::mv(span.operationName), span.startTime,
SpanKind::INTERNAL);
tracing::SpanEndData spanEnd(span.spanId, span.endTime, kj::mv(span.tags));
addSpanEnd(kj::mv(spanEnd), span.startTime);
}

void WorkerTracer::addSpanOpen(tracing::SpanId spanId,
tracing::SpanId parentSpanId,
kj::ConstString operationName,
kj::Date startTime) {
kj::Date startTime,
SpanKind spanKind) {
if (pipelineLogLevel == PipelineLogLevel::NONE) {
return;
}
Expand All @@ -181,8 +183,8 @@ void WorkerTracer::addSpanOpen(tracing::SpanId spanId,
size_t spanNameSize = operationName.size();
auto spanOpenContext = tracing::InvocationSpanContext(
topLevelContext.getTraceId(), topLevelContext.getInvocationId(), parentSpanId);
tailStreamWriter->report(
spanOpenContext, tracing::SpanOpen(spanId, kj::mv(operationName)), startTime, spanNameSize);
tailStreamWriter->report(spanOpenContext,
tracing::SpanOpen(spanId, kj::mv(operationName), spanKind), startTime, spanNameSize);
}

void WorkerTracer::addSpanEnd(tracing::SpanEndData&& span, kj::Maybe<kj::Date> maybeStartTime) {
Expand Down Expand Up @@ -618,8 +620,9 @@ void UserSpanObserver::report(const Span& span) {
submitter->submitSpan(spanId, parentSpanId, span);
}

void UserSpanObserver::reportStart(kj::ConstString operationName, kj::Date startTime) {
submitter->submitSpanOpen(spanId, parentSpanId, kj::mv(operationName), startTime);
void UserSpanObserver::reportStart(
kj::ConstString operationName, kj::Date startTime, SpanKind spanKind) {
submitter->submitSpanOpen(spanId, parentSpanId, kj::mv(operationName), startTime, spanKind);
}

// Provide I/O time to the tracing system for user spans.
Expand Down
Loading
Loading