diff --git a/src/workerd/api/queue.c++ b/src/workerd/api/queue.c++ index fd3d90b9eba..f4a8846cb44 100644 --- a/src/workerd/api/queue.c++ +++ b/src/workerd/api/queue.c++ @@ -23,6 +23,35 @@ namespace { // Header for the message format. static constexpr kj::StringPtr HDR_MSG_FORMAT = "X-Msg-Fmt"_kj; +// The upstream service sends 0 when there is "no data" available on a timestamp field (e.g. no `oldestMessageTimestamp`). +// This method converts it to kj::none so users see `undefined`. +void clearEpochSentinel(jsg::Optional& ts) { + KJ_IF_SOME(date, ts) { + if (date == kj::UNIX_EPOCH) { + ts = kj::none; + } + } +} + +// Returns a callback suitable for IoContext::awaitIo() that parses a JSON response string into +// a typed struct via the given TypeHandler, then clears the epoch sentinel on +// oldestMessageTimestamp. +// +// The returned callback captures `handler` by reference. TypeHandler instances are managed by +// the JSG type registration system and live for the lifetime of the isolate, so this is safe. +// +// getOldestMessageTimestamp: (T&) -> jsg::Optional& +template +auto parseQueueResponse( + const jsg::TypeHandler& handler, kj::StringPtr errorMsg, auto getOldestMessageTimestamp) { + return [&handler, errorMsg, getOldestMessageTimestamp](jsg::Lock& js, kj::String text) -> T { + auto parsed = jsg::JsValue::fromJson(js, text); + auto result = JSG_REQUIRE_NONNULL(handler.tryUnwrap(js, parsed), Error, errorMsg, text); + clearEpochSentinel(getOldestMessageTimestamp(result)); + return kj::mv(result); + }; +} + // Header for the message delivery delay. static constexpr kj::StringPtr HDR_MSG_DELAY = "X-Msg-Delay-Secs"_kj; @@ -303,12 +332,9 @@ jsg::Promise WorkerQueue::sendWithResponse(jsg::Lock& auto promise = handleSend(kj::mv(req), kj::mv(serialized), kj::mv(client), headerIds, exposeErrorCodes); - return context.awaitIo( - js, kj::mv(promise), [&responseHandler](jsg::Lock& js, kj::String text) -> SendResponse { - auto parsed = jsg::JsValue::fromJson(js, text); - return JSG_REQUIRE_NONNULL( - responseHandler.tryUnwrap(js, parsed), Error, "Failed to parse queue send response", text); - }); + return context.awaitIo(js, kj::mv(promise), + parseQueueResponse(responseHandler, "Failed to parse queue send response"_kj, + [](SendResponse& r) -> auto& { return r.metadata.metrics.oldestMessageTimestamp; })); } kj::Promise WorkerQueue::sendBatch(jsg::Lock& js, @@ -451,15 +477,9 @@ jsg::Promise WorkerQueue::metrics( auto promise = handleMetrics(kj::mv(req), kj::mv(client), headerIds); - return context.awaitIo( - js, kj::mv(promise), [&metricsHandler](jsg::Lock& js, kj::String text) -> Metrics { - auto parsed = jsg::JsValue::fromJson(js, text); - KJ_IF_SOME(result, metricsHandler.tryUnwrap(js, parsed)) { - return kj::mv(result); - } - _JSG_INTERNAL_FAIL_REQUIRE( - JSG_EXCEPTION(Error), "Failed to parse queue metrics response", text); - }); + return context.awaitIo(js, kj::mv(promise), + parseQueueResponse(metricsHandler, "Failed to parse queue metrics response"_kj, + [](Metrics& m) -> auto& { return m.oldestMessageTimestamp; })); } jsg::Promise WorkerQueue::sendBatchWithResponse(jsg::Lock& js, @@ -568,12 +588,9 @@ jsg::Promise WorkerQueue::sendBatchWithResponse( auto promise = handleWrite(kj::mv(req), kj::mv(body), kj::mv(client), headerIds, exposeErrorCodes); - return context.awaitIo( - js, kj::mv(promise), [&responseHandler](jsg::Lock& js, kj::String text) -> SendBatchResponse { - auto parsed = jsg::JsValue::fromJson(js, text); - return JSG_REQUIRE_NONNULL( - responseHandler.tryUnwrap(js, parsed), Error, "Failed to parse queue send response", text); - }); + return context.awaitIo(js, kj::mv(promise), + parseQueueResponse(responseHandler, "Failed to parse queue send response"_kj, + [](SendBatchResponse& r) -> auto& { return r.metadata.metrics.oldestMessageTimestamp; })); } QueueMessage::QueueMessage( @@ -660,14 +677,19 @@ QueueEvent::QueueEvent( } messages = messagesBuilder.finish(); - // Extract metadata. If the sender didn't set the field, capnp defaults all values to zero. + // Extract metadata. If the sender didn't set the field, capnp defaults all to the zero values. auto m = params.getMetadata().getMetrics(); + jsg::Optional oldestTimestamp; + if (m.getOldestMessageTimestamp() != 0) { + oldestTimestamp = + kj::UNIX_EPOCH + static_cast(m.getOldestMessageTimestamp()) * kj::MILLISECONDS; + } metadata = MessageBatchMetadata{ .metrics = MessageBatchMetrics{ .backlogCount = m.getBacklogCount(), .backlogBytes = m.getBacklogBytes(), - .oldestMessageTimestamp = m.getOldestMessageTimestamp(), + .oldestMessageTimestamp = oldestTimestamp, }, }; } @@ -677,6 +699,8 @@ QueueEvent::QueueEvent(jsg::Lock& js, Params params, IoPtr res queueName(kj::mv(params.queueName)), metadata(kj::mv(params.metadata)), result(result) { + clearEpochSentinel(metadata.metrics.oldestMessageTimestamp); + auto messagesBuilder = kj::heapArrayBuilder>(params.messages.size()); for (auto i: kj::indices(params.messages)) { messagesBuilder.add(js.alloc(js, kj::mv(params.messages[i]), result)); @@ -987,7 +1011,9 @@ kj::Promise QueueCustomEvent::sendRpc( auto metricsBuilder = metadataBuilder.initMetrics(); metricsBuilder.setBacklogCount(p.metadata.metrics.backlogCount); metricsBuilder.setBacklogBytes(p.metadata.metrics.backlogBytes); - metricsBuilder.setOldestMessageTimestamp(p.metadata.metrics.oldestMessageTimestamp); + KJ_IF_SOME(ts, p.metadata.metrics.oldestMessageTimestamp) { + metricsBuilder.setOldestMessageTimestamp((ts - kj::UNIX_EPOCH) / kj::MILLISECONDS); + } } } } diff --git a/src/workerd/api/queue.h b/src/workerd/api/queue.h index 89cf6f9f5e5..d2543ecb3d3 100644 --- a/src/workerd/api/queue.h +++ b/src/workerd/api/queue.h @@ -27,18 +27,22 @@ class WorkerQueue: public jsg::Object { // representing this queue. WorkerQueue(uint subrequestChannel): subrequestChannel(subrequestChannel) {} + // The metrics structs below (Metrics, SendMetrics, SendBatchMetrics) are deserialized from + // JSON responses where the upstream service uses 0 as a sentinel for "no data" on timestamp + // fields. Callers MUST call clearEpochSentinel() on oldestMessageTimestamp after deserialization to convert the + // sentinel to kj::none (JS undefined). struct Metrics { - double backlogCount; - double backlogBytes; - double oldestMessageTimestamp; + double backlogCount = 0; + double backlogBytes = 0; + jsg::Optional oldestMessageTimestamp; JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp); JSG_STRUCT_TS_OVERRIDE(QueueMetrics); }; struct SendMetrics { - double backlogCount; - double backlogBytes; - double oldestMessageTimestamp; + double backlogCount = 0; + double backlogBytes = 0; + jsg::Optional oldestMessageTimestamp; JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp); JSG_STRUCT_TS_OVERRIDE(QueueSendMetrics); }; @@ -56,9 +60,9 @@ class WorkerQueue: public jsg::Object { }; struct SendBatchMetrics { - double backlogCount; - double backlogBytes; - double oldestMessageTimestamp; + double backlogCount = 0; + double backlogBytes = 0; + jsg::Optional oldestMessageTimestamp; JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp); JSG_STRUCT_TS_OVERRIDE(QueueSendBatchMetrics); }; @@ -171,10 +175,12 @@ class WorkerQueue: public jsg::Object { // Metadata delivered with a message batch in the queue() handler +// Same sentinel caveat as WorkerQueue::Metrics above: the capnp path uses 0 to mean "no data" +// for oldestMessageTimestamp. As such, we must explicitly set it to kj::none (JS undefined). struct MessageBatchMetrics { - double backlogCount; - double backlogBytes; - double oldestMessageTimestamp; + double backlogCount = 0; + double backlogBytes = 0; + jsg::Optional oldestMessageTimestamp; JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp); JSG_STRUCT_TS_OVERRIDE(MessageBatchMetrics); }; diff --git a/src/workerd/api/tests/BUILD.bazel b/src/workerd/api/tests/BUILD.bazel index 853f12049cd..6312de1a723 100644 --- a/src/workerd/api/tests/BUILD.bazel +++ b/src/workerd/api/tests/BUILD.bazel @@ -164,6 +164,12 @@ wd_test( data = ["queue-metadata-test.js"], ) +wd_test( + src = "queue-metrics-sentinel-test.wd-test", + args = ["--experimental"], + data = ["queue-metrics-sentinel-test.js"], +) + wd_test( src = "queue-producer-metadata-test.wd-test", args = ["--experimental"], diff --git a/src/workerd/api/tests/queue-metadata-test.js b/src/workerd/api/tests/queue-metadata-test.js index 6e7ebd60185..f956dddddb1 100644 --- a/src/workerd/api/tests/queue-metadata-test.js +++ b/src/workerd/api/tests/queue-metadata-test.js @@ -25,9 +25,9 @@ export default { if ( batch.metadata.metrics.backlogCount === 0 && batch.metadata.metrics.backlogBytes === 0 && - batch.metadata.metrics.oldestMessageTimestamp === 0 + batch.metadata.metrics.oldestMessageTimestamp === undefined ) { - // If metadata is omitted → all values default to zero + // If metadata is omitted → counts default to zero, timestamp is undefined batch.ackAll(); return; } @@ -35,7 +35,14 @@ export default { // Explicit metadata path assert.strictEqual(batch.metadata.metrics.backlogCount, 100); assert.strictEqual(batch.metadata.metrics.backlogBytes, 2048); - assert.strictEqual(batch.metadata.metrics.oldestMessageTimestamp, 1000000); + assert.ok( + batch.metadata.metrics.oldestMessageTimestamp instanceof Date, + 'Expected oldestMessageTimestamp to be a Date' + ); + assert.strictEqual( + batch.metadata.metrics.oldestMessageTimestamp.getTime(), + 1000000 + ); batch.ackAll(); }, diff --git a/src/workerd/api/tests/queue-metrics-sentinel-test.js b/src/workerd/api/tests/queue-metrics-sentinel-test.js new file mode 100644 index 00000000000..5247b0a971f --- /dev/null +++ b/src/workerd/api/tests/queue-metrics-sentinel-test.js @@ -0,0 +1,84 @@ +// Copyright (c) 2026 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +// Tests that the upstream sentinel value of 0 for oldestMessageTimestamp is +// correctly converted to undefined (kj::none) by clearEpochSentinel(). + +import assert from 'node:assert'; + +export default { + async fetch(request) { + const { pathname } = new URL(request.url); + + if (pathname === '/metrics') { + return Response.json({ + backlogCount: 5, + backlogBytes: 100, + oldestMessageTimestamp: 0, + }); + } + + if (pathname === '/message') { + await request.arrayBuffer(); + return Response.json({ + metadata: { + metrics: { + backlogCount: 5, + backlogBytes: 100, + oldestMessageTimestamp: 0, + }, + }, + }); + } + + if (pathname === '/batch') { + await request.arrayBuffer(); + return Response.json({ + metadata: { + metrics: { + backlogCount: 10, + backlogBytes: 200, + oldestMessageTimestamp: 0, + }, + }, + }); + } + + return new Response('Not Found', { status: 404 }); + }, + + async test(ctrl, env) { + // Test metrics() zero-sentinel → undefined + const metrics = await env.QUEUE.metrics(); + assert.strictEqual(metrics.backlogCount, 5); + assert.strictEqual(metrics.backlogBytes, 100); + assert.strictEqual( + metrics.oldestMessageTimestamp, + undefined, + 'Expected oldestMessageTimestamp to be undefined when upstream sends 0' + ); + + // Test send() zero-sentinel → undefined + const sendResult = await env.QUEUE.send('abc', { contentType: 'text' }); + assert.strictEqual(sendResult.metadata.metrics.backlogCount, 5); + assert.strictEqual(sendResult.metadata.metrics.backlogBytes, 100); + assert.strictEqual( + sendResult.metadata.metrics.oldestMessageTimestamp, + undefined, + 'Expected send oldestMessageTimestamp to be undefined when upstream sends 0' + ); + + // Test sendBatch() zero-sentinel → undefined + const sendBatchResult = await env.QUEUE.sendBatch([ + { body: 'def', contentType: 'text' }, + ]); + assert.strictEqual(sendBatchResult.metadata.metrics.backlogCount, 10); + assert.strictEqual(sendBatchResult.metadata.metrics.backlogBytes, 200); + assert.strictEqual( + sendBatchResult.metadata.metrics.oldestMessageTimestamp, + undefined, + 'Expected sendBatch oldestMessageTimestamp to be undefined when upstream sends 0' + ); + }, +}; diff --git a/src/workerd/api/tests/queue-metrics-sentinel-test.wd-test b/src/workerd/api/tests/queue-metrics-sentinel-test.wd-test new file mode 100644 index 00000000000..539ecb1f652 --- /dev/null +++ b/src/workerd/api/tests/queue-metrics-sentinel-test.wd-test @@ -0,0 +1,17 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const unitTests :Workerd.Config = ( + services = [ + ( name = "queue-metrics-sentinel-test", + worker = ( + modules = [ + ( name = "worker", esModule = embed "queue-metrics-sentinel-test.js" ) + ], + bindings = [ + ( name = "QUEUE", queue = "queue-metrics-sentinel-test" ), + ], + compatibilityFlags = ["nodejs_compat", "queues_json_messages", "experimental", "capture_async_api_throws"], + ) + ), + ], +); diff --git a/src/workerd/api/tests/queue-metrics-test.js b/src/workerd/api/tests/queue-metrics-test.js index fdf1ef89b8b..c8ed23a17d0 100644 --- a/src/workerd/api/tests/queue-metrics-test.js +++ b/src/workerd/api/tests/queue-metrics-test.js @@ -26,7 +26,11 @@ export default { const metrics = await env.QUEUE.metrics(); assert.strictEqual(metrics.backlogCount, 100); assert.strictEqual(metrics.backlogBytes, 2048); - assert.strictEqual(metrics.oldestMessageTimestamp, 1000000); + assert.ok( + metrics.oldestMessageTimestamp instanceof Date, + 'Expected oldestMessageTimestamp to be a Date' + ); + assert.strictEqual(metrics.oldestMessageTimestamp.getTime(), 1000000); } else { // Flag OFF → metrics() should not be exposed on the binding assert.strictEqual(typeof env.QUEUE.metrics, 'undefined'); diff --git a/src/workerd/api/tests/queue-producer-metadata-test.js b/src/workerd/api/tests/queue-producer-metadata-test.js index b22738af145..f9491cb95b4 100644 --- a/src/workerd/api/tests/queue-producer-metadata-test.js +++ b/src/workerd/api/tests/queue-producer-metadata-test.js @@ -56,15 +56,23 @@ export default { if (responseBodyEnabled) { assert.strictEqual(sendResult.metadata.metrics.backlogCount, 100); assert.strictEqual(sendResult.metadata.metrics.backlogBytes, 2048); + assert.ok( + sendResult.metadata.metrics.oldestMessageTimestamp instanceof Date, + 'Expected oldestMessageTimestamp to be a Date' + ); assert.strictEqual( - sendResult.metadata.metrics.oldestMessageTimestamp, + sendResult.metadata.metrics.oldestMessageTimestamp.getTime(), 1000000 ); assert.strictEqual(sendBatchResult.metadata.metrics.backlogCount, 200); assert.strictEqual(sendBatchResult.metadata.metrics.backlogBytes, 4096); + assert.ok( + sendBatchResult.metadata.metrics.oldestMessageTimestamp instanceof Date, + 'Expected oldestMessageTimestamp to be a Date' + ); assert.strictEqual( - sendBatchResult.metadata.metrics.oldestMessageTimestamp, + sendBatchResult.metadata.metrics.oldestMessageTimestamp.getTime(), 2000000 ); } else { diff --git a/types/generated-snapshot/experimental/index.d.ts b/types/generated-snapshot/experimental/index.d.ts index 87d7dc1376f..05ec7b8797e 100755 --- a/types/generated-snapshot/experimental/index.d.ts +++ b/types/generated-snapshot/experimental/index.d.ts @@ -2409,7 +2409,7 @@ interface Queue { interface QueueSendMetrics { backlogCount: number; backlogBytes: number; - oldestMessageTimestamp: number; + oldestMessageTimestamp?: Date; } interface QueueSendMetadata { metrics: QueueSendMetrics; @@ -2420,7 +2420,7 @@ interface QueueSendResponse { interface QueueSendBatchMetrics { backlogCount: number; backlogBytes: number; - oldestMessageTimestamp: number; + oldestMessageTimestamp?: Date; } interface QueueSendBatchMetadata { metrics: QueueSendBatchMetrics; @@ -2443,12 +2443,12 @@ interface MessageSendRequest { interface QueueMetrics { backlogCount: number; backlogBytes: number; - oldestMessageTimestamp: number; + oldestMessageTimestamp?: Date; } interface MessageBatchMetrics { backlogCount: number; backlogBytes: number; - oldestMessageTimestamp: number; + oldestMessageTimestamp?: Date; } interface MessageBatchMetadata { metrics: MessageBatchMetrics; diff --git a/types/generated-snapshot/experimental/index.ts b/types/generated-snapshot/experimental/index.ts index d52a5c56a9b..c1553c04e30 100755 --- a/types/generated-snapshot/experimental/index.ts +++ b/types/generated-snapshot/experimental/index.ts @@ -2412,7 +2412,7 @@ export interface Queue { export interface QueueSendMetrics { backlogCount: number; backlogBytes: number; - oldestMessageTimestamp: number; + oldestMessageTimestamp?: Date; } export interface QueueSendMetadata { metrics: QueueSendMetrics; @@ -2423,7 +2423,7 @@ export interface QueueSendResponse { export interface QueueSendBatchMetrics { backlogCount: number; backlogBytes: number; - oldestMessageTimestamp: number; + oldestMessageTimestamp?: Date; } export interface QueueSendBatchMetadata { metrics: QueueSendBatchMetrics; @@ -2446,12 +2446,12 @@ export interface MessageSendRequest { export interface QueueMetrics { backlogCount: number; backlogBytes: number; - oldestMessageTimestamp: number; + oldestMessageTimestamp?: Date; } export interface MessageBatchMetrics { backlogCount: number; backlogBytes: number; - oldestMessageTimestamp: number; + oldestMessageTimestamp?: Date; } export interface MessageBatchMetadata { metrics: MessageBatchMetrics;