diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 2fec6f79ebf..47c7d44e7e5 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -1,7 +1,7 @@ use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule}; use super::scheduler::SchedulerStarter; use super::v8::V8HeapMetrics; -use super::wasmtime::WasmtimeRuntime; +use super::wasmtime::{WasmMemoryBytesMetric, WasmtimeRuntime}; use super::{Scheduler, UpdateDatabaseResult}; use crate::client::{ClientActorId, ClientName}; use crate::config::{V8Config, WasmConfig}; @@ -1411,8 +1411,8 @@ where let _ = DATA_SIZE_METRICS .data_size_blob_store_bytes_used_by_blobs .remove_label_values(db); - let _ = WORKER_METRICS.wasm_memory_bytes.remove_label_values(db); + WasmMemoryBytesMetric::remove_all_metric_label_values_for_database(db); V8HeapMetrics::remove_all_metric_label_values_for_database(db); let _ = WORKER_METRICS.v8_request_queue_length.remove_label_values(db); diff --git a/crates/core/src/host/module_common.rs b/crates/core/src/host/module_common.rs index bff864759f3..49f1126b54b 100644 --- a/crates/core/src/host/module_common.rs +++ b/crates/core/src/host/module_common.rs @@ -11,7 +11,7 @@ use crate::{ module_host_context::ModuleCreationContext, replica_context::ReplicaContext, }; -use spacetimedb_lib::{Identity, RawModuleDef}; +use spacetimedb_lib::RawModuleDef; use spacetimedb_schema::{def::ModuleDef, error::ValidationErrors}; use std::sync::Arc; @@ -70,11 +70,6 @@ impl ModuleCommon { self.info.clone() } - /// Returns the identity of the database. - pub fn database_identity(&self) -> &Identity { - &self.info.database_identity - } - /// Returns the energy monitor. 
pub fn energy_monitor(&self) -> Arc { self.energy_monitor.clone() diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 67b54cd2103..1f420c352e9 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -171,7 +171,7 @@ const REDUCER_ARGS_BUFFER_SIZE: usize = 4_096; const JS_PROCEDURE_INSTANCE_QUEUE_CAPACITY: usize = 1; #[derive(Copy, Clone)] -pub(crate) enum JsWorkerKind { +pub enum JsWorkerKind { Main, Procedure, } @@ -345,8 +345,6 @@ fn env_on_isolate_unwrap(isolate: &mut Isolate) -> &mut JsInstanceEnv { struct JsInstanceEnv { instance_env: InstanceEnv, module_def: Option>, - /// Last used-heap sample captured by the worker's periodic heap checks. - cached_used_heap_size: usize, /// The slab of `BufferIters` created for this instance. iters: RowIters, @@ -371,7 +369,6 @@ impl JsInstanceEnv { Self { instance_env, module_def: None, - cached_used_heap_size: 0, call_times: CallTimes::new(), iters: <_>::default(), chunk_pool: <_>::default(), @@ -426,16 +423,6 @@ impl JsInstanceEnv { } } - /// Refresh the cached heap usage after an explicit V8 heap sample. - fn set_cached_used_heap_size(&mut self, bytes: usize) { - self.cached_used_heap_size = bytes; - } - - /// Return the last heap sample without forcing a fresh V8 query. - fn cached_used_heap_size(&self) -> usize { - self.cached_used_heap_size - } - fn set_module_def(&mut self, module_def: Arc) { self.module_def = Some(module_def); } @@ -1119,11 +1106,8 @@ fn adjust_gauge(gauge: &IntGauge, delta: i64) { } } -fn sample_heap_stats(scope: &mut PinScope<'_, '_>, metrics: &mut V8HeapMetrics) -> v8::HeapStatistics { - // Whenever we sample heap statistics, we cache them on the isolate so that - // the per-call execution stats can avoid querying them on each invocation. 
+fn record_heap_stats(scope: &mut PinScope<'_, '_>, metrics: &mut V8HeapMetrics) -> v8::HeapStatistics { let stats = scope.get_heap_statistics(); - env_on_isolate_unwrap(scope).set_cached_used_heap_size(stats.used_heap_size()); metrics.observe(&stats); stats } @@ -1147,14 +1131,14 @@ fn should_retire_worker_for_heap( metrics: &mut V8HeapMetrics, config: V8HeapPolicyConfig, ) -> Option<(usize, usize)> { - let stats = sample_heap_stats(scope, metrics); + let stats = record_heap_stats(scope, metrics); let (used, limit) = heap_usage(&stats); if !heap_fraction_at_or_above(used, limit, config.heap_gc_trigger_fraction) { return None; } scope.low_memory_notification(); - let stats = sample_heap_stats(scope, metrics); + let stats = record_heap_stats(scope, metrics); let (used, limit) = heap_usage(&stats); if heap_fraction_at_or_above(used, limit, config.heap_retire_fraction) { Some((used, limit)) @@ -1683,7 +1667,7 @@ where .with_label_values(&info.database_identity), initial_heap_limit: heap_policy.heap_limit_bytes, }; - let _initial_heap_stats = sample_heap_stats(inst.scope, &mut heap_metrics); + let _initial_heap_stats = record_heap_stats(inst.scope, &mut heap_metrics); // Process requests to the worker. // @@ -1994,25 +1978,16 @@ where // Derive energy stats. let energy = energy_from_elapsed(budget, timings.total_duration); - // Reuse the last periodic heap sample instead of querying V8 on every call. - // We use this statistic for energy tracking, so eventual consistency is fine. 
- let memory_allocation = env.cached_used_heap_size(); - if heap_limit_hit.get() > 1 { let database_identity = *env.instance_env.database_identity(); tracing::warn!( %database_identity, - used_heap_size = memory_allocation, current_heap_limit = scope.get_heap_statistics().heap_size_limit(), "Module hit heap limit multiple times in single call, even after doubling!", ) } - let stats = ExecutionStats { - energy, - timings, - memory_allocation, - }; + let stats = ExecutionStats { energy, timings }; ExecutionResult { stats, call_result } }) } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 8751a26804d..fb8ca58e58d 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -31,7 +31,7 @@ use anyhow::{anyhow, bail, ensure, Context}; use bytes::{Buf, Bytes}; use core::future::Future; use core::time::Duration; -use prometheus::{Histogram, IntCounter, IntGauge}; +use prometheus::{Histogram, IntCounter}; use spacetimedb_auth::identity::ConnectionAuthCtx; use spacetimedb_client_api_messages::energy::EnergyQuanta; use spacetimedb_datastore::db_metrics::DB_METRICS; @@ -105,6 +105,12 @@ pub struct EnergyStats { pub remaining: FunctionBudget, } +impl Default for EnergyStats { + fn default() -> Self { + Self::ZERO + } +} + impl EnergyStats { pub const ZERO: Self = Self { budget: FunctionBudget::ZERO, @@ -207,6 +213,7 @@ pub(crate) fn run_query_for_view( Ok(rows) } +#[derive(Default)] pub struct ExecutionTimings { pub total_duration: Duration, pub wasm_instance_env_call_times: CallTimes, @@ -226,10 +233,10 @@ impl ExecutionTimings { /// The result that `__call_reducer__` produces during normal non-trap execution. 
pub type ReducerResult = Result, Box>; +#[derive(Default)] pub struct ExecutionStats { pub energy: EnergyStats, pub timings: ExecutionTimings, - pub memory_allocation: usize, } impl ExecutionStats { @@ -580,8 +587,6 @@ impl WasmModuleInstance { pub struct InstanceCommon { info: Arc, energy_monitor: Arc, - allocated_memory: usize, - metric_wasm_memory_bytes: IntGauge, vm_metrics: AllVmMetrics, } @@ -594,11 +599,6 @@ impl InstanceCommon { info: module.info(), vm_metrics, energy_monitor: module.energy_monitor(), - // Will be updated on the first reducer call. - allocated_memory: 0, - metric_wasm_memory_bytes: WORKER_METRICS - .wasm_memory_bytes - .with_label_values(module.database_identity()), } } @@ -780,19 +780,12 @@ impl InstanceCommon { let ProcedureExecuteResult { stats: ExecutionStats { - memory_allocation, // TODO(procedure-energy): Do something with timing and energy. .. }, call_result, } = result; - // TODO(shub): deduplicate with reducer and view logic. - if self.allocated_memory != memory_allocation { - self.metric_wasm_memory_bytes.set(memory_allocation as i64); - self.allocated_memory = memory_allocation; - } - let trapped = call_result.is_err(); let result = match call_result { @@ -1040,14 +1033,9 @@ impl InstanceCommon { let energy_used = stats.energy.used(); let energy_quanta_used = energy_used.into(); let timings = &stats.timings; - let memory_allocation = stats.memory_allocation; self.energy_monitor .record_reducer(&energy_fingerprint, energy_quanta_used, timings.total_duration); - if self.allocated_memory != memory_allocation { - self.metric_wasm_memory_bytes.set(memory_allocation as i64); - self.allocated_memory = memory_allocation; - } maybe_log_long_running_function(function_name, timings.total_duration); diff --git a/crates/core/src/host/wasmtime/mod.rs b/crates/core/src/host/wasmtime/mod.rs index 45eaaab56fc..b9ff62aafec 100644 --- a/crates/core/src/host/wasmtime/mod.rs +++ b/crates/core/src/host/wasmtime/mod.rs @@ -10,6 +10,7 @@ use 
anyhow::Context; use spacetimedb_paths::server::ServerDataDir; use std::borrow::Cow; use std::time::Duration; +pub(in crate::host) use wasm_instance_env::WasmMemoryBytesMetric; use wasmtime::{self, Engine, Linker, StoreContext, StoreContextMut}; pub use wasmtime_module::{WasmtimeAsyncModule, WasmtimeInstance, WasmtimeModule}; diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 63eb862cddb..4145904259e 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -14,10 +14,12 @@ use crate::host::wasm_common::module_host_actor::{ use crate::host::wasm_common::{err_to_errno_and_log, RowIterIdx, RowIters, TimingSpan, TimingSpanIdx, TimingSpanSet}; use crate::host::AbiCall; use crate::subscription::module_subscription_manager::TransactionOffset; +use crate::worker_metrics::WORKER_METRICS; use anyhow::{anyhow, Context as _}; +use prometheus::IntGauge; use spacetimedb_data_structures::map::IntMap; use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo}; -use spacetimedb_lib::{bsatn, ConnectionId, Timestamp}; +use spacetimedb_lib::{bsatn, ConnectionId, Identity, Timestamp}; use spacetimedb_primitives::errno::HOST_CALL_FAILURE; use spacetimedb_primitives::{errno, ColId}; use spacetimedb_schema::def::ModuleDef; @@ -133,6 +135,57 @@ pub(super) struct WasmInstanceEnv { /// A pool of unused allocated chunks that can be reused. // TODO(Centril): consider using this pool for `console_timer_start` and `bytes_sink_write`. chunk_pool: ChunkPool, + + linear_memory_size_metric: WasmMemoryBytesMetric, +} + +pub(in crate::host) struct WasmMemoryBytesMetric { + wasm_memory_bytes: IntGauge, + + /// Previous value observed by this instance. + /// + /// In [`Self::observe`], we use this to compute a delta against the instance's new memory usage, + /// then increment/decrement the metric value by that delta. 
+ /// We do this rather than `set`ting the metric value as multiple instances may coexist + /// and share the same metric label value. + /// This happens when a database has procedures and reducers running concurrently, + /// and may also happen during a module update, as there may be a period when + /// the new version has already been instantiated but the old version has not yet shut down. + last_observed: i64, +} + +impl WasmMemoryBytesMetric { + fn new(database_identity: Identity) -> Self { + Self { + wasm_memory_bytes: WORKER_METRICS.wasm_memory_bytes.with_label_values(&database_identity), + last_observed: 0, + } + } + + fn observe(&mut self, memory_usage: usize) { + let memory_usage = memory_usage as i64; + + let delta = memory_usage - self.last_observed; + + if delta > 0 { + self.wasm_memory_bytes.add(delta); + } else { + self.wasm_memory_bytes.sub(-delta); + } + + self.last_observed = memory_usage; + } + + pub(in crate::host) fn remove_all_metric_label_values_for_database(database_identity: &Identity) { + let _ = WORKER_METRICS.wasm_memory_bytes.remove_label_values(database_identity); + } +} + +impl Drop for WasmMemoryBytesMetric { + fn drop(&mut self) { + // Clean up this instance's metric value by subtracting its part of the usage. + self.wasm_memory_bytes.sub(self.last_observed); + } } const STANDARD_BYTES_SINK: u32 = 1; @@ -145,6 +198,7 @@ type RtResult = anyhow::Result; impl WasmInstanceEnv { /// Create a new `WasmEnstanceEnv` from the given `InstanceEnv`. 
pub fn new(instance_env: InstanceEnv) -> Self { + let database_identity = *instance_env.database_identity(); Self { instance_env, module_def: None, @@ -158,6 +212,7 @@ impl WasmInstanceEnv { timing_spans: Default::default(), call_times: CallTimes::new(), chunk_pool: <_>::default(), + linear_memory_size_metric: WasmMemoryBytesMetric::new(database_identity), } } @@ -216,6 +271,11 @@ impl WasmInstanceEnv { self.call_view_anon = call_view_anon; } + /// Record an observation in [`Self::linear_memory_size_metric`]. + pub fn record_memory_size(&mut self, memory_size: usize) { + self.linear_memory_size_metric.observe(memory_size); + } + /// Returns a reference to the memory, assumed to be initialized. pub fn get_mem(&self) -> Mem { self.mem.expect("Initialized memory") diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index e5835331cc2..aed93b78ec6 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -725,7 +725,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { let Some(call_procedure) = self.call_procedure.as_ref() else { let res = module_host_actor::ProcedureExecuteResult { - stats: zero_execution_stats(store), + stats: ExecutionStats::default(), call_result: Err(anyhow::anyhow!( "Module defines procedure {} but does not export `{}`", op.name, @@ -837,20 +837,15 @@ fn finish_opcall(store: &mut Store, initial_budget: FunctionBud remaining, }; - let stats = ExecutionStats { - energy, - timings, - memory_allocation: get_memory_size(store), - }; + record_memory_size(store); + + let stats = ExecutionStats { energy, timings }; (stats, ret_bytes) } -fn zero_execution_stats(store: &Store) -> ExecutionStats { - ExecutionStats { - energy: module_host_actor::EnergyStats::ZERO, - timings: module_host_actor::ExecutionTimings::zero(), - memory_allocation: get_memory_size(store), - } +fn record_memory_size(store: &mut Store) { + let 
memory_usage = get_memory_size(store); + store.data_mut().record_memory_size(memory_usage); } fn get_memory_size(store: &Store) -> usize { diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 9246022ced2..690afdd8c94 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -1,6 +1,6 @@ +use crate::hash::Hash; use crate::messages::control_db::HostType; use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool; -use crate::{hash::Hash, host::v8::JsWorkerKind}; use once_cell::sync::Lazy; use prometheus::{GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec}; use spacetimedb_datastore::execution_context::WorkloadType; @@ -11,6 +11,9 @@ use spacetimedb_table::page_pool::PagePool; use std::{sync::Once, time::Duration}; use tokio::{spawn, time::sleep}; +// Used as a metrics label value, so private billing code needs access to it. +pub use crate::host::v8::JsWorkerKind; + metrics_group!( pub struct WorkerMetrics { #[name = spacetime_worker_connected_clients] @@ -283,7 +286,7 @@ metrics_group!( pub sender_errors: IntCounterVec, #[name = spacetime_worker_wasm_memory_bytes] - #[help = "The number of bytes of linear memory allocated by the database's WASM module instance"] + #[help = "The total number of bytes of linear memory allocated by all of the database's WASM module instances"] #[labels(database_identity: Identity)] pub wasm_memory_bytes: IntGaugeVec,