Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 50 additions & 24 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,35 @@ namespace {
// Header for the message format.
static constexpr kj::StringPtr HDR_MSG_FORMAT = "X-Msg-Fmt"_kj;

// The upstream service sends 0 when there is "no data" available on a timestamp field (e.g. no `oldestMessageTimestamp`).
// This method converts it to kj::none so users see `undefined`.
void clearEpochSentinel(jsg::Optional<kj::Date>& ts) {
KJ_IF_SOME(date, ts) {
if (date == kj::UNIX_EPOCH) {
ts = kj::none;
}
}
}

// Returns a callback suitable for IoContext::awaitIo() that parses a JSON response string into
// a typed struct via the given TypeHandler, then clears the epoch sentinel on
// oldestMessageTimestamp.
//
// The returned callback captures `handler` by reference. TypeHandler instances are managed by
// the JSG type registration system and live for the lifetime of the isolate, so this is safe.
//
// getOldestMessageTimestamp: (T&) -> jsg::Optional<kj::Date>&
template <typename T>
auto parseQueueResponse(
const jsg::TypeHandler<T>& handler, kj::StringPtr errorMsg, auto getOldestMessageTimestamp) {
return [&handler, errorMsg, getOldestMessageTimestamp](jsg::Lock& js, kj::String text) -> T {
auto parsed = jsg::JsValue::fromJson(js, text);
auto result = JSG_REQUIRE_NONNULL(handler.tryUnwrap(js, parsed), Error, errorMsg, text);
clearEpochSentinel(getOldestMessageTimestamp(result));
return kj::mv(result);
};
}

// Header for the message delivery delay.
static constexpr kj::StringPtr HDR_MSG_DELAY = "X-Msg-Delay-Secs"_kj;

Expand Down Expand Up @@ -303,12 +332,9 @@ jsg::Promise<WorkerQueue::SendResponse> WorkerQueue::sendWithResponse(jsg::Lock&
auto promise =
handleSend(kj::mv(req), kj::mv(serialized), kj::mv(client), headerIds, exposeErrorCodes);

return context.awaitIo(
js, kj::mv(promise), [&responseHandler](jsg::Lock& js, kj::String text) -> SendResponse {
auto parsed = jsg::JsValue::fromJson(js, text);
return JSG_REQUIRE_NONNULL(
responseHandler.tryUnwrap(js, parsed), Error, "Failed to parse queue send response", text);
});
return context.awaitIo(js, kj::mv(promise),
parseQueueResponse(responseHandler, "Failed to parse queue send response"_kj,
[](SendResponse& r) -> auto& { return r.metadata.metrics.oldestMessageTimestamp; }));
}

kj::Promise<void> WorkerQueue::sendBatch(jsg::Lock& js,
Expand Down Expand Up @@ -451,15 +477,9 @@ jsg::Promise<WorkerQueue::Metrics> WorkerQueue::metrics(

auto promise = handleMetrics(kj::mv(req), kj::mv(client), headerIds);

return context.awaitIo(
js, kj::mv(promise), [&metricsHandler](jsg::Lock& js, kj::String text) -> Metrics {
auto parsed = jsg::JsValue::fromJson(js, text);
KJ_IF_SOME(result, metricsHandler.tryUnwrap(js, parsed)) {
return kj::mv(result);
}
_JSG_INTERNAL_FAIL_REQUIRE(
JSG_EXCEPTION(Error), "Failed to parse queue metrics response", text);
});
return context.awaitIo(js, kj::mv(promise),
parseQueueResponse(metricsHandler, "Failed to parse queue metrics response"_kj,
[](Metrics& m) -> auto& { return m.oldestMessageTimestamp; }));
}

jsg::Promise<WorkerQueue::SendBatchResponse> WorkerQueue::sendBatchWithResponse(jsg::Lock& js,
Expand Down Expand Up @@ -568,12 +588,9 @@ jsg::Promise<WorkerQueue::SendBatchResponse> WorkerQueue::sendBatchWithResponse(
auto promise =
handleWrite(kj::mv(req), kj::mv(body), kj::mv(client), headerIds, exposeErrorCodes);

return context.awaitIo(
js, kj::mv(promise), [&responseHandler](jsg::Lock& js, kj::String text) -> SendBatchResponse {
auto parsed = jsg::JsValue::fromJson(js, text);
return JSG_REQUIRE_NONNULL(
responseHandler.tryUnwrap(js, parsed), Error, "Failed to parse queue send response", text);
});
return context.awaitIo(js, kj::mv(promise),
parseQueueResponse(responseHandler, "Failed to parse queue send response"_kj,
[](SendBatchResponse& r) -> auto& { return r.metadata.metrics.oldestMessageTimestamp; }));
}

QueueMessage::QueueMessage(
Expand Down Expand Up @@ -660,14 +677,19 @@ QueueEvent::QueueEvent(
}
messages = messagesBuilder.finish();

// Extract metadata. If the sender didn't set the field, capnp defaults all values to zero.
// Extract metadata. If the sender didn't set the field, capnp defaults all to the zero values.
auto m = params.getMetadata().getMetrics();
jsg::Optional<kj::Date> oldestTimestamp;
if (m.getOldestMessageTimestamp() != 0) {
oldestTimestamp =
kj::UNIX_EPOCH + static_cast<int64_t>(m.getOldestMessageTimestamp()) * kj::MILLISECONDS;
}
metadata = MessageBatchMetadata{
.metrics =
MessageBatchMetrics{
.backlogCount = m.getBacklogCount(),
.backlogBytes = m.getBacklogBytes(),
.oldestMessageTimestamp = m.getOldestMessageTimestamp(),
.oldestMessageTimestamp = oldestTimestamp,
},
};
}
Expand All @@ -677,6 +699,8 @@ QueueEvent::QueueEvent(jsg::Lock& js, Params params, IoPtr<QueueEventResult> res
queueName(kj::mv(params.queueName)),
metadata(kj::mv(params.metadata)),
result(result) {
clearEpochSentinel(metadata.metrics.oldestMessageTimestamp);

auto messagesBuilder = kj::heapArrayBuilder<jsg::Ref<QueueMessage>>(params.messages.size());
for (auto i: kj::indices(params.messages)) {
messagesBuilder.add(js.alloc<QueueMessage>(js, kj::mv(params.messages[i]), result));
Expand Down Expand Up @@ -987,7 +1011,9 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEvent::sendRpc(
auto metricsBuilder = metadataBuilder.initMetrics();
metricsBuilder.setBacklogCount(p.metadata.metrics.backlogCount);
metricsBuilder.setBacklogBytes(p.metadata.metrics.backlogBytes);
metricsBuilder.setOldestMessageTimestamp(p.metadata.metrics.oldestMessageTimestamp);
KJ_IF_SOME(ts, p.metadata.metrics.oldestMessageTimestamp) {
metricsBuilder.setOldestMessageTimestamp((ts - kj::UNIX_EPOCH) / kj::MILLISECONDS);
}
}
}
}
Expand Down
30 changes: 18 additions & 12 deletions src/workerd/api/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,22 @@ class WorkerQueue: public jsg::Object {
// representing this queue.
WorkerQueue(uint subrequestChannel): subrequestChannel(subrequestChannel) {}

// The metrics structs below (Metrics, SendMetrics, SendBatchMetrics) are deserialized from
// JSON responses where the upstream service uses 0 as a sentinel for "no data" on timestamp
// fields. Callers MUST call clearEpochSentinel() on oldestMessageTimestamp after deserialization to convert the
// sentinel to kj::none (JS undefined).
struct Metrics {
double backlogCount;
double backlogBytes;
double oldestMessageTimestamp;
double backlogCount = 0;
double backlogBytes = 0;
jsg::Optional<kj::Date> oldestMessageTimestamp;
JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp);
JSG_STRUCT_TS_OVERRIDE(QueueMetrics);
};

struct SendMetrics {
double backlogCount;
double backlogBytes;
double oldestMessageTimestamp;
double backlogCount = 0;
double backlogBytes = 0;
jsg::Optional<kj::Date> oldestMessageTimestamp;
JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp);
JSG_STRUCT_TS_OVERRIDE(QueueSendMetrics);
};
Expand All @@ -56,9 +60,9 @@ class WorkerQueue: public jsg::Object {
};

struct SendBatchMetrics {
double backlogCount;
double backlogBytes;
double oldestMessageTimestamp;
double backlogCount = 0;
double backlogBytes = 0;
jsg::Optional<kj::Date> oldestMessageTimestamp;
JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp);
JSG_STRUCT_TS_OVERRIDE(QueueSendBatchMetrics);
};
Expand Down Expand Up @@ -171,10 +175,12 @@ class WorkerQueue: public jsg::Object {

// Metadata delivered with a message batch in the queue() handler

// Same sentinel caveat as WorkerQueue::Metrics above: the capnp path uses 0 to mean "no data"
// for oldestMessageTimestamp. As such, we must explicitly set it to kj::none (JS undefined).
struct MessageBatchMetrics {
double backlogCount;
double backlogBytes;
double oldestMessageTimestamp;
double backlogCount = 0;
double backlogBytes = 0;
jsg::Optional<kj::Date> oldestMessageTimestamp;
JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp);
JSG_STRUCT_TS_OVERRIDE(MessageBatchMetrics);
};
Expand Down
6 changes: 6 additions & 0 deletions src/workerd/api/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ wd_test(
data = ["queue-metadata-test.js"],
)

wd_test(
src = "queue-metrics-sentinel-test.wd-test",
args = ["--experimental"],
data = ["queue-metrics-sentinel-test.js"],
)

wd_test(
src = "queue-producer-metadata-test.wd-test",
args = ["--experimental"],
Expand Down
13 changes: 10 additions & 3 deletions src/workerd/api/tests/queue-metadata-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,24 @@ export default {
if (
batch.metadata.metrics.backlogCount === 0 &&
batch.metadata.metrics.backlogBytes === 0 &&
batch.metadata.metrics.oldestMessageTimestamp === 0
batch.metadata.metrics.oldestMessageTimestamp === undefined
) {
// If metadata is omitted → all values default to zero
// If metadata is omitted → counts default to zero, timestamp is undefined
batch.ackAll();
return;
}

// Explicit metadata path
assert.strictEqual(batch.metadata.metrics.backlogCount, 100);
assert.strictEqual(batch.metadata.metrics.backlogBytes, 2048);
assert.strictEqual(batch.metadata.metrics.oldestMessageTimestamp, 1000000);
assert.ok(
batch.metadata.metrics.oldestMessageTimestamp instanceof Date,
'Expected oldestMessageTimestamp to be a Date'
);
assert.strictEqual(
batch.metadata.metrics.oldestMessageTimestamp.getTime(),
1000000
);
batch.ackAll();
},

Expand Down
84 changes: 84 additions & 0 deletions src/workerd/api/tests/queue-metrics-sentinel-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) 2026 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

// Tests that the upstream sentinel value of 0 for oldestMessageTimestamp is
// correctly converted to undefined (kj::none) by clearEpochSentinel().

import assert from 'node:assert';

export default {
async fetch(request) {
const { pathname } = new URL(request.url);

if (pathname === '/metrics') {
return Response.json({
backlogCount: 5,
backlogBytes: 100,
oldestMessageTimestamp: 0,
});
}

if (pathname === '/message') {
await request.arrayBuffer();
return Response.json({
metadata: {
metrics: {
backlogCount: 5,
backlogBytes: 100,
oldestMessageTimestamp: 0,
},
},
});
}

if (pathname === '/batch') {
await request.arrayBuffer();
return Response.json({
metadata: {
metrics: {
backlogCount: 10,
backlogBytes: 200,
oldestMessageTimestamp: 0,
},
},
});
}

return new Response('Not Found', { status: 404 });
},

async test(ctrl, env) {
// Test metrics() zero-sentinel → undefined
const metrics = await env.QUEUE.metrics();
assert.strictEqual(metrics.backlogCount, 5);
assert.strictEqual(metrics.backlogBytes, 100);
assert.strictEqual(
metrics.oldestMessageTimestamp,
undefined,
'Expected oldestMessageTimestamp to be undefined when upstream sends 0'
);

// Test send() zero-sentinel → undefined
const sendResult = await env.QUEUE.send('abc', { contentType: 'text' });
assert.strictEqual(sendResult.metadata.metrics.backlogCount, 5);
assert.strictEqual(sendResult.metadata.metrics.backlogBytes, 100);
assert.strictEqual(
sendResult.metadata.metrics.oldestMessageTimestamp,
undefined,
'Expected send oldestMessageTimestamp to be undefined when upstream sends 0'
);

// Test sendBatch() zero-sentinel → undefined
const sendBatchResult = await env.QUEUE.sendBatch([
{ body: 'def', contentType: 'text' },
]);
assert.strictEqual(sendBatchResult.metadata.metrics.backlogCount, 10);
assert.strictEqual(sendBatchResult.metadata.metrics.backlogBytes, 200);
assert.strictEqual(
sendBatchResult.metadata.metrics.oldestMessageTimestamp,
undefined,
'Expected sendBatch oldestMessageTimestamp to be undefined when upstream sends 0'
);
},
};
17 changes: 17 additions & 0 deletions src/workerd/api/tests/queue-metrics-sentinel-test.wd-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "queue-metrics-sentinel-test",
worker = (
modules = [
( name = "worker", esModule = embed "queue-metrics-sentinel-test.js" )
],
bindings = [
( name = "QUEUE", queue = "queue-metrics-sentinel-test" ),
],
compatibilityFlags = ["nodejs_compat", "queues_json_messages", "experimental", "capture_async_api_throws"],
)
),
],
);
6 changes: 5 additions & 1 deletion src/workerd/api/tests/queue-metrics-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ export default {
const metrics = await env.QUEUE.metrics();
assert.strictEqual(metrics.backlogCount, 100);
assert.strictEqual(metrics.backlogBytes, 2048);
assert.strictEqual(metrics.oldestMessageTimestamp, 1000000);
assert.ok(
metrics.oldestMessageTimestamp instanceof Date,
'Expected oldestMessageTimestamp to be a Date'
);
assert.strictEqual(metrics.oldestMessageTimestamp.getTime(), 1000000);
} else {
// Flag OFF → metrics() should not be exposed on the binding
assert.strictEqual(typeof env.QUEUE.metrics, 'undefined');
Expand Down
12 changes: 10 additions & 2 deletions src/workerd/api/tests/queue-producer-metadata-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,23 @@ export default {
if (responseBodyEnabled) {
assert.strictEqual(sendResult.metadata.metrics.backlogCount, 100);
assert.strictEqual(sendResult.metadata.metrics.backlogBytes, 2048);
assert.ok(
sendResult.metadata.metrics.oldestMessageTimestamp instanceof Date,
'Expected oldestMessageTimestamp to be a Date'
);
assert.strictEqual(
sendResult.metadata.metrics.oldestMessageTimestamp,
sendResult.metadata.metrics.oldestMessageTimestamp.getTime(),
1000000
);

assert.strictEqual(sendBatchResult.metadata.metrics.backlogCount, 200);
assert.strictEqual(sendBatchResult.metadata.metrics.backlogBytes, 4096);
assert.ok(
sendBatchResult.metadata.metrics.oldestMessageTimestamp instanceof Date,
'Expected oldestMessageTimestamp to be a Date'
);
assert.strictEqual(
sendBatchResult.metadata.metrics.oldestMessageTimestamp,
sendBatchResult.metadata.metrics.oldestMessageTimestamp.getTime(),
2000000
);
} else {
Expand Down
Loading
Loading