From c919554243bf4924a6e35ff6abd3dc3eb83f653d Mon Sep 17 00:00:00 2001 From: Blake Date: Wed, 11 Feb 2026 09:12:33 +0800 Subject: [PATCH 1/5] feat: implement sequencer set hash monitoring --- Cargo.lock | 1 + crates/cbft-rpc/Cargo.toml | 1 + crates/cbft-rpc/src/lib.rs | 74 ++++ ...eate_sequencer_set_hash_monitor_tables.sql | 22 ++ crates/store/src/localdb.rs | 233 +++++++++++- crates/store/src/schema.rs | 20 + node/src/bin/sequencer-set-publish.rs | 50 ++- node/src/lib.rs | 4 +- node/src/main.rs | 63 +++- node/src/scheduled_tasks/mod.rs | 2 + .../sequencer_set_hash_monitor_task.rs | 357 ++++++++++++++++++ 11 files changed, 814 insertions(+), 13 deletions(-) create mode 100644 crates/store/migrations/20260206160000_create_sequencer_set_hash_monitor_tables.sql create mode 100644 node/src/scheduled_tasks/sequencer_set_hash_monitor_task.rs diff --git a/Cargo.lock b/Cargo.lock index bdcee9a7..0704ab4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2930,6 +2930,7 @@ name = "cbft-rpc" version = "0.3.3" dependencies = [ "anyhow", + "async-trait", "base64 0.21.7", "bitcoin-light-client-circuit", "commit-chain", diff --git a/crates/cbft-rpc/Cargo.toml b/crates/cbft-rpc/Cargo.toml index ed105e64..605fbce4 100644 --- a/crates/cbft-rpc/Cargo.toml +++ b/crates/cbft-rpc/Cargo.toml @@ -19,6 +19,7 @@ tendermint-light-client-verifier = { workspace = true, default-features = false, ] } tendermint-rpc = { workspace = true, features = ["http-client"] } anyhow = { workspace = true } +async-trait = { workspace = true } tracing.workspace = true [dev-dependencies] diff --git a/crates/cbft-rpc/src/lib.rs b/crates/cbft-rpc/src/lib.rs index ef158d30..1498f6c3 100644 --- a/crates/cbft-rpc/src/lib.rs +++ b/crates/cbft-rpc/src/lib.rs @@ -5,6 +5,42 @@ use tendermint::validator::Info; use tendermint_light_client_verifier::types::{LightBlock, PeerId, ValidatorSet}; use tendermint_rpc::{Client, HttpClient}; +/// Abstraction over Cosmos RPC calls used by the sequencer set hash monitor. +/// Enables mock-based testing of sync logic without real RPC. +#[async_trait::async_trait] +pub trait CosmosRpcProvider: Send + Sync { + /// Fetch the latest block height from the Cosmos chain. + async fn get_latest_block_height(&self) -> Result; + + /// Fetch validators_hash and goat block number for a given cosmos block. + /// Returns `None` for goat_block_number when the block has no CBFT tx data. + async fn get_validators_hash_and_goat_block( + &self, + cosmos_block_height: u64, + ) -> Result<([u8; 32], Option)>; +} + +/// Default implementation that delegates to real Cosmos RPC endpoints. +#[derive(Debug)] +pub struct DefaultCosmosRpcProvider { + pub cosmos_rpc_url: String, +} + +#[async_trait::async_trait] +impl CosmosRpcProvider for DefaultCosmosRpcProvider { + async fn get_latest_block_height(&self) -> Result { + fetch_latest_cosmos_block_height(&self.cosmos_rpc_url).await + } + + async fn get_validators_hash_and_goat_block( + &self, + cosmos_block_height: u64, + ) -> Result<([u8; 32], Option)> { + fetch_cbft_block_validators_hash_and_goat_block(&self.cosmos_rpc_url, cosmos_block_height) + .await + } +} + #[tracing::instrument(level = "info")] pub async fn fetch_validators(cosmos_rpc_url: &str, block_height: u64) -> Result> { let rpc = HttpClient::new(cosmos_rpc_url).unwrap(); @@ -15,6 +51,44 @@ pub async fn fetch_validators(cosmos_rpc_url: &str, block_height: u64) -> Result Ok(validators_response.validators) } +#[tracing::instrument(level = "info")] +pub async fn fetch_latest_cosmos_block_height(cosmos_rpc_url: &str) -> Result { + let rpc = HttpClient::new(cosmos_rpc_url).unwrap(); + let status = rpc.status().await.map_err(|e| anyhow!("Error fetching status: {e:?}"))?; + Ok(status.sync_info.latest_block_height.into()) +} + +/// Fetch the validators_hash and goat block number for a given cosmos block. +/// Returns `None` for goat_block_number when the block has no CBFT tx data +/// or the payload cannot be parsed (e.g. empty blocks). +#[tracing::instrument(level = "info")] +pub async fn fetch_cbft_block_validators_hash_and_goat_block( + cosmos_rpc_url: &str, + cosmos_block_height: u64, +) -> Result<([u8; 32], Option)> { + let rpc = HttpClient::new(cosmos_rpc_url).unwrap(); + let block_data = + rpc.block(Height::try_from(cosmos_block_height).unwrap()).await.map_err(|e| { + anyhow!("Error fetching block data for height {cosmos_block_height}: {e:?}") + })?; + + let validators_hash: [u8; 32] = block_data + .block + .header + .validators_hash + .as_bytes() + .try_into() + .map_err(|_| anyhow!("Invalid validators_hash length"))?; + + if block_data.block.data.is_empty() { + return Ok((validators_hash, None)); + } + + let goat_block_number = + parse_cbft_tx_payload(&block_data.block.data[0]).map(|payload| payload.block_number); + Ok((validators_hash, goat_block_number)) +} + // get cosmos block height from block hash: curl https://rpc.testnet3.goat.network/goat-rpc/block_by_hash?hash=0xd2e236b8f89278a527a042727cd4eebb59a566006a844f294da54ed727b95470 pub async fn get_cosmos_block_height_at( cosmos_rpc_url: &str, diff --git a/crates/store/migrations/20260206160000_create_sequencer_set_hash_monitor_tables.sql b/crates/store/migrations/20260206160000_create_sequencer_set_hash_monitor_tables.sql new file mode 100644 index 00000000..aa4ebc14 --- /dev/null +++ b/crates/store/migrations/20260206160000_create_sequencer_set_hash_monitor_tables.sql @@ -0,0 +1,22 @@ +CREATE TABLE IF NOT EXISTS sequencer_set_hash_changes +( + id INTEGER PRIMARY KEY AUTOINCREMENT, + cosmos_block_height BIGINT NOT NULL UNIQUE, + goat_block_height BIGINT NOT NULL, + validators_hash TEXT NOT NULL, + created_at BIGINT NOT NULL DEFAULT 0, + updated_at BIGINT NOT NULL DEFAULT 0 +); + +CREATE INDEX IF NOT EXISTS idx_sequencer_set_hash_changes_goat_block_height + ON sequencer_set_hash_changes (goat_block_height); + +CREATE TABLE IF NOT EXISTS sequencer_set_scan_state +( + id INTEGER PRIMARY KEY CHECK (id = 1), + next_cosmos_block_height BIGINT NOT NULL, + latest_goat_block_height BIGINT NOT NULL, + latest_validators_hash TEXT NOT NULL, + created_at BIGINT NOT NULL DEFAULT 0, + updated_at BIGINT NOT NULL DEFAULT 0 +); diff --git a/crates/store/src/localdb.rs b/crates/store/src/localdb.rs index 6fd564c3..5acc76ab 100644 --- a/crates/store/src/localdb.rs +++ b/crates/store/src/localdb.rs @@ -2,7 +2,8 @@ use crate::utils::{QueryBuilder, QueryParam, create_place_holders}; use crate::{ BridgeOutGlobalStats, GoatTxRecord, Graph, GraphBtcTxVoutMonitor, GraphRawData, Instance, LongRunningTaskProof, Message, Node, NodesOverview, OperatorProof, PeginGraphProcessData, - PeginInstanceProcessData, SerializableTxid, WatchContract, WatchtowerProof, + PeginInstanceProcessData, SequencerSetHashChange, SequencerSetScanState, SerializableTxid, + WatchContract, WatchtowerProof, }; use indexmap::IndexMap; @@ -2908,6 +2909,105 @@ impl<'a> StorageProcessor<'a> { .await?; Ok(res) } + + pub async fn upsert_sequencer_set_hash_change( + &mut self, + cosmos_block_height: i64, + goat_block_height: i64, + validators_hash: &str, + ) -> anyhow::Result { + let current_time = get_current_timestamp_secs(); + let res = sqlx::query( + r#"INSERT INTO sequencer_set_hash_changes (cosmos_block_height, goat_block_height, validators_hash, created_at, updated_at) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT (cosmos_block_height) DO UPDATE + SET goat_block_height = excluded.goat_block_height, + validators_hash = excluded.validators_hash, + updated_at = excluded.updated_at"#, + ) + .bind(cosmos_block_height) + .bind(goat_block_height) + .bind(validators_hash) + .bind(current_time) + .bind(current_time) + .execute(self.conn()) + .await?; + Ok(res.rows_affected()) + } + + pub async fn find_latest_sequencer_set_hash_change( + &mut self, + ) -> anyhow::Result> { + let res = sqlx::query_as::<_, SequencerSetHashChange>( + "SELECT * FROM sequencer_set_hash_changes ORDER BY cosmos_block_height DESC LIMIT 1", + ) + .fetch_optional(self.conn()) + .await?; + Ok(res) + } + + pub async fn find_first_sequencer_set_hash_change_by_goat_block_at_or_after( + &mut self, + goat_block_height: i64, + ) -> anyhow::Result> { + let res = sqlx::query_as::<_, SequencerSetHashChange>( + "SELECT * FROM sequencer_set_hash_changes WHERE goat_block_height >= ? ORDER BY goat_block_height ASC LIMIT 1", + ) + .bind(goat_block_height) + .fetch_optional(self.conn()) + .await?; + Ok(res) + } + + pub async fn find_first_sequencer_set_hash_change_by_cosmos_block_at_or_after( + &mut self, + cosmos_block_height: i64, + ) -> anyhow::Result> { + let res = sqlx::query_as::<_, SequencerSetHashChange>( + "SELECT * FROM sequencer_set_hash_changes WHERE cosmos_block_height >= ? ORDER BY cosmos_block_height ASC LIMIT 1", + ) + .bind(cosmos_block_height) + .fetch_optional(self.conn()) + .await?; + Ok(res) + } + + pub async fn get_sequencer_set_scan_state( + &mut self, + ) -> anyhow::Result> { + let res = sqlx::query_as::<_, SequencerSetScanState>( + "SELECT * FROM sequencer_set_scan_state WHERE id = 1 LIMIT 1", + ) + .fetch_optional(self.conn()) + .await?; + Ok(res) + } + + pub async fn upsert_sequencer_set_scan_state( + &mut self, + next_cosmos_block_height: i64, + latest_goat_block_height: i64, + latest_validators_hash: &str, + ) -> anyhow::Result { + let current_time = get_current_timestamp_secs(); + let res = sqlx::query( + r#"INSERT INTO sequencer_set_scan_state (id, next_cosmos_block_height, latest_goat_block_height, latest_validators_hash, created_at, updated_at) + VALUES (1, ?, ?, ?, ?, ?) + ON CONFLICT (id) DO UPDATE + SET next_cosmos_block_height = excluded.next_cosmos_block_height, + latest_goat_block_height = excluded.latest_goat_block_height, + latest_validators_hash = excluded.latest_validators_hash, + updated_at = excluded.updated_at"#, + ) + .bind(next_cosmos_block_height) + .bind(latest_goat_block_height) + .bind(latest_validators_hash) + .bind(current_time) + .bind(current_time) + .execute(self.conn()) + .await?; + Ok(res.rows_affected()) + } } // fn truncate_string(s: &str, max_len: usize) -> &str { @@ -2919,3 +3019,134 @@ pub async fn create_local_db(db_path: &str) -> LocalDB { local_db.migrate().await; local_db } + +#[cfg(test)] +mod sequencer_set_tests { + use super::*; + + async fn setup_db() -> LocalDB { + create_local_db("sqlite::memory:").await + } + + #[tokio::test] + async fn test_upsert_and_find_latest_hash_change() { + let db = setup_db().await; + let mut s = db.acquire().await.unwrap(); + + s.upsert_sequencer_set_hash_change(100, 1000, "aabbcc").await.unwrap(); + s.upsert_sequencer_set_hash_change(200, 2000, "ddeeff").await.unwrap(); + s.upsert_sequencer_set_hash_change(150, 1500, "112233").await.unwrap(); + + let latest = s.find_latest_sequencer_set_hash_change().await.unwrap().unwrap(); + assert_eq!(latest.cosmos_block_height, 200); + assert_eq!(latest.goat_block_height, 2000); + assert_eq!(latest.validators_hash, "ddeeff"); + } + + #[tokio::test] + async fn test_upsert_hash_change_conflict() { + let db = setup_db().await; + let mut s = db.acquire().await.unwrap(); + + s.upsert_sequencer_set_hash_change(100, 1000, "aabbcc").await.unwrap(); + // Same cosmos_block_height, different data — should overwrite + s.upsert_sequencer_set_hash_change(100, 1001, "ddeeff").await.unwrap(); + + let latest = s.find_latest_sequencer_set_hash_change().await.unwrap().unwrap(); + assert_eq!(latest.cosmos_block_height, 100); + assert_eq!(latest.goat_block_height, 1001); + assert_eq!(latest.validators_hash, "ddeeff"); + } + + #[tokio::test] + async fn test_find_by_goat_block_at_or_after() { + let db = setup_db().await; + let mut s = db.acquire().await.unwrap(); + + s.upsert_sequencer_set_hash_change(100, 1000, "aa").await.unwrap(); + s.upsert_sequencer_set_hash_change(200, 2000, "bb").await.unwrap(); + s.upsert_sequencer_set_hash_change(300, 3000, "cc").await.unwrap(); + + // Exact match + let r = s + .find_first_sequencer_set_hash_change_by_goat_block_at_or_after(2000) + .await + .unwrap() + .unwrap(); + assert_eq!(r.goat_block_height, 2000); + + // Between records — should return next one + let r = s + .find_first_sequencer_set_hash_change_by_goat_block_at_or_after(1500) + .await + .unwrap() + .unwrap(); + assert_eq!(r.goat_block_height, 2000); + + // Beyond all records — should return None + let r = + s.find_first_sequencer_set_hash_change_by_goat_block_at_or_after(4000).await.unwrap(); + assert!(r.is_none()); + } + + #[tokio::test] + async fn test_find_by_cosmos_block_at_or_after() { + let db = setup_db().await; + let mut s = db.acquire().await.unwrap(); + + s.upsert_sequencer_set_hash_change(100, 1000, "aa").await.unwrap(); + s.upsert_sequencer_set_hash_change(200, 2000, "bb").await.unwrap(); + + let r = s + .find_first_sequencer_set_hash_change_by_cosmos_block_at_or_after(150) + .await + .unwrap() + .unwrap(); + assert_eq!(r.cosmos_block_height, 200); + + let r = + s.find_first_sequencer_set_hash_change_by_cosmos_block_at_or_after(300).await.unwrap(); + assert!(r.is_none()); + } + + #[tokio::test] + async fn test_scan_state_upsert_and_get() { + let db = setup_db().await; + let mut s = db.acquire().await.unwrap(); + + assert!(s.get_sequencer_set_scan_state().await.unwrap().is_none()); + + s.upsert_sequencer_set_scan_state(100, 1000, "aabbcc").await.unwrap(); + let state = s.get_sequencer_set_scan_state().await.unwrap().unwrap(); + assert_eq!(state.next_cosmos_block_height, 100); + assert_eq!(state.latest_goat_block_height, 1000); + assert_eq!(state.latest_validators_hash, "aabbcc"); + + // Update + s.upsert_sequencer_set_scan_state(200, 2000, "ddeeff").await.unwrap(); + let state = s.get_sequencer_set_scan_state().await.unwrap().unwrap(); + assert_eq!(state.next_cosmos_block_height, 200); + assert_eq!(state.latest_validators_hash, "ddeeff"); + } + + #[tokio::test] + async fn test_find_returns_none_when_empty() { + let db = setup_db().await; + let mut s = db.acquire().await.unwrap(); + + assert!(s.find_latest_sequencer_set_hash_change().await.unwrap().is_none()); + assert!( + s.find_first_sequencer_set_hash_change_by_goat_block_at_or_after(0) + .await + .unwrap() + .is_none() + ); + assert!( + s.find_first_sequencer_set_hash_change_by_cosmos_block_at_or_after(0) + .await + .unwrap() + .is_none() + ); + assert!(s.get_sequencer_set_scan_state().await.unwrap().is_none()); + } +} diff --git a/crates/store/src/schema.rs b/crates/store/src/schema.rs index d37426d1..2d8d9ad6 100644 --- a/crates/store/src/schema.rs +++ b/crates/store/src/schema.rs @@ -729,6 +729,26 @@ pub struct LongRunningTaskProof { pub updated_at: i64, } +#[derive(Clone, FromRow, Debug, Serialize, Deserialize, Default)] +pub struct SequencerSetHashChange { + pub id: i64, + pub cosmos_block_height: i64, + pub goat_block_height: i64, + pub validators_hash: String, + pub created_at: i64, + pub updated_at: i64, +} + +#[derive(Clone, FromRow, Debug, Serialize, Deserialize, Default)] +pub struct SequencerSetScanState { + pub id: i64, + pub next_cosmos_block_height: i64, + pub latest_goat_block_height: i64, + pub latest_validators_hash: String, + pub created_at: i64, + pub updated_at: i64, +} + #[cfg(test)] mod tests { use super::*; diff --git a/node/src/bin/sequencer-set-publish.rs b/node/src/bin/sequencer-set-publish.rs index 7a3941b6..d3900eb7 100644 --- a/node/src/bin/sequencer-set-publish.rs +++ b/node/src/bin/sequencer-set-publish.rs @@ -27,7 +27,7 @@ use dotenv::dotenv; use hex::FromHex; use tracing_subscriber::EnvFilter; -use cbft_rpc::{fetch_cbft_validator_info, fetch_validators}; +use cbft_rpc::fetch_validators; const RELAYER_FEE: u64 = 500; // satoshis, just to make sure the tx gets mined in time. This is not the actual fee amount. use bitcoin::secp256k1::{Message, Secp256k1}; @@ -83,6 +83,9 @@ struct Args { #[arg(long, env = "OUTPUT_FILE", default_value = "output.data")] output_file: String, + + #[arg(long, env = "DB_PATH", default_value = "sqlite:/tmp/bitvm2-node.db")] + db_path: String, } #[derive(Default, Serialize, Deserialize, Debug)] @@ -158,6 +161,33 @@ fn save_output(input: OutputData, output_file: &str) { std::fs::write(output_file, serde_json::to_string_pretty(&output).unwrap()).unwrap(); } +/// Query sequencer set hash from local DB. +/// If `goat_block_number` is provided, find the first record at or after that goat block. +/// If not provided, return the latest record. +async fn get_sequencer_set_hash_from_db( + db_path: &str, + goat_block_number: Option, +) -> Result<([u8; 32], u64, u64), Box> { + let local_db = store::create_local_db(db_path).await; + let mut storage = local_db.acquire().await?; + let record = if let Some(goat_block_number) = goat_block_number { + storage + .find_first_sequencer_set_hash_change_by_goat_block_at_or_after(i64::try_from( + goat_block_number, + )?) + .await? + } else { + storage.find_latest_sequencer_set_hash_change().await? + }; + let record = + record.ok_or("No validators_hash record found in db. Start the monitor task first")?; + + let sequencer_set_hash = <[u8; 32]>::from_hex(record.validators_hash.trim_start_matches("0x"))?; + let goat_block_number = u64::try_from(record.goat_block_height)?; + let cosmos_block_number = u64::try_from(record.cosmos_block_height)?; + Ok((sequencer_set_hash, goat_block_number, cosmos_block_number)) +} + #[derive(Subcommand, Debug)] enum Commands { Pubkey { @@ -174,7 +204,7 @@ enum Commands { #[arg(long, env = "OWNER_BTC_KEY_WIF")] owner_btc_key_wif: Option, #[arg(long)] - goat_block_number: u64, + goat_block_number: Option, #[arg(long, env = "PUBLISHER_BTC_PUBKEYS", value_delimiter = ',', value_parser = decode_btc_public_keys)] publisher_btc_pubkeys: Vec, #[arg(long, env = "NEXT_PUBLISHER_BTC_PUBKEYS", value_delimiter = ',', value_parser = decode_btc_public_keys)] @@ -186,7 +216,7 @@ enum Commands { #[arg(long, env = "OWNER_BTC_KEY_WIF")] owner_btc_key_wif: Option, #[arg(long)] - goat_block_number: u64, + goat_block_number: Option, #[arg(long, env = "PUBLISHER_BTC_PUBKEYS", value_delimiter = ',', value_parser = decode_btc_public_keys)] publisher_btc_pubkeys: Vec, #[arg(long, env = "NEXT_PUBLISHER_BTC_PUBKEYS", value_delimiter = ',', value_parser = decode_btc_public_keys)] @@ -286,10 +316,11 @@ async fn main() -> Result<(), Box> { next_publisher_btc_pubkeys, goat_genesis_block_hash, } => { - let (sequencer_set_hash, cl_block_number) = - fetch_cbft_validator_info(&args.cosmos_rpc_url, goat_block_number, None, 1000) - .await?; - println!("cl block number: {cl_block_number}, el block number: {goat_block_number}"); + let (sequencer_set_hash, goat_block_number, cosmos_block_number) = + get_sequencer_set_hash_from_db(&args.db_path, goat_block_number).await?; + println!( + "resolved cl block number: {cosmos_block_number}, resolved el block number: {goat_block_number}" + ); let fee_tx = cached_output.fee_tx.unwrap(); let update_connector = cached_output.update_connector; @@ -318,9 +349,8 @@ async fn main() -> Result<(), Box> { commit_info, } => { println!("goat genesis block hash: {:#?}", hex::encode(goat_genesis_block_hash)); - let (sequencer_set_hash, cosmos_block_number) = - fetch_cbft_validator_info(&args.cosmos_rpc_url, goat_block_number, None, 1000) - .await?; + let (sequencer_set_hash, goat_block_number, cosmos_block_number) = + get_sequencer_set_hash_from_db(&args.db_path, goat_block_number).await?; let sequencers = fetch_validators(&args.cosmos_rpc_url, cosmos_block_number).await?; let fee_tx = cached_output.fee_tx; diff --git a/node/src/lib.rs b/node/src/lib.rs index ab325b65..3042c519 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -8,7 +8,9 @@ pub mod p2p_msg_handler; pub mod rpc_service; mod scheduled_tasks; pub mod utils; -pub use scheduled_tasks::{run_maintenance_tasks, run_watch_event_task}; +pub use scheduled_tasks::{ + run_maintenance_tasks, run_sequencer_set_hash_monitor_task, run_watch_event_task, +}; mod error; mod vk; diff --git a/node/src/main.rs b/node/src/main.rs index bfcf7d03..d873597a 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -16,7 +16,9 @@ use tracing_subscriber::EnvFilter; use bitvm2_noded::utils::{ self, generate_local_key, save_local_info, set_node_external_socket_addr_env, }; -use bitvm2_noded::{rpc_service, run_maintenance_tasks, run_watch_event_task}; +use bitvm2_noded::{ + rpc_service, run_maintenance_tasks, run_sequencer_set_hash_monitor_task, run_watch_event_task, +}; use anyhow::Result; use bitvm2_noded::middleware::swarm::{Bitvm2SwarmConfig, BitvmNetworkManager}; @@ -42,6 +44,26 @@ struct Opts { #[arg(long, default_value = "sqlite:/tmp/bitvm2-node.db")] pub db_path: String, + /// Enable sequencer_set_hash monitor task + #[arg(long, env = "ENABLE_SEQUENCER_SET_HASH_MONITOR", default_value_t = false)] + pub enable_sequencer_set_hash_monitor: bool, + + /// Start cosmos block for sequencer_set_hash monitor + #[arg(long, env = "SEQUENCER_SET_MONITOR_START_COSMOS_BLOCK")] + pub sequencer_set_monitor_start_cosmos_block: Option, + + /// Monitor polling interval seconds + #[arg(long, env = "SEQUENCER_SET_MONITOR_INTERVAL_SECS", default_value_t = 5)] + pub sequencer_set_monitor_interval_secs: u64, + + /// Cosmos RPC URL used by sequencer_set_hash monitor + #[arg( + long, + env = "COSMOS_RPC_URL", + default_value = "https://rpc.testnet3.goat.network/goat-rpc" + )] + pub cosmos_rpc_url: String, + /// Peer nodes as the bootnodes #[arg(long)] bootnodes: Vec, @@ -118,6 +140,17 @@ async fn main() -> Result<(), Box> { return Ok(()); } let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).try_init(); + + let is_committee = actor == Actor::Committee || actor == Actor::All; + if is_committee + && opt.enable_sequencer_set_hash_monitor + && opt.sequencer_set_monitor_start_cosmos_block.is_none() + { + return Err(anyhow::anyhow!( + "sequencer_set_monitor_start_cosmos_block is required when sequencer set monitor is enabled" + ) + .into()); + } let mut metric_registry = Registry::default(); // Create cancellation token for graceful shutdown @@ -156,6 +189,7 @@ async fn main() -> Result<(), Box> { let local_db_clone1 = local_db.clone(); let local_db_clone2 = local_db.clone(); let local_db_clone3 = local_db.clone(); + let local_db_clone4 = local_db.clone(); let opt_rpc_addr = opt.rpc_addr.clone(); let peer_id_string_clone = peer_id_string.clone(); let metric_registry_clone = Arc::new(Mutex::new(metric_registry)); @@ -211,6 +245,33 @@ async fn main() -> Result<(), Box> { } } })); + + if opt.enable_sequencer_set_hash_monitor && is_committee { + let start_cosmos_block = opt.sequencer_set_monitor_start_cosmos_block.unwrap(); + let monitor_interval = opt.sequencer_set_monitor_interval_secs; + let cosmos_rpc_url = opt.cosmos_rpc_url.clone(); + let cancel_token_clone = cancellation_token.clone(); + task_handles.push(tokio::spawn(async move { + let goat_client = + Arc::new(GOATClient::new(goat_config_from_env().await, get_goat_network())); + match run_sequencer_set_hash_monitor_task( + local_db_clone4, + goat_client, + cosmos_rpc_url, + start_cosmos_block, + monitor_interval, + cancel_token_clone, + ) + .await + { + Ok(tag) => Ok(tag), + Err(e) => { + tracing::error!("Sequencer set monitor task error: {}", e); + Err("sequencer_set_monitor_error".to_string()) + } + } + })); + } // } let cancel_token_clone = cancellation_token.clone(); diff --git a/node/src/scheduled_tasks/mod.rs b/node/src/scheduled_tasks/mod.rs index 82cf52ce..3bf51978 100644 --- a/node/src/scheduled_tasks/mod.rs +++ b/node/src/scheduled_tasks/mod.rs @@ -2,6 +2,7 @@ mod event_watch_task; pub mod graph_maintenance_tasks; pub mod instance_maintenance_tasks; mod node_maintenance_tasks; +mod sequencer_set_hash_monitor_task; mod spv_maintenance_tasks; use crate::action::GOATMessageContent; @@ -19,6 +20,7 @@ use bitvm2_lib::actors::Actor; use client::btc_chain::BTCClient; use client::goat_chain::GOATClient; pub use event_watch_task::{is_processing_gateway_history_events, run_watch_event_task}; +pub use sequencer_set_hash_monitor_task::run_sequencer_set_hash_monitor_task; use std::sync::Arc; use std::time::Duration; use store::localdb::{LocalDB, StorageProcessor}; diff --git a/node/src/scheduled_tasks/sequencer_set_hash_monitor_task.rs b/node/src/scheduled_tasks/sequencer_set_hash_monitor_task.rs new file mode 100644 index 00000000..23b6369a --- /dev/null +++ b/node/src/scheduled_tasks/sequencer_set_hash_monitor_task.rs @@ -0,0 +1,357 @@ +use anyhow::Context; +use cbft_rpc::{CosmosRpcProvider, DefaultCosmosRpcProvider}; +use std::sync::Arc; +use std::time::Duration; +use store::localdb::LocalDB; +use tokio_util::sync::CancellationToken; +use tracing::{info, warn}; + +pub(crate) async fn load_or_init_scan_state( + local_db: &LocalDB, + rpc: &impl CosmosRpcProvider, + start_cosmos_block: u64, +) -> anyhow::Result { + let mut storage = local_db.acquire().await?; + if let Some(state) = storage.get_sequencer_set_scan_state().await? { + return Ok(state); + } + + if let Some(last_change) = storage.find_latest_sequencer_set_hash_change().await? { + let next_cosmos_block_height = last_change.cosmos_block_height.saturating_add(1); + storage + .upsert_sequencer_set_scan_state( + next_cosmos_block_height, + last_change.goat_block_height, + &last_change.validators_hash, + ) + .await?; + return Ok(store::SequencerSetScanState { + id: 1, + next_cosmos_block_height, + latest_goat_block_height: last_change.goat_block_height, + latest_validators_hash: last_change.validators_hash, + created_at: 0, + updated_at: 0, + }); + } + + // Drop the DB connection before making the async RPC call to avoid holding it across await. + drop(storage); + let (validators_hash, goat_block_height) = + rpc.get_validators_hash_and_goat_block(start_cosmos_block).await?; + let goat_block_height = goat_block_height.ok_or_else(|| { + anyhow::anyhow!( + "Cosmos block {} has no CBFT tx data, cannot initialize scan state from this block", + start_cosmos_block + ) + })?; + let validators_hash_hex = hex::encode(validators_hash); + let start_cosmos_block = + i64::try_from(start_cosmos_block).context("start cosmos block exceeds i64")?; + let goat_block_height = + i64::try_from(goat_block_height).context("goat block height exceeds i64")?; + let next_cosmos_block_height = start_cosmos_block.saturating_add(1); + + let mut storage = local_db.acquire().await?; + storage + .upsert_sequencer_set_hash_change( + start_cosmos_block, + goat_block_height, + &validators_hash_hex, + ) + .await?; + storage + .upsert_sequencer_set_scan_state( + next_cosmos_block_height, + goat_block_height, + &validators_hash_hex, + ) + .await?; + info!( + "Initialized sequencer_set_hash monitor at cosmos block {}, validators_hash {}", + start_cosmos_block, validators_hash_hex + ); + + Ok(store::SequencerSetScanState { + id: 1, + next_cosmos_block_height, + latest_goat_block_height: goat_block_height, + latest_validators_hash: validators_hash_hex, + created_at: 0, + updated_at: 0, + }) +} + +pub(crate) async fn sync_sequencer_set_hash_changes( + local_db: &LocalDB, + rpc: &impl CosmosRpcProvider, + start_cosmos_block: u64, +) -> anyhow::Result<()> { + let mut state = load_or_init_scan_state(local_db, rpc, start_cosmos_block).await?; + let latest_cosmos_block = rpc.get_latest_block_height().await?; + let latest_cosmos_block = + i64::try_from(latest_cosmos_block).context("latest cosmos block exceeds i64")?; + + if state.next_cosmos_block_height > latest_cosmos_block { + return Ok(()); + } + + const SCAN_STATE_FLUSH_INTERVAL: i64 = 100; + let mut blocks_since_last_flush: i64 = 0; + + while state.next_cosmos_block_height <= latest_cosmos_block { + let cosmos_block_height = u64::try_from(state.next_cosmos_block_height) + .context("cosmos block height is negative")?; + let (validators_hash, goat_block_height) = + rpc.get_validators_hash_and_goat_block(cosmos_block_height).await?; + + // Skip cosmos blocks without CBFT tx data (empty blocks or unparsable payload). + let goat_block_height = match goat_block_height { + Some(h) => h, + None => { + state.next_cosmos_block_height = state.next_cosmos_block_height.saturating_add(1); + blocks_since_last_flush += 1; + continue; + } + }; + + let validators_hash_hex = hex::encode(validators_hash); + let goat_block_height = + i64::try_from(goat_block_height).context("goat block height exceeds i64")?; + + if state.latest_validators_hash != validators_hash_hex { + let mut storage = local_db.acquire().await?; + storage + .upsert_sequencer_set_hash_change( + state.next_cosmos_block_height, + goat_block_height, + &validators_hash_hex, + ) + .await?; + info!( + "Detected validators_hash change at cosmos block {} (goat block {}): {}", + state.next_cosmos_block_height, goat_block_height, validators_hash_hex + ); + state.latest_validators_hash = validators_hash_hex; + } + + state.latest_goat_block_height = goat_block_height; + state.next_cosmos_block_height = state.next_cosmos_block_height.saturating_add(1); + blocks_since_last_flush += 1; + + // Flush scan state periodically to reduce DB writes during catch-up. + if blocks_since_last_flush >= SCAN_STATE_FLUSH_INTERVAL + || state.next_cosmos_block_height > latest_cosmos_block + { + let mut storage = local_db.acquire().await?; + storage + .upsert_sequencer_set_scan_state( + state.next_cosmos_block_height, + state.latest_goat_block_height, + &state.latest_validators_hash, + ) + .await?; + blocks_since_last_flush = 0; + info!( + "Scan progress: cosmos block {}/{}", + state.next_cosmos_block_height, latest_cosmos_block + ); + } + } + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +pub async fn run_sequencer_set_hash_monitor_task( + local_db: LocalDB, + _goat_client: Arc, + cosmos_rpc_url: String, + start_cosmos_block: u64, + interval_secs: u64, + cancellation_token: CancellationToken, +) -> anyhow::Result { + let rpc = DefaultCosmosRpcProvider { cosmos_rpc_url }; + loop { + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(interval_secs)) => { + if let Err(err) = sync_sequencer_set_hash_changes( + &local_db, + &rpc, + start_cosmos_block, + ).await { + warn!("sequencer_set_hash monitor sync failed: {err:?}"); + } + } + _ = cancellation_token.cancelled() => { + info!("Sequencer set hash monitor task received shutdown signal"); + return Ok("sequencer_set_hash_monitor_shutdown".to_string()); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use anyhow::Result; + use cbft_rpc::CosmosRpcProvider; + use std::collections::HashMap; + use store::localdb::create_local_db; + + /// Mock Cosmos RPC provider for testing. + /// `blocks` maps cosmos_block_height -> (validators_hash, Option). + struct MockCosmosRpc { + blocks: HashMap)>, + latest_height: u64, + } + + #[async_trait::async_trait] + impl CosmosRpcProvider for MockCosmosRpc { + async fn get_latest_block_height(&self) -> Result { + Ok(self.latest_height) + } + async fn get_validators_hash_and_goat_block( + &self, + cosmos_block_height: u64, + ) -> Result<([u8; 32], Option)> { + self.blocks + .get(&cosmos_block_height) + .cloned() + .ok_or_else(|| anyhow::anyhow!("block {} not found in mock", cosmos_block_height)) + } + } + + fn make_hash(byte: u8) -> [u8; 32] { + [byte; 32] + } + + #[tokio::test] + async fn test_sync_init_from_empty() { + let db = create_local_db("sqlite::memory:").await; + let mut blocks = HashMap::new(); + // Blocks 10..=12, all with same validators_hash + for i in 10..=12 { + blocks.insert(i, (make_hash(0xAA), Some(1000 + i))); + } + let rpc = MockCosmosRpc { blocks, latest_height: 12 }; + + sync_sequencer_set_hash_changes(&db, &rpc, 10).await.unwrap(); + + let mut s = db.acquire().await.unwrap(); + // Initial block should be recorded as a hash change + let latest = s.find_latest_sequencer_set_hash_change().await.unwrap().unwrap(); + assert_eq!(latest.cosmos_block_height, 10); + assert_eq!(latest.validators_hash, hex::encode(make_hash(0xAA))); + + // Scan state should be at block 13 (past latest) + let state = s.get_sequencer_set_scan_state().await.unwrap().unwrap(); + assert_eq!(state.next_cosmos_block_height, 13); + } + + #[tokio::test] + async fn test_sync_detects_hash_change() { + let db = create_local_db("sqlite::memory:").await; + let mut blocks = HashMap::new(); + // Blocks 10..=12 with hash AA, blocks 13..=15 with hash BB + for i in 10..=12 { + blocks.insert(i, (make_hash(0xAA), Some(1000 + i))); + } + for i in 13..=15 { + blocks.insert(i, (make_hash(0xBB), Some(1000 + i))); + } + let rpc = MockCosmosRpc { blocks, latest_height: 15 }; + + sync_sequencer_set_hash_changes(&db, &rpc, 10).await.unwrap(); + + let mut s = db.acquire().await.unwrap(); + // Should have 2 hash change records: one at block 10 (init), one at block 13 (change) + let first = s + .find_first_sequencer_set_hash_change_by_cosmos_block_at_or_after(10) + .await + .unwrap() + .unwrap(); + assert_eq!(first.cosmos_block_height, 10); + assert_eq!(first.validators_hash, hex::encode(make_hash(0xAA))); + + let second = s + .find_first_sequencer_set_hash_change_by_cosmos_block_at_or_after(11) + .await + .unwrap() + .unwrap(); + assert_eq!(second.cosmos_block_height, 13); + assert_eq!(second.validators_hash, hex::encode(make_hash(0xBB))); + } + + #[tokio::test] + async fn test_sync_skips_empty_blocks() { + let db = create_local_db("sqlite::memory:").await; + let mut blocks = HashMap::new(); + blocks.insert(10, (make_hash(0xAA), Some(1000))); + blocks.insert(11, (make_hash(0xAA), None)); // empty block + blocks.insert(12, (make_hash(0xAA), None)); // empty block + blocks.insert(13, (make_hash(0xAA), Some(1003))); + let rpc = MockCosmosRpc { blocks, latest_height: 13 }; + + sync_sequencer_set_hash_changes(&db, &rpc, 10).await.unwrap(); + + // Scan state should advance past all blocks including empty ones + let mut s = db.acquire().await.unwrap(); + let state = s.get_sequencer_set_scan_state().await.unwrap().unwrap(); + assert_eq!(state.next_cosmos_block_height, 14); + assert_eq!(state.latest_goat_block_height, 1003); + } + + #[tokio::test] + async fn test_sync_resumes_from_scan_state() { + let db = create_local_db("sqlite::memory:").await; + + // Pre-seed scan state at block 13 + { + let mut s = db.acquire().await.unwrap(); + s.upsert_sequencer_set_hash_change(10, 1000, &hex::encode(make_hash(0xAA))) + .await + .unwrap(); + s.upsert_sequencer_set_scan_state(13, 1002, &hex::encode(make_hash(0xAA))) + .await + .unwrap(); + } + + let mut blocks = HashMap::new(); + // Only provide blocks 13..=15, earlier blocks should NOT be needed + for i in 13..=15 { + blocks.insert(i, (make_hash(0xAA), Some(1000 + i))); + } + let rpc = MockCosmosRpc { blocks, latest_height: 15 }; + + sync_sequencer_set_hash_changes(&db, &rpc, 10).await.unwrap(); + + let mut s = db.acquire().await.unwrap(); + let state = s.get_sequencer_set_scan_state().await.unwrap().unwrap(); + assert_eq!(state.next_cosmos_block_height, 16); + } + + #[tokio::test] + async fn test_sync_already_caught_up() { + let db = create_local_db("sqlite::memory:").await; + + // Pre-seed scan state already past latest + { + let mut s = db.acquire().await.unwrap(); + s.upsert_sequencer_set_scan_state(20, 1019, &hex::encode(make_hash(0xAA))) + .await + .unwrap(); + } + + let blocks = HashMap::new(); + let rpc = MockCosmosRpc { blocks, latest_height: 15 }; + + // Should return Ok(()) immediately without querying any blocks + sync_sequencer_set_hash_changes(&db, &rpc, 10).await.unwrap(); + + // Scan state should be unchanged + let mut s = db.acquire().await.unwrap(); + let state = s.get_sequencer_set_scan_state().await.unwrap().unwrap(); + assert_eq!(state.next_cosmos_block_height, 20); + } +} From e02b50f5a278601ad82d1a3eab49d838b2545fd4 Mon Sep 17 00:00:00 2001 From: Blake Date: Thu, 12 Feb 2026 01:05:42 +0800 Subject: [PATCH 2/5] feat: add environment variables and functions for sequencer set hash monitoring --- node/src/env.rs | 23 +++++++++++++++++++++++ node/src/main.rs | 40 +++++++++++----------------------------- 2 files changed, 34 insertions(+), 29 deletions(-) diff --git a/node/src/env.rs b/node/src/env.rs index 3648787d..91a754dc 100644 --- a/node/src/env.rs +++ b/node/src/env.rs @@ -77,6 +77,11 @@ pub const DEFAULT_OPERATOR_PROOF_WAIT_SECS: usize = 60; pub const ENV_ALWAYS_CHALLENGE: &str = "ALWAYS_CHALLENGE"; pub const ENV_GENESIS_SEQUENCER_COMMIT_TXID: &str = "GENESIS_SEQUENCER_COMMIT_TXID"; +pub const ENV_ENABLE_SEQUENCER_SET_HASH_MONITOR: &str = "ENABLE_SEQUENCER_SET_HASH_MONITOR"; +pub const ENV_SEQUENCER_SET_MONITOR_START_COSMOS_BLOCK: &str = + "SEQUENCER_SET_MONITOR_START_COSMOS_BLOCK"; +pub const ENV_COSMOS_RPC_URL: &str = "COSMOS_RPC_URL"; +pub const DEFAULT_COSMOS_RPC_URL: &str = "https://rpc.testnet3.goat.network/goat-rpc"; // fee estimate // TODO: more precise fee estimation @@ -122,6 +127,7 @@ pub const GATEWAY_RATE_MULTIPLIER: u64 = 10000; pub const HEARTBEAT_INTERVAL_SECOND: u64 = 60 * 5; pub const REGULAR_TASK_INTERVAL_SECOND: u64 = 20; +pub const SEQUENCER_SET_MONITOR_INTERVAL_SECS: u64 = 5; pub fn get_network() -> Network { let network = std::env::var(ENV_BITCOIN_NETWORK).unwrap_or("testnet4".to_string()); @@ -340,6 +346,23 @@ pub fn get_btc_url_from_env() -> Option { std::env::var(ENV_BTC_CHAIN_URL).ok() } +pub fn get_enable_sequencer_set_hash_monitor_from_env() -> bool { + match std::env::var(ENV_ENABLE_SEQUENCER_SET_HASH_MONITOR) { + Ok(value) => value.to_lowercase() == "true", + Err(_) => false, + } +} + +pub fn get_sequencer_set_monitor_start_cosmos_block_from_env() -> Option { + std::env::var(ENV_SEQUENCER_SET_MONITOR_START_COSMOS_BLOCK) + .ok() + .and_then(|value| value.parse::().ok()) +} + +pub fn get_cosmos_rpc_url_from_env() -> String { + std::env::var(ENV_COSMOS_RPC_URL).unwrap_or(DEFAULT_COSMOS_RPC_URL.to_string()) +} + pub fn get_goat_url_from_env() -> Url { std::env::var(ENV_GOAT_CHAIN_URL) .ok() diff --git a/node/src/main.rs b/node/src/main.rs index d873597a..95c87270 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -2,8 +2,8 @@ use base64::Engine; use bitvm2_lib::actors::Actor; use bitvm2_noded::env::{ - self, ENV_PEER_KEY, check_node_info, get_btc_url_from_env, get_goat_network, get_network, - get_node_pubkey, goat_config_from_env, + self, ENV_PEER_KEY, SEQUENCER_SET_MONITOR_INTERVAL_SECS, check_node_info, get_btc_url_from_env, + get_goat_network, get_network, get_node_pubkey, goat_config_from_env, }; use clap::{Parser, Subcommand}; use client::{btc_chain::BTCClient, goat_chain::GOATClient}; @@ -44,26 +44,6 @@ struct Opts { #[arg(long, default_value = "sqlite:/tmp/bitvm2-node.db")] pub db_path: String, - /// Enable sequencer_set_hash monitor task - #[arg(long, env = "ENABLE_SEQUENCER_SET_HASH_MONITOR", default_value_t = false)] - pub enable_sequencer_set_hash_monitor: bool, - - /// Start cosmos block for sequencer_set_hash monitor - #[arg(long, env = "SEQUENCER_SET_MONITOR_START_COSMOS_BLOCK")] - pub sequencer_set_monitor_start_cosmos_block: Option, - - /// Monitor polling interval seconds - #[arg(long, env = "SEQUENCER_SET_MONITOR_INTERVAL_SECS", default_value_t = 5)] - pub sequencer_set_monitor_interval_secs: u64, - - /// Cosmos RPC URL used by sequencer_set_hash monitor - #[arg( - long, - env = "COSMOS_RPC_URL", - default_value = "https://rpc.testnet3.goat.network/goat-rpc" - )] - pub cosmos_rpc_url: String, - /// Peer nodes as the bootnodes #[arg(long)] bootnodes: Vec, @@ -142,9 +122,12 @@ async fn main() -> Result<(), Box> { let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).try_init(); let is_committee = actor == Actor::Committee || actor == Actor::All; + let enable_sequencer_set_hash_monitor = env::get_enable_sequencer_set_hash_monitor_from_env(); + let sequencer_set_monitor_start_cosmos_block = + env::get_sequencer_set_monitor_start_cosmos_block_from_env(); if is_committee - && opt.enable_sequencer_set_hash_monitor - && opt.sequencer_set_monitor_start_cosmos_block.is_none() + && enable_sequencer_set_hash_monitor + && sequencer_set_monitor_start_cosmos_block.is_none() { return Err(anyhow::anyhow!( "sequencer_set_monitor_start_cosmos_block is required when sequencer set monitor is enabled" @@ -246,10 +229,9 @@ async fn main() -> Result<(), Box> { } })); - if opt.enable_sequencer_set_hash_monitor && is_committee { - let start_cosmos_block = opt.sequencer_set_monitor_start_cosmos_block.unwrap(); - let monitor_interval = opt.sequencer_set_monitor_interval_secs; - let cosmos_rpc_url = opt.cosmos_rpc_url.clone(); + if enable_sequencer_set_hash_monitor && is_committee { + let start_cosmos_block = sequencer_set_monitor_start_cosmos_block.unwrap(); + let cosmos_rpc_url = env::get_cosmos_rpc_url_from_env(); let cancel_token_clone = cancellation_token.clone(); task_handles.push(tokio::spawn(async move { let goat_client = @@ -259,7 +241,7 @@ async fn main() -> Result<(), Box> { goat_client, cosmos_rpc_url, start_cosmos_block, - monitor_interval, + SEQUENCER_SET_MONITOR_INTERVAL_SECS, cancel_token_clone, ) .await From d840cb54d8999f44765d36f966b4561ec5db8cb3 Mon Sep 17 00:00:00 2001 From: Blake Date: Thu, 12 Feb 2026 02:24:45 +0800 Subject: [PATCH 3/5] feat: enhance sequencer set hash monitoring with improved error handling and logging --- .../sequencer_set_hash_monitor_task.rs | 84 +++++++++++++------ 1 file changed, 57 insertions(+), 27 deletions(-) diff --git a/node/src/scheduled_tasks/sequencer_set_hash_monitor_task.rs b/node/src/scheduled_tasks/sequencer_set_hash_monitor_task.rs index 23b6369a..d7ffd7af 100644 --- a/node/src/scheduled_tasks/sequencer_set_hash_monitor_task.rs +++ b/node/src/scheduled_tasks/sequencer_set_hash_monitor_task.rs @@ -96,46 +96,75 @@ pub(crate) async fn sync_sequencer_set_hash_changes( return Ok(()); } - const SCAN_STATE_FLUSH_INTERVAL: i64 = 100; + let finish_info = format!( + "finish monitor seq_set_hash from: {}, to: {})", + state.next_cosmos_block_height, latest_cosmos_block + ); + + const SCAN_STATE_FLUSH_INTERVAL: i64 = 10; let mut blocks_since_last_flush: i64 = 0; while state.next_cosmos_block_height <= latest_cosmos_block { - let cosmos_block_height = u64::try_from(state.next_cosmos_block_height) - .context("cosmos block height is negative")?; - let (validators_hash, goat_block_height) = - rpc.get_validators_hash_and_goat_block(cosmos_block_height).await?; - - // Skip cosmos blocks without CBFT tx data (empty blocks or unparsable payload). - let goat_block_height = match goat_block_height { - Some(h) => h, - None => { - state.next_cosmos_block_height = state.next_cosmos_block_height.saturating_add(1); - blocks_since_last_flush += 1; - continue; + info!( + "Syncing sequencer_set_hash changes at cosmos block {}", + state.next_cosmos_block_height + ); + + // Define a closure or block to handle the per-block logic so we can catch errors + let result: anyhow::Result<()> = async { + let cosmos_block_height = u64::try_from(state.next_cosmos_block_height) + .context("cosmos block height is negative")?; + let (validators_hash, goat_block_height) = + rpc.get_validators_hash_and_goat_block(cosmos_block_height).await?; + + // Skip cosmos blocks without CBFT tx data (empty blocks or unparsable payload). + let goat_block_height = match goat_block_height { + Some(h) => h, + None => { + return Ok(()); + } + }; + + let validators_hash_hex = hex::encode(validators_hash); + let goat_block_height = + i64::try_from(goat_block_height).context("goat block height exceeds i64")?; + + if state.latest_validators_hash != validators_hash_hex { + let mut storage = local_db.acquire().await?; + storage + .upsert_sequencer_set_hash_change( + state.next_cosmos_block_height, + goat_block_height, + &validators_hash_hex, + ) + .await?; + info!( + "Detected validators_hash change at cosmos block {} (goat block {}): {}", + state.next_cosmos_block_height, goat_block_height, validators_hash_hex + ); + state.latest_validators_hash = validators_hash_hex; } - }; - let validators_hash_hex = hex::encode(validators_hash); - let goat_block_height = - i64::try_from(goat_block_height).context("goat block height exceeds i64")?; + state.latest_goat_block_height = goat_block_height; + + Ok(()) + } + .await; - if state.latest_validators_hash != validators_hash_hex { + if let Err(e) = result { + // Attempt to save state before returning the error let mut storage = local_db.acquire().await?; storage - .upsert_sequencer_set_hash_change( + .upsert_sequencer_set_scan_state( state.next_cosmos_block_height, - goat_block_height, - &validators_hash_hex, + state.latest_goat_block_height, + &state.latest_validators_hash, ) .await?; - info!( - "Detected validators_hash change at cosmos block {} (goat block {}): {}", - state.next_cosmos_block_height, goat_block_height, validators_hash_hex - ); - state.latest_validators_hash = validators_hash_hex; + return Err(e); } - state.latest_goat_block_height = goat_block_height; + // Advance state on success (or empty block) state.next_cosmos_block_height = state.next_cosmos_block_height.saturating_add(1); blocks_since_last_flush += 1; @@ -158,6 +187,7 @@ pub(crate) async fn sync_sequencer_set_hash_changes( ); } } + info!(finish_info); Ok(()) } From cc8e10fd67312dab5422df9d8bcceb8d64c23f78 Mon Sep 17 00:00:00 2001 From: Blake Date: Thu, 12 Feb 2026 14:05:26 +0800 Subject: [PATCH 4/5] feat: add Publisher actor to support sequencer set hash monitoring --- crates/bitvm2-ga/src/actors.rs | 1 + node/src/main.rs | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/bitvm2-ga/src/actors.rs b/crates/bitvm2-ga/src/actors.rs index 878d457e..7f664bb3 100644 --- a/crates/bitvm2-ga/src/actors.rs +++ b/crates/bitvm2-ga/src/actors.rs @@ -7,5 +7,6 @@ pub enum Actor { Operator, Challenger, Watchtower, + Publisher, All, } diff --git a/node/src/main.rs b/node/src/main.rs index 95c87270..77ca7190 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -121,11 +121,11 @@ async fn main() -> Result<(), Box> { } let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).try_init(); - let is_committee = actor == Actor::Committee || actor == Actor::All; + let is_publisher = actor == Actor::Publisher || actor == Actor::All; let enable_sequencer_set_hash_monitor = env::get_enable_sequencer_set_hash_monitor_from_env(); let sequencer_set_monitor_start_cosmos_block = env::get_sequencer_set_monitor_start_cosmos_block_from_env(); - if is_committee + if is_publisher && enable_sequencer_set_hash_monitor && sequencer_set_monitor_start_cosmos_block.is_none() { @@ -229,7 +229,7 @@ async fn main() -> Result<(), Box> { } })); - if enable_sequencer_set_hash_monitor && is_committee { + if enable_sequencer_set_hash_monitor && is_publisher { let start_cosmos_block = sequencer_set_monitor_start_cosmos_block.unwrap(); let cosmos_rpc_url = env::get_cosmos_rpc_url_from_env(); let cancel_token_clone = cancellation_token.clone(); From a8961b7dbb391c8614ba7dbdd6728b3aa4c0f1e6 Mon Sep 17 00:00:00 2001 From: Blake Date: Thu, 12 Feb 2026 14:15:17 +0800 Subject: [PATCH 5/5] feat: remove env for sequencer set hash monitor, then the publisher will be a separate process. --- node/src/env.rs | 8 -------- node/src/main.rs | 8 ++------ 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/node/src/env.rs b/node/src/env.rs index 91a754dc..56ddd95e 100644 --- a/node/src/env.rs +++ b/node/src/env.rs @@ -77,7 +77,6 @@ pub const DEFAULT_OPERATOR_PROOF_WAIT_SECS: usize = 60; pub const ENV_ALWAYS_CHALLENGE: &str = "ALWAYS_CHALLENGE"; pub const ENV_GENESIS_SEQUENCER_COMMIT_TXID: &str = "GENESIS_SEQUENCER_COMMIT_TXID"; -pub const ENV_ENABLE_SEQUENCER_SET_HASH_MONITOR: &str = "ENABLE_SEQUENCER_SET_HASH_MONITOR"; pub const ENV_SEQUENCER_SET_MONITOR_START_COSMOS_BLOCK: &str = "SEQUENCER_SET_MONITOR_START_COSMOS_BLOCK"; pub const ENV_COSMOS_RPC_URL: &str = "COSMOS_RPC_URL"; @@ -346,13 +345,6 @@ pub fn get_btc_url_from_env() -> Option { std::env::var(ENV_BTC_CHAIN_URL).ok() } -pub fn get_enable_sequencer_set_hash_monitor_from_env() -> bool { - match std::env::var(ENV_ENABLE_SEQUENCER_SET_HASH_MONITOR) { - Ok(value) => value.to_lowercase() == "true", - Err(_) => false, - } -} - pub fn get_sequencer_set_monitor_start_cosmos_block_from_env() -> Option { std::env::var(ENV_SEQUENCER_SET_MONITOR_START_COSMOS_BLOCK) .ok() diff --git a/node/src/main.rs b/node/src/main.rs index 77ca7190..a360ba8f 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -122,13 +122,9 @@ async fn main() -> Result<(), Box> { let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).try_init(); let is_publisher = actor == Actor::Publisher || actor == Actor::All; - let enable_sequencer_set_hash_monitor = env::get_enable_sequencer_set_hash_monitor_from_env(); let sequencer_set_monitor_start_cosmos_block = env::get_sequencer_set_monitor_start_cosmos_block_from_env(); - if is_publisher - && enable_sequencer_set_hash_monitor - && sequencer_set_monitor_start_cosmos_block.is_none() - { + if is_publisher && sequencer_set_monitor_start_cosmos_block.is_none() { return Err(anyhow::anyhow!( "sequencer_set_monitor_start_cosmos_block is required when sequencer set monitor is enabled" ) @@ -229,7 +225,7 @@ async fn main() -> Result<(), Box> { } })); - if enable_sequencer_set_hash_monitor && is_publisher { + if is_publisher { let start_cosmos_block = sequencer_set_monitor_start_cosmos_block.unwrap(); let cosmos_rpc_url = env::get_cosmos_rpc_url_from_env(); let cancel_token_clone = cancellation_token.clone();