diff --git a/Cargo.lock b/Cargo.lock index 4a1c238f49d..5debdc2594b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -842,6 +842,13 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chain-call-repro" +version = "0.0.0" +dependencies = [ + "spacetimedb 2.1.0", +] + [[package]] name = "check-license-symlinks" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 2e8daf523d7..7d82ec900e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ members = [ "modules/keynote-benchmarks", "modules/perf-test", "modules/module-test", + "modules/chain-call-repro", "templates/basic-rs/spacetimedb", "templates/chat-console-rs/spacetimedb", "templates/keynote-2/spacetimedb-rust-client", @@ -262,7 +263,7 @@ rand_distr = "0.5.1" rayon = "1.8" rayon-core = "1.11.0" regex = "1" -reqwest = { version = "0.12", features = ["stream", "json"] } +reqwest = { version = "0.12", features = ["stream", "json", "blocking"] } rolldown = { git = "https://github.com/rolldown/rolldown.git", tag = "v1.0.0-rc.3" } rolldown_common = { git = "https://github.com/rolldown/rolldown.git", tag = "v1.0.0-rc.3" } rolldown_error = { git = "https://github.com/rolldown/rolldown.git", tag = "v1.0.0-rc.3" } diff --git a/crates/client-api/src/lib.rs b/crates/client-api/src/lib.rs index 784ff0862b1..aa44283b71d 100644 --- a/crates/client-api/src/lib.rs +++ b/crates/client-api/src/lib.rs @@ -344,6 +344,7 @@ pub trait ControlStateWriteAccess: Send + Sync { owner_identity: &Identity, domain_names: &[DomainName], ) -> anyhow::Result; + } #[async_trait] @@ -454,6 +455,7 @@ impl ControlStateWriteAccess for Arc { .replace_dns_records(database_identity, owner_identity, domain_names) .await } + } #[async_trait] diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 9e112e94e01..68b3e6a6f9e 100644 --- a/crates/core/src/db/relational_db.rs +++ 
b/crates/core/src/db/relational_db.rs @@ -934,24 +934,37 @@ impl RelationalDB { b.active.insert(barrier_offset); } - /// Abort a durability barrier, discarding ALL deferred transactions. + /// Modify the deferred TxData at `target_offset` in the barrier's pending queue. /// - /// Used when Round 2 of pipelined 2PC aborts. All transactions behind the - /// barrier are tainted (they may have read data from the aborted 2PC tx) - /// and must not reach disk. On restart, the in-memory state is lost and - /// the pipeline is effectively flushed. - pub fn abort_durability_barrier(&self, barrier_offset: u64) { + /// Used by pipelined 2PC to add the st_2pc_state deletion (COMMIT marker) + /// to the reducer's TxData so they share a single commitlog entry. + /// Must match by offset because concurrent 2PC transactions and other + /// commits may have added entries to the queue before or after ours. + pub fn modify_barrier_pending_at(&self, target_offset: u64, f: impl FnOnce(&mut TxData)) { let mut barrier = self.durability_barrier.lock().unwrap(); - let Some(ref mut b) = *barrier else { - return; - }; - b.active.remove(&barrier_offset); - if b.active.is_empty() { - // Drop all pending transactions -- they are tainted. - *barrier = None; + if let Some(ref mut b) = *barrier { + if let Some(entry) = b.pending.iter_mut().find(|(_, td)| td.tx_offset() == Some(target_offset)) { + // Arc::make_mut clones the TxData if other references exist + // (e.g. the caller that committed the tx still holds one). + let tx_data = Arc::make_mut(&mut entry.1); + f(tx_data); + } } - // If other barriers remain, the pending list stays (those transactions - // are still blocked by the other barriers and will be resolved by them). + } + + /// Abort a durability barrier, discarding ALL deferred transactions + /// and clearing ALL active barriers. + /// + /// Used when Round 2 of pipelined 2PC aborts. 
The aborted 2PC's reducer + /// changes are already in committed state (in memory), so any other + /// deferred transaction may have observed them. Since the caller always + /// triggers a module restart after this call, ALL pending transactions + /// are tainted and must be dropped -- not just those behind the aborted + /// barrier. The module restart rebuilds committed state from disk. + pub fn abort_durability_barrier(&self, _barrier_offset: u64) { + let mut barrier = self.durability_barrier.lock().unwrap(); + // Drop everything: all active barriers and all pending transactions. + *barrier = None; } /// Clear one durability barrier, flushing deferred transactions that are now diff --git a/crates/core/src/host/call_edge_tracker.rs b/crates/core/src/host/call_edge_tracker.rs new file mode 100644 index 00000000000..0055700162c --- /dev/null +++ b/crates/core/src/host/call_edge_tracker.rs @@ -0,0 +1,134 @@ +// TODO: Consolidate with `ControlStateWriteAccess` once that trait is moved +// to a crate that `core` can depend on (currently in `client-api`, which +// depends on `core` -- circular dependency). The edge tracking methods should +// live on `ControlStateWriteAccess` since that is the standard interface for +// interacting with the control database. + +/// Trait for tracking cross-database call edges for distributed deadlock detection. +/// +/// Before making a cross-database reducer call, the caller registers an edge +/// A -> B (caller -> callee). If this would create a cycle in the call graph, +/// the registration fails with an error, indicating a potential distributed deadlock. +/// +/// Methods are synchronous (blocking) because they are called from the WASM +/// executor thread, which must not use async I/O. +/// +/// Implementations: +/// +/// - **Standalone** -- [`InMemoryCallEdgeTracker`] maintains an in-memory graph +/// and runs cycle detection locally. No network I/O. 
+/// +/// - **Cluster** -- Calls a reducer on the control database that inserts the edge +/// and runs cycle detection. Returns `Err` if a cycle is found. +use spacetimedb_lib::Identity; +use std::collections::{HashMap, HashSet}; +use std::sync::Mutex; + +pub trait CallEdgeTracker: Send + Sync + 'static { + /// Register a call edge: `caller` is about to call `callee`. + /// + /// Returns `Ok(())` if the edge was registered (no cycle). + /// Returns `Err` if registering this edge would create a cycle. + fn register_edge(&self, call_id: &str, caller: Identity, callee: Identity) -> anyhow::Result<()>; + + /// Unregister a call edge after the call completes (success or failure). + fn unregister_edge(&self, call_id: &str) -> anyhow::Result<()>; + + /// Unregister all edges for this node (crash cleanup on startup). + fn unregister_all_edges(&self) -> anyhow::Result<()>; + + /// Set the base URL for reaching the control DB (cloud only). + /// Default: no-op. Overridden by `CloudCallEdgeTracker`. + fn set_base_url(&self, _url: &str) {} +} + +/// In-memory call edge tracker with cycle detection. +/// +/// Suitable for standalone (single-node) deployments where all databases +/// share the same process. Maintains an adjacency list of active call edges +/// and checks for cycles via DFS on each registration. +pub struct InMemoryCallEdgeTracker { + state: Mutex, +} + +struct EdgeState { + /// call_id -> (caller, callee) + edges: HashMap, + /// caller -> set of callees (adjacency list for DFS) + graph: HashMap>, +} + +impl InMemoryCallEdgeTracker { + pub fn new() -> Self { + Self { + state: Mutex::new(EdgeState { + edges: HashMap::new(), + graph: HashMap::new(), + }), + } + } +} + +impl Default for InMemoryCallEdgeTracker { + fn default() -> Self { + Self::new() + } +} + +/// DFS: is there a path from `from` to `to` in the graph? 
+fn has_path(graph: &HashMap>, from: Identity, to: Identity) -> bool { + let mut visited = HashSet::new(); + let mut stack = vec![from]; + while let Some(current) = stack.pop() { + if current == to { + return true; + } + if !visited.insert(current) { + continue; + } + if let Some(neighbors) = graph.get(¤t) { + stack.extend(neighbors); + } + } + false +} + +impl CallEdgeTracker for InMemoryCallEdgeTracker { + fn register_edge(&self, call_id: &str, caller: Identity, callee: Identity) -> anyhow::Result<()> { + let mut state = self.state.lock().unwrap(); + + // Check for cycle: is there a path from callee back to caller? + if has_path(&state.graph, callee, caller) { + anyhow::bail!( + "cycle detected: adding edge {} -> {} would create a distributed deadlock", + caller.to_abbreviated_hex(), + callee.to_abbreviated_hex() + ); + } + + // No cycle -- insert the edge. + state.edges.insert(call_id.to_owned(), (caller, callee)); + state.graph.entry(caller).or_default().insert(callee); + Ok(()) + } + + fn unregister_edge(&self, call_id: &str) -> anyhow::Result<()> { + let mut state = self.state.lock().unwrap(); + if let Some((caller, callee)) = state.edges.remove(call_id) { + if let Some(neighbors) = state.graph.get_mut(&caller) { + neighbors.remove(&callee); + if neighbors.is_empty() { + state.graph.remove(&caller); + } + } + } + Ok(()) + } + + fn unregister_all_edges(&self) -> anyhow::Result<()> { + let mut state = self.state.lock().unwrap(); + state.edges.clear(); + state.graph.clear(); + Ok(()) + } +} diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index e8d58b41e40..09a8b324a86 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -123,11 +123,15 @@ pub struct HostController { /// /// All per-replica clones share the same underlying connection pool. 
pub call_reducer_client: reqwest::Client, + /// Shared blocking HTTP/2 client for reducer calls made from non-async executor threads. + /// + /// Like [`Self::call_reducer_client`], per-replica clones share the same underlying pool. + pub call_reducer_blocking_client: reqwest::blocking::Client, /// Router that resolves the HTTP base URL of the leader node for a given database. /// - /// Set to [`LocalReducerRouter`] by default; replaced with `ClusterReducerRouter` - /// in cluster deployments via [`HostController::new`] receiving the router directly. - pub call_reducer_router: Arc, + /// Initialized to [`LocalReducerRouter`] at construction. In cluster deployments, + /// replaced once at startup with `CachingResolver` via [`Self::set_call_reducer_router`]. + pub call_reducer_router: Arc>>, /// A single node-level Bearer token included in all outgoing cross-DB reducer calls. /// /// Set once at node startup by the deployment layer (standalone / cluster) so that @@ -138,6 +142,10 @@ pub struct HostController { /// /// `None` in test/embedded contexts where no JWT signer is configured. pub call_reducer_auth_token: Option, + /// Distributed deadlock detection for cross-database calls. + /// Set to [`InMemoryCallEdgeTracker`] by default; replaced with `CloudCallEdgeTracker` + /// in cluster deployments. 
+ pub call_edge_tracker: Arc, } pub(crate) struct HostRuntimes { @@ -250,8 +258,12 @@ impl HostController { bsatn_rlb_pool: BsatnRowListBuilderPool::new(), db_cores, call_reducer_client: ReplicaContext::new_call_reducer_client(&CallReducerOnDbConfig::default()), - call_reducer_router: Arc::new(LocalReducerRouter::new("http://127.0.0.1:3000")), + call_reducer_blocking_client: ReplicaContext::new_call_reducer_blocking_client( + &CallReducerOnDbConfig::default(), + ), + call_reducer_router: Arc::new(std::sync::Mutex::new(Arc::new(LocalReducerRouter::new("http://127.0.0.1:3000")))), call_reducer_auth_token: None, + call_edge_tracker: Arc::new(crate::host::call_edge_tracker::InMemoryCallEdgeTracker::new()), } } @@ -260,6 +272,24 @@ impl HostController { self.program_storage = ps; } + /// Replace the [`ReducerCallRouter`] used by this controller. + /// Called once at startup to install the cluster-aware router after the + /// control DB connection is established; calling it again simply swaps the + /// router (no panic) -- modules launched earlier keep the router they resolved. + pub fn set_call_reducer_router(&self, router: Arc) { + *self.call_reducer_router.lock().unwrap() = router; + } + + /// Get the active [`ReducerCallRouter`]. + pub fn get_call_reducer_router(&self) -> Arc { + self.call_reducer_router.lock().unwrap().clone() + } + + /// Set the [`CallEdgeTracker`] for distributed deadlock detection. + pub fn set_call_edge_tracker(&mut self, tracker: Arc) { + self.call_edge_tracker = tracker; + } + + /// Get a [`ModuleHost`] managed by this controller, or launch it from /// persistent state. 
/// @@ -690,8 +720,10 @@ async fn make_replica_ctx( relational_db: Arc, bsatn_rlb_pool: BsatnRowListBuilderPool, call_reducer_client: reqwest::Client, + call_reducer_blocking_client: reqwest::blocking::Client, call_reducer_router: Arc, call_reducer_auth_token: Option, + call_edge_tracker: Arc, ) -> anyhow::Result { let logger = match module_logs { Some(path) => asyncify(move || Arc::new(DatabaseLogger::open_today(path))).await, @@ -725,10 +757,12 @@ async fn make_replica_ctx( logger, subscriptions, call_reducer_client, + call_reducer_blocking_client, call_reducer_router, call_reducer_auth_token, prepared_txs: crate::host::prepared_tx::PreparedTransactions::new(), on_panic: std::sync::Arc::new(std::sync::OnceLock::new()), + call_edge_tracker: call_edge_tracker, }) } @@ -805,8 +839,10 @@ struct ModuleLauncher { core: AllocatedJobCore, bsatn_rlb_pool: BsatnRowListBuilderPool, call_reducer_client: reqwest::Client, + call_reducer_blocking_client: reqwest::blocking::Client, call_reducer_router: Arc, call_reducer_auth_token: Option, + call_edge_tracker: Arc, } impl ModuleLauncher { @@ -827,8 +863,10 @@ impl ModuleLauncher { self.relational_db, self.bsatn_rlb_pool, self.call_reducer_client, + self.call_reducer_blocking_client, self.call_reducer_router, self.call_reducer_auth_token, + self.call_edge_tracker, ) .await .map(Arc::new)?; @@ -1040,8 +1078,10 @@ impl Host { core: host_controller.db_cores.take(), bsatn_rlb_pool: bsatn_rlb_pool.clone(), call_reducer_client: host_controller.call_reducer_client.clone(), - call_reducer_router: host_controller.call_reducer_router.clone(), + call_reducer_blocking_client: host_controller.call_reducer_blocking_client.clone(), + call_reducer_router: host_controller.get_call_reducer_router(), call_reducer_auth_token: host_controller.call_reducer_auth_token.clone(), + call_edge_tracker: host_controller.call_edge_tracker.clone(), } .launch_module() .await? 
@@ -1072,8 +1112,10 @@ impl Host { core: host_controller.db_cores.take(), bsatn_rlb_pool: bsatn_rlb_pool.clone(), call_reducer_client: host_controller.call_reducer_client.clone(), - call_reducer_router: host_controller.call_reducer_router.clone(), + call_reducer_blocking_client: host_controller.call_reducer_blocking_client.clone(), + call_reducer_router: host_controller.get_call_reducer_router(), call_reducer_auth_token: host_controller.call_reducer_auth_token.clone(), + call_edge_tracker: host_controller.call_edge_tracker.clone(), } .launch_module() .await; @@ -1098,8 +1140,10 @@ impl Host { core: host_controller.db_cores.take(), bsatn_rlb_pool: bsatn_rlb_pool.clone(), call_reducer_client: host_controller.call_reducer_client.clone(), - call_reducer_router: host_controller.call_reducer_router.clone(), + call_reducer_blocking_client: host_controller.call_reducer_blocking_client.clone(), + call_reducer_router: host_controller.get_call_reducer_router(), call_reducer_auth_token: host_controller.call_reducer_auth_token.clone(), + call_edge_tracker: host_controller.call_edge_tracker.clone(), } .launch_module() .await; @@ -1214,8 +1258,12 @@ impl Host { bsatn_rlb_pool, // Transient validation-only module; build its own client and router with defaults. 
call_reducer_client: ReplicaContext::new_call_reducer_client(&CallReducerOnDbConfig::default()), + call_reducer_blocking_client: ReplicaContext::new_call_reducer_blocking_client( + &CallReducerOnDbConfig::default(), + ), call_reducer_router: Arc::new(LocalReducerRouter::new("http://127.0.0.1:3000")), call_reducer_auth_token: None, + call_edge_tracker: Arc::new(crate::host::call_edge_tracker::InMemoryCallEdgeTracker::new()), } .launch_module() .await diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 7053a0790ff..5f885bbcc15 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -991,145 +991,159 @@ impl InstanceEnv { }) } - /// Call a reducer on a remote database. + /// Call a reducer on a remote database (blocking). /// - /// Unlike [`Self::http_request`], this is explicitly allowed while a transaction is open — - /// the caller is responsible for understanding the consistency implications. - /// - /// Uses [`ReplicaContext::call_reducer_router`] to resolve the leader node for - /// `database_identity`, then sends the request via the warmed HTTP client in - /// [`ReplicaContext::call_reducer_client`]. + /// Uses blocking HTTP to avoid async runtime conflicts on the WASM executor thread. + /// Registers a call edge for distributed deadlock detection before the call. /// /// Returns `(http_status, response_body)` on transport success, - /// or [`NodesError::HttpError`] if the connection itself fails. + /// or [`NodesError::HttpError`] on failure. pub fn call_reducer_on_db( &self, database_identity: Identity, reducer_name: &str, args: bytes::Bytes, - ) -> impl Future> + use<> { - let client = self.replica_ctx.call_reducer_client.clone(); - let router = self.replica_ctx.call_reducer_router.clone(); - let reducer_name = reducer_name.to_owned(); - // Node-level auth token: a single token minted at startup and shared by all replicas - // on this node. 
Passed as a Bearer token so `anon_auth_middleware` on the target node - // accepts the request without generating a fresh ephemeral identity per call. - let auth_token = self.replica_ctx.call_reducer_auth_token.clone(); + ) -> Result<(u16, bytes::Bytes), NodesError> { + let start = Instant::now(); let caller_identity = self.replica_ctx.database.database_identity; - async move { - let start = Instant::now(); - - let base_url = router - .resolve_base_url(database_identity) - .await - .map_err(|e| NodesError::HttpError(e.to_string()))?; - let url = format!( - "{}/v1/database/{}/call/{}", - base_url, - database_identity.to_hex(), - reducer_name, - ); - let mut req = client - .post(&url) - .header(http::header::CONTENT_TYPE, "application/octet-stream") - .body(args); - if let Some(token) = auth_token { - req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}")); - } - let result = async { - let response = req.send().await.map_err(|e| NodesError::HttpError(e.to_string()))?; - let status = response.status().as_u16(); - let body = response - .bytes() - .await - .map_err(|e| NodesError::HttpError(e.to_string()))?; - Ok::<_, NodesError>((status, body)) - } - .await; - - WORKER_METRICS - .cross_db_reducer_calls_total - .with_label_values(&caller_identity) - .inc(); - WORKER_METRICS - .cross_db_reducer_duration_seconds - .with_label_values(&caller_identity) - .observe(start.elapsed().as_secs_f64()); - - result + // Register call edge for distributed deadlock detection. 
+ let call_id = uuid::Uuid::new_v4().to_string(); + self.register_edge_or_deadlock(&call_id, caller_identity, database_identity)?; + + let base_url = self + .replica_ctx + .call_reducer_router + .resolve_base_url_blocking(database_identity) + .map_err(|e| NodesError::HttpError(e.to_string()))?; + let url = format!( + "{}/v1/database/{}/call/{}", + base_url, + database_identity.to_hex(), + reducer_name, + ); + let mut req = self + .replica_ctx + .call_reducer_blocking_client + .post(&url) + .header(http::header::CONTENT_TYPE, "application/octet-stream") + .body(args.to_vec()); + if let Some(ref token) = self.replica_ctx.call_reducer_auth_token { + req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}")); } + + let result = crate::replica_context::execute_blocking_http( + &self.replica_ctx.call_reducer_blocking_client, + req, + |resp| { + let status = resp.status().as_u16(); + let body = resp.bytes()?; + Ok((status, body)) + }, + ) + .map_err(|e| NodesError::HttpError(e)); + + // Unregister the call edge (regardless of success/failure). + let _ = self.replica_ctx.call_edge_tracker.unregister_edge(&call_id); + + WORKER_METRICS + .cross_db_reducer_calls_total + .with_label_values(&caller_identity) + .inc(); + WORKER_METRICS + .cross_db_reducer_duration_seconds + .with_label_values(&caller_identity) + .observe(start.elapsed().as_secs_f64()); + + result } - /// Call a reducer on a remote database using the 2PC prepare protocol. + /// Call a reducer on a remote database using the 2PC prepare protocol (blocking). /// /// Like [`Self::call_reducer_on_db`], but POSTs to `/prepare/{reducer}` instead of /// `/call/{reducer}`. On success, parses the `X-Prepare-Id` response header and stores /// `(database_identity, prepare_id)` in [`Self::prepared_participants`]. - /// - /// Returns `(http_status, response_body)` on transport success. 
- /// The caller (coordinator reducer) is responsible for checking the status; - /// if the coordinator's reducer commits, the runtime will commit all participants, - /// and if it fails, the runtime will abort them. pub fn call_reducer_on_db_2pc( &mut self, database_identity: Identity, reducer_name: &str, args: bytes::Bytes, - ) -> impl Future), NodesError>> + use<> { - let client = self.replica_ctx.call_reducer_client.clone(); - let router = self.replica_ctx.call_reducer_router.clone(); - let reducer_name = reducer_name.to_owned(); - let auth_token = self.replica_ctx.call_reducer_auth_token.clone(); + ) -> Result<(u16, bytes::Bytes, Option), NodesError> { + let start = Instant::now(); let caller_identity = self.replica_ctx.database.database_identity; - async move { - let start = Instant::now(); + // Register call edge for distributed deadlock detection. + let call_id = uuid::Uuid::new_v4().to_string(); + self.register_edge_or_deadlock(&call_id, caller_identity, database_identity)?; + + let base_url = self + .replica_ctx + .call_reducer_router + .resolve_base_url_blocking(database_identity) + .map_err(|e| NodesError::HttpError(e.to_string()))?; + let url = format!( + "{}/v1/database/{}/prepare/{}", + base_url, + database_identity.to_hex(), + reducer_name, + ); + let mut req = self + .replica_ctx + .call_reducer_blocking_client + .post(&url) + .header(http::header::CONTENT_TYPE, "application/octet-stream") + .header("X-Coordinator-Identity", caller_identity.to_hex().to_string()) + .body(args.to_vec()); + if let Some(ref token) = self.replica_ctx.call_reducer_auth_token { + req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}")); + } - let base_url = router - .resolve_base_url(database_identity) - .await - .map_err(|e| NodesError::HttpError(e.to_string()))?; - let url = format!( - "{}/v1/database/{}/prepare/{}", - base_url, - database_identity.to_hex(), - reducer_name, - ); - let mut req = client - .post(&url) - .header(http::header::CONTENT_TYPE, 
"application/octet-stream") - .header("X-Coordinator-Identity", caller_identity.to_hex().to_string()) - .body(args); - if let Some(token) = auth_token { - req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}")); - } - let result = async { - let response = req.send().await.map_err(|e| NodesError::HttpError(e.to_string()))?; - let status = response.status().as_u16(); - let prepare_id = response + log::debug!("call_reducer_on_db_2pc: sending blocking HTTP to {url}"); + let result = crate::replica_context::execute_blocking_http( + &self.replica_ctx.call_reducer_blocking_client, + req, + |resp| { + let status = resp.status().as_u16(); + let prepare_id = resp .headers() .get("X-Prepare-Id") .and_then(|v| v.to_str().ok()) .map(|s| s.to_owned()); - let body = response - .bytes() - .await - .map_err(|e| NodesError::HttpError(e.to_string()))?; + let body = resp.bytes()?; Ok((status, body, prepare_id)) - } - .await; - - WORKER_METRICS - .cross_db_reducer_calls_total - .with_label_values(&caller_identity) - .inc(); - WORKER_METRICS - .cross_db_reducer_duration_seconds - .with_label_values(&caller_identity) - .observe(start.elapsed().as_secs_f64()); - - result + }, + ) + .map_err(|e| NodesError::HttpError(e)); + log::debug!("call_reducer_on_db_2pc: result={result:?}"); + + // Unregister the call edge (regardless of success/failure). + let _ = self.replica_ctx.call_edge_tracker.unregister_edge(&call_id); + + WORKER_METRICS + .cross_db_reducer_calls_total + .with_label_values(&caller_identity) + .inc(); + WORKER_METRICS + .cross_db_reducer_duration_seconds + .with_label_values(&caller_identity) + .observe(start.elapsed().as_secs_f64()); + + result + } + + /// Register a call edge for cycle detection. If a cycle is detected, + /// return an error immediately -- retrying won't help because the other + /// side is already calling us and we hold the lock. 
+ fn register_edge_or_deadlock(&self, call_id: &str, caller: Identity, callee: Identity) -> Result<(), NodesError> { + match self + .replica_ctx + .call_edge_tracker + .register_edge(call_id, caller, callee) + { + Ok(()) => Ok(()), + Err(e) => Err(NodesError::HttpError(format!( + "distributed deadlock detected: {caller} -> {callee}: {e}" + ))), } } } @@ -1507,10 +1521,14 @@ mod test { logger, subscriptions: subs, call_reducer_client: ReplicaContext::new_call_reducer_client(&CallReducerOnDbConfig::default()), + call_reducer_blocking_client: ReplicaContext::new_call_reducer_blocking_client( + &CallReducerOnDbConfig::default(), + ), call_reducer_router: Arc::new(LocalReducerRouter::new("http://127.0.0.1:3000")), call_reducer_auth_token: None, prepared_txs: crate::host::prepared_tx::PreparedTransactions::new(), on_panic: std::sync::Arc::new(std::sync::OnceLock::new()), + call_edge_tracker: Arc::new(crate::host::call_edge_tracker::InMemoryCallEdgeTracker::new()), }, runtime, )) diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs index 06e55de6444..df455ba8dee 100644 --- a/crates/core/src/host/mod.rs +++ b/crates/core/src/host/mod.rs @@ -45,6 +45,7 @@ pub mod prepared_tx; pub mod scheduler; pub mod wasmtime; +pub mod call_edge_tracker; // Visible for integration testing. 
pub mod instance_env; pub mod reducer_router; diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 6706c8cbd0a..875e267c796 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1970,7 +1970,7 @@ impl ModuleHost { continue; } }; - let base_url = match router.resolve_base_url(participant_identity).await { + let base_url = match router.resolve_base_url_blocking(participant_identity) { Ok(url) => url, Err(e) => { log::warn!("recover_2pc_coordinator: cannot resolve URL for {participant_identity}: {e}"); @@ -2142,7 +2142,7 @@ impl ModuleHost { coordinator_identity: Identity, prepare_id: &str, ) -> Option { - let base_url = match router.resolve_base_url(coordinator_identity).await { + let base_url = match router.resolve_base_url_blocking(coordinator_identity) { Ok(url) => url, Err(e) => { log::warn!("2PC recovery status poll: cannot resolve coordinator URL: {e}"); diff --git a/crates/core/src/host/reducer_router.rs b/crates/core/src/host/reducer_router.rs index dcbf20c51c8..815ad6ce876 100644 --- a/crates/core/src/host/reducer_router.rs +++ b/crates/core/src/host/reducer_router.rs @@ -28,6 +28,18 @@ pub trait ReducerCallRouter: Send + Sync + 'static { /// Returns an error string when the leader cannot be resolved /// (database not found, no leader elected yet, node has no network address, etc.). fn resolve_base_url<'a>(&'a self, database_identity: Identity) -> BoxFuture<'a, anyhow::Result>; + + /// Blocking variant of [`resolve_base_url`] for use on non-async threads. + /// + /// The default implementation drives the async version to completion on the + /// calling thread via `futures::executor::block_on` -- no fresh OS thread or tokio + /// runtime is involved; avoid calling it from inside an async runtime context. + /// + /// Override for routers that can resolve without spawning (e.g. [`LocalReducerRouter`]). 
+ fn resolve_base_url_blocking(&self, database_identity: Identity) -> anyhow::Result { + let fut = self.resolve_base_url(database_identity); + futures::executor::block_on(fut) + } } // Arc is itself a ReducerCallRouter. @@ -60,4 +72,8 @@ impl ReducerCallRouter for LocalReducerRouter { let url = self.base_url.clone(); Box::pin(async move { Ok(url) }) } + + fn resolve_base_url_blocking(&self, _database_identity: Identity) -> anyhow::Result { + Ok(self.base_url.clone()) + } } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 5364c5269ab..9a03a8e2226 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -430,7 +430,7 @@ async fn send_prepared_to_persist_to_coordinator( } match req.send().await { Ok(resp) if resp.status().is_success() => { - log::info!("2PC prepared-to-persist: notified coordinator for {prepare_id}"); + log::debug!("2PC prepared-to-persist: notified coordinator for {prepare_id}"); return; } Ok(resp) => { @@ -733,53 +733,76 @@ impl WasmModuleInstance { return; } - // Step 4: flush st_2pc_prepare_marker -- writes st_2pc_state row (reducer - // inputs only) into committed state. Creates a TxData containing ONLY this - // system table insert (no user table data). Assigned offset N. + // Step 4: create TxData for the st_2pc_state INSERT (PREPARE PERSIST). + // The row is NOT inserted into committed_state -- it only exists in the + // commitlog for crash recovery. Assigned offset N. + // Build the full row once so the same ProductValue can be reused for the + // DELETE entry -- replay uses whole-row equality, so the DELETE must + // carry identical field values. 
+ let marker_row = spacetimedb_sats::ProductValue::from(spacetimedb_datastore::system_tables::St2pcStateRow { + prepare_id: prepare_id.clone(), + coordinator_identity_hex: coordinator_identity.to_hex().to_string(), + reducer_name: recovery_reducer_name, + args_bsatn: recovery_args_bsatn, + caller_identity_hex: recovery_caller_identity_hex, + caller_connection_id_hex: recovery_caller_connection_id_hex, + timestamp_micros: recovery_timestamp_micros, + }); let barrier_offset = tx.next_tx_offset(); - let marker_tx_data = match tx.flush_2pc_prepare_marker( - &prepare_id, - coordinator_identity.to_hex().to_string(), - recovery_reducer_name, - recovery_args_bsatn, - recovery_caller_identity_hex, - recovery_caller_connection_id_hex, - recovery_timestamp_micros, - ) { - Ok(td) => std::sync::Arc::new(td), - Err(e) => { - log::error!("call_reducer_prepare_and_hold: flush_2pc_prepare_marker failed for {prepare_id}: {e}"); - let _ = stdb.rollback_mut_tx(tx); - return; - } - }; + let marker_tx_data = std::sync::Arc::new(tx.create_2pc_prepare_tx_data(&marker_row)); // Step 5: send the marker's TxData to the durability worker. This writes // the PREPARE PERSIST commitlog entry: just the st_2pc_state row with // reducer inputs. No reducer row changes are persisted here. stdb.request_durability_for_tx_data(None, &marker_tx_data); - // Step 6: set durability barrier at offset N. The marker (offset N) passes - // through; the reducer's row changes (offset N+1) will be deferred. + // Step 6: set durability barrier at offset N. The marker (offset N) + // passes through; everything after is deferred. stdb.set_durability_barrier(barrier_offset); // Step 7: commit reducer's actual row changes to the in-memory datastore. // TxData created at offset N+1 is deferred by the barrier (NOT sent to // durability worker yet). Lock released. Non-confirmed-read clients see // the changes via subscription broadcast. 
- let commit_result = commit_and_broadcast_event(&self.common.info.subscriptions, client, event, tx); + // Commit for side effects: applies to committed_state, broadcasts to + // non-confirmed-read clients. The tx_offset (N+1) is deferred by the + // barrier and fsynced later when the barrier clears. + let _ = commit_and_broadcast_event(&self.common.info.subscriptions, client, event, tx); // ═══ WRITE LOCK RELEASED ═══════════════════════════════ + + // Step 7b: add the st_2pc_state DELETE (COMMIT marker) to the + // reducer's deferred TxData. When the barrier clears, a single + // commitlog entry contains both the reducer changes and the + // st_2pc_state deletion -- atomically marking the 2PC as committed. + // The delete row must match the insert row exactly because + // transaction replay uses whole-row equality (delete_equal_row). + { + use spacetimedb_datastore::system_tables::ST_2PC_STATE_NAME; + let table_name = spacetimedb_schema::table_name::TableName::new( + spacetimedb_schema::identifier::Identifier::new(ST_2PC_STATE_NAME.into()).unwrap(), + ); + stdb.modify_barrier_pending_at(barrier_offset + 1, |tx_data| { + tx_data.set_deletes_for_table( + spacetimedb_datastore::system_tables::ST_2PC_STATE_ID, + &table_name, + std::sync::Arc::from([marker_row]), + ); + }); + } + // ── Round 2: Persistence Commit (async — does not block executor) ── let handle = tokio::runtime::Handle::current(); handle.spawn(async move { - // Step 8: wait for PREPARE PERSIST durability (offset N fsynced). - if let Some(prepare_offset) = marker_tx_data.tx_offset() { - if let Some(mut durable) = stdb.durable_tx_offset() { - let _ = durable.wait_for(prepare_offset).await; - } - } + // log::debug!("Spawning Round 2 task. Waiting for durability of offset {} for prepare-id {}", barrier_offset, prepare_id); + // // Step 8: wait for PREPARE PERSIST durability (offset N fsynced). 
+ // if let Some(prepare_offset) = marker_tx_data.tx_offset() { + // if let Some(mut durable) = stdb.durable_tx_offset() { + // let _ = durable.wait_for(prepare_offset).await; + // } + // } + // log::debug!("Waited for durability of offset {} for prepare-id {}", barrier_offset, prepare_id); // Step 9: signal coordinator that B's PREPARE_PERSIST is durable. send_prepared_to_persist_to_coordinator( @@ -832,22 +855,11 @@ impl WasmModuleInstance { if persist_commit { // Step 11: clear the durability barrier so the deferred TxData - // (offset N+1, reducer row changes) flushes to the durability worker. + // (offset N+1, reducer row changes + st_2pc_state delete) flushes + // to the durability worker. The durability pipeline fsyncs it in + // the background; confirmed-read clients wait for the durable + // offset to pass N+1 naturally. stdb.clear_durability_barrier(barrier_offset); - - // Step 12: wait for COMMIT PERSIST durability (offset N+1 fsynced). - if let Some(mut durable) = stdb.durable_tx_offset() { - if let Ok(offset) = commit_result.tx_offset.await { - let _ = durable.wait_for(offset).await; - } - } - - // Step 13: delete the st_2pc_state marker (cleanup). - if let Err(e) = stdb.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |del_tx| { - Ok(del_tx.delete_st_2pc_state(&prepare_id)?) - }) { - log::error!("call_reducer_prepare_and_hold: failed to delete st_2pc_state for {prepare_id}: {e}"); - } } else { // Round 2 abort: discard all deferred transactions. 
stdb.abort_durability_barrier(barrier_offset); @@ -1251,7 +1263,7 @@ impl InstanceCommon { } match req.send().await { Ok(resp) if resp.status().is_success() => { - log::info!("2PC abort: {prepare_id} on {db_identity}"); + log::debug!("2PC abort: {prepare_id} on {db_identity}"); } Ok(resp) => { log::error!( @@ -1298,7 +1310,7 @@ impl InstanceCommon { } match req.send().await { Ok(resp) if resp.status().is_success() => { - log::info!("2PC commit (Round 1): {prepare_id} on {db_identity}"); + log::debug!("2PC commit (Round 1): {prepare_id} on {db_identity}"); } Ok(resp) => { log::error!( @@ -1318,7 +1330,7 @@ impl InstanceCommon { // If a participant crashes, this will time out and we abort persistence. let mut all_prepared = true; for rx in persist_rxs { - match tokio::time::timeout(Duration::from_secs(120), rx).await { + match tokio::time::timeout(Duration::from_secs(25), rx).await { Ok(Ok(())) => {} Ok(Err(_)) | Err(_) => { log::error!("2PC Round 2: timed out waiting for PREPARED_TO_PERSIST"); @@ -1354,12 +1366,11 @@ impl InstanceCommon { } // Wait for A's coordinator log to be durable. - if let Some(mut durable_offset) = stdb.durable_tx_offset() { - if let Ok(offset) = commit_tx_offset.await { - let _ = durable_offset.wait_for(offset).await; - } - } - + // if let Some(mut durable_offset) = stdb.durable_tx_offset() { + // if let Ok(offset) = commit_tx_offset.await { + // let _ = durable_offset.wait_for(offset).await; + // } + // } // Send COMMIT_PERSIST to each participant. for (db_identity, prepare_id) in &prepared_participants { let base_url = match router.resolve_base_url(*db_identity).await { @@ -1381,13 +1392,13 @@ impl InstanceCommon { } match req.send().await { Ok(resp) if resp.status().is_success() => { - log::info!("2PC commit-persist: {prepare_id} on {db_identity}"); - // Round 2 complete — delete coordinator log entry. 
- if let Err(e) = stdb.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |del_tx| { - Ok(del_tx.delete_st_2pc_coordinator_log(prepare_id)?) - }) { - log::warn!("delete_st_2pc_coordinator_log failed for {prepare_id}: {e}"); - } + log::debug!("2PC commit-persist: {prepare_id} on {db_identity}"); + // // Round 2 complete — delete coordinator log entry. + // if let Err(e) = stdb.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |del_tx| { + // Ok(del_tx.delete_st_2pc_coordinator_log(prepare_id)?) + // }) { + // log::warn!("delete_st_2pc_coordinator_log failed for {prepare_id}: {e}"); + // } } Ok(resp) => { log::error!( diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index e23e3da01bb..8a33b967b03 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -1996,14 +1996,13 @@ impl WasmInstanceEnv { let args_buf = mem.deref_slice(args_ptr, args_len)?; let args = bytes::Bytes::copy_from_slice(args_buf); - let handle = tokio::runtime::Handle::current(); - let fut = env + let result = env .instance_env .call_reducer_on_db(database_identity, &reducer_name, args); - let result = super::super::block_on_scoped(&handle, fut); match result { Ok((status, body)) => { + let body = if body.is_empty() { bytes::Bytes::from_static(&[0]) } else { body }; let bytes_source = WasmInstanceEnv::create_bytes_source(env, body)?; bytes_source.0.write_to(mem, out)?; Ok(status as u32) @@ -2055,11 +2054,13 @@ impl WasmInstanceEnv { let args_buf = mem.deref_slice(args_ptr, args_len)?; let args = bytes::Bytes::copy_from_slice(args_buf); - let handle = tokio::runtime::Handle::current(); - let fut = env + let result = env .instance_env .call_reducer_on_db_2pc(database_identity, &reducer_name, args); - let result = super::super::block_on_scoped(&handle, fut); + + log::debug!("call_reducer_on_db_2pc host: result is_ok={} is_http_err={}", + 
result.is_ok(), + matches!(&result, Err(crate::error::NodesError::HttpError(_)))); match result { Ok((status, body, prepare_id)) => { @@ -2069,6 +2070,10 @@ impl WasmInstanceEnv { { env.instance_env.prepared_participants.push((database_identity, pid)); } + // create_bytes_source returns INVALID for empty bytes, but the + // WASM-side caller reads the source unconditionally on success. + // Use a single zero byte to guarantee a valid handle. + let body = if body.is_empty() { bytes::Bytes::from_static(&[0]) } else { body }; let bytes_source = WasmInstanceEnv::create_bytes_source(env, body)?; bytes_source.0.write_to(mem, out)?; Ok(status as u32) diff --git a/crates/core/src/replica_context.rs b/crates/core/src/replica_context.rs index d73b83bcb1a..e51626c0c30 100644 --- a/crates/core/src/replica_context.rs +++ b/crates/core/src/replica_context.rs @@ -3,6 +3,7 @@ use spacetimedb_commitlog::SizeOnDisk; use super::database_logger::DatabaseLogger; use crate::db::relational_db::RelationalDB; use crate::error::DBError; +use crate::host::call_edge_tracker::CallEdgeTracker; use crate::host::prepared_tx::PreparedTransactions; use crate::host::reducer_router::ReducerCallRouter; use crate::messages::control_db::Database; @@ -72,13 +73,18 @@ pub struct ReplicaContext { /// async task that can't panic on the WASM executor thread (e.g., 2PC persistence /// abort in Round 2). Set once by `launch_module`; empty in tests. pub on_panic: Arc>>, + /// Distributed deadlock detection: tracks cross-database call edges. + /// Standalone uses [`crate::host::call_edge_tracker::NoopCallEdgeTracker`]. + /// Blocking HTTP client for cross-database calls on the WASM executor thread. + /// Built on a fresh OS thread to avoid tokio runtime conflicts. + pub call_reducer_blocking_client: reqwest::blocking::Client, + /// Distributed deadlock detection: tracks cross-database call edges. + /// Standalone uses [`crate::host::call_edge_tracker::NoopCallEdgeTracker`]. 
+ pub call_edge_tracker: Arc, } impl ReplicaContext { - /// Build a warmed `reqwest::Client` from `config`. - /// - /// Uses HTTP/2 prior knowledge (h2c) for all connections. - /// The server must be configured to accept h2c (HTTP/2 cleartext) connections. + /// Build a warmed async `reqwest::Client` from `config`. pub fn new_call_reducer_client(config: &CallReducerOnDbConfig) -> reqwest::Client { reqwest::Client::builder() .tcp_keepalive(config.tcp_keepalive) @@ -89,6 +95,61 @@ impl ReplicaContext { .build() .expect("failed to build call_reducer_on_db HTTP client") } + + /// Build a blocking `reqwest::blocking::Client` on a fresh OS thread + /// to avoid conflicts with the tokio async runtime. + pub fn new_call_reducer_blocking_client(config: &CallReducerOnDbConfig) -> reqwest::blocking::Client { + let tcp_keepalive = config.tcp_keepalive; + let pool_idle_timeout = config.pool_idle_timeout; + let pool_max_idle_per_host = config.pool_max_idle_per_host; + let timeout = config.request_timeout; + std::thread::scope(|s| { + s.spawn(move || { + reqwest::blocking::Client::builder() + .tcp_keepalive(tcp_keepalive) + .http2_prior_knowledge() + .pool_idle_timeout(pool_idle_timeout) + .pool_max_idle_per_host(pool_max_idle_per_host) + .timeout(timeout) + .build() + .expect("failed to build call_reducer_on_db blocking HTTP client") + }) + .join() + .expect("blocking client builder thread panicked") + }) + } +} + +/// Execute a blocking reqwest request on a fresh OS thread, processing the +/// response inside that same thread. +/// +/// In debug builds, reqwest 0.12 calls `wait::enter()` on every I/O operation +/// (send, bytes, text, ...). That function creates and immediately drops a mini +/// tokio runtime as a nesting-check, which panics if the calling thread is +/// already inside a tokio `block_on` context (e.g. the WASM executor thread). 
+/// +/// By running both the send and all response reading inside a scoped OS thread +/// that has no tokio context, the assertion always passes. The closure `f` +/// receives the Response and must fully consume it (read body, extract headers, +/// etc.) before returning -- do not let the Response escape the closure. +pub fn execute_blocking_http( + client: &reqwest::blocking::Client, + request: reqwest::blocking::RequestBuilder, + f: F, +) -> std::result::Result +where + F: FnOnce(reqwest::blocking::Response) -> reqwest::Result + Send + 'static, + T: Send + 'static, +{ + let client = client.clone(); + std::thread::scope(|s| { + s.spawn(move || { + let request = request.build().map_err(|e| e.to_string())?; + client.execute(request).and_then(f).map_err(|e| e.to_string()) + }) + .join() + .unwrap_or_else(|e| std::panic::resume_unwind(e)) + }) } impl ReplicaContext { diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 8663186c336..199266402c6 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -1392,6 +1392,23 @@ impl CommittedState { /// Used by the 2PC participant path to flush the `st_2pc_state` PREPARE marker to the /// commitlog (via the durability worker) while keeping the reducer's write lock open, /// so that no other transaction can interleave between PREPARE and COMMIT/ABORT. + /// Create a `TxData` for an insert WITHOUT modifying any table. + /// Only consumes a `tx_offset`. Used by pipelined 2PC to write + /// st_2pc_state to the commitlog without putting it in committed state. 
+ pub(super) fn create_insert_tx_data( + &mut self, + table_id: TableId, + table_name: &spacetimedb_schema::table_name::TableName, + row: &ProductValue, + ) -> TxData { + let row_arc: Arc<[ProductValue]> = Arc::from([row.clone()]); + let mut tx_data = TxData::default(); + tx_data.set_inserts_for_table(table_id, table_name, row_arc); + tx_data.set_tx_offset(self.next_tx_offset); + self.next_tx_offset += 1; + tx_data + } + pub(super) fn insert_row_and_consume_offset( &mut self, table_id: TableId, diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 5441212e9eb..d6368574e71 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -2735,6 +2735,20 @@ impl MutTxId { }) } + /// Create a `TxData` for a st_2pc_state INSERT without modifying committed state. + /// Only consumes a tx_offset. The row exists only in the commitlog (for recovery). + /// Takes a pre-built `ProductValue` so the caller can reuse it for the matching + /// DELETE entry (transaction replay uses whole-row equality). + pub fn create_2pc_prepare_tx_data(&mut self, row: &ProductValue) -> TxData { + let schema = self + .committed_state_write_lock + .get_schema(ST_2PC_STATE_ID) + .expect("st_2pc_state system table must exist"); + let table_name = schema.table_name.clone(); + self.committed_state_write_lock + .create_insert_tx_data(ST_2PC_STATE_ID, &table_name, row) + } + /// Delete the coordinator log entry for `participant_prepare_id` once the participant /// has acknowledged COMMIT. 
pub fn delete_st_2pc_coordinator_log(&mut self, participant_prepare_id: &str) -> Result<()> { diff --git a/crates/datastore/src/system_tables.rs b/crates/datastore/src/system_tables.rs index e19cc47fd74..1f9aa22f0ce 100644 --- a/crates/datastore/src/system_tables.rs +++ b/crates/datastore/src/system_tables.rs @@ -90,7 +90,7 @@ pub const ST_INDEX_ACCESSOR_ID: TableId = TableId(19); pub const ST_COLUMN_ACCESSOR_ID: TableId = TableId(20); /// The static ID of the 2PC participant state table pub const ST_2PC_STATE_ID: TableId = TableId(21); -pub(crate) const ST_2PC_STATE_NAME: &str = "st_2pc_state"; +pub const ST_2PC_STATE_NAME: &str = "st_2pc_state"; /// The static ID of the 2PC coordinator log table. /// A row is written atomically with the coordinator's commit, before sending COMMIT to participants. /// Used on coordinator crash-recovery to retransmit COMMIT decisions. diff --git a/crates/datastore/src/traits.rs b/crates/datastore/src/traits.rs index e1b99825ae2..98ea31bba47 100644 --- a/crates/datastore/src/traits.rs +++ b/crates/datastore/src/traits.rs @@ -226,7 +226,7 @@ impl TxDataTableEntry { /// /// Some extra information is embedded here /// so that the recording of execution metrics can be done without holding the tx lock. 
-#[derive(Default)] +#[derive(Default, Clone)] pub struct TxData { entries: SmallHashMap, diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index d7264f409a3..3d6ee16bae2 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -82,7 +82,7 @@ impl StandaloneEnv { let program_store = Arc::new(DiskStorage::new(data_dir.program_bytes().0).await?); let persistence_provider = Arc::new(LocalPersistenceProvider::new(data_dir.clone())); - let mut host_controller = HostController::new( + let host_controller = HostController::new( data_dir, config.db_config, config.v8_heap_policy, @@ -91,7 +91,7 @@ impl StandaloneEnv { persistence_provider, db_cores, ); - host_controller.call_reducer_router = Arc::new(LocalReducerRouter::new(config.local_api_url)); + host_controller.set_call_reducer_router(Arc::new(LocalReducerRouter::new(config.local_api_url))); let client_actor_index = ClientActorIndex::new(); let jwt_keys = certs.get_or_create_keys()?; @@ -484,6 +484,7 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv { .control_db .spacetime_replace_domains(database_identity, owner_identity, domain_names)?) 
} + } impl spacetimedb_client_api::Authorization for StandaloneEnv { diff --git a/modules/chain-call-repro/Cargo.toml b/modules/chain-call-repro/Cargo.toml new file mode 100644 index 00000000000..37e966784f4 --- /dev/null +++ b/modules/chain-call-repro/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "chain-call-repro" +version = "0.0.0" +edition.workspace = true + +[lib] +crate-type = ["cdylib"] +bench = false + +[dependencies] +spacetimedb = { path = "../../crates/bindings", features = ["unstable"] } diff --git a/modules/chain-call-repro/run.sh b/modules/chain-call-repro/run.sh new file mode 100755 index 00000000000..898c901931a --- /dev/null +++ b/modules/chain-call-repro/run.sh @@ -0,0 +1,211 @@ +#!/usr/bin/env bash + +set -euo pipefail + +SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)" +SERVER="${SPACETIME_SERVER:-local}" +A_CLIENTS="${A_CLIENTS:-4}" +B_CLIENTS="${B_CLIENTS:-4}" +CYCLE_CLIENTS="${CYCLE_CLIENTS:-0}" +ITERATIONS="${ITERATIONS:-25}" +BURN_ITERS="${BURN_ITERS:-0}" +RUN_ID="$(date +%Y%m%d%H%M%S)-$$" +DB_A="independent-repro-a-${RUN_ID}" +DB_B="independent-repro-b-${RUN_ID}" +DB_C="independent-repro-c-${RUN_ID}" +TMP_DIR="$(mktemp -d)" + +cleanup() { + rm -rf "$TMP_DIR" +} +trap cleanup EXIT + +publish_db() { + local db_name="$1" + local output + local identity + + output="$(cd "$SCRIPT_DIR" && spacetime publish --server "$SERVER" --clear-database -y "$db_name")" + printf '%s\n' "$output" >&2 + + identity="$( + printf '%s\n' "$output" \ + | grep -Eo 'identity: [0-9a-fA-F]+' \ + | sed 's/^identity: //' \ + | tail -n1 + )" + if [[ -z "$identity" ]]; then + echo "failed to parse identity from publish output for $db_name" >&2 + return 1 + fi + + printf '%s\n' "$identity" +} + +run_a_client() { + local client_id="$1" + local failures=0 + local seq + + for ((seq = 1; seq <= ITERATIONS; seq++)); do + if ! 
( + cd "$SCRIPT_DIR" && + spacetime call --server "$SERVER" -- "$DB_A_ID" call_b_from_a \ + "$DB_B_ID" \ + "a-client-${client_id}" \ + "$seq" \ + "a-msg-client-${client_id}-seq-${seq}" \ + "$BURN_ITERS" + ) >"$TMP_DIR/a-client-${client_id}-seq-${seq}.log" 2>&1; then + failures=$((failures + 1)) + fi + done + + printf '%s\n' "$failures" >"$TMP_DIR/a-client-${client_id}.failures" +} + +run_b_client() { + local client_id="$1" + local failures=0 + local seq + + for ((seq = 1; seq <= ITERATIONS; seq++)); do + if ! ( + cd "$SCRIPT_DIR" && + spacetime call --server "$SERVER" -- "$DB_B_ID" call_c_from_b \ + "$DB_C_ID" \ + "b-client-${client_id}" \ + "$seq" \ + "b-msg-client-${client_id}-seq-${seq}" \ + "$BURN_ITERS" + ) >"$TMP_DIR/b-client-${client_id}-seq-${seq}.log" 2>&1; then + failures=$((failures + 1)) + fi + done + + printf '%s\n' "$failures" >"$TMP_DIR/b-client-${client_id}.failures" +} + +run_cycle_client() { + local client_id="$1" + local failures=0 + local detected=0 + local seq + + for ((seq = 1; seq <= ITERATIONS; seq++)); do + local output + if output=$( + cd "$SCRIPT_DIR" && + spacetime call --server "$SERVER" -- "$DB_A_ID" cycle_a_calls_b \ + "$DB_B_ID" \ + "$DB_A_ID" \ + "cycle-client-${client_id}" \ + "$seq" \ + "cycle-msg-client-${client_id}-seq-${seq}" \ + "$BURN_ITERS" 2>&1 + ); then + : # success (should not happen -- this is a deadlock) + else + if echo "$output" | grep -q "cycle detected\|deadlock"; then + detected=$((detected + 1)) + else + failures=$((failures + 1)) + fi + fi + done + + printf '%s\n' "$failures" >"$TMP_DIR/cycle-client-${client_id}.failures" + printf '%s\n' "$detected" >"$TMP_DIR/cycle-client-${client_id}.detected" +} + +echo "Publishing independent-call repro module to A, B, and C on server '$SERVER'..." 
+DB_C_ID="$(publish_db "$DB_C")" +DB_B_ID="$(publish_db "$DB_B")" +DB_A_ID="$(publish_db "$DB_A")" + +echo "A identity: $DB_A_ID" +echo "B identity: $DB_B_ID" +echo "C identity: $DB_C_ID" +echo "Starting $A_CLIENTS A-clients, $B_CLIENTS B-clients, and $CYCLE_CLIENTS cycle-clients with $ITERATIONS calls each..." + +for ((client_id = 1; client_id <= A_CLIENTS; client_id++)); do + run_a_client "$client_id" & +done +for ((client_id = 1; client_id <= B_CLIENTS; client_id++)); do + run_b_client "$client_id" & +done +for ((client_id = 1; client_id <= CYCLE_CLIENTS; client_id++)); do + run_cycle_client "$client_id" & +done +wait + +A_FAILURES=0 +for ((client_id = 1; client_id <= A_CLIENTS; client_id++)); do + client_failures="$(cat "$TMP_DIR/a-client-${client_id}.failures")" + A_FAILURES=$((A_FAILURES + client_failures)) +done + +B_FAILURES=0 +for ((client_id = 1; client_id <= B_CLIENTS; client_id++)); do + client_failures="$(cat "$TMP_DIR/b-client-${client_id}.failures")" + B_FAILURES=$((B_FAILURES + client_failures)) +done + +CYCLE_FAILURES=0 +CYCLE_DETECTED=0 +for ((client_id = 1; client_id <= CYCLE_CLIENTS; client_id++)); do + client_failures="$(cat "$TMP_DIR/cycle-client-${client_id}.failures")" + client_detected="$(cat "$TMP_DIR/cycle-client-${client_id}.detected")" + CYCLE_FAILURES=$((CYCLE_FAILURES + client_failures)) + CYCLE_DETECTED=$((CYCLE_DETECTED + client_detected)) +done + +A_SUCCESSES=$((A_CLIENTS * ITERATIONS - A_FAILURES)) +B_SUCCESSES=$((B_CLIENTS * ITERATIONS - B_FAILURES)) +CYCLE_TOTAL=$((CYCLE_CLIENTS * ITERATIONS)) +CYCLE_OTHER=$((CYCLE_TOTAL - CYCLE_DETECTED - CYCLE_FAILURES)) +TOTAL_FAILURES=$((A_FAILURES + B_FAILURES + CYCLE_FAILURES)) + +echo "Successful A->B calls: $A_SUCCESSES" +echo "Failed A->B calls: $A_FAILURES" +echo "Successful B->C calls: $B_SUCCESSES" +echo "Failed B->C calls: $B_FAILURES" +if [[ "$CYCLE_CLIENTS" -gt 0 ]]; then + echo "Cycle calls (A->B->A): $CYCLE_TOTAL total" + echo " Deadlock detected: $CYCLE_DETECTED" + echo " Other 
failures: $CYCLE_FAILURES" + echo " Unexpectedly succeeded: $CYCLE_OTHER" +fi + +if [[ "$A_SUCCESSES" -gt 0 ]]; then + (cd "$SCRIPT_DIR" && spacetime call --server "$SERVER" -- "$DB_A_ID" assert_kind_count sent_to_b "$A_SUCCESSES") + (cd "$SCRIPT_DIR" && spacetime call --server "$SERVER" -- "$DB_B_ID" assert_kind_count recv_from_a "$A_SUCCESSES") +fi + +if [[ "$B_SUCCESSES" -gt 0 ]]; then + (cd "$SCRIPT_DIR" && spacetime call --server "$SERVER" -- "$DB_B_ID" assert_kind_count sent_to_c "$B_SUCCESSES") + (cd "$SCRIPT_DIR" && spacetime call --server "$SERVER" -- "$DB_C_ID" assert_kind_count recv_from_b "$B_SUCCESSES") +fi + +if [[ "$TOTAL_FAILURES" -ne 0 ]]; then + echo + echo "At least one client call failed. Sample failure logs:" + find "$TMP_DIR" -name '*-client-*-seq-*.log' -type f -print0 \ + | xargs -0 grep -l "Error\|failed\|panic" \ + | head -n 10 \ + | while read -r log_file; do + echo "--- $log_file ---" + cat "$log_file" + done + exit 1 +fi + +echo +echo "Run complete." +echo "Flows exercised independently:" +echo "A reducer calls B" +echo "B reducer calls C" +echo "Use these database identities to inspect state manually if needed:" +echo "A: $DB_A_ID" +echo "B: $DB_B_ID" +echo "C: $DB_C_ID" diff --git a/modules/chain-call-repro/src/lib.rs b/modules/chain-call-repro/src/lib.rs new file mode 100644 index 00000000000..7df4ec58c31 --- /dev/null +++ b/modules/chain-call-repro/src/lib.rs @@ -0,0 +1,229 @@ +use spacetimedb::{Identity, ReducerContext, SpacetimeType, Table}; + +#[derive(SpacetimeType, Clone)] +pub struct CallPayload { + pub client_label: String, + pub seq: u64, + pub message: String, +} + +#[spacetimedb::table(accessor = call_log, public)] +pub struct CallLog { + #[primary_key] + #[auto_inc] + id: u64, + kind: String, + client_label: String, + seq: u64, + message: String, +} + +fn log_entry(ctx: &ReducerContext, kind: &str, payload: &CallPayload) { + ctx.db.call_log().insert(CallLog { + id: 0, + kind: kind.to_string(), + client_label: 
payload.client_label.clone(), + seq: payload.seq, + message: payload.message.clone(), + }); +} + +fn burn(iters: u64) { + if iters == 0 { + return; + } + + let mut x = 1u64; + for i in 0..iters { + x = x.wrapping_mul(6364136223846793005u64).wrapping_add(i | 1); + } + if x == 0 { + panic!("impossible burn result"); + } +} + +#[spacetimedb::reducer] +pub fn record_on_b(ctx: &ReducerContext, payload: CallPayload, burn_iters: u64) -> Result<(), String> { + burn(burn_iters); + log_entry(ctx, "recv_from_a", &payload); + Ok(()) +} + +#[spacetimedb::reducer] +pub fn record_on_c(ctx: &ReducerContext, payload: CallPayload, burn_iters: u64) -> Result<(), String> { + burn(burn_iters); + log_entry(ctx, "recv_from_b", &payload); + Ok(()) +} + +#[spacetimedb::reducer] +pub fn call_b_from_a( + ctx: &ReducerContext, + b_hex: String, + client_label: String, + seq: u64, + message: String, + burn_iters: u64, +) -> Result<(), String> { + burn(burn_iters); + + let b = Identity::from_hex(&b_hex).expect("invalid B identity"); + let payload = CallPayload { + client_label, + seq, + message, + }; + let args = spacetimedb::spacetimedb_lib::bsatn::to_vec(&(payload.clone(), burn_iters)) + .expect("failed to encode args for record_on_b"); + spacetimedb::remote_reducer::call_reducer_on_db_2pc(b, "record_on_b", &args) + .map_err(|e| format!("call_b_from_a: call to B failed: {e}"))?; + + log_entry(ctx, "sent_to_b", &payload); + Ok(()) +} + +#[spacetimedb::reducer] +pub fn call_c_from_b( + ctx: &ReducerContext, + c_hex: String, + client_label: String, + seq: u64, + message: String, + burn_iters: u64, +) -> Result<(), String> { + burn(burn_iters); + + let c = Identity::from_hex(&c_hex).expect("invalid C identity"); + let payload = CallPayload { + client_label, + seq, + message, + }; + let args = spacetimedb::spacetimedb_lib::bsatn::to_vec(&(payload.clone(), burn_iters)) + .expect("failed to encode args for record_on_c"); + spacetimedb::remote_reducer::call_reducer_on_db_2pc(c, "record_on_c", &args) + 
.map_err(|e| format!("call_c_from_b: call to C failed: {e}"))?; + + log_entry(ctx, "sent_to_c", &payload); + Ok(()) +} + +// --------------------------------------------------------------------------- +// Cycle: A calls B, B calls back to A +// --------------------------------------------------------------------------- + +/// Called on database A. Calls `cycle_b_calls_a` on database B, which calls back here. +#[spacetimedb::reducer] +pub fn cycle_a_calls_b( + ctx: &ReducerContext, + b_hex: String, + a_hex: String, + client_label: String, + seq: u64, + message: String, + burn_iters: u64, +) -> Result<(), String> { + burn(burn_iters); + + let payload = CallPayload { + client_label, + seq, + message, + }; + log_entry(ctx, "cycle_a_sent_to_b", &payload); + + let b = Identity::from_hex(&b_hex).expect("invalid B identity"); + let args = spacetimedb::spacetimedb_lib::bsatn::to_vec(&( + a_hex, + payload.clone(), + burn_iters, + )) + .expect("failed to encode args for cycle_b_calls_a"); + spacetimedb::remote_reducer::call_reducer_on_db_2pc(b, "cycle_b_calls_a", &args) + .map_err(|e| format!("cycle_a_calls_b: call to B failed: {e}"))?; + + Ok(()) +} + +/// Called on database B by `cycle_a_calls_b`. Calls `cycle_a_receives` back on A, +/// completing the A→B→A cycle. 
+#[spacetimedb::reducer] +pub fn cycle_b_calls_a( + ctx: &ReducerContext, + a_hex: String, + payload: CallPayload, + burn_iters: u64, +) -> Result<(), String> { + burn(burn_iters); + log_entry(ctx, "cycle_b_recv_from_a", &payload); + + let a = Identity::from_hex(&a_hex).expect("invalid A identity"); + let args = spacetimedb::spacetimedb_lib::bsatn::to_vec(&(payload.clone(), burn_iters)) + .expect("failed to encode args for cycle_a_receives"); + spacetimedb::remote_reducer::call_reducer_on_db_2pc(a, "cycle_a_receives", &args) + .map_err(|e| format!("cycle_b_calls_a: call back to A failed: {e}"))?; + + log_entry(ctx, "cycle_b_sent_to_a", &payload); + Ok(()) +} + +/// Terminal reducer on A, called by B to complete the cycle. +#[spacetimedb::reducer] +pub fn cycle_a_receives( + ctx: &ReducerContext, + payload: CallPayload, + burn_iters: u64, +) -> Result<(), String> { + burn(burn_iters); + log_entry(ctx, "cycle_a_recv_from_b", &payload); + Ok(()) +} + +// --------------------------------------------------------------------------- +// Busy-work reducers: no cross-database calls, just burn CPU and log +// --------------------------------------------------------------------------- + +#[spacetimedb::reducer] +pub fn busy_work_small(ctx: &ReducerContext, client_label: String, seq: u64) -> Result<(), String> { + burn(1_000); + let payload = CallPayload { + client_label, + seq, + message: "small".to_string(), + }; + log_entry(ctx, "busy_small", &payload); + Ok(()) +} + +#[spacetimedb::reducer] +pub fn busy_work_medium(ctx: &ReducerContext, client_label: String, seq: u64) -> Result<(), String> { + burn(100_000); + let payload = CallPayload { + client_label, + seq, + message: "medium".to_string(), + }; + log_entry(ctx, "busy_medium", &payload); + Ok(()) +} + +#[spacetimedb::reducer] +pub fn busy_work_large(ctx: &ReducerContext, client_label: String, seq: u64) -> Result<(), String> { + burn(10_000_000); + let payload = CallPayload { + client_label, + seq, + message: 
"large".to_string(), + }; + log_entry(ctx, "busy_large", &payload); + Ok(()) +} + +#[spacetimedb::reducer] +pub fn assert_kind_count(ctx: &ReducerContext, kind: String, expected: u64) -> Result<(), String> { + let actual = ctx.db.call_log().iter().filter(|row| row.kind == kind).count() as u64; + if actual != expected { + return Err(format!("expected kind '{kind}' count {expected}, got {actual}")); + } + Ok(()) +} diff --git a/modules/tpcc-metrics/src/lib.rs b/modules/tpcc-metrics/src/lib.rs index f8a7c488d29..823e2f0b13d 100644 --- a/modules/tpcc-metrics/src/lib.rs +++ b/modules/tpcc-metrics/src/lib.rs @@ -1,5 +1,7 @@ use spacetimedb::{reducer, table, ReducerContext, Table}; +const BUCKET_SIZE_MS: u64 = 1_000; + #[table(accessor = state, public)] pub struct State { #[primary_key] @@ -21,6 +23,29 @@ pub struct Txn { pub latency_ms: u16, } +#[table(accessor = txn_bucket, public)] +pub struct TxnBucket { + #[primary_key] + pub bucket_start_ms: u64, + pub count: u64, +} + +fn clear_tables(ctx: &ReducerContext) { + for row in ctx.db.state().iter() { + ctx.db.state().id().delete(row.id); + } + + for row in ctx.db.txn().iter() { + ctx.db.txn().id().delete(row.id); + } + + for row in ctx.db.txn_bucket().iter() { + ctx.db.txn_bucket() + .bucket_start_ms() + .delete(row.bucket_start_ms); + } +} + #[reducer] pub fn reset( ctx: &ReducerContext, @@ -29,13 +54,7 @@ pub fn reset( measure_start_ms: u64, measure_end_ms: u64, ) { - for row in ctx.db.state().iter() { - ctx.db.state().id().delete(row.id); - } - - for row in ctx.db.txn().iter() { - ctx.db.txn().id().delete(row.id); - } + clear_tables(ctx); ctx.db.state().insert(State { id: 0, @@ -49,13 +68,7 @@ pub fn reset( #[reducer] pub fn clear_state(ctx: &ReducerContext) { - for row in ctx.db.state().iter() { - ctx.db.state().id().delete(row.id); - } - - for row in ctx.db.txn().iter() { - ctx.db.txn().id().delete(row.id); - } + clear_tables(ctx); } #[reducer] @@ -68,3 +81,31 @@ pub fn record_txn(ctx: &ReducerContext, latency_ms: 
u16) { latency_ms, }); } + +#[reducer] +pub fn record_txn_bucket(ctx: &ReducerContext) { + let current_time_ms = ctx + .timestamp + .to_duration_since_unix_epoch() + .unwrap() + .as_millis() as u64; + let Some(state) = ctx.db.state().id().find(0) else { + return; + }; + + let bucket_offset_ms = current_time_ms.saturating_sub(state.run_start_ms); + let bucket_start_ms = + state.run_start_ms + ((bucket_offset_ms / BUCKET_SIZE_MS) * BUCKET_SIZE_MS); + + if let Some(bucket) = ctx.db.txn_bucket().bucket_start_ms().find(bucket_start_ms) { + ctx.db.txn_bucket().bucket_start_ms().update(TxnBucket { + count: bucket.count.saturating_add(1), + ..bucket + }); + } else { + ctx.db.txn_bucket().insert(TxnBucket { + bucket_start_ms, + count: 1, + }); + } +} diff --git a/modules/tpcc/src/load.rs b/modules/tpcc/src/load.rs index 30947066b3b..9b0437b4dbe 100644 --- a/modules/tpcc/src/load.rs +++ b/modules/tpcc/src/load.rs @@ -53,6 +53,8 @@ pub struct TpccLoadConfigRequest { pub database_number: u32, pub num_databases: u32, pub warehouses_per_database: u32, + pub warehouse_id_offset: u32, + pub skip_items: bool, pub batch_size: u32, pub seed: u64, pub load_c_last: u32, @@ -69,6 +71,8 @@ pub struct TpccLoadConfig { pub database_number: u32, pub num_databases: u32, pub warehouses_per_database: u32, + pub warehouse_id_offset: u32, + pub skip_items: bool, pub batch_size: u32, pub seed: u64, pub load_c_last: u32, @@ -227,6 +231,8 @@ fn configure_tpcc_load_internal(ctx: &ReducerContext, request: TpccLoadConfigReq database_number: request.database_number, num_databases: request.num_databases, warehouses_per_database: request.warehouses_per_database, + warehouse_id_offset: request.warehouse_id_offset, + skip_items: request.skip_items, batch_size: request.batch_size, seed: request.seed, load_c_last: request.load_c_last, @@ -265,6 +271,21 @@ fn validate_request(request: &TpccLoadConfigRequest) -> Result<(), String> { request.num_databases, request.warehouses_per_database )); } + // Validate 
that the warehouse ID range for this database doesn't overflow u32. + // warehouse_start = database_number * warehouses_per_database + warehouse_id_offset + 1 + // warehouse_end = warehouse_start + warehouses_per_database - 1 + if request + .database_number + .checked_mul(request.warehouses_per_database) + .and_then(|v| v.checked_add(request.warehouse_id_offset)) + .and_then(|v| v.checked_add(request.warehouses_per_database)) + .is_none() + { + return Err(format!( + "warehouse id range overflow u32 (database_number={}, warehouses_per_database={}, warehouse_id_offset={})", + request.database_number, request.warehouses_per_database, request.warehouse_id_offset + )); + } Ok(()) } @@ -272,8 +293,16 @@ fn initial_state(request: &TpccLoadConfigRequest, now: Timestamp) -> TpccLoadSta TpccLoadState { singleton_id: LOAD_SINGLETON_ID, status: TpccLoadStatus::Idle, - phase: TpccLoadPhase::Items, - next_warehouse_id: warehouse_start(request.database_number, request.warehouses_per_database), + phase: if request.skip_items { + TpccLoadPhase::WarehousesDistricts + } else { + TpccLoadPhase::Items + }, + next_warehouse_id: warehouse_start( + request.database_number, + request.warehouses_per_database, + request.warehouse_id_offset, + ), next_district_id: 1, next_item_id: 1, next_order_id: 1, @@ -291,6 +320,8 @@ fn config_as_request(config: &TpccLoadConfig) -> TpccLoadConfigRequest { database_number: config.database_number, num_databases: config.num_databases, warehouses_per_database: config.warehouses_per_database, + warehouse_id_offset: config.warehouse_id_offset, + skip_items: config.skip_items, batch_size: config.batch_size, seed: config.seed, load_c_last: config.load_c_last, @@ -308,7 +339,7 @@ fn build_remote_warehouses(request: &TpccLoadConfigRequest) -> Vec Result { let _timer = LogStopwatch::new("load_warehouses_district_chunk"); - let end_warehouse = warehouse_end(config.database_number, config.warehouses_per_database); - if job.next_warehouse_id < 
warehouse_start(config.database_number, config.warehouses_per_database) + let end_warehouse = warehouse_end(config.database_number, config.warehouses_per_database, config.warehouse_id_offset); + if job.next_warehouse_id < warehouse_start(config.database_number, config.warehouses_per_database, config.warehouse_id_offset) || job.next_warehouse_id > end_warehouse { return Err(format!("invalid warehouse cursor {}", job.next_warehouse_id)); @@ -437,7 +468,7 @@ fn load_warehouse_district_chunk( TpccLoadPhase::WarehousesDistricts }, next_warehouse_id: if job.next_warehouse_id == end_warehouse { - warehouse_start(config.database_number, config.warehouses_per_database) + warehouse_start(config.database_number, config.warehouses_per_database, config.warehouse_id_offset) } else { job.next_warehouse_id + 1 }, @@ -451,8 +482,8 @@ fn load_warehouse_district_chunk( fn load_stock_chunk(ctx: &ReducerContext, config: &TpccLoadConfig, job: &TpccLoadJob) -> Result { let _timer = LogStopwatch::new("load_stock_chunk"); - let start_warehouse = warehouse_start(config.database_number, config.warehouses_per_database); - let end_warehouse = warehouse_end(config.database_number, config.warehouses_per_database); + let start_warehouse = warehouse_start(config.database_number, config.warehouses_per_database, config.warehouse_id_offset); + let end_warehouse = warehouse_end(config.database_number, config.warehouses_per_database, config.warehouse_id_offset); if job.next_warehouse_id < start_warehouse || job.next_warehouse_id > end_warehouse { return Err(format!("invalid stock warehouse cursor {}", job.next_warehouse_id)); } @@ -492,8 +523,8 @@ fn load_customer_history_chunk( job: &TpccLoadJob, ) -> Result { let _timer = LogStopwatch::new("load_customer_history_chunk"); - let start_warehouse = warehouse_start(config.database_number, config.warehouses_per_database); - let end_warehouse = warehouse_end(config.database_number, config.warehouses_per_database); + let start_warehouse = 
warehouse_start(config.database_number, config.warehouses_per_database, config.warehouse_id_offset); + let end_warehouse = warehouse_end(config.database_number, config.warehouses_per_database, config.warehouse_id_offset); if job.next_warehouse_id < start_warehouse || job.next_warehouse_id > end_warehouse { return Err(format!("invalid customer warehouse cursor {}", job.next_warehouse_id)); } @@ -542,8 +573,8 @@ fn load_customer_history_chunk( fn load_order_chunk(ctx: &ReducerContext, config: &TpccLoadConfig, job: &TpccLoadJob) -> Result { let _timer = LogStopwatch::new("load_order_chunk"); - let start_warehouse = warehouse_start(config.database_number, config.warehouses_per_database); - let end_warehouse = warehouse_end(config.database_number, config.warehouses_per_database); + let start_warehouse = warehouse_start(config.database_number, config.warehouses_per_database, config.warehouse_id_offset); + let end_warehouse = warehouse_end(config.database_number, config.warehouses_per_database, config.warehouse_id_offset); if job.next_warehouse_id < start_warehouse || job.next_warehouse_id > end_warehouse { return Err(format!("invalid order warehouse cursor {}", job.next_warehouse_id)); } @@ -835,21 +866,26 @@ fn customer_permutation(config: &TpccLoadConfig, warehouse_id: WarehouseId, dist permutation } -fn warehouse_range(database_number: u32, warehouses_per_database: u32) -> std::ops::Range { - let start = warehouse_start(database_number, warehouses_per_database); +fn warehouse_range( + database_number: u32, + warehouses_per_database: u32, + offset: u32, +) -> std::ops::Range { + let start = warehouse_start(database_number, warehouses_per_database, offset); let end = start + warehouses_per_database; start..end } -fn warehouse_start(database_number: u32, warehouses_per_database: u32) -> WarehouseId { +fn warehouse_start(database_number: u32, warehouses_per_database: u32, offset: u32) -> WarehouseId { database_number .checked_mul(warehouses_per_database) + 
.and_then(|value| value.checked_add(offset)) .and_then(|value| value.checked_add(1)) .expect("warehouse id arithmetic validated at configure_tpcc_load time") } -fn warehouse_end(database_number: u32, warehouses_per_database: u32) -> WarehouseId { - warehouse_start(database_number, warehouses_per_database) + warehouses_per_database - 1 +fn warehouse_end(database_number: u32, warehouses_per_database: u32, offset: u32) -> WarehouseId { + warehouse_start(database_number, warehouses_per_database, offset) + warehouses_per_database - 1 } fn deterministic_rng(seed: u64, tag: u64, parts: &[u64]) -> StdRng { diff --git a/modules/tpcc/src/new_order.rs b/modules/tpcc/src/new_order.rs index 7887f615e14..00dc06fa434 100644 --- a/modules/tpcc/src/new_order.rs +++ b/modules/tpcc/src/new_order.rs @@ -1,9 +1,7 @@ use std::collections::HashMap; use crate::{ - district, find_customer_by_id, find_district, find_stock, find_warehouse, item, order_line, pack_order_key, - remote::{remote_warehouse_home, simulate_remote_call}, - stock, District, Item, OrderLine, Stock, WarehouseId, DISTRICTS_PER_WAREHOUSE, TAX_SCALE, + DISTRICTS_PER_WAREHOUSE, District, Item, OrderLine, Stock, TAX_SCALE, WarehouseId, district, find_customer_by_id, find_district, find_stock, find_warehouse, item, order_line, pack_order_key, remote::{call_remote_reducer, remote_warehouse_home, simulate_remote_call}, stock }; use spacetimedb::{log_stopwatch::LogStopwatch, reducer, Identity, ReducerContext, SpacetimeType, Table, Timestamp}; @@ -183,19 +181,19 @@ fn call_remote_order_multiple_items_and_decrement_stock( remote_database_identity: Identity, input: OrderMultipleItemsInput, ) -> Result, String> { - // call_remote_reducer( - // ctx, - // remote_database_identity, - // "order_multiple_items_and_decrement_stocks", - // &input, - // ) - simulate_remote_call( + call_remote_reducer( ctx, remote_database_identity, "order_multiple_items_and_decrement_stocks", &input, - )?; - Ok(simulated_remote_order_outputs(input)) + ) + 
// simulate_remote_call( + // ctx, + // remote_database_identity, + // "order_multiple_items_and_decrement_stocks", + // &input, + // )?; + // Ok(simulated_remote_order_outputs(input)) } struct ProcessedNewOrderItem { diff --git a/modules/tpcc/src/payment.rs b/modules/tpcc/src/payment.rs index 72f26d6f0f3..c64e533abcd 100644 --- a/modules/tpcc/src/payment.rs +++ b/modules/tpcc/src/payment.rs @@ -1,7 +1,5 @@ use crate::{ - customer, district, find_district, find_warehouse, history, - remote::{remote_warehouse_home, simulate_remote_call}, - resolve_customer, warehouse, Customer, CustomerSelector, District, History, Warehouse, MAX_C_DATA_LEN, + Customer, CustomerSelector, District, History, MAX_C_DATA_LEN, Warehouse, customer, district, find_district, find_warehouse, history, remote::{call_remote_reducer, remote_warehouse_home, simulate_remote_call}, resolve_customer, warehouse }; use spacetimedb::{ log_stopwatch::LogStopwatch, procedure, reducer, Identity, ProcedureContext, ReducerContext, SpacetimeType, Table, @@ -128,19 +126,19 @@ fn call_remote_resolve_and_update_customer_for_payment( remote_database_identity: Identity, request: &PaymentRequest, ) -> Result { - // call_remote_reducer( - // ctx, - // remote_database_identity, - // "resolve_and_update_customer_for_payment", - // request, - // ) - simulate_remote_call( + call_remote_reducer( ctx, remote_database_identity, "resolve_and_update_customer_for_payment", request, - )?; - Ok(simulated_remote_customer(request)) + ) + // simulate_remote_call( + // ctx, + // remote_database_identity, + // "resolve_and_update_customer_for_payment", + // request, + // )?; + // Ok(simulated_remote_customer(request)) } #[procedure] diff --git a/tools/tpcc-dashboard/src/App.tsx b/tools/tpcc-dashboard/src/App.tsx index 52924ae4233..204dc9a764e 100644 --- a/tools/tpcc-dashboard/src/App.tsx +++ b/tools/tpcc-dashboard/src/App.tsx @@ -1,15 +1,10 @@ import { useContext, useEffect } from 'react'; import { SpacetimeDBContext } from 
'./context'; -import { - deleteState, - insertState, - throughputStateUpdate, -} from './features/globalState'; +import { deleteState, insertState, removeTxnBucket, upsertTxnBucket } from './features/globalState'; import { useAppDispatch, useAppSelector } from './hooks'; import NewOrderThroughtputChart from './NewOrderThroughtputChart'; import StatsCards from './StatsCards'; import './App.css'; -import LatencyDistributionChart from './LatencyDistributionChart'; function App() { const conn = useContext(SpacetimeDBContext); @@ -37,23 +32,37 @@ function App() { dispatch(deleteState()); }); - conn?.db.txn.onInsert((_, txn) => { + conn.db.txn_bucket.onInsert((_, bucket) => { dispatch( - throughputStateUpdate({ - id: String(txn.id), - measurementTimeMs: Number(txn.measurementTimeMs), - latencyMs: Number(txn.latencyMs), + upsertTxnBucket({ + bucketStartMs: Number(bucket.bucketStartMs), + count: Number(bucket.count), + }) + ); + }); + conn.db.txn_bucket.onUpdate((_, _oldBucket, bucket) => { + dispatch( + upsertTxnBucket({ + bucketStartMs: Number(bucket.bucketStartMs), + count: Number(bucket.count), + }) + ); + }); + conn.db.txn_bucket.onDelete((_, bucket) => { + dispatch( + removeTxnBucket({ + bucketStartMs: Number(bucket.bucketStartMs), }) ); }); const subscription = conn - ?.subscriptionBuilder() + .subscriptionBuilder() .onError(err => console.error('Subscription error:', err)) - .subscribe(['SELECT * FROM state', 'SELECT * FROM txn']); + .subscribe(['SELECT * FROM state', 'SELECT * FROM txn_bucket']); return () => { - subscription?.unsubscribe(); + subscription.unsubscribe(); }; }, [conn, dispatch]); @@ -65,7 +74,6 @@ function App() {
-
); } diff --git a/tools/tpcc-dashboard/src/LatencyDistributionChart.tsx b/tools/tpcc-dashboard/src/LatencyDistributionChart.tsx index a51abfaf799..772dd9cbf79 100644 --- a/tools/tpcc-dashboard/src/LatencyDistributionChart.tsx +++ b/tools/tpcc-dashboard/src/LatencyDistributionChart.tsx @@ -11,7 +11,7 @@ import { useAppSelector } from './hooks'; import { useMemo } from 'react'; export default function LatencyDistributionChart() { - const latencyData = useAppSelector(state => state.globalState.latencyData); + const latencyData = useAppSelector(state => state.globalState.bucketCounts); const chartData = useMemo(() => { const sortedLatencies = Object.keys(latencyData) diff --git a/tools/tpcc-dashboard/src/NewOrderThroughtputChart.tsx b/tools/tpcc-dashboard/src/NewOrderThroughtputChart.tsx index 399223e0960..33f6bca73d7 100644 --- a/tools/tpcc-dashboard/src/NewOrderThroughtputChart.tsx +++ b/tools/tpcc-dashboard/src/NewOrderThroughtputChart.tsx @@ -20,7 +20,7 @@ interface ThroughputBucketPoint { } function buildTpccThroughputSeries( - transactionTimes: number[], + bucketCounts: Record, runStartMs: number, runEndMs: number, bucketSizeMs: number @@ -32,18 +32,9 @@ function buildTpccThroughputSeries( const buckets = Array.from({ length: bucketCount }, (_, i) => ({ bucketStartMs: runStartMs + i * bucketSizeMs, bucketEndMs: Math.min(runStartMs + (i + 1) * bucketSizeMs, runEndMs), - count: 0, + count: bucketCounts[runStartMs + i * bucketSizeMs] ?? 
0, })); - for (const ts of transactionTimes) { - if (ts < runStartMs || ts >= runEndMs) continue; - - const index = Math.floor((ts - runStartMs) / bucketSizeMs); - if (index >= 0 && index < buckets.length) { - buckets[index].count += 1; - } - } - return buckets.map(bucket => ({ elapsedSec: (bucket.bucketStartMs - runStartMs) / 1000, tpmC: bucket.count * (60_000 / bucketSizeMs), @@ -61,13 +52,18 @@ export default function NewOrderThroughtputChart() { const measurementEndMs = useAppSelector( state => state.globalState.measureEndMs ); - const data = useAppSelector(state => state.globalState.throughputData); + const bucketCounts = useAppSelector(state => state.globalState.bucketCounts); const chartData = useMemo(() => { - const bucketSizeMs = 10_000; + const bucketSizeMs = 1_000; - return buildTpccThroughputSeries(data, runStartMs, runEndMs, bucketSizeMs); - }, [data, runStartMs, runEndMs]); + return buildTpccThroughputSeries( + bucketCounts, + runStartMs, + runEndMs, + bucketSizeMs + ); + }, [bucketCounts, runStartMs, runEndMs]); return (
diff --git a/tools/tpcc-dashboard/src/StatsCards.tsx b/tools/tpcc-dashboard/src/StatsCards.tsx index e6ea32489a8..37b67921c0d 100644 --- a/tools/tpcc-dashboard/src/StatsCards.tsx +++ b/tools/tpcc-dashboard/src/StatsCards.tsx @@ -12,28 +12,32 @@ import './StatsCards.css'; function getTpmC( measureStartMs: number, - measureEndMs: number, - measuredTransactionCount: number -): number | null { - const nowMs = Date.now(); + bucketCounts: Record +): number { + const measuredBucketStarts = Object.keys(bucketCounts) + .map(Number) + .filter(bucketStartMs => bucketStartMs >= measureStartMs) + .sort((a, b) => a - b); - if (measureStartMs <= 0 || measureEndMs <= measureStartMs) { - return null; + if (measuredBucketStarts.length === 0) { + return 0; } - if (nowMs < measureStartMs) { - return null; - } - - const effectiveEndMs = Math.min(nowMs, measureEndMs); - const elapsedTimeSec = (effectiveEndMs - measureStartMs) / 1000; + const firstBucketStartMs = measuredBucketStarts[0]; + const latestBucketStartMs = + measuredBucketStarts[measuredBucketStarts.length - 1]; + const totalMeasuredTransactions = measuredBucketStarts.reduce( + (sum, bucketStartMs) => sum + (bucketCounts[bucketStartMs] ?? 0), + 0 + ); + const elapsedTimeSec = + (latestBucketStartMs + 1_000 - firstBucketStartMs) / 1000; if (elapsedTimeSec <= 0) { - return null; + return 0; } - const tpmC = (measuredTransactionCount / elapsedTimeSec) * 60; - return Math.trunc(tpmC); + return Math.trunc((totalMeasuredTransactions / elapsedTimeSec) * 60); } function StatCard({ @@ -71,8 +75,10 @@ export default function StatsCards() { const measuredTransactionCount = useAppSelector( state => state.globalState.measuredTransactionCount ); + const bucketCounts = useAppSelector(state => state.globalState.bucketCounts); - const tpmC = getTpmC(measureStartMs, measureEndMs, measuredTransactionCount); + const tpmC = getTpmC(measureStartMs, bucketCounts); + const theoreticalMaxThroughput = warehouses * 12.86; return (
@@ -86,16 +92,16 @@ export default function StatsCards() { } label="Max. Theorical Throughput" - value={warehouses * 12.86} + value={theoreticalMaxThroughput} unit="tpmC" /> } label="% Max. Theorical Throughput" value={ - tpmC === null + theoreticalMaxThroughput <= 0 ? 'N/A' - : ((tpmC / (warehouses * 12.86)) * 100).toFixed(2) + '%' + : ((tpmC / theoreticalMaxThroughput) * 100).toFixed(2) + '%' } /> } label="MQTh" - value={ - tpmC === null ? 'Warmup in progress...' : Math.trunc(tpmC) + ' tpmC' - } + value={tpmC + ' tpmC'} />
); diff --git a/tools/tpcc-dashboard/src/features/globalState.ts b/tools/tpcc-dashboard/src/features/globalState.ts index 60f3c00be02..a18efe22581 100644 --- a/tools/tpcc-dashboard/src/features/globalState.ts +++ b/tools/tpcc-dashboard/src/features/globalState.ts @@ -1,5 +1,36 @@ import { createSlice, type PayloadAction } from '@reduxjs/toolkit'; +function createInitialState(): GlobalState { + return { + isReady: false, + warehouses: 0, + measureStartMs: 0, + measureEndMs: 0, + runStartMs: 0, + runEndMs: 0, + totalTransactionCount: 0, + measuredTransactionCount: 0, + bucketCounts: {}, + }; +} + +function recalculateCounts(state: GlobalState) { + let totalTransactionCount = 0; + let measuredTransactionCount = 0; + + for (const [bucketStartMs, count] of Object.entries(state.bucketCounts)) { + const bucketCount = Number(count); + totalTransactionCount += bucketCount; + + if (Number(bucketStartMs) >= state.measureStartMs) { + measuredTransactionCount += bucketCount; + } + } + + state.totalTransactionCount = totalTransactionCount; + state.measuredTransactionCount = measuredTransactionCount; +} + export interface GlobalState { isReady: boolean; warehouses: number; @@ -9,24 +40,10 @@ export interface GlobalState { runEndMs: number; totalTransactionCount: number; measuredTransactionCount: number; - /// Time in ms when the transaction was measured - throughputData: number[]; - /// Latency frequency distribution, where the key is the latency in ms and the value is the count of transactions with that latency. 
- latencyData: Record; + bucketCounts: Record; } -const initialState: GlobalState = { - isReady: false, - warehouses: 0, - measureStartMs: 0, - measureEndMs: 0, - runStartMs: 0, - runEndMs: 0, - totalTransactionCount: 0, - measuredTransactionCount: 0, - throughputData: [], - latencyData: {}, -}; +const initialState: GlobalState = createInitialState(); export const globalStateSlice = createSlice({ name: 'globalState', @@ -50,43 +67,31 @@ export const globalStateSlice = createSlice({ state.measureEndMs = payload.measureEndMs; state.runStartMs = payload.runStartMs; state.runEndMs = payload.runEndMs; - state.totalTransactionCount = 0; - state.measuredTransactionCount = 0; - state.throughputData = []; - state.latencyData = {}; + recalculateCounts(state); }, - deleteState: state => { + deleteState: () => { console.log('State deleted, resetting to initial state'); - state.isReady = false; + return createInitialState(); }, - throughputStateUpdate: ( + upsertTxnBucket: ( state, action: PayloadAction<{ - id: string; - measurementTimeMs: number; - latencyMs: number; + bucketStartMs: number; + count: number; }> ) => { const payload = action.payload; - state.totalTransactionCount += 1; - if (Number(payload.measurementTimeMs) >= state.measureStartMs) { - // Each update here is a single transaction, so we can just increment the count by one. 
- state.measuredTransactionCount += 1; - } - - state.throughputData.push(Number(payload.measurementTimeMs)); - - const latency = Number(payload.latencyMs); - if (state.latencyData[latency]) { - state.latencyData[latency] += 1; - } else { - state.latencyData[latency] = 1; - } + state.bucketCounts[payload.bucketStartMs] = payload.count; + recalculateCounts(state); + }, + removeTxnBucket: (state, action: PayloadAction<{ bucketStartMs: number }>) => { + delete state.bucketCounts[action.payload.bucketStartMs]; + recalculateCounts(state); }, }, }); -export const { insertState, deleteState, throughputStateUpdate } = +export const { insertState, deleteState, upsertTxnBucket, removeTxnBucket } = globalStateSlice.actions; export default globalStateSlice.reducer; diff --git a/tools/tpcc-dashboard/src/module_bindings/index.ts b/tools/tpcc-dashboard/src/module_bindings/index.ts index e7cf4b9d457..5281ad76b8b 100644 --- a/tools/tpcc-dashboard/src/module_bindings/index.ts +++ b/tools/tpcc-dashboard/src/module_bindings/index.ts @@ -1,7 +1,7 @@ // THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE // WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. -// This was generated using spacetimedb cli version 2.1.0 (commit 6981f48b4bc1a71c8dd9bdfe5a2c343f6370243d). +// This was generated using spacetimedb cli version 2.1.0 (commit 7df4044d896459c2a6ca5b9e2c716052aa39e88a). 
/* eslint-disable */ /* tslint:disable */ @@ -36,6 +36,7 @@ import { // Import all reducer arg schemas import ClearStateReducer from "./clear_state_reducer"; import RecordTxnReducer from "./record_txn_reducer"; +import RecordTxnBucketReducer from "./record_txn_bucket_reducer"; import ResetReducer from "./reset_reducer"; // Import all procedure arg schemas @@ -43,6 +44,7 @@ import ResetReducer from "./reset_reducer"; // Import all table schema definitions import StateRow from "./state_table"; import TxnRow from "./txn_table"; +import TxnBucketRow from "./txn_bucket_table"; /** Type-only namespace exports for generated type groups. */ @@ -70,12 +72,24 @@ const tablesSchema = __schema({ { name: 'txn_id_key', constraint: 'unique', columns: ['id'] }, ], }, TxnRow), + txn_bucket: __table({ + name: 'txn_bucket', + indexes: [ + { accessor: 'bucket_start_ms', name: 'txn_bucket_bucket_start_ms_idx_btree', algorithm: 'btree', columns: [ + 'bucketStartMs', + ] }, + ], + constraints: [ + { name: 'txn_bucket_bucket_start_ms_key', constraint: 'unique', columns: ['bucketStartMs'] }, + ], + }, TxnBucketRow), }); /** The schema information for all reducers in this module. This is defined the same way as the reducers would have been defined in the server, except the body of the reducer is omitted in code generation. */ const reducersSchema = __reducers( __reducerSchema("clear_state", ClearStateReducer), __reducerSchema("record_txn", RecordTxnReducer), + __reducerSchema("record_txn_bucket", RecordTxnBucketReducer), __reducerSchema("reset", ResetReducer), ); diff --git a/tools/tpcc-dashboard/src/module_bindings/record_txn_bucket_reducer.ts b/tools/tpcc-dashboard/src/module_bindings/record_txn_bucket_reducer.ts new file mode 100644 index 00000000000..e18fbc0a086 --- /dev/null +++ b/tools/tpcc-dashboard/src/module_bindings/record_txn_bucket_reducer.ts @@ -0,0 +1,13 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. 
MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +/* eslint-disable */ +/* tslint:disable */ +import { + TypeBuilder as __TypeBuilder, + t as __t, + type AlgebraicTypeType as __AlgebraicTypeType, + type Infer as __Infer, +} from "spacetimedb"; + +export default {}; diff --git a/tools/tpcc-dashboard/src/module_bindings/txn_bucket_table.ts b/tools/tpcc-dashboard/src/module_bindings/txn_bucket_table.ts new file mode 100644 index 00000000000..702c9a51d58 --- /dev/null +++ b/tools/tpcc-dashboard/src/module_bindings/txn_bucket_table.ts @@ -0,0 +1,16 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +/* eslint-disable */ +/* tslint:disable */ +import { + TypeBuilder as __TypeBuilder, + t as __t, + type AlgebraicTypeType as __AlgebraicTypeType, + type Infer as __Infer, +} from "spacetimedb"; + +export default __t.row({ + bucketStartMs: __t.u64().primaryKey().name("bucket_start_ms"), + count: __t.u64(), +}); diff --git a/tools/tpcc-dashboard/src/module_bindings/types.ts b/tools/tpcc-dashboard/src/module_bindings/types.ts index 76b0190fb70..37421c0c85a 100644 --- a/tools/tpcc-dashboard/src/module_bindings/types.ts +++ b/tools/tpcc-dashboard/src/module_bindings/types.ts @@ -27,3 +27,9 @@ export const Txn = __t.object("Txn", { }); export type Txn = __Infer; +export const TxnBucket = __t.object("TxnBucket", { + bucketStartMs: __t.u64(), + count: __t.u64(), +}); +export type TxnBucket = __Infer; + diff --git a/tools/tpcc-dashboard/src/module_bindings/types/reducers.ts b/tools/tpcc-dashboard/src/module_bindings/types/reducers.ts index ef5e1d1856b..ee9bac76e78 100644 --- a/tools/tpcc-dashboard/src/module_bindings/types/reducers.ts +++ b/tools/tpcc-dashboard/src/module_bindings/types/reducers.ts @@ -8,9 +8,11 @@ import { type Infer as __Infer } from "spacetimedb"; // Import all reducer arg schemas import ClearStateReducer from "../clear_state_reducer"; import RecordTxnReducer 
from "../record_txn_reducer"; +import RecordTxnBucketReducer from "../record_txn_bucket_reducer"; import ResetReducer from "../reset_reducer"; export type ClearStateParams = __Infer; export type RecordTxnParams = __Infer; +export type RecordTxnBucketParams = __Infer; export type ResetParams = __Infer; diff --git a/tools/tpcc-runner/src/config.rs b/tools/tpcc-runner/src/config.rs index a624663bd8a..43652b7b83c 100644 --- a/tools/tpcc-runner/src/config.rs +++ b/tools/tpcc-runner/src/config.rs @@ -43,6 +43,8 @@ pub struct LoadConfig { pub load_parallelism: usize, pub batch_size: usize, pub reset: bool, + pub warehouse_id_offset: u32, + pub skip_items: bool, } #[derive(Debug, Clone)] @@ -90,6 +92,15 @@ pub struct LoadArgs { pub batch_size: Option, #[arg(long)] pub reset: Option, + /// Offset added to all warehouse IDs for this load. Use when adding warehouses + /// to a database that already has data (e.g. set to 70 to load warehouses 71-140 + /// into a database that already has warehouses 1-70). + #[arg(long)] + pub warehouse_id_offset: Option, + /// Skip loading the global Items table. Use together with --warehouse-id-offset + /// when adding warehouses to an existing database. 
+ #[arg(long)] + pub skip_items: Option, } #[derive(Debug, Clone, Args)] @@ -188,6 +199,8 @@ struct FileLoadConfig { load_parallelism: Option, batch_size: Option, reset: Option, + warehouse_id_offset: Option, + skip_items: Option, } #[derive(Debug, Clone, Default, Deserialize)] @@ -284,6 +297,8 @@ impl LoadArgs { load_parallelism: load_parallelism.min(usize::try_from(num_databases).unwrap_or(usize::MAX)), batch_size, reset: self.reset.or(file.load.reset).unwrap_or(true), + warehouse_id_offset: self.warehouse_id_offset.or(file.load.warehouse_id_offset).unwrap_or(0), + skip_items: self.skip_items.or(file.load.skip_items).unwrap_or(false), }) } } diff --git a/tools/tpcc-runner/src/coordinator.rs b/tools/tpcc-runner/src/coordinator.rs index 7d80ac66933..6dc11630c2d 100644 --- a/tools/tpcc-runner/src/coordinator.rs +++ b/tools/tpcc-runner/src/coordinator.rs @@ -11,7 +11,7 @@ use std::time::Duration; use crate::config::CoordinatorConfig; use crate::metrics_module_bindings::reset_reducer::reset; -use crate::metrics_module_client::connect_metrics_module; +use crate::metrics_module_client::connect_metrics_module_async; use crate::protocol::{ DriverAssignment, RegisterDriverRequest, RegisterDriverResponse, RunSchedule, ScheduleResponse, SubmitSummaryRequest, @@ -68,37 +68,43 @@ async fn register_driver( State(state): State, Json(request): Json, ) -> Json { - let mut inner = state.inner.lock(); - let (assignment, is_new_registration) = match inner.registrations.get(&request.driver_id) { - Some(existing) => (existing.assignment.clone(), false), - None => { - if inner.registration_order.len() >= inner.config.expected_drivers { - return Json(RegisterDriverResponse { - accepted: false, - assignment: None, - }); + let (assignment, is_new_registration, registered, expected_drivers) = { + let mut inner = state.inner.lock(); + let expected_drivers = inner.config.expected_drivers; + match inner.registrations.get(&request.driver_id) { + Some(existing) => { + let registered = 
inner.registrations.len(); + (existing.assignment.clone(), false, registered, expected_drivers) + } + None => { + if inner.registration_order.len() >= expected_drivers { + return Json(RegisterDriverResponse { + accepted: false, + assignment: None, + }); + } + let index = inner.registration_order.len(); + let assignment = assignment_for_index(&inner.config, index); + inner.registration_order.push(request.driver_id.clone()); + inner.registrations.insert( + request.driver_id.clone(), + DriverRegistration { + assignment: assignment.clone(), + }, + ); + let registered = inner.registrations.len(); + (assignment, true, registered, expected_drivers) } - let index = inner.registration_order.len(); - let assignment = assignment_for_index(&inner.config, index); - inner.registration_order.push(request.driver_id.clone()); - inner.registrations.insert( - request.driver_id.clone(), - DriverRegistration { - assignment: assignment.clone(), - }, - ); - (assignment, true) } }; - maybe_create_schedule(&mut inner); - let registered = inner.registrations.len(); + maybe_create_schedule(&state).await; let warehouse_end = assignment_end(&assignment); if is_new_registration { log::info!( "driver {} registered and ready ({}/{}): warehouses {}..={} ({} warehouse(s))", request.driver_id, registered, - inner.config.expected_drivers, + expected_drivers, assignment.warehouse_start, warehouse_end, assignment.driver_warehouse_count @@ -168,38 +174,58 @@ async fn submit_summary( Ok(Json(aggregate)) } -fn maybe_create_schedule(inner: &mut CoordinatorState) { - if inner.schedule.is_some() || inner.registrations.len() < inner.config.expected_drivers { - return; - } +async fn maybe_create_schedule(state: &AppState) { + // Check whether schedule creation is needed, and grab config, without holding the lock + // during the async metrics module connection below. 
+ let config = { + let inner = state.inner.lock(); + if inner.schedule.is_some() || inner.registrations.len() < inner.config.expected_drivers { + return; + } + inner.config.clone() + }; + let warmup_start_ms = now_millis() + 2_000; - let measure_start_ms = warmup_start_ms + (inner.config.warmup_secs * 1_000); - let measure_end_ms = measure_start_ms + (inner.config.measure_secs * 1_000); + let measure_start_ms = warmup_start_ms + (config.warmup_secs * 1_000); + let measure_end_ms = measure_start_ms + (config.measure_secs * 1_000); - let metrics_client = connect_metrics_module(&inner.config.connection).unwrap(); + let metrics_client = match connect_metrics_module_async(&config.connection).await { + Ok(client) => client, + Err(e) => { + log::error!("failed to connect to metrics module: {e:#}"); + return; + } + }; let _ = metrics_client.reducers.reset( - inner.config.warehouses as u64, - inner.config.warmup_secs * 1000, + config.warehouses as u64, + config.warmup_secs * 1000, measure_start_ms, measure_end_ms, ); let schedule = RunSchedule { - run_id: inner.config.run_id.clone(), + run_id: config.run_id.clone(), warmup_start_ms, measure_start_ms, measure_end_ms, stop_ms: measure_end_ms, }; + + let mut inner = state.inner.lock(); + if inner.schedule.is_some() { + // Another concurrent registration call already created the schedule. 
+ return; + } inner.schedule = Some(schedule.clone()); log::info!( "all {} driver(s) registered; schedule ready for run {} (warmup_start_ms={} measure_start_ms={} measure_end_ms={})", - inner.config.expected_drivers, - inner.config.run_id, + config.expected_drivers, + config.run_id, warmup_start_ms, measure_start_ms, measure_end_ms ); + drop(inner); tokio::spawn(log_schedule_events(schedule)); } diff --git a/tools/tpcc-runner/src/driver.rs b/tools/tpcc-runner/src/driver.rs index f748e830be6..21e6d1c0f2b 100644 --- a/tools/tpcc-runner/src/driver.rs +++ b/tools/tpcc-runner/src/driver.rs @@ -10,7 +10,7 @@ use tokio::task::JoinSet; use crate::client::{expect_ok, ModuleClient}; use crate::config::{default_run_id, DriverConfig}; -use crate::metrics_module_bindings::{record_txn, DbConnection as MetricsDbConnection}; +use crate::metrics_module_bindings::{record_txn_bucket, DbConnection as MetricsDbConnection}; use crate::metrics_module_client::connect_metrics_module_async; use crate::module_bindings::*; use crate::protocol::{ @@ -233,6 +233,15 @@ async fn run_terminal(runtime: TerminalRuntime) -> Result<()> { } let kind = choose_transaction(&mut rng); + let keying_delay = keying_time(kind, config.keying_time_scale); + if !keying_delay.is_zero() && crate::summary::now_millis() < schedule.stop_ms { + tokio::time::sleep(keying_delay).await; + } + + if abort.load(Ordering::Relaxed) || crate::summary::now_millis() >= schedule.stop_ms { + break; + } + let started_ms = crate::summary::now_millis(); let context = TransactionContext { client: client.as_ref(), @@ -250,7 +259,7 @@ async fn run_terminal(runtime: TerminalRuntime) -> Result<()> { // Some metrics depend on knowing all completed orders, even outside the // measurement window if record.kind == TransactionKind::NewOrder && record.success { - let _ = metrics_client.reducers.record_txn(record.latency_ms as u16); + let _ = metrics_client.reducers.record_txn_bucket(); } if record.timestamp_ms >= schedule.measure_start_ms && 
record.timestamp_ms < schedule.measure_end_ms { @@ -258,14 +267,15 @@ async fn run_terminal(runtime: TerminalRuntime) -> Result<()> { } } Err(err) => { - abort.store(true, Ordering::Relaxed); - return Err(err); + log::error!( + "terminal task error: {err:#}", + ); } } - let delay = keying_time(kind, config.keying_time_scale) + think_time(kind, config.think_time_scale, &mut rng); - if !delay.is_zero() && crate::summary::now_millis() < schedule.stop_ms { - tokio::time::sleep(delay).await; + let think_delay = think_time(kind, config.think_time_scale, &mut rng); + if !think_delay.is_zero() && crate::summary::now_millis() < schedule.stop_ms { + tokio::time::sleep(think_delay).await; } } Ok(()) diff --git a/tools/tpcc-runner/src/loader.rs b/tools/tpcc-runner/src/loader.rs index 559aa819d79..3471ad89f9c 100644 --- a/tools/tpcc-runner/src/loader.rs +++ b/tools/tpcc-runner/src/loader.rs @@ -132,6 +132,8 @@ fn build_load_request( database_number, num_databases: config.num_databases, warehouses_per_database: config.warehouses_per_database, + warehouse_id_offset: config.warehouse_id_offset, + skip_items: config.skip_items, batch_size: u32::try_from(config.batch_size).context("batch_size exceeds u32")?, seed: LOAD_SEED, load_c_last, diff --git a/tools/tpcc-runner/src/metrics_module_bindings/clear_state_reducer.rs b/tools/tpcc-runner/src/metrics_module_bindings/clear_state_reducer.rs index fffd58d13d3..df42291348e 100644 --- a/tools/tpcc-runner/src/metrics_module_bindings/clear_state_reducer.rs +++ b/tools/tpcc-runner/src/metrics_module_bindings/clear_state_reducer.rs @@ -56,6 +56,7 @@ impl clear_state for super::RemoteReducers { + Send + 'static, ) -> __sdk::Result<()> { - self.imp.invoke_reducer_with_callback(ClearStateArgs {}, callback) + self.imp + .invoke_reducer_with_callback::<_, ()>(ClearStateArgs {}, callback) } } diff --git a/tools/tpcc-runner/src/metrics_module_bindings/mod.rs b/tools/tpcc-runner/src/metrics_module_bindings/mod.rs index 0f15ca42b08..0e35a1d7729 100644 
--- a/tools/tpcc-runner/src/metrics_module_bindings/mod.rs +++ b/tools/tpcc-runner/src/metrics_module_bindings/mod.rs @@ -1,24 +1,30 @@ // THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE // WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. -// This was generated using spacetimedb cli version 2.1.0 (commit 6981f48b4bc1a71c8dd9bdfe5a2c343f6370243d). +// This was generated using spacetimedb cli version 2.1.0 (commit 7df4044d896459c2a6ca5b9e2c716052aa39e88a). #![allow(unused, clippy::all)] use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; pub mod clear_state_reducer; +pub mod record_txn_bucket_reducer; pub mod record_txn_reducer; pub mod reset_reducer; pub mod state_table; pub mod state_type; +pub mod txn_bucket_table; +pub mod txn_bucket_type; pub mod txn_table; pub mod txn_type; pub use clear_state_reducer::clear_state; +pub use record_txn_bucket_reducer::record_txn_bucket; pub use record_txn_reducer::record_txn; pub use reset_reducer::reset; pub use state_table::*; pub use state_type::State; +pub use txn_bucket_table::*; +pub use txn_bucket_type::TxnBucket; pub use txn_table::*; pub use txn_type::Txn; @@ -34,6 +40,7 @@ pub enum Reducer { RecordTxn { latency_ms: u16, }, + RecordTxnBucket, Reset { warehouse_count: u64, warmup_duration_ms: u64, @@ -51,6 +58,7 @@ impl __sdk::Reducer for Reducer { match self { Reducer::ClearState => "clear_state", Reducer::RecordTxn { .. } => "record_txn", + Reducer::RecordTxnBucket => "record_txn_bucket", Reducer::Reset { .. 
} => "reset", _ => unreachable!(), } @@ -62,6 +70,7 @@ impl __sdk::Reducer for Reducer { Reducer::RecordTxn { latency_ms } => __sats::bsatn::to_vec(&record_txn_reducer::RecordTxnArgs { latency_ms: latency_ms.clone(), }), + Reducer::RecordTxnBucket => __sats::bsatn::to_vec(&record_txn_bucket_reducer::RecordTxnBucketArgs {}), Reducer::Reset { warehouse_count, warmup_duration_ms, @@ -84,6 +93,7 @@ impl __sdk::Reducer for Reducer { pub struct DbUpdate { state: __sdk::TableUpdate, txn: __sdk::TableUpdate, + txn_bucket: __sdk::TableUpdate, } impl TryFrom<__ws::v2::TransactionUpdate> for DbUpdate { @@ -94,6 +104,9 @@ impl TryFrom<__ws::v2::TransactionUpdate> for DbUpdate { match &table_update.table_name[..] { "state" => db_update.state.append(state_table::parse_table_update(table_update)?), "txn" => db_update.txn.append(txn_table::parse_table_update(table_update)?), + "txn_bucket" => db_update + .txn_bucket + .append(txn_bucket_table::parse_table_update(table_update)?), unknown => { return Err(__sdk::InternalError::unknown_name("table", unknown, "DatabaseUpdate").into()); @@ -118,6 +131,9 @@ impl __sdk::DbUpdate for DbUpdate { diff.txn = cache .apply_diff_to_table::("txn", &self.txn) .with_updates_by_pk(|row| &row.id); + diff.txn_bucket = cache + .apply_diff_to_table::("txn_bucket", &self.txn_bucket) + .with_updates_by_pk(|row| &row.bucket_start_ms); diff } @@ -129,6 +145,9 @@ impl __sdk::DbUpdate for DbUpdate { .state .append(__sdk::parse_row_list_as_inserts(table_rows.rows)?), "txn" => db_update.txn.append(__sdk::parse_row_list_as_inserts(table_rows.rows)?), + "txn_bucket" => db_update + .txn_bucket + .append(__sdk::parse_row_list_as_inserts(table_rows.rows)?), unknown => { return Err(__sdk::InternalError::unknown_name("table", unknown, "QueryRows").into()); } @@ -144,6 +163,9 @@ impl __sdk::DbUpdate for DbUpdate { .state .append(__sdk::parse_row_list_as_deletes(table_rows.rows)?), "txn" => db_update.txn.append(__sdk::parse_row_list_as_deletes(table_rows.rows)?), + 
"txn_bucket" => db_update + .txn_bucket + .append(__sdk::parse_row_list_as_deletes(table_rows.rows)?), unknown => { return Err(__sdk::InternalError::unknown_name("table", unknown, "QueryRows").into()); } @@ -159,6 +181,7 @@ impl __sdk::DbUpdate for DbUpdate { pub struct AppliedDiff<'r> { state: __sdk::TableAppliedDiff<'r, State>, txn: __sdk::TableAppliedDiff<'r, Txn>, + txn_bucket: __sdk::TableAppliedDiff<'r, TxnBucket>, __unused: std::marker::PhantomData<&'r ()>, } @@ -170,6 +193,7 @@ impl<'r> __sdk::AppliedDiff<'r> for AppliedDiff<'r> { fn invoke_row_callbacks(&self, event: &EventContext, callbacks: &mut __sdk::DbCallbacks) { callbacks.invoke_table_row_callbacks::("state", &self.state, event); callbacks.invoke_table_row_callbacks::("txn", &self.txn, event); + callbacks.invoke_table_row_callbacks::("txn_bucket", &self.txn_bucket, event); } } @@ -829,6 +853,7 @@ impl __sdk::SpacetimeModule for RemoteModule { fn register_tables(client_cache: &mut __sdk::ClientCache) { state_table::register_table(client_cache); txn_table::register_table(client_cache); + txn_bucket_table::register_table(client_cache); } - const ALL_TABLE_NAMES: &'static [&'static str] = &["state", "txn"]; + const ALL_TABLE_NAMES: &'static [&'static str] = &["state", "txn", "txn_bucket"]; } diff --git a/tools/tpcc-runner/src/metrics_module_bindings/record_txn_bucket_reducer.rs b/tools/tpcc-runner/src/metrics_module_bindings/record_txn_bucket_reducer.rs new file mode 100644 index 00000000000..20c629a9720 --- /dev/null +++ b/tools/tpcc-runner/src/metrics_module_bindings/record_txn_bucket_reducer.rs @@ -0,0 +1,62 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. 
+ +#![allow(unused, clippy::all)] +use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; + +#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)] +#[sats(crate = __lib)] +pub(super) struct RecordTxnBucketArgs {} + +impl From for super::Reducer { + fn from(args: RecordTxnBucketArgs) -> Self { + Self::RecordTxnBucket + } +} + +impl __sdk::InModule for RecordTxnBucketArgs { + type Module = super::RemoteModule; +} + +#[allow(non_camel_case_types)] +/// Extension trait for access to the reducer `record_txn_bucket`. +/// +/// Implemented for [`super::RemoteReducers`]. +pub trait record_txn_bucket { + /// Request that the remote module invoke the reducer `record_txn_bucket` to run as soon as possible. + /// + /// This method returns immediately, and errors only if we are unable to send the request. + /// The reducer will run asynchronously in the future, + /// and this method provides no way to listen for its completion status. + /// /// Use [`record_txn_bucket:record_txn_bucket_then`] to run a callback after the reducer completes. + fn record_txn_bucket(&self) -> __sdk::Result<()> { + self.record_txn_bucket_then(|_, _| {}) + } + + /// Request that the remote module invoke the reducer `record_txn_bucket` to run as soon as possible, + /// registering `callback` to run when we are notified that the reducer completed. + /// + /// This method returns immediately, and errors only if we are unable to send the request. + /// The reducer will run asynchronously in the future, + /// and its status can be observed with the `callback`. 
+ fn record_txn_bucket_then( + &self, + + callback: impl FnOnce(&super::ReducerEventContext, Result, __sdk::InternalError>) + + Send + + 'static, + ) -> __sdk::Result<()>; +} + +impl record_txn_bucket for super::RemoteReducers { + fn record_txn_bucket_then( + &self, + + callback: impl FnOnce(&super::ReducerEventContext, Result, __sdk::InternalError>) + + Send + + 'static, + ) -> __sdk::Result<()> { + self.imp + .invoke_reducer_with_callback::<_, ()>(RecordTxnBucketArgs {}, callback) + } +} diff --git a/tools/tpcc-runner/src/metrics_module_bindings/record_txn_reducer.rs b/tools/tpcc-runner/src/metrics_module_bindings/record_txn_reducer.rs index c7577f2bedc..3ccccd97476 100644 --- a/tools/tpcc-runner/src/metrics_module_bindings/record_txn_reducer.rs +++ b/tools/tpcc-runner/src/metrics_module_bindings/record_txn_reducer.rs @@ -63,6 +63,6 @@ impl record_txn for super::RemoteReducers { + 'static, ) -> __sdk::Result<()> { self.imp - .invoke_reducer_with_callback(RecordTxnArgs { latency_ms }, callback) + .invoke_reducer_with_callback::<_, ()>(RecordTxnArgs { latency_ms }, callback) } } diff --git a/tools/tpcc-runner/src/metrics_module_bindings/reset_reducer.rs b/tools/tpcc-runner/src/metrics_module_bindings/reset_reducer.rs index b0a35831f4c..f74c3cd3662 100644 --- a/tools/tpcc-runner/src/metrics_module_bindings/reset_reducer.rs +++ b/tools/tpcc-runner/src/metrics_module_bindings/reset_reducer.rs @@ -86,7 +86,7 @@ impl reset for super::RemoteReducers { + Send + 'static, ) -> __sdk::Result<()> { - self.imp.invoke_reducer_with_callback( + self.imp.invoke_reducer_with_callback::<_, ()>( ResetArgs { warehouse_count, warmup_duration_ms, diff --git a/tools/tpcc-runner/src/metrics_module_bindings/txn_bucket_table.rs b/tools/tpcc-runner/src/metrics_module_bindings/txn_bucket_table.rs new file mode 100644 index 00000000000..14b3d56c3cf --- /dev/null +++ b/tools/tpcc-runner/src/metrics_module_bindings/txn_bucket_table.rs @@ -0,0 +1,157 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY 
SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#![allow(unused, clippy::all)] +use super::txn_bucket_type::TxnBucket; +use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; + +/// Table handle for the table `txn_bucket`. +/// +/// Obtain a handle from the [`TxnBucketTableAccess::txn_bucket`] method on [`super::RemoteTables`], +/// like `ctx.db.txn_bucket()`. +/// +/// Users are encouraged not to explicitly reference this type, +/// but to directly chain method calls, +/// like `ctx.db.txn_bucket().on_insert(...)`. +pub struct TxnBucketTableHandle<'ctx> { + imp: __sdk::TableHandle, + ctx: std::marker::PhantomData<&'ctx super::RemoteTables>, +} + +#[allow(non_camel_case_types)] +/// Extension trait for access to the table `txn_bucket`. +/// +/// Implemented for [`super::RemoteTables`]. +pub trait TxnBucketTableAccess { + #[allow(non_snake_case)] + /// Obtain a [`TxnBucketTableHandle`], which mediates access to the table `txn_bucket`. 
+ fn txn_bucket(&self) -> TxnBucketTableHandle<'_>; +} + +impl TxnBucketTableAccess for super::RemoteTables { + fn txn_bucket(&self) -> TxnBucketTableHandle<'_> { + TxnBucketTableHandle { + imp: self.imp.get_table::("txn_bucket"), + ctx: std::marker::PhantomData, + } + } +} + +pub struct TxnBucketInsertCallbackId(__sdk::CallbackId); +pub struct TxnBucketDeleteCallbackId(__sdk::CallbackId); + +impl<'ctx> __sdk::Table for TxnBucketTableHandle<'ctx> { + type Row = TxnBucket; + type EventContext = super::EventContext; + + fn count(&self) -> u64 { + self.imp.count() + } + fn iter(&self) -> impl Iterator + '_ { + self.imp.iter() + } + + type InsertCallbackId = TxnBucketInsertCallbackId; + + fn on_insert( + &self, + callback: impl FnMut(&Self::EventContext, &Self::Row) + Send + 'static, + ) -> TxnBucketInsertCallbackId { + TxnBucketInsertCallbackId(self.imp.on_insert(Box::new(callback))) + } + + fn remove_on_insert(&self, callback: TxnBucketInsertCallbackId) { + self.imp.remove_on_insert(callback.0) + } + + type DeleteCallbackId = TxnBucketDeleteCallbackId; + + fn on_delete( + &self, + callback: impl FnMut(&Self::EventContext, &Self::Row) + Send + 'static, + ) -> TxnBucketDeleteCallbackId { + TxnBucketDeleteCallbackId(self.imp.on_delete(Box::new(callback))) + } + + fn remove_on_delete(&self, callback: TxnBucketDeleteCallbackId) { + self.imp.remove_on_delete(callback.0) + } +} + +pub struct TxnBucketUpdateCallbackId(__sdk::CallbackId); + +impl<'ctx> __sdk::TableWithPrimaryKey for TxnBucketTableHandle<'ctx> { + type UpdateCallbackId = TxnBucketUpdateCallbackId; + + fn on_update( + &self, + callback: impl FnMut(&Self::EventContext, &Self::Row, &Self::Row) + Send + 'static, + ) -> TxnBucketUpdateCallbackId { + TxnBucketUpdateCallbackId(self.imp.on_update(Box::new(callback))) + } + + fn remove_on_update(&self, callback: TxnBucketUpdateCallbackId) { + self.imp.remove_on_update(callback.0) + } +} + +/// Access to the `bucket_start_ms` unique index on the table `txn_bucket`, +/// 
which allows point queries on the field of the same name +/// via the [`TxnBucketBucketStartMsUnique::find`] method. +/// +/// Users are encouraged not to explicitly reference this type, +/// but to directly chain method calls, +/// like `ctx.db.txn_bucket().bucket_start_ms().find(...)`. +pub struct TxnBucketBucketStartMsUnique<'ctx> { + imp: __sdk::UniqueConstraintHandle, + phantom: std::marker::PhantomData<&'ctx super::RemoteTables>, +} + +impl<'ctx> TxnBucketTableHandle<'ctx> { + /// Get a handle on the `bucket_start_ms` unique index on the table `txn_bucket`. + pub fn bucket_start_ms(&self) -> TxnBucketBucketStartMsUnique<'ctx> { + TxnBucketBucketStartMsUnique { + imp: self.imp.get_unique_constraint::("bucket_start_ms"), + phantom: std::marker::PhantomData, + } + } +} + +impl<'ctx> TxnBucketBucketStartMsUnique<'ctx> { + /// Find the subscribed row whose `bucket_start_ms` column value is equal to `col_val`, + /// if such a row is present in the client cache. + pub fn find(&self, col_val: &u64) -> Option { + self.imp.find(col_val) + } +} + +#[doc(hidden)] +pub(super) fn register_table(client_cache: &mut __sdk::ClientCache) { + let _table = client_cache.get_or_make_table::("txn_bucket"); + _table.add_unique_constraint::("bucket_start_ms", |row| &row.bucket_start_ms); +} + +#[doc(hidden)] +pub(super) fn parse_table_update(raw_updates: __ws::v2::TableUpdate) -> __sdk::Result<__sdk::TableUpdate> { + __sdk::TableUpdate::parse_table_update(raw_updates).map_err(|e| { + __sdk::InternalError::failed_parse("TableUpdate", "TableUpdate") + .with_cause(e) + .into() + }) +} + +#[allow(non_camel_case_types)] +/// Extension trait for query builder access to the table `TxnBucket`. +/// +/// Implemented for [`__sdk::QueryTableAccessor`]. +pub trait txn_bucketQueryTableAccess { + #[allow(non_snake_case)] + /// Get a query builder for the table `TxnBucket`. 
+ fn txn_bucket(&self) -> __sdk::__query_builder::Table; +} + +impl txn_bucketQueryTableAccess for __sdk::QueryTableAccessor { + fn txn_bucket(&self) -> __sdk::__query_builder::Table { + __sdk::__query_builder::Table::new("txn_bucket") + } +} diff --git a/tools/tpcc-runner/src/metrics_module_bindings/txn_bucket_type.rs b/tools/tpcc-runner/src/metrics_module_bindings/txn_bucket_type.rs new file mode 100644 index 00000000000..774dafd2063 --- /dev/null +++ b/tools/tpcc-runner/src/metrics_module_bindings/txn_bucket_type.rs @@ -0,0 +1,52 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#![allow(unused, clippy::all)] +use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; + +#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)] +#[sats(crate = __lib)] +pub struct TxnBucket { + pub bucket_start_ms: u64, + pub count: u64, +} + +impl __sdk::InModule for TxnBucket { + type Module = super::RemoteModule; +} + +/// Column accessor struct for the table `TxnBucket`. +/// +/// Provides typed access to columns for query building. +pub struct TxnBucketCols { + pub bucket_start_ms: __sdk::__query_builder::Col, + pub count: __sdk::__query_builder::Col, +} + +impl __sdk::__query_builder::HasCols for TxnBucket { + type Cols = TxnBucketCols; + fn cols(table_name: &'static str) -> Self::Cols { + TxnBucketCols { + bucket_start_ms: __sdk::__query_builder::Col::new(table_name, "bucket_start_ms"), + count: __sdk::__query_builder::Col::new(table_name, "count"), + } + } +} + +/// Indexed column accessor struct for the table `TxnBucket`. +/// +/// Provides typed access to indexed columns for query building. 
+pub struct TxnBucketIxCols { + pub bucket_start_ms: __sdk::__query_builder::IxCol, +} + +impl __sdk::__query_builder::HasIxCols for TxnBucket { + type IxCols = TxnBucketIxCols; + fn ix_cols(table_name: &'static str) -> Self::IxCols { + TxnBucketIxCols { + bucket_start_ms: __sdk::__query_builder::IxCol::new(table_name, "bucket_start_ms"), + } + } +} + +impl __sdk::__query_builder::CanBeLookupTable for TxnBucket {} diff --git a/tools/tpcc-runner/src/module_bindings/tpcc_load_config_request_type.rs b/tools/tpcc-runner/src/module_bindings/tpcc_load_config_request_type.rs index 0e38d4f9edd..7f43a508490 100644 --- a/tools/tpcc-runner/src/module_bindings/tpcc_load_config_request_type.rs +++ b/tools/tpcc-runner/src/module_bindings/tpcc_load_config_request_type.rs @@ -10,6 +10,8 @@ pub struct TpccLoadConfigRequest { pub database_number: u32, pub num_databases: u32, pub warehouses_per_database: u32, + pub warehouse_id_offset: u32, + pub skip_items: bool, pub batch_size: u32, pub seed: u64, pub load_c_last: u32,