diff --git a/magicblock-chainlink/src/remote_account_provider/chain_laser_actor.rs b/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/actor.rs
similarity index 50%
rename from magicblock-chainlink/src/remote_account_provider/chain_laser_actor.rs
rename to magicblock-chainlink/src/remote_account_provider/chain_laser_actor/actor.rs
index 913c0b326..dc72d5c70 100644
--- a/magicblock-chainlink/src/remote_account_provider/chain_laser_actor.rs
+++ b/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/actor.rs
@@ -1,7 +1,5 @@
 use std::{
-    collections::{HashMap, HashSet},
     fmt,
-    pin::Pin,
     sync::{
         atomic::{AtomicU16, AtomicU64, Ordering},
         Arc,
@@ -9,38 +7,31 @@ use std::{
     time::Duration,
 };
 
-use futures_util::{Stream, StreamExt};
 use helius_laserstream::{
-    client,
-    grpc::{
-        subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest,
-        SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots,
-        SubscribeUpdate,
-    },
-    ChannelOptions, LaserstreamConfig, LaserstreamError,
+    grpc::{subscribe_update::UpdateOneof, CommitmentLevel, SubscribeUpdate},
+    LaserstreamConfig, LaserstreamError,
 };
 use magicblock_core::logger::log_trace_debug;
 use magicblock_metrics::metrics::{
     inc_account_subscription_account_updates_count,
-    inc_account_subscription_activations_count,
     inc_per_program_account_updates_count,
     inc_program_subscription_account_updates_count,
 };
-use parking_lot::RwLock;
 use solana_account::Account;
 use solana_commitment_config::CommitmentLevel as SolanaCommitmentLevel;
 use solana_pubkey::Pubkey;
 use solana_sdk_ids::sysvar::clock;
 use tokio::sync::{mpsc, oneshot};
-use tokio_stream::StreamMap;
 use tonic::Code;
 use tracing::*;
 
 use super::{
-    chain_rpc_client::{ChainRpcClient, ChainRpcClientImpl},
-    chain_slot::ChainSlot,
+    LaserResult, SharedSubscriptions, StreamFactory, StreamHandle,
+    StreamManager, StreamManagerConfig, StreamUpdateSource,
 };
 use crate::remote_account_provider::{
+    chain_rpc_client::{ChainRpcClient, ChainRpcClientImpl},
+    chain_slot::ChainSlot,
     pubsub_common::{
         ChainPubsubActorMessage, MESSAGE_CHANNEL_SIZE,
         SUBSCRIPTION_UPDATE_CHANNEL_SIZE,
@@ -49,18 +40,8 @@ use crate::remote_account_provider::{
     SubscriptionUpdate,
 };
 
-type LaserResult = Result<SubscribeUpdate, LaserstreamError>;
-type LaserStreamUpdate = (usize, LaserResult);
-type LaserStream = Pin<Box<dyn Stream<Item = LaserResult> + Send>>;
-
-const PER_STREAM_SUBSCRIPTION_LIMIT: usize = 1_000;
-const SUBSCRIPTION_ACTIVATION_INTERVAL_MILLIS: u64 = 10_000;
-const SLOTS_BETWEEN_ACTIVATIONS: u64 =
-    SUBSCRIPTION_ACTIVATION_INTERVAL_MILLIS / 400;
 const MAX_SLOTS_BACKFILL: u64 = 400;
 
-pub type SharedSubscriptions = Arc<RwLock<HashSet<Pubkey>>>;
-
 // -----------------
 // Slots
 // -----------------
@@ -98,59 +79,54 @@ impl fmt::Display for AccountUpdateSource {
 // -----------------
 // ChainLaserActor
 // -----------------
-/// ChainLaserActor manages gRPC subscriptions to Helius Laser or Triton endpoints.
+/// ChainLaserActor manages gRPC subscriptions to Helius Laser
+/// or Triton endpoints.
 ///
 /// ## Subscription Lifecycle
 ///
-/// 1. **Requested**: User calls `subscribe(pubkey)`. Pubkey is added to `subscriptions` set.
-/// 2. **Queued**: Every [SUBSCRIPTION_ACTIVATION_INTERVAL_MILLIS], `update_active_subscriptions()` creates new streams.
-/// 3. **Active**: Subscriptions are sent to Helius/Triton via gRPC streams in `active_subscriptions`.
-/// 4. **Updates**: Account updates flow back via the streams and are forwarded to the consumer.
+/// 1. **Requested**: User calls `subscribe(pubkey)`.
+/// 2. **Active**: The pubkey is immediately forwarded to the
+///    [StreamManager] which handles stream creation/chunking.
+/// 3. **Updates**: Account updates flow back via the streams
+///    and are forwarded to the consumer.
 ///
 /// ## Stream Management
 ///
-/// - Subscriptions are grouped into chunks of up to 1,000 per stream (Helius limit).
-/// - Each chunk gets its own gRPC stream (`StreamMap`).
-/// - When subscriptions change, ALL streams are dropped and recreated.
-/// - This simplifies reasoning but loses in-flight updates during the transition.
+/// Stream creation, chunking, promotion, and optimization are
+/// fully delegated to [StreamManager].
 ///
 /// ## Reconnection Behavior
 ///
-/// - If a stream ends unexpectedly, `signal_connection_issue()` is called.
-/// - The actor sends an abort signal to the submux, which triggers reconnection.
-/// - The actor itself doesn't attempt to reconnect; it relies on external recovery.
-pub struct ChainLaserActor {
-    /// Configuration used to create the laser client
-    laser_client_config: LaserstreamConfig,
-    /// Requested subscriptions, some may not be active yet.
-    /// Shared with ChainLaserClientImpl for sync access to
-    /// subscription_count and subscriptions_union.
-    subscriptions: SharedSubscriptions,
-    /// Pubkeys of currently active subscriptions
-    active_subscription_pubkeys: HashSet<Pubkey>,
-    /// Subscriptions that have been activated via the helius provider
-    active_subscriptions: StreamMap<usize, LaserStream>,
-    /// Active streams for program subscriptions
-    program_subscriptions: Option<(HashSet<Pubkey>, LaserStream)>,
+/// - If a stream ends unexpectedly, `signal_connection_issue()`
+///   is called.
+/// - The actor sends an abort signal to the submux, which
+///   triggers reconnection.
+/// - The actor itself doesn't attempt to reconnect; it relies
+///   on external recovery.
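+///
+/// ## Usage sketch
+///
+/// A minimal sketch of wiring the actor up (hypothetical variable
+/// names; any [StreamFactory] implementation works, e.g. the
+/// test-only mock):
+///
+/// ```ignore
+/// let (actor, msg_tx, updates_rx, subs) =
+///     ChainLaserActor::with_stream_factory(
+///         "laser-1", factory, commitment, abort_tx, slots, rpc_client,
+///     );
+/// tokio::spawn(actor.run());
+/// // subscribe/unsubscribe via msg_tx, consume updates via updates_rx
+/// ```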
+pub struct ChainLaserActor<H: StreamHandle, S: StreamFactory<H>> {
+    /// Manager for creating and polling laser streams
+    stream_manager: StreamManager<H, S>,
     /// Receives subscribe/unsubscribe messages to this actor
     messages_receiver: mpsc::Receiver<ChainPubsubActorMessage>,
-    /// Sends updates for any account subscription that is received via
-    /// the Laser client subscription mechanism
+    /// Sends updates for any account subscription that is
+    /// received via the Laser client subscription mechanism
     subscription_updates_sender: mpsc::Sender<SubscriptionUpdate>,
     /// The commitment level to use for subscriptions
     commitment: CommitmentLevel,
     /// Channel used to signal connection issues to the submux
     abort_sender: mpsc::Sender<()>,
-    /// Slot tracking for chain slot synchronization and activation lookback
+    /// Slot tracking for chain slot synchronization and
+    /// activation lookback
     slots: Slots,
-    /// Unique client ID including the gRPC provider name for this actor instance used in logs
-    /// and metrics
+    /// Unique client ID including the gRPC provider name for
+    /// this actor instance used in logs and metrics
     client_id: String,
-    /// RPC client for diagnostics (e.g., fetching slot when falling behind)
+    /// RPC client for diagnostics (e.g., fetching slot when
+    /// falling behind)
     rpc_client: ChainRpcClientImpl,
 }
 
-impl ChainLaserActor {
+impl ChainLaserActor<super::StreamHandleImpl, super::StreamFactoryImpl> {
     pub fn new_from_url(
         pubsub_url: &str,
         client_id: &str,
@@ -165,7 +141,7 @@ impl ChainLaserActor {
         mpsc::Receiver<SubscriptionUpdate>,
         SharedSubscriptions,
     ) {
-        let channel_options = ChannelOptions {
+        let channel_options = helius_laserstream::ChannelOptions {
             connect_timeout_secs: Some(5),
             http2_keep_alive_interval_secs: Some(15),
             tcp_keepalive_secs: Some(30),
@@ -200,6 +176,33 @@ impl ChainLaserActor {
         mpsc::Sender<ChainPubsubActorMessage>,
         mpsc::Receiver<SubscriptionUpdate>,
         SharedSubscriptions,
+    ) {
+        let stream_factory = super::StreamFactoryImpl::new(laser_client_config);
+        Self::with_stream_factory(
+            client_id,
+            stream_factory,
+            commitment,
+            abort_sender,
+            slots,
+            rpc_client,
+        )
+    }
+}
+
+impl<H: StreamHandle, S: StreamFactory<H>> ChainLaserActor<H, S> {
+    /// Create actor with a custom stream factory (for testing)
+    pub fn with_stream_factory(
+        client_id: &str,
+        stream_factory: S,
+        commitment: SolanaCommitmentLevel,
+        abort_sender: mpsc::Sender<()>,
+        slots: Slots,
+        rpc_client: ChainRpcClientImpl,
+    ) -> (
+        Self,
+        mpsc::Sender<ChainPubsubActorMessage>,
+        mpsc::Receiver<SubscriptionUpdate>,
+        SharedSubscriptions,
     ) {
         let (subscription_updates_sender, subscription_updates_receiver) =
             mpsc::channel(SUBSCRIPTION_UPDATE_CHANNEL_SIZE);
@@ -207,16 +210,13 @@ impl ChainLaserActor {
             mpsc::channel(MESSAGE_CHANNEL_SIZE);
 
         let commitment = grpc_commitment_from_solana(commitment);
-        let subscriptions: SharedSubscriptions = Default::default();
-        let shared_subscriptions = Arc::clone(&subscriptions);
+        let stream_manager =
+            StreamManager::new(StreamManagerConfig::default(), stream_factory);
+        let shared_subscriptions = Arc::clone(stream_manager.subscriptions());
 
         let me = Self {
-            laser_client_config,
+            stream_manager,
             messages_receiver,
-            subscriptions,
-            active_subscriptions: Default::default(),
-            active_subscription_pubkeys: Default::default(),
-            program_subscriptions: Default::default(),
             subscription_updates_sender,
             commitment,
             abort_sender,
@@ -237,72 +237,36 @@ impl ChainLaserActor {
 
     #[instrument(skip(self), fields(client_id = %self.client_id))]
     fn shutdown(&mut self) {
         info!("Shutting down laser actor");
-        self.subscriptions.write().clear();
-        self.active_subscriptions.clear();
-        self.active_subscription_pubkeys.clear();
+        Self::clear_subscriptions(&mut self.stream_manager);
     }
 
     #[instrument(skip(self), fields(client_id = %self.client_id))]
     pub async fn run(mut self) {
-        let mut activate_subs_interval =
-            tokio::time::interval(std::time::Duration::from_millis(
-                SUBSCRIPTION_ACTIVATION_INTERVAL_MILLIS,
-            ));
-
         loop {
             tokio::select! {
-                // Actor messages
                 msg = self.messages_receiver.recv() => {
                     match msg {
                         Some(msg) => {
-                            let is_shutdown = self.handle_msg(msg);
-                            if is_shutdown {
+                            if self.handle_msg(msg).await {
                                 break;
                             }
                         }
-                        None => {
-                            break;
-                        }
-                    }
-                }
-                // Account subscription updates
-                update = self.active_subscriptions.next(), if !self.active_subscriptions.is_empty() => {
-                    match update {
-                        Some(update) => {
-                            self.handle_account_update(update).await;
-                        }
-                        None => {
-                            debug!("Account subscription stream ended");
-                            Self::signal_connection_issue(
-                                &self.subscriptions,
-                                &mut self.active_subscriptions,
-                                &mut self.active_subscription_pubkeys,
-                                &mut self.program_subscriptions,
-                                &self.abort_sender,
-                                &self.client_id,
-                            )
-                            .await;
-                        }
+                        None => break,
                     }
                 },
-                // Program subscription updates
-                update = async {
-                    match &mut self.program_subscriptions {
-                        Some((_, stream)) => stream.next().await,
-                        None => std::future::pending().await,
-                    }
-                }, if self.program_subscriptions.is_some() => {
+                update = self.stream_manager.next_update(), if self.stream_manager.has_any_subscriptions() => {
                     match update {
-                        Some(update) => {
-                            self.handle_program_update(update).await;
+                        Some((src, result)) => {
+                            self.handle_stream_result(src, result).await;
                         }
                         None => {
-                            debug!("Program subscription stream ended");
+                            debug!("Subscription stream ended");
                             Self::signal_connection_issue(
-                                &self.subscriptions,
-                                &mut self.active_subscriptions,
-                                &mut self.active_subscription_pubkeys,
-                                &mut self.program_subscriptions,
+                                &mut self.stream_manager,
                                 &self.abort_sender,
                                 &self.client_id,
                             )
@@ -310,22 +274,17 @@ impl ChainLaserActor {
                         }
                     }
                 },
-                // Activate pending subscriptions
-                _ = activate_subs_interval.tick() => {
-                    self.update_active_subscriptions();
-                },
-
             }
         }
     }
 
-    fn handle_msg(&mut self, msg: ChainPubsubActorMessage) -> bool {
+    async fn handle_msg(&mut self, msg: ChainPubsubActorMessage) -> bool {
         use ChainPubsubActorMessage::*;
         match msg {
             AccountSubscribe {
                 pubkey, response, ..
             } => {
-                self.add_sub(pubkey, response);
+                self.add_sub(pubkey, response).await;
                 false
             }
             AccountUnsubscribe { pubkey, response } => {
@@ -333,12 +292,19 @@ impl ChainLaserActor {
                 false
             }
             ProgramSubscribe { pubkey, response } => {
-                let commitment = self.commitment;
-                let laser_client_config = self.laser_client_config.clone();
-                self.add_program_sub(pubkey, commitment, laser_client_config);
-                let _ = response.send(Ok(())).inspect_err(|_| {
-                    warn!(client_id = self.client_id, program_id = %pubkey, "Failed to send program subscribe response");
-                });
+                let result = self
+                    .stream_manager
+                    .add_program_subscription(pubkey, &self.commitment)
+                    .await;
+                let _ = response.send(result).inspect_err(|_| {
+                    warn!(client_id = self.client_id, program_id = %pubkey, "Failed to send program subscribe response");
+                });
                 false
             }
             Reconnect { response } => {
@@ -355,12 +321,7 @@ impl ChainLaserActor {
             }
             Shutdown { response } => {
                 info!(client_id = self.client_id, "Received Shutdown message");
-                Self::clear_subscriptions(
-                    &self.subscriptions,
-                    &mut self.active_subscriptions,
-                    &mut self.active_subscription_pubkeys,
-                    &mut self.program_subscriptions,
-                );
+                Self::clear_subscriptions(&mut self.stream_manager);
                 let _ = response.send(Ok(())).inspect_err(|_| {
                     warn!(
                         client_id = self.client_id,
@@ -372,133 +333,83 @@ impl ChainLaserActor {
         }
     }
 
-    /// Tracks subscriptions, but does not yet activate them.
-    fn add_sub(
+    /// Subscribes to the given pubkey immediately by forwarding
+    /// to the stream manager.
+    async fn add_sub(
         &mut self,
         pubkey: Pubkey,
         sub_response: oneshot::Sender<RemoteAccountProviderResult<()>>,
     ) {
-        let inserted = {
-            // Fast path: check with read lock first
-            let already_subscribed = {
-                let subs = self.subscriptions.read();
-                subs.contains(&pubkey)
-            };
-
-            if already_subscribed {
-                false
-            } else {
-                // Write lock only when we need to modify
-                let mut subs = self.subscriptions.write();
-                subs.insert(pubkey);
-                true
-            }
-        };
-        if !inserted {
-            trace!(pubkey = %pubkey, "Already subscribed to account");
+        if self.stream_manager.is_subscribed(&pubkey) {
+            debug!(
+                pubkey = %pubkey,
+                "Already subscribed to account"
+            );
             sub_response.send(Ok(())).unwrap_or_else(|_| {
                 warn!(pubkey = %pubkey, "Failed to send already subscribed response");
             });
-        } else {
-            if self.active_subscriptions.is_empty() {
-                self.update_active_subscriptions();
-            }
-            sub_response.send(Ok(())).unwrap_or_else(|_| {
-                warn!(pubkey = %pubkey, "Failed to send subscribe response");
-            })
+            return;
         }
+
+        let from_slot = self.determine_from_slot().map(|(_, fs)| fs);
+        let result = self
+            .stream_manager
+            .account_subscribe(&[pubkey], &self.commitment, from_slot)
+            .await;
+
+        let response = match result {
+            Ok(()) => Ok(()),
+            Err(e) => {
+                error!(
+                    pubkey = %pubkey,
+                    error = ?e,
+                    "Failed to subscribe to account"
+                );
+                Err(e)
+            }
+        };
+        sub_response.send(response).unwrap_or_else(|_| {
+            warn!(
+                pubkey = %pubkey,
+                "Failed to send subscribe response"
+            );
+        });
     }
 
-    /// Removes a subscription, but does not yet deactivate it.
+    /// Removes a subscription and forwards to the stream manager.
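+    /// Note that [StreamManager::account_unsubscribe] only removes
+    /// the pubkey from the shared subscription set; the underlying
+    /// streams are left untouched, and any update that still arrives
+    /// for the pubkey is dropped by the `is_subscribed` check.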
     fn remove_sub(
         &mut self,
         pubkey: &Pubkey,
         unsub_response: oneshot::Sender<RemoteAccountProviderResult<()>>,
     ) {
-        let removed = self.subscriptions.write().remove(pubkey);
-        match removed {
-            true => {
-                trace!(pubkey = %pubkey, "Unsubscribed from account");
-                unsub_response.send(Ok(())).unwrap_or_else(|_| {
-                    warn!(pubkey = %pubkey, "Failed to send unsubscribe response");
-                });
-            }
-            false => {
-                unsub_response
-                    .send(Err(
-                        RemoteAccountProviderError::AccountSubscriptionDoesNotExist(
-                            pubkey.to_string(),
-                        ),
-                    ))
-                    .unwrap_or_else(|_| {
-                        warn!(pubkey = %pubkey, "Failed to send unsubscribe response");
-                    });
-            }
-        }
-    }
-
-    fn update_active_subscriptions(&mut self) {
-        // Copy subscriptions and release the read lock immediately
-        let new_pubkeys: HashSet<Pubkey> = {
-            let subs = self.subscriptions.read();
-            // Check if the active subscriptions match what we already have
-            if subs.eq(&self.active_subscription_pubkeys) {
-                trace!(
-                    count = subs.len(),
-                    "Active subscriptions already up to date"
-                );
-                return;
-            }
-            subs.iter().copied().collect()
-        };
-
-        inc_account_subscription_activations_count(&self.client_id);
-
-        let mut new_subs: StreamMap<usize, LaserStream> = StreamMap::new();
-
-        // Re-create streams for all subscriptions
-        let sub_refs = new_pubkeys.iter().collect::<Vec<_>>();
-
-        let chunks = sub_refs
-            .chunks(PER_STREAM_SUBSCRIPTION_LIMIT)
-            .map(|chunk| chunk.to_vec())
-            .collect::<Vec<_>>();
-
-        let (chain_slot, from_slot) = self
-            .determine_from_slot()
-            .map(|(cs, fs)| (Some(cs), Some(fs)))
-            .unwrap_or((None, None));
-
-        if tracing::enabled!(tracing::Level::TRACE) {
-            trace!(
-                account_count = new_pubkeys.len(),
-                chain_slot,
-                from_slot,
-                stream_count = chunks.len(),
-                "Activating account subscriptions"
-            );
-        }
-
-        for (idx, chunk) in chunks.into_iter().enumerate() {
-            let stream = Self::create_accounts_and_slot_stream(
-                &chunk,
-                &self.commitment,
-                &self.laser_client_config,
-                idx,
-                from_slot,
-            );
-            new_subs.insert(idx, Box::pin(stream));
-        }
-
-        // Drop current active subscriptions by reassignig to new ones
-        self.active_subscriptions = new_subs;
-        self.active_subscription_pubkeys = new_pubkeys;
+        if self.stream_manager.is_subscribed(pubkey) {
+            self.stream_manager.account_unsubscribe(&[*pubkey]);
+            trace!(
+                pubkey = %pubkey,
+                "Unsubscribed from account"
+            );
+            unsub_response.send(Ok(())).unwrap_or_else(|_| {
+                warn!(pubkey = %pubkey, "Failed to send unsubscribe response");
+            });
+        } else {
+            unsub_response
+                .send(Err(
+                    RemoteAccountProviderError::AccountSubscriptionDoesNotExist(
+                        pubkey.to_string(),
+                    ),
+                ))
+                .unwrap_or_else(|_| {
+                    warn!(pubkey = %pubkey, "Failed to send unsubscribe response");
+                });
        }
     }
 
-    /// Determines the from_slot for backfilling subscription updates.
+    /// Determines the from_slot for backfilling subscription
+    /// updates.
     ///
-    /// Returns `Some((chain_slot, from_slot))` if backfilling is supported and we have a valid chain slot,
-    /// otherwise returns `None`.
+    /// Returns `Some((chain_slot, from_slot))` if backfilling is
+    /// supported and we have a valid chain slot, otherwise
+    /// returns `None`.
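+    ///
+    /// Illustrative numbers: with `chain_slot = 10_000` and no
+    /// previous activation, backfill starts at `10_000 - 400 = 9_600`;
+    /// with `last_activation_slot = 9_950` it starts at `9_949`,
+    /// clamped to at most [MAX_SLOTS_BACKFILL] slots back.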
     fn determine_from_slot(&self) -> Option<(u64, u64)> {
         if !self.slots.supports_backfill {
             return None;
@@ -506,22 +417,17 @@ impl ChainLaserActor {
 
         let chain_slot = self.slots.chain_slot.load();
         if chain_slot == 0 {
-            // If we didn't get a chain slot update yet we cannot backfill
             return None;
         }
 
-        // Get last activation slot and update to current chain slot
         let last_activation_slot = self
             .slots
             .last_activation_slot
             .swap(chain_slot, Ordering::Relaxed);
 
-        // when this is called the first time make the best effort to find a reasonable
-        // slot to backfill from.
         let from_slot = if last_activation_slot == 0 {
-            chain_slot.saturating_sub(SLOTS_BETWEEN_ACTIVATIONS + 1)
+            chain_slot.saturating_sub(MAX_SLOTS_BACKFILL)
         } else {
-            // Limit how far back we go in order to avoid data loss errors
             let target_slot = last_activation_slot.saturating_sub(1);
             let delta = chain_slot.saturating_sub(target_slot);
             if delta < MAX_SLOTS_BACKFILL {
@@ -533,137 +439,38 @@ impl ChainLaserActor {
         Some((chain_slot, from_slot))
     }
 
-    /// Helper to create a dedicated stream for a number of accounts.
-    /// It includes a slot subscription for chain slot synchronization.
-    /// This is not 100% cleanly separated but avoids creating another connection
-    /// just for slot updates.
-    /// NOTE: no slot update subscription will be created until the first
-    /// accounts subscription is created.
-    fn create_accounts_and_slot_stream(
-        pubkeys: &[&Pubkey],
-        commitment: &CommitmentLevel,
-        laser_client_config: &LaserstreamConfig,
-        idx: usize,
-        from_slot: Option<u64>,
-    ) -> impl Stream<Item = LaserResult> {
-        let mut accounts = HashMap::new();
-        accounts.insert(
-            format!("account_subs: {idx}"),
-            SubscribeRequestFilterAccounts {
-                account: pubkeys.iter().map(|pk| pk.to_string()).collect(),
-                ..Default::default()
-            },
-        );
-
-        // Subscribe to slot updates for chain_slot synchronization
-        let mut slots = HashMap::new();
-        slots.insert(
-            "slot_updates".to_string(),
-            SubscribeRequestFilterSlots {
-                filter_by_commitment: Some(true),
-                ..Default::default()
-            },
-        );
-
-        let request = SubscribeRequest {
-            accounts,
-            slots,
-            commitment: Some((*commitment).into()),
-            // NOTE: triton does not support backfilling and we could not verify this with
-            // helius due to being rate limited.
-            from_slot,
-            ..Default::default()
-        };
-        client::subscribe(laser_client_config.clone(), request).0
-    }
-
-    fn add_program_sub(
+    /// Handles an update from any subscription stream.
+    #[instrument(skip(self), fields(client_id = %self.client_id))]
+    async fn handle_stream_result(
         &mut self,
-        program_id: Pubkey,
-        commitment: CommitmentLevel,
-        laser_client_config: LaserstreamConfig,
+        src: StreamUpdateSource,
+        result: LaserResult,
     ) {
-        if self
-            .program_subscriptions
-            .as_ref()
-            .map(|(subscribed_programs, _)| {
-                subscribed_programs.contains(&program_id)
-            })
-            .unwrap_or(false)
-        {
-            trace!(program_id = %program_id, "Program subscription already exists");
-            return;
-        }
-
-        let mut subscribed_programs = self
-            .program_subscriptions
-            .as_ref()
-            .map(|x| x.0.iter().cloned().collect::<HashSet<_>>())
-            .unwrap_or_default();
-
-        subscribed_programs.insert(program_id);
-
-        let mut accounts = HashMap::new();
-        accounts.insert(
-            format!("program_sub: {program_id}"),
-            SubscribeRequestFilterAccounts {
-                owner: subscribed_programs
-                    .iter()
-                    .map(|pk| pk.to_string())
-                    .collect(),
-                ..Default::default()
-            },
-        );
-        let request = SubscribeRequest {
-            accounts,
-            commitment: Some(commitment.into()),
-            ..Default::default()
+        let update_source = match src {
+            StreamUpdateSource::Account => AccountUpdateSource::Account,
+            StreamUpdateSource::Program => AccountUpdateSource::Program,
         };
-        let stream = client::subscribe(laser_client_config.clone(), request).0;
-        self.program_subscriptions =
-            Some((subscribed_programs, Box::pin(stream)));
-    }
-
-    /// Handles an update from one of the account data streams.
-    #[instrument(skip(self), fields(client_id = %self.client_id, stream_index = %idx))]
-    async fn handle_account_update(
-        &mut self,
-        (idx, result): LaserStreamUpdate,
-    ) {
-        match result {
-            Ok(subscribe_update) => {
-                self.process_subscription_update(
-                    subscribe_update,
-                    AccountUpdateSource::Account,
-                )
-                .await;
-            }
-            Err(err) => {
-                self.handle_stream_error(&err, "account update").await;
-            }
-        }
-    }
-
-    /// Handles an update from the program subscriptions stream.
-    #[instrument(skip(self), fields(client_id = %self.client_id))]
-    async fn handle_program_update(&mut self, result: LaserResult) {
         match result {
             Ok(subscribe_update) => {
                 self.process_subscription_update(
                     subscribe_update,
-                    AccountUpdateSource::Program,
+                    update_source,
                 )
                 .await;
             }
             Err(err) => {
-                self.handle_stream_error(&err, "program subscription").await;
+                let label = match src {
+                    StreamUpdateSource::Account => "account update",
+                    StreamUpdateSource::Program => "program subscription",
+                };
+                self.handle_stream_error(&err, label).await;
             }
         }
     }
 
-    /// Common error handling for stream errors. Detects "fallen behind" errors
-    /// and spawns diagnostics to compare our last known slot with the actual
-    /// chain slot via RPC.
+    /// Common error handling for stream errors. Detects "fallen
+    /// behind" errors and spawns diagnostics to compare our last
+    /// known slot with the actual chain slot via RPC.
     async fn handle_stream_error(
         &mut self,
         err: &LaserstreamError,
@@ -673,12 +480,14 @@ impl ChainLaserActor {
             self.spawn_fallen_behind_diagnostics(source);
         }
 
-        error!(error = ?err, slots = ?self.slots, "Error in {} stream", source);
+        error!(
+            error = ?err,
+            slots = ?self.slots,
+            "Error in {} stream",
+            source,
+        );
         Self::signal_connection_issue(
-            &self.subscriptions,
-            &mut self.active_subscriptions,
-            &mut self.active_subscription_pubkeys,
-            &mut self.program_subscriptions,
+            &mut self.stream_manager,
             &self.abort_sender,
             &self.client_id,
         )
@@ -745,28 +554,19 @@ impl ChainLaserActor {
         });
     }
 
-    fn clear_subscriptions(
-        subscriptions: &SharedSubscriptions,
-        active_subscriptions: &mut StreamMap<usize, LaserStream>,
-        active_subscription_pubkeys: &mut HashSet<Pubkey>,
-        program_subscriptions: &mut Option<(HashSet<Pubkey>, LaserStream)>,
-    ) {
-        subscriptions.write().clear();
-        active_subscriptions.clear();
-        active_subscription_pubkeys.clear();
-        *program_subscriptions = None;
+    fn clear_subscriptions(stream_manager: &mut StreamManager<H, S>) {
+        stream_manager.clear_account_subscriptions();
+        stream_manager.clear_program_subscriptions();
     }
 
-    /// Signals a connection issue by clearing all subscriptions and
-    /// sending a message on the abort channel.
-    /// NOTE: the laser client should handle reconnects internally, but
-    /// we add this as a backup in case it is unable to do so
-    #[instrument(skip(subscriptions, active_subscriptions, active_subscription_pubkeys, program_subscriptions, abort_sender), fields(client_id = %client_id))]
+    /// Signals a connection issue by clearing all subscriptions
+    /// and sending a message on the abort channel.
+    /// NOTE: the laser client should handle reconnects
+    /// internally, but we add this as a backup in case it is
+    /// unable to do so
+    #[instrument(skip(stream_manager, abort_sender), fields(client_id = %client_id))]
     async fn signal_connection_issue(
-        subscriptions: &SharedSubscriptions,
-        active_subscriptions: &mut StreamMap<usize, LaserStream>,
-        active_subscription_pubkeys: &mut HashSet<Pubkey>,
-        program_subscriptions: &mut Option<(HashSet<Pubkey>, LaserStream)>,
+        stream_manager: &mut StreamManager<H, S>,
         abort_sender: &mpsc::Sender<()>,
         client_id: &str,
     ) {
@@ -780,19 +580,16 @@ impl ChainLaserActor {
             &SIGNAL_CONNECTION_COUNT,
         );
 
-        // Clear all subscriptions
-        Self::clear_subscriptions(
-            subscriptions,
-            active_subscriptions,
-            active_subscription_pubkeys,
-            program_subscriptions,
-        );
+        Self::clear_subscriptions(stream_manager);
 
-        // Use try_send to avoid blocking and naturally coalesce signals
+        // Use try_send to avoid blocking and naturally
+        // coalesce signals
        let _ = abort_sender.try_send(()).inspect_err(|err| {
-            // Channel full is expected when reconnect is already in progress
             if !matches!(err, mpsc::error::TrySendError::Full(_)) {
-                error!(error = ?err, "Failed to signal connection issue");
+                error!(
+                    error = ?err,
+                    "Failed to signal connection issue"
+                );
             }
         });
     }
@@ -873,7 +670,7 @@ impl ChainLaserActor {
             );
         }
 
-        if !self.subscriptions.read().contains(&pubkey) {
+        if !self.stream_manager.is_subscribed(&pubkey) {
             return;
         }
 
diff --git a/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/mock.rs b/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/mock.rs
new file mode 100644
index 000000000..b63b34f5f
--- /dev/null
+++ b/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/mock.rs
@@ -0,0 +1,258 @@
+use std::sync::{Arc, Mutex};
+
+use async_trait::async_trait;
+use helius_laserstream::{
+    grpc::{self, SubscribeRequest},
+    LaserstreamError,
+};
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::UnboundedReceiverStream;
+
+use super::{LaserResult, StreamFactory};
+use crate::remote_account_provider::chain_laser_actor::{
+    LaserStreamWithHandle, StreamHandle,
+};
+
+/// A test mock that captures subscription requests and allows driving
+/// streams programmatically.
+#[derive(Clone)]
+pub struct MockStreamFactory {
+    /// Every `SubscribeRequest` passed to `subscribe()` is recorded
+    /// here so tests can assert on filter contents, commitment levels,
+    /// etc.
+    captured_requests: Arc<Mutex<Vec<SubscribeRequest>>>,
+
+    /// Requests sent through a `MockStreamHandle::write()` call are
+    /// recorded here so tests can verify handle-driven updates.
+    handle_requests: Arc<Mutex<Vec<SubscribeRequest>>>,
+
+    /// A sender that the test uses to push `LaserResult` items into
+    /// the streams returned by `subscribe()`.
+    /// Each call to `subscribe()` creates a new mpsc channel; the rx
+    /// side becomes the returned stream, and the tx side is stored
+    /// here so the test can drive updates.
+    stream_senders: Arc<Mutex<Vec<Arc<mpsc::UnboundedSender<LaserResult>>>>>,
+}
+
+impl MockStreamFactory {
+    /// Create a new mock stream factory
+    pub fn new() -> Self {
+        Self {
+            captured_requests: Arc::new(Mutex::new(Vec::new())),
+            handle_requests: Arc::new(Mutex::new(Vec::new())),
+            stream_senders: Arc::new(Mutex::new(Vec::new())),
+        }
+    }
+
+    /// Get the captured subscription requests (from `subscribe()`)
+    pub fn captured_requests(&self) -> Vec<SubscribeRequest> {
+        self.captured_requests.lock().unwrap().clone()
+    }
+
+    /// Get the requests sent through stream handles (from
+    /// `handle.write()`)
+    pub fn handle_requests(&self) -> Vec<SubscribeRequest> {
+        self.handle_requests.lock().unwrap().clone()
+    }
+
+    /// Push an error update to a specific stream
+    pub fn push_error_to_stream(&self, idx: usize, error: LaserstreamError) {
+        let senders = self.stream_senders.lock().unwrap();
+        if let Some(sender) = senders.get(idx) {
+            let _ = sender.send(Err(error));
+        }
+    }
+
+    /// Push a success update to all active streams
+    pub fn push_success_to_all(&self, update: grpc::SubscribeUpdate) {
+        let senders = self.stream_senders.lock().unwrap();
+        for sender in senders.iter() {
+            let _ = sender.send(Ok(update.clone()));
+        }
+    }
+
+    /// Push an update to a specific stream by index
+    pub fn push_update_to_stream(&self, idx: usize, update: LaserResult) {
+        let senders = self.stream_senders.lock().unwrap();
+        if let Some(sender) = senders.get(idx) {
+            let _ = sender.send(update);
+        }
+    }
+
+    /// Get the number of active streams
+    pub fn active_stream_count(&self) -> usize {
+        self.stream_senders.lock().unwrap().len()
+    }
+
+    /// Close a specific stream by index
+    pub fn close_stream(&self, idx: usize) {
+        let mut senders = self.stream_senders.lock().unwrap();
+        if idx < senders.len() {
+            senders.remove(idx);
+        }
+    }
+
+    /// Clear all state (requests, handle requests and streams)
+    pub fn clear(&self) {
+        self.captured_requests.lock().unwrap().clear();
+        self.handle_requests.lock().unwrap().clear();
+        self.stream_senders.lock().unwrap().clear();
+    }
+}
+
+impl Default for MockStreamFactory {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Mock handle that records write requests into the shared
+/// `handle_requests` vec on the factory.
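+///
+/// A minimal sketch of how a test can observe handle writes
+/// (mirrors the tests at the bottom of this file):
+///
+/// ```ignore
+/// let factory = MockStreamFactory::new();
+/// let sub = factory.subscribe(SubscribeRequest::default());
+/// sub.handle.write(SubscribeRequest::default()).await?;
+/// assert_eq!(factory.handle_requests().len(), 1);
+/// ```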
+#[derive(Clone)]
+pub struct MockStreamHandle {
+    handle_requests: Arc<Mutex<Vec<SubscribeRequest>>>,
+}
+
+#[async_trait]
+impl StreamHandle for MockStreamHandle {
+    async fn write(
+        &self,
+        request: SubscribeRequest,
+    ) -> Result<(), LaserstreamError> {
+        self.handle_requests.lock().unwrap().push(request);
+        Ok(())
+    }
+}
+
+impl StreamFactory<MockStreamHandle> for MockStreamFactory {
+    fn subscribe(
+        &self,
+        request: SubscribeRequest,
+    ) -> LaserStreamWithHandle<MockStreamHandle> {
+        // Record the initial subscribe request
+        self.captured_requests.lock().unwrap().push(request);
+
+        // Create a channel for driving LaserResult items into the
+        // stream
+        let (stream_tx, stream_rx) = mpsc::unbounded_channel::<LaserResult>();
+        let stream = Box::pin(UnboundedReceiverStream::new(stream_rx));
+
+        let stream_tx = Arc::new(stream_tx);
+        self.stream_senders.lock().unwrap().push(stream_tx);
+
+        // The handle shares the factory's handle_requests vec so
+        // every write is visible to tests immediately.
+        let handle = MockStreamHandle {
+            handle_requests: Arc::clone(&self.handle_requests),
+        };
+
+        LaserStreamWithHandle { stream, handle }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use helius_laserstream::grpc::{
+        CommitmentLevel, SubscribeRequestFilterAccounts,
+    };
+
+    use super::*;
+
+    #[test]
+    fn test_mock_captures_requests() {
+        let mock = MockStreamFactory::new();
+
+        let mut accounts = HashMap::new();
+        accounts.insert(
+            "test".to_string(),
+            SubscribeRequestFilterAccounts::default(),
+        );
+
+        let request = SubscribeRequest {
+            accounts,
+            commitment: Some(CommitmentLevel::Finalized.into()),
+            ..Default::default()
+        };
+
+        let _stream = mock.subscribe(request.clone());
+
+        let captured = mock.captured_requests();
+        assert_eq!(captured.len(), 1);
+        assert_eq!(captured[0].commitment, request.commitment);
+    }
+
+    #[tokio::test]
+    async fn test_mock_handle_write_records_requests() {
+        let mock = MockStreamFactory::new();
+
+        let request = SubscribeRequest::default();
+        let result = mock.subscribe(request);
+
+        assert_eq!(mock.active_stream_count(), 1);
+
+        // Write an updated request through the handle
+        let mut accounts = HashMap::new();
+        accounts.insert(
+            "updated".to_string(),
+            SubscribeRequestFilterAccounts::default(),
+        );
+        let update_request = SubscribeRequest {
+            accounts,
+            commitment: Some(CommitmentLevel::Confirmed.into()),
+            ..Default::default()
+        };
+
+        result.handle.write(update_request.clone()).await.unwrap();
+
+        let handle_reqs = mock.handle_requests();
+        assert_eq!(handle_reqs.len(), 1);
+        assert_eq!(handle_reqs[0].commitment, update_request.commitment);
+        assert!(handle_reqs[0].accounts.contains_key("updated"));
+    }
+
+    #[tokio::test]
+    async fn test_mock_handle_write_multiple() {
+        let mock = MockStreamFactory::new();
+
+        let r1 = mock.subscribe(SubscribeRequest::default());
+        let r2 = mock.subscribe(SubscribeRequest::default());
+
+        // Both handles share the same handle_requests vec
+        r1.handle
+            .write(SubscribeRequest {
+                commitment: Some(CommitmentLevel::Processed.into()),
+                ..Default::default()
+            })
+            .await
+            .unwrap();
+
+        r2.handle
+            .write(SubscribeRequest {
+                commitment: Some(CommitmentLevel::Finalized.into()),
+                ..Default::default()
+            })
+            .await
+            .unwrap();
+
+        let handle_reqs = mock.handle_requests();
+        assert_eq!(handle_reqs.len(), 2);
+        assert_eq!(mock.captured_requests().len(), 2);
+    }
+
+    #[test]
+    fn test_mock_can_clear() {
+        let mock = MockStreamFactory::new();
+
+        let request = SubscribeRequest::default();
+        let _stream = mock.subscribe(request);
+
+        assert_eq!(mock.captured_requests().len(), 1);
+
+        mock.clear();
+
+        assert_eq!(mock.captured_requests().len(), 0);
+        assert_eq!(mock.handle_requests().len(), 0);
+    }
+}
diff --git a/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/mod.rs b/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/mod.rs
new file mode 100644
index 000000000..182e0fb55
--- /dev/null
+++ b/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/mod.rs
@@ -0,0 +1,96 @@
+use std::{collections::HashSet, pin::Pin, sync::Arc};
+
+use async_trait::async_trait;
+use futures_util::Stream;
+use helius_laserstream::{
+    grpc::{SubscribeRequest, SubscribeUpdate},
+    LaserstreamError, StreamHandle as HeliusStreamHandle,
+};
+use parking_lot::RwLock;
+use solana_pubkey::Pubkey;
+
+pub use self::{
+    actor::{ChainLaserActor, Slots},
+    stream_manager::{StreamManager, StreamManagerConfig, StreamUpdateSource},
+};
+
+pub type SharedSubscriptions = Arc<RwLock<HashSet<Pubkey>>>;
+
+mod actor;
+#[cfg(test)]
+mod mock;
+mod stream_manager;
+
+/// Result of a laser stream operation
+pub type LaserResult = Result<SubscribeUpdate, LaserstreamError>;
+
+/// A laser stream of subscription updates
+pub type LaserStream = Pin<Box<dyn Stream<Item = LaserResult> + Send>>;
+
+/// Abstraction over stream creation for testability
+pub trait StreamFactory<S: StreamHandle>: Send + Sync + 'static {
+    /// Create a stream for the given subscription request
+    fn subscribe(&self, request: SubscribeRequest) -> LaserStreamWithHandle<S>;
+}
+
+/// A trait to represent the [HeliusStreamHandle].
+/// This is needed because we cannot construct the Helius handle
+/// ourselves: [helius_laserstream::StreamHandle::write_tx] is
+/// private and there is no constructor.
+#[async_trait]
+pub trait StreamHandle {
+    /// Send a new subscription request to update the active subscription.
+    async fn write(
+        &self,
+        request: SubscribeRequest,
+    ) -> Result<(), LaserstreamError>;
+}
+
+pub struct LaserStreamWithHandle<S: StreamHandle> {
+    pub(crate) stream: LaserStream,
+    pub(crate) handle: S,
+}
+
+pub struct StreamHandleImpl {
+    pub handle: HeliusStreamHandle,
+}
+
+#[async_trait]
+impl StreamHandle for StreamHandleImpl {
+    async fn write(
+        &self,
+        request: SubscribeRequest,
+    ) -> Result<(), LaserstreamError> {
+        // This async operation gets forwarded to the underlying subscription sender of the laser
+        // client and completes after the given item has been fully processed into the sink,
+        // including flushing.
+        // The assumption is that at that point it has been processed on the receiver end and the
+        // subscription is updated.
+        // See: https://github.com/helius-labs/laserstream-sdk/blob/v0.2.2/rust/src/client.rs#L196-L201
+        self.handle.write(request).await
+    }
+}
+
+/// Production stream factory that wraps helius client subscribe
+pub struct StreamFactoryImpl {
+    config: helius_laserstream::LaserstreamConfig,
+}
+
+impl StreamFactoryImpl {
+    pub fn new(config: helius_laserstream::LaserstreamConfig) -> Self {
+        Self { config }
+    }
+}
+
+impl StreamFactory<StreamHandleImpl> for StreamFactoryImpl {
+    fn subscribe(
+        &self,
+        request: SubscribeRequest,
+    ) -> LaserStreamWithHandle<StreamHandleImpl> {
+        let (stream, handle) =
+            helius_laserstream::client::subscribe(self.config.clone(), request);
+        LaserStreamWithHandle {
+            stream: Box::pin(stream),
+            handle: StreamHandleImpl { handle },
+        }
+    }
+}
diff --git a/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs b/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs
new file mode 100644
index 000000000..3179025bd
--- /dev/null
+++ b/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs
@@ -0,0 +1,1544 @@
+use std::collections::{HashMap, HashSet};
+
+use helius_laserstream::grpc::{
+    CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts,
+    SubscribeRequestFilterSlots,
+};
+use solana_pubkey::Pubkey;
+use tokio::time::Duration;
+use tokio_stream::StreamMap;
+
+use super::{
+    LaserResult, LaserStream, LaserStreamWithHandle, SharedSubscriptions,
+    StreamFactory,
+};
+use crate::remote_account_provider::{
+    chain_laser_actor::StreamHandle, RemoteAccountProviderError,
+    RemoteAccountProviderResult,
+};
+
+/// Identifies whether a stream update came from an account or
+/// program subscription stream.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum StreamUpdateSource {
+    Account,
+    Program,
+}
+
+/// Identifies a stream within the [StreamMap].
+///
+/// Each variant maps to a stream category. The `usize` index
+/// corresponds to the position within the respective `Vec` of
+/// handles.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+enum StreamKey {
+    CurrentNew,
+    UnoptimizedOld(usize),
+    OptimizedOld(usize),
+    Program,
+}
+
+impl StreamKey {
+    fn source(&self) -> StreamUpdateSource {
+        match self {
+            StreamKey::Program => StreamUpdateSource::Program,
+            _ => StreamUpdateSource::Account,
+        }
+    }
+}
+
+/// Configuration for the generational stream manager.
+#[allow(unused)]
+pub struct StreamManagerConfig {
+    /// Max subscriptions per optimized old stream chunk.
+    pub max_subs_in_old_optimized: usize,
+    /// Max unoptimized old streams before optimization is triggered.
+    pub max_old_unoptimized: usize,
+    /// Max subscriptions in the current-new stream before it is
+    /// promoted to an unoptimized old stream.
+    pub max_subs_in_new: usize,
+}
+
+impl Default for StreamManagerConfig {
+    fn default() -> Self {
+        Self {
+            max_subs_in_old_optimized: 2000,
+            max_old_unoptimized: 10,
+            max_subs_in_new: 200,
+        }
+    }
+}
+
+/// Manages the creation and lifecycle of GRPC laser streams.
+///
+/// Account subscriptions follow a generational approach:
+/// - New subscriptions go into the *current-new* stream.
+/// - When the current-new stream exceeds [StreamManagerConfig::max_subs_in_new] it is
+///   promoted to the [Self::unoptimized_old_handles] vec and a fresh current-new stream is created.
+/// - When [Self::unoptimized_old_handles] exceed [StreamManagerConfig::max_old_unoptimized],
+///   optimization is triggered which rebuilds all streams from the
+///   `subscriptions` set into [StreamManager::optimized_old_handles] chunked by
+///   [StreamManagerConfig::max_subs_in_old_optimized].
+///
+/// Unsubscribe only removes from the [Self::subscriptions] HashSet; it
+/// never touches streams. Updates for unsubscribed pubkeys are
+/// ignored at the actor level.
+/// Unsubscribed accounts are dropped as part of optimization.
+///
+/// Streams are stored in a persistent [StreamMap] keyed by
+/// [StreamKey]. The map is only updated when stream topology
+/// changes (subscribe, promote, optimize, clear). The
+/// corresponding handles are stored separately for use in
+/// [Self::update_subscriptions].
+#[allow(unused)]
+pub struct StreamManager<S: StreamHandle, SF: StreamFactory<S>> {
+    /// Configures limits for stream management
+    config: StreamManagerConfig,
+    /// The factory used to create streams
+    stream_factory: SF,
+    /// The canonical set of currently active account subscriptions.
+    /// These include subscriptions maintained across the different set
+    /// of streams.
+    subscriptions: SharedSubscriptions,
+    /// Pubkeys that are part of the current-new stream's filter.
+    current_new_subs: HashSet<Pubkey>,
+
+    // -- Handles (needed for update_subscriptions) --
+    /// Handle for the current-new stream.
+    current_new_handle: Option<S>,
+    /// Handles for unoptimized old streams.
+    unoptimized_old_handles: Vec<S>,
+    /// Handles for optimized old streams.
+    optimized_old_handles: Vec<S>,
+    /// Handle + pubkey set for program subscriptions.
+    program_sub: Option<(HashSet<Pubkey>, S)>,
+
+    // -- All streams live here. --
+    /// Streams separated from the handles in order to allow using them
+    /// inside a StreamMap.
+    /// They are addressed via the StreamKey which includes an index for
+    /// [Self::unoptimized_old_handles] and [Self::optimized_old_handles].
+    /// The key index matches the index of the corresponding vec.
+    /// Persistent stream map polled by [Self::next_update].
+    /// Updated only when stream topology changes.
+    stream_map: StreamMap<StreamKey, LaserStream>,
+}
+
+#[allow(unused)]
+impl<S: StreamHandle, SF: StreamFactory<S>> StreamManager<S, SF> {
+    pub fn new(config: StreamManagerConfig, stream_factory: SF) -> Self {
+        Self {
+            config,
+            stream_factory,
+            subscriptions: Default::default(),
+            current_new_subs: HashSet::new(),
+            current_new_handle: None,
+            unoptimized_old_handles: Vec::new(),
+            optimized_old_handles: Vec::new(),
+            program_sub: None,
+            stream_map: StreamMap::new(),
+        }
+    }
+
+    /// Update a stream's subscriptions with retry logic.
+    ///
+    /// Attempts to write the given request to the stream handle up to 5
+    /// times with linear backoff. Returns an error if all retries are
+    /// exhausted.
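+    ///
+    /// Illustrative schedule with the 50 ms linear step used below:
+    /// the first failed attempt sleeps 50 ms before retrying, the
+    /// second 100 ms, and so on up to 250 ms; after the sixth
+    /// consecutive failure (the initial attempt plus five retries),
+    /// [RemoteAccountProviderError::GrpcSubscriptionUpdateFailed]
+    /// is returned.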
+    async fn update_subscriptions(
+        handle: &S,
+        task: &str,
+        request: SubscribeRequest,
+    ) -> RemoteAccountProviderResult<()> {
+        const MAX_RETRIES: usize = 5;
+        let mut retries = MAX_RETRIES;
+        let initial_retries = retries;
+
+        loop {
+            match handle.write(request.clone()).await {
+                Ok(()) => return Ok(()),
+                Err(err) => {
+                    if retries > 0 {
+                        retries -= 1;
+                        // Linear backoff: sleep longer as retries decrease
+                        let backoff_ms =
+                            50u64 * (initial_retries - retries) as u64;
+                        tokio::time::sleep(Duration::from_millis(backoff_ms))
+                            .await;
+                        continue;
+                    }
+                    return Err(RemoteAccountProviderError::GrpcSubscriptionUpdateFailed(
+                        task.to_string(),
+                        MAX_RETRIES,
+                        format!("{err} ({err:?})"),
+                    ));
+                }
+            }
+        }
+    }
+
+    // ---------------------
+    // Account subscription
+    // ---------------------
+
+    /// Subscribe to account updates for the given pubkeys.
+    ///
+    /// Each pubkey is added to [Self::subscriptions] and to the
+    /// current-new stream. If the current-new stream exceeds
+    /// [StreamManagerConfig::max_subs_in_new] it is promoted and
+    /// a fresh one is created. If unoptimized old handles exceed
+    /// [StreamManagerConfig::max_old_unoptimized], optimization
+    /// is triggered.
+    pub async fn account_subscribe(
+        &mut self,
+        pubkeys: &[Pubkey],
+        commitment: &CommitmentLevel,
+        from_slot: Option<u64>,
+    ) -> RemoteAccountProviderResult<()> {
+        // Filter out pubkeys already in subscriptions.
+        let new_pks: Vec<Pubkey> = {
+            let subs = self.subscriptions.read();
+            pubkeys
+                .iter()
+                .filter(|pk| !subs.contains(pk))
+                .copied()
+                .collect()
+        };
+
+        if new_pks.is_empty() {
+            return Ok(());
+        }
+
+        {
+            let mut subs = self.subscriptions.write();
+            for pk in &new_pks {
+                subs.insert(*pk);
+                self.current_new_subs.insert(*pk);
+            }
+        }
+
+        // Update the current-new stream with the full
+        // current_new_subs filter (either create new if doesn't
+        // exist, or update existing via write).
+        if let Some(handle) = &self.current_new_handle {
+            let request = Self::build_account_request(
+                &self.current_new_subs.iter().collect::<Vec<_>>(),
+                commitment,
+                from_slot,
+            );
+            Self::update_subscriptions(handle, "account_subscribe", request)
+                .await?
+        } else {
+            let pks: Vec<Pubkey> =
+                self.current_new_subs.iter().copied().collect();
+            let pk_refs: Vec<&Pubkey> = pks.iter().collect();
+            self.insert_current_new_stream(&pk_refs, commitment, from_slot);
+        }
+
+        // Promote if current-new exceeds threshold.
+        if self.current_new_subs.len() > self.config.max_subs_in_new {
+            let overflow_count =
+                self.current_new_subs.len() - self.config.max_subs_in_new;
+            // The overflow pubkeys are the tail of new_pks.
+            let overflow_start = new_pks.len().saturating_sub(overflow_count);
+            let overflow_pks = &new_pks[overflow_start..];
+
+            // Move current-new to unoptimized old.
+            if let Some(stream) = self.stream_map.remove(&StreamKey::CurrentNew)
+            {
+                let idx = self.unoptimized_old_handles.len();
+                self.stream_map
+                    .insert(StreamKey::UnoptimizedOld(idx), stream);
+            }
+            if let Some(handle) = self.current_new_handle.take() {
+                self.unoptimized_old_handles.push(handle);
+            }
+            self.current_new_subs.clear();
+
+            // Start fresh current-new with overflow pubkeys.
+            if !overflow_pks.is_empty() {
+                for pk in overflow_pks {
+                    self.current_new_subs.insert(*pk);
+                }
+                self.insert_current_new_stream(
+                    &overflow_pks.iter().collect::<Vec<_>>(),
+                    commitment,
+                    from_slot,
+                );
+            }
+
+            // If unoptimized old handles exceed the limit,
+            // optimize.
+            if self.unoptimized_old_handles.len()
+                > self.config.max_old_unoptimized
+            {
+                self.optimize(commitment);
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Unsubscribe the given pubkeys.
+    ///
+    /// Removes them from the `subscriptions` HashSet only; streams
+    /// are never modified. Updates for these pubkeys will be ignored
+    /// by the actor.
+    pub fn account_unsubscribe(&mut self, pubkeys: &[Pubkey]) {
+        let mut subs = self.subscriptions.write();
+        for pk in pubkeys {
+            subs.remove(pk);
+        }
+    }
+
+    /// Clears all account subscriptions and drops all account
+    /// streams.
+    pub fn clear_account_subscriptions(&mut self) {
+        self.subscriptions.write().clear();
+        self.current_new_subs.clear();
+        self.current_new_handle = None;
+        self.stream_map.remove(&StreamKey::CurrentNew);
+        for i in 0..self.unoptimized_old_handles.len() {
+            self.stream_map.remove(&StreamKey::UnoptimizedOld(i));
+        }
+        self.unoptimized_old_handles.clear();
+        for i in 0..self.optimized_old_handles.len() {
+            self.stream_map.remove(&StreamKey::OptimizedOld(i));
+        }
+        self.optimized_old_handles.clear();
+    }
+
+    /// Returns `true` if any account stream exists.
+    pub fn has_account_subscriptions(&self) -> bool {
+        self.current_new_handle.is_some()
+            || !self.unoptimized_old_handles.is_empty()
+            || !self.optimized_old_handles.is_empty()
+    }
+
+    /// Polls all streams in the [StreamMap], returning the next
+    /// available update tagged with its source.
+    /// Returns `None` when the map is empty.
+    pub async fn next_update(
+        &mut self,
+    ) -> Option<(StreamUpdateSource, LaserResult)> {
+        use tokio_stream::StreamExt;
+        let (key, result) = self.stream_map.next().await?;
+        Some((key.source(), result))
+    }
+
+    /// Returns `true` if any stream (account or program) exists.
+    pub fn has_any_subscriptions(&self) -> bool {
+        !self.stream_map.is_empty()
+    }
+
+    /// Rebuild all account streams from `subscriptions`.
+    ///
+    /// 1. Chunk `subscriptions` into groups of
+    ///    `max_subs_in_old_optimized`.
+    /// 2. Create a new stream for each chunk →
+    ///    `optimized_old_handles`.
+    /// 3. Clear `unoptimized_old_handles`.
+    /// 4. Reset the current-new stream (empty filter).
+    pub fn optimize(&mut self, commitment: &CommitmentLevel) {
+        // Remove all account streams from the map.
+        self.stream_map.remove(&StreamKey::CurrentNew);
+        for i in 0..self.unoptimized_old_handles.len() {
+            self.stream_map.remove(&StreamKey::UnoptimizedOld(i));
+        }
+        for i in 0..self.optimized_old_handles.len() {
+            self.stream_map.remove(&StreamKey::OptimizedOld(i));
+        }
+
+        // Collect all active subscriptions and chunk them.
+        let all_pks: Vec<Pubkey> =
+            self.subscriptions.read().iter().copied().collect();
+
+        // Build optimized old streams from chunks.
+        self.optimized_old_handles = Vec::new();
+        for (i, chunk) in all_pks
+            .chunks(self.config.max_subs_in_old_optimized)
+            .enumerate()
+        {
+            let refs: Vec<&Pubkey> = chunk.iter().collect();
+            let LaserStreamWithHandle { stream, handle } =
+                self.stream_factory.subscribe(Self::build_account_request(
+                    &refs, commitment, None,
+                ));
+            self.stream_map.insert(StreamKey::OptimizedOld(i), stream);
+            self.optimized_old_handles.push(handle);
+        }
+
+        // Clear unoptimized old handles.
+        self.unoptimized_old_handles.clear();
+
+        // Reset the current-new stream.
+        self.current_new_subs.clear();
+        self.current_new_handle = None;
+    }
+
+    /// Returns `true` if the pubkey is in the active
+    /// `subscriptions` set.
+    pub fn is_subscribed(&self, pubkey: &Pubkey) -> bool {
+        self.subscriptions.read().contains(pubkey)
+    }
+
+    // ---------------------------------------------------------
+    // Accessors - internal state inspection
+    // ---------------------------------------------------------
+
+    /// Returns a reference to the shared subscriptions.
+    pub fn subscriptions(&self) -> &SharedSubscriptions {
+        &self.subscriptions
+    }
+
+    /// Returns the number of pubkeys in the current-new stream's
+    /// filter.
+    fn current_new_sub_count(&self) -> usize {
+        self.current_new_subs.len()
+    }
+
+    /// Returns a reference to the current-new stream's pubkey
+    /// set.
+    fn current_new_subs(&self) -> &HashSet<Pubkey> {
+        &self.current_new_subs
+    }
+
+    /// Returns the number of unoptimized old streams.
+    fn unoptimized_old_stream_count(&self) -> usize {
+        self.unoptimized_old_handles.len()
+    }
+
+    /// Returns the number of optimized old streams.
+    fn optimized_old_stream_count(&self) -> usize {
+        self.optimized_old_handles.len()
+    }
+
+    /// Returns the total number of account streams across all
+    /// generations.
+    fn account_stream_count(&self) -> usize {
+        let current = usize::from(self.current_new_handle.is_some());
+        self.optimized_old_handles.len()
+            + self.unoptimized_old_handles.len()
+            + current
+    }
+
+    // ---------------------------------------------------------
+    // Internal helpers
+    // ---------------------------------------------------------
+
+    /// Build a `SubscribeRequest` for the given account pubkeys.
+    /// Includes a slot subscription for chain slot
+    /// synchronization.
+    fn build_account_request(
+        pubkeys: &[&Pubkey],
+        commitment: &CommitmentLevel,
+        from_slot: Option<u64>,
+    ) -> SubscribeRequest {
+        let mut accounts = HashMap::new();
+        accounts.insert(
+            "account_subs".to_string(),
+            SubscribeRequestFilterAccounts {
+                account: pubkeys.iter().map(|pk| pk.to_string()).collect(),
+                ..Default::default()
+            },
+        );
+
+        let mut slots = HashMap::new();
+        slots.insert(
+            "slot_updates".to_string(),
+            SubscribeRequestFilterSlots {
+                filter_by_commitment: Some(true),
+                ..Default::default()
+            },
+        );
+
+        SubscribeRequest {
+            accounts,
+            slots,
+            commitment: Some((*commitment).into()),
+            from_slot,
+            ..Default::default()
+        }
+    }
+
+    /// Create an account stream via the factory and insert it
+    /// as the current-new stream in the [StreamMap].
+    fn insert_current_new_stream(
+        &mut self,
+        pubkeys: &[&Pubkey],
+        commitment: &CommitmentLevel,
+        from_slot: Option<u64>,
+    ) {
+        let request =
+            Self::build_account_request(pubkeys, commitment, from_slot);
+        let LaserStreamWithHandle { stream, handle } =
+            self.stream_factory.subscribe(request);
+        self.stream_map.insert(StreamKey::CurrentNew, stream);
+        self.current_new_handle = Some(handle);
+    }
+
+    /// Adds a program subscription. If the program is already
+    /// subscribed, this is a no-op. Otherwise, updates the
+    /// program stream to include all subscribed programs.
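+    /// All programs share a single stream: the first call creates it,
+    /// and later calls rewrite its owner filter in place via
+    /// [StreamHandle::write] instead of opening a new connection.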
+    pub async fn add_program_subscription(
+        &mut self,
+        program_id: Pubkey,
+        commitment: &CommitmentLevel,
+    ) -> RemoteAccountProviderResult<()> {
+        if self
+            .program_sub
+            .as_ref()
+            .is_some_and(|(subs, _)| subs.contains(&program_id))
+        {
+            return Ok(());
+        }
+
+        let mut subscribed_programs = self
+            .program_sub
+            .as_ref()
+            .map(|(subs, _)| subs.clone())
+            .unwrap_or_default();
+
+        subscribed_programs.insert(program_id);
+
+        let program_ids: Vec<&Pubkey> = subscribed_programs.iter().collect();
+        let request = Self::build_program_request(&program_ids, commitment);
+
+        if let Some((_, handle)) = &self.program_sub {
+            Self::update_subscriptions(handle, "program_subscribe", request)
+                .await?;
+            if let Some((subs, _)) = &mut self.program_sub {
+                *subs = subscribed_programs;
+            }
+        } else {
+            let LaserStreamWithHandle { stream, handle } =
+                self.create_program_stream(&program_ids, commitment);
+            self.stream_map.insert(StreamKey::Program, stream);
+            self.program_sub = Some((subscribed_programs, handle));
+        }
+
+        Ok(())
+    }
+
+    /// Returns whether there are active program subscriptions.
+    pub fn has_program_subscriptions(&self) -> bool {
+        self.program_sub.is_some()
+    }
+
+    /// Clears all program subscriptions.
+    pub fn clear_program_subscriptions(&mut self) {
+        self.stream_map.remove(&StreamKey::Program);
+        self.program_sub = None;
+    }
+
+    /// Build a `SubscribeRequest` for the given program IDs.
+    fn build_program_request(
+        program_ids: &[&Pubkey],
+        commitment: &CommitmentLevel,
+    ) -> SubscribeRequest {
+        let mut accounts = HashMap::new();
+        accounts.insert(
+            "program_sub".to_string(),
+            SubscribeRequestFilterAccounts {
+                owner: program_ids.iter().map(|pk| pk.to_string()).collect(),
+                ..Default::default()
+            },
+        );
+
+        SubscribeRequest {
+            accounts,
+            commitment: Some((*commitment).into()),
+            ..Default::default()
+        }
+    }
+
+    /// Creates a subscription stream for program updates.
+    fn create_program_stream(
+        &self,
+        program_ids: &[&Pubkey],
+        commitment: &CommitmentLevel,
+    ) -> LaserStreamWithHandle<S> {
+        let request = Self::build_program_request(program_ids, commitment);
+        self.stream_factory.subscribe(request)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use helius_laserstream::grpc::CommitmentLevel;
+    use solana_pubkey::Pubkey;
+
+    use super::*;
+    use crate::remote_account_provider::chain_laser_actor::mock::{
+        MockStreamFactory, MockStreamHandle,
+    };
+
+    // -----------------
+    // Helpers
+    // -----------------
+    fn test_config() -> StreamManagerConfig {
+        StreamManagerConfig {
+            max_subs_in_old_optimized: 10,
+            max_old_unoptimized: 3,
+            max_subs_in_new: 5,
+        }
+    }
+
+    fn create_manager() -> (
+        StreamManager<MockStreamHandle, MockStreamFactory>,
+        MockStreamFactory,
+    ) {
+        let factory = MockStreamFactory::new();
+        let manager = StreamManager::new(test_config(), factory.clone());
+        (manager, factory)
+    }
+
+    fn make_pubkeys(n: usize) -> Vec<Pubkey> {
+        (0..n).map(|_| Pubkey::new_unique()).collect()
+    }
+
+    /// Collect all account pubkey strings from a captured
+    /// `SubscribeRequest`'s account filters.
+    fn account_pubkeys_from_request(req: &SubscribeRequest) -> HashSet<String> {
+        req.accounts
+            .values()
+            .flat_map(|f| f.account.iter().cloned())
+            .collect()
+    }
+
+    /// Assert that `subscriptions()` contains exactly `expected`
+    /// (order-independent, exact count).
+    fn assert_subscriptions_eq(
+        mgr: &StreamManager<MockStreamHandle, MockStreamFactory>,
+        expected: &[Pubkey],
+    ) {
+        let subs = mgr.subscriptions().read();
+        assert_eq!(
+            subs.len(),
+            expected.len(),
+            "expected {} subscriptions, got {}",
+            expected.len(),
+            subs.len(),
+        );
+        for pk in expected {
+            assert!(subs.contains(pk), "subscription set missing pubkey {pk}");
+        }
+    }
+
+    /// Assert that a `SubscribeRequest` filter contains exactly the
+    /// given pubkeys (order-independent, exact count).
+    fn assert_request_has_exact_pubkeys(
+        req: &SubscribeRequest,
+        expected: &[Pubkey],
+    ) {
+        let filter = account_pubkeys_from_request(req);
+        assert_eq!(
+            filter.len(),
+            expected.len(),
+            "expected {} pubkeys in filter, got {}",
+            expected.len(),
+            filter.len(),
+        );
+        for pk in expected {
+            assert!(
+                filter.contains(&pk.to_string()),
+                "request filter missing pubkey {pk}",
+            );
+        }
+    }
+
+    // ---------------------------------------------------------
+    // Additional helpers
+    // ---------------------------------------------------------
+
+    const COMMITMENT: CommitmentLevel = CommitmentLevel::Processed;
+
+    /// Subscribe `n` new pubkeys in a single `account_subscribe`
+    /// call, returning the created pubkeys.
+    async fn subscribe_n(
+        mgr: &mut StreamManager<MockStreamHandle, MockStreamFactory>,
+        n: usize,
+    ) -> Vec<Pubkey> {
+        let pks = make_pubkeys(n);
+        mgr.account_subscribe(&pks, &COMMITMENT, None)
+            .await
+            .unwrap();
+        pks
+    }
+
+    /// Subscribe pubkeys in batches of `batch` until `total` pubkeys
+    /// have been subscribed. Returns all created pubkeys.
+    async fn subscribe_in_batches(
+        mgr: &mut StreamManager<MockStreamHandle, MockStreamFactory>,
+        total: usize,
+        batch: usize,
+    ) -> Vec<Pubkey> {
+        let mut all = Vec::new();
+        let mut remaining = total;
+        while remaining > 0 {
+            let n = remaining.min(batch);
+            let pks = make_pubkeys(n);
+            mgr.account_subscribe(&pks, &COMMITMENT, None)
+                .await
+                .unwrap();
+            all.extend(pks);
+            remaining -= n;
+        }
+        all
+    }
+
+    /// Returns the union of all account pubkey strings across all
+    /// captured requests from `start_idx` onward.
+    fn all_filter_pubkeys_from(
+        factory: &MockStreamFactory,
+        start_idx: usize,
+    ) -> HashSet<String> {
+        factory
+            .captured_requests()
+            .iter()
+            .skip(start_idx)
+            .flat_map(account_pubkeys_from_request)
+            .collect()
+    }
+
+    // -------------------------------------------------------------
+    // 1. Subscription Tracking
+    // -------------------------------------------------------------
+
+    #[tokio::test]
+    async fn test_subscribe_single_pubkey_adds_to_subscriptions() {
+        let (mut mgr, factory) = create_manager();
+        let pk = Pubkey::new_unique();
+
+        mgr.account_subscribe(&[pk], &COMMITMENT, None)
+            .await
+            .unwrap();
+
+        assert_subscriptions_eq(&mgr, &[pk]);
+
+        let reqs = factory.captured_requests();
+        assert_eq!(reqs.len(), 1);
+        assert_request_has_exact_pubkeys(&reqs[0], &[pk]);
+    }
+
+    #[tokio::test]
+    async fn test_subscribe_multiple_pubkeys_at_once() {
+        let (mut mgr, factory) = create_manager();
+        let pks = make_pubkeys(5);
+
+        mgr.account_subscribe(&pks, &COMMITMENT, None)
+            .await
+            .unwrap();
+
+        assert_subscriptions_eq(&mgr, &pks);
+
+        let reqs = factory.captured_requests();
+        assert_eq!(reqs.len(), 1);
+        assert_request_has_exact_pubkeys(&reqs[0], &pks);
+    }
+
+    #[tokio::test]
+    async fn test_subscribe_duplicate_pubkey_is_noop() {
+        let (mut mgr, factory) = create_manager();
+        let pk = Pubkey::new_unique();
+
+        mgr.account_subscribe(&[pk], &COMMITMENT, None)
+            .await
+            .unwrap();
+        let calls_after_first = factory.captured_requests().len();
+
+        mgr.account_subscribe(&[pk], &COMMITMENT, None)
+            .await
+            .unwrap();
+
+        assert_subscriptions_eq(&mgr, &[pk]);
+        assert_eq!(factory.captured_requests().len(), calls_after_first);
+    }
+
+    #[tokio::test]
+    async fn test_subscribe_incremental_calls_accumulate() {
+        let (mut mgr, factory) = create_manager();
+        let pks = make_pubkeys(3);
+
+        mgr.account_subscribe(&[pks[0]], &COMMITMENT, None)
+            .await
+            .unwrap();
+        mgr.account_subscribe(&[pks[1]], &COMMITMENT, None)
+            .await
+            .unwrap();
+        mgr.account_subscribe(&[pks[2]], &COMMITMENT, None)
+            .await
+            .unwrap();
+
+        assert_subscriptions_eq(&mgr, &pks);
+
+        // First subscribe call creates the stream with just pks[0]
+        let reqs = factory.captured_requests();
+        assert_eq!(reqs.len(), 1);
+        assert_request_has_exact_pubkeys(&reqs[0], &[pks[0]]);
+
+        // Subsequent calls update via handle.write() which accumulates
+        let handle_reqs = factory.handle_requests();
+        assert!(!handle_reqs.is_empty());
+        let last_handle_req = handle_reqs.last().unwrap();
+        assert_request_has_exact_pubkeys(last_handle_req, &pks);
+    }
+
+    // -------------------------------------------------------------
+    // 2. Current-New Stream Lifecycle
+    // -------------------------------------------------------------
+
+    #[tokio::test]
+    async fn test_new_stream_created_on_first_subscribe() {
+        let (mut mgr, factory) = create_manager();
+        assert_eq!(mgr.account_stream_count(), 0);
+
+        subscribe_n(&mut mgr, 1).await;
+
+        assert_eq!(mgr.account_stream_count(), 1);
+        assert_eq!(factory.active_stream_count(), 1);
+    }
+
+    #[tokio::test]
+    async fn test_current_new_stream_stays_below_threshold() {
+        let (mut mgr, _factory) = create_manager();
+        // MAX_NEW - 1 = 4
+        subscribe_in_batches(&mut mgr, 4, 2).await;
+
+        assert_eq!(mgr.account_stream_count(), 1);
+        assert_eq!(mgr.unoptimized_old_stream_count(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_current_new_stream_promoted_at_threshold() {
+        let (mut mgr, factory) = create_manager();
+        // Subscribe MAX_NEW (5) pubkeys first.
+        let first_five = make_pubkeys(5);
+        mgr.account_subscribe(&first_five, &COMMITMENT, None)
+            .await
+            .unwrap();
+        assert_eq!(mgr.unoptimized_old_stream_count(), 0);
+
+        // Subscribe the 6th pubkey → triggers promotion.
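+        // Promotion arithmetic: with max_subs_in_new = 5, the 6th
+        // pubkey makes the current-new filter hold 6 > 5 entries, so
+        // the whole stream moves to unoptimized-old and the overflow
+        // pubkey (the tail of this call's batch) seeds a fresh
+        // current-new stream.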
+        let sixth = Pubkey::new_unique();
+        mgr.account_subscribe(&[sixth], &COMMITMENT, None)
+            .await
+            .unwrap();
+
+        assert_eq!(mgr.unoptimized_old_stream_count(), 1);
+        // A new current-new stream was created for the 6th pubkey.
+        assert!(mgr.current_new_subs().contains(&sixth));
+        // The factory received a new subscribe call for the fresh
+        // current-new stream.
+        let reqs = factory.captured_requests();
+        assert!(reqs.len() >= 2);
+    }
+
+    #[tokio::test]
+    async fn test_multiple_promotions_accumulate_unoptimized() {
+        let (mut mgr, _factory) = create_manager();
+        // First promotion: subscribe 6 pubkeys (exceeds MAX_NEW=5).
+        subscribe_n(&mut mgr, 6).await;
+        assert_eq!(mgr.unoptimized_old_stream_count(), 1);
+
+        // Second promotion: together with any overflow pubkey left in
+        // current-new, 5 more subscriptions exceed MAX_NEW again.
+        subscribe_n(&mut mgr, 5).await;
+        assert_eq!(mgr.unoptimized_old_stream_count(), 2);
+
+        // Current-new stream should only hold the overflow pubkeys.
+        assert!(mgr.current_new_sub_count() <= 1);
+    }
+
+    // -------------------------------------------------------------
+    // 3. Optimization Trigger via MAX_OLD_UNOPTIMIZED
+    // -------------------------------------------------------------
+
+    #[tokio::test]
+    async fn test_optimization_triggered_when_unoptimized_exceeds_max() {
+        let (mut mgr, _factory) = create_manager();
+        // MAX_OLD_UNOPTIMIZED = 3, so we need 4 promotions. Each
+        // promotion needs > MAX_NEW (5) pubkeys in current-new, so
+        // subscribe 6 pubkeys four times → 4 promotions.
+        for _ in 0..3 {
+            subscribe_n(&mut mgr, 6).await;
+        }
+        assert_eq!(mgr.unoptimized_old_stream_count(), 3);
+
+        // The 4th promotion triggers optimization.
+        subscribe_n(&mut mgr, 6).await;
+
+        // After optimization the unoptimized set should be empty.
+        assert_eq!(mgr.unoptimized_old_stream_count(), 0);
+        // Optimized old streams should exist.
+        let total_subs = mgr.subscriptions().read().len();
+        // ceil(total / MAX_OLD_OPTIMIZED)
+        let expected_optimized = total_subs.div_ceil(10);
+        assert_eq!(mgr.optimized_old_stream_count(), expected_optimized);
+    }
+
+    #[tokio::test]
+    async fn test_optimization_not_triggered_below_max_unoptimized() {
+        let (mut mgr, _factory) = create_manager();
+        // Exactly MAX_OLD_UNOPTIMIZED (3) promotions.
+        for _ in 0..3 {
+            subscribe_n(&mut mgr, 6).await;
+        }
+        assert_eq!(mgr.unoptimized_old_stream_count(), 3);
+        assert_eq!(mgr.optimized_old_stream_count(), 0);
+    }
+
+    // -------------------------------------------------------------
+    // 4. Manual / Interval-Driven Optimization
+    // -------------------------------------------------------------
+
+    #[tokio::test]
+    async fn test_optimize_creates_correct_number_of_optimized_streams() {
+        let (mut mgr, _factory) = create_manager();
+        subscribe_n(&mut mgr, 25).await;
+
+        mgr.optimize(&COMMITMENT);
+
+        // ceil(25 / 10) = 3
+        assert_eq!(mgr.optimized_old_stream_count(), 3);
+    }
+
+    #[tokio::test]
+    async fn test_optimize_clears_unoptimized_old_streams() {
+        let (mut mgr, _factory) = create_manager();
+        // Create several unoptimized old streams.
+        for _ in 0..3 {
+            subscribe_n(&mut mgr, 6).await;
+        }
+        assert!(mgr.unoptimized_old_stream_count() > 0);
+
+        mgr.optimize(&COMMITMENT);
+
+        assert_eq!(mgr.unoptimized_old_stream_count(), 0);
+        assert!(mgr.optimized_old_stream_count() > 0);
+    }
+
+    #[tokio::test]
+    async fn test_optimize_resets_current_new_stream() {
+        let (mut mgr, _factory) = create_manager();
+        subscribe_n(&mut mgr, 8).await;
+
+        mgr.optimize(&COMMITMENT);
+
+        assert_eq!(mgr.current_new_sub_count(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_optimize_excludes_unsubscribed_pubkeys() {
+        let (mut mgr, factory) = create_manager();
+        let pks = subscribe_n(&mut mgr, 15).await;
+
+        // Unsubscribe 5 of them.
+        let to_unsub: Vec<Pubkey> = pks[0..5].to_vec();
+        mgr.account_unsubscribe(&to_unsub);
+
+        let reqs_before = factory.captured_requests().len();
+        mgr.optimize(&COMMITMENT);
+
+        // Optimized streams should only contain the 10 remaining
+        // pubkeys.
+        let remaining: HashSet<String> =
+            pks[5..].iter().map(|pk| pk.to_string()).collect();
+        let filter_pks = all_filter_pubkeys_from(&factory, reqs_before);
+        assert_eq!(filter_pks.len(), 10);
+        for pk in &to_unsub {
+            assert!(
+                !filter_pks.contains(&pk.to_string()),
+                "unsubscribed pubkey {pk} found in optimized filter",
+            );
+        }
+        for pk_str in &remaining {
+            assert!(
+                filter_pks.contains(pk_str),
+                "expected pubkey {pk_str} missing from optimized filter",
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_optimize_with_zero_subscriptions() {
+        let (mut mgr, _factory) = create_manager();
+        let pks = subscribe_n(&mut mgr, 5).await;
+        mgr.account_unsubscribe(&pks);
+
+        mgr.optimize(&COMMITMENT);
+
+        assert_eq!(mgr.optimized_old_stream_count(), 0);
+        assert_eq!(mgr.unoptimized_old_stream_count(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_optimize_idempotent() {
+        let (mut mgr, _factory) = create_manager();
+        subscribe_n(&mut mgr, 15).await;
+
+        mgr.optimize(&COMMITMENT);
+        let count_after_first = mgr.optimized_old_stream_count();
+
+        mgr.optimize(&COMMITMENT);
+        assert_eq!(mgr.optimized_old_stream_count(), count_after_first);
+    }
+
+    // -------------------------------------------------------------
+    // 5. Behavior During Optimization
+    // -------------------------------------------------------------
+
+    #[tokio::test]
+    async fn test_subscribe_during_optimization_goes_to_current_new() {
+        let (mut mgr, _factory) = create_manager();
+        subscribe_n(&mut mgr, 20).await;
+
+        mgr.optimize(&COMMITMENT);
+
+        // Subscribe a new pubkey after optimization.
+        let new_pk = Pubkey::new_unique();
+        mgr.account_subscribe(&[new_pk], &COMMITMENT, None)
+            .await
+            .unwrap();
+
+        assert!(mgr.subscriptions().read().contains(&new_pk));
+        assert!(mgr.current_new_subs().contains(&new_pk));
+    }
+
+    #[tokio::test]
+    async fn test_no_double_optimization_trigger() {
+        let (mut mgr, _factory) = create_manager();
+        // Fill up to MAX_OLD_UNOPTIMIZED.
+        for _ in 0..3 {
+            subscribe_n(&mut mgr, 6).await;
+        }
+        assert_eq!(mgr.unoptimized_old_stream_count(), 3);
+
+        // The 4th promotion triggers optimization.
+        subscribe_n(&mut mgr, 6).await;
+        assert_eq!(mgr.unoptimized_old_stream_count(), 0);
+        let optimized_after_first = mgr.optimized_old_stream_count();
+
+        // Now subscribe enough to exceed MAX_SUBS_IN_NEW again,
+        // causing a promotion. Since optimization just ran, it should
+        // NOT trigger again immediately.
+        subscribe_n(&mut mgr, 6).await;
+        // Unoptimized grows by 1 but no second optimization.
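+        // (Why: the optimization above reset the unoptimized count to
+        // 0, so this single promotion only raises it to 1, still below
+        // MAX_OLD_UNOPTIMIZED (3), and cannot re-trigger optimization.)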
+        assert!(mgr.unoptimized_old_stream_count() <= 1);
+        assert_eq!(mgr.optimized_old_stream_count(), optimized_after_first);
+    }
+
+    // -------------------------------------------------------------
+    // 6. Unsubscribe
+    // -------------------------------------------------------------
+
+    #[tokio::test]
+    async fn test_unsubscribe_removes_from_subscriptions_set() {
+        let (mut mgr, _factory) = create_manager();
+        let pks = make_pubkeys(3);
+        mgr.account_subscribe(&pks, &COMMITMENT, None)
+            .await
+            .unwrap();
+
+        mgr.account_unsubscribe(&[pks[1]]);
+
+        assert_subscriptions_eq(&mgr, &[pks[0], pks[2]]);
+    }
+
+    #[test]
+    fn test_unsubscribe_nonexistent_pubkey_is_noop() {
+        let (mut mgr, _factory) = create_manager();
+        let random = Pubkey::new_unique();
+
+        mgr.account_unsubscribe(&[random]);
+
+        assert!(mgr.subscriptions().read().is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_unsubscribe_already_unsubscribed_pubkey() {
+        let (mut mgr, _factory) = create_manager();
+        let pk = Pubkey::new_unique();
+        mgr.account_subscribe(&[pk], &COMMITMENT, None)
+            .await
+            .unwrap();
+
+        mgr.account_unsubscribe(&[pk]);
+        mgr.account_unsubscribe(&[pk]);
+
+        assert!(mgr.subscriptions().read().is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_unsubscribe_does_not_modify_streams() {
+        let (mut mgr, factory) = create_manager();
+        let pks = make_pubkeys(4);
+        mgr.account_subscribe(&pks, &COMMITMENT, None)
+            .await
+            .unwrap();
+        let calls_before = factory.captured_requests().len();
+
+        mgr.account_unsubscribe(&pks[0..2]);
+
+        // No new factory calls after unsubscribe.
+        assert_eq!(factory.captured_requests().len(), calls_before);
+        // Current-new subs still contain all 4 (streams not updated).
+        for pk in &pks {
+            assert!(mgr.current_new_subs().contains(pk));
+        }
+    }
+
+    #[tokio::test]
+    async fn test_unsubscribe_all_then_optimize_clears_streams() {
+        let (mut mgr, _factory) = create_manager();
+        // Subscribe 8 pubkeys (creates current-new + 1 unoptimized).
+        let pks = subscribe_n(&mut mgr, 8).await;
+        mgr.account_unsubscribe(&pks);
+
+        mgr.optimize(&COMMITMENT);
+
+        assert_eq!(mgr.optimized_old_stream_count(), 0);
+        assert_eq!(mgr.unoptimized_old_stream_count(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_unsubscribe_batch() {
+        let (mut mgr, factory) = create_manager();
+        let pks = make_pubkeys(5);
+        mgr.account_subscribe(&pks, &COMMITMENT, None)
+            .await
+            .unwrap();
+        let calls_before = factory.captured_requests().len();
+
+        mgr.account_unsubscribe(&[pks[0], pks[2], pks[4]]);
+
+        assert_subscriptions_eq(&mgr, &[pks[1], pks[3]]);
+        assert_eq!(factory.captured_requests().len(), calls_before);
+    }
+
+    // -------------------------------------------------------------
+    // 7. Subscription Membership Check
+    // -------------------------------------------------------------
+
+    #[tokio::test]
+    async fn test_is_subscribed_returns_true_for_active() {
+        let (mut mgr, _factory) = create_manager();
+        let pk = Pubkey::new_unique();
+        mgr.account_subscribe(&[pk], &COMMITMENT, None)
+            .await
+            .unwrap();
+
+        assert!(mgr.is_subscribed(&pk));
+    }
+
+    #[tokio::test]
+    async fn test_is_subscribed_returns_false_after_unsubscribe() {
+        let (mut mgr, _factory) = create_manager();
+        let pk = Pubkey::new_unique();
+        mgr.account_subscribe(&[pk], &COMMITMENT, None)
+            .await
+            .unwrap();
+        mgr.account_unsubscribe(&[pk]);
+
+        assert!(!mgr.is_subscribed(&pk));
+    }
+
+    #[test]
+    fn test_is_subscribed_returns_false_for_never_subscribed() {
+        let (mgr, _factory) = create_manager();
+        let random = Pubkey::new_unique();
+
+        assert!(!mgr.is_subscribed(&random));
+    }
+
+    // -------------------------------------------------------------
+    // 8. Stream Count Across Generations
+    // -------------------------------------------------------------
+
+    #[tokio::test]
+    async fn test_account_stream_count_includes_all_generations() {
+        let (mut mgr, _factory) = create_manager();
+        // Create optimized old streams.
+        subscribe_n(&mut mgr, 15).await;
+        mgr.optimize(&COMMITMENT);
+
+        // Create an unoptimized old stream via promotion.
+        subscribe_n(&mut mgr, 6).await;
+
+        // Current-new also exists from the overflow pubkey.
+        let count = mgr.account_stream_count();
+        assert!(count > 0);
+        assert_eq!(
+            count,
+            mgr.optimized_old_stream_count()
+                + mgr.unoptimized_old_stream_count()
+                + 1, // current-new
+        );
+    }
+
+    #[test]
+    fn test_account_stream_count_zero_when_no_subscriptions() {
+        let (mgr, _factory) = create_manager();
+        assert_eq!(mgr.account_stream_count(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_account_stream_count_after_optimize_drops_unoptimized() {
+        let (mut mgr, _factory) = create_manager();
+        // Create unoptimized old streams.
+        for _ in 0..2 {
+            subscribe_n(&mut mgr, 6).await;
+        }
+        assert!(mgr.unoptimized_old_stream_count() > 0);
+
+        mgr.optimize(&COMMITMENT);
+
+        assert_eq!(mgr.unoptimized_old_stream_count(), 0);
+        // Only optimized old streams remain (current-new is empty
+        // after optimize).
+        assert_eq!(
+            mgr.account_stream_count(),
+            mgr.optimized_old_stream_count(),
+        );
+    }
+
+    // -------------------------------------------------------------
+    // 9. Edge Cases and Stress
+    // -------------------------------------------------------------
+
+    #[tokio::test]
+    async fn test_subscribe_exactly_at_max_subs_in_new_no_promotion() {
+        let (mut mgr, _factory) = create_manager();
+        // Exactly MAX_NEW (5) pubkeys — should NOT promote.
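+        // (Promotion fires only when the count strictly exceeds
+        // MAX_NEW, as the 6th-pubkey threshold test above shows.)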
+        subscribe_n(&mut mgr, 5).await;
+
+        assert_eq!(mgr.unoptimized_old_stream_count(), 0);
+        assert_eq!(mgr.account_stream_count(), 1);
+    }
+
+    #[tokio::test]
+    async fn test_single_pubkey_optimization() {
+        let (mut mgr, _factory) = create_manager();
+        subscribe_n(&mut mgr, 1).await;
+
+        mgr.optimize(&COMMITMENT);
+
+        assert_eq!(mgr.optimized_old_stream_count(), 1);
+        assert_eq!(mgr.current_new_sub_count(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_subscribe_max_old_optimized_plus_one() {
+        let (mut mgr, _factory) = create_manager();
+        // MAX_OLD_OPTIMIZED + 1 = 11
+        subscribe_n(&mut mgr, 11).await;
+
+        mgr.optimize(&COMMITMENT);
+
+        assert_eq!(mgr.optimized_old_stream_count(), 2);
+    }
+
+    #[tokio::test]
+    async fn test_large_scale_subscribe_and_optimize() {
+        let (mut mgr, factory) = create_manager();
+        let pks = subscribe_n(&mut mgr, 50).await;
+
+        let reqs_before = factory.captured_requests().len();
+        mgr.optimize(&COMMITMENT);
+
+        // ceil(50 / 10) = 5
+        assert_eq!(mgr.optimized_old_stream_count(), 5);
+        assert_eq!(mgr.subscriptions().read().len(), 50);
+        assert_eq!(mgr.current_new_sub_count(), 0);
+
+        // Verify the union of all optimized stream filters equals all
+        // 50 pubkeys.
+        let filter_pks = all_filter_pubkeys_from(&factory, reqs_before);
+        assert_eq!(filter_pks.len(), 50);
+        for pk in &pks {
+            assert!(filter_pks.contains(&pk.to_string()));
+        }
+    }
+
+    #[tokio::test]
+    async fn test_interleaved_subscribe_unsubscribe_then_optimize() {
+        let (mut mgr, factory) = create_manager();
+        let pks = subscribe_n(&mut mgr, 20).await;
+        // Unsubscribe 8 scattered pubkeys.
+        let unsub1: Vec<Pubkey> =
+            pks.iter().step_by(2).take(8).copied().collect();
+        mgr.account_unsubscribe(&unsub1);
+
+        // Subscribe 5 new ones.
+        let new_pks = subscribe_n(&mut mgr, 5).await;
+        // Unsubscribe 2 of the new ones.
+        mgr.account_unsubscribe(&new_pks[0..2]);
+
+        let expected_count = 20 - 8 + 5 - 2;
+        assert_eq!(mgr.subscriptions().read().len(), expected_count);
+
+        let reqs_before = factory.captured_requests().len();
+        mgr.optimize(&COMMITMENT);
+
+        let filter_pks = all_filter_pubkeys_from(&factory, reqs_before);
+        assert_eq!(filter_pks.len(), expected_count);
+        // Verify unsubscribed pubkeys are absent.
+        for pk in &unsub1 {
+            assert!(!filter_pks.contains(&pk.to_string()));
+        }
+        for pk in &new_pks[0..2] {
+            assert!(!filter_pks.contains(&pk.to_string()));
+        }
+    }
+
+    #[tokio::test]
+    async fn test_rapid_subscribe_unsubscribe_same_pubkey() {
+        let (mut mgr, _factory) = create_manager();
+        let pk = Pubkey::new_unique();
+
+        mgr.account_subscribe(&[pk], &COMMITMENT, None)
+            .await
+            .unwrap();
+        mgr.account_unsubscribe(&[pk]);
+        mgr.account_subscribe(&[pk], &COMMITMENT, None)
+            .await
+            .unwrap();
+
+        assert!(mgr.subscriptions().read().contains(&pk));
+        assert!(mgr.current_new_subs().contains(&pk));
+    }
+
+    // -------------------------------------------------------------
+    // 10. Stream Factory Interaction Verification
+    // -------------------------------------------------------------
+
+    #[tokio::test]
+    async fn test_factory_called_with_correct_commitment() {
+        let (mut mgr, factory) = create_manager();
+        let commitment = CommitmentLevel::Finalized;
+        let pk = Pubkey::new_unique();
+
+        mgr.account_subscribe(&[pk], &commitment, None)
+            .await
+            .unwrap();
+
+        let reqs = factory.captured_requests();
+        assert_eq!(reqs.len(), 1);
+        assert_eq!(
+            reqs[0].commitment,
+            Some(i32::from(CommitmentLevel::Finalized)),
+        );
+    }
+
+    #[tokio::test]
+    async fn test_factory_called_with_slot_filter() {
+        let (mut mgr, factory) = create_manager();
+        subscribe_n(&mut mgr, 1).await;
+
+        let reqs = factory.captured_requests();
+        assert!(!reqs[0].slots.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_optimize_factory_calls_contain_chunked_pubkeys() {
+        let (mut mgr, factory) = create_manager();
+        subscribe_n(&mut mgr, 15).await;
+
+        let reqs_before = factory.captured_requests().len();
+        mgr.optimize(&COMMITMENT);
+
+        let optimize_reqs: Vec<_> = factory
+            .captured_requests()
+            .into_iter()
+            .skip(reqs_before)
+            .collect();
+        assert_eq!(optimize_reqs.len(), 2);
+
+        let first_pks = account_pubkeys_from_request(&optimize_reqs[0]);
+        let second_pks = account_pubkeys_from_request(&optimize_reqs[1]);
+        assert_eq!(first_pks.len(), 10);
+        assert_eq!(second_pks.len(), 5);
+
+        // No overlap between the chunks.
+        assert!(first_pks.is_disjoint(&second_pks));
+    }
+
+    #[tokio::test]
+    async fn test_factory_not_called_on_unsubscribe() {
+        let (mut mgr, factory) = create_manager();
+        subscribe_n(&mut mgr, 5).await;
+        let calls_before = factory.captured_requests().len();
+
+        let pks: Vec<Pubkey> =
+            mgr.subscriptions().read().iter().take(3).copied().collect();
+        mgr.account_unsubscribe(&pks);
+
+        assert_eq!(factory.captured_requests().len(), calls_before);
+    }
+
+    // -------------------------------------------------------------
+    // 11. from_slot Support
+    // -------------------------------------------------------------
+
+    #[tokio::test]
+    async fn test_from_slot_set_on_subscribe_request() {
+        let (mut mgr, factory) = create_manager();
+        let pk = Pubkey::new_unique();
+
+        mgr.account_subscribe(&[pk], &COMMITMENT, Some(42))
+            .await
+            .unwrap();
+
+        let reqs = factory.captured_requests();
+        assert_eq!(reqs.len(), 1);
+        assert_eq!(reqs[0].from_slot, Some(42));
+    }
+
+    #[tokio::test]
+    async fn test_from_slot_none_when_not_provided() {
+        let (mut mgr, factory) = create_manager();
+        let pk = Pubkey::new_unique();
+
+        mgr.account_subscribe(&[pk], &COMMITMENT, None)
+            .await
+            .unwrap();
+
+        let reqs = factory.captured_requests();
+        assert_eq!(reqs.len(), 1);
+        assert_eq!(reqs[0].from_slot, None);
+    }
+
+    #[tokio::test]
+    async fn test_from_slot_forwarded_to_handle_write() {
+        let (mut mgr, factory) = create_manager();
+        let pks = make_pubkeys(2);
+
+        // First call creates the stream.
+        mgr.account_subscribe(&[pks[0]], &COMMITMENT, Some(100))
+            .await
+            .unwrap();
+        // Second call updates via handle.write().
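+        // (The written request is expected to carry the accumulated
+        // pubkey set together with the most recent from_slot, per the
+        // assertions below.)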
+        mgr.account_subscribe(&[pks[1]], &COMMITMENT, Some(200))
+            .await
+            .unwrap();
+
+        let handle_reqs = factory.handle_requests();
+        assert_eq!(handle_reqs.len(), 1);
+        assert_eq!(handle_reqs[0].from_slot, Some(200));
+    }
+
+    #[tokio::test]
+    async fn test_optimize_sets_from_slot_none() {
+        let (mut mgr, factory) = create_manager();
+        mgr.account_subscribe(&make_pubkeys(5), &COMMITMENT, Some(42))
+            .await
+            .unwrap();
+
+        let reqs_before = factory.captured_requests().len();
+        mgr.optimize(&COMMITMENT);
+
+        let optimize_reqs: Vec<_> = factory
+            .captured_requests()
+            .into_iter()
+            .skip(reqs_before)
+            .collect();
+        assert!(!optimize_reqs.is_empty());
+        for req in &optimize_reqs {
+            assert_eq!(
+                req.from_slot, None,
+                "optimized streams should have from_slot=None",
+            );
+        }
+    }
+
+    // ---------------------------------------------------------
+    // 12. next_update Stream Updates
+    // ---------------------------------------------------------
+
+    #[tokio::test]
+    async fn test_next_update_receives_account_updates() {
+        use std::time::Duration;
+
+        use helius_laserstream::grpc::SubscribeUpdate;
+
+        let (mut mgr, factory) = create_manager();
+        subscribe_n(&mut mgr, 2).await;
+
+        factory.push_update_to_stream(0, Ok(SubscribeUpdate::default()));
+
+        let result =
+            tokio::time::timeout(Duration::from_millis(100), mgr.next_update())
+                .await
+                .expect("next_update timed out");
+
+        let (source, update) = result.expect("stream ended");
+        assert_eq!(source, StreamUpdateSource::Account);
+        assert!(update.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_next_update_receives_program_updates() {
+        use std::time::Duration;
+
+        use helius_laserstream::grpc::SubscribeUpdate;
+
+        let (mut mgr, factory) = create_manager();
+        let program_id = Pubkey::new_unique();
+        mgr.add_program_subscription(program_id, &COMMITMENT)
+            .await
+            .unwrap();
+
+        factory.push_update_to_stream(0, Ok(SubscribeUpdate::default()));
+
+        let result =
+            tokio::time::timeout(Duration::from_millis(100), mgr.next_update())
+                .await
+                .expect("next_update timed out");
+
+        let (source, update) = result.expect("stream ended");
+        assert_eq!(source, StreamUpdateSource::Program);
+        assert!(update.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_next_update_receives_mixed_account_and_program() {
+        use std::time::Duration;
+
+        use helius_laserstream::grpc::SubscribeUpdate;
+
+        let (mut mgr, factory) = create_manager();
+
+        // Account stream → index 0
+        subscribe_n(&mut mgr, 2).await;
+        // Program stream → index 1
+        let program_id = Pubkey::new_unique();
+        mgr.add_program_subscription(program_id, &COMMITMENT)
+            .await
+            .unwrap();
+
+        factory.push_update_to_stream(0, Ok(SubscribeUpdate::default()));
+        factory.push_update_to_stream(1, Ok(SubscribeUpdate::default()));
+
+        let mut sources = Vec::new();
+        for _ in 0..2 {
+            let result = tokio::time::timeout(
+                Duration::from_millis(100),
+                mgr.next_update(),
+            )
+            .await
+            .expect("next_update timed out");
+
+            let (source, update) = result.expect("stream ended");
+            assert!(update.is_ok());
+            sources.push(source);
+        }
+
+        assert!(
+            sources.contains(&StreamUpdateSource::Account),
+            "expected an Account update",
+        );
+        assert!(
+            sources.contains(&StreamUpdateSource::Program),
+            "expected a Program update",
+        );
+    }
+}
diff --git a/magicblock-chainlink/src/remote_account_provider/errors.rs b/magicblock-chainlink/src/remote_account_provider/errors.rs
index 15d7415e2..4284ef2c6 100644
--- a/magicblock-chainlink/src/remote_account_provider/errors.rs
+++ b/magicblock-chainlink/src/remote_account_provider/errors.rs
@@ -114,6 +114,11 @@ pub enum RemoteAccountProviderError {
         "The LoaderV4 program {0} account state deserialization failed: {1}"
     )]
     LoaderV4StateDeserializationFailed(Pubkey, String),
+
+    #[error(
+        "Failed to update gRPC subscription to {0} after {1} retries: {2}"
+    )]
+    GrpcSubscriptionUpdateFailed(String, usize, String),
 }
 
 impl From for RemoteAccountProviderError