Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 182 additions & 0 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,74 @@ kj::Promise<void> WorkerQueue::send(
.attach(context.registerPendingEvent());
};

jsg::Promise<WorkerQueue::SendResponse> WorkerQueue::sendWithResponse(jsg::Lock& js,
jsg::JsValue body,
jsg::Optional<SendOptions> options,
const jsg::TypeHandler<SendResponse>& responseHandler) {
auto& context = IoContext::current();

JSG_REQUIRE(!body.isUndefined(), TypeError, "Message body cannot be undefined");

auto headers = kj::HttpHeaders(context.getHeaderTable());
headers.set(kj::HttpHeaderId::CONTENT_TYPE, MimeType::OCTET_STREAM.toString());

kj::Maybe<kj::StringPtr> contentType;
KJ_IF_SOME(opts, options) {
KJ_IF_SOME(type, opts.contentType) {
auto validatedType = validateContentType(type);
headers.addPtrPtr(HDR_MSG_FORMAT, validatedType);
contentType = validatedType;
}
KJ_IF_SOME(secs, opts.delaySeconds) {
headers.addPtr(HDR_MSG_DELAY, kj::str(secs));
}
}

Serialized serialized;
KJ_IF_SOME(type, contentType) {
serialized = serialize(js, body, type, SerializeArrayBufferBehavior::DEEP_COPY);
} else if (workerd::FeatureFlags::get(js).getQueuesJsonMessages()) {
headers.addPtrPtr("X-Msg-Fmt", IncomingQueueMessage::ContentType::JSON);
serialized = serialize(
js, body, IncomingQueueMessage::ContentType::JSON, SerializeArrayBufferBehavior::DEEP_COPY);
} else {
serialized = serializeV8(js, body);
}

auto client = context.getHttpClient(subrequestChannel, true, kj::none, "queue_send"_kjc);
auto req = client->request(
kj::HttpMethod::POST, "https://fake-host/message"_kjc, headers, serialized.data.size());

const auto& headerIds = context.getHeaderIds();
const auto exposeErrorCodes = workerd::FeatureFlags::get(js).getQueueExposeErrorCodes();

static constexpr auto handleSend = [](auto req, auto serialized, auto client, auto& headerIds,
bool exposeErrorCodes) -> kj::Promise<kj::String> {
co_await req.body->write(serialized.data);
auto response = co_await req.response;

if (exposeErrorCodes) {
JSG_REQUIRE(response.statusCode == 200, Error, buildQueueErrorMessage(response, headerIds));
} else {
JSG_REQUIRE(
response.statusCode == 200, Error, kj::str("Queue send failed: ", response.statusText));
}

auto responseBody = co_await response.body->readAllBytes();
co_return kj::str(responseBody.asChars());
};

auto promise =
handleSend(kj::mv(req), kj::mv(serialized), kj::mv(client), headerIds, exposeErrorCodes);

return context.awaitIo(
js, kj::mv(promise), [&responseHandler](jsg::Lock& js, kj::String text) -> SendResponse {
auto parsed = jsg::JsValue::fromJson(js, text);
return JSG_REQUIRE_NONNULL(
responseHandler.tryUnwrap(js, parsed), Error, "Failed to parse queue send response", text);
});
}

kj::Promise<void> WorkerQueue::sendBatch(jsg::Lock& js,
jsg::Sequence<MessageSendRequest> batch,
jsg::Optional<SendBatchOptions> options) {
Expand Down Expand Up @@ -394,6 +462,120 @@ jsg::Promise<WorkerQueue::Metrics> WorkerQueue::metrics(
});
}

jsg::Promise<WorkerQueue::SendBatchResponse> WorkerQueue::sendBatchWithResponse(jsg::Lock& js,
jsg::Sequence<MessageSendRequest> batch,
jsg::Optional<SendBatchOptions> options,
const jsg::TypeHandler<SendBatchResponse>& responseHandler) {
auto& context = IoContext::current();

JSG_REQUIRE(batch.size() > 0, TypeError, "sendBatch() requires at least one message");

size_t totalSize = 0;
size_t largestMessage = 0;
auto messageCount = batch.size();
auto builder = kj::heapArrayBuilder<SerializedWithOptions>(messageCount);
for (auto& message: batch) {
auto body = message.body.getHandle(js);
JSG_REQUIRE(!body.isUndefined(), TypeError, "Message body cannot be undefined");

SerializedWithOptions item;
KJ_IF_SOME(secs, message.delaySeconds) {
item.delaySeconds = secs;
}

KJ_IF_SOME(contentType, message.contentType) {
item.contentType = validateContentType(contentType);
item.body = serialize(js, body, contentType, SerializeArrayBufferBehavior::SHALLOW_REFERENCE);
} else if (workerd::FeatureFlags::get(js).getQueuesJsonMessages()) {
item.contentType = IncomingQueueMessage::ContentType::JSON;
item.body = serialize(js, body, IncomingQueueMessage::ContentType::JSON,
SerializeArrayBufferBehavior::SHALLOW_REFERENCE);
} else {
item.body = serializeV8(js, body);
}

builder.add(kj::mv(item));
totalSize += builder.back().body.data.size();
largestMessage = kj::max(largestMessage, builder.back().body.data.size());
}
auto serializedBodies = builder.finish();

auto estimatedSize = (totalSize + 2) / 3 * 4 + messageCount * 64 + 32;
kj::Vector<char> bodyBuilder(estimatedSize);
bodyBuilder.addAll("{\"messages\":["_kj);
for (size_t i = 0; i < messageCount; ++i) {
bodyBuilder.addAll("{\"body\":\""_kj);
bodyBuilder.addAll(kj::encodeBase64(serializedBodies[i].body.data));
bodyBuilder.add('"');

KJ_IF_SOME(contentType, serializedBodies[i].contentType) {
bodyBuilder.addAll(",\"contentType\":\""_kj);
bodyBuilder.addAll(contentType);
bodyBuilder.add('"');
}

KJ_IF_SOME(delaySecs, serializedBodies[i].delaySeconds) {
bodyBuilder.addAll(",\"delaySecs\": "_kj);
bodyBuilder.addAll(kj::str(delaySecs));
}

bodyBuilder.addAll("}"_kj);
if (i < messageCount - 1) {
bodyBuilder.add(',');
}
}
bodyBuilder.addAll("]}"_kj);
bodyBuilder.add('\0');
KJ_DASSERT(bodyBuilder.size() <= estimatedSize);
kj::String body(bodyBuilder.releaseAsArray());
KJ_DASSERT(jsg::JsValue::fromJson(js, body).isObject());

auto client = context.getHttpClient(subrequestChannel, true, kj::none, "queue_send"_kjc);

auto headers = kj::HttpHeaders(context.getHeaderTable());
headers.addPtr("CF-Queue-Batch-Count"_kj, kj::str(messageCount));
headers.addPtr("CF-Queue-Batch-Bytes"_kj, kj::str(totalSize));
headers.addPtr("CF-Queue-Largest-Msg"_kj, kj::str(largestMessage));
headers.set(kj::HttpHeaderId::CONTENT_TYPE, MimeType::JSON.toString());

KJ_IF_SOME(opts, options) {
KJ_IF_SOME(secs, opts.delaySeconds) {
headers.addPtr(HDR_MSG_DELAY, kj::str(secs));
}
}

auto req =
client->request(kj::HttpMethod::POST, "https://fake-host/batch"_kjc, headers, body.size());

const auto& headerIds = context.getHeaderIds();
const auto exposeErrorCodes = workerd::FeatureFlags::get(js).getQueueExposeErrorCodes();
static constexpr auto handleWrite = [](auto req, auto body, auto client, auto& headerIds,
bool exposeErrorCodes) -> kj::Promise<kj::String> {
co_await req.body->write(body.asBytes());
auto response = co_await req.response;

if (exposeErrorCodes) {
JSG_REQUIRE(response.statusCode == 200, Error, buildQueueErrorMessage(response, headerIds));
} else {
JSG_REQUIRE(response.statusCode == 200, Error,
kj::str("Queue sendBatch failed: ", response.statusText));
}

auto responseBody = co_await response.body->readAllBytes();
co_return kj::str(responseBody.asChars());
};

auto promise =
handleWrite(kj::mv(req), kj::mv(body), kj::mv(client), headerIds, exposeErrorCodes);

return context.awaitIo(
js, kj::mv(promise), [&responseHandler](jsg::Lock& js, kj::String text) -> SendBatchResponse {
auto parsed = jsg::JsValue::fromJson(js, text);
return JSG_REQUIRE_NONNULL(
responseHandler.tryUnwrap(js, parsed), Error, "Failed to parse queue send response", text);
});
}

QueueMessage::QueueMessage(
jsg::Lock& js, rpc::QueueMessage::Reader message, IoPtr<QueueEventResult> result)
: id(kj::str(message.getId())),
Expand Down
66 changes: 61 additions & 5 deletions src/workerd/api/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,46 @@ class WorkerQueue: public jsg::Object {
JSG_STRUCT_TS_OVERRIDE(QueueMetrics);
};

struct SendMetrics {
double backlogCount;
double backlogBytes;
double oldestMessageTimestamp;
JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp);
JSG_STRUCT_TS_OVERRIDE(QueueSendMetrics);
};

struct SendMetadata {
SendMetrics metrics;
JSG_STRUCT(metrics);
JSG_STRUCT_TS_OVERRIDE(QueueSendMetadata);
};

struct SendResponse {
SendMetadata metadata;
JSG_STRUCT(metadata);
JSG_STRUCT_TS_OVERRIDE(QueueSendResponse);
};

struct SendBatchMetrics {
double backlogCount;
double backlogBytes;
double oldestMessageTimestamp;
JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp);
JSG_STRUCT_TS_OVERRIDE(QueueSendBatchMetrics);
};

struct SendBatchMetadata {
SendBatchMetrics metrics;
JSG_STRUCT(metrics);
JSG_STRUCT_TS_OVERRIDE(QueueSendBatchMetadata);
};

struct SendBatchResponse {
SendBatchMetadata metadata;
JSG_STRUCT(metadata);
JSG_STRUCT_TS_OVERRIDE(QueueSendBatchResponse);
};

struct SendOptions {
// TODO(soon): Support metadata.

Expand Down Expand Up @@ -77,26 +117,39 @@ class WorkerQueue: public jsg::Object {

kj::Promise<void> send(jsg::Lock& js, jsg::JsValue body, jsg::Optional<SendOptions> options);

jsg::Promise<SendResponse> sendWithResponse(jsg::Lock& js,
jsg::JsValue body,
jsg::Optional<SendOptions> options,
const jsg::TypeHandler<SendResponse>& responseHandler);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forget, did we decide to put send() / sendBatch() with responses behind a compat flag? To me this change looks non-breaking, and so does not necessarily need to be behind a compat flag. It'd certainly make implementation a lot simpler too?

Copy link
Copy Markdown
Contributor Author

@KennethRuan KennethRuan Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving this here for posterity:
Based on discussions, we decided to put these changes under the experimental flag. After internal testing, we'll need to put in a PR to remove the experimental flag and also regenerate the production type files. We were imagining all the metrics changes can go in together, so this may be the easier approach for us.

Link to other metrics-related Queues PRs:
#6246
#6339

kj::Promise<void> sendBatch(jsg::Lock& js,
jsg::Sequence<MessageSendRequest> batch,
jsg::Optional<SendBatchOptions> options);

jsg::Promise<SendBatchResponse> sendBatchWithResponse(jsg::Lock& js,
jsg::Sequence<MessageSendRequest> batch,
jsg::Optional<SendBatchOptions> options,
const jsg::TypeHandler<SendBatchResponse>& responseHandler);

jsg::Promise<Metrics> metrics(jsg::Lock& js, const jsg::TypeHandler<Metrics>& metricsHandler);

JSG_RESOURCE_TYPE(WorkerQueue, CompatibilityFlags::Reader flags) {
JSG_METHOD(send);
JSG_METHOD(sendBatch);
if (flags.getWorkerdExperimental()) {
JSG_METHOD_NAMED(send, sendWithResponse);
JSG_METHOD_NAMED(sendBatch, sendBatchWithResponse);
JSG_METHOD(metrics);
} else {
JSG_METHOD(send);
JSG_METHOD(sendBatch);
}

JSG_TS_ROOT();
if (flags.getWorkerdExperimental()) {
JSG_TS_OVERRIDE(Queue<Body = unknown> {
send(message: Body, options?: QueueSendOptions): Promise<void>;
send(message: Body, options?: QueueSendOptions): Promise<QueueSendResponse>;
sendBatch(messages
: Iterable<MessageSendRequest<Body>>, options ?: QueueSendBatchOptions)
: Promise<void>;
: Promise<QueueSendBatchResponse>;
metrics(): Promise<QueueMetrics>;
});
} else {
Expand Down Expand Up @@ -446,7 +499,10 @@ class QueueCustomEvent final: public WorkerInterface::CustomEvent, public kj::Re
};

#define EW_QUEUE_ISOLATE_TYPES \
api::WorkerQueue, api::WorkerQueue::SendOptions, api::WorkerQueue::SendBatchOptions, \
api::WorkerQueue, api::WorkerQueue::SendMetrics, api::WorkerQueue::SendMetadata, \
api::WorkerQueue::SendResponse, api::WorkerQueue::SendBatchMetrics, \
api::WorkerQueue::SendBatchMetadata, api::WorkerQueue::SendBatchResponse, \
api::WorkerQueue::SendOptions, api::WorkerQueue::SendBatchOptions, \
api::WorkerQueue::MessageSendRequest, api::WorkerQueue::Metrics, api::MessageBatchMetrics, \
api::MessageBatchMetadata, api::IncomingQueueMessage, api::QueueRetryBatch, \
api::QueueRetryMessage, api::QueueResponse, api::QueueRetryOptions, api::QueueMessage, \
Expand Down
8 changes: 8 additions & 0 deletions src/workerd/api/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,14 @@ wd_test(
data = ["queue-metadata-test.js"],
)

wd_test(
src = "queue-producer-metadata-test.wd-test",
args = ["--experimental"],
data = [
"queue-producer-metadata-test.js",
],
)

wd_test(
src = "r2-test.wd-test",
args = ["--experimental"],
Expand Down
75 changes: 75 additions & 0 deletions src/workerd/api/tests/queue-producer-metadata-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) 2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import assert from 'node:assert';

const SEND_RESPONSE = {
metadata: {
metrics: {
backlogCount: 100,
backlogBytes: 2048,
oldestMessageTimestamp: 1000000,
},
},
};

const SEND_BATCH_RESPONSE = {
metadata: {
metrics: {
backlogCount: 200,
backlogBytes: 4096,
oldestMessageTimestamp: 2000000,
},
},
};

export default {
async fetch(request) {
const { pathname } = new URL(request.url);

if (pathname === '/message') {
const text = await request.text();
assert.strictEqual(request.method, 'POST');
assert.strictEqual(text, 'abc');
return Response.json(SEND_RESPONSE);
}

if (pathname === '/batch') {
assert.strictEqual(request.method, 'POST');
const body = await request.json();
assert.strictEqual(body.messages.length, 1);
return Response.json(SEND_BATCH_RESPONSE);
}

return new Response('Not Found', { status: 404 });
},

async test(ctrl, env) {
const responseBodyEnabled = env.RESPONSE_BODY_FLAG;

const sendResult = await env.QUEUE.send('abc', { contentType: 'text' });
const sendBatchResult = await env.QUEUE.sendBatch([
{ body: 'def', contentType: 'text' },
]);

if (responseBodyEnabled) {
assert.strictEqual(sendResult.metadata.metrics.backlogCount, 100);
assert.strictEqual(sendResult.metadata.metrics.backlogBytes, 2048);
assert.strictEqual(
sendResult.metadata.metrics.oldestMessageTimestamp,
1000000
);

assert.strictEqual(sendBatchResult.metadata.metrics.backlogCount, 200);
assert.strictEqual(sendBatchResult.metadata.metrics.backlogBytes, 4096);
assert.strictEqual(
sendBatchResult.metadata.metrics.oldestMessageTimestamp,
2000000
);
} else {
assert.strictEqual(sendResult, undefined);
assert.strictEqual(sendBatchResult, undefined);
}
},
};
Loading
Loading