From dda70b47ac34450bf53555d4ff52ee0902637af3 Mon Sep 17 00:00:00 2001 From: Kenneth Ruan Date: Tue, 17 Mar 2026 16:41:45 -0500 Subject: [PATCH] Add response body to queue send() and sendBatch() --- src/workerd/api/queue.c++ | 182 ++++++++++++++++++ src/workerd/api/queue.h | 66 ++++++- src/workerd/api/tests/BUILD.bazel | 8 + .../api/tests/queue-producer-metadata-test.js | 75 ++++++++ .../queue-producer-metadata-test.wd-test | 30 +++ .../experimental/index.d.ts | 26 ++- .../generated-snapshot/experimental/index.ts | 26 ++- 7 files changed, 404 insertions(+), 9 deletions(-) create mode 100644 src/workerd/api/tests/queue-producer-metadata-test.js create mode 100644 src/workerd/api/tests/queue-producer-metadata-test.wd-test diff --git a/src/workerd/api/queue.c++ b/src/workerd/api/queue.c++ index b662670bf43..fd3d90b9eba 100644 --- a/src/workerd/api/queue.c++ +++ b/src/workerd/api/queue.c++ @@ -243,6 +243,74 @@ kj::Promise WorkerQueue::send( .attach(context.registerPendingEvent()); }; +jsg::Promise WorkerQueue::sendWithResponse(jsg::Lock& js, + jsg::JsValue body, + jsg::Optional options, + const jsg::TypeHandler& responseHandler) { + auto& context = IoContext::current(); + + JSG_REQUIRE(!body.isUndefined(), TypeError, "Message body cannot be undefined"); + + auto headers = kj::HttpHeaders(context.getHeaderTable()); + headers.set(kj::HttpHeaderId::CONTENT_TYPE, MimeType::OCTET_STREAM.toString()); + + kj::Maybe contentType; + KJ_IF_SOME(opts, options) { + KJ_IF_SOME(type, opts.contentType) { + auto validatedType = validateContentType(type); + headers.addPtrPtr(HDR_MSG_FORMAT, validatedType); + contentType = validatedType; + } + KJ_IF_SOME(secs, opts.delaySeconds) { + headers.addPtr(HDR_MSG_DELAY, kj::str(secs)); + } + } + + Serialized serialized; + KJ_IF_SOME(type, contentType) { + serialized = serialize(js, body, type, SerializeArrayBufferBehavior::DEEP_COPY); + } else if (workerd::FeatureFlags::get(js).getQueuesJsonMessages()) { + headers.addPtrPtr("X-Msg-Fmt", IncomingQueueMessage::ContentType::JSON); + serialized = serialize( + js, body, IncomingQueueMessage::ContentType::JSON, SerializeArrayBufferBehavior::DEEP_COPY); + } else { + serialized = serializeV8(js, body); + } + + auto client = context.getHttpClient(subrequestChannel, true, kj::none, "queue_send"_kjc); + auto req = client->request( + kj::HttpMethod::POST, "https://fake-host/message"_kjc, headers, serialized.data.size()); + + const auto& headerIds = context.getHeaderIds(); + const auto exposeErrorCodes = workerd::FeatureFlags::get(js).getQueueExposeErrorCodes(); + + static constexpr auto handleSend = [](auto req, auto serialized, auto client, auto& headerIds, + bool exposeErrorCodes) -> kj::Promise { + co_await req.body->write(serialized.data); + auto response = co_await req.response; + + if (exposeErrorCodes) { + JSG_REQUIRE(response.statusCode == 200, Error, buildQueueErrorMessage(response, headerIds)); + } else { + JSG_REQUIRE( + response.statusCode == 200, Error, kj::str("Queue send failed: ", response.statusText)); + } + + auto responseBody = co_await response.body->readAllBytes(); + co_return kj::str(responseBody.asChars()); + }; + + auto promise = + handleSend(kj::mv(req), kj::mv(serialized), kj::mv(client), headerIds, exposeErrorCodes); + + return context.awaitIo( + js, kj::mv(promise), [&responseHandler](jsg::Lock& js, kj::String text) -> SendResponse { + auto parsed = jsg::JsValue::fromJson(js, text); + return JSG_REQUIRE_NONNULL( + responseHandler.tryUnwrap(js, parsed), Error, "Failed to parse queue send response", text); + }); +} + kj::Promise WorkerQueue::sendBatch(jsg::Lock& js, jsg::Sequence batch, jsg::Optional options) { @@ -394,6 +462,120 @@ jsg::Promise WorkerQueue::metrics( }); } +jsg::Promise WorkerQueue::sendBatchWithResponse(jsg::Lock& js, + jsg::Sequence batch, + jsg::Optional options, + const jsg::TypeHandler& responseHandler) { + auto& context = IoContext::current(); + + JSG_REQUIRE(batch.size() > 0, TypeError, "sendBatch() requires at least one message"); + + size_t totalSize = 0; + size_t largestMessage = 0; + auto messageCount = batch.size(); + auto builder = kj::heapArrayBuilder(messageCount); + for (auto& message: batch) { + auto body = message.body.getHandle(js); + JSG_REQUIRE(!body.isUndefined(), TypeError, "Message body cannot be undefined"); + + SerializedWithOptions item; + KJ_IF_SOME(secs, message.delaySeconds) { + item.delaySeconds = secs; + } + + KJ_IF_SOME(contentType, message.contentType) { + item.contentType = validateContentType(contentType); + item.body = serialize(js, body, contentType, SerializeArrayBufferBehavior::SHALLOW_REFERENCE); + } else if (workerd::FeatureFlags::get(js).getQueuesJsonMessages()) { + item.contentType = IncomingQueueMessage::ContentType::JSON; + item.body = serialize(js, body, IncomingQueueMessage::ContentType::JSON, + SerializeArrayBufferBehavior::SHALLOW_REFERENCE); + } else { + item.body = serializeV8(js, body); + } + + builder.add(kj::mv(item)); + totalSize += builder.back().body.data.size(); + largestMessage = kj::max(largestMessage, builder.back().body.data.size()); + } + auto serializedBodies = builder.finish(); + + auto estimatedSize = (totalSize + 2) / 3 * 4 + messageCount * 64 + 32; + kj::Vector bodyBuilder(estimatedSize); + bodyBuilder.addAll("{\"messages\":["_kj); + for (size_t i = 0; i < messageCount; ++i) { + bodyBuilder.addAll("{\"body\":\""_kj); + bodyBuilder.addAll(kj::encodeBase64(serializedBodies[i].body.data)); + bodyBuilder.add('"'); + + KJ_IF_SOME(contentType, serializedBodies[i].contentType) { + bodyBuilder.addAll(",\"contentType\":\""_kj); + bodyBuilder.addAll(contentType); + bodyBuilder.add('"'); + } + + KJ_IF_SOME(delaySecs, serializedBodies[i].delaySeconds) { + bodyBuilder.addAll(",\"delaySecs\": "_kj); + bodyBuilder.addAll(kj::str(delaySecs)); + } + + bodyBuilder.addAll("}"_kj); + if (i < messageCount - 1) { + bodyBuilder.add(','); + } + } + bodyBuilder.addAll("]}"_kj); + bodyBuilder.add('\0'); + KJ_DASSERT(bodyBuilder.size() <= estimatedSize); + kj::String body(bodyBuilder.releaseAsArray()); + KJ_DASSERT(jsg::JsValue::fromJson(js, body).isObject()); + + auto client = context.getHttpClient(subrequestChannel, true, kj::none, "queue_send"_kjc); + + auto headers = kj::HttpHeaders(context.getHeaderTable()); + headers.addPtr("CF-Queue-Batch-Count"_kj, kj::str(messageCount)); + headers.addPtr("CF-Queue-Batch-Bytes"_kj, kj::str(totalSize)); + headers.addPtr("CF-Queue-Largest-Msg"_kj, kj::str(largestMessage)); + headers.set(kj::HttpHeaderId::CONTENT_TYPE, MimeType::JSON.toString()); + + KJ_IF_SOME(opts, options) { + KJ_IF_SOME(secs, opts.delaySeconds) { + headers.addPtr(HDR_MSG_DELAY, kj::str(secs)); + } + } + + auto req = + client->request(kj::HttpMethod::POST, "https://fake-host/batch"_kjc, headers, body.size()); + + const auto& headerIds = context.getHeaderIds(); + const auto exposeErrorCodes = workerd::FeatureFlags::get(js).getQueueExposeErrorCodes(); + static constexpr auto handleWrite = [](auto req, auto body, auto client, auto& headerIds, + bool exposeErrorCodes) -> kj::Promise { + co_await req.body->write(body.asBytes()); + auto response = co_await req.response; + + if (exposeErrorCodes) { + JSG_REQUIRE(response.statusCode == 200, Error, buildQueueErrorMessage(response, headerIds)); + } else { + JSG_REQUIRE(response.statusCode == 200, Error, + kj::str("Queue sendBatch failed: ", response.statusText)); + } + + auto responseBody = co_await response.body->readAllBytes(); + co_return kj::str(responseBody.asChars()); + }; + + auto promise = + handleWrite(kj::mv(req), kj::mv(body), kj::mv(client), headerIds, exposeErrorCodes); + + return context.awaitIo( + js, kj::mv(promise), [&responseHandler](jsg::Lock& js, kj::String text) -> SendBatchResponse { + auto parsed = jsg::JsValue::fromJson(js, text); + return JSG_REQUIRE_NONNULL( + responseHandler.tryUnwrap(js, parsed), Error, "Failed to parse queue send response", text); + }); +} + QueueMessage::QueueMessage( jsg::Lock& js, rpc::QueueMessage::Reader message, IoPtr result) : id(kj::str(message.getId())), diff --git a/src/workerd/api/queue.h b/src/workerd/api/queue.h index f0ca9f0b49b..89cf6f9f5e5 100644 --- a/src/workerd/api/queue.h +++ b/src/workerd/api/queue.h @@ -35,6 +35,46 @@ class WorkerQueue: public jsg::Object { JSG_STRUCT_TS_OVERRIDE(QueueMetrics); }; + struct SendMetrics { + double backlogCount; + double backlogBytes; + double oldestMessageTimestamp; + JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp); + JSG_STRUCT_TS_OVERRIDE(QueueSendMetrics); + }; + + struct SendMetadata { + SendMetrics metrics; + JSG_STRUCT(metrics); + JSG_STRUCT_TS_OVERRIDE(QueueSendMetadata); + }; + + struct SendResponse { + SendMetadata metadata; + JSG_STRUCT(metadata); + JSG_STRUCT_TS_OVERRIDE(QueueSendResponse); + }; + + struct SendBatchMetrics { + double backlogCount; + double backlogBytes; + double oldestMessageTimestamp; + JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp); + JSG_STRUCT_TS_OVERRIDE(QueueSendBatchMetrics); + }; + + struct SendBatchMetadata { + SendBatchMetrics metrics; + JSG_STRUCT(metrics); + JSG_STRUCT_TS_OVERRIDE(QueueSendBatchMetadata); + }; + + struct SendBatchResponse { + SendBatchMetadata metadata; + JSG_STRUCT(metadata); + JSG_STRUCT_TS_OVERRIDE(QueueSendBatchResponse); + }; + struct SendOptions { // TODO(soon): Support metadata. @@ -77,26 +117,39 @@ class WorkerQueue: public jsg::Object { kj::Promise send(jsg::Lock& js, jsg::JsValue body, jsg::Optional options); + jsg::Promise sendWithResponse(jsg::Lock& js, + jsg::JsValue body, + jsg::Optional options, + const jsg::TypeHandler& responseHandler); + kj::Promise sendBatch(jsg::Lock& js, jsg::Sequence batch, jsg::Optional options); + jsg::Promise sendBatchWithResponse(jsg::Lock& js, + jsg::Sequence batch, + jsg::Optional options, + const jsg::TypeHandler& responseHandler); + jsg::Promise metrics(jsg::Lock& js, const jsg::TypeHandler& metricsHandler); JSG_RESOURCE_TYPE(WorkerQueue, CompatibilityFlags::Reader flags) { - JSG_METHOD(send); - JSG_METHOD(sendBatch); if (flags.getWorkerdExperimental()) { + JSG_METHOD_NAMED(send, sendWithResponse); + JSG_METHOD_NAMED(sendBatch, sendBatchWithResponse); JSG_METHOD(metrics); + } else { + JSG_METHOD(send); + JSG_METHOD(sendBatch); } JSG_TS_ROOT(); if (flags.getWorkerdExperimental()) { JSG_TS_OVERRIDE(Queue { - send(message: Body, options?: QueueSendOptions): Promise; + send(message: Body, options?: QueueSendOptions): Promise; sendBatch(messages : Iterable>, options ?: QueueSendBatchOptions) - : Promise; + : Promise; metrics(): Promise; }); } else { @@ -446,7 +499,10 @@ class QueueCustomEvent final: public WorkerInterface::CustomEvent, public kj::Re }; #define EW_QUEUE_ISOLATE_TYPES \ - api::WorkerQueue, api::WorkerQueue::SendOptions, api::WorkerQueue::SendBatchOptions, \ + api::WorkerQueue, api::WorkerQueue::SendMetrics, api::WorkerQueue::SendMetadata, \ + api::WorkerQueue::SendResponse, api::WorkerQueue::SendBatchMetrics, \ + api::WorkerQueue::SendBatchMetadata, api::WorkerQueue::SendBatchResponse, \ + api::WorkerQueue::SendOptions, api::WorkerQueue::SendBatchOptions, \ api::WorkerQueue::MessageSendRequest, api::WorkerQueue::Metrics, api::MessageBatchMetrics, \ api::MessageBatchMetadata, api::IncomingQueueMessage, api::QueueRetryBatch, \ api::QueueRetryMessage, api::QueueResponse, api::QueueRetryOptions, api::QueueMessage, \ diff --git a/src/workerd/api/tests/BUILD.bazel b/src/workerd/api/tests/BUILD.bazel index e89fa4d4901..853f12049cd 100644 --- a/src/workerd/api/tests/BUILD.bazel +++ b/src/workerd/api/tests/BUILD.bazel @@ -164,6 +164,14 @@ wd_test( data = ["queue-metadata-test.js"], ) +wd_test( + src = "queue-producer-metadata-test.wd-test", + args = ["--experimental"], + data = [ + "queue-producer-metadata-test.js", + ], +) + wd_test( src = "r2-test.wd-test", args = ["--experimental"], diff --git a/src/workerd/api/tests/queue-producer-metadata-test.js b/src/workerd/api/tests/queue-producer-metadata-test.js new file mode 100644 index 00000000000..b22738af145 --- /dev/null +++ b/src/workerd/api/tests/queue-producer-metadata-test.js @@ -0,0 +1,75 @@ +// Copyright (c) 2023 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +import assert from 'node:assert'; + +const SEND_RESPONSE = { + metadata: { + metrics: { + backlogCount: 100, + backlogBytes: 2048, + oldestMessageTimestamp: 1000000, + }, + }, +}; + +const SEND_BATCH_RESPONSE = { + metadata: { + metrics: { + backlogCount: 200, + backlogBytes: 4096, + oldestMessageTimestamp: 2000000, + }, + }, +}; + +export default { + async fetch(request) { + const { pathname } = new URL(request.url); + + if (pathname === '/message') { + const text = await request.text(); + assert.strictEqual(request.method, 'POST'); + assert.strictEqual(text, 'abc'); + return Response.json(SEND_RESPONSE); + } + + if (pathname === '/batch') { + assert.strictEqual(request.method, 'POST'); + const body = await request.json(); + assert.strictEqual(body.messages.length, 1); + return Response.json(SEND_BATCH_RESPONSE); + } + + return new Response('Not Found', { status: 404 }); + }, + + async test(ctrl, env) { + const responseBodyEnabled = env.RESPONSE_BODY_FLAG; + + const sendResult = await env.QUEUE.send('abc', { contentType: 'text' }); + const sendBatchResult = await env.QUEUE.sendBatch([ + { body: 'def', contentType: 'text' }, + ]); + + if (responseBodyEnabled) { + assert.strictEqual(sendResult.metadata.metrics.backlogCount, 100); + assert.strictEqual(sendResult.metadata.metrics.backlogBytes, 2048); + assert.strictEqual( + sendResult.metadata.metrics.oldestMessageTimestamp, + 1000000 + ); + + assert.strictEqual(sendBatchResult.metadata.metrics.backlogCount, 200); + assert.strictEqual(sendBatchResult.metadata.metrics.backlogBytes, 4096); + assert.strictEqual( + sendBatchResult.metadata.metrics.oldestMessageTimestamp, + 2000000 + ); + } else { + assert.strictEqual(sendResult, undefined); + assert.strictEqual(sendBatchResult, undefined); + } + }, +}; diff --git a/src/workerd/api/tests/queue-producer-metadata-test.wd-test b/src/workerd/api/tests/queue-producer-metadata-test.wd-test new file mode 100644 index 00000000000..daa9d7a5603 --- /dev/null +++ b/src/workerd/api/tests/queue-producer-metadata-test.wd-test @@ -0,0 +1,30 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const unitTests :Workerd.Config = ( + services = [ + ( name = "queue-producer-metadata-enabled", + worker = ( + modules = [ + ( name = "worker-producer-metadata-enabled", esModule = embed "queue-producer-metadata-test.js" ) + ], + bindings = [ + ( name = "QUEUE", queue = "queue-producer-metadata-enabled" ), + ( name = "RESPONSE_BODY_FLAG", json = "true" ), + ], + compatibilityFlags = ["nodejs_compat", "queues_json_messages", "experimental", "capture_async_api_throws"], + ) + ), + ( name = "queue-producer-metadata-disabled", + worker = ( + modules = [ + ( name = "worker-producer-metadata-disabled", esModule = embed "queue-producer-metadata-test.js" ) + ], + bindings = [ + ( name = "QUEUE", queue = "queue-producer-metadata-disabled" ), + ( name = "RESPONSE_BODY_FLAG", json = "false" ), + ], + compatibilityFlags = ["nodejs_compat", "queues_json_messages", "capture_async_api_throws"], + ) + ), + ], +); diff --git a/types/generated-snapshot/experimental/index.d.ts b/types/generated-snapshot/experimental/index.d.ts index b479dd2ce4d..87d7dc1376f 100755 --- a/types/generated-snapshot/experimental/index.d.ts +++ b/types/generated-snapshot/experimental/index.d.ts @@ -2399,13 +2399,35 @@ interface KVNamespaceGetWithMetadataResult { } type QueueContentType = "text" | "bytes" | "json" | "v8"; interface Queue { - send(message: Body, options?: QueueSendOptions): Promise; + send(message: Body, options?: QueueSendOptions): Promise; sendBatch( messages: Iterable>, options?: QueueSendBatchOptions, - ): Promise; + ): Promise; metrics(): Promise; } +interface QueueSendMetrics { + backlogCount: number; + backlogBytes: number; + oldestMessageTimestamp: number; +} +interface QueueSendMetadata { + metrics: QueueSendMetrics; +} +interface QueueSendResponse { + metadata: QueueSendMetadata; +} +interface QueueSendBatchMetrics { + backlogCount: number; + backlogBytes: number; + oldestMessageTimestamp: number; +} +interface QueueSendBatchMetadata { + metrics: QueueSendBatchMetrics; +} +interface QueueSendBatchResponse { + metadata: QueueSendBatchMetadata; +} interface QueueSendOptions { contentType?: QueueContentType; delaySeconds?: number; diff --git a/types/generated-snapshot/experimental/index.ts b/types/generated-snapshot/experimental/index.ts index 87e33af7c9a..d52a5c56a9b 100755 --- a/types/generated-snapshot/experimental/index.ts +++ b/types/generated-snapshot/experimental/index.ts @@ -2402,13 +2402,35 @@ export interface KVNamespaceGetWithMetadataResult { } export type QueueContentType = "text" | "bytes" | "json" | "v8"; export interface Queue { - send(message: Body, options?: QueueSendOptions): Promise; + send(message: Body, options?: QueueSendOptions): Promise; sendBatch( messages: Iterable>, options?: QueueSendBatchOptions, - ): Promise; + ): Promise; metrics(): Promise; } +export interface QueueSendMetrics { + backlogCount: number; + backlogBytes: number; + oldestMessageTimestamp: number; +} +export interface QueueSendMetadata { + metrics: QueueSendMetrics; +} +export interface QueueSendResponse { + metadata: QueueSendMetadata; +} +export interface QueueSendBatchMetrics { + backlogCount: number; + backlogBytes: number; + oldestMessageTimestamp: number; +} +export interface QueueSendBatchMetadata { + metrics: QueueSendBatchMetrics; +} +export interface QueueSendBatchResponse { + metadata: QueueSendBatchMetadata; +} export interface QueueSendOptions { contentType?: QueueContentType; delaySeconds?: number;