Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/bitvm2-ga/src/actors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ pub enum Actor {
Operator,
Challenger,
Watchtower,
Publisher,
All,
}
1 change: 1 addition & 0 deletions crates/cbft-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ tendermint-light-client-verifier = { workspace = true, default-features = false,
] }
tendermint-rpc = { workspace = true, features = ["http-client"] }
anyhow = { workspace = true }
async-trait = { workspace = true }
tracing.workspace = true

[dev-dependencies]
Expand Down
74 changes: 74 additions & 0 deletions crates/cbft-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,42 @@ use tendermint::validator::Info;
use tendermint_light_client_verifier::types::{LightBlock, PeerId, ValidatorSet};
use tendermint_rpc::{Client, HttpClient};

/// Abstraction over Cosmos RPC calls used by the sequencer set hash monitor.
/// Enables mock-based testing of sync logic without real RPC.
#[async_trait::async_trait]
pub trait CosmosRpcProvider: Send + Sync {
/// Fetch the latest block height from the Cosmos chain.
async fn get_latest_block_height(&self) -> Result<u64>;

/// Fetch validators_hash and goat block number for a given cosmos block.
/// Returns `None` for goat_block_number when the block has no CBFT tx data.
async fn get_validators_hash_and_goat_block(
&self,
cosmos_block_height: u64,
) -> Result<([u8; 32], Option<u64>)>;
}

/// Default implementation that delegates to real Cosmos RPC endpoints.
#[derive(Debug)]
pub struct DefaultCosmosRpcProvider {
pub cosmos_rpc_url: String,
}

#[async_trait::async_trait]
impl CosmosRpcProvider for DefaultCosmosRpcProvider {
async fn get_latest_block_height(&self) -> Result<u64> {
fetch_latest_cosmos_block_height(&self.cosmos_rpc_url).await
}

async fn get_validators_hash_and_goat_block(
&self,
cosmos_block_height: u64,
) -> Result<([u8; 32], Option<u64>)> {
fetch_cbft_block_validators_hash_and_goat_block(&self.cosmos_rpc_url, cosmos_block_height)
.await
}
}

#[tracing::instrument(level = "info")]
pub async fn fetch_validators(cosmos_rpc_url: &str, block_height: u64) -> Result<Vec<Info>> {
let rpc = HttpClient::new(cosmos_rpc_url).unwrap();
Expand All @@ -15,6 +51,44 @@ pub async fn fetch_validators(cosmos_rpc_url: &str, block_height: u64) -> Result
Ok(validators_response.validators)
}

#[tracing::instrument(level = "info")]
pub async fn fetch_latest_cosmos_block_height(cosmos_rpc_url: &str) -> Result<u64> {
let rpc = HttpClient::new(cosmos_rpc_url).unwrap();
let status = rpc.status().await.map_err(|e| anyhow!("Error fetching status: {e:?}"))?;
Ok(status.sync_info.latest_block_height.into())
}

/// Fetch the validators_hash and goat block number for a given cosmos block.
/// Returns `None` for goat_block_number when the block has no CBFT tx data
/// or the payload cannot be parsed (e.g. empty blocks).
#[tracing::instrument(level = "info")]
pub async fn fetch_cbft_block_validators_hash_and_goat_block(
cosmos_rpc_url: &str,
cosmos_block_height: u64,
) -> Result<([u8; 32], Option<u64>)> {
let rpc = HttpClient::new(cosmos_rpc_url).unwrap();
let block_data =
rpc.block(Height::try_from(cosmos_block_height).unwrap()).await.map_err(|e| {
anyhow!("Error fetching block data for height {cosmos_block_height}: {e:?}")
})?;

let validators_hash: [u8; 32] = block_data
.block
.header
.validators_hash
.as_bytes()
.try_into()
.map_err(|_| anyhow!("Invalid validators_hash length"))?;

if block_data.block.data.is_empty() {
return Ok((validators_hash, None));
}

let goat_block_number =
parse_cbft_tx_payload(&block_data.block.data[0]).map(|payload| payload.block_number);
Ok((validators_hash, goat_block_number))
}

// get cosmos block height from block hash: curl https://rpc.testnet3.goat.network/goat-rpc/block_by_hash?hash=0xd2e236b8f89278a527a042727cd4eebb59a566006a844f294da54ed727b95470
pub async fn get_cosmos_block_height_at(
cosmos_rpc_url: &str,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
CREATE TABLE IF NOT EXISTS sequencer_set_hash_changes
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
cosmos_block_height BIGINT NOT NULL UNIQUE,
goat_block_height BIGINT NOT NULL,
validators_hash TEXT NOT NULL,
created_at BIGINT NOT NULL DEFAULT 0,
updated_at BIGINT NOT NULL DEFAULT 0
);

CREATE INDEX IF NOT EXISTS idx_sequencer_set_hash_changes_goat_block_height
ON sequencer_set_hash_changes (goat_block_height);

CREATE TABLE IF NOT EXISTS sequencer_set_scan_state
(
id INTEGER PRIMARY KEY CHECK (id = 1),
next_cosmos_block_height BIGINT NOT NULL,
latest_goat_block_height BIGINT NOT NULL,
latest_validators_hash TEXT NOT NULL,
created_at BIGINT NOT NULL DEFAULT 0,
updated_at BIGINT NOT NULL DEFAULT 0
);
233 changes: 232 additions & 1 deletion crates/store/src/localdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use crate::utils::{QueryBuilder, QueryParam, create_place_holders};
use crate::{
BridgeOutGlobalStats, GoatTxRecord, Graph, GraphBtcTxVoutMonitor, GraphRawData, Instance,
LongRunningTaskProof, Message, Node, NodesOverview, OperatorProof, PeginGraphProcessData,
PeginInstanceProcessData, SerializableTxid, WatchContract, WatchtowerProof,
PeginInstanceProcessData, SequencerSetHashChange, SequencerSetScanState, SerializableTxid,
WatchContract, WatchtowerProof,
};

use indexmap::IndexMap;
Expand Down Expand Up @@ -2908,6 +2909,105 @@ impl<'a> StorageProcessor<'a> {
.await?;
Ok(res)
}

pub async fn upsert_sequencer_set_hash_change(
&mut self,
cosmos_block_height: i64,
goat_block_height: i64,
validators_hash: &str,
) -> anyhow::Result<u64> {
let current_time = get_current_timestamp_secs();
let res = sqlx::query(
r#"INSERT INTO sequencer_set_hash_changes (cosmos_block_height, goat_block_height, validators_hash, created_at, updated_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (cosmos_block_height) DO UPDATE
SET goat_block_height = excluded.goat_block_height,
validators_hash = excluded.validators_hash,
updated_at = excluded.updated_at"#,
)
.bind(cosmos_block_height)
.bind(goat_block_height)
.bind(validators_hash)
.bind(current_time)
.bind(current_time)
.execute(self.conn())
.await?;
Ok(res.rows_affected())
}

pub async fn find_latest_sequencer_set_hash_change(
&mut self,
) -> anyhow::Result<Option<SequencerSetHashChange>> {
let res = sqlx::query_as::<_, SequencerSetHashChange>(
"SELECT * FROM sequencer_set_hash_changes ORDER BY cosmos_block_height DESC LIMIT 1",
)
.fetch_optional(self.conn())
.await?;
Ok(res)
}

pub async fn find_first_sequencer_set_hash_change_by_goat_block_at_or_after(
&mut self,
goat_block_height: i64,
) -> anyhow::Result<Option<SequencerSetHashChange>> {
let res = sqlx::query_as::<_, SequencerSetHashChange>(
"SELECT * FROM sequencer_set_hash_changes WHERE goat_block_height >= ? ORDER BY goat_block_height ASC LIMIT 1",
)
.bind(goat_block_height)
.fetch_optional(self.conn())
.await?;
Ok(res)
}

pub async fn find_first_sequencer_set_hash_change_by_cosmos_block_at_or_after(
&mut self,
cosmos_block_height: i64,
) -> anyhow::Result<Option<SequencerSetHashChange>> {
let res = sqlx::query_as::<_, SequencerSetHashChange>(
"SELECT * FROM sequencer_set_hash_changes WHERE cosmos_block_height >= ? ORDER BY cosmos_block_height ASC LIMIT 1",
)
.bind(cosmos_block_height)
.fetch_optional(self.conn())
.await?;
Ok(res)
}

pub async fn get_sequencer_set_scan_state(
&mut self,
) -> anyhow::Result<Option<SequencerSetScanState>> {
let res = sqlx::query_as::<_, SequencerSetScanState>(
"SELECT * FROM sequencer_set_scan_state WHERE id = 1 LIMIT 1",
)
.fetch_optional(self.conn())
.await?;
Ok(res)
}

pub async fn upsert_sequencer_set_scan_state(
&mut self,
next_cosmos_block_height: i64,
latest_goat_block_height: i64,
latest_validators_hash: &str,
) -> anyhow::Result<u64> {
let current_time = get_current_timestamp_secs();
let res = sqlx::query(
r#"INSERT INTO sequencer_set_scan_state (id, next_cosmos_block_height, latest_goat_block_height, latest_validators_hash, created_at, updated_at)
VALUES (1, ?, ?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE
SET next_cosmos_block_height = excluded.next_cosmos_block_height,
latest_goat_block_height = excluded.latest_goat_block_height,
latest_validators_hash = excluded.latest_validators_hash,
updated_at = excluded.updated_at"#,
)
.bind(next_cosmos_block_height)
.bind(latest_goat_block_height)
.bind(latest_validators_hash)
.bind(current_time)
.bind(current_time)
.execute(self.conn())
.await?;
Ok(res.rows_affected())
}
}

// fn truncate_string(s: &str, max_len: usize) -> &str {
Expand All @@ -2919,3 +3019,134 @@ pub async fn create_local_db(db_path: &str) -> LocalDB {
local_db.migrate().await;
local_db
}

#[cfg(test)]
mod sequencer_set_tests {
use super::*;

async fn setup_db() -> LocalDB {
create_local_db("sqlite::memory:").await
}

#[tokio::test]
async fn test_upsert_and_find_latest_hash_change() {
let db = setup_db().await;
let mut s = db.acquire().await.unwrap();

s.upsert_sequencer_set_hash_change(100, 1000, "aabbcc").await.unwrap();
s.upsert_sequencer_set_hash_change(200, 2000, "ddeeff").await.unwrap();
s.upsert_sequencer_set_hash_change(150, 1500, "112233").await.unwrap();

let latest = s.find_latest_sequencer_set_hash_change().await.unwrap().unwrap();
assert_eq!(latest.cosmos_block_height, 200);
assert_eq!(latest.goat_block_height, 2000);
assert_eq!(latest.validators_hash, "ddeeff");
}

#[tokio::test]
async fn test_upsert_hash_change_conflict() {
let db = setup_db().await;
let mut s = db.acquire().await.unwrap();

s.upsert_sequencer_set_hash_change(100, 1000, "aabbcc").await.unwrap();
// Same cosmos_block_height, different data — should overwrite
s.upsert_sequencer_set_hash_change(100, 1001, "ddeeff").await.unwrap();

let latest = s.find_latest_sequencer_set_hash_change().await.unwrap().unwrap();
assert_eq!(latest.cosmos_block_height, 100);
assert_eq!(latest.goat_block_height, 1001);
assert_eq!(latest.validators_hash, "ddeeff");
}

#[tokio::test]
async fn test_find_by_goat_block_at_or_after() {
let db = setup_db().await;
let mut s = db.acquire().await.unwrap();

s.upsert_sequencer_set_hash_change(100, 1000, "aa").await.unwrap();
s.upsert_sequencer_set_hash_change(200, 2000, "bb").await.unwrap();
s.upsert_sequencer_set_hash_change(300, 3000, "cc").await.unwrap();

// Exact match
let r = s
.find_first_sequencer_set_hash_change_by_goat_block_at_or_after(2000)
.await
.unwrap()
.unwrap();
assert_eq!(r.goat_block_height, 2000);

// Between records — should return next one
let r = s
.find_first_sequencer_set_hash_change_by_goat_block_at_or_after(1500)
.await
.unwrap()
.unwrap();
assert_eq!(r.goat_block_height, 2000);

// Beyond all records — should return None
let r =
s.find_first_sequencer_set_hash_change_by_goat_block_at_or_after(4000).await.unwrap();
assert!(r.is_none());
}

#[tokio::test]
async fn test_find_by_cosmos_block_at_or_after() {
let db = setup_db().await;
let mut s = db.acquire().await.unwrap();

s.upsert_sequencer_set_hash_change(100, 1000, "aa").await.unwrap();
s.upsert_sequencer_set_hash_change(200, 2000, "bb").await.unwrap();

let r = s
.find_first_sequencer_set_hash_change_by_cosmos_block_at_or_after(150)
.await
.unwrap()
.unwrap();
assert_eq!(r.cosmos_block_height, 200);

let r =
s.find_first_sequencer_set_hash_change_by_cosmos_block_at_or_after(300).await.unwrap();
assert!(r.is_none());
}

#[tokio::test]
async fn test_scan_state_upsert_and_get() {
let db = setup_db().await;
let mut s = db.acquire().await.unwrap();

assert!(s.get_sequencer_set_scan_state().await.unwrap().is_none());

s.upsert_sequencer_set_scan_state(100, 1000, "aabbcc").await.unwrap();
let state = s.get_sequencer_set_scan_state().await.unwrap().unwrap();
assert_eq!(state.next_cosmos_block_height, 100);
assert_eq!(state.latest_goat_block_height, 1000);
assert_eq!(state.latest_validators_hash, "aabbcc");

// Update
s.upsert_sequencer_set_scan_state(200, 2000, "ddeeff").await.unwrap();
let state = s.get_sequencer_set_scan_state().await.unwrap().unwrap();
assert_eq!(state.next_cosmos_block_height, 200);
assert_eq!(state.latest_validators_hash, "ddeeff");
}

#[tokio::test]
async fn test_find_returns_none_when_empty() {
let db = setup_db().await;
let mut s = db.acquire().await.unwrap();

assert!(s.find_latest_sequencer_set_hash_change().await.unwrap().is_none());
assert!(
s.find_first_sequencer_set_hash_change_by_goat_block_at_or_after(0)
.await
.unwrap()
.is_none()
);
assert!(
s.find_first_sequencer_set_hash_change_by_cosmos_block_at_or_after(0)
.await
.unwrap()
.is_none()
);
assert!(s.get_sequencer_set_scan_state().await.unwrap().is_none());
}
}
Loading
Loading