diff --git a/src/workerd/api/http.c++ b/src/workerd/api/http.c++ index ca5e4dfd6fa..8232f682864 100644 --- a/src/workerd/api/http.c++ +++ b/src/workerd/api/http.c++ @@ -2323,8 +2323,10 @@ jsg::Promise Fetcher::delete_(jsg::Lock& js, kj::String url) { return throwOnError(js, "DELETE", fetchImpl(js, JSG_THIS, kj::mv(url), kj::mv(subInit))); } -jsg::Promise Fetcher::queue( - jsg::Lock& js, kj::String queueName, kj::Array messages) { +jsg::Promise Fetcher::queue(jsg::Lock& js, + kj::String queueName, + kj::Array messages, + jsg::Optional metadata) { auto& ioContext = IoContext::current(); auto encodedMessages = kj::heapArrayBuilder(messages.size()); @@ -2357,6 +2359,7 @@ jsg::Promise Fetcher::queue( auto event = kj::refcounted(QueueEvent::Params{ .queueName = kj::mv(queueName), .messages = encodedMessages.finish(), + .metadata = kj::mv(metadata).orDefault({}), }); auto eventRef = diff --git a/src/workerd/api/http.h b/src/workerd/api/http.h index 7930d44e2a2..e9cb6c1eaa1 100644 --- a/src/workerd/api/http.h +++ b/src/workerd/api/http.h @@ -392,8 +392,10 @@ class Fetcher: public JsRpcClientProvider { JSG_STRUCT(outcome, ackAll, retryBatch, explicitAcks, retryMessages); }; - jsg::Promise queue( - jsg::Lock& js, kj::String queueName, kj::Array messages); + jsg::Promise queue(jsg::Lock& js, + kj::String queueName, + kj::Array messages, + jsg::Optional metadata); struct ScheduledOptions { jsg::Optional scheduledTime; @@ -446,7 +448,7 @@ class Fetcher: public JsRpcClientProvider { ) & { fetch(input: RequestInfo | URL, init?: RequestInit): Promise; connect(address: SocketAddress | string, options?: SocketOptions): Socket; - queue(queueName: string, messages: ServiceBindingQueueMessage[]): Promise; + queue(queueName: string, messages: ServiceBindingQueueMessage[], metadata?: MessageBatchMetadata): Promise; scheduled(options?: FetcherScheduledOptions): Promise; }); } else { diff --git a/src/workerd/api/queue.c++ b/src/workerd/api/queue.c++ index 9cc76c906f0..b662670bf43 100644 --- a/src/workerd/api/queue.c++ +++ b/src/workerd/api/queue.c++ @@ -477,11 +477,23 @@ QueueEvent::QueueEvent( messagesBuilder.add(js.alloc(js, incoming[i], result)); } messages = messagesBuilder.finish(); + + // Extract metadata. If the sender didn't set the field, capnp defaults all values to zero. + auto m = params.getMetadata().getMetrics(); + metadata = MessageBatchMetadata{ + .metrics = + MessageBatchMetrics{ + .backlogCount = m.getBacklogCount(), + .backlogBytes = m.getBacklogBytes(), + .oldestMessageTimestamp = m.getOldestMessageTimestamp(), + }, + }; } QueueEvent::QueueEvent(jsg::Lock& js, Params params, IoPtr result) : ExtendableEvent("queue"), queueName(kj::mv(params.queueName)), + metadata(kj::mv(params.metadata)), result(result) { auto messagesBuilder = kj::heapArrayBuilder>(params.messages.size()); for (auto i: kj::indices(params.messages)) { @@ -774,6 +786,7 @@ kj::Promise QueueCustomEvent::sendRpc( KJ_CASE_ONEOF(p, rpc::EventDispatcher::QueueParams::Reader) { req.setQueueName(p.getQueueName()); req.setMessages(p.getMessages()); + req.setMetadata(p.getMetadata()); } KJ_CASE_ONEOF(p, QueueEvent::Params) { req.setQueueName(p.queueName); @@ -787,6 +800,13 @@ kj::Promise QueueCustomEvent::sendRpc( } messages[i].setAttempts(p.messages[i].attempts); } + { + auto metadataBuilder = req.initMetadata(); + auto metricsBuilder = metadataBuilder.initMetrics(); + metricsBuilder.setBacklogCount(p.metadata.metrics.backlogCount); + metricsBuilder.setBacklogBytes(p.metadata.metrics.backlogBytes); + metricsBuilder.setOldestMessageTimestamp(p.metadata.metrics.oldestMessageTimestamp); + } } } diff --git a/src/workerd/api/queue.h b/src/workerd/api/queue.h index 3b785b1b511..f0ca9f0b49b 100644 --- a/src/workerd/api/queue.h +++ b/src/workerd/api/queue.h @@ -116,6 +116,22 @@ class WorkerQueue: public jsg::Object { // Event handler types +// Metadata delivered with a message batch in the queue() handler + +struct MessageBatchMetrics { + double backlogCount; + double backlogBytes; + double oldestMessageTimestamp; + JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp); + JSG_STRUCT_TS_OVERRIDE(MessageBatchMetrics); +}; + +struct MessageBatchMetadata { + MessageBatchMetrics metrics; + JSG_STRUCT(metrics); + JSG_STRUCT_TS_OVERRIDE(MessageBatchMetadata); +}; + // Types for other workers passing messages into and responses out of a queue handler. struct IncomingQueueMessage { @@ -235,6 +251,7 @@ class QueueEvent final: public ExtendableEvent { struct Params { kj::String queueName; kj::Array messages; + MessageBatchMetadata metadata; }; explicit QueueEvent(jsg::Lock& js, @@ -250,23 +267,37 @@ class QueueEvent final: public ExtendableEvent { kj::StringPtr getQueueName() { return queueName; } + MessageBatchMetadata getMetadata() { + return metadata; + } void retryAll(jsg::Optional options); void ackAll(); - JSG_RESOURCE_TYPE(QueueEvent) { + JSG_RESOURCE_TYPE(QueueEvent, CompatibilityFlags::Reader flags) { JSG_INHERIT(ExtendableEvent); JSG_LAZY_READONLY_INSTANCE_PROPERTY(messages, getMessages); JSG_READONLY_INSTANCE_PROPERTY(queue, getQueueName); + if (flags.getWorkerdExperimental()) { + JSG_READONLY_INSTANCE_PROPERTY(metadata, getMetadata); + } + JSG_METHOD(retryAll); JSG_METHOD(ackAll); JSG_TS_ROOT(); - JSG_TS_OVERRIDE(QueueEvent { - readonly messages: readonly Message[]; - }); + if (flags.getWorkerdExperimental()) { + JSG_TS_OVERRIDE(QueueEvent { + readonly messages: readonly Message[]; + readonly metadata: MessageBatchMetadata; + }); + } else { + JSG_TS_OVERRIDE(QueueEvent { + readonly messages: readonly Message[]; + }); + } } void visitForMemoryInfo(jsg::MemoryTracker& tracker) const { @@ -274,6 +305,7 @@ class QueueEvent final: public ExtendableEvent { tracker.trackField("message", message); } tracker.trackField("queueName", queueName); + tracker.trackFieldWithSize("metadata", sizeof(MessageBatchMetadata)); tracker.trackFieldWithSize("IoPtr", sizeof(IoPtr)); } @@ -297,6 +329,7 @@ class QueueEvent final: public ExtendableEvent { // array to avoid one intermediate copy? kj::Array> messages; kj::String queueName; + MessageBatchMetadata metadata; IoPtr result; CompletionStatus completionStatus = Incomplete{}; @@ -316,6 +349,9 @@ class QueueController final: public jsg::Object { kj::StringPtr getQueueName() { return event->getQueueName(); } + MessageBatchMetadata getMetadata() { + return event->getMetadata(); + } void retryAll(jsg::Optional options) { event->retryAll(options); } @@ -323,17 +359,28 @@ class QueueController final: public jsg::Object { event->ackAll(); } - JSG_RESOURCE_TYPE(QueueController) { + JSG_RESOURCE_TYPE(QueueController, CompatibilityFlags::Reader flags) { JSG_READONLY_INSTANCE_PROPERTY(messages, getMessages); JSG_READONLY_INSTANCE_PROPERTY(queue, getQueueName); + if (flags.getWorkerdExperimental()) { + JSG_READONLY_INSTANCE_PROPERTY(metadata, getMetadata); + } + JSG_METHOD(retryAll); JSG_METHOD(ackAll); JSG_TS_ROOT(); - JSG_TS_OVERRIDE(MessageBatch { - readonly messages: readonly Message[]; - }); + if (flags.getWorkerdExperimental()) { + JSG_TS_OVERRIDE(MessageBatch { + readonly messages: readonly Message[]; + readonly metadata: MessageBatchMetadata; + }); + } else { + JSG_TS_OVERRIDE(MessageBatch { + readonly messages: readonly Message[]; + }); + } } void visitForMemoryInfo(jsg::MemoryTracker& tracker) const { @@ -400,8 +447,9 @@ class QueueCustomEvent final: public WorkerInterface::CustomEvent, public kj::Re #define EW_QUEUE_ISOLATE_TYPES \ api::WorkerQueue, api::WorkerQueue::SendOptions, api::WorkerQueue::SendBatchOptions, \ - api::WorkerQueue::MessageSendRequest, api::WorkerQueue::Metrics, api::IncomingQueueMessage, \ - api::QueueRetryBatch, api::QueueRetryMessage, api::QueueResponse, api::QueueRetryOptions, \ - api::QueueMessage, api::QueueEvent, api::QueueController, api::QueueExportedHandler + api::WorkerQueue::MessageSendRequest, api::WorkerQueue::Metrics, api::MessageBatchMetrics, \ + api::MessageBatchMetadata, api::IncomingQueueMessage, api::QueueRetryBatch, \ + api::QueueRetryMessage, api::QueueResponse, api::QueueRetryOptions, api::QueueMessage, \ + api::QueueEvent, api::QueueController, api::QueueExportedHandler } // namespace workerd::api diff --git a/src/workerd/api/tests/BUILD.bazel b/src/workerd/api/tests/BUILD.bazel index cd3c475d771..e89fa4d4901 100644 --- a/src/workerd/api/tests/BUILD.bazel +++ b/src/workerd/api/tests/BUILD.bazel @@ -158,6 +158,12 @@ wd_test( data = ["queue-metrics-test.js"], ) +wd_test( + src = "queue-metadata-test.wd-test", + args = ["--experimental"], + data = ["queue-metadata-test.js"], +) + wd_test( src = "r2-test.wd-test", args = ["--experimental"], diff --git a/src/workerd/api/tests/queue-metadata-test.js b/src/workerd/api/tests/queue-metadata-test.js new file mode 100644 index 00000000000..6e7ebd60185 --- /dev/null +++ b/src/workerd/api/tests/queue-metadata-test.js @@ -0,0 +1,76 @@ +// Copyright (c) 2026 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +import assert from 'node:assert'; + +export default { + async queue(batch, env, ctx) { + const flagEnabled = env.METADATA_FLAG; + + if (!flagEnabled) { + // Flag disabled → metadata property should not exist + assert.strictEqual(batch.metadata, undefined); + batch.ackAll(); + return; + } + + // Flag enabled → metadata should always be present + assert.ok(batch.metadata, 'Expected batch.metadata to be defined'); + assert.ok( + batch.metadata.metrics, + 'Expected batch.metadata.metrics to be defined' + ); + + if ( + batch.metadata.metrics.backlogCount === 0 && + batch.metadata.metrics.backlogBytes === 0 && + batch.metadata.metrics.oldestMessageTimestamp === 0 + ) { + // If metadata is omitted → all values default to zero + batch.ackAll(); + return; + } + + // Explicit metadata path + assert.strictEqual(batch.metadata.metrics.backlogCount, 100); + assert.strictEqual(batch.metadata.metrics.backlogBytes, 2048); + assert.strictEqual(batch.metadata.metrics.oldestMessageTimestamp, 1000000); + batch.ackAll(); + }, + + async test(ctrl, env, ctx) { + const flagEnabled = env.METADATA_FLAG; + const timestamp = new Date(); + + if (flagEnabled) { + const response1 = await env.SERVICE.queue( + 'test-queue', + [{ id: '0', timestamp, body: 'hello', attempts: 1 }], + { + metrics: { + backlogCount: 100, + backlogBytes: 2048, + oldestMessageTimestamp: 1000000, + }, + } + ); + assert.strictEqual(response1.outcome, 'ok'); + assert(response1.ackAll); + + // Test with omitted metadata + const response2 = await env.SERVICE.queue('test-queue', [ + { id: '1', timestamp, body: 'world', attempts: 1 }, + ]); + assert.strictEqual(response2.outcome, 'ok'); + assert(response2.ackAll); + } else { + // Flag disabled → handler still works, metadata not visible + const response = await env.SERVICE.queue('test-queue', [ + { id: '0', timestamp, body: 'foobar', attempts: 1 }, + ]); + assert.strictEqual(response.outcome, 'ok'); + assert(response.ackAll); + } + }, +}; diff --git a/src/workerd/api/tests/queue-metadata-test.wd-test b/src/workerd/api/tests/queue-metadata-test.wd-test new file mode 100644 index 00000000000..3ea4de985b2 --- /dev/null +++ b/src/workerd/api/tests/queue-metadata-test.wd-test @@ -0,0 +1,30 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const unitTests :Workerd.Config = ( + services = [ + ( name = "queue-metadata-test", + worker = ( + modules = [ + ( name = "worker", esModule = embed "queue-metadata-test.js" ) + ], + bindings = [ + ( name = "SERVICE", service = "queue-metadata-test" ), + ( name = "METADATA_FLAG", json = "true" ), + ], + compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "experimental"], + ) + ), + ( name = "queue-metadata-disabled-test", + worker = ( + modules = [ + ( name = "worker-disabled", esModule = embed "queue-metadata-test.js" ) + ], + bindings = [ + ( name = "SERVICE", service = "queue-metadata-disabled-test" ), + ( name = "METADATA_FLAG", json = "false" ), + ], + compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers"], + ) + ), + ], +); diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index 07b6c82dbde..6d70a4a90e5 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -373,6 +373,20 @@ struct QueueResponse @0x90e98932c0bfc0de { # List of retry options for messages that were explicitly marked for retry. } +struct MessageBatchMetrics { + backlogCount @0 :Float64; + # Number of messages remaining in the queue backlog. + backlogBytes @1 :Float64; + # Total bytes of messages remaining in the queue backlog. + oldestMessageTimestamp @2 :Float64; + # Timestamp (ms since epoch) of the oldest message in the queue. +} + +struct MessageBatchMetadata { + metrics @0 :MessageBatchMetrics; + # Best effort queue metrics at the time the batch was dispatched. +} + struct HibernatableWebSocketEventMessage { payload :union { text @0 :Text; @@ -762,11 +776,13 @@ interface EventDispatcher @0xf20697475ec1752d { # It would be cleaner to handle that inside the implementation so we could mark the entire # interface (and file) with allowCancellation. - queue @8 (messages :List(QueueMessage), queueName :Text) -> (result :QueueResponse) + queue @8 (messages :List(QueueMessage), queueName :Text, metadata :MessageBatchMetadata) + -> (result :QueueResponse) $Cxx.allowCancellation; # Delivers a batch of queue messages to a worker's queue event handler. Returns information about # the success of the batch, including which messages should be considered acknowledged and which - # should be retried. + # should be retried. The optional metadata field carries queue metrics at the time the batch was + # dispatched; it is safe for the sender to omit this field (the consumer sees it as absent). jsRpcSession @9 () -> (topLevel :JsRpcTarget) $Cxx.allowCancellation; # Opens a JS rpc "session". The call does not return until the session is complete. diff --git a/types/generated-snapshot/experimental/index.d.ts b/types/generated-snapshot/experimental/index.d.ts index 8bc03203dbb..b479dd2ce4d 100755 --- a/types/generated-snapshot/experimental/index.d.ts +++ b/types/generated-snapshot/experimental/index.d.ts @@ -2214,6 +2214,7 @@ type Fetcher< queue( queueName: string, messages: ServiceBindingQueueMessage[], + metadata?: MessageBatchMetadata, ): Promise; scheduled(options?: FetcherScheduledOptions): Promise; }; @@ -2422,6 +2423,14 @@ interface QueueMetrics { backlogBytes: number; oldestMessageTimestamp: number; } +interface MessageBatchMetrics { + backlogCount: number; + backlogBytes: number; + oldestMessageTimestamp: number; +} +interface MessageBatchMetadata { + metrics: MessageBatchMetrics; +} interface QueueRetryBatch { retry: boolean; delaySeconds?: number; @@ -2444,12 +2453,14 @@ interface Message { interface QueueEvent extends ExtendableEvent { readonly messages: readonly Message[]; readonly queue: string; + readonly metadata: MessageBatchMetadata; retryAll(options?: QueueRetryOptions): void; ackAll(): void; } interface MessageBatch { readonly messages: readonly Message[]; readonly queue: string; + readonly metadata: MessageBatchMetadata; retryAll(options?: QueueRetryOptions): void; ackAll(): void; } diff --git a/types/generated-snapshot/experimental/index.ts b/types/generated-snapshot/experimental/index.ts index 87cb84253d9..87e33af7c9a 100755 --- a/types/generated-snapshot/experimental/index.ts +++ b/types/generated-snapshot/experimental/index.ts @@ -2217,6 +2217,7 @@ export type Fetcher< queue( queueName: string, messages: ServiceBindingQueueMessage[], + metadata?: MessageBatchMetadata, ): Promise; scheduled(options?: FetcherScheduledOptions): Promise; }; @@ -2425,6 +2426,14 @@ export interface QueueMetrics { backlogBytes: number; oldestMessageTimestamp: number; } +export interface MessageBatchMetrics { + backlogCount: number; + backlogBytes: number; + oldestMessageTimestamp: number; +} +export interface MessageBatchMetadata { + metrics: MessageBatchMetrics; +} export interface QueueRetryBatch { retry: boolean; delaySeconds?: number; @@ -2447,12 +2456,14 @@ export interface Message { export interface QueueEvent extends ExtendableEvent { readonly messages: readonly Message[]; readonly queue: string; + readonly metadata: MessageBatchMetadata; retryAll(options?: QueueRetryOptions): void; ackAll(): void; } export interface MessageBatch { readonly messages: readonly Message[]; readonly queue: string; + readonly metadata: MessageBatchMetadata; retryAll(options?: QueueRetryOptions): void; ackAll(): void; } diff --git a/types/test/types/rpc.ts b/types/test/types/rpc.ts index 21f45cb0e01..07a4d0346da 100644 --- a/types/test/types/rpc.ts +++ b/types/test/types/rpc.ts @@ -462,7 +462,8 @@ export default >{ expectTypeOf(env.RPC_SERVICE.queue).toEqualTypeOf< ( queueName: string, - messages: ServiceBindingQueueMessage[] + messages: ServiceBindingQueueMessage[], + metadata?: MessageBatchMetadata ) => Promise >(); expectTypeOf(env.RPC_SERVICE.scheduled).toEqualTypeOf<