From f4bd937d5ee19ca17e594f00386e1cf843a13e6c Mon Sep 17 00:00:00 2001 From: KSlashh <48985735+KSlashh@users.noreply.github.com> Date: Tue, 10 Feb 2026 00:02:08 +0800 Subject: [PATCH 1/2] refactor: graph monitor & some handles --- node/README.md | 1 - node/src/bin/send_pegout.rs | 10 +- node/src/handle.rs | 300 +-- .../graph_maintenance_tasks.rs | 1637 +++++++---------- node/src/scheduled_tasks/mod.rs | 6 +- node/src/utils.rs | 366 ++-- 6 files changed, 1085 insertions(+), 1235 deletions(-) diff --git a/node/README.md b/node/README.md index 84969cba..4a073417 100644 --- a/node/README.md +++ b/node/README.md @@ -696,7 +696,6 @@ flowchart TB GM2["detect_kickoff"] GM3["detect_take1_or_challenge"] GM4["process_graph_challenge"] - GM5["scan_obsolete_sibling_graphs"] GM1 -->|"KickoffReady"| GM2 GM2 -->|"Status update"| GM3 diff --git a/node/src/bin/send_pegout.rs b/node/src/bin/send_pegout.rs index aa1f21d8..68dfa901 100644 --- a/node/src/bin/send_pegout.rs +++ b/node/src/bin/send_pegout.rs @@ -177,16 +177,16 @@ async fn select_graph( let graph = storage_processor.find_graph(&graph_id).await?; if let Some(graph) = graph { if graph.operator_pubkey != operator_pubkey { - bail!("graph {} not owned by operator {}", graph_id, operator_pubkey); + bail!("graph {graph_id} not owned by operator {operator_pubkey}"); } if graph.status != GraphStatus::OperatorDataPushed.to_string() { bail!("graph {} status {} is not OperatorDataPushed", graph_id, graph.status); } if graph.init_withdraw_tx_hash.is_some() { - bail!("graph {} already initialized withdraw", graph_id); + bail!("graph {graph_id} already initialized withdraw"); } if !is_graph_ready_by_previous(&mut storage_processor, &graph).await? { - bail!("graph {} not ready due to previous graph status", graph_id); + bail!("graph {graph_id} not ready due to previous graph status"); } info!( "select_graph picked explicit graph {} kickoff_index {} amount {}", @@ -267,7 +267,7 @@ async fn init_withdraw_for_graph( let amount_u256 = U256::from(amount); if balance < amount_u256 { - bail!("insufficient pegBTC balance: need {}, available {}", amount, balance); + bail!("insufficient pegBTC balance: need {amount}, available {balance}"); } let pegin_data = goat_client.gateway_get_pegin_data(&graph.instance_id).await?; @@ -499,7 +499,7 @@ async fn wait_until_next_ready( } if waited >= max_wait_secs { - bail!("waited {}s but graph {} still not ready", max_wait_secs, graph_id); + bail!("waited {max_wait_secs}s but graph {graph_id} still not ready"); } sleep(Duration::from_secs(poll_interval_secs)).await; waited = waited.saturating_add(poll_interval_secs); diff --git a/node/src/handle.rs b/node/src/handle.rs index 1bbc7192..d3bbe884 100644 --- a/node/src/handle.rs +++ b/node/src/handle.rs @@ -2,6 +2,7 @@ use crate::action::*; use crate::env::{get_bitvm_key, get_network, get_node_goat_address, is_relayer}; use crate::error::SpecialError; use crate::middleware::AllBehaviours; +use crate::scheduled_tasks::graph_maintenance_tasks::ChallengeSubStatus; use crate::utils::*; use anyhow::{Result, anyhow, bail}; use bitcoin::hashes::Hash; @@ -413,7 +414,7 @@ pub async fn dispatch(ctx: &mut HandlerContext<'_>, content: &GOATMessageContent handle_take1_sent_default(ctx, *instance_id, *graph_id).await } (GOATMessageContent::Take2Ready(Take2Ready { instance_id, graph_id }), Actor::Operator) => { - handle_take2_ready_operator(ctx, *instance_id, *graph_id).await + handle_take2_ready_operator(ctx, *instance_id, *graph_id, content).await } (GOATMessageContent::Take2Sent(Take2Sent { instance_id, graph_id }), Actor::Committee) => { handle_take2_sent_committee(ctx, *instance_id, *graph_id, content).await @@ -575,7 +576,7 @@ async fn refresh_and_compensate( graph: Option<&Bitvm2Graph>, scan_from_status: Option, compensate_from_status: GraphStatus, -) -> Result { +) -> Result<(GraphStatus, Option)> { let (graph_status, sub_status) = refresh_graph( ctx.local_db, ctx.btc_client, @@ -600,7 +601,7 @@ async fn refresh_and_compensate( sub_status, ) .await?; - Ok(graph_status) + Ok((graph_status, sub_status)) } async fn get_graph_and_status( @@ -649,8 +650,8 @@ async fn refresh_graph_status( instance_id: Uuid, graph_id: Uuid, message: Option<&GOATMessage>, - status: GraphStatus, -) -> Result> { + compensate_from_status: GraphStatus, +) -> Result)>> { let (graph, graph_start_status) = match message { Some(message) => { match get_graph_and_status_or_defer(ctx, instance_id, graph_id, message).await? { @@ -660,16 +661,16 @@ async fn refresh_graph_status( } None => get_graph_and_status(ctx, instance_id, graph_id).await?, }; - refresh_and_compensate( + let (graph_status, sub_status) = refresh_and_compensate( ctx, instance_id, graph_id, Some(&graph), Some(graph_start_status), - status, + compensate_from_status, ) .await?; - Ok(Some(graph)) + Ok(Some((graph, graph_status, sub_status))) } #[tracing::instrument(level = "info", skip_all, fields(instance_id = %instance_id))] @@ -1915,11 +1916,11 @@ async fn handle_kickoff_sent_committee( ) -> Result<()> { // triggered by Kickoff tx // 1. update status - let graph = + let (graph, _graph_status, _graph_sub_status) = match refresh_graph_status(ctx, instance_id, graph_id, None, GraphStatus::OperatorKickOff) .await? { - Some(graph) => graph, + Some(v) => v, None => return Ok(()), }; if !is_relayer() { @@ -1981,7 +1982,7 @@ async fn handle_kickoff_sent_challenger( ) -> Result<()> { // triggered by Kickoff tx let message = make_message(ctx, content); - let graph = match refresh_graph_status( + let (graph, _graph_status, _graph_sub_status) = match refresh_graph_status( ctx, instance_id, graph_id, @@ -1990,7 +1991,7 @@ async fn handle_kickoff_sent_challenger( ) .await? { - Some(graph) => graph, + Some(v) => v, None => return Ok(()), }; // 1. check kickoff tx status on Bitcoin chain @@ -2079,7 +2080,7 @@ async fn handle_prekickoff_sent_challenger( ) -> Result<()> { // triggered by PreKickoff tx let message = make_message(ctx, content); - let graph = match refresh_graph_status( + let (graph, _graph_status, _graph_sub_status) = match refresh_graph_status( ctx, instance_id, graph_id, @@ -2088,7 +2089,7 @@ async fn handle_prekickoff_sent_challenger( ) .await? { - Some(graph) => graph, + Some(v) => v, None => return Ok(()), }; // 1. check the previous graph status @@ -2128,7 +2129,7 @@ async fn handle_prekickoff_sent_challenger( let prev_graph_start_status = get_graph_status(ctx.local_db, prev_instance_id, prev_graph_id) .await? .ok_or_else(|| anyhow!("Graph status not found for {prev_instance_id}:{prev_graph_id}"))?; - let prev_graph_status = refresh_and_compensate( + let (prev_graph_status, _prev_graph_sub_status) = refresh_and_compensate( ctx, prev_instance_id, prev_graph_id, @@ -2169,7 +2170,7 @@ async fn handle_challenge_sent_operator( ) -> Result<()> { // triggered by Challenge tx let message = make_message(ctx, content); - let mut graph = match refresh_graph_status( + let (mut graph, _graph_status, _graph_sub_status) = match refresh_graph_status( ctx, instance_id, graph_id, @@ -2178,7 +2179,7 @@ async fn handle_challenge_sent_operator( ) .await? { - Some(graph) => graph, + Some(v) => v, None => return Ok(()), }; // 1. check the challenge tx status on Bitcoin chain @@ -2248,19 +2249,24 @@ async fn handle_watchtower_challenge_init_sent_watchtower( ) -> Result<()> { // triggered by WatchtowerChallengeInit tx let message = make_message(ctx, content); - let graph = match get_graph_or_defer( - ctx.swarm, - ctx.local_db, - ctx.goat_client, + let (graph, graph_status, _graph_sub_status) = match refresh_graph_status( + ctx, instance_id, graph_id, - &message, + Some(&message), + GraphStatus::Challenge, ) .await? { - Some(g) => g, + Some(v) => v, None => return Ok(()), }; + if graph_status != GraphStatus::Challenge { + tracing::warn!( + "Ignore WatchtowerChallengeInitSent for {instance_id}:{graph_id}: graph status is {graph_status:?}" + ); + return Ok(()); + } let watchtower_keypair = WatchtowerMasterKey::new(get_bitvm_key()?).master_keypair(); let node_index = match graph .parameters @@ -2276,7 +2282,6 @@ async fn handle_watchtower_challenge_init_sent_watchtower( return Ok(()); } }; - let graph = Bitvm2Graph::from_simplified(&graph)?; let watchtower_challenge_init_txid = graph.watchtower_challenge_init.tx().compute_txid(); if !tx_on_chain(ctx.btc_client, &watchtower_challenge_init_txid).await? { tracing::warn!( @@ -2354,20 +2359,24 @@ async fn handle_watchtower_challenge_sent_operator( // triggered by WatchtowerChallenge tx let message = make_message(ctx, content); // 1. check the watchtower-challenge tx status on Bitcoin chain, if watchtower challenge tx is confirmed, sign & broadcast operator-ack txn - let graph = match get_graph_or_defer( - ctx.swarm, - ctx.local_db, - ctx.goat_client, + let (mut graph, graph_status, _graph_sub_status) = match refresh_graph_status( + ctx, instance_id, graph_id, - &message, + Some(&message), + GraphStatus::Challenge, ) .await? { - Some(g) => g, + Some(v) => v, None => return Ok(()), }; - let mut graph = Bitvm2Graph::from_simplified(&graph)?; + if graph_status != GraphStatus::Challenge { + tracing::warn!( + "Ignore WatchtowerChallengeSent for {instance_id}:{graph_id}: graph status is {graph_status:?}" + ); + return Ok(()); + } let watchtower_challenge_init_txid = graph.watchtower_challenge_init.tx().compute_txid(); let operator_master_key = OperatorMasterKey::new(get_bitvm_key()?); let operator_graph_keypair = operator_master_key.master_keypair(); @@ -2421,20 +2430,24 @@ async fn handle_watchtower_challenge_timeout_operator( ) -> Result<()> { // triggered by timeout task let message = make_message(ctx, content); - let graph = match get_graph_or_defer( - ctx.swarm, - ctx.local_db, - ctx.goat_client, + let (mut graph, graph_status, _graph_sub_status) = match refresh_graph_status( + ctx, instance_id, graph_id, - &message, + Some(&message), + GraphStatus::Challenge, ) .await? { - Some(g) => g, + Some(v) => v, None => return Ok(()), }; - let mut graph = Bitvm2Graph::from_simplified(&graph)?; + if graph_status != GraphStatus::Challenge { + tracing::warn!( + "Ignore WatchtowerChallengeTimeout for {instance_id}:{graph_id}: graph status is {graph_status:?}" + ); + return Ok(()); + } let watchtower_challenge_init_txid = graph.watchtower_challenge_init.tx().compute_txid(); let watchtower_challenge_init_height = match ctx .btc_client @@ -2521,20 +2534,24 @@ async fn handle_operator_ack_timeout_challenger( ) -> Result<()> { // triggered by timeout task let message = make_message(ctx, content); - let graph = match get_graph_or_defer( - ctx.swarm, - ctx.local_db, - ctx.goat_client, + let (graph, graph_status, _graph_sub_status) = match refresh_graph_status( + ctx, instance_id, graph_id, - &message, + Some(&message), + GraphStatus::Challenge, ) .await? { - Some(g) => g, + Some(v) => v, None => return Ok(()), }; - let graph = Bitvm2Graph::from_simplified(&graph)?; + if graph_status != GraphStatus::Challenge { + tracing::warn!( + "Ignore AckTimeout for {instance_id}:{graph_id}: graph status is {graph_status:?}" + ); + return Ok(()); + } let watchtower_challenge_init_txid = graph.watchtower_challenge_init.tx().compute_txid(); let connector_f_vout = 1 + 2 * graph.parameters.watchtower_pubkeys.len() as u64; if outpoint_spent_txid(ctx.btc_client, &watchtower_challenge_init_txid, connector_f_vout) @@ -2614,20 +2631,24 @@ async fn handle_operator_commit_blockhash_ready_operator( ) -> Result<()> { // triggered by timeout task let message = make_message(ctx, content); - let graph = match get_graph_or_defer( - ctx.swarm, - ctx.local_db, - ctx.goat_client, + let (mut graph, graph_status, _graph_sub_status) = match refresh_graph_status( + ctx, instance_id, graph_id, - &message, + Some(&message), + GraphStatus::Challenge, ) .await? { - Some(g) => g, + Some(v) => v, None => return Ok(()), }; - let mut graph = Bitvm2Graph::from_simplified(&graph)?; + if graph_status != GraphStatus::Challenge { + tracing::warn!( + "Ignore CommitBlockHashReady for {instance_id}:{graph_id}: graph status is {graph_status:?}" + ); + return Ok(()); + } // 1. check that all WatchtowerChallenge Connectors are spent let largest_watchtower_challenge_block_hash = match get_largest_watchtower_challenge_block(&graph, ctx.btc_client).await { @@ -2680,20 +2701,24 @@ async fn handle_operator_commit_blockhash_timeout_challenger( ) -> Result<()> { // triggered by timeout task let message = make_message(ctx, content); - let graph = match get_graph_or_defer( - ctx.swarm, - ctx.local_db, - ctx.goat_client, + let (graph, graph_status, _graph_sub_status) = match refresh_graph_status( + ctx, instance_id, graph_id, - &message, + Some(&message), + GraphStatus::Challenge, ) .await? { - Some(g) => g, + Some(v) => v, None => return Ok(()), }; - let graph = Bitvm2Graph::from_simplified(&graph)?; + if graph_status != GraphStatus::Challenge { + tracing::warn!( + "Ignore CommitBlockHashTimeout for {instance_id}:{graph_id}: graph status is {graph_status:?}" + ); + return Ok(()); + } let watchtower_challenge_init_txid = graph.watchtower_challenge_init.tx().compute_txid(); let connector_f_vout = 1 + 2 * graph.parameters.watchtower_pubkeys.len() as u64; if outpoint_spent_txid(ctx.btc_client, &watchtower_challenge_init_txid, connector_f_vout) @@ -2768,20 +2793,40 @@ async fn handle_assert_init_ready_operator( ) -> Result<()> { // triggered by timeout task let message = make_message(ctx, content); - let graph = match get_graph_or_defer( - ctx.swarm, - ctx.local_db, - ctx.goat_client, + let (mut graph, graph_status, graph_sub_status) = match refresh_graph_status( + ctx, instance_id, graph_id, - &message, + Some(&message), + GraphStatus::Challenge, ) .await? { - Some(g) => g, + Some(v) => v, None => return Ok(()), }; - let mut graph = Bitvm2Graph::from_simplified(&graph)?; + if graph_status != GraphStatus::Challenge { + tracing::warn!( + "Ignore AssertInitReady for {instance_id}:{graph_id}: graph status is {graph_status:?}" + ); + return Ok(()); + } + match graph_sub_status { + Some(sub_status) => { + if !sub_status.is_watchtower_challenge_normal_finished() { + tracing::warn!( + "Ignore AssertInitReady for {instance_id}:{graph_id}: watchtower challenge not finished yet, sub-status {sub_status:?}" + ); + return Ok(()); + } + } + None => { + tracing::warn!( + "Ignore AssertInitReady for {instance_id}:{graph_id}: graph sub-status is None" + ); + return Ok(()); + } + } let operator_master_key = OperatorMasterKey::new(get_bitvm_key()?); let operator_graph_keypair = operator_master_key.master_keypair(); let assert_init_txid = graph.assert_init.tx().compute_txid(); @@ -2797,31 +2842,6 @@ async fn handle_assert_init_ready_operator( // } // 1. sign & broadcast assert-init txn if !tx_on_chain(ctx.btc_client, &assert_init_txid).await? { - let watchtower_challenge_init_txid = graph.watchtower_challenge_init.tx().compute_txid(); - let watchtower_challenge_init_height = match ctx - .btc_client - .get_tx_status(&watchtower_challenge_init_txid) - .await? - .block_height - { - Some(height) => height, - None => { - tracing::warn!( - "Ignore AssertInitReady for {instance_id}:{graph_id}: watchtower challenge init tx not confirmed yet" - ); - return Ok(()); - } - }; - let current_height = ctx.btc_client.get_height().await?; - if current_height - < watchtower_challenge_init_height - + watchtower_challenge_timeout_timelock(get_network()) - { - tracing::warn!( - "Ignore AssertInitReady for {instance_id}:{graph_id}: watchtower challenge not finished yet" - ); - return Ok(()); - } let assert_init_tx = operator_sign_assert_init(operator_graph_keypair, &mut graph)?; let anchor_vout = assert_init_tx.output.len() as u64 - 1; let assert_init_tx_total_input_amount = @@ -2895,20 +2915,24 @@ async fn handle_assert_commit_timeout_challenger( ) -> Result<()> { // triggered by timeout task let message = make_message(ctx, content); - let graph = match get_graph_or_defer( - ctx.swarm, - ctx.local_db, - ctx.goat_client, + let (graph, graph_status, _graph_sub_status) = match refresh_graph_status( + ctx, instance_id, graph_id, - &message, + Some(&message), + GraphStatus::Challenge, ) .await? { - Some(g) => g, + Some(v) => v, None => return Ok(()), }; - let graph = Bitvm2Graph::from_simplified(&graph)?; + if graph_status != GraphStatus::Challenge { + tracing::warn!( + "Ignore AssertCommitTimeout for {instance_id}:{graph_id}: graph status is {graph_status:?}" + ); + return Ok(()); + } let assert_init_txid = graph.assert_init.tx().compute_txid(); let connector_d_vout = graph.assert_commit_timeout_txns.len() as u64; if outpoint_spent_txid(ctx.btc_client, &assert_init_txid, connector_d_vout).await?.is_some() { @@ -2994,20 +3018,24 @@ async fn handle_disprove_ready_challenger( ) -> Result<()> { // triggered by AssertCommit tx or OperatorCommitBlockHash tx let message = make_message(ctx, content); - let graph = match get_graph_or_defer( - ctx.swarm, - ctx.local_db, - ctx.goat_client, + let (graph, graph_status, _graph_sub_status) = match refresh_graph_status( + ctx, instance_id, graph_id, - &message, + Some(&message), + GraphStatus::Challenge, ) .await? { - Some(g) => g, + Some(v) => v, None => return Ok(()), }; - let graph = Bitvm2Graph::from_simplified(&graph)?; + if graph_status != GraphStatus::Challenge { + tracing::warn!( + "Ignore DisproveReady for {instance_id}:{graph_id}: graph status is {graph_status:?}" + ); + return Ok(()); + } // 1. get assertions committed by Operator from Bitcoin chain let operator_commit_blockhash_txin = { let watchtower_challenge_init_txid = graph.watchtower_challenge_init.tx().compute_txid(); @@ -3133,10 +3161,16 @@ async fn handle_disprove_sent_committee( // triggered by Disprove tx // 1. update graph status let message = make_message(ctx, content); - let graph = match refresh_graph_status(ctx, instance_id, graph_id, None, GraphStatus::Disprove) - .await? + let (graph, _graph_status, _graph_sub_status) = match refresh_graph_status( + ctx, + instance_id, + graph_id, + None, + GraphStatus::Disprove, + ) + .await? { - Some(graph) => graph, + Some(v) => v, None => return Ok(()), }; if !is_relayer() { @@ -3310,20 +3344,24 @@ async fn handle_take1_ready_operator( ) -> Result<()> { // triggered by timeout task let message = make_message(ctx, content); - let graph = match get_graph_or_defer( - ctx.swarm, - ctx.local_db, - ctx.goat_client, + let (mut graph, graph_status, _graph_sub_status) = match refresh_graph_status( + ctx, instance_id, graph_id, - &message, + Some(&message), + GraphStatus::Challenge, ) .await? { - Some(g) => g, + Some(v) => v, None => return Ok(()), }; - let mut graph = Bitvm2Graph::from_simplified(&graph)?; + if graph_status != GraphStatus::Challenge { + tracing::warn!( + "Ignore Take1Ready for {instance_id}:{graph_id}: graph status is {graph_status:?}" + ); + return Ok(()); + } let kickoff_txid = graph.kickoff.tx().compute_txid(); let connector_a_vout = 0; let guardian_connector_vout = 4; @@ -3376,11 +3414,11 @@ async fn handle_take1_sent_committee( // triggered by Take1 tx // 1. update graph status let message = make_message(ctx, content); - let graph = + let (graph, _graph_status, _graph_sub_status) = match refresh_graph_status(ctx, instance_id, graph_id, None, GraphStatus::OperatorTake1) .await? { - Some(graph) => graph, + Some(v) => v, None => return Ok(()), }; if !is_relayer() { @@ -3461,12 +3499,28 @@ async fn handle_take2_ready_operator( ctx: &mut HandlerContext<'_>, instance_id: Uuid, graph_id: Uuid, + content: &GOATMessageContent, ) -> Result<()> { // triggered by timeout task - let graph = get_graph(ctx.local_db, instance_id, graph_id) - .await? - .ok_or_else(|| anyhow!("Graph not found for {instance_id}:{graph_id}"))?; - let mut graph = Bitvm2Graph::from_simplified(&graph)?; + let message = make_message(ctx, content); + let (mut graph, graph_status, _graph_sub_status) = match refresh_graph_status( + ctx, + instance_id, + graph_id, + Some(&message), + GraphStatus::Challenge, + ) + .await? + { + Some(v) => v, + None => return Ok(()), + }; + if graph_status != GraphStatus::Challenge { + tracing::warn!( + "Ignore Take2Ready for {instance_id}:{graph_id}: graph status is {graph_status:?}" + ); + return Ok(()); + } let kickoff_txid = graph.kickoff.tx().compute_txid(); let watchtower_challenge_init_txid = graph.watchtower_challenge_init.tx().compute_txid(); let assert_init_txid = graph.assert_init.tx().compute_txid(); @@ -3554,11 +3608,11 @@ async fn handle_take2_sent_committee( // triggered by Take2 tx // 1. update graph status let message = make_message(ctx, content); - let graph = + let (graph, _graph_status, _graph_sub_status) = match refresh_graph_status(ctx, instance_id, graph_id, None, GraphStatus::OperatorTake2) .await? { - Some(graph) => graph, + Some(v) => v, None => return Ok(()), }; if !is_relayer() { diff --git a/node/src/scheduled_tasks/graph_maintenance_tasks.rs b/node/src/scheduled_tasks/graph_maintenance_tasks.rs index a0147c5d..4ae28f69 100644 --- a/node/src/scheduled_tasks/graph_maintenance_tasks.rs +++ b/node/src/scheduled_tasks/graph_maintenance_tasks.rs @@ -18,10 +18,10 @@ use bitvm2_lib::operator::{ take1_timelock, take2_timelocks, watchtower_challenge_timeout_timelock, }; use client::btc_chain::BTCClient; -use client::goat_chain::{DisproveTxType, GOATClient}; +use client::goat_chain::DisproveTxType; use indexmap::IndexMap; use serde::{Deserialize, Serialize}; -use store::localdb::{GraphUpdate, LocalDB, StorageProcessor}; +use store::localdb::{LocalDB, StorageProcessor}; use store::{ GoatTxProcessingStatus, GoatTxType, Graph, GraphBtcTxVoutMonitor, GraphStatus, SerializableTxid, }; @@ -30,8 +30,8 @@ use tracing::{info, trace, warn}; use uuid::Uuid; const CONNECTOR_G_MARGIN: i64 = 3; -const CONNECTOR_D_MARGIN: u64 = 2; -const CONNECTOR_F_MARGIN: u64 = 2; +// const CONNECTOR_D_MARGIN: u64 = 2; +// const CONNECTOR_F_MARGIN: u64 = 2; const CONNECTOR_GUARDIAN_MARGIN: u64 = 2; const MONITE_BTC_TX_NAME_KICKOFF: &str = "kickoff"; @@ -85,9 +85,9 @@ enum OperatorWithdrawType { pub enum CommitBlockHashStatus { #[default] None, - WatchtowerChallengeProcessed, - OperatorCommit, - OperatorCommitTimeout, + WatchtowerChallengeProcessed, // watchtower challenge processed, but commit blockhash not detected yet + OperatorCommit, // commit blockhash detected + OperatorCommitTimeout, // commit blockhash not detected, but timelock expired } #[derive( @@ -96,9 +96,9 @@ pub enum CommitBlockHashStatus { pub enum AssertCommitStatus { #[default] None, - OperatorInit, - OperatorCommit, - OperatorCommitTimeout, + OperatorInit, // assert init vout detected, but some assert commit not detected yet + OperatorCommit, // all assert commit sent + OperatorCommitTimeout, // some assert commit not sent, but timelock expired } #[derive( @@ -107,10 +107,10 @@ pub enum AssertCommitStatus { pub enum WatchtowerChallengeStatus { #[default] None, - OperatorInit, - WatchtowerChallenge, // all Watchtower challenge - WatchtowerChallengeTimeout, // Some Watchtower did not challenge, and timelock expired - OperatorACKTimeout, // Operator did not send ACK for some Watchtower, and timelock expired + OperatorInit, // watchtower init detected, but no challenge detected yet + WatchtowerChallenge, // Some Watchtower challenge, and timelock not expired + WatchtowerChallengeTimeout, // Some Watchtower did not challenge, and timelock expired + OperatorACKTimeout, // Operator did not send ACK for some Watchtower, and timelock expired WatchtowerChallengeNormalFinished, // Normal Finished WatchtowerChallengeDisproveFinished, // Disproved Finished } @@ -161,7 +161,7 @@ pub struct WTInitTxVoutMonitorData { pub data_map: IndexMap, pub require_disproved_indexes: Vec, pub commit_blockhash_status: CommitBlockHashStatus, - pub is_challenge_timeout_sent: bool, + pub is_challenge_timeout_sent: bool, // deprecated } impl WTInitTxVoutMonitorData { @@ -183,6 +183,7 @@ impl WTInitTxVoutMonitorData { txid: &Txid, input_challenge_timeout_txids: &[SerializableTxid], nack_txids: &[SerializableTxid], + commit_blockhash_timeout_txid: &SerializableTxid, ) -> anyhow::Result<(Vec<(usize, Txid)>, Vec<(usize, Txid)>, Vec<(usize, Txid)>)> { let mut challenge_txids: Vec<(usize, Txid)> = Vec::new(); let mut challenge_timeout_txids: Vec<(usize, Txid)> = Vec::new(); @@ -214,6 +215,23 @@ impl WTInitTxVoutMonitorData { } } } + let connector_g_vout = self.data_map.len() as i32 * 2; + if matches!( + self.commit_blockhash_status, + CommitBlockHashStatus::None | CommitBlockHashStatus::WatchtowerChallengeProcessed + ) && let Some(spend_txid) = + outpoint_spent_txid(btc_client, txid, connector_g_vout as u64).await? + { + if commit_blockhash_timeout_txid.0 == spend_txid { + self.commit_blockhash_status = CommitBlockHashStatus::OperatorCommitTimeout; + } else { + self.commit_blockhash_status = CommitBlockHashStatus::OperatorCommit; + } + } else if self.commit_blockhash_status == CommitBlockHashStatus::None + && self.is_commit_blockhash_ready() + { + self.commit_blockhash_status = CommitBlockHashStatus::WatchtowerChallengeProcessed; + } Ok((challenge_txids, challenge_timeout_txids, ack_txids)) } @@ -253,22 +271,13 @@ impl WTInitTxVoutMonitorData { } pub fn get_challenge_timeout_process_desc(&self) -> (usize, usize) { - if self.is_challenge_timeout_sent { - ( - self.data_map.len() - - self - .data_map - .iter() - .filter(|(_, v)| { - **v == WatchtowerChallengeItemStatus::Challenge - || **v == WatchtowerChallengeItemStatus::OperatorACK - }) - .count(), - self.data_map.len(), - ) - } else { - (0, self.data_map.len()) - } + ( + self.data_map + .iter() + .filter(|(_, v)| **v == WatchtowerChallengeItemStatus::ChallengeTimeout) + .count(), + self.data_map.len(), + ) } pub fn get_commit_block_hash_desc(&self) -> (usize, usize) { @@ -296,17 +305,12 @@ impl WTInitTxVoutMonitorData { .filter(|(_, v)| { **v == WatchtowerChallengeItemStatus::Challenge || **v == WatchtowerChallengeItemStatus::OperatorACK + || **v == WatchtowerChallengeItemStatus::OperatorNACK }) .count(), ) } - #[allow(dead_code)] - pub fn is_challenged(&self) -> bool { - !self.require_disproved_indexes.is_empty() - || self.commit_blockhash_status == CommitBlockHashStatus::OperatorCommitTimeout - } - pub fn check_watchtower_challenge_normal_finished(&self) -> bool { self.data_map.values().all(|status| { matches!( @@ -314,7 +318,14 @@ impl WTInitTxVoutMonitorData { WatchtowerChallengeItemStatus::OperatorACK | WatchtowerChallengeItemStatus::ChallengeTimeout ) - }) + }) && self.commit_blockhash_status == CommitBlockHashStatus::OperatorCommit + } + + pub fn is_commit_blockhash_processed(&self) -> bool { + matches!( + self.commit_blockhash_status, + CommitBlockHashStatus::OperatorCommit | CommitBlockHashStatus::OperatorCommitTimeout + ) } pub fn is_commit_blockhash_ready(&self) -> bool { @@ -327,6 +338,11 @@ impl WTInitTxVoutMonitorData { ) }) } + + pub fn is_disproved(&self) -> bool { + self.data_map.values().any(|status| *status == WatchtowerChallengeItemStatus::OperatorNACK) + || self.commit_blockhash_status == CommitBlockHashStatus::OperatorCommitTimeout + } } /// Assert init tx vout item status @@ -397,6 +413,23 @@ impl AssertInitTxVoutMonitorData { } } } + + fn get_require_disproved_string(&self) -> String { + format!( + "[{}]", + self.require_disproved_indexes + .iter() + .map(|v| v.to_string()) + .collect::>() + .join("_") + ) + } + + fn is_disproved(&self) -> bool { + self.data_map + .values() + .any(|status| *status == AssertCommitItemStatus::OperatorCommitTimeout) + } } /// Parse monitor data from JSON string @@ -408,13 +441,6 @@ where .map_err(|e| anyhow::anyhow!("Failed to parse monitor data: {e}")) } -#[allow(dead_code)] -pub async fn get_initialized_graphs(goat_client: &GOATClient) -> anyhow::Result> { - // call L2 contract : getInitializedInstanceIds - // returns Vec<(instance_id, graph_id)> - goat_client.gateway_get_initialized_ids().await -} - pub async fn get_user_init_withdraw_graphs<'a>( storage_processor: &mut StorageProcessor<'a>, ) -> anyhow::Result> { @@ -427,7 +453,7 @@ pub async fn get_user_init_withdraw_graphs<'a>( Ok(goat_tx_records.iter().map(|v| (v.instance_id, v.graph_id)).collect()) } -// tick_task1 +/// may trigger: KickoffReady pub async fn detect_init_withdraw_call(local_db: &LocalDB) -> anyhow::Result<()> { trace!("start tick action: detect_init_withdraw_call"); let graphs = { @@ -474,6 +500,7 @@ pub async fn detect_init_withdraw_call(local_db: &LocalDB) -> anyhow::Result<()> Ok(()) } +/// may trigger: KickoffSent pub async fn detect_kickoff(local_db: &LocalDB, btc_client: &BTCClient) -> anyhow::Result<()> { trace!("start tick action: detect_kickoff"); let graphs = { @@ -521,6 +548,7 @@ pub async fn detect_kickoff(local_db: &LocalDB, btc_client: &BTCClient) -> anyho Ok(()) } +/// may trigger: Take1Ready, Take1Sent, ChallengeSent pub async fn detect_take1_or_challenge( local_db: &LocalDB, btc_client: &BTCClient, @@ -549,6 +577,7 @@ pub async fn detect_take1_or_challenge( ); continue; } + // process_kickoff_graph may trigger Take1Ready, Take1Sent or ChallengeSent if let Some((actor, message_content)) = process_kickoff_graph(btc_client, local_db, &graph, lock_blocks, current_height).await? { @@ -594,17 +623,23 @@ pub async fn process_graph_challenge( Ok(sub_status) => sub_status, Err(_) => { warn!( - "failed to deserialize sub_status at graph:{}, {}", + "failed to deserialize sub_status at graph:{}, reset to default, old sub_status {}", graph.graph_id, graph.sub_status ); - continue; + ChallengeSubStatus::default() } }; - if !sub_status.is_disproved() && !sub_status.is_normal_finished() { + let mut is_watchtower_challenge_normal_finished = + sub_status.is_watchtower_challenge_normal_finished(); + let mut is_assert_commit_normal_finished = sub_status.is_assert_commit_normal_finished(); + let mut is_all_commit_normal_finished = + is_watchtower_challenge_normal_finished && is_assert_commit_normal_finished; + if !sub_status.is_disproved() && !is_all_commit_normal_finished { trace!("graph:{} is not disproved", graph.graph_id); - if !sub_status.is_watchtower_challenge_normal_finished() { + if !is_watchtower_challenge_normal_finished { info!("graph:{} watchtower challenge is processing", graph.graph_id); - process_watchtower_challenge_monitoring( + // process_watchtower_challenge_monitoring may trigger: WatchtowerChallengeSent, WatchtowerChallengeTimeout, OperatorAckTimeout, DisproveSent(OperatorCommitTimeout/OperatorNack), OperatorCommitBlockHashReady, OperatorCommitBlockHashTimeout + is_watchtower_challenge_normal_finished = process_watchtower_challenge_monitoring( btc_client, local_db, &graph, @@ -612,9 +647,27 @@ pub async fn process_graph_challenge( current_height, ) .await?; - } else if !sub_status.is_assert_commit_normal_finished() { + } + if is_watchtower_challenge_normal_finished && !is_assert_commit_normal_finished { info!("graph:{} assert commit is processing", graph.graph_id); - process_assert_commit_monitoring( + // upsert AssertInitReady message whenever watchtower challenge is finished normally, repeated inserts are idempotent + upsert_message( + &mut local_db.acquire().await?, + false, + graph.graph_id, + None, + "self".to_string(), + Actor::Operator, + GOATMessageContent::AssertInitReady(AssertInitReady { + instance_id: graph.instance_id, + graph_id: graph.graph_id, + }), + 0, + 0, + ) + .await?; + // process_assert_commit_monitoring may trigger: DisproveSent(AssertTimeout), AssertCommitTimeout + is_assert_commit_normal_finished = process_assert_commit_monitoring( btc_client, local_db, &graph, @@ -623,8 +676,13 @@ pub async fn process_graph_challenge( ) .await?; } - } else if sub_status.is_normal_finished() { + is_all_commit_normal_finished = + is_watchtower_challenge_normal_finished && is_assert_commit_normal_finished; + } + if !sub_status.is_disproved() && is_all_commit_normal_finished { let mut storage_processor = local_db.acquire().await?; + info!("graph:{} watchtower challenge and assert commit is finished", graph.graph_id); + // upsert DisproveReady whenever both watchtower-challenge and assert is finished normally, repeated inserts are idempotent upsert_message( &mut storage_processor, false, @@ -640,7 +698,7 @@ pub async fn process_graph_challenge( 0, ) .await?; - info!("graph:{} watchtower challenge and assert commit is finished", graph.graph_id); + // detect_take2 may return: Take2Ready, Take2Sent, DisproveSent(Disprove) if let Some((actor, message_content)) = detect_take2(btc_client, local_db, &graph, current_height).await? { @@ -658,41 +716,12 @@ pub async fn process_graph_challenge( ) .await?; } - } else { - info!("graph:{} is disproved, waiting dispove tx sent", graph.graph_id) } - process_graph_watchtower_assert_disproved(btc_client, local_db, &graph, &mut sub_status) - .await?; } Ok(()) } -/// Handle Challenge transaction detection -async fn handle_challenge_detected( - local_db: &LocalDB, - graph_id: Uuid, - challenge_txid: Txid, -) -> anyhow::Result<()> { - info!( - "handle_challenge_detected for graph_id: {graph_id}, challenge_txid: {}", - challenge_txid.to_string() - ); - - let sub_status = serde_json::to_string(&ChallengeSubStatus::default())?; - let mut storage_processor = local_db.acquire().await?; - storage_processor - .update_graph( - &GraphUpdate::new(graph_id) - // .with_status(GraphStatus::Challenge.to_string()) - .with_challenge_txid(challenge_txid.into()) - .with_sub_status(sub_status), - ) - .await?; - info!("successfully updated graph_id: {graph_id} to challenge status"); - Ok(()) -} - /// Check if Take1Ready Take2Ready message needs to be sent async fn check_operator_withdraw_ready_condition( btc_client: &BTCClient, @@ -719,16 +748,23 @@ async fn check_operator_withdraw_ready_condition( return Ok(false); } }; + let txid_serial: SerializableTxid = txid.into(); let mut storage_processor = local_db.acquire().await?; + let existing = + storage_processor.find_graph_btc_tx_vout_monitor(&graph_id, &txid_serial).await?; + let (monitor_data, created_at, tx_name_to_use) = match existing { + Some(existing) => (existing.monitor_data, existing.created_at, existing.tx_name), + None => ("".to_string(), current_times, tx_name.clone()), + }; storage_processor .upsert_graph_btc_tx_vout_monitor(&GraphBtcTxVoutMonitor { graph_id, - tx_name, - txid: txid.into(), + tx_name: tx_name_to_use, + txid: txid_serial, height, vout_len, - monitor_data: "".to_string(), - created_at: current_times, + monitor_data, + created_at, updated_at: current_times, }) .await?; @@ -750,6 +786,7 @@ async fn check_operator_withdraw_ready_condition( } /// Process graph data in KickOff status +/// may return: Take1Ready, Take1Sent, ChallengeSent async fn process_kickoff_graph( btc_client: &BTCClient, local_db: &LocalDB, @@ -761,6 +798,7 @@ async fn process_kickoff_graph( let (kickoff_txid, take1_txid) = match (graph.kickoff_txid.clone(), graph.take1_txid.clone()) { (Some(kickoff), Some(take1)) => (kickoff.into(), take1.into()), _ => { + // NOTE: revert to previous status? warn!("process_kickoff_graph graph_id:{}, kickoff or take1 is none", graph.graph_id); return Ok(None); } @@ -828,7 +866,6 @@ async fn process_kickoff_graph( spent_txid.to_string() ); // Challenge was sent - handle_challenge_detected(local_db, graph.graph_id, spent_txid).await?; Ok(Some(( Actor::Operator, GOATMessageContent::ChallengeSent(ChallengeSent { @@ -840,595 +877,302 @@ async fn process_kickoff_graph( } } -pub async fn scan_obsolete_sibling_graphs(local_db: &LocalDB) -> anyhow::Result<()> { - trace!("scan_obsolete_sibling_graphs"); - let mut tx = local_db.start_transaction().await?; - let mut tx_records = tx - .get_goat_tx_record_by_processing_status( - &GoatTxType::WithdrawHappyPath.to_string(), - &GoatTxProcessingStatus::Pending.to_string(), - ) - .await?; - info!("scan_obsolete_sibling_graphs: take1 finished recently:{}", tx_records.len()); - let mut unhappy_path_records = tx - .get_goat_tx_record_by_processing_status( - &GoatTxType::WithdrawUnhappyPath.to_string(), - &GoatTxProcessingStatus::Pending.to_string(), - ) - .await?; - info!("scan_obsolete_sibling_graphs: take2 finished recently:{}", unhappy_path_records.len()); - tx_records.append(&mut unhappy_path_records); - info!("scan_obsolete_sibling_graphs: {:?}", tx_records.len()); - - for tx_record in tx_records { - tx.update_graphs_status_with_instance_id( - tx_record.instance_id, - Some(tx_record.graph_id), - &GraphStatus::Obsoleted.to_string(), - ) - .await?; - tx.update_goat_tx_record_processing_status( - &tx_record.graph_id, - &tx_record.instance_id, - &tx_record.tx_type, - &GoatTxProcessingStatus::Processed.to_string(), - ) - .await? - } - tx.commit().await?; - Ok(()) -} - /// Process watchtower challenge monitoring +/// may trigger: WatchtowerChallengeSent, WatchtowerChallengeTimeout, OperatorAckTimeout, DisproveSent(OperatorCommitTimeout/OperatorNack), OperatorCommitBlockHashReady, OperatorCommitBlockHashTimeout +/// return Ok(true) if watchtower challenge is normal finished #[tracing::instrument(level = "info", skip(btc_client, local_db))] async fn process_watchtower_challenge_monitoring( btc_client: &BTCClient, local_db: &LocalDB, graph: &Graph, - sub_status: &mut ChallengeSubStatus, + _sub_status: &mut ChallengeSubStatus, current_height: i64, -) -> anyhow::Result<()> { +) -> anyhow::Result { + // WatchtowerChallengeInitSent will be pushed inside refresh_watchtower_challenge_monitor_data + let (mut vout_monitor_data, watchtower_challenge_init_height, monitor_result) = + match refresh_watchtower_challenge_monitor_data(local_db, btc_client, graph).await? { + Some((data, init_height, monitor_result)) => (data, init_height, monitor_result), + None => { + warn!( + "graph_id {} fail to get vout monitor data, maybe watchtower-challenge-init-tx not confirmed yet", + graph.graph_id + ); + return Ok(false); + } + }; + let timelock_config = get_challenge_timelock_config(); info!("timelock_config: {timelock_config:?}"); - let (kickoff_txid, watchtower_challenge_init_txid, blockhash_commit_timeout_txid): ( - Txid, - Txid, - Txid, - ) = match ( - graph.kickoff_txid.clone(), - graph.watchtower_challenge_init_txid.clone(), - graph.blockhash_commit_timeout_txid.clone(), - ) { - ( - Some(kickoff_txid), - Some(watchtower_challenge_init_txid), - Some(blockhash_commit_timeout_txid), - ) => ( - kickoff_txid.into(), - watchtower_challenge_init_txid.into(), - blockhash_commit_timeout_txid.into(), - ), - _ => { + let is_challenge_timeout = watchtower_challenge_init_height + + timelock_config.watchtower_challenge_timelock + < current_height; + let is_ack_timeout = + watchtower_challenge_init_height + timelock_config.watchtower_ack_timelock < current_height; + let is_blockhash_commit_timeout = watchtower_challenge_init_height + + timelock_config.watchtower_blockhash_commit_timelock + < current_height; + info!( + "is_ack_timeout_{is_ack_timeout}, is_challenge_timeout_{is_challenge_timeout}, \ + is_blockhash_commit_timeout_{is_blockhash_commit_timeout}, watchtower_challenge_init_height:{watchtower_challenge_init_height}, current_height:{current_height} ", + ); + if vout_monitor_data.check_watchtower_challenge_normal_finished() { + info!("graph_id {} watchtower challenge is normal finished", graph.graph_id); + return Ok(true); + } + if vout_monitor_data.is_disproved() { + let challenge_start_txid = graph.challenge_txid.clone().map(|v| v.into()); + let (disprove_type, index, challenge_finish_txid) = if vout_monitor_data + .commit_blockhash_status + == CommitBlockHashStatus::OperatorCommitTimeout + { + let blockhash_commit_timeout_txid = graph.blockhash_commit_timeout_txid.clone().ok_or_else(|| anyhow::anyhow!( + "graph_id {} is disproved by OperatorCommitTimeout but blockhash_commit_timeout_txid is none", + graph.graph_id + ))?; + (DisproveTxType::OperatorCommitTimeout, 0, blockhash_commit_timeout_txid.into()) + } else if let Some((&index, _)) = vout_monitor_data + .data_map + .iter() + .find(|(_, status)| **status == WatchtowerChallengeItemStatus::OperatorNACK) + { + let nack_txid = graph.nack_txids.get(index as usize).cloned().ok_or_else(|| { + anyhow::anyhow!( + "graph_id {} is disproved by OperatorNACK at index {} but nack_txid is none", + graph.graph_id, + index + ) + })?; + (DisproveTxType::OperatorNack, index as usize, nack_txid.into()) + } else { + anyhow::bail!( + "graph_id {} vout monitor data is disproved but no OperatorNACK or OperatorCommitBlockHashTimeout found", + graph.graph_id + ) + }; + upsert_message( + &mut local_db.acquire().await?, + false, + graph.graph_id, + None, + "self".to_string(), + Actor::Operator, + GOATMessageContent::DisproveSent(DisproveSent { + instance_id: graph.instance_id, + graph_id: graph.graph_id, + disprove_type, + index, + challenge_start_txid, + challenge_finish_txid, + }), + 0, + 0, + ) + .await?; + return Ok(false); // already disproved, no need to process further + } + if !vout_monitor_data.is_commit_blockhash_processed() + && vout_monitor_data.is_commit_blockhash_ready() + { + upsert_message( + &mut local_db.acquire().await?, + false, + graph.graph_id, + None, + "self".to_string(), + Actor::Operator, + GOATMessageContent::OperatorCommitBlockHashReady(OperatorCommitBlockHashReady { + instance_id: graph.instance_id, + graph_id: graph.graph_id, + }), + 0, + 0, + ) + .await?; + } + if !vout_monitor_data.is_commit_blockhash_processed() && is_blockhash_commit_timeout { + upsert_message( + &mut local_db.acquire().await?, + false, + graph.graph_id, + None, + "self".to_string(), + Actor::Challenger, + GOATMessageContent::OperatorCommitBlockHashTimeout(OperatorCommitBlockHashTimeout { + instance_id: graph.instance_id, + graph_id: graph.graph_id, + }), + 0, + 0, + ) + .await?; + } + if vout_monitor_data + .data_map + .values() + .any(|status| *status == WatchtowerChallengeItemStatus::Challenge) + { + let watchtower_challenge_txids = monitor_result.0; + if watchtower_challenge_txids.is_empty() { warn!( - "graph_id {} kickoff txid, watchtower challenge init txid or blockhash commit timeout txid is none", + "graph_id {} watchtower challenge txids is empty when some vout status is Challenge", graph.graph_id ); - return Ok(()); + } else { + upsert_message( + &mut local_db.acquire().await?, + false, + graph.graph_id, + None, + "self".to_string(), + Actor::Operator, + GOATMessageContent::WatchtowerChallengeSent(WatchtowerChallengeSent { + instance_id: graph.instance_id, + graph_id: graph.graph_id, + watchtower_challenge_txids, + }), + 0, + 0, + ) + .await?; } - }; - - let out_monitor = { - let mut storage_processor = local_db.acquire().await?; - storage_processor - .find_graph_btc_tx_vout_monitor(&graph.graph_id, &watchtower_challenge_init_txid.into()) - .await? - }; - - if let Some(out_monitor) = out_monitor { - let mut vout_monitor_data = - match parse_monitor_data::(&out_monitor.monitor_data) { - Ok(vout_monitor_data) => vout_monitor_data, - Err(_) => { - warn!("graph_id {} fail to parse monitor data", graph.graph_id); - return Ok(()); - } - }; - let is_challenge_timeout = - out_monitor.height + timelock_config.watchtower_challenge_timelock < current_height; - let is_ack_timeout = - out_monitor.height + timelock_config.watchtower_ack_timelock < current_height; - let is_blockhash_commit_timeout = out_monitor.height - + timelock_config.watchtower_blockhash_commit_timelock - < current_height; - let mut data_change = false; - let mut is_commit_block_hash_ready = false; - let mut p2p_message_contents: Vec<(Actor, GOATMessageContent, Option)> = vec![]; - info!( - "is_ack_timeout_{is_ack_timeout}, is_challenge_timeout_{is_challenge_timeout}, \ - is_blockhash_commit_timeout_{is_blockhash_commit_timeout}, out_monitor.height:{}, current_height:{current_height} ", - out_monitor.height - ); - - // 1) Prefer detecting operator commit first; if found, skip timeout handling - if vout_monitor_data.commit_blockhash_status != CommitBlockHashStatus::OperatorCommit - && let Some(spend_txid) = outpoint_spent_txid( - btc_client, - &watchtower_challenge_init_txid, - (out_monitor.vout_len - CONNECTOR_G_MARGIN) as u64, + } + if is_ack_timeout { + // Re-sends an OperatorAckTimeout message whenever the disproved indexes change. + vout_monitor_data.update_disprove_indexes(); + if !vout_monitor_data.require_disproved_indexes.is_empty() { + upsert_message( + &mut local_db.acquire().await?, + false, + graph.graph_id, + Some(vout_monitor_data.get_require_disproved_string()), + "self".to_string(), + Actor::Challenger, + GOATMessageContent::OperatorAckTimeout(OperatorAckTimeout { + instance_id: graph.instance_id, + graph_id: graph.graph_id, + }), + 0, + 0, ) - .await? - && blockhash_commit_timeout_txid != spend_txid - { - info!( - "graph id :{} sub status update to CommitBlockHashStatus::OperatorCommit", - graph.graph_id - ); - vout_monitor_data.commit_blockhash_status = CommitBlockHashStatus::OperatorCommit; - sub_status.commit_blockhash_status = CommitBlockHashStatus::OperatorCommit; - data_change = true; + .await?; } - - // 2) Mark timeout only when commit is absent and timelock has passed - if is_blockhash_commit_timeout - && vout_monitor_data.commit_blockhash_status != CommitBlockHashStatus::OperatorCommit - { - info!( - "graph id :{} sub status update to CommitBlockHashStatus::OperatorCommitTimeout", - graph.graph_id + } + if is_challenge_timeout { + // Re-sends an WatchtowerChallengeTimeout message whenever the watchtower indexes change. + let watchtower_indexes: Vec = vout_monitor_data + .data_map + .iter() + .filter_map(|(&index, status)| match status { + WatchtowerChallengeItemStatus::OperatorInit => Some(index as usize), + _ => None, + }) + .collect(); + if !watchtower_indexes.is_empty() { + let sub_type = format!( + "[{}]", + watchtower_indexes.iter().map(|v| v.to_string()).collect::>().join("_") ); - vout_monitor_data.commit_blockhash_status = - CommitBlockHashStatus::OperatorCommitTimeout; - sub_status.commit_blockhash_status = CommitBlockHashStatus::OperatorCommitTimeout; - sub_status.disprove_type = Some(DisproveTxType::OperatorCommitTimeout); - p2p_message_contents.push(( - Actor::Challenger, - GOATMessageContent::OperatorCommitBlockHashTimeout( - OperatorCommitBlockHashTimeout { - instance_id: graph.instance_id, - graph_id: graph.graph_id, - }, - ), - None, - )); - data_change = true; + upsert_message( + &mut local_db.acquire().await?, + false, + graph.graph_id, + Some(sub_type), + "self".to_string(), + Actor::Operator, + GOATMessageContent::WatchtowerChallengeTimeout(WatchtowerChallengeTimeout { + instance_id: graph.instance_id, + graph_id: graph.graph_id, + watchtower_indexes, + }), + 0, + 0, + ) + .await?; } + } + Ok(false) +} - if !is_ack_timeout { - if is_challenge_timeout { - info!( - "watchtower challenge timeout for graph id :{}, vout_monitor_data:{:?}", - graph.graph_id, vout_monitor_data - ); - let watchtower_indexes: Vec = vout_monitor_data - .data_map - .iter() - .filter_map(|(&index, status)| match status { - WatchtowerChallengeItemStatus::OperatorInit => Some(index as usize), - _ => None, - }) - .collect(); - if !watchtower_indexes.is_empty() && !vout_monitor_data.is_challenge_timeout_sent { - let sub_type = format!( - "[{}]", - watchtower_indexes - .iter() - .map(|v| v.to_string()) - .collect::>() - .join("_") - ); - p2p_message_contents.push(( - Actor::Operator, - GOATMessageContent::WatchtowerChallengeTimeout( - WatchtowerChallengeTimeout { - instance_id: graph.instance_id, - graph_id: graph.graph_id, - watchtower_indexes, - }, - ), - Some(sub_type), - )); - sub_status.watchtower_challenge_status = - WatchtowerChallengeStatus::WatchtowerChallengeTimeout; - vout_monitor_data.is_challenge_timeout_sent = true; - data_change = true; - } - } - - let (challenge_txids, challenge_timeout_txids, ack_txids) = vout_monitor_data - .monitor_vout( - btc_client, - &watchtower_challenge_init_txid, - &graph.watchtower_challenge_timeout_txids, - &graph.nack_txids, - ) - .await?; - - if !challenge_txids.is_empty() { - data_change = true; - // contain the situations: - // 1. all watchtower challenge - // 2,challenge timeout. operator not send challenge timeout, but watchtower send challenge tx - - if [ - WatchtowerChallengeStatus::OperatorInit, - WatchtowerChallengeStatus::WatchtowerChallengeTimeout, - ] - .contains(&sub_status.watchtower_challenge_status) - && !vout_monitor_data - .data_map - .iter() - .any(|(_, v)| *v == WatchtowerChallengeItemStatus::OperatorInit) - { - info!( - "graph id :{} sub status update to WatchtowerChallengeStatus::Challenge", - graph.graph_id - ); - // all in challenge - sub_status.watchtower_challenge_status = - WatchtowerChallengeStatus::WatchtowerChallenge; - } - p2p_message_contents.push(( - Actor::Operator, - GOATMessageContent::WatchtowerChallengeSent(WatchtowerChallengeSent { - instance_id: graph.instance_id, - graph_id: graph.graph_id, - watchtower_challenge_txids: challenge_txids, - }), - None, - )); - } - - if !ack_txids.is_empty() || !challenge_timeout_txids.is_empty() { - data_change = true; - } - - if vout_monitor_data.commit_blockhash_status == CommitBlockHashStatus::None - && vout_monitor_data.is_commit_blockhash_ready() - { - data_change = true; - sub_status.commit_blockhash_status = - CommitBlockHashStatus::WatchtowerChallengeProcessed; - vout_monitor_data.commit_blockhash_status = - CommitBlockHashStatus::WatchtowerChallengeProcessed; - is_commit_block_hash_ready = true; - } - - if vout_monitor_data.check_watchtower_challenge_normal_finished() { - data_change = true; - sub_status.watchtower_challenge_status = - WatchtowerChallengeStatus::WatchtowerChallengeNormalFinished; - } - } else { - info!("graph id :{} ack timeout", graph.graph_id); - vout_monitor_data.update_disprove_indexes(); - if vout_monitor_data.require_disproved_indexes.is_empty() { - trace!( - "graph id :{} sub status update to WatchtowerChallengeStatus::OperatorACK", - graph.graph_id - ); - sub_status.watchtower_challenge_status = - WatchtowerChallengeStatus::WatchtowerChallengeNormalFinished; - } else { - trace!( - "graph id :{} sub status update to WatchtowerChallengeStatus::OperatorNACK", - graph.graph_id - ); - sub_status.watchtower_challenge_status = - WatchtowerChallengeStatus::WatchtowerChallengeDisproveFinished; - sub_status.disprove_type = Some(DisproveTxType::OperatorNack); - p2p_message_contents.push(( - Actor::Challenger, - GOATMessageContent::OperatorAckTimeout(OperatorAckTimeout { - instance_id: graph.instance_id, - graph_id: graph.graph_id, - }), - Some(vout_monitor_data.get_require_disproved_string()), - )); - } - data_change = true; - } - if data_change { - let mut tx = local_db.start_transaction().await?; - tx.update_graph( - &GraphUpdate::new(graph.graph_id) - .with_sub_status(serde_json::to_string(sub_status).unwrap()), - ) - .await?; - if is_commit_block_hash_ready { - upsert_message( - &mut tx, - false, - graph.graph_id, - None, - "self".to_string(), - Actor::Operator, - GOATMessageContent::OperatorCommitBlockHashReady( - OperatorCommitBlockHashReady { - instance_id: graph.instance_id, - graph_id: graph.graph_id, - }, - ), - 0, - 0, - ) - .await?; - } - - tx.update_graph_btc_tx_vout_monitor_data( - &graph.graph_id, - &watchtower_challenge_init_txid.into(), - serde_json::to_string(&vout_monitor_data)?, - ) - .await?; - for (actor, message_content, sub_type) in p2p_message_contents { - upsert_message( - &mut tx, - false, - graph.graph_id, - sub_type, - "self".to_string(), - actor, - message_content, - 0, - 0, - ) - .await?; - } - tx.commit().await?; - } - } else { - trace!("graph id :{} watchtower challenge init is not on chain", graph.graph_id); - // Create monitor if watchtower challenge init transaction is detected - if let Some(spent_txid) = outpoint_spent_txid(btc_client, &kickoff_txid, 1).await? - && spent_txid == watchtower_challenge_init_txid - { - info!( - "graph_id: {} watchtower_challenge_init_txid {} has been broadcasted", - graph.graph_id, - watchtower_challenge_init_txid.to_string() - ); - if let Ok(Some(watchtower_challenge_init_tx)) = - btc_client.get_tx_info(&watchtower_challenge_init_txid).await - && watchtower_challenge_init_tx.status.block_height.unwrap_or_default() > 0 - { - sub_status.watchtower_challenge_status = WatchtowerChallengeStatus::OperatorInit; - let mut tx = local_db.start_transaction().await?; - tx.update_graph( - &GraphUpdate::new(graph.graph_id) - .with_sub_status(serde_json::to_string(sub_status).unwrap()), - ) - .await?; - - tx.upsert_graph_btc_tx_vout_monitor(&GraphBtcTxVoutMonitor { - graph_id: graph.graph_id, - tx_name: MONITE_BTC_TX_NAME_WATCHTOWER_INIT.to_string(), - txid: watchtower_challenge_init_txid.into(), - height: watchtower_challenge_init_tx.status.block_height.unwrap_or_default() - as i64, - vout_len: watchtower_challenge_init_tx.vout.len() as i64, - monitor_data: serde_json::to_string(&WTInitTxVoutMonitorData::new( - (watchtower_challenge_init_tx.vout.len() as i32 - - CONNECTOR_G_MARGIN as i32) - / 2, - ))?, - created_at: current_time_secs(), - updated_at: current_time_secs(), - }) - .await?; - upsert_message( - &mut tx, - false, - graph.graph_id, - None, - "self".to_string(), - Actor::Watchtower, - GOATMessageContent::WatchtowerChallengeInitSent(WatchtowerChallengeInitSent { - instance_id: graph.instance_id, - graph_id: graph.graph_id, - }), - 0, - 0, - ) - .await?; - tx.commit().await?; - } else { - warn!( - "process_assert_commit_monitoring graph_id: {}, watchtower_challenge_init_txid {watchtower_challenge_init_txid} not found on chain", - graph.graph_id +/// Process assert commit monitoring +/// may trigger: DisproveSent(AssertTimeout), AssertCommitTimeout +/// return Ok(true) if assert commit is normal finished +async fn process_assert_commit_monitoring( + btc_client: &BTCClient, + local_db: &LocalDB, + graph: &Graph, + _sub_status: &mut ChallengeSubStatus, + current_height: i64, +) -> anyhow::Result { + let (mut vout_monitor_data, assert_init_height, _monitor_result) = + match refresh_assert_monitor_data(local_db, btc_client, graph).await? { + Some((data, init_height, monitor_result)) => (data, init_height, monitor_result), + None => { + warn!( + "graph_id {} fail to get vout monitor data, maybe assert-init-tx not confirmed yet", + graph.graph_id ); + return Ok(false); } - } - } - - Ok(()) -} + }; -/// Process assert commit monitoring -async fn process_assert_commit_monitoring( - btc_client: &BTCClient, - local_db: &LocalDB, - graph: &Graph, - sub_status: &mut ChallengeSubStatus, - current_height: i64, -) -> anyhow::Result<()> { - trace!("process_assert_commit_monitoring start"); let timelock_config = get_challenge_timelock_config(); - let (kickoff_txid, assert_init_txid): (Txid, Txid) = match ( - graph.kickoff_txid.clone(), - graph.assert_init_txid.clone(), - ) { - (Some(kickoff_txid), Some(assert_init_txid)) => { - (kickoff_txid.into(), assert_init_txid.into()) - } - _ => { - warn!( - "process_assert_commit_monitoring graph_id {} kickoff_txid or assert_init_txid is none", - graph.graph_id - ); - return Ok(()); - } - }; - let out_monitor = { - let mut storage_processor = local_db.acquire().await?; - storage_processor - .find_graph_btc_tx_vout_monitor(&graph.graph_id, &assert_init_txid.into()) - .await? - }; - - if let Some(out_monitor) = out_monitor { - let mut vout_monitor_data = - match parse_monitor_data::(&out_monitor.monitor_data) { - Ok(vout_monitor_data) => vout_monitor_data, - Err(_) => { - warn!( - "process_assert_commit_monitoring graph_id {} fail to parse monitor data", - graph.graph_id - ); - return Ok(()); - } - }; - let is_assert_commit_timeout = - out_monitor.height + timelock_config.assert_commit_timelock < current_height; - info!( - "process_assert_commit_monitoring: graph id :{} is_assert_commit_timeout:{is_assert_commit_timeout},\ - out_monitor.height:{}, timelock_config.assert_commit_timelock:{}, current_height:{current_height}", - graph.graph_id, out_monitor.height, timelock_config.assert_commit_timelock - ); - let mut data_change = false; - let mut message_content: Option<(Actor, GOATMessageContent)> = None; - if !is_assert_commit_timeout { - trace!( - "process_assert_commit_monitoring graph id :{} assert commit monitor", - graph.graph_id - ); - let vout_spent_len = vout_monitor_data - .monitor_vout(btc_client, &assert_init_txid, &graph.assert_commit_timeout_txids) - .await?; - - if vout_monitor_data.check_normal_finished() { - sub_status.assert_commit_status = AssertCommitStatus::OperatorCommit; - } - - info!( - "process_assert_commit_monitoring graph id :{} vout_spent_len:{vout_spent_len}", - graph.graph_id - ); - data_change = data_change || vout_spent_len > 0; - } else { - info!( - "process_assert_commit_monitoring graph id :{} assert commit timeout", - graph.graph_id - ); - vout_monitor_data.update_disprove_indexes(); - if vout_monitor_data.require_disproved_indexes.is_empty() { - info!( - "process_assert_commit_monitoring graph id :{} sub status update to AssertCommitStatus::OperatorCommit", - graph.graph_id - ); - sub_status.assert_commit_status = AssertCommitStatus::OperatorCommit; - } else { - info!( - "process_assert_commit_monitoring graph id :{} sub status update to AssertCommitStatus::OperatorCommitTimeout", - graph.graph_id - ); - sub_status.assert_commit_status = AssertCommitStatus::OperatorCommitTimeout; - sub_status.disprove_type = Some(DisproveTxType::AssertTimeout); - message_content = Some(( - Actor::Challenger, - GOATMessageContent::AssertCommitTimeout(AssertCommitTimeout { - instance_id: graph.instance_id, - graph_id: graph.graph_id, - }), - )); - } - data_change = true; - } - if data_change { - let mut tx = local_db.start_transaction().await?; - tx.update_graph( - &GraphUpdate::new(graph.graph_id) - .with_sub_status(serde_json::to_string(sub_status).unwrap()), - ) - .await?; - tx.update_graph_btc_tx_vout_monitor_data( - &graph.graph_id, - &assert_init_txid.into(), - serde_json::to_string(&vout_monitor_data)?, - ) - .await?; - if let Some((actor, message_content)) = message_content { - upsert_message( - &mut tx, - false, - graph.graph_id, - None, - "self".to_string(), - actor, - message_content, - 0, - 0, - ) - .await?; - } - tx.commit().await?; - } - } else { - trace!( - "process_assert_commit_monitoring graph_id: {} assert_init_txid {} not been broadcasted, start to detect", + let is_assert_commit_timeout = + assert_init_height + timelock_config.assert_commit_timelock < current_height; + info!( + "process_assert_commit_monitoring: graph id :{} is_assert_commit_timeout:{is_assert_commit_timeout},\ + assert_init_height:{assert_init_height}, timelock_config.assert_commit_timelock:{}, current_height:{current_height}", + graph.graph_id, timelock_config.assert_commit_timelock + ); + if vout_monitor_data.check_normal_finished() { + info!("graph_id {} assert commit is normal finished", graph.graph_id); + return Ok(true); + } + if vout_monitor_data.is_disproved() { + let challenge_start_txid = graph.challenge_txid.clone().map(|v| v.into()); + let disprove_type = DisproveTxType::AssertTimeout; + let index = vout_monitor_data + .data_map + .iter() + .find(|(_, status)| **status == AssertCommitItemStatus::OperatorCommitTimeout) + .map(|(&index, _)| index as usize) + .ok_or_else(|| anyhow::anyhow!("graph_id {} assert vout monitor data is disproved but no OperatorCommitTimeout found", graph.graph_id))?; + let challenge_finish_txid = graph.assert_commit_timeout_txids.get(index).cloned().ok_or_else(|| anyhow::anyhow!( + "graph_id {} is disproved by OperatorCommitTimeout at index {} but assert_commit_timeout_txid is none", graph.graph_id, - assert_init_txid.to_string() - ); - - // Create monitor if assert init transaction is detected - if let Some(spent_txid) = outpoint_spent_txid(btc_client, &kickoff_txid, 2).await? - && spent_txid == assert_init_txid - { - info!( - "process_assert_commit_monitoring graph_id: {} assert_init_txid {} has been broadcasted", - graph.graph_id, - assert_init_txid.to_string() - ); - - if let Ok(Some(assert_init_tx)) = btc_client.get_tx_info(&assert_init_txid).await - && assert_init_tx.status.block_height.unwrap_or_default() > 0 - { - sub_status.assert_commit_status = AssertCommitStatus::OperatorInit; - let mut tx = local_db.start_transaction().await?; - tx.update_graph( - &GraphUpdate::new(graph.graph_id) - .with_sub_status(serde_json::to_string(sub_status).unwrap()), - ) - .await?; - tx.upsert_graph_btc_tx_vout_monitor(&GraphBtcTxVoutMonitor { - graph_id: graph.graph_id, - tx_name: MONITE_BTC_TX_NAME_ASSERT_INIT.to_string(), - txid: assert_init_txid.into(), - height: assert_init_tx.status.block_height.unwrap_or_default() as i64, - vout_len: assert_init_tx.vout.len() as i64, - monitor_data: serde_json::to_string(&AssertInitTxVoutMonitorData::new( - assert_init_tx.vout.len() as i32 - 2, - ))?, - created_at: current_time_secs(), - updated_at: current_time_secs(), - }) - .await?; - tx.commit().await?; - } else { - warn!( - "process_assert_commit_monitoring graph_id: {}, assert_init_txid {assert_init_txid} not found on chain", - graph.graph_id - ); - } - } else { - let mut storage_processor = local_db.acquire().await?; + index + ))?.into(); + upsert_message( + &mut local_db.acquire().await?, + false, + graph.graph_id, + None, + "self".to_string(), + Actor::Operator, + GOATMessageContent::DisproveSent(DisproveSent { + instance_id: graph.instance_id, + graph_id: graph.graph_id, + disprove_type, + index, + challenge_start_txid, + challenge_finish_txid, + }), + 0, + 0, + ) + .await?; + return Ok(false); // already disproved, no need to process further + } + if is_assert_commit_timeout { + vout_monitor_data.update_disprove_indexes(); + if !vout_monitor_data.require_disproved_indexes.is_empty() { upsert_message( - &mut storage_processor, + &mut local_db.acquire().await?, false, graph.graph_id, - None, + Some(vout_monitor_data.get_require_disproved_string()), "self".to_string(), - Actor::Operator, - GOATMessageContent::AssertInitReady(AssertInitReady { + Actor::Challenger, + GOATMessageContent::AssertCommitTimeout(AssertCommitTimeout { instance_id: graph.instance_id, graph_id: graph.graph_id, }), @@ -1439,73 +1183,10 @@ async fn process_assert_commit_monitoring( } } - Ok(()) -} - -/// Find the spend transaction for a disproved index -async fn find_challenge_nack_tx( - btc_client: &BTCClient, - storage_processor: &mut StorageProcessor<'_>, - graph_id: &Uuid, - watchtower_challenge_init_txid: &SerializableTxid, -) -> anyhow::Result> { - info!("find_challenge_nack_tx for graph_id: {graph_id}"); - let out_monitor = storage_processor - .find_graph_btc_tx_vout_monitor(graph_id, watchtower_challenge_init_txid) - .await?; - - let Some(out_monitor) = out_monitor else { - return Ok(None); - }; - - if let Some(spend_txid) = outpoint_spent_txid( - btc_client, - &watchtower_challenge_init_txid.clone().into(), - out_monitor.vout_len as u64 - CONNECTOR_F_MARGIN, - ) - .await? - && let Some(tx) = btc_client.get_tx(&spend_txid).await? - { - let index = tx.input[0].previous_output.vout as i32; - if index == out_monitor.vout_len as i32 - 3_i32 { - return Ok(None); - } - - return Ok(Some((spend_txid, index / 2))); - } - - Ok(None) -} - -/// Find the spend transaction for assert timeout -async fn find_assert_timeout_tx( - btc_client: &BTCClient, - storage_processor: &mut StorageProcessor<'_>, - graph_id: &Uuid, - assert_init_txid: &SerializableTxid, -) -> anyhow::Result> { - info!("find_assert_timeout_tx for graph_id: {graph_id}"); - let out_monitor = - storage_processor.find_graph_btc_tx_vout_monitor(graph_id, assert_init_txid).await?; - - let Some(out_monitor) = out_monitor else { - return Ok(None); - }; - - if let Some(spend_txid) = outpoint_spent_txid( - btc_client, - &assert_init_txid.clone().into(), - out_monitor.vout_len as u64 - CONNECTOR_D_MARGIN, - ) - .await? - && let Some(tx) = btc_client.get_tx(&spend_txid).await? - { - return Ok(Some((spend_txid, tx.input[0].previous_output.vout as i32))); - } - - Ok(None) + Ok(false) } +/// may trigger: DisproveSent(QuickChallenge/ChallengeIncompleteKickoff) async fn detect_kickoff_ref_disprove_tx( btc_client: &BTCClient, local_db: &LocalDB, @@ -1595,214 +1276,8 @@ async fn detect_kickoff_ref_disprove_tx( Ok(detected) } -async fn fetch_challenge_txid( - btc_client: &BTCClient, - storage_processor: &mut StorageProcessor<'_>, - graph_id: Uuid, - kickoff_txid: &Option, - take1_txid: &Option, -) -> anyhow::Result> { - info!("fetch_challenge_txid for graph_id:{}", graph_id); - let (kickoff_txid, take1_txid): (Txid, Txid) = match (kickoff_txid, take1_txid) { - (Some(kickoff_txid), Some(take1_txid)) => (kickoff_txid.0, take1_txid.0), - _ => { - warn!("graph:{graph_id} kickoff_txid or take1_txid none"); - return Ok(None); - } - }; - - if let Ok(Some(txid)) = outpoint_spent_txid(btc_client, &kickoff_txid, 0).await { - if txid == take1_txid { - warn!("graph:{graph_id} take1 has been sent!"); - Ok(None) - } else { - info!("graph:{graph_id} detected challenge txid :{txid}"); - storage_processor - .update_graph(&GraphUpdate::new(graph_id).with_challenge_txid(txid.into())) - .await?; - Ok(Some(txid.into())) - } - } else { - warn!("graph:{graph_id} fail to detect challenge txid will try later"); - Ok(None) - } -} - -async fn detect_disproved_txids( - btc_client: &BTCClient, - storage_processor: &mut StorageProcessor<'_>, - graph: &Graph, - sub_status: &mut ChallengeSubStatus, -) -> anyhow::Result> { - info!("detecting disproved txids graph_id {}", graph.graph_id); - let challenge_txid: Txid = match graph.challenge_txid.clone() { - Some(challenge_txid) => challenge_txid.into(), - None => { - warn!("graph:{} challenge_txid is none", graph.graph_id); - if let Ok(Some(txid)) = fetch_challenge_txid( - btc_client, - storage_processor, - graph.graph_id, - &graph.kickoff_txid, - &graph.take1_txid, - ) - .await - { - txid.into() - } else { - return Ok(None); - } - } - }; - - if let Some(disprove_type) = sub_status.disprove_type - && let Some(disprove_txid) = graph.disprove_txid.clone() - { - return Ok(Some(( - disprove_type, - challenge_txid, - disprove_txid.into(), - sub_status.disprove_index, - ))); - } - - let ( - kickoff_txid, - watchtower_challenge_init_txid, - blockhash_commit_timeout_txid, - assert_init_txid, - take2_txid, - ): (Txid, Txid, Txid, Txid, Txid) = match ( - graph.kickoff_txid.clone(), - graph.watchtower_challenge_init_txid.clone(), - graph.blockhash_commit_timeout_txid.clone(), - graph.assert_init_txid.clone(), - graph.take2_txid.clone(), - ) { - ( - Some(kickoff_txid), - Some(watchtower_challenge_init_txid), - Some(blockhash_commit_timeout_txid), - Some(assert_init_txid), - Some(take2_txid), - ) => ( - kickoff_txid.into(), - watchtower_challenge_init_txid.into(), - blockhash_commit_timeout_txid.into(), - assert_init_txid.into(), - take2_txid.into(), - ), - _ => { - warn!( - "graph:{} kickoff_txid/watchtower_challenge_init_txid/blockhash_commit_timeout_txid/\ - assert_init_txid/take2_txid has none value", - graph.graph_id - ); - return Ok(None); - } - }; - match sub_status.disprove_type { - Some(DisproveTxType::AssertTimeout) => { - return Ok(find_assert_timeout_tx( - btc_client, - storage_processor, - &graph.graph_id, - &assert_init_txid.into(), - ) - .await? - .map(|(finish_txid, index)| { - (DisproveTxType::AssertTimeout, challenge_txid, finish_txid, index) - })); - } - - Some(DisproveTxType::OperatorNack) => { - return Ok(find_challenge_nack_tx( - btc_client, - storage_processor, - &graph.graph_id, - &watchtower_challenge_init_txid.into(), - ) - .await? - .map(|(finish_txid, index)| { - (DisproveTxType::OperatorNack, challenge_txid, finish_txid, index) - })); - } - - Some(DisproveTxType::OperatorCommitTimeout) => { - return Ok(Some(( - DisproveTxType::OperatorCommitTimeout, - challenge_txid, - blockhash_commit_timeout_txid, - 0, - ))); - } - _ => {} - } - - // QuickChallenge ChallengeIncompleteKickoff detect in status Operator kickoff - if let Some(spent_txid) = outpoint_spent_txid(btc_client, &kickoff_txid, 3).await? - && spent_txid != take2_txid - { - sub_status.disprove_type = Some(DisproveTxType::Disprove); - return Ok(Some((DisproveTxType::Disprove, challenge_txid, spent_txid, 0))); - } - - Ok(None) -} -async fn process_graph_watchtower_assert_disproved( - btc_client: &BTCClient, - local_db: &LocalDB, - graph: &Graph, - sub_status: &mut ChallengeSubStatus, -) -> anyhow::Result<()> { - info!("process_graph_watchtower_assert_disproved for graph:{}", graph.graph_id); - let mut tx = local_db.start_transaction().await?; - match detect_disproved_txids(btc_client, &mut tx, graph, sub_status).await? { - Some((disprove_type, start_txid, finish_txid, tx_index)) => { - if graph.disprove_txid.is_none() { - sub_status.disprove_index = tx_index; - tx.update_graph( - &GraphUpdate::new(graph.graph_id) - .with_disprove_txid(finish_txid.into()) - .with_sub_status(serde_json::to_string(sub_status).unwrap()), - ) - .await?; - } - - info!( - "process_graph_watchtower_assert_disproved: graph:{} disprove_type:{}, start_txid:{}. finsh_txid{}. tx_index:{} ", - graph.graph_id, disprove_type, start_txid, finish_txid, tx_index - ); - - upsert_message( - &mut tx, - false, - graph.graph_id, - None, - "self".to_string(), - Actor::Committee, - GOATMessageContent::DisproveSent(DisproveSent { - instance_id: graph.instance_id, - graph_id: graph.graph_id, - disprove_type, - index: tx_index as usize, - challenge_start_txid: Some(start_txid), - challenge_finish_txid: finish_txid, - }), - 0, - 0, - ) - .await?; - } - None => { - trace!("process_graph_watchtower_assert_disproved get disproved tx is none"); - } - } - tx.commit().await?; - Ok(()) -} - /// Process graph data in Watchtower Assert Normal status +/// may trigger: Take2Ready, Take2Sent, DisproveSent(Disprove) async fn detect_take2( btc_client: &BTCClient, local_db: &LocalDB, @@ -1909,17 +1384,30 @@ async fn detect_take2( spent_txid.to_string() ); Ok(Some(( - Actor::Committee, + Actor::All, GOATMessageContent::Take2Sent(Take2Sent { instance_id: graph.instance_id, graph_id: graph.graph_id, }), ))) } else { - Ok(None) + let disprove_type = DisproveTxType::Disprove; + let challenge_start_txid = graph.challenge_txid.clone().map(|v| v.into()); + Ok(Some(( + Actor::All, + GOATMessageContent::DisproveSent(DisproveSent { + instance_id: graph.instance_id, + graph_id: graph.graph_id, + disprove_type, + index: 0, + challenge_start_txid, + challenge_finish_txid: spent_txid, + }), + ))) } } +/// may trigger: PreKickoffSent async fn check_pre_kickoff_sent( local_db: &LocalDB, btc_client: &BTCClient, @@ -1968,3 +1456,302 @@ async fn check_pre_kickoff_sent( } Ok(pre_sents) } + +pub(crate) async fn refresh_watchtower_challenge_monitor_data( + local_db: &LocalDB, + btc_client: &BTCClient, + graph: &Graph, +) -> anyhow::Result< + Option<( + WTInitTxVoutMonitorData, + i64, + (Vec<(usize, Txid)>, Vec<(usize, Txid)>, Vec<(usize, Txid)>), + )>, +> { + let watchtower_challenge_init_txid = + graph.watchtower_challenge_init_txid.clone().ok_or_else(|| { + anyhow::anyhow!("graph_id:{} watchtower_challenge_init_txid is none", graph.graph_id) + })?; + let out_monitor = { + let mut storage_processor = local_db.acquire().await?; + storage_processor + .find_graph_btc_tx_vout_monitor(&graph.graph_id, &watchtower_challenge_init_txid) + .await? + }; + + let mut init_height; + let mut monitor_meta_update: Option<(i64, i64)> = None; + let mut existing_meta: Option<(String, i64)> = None; + let mut vout_monitor_data = if let Some(out_monitor) = out_monitor { + existing_meta = Some((out_monitor.tx_name.clone(), out_monitor.created_at)); + init_height = out_monitor.height; + match parse_monitor_data::(&out_monitor.monitor_data) { + Ok(vout_monitor_data) => vout_monitor_data, + Err(err) => { + warn!( + "graph_id:{} fail to parse watchtower monitor_data, rebuild default: {err}", + graph.graph_id + ); + let mut new_height = out_monitor.height; + let mut new_vout_len = out_monitor.vout_len; + if new_height <= 0 || new_vout_len <= 0 { + let txid: Txid = watchtower_challenge_init_txid.clone().into(); + if let Some(tx) = btc_client.get_tx_info(&txid).await? { + if new_height <= 0 { + new_height = tx.status.block_height.unwrap_or_default() as i64; + } + if new_vout_len <= 0 { + new_vout_len = tx.vout.len() as i64; + } + } + } + init_height = new_height; + let index_size = (new_vout_len as i32 - CONNECTOR_G_MARGIN as i32) / 2; + if index_size <= 0 { + warn!( + "graph_id:{} watchtower_challenge_init_txid {} invalid index_size {index_size}, skip refresh", + graph.graph_id, watchtower_challenge_init_txid.0 + ); + return Ok(None); + } + if new_height != out_monitor.height || new_vout_len != out_monitor.vout_len { + monitor_meta_update = Some((new_height, new_vout_len)); + } + WTInitTxVoutMonitorData::new(index_size) + } + } + } else { + let txid: Txid = watchtower_challenge_init_txid.clone().into(); + let watchtower_challenge_init_tx = match btc_client + .get_tx_info(&txid) + .await? + .filter(|tx| tx.status.block_height.unwrap_or_default() > 0) + { + Some(tx) => tx, + None => { + warn!( + "graph_id:{} watchtower_challenge_init_txid not on chain, skip refresh", + graph.graph_id + ); + return Ok(None); + } + }; + + init_height = watchtower_challenge_init_tx.status.block_height.unwrap_or_default() as i64; + let vout_monitor_data = WTInitTxVoutMonitorData::new( + (watchtower_challenge_init_tx.vout.len() as i32 - CONNECTOR_G_MARGIN as i32) / 2, + ); + + let current_times = current_time_secs(); + let mut tx = local_db.start_transaction().await?; + tx.upsert_graph_btc_tx_vout_monitor(&GraphBtcTxVoutMonitor { + graph_id: graph.graph_id, + tx_name: MONITE_BTC_TX_NAME_WATCHTOWER_INIT.to_string(), + txid: watchtower_challenge_init_txid.clone(), + height: watchtower_challenge_init_tx.status.block_height.unwrap_or_default() as i64, + vout_len: watchtower_challenge_init_tx.vout.len() as i64, + monitor_data: serde_json::to_string(&vout_monitor_data)?, + created_at: current_times, + updated_at: current_times, + }) + .await?; + tx.commit().await?; + + vout_monitor_data + }; + + if init_height <= 0 { + warn!( + "graph_id:{} watchtower_challenge_init_txid {} height is not confirmed, skip refresh", + graph.graph_id, watchtower_challenge_init_txid.0 + ); + return Ok(None); + } + let block_hash_commit_timeout_txid = + graph.blockhash_commit_timeout_txid.clone().ok_or_else(|| { + anyhow::anyhow!("graph_id:{} blockhash_commit_timeout_txid is empty", graph.graph_id) + })?; + let monitor_result = vout_monitor_data + .monitor_vout( + btc_client, + &watchtower_challenge_init_txid.clone().into(), + &graph.watchtower_challenge_timeout_txids, + &graph.nack_txids, + &block_hash_commit_timeout_txid, + ) + .await?; + let mut tx = local_db.start_transaction().await?; + if let (Some((height, vout_len)), Some((tx_name, created_at))) = + (monitor_meta_update, existing_meta) + { + tx.upsert_graph_btc_tx_vout_monitor(&GraphBtcTxVoutMonitor { + graph_id: graph.graph_id, + tx_name, + txid: watchtower_challenge_init_txid.clone(), + height, + vout_len, + monitor_data: serde_json::to_string(&vout_monitor_data)?, + created_at, + updated_at: current_time_secs(), + }) + .await?; + } else { + tx.update_graph_btc_tx_vout_monitor_data( + &graph.graph_id, + &watchtower_challenge_init_txid, + serde_json::to_string(&vout_monitor_data)?, + ) + .await?; + } + // Always insert WatchtowerChallengeInitSent to avoid missing; repeated inserts are idempotent + upsert_message( + &mut tx, + false, + graph.graph_id, + None, + "self".to_string(), + Actor::Watchtower, + GOATMessageContent::WatchtowerChallengeInitSent(WatchtowerChallengeInitSent { + instance_id: graph.instance_id, + graph_id: graph.graph_id, + }), + 0, + 0, + ) + .await?; + tx.commit().await?; + + Ok(Some((vout_monitor_data, init_height, monitor_result))) +} + +pub(crate) async fn refresh_assert_monitor_data( + local_db: &LocalDB, + btc_client: &BTCClient, + graph: &Graph, +) -> anyhow::Result> { + let assert_init_txid = graph + .assert_init_txid + .clone() + .ok_or_else(|| anyhow::anyhow!("graph_id:{} assert_init_txid is none", graph.graph_id))?; + let out_monitor = { + let mut storage_processor = local_db.acquire().await?; + storage_processor.find_graph_btc_tx_vout_monitor(&graph.graph_id, &assert_init_txid).await? + }; + + let mut init_height; + let mut monitor_meta_update: Option<(i64, i64)> = None; + let mut existing_meta: Option<(String, i64)> = None; + let mut vout_monitor_data = if let Some(out_monitor) = out_monitor { + existing_meta = Some((out_monitor.tx_name.clone(), out_monitor.created_at)); + init_height = out_monitor.height; + match parse_monitor_data::(&out_monitor.monitor_data) { + Ok(vout_monitor_data) => vout_monitor_data, + Err(err) => { + warn!( + "graph_id:{} fail to parse assert monitor_data, rebuild default: {err}", + graph.graph_id + ); + let mut new_height = out_monitor.height; + let mut new_vout_len = out_monitor.vout_len; + if new_height <= 0 || new_vout_len <= 0 { + let txid: Txid = assert_init_txid.clone().into(); + if let Some(tx) = btc_client.get_tx_info(&txid).await? { + if new_height <= 0 { + new_height = tx.status.block_height.unwrap_or_default() as i64; + } + if new_vout_len <= 0 { + new_vout_len = tx.vout.len() as i64; + } + } + } + init_height = new_height; + let index_size = new_vout_len as i32 - 2; + if index_size <= 0 { + warn!( + "graph_id:{} assert_init_txid {} invalid index_size {index_size}, skip refresh", + graph.graph_id, assert_init_txid.0 + ); + return Ok(None); + } + if new_height != out_monitor.height || new_vout_len != out_monitor.vout_len { + monitor_meta_update = Some((new_height, new_vout_len)); + } + AssertInitTxVoutMonitorData::new(index_size) + } + } + } else { + let txid: Txid = assert_init_txid.clone().into(); + let assert_init_tx = match btc_client + .get_tx_info(&txid) + .await? + .filter(|tx| tx.status.block_height.unwrap_or_default() > 0) + { + Some(tx) => tx, + None => { + warn!("graph_id:{} assert_init_txid not on chain, skip refresh", graph.graph_id); + return Ok(None); + } + }; + + init_height = assert_init_tx.status.block_height.unwrap_or_default() as i64; + let vout_monitor_data = + AssertInitTxVoutMonitorData::new(assert_init_tx.vout.len() as i32 - 2); + let current_times = current_time_secs(); + let mut tx = local_db.start_transaction().await?; + tx.upsert_graph_btc_tx_vout_monitor(&GraphBtcTxVoutMonitor { + graph_id: graph.graph_id, + tx_name: MONITE_BTC_TX_NAME_ASSERT_INIT.to_string(), + txid: assert_init_txid.clone(), + height: assert_init_tx.status.block_height.unwrap_or_default() as i64, + vout_len: assert_init_tx.vout.len() as i64, + monitor_data: serde_json::to_string(&vout_monitor_data)?, + created_at: current_times, + updated_at: current_times, + }) + .await?; + tx.commit().await?; + + vout_monitor_data + }; + + if init_height <= 0 { + warn!( + "graph_id:{} assert_init_txid {} height is not confirmed, skip refresh", + graph.graph_id, assert_init_txid.0 + ); + return Ok(None); + } + let monitor_result = vout_monitor_data + .monitor_vout( + btc_client, + &assert_init_txid.clone().into(), + &graph.assert_commit_timeout_txids, + ) + .await?; + let mut tx = local_db.start_transaction().await?; + if let (Some((height, vout_len)), Some((tx_name, created_at))) = + (monitor_meta_update, existing_meta) + { + tx.upsert_graph_btc_tx_vout_monitor(&GraphBtcTxVoutMonitor { + graph_id: graph.graph_id, + tx_name, + txid: assert_init_txid.clone(), + height, + vout_len, + monitor_data: serde_json::to_string(&vout_monitor_data)?, + created_at, + updated_at: current_time_secs(), + }) + .await?; + } else { + tx.update_graph_btc_tx_vout_monitor_data( + &graph.graph_id, + &assert_init_txid, + serde_json::to_string(&vout_monitor_data)?, + ) + .await?; + } + tx.commit().await?; + + Ok(Some((vout_monitor_data, init_height, monitor_result))) +} diff --git a/node/src/scheduled_tasks/mod.rs b/node/src/scheduled_tasks/mod.rs index 162aae2b..82cf52ce 100644 --- a/node/src/scheduled_tasks/mod.rs +++ b/node/src/scheduled_tasks/mod.rs @@ -8,7 +8,6 @@ use crate::action::GOATMessageContent; use crate::env::{is_enable_update_spv_contract, is_relayer}; use crate::scheduled_tasks::graph_maintenance_tasks::{ detect_init_withdraw_call, detect_kickoff, detect_take1_or_challenge, process_graph_challenge, - scan_obsolete_sibling_graphs, }; use crate::scheduled_tasks::instance_maintenance_tasks::{ instance_answers_monitor, instance_bridge_out_monitor, instance_btc_tx_monitor, @@ -35,6 +34,7 @@ async fn fetch_on_turn_graph_by_status<'a>( storage_processor.find_graphs_by_status_group_by_operator(graph_status).await?; let mut graphs: Vec = vec![]; let mut pre_operator_pubkey = "".to_string(); + // Only process one graph for each operator each time for graph in graphs_ori { if graph.operator_pubkey != pre_operator_pubkey { pre_operator_pubkey = graph.operator_pubkey.clone(); @@ -88,10 +88,6 @@ async fn run( warn!("instance_bridge_out_monitor, err {:?}", err) } - if let Err(err) = scan_obsolete_sibling_graphs(local_db).await { - warn!("scan_obsolete_sibling_graphs, err {:?}", err) - } - if let Err(err) = detect_init_withdraw_call(local_db).await { warn!("detect_init_withdraw_call, err {:?}", err) } diff --git a/node/src/utils.rs b/node/src/utils.rs index 16dc6c4f..3eb15889 100644 --- a/node/src/utils.rs +++ b/node/src/utils.rs @@ -78,7 +78,9 @@ use crate::rpc_service::routes::v1::{ }; use crate::scheduled_tasks::get_goat_message_content_type; use crate::scheduled_tasks::graph_maintenance_tasks::{ - AssertCommitStatus, ChallengeSubStatus, CommitBlockHashStatus, WatchtowerChallengeStatus, + AssertCommitItemStatus, AssertCommitStatus, ChallengeSubStatus, CommitBlockHashStatus, + WatchtowerChallengeItemStatus, WatchtowerChallengeStatus, refresh_assert_monitor_data, + refresh_watchtower_challenge_monitor_data, }; use bitcoin_light_client_circuit::hash_operator_constant; use bitvm2_lib::transactions::base::BaseTransaction; @@ -800,16 +802,7 @@ pub(crate) async fn refresh_graph( } } }; - let mut sub_status = match scan_from_sub_status { - Some(s) => s, - None => ChallengeSubStatus { - watchtower_challenge_status: WatchtowerChallengeStatus::None, - commit_blockhash_status: CommitBlockHashStatus::None, - assert_commit_status: AssertCommitStatus::None, - disprove_type: None, - disprove_index: 0, - }, - }; + let mut sub_status = scan_from_sub_status.unwrap_or_default(); // check if Graph has been posted on GoatChain if current_status == GraphStatus::CommitteePresigned { let graph_data_on_goat = goat_client.gateway_get_graph_data(&graph_id).await?; @@ -821,7 +814,7 @@ pub(crate) async fn refresh_graph( if current_status == GraphStatus::OperatorDataPushed { let pegin_data = goat_client.gateway_get_pegin_data(&instance_id).await?; let withdraw_data = goat_client.gateway_get_withdraw_data(&graph_id).await?; - // TBD: obesolete graph when pegin is claimed rather than processing + // NOTE: maybe obesolete graph when pegin is claimed rather than processing? if pegin_data.status != PeginStatus::Withdrawable && withdraw_data.status == WithdrawStatus::None { @@ -906,7 +899,7 @@ pub(crate) async fn refresh_graph( return Ok((GraphStatus::OperatorKickOff, None)); } } - update_graph_challenge_txid( + try_update_graph_challenge_txid( btc_client, local_db, graph_id, @@ -935,162 +928,181 @@ pub(crate) async fn refresh_graph( } // check Watchtower-Challenge & Assert-Commit process if current_status == GraphStatus::Challenge { + let network = get_network(); + let current_height = btc_client.get_height().await? as i64; + let db_graph = convert_graph(graph, current_time_secs()); // check Watchtower Challenge process - let watchtower_challenge_init_txid = graph.watchtower_challenge_init.tx().compute_txid(); - if tx_on_chain(btc_client, &watchtower_challenge_init_txid).await? { - sub_status.watchtower_challenge_status = WatchtowerChallengeStatus::OperatorInit; - sub_status.commit_blockhash_status = CommitBlockHashStatus::None; // set None - let watchtower_num = graph.parameters.watchtower_pubkeys.len(); - let connector_g_vout = watchtower_num * 2; - let connector_f_vout = watchtower_num * 2 + 1; - if let Some(spent_txid) = outpoint_spent_txid( - btc_client, - &watchtower_challenge_init_txid, - connector_f_vout as u64, - ) - .await? - { - // this must not be Take2 because Take2 is already checked above - current_status = GraphStatus::Disprove; - let spent_tx = btc_client.get_tx(&spent_txid).await?.unwrap(); - let first_input_vout = spent_tx.input[0].previous_output.vout; - if first_input_vout == connector_g_vout as u32 { - sub_status.commit_blockhash_status = - CommitBlockHashStatus::OperatorCommitTimeout; - sub_status.disprove_type = Some(DisproveTxType::OperatorCommitTimeout); - sub_status.watchtower_challenge_status = - WatchtowerChallengeStatus::WatchtowerChallengeDisproveFinished; - } else { - sub_status.disprove_type = Some(DisproveTxType::OperatorNack); - sub_status.disprove_index = (first_input_vout / 2) as i32; - sub_status.watchtower_challenge_status = - WatchtowerChallengeStatus::WatchtowerChallengeDisproveFinished; - } - update_graph_status( - local_db, - instance_id, - graph_id, - current_status, - Some(sub_status), - ) - .await?; - return Ok((current_status, Some(sub_status))); - } else if let Some(watchtower_challenge_init_height) = - btc_client.get_tx_status(&watchtower_challenge_init_txid).await?.block_height + let (vout_monitor_data, watchtower_challenge_init_height, _) = + match refresh_watchtower_challenge_monitor_data(local_db, btc_client, &db_graph).await? { - let current_height = btc_client.get_height().await?; - if current_height > watchtower_challenge_init_height + nack_timelock(get_network()) - { - for i in 0..watchtower_num { - let ack_connector_vout = i * 2 + 1; - if let None = outpoint_spent_txid( - btc_client, - &watchtower_challenge_init_txid, - ack_connector_vout as u64, - ) - .await? - { - sub_status.watchtower_challenge_status = - WatchtowerChallengeStatus::OperatorACKTimeout; - break; - } - } - } else if current_height - > watchtower_challenge_init_height - + watchtower_challenge_timeout_timelock(get_network()) - { - for i in 0..watchtower_num { - let challenge_connector_vout = i * 2; - if let None = outpoint_spent_txid( - btc_client, - &watchtower_challenge_init_txid, - challenge_connector_vout as u64, - ) - .await? - { - sub_status.watchtower_challenge_status = - WatchtowerChallengeStatus::WatchtowerChallengeTimeout; - break; - } - } - } - if current_height - > watchtower_challenge_init_height - + commit_blockhash_timeout_timelock(get_network()) - { - let connector_g_vout = watchtower_num * 2; - if let None = outpoint_spent_txid( - btc_client, - &watchtower_challenge_init_txid, - connector_g_vout as u64, + Some(data) => data, + None => { + update_graph_status( + local_db, + instance_id, + graph_id, + current_status, + Some(sub_status), ) - .await? - { - sub_status.commit_blockhash_status = - CommitBlockHashStatus::OperatorCommitTimeout; - } + .await?; + return Ok((current_status, Some(sub_status))); } - } - if let Some(_) = outpoint_spent_txid( - btc_client, - &watchtower_challenge_init_txid, - connector_g_vout as u64, + }; + let is_challenge_timeout = watchtower_challenge_init_height + + (watchtower_challenge_timeout_timelock(network) as i64) + < current_height; + let is_ack_timeout = + watchtower_challenge_init_height + (nack_timelock(network) as i64) < current_height; + let is_blockhash_commit_timeout = watchtower_challenge_init_height + + (commit_blockhash_timeout_timelock(network) as i64) + < current_height; + // 1) check watchtower challenge status + if let Some((&index, _)) = vout_monitor_data + .data_map + .iter() + .find(|(_, status)| **status == WatchtowerChallengeItemStatus::OperatorNACK) + { + // 1.1) WatchtowerChallengeDisproveFinished + sub_status.watchtower_challenge_status = + WatchtowerChallengeStatus::WatchtowerChallengeDisproveFinished; + sub_status.disprove_index = index; + sub_status.disprove_type = Some(DisproveTxType::OperatorNack); + current_status = GraphStatus::Disprove; + update_graph_status(local_db, instance_id, graph_id, current_status, Some(sub_status)) + .await?; + // no further check is needed if Disproved + return Ok((current_status, Some(sub_status))); + } else if vout_monitor_data.data_map.values().all(|status| { + matches!( + status, + WatchtowerChallengeItemStatus::OperatorACK + | WatchtowerChallengeItemStatus::ChallengeTimeout ) - .await? - { - // this must be OperatorCommit because OperatorCommitTimeout is already checked above - sub_status.commit_blockhash_status = CommitBlockHashStatus::OperatorCommit; - } + }) { + // 1.2) WatchtowerChallengeNormalFinished + sub_status.watchtower_challenge_status = + WatchtowerChallengeStatus::WatchtowerChallengeNormalFinished; + } else if vout_monitor_data.data_map.values().any(|status| { + matches!( + status, + WatchtowerChallengeItemStatus::OperatorInit + | WatchtowerChallengeItemStatus::Challenge + ) + }) && is_ack_timeout + { + // 1.3) OperatorACKTimeout + sub_status.watchtower_challenge_status = WatchtowerChallengeStatus::OperatorACKTimeout; + } else if vout_monitor_data + .data_map + .values() + .any(|status| *status == WatchtowerChallengeItemStatus::OperatorInit) + && is_challenge_timeout + { + // 1.4) WatchtowerChallengeTimeout + sub_status.watchtower_challenge_status = + WatchtowerChallengeStatus::WatchtowerChallengeTimeout; + } else if vout_monitor_data + .data_map + .values() + .any(|status| *status == WatchtowerChallengeItemStatus::Challenge) + { + // 1.5) WatchtowerChallenge + sub_status.watchtower_challenge_status = WatchtowerChallengeStatus::WatchtowerChallenge; + } else if vout_monitor_data + .data_map + .values() + .any(|status| *status == WatchtowerChallengeItemStatus::OperatorInit) + { + // 1.6) OperatorInit + sub_status.watchtower_challenge_status = WatchtowerChallengeStatus::OperatorInit; + } else { + // 1.7) None + sub_status.watchtower_challenge_status = WatchtowerChallengeStatus::None; } - // check Assert Commit process - let assert_init_txid = graph.assert_init.tx().compute_txid(); - if tx_on_chain(btc_client, &assert_init_txid).await? { - sub_status.assert_commit_status = AssertCommitStatus::OperatorInit; - let assert_commit_num = graph.assert_commit_timeout_txns.len(); - let connector_d_vout = assert_commit_num; - if let Some(spent_txid) = - outpoint_spent_txid(btc_client, &assert_init_txid, connector_d_vout as u64).await? - { - // this must not be Take2 because Take2 is already checked above - current_status = GraphStatus::Disprove; - let spent_tx = btc_client.get_tx(&spent_txid).await?.unwrap(); - let first_input_vout = spent_tx.input[0].previous_output.vout; - sub_status.disprove_type = Some(DisproveTxType::AssertTimeout); - sub_status.disprove_index = first_input_vout as i32; - update_graph_status( - local_db, - instance_id, - graph_id, - current_status, - Some(sub_status), - ) + // 2) check commit blockhash status + if vout_monitor_data.commit_blockhash_status == CommitBlockHashStatus::OperatorCommitTimeout + { + // 2.1) Disproved by OperatorCommitTimeout + sub_status.commit_blockhash_status = CommitBlockHashStatus::OperatorCommitTimeout; + sub_status.disprove_index = 0; + sub_status.disprove_type = Some(DisproveTxType::OperatorCommitTimeout); + current_status = GraphStatus::Disprove; + update_graph_status(local_db, instance_id, graph_id, current_status, Some(sub_status)) .await?; - return Ok((current_status, Some(sub_status))); + return Ok((current_status, Some(sub_status))); + } else if vout_monitor_data.commit_blockhash_status == CommitBlockHashStatus::OperatorCommit + { + // 2.2) OperatorCommit + sub_status.commit_blockhash_status = CommitBlockHashStatus::OperatorCommit; + } else if sub_status.watchtower_challenge_status + == WatchtowerChallengeStatus::WatchtowerChallengeNormalFinished + { + if is_blockhash_commit_timeout { + // 2.3) WatchtowerChallengeCommitTimeout + sub_status.commit_blockhash_status = CommitBlockHashStatus::OperatorCommitTimeout; } else { - if let Some(assert_init_height) = - btc_client.get_tx_status(&assert_init_txid).await?.block_height - { - let current_height = btc_client.get_height().await?; - if current_height - > assert_init_height + assert_commit_timeout_timelock(get_network()) - { - for i in 0..assert_commit_num { - let assert_connector_vout = i; - if let None = outpoint_spent_txid( - btc_client, - &assert_init_txid, - assert_connector_vout as u64, - ) - .await? - { - sub_status.assert_commit_status = - AssertCommitStatus::OperatorCommitTimeout; - break; - } - } - } + // 2.4) WatchtowerChallengeProcessed + sub_status.commit_blockhash_status = + CommitBlockHashStatus::WatchtowerChallengeProcessed; + } + } else { + // 2.5) None + sub_status.commit_blockhash_status = CommitBlockHashStatus::None; + } + // 3) check Assert Commit process + let (vout_monitor_data, assert_init_height, _) = + match refresh_assert_monitor_data(local_db, btc_client, &db_graph).await? { + Some(data) => data, + None => { + update_graph_status( + local_db, + instance_id, + graph_id, + current_status, + Some(sub_status), + ) + .await?; + return Ok((current_status, Some(sub_status))); } + }; + let is_assert_commit_timeout = + assert_init_height + (assert_commit_timeout_timelock(network) as i64) < current_height; + if let Some((&index, _)) = vout_monitor_data + .data_map + .iter() + .find(|(_, status)| **status == AssertCommitItemStatus::OperatorCommitTimeout) + { + // 3.1) Disproved by OperatorCommitTimeout + sub_status.assert_commit_status = AssertCommitStatus::OperatorCommitTimeout; + sub_status.disprove_index = index; + sub_status.disprove_type = Some(DisproveTxType::AssertTimeout); + current_status = GraphStatus::Disprove; + update_graph_status(local_db, instance_id, graph_id, current_status, Some(sub_status)) + .await?; + // no further check is needed if Disproved + return Ok((current_status, Some(sub_status))); + } else if vout_monitor_data + .data_map + .values() + .all(|status| *status == AssertCommitItemStatus::OperatorCommit) + { + // 3.2) OperatorCommit + sub_status.assert_commit_status = AssertCommitStatus::OperatorCommit; + } else if vout_monitor_data + .data_map + .values() + .any(|status| *status == AssertCommitItemStatus::OperatorInit) + { + if is_assert_commit_timeout { + // 3.3) AssertCommitTimeout + sub_status.assert_commit_status = AssertCommitStatus::OperatorCommitTimeout; + } else { + // 3.4) OperatorInit + sub_status.assert_commit_status = AssertCommitStatus::OperatorInit; } + } else { + // 3.5) None + sub_status.assert_commit_status = AssertCommitStatus::None; } } update_graph_status(local_db, instance_id, graph_id, current_status, Some(sub_status)).await?; @@ -3739,23 +3751,16 @@ pub async fn get_instance_parameters( } } -pub async fn store_graph(local_db: &LocalDB, simple_graph: &SimplifiedBitvm2Graph) -> Result<()> { - let mut tx = local_db.start_transaction().await?; - let bitvm2_graph: Bitvm2Graph = Bitvm2Graph::from_simplified(simple_graph)?; - let (graph_id, instance_id, graph_nonce) = ( - simple_graph.parameters.graph_id, - simple_graph.parameters.instance_parameters.instance_id, - simple_graph.parameters.graph_nonce, - ); +fn convert_graph(bitvm2_graph: &Bitvm2Graph, current_time: i64) -> Graph { let mut status = GraphStatus::OperatorPresigned.to_string(); if bitvm2_graph.committee_pre_signed() { status = GraphStatus::CommitteePresigned.to_string(); } - let current_time = current_time_secs(); - let mut graph = Graph { - graph_id, - instance_id, - kickoff_index: graph_nonce as i64, + + Graph { + graph_id: bitvm2_graph.parameters.graph_id, + instance_id: bitvm2_graph.parameters.instance_parameters.instance_id, + kickoff_index: bitvm2_graph.parameters.graph_nonce as i64, from_addr: "".to_string(), to_addr: "".to_string(), amount: bitvm2_graph.parameters.instance_parameters.pegin_amount.to_sat() as i64, @@ -3807,7 +3812,16 @@ pub async fn store_graph(local_db: &LocalDB, simple_graph: &SimplifiedBitvm2Grap proceed_withdraw_height: 0, created_at: current_time, updated_at: current_time, - }; + } +} + +pub async fn store_graph(local_db: &LocalDB, simple_graph: &SimplifiedBitvm2Graph) -> Result<()> { + let mut tx = local_db.start_transaction().await?; + let bitvm2_graph: Bitvm2Graph = Bitvm2Graph::from_simplified(simple_graph)?; + let graph_id = simple_graph.parameters.graph_id; + let instance_id = simple_graph.parameters.instance_parameters.instance_id; + let current_time = current_time_secs(); + let mut graph = convert_graph(&bitvm2_graph, current_time); if let Some(node_info) = tx.get_node_by_btc_pub_key(&bitvm2_graph.parameters.operator_pubkey.to_string()).await? @@ -4364,7 +4378,7 @@ pub async fn graph_exists(local_db: &LocalDB, instance_id: Uuid, graph_id: Uuid) } #[allow(clippy::too_many_arguments)] -pub async fn update_graph_challenge_txid( +pub async fn try_update_graph_challenge_txid( btc_client: &BTCClient, local_db: &LocalDB, graph_id: Uuid, @@ -4389,14 +4403,14 @@ pub async fn update_graph_challenge_txid( && spent_txid != take1_txid { info!( - "update_graph_challenge_txid update challenge_txid: {spent_txid} for graph {graph_id}" + "try_update_graph_challenge_txid update challenge_txid: {spent_txid} for graph {graph_id}" ); let mut storage_processor = local_db.acquire().await?; storage_processor .update_graph(&GraphUpdate::new(graph_id).with_challenge_txid(spent_txid.into())) .await?; } else { - info!("update_graph_challenge_txid no need to challenge_txid for graph {graph_id}"); + info!("try_update_graph_challenge_txid no need to challenge_txid for graph {graph_id}"); } Ok(()) } From 21fe2af4ffa532272a675c9621f445a9e3714bfc Mon Sep 17 00:00:00 2001 From: KSlashh <48985735+KSlashh@users.noreply.github.com> Date: Wed, 11 Feb 2026 00:44:06 +0800 Subject: [PATCH 2/2] fix: address review comments --- node/src/action.rs | 2 +- node/src/handle.rs | 2 +- .../graph_maintenance_tasks.rs | 110 +++++++++--------- .../instance_maintenance_tasks.rs | 9 +- node/src/utils.rs | 2 + 5 files changed, 67 insertions(+), 58 deletions(-) diff --git a/node/src/action.rs b/node/src/action.rs index 759b33d7..47219f23 100644 --- a/node/src/action.rs +++ b/node/src/action.rs @@ -481,7 +481,7 @@ pub async fn push_local_unhandled_messages( true, business_id, None, - "Self".to_string(), + SELF_SENDER.to_string(), actor, content, 0, diff --git a/node/src/handle.rs b/node/src/handle.rs index d3bbe884..5b21fad5 100644 --- a/node/src/handle.rs +++ b/node/src/handle.rs @@ -2813,7 +2813,7 @@ async fn handle_assert_init_ready_operator( } match graph_sub_status { Some(sub_status) => { - if !sub_status.is_watchtower_challenge_normal_finished() { + if !sub_status.is_watchtower_challenge_success() { tracing::warn!( "Ignore AssertInitReady for {instance_id}:{graph_id}: watchtower challenge not finished yet, sub-status {sub_status:?}" ); diff --git a/node/src/scheduled_tasks/graph_maintenance_tasks.rs b/node/src/scheduled_tasks/graph_maintenance_tasks.rs index 4ae28f69..35c2ba4f 100644 --- a/node/src/scheduled_tasks/graph_maintenance_tasks.rs +++ b/node/src/scheduled_tasks/graph_maintenance_tasks.rs @@ -8,7 +8,7 @@ use crate::action::{ use crate::env::get_network; use crate::rpc_service::current_time_secs; use crate::scheduled_tasks::fetch_on_turn_graph_by_status; -use crate::utils::{outpoint_spent_txid, upsert_message}; +use crate::utils::{SELF_SENDER, outpoint_spent_txid, upsert_message}; use bitcoin::Txid; use bitvm2_lib::actors::Actor; use bitvm2_lib::challenger::{ @@ -111,7 +111,7 @@ pub enum WatchtowerChallengeStatus { WatchtowerChallenge, // Some Watchtower challenge, and timelock not expired WatchtowerChallengeTimeout, // Some Watchtower did not challenge, and timelock expired OperatorACKTimeout, // Operator did not send ACK for some Watchtower, and timelock expired - WatchtowerChallengeNormalFinished, // Normal Finished + WatchtowerChallengeNormalFinished, // Normal Finished, TODO: rename it WatchtowerChallengeDisproveFinished, // Disproved Finished } @@ -125,7 +125,7 @@ pub struct ChallengeSubStatus { } impl ChallengeSubStatus { - pub fn is_watchtower_challenge_normal_finished(&self) -> bool { + pub fn is_watchtower_challenge_success(&self) -> bool { self.watchtower_challenge_status == WatchtowerChallengeStatus::WatchtowerChallengeNormalFinished && self.commit_blockhash_status == CommitBlockHashStatus::OperatorCommit @@ -135,11 +135,11 @@ impl ChallengeSubStatus { self.disprove_type.is_some() } - pub fn is_normal_finished(&self) -> bool { - self.is_watchtower_challenge_normal_finished() && self.is_assert_commit_normal_finished() + pub fn is_all_commit_success(&self) -> bool { + self.is_watchtower_challenge_success() && self.is_assert_commit_success() } - pub fn is_assert_commit_normal_finished(&self) -> bool { + pub fn is_assert_commit_success(&self) -> bool { self.assert_commit_status == AssertCommitStatus::OperatorCommit } } @@ -161,6 +161,7 @@ pub struct WTInitTxVoutMonitorData { pub data_map: IndexMap, pub require_disproved_indexes: Vec, pub commit_blockhash_status: CommitBlockHashStatus, + #[deprecated] pub is_challenge_timeout_sent: bool, // deprecated } @@ -238,9 +239,11 @@ impl WTInitTxVoutMonitorData { fn update_disprove_indexes(&mut self) { self.require_disproved_indexes = vec![]; for (index, status) in self.data_map.iter() { - if *status == WatchtowerChallengeItemStatus::OperatorInit - || *status == WatchtowerChallengeItemStatus::Challenge - { + if matches!( + *status, + WatchtowerChallengeItemStatus::OperatorInit + | WatchtowerChallengeItemStatus::Challenge + ) { self.require_disproved_indexes.push(*index as usize); } } @@ -262,8 +265,11 @@ impl WTInitTxVoutMonitorData { self.data_map .iter() .filter(|(_, v)| { - **v == WatchtowerChallengeItemStatus::Challenge - || **v == WatchtowerChallengeItemStatus::OperatorACK + matches!( + **v, + WatchtowerChallengeItemStatus::Challenge + | WatchtowerChallengeItemStatus::OperatorACK + ) }) .count(), self.data_map.len(), @@ -303,15 +309,18 @@ impl WTInitTxVoutMonitorData { self.data_map .iter() .filter(|(_, v)| { - **v == WatchtowerChallengeItemStatus::Challenge - || **v == WatchtowerChallengeItemStatus::OperatorACK - || **v == WatchtowerChallengeItemStatus::OperatorNACK + matches!( + **v, + WatchtowerChallengeItemStatus::Challenge + | WatchtowerChallengeItemStatus::OperatorACK + | WatchtowerChallengeItemStatus::OperatorNACK + ) }) .count(), ) } - pub fn check_watchtower_challenge_normal_finished(&self) -> bool { + pub fn is_watchtower_challenge_success(&self) -> bool { self.data_map.values().all(|status| { matches!( status, @@ -391,7 +400,7 @@ impl AssertInitTxVoutMonitorData { Ok(vout_spent_detect) } - pub fn check_normal_finished(&self) -> bool { + pub fn is_assert_success(&self) -> bool { self.data_map.values().all(|status| *status == AssertCommitItemStatus::OperatorCommit) } @@ -476,7 +485,7 @@ pub async fn detect_init_withdraw_call(local_db: &LocalDB) -> anyhow::Result<()> false, graph_id, None, - "self".to_string(), + SELF_SENDER.to_string(), Actor::Operator, GOATMessageContent::KickoffReady(KickoffReady { instance_id, graph_id }), 0, @@ -530,7 +539,7 @@ pub async fn detect_kickoff(local_db: &LocalDB, btc_client: &BTCClient) -> anyho false, graph.graph_id, None, - "self".to_string(), + SELF_SENDER.to_string(), Actor::All, GOATMessageContent::KickoffSent(KickoffSent { instance_id: graph.instance_id, @@ -588,7 +597,7 @@ pub async fn detect_take1_or_challenge( false, graph.graph_id, None, - "self".to_string(), + SELF_SENDER.to_string(), actor, message_content, 0, @@ -629,17 +638,15 @@ pub async fn process_graph_challenge( ChallengeSubStatus::default() } }; - let mut is_watchtower_challenge_normal_finished = - sub_status.is_watchtower_challenge_normal_finished(); - let mut is_assert_commit_normal_finished = sub_status.is_assert_commit_normal_finished(); - let mut is_all_commit_normal_finished = - is_watchtower_challenge_normal_finished && is_assert_commit_normal_finished; - if !sub_status.is_disproved() && !is_all_commit_normal_finished { + let mut is_watchtower_challenge_success = sub_status.is_watchtower_challenge_success(); + let mut is_assert_commit_success = sub_status.is_assert_commit_success(); + let mut is_all_commit_success = is_watchtower_challenge_success && is_assert_commit_success; + if !sub_status.is_disproved() && !is_all_commit_success { trace!("graph:{} is not disproved", graph.graph_id); - if !is_watchtower_challenge_normal_finished { + if !is_watchtower_challenge_success { info!("graph:{} watchtower challenge is processing", graph.graph_id); // process_watchtower_challenge_monitoring may trigger: WatchtowerChallengeSent, WatchtowerChallengeTimeout, OperatorAckTimeout, DisproveSent(OperatorCommitTimeout/OperatorNack), OperatorCommitBlockHashReady, OperatorCommitBlockHashTimeout - is_watchtower_challenge_normal_finished = process_watchtower_challenge_monitoring( + is_watchtower_challenge_success = process_watchtower_challenge_monitoring( btc_client, local_db, &graph, @@ -648,7 +655,7 @@ pub async fn process_graph_challenge( ) .await?; } - if is_watchtower_challenge_normal_finished && !is_assert_commit_normal_finished { + if is_watchtower_challenge_success && !is_assert_commit_success { info!("graph:{} assert commit is processing", graph.graph_id); // upsert AssertInitReady message whenever watchtower challenge is finished normally, repeated inserts are idempotent upsert_message( @@ -656,7 +663,7 @@ pub async fn process_graph_challenge( false, graph.graph_id, None, - "self".to_string(), + SELF_SENDER.to_string(), Actor::Operator, GOATMessageContent::AssertInitReady(AssertInitReady { instance_id: graph.instance_id, @@ -667,7 +674,7 @@ pub async fn process_graph_challenge( ) .await?; // process_assert_commit_monitoring may trigger: DisproveSent(AssertTimeout), AssertCommitTimeout - is_assert_commit_normal_finished = process_assert_commit_monitoring( + is_assert_commit_success = process_assert_commit_monitoring( btc_client, local_db, &graph, @@ -676,10 +683,9 @@ pub async fn process_graph_challenge( ) .await?; } - is_all_commit_normal_finished = - is_watchtower_challenge_normal_finished && is_assert_commit_normal_finished; + is_all_commit_success = is_watchtower_challenge_success && is_assert_commit_success; } - if !sub_status.is_disproved() && is_all_commit_normal_finished { + if !sub_status.is_disproved() && is_all_commit_success { let mut storage_processor = local_db.acquire().await?; info!("graph:{} watchtower challenge and assert commit is finished", graph.graph_id); // upsert DisproveReady whenever both watchtower-challenge and assert is finished normally, repeated inserts are idempotent @@ -688,7 +694,7 @@ pub async fn process_graph_challenge( false, graph.graph_id, None, - "self".to_string(), + SELF_SENDER.to_string(), Actor::Challenger, GOATMessageContent::DisproveReady(DisproveReady { instance_id: graph.instance_id, @@ -708,7 +714,7 @@ pub async fn process_graph_challenge( false, graph.graph_id, None, - "self".to_string(), + SELF_SENDER.to_string(), actor, message_content, 0, @@ -879,7 +885,7 @@ async fn process_kickoff_graph( /// Process watchtower challenge monitoring /// may trigger: WatchtowerChallengeSent, WatchtowerChallengeTimeout, OperatorAckTimeout, DisproveSent(OperatorCommitTimeout/OperatorNack), OperatorCommitBlockHashReady, OperatorCommitBlockHashTimeout -/// return Ok(true) if watchtower challenge is normal finished +/// return Ok(true) if watchtower challenge is success #[tracing::instrument(level = "info", skip(btc_client, local_db))] async fn process_watchtower_challenge_monitoring( btc_client: &BTCClient, @@ -915,8 +921,8 @@ async fn process_watchtower_challenge_monitoring( "is_ack_timeout_{is_ack_timeout}, is_challenge_timeout_{is_challenge_timeout}, \ is_blockhash_commit_timeout_{is_blockhash_commit_timeout}, watchtower_challenge_init_height:{watchtower_challenge_init_height}, current_height:{current_height} ", ); - if vout_monitor_data.check_watchtower_challenge_normal_finished() { - info!("graph_id {} watchtower challenge is normal finished", graph.graph_id); + if vout_monitor_data.is_watchtower_challenge_success() { + info!("graph_id {} watchtower challenge is success", graph.graph_id); return Ok(true); } if vout_monitor_data.is_disproved() { @@ -954,7 +960,7 @@ async fn process_watchtower_challenge_monitoring( false, graph.graph_id, None, - "self".to_string(), + SELF_SENDER.to_string(), Actor::Operator, GOATMessageContent::DisproveSent(DisproveSent { instance_id: graph.instance_id, @@ -978,7 +984,7 @@ async fn process_watchtower_challenge_monitoring( false, graph.graph_id, None, - "self".to_string(), + SELF_SENDER.to_string(), Actor::Operator, GOATMessageContent::OperatorCommitBlockHashReady(OperatorCommitBlockHashReady { instance_id: graph.instance_id, @@ -995,7 +1001,7 @@ async fn process_watchtower_challenge_monitoring( false, graph.graph_id, None, - "self".to_string(), + SELF_SENDER.to_string(), Actor::Challenger, GOATMessageContent::OperatorCommitBlockHashTimeout(OperatorCommitBlockHashTimeout { instance_id: graph.instance_id, @@ -1023,7 +1029,7 @@ async fn process_watchtower_challenge_monitoring( false, graph.graph_id, None, - "self".to_string(), + SELF_SENDER.to_string(), Actor::Operator, GOATMessageContent::WatchtowerChallengeSent(WatchtowerChallengeSent { instance_id: graph.instance_id, @@ -1045,7 +1051,7 @@ async fn process_watchtower_challenge_monitoring( false, graph.graph_id, Some(vout_monitor_data.get_require_disproved_string()), - "self".to_string(), + SELF_SENDER.to_string(), Actor::Challenger, GOATMessageContent::OperatorAckTimeout(OperatorAckTimeout { instance_id: graph.instance_id, @@ -1077,7 +1083,7 @@ async fn process_watchtower_challenge_monitoring( false, graph.graph_id, Some(sub_type), - "self".to_string(), + SELF_SENDER.to_string(), Actor::Operator, GOATMessageContent::WatchtowerChallengeTimeout(WatchtowerChallengeTimeout { instance_id: graph.instance_id, @@ -1095,7 +1101,7 @@ async fn process_watchtower_challenge_monitoring( /// Process assert commit monitoring /// may trigger: DisproveSent(AssertTimeout), AssertCommitTimeout -/// return Ok(true) if assert commit is normal finished +/// return Ok(true) if assert commit is success async fn process_assert_commit_monitoring( btc_client: &BTCClient, local_db: &LocalDB, @@ -1123,8 +1129,8 @@ async fn process_assert_commit_monitoring( assert_init_height:{assert_init_height}, timelock_config.assert_commit_timelock:{}, current_height:{current_height}", graph.graph_id, timelock_config.assert_commit_timelock ); - if vout_monitor_data.check_normal_finished() { - info!("graph_id {} assert commit is normal finished", graph.graph_id); + if vout_monitor_data.is_assert_success() { + info!("graph_id {} assert commit is success", graph.graph_id); return Ok(true); } if vout_monitor_data.is_disproved() { @@ -1146,7 +1152,7 @@ async fn process_assert_commit_monitoring( false, graph.graph_id, None, - "self".to_string(), + SELF_SENDER.to_string(), Actor::Operator, GOATMessageContent::DisproveSent(DisproveSent { instance_id: graph.instance_id, @@ -1170,7 +1176,7 @@ async fn process_assert_commit_monitoring( false, graph.graph_id, Some(vout_monitor_data.get_require_disproved_string()), - "self".to_string(), + SELF_SENDER.to_string(), Actor::Challenger, GOATMessageContent::AssertCommitTimeout(AssertCommitTimeout { instance_id: graph.instance_id, @@ -1257,7 +1263,7 @@ async fn detect_kickoff_ref_disprove_tx( false, graph.graph_id, None, - "self".to_string(), + SELF_SENDER.to_string(), Actor::Committee, GOATMessageContent::DisproveSent(DisproveSent { instance_id: graph.instance_id, @@ -1444,7 +1450,7 @@ async fn check_pre_kickoff_sent( false, graph_id, None, - "self".to_string(), + SELF_SENDER.to_string(), Actor::Challenger, GOATMessageContent::PreKickoffSent(PreKickoffSent { instance_id, graph_id }), 0, @@ -1609,7 +1615,7 @@ pub(crate) async fn refresh_watchtower_challenge_monitor_data( false, graph.graph_id, None, - "self".to_string(), + SELF_SENDER.to_string(), Actor::Watchtower, GOATMessageContent::WatchtowerChallengeInitSent(WatchtowerChallengeInitSent { instance_id: graph.instance_id, diff --git a/node/src/scheduled_tasks/instance_maintenance_tasks.rs b/node/src/scheduled_tasks/instance_maintenance_tasks.rs index c18fa5f0..2faf651d 100644 --- a/node/src/scheduled_tasks/instance_maintenance_tasks.rs +++ b/node/src/scheduled_tasks/instance_maintenance_tasks.rs @@ -5,7 +5,8 @@ use crate::scheduled_tasks::event_watch_task::generate_instance_from_bridge_in_r use crate::scheduled_tasks::get_timestamp_from_contract_data; use crate::utils::evm_swap_utils::IEscrowManager::EscrowData; use crate::utils::{ - check_bridge_in_uxto_available_or_self_spent, gen_instance_parameters_local, upsert_message, + SELF_SENDER, check_bridge_in_uxto_available_or_self_spent, gen_instance_parameters_local, + upsert_message, }; use alloy::sol_types::SolType; use bitvm2_lib::actors::Actor; @@ -94,7 +95,7 @@ pub async fn instance_answers_monitor( false, tx_record.instance_id, None, - "self".to_string(), + SELF_SENDER.to_string(), Actor::All, GOATMessageContent::PeginRequest(PeginRequest { instance_id: tx_record.instance_id, @@ -321,7 +322,7 @@ pub async fn instance_btc_tx_monitor( false, instance.instance_id, None, - "self".to_string(), + SELF_SENDER.to_string(), Actor::All, GOATMessageContent::ConfirmInstance(ConfirmInstance { instance_id: instance.instance_id, @@ -337,7 +338,7 @@ pub async fn instance_btc_tx_monitor( false, instance.instance_id, None, - "self".to_string(), + SELF_SENDER.to_string(), Actor::All, GOATMessageContent::PostReady(PostReady { instance_id: instance.instance_id, diff --git a/node/src/utils.rs b/node/src/utils.rs index 3eb15889..6bcb431f 100644 --- a/node/src/utils.rs +++ b/node/src/utils.rs @@ -58,6 +58,8 @@ use std::io::{BufReader, BufWriter}; use std::net::SocketAddr; use std::path::{Path, PathBuf}; use std::str::FromStr; + +pub const SELF_SENDER: &str = "self"; use std::time::{SystemTime, UNIX_EPOCH}; use store::localdb::{ GraphQuery, GraphUpdate, InstanceQuery, InstanceUpdate, LocalDB, StorageProcessor,