Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/workerd/api/http.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2323,8 +2323,10 @@ jsg::Promise<void> Fetcher::delete_(jsg::Lock& js, kj::String url) {
return throwOnError(js, "DELETE", fetchImpl(js, JSG_THIS, kj::mv(url), kj::mv(subInit)));
}

jsg::Promise<Fetcher::QueueResult> Fetcher::queue(
jsg::Lock& js, kj::String queueName, kj::Array<ServiceBindingQueueMessage> messages) {
jsg::Promise<Fetcher::QueueResult> Fetcher::queue(jsg::Lock& js,
kj::String queueName,
kj::Array<ServiceBindingQueueMessage> messages,
jsg::Optional<MessageBatchMetadata> metadata) {
auto& ioContext = IoContext::current();

auto encodedMessages = kj::heapArrayBuilder<IncomingQueueMessage>(messages.size());
Expand Down Expand Up @@ -2357,6 +2359,7 @@ jsg::Promise<Fetcher::QueueResult> Fetcher::queue(
auto event = kj::refcounted<api::QueueCustomEvent>(QueueEvent::Params{
.queueName = kj::mv(queueName),
.messages = encodedMessages.finish(),
.metadata = kj::mv(metadata).orDefault({}),
});

auto eventRef =
Expand Down
8 changes: 5 additions & 3 deletions src/workerd/api/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,10 @@ class Fetcher: public JsRpcClientProvider {
JSG_STRUCT(outcome, ackAll, retryBatch, explicitAcks, retryMessages);
};

jsg::Promise<QueueResult> queue(
jsg::Lock& js, kj::String queueName, kj::Array<ServiceBindingQueueMessage> messages);
jsg::Promise<QueueResult> queue(jsg::Lock& js,
kj::String queueName,
kj::Array<ServiceBindingQueueMessage> messages,
jsg::Optional<MessageBatchMetadata> metadata);

struct ScheduledOptions {
jsg::Optional<kj::Date> scheduledTime;
Expand Down Expand Up @@ -446,7 +448,7 @@ class Fetcher: public JsRpcClientProvider {
) & {
fetch(input: RequestInfo | URL, init?: RequestInit): Promise<Response>;
connect(address: SocketAddress | string, options?: SocketOptions): Socket;
queue(queueName: string, messages: ServiceBindingQueueMessage[]): Promise<FetcherQueueResult>;
queue(queueName: string, messages: ServiceBindingQueueMessage[], metadata?: MessageBatchMetadata): Promise<FetcherQueueResult>;
scheduled(options?: FetcherScheduledOptions): Promise<FetcherScheduledResult>;
});
} else {
Expand Down
20 changes: 20 additions & 0 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -477,11 +477,23 @@ QueueEvent::QueueEvent(
messagesBuilder.add(js.alloc<QueueMessage>(js, incoming[i], result));
}
messages = messagesBuilder.finish();

// Extract metadata. If the sender didn't set the field, capnp defaults all values to zero.
auto m = params.getMetadata().getMetrics();
metadata = MessageBatchMetadata{
.metrics =
MessageBatchMetrics{
.backlogCount = m.getBacklogCount(),
.backlogBytes = m.getBacklogBytes(),
.oldestMessageTimestamp = m.getOldestMessageTimestamp(),
},
};
}

QueueEvent::QueueEvent(jsg::Lock& js, Params params, IoPtr<QueueEventResult> result)
: ExtendableEvent("queue"),
queueName(kj::mv(params.queueName)),
metadata(kj::mv(params.metadata)),
result(result) {
auto messagesBuilder = kj::heapArrayBuilder<jsg::Ref<QueueMessage>>(params.messages.size());
for (auto i: kj::indices(params.messages)) {
Expand Down Expand Up @@ -774,6 +786,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEvent::sendRpc(
KJ_CASE_ONEOF(p, rpc::EventDispatcher::QueueParams::Reader) {
req.setQueueName(p.getQueueName());
req.setMessages(p.getMessages());
req.setMetadata(p.getMetadata());
}
KJ_CASE_ONEOF(p, QueueEvent::Params) {
req.setQueueName(p.queueName);
Expand All @@ -787,6 +800,13 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEvent::sendRpc(
}
messages[i].setAttempts(p.messages[i].attempts);
}
{
auto metadataBuilder = req.initMetadata();
auto metricsBuilder = metadataBuilder.initMetrics();
metricsBuilder.setBacklogCount(p.metadata.metrics.backlogCount);
metricsBuilder.setBacklogBytes(p.metadata.metrics.backlogBytes);
metricsBuilder.setOldestMessageTimestamp(p.metadata.metrics.oldestMessageTimestamp);
}
}
}

Expand Down
70 changes: 59 additions & 11 deletions src/workerd/api/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,22 @@ class WorkerQueue: public jsg::Object {

// Event handler types

// Metadata delivered with a message batch in the queue() handler

struct MessageBatchMetrics {
double backlogCount;
double backlogBytes;
double oldestMessageTimestamp;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you buy my argument on #6246 (comment), then this should be a Date here too. As well as in any equivalent new data types introduced in #6354.

Although I'm not totally sure why we're using a different struct here than on the "metrics" endpoint in the first place. Are we worried that eventually we'll end up exposing different metrics from each place?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed with the comment! All of these changes are behind the experimental flag right now. Would it be alright if I modified all of them to use Date in a follow-up PR after #6354 is in?

That was my thought process, although I imagined they would likely stay in sync, I didn't want to permanently tie all of the metrics types together.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's totally fine, thanks!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the follow-up PR: #6445

JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp);
JSG_STRUCT_TS_OVERRIDE(MessageBatchMetrics);
};

struct MessageBatchMetadata {
MessageBatchMetrics metrics;
JSG_STRUCT(metrics);
JSG_STRUCT_TS_OVERRIDE(MessageBatchMetadata);
};

// Types for other workers passing messages into and responses out of a queue handler.

struct IncomingQueueMessage {
Expand Down Expand Up @@ -235,6 +251,7 @@ class QueueEvent final: public ExtendableEvent {
struct Params {
kj::String queueName;
kj::Array<IncomingQueueMessage> messages;
MessageBatchMetadata metadata;
};

explicit QueueEvent(jsg::Lock& js,
Expand All @@ -250,30 +267,45 @@ class QueueEvent final: public ExtendableEvent {
kj::StringPtr getQueueName() {
return queueName;
}
MessageBatchMetadata getMetadata() {
return metadata;
}

void retryAll(jsg::Optional<QueueRetryOptions> options);
void ackAll();

JSG_RESOURCE_TYPE(QueueEvent) {
JSG_RESOURCE_TYPE(QueueEvent, CompatibilityFlags::Reader flags) {
JSG_INHERIT(ExtendableEvent);

JSG_LAZY_READONLY_INSTANCE_PROPERTY(messages, getMessages);
JSG_READONLY_INSTANCE_PROPERTY(queue, getQueueName);

if (flags.getWorkerdExperimental()) {
JSG_READONLY_INSTANCE_PROPERTY(metadata, getMetadata);
}

JSG_METHOD(retryAll);
JSG_METHOD(ackAll);

JSG_TS_ROOT();
JSG_TS_OVERRIDE(QueueEvent<Body = unknown> {
readonly messages: readonly Message<Body>[];
});
if (flags.getWorkerdExperimental()) {
JSG_TS_OVERRIDE(QueueEvent<Body = unknown> {
readonly messages: readonly Message<Body>[];
readonly metadata: MessageBatchMetadata;
});
} else {
JSG_TS_OVERRIDE(QueueEvent<Body = unknown> {
readonly messages: readonly Message<Body>[];
});
}
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
for (auto& message: messages) {
tracker.trackField("message", message);
}
tracker.trackField("queueName", queueName);
tracker.trackFieldWithSize("metadata", sizeof(MessageBatchMetadata));
tracker.trackFieldWithSize("IoPtr<QueueEventResult>", sizeof(IoPtr<QueueEventResult>));
}

Expand All @@ -297,6 +329,7 @@ class QueueEvent final: public ExtendableEvent {
// array to avoid one intermediate copy?
kj::Array<jsg::Ref<QueueMessage>> messages;
kj::String queueName;
MessageBatchMetadata metadata;
IoPtr<QueueEventResult> result;
CompletionStatus completionStatus = Incomplete{};

Expand All @@ -316,24 +349,38 @@ class QueueController final: public jsg::Object {
kj::StringPtr getQueueName() {
return event->getQueueName();
}
MessageBatchMetadata getMetadata() {
return event->getMetadata();
}
void retryAll(jsg::Optional<QueueRetryOptions> options) {
event->retryAll(options);
}
void ackAll() {
event->ackAll();
}

JSG_RESOURCE_TYPE(QueueController) {
JSG_RESOURCE_TYPE(QueueController, CompatibilityFlags::Reader flags) {
JSG_READONLY_INSTANCE_PROPERTY(messages, getMessages);
JSG_READONLY_INSTANCE_PROPERTY(queue, getQueueName);

if (flags.getWorkerdExperimental()) {
JSG_READONLY_INSTANCE_PROPERTY(metadata, getMetadata);
}

JSG_METHOD(retryAll);
JSG_METHOD(ackAll);

JSG_TS_ROOT();
JSG_TS_OVERRIDE(MessageBatch<Body = unknown> {
readonly messages: readonly Message<Body>[];
});
if (flags.getWorkerdExperimental()) {
JSG_TS_OVERRIDE(MessageBatch<Body = unknown> {
readonly messages: readonly Message<Body>[];
readonly metadata: MessageBatchMetadata;
});
} else {
JSG_TS_OVERRIDE(MessageBatch<Body = unknown> {
readonly messages: readonly Message<Body>[];
});
}
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
Expand Down Expand Up @@ -400,8 +447,9 @@ class QueueCustomEvent final: public WorkerInterface::CustomEvent, public kj::Re

#define EW_QUEUE_ISOLATE_TYPES \
api::WorkerQueue, api::WorkerQueue::SendOptions, api::WorkerQueue::SendBatchOptions, \
api::WorkerQueue::MessageSendRequest, api::WorkerQueue::Metrics, api::IncomingQueueMessage, \
api::QueueRetryBatch, api::QueueRetryMessage, api::QueueResponse, api::QueueRetryOptions, \
api::QueueMessage, api::QueueEvent, api::QueueController, api::QueueExportedHandler
api::WorkerQueue::MessageSendRequest, api::WorkerQueue::Metrics, api::MessageBatchMetrics, \
api::MessageBatchMetadata, api::IncomingQueueMessage, api::QueueRetryBatch, \
api::QueueRetryMessage, api::QueueResponse, api::QueueRetryOptions, api::QueueMessage, \
api::QueueEvent, api::QueueController, api::QueueExportedHandler

} // namespace workerd::api
6 changes: 6 additions & 0 deletions src/workerd/api/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ wd_test(
data = ["queue-metrics-test.js"],
)

wd_test(
src = "queue-metadata-test.wd-test",
args = ["--experimental"],
data = ["queue-metadata-test.js"],
)

wd_test(
src = "r2-test.wd-test",
args = ["--experimental"],
Expand Down
76 changes: 76 additions & 0 deletions src/workerd/api/tests/queue-metadata-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2026 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import assert from 'node:assert';

export default {
async queue(batch, env, ctx) {
const flagEnabled = env.METADATA_FLAG;

if (!flagEnabled) {
// Flag disabled → metadata property should not exist
assert.strictEqual(batch.metadata, undefined);
batch.ackAll();
return;
}

// Flag enabled → metadata should always be present
assert.ok(batch.metadata, 'Expected batch.metadata to be defined');
assert.ok(
batch.metadata.metrics,
'Expected batch.metadata.metrics to be defined'
);

if (
batch.metadata.metrics.backlogCount === 0 &&
batch.metadata.metrics.backlogBytes === 0 &&
batch.metadata.metrics.oldestMessageTimestamp === 0
) {
// If metadata is omitted → all values default to zero
batch.ackAll();
return;
}

// Explicit metadata path
assert.strictEqual(batch.metadata.metrics.backlogCount, 100);
assert.strictEqual(batch.metadata.metrics.backlogBytes, 2048);
assert.strictEqual(batch.metadata.metrics.oldestMessageTimestamp, 1000000);
batch.ackAll();
},

async test(ctrl, env, ctx) {
const flagEnabled = env.METADATA_FLAG;
const timestamp = new Date();

if (flagEnabled) {
const response1 = await env.SERVICE.queue(
'test-queue',
[{ id: '0', timestamp, body: 'hello', attempts: 1 }],
{
metrics: {
backlogCount: 100,
backlogBytes: 2048,
oldestMessageTimestamp: 1000000,
},
}
);
assert.strictEqual(response1.outcome, 'ok');
assert(response1.ackAll);

// Test with omitted metadata
const response2 = await env.SERVICE.queue('test-queue', [
{ id: '1', timestamp, body: 'world', attempts: 1 },
]);
assert.strictEqual(response2.outcome, 'ok');
assert(response2.ackAll);
} else {
// Flag disabled → handler still works, metadata not visible
const response = await env.SERVICE.queue('test-queue', [
{ id: '0', timestamp, body: 'foobar', attempts: 1 },
]);
assert.strictEqual(response.outcome, 'ok');
assert(response.ackAll);
}
},
};
30 changes: 30 additions & 0 deletions src/workerd/api/tests/queue-metadata-test.wd-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "queue-metadata-test",
worker = (
modules = [
( name = "worker", esModule = embed "queue-metadata-test.js" )
],
bindings = [
( name = "SERVICE", service = "queue-metadata-test" ),
( name = "METADATA_FLAG", json = "true" ),
],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "experimental"],
)
),
( name = "queue-metadata-disabled-test",
worker = (
modules = [
( name = "worker-disabled", esModule = embed "queue-metadata-test.js" )
],
bindings = [
( name = "SERVICE", service = "queue-metadata-disabled-test" ),
( name = "METADATA_FLAG", json = "false" ),
],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers"],
)
),
],
);
20 changes: 18 additions & 2 deletions src/workerd/io/worker-interface.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,20 @@ struct QueueResponse @0x90e98932c0bfc0de {
# List of retry options for messages that were explicitly marked for retry.
}

struct MessageBatchMetrics {
backlogCount @0 :Float64;
# Number of messages remaining in the queue backlog.
backlogBytes @1 :Float64;
# Total bytes of messages remaining in the queue backlog.
oldestMessageTimestamp @2 :Float64;
# Timestamp (ms since epoch) of the oldest message in the queue.
}

struct MessageBatchMetadata {
metrics @0 :MessageBatchMetrics;
# Best effort queue metrics at the time the batch was dispatched.
}

struct HibernatableWebSocketEventMessage {
payload :union {
text @0 :Text;
Expand Down Expand Up @@ -762,11 +776,13 @@ interface EventDispatcher @0xf20697475ec1752d {
# It would be cleaner to handle that inside the implementation so we could mark the entire
# interface (and file) with allowCancellation.

queue @8 (messages :List(QueueMessage), queueName :Text) -> (result :QueueResponse)
queue @8 (messages :List(QueueMessage), queueName :Text, metadata :MessageBatchMetadata)
-> (result :QueueResponse)
$Cxx.allowCancellation;
# Delivers a batch of queue messages to a worker's queue event handler. Returns information about
# the success of the batch, including which messages should be considered acknowledged and which
# should be retried.
# should be retried. The optional metadata field carries queue metrics at the time the batch was
# dispatched; it is safe for the sender to omit this field (the consumer sees it as absent).

jsRpcSession @9 () -> (topLevel :JsRpcTarget) $Cxx.allowCancellation;
# Opens a JS rpc "session". The call does not return until the session is complete.
Expand Down
Loading
Loading