Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions src/workerd/api/hibernatable-web-socket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ kj::Promise<WorkerInterface::CustomEvent::Result> HibernatableWebSocketCustomEve
kj::Maybe<kj::StringPtr> entrypointName,
kj::Maybe<Worker::VersionInfo> versionInfo,
Frankenvalue props,
kj::TaskSet& waitUntilTasks) {
kj::TaskSet& waitUntilTasks,
bool isDynamicDispatch) {
// Mark the request as delivered because we're about to run some JS.
auto& context = incomingRequest->getContext();
incomingRequest->delivered();
Expand All @@ -83,33 +84,34 @@ kj::Promise<WorkerInterface::CustomEvent::Result> HibernatableWebSocketCustomEve
try {
co_await context.run(
[entrypointName = entrypointName, &context, eventParameters = kj::mv(eventParameters),
versionInfo = kj::mv(versionInfo), props = kj::mv(props)](Worker::Lock& lock) mutable {
versionInfo = kj::mv(versionInfo), props = kj::mv(props),
isDynamicDispatch](Worker::Lock& lock) mutable {
KJ_SWITCH_ONEOF(eventParameters.eventType) {
KJ_CASE_ONEOF(text, HibernatableSocketParams::Text) {
return lock.getGlobalScope().sendHibernatableWebSocketMessage(context,
kj::mv(text.message), eventParameters.eventTimeoutMs,
kj::mv(eventParameters.websocketId), lock,
lock.getExportedHandler(
entrypointName, kj::mv(versionInfo), kj::mv(props), context.getActor()));
lock.getExportedHandler(entrypointName, kj::mv(versionInfo), kj::mv(props),
context.getActor(), isDynamicDispatch));
}
KJ_CASE_ONEOF(data, HibernatableSocketParams::Data) {
return lock.getGlobalScope().sendHibernatableWebSocketMessage(context,
kj::mv(data.message), eventParameters.eventTimeoutMs,
kj::mv(eventParameters.websocketId), lock,
lock.getExportedHandler(
entrypointName, kj::mv(versionInfo), kj::mv(props), context.getActor()));
lock.getExportedHandler(entrypointName, kj::mv(versionInfo), kj::mv(props),
context.getActor(), isDynamicDispatch));
}
KJ_CASE_ONEOF(close, HibernatableSocketParams::Close) {
return lock.getGlobalScope().sendHibernatableWebSocketClose(context, kj::mv(close),
eventParameters.eventTimeoutMs, kj::mv(eventParameters.websocketId), lock,
lock.getExportedHandler(
entrypointName, kj::mv(versionInfo), kj::mv(props), context.getActor()));
lock.getExportedHandler(entrypointName, kj::mv(versionInfo), kj::mv(props),
context.getActor(), isDynamicDispatch));
}
KJ_CASE_ONEOF(e, HibernatableSocketParams::Error) {
return lock.getGlobalScope().sendHibernatableWebSocketError(context, kj::mv(e.error),
eventParameters.eventTimeoutMs, kj::mv(eventParameters.websocketId), lock,
lock.getExportedHandler(
entrypointName, kj::mv(versionInfo), kj::mv(props), context.getActor()));
lock.getExportedHandler(entrypointName, kj::mv(versionInfo), kj::mv(props),
context.getActor(), isDynamicDispatch));
}
KJ_UNREACHABLE;
}
Expand Down
3 changes: 2 additions & 1 deletion src/workerd/api/hibernatable-web-socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ class HibernatableWebSocketCustomEvent final: public WorkerInterface::CustomEven
kj::Maybe<kj::StringPtr> entrypointName,
kj::Maybe<Worker::VersionInfo> versionInfo,
Frankenvalue props,
kj::TaskSet& waitUntilTasks) override;
kj::TaskSet& waitUntilTasks,
bool isDynamicDispatch) override;

kj::Promise<Result> sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
Expand Down
9 changes: 5 additions & 4 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,8 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEvent::run(
kj::Maybe<kj::StringPtr> entrypointName,
kj::Maybe<Worker::VersionInfo> versionInfo,
Frankenvalue props,
kj::TaskSet& waitUntilTasks) {
kj::TaskSet& waitUntilTasks,
bool isDynamicDispatch) {
// This method has three main chunks of logic:
// 1. Do all necessary setup work. This starts right below this comment.
// 2. Call into the worker's queue event handler.
Expand All @@ -846,14 +847,14 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEvent::run(
auto runProm = context.run(
[this, entrypointName = entrypointName, &context, queueEvent = kj::addRef(*queueEventHolder),
&metrics = incomingRequest->getMetrics(), versionInfo = kj::mv(versionInfo),
props = kj::mv(props)](Worker::Lock& lock) mutable {
props = kj::mv(props), isDynamicDispatch](Worker::Lock& lock) mutable {
jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock);

auto& typeHandler = lock.getWorker().getIsolate().getApi().getQueueTypeHandler(lock);
auto startResp = startQueueEvent(lock.getGlobalScope(), context, kj::mv(params),
context.addObject(result), lock,
lock.getExportedHandler(
entrypointName, kj::mv(versionInfo), kj::mv(props), context.getActor()),
lock.getExportedHandler(entrypointName, kj::mv(versionInfo), kj::mv(props),
context.getActor(), isDynamicDispatch),
typeHandler);
queueEvent->event = kj::mv(startResp.event);
queueEvent->exportedHandlerProm = kj::mv(startResp.exportedHandlerProm);
Expand Down
3 changes: 2 additions & 1 deletion src/workerd/api/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,8 @@ class QueueCustomEvent final: public WorkerInterface::CustomEvent, public kj::Re
kj::Maybe<kj::StringPtr> entrypointName,
kj::Maybe<Worker::VersionInfo> versionInfo,
Frankenvalue props,
kj::TaskSet& waitUntilTasks) override;
kj::TaskSet& waitUntilTasks,
bool isDynamicDispatch) override;

kj::Promise<Result> sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
Expand Down
17 changes: 10 additions & 7 deletions src/workerd/api/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,8 @@ kj::Promise<void> sendTracesToExportedHandler(kj::Own<IoContext::IncomingRequest
kj::Maybe<kj::StringPtr> entrypointNamePtr,
kj::Maybe<Worker::VersionInfo> versionInfo,
Frankenvalue props,
kj::ArrayPtr<kj::Own<Trace>> traces) {
kj::ArrayPtr<kj::Own<Trace>> traces,
bool isDynamicDispatch) {
// Mark the request as delivered because we're about to run some JS.
incomingRequest->delivered();

Expand All @@ -672,11 +673,12 @@ kj::Promise<void> sendTracesToExportedHandler(kj::Own<IoContext::IncomingRequest
try {
co_await context.run(
[&context, nonEmptyTraces = nonEmptyTraces.asPtr(), entrypointName = kj::mv(entrypointName),
versionInfo = kj::mv(versionInfo), props = kj::mv(props)](Worker::Lock& lock) mutable {
versionInfo = kj::mv(versionInfo), props = kj::mv(props),
isDynamicDispatch](Worker::Lock& lock) mutable {
jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock);

auto handler = lock.getExportedHandler(
entrypointName, kj::mv(versionInfo), kj::mv(props), context.getActor());
auto handler = lock.getExportedHandler(entrypointName, kj::mv(versionInfo), kj::mv(props),
context.getActor(), isDynamicDispatch);
return lock.getGlobalScope().sendTraces(nonEmptyTraces, lock, handler);
});
} catch (kj::Exception& e) {
Expand Down Expand Up @@ -706,10 +708,11 @@ auto TraceCustomEvent::run(kj::Own<IoContext::IncomingRequest> incomingRequest,
kj::Maybe<kj::StringPtr> entrypointNamePtr,
kj::Maybe<Worker::VersionInfo> versionInfo,
Frankenvalue props,
kj::TaskSet& waitUntilTasks) -> kj::Promise<Result> {
kj::TaskSet& waitUntilTasks,
bool isDynamicDispatch) -> kj::Promise<Result> {
// Don't bother to wait around for the handler to run, just hand it off to the waitUntil tasks.
waitUntilTasks.add(sendTracesToExportedHandler(
kj::mv(incomingRequest), entrypointNamePtr, kj::mv(versionInfo), kj::mv(props), traces));
waitUntilTasks.add(sendTracesToExportedHandler(kj::mv(incomingRequest), entrypointNamePtr,
kj::mv(versionInfo), kj::mv(props), traces, isDynamicDispatch));

// Reporting a proper outcome and return event here would be nice, but for that we'd need to await
// running the tail handler...
Expand Down
3 changes: 2 additions & 1 deletion src/workerd/api/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,8 @@ class TraceCustomEvent final: public WorkerInterface::CustomEvent {
kj::Maybe<kj::StringPtr> entrypointName,
kj::Maybe<Worker::VersionInfo> versionInfo,
Frankenvalue props,
kj::TaskSet& waitUntilTasks) override;
kj::TaskSet& waitUntilTasks,
bool isDynamicDispatch) override;

kj::Promise<Result> sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
Expand Down
14 changes: 9 additions & 5 deletions src/workerd/api/worker-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1982,7 +1982,8 @@ class EntrypointJsRpcTarget final: public JsRpcTargetBase {
kj::Maybe<Worker::VersionInfo> versionInfo,
Frankenvalue props,
kj::Maybe<kj::String> wrapperModule,
kj::Maybe<kj::Own<BaseTracer>> tracer)
kj::Maybe<kj::Own<BaseTracer>> tracer,
bool isDynamicDispatch)
: JsRpcTargetBase(ioCtx, CantOutliveIncomingRequest()),
ioCtx(ioCtx),
// Most of the time we don't really have to clone this but it's hard to fully prove, so
Expand All @@ -1991,7 +1992,8 @@ class EntrypointJsRpcTarget final: public JsRpcTargetBase {
versionInfo(kj::mv(versionInfo)),
props(kj::mv(props)),
wrapperModule(kj::mv(wrapperModule)),
tracer(kj::mv(tracer)) {}
tracer(kj::mv(tracer)),
isDynamicDispatch(isDynamicDispatch) {}

// Override call() to emit the Return event when the top-level RPC call completes.
// This marks when the handler returned a value, NOT when all data has been streamed or all
Expand All @@ -2008,7 +2010,7 @@ class EntrypointJsRpcTarget final: public JsRpcTargetBase {
jsg::Lock& js = lock;

auto handler = KJ_REQUIRE_NONNULL(lock.getExportedHandler(entrypointName, kj::mv(versionInfo),
kj::mv(props), ioCtx.getActor()),
kj::mv(props), ioCtx.getActor(), isDynamicDispatch),
"Failed to get handler to worker.");

if (handler->missingSuperclass && wrapperModule == kj::none) {
Expand Down Expand Up @@ -2077,6 +2079,7 @@ class EntrypointJsRpcTarget final: public JsRpcTargetBase {
Frankenvalue props;
kj::Maybe<kj::String> wrapperModule;
kj::Maybe<kj::Own<BaseTracer>> tracer;
bool isDynamicDispatch;

bool isReservedName(kj::StringPtr name) override {
if ( // "fetch" and "connect" are treated specially on entrypoints.
Expand Down Expand Up @@ -2165,7 +2168,8 @@ kj::Promise<WorkerInterface::CustomEvent::Result> JsRpcSessionCustomEvent::run(
kj::Maybe<kj::StringPtr> entrypointName,
kj::Maybe<Worker::VersionInfo> versionInfo,
Frankenvalue props,
kj::TaskSet& waitUntilTasks) {
kj::TaskSet& waitUntilTasks,
bool isDynamicDispatch) {
IoContext& ioctx = incomingRequest->getContext();

incomingRequest->delivered();
Expand All @@ -2177,7 +2181,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> JsRpcSessionCustomEvent::run(
});

EntrypointJsRpcTarget target(ioctx, entrypointName, kj::mv(versionInfo), kj::mv(props),
kj::mv(wrapperModule), mapAddRef(incomingRequest->getWorkerTracer()));
kj::mv(wrapperModule), mapAddRef(incomingRequest->getWorkerTracer()), isDynamicDispatch);
capnp::RevocableServer<rpc::JsRpcTarget> revcableTarget(target);

try {
Expand Down
3 changes: 2 additions & 1 deletion src/workerd/api/worker-rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,8 @@ class JsRpcSessionCustomEvent final: public WorkerInterface::CustomEvent {
kj::Maybe<kj::StringPtr> entrypointName,
kj::Maybe<Worker::VersionInfo> versionInfo,
Frankenvalue props,
kj::TaskSet& waitUntilTasks) override;
kj::TaskSet& waitUntilTasks,
bool isDynamicDispatch) override;

kj::Promise<Result> sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
Expand Down
8 changes: 8 additions & 0 deletions src/workerd/io/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -526,3 +526,11 @@ kj_test(
"//src/workerd/tests:test-fixture",
],
)

kj_test(
src = "worker-getexportedhandler-test.c++",
deps = [
":io",
"//src/workerd/tests:test-fixture",
],
)
19 changes: 12 additions & 7 deletions src/workerd/io/trace-stream.c++
Original file line number Diff line number Diff line change
Expand Up @@ -620,12 +620,14 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server {
kj::Maybe<kj::StringPtr> entrypointNamePtr,
kj::Maybe<Worker::VersionInfo> versionInfo,
Frankenvalue props,
kj::Own<kj::PromiseFulfiller<void>> doneFulfiller)
kj::Own<kj::PromiseFulfiller<void>> doneFulfiller,
bool isDynamicDispatch)
: weakIoContext(ioContext.getWeakRef()),
entrypointNamePtr(kj::mv(entrypointNamePtr)),
versionInfo(kj::mv(versionInfo)),
props(kj::mv(props)),
doneFulfiller(kj::mv(doneFulfiller)) {}
doneFulfiller(kj::mv(doneFulfiller)),
isDynamicDispatch(isDynamicDispatch) {}

KJ_DISALLOW_COPY_AND_MOVE(TailStreamTarget);
~TailStreamTarget() {
Expand Down Expand Up @@ -735,9 +737,10 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server {
events.size() == 1 && events[0].event.is<Onset>(), "Expected only a single onset event");
auto& event = events[0];

auto handler = KJ_REQUIRE_NONNULL(lock.getExportedHandler(entrypointNamePtr,
kj::mv(versionInfo), kj::mv(props), ioContext.getActor()),
"Failed to get handler to worker.");
auto handler =
KJ_REQUIRE_NONNULL(lock.getExportedHandler(entrypointNamePtr, kj::mv(versionInfo),
kj::mv(props), ioContext.getActor(), isDynamicDispatch),
"Failed to get handler to worker.");
StringCache stringCache;

jsg::Lock& js = lock;
Expand Down Expand Up @@ -934,6 +937,7 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server {
// or rejected if the capability is dropped before receiving the outcome
// event.
kj::Own<kj::PromiseFulfiller<void>> doneFulfiller;
bool isDynamicDispatch;

// The maybeHandler will be empty until we receive and process the
// onset event.
Expand All @@ -954,13 +958,14 @@ kj::Promise<WorkerInterface::CustomEvent::Result> TailStreamCustomEvent::run(
kj::Maybe<kj::StringPtr> entrypointName,
kj::Maybe<Worker::VersionInfo> versionInfo,
Frankenvalue props,
kj::TaskSet& waitUntilTasks) {
kj::TaskSet& waitUntilTasks,
bool isDynamicDispatch) {
IoContext& ioContext = incomingRequest->getContext();
incomingRequest->delivered();

auto [donePromise, doneFulfiller] = kj::newPromiseAndFulfiller<void>();
capFulfiller->fulfill(kj::heap<TailStreamTarget>(ioContext, kj::mv(entrypointName),
kj::mv(versionInfo), kj::mv(props), kj::mv(doneFulfiller)));
kj::mv(versionInfo), kj::mv(props), kj::mv(doneFulfiller), isDynamicDispatch));

donePromise = donePromise.attach(ioContext.registerPendingEvent());

Expand Down
3 changes: 2 additions & 1 deletion src/workerd/io/trace-stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class TailStreamCustomEvent final: public WorkerInterface::CustomEvent {
kj::Maybe<kj::StringPtr> entrypointName,
kj::Maybe<Worker::VersionInfo> versionInfo,
Frankenvalue props,
kj::TaskSet& waitUntilTasks) override;
kj::TaskSet& waitUntilTasks,
bool isDynamicDispatch) override;

kj::Promise<Result> sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
Expand Down
Loading
Loading