From 9e907acca94fa0738dd207e184d66caf31f8baac Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Wed, 27 May 2026 12:43:32 -0400 Subject: [PATCH 1/3] Improve accuracy and change semantics of metric `wasm_memory_bytes` Prior to this commit, the metric `wasm_memory_bytes` had several problems: 1. Despite its name, it was used for both Wasmtime and V8 modules. For V8 modules, it was the same value as `v8_used_heap_size_bytes`. 2. It stored only the value for a single instance at any given time, so it under-reported a database's memory usage. 3. The same row (set of label values) was written concurrently by all instances of a particular database, with each one clobbering the previously written value. In this commit, we change the metric so that: 1. It is recorded only for Wasmtime instances, not V8 instances. For V8 instances, instead directly check `v8_used_heap_size_bytes`, or one of the other V8 heap metrics. This change involved moving the recording of this metric from `module_host_actor.rs` to `wasmtime/wasm_instance_env.rs` 2. Similar to the V8 heap metrics, all the instances cooperatively share the metric entry, updating it by incrementing and decrementing rather than `set`ting. Note that this metric is used for billing, and so we will need to update our billing code (elsewhere) to account for the change. In particular, our billing code should now charge for the sum of `wasm_memory_bytes` and `v8_used_heap_size_bytes`. We also should expect with this change for each database's recorded usage to increase, as we are now accurately recording the usage for all instances, not just one. --- crates/core/src/host/host_controller.rs | 4 +- crates/core/src/host/module_common.rs | 7 +-- crates/core/src/host/v8/mod.rs | 25 +------- .../src/host/wasm_common/module_host_actor.rs | 30 +++------- crates/core/src/host/wasmtime/mod.rs | 1 + .../src/host/wasmtime/wasm_instance_env.rs | 58 ++++++++++++++++++- .../core/src/host/wasmtime/wasmtime_module.rs | 19 +++--- crates/core/src/worker_metrics/mod.rs | 2 +- 8 files changed, 79 insertions(+), 67 deletions(-) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 2fec6f79ebf..47c7d44e7e5 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -1,7 +1,7 @@ use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule}; use super::scheduler::SchedulerStarter; use super::v8::V8HeapMetrics; -use super::wasmtime::WasmtimeRuntime; +use super::wasmtime::{WasmMemoryBytesMetric, WasmtimeRuntime}; use super::{Scheduler, UpdateDatabaseResult}; use crate::client::{ClientActorId, ClientName}; use crate::config::{V8Config, WasmConfig}; @@ -1411,8 +1411,8 @@ where let _ = DATA_SIZE_METRICS .data_size_blob_store_bytes_used_by_blobs .remove_label_values(db); - let _ = WORKER_METRICS.wasm_memory_bytes.remove_label_values(db); + WasmMemoryBytesMetric::remove_all_metric_label_values_for_database(db); V8HeapMetrics::remove_all_metric_label_values_for_database(db); let _ = WORKER_METRICS.v8_request_queue_length.remove_label_values(db); diff --git a/crates/core/src/host/module_common.rs b/crates/core/src/host/module_common.rs index bff864759f3..49f1126b54b 100644 --- a/crates/core/src/host/module_common.rs +++ b/crates/core/src/host/module_common.rs @@ -11,7 +11,7 @@ use crate::{ module_host_context::ModuleCreationContext, replica_context::ReplicaContext, }; -use spacetimedb_lib::{Identity, RawModuleDef}; +use spacetimedb_lib::RawModuleDef; use spacetimedb_schema::{def::ModuleDef, error::ValidationErrors}; use std::sync::Arc; @@ -70,11 +70,6 @@ impl ModuleCommon { self.info.clone() } - /// Returns the identity of the database. - pub fn database_identity(&self) -> &Identity { - &self.info.database_identity - } - /// Returns the energy monitor. pub fn energy_monitor(&self) -> Arc { self.energy_monitor.clone() diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 63c8e409424..c0bae9885fe 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -339,8 +339,6 @@ fn env_on_isolate_unwrap(isolate: &mut Isolate) -> &mut JsInstanceEnv { struct JsInstanceEnv { instance_env: InstanceEnv, module_def: Option>, - /// Last used-heap sample captured by the worker's periodic heap checks. - cached_used_heap_size: usize, /// The slab of `BufferIters` created for this instance. iters: RowIters, @@ -365,7 +363,6 @@ impl JsInstanceEnv { Self { instance_env, module_def: None, - cached_used_heap_size: 0, call_times: CallTimes::new(), iters: <_>::default(), chunk_pool: <_>::default(), @@ -420,16 +417,6 @@ impl JsInstanceEnv { } } - /// Refresh the cached heap usage after an explicit V8 heap sample. - fn set_cached_used_heap_size(&mut self, bytes: usize) { - self.cached_used_heap_size = bytes; - } - - /// Return the last heap sample without forcing a fresh V8 query. - fn cached_used_heap_size(&self) -> usize { - self.cached_used_heap_size - } - fn set_module_def(&mut self, module_def: Arc) { self.module_def = Some(module_def); } @@ -1117,7 +1104,6 @@ fn sample_heap_stats(scope: &mut PinScope<'_, '_>, metrics: &mut V8HeapMetrics) // Whenever we sample heap statistics, we cache them on the isolate so that // the per-call execution stats can avoid querying them on each invocation. let stats = scope.get_heap_statistics(); - env_on_isolate_unwrap(scope).set_cached_used_heap_size(stats.used_heap_size()); metrics.observe(&stats); stats } @@ -1991,25 +1977,16 @@ where // Derive energy stats. let energy = energy_from_elapsed(budget, timings.total_duration); - // Reuse the last periodic heap sample instead of querying V8 on every call. - // We use this statistic for energy tracking, so eventual consistency is fine. - let memory_allocation = env.cached_used_heap_size(); - if heap_limit_hit.get() > 1 { let database_identity = *env.instance_env.database_identity(); tracing::warn!( %database_identity, - used_heap_size = memory_allocation, current_heap_limit = scope.get_heap_statistics().heap_size_limit(), "Module hit heap limit multiple times in single call, even after doubling!", ) } - let stats = ExecutionStats { - energy, - timings, - memory_allocation, - }; + let stats = ExecutionStats { energy, timings }; ExecutionResult { stats, call_result } }) } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index b269bbb92e4..f9fab2f301e 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -31,7 +31,7 @@ use anyhow::{anyhow, bail, ensure, Context}; use bytes::{Buf, Bytes}; use core::future::Future; use core::time::Duration; -use prometheus::{Histogram, IntCounter, IntGauge}; +use prometheus::{Histogram, IntCounter}; use spacetimedb_auth::identity::ConnectionAuthCtx; use spacetimedb_client_api_messages::energy::EnergyQuanta; use spacetimedb_datastore::db_metrics::DB_METRICS; @@ -105,6 +105,12 @@ pub struct EnergyStats { pub remaining: FunctionBudget, } +impl Default for EnergyStats { + fn default() -> Self { + Self::ZERO + } +} + impl EnergyStats { pub const ZERO: Self = Self { budget: FunctionBudget::ZERO, @@ -207,6 +213,7 @@ pub(crate) fn run_query_for_view( Ok(rows) } +#[derive(Default)] pub struct ExecutionTimings { pub total_duration: Duration, pub wasm_instance_env_call_times: CallTimes, @@ -226,10 +233,10 @@ impl ExecutionTimings { /// The result that `__call_reducer__` produces during normal non-trap execution. pub type ReducerResult = Result, Box>; +#[derive(Default)] pub struct ExecutionStats { pub energy: EnergyStats, pub timings: ExecutionTimings, - pub memory_allocation: usize, } impl ExecutionStats { @@ -562,8 +569,6 @@ impl WasmModuleInstance { pub struct InstanceCommon { info: Arc, energy_monitor: Arc, - allocated_memory: usize, - metric_wasm_memory_bytes: IntGauge, vm_metrics: AllVmMetrics, } @@ -576,11 +581,6 @@ impl InstanceCommon { info: module.info(), vm_metrics, energy_monitor: module.energy_monitor(), - // Will be updated on the first reducer call. - allocated_memory: 0, - metric_wasm_memory_bytes: WORKER_METRICS - .wasm_memory_bytes - .with_label_values(module.database_identity()), } } @@ -762,19 +762,12 @@ impl InstanceCommon { let ProcedureExecuteResult { stats: ExecutionStats { - memory_allocation, // TODO(procedure-energy): Do something with timing and energy. .. }, call_result, } = result; - // TODO(shub): deduplicate with reducer and view logic. - if self.allocated_memory != memory_allocation { - self.metric_wasm_memory_bytes.set(memory_allocation as i64); - self.allocated_memory = memory_allocation; - } - let trapped = call_result.is_err(); let result = match call_result { @@ -1022,14 +1015,9 @@ impl InstanceCommon { let energy_used = stats.energy.used(); let energy_quanta_used = energy_used.into(); let timings = &stats.timings; - let memory_allocation = stats.memory_allocation; self.energy_monitor .record_reducer(&energy_fingerprint, energy_quanta_used, timings.total_duration); - if self.allocated_memory != memory_allocation { - self.metric_wasm_memory_bytes.set(memory_allocation as i64); - self.allocated_memory = memory_allocation; - } maybe_log_long_running_function(function_name, timings.total_duration); diff --git a/crates/core/src/host/wasmtime/mod.rs b/crates/core/src/host/wasmtime/mod.rs index e9396fc6d7e..149592f23b5 100644 --- a/crates/core/src/host/wasmtime/mod.rs +++ b/crates/core/src/host/wasmtime/mod.rs @@ -10,6 +10,7 @@ use anyhow::Context; use spacetimedb_paths::server::ServerDataDir; use std::borrow::Cow; use std::time::Duration; +pub(in crate::host) use wasm_instance_env::WasmMemoryBytesMetric; use wasmtime::{self, Engine, Linker, StoreContext, StoreContextMut}; pub use wasmtime_module::{WasmtimeInstance, WasmtimeModule}; diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 7c0b2a088b2..210590e8734 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -14,10 +14,12 @@ use crate::host::wasm_common::module_host_actor::{ use crate::host::wasm_common::{err_to_errno_and_log, RowIterIdx, RowIters, TimingSpan, TimingSpanIdx, TimingSpanSet}; use crate::host::AbiCall; use crate::subscription::module_subscription_manager::TransactionOffset; +use crate::worker_metrics::WORKER_METRICS; use anyhow::{anyhow, Context as _}; +use prometheus::IntGauge; use spacetimedb_data_structures::map::IntMap; use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo}; -use spacetimedb_lib::{bsatn, ConnectionId, Timestamp}; +use spacetimedb_lib::{bsatn, ConnectionId, Identity, Timestamp}; use spacetimedb_primitives::errno::HOST_CALL_FAILURE; use spacetimedb_primitives::{errno, ColId}; use spacetimedb_schema::def::ModuleDef; @@ -133,6 +135,53 @@ pub(super) struct WasmInstanceEnv { /// A pool of unused allocated chunks that can be reused. // TODO(Centril): consider using this pool for `console_timer_start` and `bytes_sink_write`. chunk_pool: ChunkPool, + + linear_memory_size_metric: WasmMemoryBytesMetric, +} + +pub(in crate::host) struct WasmMemoryBytesMetric { + wasm_memory_bytes: IntGauge, + + /// Previous value observed by this intance. + /// + /// In [`Self::observe`], we use this to compute a delta against the instance's new memory usage, + /// then increment/decrement the metric value by that delta. + /// We do this rather than `set`ting the metric value as multiple instances may coexist + /// and share the same metric label value. + /// This happens when a database has procedures and reducers running concurrently, + /// and may also happen during a module update, as there may be a period when + /// the new version has already been instantiated but the old version has not yet shut down. + last_observed: i64, +} + +impl WasmMemoryBytesMetric { + fn new(database_identity: Identity) -> Self { + Self { + wasm_memory_bytes: WORKER_METRICS.wasm_memory_bytes.with_label_values(&database_identity), + last_observed: 0, + } + } + + fn observe(&mut self, memory_usage: usize) { + let delta = memory_usage as i64 - self.last_observed; + + if delta > 0 { + self.wasm_memory_bytes.add(delta); + } else { + self.wasm_memory_bytes.sub(-delta); + } + } + + pub(in crate::host) fn remove_all_metric_label_values_for_database(database_identity: &Identity) { + let _ = WORKER_METRICS.wasm_memory_bytes.remove_label_values(database_identity); + } +} + +impl Drop for WasmMemoryBytesMetric { + fn drop(&mut self) { + // Clean up this instance's metric value by subtracting its part of the usage. + self.wasm_memory_bytes.sub(self.last_observed); + } } const STANDARD_BYTES_SINK: u32 = 1; @@ -145,6 +194,7 @@ type RtResult = anyhow::Result; impl WasmInstanceEnv { /// Create a new `WasmEnstanceEnv` from the given `InstanceEnv`. pub fn new(instance_env: InstanceEnv) -> Self { + let database_identity = *instance_env.database_identity(); Self { instance_env, module_def: None, @@ -158,6 +208,7 @@ impl WasmInstanceEnv { timing_spans: Default::default(), call_times: CallTimes::new(), chunk_pool: <_>::default(), + linear_memory_size_metric: WasmMemoryBytesMetric::new(database_identity), } } @@ -216,6 +267,11 @@ impl WasmInstanceEnv { self.call_view_anon = call_view_anon; } + /// Record an observation in [`Self::linear_memory_size_metric`]. + pub fn record_memory_size(&mut self, memory_size: usize) { + self.linear_memory_size_metric.observe(memory_size); + } + /// Returns a reference to the memory, assumed to be initialized. pub fn get_mem(&self) -> Mem { self.mem.expect("Initialized memory") diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index 0690120eb16..cb39c13d92a 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -584,7 +584,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { let Some(call_procedure) = self.call_procedure.as_ref() else { let res = module_host_actor::ProcedureExecuteResult { - stats: zero_execution_stats(store), + stats: ExecutionStats::default(), call_result: Err(anyhow::anyhow!( "Module defines procedure {} but does not export `{}`", op.name, @@ -696,20 +696,15 @@ fn finish_opcall(store: &mut Store, initial_budget: FunctionBud remaining, }; - let stats = ExecutionStats { - energy, - timings, - memory_allocation: get_memory_size(store), - }; + record_memory_size(store); + + let stats = ExecutionStats { energy, timings }; (stats, ret_bytes) } -fn zero_execution_stats(store: &Store) -> ExecutionStats { - ExecutionStats { - energy: module_host_actor::EnergyStats::ZERO, - timings: module_host_actor::ExecutionTimings::zero(), - memory_allocation: get_memory_size(store), - } +fn record_memory_size(store: &mut Store) { + let memory_usage = get_memory_size(store); + store.data_mut().record_memory_size(memory_usage); } fn get_memory_size(store: &Store) -> usize { diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 9246022ced2..f02a3eda51f 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -283,7 +283,7 @@ metrics_group!( pub sender_errors: IntCounterVec, #[name = spacetime_worker_wasm_memory_bytes] - #[help = "The number of bytes of linear memory allocated by the database's WASM module instance"] + #[help = "The total number of bytes of linear memory allocated by all of the database's WASM module instances"] #[labels(database_identity: Identity)] pub wasm_memory_bytes: IntGaugeVec, From 6c350eb8de76517478115fb0f1cc2b034f8e70f7 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Thu, 28 May 2026 10:27:23 -0400 Subject: [PATCH 2/3] Joshua's review --- crates/core/src/host/v8/mod.rs | 10 ++++------ crates/core/src/host/wasmtime/wasm_instance_env.rs | 6 +++++- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index c0bae9885fe..99745d20384 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -1100,9 +1100,7 @@ fn adjust_gauge(gauge: &IntGauge, delta: i64) { } } -fn sample_heap_stats(scope: &mut PinScope<'_, '_>, metrics: &mut V8HeapMetrics) -> v8::HeapStatistics { - // Whenever we sample heap statistics, we cache them on the isolate so that - // the per-call execution stats can avoid querying them on each invocation. +fn record_heap_stats(scope: &mut PinScope<'_, '_>, metrics: &mut V8HeapMetrics) -> v8::HeapStatistics { let stats = scope.get_heap_statistics(); metrics.observe(&stats); stats @@ -1127,14 +1125,14 @@ fn should_retire_worker_for_heap( metrics: &mut V8HeapMetrics, config: V8HeapPolicyConfig, ) -> Option<(usize, usize)> { - let stats = sample_heap_stats(scope, metrics); + let stats = record_heap_stats(scope, metrics); let (used, limit) = heap_usage(&stats); if !heap_fraction_at_or_above(used, limit, config.heap_gc_trigger_fraction) { return None; } scope.low_memory_notification(); - let stats = sample_heap_stats(scope, metrics); + let stats = record_heap_stats(scope, metrics); let (used, limit) = heap_usage(&stats); if heap_fraction_at_or_above(used, limit, config.heap_retire_fraction) { Some((used, limit)) @@ -1666,7 +1664,7 @@ where .with_label_values(&info.database_identity), initial_heap_limit: heap_policy.heap_limit_bytes, }; - let _initial_heap_stats = sample_heap_stats(inst.scope, &mut heap_metrics); + let _initial_heap_stats = record_heap_stats(inst.scope, &mut heap_metrics); // Process requests to the worker. // diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 210590e8734..a79e569d0cd 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -163,13 +163,17 @@ impl WasmMemoryBytesMetric { } fn observe(&mut self, memory_usage: usize) { - let delta = memory_usage as i64 - self.last_observed; + let memory_usage = memory_usage as i64; + + let delta = memory_usage - self.last_observed; if delta > 0 { self.wasm_memory_bytes.add(delta); } else { self.wasm_memory_bytes.sub(-delta); } + + self.last_observed = memory_usage; } pub(in crate::host) fn remove_all_metric_label_values_for_database(database_identity: &Identity) { From 17375a4379014266ab46f026241f8ea4075201f1 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Fri, 29 May 2026 10:22:45 -0400 Subject: [PATCH 3/3] Reexport `JsWorkerKind` --- crates/core/src/host/v8/mod.rs | 2 +- crates/core/src/worker_metrics/mod.rs | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index b86ce1144e3..1f420c352e9 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -171,7 +171,7 @@ const REDUCER_ARGS_BUFFER_SIZE: usize = 4_096; const JS_PROCEDURE_INSTANCE_QUEUE_CAPACITY: usize = 1; #[derive(Copy, Clone)] -pub(crate) enum JsWorkerKind { +pub enum JsWorkerKind { Main, Procedure, } diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index f02a3eda51f..690afdd8c94 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -1,6 +1,6 @@ +use crate::hash::Hash; use crate::messages::control_db::HostType; use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool; -use crate::{hash::Hash, host::v8::JsWorkerKind}; use once_cell::sync::Lazy; use prometheus::{GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec}; use spacetimedb_datastore::execution_context::WorkloadType; @@ -11,6 +11,9 @@ use spacetimedb_table::page_pool::PagePool; use std::{sync::Once, time::Duration}; use tokio::{spawn, time::sleep}; +// Used as a metrics label value, so private billing code needs access to it. +pub use crate::host::v8::JsWorkerKind; + metrics_group!( pub struct WorkerMetrics { #[name = spacetime_worker_connected_clients]