Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions src/workerd/api/actor-state.c++
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,8 @@ jsg::Promise<void> DurableObjectStorageOperations::setAlarm(
"setAlarm() cannot be called with an alarm time <= 0");

auto& context = IoContext::current();
auto traceContext = context.makeUserTraceSpan("durable_object_storage_setAlarm"_kjc);
auto traceContext =
context.makeUserTraceSpan("durable_object_storage_setAlarm"_kjc, SpanKind::PRODUCER);
// This doesn't check if we have an alarm handler per say. It checks if we have an initialized
// (post-ctor) JS durable object with an alarm handler. Notably, this means this won't throw if
// `setAlarm` is invoked in the DO ctor even if the DO class does not have an alarm handler. This
Expand Down Expand Up @@ -655,7 +656,8 @@ jsg::Promise<jsg::JsRef<jsg::JsValue>> DurableObjectStorage::transaction(jsg::Lo
callback,
jsg::Optional<TransactionOptions> options) {
auto& context = IoContext::current();
auto traceContext = context.makeUserTraceSpan("durable_object_storage_transaction"_kjc);
auto traceContext =
context.makeUserTraceSpan("durable_object_storage_transaction"_kjc, SpanKind::INTERNAL);

struct TxnResult {
jsg::JsRef<jsg::JsValue> value;
Expand Down Expand Up @@ -755,7 +757,8 @@ jsg::JsRef<jsg::JsValue> DurableObjectStorage::transactionSync(

jsg::Promise<void> DurableObjectStorage::sync(jsg::Lock& js) {
auto& context = IoContext::current();
auto traceContext = context.makeUserTraceSpan("durable_object_storage_sync"_kjc);
auto traceContext =
context.makeUserTraceSpan("durable_object_storage_sync"_kjc, SpanKind::INTERNAL);
KJ_IF_SOME(p, cache->onNoPendingFlush(traceContext.getInternalSpanParent())) {
// Note that we're not actually flushing since that will happen anyway once we go async. We're
// merely checking if we have any pending or in-flight operations, and providing a promise that
Expand Down Expand Up @@ -852,7 +855,8 @@ jsg::Ref<SyncKvStorage> DurableObjectStorage::getKv(jsg::Lock& js) {

kj::Promise<kj::String> DurableObjectStorage::getCurrentBookmark() {
auto& context = IoContext::current();
auto traceContext = context.makeUserTraceSpan("durable_object_storage_getCurrentBookmark"_kjc);
auto traceContext = context.makeUserTraceSpan(
"durable_object_storage_getCurrentBookmark"_kjc, SpanKind::INTERNAL);

return cache->getCurrentBookmark(traceContext.getInternalSpanParent())
.attach(kj::mv(traceContext));
Expand All @@ -868,7 +872,8 @@ kj::Promise<kj::String> DurableObjectStorage::onNextSessionRestoreBookmark(kj::S

kj::Promise<void> DurableObjectStorage::waitForBookmark(kj::String bookmark) {
auto& context = IoContext::current();
auto traceContext = context.makeUserTraceSpan("durable_object_storage_waitForBookmark"_kjc);
auto traceContext =
context.makeUserTraceSpan("durable_object_storage_waitForBookmark"_kjc, SpanKind::INTERNAL);

return cache->waitForBookmark(bookmark, traceContext.getInternalSpanParent())
.attach(kj::mv(traceContext));
Expand Down
23 changes: 14 additions & 9 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ kj::Promise<void> WorkerQueue::send(
// queue broker's domain, and the start of the URL path including the account ID and queue ID. All
// we have to do is provide the end of the path (which is "/message") to send a single message.

auto client = context.getHttpClient(subrequestChannel, true, kj::none, "queue_send"_kjc);
auto traceContext = context.makeUserTraceSpan("queue_send"_kjc, SpanKind::PRODUCER);
auto client = context.getHttpClient(subrequestChannel, true, kj::none, traceContext);
auto req = client->request(
kj::HttpMethod::POST, "https://fake-host/message"_kjc, headers, serialized.data.size());

Expand All @@ -240,7 +241,7 @@ kj::Promise<void> WorkerQueue::send(
};

return handleSend(kj::mv(req), kj::mv(serialized), kj::mv(client), headerIds, exposeErrorCodes)
.attach(context.registerPendingEvent());
.attach(kj::mv(traceContext), context.registerPendingEvent());
};

jsg::Promise<WorkerQueue::SendResponse> WorkerQueue::sendWithResponse(jsg::Lock& js,
Expand Down Expand Up @@ -277,7 +278,8 @@ jsg::Promise<WorkerQueue::SendResponse> WorkerQueue::sendWithResponse(jsg::Lock&
serialized = serializeV8(js, body);
}

auto client = context.getHttpClient(subrequestChannel, true, kj::none, "queue_send"_kjc);
auto traceContext = context.makeUserTraceSpan("queue_send"_kjc, SpanKind::PRODUCER);
auto client = context.getHttpClient(subrequestChannel, true, kj::none, traceContext);
auto req = client->request(
kj::HttpMethod::POST, "https://fake-host/message"_kjc, headers, serialized.data.size());

Expand All @@ -301,7 +303,8 @@ jsg::Promise<WorkerQueue::SendResponse> WorkerQueue::sendWithResponse(jsg::Lock&
};

auto promise =
handleSend(kj::mv(req), kj::mv(serialized), kj::mv(client), headerIds, exposeErrorCodes);
handleSend(kj::mv(req), kj::mv(serialized), kj::mv(client), headerIds, exposeErrorCodes)
.attach(kj::mv(traceContext));

return context.awaitIo(
js, kj::mv(promise), [&responseHandler](jsg::Lock& js, kj::String text) -> SendResponse {
Expand Down Expand Up @@ -384,7 +387,8 @@ kj::Promise<void> WorkerQueue::sendBatch(jsg::Lock& js,
kj::String body(bodyBuilder.releaseAsArray());
KJ_DASSERT(jsg::JsValue::fromJson(js, body).isObject());

auto client = context.getHttpClient(subrequestChannel, true, kj::none, "queue_send"_kjc);
auto traceContext = context.makeUserTraceSpan("queue_send"_kjc, SpanKind::PRODUCER);
auto client = context.getHttpClient(subrequestChannel, true, kj::none, traceContext);

// We add info about the size of the batch to the headers so that the queue implementation can
// decide whether it's too large.
Expand Down Expand Up @@ -427,7 +431,7 @@ kj::Promise<void> WorkerQueue::sendBatch(jsg::Lock& js,
};

return handleWrite(kj::mv(req), kj::mv(body), kj::mv(client), headerIds, exposeErrorCodes)
.attach(context.registerPendingEvent());
.attach(kj::mv(traceContext), context.registerPendingEvent());
};

jsg::Promise<WorkerQueue::Metrics> WorkerQueue::metrics(
Expand Down Expand Up @@ -530,7 +534,8 @@ jsg::Promise<WorkerQueue::SendBatchResponse> WorkerQueue::sendBatchWithResponse(
kj::String body(bodyBuilder.releaseAsArray());
KJ_DASSERT(jsg::JsValue::fromJson(js, body).isObject());

auto client = context.getHttpClient(subrequestChannel, true, kj::none, "queue_send"_kjc);
auto traceContext = context.makeUserTraceSpan("queue_send"_kjc, SpanKind::PRODUCER);
auto client = context.getHttpClient(subrequestChannel, true, kj::none, traceContext);

auto headers = kj::HttpHeaders(context.getHeaderTable());
headers.addPtr("CF-Queue-Batch-Count"_kj, kj::str(messageCount));
Expand Down Expand Up @@ -565,8 +570,8 @@ jsg::Promise<WorkerQueue::SendBatchResponse> WorkerQueue::sendBatchWithResponse(
co_return kj::str(responseBody.asChars());
};

auto promise =
handleWrite(kj::mv(req), kj::mv(body), kj::mv(client), headerIds, exposeErrorCodes);
auto promise = handleWrite(kj::mv(req), kj::mv(body), kj::mv(client), headerIds, exposeErrorCodes)
.attach(kj::mv(traceContext));

return context.awaitIo(
js, kj::mv(promise), [&responseHandler](jsg::Lock& js, kj::String text) -> SendBatchResponse {
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/sql.c++
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ jsg::Ref<SqlStorage::Statement> SqlStorage::prepare(jsg::Lock& js, jsg::JsString
double SqlStorage::getDatabaseSize(jsg::Lock& js) {
auto& context = IoContext::current();
TraceContext traceContext =
context.makeUserTraceSpan("durable_object_storage_getDatabaseSize"_kjc);
context.makeUserTraceSpan("durable_object_storage_getDatabaseSize"_kjc, SpanKind::INTERNAL);
traceContext.setTag("db.operation.name"_kjc, "getDatabaseSize"_kjc);
auto& db = getDb(js);
int64_t pages = execMemoized(db, pragmaPageCount,
Expand Down
Loading
Loading