diff --git a/src/workerd/api/worker-loader.c++ b/src/workerd/api/worker-loader.c++ index 669a4b2dfd8..99f5a212705 100644 --- a/src/workerd/api/worker-loader.c++ +++ b/src/workerd/api/worker-loader.c++ @@ -14,11 +14,12 @@ jsg::Ref WorkerStub::getEntrypoint(jsg::Lock& js, jsg::Optional> name, jsg::Optional options) { Frankenvalue props; - + kj::Maybe limits; KJ_IF_SOME(o, options) { KJ_IF_SOME(p, o.props) { props = Frankenvalue::fromJs(js, p.getHandle(js)); } + limits = o.limits; } kj::Maybe entrypointName; @@ -30,7 +31,7 @@ jsg::Ref WorkerStub::getEntrypoint(jsg::Lock& js, } } - auto subreqChannel = channel->getEntrypoint(kj::mv(entrypointName), kj::mv(props)); + auto subreqChannel = channel->getEntrypoint(kj::mv(entrypointName), kj::mv(props), limits); return js.alloc(IoContext::current().addObject(kj::mv(subreqChannel))); } @@ -38,11 +39,12 @@ jsg::Ref WorkerStub::getDurableObjectClass(jsg::Lock& js, jsg::Optional> name, jsg::Optional options) { Frankenvalue props; - + kj::Maybe limits; KJ_IF_SOME(o, options) { KJ_IF_SOME(p, o.props) { props = Frankenvalue::fromJs(js, p.getHandle(js)); } + limits = o.limits; } kj::Maybe entrypointName; @@ -55,7 +57,7 @@ jsg::Ref WorkerStub::getDurableObjectClass(jsg::Lock& js, } return js.alloc(IoContext::current().addObject( - channel->getActorClass(kj::mv(entrypointName), kj::mv(props)))); + channel->getActorClass(kj::mv(entrypointName), kj::mv(props), limits))); } jsg::Ref WorkerLoader::get( @@ -157,6 +159,7 @@ DynamicWorkerSource WorkerLoader::toDynamicWorkerSource(jsg::Lock& js, return {.source = kj::mv(extractedSource), .compatibilityFlags = compatFlags, + .limits = code.limits, .env = kj::mv(env), .globalOutbound = kj::mv(globalOutbound), .tails = kj::mv(tailChannels), diff --git a/src/workerd/api/worker-loader.h b/src/workerd/api/worker-loader.h index bba4f96e3a9..e17eae3d780 100644 --- a/src/workerd/api/worker-loader.h +++ b/src/workerd/api/worker-loader.h @@ -21,8 +21,9 @@ class WorkerStub: public jsg::Object { struct EntrypointOptions { jsg::Optional> props; + jsg::Optional limits; - JSG_STRUCT(props); + JSG_STRUCT(props, limits); }; jsg::Ref getEntrypoint(jsg::Lock& js, @@ -90,6 +91,8 @@ class WorkerLoader: public jsg::Object { jsg::Optional> compatibilityFlags; jsg::Optional allowExperimental = false; + jsg::Optional limits; + kj::String mainModule; // Modules are specified as an object mapping names to content. If the content is just a @@ -116,6 +119,7 @@ class WorkerLoader: public jsg::Object { JSG_STRUCT(compatibilityDate, compatibilityFlags, allowExperimental, + limits, mainModule, modules, env, @@ -156,6 +160,6 @@ class WorkerLoader: public jsg::Object { #define EW_WORKER_LOADER_ISOLATE_TYPES \ api::WorkerStub, api::WorkerStub::EntrypointOptions, api::WorkerLoader, \ - api::WorkerLoader::Module, api::WorkerLoader::WorkerCode + api::WorkerLoader::Module, api::WorkerLoader::WorkerCode, workerd::ResourceLimits } // namespace workerd::api diff --git a/src/workerd/io/io-channels.h b/src/workerd/io/io-channels.h index 179209b10a4..49f3d107552 100644 --- a/src/workerd/io/io-channels.h +++ b/src/workerd/io/io-channels.h @@ -297,6 +297,19 @@ class IoChannelFactory { ChannelTokenUsage usage, kj::ArrayPtr token); }; +// ResourceLimits provides a means to control the resource allocation for a worker stage via a +// set of optionally overridden parameters. +struct ResourceLimits { + jsg::Optional cpuMs; + jsg::Optional subRequests; + + JSG_STRUCT(cpuMs, subRequests); + + ResourceLimits clone() const { + return {cpuMs, subRequests}; + } +}; + // Represents a dynamically-loaded Worker to which requests can be sent. // // This object is returned before the Worker actually loads, so if any errors occur while loading, @@ -304,10 +317,10 @@ class IoChannelFactory { class WorkerStubChannel { public: virtual kj::Own getEntrypoint( - kj::Maybe name, Frankenvalue props) = 0; + kj::Maybe name, Frankenvalue props, kj::Maybe limits) = 0; virtual kj::Own getActorClass( - kj::Maybe name, Frankenvalue props) = 0; + kj::Maybe name, Frankenvalue props, kj::Maybe limits) = 0; // TODO(someday): Allow caller to enumerate entrypoints? }; @@ -317,6 +330,8 @@ struct DynamicWorkerSource { WorkerSource source; CompatibilityFlags::Reader compatibilityFlags; + kj::Maybe limits; + // `env` object to pass to the loaded worker. Can contain anything that can be serialized to // a `Frankenvalue` (which should eventually include all binding types, RPC stubs, etc.). Frankenvalue env; @@ -347,6 +362,7 @@ struct DynamicWorkerSource { return { .source = source.clone(), .compatibilityFlags = compatibilityFlags, + .limits = limits.map([](auto& limits) { return limits.clone(); }), .env = env.clone(), .globalOutbound = mapAddRef(globalOutbound), .tails = KJ_MAP(t, tails) { return kj::addRef(*t); }, diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index c1c9581b566..ab1c422da70 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -4135,12 +4135,12 @@ class Server::WorkerLoaderNamespace: public kj::Refcounted { } kj::Own getEntrypoint( - kj::Maybe name, Frankenvalue props) override { + kj::Maybe name, Frankenvalue props, kj::Maybe limits) override { return kj::refcounted(addRefToThis(), kj::mv(name), kj::mv(props)); } kj::Own getActorClass( - kj::Maybe name, Frankenvalue props) override { + kj::Maybe name, Frankenvalue props, kj::Maybe limits) override { return kj::refcounted(addRefToThis(), kj::mv(name), kj::mv(props)); } diff --git a/types/generated-snapshot/experimental/index.d.ts b/types/generated-snapshot/experimental/index.d.ts index 72a3d8f7bc6..e09a0859f0c 100755 --- a/types/generated-snapshot/experimental/index.d.ts +++ b/types/generated-snapshot/experimental/index.d.ts @@ -4267,6 +4267,7 @@ interface WorkerStub { } interface WorkerStubEntrypointOptions { props?: any; + limits?: workerdResourceLimits; } interface WorkerLoader { get( @@ -4288,6 +4289,7 @@ interface WorkerLoaderWorkerCode { compatibilityDate: string; compatibilityFlags?: string[]; allowExperimental?: boolean; + limits?: workerdResourceLimits; mainModule: string; modules: Record; env?: any; @@ -4295,6 +4297,10 @@ interface WorkerLoaderWorkerCode { tails?: Fetcher[]; streamingTails?: Fetcher[]; } +interface workerdResourceLimits { + cpuMs?: number; + subRequests?: number; +} /** * The Workers runtime supports a subset of the Performance API, used to measure timing and performance, * as well as timing of subrequests and other operations. diff --git a/types/generated-snapshot/experimental/index.ts b/types/generated-snapshot/experimental/index.ts index 1ba1e4f325b..94514d4d1cf 100755 --- a/types/generated-snapshot/experimental/index.ts +++ b/types/generated-snapshot/experimental/index.ts @@ -4273,6 +4273,7 @@ export interface WorkerStub { } export interface WorkerStubEntrypointOptions { props?: any; + limits?: workerdResourceLimits; } export interface WorkerLoader { get( @@ -4294,6 +4295,7 @@ export interface WorkerLoaderWorkerCode { compatibilityDate: string; compatibilityFlags?: string[]; allowExperimental?: boolean; + limits?: workerdResourceLimits; mainModule: string; modules: Record; env?: any; @@ -4301,6 +4303,10 @@ export interface WorkerLoaderWorkerCode { tails?: Fetcher[]; streamingTails?: Fetcher[]; } +export interface workerdResourceLimits { + cpuMs?: number; + subRequests?: number; +} /** * The Workers runtime supports a subset of the Performance API, used to measure timing and performance, * as well as timing of subrequests and other operations. diff --git a/types/generated-snapshot/latest/index.d.ts b/types/generated-snapshot/latest/index.d.ts index b569c0a3c41..ca31f14b99a 100755 --- a/types/generated-snapshot/latest/index.d.ts +++ b/types/generated-snapshot/latest/index.d.ts @@ -3888,6 +3888,7 @@ interface WorkerStub { } interface WorkerStubEntrypointOptions { props?: any; + limits?: workerdResourceLimits; } interface WorkerLoader { get( @@ -3909,6 +3910,7 @@ interface WorkerLoaderWorkerCode { compatibilityDate: string; compatibilityFlags?: string[]; allowExperimental?: boolean; + limits?: workerdResourceLimits; mainModule: string; modules: Record; env?: any; @@ -3916,6 +3918,10 @@ interface WorkerLoaderWorkerCode { tails?: Fetcher[]; streamingTails?: Fetcher[]; } +interface workerdResourceLimits { + cpuMs?: number; + subRequests?: number; +} /** * The Workers runtime supports a subset of the Performance API, used to measure timing and performance, * as well as timing of subrequests and other operations. diff --git a/types/generated-snapshot/latest/index.ts b/types/generated-snapshot/latest/index.ts index f2c4af42220..3224b3e71f0 100755 --- a/types/generated-snapshot/latest/index.ts +++ b/types/generated-snapshot/latest/index.ts @@ -3894,6 +3894,7 @@ export interface WorkerStub { } export interface WorkerStubEntrypointOptions { props?: any; + limits?: workerdResourceLimits; } export interface WorkerLoader { get( @@ -3915,6 +3916,7 @@ export interface WorkerLoaderWorkerCode { compatibilityDate: string; compatibilityFlags?: string[]; allowExperimental?: boolean; + limits?: workerdResourceLimits; mainModule: string; modules: Record; env?: any; @@ -3922,6 +3924,10 @@ export interface WorkerLoaderWorkerCode { tails?: Fetcher[]; streamingTails?: Fetcher[]; } +export interface workerdResourceLimits { + cpuMs?: number; + subRequests?: number; +} /** * The Workers runtime supports a subset of the Performance API, used to measure timing and performance, * as well as timing of subrequests and other operations.