From b513020f9f68a6bdea992b6254ccad85254b9abf Mon Sep 17 00:00:00 2001 From: James Date: Mon, 6 Apr 2026 12:15:42 -0400 Subject: [PATCH 1/2] fix(cold-sql): pool starvation and query inefficiencies (#45) - Add per-backend pool defaults and SqlConnector builder - Add safety checks to integer conversions - Replace get_logs COUNT+SELECT with single LIMIT query - Consolidate get_receipt and batch drain_above - Bump workspace version to 0.7.0 Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.toml | 16 +- crates/cold-sql/src/backend.rs | 289 ++++++-- crates/cold-sql/src/columns.rs | 9 +- crates/cold-sql/src/connector.rs | 46 +- crates/cold-sql/src/convert.rs | 26 +- crates/cold-sql/src/lib.rs | 2 +- ...026-04-03-cold-sql-pool-and-query-fixes.md | 672 ++++++++++++++++++ ...03-cold-sql-pool-and-query-fixes-design.md | 187 +++++ 8 files changed, 1155 insertions(+), 92 deletions(-) create mode 100644 docs/superpowers/plans/2026-04-03-cold-sql-pool-and-query-fixes.md create mode 100644 docs/superpowers/specs/2026-04-03-cold-sql-pool-and-query-fixes-design.md diff --git a/Cargo.toml b/Cargo.toml index 9810525..b322005 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["crates/*"] resolver = "2" [workspace.package] -version = "0.6.9" +version = "0.7.0" edition = "2024" rust-version = "1.92" authors = ["init4"] @@ -35,13 +35,13 @@ incremental = false [workspace.dependencies] # internal -signet-hot = { version = "0.6.9", path = "./crates/hot" } -signet-hot-mdbx = { version = "0.6.9", path = "./crates/hot-mdbx" } -signet-cold = { version = "0.6.9", path = "./crates/cold" } -signet-cold-mdbx = { version = "0.6.9", path = "./crates/cold-mdbx" } -signet-cold-sql = { version = "0.6.9", path = "./crates/cold-sql" } -signet-storage = { version = "0.6.9", path = "./crates/storage" } -signet-storage-types = { version = "0.6.9", path = "./crates/types" } +signet-hot = { version = "0.7.0", path = "./crates/hot" } +signet-hot-mdbx = { version = "0.7.0", path = 
"./crates/hot-mdbx" } +signet-cold = { version = "0.7.0", path = "./crates/cold" } +signet-cold-mdbx = { version = "0.7.0", path = "./crates/cold-mdbx" } +signet-cold-sql = { version = "0.7.0", path = "./crates/cold-sql" } +signet-storage = { version = "0.7.0", path = "./crates/storage" } +signet-storage-types = { version = "0.7.0", path = "./crates/types" } # External, in-house signet-libmdbx = { version = "0.8.0" } diff --git a/crates/cold-sql/src/backend.rs b/crates/cold-sql/src/backend.rs index 22684f2..74eb9df 100644 --- a/crates/cold-sql/src/backend.rs +++ b/crates/cold-sql/src/backend.rs @@ -10,15 +10,16 @@ use crate::columns::{ COL_BENEFICIARY, COL_BLOB_GAS_USED, COL_BLOB_VERSIONED_HASHES, COL_BLOCK_DATA_HASH, COL_BLOCK_HASH, COL_BLOCK_LOG_INDEX, COL_BLOCK_NUMBER, COL_BLOCK_TIMESTAMP, COL_CHAIN_ID, COL_CNT, COL_CUMULATIVE_GAS_USED, COL_DATA, COL_DIFFICULTY, COL_EVENT_TYPE, - COL_EXCESS_BLOB_GAS, COL_EXTRA_DATA, COL_FIRST_LOG_INDEX, COL_FROM_ADDRESS, COL_GAS, - COL_GAS_LIMIT, COL_GAS_PRICE, COL_GAS_USED, COL_HOST_BLOCK_NUMBER, COL_INPUT, COL_LOGS_BLOOM, - COL_MAX_BN, COL_MAX_FEE_PER_BLOB_GAS, COL_MAX_FEE_PER_GAS, COL_MAX_PRIORITY_FEE_PER_GAS, - COL_MIX_HASH, COL_NONCE, COL_OMMERS_HASH, COL_ORDER_INDEX, COL_PARENT_BEACON_BLOCK_ROOT, - COL_PARENT_HASH, COL_PRIOR_GAS, COL_RECEIPTS_ROOT, COL_REQUESTS_HASH, COL_REWARD_ADDRESS, - COL_ROLLUP_CHAIN_ID, COL_ROLLUP_RECIPIENT, COL_SENDER, COL_SIG_R, COL_SIG_S, COL_SIG_Y_PARITY, - COL_STATE_ROOT, COL_SUCCESS, COL_TIMESTAMP, COL_TO_ADDRESS, COL_TOKEN, COL_TOPIC0, COL_TOPIC1, - COL_TOPIC2, COL_TOPIC3, COL_TRANSACTIONS_ROOT, COL_TX_HASH, COL_TX_INDEX, COL_TX_TYPE, - COL_VALUE, COL_WITHDRAWALS_ROOT, + COL_EXCESS_BLOB_GAS, COL_EXTRA_DATA, COL_FROM_ADDRESS, COL_GAS, COL_GAS_LIMIT, COL_GAS_PRICE, + COL_GAS_USED, COL_HOST_BLOCK_NUMBER, COL_INPUT, COL_LOGS_BLOOM, COL_MAX_BN, + COL_MAX_FEE_PER_BLOB_GAS, COL_MAX_FEE_PER_GAS, COL_MAX_PRIORITY_FEE_PER_GAS, COL_MIX_HASH, + COL_NONCE, COL_OMMERS_HASH, COL_ORDER_INDEX, 
COL_PARENT_BEACON_BLOCK_ROOT, COL_PARENT_HASH, + COL_PRIOR_GAS, COL_R_CUMULATIVE_GAS_USED, COL_R_FIRST_LOG_INDEX, COL_R_FROM_ADDRESS, + COL_R_SUCCESS, COL_R_TX_HASH, COL_R_TX_TYPE, COL_RECEIPTS_ROOT, COL_REQUESTS_HASH, + COL_REWARD_ADDRESS, COL_ROLLUP_CHAIN_ID, COL_ROLLUP_RECIPIENT, COL_SENDER, COL_SIG_R, + COL_SIG_S, COL_SIG_Y_PARITY, COL_STATE_ROOT, COL_SUCCESS, COL_TIMESTAMP, COL_TO_ADDRESS, + COL_TOKEN, COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3, COL_TRANSACTIONS_ROOT, COL_TX_HASH, + COL_TX_INDEX, COL_TX_TYPE, COL_VALUE, COL_WITHDRAWALS_ROOT, }; use crate::convert::{ EVENT_ENTER, EVENT_ENTER_TOKEN, EVENT_TRANSACT, build_receipt, decode_access_list_or_empty, @@ -49,6 +50,7 @@ use signet_zenith::{ Zenith, }; use sqlx::{AnyPool, Row}; +use std::time::Duration; /// SQL-based cold storage backend. /// @@ -75,6 +77,19 @@ pub struct SqlColdBackend { is_postgres: bool, } +/// Pool configuration overrides for [`SqlColdBackend::connect_with`]. +/// +/// `None` values use backend-specific defaults: +/// - SQLite: `max_connections = 1`, `acquire_timeout = 5s` +/// - PostgreSQL: `max_connections = 10`, `acquire_timeout = 5s` +#[derive(Debug, Clone, Copy, Default)] +pub struct PoolOverrides { + /// Override the maximum number of connections in the pool. + pub max_connections: Option, + /// Override the connection acquire timeout. + pub acquire_timeout: Option, +} + impl SqlColdBackend { /// Create a new SQL cold storage backend from an existing [`AnyPool`]. /// @@ -104,20 +119,40 @@ impl SqlColdBackend { Ok(Self { pool, is_postgres }) } - /// Connect to a database URL and create the backend. + /// Connect to a database URL with explicit pool overrides. /// /// Installs the default sqlx drivers on the first call. The database /// type is inferred from the URL scheme (`sqlite:` or `postgres:`). /// - /// For SQLite in-memory databases (`sqlite::memory:`), the pool is - /// limited to one connection to ensure all operations share the same - /// database. 
- pub async fn connect(url: &str) -> Result { + /// # Pool Defaults + /// + /// - **SQLite**: `max_connections = 1` (required for in-memory databases + /// to share state), `acquire_timeout = 5s`. + /// - **PostgreSQL**: `max_connections = 10`, `acquire_timeout = 5s`. + /// + /// Override any default by setting the corresponding field in + /// `overrides`. + pub async fn connect_with(url: &str, overrides: PoolOverrides) -> Result { sqlx::any::install_default_drivers(); - let pool: AnyPool = sqlx::pool::PoolOptions::new().max_connections(1).connect(url).await?; + let is_sqlite = url.starts_with("sqlite:"); + let default_max = if is_sqlite { 1 } else { 10 }; + let default_timeout = Duration::from_secs(5); + let pool: AnyPool = sqlx::pool::PoolOptions::new() + .max_connections(overrides.max_connections.unwrap_or(default_max)) + .acquire_timeout(overrides.acquire_timeout.unwrap_or(default_timeout)) + .connect(url) + .await?; Self::new(pool).await } + /// Connect to a database URL with default pool settings. + /// + /// Convenience wrapper around [`connect_with`](Self::connect_with). + /// See that method for default pool sizes per backend. 
+ pub async fn connect(url: &str) -> Result { + Self::connect_with(url, PoolOverrides::default()).await + } + // ======================================================================== // Specifier resolution // ======================================================================== @@ -334,9 +369,12 @@ fn tx_from_row(r: &sqlx::any::AnyRow) -> Result let sig = Signature::new(r.get(COL_SIG_R), r.get(COL_SIG_S), r.get::(COL_SIG_Y_PARITY) != 0); - let tx_type_raw = r.get::(COL_TX_TYPE) as u8; - let tx_type = TxType::try_from(tx_type_raw) - .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type_raw}")))?; + let tx_type_raw: i32 = r.get(COL_TX_TYPE); + let tx_type_u8: u8 = tx_type_raw + .try_into() + .map_err(|_| SqlColdError::Convert(format!("tx_type out of u8 range: {tx_type_raw}")))?; + let tx_type = TxType::try_from(tx_type_u8) + .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type_u8}")))?; let chain_id: Option = r.get(COL_CHAIN_ID); let nonce = from_i64(r.get(COL_NONCE)); @@ -1085,15 +1123,27 @@ impl ColdStorageRead for SqlColdBackend { ReceiptSpecifier::BlockAndIndex { block, index } => (block, index), }; - let Some(header) = self.fetch_header_by_number(block).await? else { - return Ok(None); - }; - - // Fetch receipt + tx_hash + from_address - let receipt_row = sqlx::query( - "SELECT r.*, t.tx_hash, t.from_address - FROM receipts r - JOIN transactions t ON r.block_number = t.block_number AND r.tx_index = t.tx_index + // Combined query: receipt + tx metadata + full header + prior gas. + // Header columns use standard names (h.*) so header_from_row works. + // Receipt/tx columns use r_ prefix to avoid name collisions. 
+ let combined = sqlx::query( + "SELECT h.*, \ + r.tx_type AS r_tx_type, r.success AS r_success, \ + r.cumulative_gas_used AS r_cumulative_gas_used, \ + r.first_log_index AS r_first_log_index, \ + r.tx_index AS r_tx_index, \ + t.tx_hash AS r_tx_hash, t.from_address AS r_from_address, \ + COALESCE( \ + (SELECT CAST(MAX(r2.cumulative_gas_used) AS bigint) \ + FROM receipts r2 \ + WHERE r2.block_number = r.block_number \ + AND r2.tx_index < r.tx_index), \ + 0 \ + ) AS prior_gas \ + FROM receipts r \ + JOIN transactions t ON r.block_number = t.block_number \ + AND r.tx_index = t.tx_index \ + JOIN headers h ON r.block_number = h.block_number \ WHERE r.block_number = $1 AND r.tx_index = $2", ) .bind(to_i64(block)) @@ -1102,23 +1152,28 @@ impl ColdStorageRead for SqlColdBackend { .await .map_err(SqlColdError::from)?; - let Some(rr) = receipt_row else { + let Some(rr) = combined else { return Ok(None); }; - let bn: i64 = rr.get(COL_BLOCK_NUMBER); - let tx_idx: i64 = rr.get(COL_TX_INDEX); - let tx_hash = rr.get(COL_TX_HASH); - let sender = rr.get(COL_FROM_ADDRESS); - let tx_type = rr.get::(COL_TX_TYPE) as i16; - let success = rr.get::(COL_SUCCESS) != 0; - let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED); + // Extract header using existing helper (h.* columns are unaliased). + let header = header_from_row(&rr).map_err(ColdStorageError::from)?.seal_slow(); + + // Extract receipt fields from r_ prefixed aliases. + let tx_hash = rr.get(COL_R_TX_HASH); + let sender = rr.get(COL_R_FROM_ADDRESS); + let tx_type: i32 = rr.get(COL_R_TX_TYPE); + let success = rr.get::(COL_R_SUCCESS) != 0; + let cumulative_gas_used: i64 = rr.get(COL_R_CUMULATIVE_GAS_USED); + let first_log_index: u64 = from_i64(rr.get::(COL_R_FIRST_LOG_INDEX)); + let prior_cumulative_gas: u64 = rr.get::, _>(COL_PRIOR_GAS).unwrap_or(0) as u64; + // Logs still require a separate query (variable row count). 
let log_rows = sqlx::query( "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index", ) - .bind(bn) - .bind(tx_idx) + .bind(to_i64(block)) + .bind(to_i64(index)) .fetch_all(&self.pool) .await .map_err(SqlColdError::from)?; @@ -1126,21 +1181,6 @@ impl ColdStorageRead for SqlColdBackend { let logs = log_rows.iter().map(log_from_row).collect(); let built = build_receipt(tx_type, success, cumulative_gas_used, logs) .map_err(ColdStorageError::from)?; - - // Read first_log_index directly from the receipt row; compute - // gas_used from the prior receipt's cumulative gas. - let first_log_index: u64 = from_i64(rr.get::(COL_FIRST_LOG_INDEX)); - let prior = sqlx::query( - "SELECT CAST(MAX(r.cumulative_gas_used) AS bigint) as prior_gas - FROM receipts r WHERE r.block_number = $1 AND r.tx_index < $2", - ) - .bind(to_i64(block)) - .bind(to_i64(index)) - .fetch_one(&self.pool) - .await - .map_err(SqlColdError::from)?; - let prior_cumulative_gas: u64 = - prior.get::, _>(COL_PRIOR_GAS).unwrap_or(0) as u64; let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas; let ir = IndexedReceipt { receipt: built, tx_hash, first_log_index, gas_used, sender }; @@ -1193,7 +1233,7 @@ impl ColdStorageRead for SqlColdBackend { let tx_idx: i64 = rr.get(COL_TX_INDEX); let tx_hash = rr.get(COL_TX_HASH); let sender = rr.get(COL_FROM_ADDRESS); - let tx_type = rr.get::(COL_TX_TYPE) as i16; + let tx_type = rr.get::(COL_TX_TYPE); let success = rr.get::(COL_SUCCESS) != 0; let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED); let logs = logs_by_tx.remove(&tx_idx).unwrap_or_default(); @@ -1298,21 +1338,11 @@ impl ColdStorageRead for SqlColdBackend { let (filter_clause, params) = build_log_filter_clause(filter, 3); let where_clause = format!("l.block_number >= $1 AND l.block_number <= $2{filter_clause}"); - // Run a cheap COUNT(*) query first to reject queries that exceed - // the limit without loading any row data. 
- let count_sql = format!("SELECT COUNT(*) as cnt FROM logs l WHERE {where_clause}"); - let mut count_query = sqlx::query(&count_sql).bind(to_i64(from)).bind(to_i64(to)); - for param in ¶ms { - count_query = count_query.bind(*param); - } - let count_row = count_query.fetch_one(&self.pool).await.map_err(SqlColdError::from)?; - let count = from_i64(count_row.get::(COL_CNT)) as usize; - if count > max_logs { - return Err(ColdStorageError::TooManyLogs { limit: max_logs }); - } - - // Fetch the actual log data with JOINs and the correlated subquery - // for block_log_index (absolute position within block). + // Use LIMIT to cap results. Fetch one extra row to detect overflow + // without a separate COUNT query. PostgreSQL stops scanning after + // finding enough rows, making this faster than COUNT in both the + // under-limit and over-limit cases. + let limit_idx = 3 + params.len() as u32; let data_sql = format!( "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \ (r.first_log_index + l.log_index) AS block_log_index \ @@ -1323,15 +1353,24 @@ impl ColdStorageRead for SqlColdBackend { JOIN receipts r ON l.block_number = r.block_number \ AND l.tx_index = r.tx_index \ WHERE {where_clause} \ - ORDER BY l.block_number, l.tx_index, l.log_index" + ORDER BY l.block_number, l.tx_index, l.log_index \ + LIMIT ${limit_idx}" ); let mut query = sqlx::query(&data_sql).bind(to_i64(from)).bind(to_i64(to)); for param in ¶ms { query = query.bind(*param); } + // Clamp to i64::MAX before converting: SQL LIMIT is an i64, and + // saturating_add avoids overflow when max_logs is usize::MAX. 
+ let limit = max_logs.saturating_add(1).min(i64::MAX as usize); + query = query.bind(to_i64(limit as u64)); let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?; + if rows.len() > max_logs { + return Err(ColdStorageError::TooManyLogs { limit: max_logs }); + } + rows.into_iter() .map(|r| { let log = log_from_row(&r); @@ -1400,7 +1439,117 @@ impl ColdStorageWrite for SqlColdBackend { } } -impl ColdStorage for SqlColdBackend {} +impl ColdStorage for SqlColdBackend { + async fn drain_above(&mut self, block: BlockNumber) -> ColdResult>> { + let bn = to_i64(block); + let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?; + + // 1. Fetch all headers above block. + let header_rows = + sqlx::query("SELECT * FROM headers WHERE block_number > $1 ORDER BY block_number") + .bind(bn) + .fetch_all(&mut *tx) + .await + .map_err(SqlColdError::from)?; + + if header_rows.is_empty() { + tx.commit().await.map_err(SqlColdError::from)?; + return Ok(Vec::new()); + } + + let mut headers: std::collections::BTreeMap = + std::collections::BTreeMap::new(); + for r in &header_rows { + let num: i64 = r.get(COL_BLOCK_NUMBER); + let h = header_from_row(r).map_err(ColdStorageError::from)?.seal_slow(); + headers.insert(num, h); + } + + // 2. Fetch all receipt + tx metadata above block. + let receipt_rows = sqlx::query( + "SELECT r.*, t.tx_hash, t.from_address \ + FROM receipts r \ + JOIN transactions t ON r.block_number = t.block_number \ + AND r.tx_index = t.tx_index \ + WHERE r.block_number > $1 \ + ORDER BY r.block_number, r.tx_index", + ) + .bind(bn) + .fetch_all(&mut *tx) + .await + .map_err(SqlColdError::from)?; + + // 3. Fetch all logs above block. + let log_rows = sqlx::query( + "SELECT * FROM logs WHERE block_number > $1 \ + ORDER BY block_number, tx_index, log_index", + ) + .bind(bn) + .fetch_all(&mut *tx) + .await + .map_err(SqlColdError::from)?; + + // Group logs by (block_number, tx_index). 
+ let mut logs_by_block_tx: std::collections::BTreeMap<(i64, i64), Vec> = + std::collections::BTreeMap::new(); + for r in &log_rows { + let block_num: i64 = r.get(COL_BLOCK_NUMBER); + let tx_idx: i64 = r.get(COL_TX_INDEX); + logs_by_block_tx.entry((block_num, tx_idx)).or_default().push(log_from_row(r)); + } + + // Group receipt rows by block_number. + let mut receipts_by_block: std::collections::BTreeMap> = + std::collections::BTreeMap::new(); + for r in &receipt_rows { + let block_num: i64 = r.get(COL_BLOCK_NUMBER); + receipts_by_block.entry(block_num).or_default().push(r); + } + + // 4. Assemble ColdReceipts per block. + let mut all_receipts = Vec::with_capacity(headers.len()); + for (&block_num, header) in &headers { + let block_receipt_rows = receipts_by_block.remove(&block_num).unwrap_or_default(); + let mut first_log_index = 0u64; + let mut prior_cumulative_gas = 0u64; + let block_receipts: ColdResult> = block_receipt_rows + .into_iter() + .enumerate() + .map(|(idx, rr)| { + let tx_idx: i64 = rr.get(COL_TX_INDEX); + let tx_hash = rr.get(COL_TX_HASH); + let sender = rr.get(COL_FROM_ADDRESS); + let tx_type: i32 = rr.get(COL_TX_TYPE); + let success = rr.get::(COL_SUCCESS) != 0; + let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED); + let logs = logs_by_block_tx.remove(&(block_num, tx_idx)).unwrap_or_default(); + let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs) + .map_err(ColdStorageError::from)?; + let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas; + prior_cumulative_gas = receipt.inner.cumulative_gas_used; + let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender }; + first_log_index += ir.receipt.inner.logs.len() as u64; + Ok(ColdReceipt::new(ir, header, idx as u64)) + }) + .collect(); + all_receipts.push(block_receipts?); + } + + // 5. Delete from all tables (same order as truncate_above). 
+ for table in + ["logs", "transactions", "receipts", "signet_events", "zenith_headers", "headers"] + { + sqlx::query(&format!("DELETE FROM {table} WHERE block_number > $1")) + .bind(bn) + .execute(&mut *tx) + .await + .map_err(SqlColdError::from)?; + } + + tx.commit().await.map_err(SqlColdError::from)?; + Ok(all_receipts) + } +} #[cfg(all(test, feature = "test-utils"))] mod tests { diff --git a/crates/cold-sql/src/columns.rs b/crates/cold-sql/src/columns.rs index f0fc8a3..6f30183 100644 --- a/crates/cold-sql/src/columns.rs +++ b/crates/cold-sql/src/columns.rs @@ -53,7 +53,6 @@ pub(crate) const COL_FROM_ADDRESS: &str = "from_address"; // ── receipt columns ───────────────────────────────────────────────────────── pub(crate) const COL_SUCCESS: &str = "success"; pub(crate) const COL_CUMULATIVE_GAS_USED: &str = "cumulative_gas_used"; -pub(crate) const COL_FIRST_LOG_INDEX: &str = "first_log_index"; // ── log columns ───────────────────────────────────────────────────────────── pub(crate) const COL_ADDRESS: &str = "address"; @@ -84,3 +83,11 @@ pub(crate) const COL_PRIOR_GAS: &str = "prior_gas"; pub(crate) const COL_BLOCK_LOG_INDEX: &str = "block_log_index"; pub(crate) const COL_BLOCK_TIMESTAMP: &str = "block_timestamp"; pub(crate) const COL_MAX_BN: &str = "max_bn"; + +// ── get_receipt combined query aliases ────────────────────────────────────── +pub(crate) const COL_R_TX_TYPE: &str = "r_tx_type"; +pub(crate) const COL_R_SUCCESS: &str = "r_success"; +pub(crate) const COL_R_CUMULATIVE_GAS_USED: &str = "r_cumulative_gas_used"; +pub(crate) const COL_R_FIRST_LOG_INDEX: &str = "r_first_log_index"; +pub(crate) const COL_R_TX_HASH: &str = "r_tx_hash"; +pub(crate) const COL_R_FROM_ADDRESS: &str = "r_from_address"; diff --git a/crates/cold-sql/src/connector.rs b/crates/cold-sql/src/connector.rs index 08bce0f..994a83a 100644 --- a/crates/cold-sql/src/connector.rs +++ b/crates/cold-sql/src/connector.rs @@ -1,7 +1,8 @@ //! SQL cold storage connector. 
-use crate::{SqlColdBackend, SqlColdError}; +use crate::{PoolOverrides, SqlColdBackend, SqlColdError}; use signet_cold::ColdConnect; +use std::time::Duration; /// Errors that can occur when initializing SQL connectors. #[derive(Debug, thiserror::Error)] @@ -21,16 +22,23 @@ pub enum SqlConnectorError { /// - URLs starting with `postgres://` or `postgresql://` use PostgreSQL /// - URLs starting with `sqlite:` use SQLite /// +/// Pool settings use backend-specific defaults. Use +/// [`with_max_connections`] and [`with_acquire_timeout`] to override. +/// +/// [`with_max_connections`]: Self::with_max_connections +/// [`with_acquire_timeout`]: Self::with_acquire_timeout +/// /// # Example /// /// ```ignore /// use signet_cold_sql::SqlConnector; /// -/// // PostgreSQL -/// let pg = SqlConnector::new("postgres://localhost/signet"); +/// // PostgreSQL with custom pool size +/// let pg = SqlConnector::new("postgres://localhost/signet") +/// .with_max_connections(20); /// let backend = pg.connect().await?; /// -/// // SQLite +/// // SQLite (defaults) /// let sqlite = SqlConnector::new("sqlite::memory:"); /// let backend = sqlite.connect().await?; /// ``` @@ -38,15 +46,21 @@ pub enum SqlConnectorError { #[derive(Debug, Clone)] pub struct SqlConnector { url: String, + overrides: PoolOverrides, } #[cfg(any(feature = "sqlite", feature = "postgres"))] impl SqlConnector { /// Create a new SQL connector. /// - /// The database type is detected from the URL prefix. + /// The database type is detected from the URL prefix. Pool settings + /// use backend-specific defaults. Use [`with_max_connections`] and + /// [`with_acquire_timeout`] to override. + /// + /// [`with_max_connections`]: Self::with_max_connections + /// [`with_acquire_timeout`]: Self::with_acquire_timeout pub fn new(url: impl Into) -> Self { - Self { url: url.into() } + Self { url: url.into(), overrides: PoolOverrides::default() } } /// Get a reference to the connection URL. 
@@ -54,9 +68,26 @@ impl SqlConnector { &self.url } + /// Override the maximum number of pool connections. + /// + /// Default: 1 for SQLite, 10 for PostgreSQL. + pub const fn with_max_connections(mut self, n: u32) -> Self { + self.overrides.max_connections = Some(n); + self + } + + /// Override the connection acquire timeout. + /// + /// Default: 5 seconds for all backends. + pub const fn with_acquire_timeout(mut self, timeout: Duration) -> Self { + self.overrides.acquire_timeout = Some(timeout); + self + } + /// Create a connector from environment variables. /// /// Reads the SQL URL from the specified environment variable. + /// Uses default pool settings. /// /// # Example /// @@ -78,6 +109,7 @@ impl ColdConnect for SqlConnector { fn connect(&self) -> impl std::future::Future> + Send { let url = self.url.clone(); - async move { SqlColdBackend::connect(&url).await } + let overrides = self.overrides; + async move { SqlColdBackend::connect_with(&url, overrides).await } } } diff --git a/crates/cold-sql/src/convert.rs b/crates/cold-sql/src/convert.rs index b90ae4f..b62f0e9 100644 --- a/crates/cold-sql/src/convert.rs +++ b/crates/cold-sql/src/convert.rs @@ -20,12 +20,25 @@ use signet_storage_types::Receipt; // ============================================================================ /// Convert u64 to i64 for SQL storage. -pub(crate) const fn to_i64(v: u64) -> i64 { +/// +/// # Panics +/// +/// Debug-asserts that `v` fits in an `i64`. All block numbers, gas +/// values, and indices in Ethereum are well below `i64::MAX`; a value +/// that overflows indicates data corruption. +pub(crate) fn to_i64(v: u64) -> i64 { + debug_assert!(v <= i64::MAX as u64, "u64 value {v} overflows i64"); v as i64 } /// Convert i64 from SQL back to u64. -pub(crate) const fn from_i64(v: i64) -> u64 { +/// +/// # Panics +/// +/// Debug-asserts that `v` is non-negative. Negative values from the +/// database indicate data corruption. 
+pub(crate) fn from_i64(v: i64) -> u64 { + debug_assert!(v >= 0, "negative i64 value {v} cannot represent u64"); v as u64 } @@ -208,13 +221,16 @@ pub(crate) fn decode_b256_vec(data: &[u8]) -> Result, SqlColdError> { /// Reconstruct a [`Receipt`] from primitive column values and decoded logs. pub(crate) fn build_receipt( - tx_type: i16, + tx_type_raw: i32, success: bool, cumulative_gas_used: i64, logs: Vec, ) -> Result { - let tx_type = TxType::try_from(tx_type as u8) - .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type}")))?; + let tx_type_u8: u8 = tx_type_raw + .try_into() + .map_err(|_| SqlColdError::Convert(format!("tx_type out of u8 range: {tx_type_raw}")))?; + let tx_type = TxType::try_from(tx_type_u8) + .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type_u8}")))?; Ok(Receipt { tx_type, inner: AlloyReceipt { diff --git a/crates/cold-sql/src/lib.rs b/crates/cold-sql/src/lib.rs index f28e2ad..8ff594a 100644 --- a/crates/cold-sql/src/lib.rs +++ b/crates/cold-sql/src/lib.rs @@ -45,7 +45,7 @@ mod convert; #[cfg(any(feature = "sqlite", feature = "postgres"))] mod backend; #[cfg(any(feature = "sqlite", feature = "postgres"))] -pub use backend::SqlColdBackend; +pub use backend::{PoolOverrides, SqlColdBackend}; #[cfg(any(feature = "sqlite", feature = "postgres"))] mod connector; diff --git a/docs/superpowers/plans/2026-04-03-cold-sql-pool-and-query-fixes.md b/docs/superpowers/plans/2026-04-03-cold-sql-pool-and-query-fixes.md new file mode 100644 index 0000000..2a6ca08 --- /dev/null +++ b/docs/superpowers/plans/2026-04-03-cold-sql-pool-and-query-fixes.md @@ -0,0 +1,672 @@ +# Cold SQL Pool & Query Fixes Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. 
+ +**Goal:** Fix single-connection pool starvation and query inefficiencies in the SQL cold storage backend. + +**Architecture:** All changes are in `signet-cold-sql`. Pool configuration is split between `backend.rs` (defaults) and `connector.rs` (builder). Query optimizations touch `backend.rs` only. Integer safety touches `convert.rs` and `backend.rs`. Each task is independently testable via the existing conformance suite. + +**Tech Stack:** Rust, sqlx (AnyPool), PostgreSQL, SQLite + +**Spec:** `docs/superpowers/specs/2026-04-03-cold-sql-pool-and-query-fixes-design.md` + +--- + +### Task 1: Integer Conversion Safety + +**Files:** +- Modify: `crates/cold-sql/src/convert.rs:22-30` +- Modify: `crates/cold-sql/src/backend.rs:337-339` + +- [ ] **Step 1: Fix `to_i64` and `from_i64` in convert.rs** + +Replace the bare `as` casts with debug-asserted conversions: + +```rust +/// Convert u64 to i64 for SQL storage. +/// +/// # Panics +/// +/// Debug-asserts that `v` fits in an `i64`. All block numbers, gas +/// values, and indices in Ethereum are well below `i64::MAX`; a value +/// that overflows indicates data corruption. +pub(crate) fn to_i64(v: u64) -> i64 { + debug_assert!(v <= i64::MAX as u64, "u64 value {v} overflows i64"); + v as i64 +} + +/// Convert i64 from SQL back to u64. +/// +/// # Panics +/// +/// Debug-asserts that `v` is non-negative. Negative values from the +/// database indicate data corruption. +pub(crate) fn from_i64(v: i64) -> u64 { + debug_assert!(v >= 0, "negative i64 value {v} cannot represent u64"); + v as u64 +} +``` + +Drop `const` — these are never used in const contexts and `debug_assert!` isn't available in const fns. 
+ +- [ ] **Step 2: Fix `tx_type` truncation in backend.rs** + +In `tx_from_row` (~line 337), replace: + +```rust + let tx_type_raw = r.get::(COL_TX_TYPE) as u8; + let tx_type = TxType::try_from(tx_type_raw) + .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type_raw}")))?; +``` + +With: + +```rust + let tx_type_raw: i32 = r.get(COL_TX_TYPE); + let tx_type_u8: u8 = tx_type_raw + .try_into() + .map_err(|_| SqlColdError::Convert(format!("tx_type out of u8 range: {tx_type_raw}")))?; + let tx_type = TxType::try_from(tx_type_u8) + .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type_u8}")))?; +``` + +- [ ] **Step 3: Run conformance tests** + +Run: `cargo t -p signet-cold-sql --all-features` +Expected: All tests pass (the assertions only fire in debug builds, and valid data won't trigger them). + +- [ ] **Step 4: Run clippy** + +Run: `cargo clippy -p signet-cold-sql --all-features --all-targets` +Run: `cargo clippy -p signet-cold-sql --no-default-features --all-targets` +Expected: No new warnings. + +- [ ] **Step 5: Commit** + +```bash +git add crates/cold-sql/src/convert.rs crates/cold-sql/src/backend.rs +git commit -m "fix(cold-sql): add safety checks to integer conversions" +``` + +--- + +### Task 2: Pool Configuration + +**Files:** +- Modify: `crates/cold-sql/src/backend.rs:107-119` +- Modify: `crates/cold-sql/src/connector.rs` + +- [ ] **Step 1: Add `connect_with` to SqlColdBackend** + +In `backend.rs`, add a `use std::time::Duration;` import at the top alongside the existing imports. + +Then replace the existing `connect` method and add `connect_with`: + +```rust + /// Pool configuration overrides for [`connect_with`](Self::connect_with). 
+ /// + /// `None` values use backend-specific defaults: + /// - SQLite: `max_connections = 1`, `acquire_timeout = 5s` + /// - PostgreSQL: `max_connections = 10`, `acquire_timeout = 5s` + #[derive(Debug, Clone, Default)] + pub struct PoolOverrides { + /// Override the maximum number of connections in the pool. + pub max_connections: Option, + /// Override the connection acquire timeout. + pub acquire_timeout: Option, + } +``` + +Note: `PoolOverrides` should be defined as a top-level struct in `backend.rs` (outside the `impl` block), and re-exported from `lib.rs`. + +```rust + /// Connect to a database URL with explicit pool overrides. + /// + /// Installs the default sqlx drivers on the first call. The database + /// type is inferred from the URL scheme (`sqlite:` or `postgres:`). + /// + /// # Pool Defaults + /// + /// - **SQLite**: `max_connections = 1` (required for in-memory databases + /// to share state), `acquire_timeout = 5s`. + /// - **PostgreSQL**: `max_connections = 10`, `acquire_timeout = 5s`. + /// + /// Override any default by setting the corresponding field in + /// `overrides`. + pub async fn connect_with( + url: &str, + overrides: PoolOverrides, + ) -> Result { + sqlx::any::install_default_drivers(); + let is_sqlite = url.starts_with("sqlite:"); + let default_max = if is_sqlite { 1 } else { 10 }; + let default_timeout = Duration::from_secs(5); + let pool: AnyPool = sqlx::pool::PoolOptions::new() + .max_connections(overrides.max_connections.unwrap_or(default_max)) + .acquire_timeout(overrides.acquire_timeout.unwrap_or(default_timeout)) + .connect(url) + .await?; + Self::new(pool).await + } + + /// Connect to a database URL with default pool settings. + /// + /// Convenience wrapper around [`connect_with`](Self::connect_with). + /// See that method for default pool sizes per backend. 
+ pub async fn connect(url: &str) -> Result { + Self::connect_with(url, PoolOverrides::default()).await + } +``` + +- [ ] **Step 2: Re-export PoolOverrides from lib.rs** + +In `lib.rs`, add the re-export alongside `SqlColdBackend`: + +```rust +#[cfg(any(feature = "sqlite", feature = "postgres"))] +pub use backend::{PoolOverrides, SqlColdBackend}; +``` + +- [ ] **Step 3: Add builder methods to SqlConnector** + +In `connector.rs`, update `SqlConnector` to carry pool overrides: + +```rust +use crate::{PoolOverrides, SqlColdBackend, SqlColdError}; +use signet_cold::ColdConnect; +use std::time::Duration; + +// ... SqlConnectorError stays unchanged ... + +#[cfg(any(feature = "sqlite", feature = "postgres"))] +#[derive(Debug, Clone)] +pub struct SqlConnector { + url: String, + overrides: PoolOverrides, +} + +#[cfg(any(feature = "sqlite", feature = "postgres"))] +impl SqlConnector { + /// Create a new SQL connector. + /// + /// The database type is detected from the URL prefix. Pool settings + /// use backend-specific defaults. Use [`with_max_connections`] and + /// [`with_acquire_timeout`] to override. + /// + /// [`with_max_connections`]: Self::with_max_connections + /// [`with_acquire_timeout`]: Self::with_acquire_timeout + pub fn new(url: impl Into) -> Self { + Self { url: url.into(), overrides: PoolOverrides::default() } + } + + /// Get a reference to the connection URL. + pub fn url(&self) -> &str { + &self.url + } + + /// Override the maximum number of pool connections. + /// + /// Default: 1 for SQLite, 10 for PostgreSQL. + pub fn with_max_connections(mut self, n: u32) -> Self { + self.overrides.max_connections = Some(n); + self + } + + /// Override the connection acquire timeout. + /// + /// Default: 5 seconds for all backends. + pub fn with_acquire_timeout(mut self, timeout: Duration) -> Self { + self.overrides.acquire_timeout = Some(timeout); + self + } + + /// Create a connector from environment variables. 
+ /// + /// Reads the SQL URL from the specified environment variable. + /// Uses default pool settings. + pub fn from_env(env_var: &'static str) -> Result { + let url = + std::env::var(env_var).map_err(|_| SqlConnectorError::MissingEnvVar(env_var))?; + Ok(Self::new(url)) + } +} + +#[cfg(any(feature = "sqlite", feature = "postgres"))] +impl ColdConnect for SqlConnector { + type Cold = SqlColdBackend; + type Error = SqlColdError; + + fn connect( + &self, + ) -> impl std::future::Future> + Send { + let url = self.url.clone(); + let overrides = self.overrides.clone(); + async move { SqlColdBackend::connect_with(&url, overrides).await } + } +} +``` + +- [ ] **Step 4: Run conformance tests** + +Run: `cargo t -p signet-cold-sql --all-features` +Expected: All tests pass. SQLite still uses 1 connection. + +- [ ] **Step 5: Run clippy and format** + +Run: `cargo clippy -p signet-cold-sql --all-features --all-targets` +Run: `cargo clippy -p signet-cold-sql --no-default-features --all-targets` +Run: `cargo +nightly fmt` +Expected: Clean. + +- [ ] **Step 6: Commit** + +```bash +git add crates/cold-sql/src/backend.rs crates/cold-sql/src/connector.rs crates/cold-sql/src/lib.rs +git commit -m "fix(cold-sql): use per-backend pool defaults and add SqlConnector builder" +``` + +--- + +### Task 3: `get_logs` LIMIT Optimization + +**Files:** +- Modify: `crates/cold-sql/src/backend.rs` (fn `get_logs`, ~line 1293) + +- [ ] **Step 1: Replace COUNT + SELECT with LIMIT** + +Replace the entire `get_logs` implementation: + +```rust + async fn get_logs(&self, filter: &Filter, max_logs: usize) -> ColdResult> { + let from = filter.get_from_block().unwrap_or(0); + let to = filter.get_to_block().unwrap_or(u64::MAX); + + // Build WHERE clause: block range ($1, $2) + address/topic filters. + let (filter_clause, params) = build_log_filter_clause(filter, 3); + let where_clause = format!("l.block_number >= $1 AND l.block_number <= $2{filter_clause}"); + + // Use LIMIT to cap results. 
Fetch one extra row to detect overflow + // without a separate COUNT query. PostgreSQL stops scanning after + // finding enough rows, making this faster than COUNT in both the + // under-limit and over-limit cases. + let limit_idx = 3 + params.len() as u32; + let data_sql = format!( + "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \ + (r.first_log_index + l.log_index) AS block_log_index \ + FROM logs l \ + JOIN headers h ON l.block_number = h.block_number \ + JOIN transactions t ON l.block_number = t.block_number \ + AND l.tx_index = t.tx_index \ + JOIN receipts r ON l.block_number = r.block_number \ + AND l.tx_index = r.tx_index \ + WHERE {where_clause} \ + ORDER BY l.block_number, l.tx_index, l.log_index \ + LIMIT ${limit_idx}" + ); + let mut query = sqlx::query(&data_sql).bind(to_i64(from)).bind(to_i64(to)); + for param in ¶ms { + query = query.bind(*param); + } + let limit = max_logs + 1; + query = query.bind(to_i64(limit as u64)); + + let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?; + + if rows.len() > max_logs { + return Err(ColdStorageError::TooManyLogs { limit: max_logs }); + } + + rows.into_iter() + .map(|r| { + let log = log_from_row(&r); + Ok(RpcLog { + inner: log, + block_hash: Some(r.get(COL_BLOCK_HASH)), + block_number: Some(from_i64(r.get::(COL_BLOCK_NUMBER))), + block_timestamp: Some(from_i64(r.get::(COL_BLOCK_TIMESTAMP))), + transaction_hash: Some(r.get(COL_TX_HASH)), + transaction_index: Some(from_i64(r.get::(COL_TX_INDEX))), + log_index: Some(from_i64(r.get::(COL_BLOCK_LOG_INDEX))), + removed: false, + }) + }) + .collect::>>() + } +``` + +- [ ] **Step 2: Run conformance tests** + +Run: `cargo t -p signet-cold-sql --all-features` +Expected: All tests pass. The conformance suite exercises `get_logs` with limits. + +- [ ] **Step 3: Clippy** + +Run: `cargo clippy -p signet-cold-sql --all-features --all-targets` +Expected: Clean. 
+ +- [ ] **Step 4: Commit** + +```bash +git add crates/cold-sql/src/backend.rs +git commit -m "perf(cold-sql): replace get_logs COUNT+SELECT with single LIMIT query" +``` + +--- + +### Task 4: `get_receipt` Query Consolidation + +**Files:** +- Modify: `crates/cold-sql/src/columns.rs` +- Modify: `crates/cold-sql/src/backend.rs` (fn `get_receipt`, ~line 1068) + +- [ ] **Step 1: Add aliased column constants** + +In `columns.rs`, add at the bottom (in the "query-specific aliases" section): + +```rust +// ── get_receipt combined query aliases ────────────────────────────────────── +pub(crate) const COL_R_TX_TYPE: &str = "r_tx_type"; +pub(crate) const COL_R_SUCCESS: &str = "r_success"; +pub(crate) const COL_R_CUMULATIVE_GAS_USED: &str = "r_cumulative_gas_used"; +pub(crate) const COL_R_FIRST_LOG_INDEX: &str = "r_first_log_index"; +pub(crate) const COL_R_TX_HASH: &str = "r_tx_hash"; +pub(crate) const COL_R_FROM_ADDRESS: &str = "r_from_address"; +pub(crate) const COL_R_TX_INDEX: &str = "r_tx_index"; +``` + +Add the corresponding imports in `backend.rs` where the other column imports are (~line 8-21). + +- [ ] **Step 2: Replace `get_receipt` implementation** + +Replace the `get_receipt` body with a consolidated version. 
The combined query selects `h.*` (header columns keep standard names for reuse with `header_from_row`), receipt/tx columns with `r_` prefix aliases, and a correlated subquery for `prior_gas`: + +```rust + async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult> { + // Resolve to (block, index) + let (block, index) = match spec { + ReceiptSpecifier::TxHash(hash) => { + let row = sqlx::query( + "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1", + ) + .bind(hash) + .fetch_optional(&self.pool) + .await + .map_err(SqlColdError::from)?; + let Some(r) = row else { return Ok(None) }; + ( + from_i64(r.get::(COL_BLOCK_NUMBER)), + from_i64(r.get::(COL_TX_INDEX)), + ) + } + ReceiptSpecifier::BlockAndIndex { block, index } => (block, index), + }; + + // Combined query: receipt + tx metadata + full header + prior gas. + // Header columns use standard names (h.*) so header_from_row works. + // Receipt/tx columns use r_ prefix to avoid collisions. + let combined = sqlx::query( + "SELECT h.*, \ + r.tx_type AS r_tx_type, r.success AS r_success, \ + r.cumulative_gas_used AS r_cumulative_gas_used, \ + r.first_log_index AS r_first_log_index, \ + r.tx_index AS r_tx_index, \ + t.tx_hash AS r_tx_hash, t.from_address AS r_from_address, \ + COALESCE( \ + (SELECT CAST(MAX(r2.cumulative_gas_used) AS bigint) \ + FROM receipts r2 \ + WHERE r2.block_number = r.block_number \ + AND r2.tx_index < r.tx_index), \ + 0 \ + ) AS prior_gas \ + FROM receipts r \ + JOIN transactions t ON r.block_number = t.block_number \ + AND r.tx_index = t.tx_index \ + JOIN headers h ON r.block_number = h.block_number \ + WHERE r.block_number = $1 AND r.tx_index = $2", + ) + .bind(to_i64(block)) + .bind(to_i64(index)) + .fetch_optional(&self.pool) + .await + .map_err(SqlColdError::from)?; + + let Some(rr) = combined else { + return Ok(None); + }; + + // Extract header using existing helper (h.* columns are unaliased). 
+ let header = header_from_row(&rr).map_err(ColdStorageError::from)?.seal_slow(); + + // Extract receipt fields from r_ prefixed aliases. + let tx_hash = rr.get(COL_R_TX_HASH); + let sender = rr.get(COL_R_FROM_ADDRESS); + let tx_type = rr.get::(COL_R_TX_TYPE) as i16; + let success = rr.get::(COL_R_SUCCESS) != 0; + let cumulative_gas_used: i64 = rr.get(COL_R_CUMULATIVE_GAS_USED); + let first_log_index: u64 = from_i64(rr.get::(COL_R_FIRST_LOG_INDEX)); + let prior_cumulative_gas: u64 = + rr.get::, _>(COL_PRIOR_GAS).unwrap_or(0) as u64; + + // Logs still require a separate query (variable row count). + let log_rows = sqlx::query( + "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index", + ) + .bind(to_i64(block)) + .bind(to_i64(index)) + .fetch_all(&self.pool) + .await + .map_err(SqlColdError::from)?; + + let logs = log_rows.iter().map(log_from_row).collect(); + let built = build_receipt(tx_type, success, cumulative_gas_used, logs) + .map_err(ColdStorageError::from)?; + let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas; + + let ir = IndexedReceipt { receipt: built, tx_hash, first_log_index, gas_used, sender }; + Ok(Some(ColdReceipt::new(ir, &header, index))) + } +``` + +- [ ] **Step 3: Run conformance tests** + +Run: `cargo t -p signet-cold-sql --all-features` +Expected: All tests pass. + +- [ ] **Step 4: Clippy and format** + +Run: `cargo clippy -p signet-cold-sql --all-features --all-targets` +Run: `cargo +nightly fmt` +Expected: Clean. 
+ +- [ ] **Step 5: Commit** + +```bash +git add crates/cold-sql/src/backend.rs crates/cold-sql/src/columns.rs +git commit -m "perf(cold-sql): consolidate get_receipt from 5 queries to 2-3" +``` + +--- + +### Task 5: `drain_above` Batch Override + +**Files:** +- Modify: `crates/cold-sql/src/backend.rs` (replace `impl ColdStorage for SqlColdBackend {}`, ~line 1403) + +- [ ] **Step 1: Replace empty ColdStorage impl with drain_above override** + +Replace `impl ColdStorage for SqlColdBackend {}` with: + +```rust +impl ColdStorage for SqlColdBackend { + async fn drain_above( + &mut self, + block: BlockNumber, + ) -> ColdResult>> { + let bn = to_i64(block); + let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?; + + // 1. Fetch all headers above block. + let header_rows = + sqlx::query("SELECT * FROM headers WHERE block_number > $1 ORDER BY block_number") + .bind(bn) + .fetch_all(&mut *tx) + .await + .map_err(SqlColdError::from)?; + + if header_rows.is_empty() { + tx.commit().await.map_err(SqlColdError::from)?; + return Ok(Vec::new()); + } + + let mut headers: std::collections::BTreeMap = + std::collections::BTreeMap::new(); + for r in &header_rows { + let num: i64 = r.get(COL_BLOCK_NUMBER); + let h = header_from_row(r).map_err(ColdStorageError::from)?.seal_slow(); + headers.insert(num, h); + } + + // 2. Fetch all receipt + tx metadata above block. + let receipt_rows = sqlx::query( + "SELECT r.*, t.tx_hash, t.from_address \ + FROM receipts r \ + JOIN transactions t ON r.block_number = t.block_number \ + AND r.tx_index = t.tx_index \ + WHERE r.block_number > $1 \ + ORDER BY r.block_number, r.tx_index", + ) + .bind(bn) + .fetch_all(&mut *tx) + .await + .map_err(SqlColdError::from)?; + + // 3. Fetch all logs above block. 
+ let log_rows = sqlx::query( + "SELECT * FROM logs WHERE block_number > $1 \ + ORDER BY block_number, tx_index, log_index", + ) + .bind(bn) + .fetch_all(&mut *tx) + .await + .map_err(SqlColdError::from)?; + + // Group logs by (block_number, tx_index). + let mut logs_by_block_tx: std::collections::BTreeMap<(i64, i64), Vec> = + std::collections::BTreeMap::new(); + for r in &log_rows { + let block_num: i64 = r.get(COL_BLOCK_NUMBER); + let tx_idx: i64 = r.get(COL_TX_INDEX); + logs_by_block_tx.entry((block_num, tx_idx)).or_default().push(log_from_row(r)); + } + + // Group receipt rows by block_number. + let mut receipts_by_block: std::collections::BTreeMap> = + std::collections::BTreeMap::new(); + for r in &receipt_rows { + let block_num: i64 = r.get(COL_BLOCK_NUMBER); + receipts_by_block.entry(block_num).or_default().push(r); + } + + // 4. Assemble ColdReceipts per block. + let mut all_receipts = Vec::with_capacity(headers.len()); + for (&block_num, header) in &headers { + let block_receipt_rows = + receipts_by_block.remove(&block_num).unwrap_or_default(); + let mut first_log_index = 0u64; + let mut prior_cumulative_gas = 0u64; + let block_receipts: ColdResult> = block_receipt_rows + .into_iter() + .enumerate() + .map(|(idx, rr)| { + let tx_idx: i64 = rr.get(COL_TX_INDEX); + let tx_hash = rr.get(COL_TX_HASH); + let sender = rr.get(COL_FROM_ADDRESS); + let tx_type = rr.get::(COL_TX_TYPE) as i16; + let success = rr.get::(COL_SUCCESS) != 0; + let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED); + let logs = logs_by_block_tx + .remove(&(block_num, tx_idx)) + .unwrap_or_default(); + let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs) + .map_err(ColdStorageError::from)?; + let gas_used = + receipt.inner.cumulative_gas_used - prior_cumulative_gas; + prior_cumulative_gas = receipt.inner.cumulative_gas_used; + let ir = IndexedReceipt { + receipt, + tx_hash, + first_log_index, + gas_used, + sender, + }; + first_log_index += 
ir.receipt.inner.logs.len() as u64; + Ok(ColdReceipt::new(ir, header, idx as u64)) + }) + .collect(); + all_receipts.push(block_receipts?); + } + + // 5. Delete from all tables (same order as truncate_above). + for table in + ["logs", "transactions", "receipts", "signet_events", "zenith_headers", "headers"] + { + sqlx::query(&format!("DELETE FROM {table} WHERE block_number > $1")) + .bind(bn) + .execute(&mut *tx) + .await + .map_err(SqlColdError::from)?; + } + + tx.commit().await.map_err(SqlColdError::from)?; + Ok(all_receipts) + } +} +``` + +- [ ] **Step 2: Run conformance tests** + +Run: `cargo t -p signet-cold-sql --all-features` +Expected: All tests pass. The conformance suite tests `drain_above` (via `ColdStorage` trait). + +- [ ] **Step 3: Clippy and format** + +Run: `cargo clippy -p signet-cold-sql --all-features --all-targets` +Run: `cargo +nightly fmt` +Expected: Clean. + +- [ ] **Step 4: Commit** + +```bash +git add crates/cold-sql/src/backend.rs +git commit -m "perf(cold-sql): override drain_above to batch queries in single transaction" +``` + +--- + +### Task 6: Final Verification + +- [ ] **Step 1: Full clippy pass (both feature configurations)** + +Run: `cargo clippy -p signet-cold-sql --all-features --all-targets` +Run: `cargo clippy -p signet-cold-sql --no-default-features --all-targets` +Expected: Clean. + +- [ ] **Step 2: Format** + +Run: `cargo +nightly fmt` +Expected: No changes. + +- [ ] **Step 3: Full test suite** + +Run: `cargo t -p signet-cold-sql --all-features` +Expected: All tests pass. + +- [ ] **Step 4: PostgreSQL conformance (if available)** + +Run: `./scripts/test-postgres.sh` +Expected: All tests pass. + +- [ ] **Step 5: Workspace clippy check** + +Run: `cargo clippy --workspace --all-features --all-targets` +Expected: No new warnings from our changes. 
diff --git a/docs/superpowers/specs/2026-04-03-cold-sql-pool-and-query-fixes-design.md b/docs/superpowers/specs/2026-04-03-cold-sql-pool-and-query-fixes-design.md new file mode 100644 index 0000000..514ae11 --- /dev/null +++ b/docs/superpowers/specs/2026-04-03-cold-sql-pool-and-query-fixes-design.md @@ -0,0 +1,187 @@ +# Cold Storage SQL Backend: Pool Starvation & Query Fixes + +## Problem + +The cold storage task layer supports 64 concurrent readers and 8 stream +producers, but `SqlColdBackend::connect()` creates a **single-connection +pool** for all backends — including PostgreSQL. This serializes every +operation at the sqlx layer and creates several failure modes: + +- **Stream starvation**: A REPEATABLE READ streaming transaction holds the + sole connection for up to 60 seconds, blocking all writes and reads. +- **Write amplification**: The task drains all in-flight reads before each + write. With 64 readers serialized on one connection, drain time scales + linearly with queue depth. +- **Cascading backpressure**: Write latency climbs, the 256-slot write + channel fills, `dispatch_append_blocks` returns `Backpressure`, and the + node pipeline stalls. + +Several query patterns compound the problem with unnecessary round-trips. + +## Changes + +### 1. Pool Configuration + +**Files:** `crates/cold-sql/src/backend.rs`, `crates/cold-sql/src/connector.rs` + +#### `SqlColdBackend::connect()` + +Detect backend from URL prefix and apply appropriate defaults: + +- **SQLite** (`url.starts_with("sqlite:")`): `max_connections(1)` — required + for in-memory databases to share state across queries. +- **PostgreSQL** (everything else): `max_connections(10)`, + `acquire_timeout(5s)`. + +The `acquire_timeout` prevents indefinite hangs when PostgreSQL is +unresponsive. Without it, `pool.begin().await` blocks forever and the cold +storage task silently stalls. + +A new internal method `connect_with(url, overrides)` handles the actual pool +construction. 
`connect(url)` becomes a convenience wrapper passing no +overrides. + +#### `SqlConnector` Builder + +Add optional override fields: + +```rust +pub struct SqlConnector { + url: String, + max_connections: Option, + acquire_timeout: Option, +} +``` + +Builder methods: + +- `with_max_connections(n: u32) -> Self` +- `with_acquire_timeout(d: Duration) -> Self` + +The `ColdConnect::connect()` implementation passes these overrides through to +`SqlColdBackend::connect_with()`. Production callers (e.g., `StorageConfig` +in node-config) can tune pool size without constructing their own `AnyPool`. + +`from_env()` and `new(url)` use defaults (no overrides), preserving backward +compatibility. + +### 2. `get_logs`: LIMIT Instead of COUNT + SELECT + +**File:** `crates/cold-sql/src/backend.rs` (fn `get_logs`, ~line 1293) + +Replace the two-query pattern with a single query: + +```sql +SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, + (r.first_log_index + l.log_index) AS block_log_index +FROM logs l +JOIN headers h ON l.block_number = h.block_number +JOIN transactions t ON l.block_number = t.block_number AND l.tx_index = t.tx_index +JOIN receipts r ON l.block_number = r.block_number AND l.tx_index = r.tx_index +WHERE +ORDER BY l.block_number, l.tx_index, l.log_index +LIMIT +``` + +If the result has more than `max_logs` rows, return `TooManyLogs`. + +This is strictly better: + +- **Under limit**: 1 query instead of 2. +- **Over limit**: PostgreSQL stops after `max_logs + 1` rows. The COUNT + approach scans the entire result set. +- **No information lost**: `TooManyLogs { limit }` only reports the limit, + not the actual count. + +The LIMIT parameter is appended as the last bind parameter after the +address/topic filter params. `build_log_filter_clause` already returns the +next available parameter index. + +### 3. 
`get_receipt`: Consolidate to 2-3 Queries
+
+**File:** `crates/cold-sql/src/backend.rs` (fn `get_receipt`, ~line 1068)
+
+Merge the header fetch, receipt data, and prior cumulative gas into one query:
+
+```sql
+SELECT h.*,
+       r.tx_type AS r_tx_type, r.success AS r_success,
+       r.cumulative_gas_used AS r_cumulative_gas_used,
+       r.first_log_index AS r_first_log_index,
+       r.tx_index AS r_tx_index,
+       t.tx_hash AS r_tx_hash, t.from_address AS r_from_address,
+       COALESCE(
+         (SELECT MAX(r2.cumulative_gas_used)
+          FROM receipts r2
+          WHERE r2.block_number = r.block_number
+            AND r2.tx_index < r.tx_index),
+         0
+       ) AS prior_gas
+FROM receipts r
+JOIN transactions t ON r.block_number = t.block_number AND r.tx_index = t.tx_index
+JOIN headers h ON r.block_number = h.block_number
+WHERE r.block_number = $1 AND r.tx_index = $2
+```
+
+Then fetch logs in a second query. Total: 2 queries for block+index
+specifier, 3 for TxHash specifier (needs initial hash resolve).
+
+Header columns keep their standard names (selected via `h.*`) so the
+existing `header_from_row` helper can be reused unchanged. Receipt and
+transaction columns are aliased with an `r_` prefix to avoid collisions
+with header columns.
+
+### 4. `drain_above`: Batch Override
+
+**File:** `crates/cold-sql/src/backend.rs` (impl `ColdStorage`)
+
+Replace the empty `impl ColdStorage for SqlColdBackend {}` with an explicit
+`drain_above` override. Single transaction:
+
+1. Fetch all headers where `block_number > $1` (1 query)
+2. Fetch all receipts + tx_hash + from_address where `block_number > $1`
+   (1 query with JOINs)
+3. Fetch all logs where `block_number > $1` (1 query)
+4. Group in memory by block number, construct `Vec<Vec<ColdReceipt>>`
+5. DELETE from 6 tables where `block_number > $1` (6 queries)
+6. COMMIT
+
+Total: 9 queries in one transaction regardless of block count. The default
+implementation does `3N + 7` queries for N blocks.
+
+Receipt and log grouping mirrors the in-memory grouping logic already used
+by `get_receipts_in_block`; if the two stay identical, extract the shared
+grouping code into a helper function.
+
+### 5. 
Integer Conversion Safety
+
+**File:** `crates/cold-sql/src/convert.rs`
+
+Drop `const` from `to_i64` and `from_i64` and add debug assertions:
+
+- `to_i64(v: u64)`: `debug_assert!(v <= i64::MAX as u64)`
+- `from_i64(v: i64)`: `debug_assert!(v >= 0)`
+
+These catch data corruption in debug/test builds with zero runtime cost in
+release.
+
+**File:** `crates/cold-sql/src/backend.rs` (~line 337)
+
+Replace `r.get::<i32, _>(COL_TX_TYPE) as u8` with checked conversion:
+
+```rust
+let raw: i32 = r.get(COL_TX_TYPE);
+let tx_type_u8: u8 = raw.try_into()
+    .map_err(|_| SqlColdError::Convert(format!("tx_type out of range: {raw}")))?;
+```
+
+## Files Modified
+
+| File | Changes |
+|------|---------|
+| `crates/cold-sql/src/backend.rs` | Pool defaults, get_logs LIMIT, get_receipt consolidation, drain_above override, tx_type safety |
+| `crates/cold-sql/src/connector.rs` | Builder fields + methods for pool configuration |
+| `crates/cold-sql/src/convert.rs` | Debug assertions on to_i64 / from_i64 |
+
+## Verification
+
+1. `cargo t -p signet-cold-sql --all-features` — SQLite conformance
+2. `./scripts/test-postgres.sh` — PostgreSQL conformance
+3. `cargo clippy -p signet-cold-sql --all-features --all-targets`
+4. `cargo clippy -p signet-cold-sql --no-default-features --all-targets`
+5. 
`cargo +nightly fmt` From 78f78dafc7cdca8778f30a5af02723e42e1e1cd4 Mon Sep 17 00:00:00 2001 From: James Date: Mon, 6 Apr 2026 12:53:47 -0400 Subject: [PATCH 2/2] chore: remove docs files and add docs/ to gitignore Co-Authored-By: Claude Opus 4.6 (1M context) --- .gitignore | 3 +- ...026-04-03-cold-sql-pool-and-query-fixes.md | 672 ------------------ ...03-cold-sql-pool-and-query-fixes-design.md | 187 ----- 3 files changed, 2 insertions(+), 860 deletions(-) delete mode 100644 docs/superpowers/plans/2026-04-03-cold-sql-pool-and-query-fixes.md delete mode 100644 docs/superpowers/specs/2026-04-03-cold-sql-pool-and-query-fixes-design.md diff --git a/.gitignore b/.gitignore index 4460a27..46490fe 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ Cargo.lock .idea/ .claude/* -!.claude/skills/ \ No newline at end of file +!.claude/skills/ +docs/ \ No newline at end of file diff --git a/docs/superpowers/plans/2026-04-03-cold-sql-pool-and-query-fixes.md b/docs/superpowers/plans/2026-04-03-cold-sql-pool-and-query-fixes.md deleted file mode 100644 index 2a6ca08..0000000 --- a/docs/superpowers/plans/2026-04-03-cold-sql-pool-and-query-fixes.md +++ /dev/null @@ -1,672 +0,0 @@ -# Cold SQL Pool & Query Fixes Implementation Plan - -> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. - -**Goal:** Fix single-connection pool starvation and query inefficiencies in the SQL cold storage backend. - -**Architecture:** All changes are in `signet-cold-sql`. Pool configuration is split between `backend.rs` (defaults) and `connector.rs` (builder). Query optimizations touch `backend.rs` only. Integer safety touches `convert.rs` and `backend.rs`. Each task is independently testable via the existing conformance suite. 
- -**Tech Stack:** Rust, sqlx (AnyPool), PostgreSQL, SQLite - -**Spec:** `docs/superpowers/specs/2026-04-03-cold-sql-pool-and-query-fixes-design.md` - ---- - -### Task 1: Integer Conversion Safety - -**Files:** -- Modify: `crates/cold-sql/src/convert.rs:22-30` -- Modify: `crates/cold-sql/src/backend.rs:337-339` - -- [ ] **Step 1: Fix `to_i64` and `from_i64` in convert.rs** - -Replace the bare `as` casts with debug-asserted conversions: - -```rust -/// Convert u64 to i64 for SQL storage. -/// -/// # Panics -/// -/// Debug-asserts that `v` fits in an `i64`. All block numbers, gas -/// values, and indices in Ethereum are well below `i64::MAX`; a value -/// that overflows indicates data corruption. -pub(crate) fn to_i64(v: u64) -> i64 { - debug_assert!(v <= i64::MAX as u64, "u64 value {v} overflows i64"); - v as i64 -} - -/// Convert i64 from SQL back to u64. -/// -/// # Panics -/// -/// Debug-asserts that `v` is non-negative. Negative values from the -/// database indicate data corruption. -pub(crate) fn from_i64(v: i64) -> u64 { - debug_assert!(v >= 0, "negative i64 value {v} cannot represent u64"); - v as u64 -} -``` - -Drop `const` — these are never used in const contexts and `debug_assert!` isn't available in const fns. 
- -- [ ] **Step 2: Fix `tx_type` truncation in backend.rs** - -In `tx_from_row` (~line 337), replace: - -```rust - let tx_type_raw = r.get::(COL_TX_TYPE) as u8; - let tx_type = TxType::try_from(tx_type_raw) - .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type_raw}")))?; -``` - -With: - -```rust - let tx_type_raw: i32 = r.get(COL_TX_TYPE); - let tx_type_u8: u8 = tx_type_raw - .try_into() - .map_err(|_| SqlColdError::Convert(format!("tx_type out of u8 range: {tx_type_raw}")))?; - let tx_type = TxType::try_from(tx_type_u8) - .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type_u8}")))?; -``` - -- [ ] **Step 3: Run conformance tests** - -Run: `cargo t -p signet-cold-sql --all-features` -Expected: All tests pass (the assertions only fire in debug builds, and valid data won't trigger them). - -- [ ] **Step 4: Run clippy** - -Run: `cargo clippy -p signet-cold-sql --all-features --all-targets` -Run: `cargo clippy -p signet-cold-sql --no-default-features --all-targets` -Expected: No new warnings. - -- [ ] **Step 5: Commit** - -```bash -git add crates/cold-sql/src/convert.rs crates/cold-sql/src/backend.rs -git commit -m "fix(cold-sql): add safety checks to integer conversions" -``` - ---- - -### Task 2: Pool Configuration - -**Files:** -- Modify: `crates/cold-sql/src/backend.rs:107-119` -- Modify: `crates/cold-sql/src/connector.rs` - -- [ ] **Step 1: Add `connect_with` to SqlColdBackend** - -In `backend.rs`, add a `use std::time::Duration;` import at the top alongside the existing imports. - -Then replace the existing `connect` method and add `connect_with`: - -```rust - /// Pool configuration overrides for [`connect_with`](Self::connect_with). 
- /// - /// `None` values use backend-specific defaults: - /// - SQLite: `max_connections = 1`, `acquire_timeout = 5s` - /// - PostgreSQL: `max_connections = 10`, `acquire_timeout = 5s` - #[derive(Debug, Clone, Default)] - pub struct PoolOverrides { - /// Override the maximum number of connections in the pool. - pub max_connections: Option, - /// Override the connection acquire timeout. - pub acquire_timeout: Option, - } -``` - -Note: `PoolOverrides` should be defined as a top-level struct in `backend.rs` (outside the `impl` block), and re-exported from `lib.rs`. - -```rust - /// Connect to a database URL with explicit pool overrides. - /// - /// Installs the default sqlx drivers on the first call. The database - /// type is inferred from the URL scheme (`sqlite:` or `postgres:`). - /// - /// # Pool Defaults - /// - /// - **SQLite**: `max_connections = 1` (required for in-memory databases - /// to share state), `acquire_timeout = 5s`. - /// - **PostgreSQL**: `max_connections = 10`, `acquire_timeout = 5s`. - /// - /// Override any default by setting the corresponding field in - /// `overrides`. - pub async fn connect_with( - url: &str, - overrides: PoolOverrides, - ) -> Result { - sqlx::any::install_default_drivers(); - let is_sqlite = url.starts_with("sqlite:"); - let default_max = if is_sqlite { 1 } else { 10 }; - let default_timeout = Duration::from_secs(5); - let pool: AnyPool = sqlx::pool::PoolOptions::new() - .max_connections(overrides.max_connections.unwrap_or(default_max)) - .acquire_timeout(overrides.acquire_timeout.unwrap_or(default_timeout)) - .connect(url) - .await?; - Self::new(pool).await - } - - /// Connect to a database URL with default pool settings. - /// - /// Convenience wrapper around [`connect_with`](Self::connect_with). - /// See that method for default pool sizes per backend. 
- pub async fn connect(url: &str) -> Result { - Self::connect_with(url, PoolOverrides::default()).await - } -``` - -- [ ] **Step 2: Re-export PoolOverrides from lib.rs** - -In `lib.rs`, add the re-export alongside `SqlColdBackend`: - -```rust -#[cfg(any(feature = "sqlite", feature = "postgres"))] -pub use backend::{PoolOverrides, SqlColdBackend}; -``` - -- [ ] **Step 3: Add builder methods to SqlConnector** - -In `connector.rs`, update `SqlConnector` to carry pool overrides: - -```rust -use crate::{PoolOverrides, SqlColdBackend, SqlColdError}; -use signet_cold::ColdConnect; -use std::time::Duration; - -// ... SqlConnectorError stays unchanged ... - -#[cfg(any(feature = "sqlite", feature = "postgres"))] -#[derive(Debug, Clone)] -pub struct SqlConnector { - url: String, - overrides: PoolOverrides, -} - -#[cfg(any(feature = "sqlite", feature = "postgres"))] -impl SqlConnector { - /// Create a new SQL connector. - /// - /// The database type is detected from the URL prefix. Pool settings - /// use backend-specific defaults. Use [`with_max_connections`] and - /// [`with_acquire_timeout`] to override. - /// - /// [`with_max_connections`]: Self::with_max_connections - /// [`with_acquire_timeout`]: Self::with_acquire_timeout - pub fn new(url: impl Into) -> Self { - Self { url: url.into(), overrides: PoolOverrides::default() } - } - - /// Get a reference to the connection URL. - pub fn url(&self) -> &str { - &self.url - } - - /// Override the maximum number of pool connections. - /// - /// Default: 1 for SQLite, 10 for PostgreSQL. - pub fn with_max_connections(mut self, n: u32) -> Self { - self.overrides.max_connections = Some(n); - self - } - - /// Override the connection acquire timeout. - /// - /// Default: 5 seconds for all backends. - pub fn with_acquire_timeout(mut self, timeout: Duration) -> Self { - self.overrides.acquire_timeout = Some(timeout); - self - } - - /// Create a connector from environment variables. 
- /// - /// Reads the SQL URL from the specified environment variable. - /// Uses default pool settings. - pub fn from_env(env_var: &'static str) -> Result { - let url = - std::env::var(env_var).map_err(|_| SqlConnectorError::MissingEnvVar(env_var))?; - Ok(Self::new(url)) - } -} - -#[cfg(any(feature = "sqlite", feature = "postgres"))] -impl ColdConnect for SqlConnector { - type Cold = SqlColdBackend; - type Error = SqlColdError; - - fn connect( - &self, - ) -> impl std::future::Future> + Send { - let url = self.url.clone(); - let overrides = self.overrides.clone(); - async move { SqlColdBackend::connect_with(&url, overrides).await } - } -} -``` - -- [ ] **Step 4: Run conformance tests** - -Run: `cargo t -p signet-cold-sql --all-features` -Expected: All tests pass. SQLite still uses 1 connection. - -- [ ] **Step 5: Run clippy and format** - -Run: `cargo clippy -p signet-cold-sql --all-features --all-targets` -Run: `cargo clippy -p signet-cold-sql --no-default-features --all-targets` -Run: `cargo +nightly fmt` -Expected: Clean. - -- [ ] **Step 6: Commit** - -```bash -git add crates/cold-sql/src/backend.rs crates/cold-sql/src/connector.rs crates/cold-sql/src/lib.rs -git commit -m "fix(cold-sql): use per-backend pool defaults and add SqlConnector builder" -``` - ---- - -### Task 3: `get_logs` LIMIT Optimization - -**Files:** -- Modify: `crates/cold-sql/src/backend.rs` (fn `get_logs`, ~line 1293) - -- [ ] **Step 1: Replace COUNT + SELECT with LIMIT** - -Replace the entire `get_logs` implementation: - -```rust - async fn get_logs(&self, filter: &Filter, max_logs: usize) -> ColdResult> { - let from = filter.get_from_block().unwrap_or(0); - let to = filter.get_to_block().unwrap_or(u64::MAX); - - // Build WHERE clause: block range ($1, $2) + address/topic filters. - let (filter_clause, params) = build_log_filter_clause(filter, 3); - let where_clause = format!("l.block_number >= $1 AND l.block_number <= $2{filter_clause}"); - - // Use LIMIT to cap results. 
Fetch one extra row to detect overflow - // without a separate COUNT query. PostgreSQL stops scanning after - // finding enough rows, making this faster than COUNT in both the - // under-limit and over-limit cases. - let limit_idx = 3 + params.len() as u32; - let data_sql = format!( - "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \ - (r.first_log_index + l.log_index) AS block_log_index \ - FROM logs l \ - JOIN headers h ON l.block_number = h.block_number \ - JOIN transactions t ON l.block_number = t.block_number \ - AND l.tx_index = t.tx_index \ - JOIN receipts r ON l.block_number = r.block_number \ - AND l.tx_index = r.tx_index \ - WHERE {where_clause} \ - ORDER BY l.block_number, l.tx_index, l.log_index \ - LIMIT ${limit_idx}" - ); - let mut query = sqlx::query(&data_sql).bind(to_i64(from)).bind(to_i64(to)); - for param in ¶ms { - query = query.bind(*param); - } - let limit = max_logs + 1; - query = query.bind(to_i64(limit as u64)); - - let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?; - - if rows.len() > max_logs { - return Err(ColdStorageError::TooManyLogs { limit: max_logs }); - } - - rows.into_iter() - .map(|r| { - let log = log_from_row(&r); - Ok(RpcLog { - inner: log, - block_hash: Some(r.get(COL_BLOCK_HASH)), - block_number: Some(from_i64(r.get::(COL_BLOCK_NUMBER))), - block_timestamp: Some(from_i64(r.get::(COL_BLOCK_TIMESTAMP))), - transaction_hash: Some(r.get(COL_TX_HASH)), - transaction_index: Some(from_i64(r.get::(COL_TX_INDEX))), - log_index: Some(from_i64(r.get::(COL_BLOCK_LOG_INDEX))), - removed: false, - }) - }) - .collect::>>() - } -``` - -- [ ] **Step 2: Run conformance tests** - -Run: `cargo t -p signet-cold-sql --all-features` -Expected: All tests pass. The conformance suite exercises `get_logs` with limits. - -- [ ] **Step 3: Clippy** - -Run: `cargo clippy -p signet-cold-sql --all-features --all-targets` -Expected: Clean. 
- -- [ ] **Step 4: Commit** - -```bash -git add crates/cold-sql/src/backend.rs -git commit -m "perf(cold-sql): replace get_logs COUNT+SELECT with single LIMIT query" -``` - ---- - -### Task 4: `get_receipt` Query Consolidation - -**Files:** -- Modify: `crates/cold-sql/src/columns.rs` -- Modify: `crates/cold-sql/src/backend.rs` (fn `get_receipt`, ~line 1068) - -- [ ] **Step 1: Add aliased column constants** - -In `columns.rs`, add at the bottom (in the "query-specific aliases" section): - -```rust -// ── get_receipt combined query aliases ────────────────────────────────────── -pub(crate) const COL_R_TX_TYPE: &str = "r_tx_type"; -pub(crate) const COL_R_SUCCESS: &str = "r_success"; -pub(crate) const COL_R_CUMULATIVE_GAS_USED: &str = "r_cumulative_gas_used"; -pub(crate) const COL_R_FIRST_LOG_INDEX: &str = "r_first_log_index"; -pub(crate) const COL_R_TX_HASH: &str = "r_tx_hash"; -pub(crate) const COL_R_FROM_ADDRESS: &str = "r_from_address"; -pub(crate) const COL_R_TX_INDEX: &str = "r_tx_index"; -``` - -Add the corresponding imports in `backend.rs` where the other column imports are (~line 8-21). - -- [ ] **Step 2: Replace `get_receipt` implementation** - -Replace the `get_receipt` body with a consolidated version. 
The combined query selects `h.*` (header columns keep standard names for reuse with `header_from_row`), receipt/tx columns with `r_` prefix aliases, and a correlated subquery for `prior_gas`:
-
-```rust
-    async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
-        // Resolve to (block, index)
-        let (block, index) = match spec {
-            ReceiptSpecifier::TxHash(hash) => {
-                let row = sqlx::query(
-                    "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
-                )
-                .bind(hash)
-                .fetch_optional(&self.pool)
-                .await
-                .map_err(SqlColdError::from)?;
-                let Some(r) = row else { return Ok(None) };
-                (
-                    from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER)),
-                    from_i64(r.get::<i64, _>(COL_TX_INDEX)),
-                )
-            }
-            ReceiptSpecifier::BlockAndIndex { block, index } => (block, index),
-        };
-
-        // Combined query: receipt + tx metadata + full header + prior gas.
-        // Header columns use standard names (h.*) so header_from_row works.
-        // Receipt/tx columns use r_ prefix to avoid collisions.
-        let combined = sqlx::query(
-            "SELECT h.*, \
-             r.tx_type AS r_tx_type, r.success AS r_success, \
-             r.cumulative_gas_used AS r_cumulative_gas_used, \
-             r.first_log_index AS r_first_log_index, \
-             r.tx_index AS r_tx_index, \
-             t.tx_hash AS r_tx_hash, t.from_address AS r_from_address, \
-             COALESCE( \
-                 (SELECT CAST(MAX(r2.cumulative_gas_used) AS bigint) \
-                  FROM receipts r2 \
-                  WHERE r2.block_number = r.block_number \
-                  AND r2.tx_index < r.tx_index), \
-                 0 \
-             ) AS prior_gas \
-             FROM receipts r \
-             JOIN transactions t ON r.block_number = t.block_number \
-             AND r.tx_index = t.tx_index \
-             JOIN headers h ON r.block_number = h.block_number \
-             WHERE r.block_number = $1 AND r.tx_index = $2",
-        )
-        .bind(to_i64(block))
-        .bind(to_i64(index))
-        .fetch_optional(&self.pool)
-        .await
-        .map_err(SqlColdError::from)?;
-
-        let Some(rr) = combined else {
-            return Ok(None);
-        };
-
-        // Extract header using existing helper (h.* columns are unaliased).
-        let header = header_from_row(&rr).map_err(ColdStorageError::from)?.seal_slow();
-
-        // Extract receipt fields from r_ prefixed aliases.
-        let tx_hash = rr.get(COL_R_TX_HASH);
-        let sender = rr.get(COL_R_FROM_ADDRESS);
-        let tx_type = rr.get::<i32, _>(COL_R_TX_TYPE) as i16;
-        let success = rr.get::<i32, _>(COL_R_SUCCESS) != 0;
-        let cumulative_gas_used: i64 = rr.get(COL_R_CUMULATIVE_GAS_USED);
-        let first_log_index: u64 = from_i64(rr.get::<i64, _>(COL_R_FIRST_LOG_INDEX));
-        let prior_cumulative_gas: u64 =
-            rr.get::<Option<i64>, _>(COL_PRIOR_GAS).unwrap_or(0) as u64;
-
-        // Logs still require a separate query (variable row count).
-        let log_rows = sqlx::query(
-            "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
-        )
-        .bind(to_i64(block))
-        .bind(to_i64(index))
-        .fetch_all(&self.pool)
-        .await
-        .map_err(SqlColdError::from)?;
-
-        let logs = log_rows.iter().map(log_from_row).collect();
-        let built = build_receipt(tx_type, success, cumulative_gas_used, logs)
-            .map_err(ColdStorageError::from)?;
-        let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas;
-
-        let ir = IndexedReceipt { receipt: built, tx_hash, first_log_index, gas_used, sender };
-        Ok(Some(ColdReceipt::new(ir, &header, index)))
-    }
-```
-
-- [ ] **Step 3: Run conformance tests**
-
-Run: `cargo t -p signet-cold-sql --all-features`
-Expected: All tests pass.
-
-- [ ] **Step 4: Clippy and format**
-
-Run: `cargo clippy -p signet-cold-sql --all-features --all-targets`
-Run: `cargo +nightly fmt`
-Expected: Clean.
-
-- [ ] **Step 5: Commit**
-
-```bash
-git add crates/cold-sql/src/backend.rs crates/cold-sql/src/columns.rs
-git commit -m "perf(cold-sql): consolidate get_receipt from 5 queries to 2-3"
-```
-
----
-
-### Task 5: `drain_above` Batch Override
-
-**Files:**
-- Modify: `crates/cold-sql/src/backend.rs` (replace `impl ColdStorage for SqlColdBackend {}`, ~line 1403)
-
-- [ ] **Step 1: Replace empty ColdStorage impl with drain_above override**
-
-Replace `impl ColdStorage for SqlColdBackend {}` with:
-
-```rust
-impl ColdStorage for SqlColdBackend {
-    async fn drain_above(
-        &mut self,
-        block: BlockNumber,
-    ) -> ColdResult<Vec<Vec<ColdReceipt>>> {
-        let bn = to_i64(block);
-        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
-
-        // 1. Fetch all headers above block.
-        let header_rows =
-            sqlx::query("SELECT * FROM headers WHERE block_number > $1 ORDER BY block_number")
-                .bind(bn)
-                .fetch_all(&mut *tx)
-                .await
-                .map_err(SqlColdError::from)?;
-
-        if header_rows.is_empty() {
-            tx.commit().await.map_err(SqlColdError::from)?;
-            return Ok(Vec::new());
-        }
-
-        let mut headers: std::collections::BTreeMap<i64, SealedHeader> =
-            std::collections::BTreeMap::new();
-        for r in &header_rows {
-            let num: i64 = r.get(COL_BLOCK_NUMBER);
-            let h = header_from_row(r).map_err(ColdStorageError::from)?.seal_slow();
-            headers.insert(num, h);
-        }
-
-        // 2. Fetch all receipt + tx metadata above block.
-        let receipt_rows = sqlx::query(
-            "SELECT r.*, t.tx_hash, t.from_address \
-             FROM receipts r \
-             JOIN transactions t ON r.block_number = t.block_number \
-             AND r.tx_index = t.tx_index \
-             WHERE r.block_number > $1 \
-             ORDER BY r.block_number, r.tx_index",
-        )
-        .bind(bn)
-        .fetch_all(&mut *tx)
-        .await
-        .map_err(SqlColdError::from)?;
-
-        // 3. Fetch all logs above block.
-        let log_rows = sqlx::query(
-            "SELECT * FROM logs WHERE block_number > $1 \
-             ORDER BY block_number, tx_index, log_index",
-        )
-        .bind(bn)
-        .fetch_all(&mut *tx)
-        .await
-        .map_err(SqlColdError::from)?;
-
-        // Group logs by (block_number, tx_index).
-        let mut logs_by_block_tx: std::collections::BTreeMap<(i64, i64), Vec<Log>> =
-            std::collections::BTreeMap::new();
-        for r in &log_rows {
-            let block_num: i64 = r.get(COL_BLOCK_NUMBER);
-            let tx_idx: i64 = r.get(COL_TX_INDEX);
-            logs_by_block_tx.entry((block_num, tx_idx)).or_default().push(log_from_row(r));
-        }
-
-        // Group receipt rows by block_number.
-        let mut receipts_by_block: std::collections::BTreeMap<i64, Vec<&AnyRow>> =
-            std::collections::BTreeMap::new();
-        for r in &receipt_rows {
-            let block_num: i64 = r.get(COL_BLOCK_NUMBER);
-            receipts_by_block.entry(block_num).or_default().push(r);
-        }
-
-        // 4. Assemble ColdReceipts per block.
-        let mut all_receipts = Vec::with_capacity(headers.len());
-        for (&block_num, header) in &headers {
-            let block_receipt_rows =
-                receipts_by_block.remove(&block_num).unwrap_or_default();
-            let mut first_log_index = 0u64;
-            let mut prior_cumulative_gas = 0u64;
-            let block_receipts: ColdResult<Vec<ColdReceipt>> = block_receipt_rows
-                .into_iter()
-                .enumerate()
-                .map(|(idx, rr)| {
-                    let tx_idx: i64 = rr.get(COL_TX_INDEX);
-                    let tx_hash = rr.get(COL_TX_HASH);
-                    let sender = rr.get(COL_FROM_ADDRESS);
-                    let tx_type = rr.get::<i32, _>(COL_TX_TYPE) as i16;
-                    let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
-                    let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
-                    let logs = logs_by_block_tx
-                        .remove(&(block_num, tx_idx))
-                        .unwrap_or_default();
-                    let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
-                        .map_err(ColdStorageError::from)?;
-                    let gas_used =
-                        receipt.inner.cumulative_gas_used - prior_cumulative_gas;
-                    prior_cumulative_gas = receipt.inner.cumulative_gas_used;
-                    let ir = IndexedReceipt {
-                        receipt,
-                        tx_hash,
-                        first_log_index,
-                        gas_used,
-                        sender,
-                    };
-                    first_log_index += ir.receipt.inner.logs.len() as u64;
-                    Ok(ColdReceipt::new(ir, header, idx as u64))
-                })
-                .collect();
-            all_receipts.push(block_receipts?);
-        }
-
-        // 5. Delete from all tables (same order as truncate_above).
-        for table in
-            ["logs", "transactions", "receipts", "signet_events", "zenith_headers", "headers"]
-        {
-            sqlx::query(&format!("DELETE FROM {table} WHERE block_number > $1"))
-                .bind(bn)
-                .execute(&mut *tx)
-                .await
-                .map_err(SqlColdError::from)?;
-        }
-
-        tx.commit().await.map_err(SqlColdError::from)?;
-        Ok(all_receipts)
-    }
-}
-```
-
-- [ ] **Step 2: Run conformance tests**
-
-Run: `cargo t -p signet-cold-sql --all-features`
-Expected: All tests pass. The conformance suite tests `drain_above` (via `ColdStorage` trait).
-
-- [ ] **Step 3: Clippy and format**
-
-Run: `cargo clippy -p signet-cold-sql --all-features --all-targets`
-Run: `cargo +nightly fmt`
-Expected: Clean.
-
-- [ ] **Step 4: Commit**
-
-```bash
-git add crates/cold-sql/src/backend.rs
-git commit -m "perf(cold-sql): override drain_above to batch queries in single transaction"
-```
-
----
-
-### Task 6: Final Verification
-
-- [ ] **Step 1: Full clippy pass (both feature configurations)**
-
-Run: `cargo clippy -p signet-cold-sql --all-features --all-targets`
-Run: `cargo clippy -p signet-cold-sql --no-default-features --all-targets`
-Expected: Clean.
-
-- [ ] **Step 2: Format**
-
-Run: `cargo +nightly fmt`
-Expected: No changes.
-
-- [ ] **Step 3: Full test suite**
-
-Run: `cargo t -p signet-cold-sql --all-features`
-Expected: All tests pass.
-
-- [ ] **Step 4: PostgreSQL conformance (if available)**
-
-Run: `./scripts/test-postgres.sh`
-Expected: All tests pass.
-
-- [ ] **Step 5: Workspace clippy check**
-
-Run: `cargo clippy --workspace --all-features --all-targets`
-Expected: No new warnings from our changes.
diff --git a/docs/superpowers/specs/2026-04-03-cold-sql-pool-and-query-fixes-design.md b/docs/superpowers/specs/2026-04-03-cold-sql-pool-and-query-fixes-design.md deleted file mode 100644 index 514ae11..0000000 --- a/docs/superpowers/specs/2026-04-03-cold-sql-pool-and-query-fixes-design.md +++ /dev/null @@ -1,187 +0,0 @@ -# Cold Storage SQL Backend: Pool Starvation & Query Fixes - -## Problem - -The cold storage task layer supports 64 concurrent readers and 8 stream -producers, but `SqlColdBackend::connect()` creates a **single-connection -pool** for all backends — including PostgreSQL. This serializes every -operation at the sqlx layer and creates several failure modes: - -- **Stream starvation**: A REPEATABLE READ streaming transaction holds the - sole connection for up to 60 seconds, blocking all writes and reads. -- **Write amplification**: The task drains all in-flight reads before each - write. With 64 readers serialized on one connection, drain time scales - linearly with queue depth. -- **Cascading backpressure**: Write latency climbs, the 256-slot write - channel fills, `dispatch_append_blocks` returns `Backpressure`, and the - node pipeline stalls. - -Several query patterns compound the problem with unnecessary round-trips. - -## Changes - -### 1. Pool Configuration - -**Files:** `crates/cold-sql/src/backend.rs`, `crates/cold-sql/src/connector.rs` - -#### `SqlColdBackend::connect()` - -Detect backend from URL prefix and apply appropriate defaults: - -- **SQLite** (`url.starts_with("sqlite:")`): `max_connections(1)` — required - for in-memory databases to share state across queries. -- **PostgreSQL** (everything else): `max_connections(10)`, - `acquire_timeout(5s)`. - -The `acquire_timeout` prevents indefinite hangs when PostgreSQL is -unresponsive. Without it, `pool.begin().await` blocks forever and the cold -storage task silently stalls. - -A new internal method `connect_with(url, overrides)` handles the actual pool -construction. 
`connect(url)` becomes a convenience wrapper passing no
-overrides.
-
-#### `SqlConnector` Builder
-
-Add optional override fields:
-
-```rust
-pub struct SqlConnector {
-    url: String,
-    max_connections: Option<u32>,
-    acquire_timeout: Option<Duration>,
-}
-```
-
-Builder methods:
-
-- `with_max_connections(n: u32) -> Self`
-- `with_acquire_timeout(d: Duration) -> Self`
-
-The `ColdConnect::connect()` implementation passes these overrides through to
-`SqlColdBackend::connect_with()`. Production callers (e.g., `StorageConfig`
-in node-config) can tune pool size without constructing their own `AnyPool`.
-
-`from_env()` and `new(url)` use defaults (no overrides), preserving backward
-compatibility.
-
-### 2. `get_logs`: LIMIT Instead of COUNT + SELECT
-
-**File:** `crates/cold-sql/src/backend.rs` (fn `get_logs`, ~line 1293)
-
-Replace the two-query pattern with a single query:
-
-```sql
-SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash,
-       (r.first_log_index + l.log_index) AS block_log_index
-FROM logs l
-JOIN headers h ON l.block_number = h.block_number
-JOIN transactions t ON l.block_number = t.block_number AND l.tx_index = t.tx_index
-JOIN receipts r ON l.block_number = r.block_number AND l.tx_index = r.tx_index
-WHERE <block range + address/topic filters>
-ORDER BY l.block_number, l.tx_index, l.log_index
-LIMIT <max_logs + 1>
-```
-
-If the result has more than `max_logs` rows, return `TooManyLogs`.
-
-This is strictly better:
-
-- **Under limit**: 1 query instead of 2.
-- **Over limit**: PostgreSQL stops after `max_logs + 1` rows. The COUNT
-  approach scans the entire result set.
-- **No information lost**: `TooManyLogs { limit }` only reports the limit,
-  not the actual count.
-
-The LIMIT parameter is appended as the last bind parameter after the
-address/topic filter params. `build_log_filter_clause` already returns the
-next available parameter index.
-
-### 3. `get_receipt`: Consolidate to 2-3 Queries
-
-**File:** `crates/cold-sql/src/backend.rs` (fn `get_receipt`, ~line 1068)
-
-Merge the header fetch, receipt data, and prior cumulative gas into one query:
-
-```sql
-SELECT r.*, t.tx_hash, t.from_address,
-       h.block_hash AS h_block_hash, h.parent_hash AS h_parent_hash, ...,
-       COALESCE(
-           (SELECT MAX(r2.cumulative_gas_used)
-            FROM receipts r2
-            WHERE r2.block_number = r.block_number
-            AND r2.tx_index < r.tx_index),
-           0
-       ) AS prior_gas
-FROM receipts r
-JOIN transactions t ON r.block_number = t.block_number AND r.tx_index = t.tx_index
-JOIN headers h ON r.block_number = h.block_number
-WHERE r.block_number = $1 AND r.tx_index = $2
-```
-
-Then fetch logs in a second query. Total: 2 queries for block+index
-specifier, 3 for TxHash specifier (needs initial hash resolve).
-
-Header columns are aliased with an `h_` prefix to avoid collisions with
-receipt columns. A new `receipt_with_header_from_row` helper extracts both
-types from the combined row.
-
-### 4. `drain_above`: Batch Override
-
-**File:** `crates/cold-sql/src/backend.rs` (impl `ColdStorage`)
-
-Replace the empty `impl ColdStorage for SqlColdBackend {}` with an explicit
-`drain_above` override. Single transaction:
-
-1. Fetch all headers where `block_number > $1` (1 query)
-2. Fetch all receipts + tx_hash + from_address where `block_number > $1`
-   (1 query with JOINs)
-3. Fetch all logs where `block_number > $1` (1 query)
-4. Group in memory by block number, construct `Vec<Vec<ColdReceipt>>`
-5. DELETE from 6 tables where `block_number > $1` (6 queries)
-6. COMMIT
-
-Total: 9 queries in one transaction regardless of block count. The default
-implementation does `3N + 7` queries for N blocks.
-
-Receipt and log grouping reuses the same in-memory grouping logic from
-`get_receipts_in_block`. Extract the shared grouping code into a helper
-function.
-
-### 5. Integer Conversion Safety
-
-**File:** `crates/cold-sql/src/convert.rs`
-
-Drop `const` from `to_i64` and `from_i64` and add debug assertions:
-
-- `to_i64(v: u64)`: `debug_assert!(v <= i64::MAX as u64)`
-- `from_i64(v: i64)`: `debug_assert!(v >= 0)`
-
-These catch data corruption in debug/test builds with zero runtime cost in
-release.
-
-**File:** `crates/cold-sql/src/backend.rs` (~line 337)
-
-Replace `r.get::<i32, _>(COL_TX_TYPE) as u8` with checked conversion:
-
-```rust
-let raw: i32 = r.get(COL_TX_TYPE);
-let tx_type_u8: u8 = raw.try_into()
-    .map_err(|_| SqlColdError::Convert(format!("tx_type out of range: {raw}")))?;
-```
-
-## Files Modified
-
-| File | Changes |
-|------|---------|
-| `crates/cold-sql/src/backend.rs` | Pool defaults, get_logs LIMIT, get_receipt consolidation, drain_above override, tx_type safety |
-| `crates/cold-sql/src/connector.rs` | Builder fields + methods for pool configuration |
-| `crates/cold-sql/src/convert.rs` | Debug assertions on to_i64 / from_i64 |
-
-## Verification
-
-1. `cargo t -p signet-cold-sql --all-features` — SQLite conformance
-2. `./scripts/test-postgres.sh` — PostgreSQL conformance
-3. `cargo clippy -p signet-cold-sql --all-features --all-targets`
-4. `cargo clippy -p signet-cold-sql --no-default-features --all-targets`
-5. `cargo +nightly fmt`