diff --git a/Cargo.lock b/Cargo.lock index d90fa9eed896..01a3b39541a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7928,7 +7928,6 @@ dependencies = [ "mockall", "prometheus", "slog", - "tokio", ] [[package]] @@ -12776,6 +12775,7 @@ dependencies = [ "slog", "tempfile", "thiserror 2.0.18", + "tokio", "url", "x509-cert", ] @@ -14693,6 +14693,7 @@ dependencies = [ "strum 0.26.3", "tempfile", "test-strategy 0.4.5", + "tokio", "tree-deserializer", "uuid", ] diff --git a/rs/consensus/BUILD.bazel b/rs/consensus/BUILD.bazel index bb2430c8b3c5..b99211a80598 100644 --- a/rs/consensus/BUILD.bazel +++ b/rs/consensus/BUILD.bazel @@ -77,7 +77,6 @@ DEV_DEPENDENCIES = [ "@crate_index//:slog-term", "@crate_index//:strum", "@crate_index//:tempfile", - "@crate_index//:tokio", ] NONMALICIOUS_DEPENDENCIES = [ @@ -198,5 +197,6 @@ rust_ic_bench( "//rs/types/types", "@crate_index//:criterion", "@crate_index//:tempfile", + "@crate_index//:tokio", ], ) diff --git a/rs/consensus/benches/validate_payload.rs b/rs/consensus/benches/validate_payload.rs index 307b8637b7ee..b085c815c85a 100644 --- a/rs/consensus/benches/validate_payload.rs +++ b/rs/consensus/benches/validate_payload.rs @@ -104,6 +104,7 @@ where &StateManagerConfig::new(tmpdir.path().to_path_buf()), None, ic_types::malicious_flags::MaliciousFlags::default(), + tokio::sync::watch::channel(ic_types::Height::from(0)).0, ); setup_ingress_state(now, &mut state_manager); let state_manager = Arc::new(state_manager); diff --git a/rs/consensus/certification/BUILD.bazel b/rs/consensus/certification/BUILD.bazel index e4d113670f49..2a8de07534c5 100644 --- a/rs/consensus/certification/BUILD.bazel +++ b/rs/consensus/certification/BUILD.bazel @@ -17,7 +17,6 @@ DEPENDENCIES = [ "//rs/types/types", "@crate_index//:prometheus", "@crate_index//:slog", - "@crate_index//:tokio", ] DEV_DEPENDENCIES = [ diff --git a/rs/consensus/certification/Cargo.toml b/rs/consensus/certification/Cargo.toml index f08aec6a4436..ea337a24d4a0 100644 --- a/rs/consensus/certification/Cargo.toml +++ b/rs/consensus/certification/Cargo.toml @@ -20,7 +20,6 @@ ic-replicated-state = { path = "../../replicated_state" } ic-types = { path = "../../types/types" } prometheus = { workspace = true } slog = { workspace = true } -tokio = { workspace = true } [dev-dependencies] assert_matches = { workspace = true } diff --git a/rs/consensus/certification/src/certifier.rs b/rs/consensus/certification/src/certifier.rs index 3971a8bfbcd7..5ad829bd73c3 100644 --- a/rs/consensus/certification/src/certifier.rs +++ b/rs/consensus/certification/src/certifier.rs @@ -31,7 +31,6 @@ use ic_types::{ }; use prometheus::{Histogram, IntCounter, IntGauge}; use std::{cell::RefCell, collections::BTreeSet, sync::Arc, time::Instant}; -use tokio::sync::watch; struct CertifierMetrics { shares_created: IntCounter, @@ -50,7 +49,6 @@ pub struct CertifierImpl { metrics: CertifierMetrics, /// The highest height that has been purged. Used to avoid redundant purging. highest_purged_height: RefCell, - max_certified_height_tx: watch::Sender, log: ReplicaLogger, } @@ -151,16 +149,6 @@ impl PoolMutationsProducer for CertifierImpl { .deliver_state_certification(certification); self.metrics.last_certified_height.set(height.get() as i64); debug!(&self.log, "Delivered certification for height {}", height); - - self.max_certified_height_tx.send_if_modified(|h| { - if height > *h { - *h = height; - true - } else { - false - } - }); - true } None => false, @@ -283,7 +271,6 @@ impl CertifierImpl { consensus_pool_cache: Arc, metrics_registry: MetricsRegistry, log: ReplicaLogger, - max_certified_height_tx: watch::Sender, ) -> Self { let membership = Arc::new(Membership::new( consensus_pool_cache.clone(), @@ -318,7 +305,6 @@ impl CertifierImpl { }, log, highest_purged_height: RefCell::new(Height::from(1)), - max_certified_height_tx, } } @@ -763,8 +749,6 @@ mod tests { .. } = dependencies(pool_config.clone(), 1); - let (max_certified_height_tx, _) = watch::channel(Height::from(0)); - let certifier = CertifierImpl::new( replica_config, registry, @@ -773,7 +757,6 @@ mod tests { pool.get_cache(), MetricsRegistry::new(), log, - max_certified_height_tx, ); let mut cert = if let CertificationMessage::Certification(cert) = @@ -810,8 +793,6 @@ mod tests { ic_logger::replica_logger::no_op_logger(), metrics_registry.clone(), ); - let (max_certified_height_tx, _) = watch::channel(Height::from(0)); - let certifier = CertifierImpl::new( replica_config, registry, @@ -820,7 +801,6 @@ mod tests { pool.get_cache(), metrics_registry.clone(), log, - max_certified_height_tx, ); let bouncer_factory = CertifierBouncer::new(&metrics_registry, pool.get_cache()); @@ -874,7 +854,6 @@ mod tests { pool.advance_round_normal_operation_n(6); add_expectations(state_manager.clone(), 1, 4); let metrics_registry = MetricsRegistry::new(); - let (max_certified_height_tx, _) = watch::channel(Height::from(0)); let mut cert_pool = CertificationPoolImpl::new( replica_config.node_id, pool_config, @@ -889,7 +868,6 @@ mod tests { pool.get_cache(), metrics_registry, log, - max_certified_height_tx, ); // generate a certifications for heights 1, 2 and 4 @@ -1018,8 +996,6 @@ mod tests { ic_logger::replica_logger::no_op_logger(), metrics_registry.clone(), ); - let (max_certified_height_tx, _) = watch::channel(Height::from(0)); - with_test_replica_logger(|log| { let certifier = CertifierImpl::new( replica_config, @@ -1029,7 +1005,6 @@ mod tests { pool.get_cache(), metrics_registry, log, - max_certified_height_tx, ); std::iter::empty() @@ -1102,8 +1077,6 @@ mod tests { ic_logger::replica_logger::no_op_logger(), metrics_registry.clone(), ); - let (max_certified_height_tx, _) = watch::channel(Height::from(0)); - with_test_replica_logger(|log| { let certifier = CertifierImpl::new( replica_config, @@ -1113,7 +1086,6 @@ mod tests { pool.get_cache(), metrics_registry, log, - max_certified_height_tx, ); std::iter::empty() @@ -1178,8 +1150,6 @@ mod tests { ic_logger::replica_logger::no_op_logger(), metrics_registry.clone(), ); - let (max_certified_height_tx, _) = watch::channel(Height::from(0)); - with_test_replica_logger(|log| { let certifier = CertifierImpl::new( replica_config, @@ -1189,7 +1159,6 @@ mod tests { pool.get_cache(), metrics_registry, log, - max_certified_height_tx, ); let shares = certifier.sign( @@ -1247,8 +1216,6 @@ mod tests { // make the mock state manager return empty hashes for heights 3, 4 and 5 add_expectations(state_manager.clone(), 3, 5); let metrics_registry = MetricsRegistry::new(); - let (max_certified_height_tx, _) = watch::channel(Height::from(0)); - let certifier = CertifierImpl::new( replica_config.clone(), registry, @@ -1257,7 +1224,6 @@ mod tests { pool.get_cache(), metrics_registry.clone(), log, - max_certified_height_tx, ); let mut cert_pool = CertificationPoolImpl::new( replica_config.node_id, @@ -1428,8 +1394,6 @@ mod tests { ic_logger::replica_logger::no_op_logger(), metrics_registry.clone(), ); - let (max_certified_height_tx, _) = watch::channel(Height::from(0)); - with_test_replica_logger(|log| { let certifier = CertifierImpl::new( replica_config, @@ -1439,7 +1403,6 @@ mod tests { pool.get_cache(), metrics_registry, log, - max_certified_height_tx, ); std::iter::empty() @@ -1501,144 +1464,6 @@ mod tests { }) } - /// Test that the certifier always transmits the highest certified height that - /// has been seen so far. I.e. always transmit the global maximum height. - /// Test scenario: - /// 1. Certifier receives certifications for heights 1, 2, 3. - /// - Certifier should transmit height 3. - /// 2. Certifier receives certification for height 4. - /// - Certifier should transmit height 4. - /// 3. Certifier receives certifications for heights 4, 3, 2, 1. - /// - Certifier should not transmit any height, as none of the heights are higher - /// than the last transmitted height. - #[test] - fn test_certified_heights_are_transmitted() { - ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| { - with_test_replica_logger(|log| { - let Dependencies { - pool, - replica_config, - registry, - crypto, - state_manager, - .. - } = dependencies(pool_config.clone(), 4); - - let metrics_registry = MetricsRegistry::new(); - let (max_certified_height_tx, mut max_certified_height_rx) = - watch::channel(Height::from(0)); - let cert_pool = CertificationPoolImpl::new( - replica_config.node_id, - pool_config, - ic_logger::replica_logger::no_op_logger(), - metrics_registry.clone(), - ); - - for height in 1..=4 { - cert_pool - .validated - .insert(CertificationMessage::Certification(Certification { - height: Height::from(height), - height_witness: Some(Witness::new_for_testing_with_height()), - signed: Signed { - content: gen_content(Height::from(height)), - signature: ThresholdSignature::fake(), - }, - })); - } - - let certifier = CertifierImpl::new( - replica_config, - registry, - crypto, - state_manager.clone(), - pool.get_cache(), - metrics_registry, - log, - max_certified_height_tx, - ); - - // We expect deliver_state_certification() to be called 8 times since we call - // CertifierImpl::on_state_change 3 times with 8 heights in total: - // We mock the certified heights [1, 2, 3], [4], [4, 3, 2, 1] which are in total 8 heights. - // I.e. the certifier should deliver the state certification 8 times. - state_manager - .get_mut() - .expect_deliver_state_certification() - .times(8) - .return_const(()); - - let state_hashes = |heights: Vec| { - heights - .into_iter() - .map(|h| StateHashMetadata { - height: Height::from(h), - hash: CryptoHashOfPartialState::from(CryptoHash(Vec::new())), - height_witness: Witness::new_for_testing_with_height(), - }) - .collect::>() - }; - - // We mock the state manager to return the heights - // of the states that are certified. The CertifierImpl - // should transmit the highest height that it has seen - // each time it sees a new height it certifies by delivering it - // to the state manager. - state_manager - .get_mut() - .expect_list_state_hashes_to_certify() - .times(1) - .return_const(state_hashes(vec![1, 2, 3])); - state_manager - .get_mut() - .expect_list_state_heights_to_certify() - .times(1) - .return_const(vec![]); - - certifier.on_state_change(&cert_pool); - assert_eq!( - *max_certified_height_rx.borrow_and_update(), - Height::from(3) - ); - - // New max height is 4, so it should be transmitted - state_manager - .get_mut() - .expect_list_state_hashes_to_certify() - .times(1) - .return_const(state_hashes(vec![4])); - state_manager - .get_mut() - .expect_list_state_heights_to_certify() - .times(1) - .return_const(vec![]); - certifier.on_state_change(&cert_pool); - assert_eq!( - *max_certified_height_rx.borrow_and_update(), - Height::from(4), - "Expected height 4 to be transmitted as it is higher than previous transmitted heights" - ); - - // None of these heights are higher than the last transmitted height - state_manager - .get_mut() - .expect_list_state_hashes_to_certify() - .times(1) - .return_const(state_hashes(vec![4, 3, 2, 1])); - state_manager - .get_mut() - .expect_list_state_heights_to_certify() - .times(1) - .return_const(vec![]); - certifier.on_state_change(&cert_pool); - assert!( - !max_certified_height_rx.has_changed().unwrap(), - "No new height should be sent if they are lower than a previously sent height." - ); - }) - }) - } - /// Test that the certifier delivers certification requested by the state manager /// via the function `StateManager::list_state_heights_to_certify`. /// Test scenario: @@ -1660,8 +1485,6 @@ mod tests { } = dependencies(pool_config.clone(), 4); let metrics_registry = MetricsRegistry::new(); - let (max_certified_height_tx, _max_certified_height_rx) = - watch::channel(Height::from(0)); let cert_pool = CertificationPoolImpl::new( replica_config.node_id, pool_config, @@ -1690,7 +1513,6 @@ mod tests { pool.get_cache(), metrics_registry, log, - max_certified_height_tx, ); // We expect deliver_state_certification() to be called 2 times for heights 3 and 4. diff --git a/rs/consensus/tests/framework/runner.rs b/rs/consensus/tests/framework/runner.rs index 7caddd443062..23bc79b816a8 100644 --- a/rs/consensus/tests/framework/runner.rs +++ b/rs/consensus/tests/framework/runner.rs @@ -17,7 +17,6 @@ use std::{ sync::{Arc, Mutex}, time::{Duration, Instant}, }; -use tokio::sync::watch; fn stop_immediately(_: &ConsensusInstance<'_>) -> bool { true @@ -195,7 +194,6 @@ impl<'a> ConsensusRunner<'a> { deps.consensus_pool.read().unwrap().get_cache(), deps.metrics_registry.clone(), replica_logger.clone(), - watch::channel(Height::from(0)).0, ); let now = self.time.get_relative_time(); let in_queue: Queue = Default::default(); diff --git a/rs/consensus/tests/payload.rs b/rs/consensus/tests/payload.rs index 41ffa8330805..2a4d334ff370 100644 --- a/rs/consensus/tests/payload.rs +++ b/rs/consensus/tests/payload.rs @@ -38,7 +38,6 @@ use std::{ sync::{Arc, Mutex, RwLock}, time::Duration, }; -use tokio::sync::watch; /// Test that the batches that Consensus produces contain expected batch /// numbers and payloads @@ -156,8 +155,6 @@ fn consensus_produces_expected_batches() { &PoolReader::new(&*consensus_pool.read().unwrap()), ))); - let (dummy_watcher, _) = watch::channel(Height::from(0)); - let consensus = ic_consensus::consensus::ConsensusImpl::new( replica_config.clone(), Arc::clone(®istry_client) as Arc<_>, @@ -208,7 +205,6 @@ fn consensus_produces_expected_batches() { Arc::clone(&consensus_cache), metrics_registry.clone(), no_op_logger(), - dummy_watcher, ); let driver = ConsensusDriver::new( diff --git a/rs/determinism_test/src/setup.rs b/rs/determinism_test/src/setup.rs index f3119fb4047b..c9178843d548 100644 --- a/rs/determinism_test/src/setup.rs +++ b/rs/determinism_test/src/setup.rs @@ -121,6 +121,7 @@ pub(crate) fn setup() -> ( &config.state_manager, None, ic_types::malicious_flags::MaliciousFlags::default(), + tokio::sync::watch::channel(ic_types::Height::from(0)).0, )); let (completed_execution_messages_tx, _) = tokio::sync::mpsc::channel(1); diff --git a/rs/pocket_ic_server/src/pocket_ic.rs b/rs/pocket_ic_server/src/pocket_ic.rs index 320ae68d9ebd..dc699b531254 100644 --- a/rs/pocket_ic_server/src/pocket_ic.rs +++ b/rs/pocket_ic_server/src/pocket_ic.rs @@ -3043,6 +3043,7 @@ impl PocketIc { ), None, MaliciousFlags::default(), + tokio::sync::watch::channel(ic_types::Height::from(0)).0, ); let metadata = state_manager.get_latest_state().take().metadata.clone(); // Shut down the temporary state manager to avoid race conditions. diff --git a/rs/prep/BUILD.bazel b/rs/prep/BUILD.bazel index 7fbd2479ae78..947c63b9f8a2 100644 --- a/rs/prep/BUILD.bazel +++ b/rs/prep/BUILD.bazel @@ -42,6 +42,7 @@ DEPENDENCIES = [ "@crate_index//:slog", "@crate_index//:tempfile", "@crate_index//:thiserror", + "@crate_index//:tokio", "@crate_index//:url", "@crate_index//:x509-cert", ] diff --git a/rs/prep/Cargo.toml b/rs/prep/Cargo.toml index 54090efdc1f8..d75bdc8833f2 100644 --- a/rs/prep/Cargo.toml +++ b/rs/prep/Cargo.toml @@ -45,6 +45,7 @@ serde_json = { workspace = true } slog = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } +tokio = { workspace = true } url = { workspace = true } x509-cert = { workspace = true } diff --git a/rs/prep/src/node.rs b/rs/prep/src/node.rs index 15f1665577a5..6cd1c94e694c 100644 --- a/rs/prep/src/node.rs +++ b/rs/prep/src/node.rs @@ -227,6 +227,7 @@ impl InitializedNode { &config, None, ic_types::malicious_flags::MaliciousFlags::default(), + tokio::sync::watch::channel(ic_types::Height::from(0)).0, ); let (_height, state) = state_manager.take_tip(); diff --git a/rs/replay/src/player.rs b/rs/replay/src/player.rs index 1b3ab51d494e..e3ef4b2f9f74 100644 --- a/rs/replay/src/player.rs +++ b/rs/replay/src/player.rs @@ -300,6 +300,7 @@ impl Player { &cfg.state_manager, None, MaliciousFlags::default(), + tokio::sync::watch::channel(ic_types::Height::from(0)).0, )); let (completed_execution_messages_tx, _) = tokio::sync::mpsc::channel(1); let execution_service = ExecutionServices::setup_execution( diff --git a/rs/replica/setup_ic_network/src/lib.rs b/rs/replica/setup_ic_network/src/lib.rs index c241a63c68a9..d3b6082d7533 100644 --- a/rs/replica/setup_ic_network/src/lib.rs +++ b/rs/replica/setup_ic_network/src/lib.rs @@ -47,7 +47,7 @@ use ic_registry_subnet_type::SubnetType; use ic_replicated_state::ReplicatedState; use ic_state_manager::state_sync::types::StateSyncMessage; use ic_types::{ - Height, NodeId, SubnetId, + NodeId, SubnetId, artifact::UnvalidatedArtifactMutation, canister_http::{CanisterHttpRequest, CanisterHttpResponse, CanisterHttpResponseArtifact}, consensus::{ @@ -63,7 +63,7 @@ use std::{ str::FromStr, sync::{Arc, Mutex, RwLock}, }; -use tokio::sync::{mpsc::Sender, watch}; +use tokio::sync::mpsc::Sender; use tower_http::trace::TraceLayer; /// This limit is used to protect against a malicious peer advertising many ingress messages. @@ -353,7 +353,6 @@ pub fn setup_consensus_and_p2p( cycles_account_manager: Arc, canister_http_adapter_client: CanisterHttpAdapterClient, registry_poll_delay_duration_ms: u64, - max_certified_height_tx: watch::Sender, ) -> ( Arc>, Sender>, @@ -457,7 +456,6 @@ pub fn setup_consensus_and_p2p( cycles_account_manager, registry_poll_delay_duration_ms, canister_http_adapter_client, - max_certified_height_tx, time_source, ) } @@ -491,7 +489,6 @@ fn start_consensus( cycles_account_manager: Arc, registry_poll_delay_duration_ms: u64, canister_http_adapter_client: CanisterHttpAdapterClient, - max_certified_height_tx: watch::Sender, time_source: Arc, ) -> ( Arc>, @@ -602,7 +599,6 @@ fn start_consensus( Arc::clone(&consensus_pool_cache) as Arc<_>, metrics_registry.clone(), log.clone(), - max_certified_height_tx, ); join_handles.push(create_artifact_handler( abortable_broadcast_channels.certifier, diff --git a/rs/replica/src/setup_ic_stack.rs b/rs/replica/src/setup_ic_stack.rs index 640de97ac565..1817c37c4d72 100644 --- a/rs/replica/src/setup_ic_stack.rs +++ b/rs/replica/src/setup_ic_stack.rs @@ -165,6 +165,7 @@ pub fn construct_ic_stack( // ---------- REPLICATED STATE DEPS FOLLOW ---------- let consensus_pool_cache = consensus_pool.read().unwrap().get_cache(); let verifier = Arc::new(VerifierImpl::new(crypto.clone())); + let (max_certified_height_tx, max_certified_height_rx) = watch::channel(Height::from(0)); let state_manager = Arc::new(StateManagerImpl::new( verifier, subnet_id, @@ -177,6 +178,7 @@ pub fn construct_ic_stack( // Hence the need of the dependency on consensus here. Some(consensus_pool_cache.starting_height()), config.malicious_behavior.malicious_flags.clone(), + max_certified_height_tx, )); // ---------- EXECUTION DEPS FOLLOW ---------- @@ -304,7 +306,6 @@ pub fn construct_ic_stack( ); // ---------- CONSENSUS AND P2P DEPS FOLLOW ---------- let state_sync = StateSync::new(state_manager.clone(), log.clone()); - let (max_certified_height_tx, max_certified_height_rx) = watch::channel(Height::from(0)); let (ingress_throttler, ingress_tx, p2p_runner) = setup_consensus_and_p2p( log, @@ -335,7 +336,6 @@ pub fn construct_ic_stack( execution_services.cycles_account_manager, canister_http_adapter_client, config.nns_registry_replicator.poll_delay_duration_ms, - max_certified_height_tx, ); // ---------- PUBLIC ENDPOINT DEPS FOLLOW ---------- diff --git a/rs/replicated_state/src/metadata_state.rs b/rs/replicated_state/src/metadata_state.rs index af258aa70e24..090262494ec9 100644 --- a/rs/replicated_state/src/metadata_state.rs +++ b/rs/replicated_state/src/metadata_state.rs @@ -767,8 +767,6 @@ impl SystemMetadata { /// roll back `Stopping` states on all subnet B canisters. /// /// Notes: - /// * `prev_state_hash` has just been set by `take_tip()` to the checkpoint - /// hash (checked against the hash in the CUP). It must be preserved. /// * `own_subnet_type` has just been set during `load_checkpoint()`, based on /// the registry subnet record of the subnet that this node is part of. /// * `batch_time`, `network_topology` and `own_subnet_features` will be set diff --git a/rs/state_machine_tests/src/lib.rs b/rs/state_machine_tests/src/lib.rs index 4aeb009d228e..ea4a88325793 100644 --- a/rs/state_machine_tests/src/lib.rs +++ b/rs/state_machine_tests/src/lib.rs @@ -1183,7 +1183,6 @@ pub struct StateMachine { consensus_pool_cache: Arc, canister_http_pool: Arc>, canister_http_payload_builder: Arc, - certified_height_tx: watch::Sender, pub ingress_watcher_handle: IngressWatcherHandle, /// A drop guard to gracefully cancel the ingress watcher task. _ingress_watcher_drop_guard: tokio_util::sync::DropGuard, @@ -1821,10 +1820,6 @@ impl StateMachine { self.certify_latest_state(); let certified_height = self.state_manager.latest_certified_height(); - self.certified_height_tx - .send(certified_height) - .expect("Ingress watcher is running"); - let state = self .state_manager .get_state_at(certified_height) @@ -2045,6 +2040,11 @@ impl StateMachine { ..Default::default() }; + // Setup ingress watcher for synchronous call endpoint. + let (completed_execution_messages_tx, completed_execution_messages_rx) = + mpsc::channel(COMPLETED_EXECUTION_MESSAGES_BUFFER_SIZE); + let (certified_height_tx, certified_height_rx) = watch::channel(Height::from(0)); + let state_manager_impl = StateManagerImpl::new( Arc::new(FakeVerifier), subnet_id, @@ -2054,6 +2054,7 @@ impl StateMachine { &sm_config, None, malicious_flags.clone(), + certified_height_tx, ); let state_manager = Arc::new(StateMachineStateManager { inner: state_manager_impl, @@ -2102,11 +2103,6 @@ impl StateMachine { let chain_key_payload_builder = Arc::new(MockBatchPayloadBuilder::new().expect_noop()); - // Setup ingress watcher for synchronous call endpoint. - let (completed_execution_messages_tx, completed_execution_messages_rx) = - mpsc::channel(COMPLETED_EXECUTION_MESSAGES_BUFFER_SIZE); - let (certified_height_tx, certified_height_rx) = watch::channel(Height::from(0)); - let cancellation_token = tokio_util::sync::CancellationToken::new(); let cancellation_token_clone = cancellation_token.clone(); let ingress_watcher_drop_guard = cancellation_token.drop_guard(); @@ -2349,7 +2345,6 @@ impl StateMachine { transform_handler: Arc::new(Mutex::new(execution_services.transform_execution_service)), ingress_watcher_handle, _ingress_watcher_drop_guard: ingress_watcher_drop_guard, - certified_height_tx, runtime, // Note: state machine tests are commonly used for testing // canisters, such tests usually don't rely on any persistence. @@ -3073,6 +3068,7 @@ impl StateMachine { .process_batch(batch) .expect("Could not process batch"); + self.state_manager.flush_hash_channel(); if self.remove_old_states { self.state_manager.remove_states_below(batch_number); } @@ -3177,6 +3173,7 @@ impl StateMachine { replicated_state.metadata.batch_time = time; self.state_manager .commit_and_certify(replicated_state, CertificationScope::Metadata, None); + self.state_manager.flush_hash_channel(); self.set_time(time.into()); *self.time_of_last_round.write().unwrap() = time; } @@ -3377,6 +3374,7 @@ impl StateMachine { self.state_manager .commit_and_certify(state, CertificationScope::Metadata, None); + self.state_manager.flush_hash_channel(); } /// Enables checkpoints and makes a tick to write a checkpoint. @@ -3417,6 +3415,7 @@ impl StateMachine { *state.canister_priority_mut(canister_id) = *source_state.canister_priority(&canister_id); self.state_manager .commit_and_certify(state, CertificationScope::Full, None); + self.state_manager.flush_hash_channel(); self.state_manager.remove_states_below(h.increment()); } @@ -3442,6 +3441,8 @@ impl StateMachine { if state.take_canister_state(&canister_id).is_some() { self.state_manager .commit_and_certify(state, CertificationScope::Full, None); + self.state_manager.flush_hash_channel(); + self.state_manager.flush_tip_channel(); other_env.import_canister_state( @@ -3654,6 +3655,7 @@ impl StateMachine { self.state_manager .commit_and_certify(state, CertificationScope::Full, None); + self.state_manager.flush_hash_channel(); // Perform the split on `env`, which requires preserving the `prev_state_hash` // (as opposed to MVP subnet splitting where it is adjusted manually). @@ -3665,6 +3667,7 @@ impl StateMachine { env.state_manager .commit_and_certify(state, CertificationScope::Full, None); + env.state_manager.flush_hash_channel(); Ok(env) } @@ -4975,6 +4978,7 @@ impl StateMachine { .stable_memory = memory; self.state_manager .commit_and_certify(replicated_state, CertificationScope::Metadata, None); + self.state_manager.flush_hash_channel(); } /// Returns the query stats of the specified canister. @@ -5007,6 +5011,7 @@ impl StateMachine { self.state_manager .commit_and_certify(state, CertificationScope::Metadata, None); + self.state_manager.flush_hash_channel(); } /// Returns the cycle balance of the specified canister. @@ -5038,6 +5043,7 @@ impl StateMachine { let balance = canister_state.system_state.balance().get(); self.state_manager .commit_and_certify(state, CertificationScope::Metadata, None); + self.state_manager.flush_hash_channel(); balance } @@ -5121,6 +5127,7 @@ impl StateMachine { /// Make sure the latest state is certified. pub fn certify_latest_state(&self) { + self.state_manager.flush_hash_channel(); certify_latest_state_helper(self.state_manager.clone(), &self.secret_key, self.subnet_id) } @@ -5257,6 +5264,7 @@ impl StateMachine { } self.state_manager .commit_and_certify(replicated_state, CertificationScope::Metadata, None); + self.state_manager.flush_hash_channel(); } } @@ -5269,6 +5277,7 @@ pub fn certify_latest_state_helper( if state_manager.latest_state_height() == Height::from(0) { let (_height, replicated_state) = state_manager.take_tip(); state_manager.commit_and_certify(replicated_state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); } assert_ne!(state_manager.latest_state_height(), Height::from(0)); if state_manager.latest_state_height() > state_manager.latest_certified_height() { diff --git a/rs/state_manager/BUILD.bazel b/rs/state_manager/BUILD.bazel index 6619475fe9dc..a341c5eeaa5a 100644 --- a/rs/state_manager/BUILD.bazel +++ b/rs/state_manager/BUILD.bazel @@ -42,6 +42,7 @@ DEPENDENCIES = [ "@crate_index//:serde_bytes", "@crate_index//:slog", "@crate_index//:tempfile", + "@crate_index//:tokio", "@crate_index//:uuid", ] diff --git a/rs/state_manager/Cargo.toml b/rs/state_manager/Cargo.toml index 8c0b9b0fac1e..9acd0381e647 100644 --- a/rs/state_manager/Cargo.toml +++ b/rs/state_manager/Cargo.toml @@ -44,6 +44,7 @@ serde_bytes = { workspace = true } slog = { workspace = true } strum = { workspace = true } tempfile = { workspace = true } +tokio = { workspace = true } tree-deserializer = { path = "../tree_deserializer" } uuid = { workspace = true } diff --git a/rs/state_manager/src/lib.rs b/rs/state_manager/src/lib.rs index f0f52575c049..26057ffd4c03 100644 --- a/rs/state_manager/src/lib.rs +++ b/rs/state_manager/src/lib.rs @@ -21,7 +21,7 @@ use crate::{ tip::{PageMapToFlush, TipRequest, flush_tip_channel, spawn_tip_thread}, }; use allowed_panics::panic_with_replica_diverged_at_height; -use crossbeam_channel::Sender; +use crossbeam_channel::{Sender, bounded, unbounded}; use ic_canonical_state::lazy_tree_conversion::replicated_state_as_lazy_tree; use ic_canonical_state_tree_hash::{ hash_tree::{HashTree, HashTreeError, hash_lazy_tree}, @@ -71,6 +71,7 @@ use ic_types::{ }; use ic_utils_thread::{JoinOnDrop, deallocator_thread::DeallocatorThread}; use ic_wasm_types::ModuleLoadingStatus; +use parking_lot::RwLockWriteGuard; use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec}; use prost::Message; use std::cmp::min; @@ -91,6 +92,7 @@ use std::{ sync::Mutex, }; use tempfile::tempfile; +use tokio::sync::watch; use uuid::Uuid; /// The number of threads that state manager starts to construct checkpoints. @@ -124,7 +126,7 @@ const ARCHIVED_DIVERGED_CHECKPOINT_MAX_AGE: Duration = Duration::from_secs(30 * /// The maximum number of consecutive rounds for which the optimization of /// skipping state cloning and computing certification metadata triggers /// while catching up. -const MAX_CONSECUTIVE_ROUNDS_WITHOUT_STATE_CLONING: u64 = 10; +const MAX_CONSECUTIVE_ROUNDS_WITHOUT_STATE_CLONING_AND_HASHING: u64 = 10; /// The maximum number of future heights starting at tip height /// that the state manager optimistically asks consensus to certify @@ -190,7 +192,6 @@ pub struct StateManagerMetrics { latest_hash_tree_max_index: IntGauge, fast_forward_height: IntGauge, no_state_clone_count: IntCounter, - tip_hash_count: IntCounter, } #[derive(Clone)] @@ -486,11 +487,6 @@ impl StateManagerMetrics { "Number of heights whose states were not cloned and not stored by this node.", ); - let tip_hash_count = metrics_registry.int_counter( - "state_manager_tip_hash_count", - "Number of tip heights whose state snapshot was not stored by this node and whose state hash was computed by this node.", - ); - Self { state_manager_error_count, checkpoint_op_duration, @@ -518,7 +514,6 @@ impl StateManagerMetrics { latest_hash_tree_max_index, fast_forward_height, no_state_clone_count, - tip_hash_count, } } @@ -951,16 +946,19 @@ pub struct StateManagerImpl { // Cached latest state height. We cache it separately because it's // requested quite often and this causes high contention on the lock. latest_state_height: Arc, - latest_certified_height: AtomicU64, + latest_certified_height: Arc, fast_forward_height: AtomicU64, persist_metadata_guard: Arc>, tip_channel: Sender, _tip_thread_handle: JoinOnDrop<()>, + hash_channel: Sender, + _hash_thread_handle: JoinOnDrop<()>, fd_factory: Arc, malicious_flags: MaliciousFlags, latest_height_update_time: Arc>, /// The height at which this StateManager was started. Set once during initialization and never modified. started_height: Height, + max_certified_height_tx: Arc>, } #[cfg(debug_assertions)] @@ -1310,6 +1308,7 @@ impl StateManagerImpl { /// Finish all asynchronous operations. pub fn flush_all(&self) { + self.flush_hash_channel(); self.flush_tip_channel(); self.state_layout().flush_checkpoint_removal_channel(); } @@ -1326,6 +1325,7 @@ impl StateManagerImpl { config: &Config, starting_height: Option, malicious_flags: MaliciousFlags, + max_certified_height_tx: watch::Sender, ) -> Self { let metrics = StateManagerMetrics::new(metrics_registry, log.clone()); @@ -1368,6 +1368,7 @@ impl StateManagerImpl { metrics.clone(), malicious_flags.clone(), ); + let (_hash_thread_handle, hash_channel) = spawn_hash_thread(metrics.clone(), log.clone()); let starting_time = Instant::now(); let loaded_states_metadata = @@ -1513,8 +1514,8 @@ impl StateManagerImpl { starting_time.elapsed() ); - let latest_state_height = AtomicU64::new(0); - let latest_certified_height = AtomicU64::new(0); + let latest_state_height = Arc::new(AtomicU64::new(0)); + let latest_certified_height = Arc::new(AtomicU64::new(0)); let fast_forward_height = AtomicU64::new(0); let initial_snapshot = Snapshot { @@ -1620,7 +1621,7 @@ impl StateManagerImpl { } report_last_diverged_state(&log, &metrics, &state_layout); - + let latest_height_update_time = Arc::new(Mutex::new(Instant::now())); Self { log, metrics, @@ -1630,16 +1631,19 @@ impl StateManagerImpl { own_subnet_id, own_subnet_type, deallocator_thread, - latest_state_height: Arc::new(latest_state_height), + latest_state_height, latest_certified_height, fast_forward_height, persist_metadata_guard, tip_channel, _tip_thread_handle, + hash_channel, + _hash_thread_handle, fd_factory, malicious_flags, - latest_height_update_time: Arc::new(Mutex::new(Instant::now())), + latest_height_update_time, started_height, + max_certified_height_tx: Arc::new(max_certified_height_tx), } } @@ -1967,35 +1971,9 @@ impl StateManagerImpl { } } - fn populate_extra_metadata(&self, state: &mut ReplicatedState, height: Height) { + fn populate_extra_metadata(&self, state: &mut ReplicatedState) { state.metadata.state_sync_version = CURRENT_STATE_SYNC_VERSION; state.metadata.certification_version = ic_canonical_state::CURRENT_CERTIFICATION_VERSION; - - if height == Self::INITIAL_STATE_HEIGHT { - return; - } - let prev_height = height - Height::from(1); - - if prev_height == Self::INITIAL_STATE_HEIGHT { - return; - } - - let states = self.states.read(); - if let Some(metadata) = states.certifications_metadata.get(&prev_height) { - assert_eq!( - state.metadata.prev_state_hash, - Some(CryptoHashOfPartialState::from( - metadata.certified_state_hash.clone(), - )) - ); - } else { - info!( - self.log, - "The previous certification metadata at height {} are not available. This can happen when the replica \ - (i) catches up or (ii) syncs a newer state concurrently and removes the states below.", - prev_height, - ); - } } fn find_checkpoint_by_root_hash( @@ -2020,6 +1998,47 @@ impl StateManagerImpl { }) } + /// Helper to share `on_synced_checkpoint` code with testing code. + fn push_state_and_cert_metadata( + &self, + height: Height, + latest_state_height: &AtomicU64, + state: ReplicatedState, + states: &mut RwLockWriteGuard<'_, SharedState>, + ) { + let lazy_tree = replicated_state_as_lazy_tree(&state, height); + let hash_tree = hash_lazy_tree(&lazy_tree) + .unwrap_or_else(|err| fatal!(self.log, "Failed to compute hash tree: {:?}", err)); + update_hash_tree_metrics(&hash_tree, &self.metrics); + let height_witness = state_height_witness(&lazy_tree, &hash_tree, &self.metrics); + drop(lazy_tree); + let certification_metadata = CertificationMetadata { + certified_state_hash: crypto_hash_of_tree(&hash_tree), + height_witness, + hash_tree: Some((Arc::new(hash_tree), Instant::now())), + certification: None, + }; + + states.snapshots.push_back(Snapshot { + height, + state: Arc::new(state), + }); + states + .snapshots + .make_contiguous() + .sort_by_key(|snapshot| snapshot.height); + + self.metrics + .resident_state_count + .set(states.snapshots.len() as i64); + + states + .certifications_metadata + .insert(height, certification_metadata); + + update_latest_height(latest_state_height, height); + } + fn on_synced_checkpoint( &self, state: ReplicatedState, @@ -2046,19 +2065,6 @@ impl StateManagerImpl { } } - let lazy_tree = replicated_state_as_lazy_tree(&state, height); - let hash_tree = hash_lazy_tree(&lazy_tree) - .unwrap_or_else(|err| fatal!(self.log, "Failed to compute hash tree: {:?}", err)); - update_hash_tree_metrics(&hash_tree, &self.metrics); - let height_witness = state_height_witness(&lazy_tree, &hash_tree, &self.metrics); - drop(lazy_tree); - let certification_metadata = CertificationMetadata { - certified_state_hash: crypto_hash_of_tree(&hash_tree), - height_witness, - hash_tree: Some((Arc::new(hash_tree), Instant::now())), - certification: None, - }; - let mut states = self.states.write(); #[cfg(debug_assertions)] check_certifications_metadata_snapshots_and_states_metadata_are_consistent(&states); @@ -2082,22 +2088,12 @@ impl StateManagerImpl { if !is_snapshot_present { // Normal case: we don't have the in-memory state yet. - states.snapshots.push_back(Snapshot { + self.push_state_and_cert_metadata( height, - state: Arc::new(state), - }); - states - .snapshots - .make_contiguous() - .sort_by_key(|snapshot| snapshot.height); - - self.metrics - .resident_state_count - .set(states.snapshots.len() as i64); - - states - .certifications_metadata - .insert(height, certification_metadata); + &self.latest_state_height, + state, + &mut states, + ); } else { // Rare case: we already have the in-memory state. info!( @@ -2167,6 +2163,11 @@ impl StateManagerImpl { last_checkpoint_to_keep ); + let tip_height = { + let states = self.states.read(); + states.tip_height + }; + // In debug builds we store the latest_state_height here so // that we can verify later that this height is retained. #[cfg(debug_assertions)] @@ -2265,6 +2266,7 @@ impl StateManagerImpl { // as decisions to retain a checkpoint or an in-memory state are made independently. let inmemory_heights_to_keep = std::iter::once(latest_certified_height) .chain(extra_inmemory_heights_to_keep.iter().copied()) + .chain(std::iter::once(tip_height)) .collect::>(); let (removed, retained) = states.snapshots.drain(0..).partition(|snapshot| { @@ -2348,6 +2350,11 @@ impl StateManagerImpl { .set(latest_certified_height.get() as i64); let mut certifications = states.certifications.split_off(&last_height_to_keep); + for h in inmemory_heights_to_keep.iter() { + if let Some(cert) = states.certifications.remove(h) { + certifications.insert(*h, cert); + } + } std::mem::swap(&mut certifications, &mut states.certifications); self.deallocator_thread.send(Box::new(certifications)); @@ -2639,6 +2646,24 @@ impl StateManagerImpl { } } +fn update_latest_certified_height( + latest_certified_height: &AtomicU64, + metrics: &StateManagerMetrics, + max_certified_height_tx: &watch::Sender, + height: Height, +) { + let latest_certified = update_latest_height(latest_certified_height, height); + metrics.latest_certified_height.set(latest_certified as i64); + max_certified_height_tx.send_if_modified(|h| { + if height > *h { + *h = height; + true + } else { + false + } + }); +} + fn initial_state(own_subnet_id: SubnetId, own_subnet_type: SubnetType) -> Labeled { Labeled::new( StateManagerImpl::INITIAL_STATE_HEIGHT, @@ -2744,56 +2769,14 @@ impl StateManager for StateManagerImpl { let mut states = self.states.write(); let tip_height = states.tip_height; - let mut tip = states.tip.take().expect("failed to get TIP"); - - let (target_snapshot, target_hash) = match states.snapshots.back() { - Some(snapshot) if snapshot.height > tip_height => { - let tip_height = snapshot.height; - - let tip_metadata = states - .certifications_metadata - .get(&tip_height) - .unwrap_or_else(|| { - fatal!(self.log, "Bug: missing tip metadata @{}", tip_height) - }); + let tip = states.tip.take().expect("failed to get TIP"); - // Since the state machine will use this tip to compute the *next* state, - // we populate the prev_state_hash with the hash of the current tip. - let tip_hash = - CryptoHashOfPartialState::from(tip_metadata.certified_state_hash.clone()); - - (snapshot.clone(), tip_hash) - } + let target_snapshot = match states.snapshots.back() { + // The most recent available state is more recent than what we have in the tip, + // because we are catching up. + Some(snapshot) if snapshot.height > tip_height => snapshot.clone(), + // The tip is the most recent state we know of, proceed with that. _ => { - let tip_hash = if let Some(tip_metadata) = - states.certifications_metadata.get(&tip_height) - { - CryptoHashOfPartialState::from(tip_metadata.certified_state_hash.clone()) - } else if let Some(tip_certification) = states.certifications.get(&tip_height) { - tip_certification.signed.content.hash.clone() - } else { - std::mem::drop(states); - - let mut tip_certification_metadata = Self::compute_certification_metadata( - &tip, - tip_height, - &self.metrics, - &self.log, - ) - .unwrap_or_else(|err| { - fatal!(self.log, "Failed to compute hash tree: {:?}", err) - }); - let tip_certified_state_hash = tip_certification_metadata.certified_state_hash; - if let Some((hash_tree, _)) = tip_certification_metadata.hash_tree.take() { - self.deallocator_thread.send(Box::new(hash_tree)); - } - - self.metrics.tip_hash_count.inc(); - - CryptoHashOfPartialState::from(tip_certified_state_hash) - }; - - tip.metadata.prev_state_hash = Some(tip_hash); return (tip_height, tip); } }; @@ -2830,15 +2813,13 @@ impl StateManager for StateManagerImpl { states.tip_height = target_snapshot.height; std::mem::drop(states); - let mut new_tip = initialize_tip( + let new_tip = initialize_tip( &self.log, &self.tip_channel, &target_snapshot, checkpoint_layout, ); - new_tip.metadata.prev_state_hash = Some(target_hash); - // This might still not be the latest version: there might have been // another successful state sync while we were updating the tip. // That is not a problem: we will handle this case later in commit_and_certify(). @@ -3118,11 +3099,12 @@ impl StateManager for StateManagerImpl { ); } - let latest_certified = - update_latest_height(&self.latest_certified_height, certification.height); - self.metrics - .latest_certified_height - .set(latest_certified as i64); + update_latest_certified_height( + &self.latest_certified_height, + &self.metrics, + &self.max_certified_height_tx, + certification_height, + ); if let Some((_, certification_requested_at)) = metadata.hash_tree { self.metrics @@ -3329,12 +3311,64 @@ impl StateManager for StateManagerImpl { .with_label_values(&["commit_and_certify"]) .start_timer(); - let height = { - let states = self.states.read(); - states.tip_height.increment() + let states = self.states.read(); + let (prev_height, height) = { + let prev_height = states.tip_height; + let height = prev_height.increment(); + (prev_height, height) }; - self.populate_extra_metadata(&mut state, height); + self.populate_extra_metadata(&mut state); + + // Get the previous state hash either from consensus via certifications (if we are catching up) + // or wait for the hashing thread to finish computing it. + let maybe_hash = states + .certifications + .get(&prev_height) + .map(|x| x.signed.content.hash.clone().get()); + drop(states); + let prev_state_hash = if let Some(hash) = maybe_hash { + hash + } else { + // At prev_height 0, we don't have a hash yet, so we have to compute it. + if prev_height.get() == 0 { + let states = self.states.read(); + let initial_snapshot = &states + .snapshots + .front() + .expect("Initial state should always be present in states.snapshots."); + debug_assert_eq!(initial_snapshot.height.get(), 0); + let initial_state = &initial_snapshot.state; + let certification = StateManagerImpl::compute_certification_metadata( + initial_state, + prev_height, + &self.metrics, + &self.log, + ) + .unwrap_or_else(|err| fatal!(self.log, "Failed to compute hash tree: {:?}", err)); + certification.certified_state_hash.clone() + } else { + // Wait for the hashing thread. + let (sender, recv) = bounded(1); + self.hash_channel + .send(HashRequest::Wait { sender }) + .expect("Failed to send `Wait` to hash channel"); + recv.recv().expect("Failed to wait for hash channel"); + // After awaiting the hashing thread, snapshot and certification_metadata + // must have an entry at prev_height. + let states = self.states.read(); + if let Some(cert_md) = states.certifications_metadata.get(&prev_height) { + cert_md.certified_state_hash.clone() + } else { + fatal!( + self.log, + "Previous state hash was not available after awaiting the hash thread. This is a bug." + ); + } + } + }; + // Write the previous state hash to the state. + state.metadata.prev_state_hash = Some(CryptoHashOfPartialState::from(prev_state_hash)); if let CertificationScope::Metadata = scope { // We want to balance writing too many overlay files with having too many unflushed pages at @@ -3368,29 +3402,36 @@ impl StateManager for StateManagerImpl { // If the node is catching up (`height.get() < fast_forward_height`) // and this is not a checkpoint height (`matches!(scope, CertificationScope::Metadata)`), + // and the state hash @ height is present in states.certifications (via consensus), // then we do not clone, do not hash, and do not store the state and certification metadata. - // This optimization is skipped every `MAX_CONSECUTIVE_ROUNDS_WITHOUT_STATE_CLONING` heights + // This optimization is skipped every `MAX_CONSECUTIVE_ROUNDS_WITHOUT_STATE_CLONING_AND_HASHING` heights // so that we always have a reasonably "recent" state snapshot and // its certification metadata available. - let fast_forward_height = self.fast_forward_height.load(Ordering::Relaxed); - if matches!(scope, CertificationScope::Metadata) - && height.get() < fast_forward_height - && !height - .get() - .is_multiple_of(MAX_CONSECUTIVE_ROUNDS_WITHOUT_STATE_CLONING) - { + let maybe_delivered_certification = { + // Scope to drop this lock. let mut states = self.states.write(); - #[cfg(debug_assertions)] - check_certifications_metadata_snapshots_and_states_metadata_are_consistent(&states); + let maybe_delivered_certification = states.certifications.get(&height).cloned(); + let fast_forward_height = self.fast_forward_height.load(Ordering::Relaxed); + if matches!(scope, CertificationScope::Metadata) + && height.get() < fast_forward_height + && !height + .get() + .is_multiple_of(MAX_CONSECUTIVE_ROUNDS_WITHOUT_STATE_CLONING_AND_HASHING) + && maybe_delivered_certification.is_some() + { + #[cfg(debug_assertions)] + check_certifications_metadata_snapshots_and_states_metadata_are_consistent(&states); - assert_tip_is_none(&states); + assert_tip_is_none(&states); - self.metrics.no_state_clone_count.inc(); + self.metrics.no_state_clone_count.inc(); - states.tip_height = height; - states.tip = Some(state); - return; - } + states.tip_height = height; + states.tip = Some(state); + return; + } + maybe_delivered_certification + }; self.metrics .tip_handler_queue_length @@ -3417,18 +3458,21 @@ impl StateManager for StateManagerImpl { CertificationScope::Metadata => Arc::new(state), }; - let mut certification_metadata = - Self::compute_certification_metadata(&state, height, &self.metrics, &self.log) - .unwrap_or_else(|err| fatal!(self.log, "Failed to compute hash tree: {:?}", err)); - - if scope == CertificationScope::Full { - info!( - self.log, - "Certification hash for height {}: {:?}", - height, - certification_metadata.certified_state_hash - ); - } + // Kick off hashing of the new state. This will also compare the result with the + // delivered hash, if present, in order to detect divergence. + let hash_req = HashRequest::HashState { + state: Arc::clone(&state), + states: Arc::clone(&self.states), + latest_state_height: Arc::clone(&self.latest_state_height), + latest_certified_height: Arc::clone(&self.latest_certified_height), + height, + latest_height_update_time: Arc::clone(&self.latest_height_update_time), + reference_certification: Box::new(maybe_delivered_certification), + scope: scope.clone(), + state_layout: Box::new(self.state_layout.clone()), + max_certified_height_tx: Arc::clone(&self.max_certified_height_tx), + }; + self.hash_channel.send(hash_req).unwrap(); // This step is expensive, so we do it before the write lock for `states`. let next_tip = { @@ -3440,77 +3484,19 @@ impl StateManager for StateManagerImpl { (height, state.deref().clone()) }; + // For checkpoint heights, we await the state hash immediately. This may not be necessary, + // but it keeps the existing checkpointing behaviour as is. + // Note: This must not be called while a write lock to `states` is being held. + if scope == CertificationScope::Full { + self.flush_hash_channel(); + } + let mut states = self.states.write(); #[cfg(debug_assertions)] check_certifications_metadata_snapshots_and_states_metadata_are_consistent(&states); assert_tip_is_none(&states); - let assert_prev_hash_matches = |prev_hash| { - let hash = &certification_metadata.certified_state_hash; - if prev_hash != hash { - if let Err(err) = self.state_layout.create_diverged_state_marker(height) { - error!( - self.log, - "Failed to mark state @{} diverged: {}", height, err - ); - } - panic!( - "Committed state @{height} with hash {hash:?} which is different from previously computed or delivered hash {prev_hash:?}" - ); - } - }; - - // It's possible that we already computed this state before. We - // validate that hashes agree to spot bugs causing non-determinism as - // early as possible. - if let Some(prev_metadata) = states.certifications_metadata.get(&height) { - let prev_hash = &prev_metadata.certified_state_hash; - assert_prev_hash_matches(prev_hash); - } - - // We reuse certification delivered by consensus if possible. - // We also validate that hashes agree to spot bugs causing non-determinism as - // early as possible. - if let Some(certification) = states.certifications.get(&height) { - let prev_hash = &certification.signed.content.hash.clone().get(); - assert_prev_hash_matches(prev_hash); - certification_metadata.certification = Some(certification.clone()); - } - - if !states - .snapshots - .iter() - .any(|snapshot| snapshot.height == height) - { - states.snapshots.push_back(Snapshot { - height, - state: Arc::clone(&state), - }); - states - .snapshots - .make_contiguous() - .sort_by_key(|snapshot| snapshot.height); - - states - .certifications_metadata - .insert(height, certification_metadata); - - let latest_height = update_latest_height(&self.latest_state_height, height); - self.metrics.max_resident_height.set(latest_height as i64); - { - let mut last_height_update_time = self - .latest_height_update_time - .lock() - .expect("Failed to lock last height update time."); - let now = Instant::now(); - self.metrics - .height_update_time_seconds - .observe((now - *last_height_update_time).as_secs_f64()); - *last_height_update_time = now; - } - } - if let Some((state_metadata, compute_manifest_request)) = state_metadata_and_compute_manifest_request { @@ -3536,7 +3522,6 @@ impl StateManager for StateManagerImpl { // tip if needed. states.tip_height = next_tip.0; states.tip = Some(next_tip.1); - if scope == CertificationScope::Full { self.release_lock_and_persist_metadata(states); } @@ -3595,6 +3580,179 @@ impl StateManager for StateManagerImpl { } } +enum HashRequest { + HashState { + state: Arc, + states: Arc>, + latest_state_height: Arc, + latest_certified_height: Arc, + height: Height, + latest_height_update_time: Arc>, + /// A certification from consensus. If `Some`, we compare it with the state hash + /// we calculate and panic on divergence. It should be `Some` whenever we are catching up and could + /// skip hashing, but we do hash anyway because we are at a height which is a multiple of + /// `MAX_CONSECUTIVE_ROUNDS_WITHOUT_STATE_CLONING`. + reference_certification: Box>, + scope: CertificationScope, + /// Boxed so that variants have similar size and we don't waste space when sending `HashRequest::Wait`. + state_layout: Box, + max_certified_height_tx: Arc>, + }, + /// Wait for the message to be executed and notify back via `sender`. + Wait { sender: Sender<()> }, +} + +fn spawn_hash_thread( + metrics: StateManagerMetrics, + log: ReplicaLogger, +) -> (JoinOnDrop<()>, Sender) { + #[allow(clippy::disallowed_methods)] + let (hash_req_sender, receiver) = unbounded(); + let handle = JoinOnDrop::new( + std::thread::Builder::new() + .name("HashThread".to_string()) + .spawn(move || { + while let Ok(req) = receiver.recv() { + match req { + HashRequest::HashState { + state, + states, + latest_state_height, + latest_certified_height, + height, + latest_height_update_time, + reference_certification, + scope, + state_layout, + max_certified_height_tx, + } => { + let mut certification_metadata = + StateManagerImpl::compute_certification_metadata( + &state, height, &metrics, &log, + ) + .unwrap_or_else(|err| { + fatal!(log, "Failed to compute hash tree: {:?}", err) + }); + if scope == CertificationScope::Full { + info!( + log, + "Certification hash for height {}: {:?}", + height, + certification_metadata.certified_state_hash + ); + } + let hash = &certification_metadata.certified_state_hash; + + // Closure to compare computed hash with potential hashes from other sources. + let assert_prev_hash_matches = |prev_hash: &CryptoHash, msg: &str| { + if prev_hash != hash { + if let Err(err) = state_layout.create_diverged_state_marker(height) { + error!( + log, + "Failed to mark state @{} diverged: {}", height, err + ); + } + panic!( + "Committed state @{height} with hash {hash:?} which is different from {msg} hash {prev_hash:?}" + ); + } + }; + // If a reference hash from consensus is available, check if we agree. + if let Some(ref cert) = *reference_certification { + let delivered_hash = cert.signed.content.hash.as_ref(); + assert_prev_hash_matches(delivered_hash, "delivered"); + // If we do agree, write the certification to the metadata, so that consensus does + // not have to deliver it again. + certification_metadata.certification = *reference_certification; + } + + // It's possible that we already computed this state before. We + // validate that hashes agree to spot bugs causing non-determinism as + // early as possible. + let mut states = states.write(); + if let Some(prev_metadata) = states.certifications_metadata.get(&height) { + let prev_hash = &prev_metadata.certified_state_hash; + assert_prev_hash_matches(prev_hash, "previously computed"); + } + + // Add state and hash to snapshots and certification_metadata + if !states + .snapshots + .iter() + .any(|snapshot| snapshot.height == height) + { + states.snapshots.push_back(Snapshot { + height, + state: Arc::clone(&state), + }); + states + .snapshots + .make_contiguous() + .sort_by_key(|snapshot| snapshot.height); + + let has_certification = + certification_metadata.certification.is_some(); + states + .certifications_metadata + .insert(height, certification_metadata); + let latest_height = + update_latest_height(&latest_state_height, height); + + metrics.max_resident_height.set(latest_height as i64); + { + let mut last_height_update_time = latest_height_update_time + .lock() + .expect("Failed to lock last height update time."); + let now = Instant::now(); + metrics + .height_update_time_seconds + .observe((now - *last_height_update_time).as_secs_f64()); + *last_height_update_time = now; + } + // Only update the certified height and notify the channel if the + // certification is actually being stored. We must not fire the channel + // when the snapshot already existed (e.g., due to state sync), because + // in that case `certification_metadata` is not updated and the + // certified state at this height would not be available. + if has_certification { + update_latest_certified_height( + &latest_certified_height, + &metrics, + &max_certified_height_tx, + height, + ); + } + } + } + HashRequest::Wait { sender } => { + sender.send(()).unwrap(); + } + } + } + }) + .unwrap(), + ); + (handle, hash_req_sender) +} + +impl StateManagerImpl { + /// After this method terminates, both `SharedState.snapshots` and `SharedState.certification_metadata` + /// at the height from the previous `commit_and_certify` are populated. It also updates `latest_state_height` + /// to the maximum of the value before and the committed height. It may also update `latest_certified_state_height`. + /// + /// This used to happen synchronously inside `commit_and_certify`, but now happens in the hash thread + /// at an unpredictable time. + /// + /// Note: Do not call this function while the calling scope holds a write lock to `SharedState`. + pub fn flush_hash_channel(&self) { + let (sender, recv) = bounded(1); + self.hash_channel + .send(HashRequest::Wait { sender }) + .expect("failed to send Wait message to hashing thread"); + recv.recv().expect("failed to wait for hashing thread"); + } +} + struct CertifiedStateSnapshotImpl { certification: Certification, state: Arc, @@ -4229,6 +4387,9 @@ pub mod testing { /// Testing only: Returns `fast_forward_height`. fn fast_forward_height(&self) -> u64; + + /// Testing only: Push state + fn push_state_and_cert_metadata(&self, height: Height, state: ReplicatedState); } impl StateManagerTesting for StateManagerImpl { @@ -4337,5 +4498,15 @@ pub mod testing { fn fast_forward_height(&self) -> u64 { self.fast_forward_height.load(Ordering::Relaxed) } + + fn push_state_and_cert_metadata(&self, height: Height, state: ReplicatedState) { + let mut states = self.states.write(); + self.push_state_and_cert_metadata( + height, + &self.latest_state_height, + state, + &mut states, + ); + } } } diff --git a/rs/state_manager/src/state_sync/chunkable/cache/tests.rs b/rs/state_manager/src/state_sync/chunkable/cache/tests.rs index 9a3740e7bdf9..5ec4ec8250be 100644 --- a/rs/state_manager/src/state_sync/chunkable/cache/tests.rs +++ b/rs/state_manager/src/state_sync/chunkable/cache/tests.rs @@ -43,6 +43,7 @@ impl TestEnvironment { &config, None, ic_types::malicious_flags::MaliciousFlags::default(), + tokio::sync::watch::channel(ic_types::Height::from(0)).0, )); let state_layout = state_manager.state_layout.clone(); diff --git a/rs/state_manager/tests/common/mod.rs b/rs/state_manager/tests/common/mod.rs index 81a651714461..055139dda1fe 100644 --- a/rs/state_manager/tests/common/mod.rs +++ b/rs/state_manager/tests/common/mod.rs @@ -157,7 +157,7 @@ pub fn encode_decode_stream_test< }); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); - + state_manager.flush_hash_channel(); certify_height(&state_manager, Height::new(1)); let slice = state_manager @@ -219,7 +219,7 @@ pub fn encode_partial_slice_test( }); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); - + state_manager.flush_hash_channel(); certify_height(&state_manager, Height::new(1)); let slice = state_manager @@ -313,6 +313,7 @@ pub fn modify_encoded_stream_helper Stream>( }); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); certify_height(state_manager, Height::new(2)); @@ -573,6 +574,7 @@ fn state_manager_with_verifier_result( &config, None, ic_types::malicious_flags::MaliciousFlags::default(), + tokio::sync::watch::channel(Height::from(0)).0, ); (state_manager, tmp) } @@ -627,6 +629,7 @@ fn state_manager_test_with_state_sync_and_verifier_result< &config, None, ic_types::malicious_flags::MaliciousFlags::default(), + tokio::sync::watch::channel(Height::from(0)).0, )); f(&metrics_registry, sm.clone(), StateSync::new(sm, log)); }) @@ -665,6 +668,7 @@ where &config, starting_height, ic_types::malicious_flags::MaliciousFlags::default(), + tokio::sync::watch::channel(Height::from(0)).0, )); let state_sync = StateSync::new(state_manager.clone(), log.clone()); @@ -723,6 +727,7 @@ where &config, starting_height, ic_types::malicious_flags::MaliciousFlags::default(), + tokio::sync::watch::channel(Height::from(0)).0, ); (metrics_registry, state_manager) @@ -785,6 +790,7 @@ where &config, starting_height, ic_types::malicious_flags::MaliciousFlags::default(), + tokio::sync::watch::channel(Height::from(0)).0, ); (metrics_registry, state_manager) diff --git a/rs/state_manager/tests/state_manager.rs b/rs/state_manager/tests/state_manager.rs index 14f6a095b3e2..03d01e71bd06 100644 --- a/rs/state_manager/tests/state_manager.rs +++ b/rs/state_manager/tests/state_manager.rs @@ -992,6 +992,7 @@ fn state_manager_crash_test( &config, None, ic_types::malicious_flags::MaliciousFlags::default(), + tokio::sync::watch::channel(Height::from(0)).0, )); }) .expect_err(&format!("Crash test fixture {i} did not crash")); @@ -1010,6 +1011,7 @@ fn state_manager_crash_test( &config, None, ic_types::malicious_flags::MaliciousFlags::default(), + tokio::sync::watch::channel(Height::from(0)).0, ), ); }); @@ -1051,10 +1053,12 @@ fn latest_state_height_updated_on_commit() { assert_eq!(Height(0), state_manager.latest_state_height()); state_manager.commit_and_certify(tip, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); assert_eq!(Height(1), state_manager.latest_state_height()); let (_, tip) = state_manager.take_tip(); state_manager.commit_and_certify(tip, CertificationScope::Full, None); + state_manager.flush_hash_channel(); assert_eq!(Height(2), state_manager.latest_state_height()); }) } @@ -1067,6 +1071,7 @@ fn populates_prev_state_hash() { let (_height, state_1) = state_manager.take_tip(); state_manager.commit_and_certify(state_1, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); let state_2 = state_manager.get_latest_state().take(); let hashes = state_manager.list_state_hashes_to_certify(); @@ -1097,7 +1102,8 @@ fn returns_state_no_committed_for_future_states() { } #[test] -#[should_panic(expected = "which is different from previously computed or delivered hash")] +#[should_panic] +// We don't expect a specific panic message because it is not deterministic (could happen when sending `Wait` or when awaiting it). fn panics_on_forked_history() { state_manager_test(|_metrics, state_manager| { // Create a checkpoint at h which we can fetch later. @@ -1141,6 +1147,7 @@ fn checkpoints_outlive_state_manager() { &config, None, ic_types::malicious_flags::MaliciousFlags::default(), + tokio::sync::watch::channel(Height::from(0)).0, ); let (_height, mut state) = state_manager.take_tip(); insert_dummy_canister(&mut state, canister_id); @@ -1175,6 +1182,7 @@ fn checkpoints_outlive_state_manager() { &config, None, ic_types::malicious_flags::MaliciousFlags::default(), + tokio::sync::watch::channel(Height::from(0)).0, ); assert_eq!( @@ -1208,6 +1216,7 @@ fn certifications_are_not_persisted() { &config, None, ic_types::malicious_flags::MaliciousFlags::default(), + tokio::sync::watch::channel(Height::from(0)).0, ); let (_height, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, CertificationScope::Full, None); @@ -1226,6 +1235,7 @@ fn certifications_are_not_persisted() { &config, None, ic_types::malicious_flags::MaliciousFlags::default(), + tokio::sync::watch::channel(Height::from(0)).0, ); assert_eq!(vec![Height(1)], heights_to_certify(&state_manager)); } @@ -1499,6 +1509,7 @@ fn should_archive_checkpoints_correctly() { state_manager.commit_and_certify(state, scope.clone(), None); } + state_manager.flush_hash_channel(); assert_eq!(Height(13), state_manager.latest_state_height()); let latest_state = state_manager.get_latest_state(); @@ -1562,6 +1573,7 @@ fn can_remove_checkpoints() { state_manager.commit_and_certify(state, scope.clone(), None); } + state_manager.flush_hash_channel(); assert_eq!(state_manager.list_state_heights(CERT_ANY), heights); state_manager.flush_tip_channel(); state_manager.remove_states_below(Height(4)); @@ -1609,6 +1621,7 @@ fn cannot_remove_height_zero() { let (_height, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); assert_eq!( state_manager.list_state_heights(CERT_ANY), @@ -1640,6 +1653,7 @@ fn cannot_remove_latest_height_or_checkpoint() { state_manager.commit_and_certify(state, scope.clone(), None); } + state_manager.flush_hash_channel(); assert_eq!( state_manager.list_state_heights(CERT_ANY).last(), @@ -1660,6 +1674,7 @@ fn cannot_remove_latest_height_or_checkpoint() { let (_height, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); assert_eq!( state_manager.list_state_heights(CERT_ANY).last(), @@ -1860,6 +1875,7 @@ fn can_keep_last_checkpoint_and_higher_states_after_removal() { state_manager.commit_and_certify(state, scope.clone(), None); } + state_manager.flush_hash_channel(); assert_eq!(state_manager.list_state_heights(CERT_ANY), heights); state_manager.flush_tip_channel(); state_manager.remove_states_below(Height(10)); @@ -1913,6 +1929,7 @@ fn can_keep_latest_verified_checkpoint_after_removal_with_unverified_checkpoints state_manager.commit_and_certify(state, scope.clone(), None); } + state_manager.flush_hash_channel(); assert_eq!(state_manager.list_state_heights(CERT_ANY), heights); state_manager.flush_tip_channel(); @@ -1987,6 +2004,7 @@ fn should_restart_from_the_latest_checkpoint_requested_to_remove() { state_manager.commit_and_certify(state, scope.clone(), None); } + state_manager.flush_hash_channel(); assert_eq!(state_manager.list_state_heights(CERT_ANY), heights); state_manager.flush_tip_channel(); state_manager.remove_states_below(Height(7)); @@ -2146,6 +2164,7 @@ fn should_not_remove_latest_state_after_restarting_without_checkpoints() { let (_, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); state_manager.remove_states_below(Height(i)); + state_manager.flush_hash_channel(); state_manager.flush_deallocation_channel(); } @@ -2154,6 +2173,7 @@ fn should_not_remove_latest_state_after_restarting_without_checkpoints() { let (_, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); state_manager.remove_states_below(Height(10)); + state_manager.flush_hash_channel(); state_manager.flush_deallocation_channel(); assert_eq!(Height(i), state_manager.latest_state_height()); } @@ -2279,6 +2299,7 @@ fn latest_certified_state_is_not_removed() { state_manager_test(|_metrics, state_manager| { let (_height, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); certify_height(&state_manager, Height(1)); let (_height, state) = state_manager.take_tip(); @@ -2289,6 +2310,7 @@ fn latest_certified_state_is_not_removed() { let (_height, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); state_manager.flush_tip_channel(); state_manager.remove_states_below(Height(4)); @@ -2312,7 +2334,7 @@ fn can_return_and_remember_certifications() { let (_height, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); - + state_manager.flush_hash_channel(); assert_eq!( vec![Height(1), Height(2)], heights_to_certify(&state_manager) @@ -2328,10 +2350,12 @@ fn certifications_of_transient_states_are_not_cached() { state_manager_restart_test(|state_manager, restart_fn| { let (_height, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, CertificationScope::Full, None); + state_manager.flush_hash_channel(); certify_height(&state_manager, Height(1)); let (_height, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); certify_height(&state_manager, Height(2)); assert_eq!(Vec::::new(), heights_to_certify(&state_manager)); @@ -2342,6 +2366,7 @@ fn certifications_of_transient_states_are_not_cached() { let (_height, state) = state_manager.take_tip(); // Commit the same state again. The certification should be re-used. state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); assert_eq!( vec![Height(1), Height(2)], heights_to_certify(&state_manager) @@ -2354,7 +2379,7 @@ fn uses_latest_certified_state_to_decode_certified_streams() { state_manager_test(|_metrics, state_manager| { let (_height, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); - + state_manager.flush_hash_channel(); let subnet = subnet_test_id(42); // no streams yet @@ -2371,6 +2396,7 @@ fn uses_latest_certified_state_to_decode_certified_streams() { }); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); // Have a stream, but this state is not certified yet. assert_eq!( state_manager.encode_certified_stream_slice(subnet, None, None, None, None), @@ -2398,6 +2424,7 @@ fn encode_stream_index_is_checked() { }); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); certify_height(&state_manager, Height(1)); let zero_idx = StreamIndex::from(0); @@ -4151,6 +4178,7 @@ fn can_commit_after_prev_state_is_gone() { let (_height, mut tip) = dst_state_manager.take_tip(); insert_dummy_canister(&mut tip, canister_test_id(100)); dst_state_manager.commit_and_certify(tip, CertificationScope::Metadata, None); + dst_state_manager.flush_hash_channel(); let (_height, tip) = dst_state_manager.take_tip(); @@ -4162,9 +4190,11 @@ fn can_commit_after_prev_state_is_gone() { dst_state_manager.flush_deallocation_channel(); assert_eq!(Height(3), dst_state_manager.latest_state_height()); + // tip_height should be present + assert!(dst_state_manager.get_state_at(Height(1)).is_ok()); assert_eq!( - dst_state_manager.get_state_at(Height(1)), - Err(StateManagerError::StateRemoved(Height(1))) + dst_state_manager.get_state_at(Height(2)), + Err(StateManagerError::StateRemoved(Height(2))) ); // Check that we can still commit the old tip. @@ -5031,7 +5061,7 @@ fn should_not_leak_checkpoint_when_state_sync_into_existing_snapshot_height() { let (_height, state) = src_state_manager.take_tip(); src_state_manager.commit_and_certify(state, CertificationScope::Full, None); - + src_state_manager.flush_hash_channel(); wait_for_checkpoint(&*src_state_manager, Height(3)); certify_height(&*src_state_manager, Height(1)); @@ -5052,6 +5082,7 @@ fn should_not_leak_checkpoint_when_state_sync_into_existing_snapshot_height() { let (tip_height, state) = dst_state_manager.take_tip(); assert_eq!(tip_height, Height(0)); dst_state_manager.commit_and_certify(state, CertificationScope::Full, None); + dst_state_manager.flush_hash_channel(); dst_state_manager.flush_tip_channel(); certify_height(&*dst_state_manager, Height(1)); @@ -5068,10 +5099,12 @@ fn should_not_leak_checkpoint_when_state_sync_into_existing_snapshot_height() { let (_height, state) = dst_state_manager.take_tip(); dst_state_manager.commit_and_certify(state, CertificationScope::Full, None); + dst_state_manager.flush_hash_channel(); certify_height(&*dst_state_manager, Height(2)); let (_height, state) = dst_state_manager.take_tip(); dst_state_manager.commit_and_certify(state, CertificationScope::Full, None); + dst_state_manager.flush_hash_channel(); dst_state_manager.flush_tip_channel(); dst_state_manager.remove_states_below(Height(3)); @@ -5111,6 +5144,7 @@ fn should_not_leak_checkpoint_when_state_sync_into_existing_snapshot_height() { let (_height, state) = dst_state_manager.take_tip(); dst_state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + dst_state_manager.flush_hash_channel(); certify_height(&*dst_state_manager, Height(3)); certify_height(&*dst_state_manager, Height(4)); assert_eq!(dst_state_manager.latest_certified_height(), Height(4)); @@ -5382,6 +5416,7 @@ fn certified_read_can_certify_ingress_history_entry() { |_| {}, ); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); let path: LabeledTree<()> = LabeledTree::SubTree(flatmap! { label("request_status") => LabeledTree::SubTree( flatmap! { @@ -5423,6 +5458,7 @@ fn certified_read_can_certify_time() { state.metadata.batch_time += Duration::new(0, 100); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); let path: LabeledTree<()> = LabeledTree::SubTree(flatmap! { label("time") => Leaf(()) }); @@ -5453,6 +5489,7 @@ fn certified_read_can_certify_canister_data() { insert_dummy_canister(&mut state, canister_id); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); let path = SubTree(flatmap! { label("canister") => SubTree( @@ -5523,6 +5560,7 @@ fn certified_read_can_certify_node_public_keys_since_v12() { state.metadata.node_public_keys = node_public_keys; state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); let subnet_id = subnet_test_id(42).get(); let node_id = node_test_id(39).get(); @@ -5588,6 +5626,7 @@ fn certified_read_can_certify_api_boundary_nodes_since_v16() { }; state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); let api_bn_id = node_test_id(11).get(); let path: Vec<&[u8]> = vec![b"api_boundary_nodes", api_bn_id.as_ref()]; @@ -5626,6 +5665,7 @@ fn certified_read_succeeds_for_empty_forks() { let (_, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); let path: LabeledTree<()> = LabeledTree::SubTree(flatmap! { label("api_boundary_nodes") => LabeledTree::Leaf(()), @@ -5666,6 +5706,7 @@ fn certified_read_succeeds_for_empty_tree() { let (_, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); let path: LabeledTree<()> = LabeledTree::SubTree(flatmap! {}); @@ -5696,6 +5737,7 @@ fn certified_read_returns_absence_proof_for_non_existing_entries() { |_| {}, ); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); let path: LabeledTree<()> = LabeledTree::SubTree(flatmap! { label("request_status") => LabeledTree::SubTree( @@ -5736,6 +5778,7 @@ fn certified_read_returns_absence_proof_for_non_existing_entries_in_empty_state( let (_, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); let path: LabeledTree<()> = LabeledTree::SubTree(flatmap! { label("request_status") => LabeledTree::SubTree( @@ -5784,6 +5827,7 @@ fn certified_read_can_fetch_multiple_entries_in_one_go() { |_| {}, ); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); let path: LabeledTree<()> = LabeledTree::SubTree(flatmap! { label("request_status") => LabeledTree::SubTree( @@ -5851,6 +5895,7 @@ fn certified_read_can_produce_proof_of_absence() { |_| {}, ); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); let path: LabeledTree<()> = LabeledTree::SubTree(flatmap! { label("request_status") => LabeledTree::SubTree( @@ -5931,6 +5976,7 @@ fn certified_read_can_exclude_canister_ranges() { state.metadata.network_topology = network_topology; state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); let path = SubTree(flatmap! { label("subnet") => Leaf(()) @@ -6062,6 +6108,7 @@ fn diverged_checkpoint_is_complete() { &config, None, ic_types::malicious_flags::MaliciousFlags::default(), + tokio::sync::watch::channel(Height::from(0)).0, ); let (_, state) = state_manager.take_tip(); @@ -6084,6 +6131,7 @@ fn diverged_checkpoint_is_complete() { &config, None, ic_types::malicious_flags::MaliciousFlags::default(), + tokio::sync::watch::channel(Height::from(0)).0, ); // If the Tip thread is active while we report diverged checkpoint, it may crash // which is OK in production but confuses debug assertions. @@ -6102,6 +6150,7 @@ fn diverged_checkpoint_is_complete() { &config, None, ic_types::malicious_flags::MaliciousFlags::default(), + tokio::sync::watch::channel(Height::from(0)).0, ); // check that the diverged checkpoint has the same manifest as before @@ -6125,10 +6174,12 @@ fn report_diverged_state() { vec![Box::new(|state_manager: StateManagerImpl| { let (_height, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); std::thread::sleep(std::time::Duration::from_secs(2)); let mut certification = certify_height(&state_manager, Height(1)); let (_height, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); // Hack the certification so it is a divergence certification.height = Height(2); @@ -6342,11 +6393,13 @@ fn remove_too_many_diverged_state_markers() { fn diverge_state_at(state_manager: StateManagerImpl, divergence: u64) { let (_height, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); + state_manager.flush_hash_channel(); let mut certification = certify_height(&state_manager, Height(1)); for _i in 2..(divergence + 1) { let (_height, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, CertificationScope::Metadata, None); } + state_manager.flush_hash_channel(); // Hack the certification so it is a divergence certification.height = Height(divergence); @@ -6838,7 +6891,6 @@ fn can_delete_canister() { state_manager.commit_and_certify(state, CertificationScope::Full, None); state_manager.flush_tip_channel(); - // Check the checkpoint has the canister. let canister_path = state_manager .state_layout() @@ -8932,12 +8984,6 @@ fn no_state_clone_count(metrics: &MetricsRegistry) -> u64 { .sum::() } -fn tip_hash_count(metrics: &MetricsRegistry) -> u64 { - fetch_int_counter_vec(metrics, "state_manager_tip_hash_count") - .values() - .sum::() -} - fn flush_unflushed_delta_count(metrics: &MetricsRegistry) -> u64 { let mut labels = BTreeMap::new(); labels.insert("request".to_string(), "flush_unflushed_delta".to_string()); @@ -8969,25 +9015,38 @@ fn fake_certification_for_height_with_hash(height: Height, hash: CryptoHash) -> fn commit_and_certify_optimization_conditions() { state_manager_test(|metrics, sm| { // update `fast_forward_height` to enable optimization - sm.update_fast_forward_height(Height::new(21)); - assert_eq!(sm.fast_forward_height(), 21); + sm.update_fast_forward_height(Height::new(12)); + assert_eq!(sm.fast_forward_height(), 12); // optimization has not triggered yet assert_eq!(no_state_clone_count(metrics), 0); + // consensus delivers a certification so the optimization triggers + let certification = fake_certification_for_height(Height::new(1)); + sm.deliver_state_certification(certification.clone()); + // all conditions are satisfied => optimization triggers let state = sm.take_tip().1; sm.commit_and_certify_at_height(state, Height::new(1), CertificationScope::Metadata, None); assert_eq!(no_state_clone_count(metrics), 1); // `CertificationScope::Full` => optimization does not trigger + sm.deliver_state_certification(fake_certification_for_height_with_hash( + Height::new(2), + CryptoHash( + hex::decode("6cfc30b7160f24c186e0d891f7ca3dfbd566b2ed08922ddd62ccc84db49032aa") + .unwrap(), + ), + )); let state = sm.take_tip().1; sm.commit_and_certify_at_height(state, Height::new(2), CertificationScope::Full, None); assert_eq!(no_state_clone_count(metrics), 1); - // heights of 10 and 20 are divisible by 10 => optimization does not trigger at those heights + // deliver cert -> optimization triggers let mut expected_no_state_clone_count = 1; - for height in 3..21 { + for height in 3_u64..10_u64 { + expected_no_state_clone_count += 1; + sm.deliver_state_certification(fake_certification_for_height(Height::new(height))); let state = sm.take_tip().1; sm.commit_and_certify_at_height( state, @@ -8995,16 +9054,54 @@ fn commit_and_certify_optimization_conditions() { CertificationScope::Metadata, None, ); - if !height.is_multiple_of(10) { - expected_no_state_clone_count += 1; - } assert_eq!(no_state_clone_count(metrics), expected_no_state_clone_count); } + // at height 10, do deliver cert, but it should skip optimization anyway + let height = 10; + sm.deliver_state_certification(fake_certification_for_height_with_hash( + Height::new(height), + CryptoHash( + hex::decode("d90d05f3d971482d2ccdc594de723d786afaed32737b19e42f85a1ebe8e21a21") + .unwrap(), + ), + )); + let state = sm.take_tip().1; + sm.commit_and_certify_at_height( + state, + Height::new(height), + CertificationScope::Metadata, + None, + ); + assert_eq!(no_state_clone_count(metrics), expected_no_state_clone_count); - // height of 21 is not less than `fast_forward_height` => optimization does not trigger - assert_eq!(sm.fast_forward_height(), 21); + // height of 11 would trigger optimization if a certification were available, but we don't deliver one. + let height = 11; let state = sm.take_tip().1; - sm.commit_and_certify_at_height(state, Height::new(21), CertificationScope::Metadata, None); + sm.commit_and_certify_at_height( + state, + Height::new(height), + CertificationScope::Metadata, + None, + ); + assert_eq!(no_state_clone_count(metrics), expected_no_state_clone_count); + + // height of 12 is not less than `fast_forward_height` => optimization does not trigger + let height = 12; + assert_eq!(sm.fast_forward_height(), height); + sm.deliver_state_certification(fake_certification_for_height_with_hash( + Height::new(height), + CryptoHash( + hex::decode("bc6503abbc56b62766950627397808b87b0c3ca16312bd89d34208f01720d5e5") + .unwrap(), + ), + )); + let state = sm.take_tip().1; + sm.commit_and_certify_at_height( + state, + Height::new(height), + CertificationScope::Metadata, + None, + ); assert_eq!(no_state_clone_count(metrics), expected_no_state_clone_count); }); } @@ -9023,14 +9120,15 @@ fn commit_and_certify_optimization_semantics() { let only_initial_state = || { assert_eq!(sm.state_snapshot_heights(), vec![INITIAL_STATE_HEIGHT]); assert!(sm.certifications_metadata_heights().is_empty()); - assert!(sm.certifications().is_empty()); }; only_initial_state(); + assert!(sm.certifications().is_empty()); // optimization triggers => no state snapshot and certifications metadata are stored => still just the initial state in `states` let mut batch_time_opt = None; let mut expected_no_state_clone_count = 0; for opt_height in 1..10 { + sm.deliver_state_certification(fake_certification_for_height(Height::new(opt_height))); let (height, mut state) = sm.take_tip(); assert_eq!(height, Height::new(opt_height - 1)); // tip height is set correctly if optimization triggers if let Some(batch_time) = batch_time_opt { @@ -9055,6 +9153,7 @@ fn commit_and_certify_optimization_semantics() { let batch_time_no_opt = state.metadata.batch_time; let no_opt_height = Height::new(10); sm.commit_and_certify_at_height(state, no_opt_height, CertificationScope::Metadata, None); + sm.flush_hash_channel(); assert_eq!(no_state_clone_count(metrics), expected_no_state_clone_count); assert_eq!( @@ -9073,7 +9172,6 @@ fn commit_and_certify_optimization_semantics() { sm.certifications_metadata_certification(no_opt_height) .is_none() ); - assert!(sm.certifications().is_empty()); }); } @@ -9142,109 +9240,113 @@ fn deliver_state_certification_for_future_heights() { }); } +/// Test that `max_certified_height_tx` is updated when `deliver_state_certification` +/// certifies a height whose state is already committed (the common case), and that +/// it is NOT updated again for the same or a lower height. #[test] -fn take_tip_does_not_hash_without_optimization() { - state_manager_test(|metrics, sm| { - // optimization has not triggered yet - assert_eq!(no_state_clone_count(metrics), 0); - - // the initial state is always hashed in `take_tip` - assert_eq!(tip_hash_count(metrics), 0); - let (initial_height, initial_state) = sm.take_tip(); - assert_eq!(initial_height, INITIAL_STATE_HEIGHT); - assert_eq!(tip_hash_count(metrics), 1); - - // optimization does not trigger - let no_opt_height = Height::new(1); - sm.commit_and_certify_at_height( - initial_state, - no_opt_height, - CertificationScope::Metadata, +fn max_certified_height_fires_when_state_already_committed() { + with_test_replica_logger(|log| { + let tmp = ic_test_utilities_tmpdir::tmpdir("sm"); + let config = Config::new(tmp.path().into()); + let metrics = MetricsRegistry::new(); + let (max_certified_height_tx, mut max_certified_height_rx) = + tokio::sync::watch::channel(Height::from(0)); + let sm = StateManagerImpl::new( + std::sync::Arc::new(FakeVerifier::new()), + subnet_test_id(42), + SubnetType::Application, + log, + &metrics, + &config, None, + ic_types::malicious_flags::MaliciousFlags::default(), + max_certified_height_tx, ); - assert_eq!(no_state_clone_count(metrics), 0); - - // the state at height 1 is not hashed in `take_tip` since certification metadata are computed - assert_eq!(tip_hash_count(metrics), 1); - let (height, _state) = sm.take_tip(); - assert_eq!(tip_hash_count(metrics), 1); - assert_eq!(height, no_opt_height); - }); -} -#[test] -fn take_tip_does_not_hash_with_optimization() { - state_manager_test(|metrics, sm| { - // consensus delivers certification for the next height - let opt_height = Height::new(1); - let certification = fake_certification_for_height(opt_height); - sm.deliver_state_certification(certification); - assert_eq!( - sm.certifications().keys().cloned().collect::>(), - vec![opt_height] - ); + // Commit heights 1 and 2 so they are in certifications_metadata. + let (_, state) = sm.take_tip(); + sm.commit_and_certify(state, CertificationScope::Full, None); + let (_, state) = sm.take_tip(); + sm.commit_and_certify(state, CertificationScope::Full, None); - // update `fast_forward_height` to enable optimization - sm.update_fast_forward_height(Height::new(42)); - assert_eq!(sm.fast_forward_height(), 42); + // No certification delivered yet — receiver should not have changed. + assert!(!max_certified_height_rx.has_changed().unwrap()); - // optimization has not triggered yet - assert_eq!(no_state_clone_count(metrics), 0); + // Deliver certification for height 1: state is in certifications_metadata, + // so the height is certified immediately. + let cert1 = certify_height(&sm, Height(1)); + assert!( + max_certified_height_rx.has_changed().unwrap(), + "receiver should fire when a committed state is certified" + ); + assert_eq!(*max_certified_height_rx.borrow_and_update(), Height(1)); - // the initial state is always hashed in `take_tip` - assert_eq!(tip_hash_count(metrics), 0); - let (initial_height, initial_state) = sm.take_tip(); - assert_eq!(initial_height, INITIAL_STATE_HEIGHT); - assert_eq!(tip_hash_count(metrics), 1); + // Deliver certification for height 2: new higher height — receiver fires again. + certify_height(&sm, Height(2)); + assert!(max_certified_height_rx.has_changed().unwrap()); + assert_eq!(*max_certified_height_rx.borrow_and_update(), Height(2)); - // optimization triggers - sm.commit_and_certify_at_height( - initial_state, - opt_height, - CertificationScope::Metadata, - None, + // Re-deliver certification for height 1 (lower than current max of 2): receiver must NOT fire. + // We call deliver_state_certification directly because list_state_hashes_to_certify() + // no longer returns already-certified heights. + sm.deliver_state_certification(cert1); + assert!( + !max_certified_height_rx.has_changed().unwrap(), + "receiver must not fire for a height lower than the already-transmitted maximum" ); - assert_eq!(no_state_clone_count(metrics), 1); - - // the state at height 1 is not hashed in `take_tip` since certification was delivered - assert_eq!(tip_hash_count(metrics), 1); - let (height, _state) = sm.take_tip(); - assert_eq!(tip_hash_count(metrics), 1); - assert_eq!(height, opt_height); }); } +/// Test that `max_certified_height_tx` is NOT updated when `deliver_state_certification` +/// receives a certification for a height whose state has not been committed yet +/// (the certification is deferred into `states.certifications`). #[test] -fn take_tip_hashes_with_optimization() { - state_manager_test(|metrics, sm| { - // update `fast_forward_height` to enable optimization - sm.update_fast_forward_height(Height::new(42)); - assert_eq!(sm.fast_forward_height(), 42); +fn max_certified_height_deferred_when_cert_arrives_before_state() { + with_test_replica_logger(|log| { + let tmp = ic_test_utilities_tmpdir::tmpdir("sm"); + let config = Config::new(tmp.path().into()); + let metrics = MetricsRegistry::new(); + let (max_certified_height_tx, max_certified_height_rx) = + tokio::sync::watch::channel(Height::from(0)); + let sm = StateManagerImpl::new( + std::sync::Arc::new(FakeVerifier::new()), + subnet_test_id(42), + SubnetType::Application, + log, + &metrics, + &config, + None, + ic_types::malicious_flags::MaliciousFlags::default(), + max_certified_height_tx, + ); - // optimization has not triggered yet - assert_eq!(no_state_clone_count(metrics), 0); + // Set fast_forward_height so that height 1 is in the range of heights to certify. + sm.update_fast_forward_height(Height::new(2)); - // the initial state is always hashed in `take_tip` - assert_eq!(tip_hash_count(metrics), 0); - let (initial_height, initial_state) = sm.take_tip(); - assert_eq!(initial_height, INITIAL_STATE_HEIGHT); - assert_eq!(tip_hash_count(metrics), 1); + // Deliver a certification for height 1 before the state at height 1 is committed. + // This stores the certification in `states.certifications` (deferred path). + // The hash is fake so this cert will never match a real committed state, + // but it is sufficient to verify that the receiver does not fire prematurely. + let fake_cert = fake_certification_for_height(Height::new(1)); + sm.deliver_state_certification(fake_cert.clone()); - // optimization triggers - let opt_height = Height::new(1); - sm.commit_and_certify_at_height( - initial_state, - opt_height, - CertificationScope::Metadata, - None, + // The certification is stored in `states.certifications` but the height is not yet + // certified — the receiver must not fire. + assert_eq!(sm.certifications_metadata_heights(), vec![]); + assert_eq!( + sm.certifications().keys().cloned().collect::>(), + vec![Height::new(1)] + ); + assert_eq!(sm.certifications().get(&Height::new(1)), Some(&fake_cert)); + assert!( + !max_certified_height_rx.has_changed().unwrap(), + "receiver must not fire when the state has not been committed yet" + ); + assert_eq!( + *max_certified_height_rx.borrow(), + Height::from(0), + "max certified height must remain 0 until a state is truly certified" ); - assert_eq!(no_state_clone_count(metrics), 1); - - // the state at height 1 is hashed in `take_tip` since certification metadata are not computed - assert_eq!(tip_hash_count(metrics), 1); - let (height, _state) = sm.take_tip(); - assert_eq!(tip_hash_count(metrics), 2); - assert_eq!(height, opt_height); }); } @@ -9281,6 +9383,17 @@ fn remove_inmemory_states_below_prunes_certification() { // we commit a strictly larger height 2 without optimization let state = sm.take_tip().1; sm.commit_and_certify_at_height(state, Height::new(2), CertificationScope::Metadata, None); + // flusing the hash channel guarantees that the `latest_state_height` is incremented, which we rely on below. + sm.flush_hash_channel(); + // also move `latest_certified_height`, because that is protected from pruning. + sm.deliver_state_certification(fake_certification_for_height_with_hash( + Height::new(2), + CryptoHash( + hex::decode("cea29292fecbadde1cd534ce411b829ba9d8db4971794f827eddda5c5dbfcee4") + .unwrap(), + ), + )); + assert_eq!(no_state_clone_count(metrics), 0); // certification at height 1 is pruned now that `latest_state_height` advanced to 2 @@ -9289,6 +9402,8 @@ fn remove_inmemory_states_below_prunes_certification() { sm.certifications().keys().cloned().collect::>(), vec![] ); + // still present here because `latest_certified_height` is 2 and because the previous round was normal, not optimized. + assert_eq!(sm.certifications_metadata_heights(), vec![Height::new(2)]); }); } @@ -9338,6 +9453,10 @@ fn get_state_hash_at() { // optimization triggers every multiple of 10 let mut expected_no_state_clone_count = 0; for height in 1..checkpoint_height.get() { + if !height.is_multiple_of(10) { + sm.deliver_state_certification(fake_certification_for_height(Height::new(height))); + expected_no_state_clone_count += 1; + } let state = sm.take_tip().1; sm.commit_and_certify_at_height( state, @@ -9345,9 +9464,6 @@ fn get_state_hash_at() { CertificationScope::Metadata, None, ); - if !height.is_multiple_of(10) { - expected_no_state_clone_count += 1; - } assert_eq!(no_state_clone_count(metrics), expected_no_state_clone_count); assert_eq!( sm.get_state_hash_at(checkpoint_height), @@ -9382,6 +9498,7 @@ fn flush_with_optimization() { assert_eq!(flush_unflushed_delta_count(metrics), 0); // optimization triggers + sm.deliver_state_certification(fake_certification_for_height(Height::new(1))); let state = sm.take_tip().1; let opt_height = Height::new(1); let batch_summary = BatchSummary { @@ -9395,6 +9512,7 @@ fn flush_with_optimization() { CertificationScope::Metadata, Some(batch_summary), ); + sm.flush_hash_channel(); assert_eq!(no_state_clone_count(metrics), 1); // delta has been flushed @@ -9409,6 +9527,7 @@ fn valid_witness_in_list_state_hashes_to_certify() { let state = sm.take_tip().1; let height = Height::new(1); sm.commit_and_certify_at_height(state, height, CertificationScope::Metadata, None); + sm.flush_hash_channel(); let state_hashes = sm.list_state_hashes_to_certify(); assert_eq!(state_hashes.len(), 1); @@ -9493,8 +9612,8 @@ fn commit_and_certify_reuses_certification() { // optimization does not trigger let state = sm.take_tip().1; sm.commit_and_certify_at_height(state, no_opt_height, CertificationScope::Metadata, None); + sm.flush_hash_channel(); assert_eq!(no_state_clone_count(metrics), 0); - // `commit_and_certify` reused certification from `states.certifications` assert!( sm.certifications_metadata_certification(no_opt_height) @@ -9511,9 +9630,10 @@ fn commit_and_certify_reuses_certification() { } #[test] -#[should_panic( - expected = "Committed state @1 with hash CryptoHash(0x4e2d174de5daaeb4622d8f5e426ee09274f7ec4fb01d62fb9a3d36ae50961353) which is different from previously computed or delivered hash CryptoHash(0x2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a)" -)] +#[should_panic] +// This test fails with the following panic message, but we can't inspect it because is happens in another thread: +// Committed state @1 with hash CryptoHash(0x4e2d174de5daaeb4622d8f5e426ee09274f7ec4fb01d62fb9a3d36ae50961353) which is different from previously computed or delivered hash CryptoHash(0x2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a) +// We don't expect a specific panic message because it is not deterministic (could happen when sending `Wait` or when awaiting it). fn commit_and_certify_panic_on_delivered_fake_certification() { state_manager_test(|metrics, sm| { // consensus delivers certification for a future height @@ -9527,3 +9647,122 @@ fn commit_and_certify_panic_on_delivered_fake_certification() { assert_eq!(no_state_clone_count(metrics), 0); }); } + +#[test] +fn certification_not_pruned() { + state_manager_test(|metrics, sm| { + // update `fast_forward_height` to enable optimization + sm.update_fast_forward_height(Height::new(5)); + + // consensus delivers a certification so the optimization triggers + sm.deliver_state_certification(fake_certification_for_height_with_hash( + Height::new(1), + CryptoHash( + hex::decode("38e2d3769adae3b19e69c586c7bb20cd63d0e070a3bc59e6b8d17c31983b6556") + .unwrap(), + ), + )); + + // all conditions are satisfied => optimization triggers + let state = sm.take_tip().1; + sm.commit_and_certify_at_height(state, Height::new(1), CertificationScope::Metadata, None); + assert_eq!(no_state_clone_count(metrics), 1); + + let state = sm.take_tip().1; + let mut new_state = state.clone(); + new_state.metadata.prev_state_hash = Some( + CryptoHash( + hex::decode("38e2d3769adae3b19e69c586c7bb20cd63d0e070a3bc59e6b8d17c31983b6556") + .unwrap(), + ) + .into(), + ); + sm.push_state_and_cert_metadata(Height::new(2), new_state); + sm.remove_states_below(Height::new(2)); + + // certification @ height 1 should still be present, because it is at tip_height + assert!(sm.certifications().contains_key(&Height::new(1))); + sm.commit_and_certify_at_height(state, Height::new(2), CertificationScope::Metadata, None); + }); +} + +#[test] +fn remove_states_below_protects_tip_height() { + state_manager_test(|_metrics, sm| { + let (_height, state) = sm.take_tip(); + sm.commit_and_certify(state, CertificationScope::Full, None); + let hash_at_1 = wait_for_checkpoint(&sm, Height(1)); + + let (_height, state) = sm.take_tip(); + sm.commit_and_certify(state, CertificationScope::Metadata, None); + sm.flush_hash_channel(); + + let (tip_height, state) = sm.take_tip(); + assert_eq!(tip_height, Height(2)); + + // Fetch state at height 10 using the hash of state 1. + // This clones the verified checkpoint @1 to @10 and advances + // `latest_state_height` to 10, but does not touch `tip_height`. + sm.fetch_state(Height(10), hash_at_1.clone(), Height::new(999)); + // let hash_at_10 = wait_for_checkpoint(&state_manager, Height(10)); + // assert_eq!(hash_at_1, hash_at_10); + sm.flush_hash_channel(); + assert_eq!(sm.latest_state_height(), Height(10)); + + // height 2 should be retained + sm.remove_states_below(Height(10)); + assert!( + sm.list_state_heights(CERT_ANY).contains(&Height(2)), + "tip height @2 should be retained by remove_states_below, \ + got heights: {:?}", + sm.list_state_heights(CERT_ANY), + ); + // This needs the hash @ prev_height = 2 to + // still be available, which is only the case if the tip-height + // protection kept the certification metadata at height 2 around. + sm.commit_and_certify(state, CertificationScope::Metadata, None); + }) +} + +#[test] +fn remove_states_below_protects_tip_height_with_optimization() { + state_manager_test(|_metrics, sm| { + let (_height, state) = sm.take_tip(); + sm.commit_and_certify(state, CertificationScope::Full, None); + let hash_at_1 = wait_for_checkpoint(&sm, Height(1)); + + sm.update_fast_forward_height(Height::new(10)); + sm.deliver_state_certification(fake_certification_for_height_with_hash( + Height::new(2), + CryptoHash( + hex::decode("cea29292fecbadde1cd534ce411b829ba9d8db4971794f827eddda5c5dbfcee4") + .unwrap(), + ), + )); + + let (_height, state) = sm.take_tip(); + sm.commit_and_certify(state, CertificationScope::Metadata, None); + sm.flush_hash_channel(); + + let (tip_height, state) = sm.take_tip(); + assert_eq!(tip_height, Height(2)); + + // Fetch state at height 10 using the hash of state 1. + // This clones the verified checkpoint @1 to @10 and advances + // `latest_state_height` to 10, but does not touch `tip_height`. + sm.fetch_state(Height(10), hash_at_1.clone(), Height::new(999)); + sm.flush_hash_channel(); + assert_eq!(sm.latest_state_height(), Height(10)); + + // height 2 should be retained + sm.remove_states_below(Height(10)); + assert!(sm.certifications().contains_key(&Height::new(2))); + // the state should _not_ be present (otherwise optimization did not trigger) + assert!(!sm.list_state_heights(CERT_ANY).contains(&Height(2))); + + // This needs the hash @ prev_height = 2 to + // still be available, which is only the case if the tip-height + // protection kept the certification metadata at height 2 around. + sm.commit_and_certify(state, CertificationScope::Metadata, None); + }) +} diff --git a/rs/xnet/payload_builder/tests/common/mod.rs b/rs/xnet/payload_builder/tests/common/mod.rs index 445d64396152..631dba7b4b04 100644 --- a/rs/xnet/payload_builder/tests/common/mod.rs +++ b/rs/xnet/payload_builder/tests/common/mod.rs @@ -71,6 +71,7 @@ impl StateManagerFixture { &config, None, ic_types::malicious_flags::MaliciousFlags::default(), + tokio::sync::watch::channel(ic_types::Height::from(0)).0, ); Self { @@ -95,6 +96,7 @@ impl StateManagerFixture { height.inc_assign(); self.state_manager .commit_and_certify(state, CertificationScope::Metadata, None); + self.state_manager.flush_hash_channel(); certify_height(&self.state_manager, height); self.certified_height = height;