diff --git a/crates/dsx-exchange-consumer/example/config.example.toml b/crates/dsx-exchange-consumer/example/config.example.toml index 2264633b45..401f5f357c 100644 --- a/crates/dsx-exchange-consumer/example/config.example.toml +++ b/crates/dsx-exchange-consumer/example/config.example.toml @@ -11,6 +11,13 @@ client_id = "carbide-dsx-exchange-consumer" topic_prefix = "BMS/v1" # Maximum number of messages to buffer before dropping (default: 1024) queue_capacity = 1024 +# How long the MQTT event loop is allowed to stay disconnected from the +# broker before the service exits, relying on Kubernetes to restart the +# pod with a fresh MQTT session (default: 30s). Backstop for the +# consumer stall described in NVBug 6191840 where the client stops +# receiving messages even though the library is still attempting to +# reconnect. +reconnect_exit_threshold = "30s" # ============================================================================== # Cache Configuration - TTL settings (uses moka for automatic eviction) diff --git a/crates/dsx-exchange-consumer/src/config.rs b/crates/dsx-exchange-consumer/src/config.rs index 44fbf80540..bab2652368 100644 --- a/crates/dsx-exchange-consumer/src/config.rs +++ b/crates/dsx-exchange-consumer/src/config.rs @@ -112,6 +112,15 @@ pub struct MqttConfig { /// Messages are dropped when this limit is exceeded. pub queue_capacity: usize, + /// How long the MQTT event loop is allowed to stay disconnected from + /// the broker before the service exits, relying on Kubernetes to + /// restart the pod with a fresh MQTT session. Backstop for the + /// consumer stall described in NVBug 6191840 where the client stops + /// receiving messages even though the library is still attempting + /// to reconnect. Defaults to 30s. 
+ #[serde(with = "humantime_serde")] + pub reconnect_exit_threshold: Duration, + #[serde(default)] pub auth: MqttAuthConfig, } @@ -124,6 +133,7 @@ impl Default for MqttConfig { client_id: "carbide-dsx-exchange-consumer".to_string(), topic_prefix: "BMS/v1".to_string(), queue_capacity: 1024, + reconnect_exit_threshold: Duration::from_secs(30), auth: MqttAuthConfig::default(), } } diff --git a/crates/dsx-exchange-consumer/src/lib.rs b/crates/dsx-exchange-consumer/src/lib.rs index 7974d06fb5..6bb424e12c 100644 --- a/crates/dsx-exchange-consumer/src/lib.rs +++ b/crates/dsx-exchange-consumer/src/lib.rs @@ -83,7 +83,13 @@ pub async fn run_service(config: Config) -> Result<(), DsxConsumerError> { .await .map_err(|e| DsxConsumerError::Secrets(e.to_string()))?; - // Connect to MQTT and get message receiver + // Connect to MQTT and get message receiver. The mqttea event loop + // is configured via `mqtt.reconnect_exit_threshold` to exit the + // process if it stays continuously disconnected from the broker + // past that duration; Kubernetes then restarts the pod with a + // fresh MQTT session. This is the backstop for the consumer stall + // described in NVBug 6191840 where the client stops receiving + // messages even though the library is still attempting to reconnect. let rx = mqtt_consumer::connect( &config.mqtt, consumer_metrics.clone(), diff --git a/crates/dsx-exchange-consumer/src/mqtt_consumer.rs b/crates/dsx-exchange-consumer/src/mqtt_consumer.rs index c363f1c6da..22d8444b01 100644 --- a/crates/dsx-exchange-consumer/src/mqtt_consumer.rs +++ b/crates/dsx-exchange-consumer/src/mqtt_consumer.rs @@ -46,6 +46,13 @@ pub enum MqttMessage { /// /// Sets up the MQTT client, registers message handlers, subscribes to topics, /// and connects. Returns a receiver that yields messages with drop-on-overflow. 
+/// +/// The underlying mqttea event loop is configured to call +/// `std::process::exit(1)` if it stays continuously disconnected from the +/// broker for `config.reconnect_exit_threshold` — backstop for the +/// consumer stall described in NVBug 6191840 where the client stops +/// receiving messages even though the library is still attempting to +/// reconnect. Kubernetes then restarts the pod with a fresh session. pub async fn connect( config: &MqttConfig, metrics: ConsumerMetrics, @@ -56,7 +63,9 @@ pub async fn connect( // QoS 0 is the recommended setting for DSX Exchange integrations. // BMS will republish all messages periodically to handle missed messages. let options = { - let defaults = ClientOptions::default().with_qos(QoS::AtMostOnce); + let defaults = ClientOptions::default() + .with_qos(QoS::AtMostOnce) + .with_exit_after_persistent_disconnect(config.reconnect_exit_threshold); if let Some(provider) = build_credentials_provider(config, credential_reader.clone()).await? { diff --git a/crates/mqttea/src/client/core.rs b/crates/mqttea/src/client/core.rs index 91944a7153..0b703d67a0 100644 --- a/crates/mqttea/src/client/core.rs +++ b/crates/mqttea/src/client/core.rs @@ -24,6 +24,7 @@ use std::collections::HashMap; use std::sync::Arc; +use std::time::Instant; use rumqttc::{AsyncClient, Event, EventLoop, MqttOptions, Packet, QoS}; use tokio::sync::{Mutex, RwLock, Semaphore, mpsc}; @@ -208,11 +209,24 @@ impl MqtteaClient { let queue_stats_producer = self.queue_stats.clone(); let registry_clone = self.registry.clone(); let credentials_provider = self.credentials_provider.clone(); + let exit_after_persistent_disconnect = self + .client_options + .as_ref() + .and_then(|opts| opts.exit_after_persistent_disconnect); + // None while the connection is healthy; Some(t) where t is the + // monotonic-clock instant of the first event loop error in the + // current outage. Cleared by any successful poll.
Local to this + // task — only ever read/written from the event loop body — so no + // synchronization is needed. + let mut first_error_at: Option<Instant> = None; let mut backoff_strategy = SuperBasicBackoff::new(); tokio::spawn(async move { loop { match event_loop.poll().await { Ok(event) => { + // Any successful poll means we're talking to the + // broker again; clear the outage clock. + first_error_at = None; if let Event::Incoming(Packet::Publish(publish)) = event { if let Some(msg) = ReceivedMessage::from_publish(&publish, registry_clone.clone()) @@ -256,6 +270,24 @@ impl MqtteaClient { error!("MQTT event loop connection error: {:?}", e); queue_stats_producer.increment_event_loop_errors(); + // Start (or check) the outage clock. If the + // caller asked us to bail out after a sustained + // disconnect — for example a Kubernetes-deployed + // consumer that needs a process restart to + // re-subscribe — exit so the supervisor brings + // us back up cleanly. + let outage_start = *first_error_at.get_or_insert_with(Instant::now); + if let Some(threshold) = exit_after_persistent_disconnect + && outage_start.elapsed() >= threshold + { + error!( + disconnected_secs = outage_start.elapsed().as_secs(), + threshold_secs = threshold.as_secs(), + "MQTT event loop disconnected past threshold; exiting so the supervisor restarts the process" + ); + std::process::exit(1); + } + // Refresh credentials before reconnection attempt if a provider is configured. // This ensures we use fresh tokens (e.g., OAuth2) for the next connection. // diff --git a/crates/mqttea/src/client/options.rs b/crates/mqttea/src/client/options.rs index a51c989d8f..3fcc512fbe 100644 --- a/crates/mqttea/src/client/options.rs +++ b/crates/mqttea/src/client/options.rs @@ -67,6 +67,16 @@ pub struct ClientOptions { // processed concurrently. If unset, defaults to 1, which is // effectively sequential processing.
pub max_concurrency: Option, + // exit_after_persistent_disconnect, if set, makes the event loop + // call std::process::exit(1) once it has been continuously failing + // to poll the broker for longer than this duration. The recovery + // path is "let the supervisor (e.g. Kubernetes) restart the + // process with a fresh MQTT session" — useful for consumers that + // depend on the broker honoring session resumption and cannot + // afford to silently stay subscribed-to-nothing. If unset, the + // event loop just keeps retrying with backoff (the existing + // behavior). + pub exit_after_persistent_disconnect: Option<Duration>, } impl ClientOptions { @@ -106,6 +116,16 @@ impl ClientOptions { self } + /// Make the event loop call `std::process::exit(1)` once it has been + /// continuously failing to poll the broker for `threshold`. Intended + /// for binaries that run under a supervisor (Kubernetes, systemd) + /// which will restart them, where a stuck MQTT session is worse than + /// a process restart. + pub fn with_exit_after_persistent_disconnect(mut self, threshold: Duration) -> Self { + self.exit_after_persistent_disconnect = Some(threshold); + self + } + /// Set a credentials provider for dynamic credential fetching. /// /// Use this for OAuth2 or other token-based authentication where