Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/dsx-exchange-consumer/example/config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ client_id = "carbide-dsx-exchange-consumer"
topic_prefix = "BMS/v1"
# Maximum number of messages to buffer before dropping (default: 1024)
queue_capacity = 1024
# How long the MQTT event loop is allowed to stay disconnected from the
# broker before the service exits, relying on Kubernetes to restart the
# pod with a fresh MQTT session (default: 30s). Backstop for the
# consumer stall described in NVBug 6191840 where the client stops
# receiving messages even though the library is still attempting to
# reconnect.
reconnect_exit_threshold = "30s"

# ==============================================================================
# Cache Configuration - TTL settings (uses moka for automatic eviction)
Expand Down
10 changes: 10 additions & 0 deletions crates/dsx-exchange-consumer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ pub struct MqttConfig {
/// Messages are dropped when this limit is exceeded.
pub queue_capacity: usize,

/// How long the MQTT event loop is allowed to stay disconnected from
/// the broker before the service exits, relying on Kubernetes to
/// restart the pod with a fresh MQTT session. Backstop for the
/// consumer stall described in NVBug 6191840 where the client stops
/// receiving messages even though the library is still attempting
/// to reconnect. Defaults to 30s.
#[serde(with = "humantime_serde")]
pub reconnect_exit_threshold: Duration,

#[serde(default)]
pub auth: MqttAuthConfig,
}
Expand All @@ -124,6 +133,7 @@ impl Default for MqttConfig {
client_id: "carbide-dsx-exchange-consumer".to_string(),
topic_prefix: "BMS/v1".to_string(),
queue_capacity: 1024,
reconnect_exit_threshold: Duration::from_secs(30),
auth: MqttAuthConfig::default(),
}
}
Expand Down
8 changes: 7 additions & 1 deletion crates/dsx-exchange-consumer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,13 @@ pub async fn run_service(config: Config) -> Result<(), DsxConsumerError> {
.await
.map_err(|e| DsxConsumerError::Secrets(e.to_string()))?;

// Connect to MQTT and get message receiver
// Connect to MQTT and get message receiver. The mqttea event loop
// is configured via `mqtt.reconnect_exit_threshold` to exit the
// process if it stays continuously disconnected from the broker
// past that duration; Kubernetes then restarts the pod with a
// fresh MQTT session. This is the backstop for the consumer stall
// described in NVBug 6191840 where the client stops receiving
// messages even though the library is still attempting to reconnect.
let rx = mqtt_consumer::connect(
&config.mqtt,
consumer_metrics.clone(),
Expand Down
11 changes: 10 additions & 1 deletion crates/dsx-exchange-consumer/src/mqtt_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ pub enum MqttMessage {
///
/// Sets up the MQTT client, registers message handlers, subscribes to topics,
/// and connects. Returns a receiver that yields messages with drop-on-overflow.
///
/// The underlying mqttea event loop is configured to call
/// `std::process::exit(1)` if it stays continuously disconnected from the
/// broker for `config.reconnect_exit_threshold` — backstop for the
/// consumer stall described in NVBug 6191840 where the client stops
/// receiving messages even though the library is still attempting to
/// reconnect. Kubernetes then restarts the pod with a fresh session.
pub async fn connect(
config: &MqttConfig,
metrics: ConsumerMetrics,
Expand All @@ -56,7 +63,9 @@ pub async fn connect(
// QoS 0 is the recommended setting for DSX Exchange integrations.
// BMS will republish all messages periodically to handle missed messages.
let options = {
let defaults = ClientOptions::default().with_qos(QoS::AtMostOnce);
let defaults = ClientOptions::default()
.with_qos(QoS::AtMostOnce)
.with_exit_after_persistent_disconnect(config.reconnect_exit_threshold);
if let Some(provider) =
build_credentials_provider(config, credential_reader.clone()).await?
{
Expand Down
32 changes: 32 additions & 0 deletions crates/mqttea/src/client/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use rumqttc::{AsyncClient, Event, EventLoop, MqttOptions, Packet, QoS};
use tokio::sync::{Mutex, RwLock, Semaphore, mpsc};
Expand Down Expand Up @@ -208,11 +209,24 @@ impl MqtteaClient {
let queue_stats_producer = self.queue_stats.clone();
let registry_clone = self.registry.clone();
let credentials_provider = self.credentials_provider.clone();
let exit_after_persistent_disconnect = self
.client_options
.as_ref()
.and_then(|opts| opts.exit_after_persistent_disconnect);
// None while the connection is healthy; Some(t) where t is the
// monotonic `Instant` of the first event loop error in the
// current outage (monotonic, not wall-clock, so the elapsed time
// is unaffected by system clock adjustments). Cleared by any
// successful poll. Local to this task — only ever read/written
// from the event loop body — so no synchronization is needed.
let mut first_error_at: Option<Instant> = None;
let mut backoff_strategy = SuperBasicBackoff::new();
tokio::spawn(async move {
loop {
match event_loop.poll().await {
Ok(event) => {
// Any successful poll means we're talking to the
// broker again; clear the outage clock.
first_error_at = None;
if let Event::Incoming(Packet::Publish(publish)) = event {
if let Some(msg) =
ReceivedMessage::from_publish(&publish, registry_clone.clone())
Expand Down Expand Up @@ -256,6 +270,24 @@ impl MqtteaClient {
error!("MQTT event loop connection error: {:?}", e);
queue_stats_producer.increment_event_loop_errors();

// Start (or check) the outage clock. If the
// caller asked us to bail out after a sustained
// disconnect — for example a Kubernetes-deployed
// consumer that needs a process restart to
// re-subscribe — exit so the supervisor brings
// us back up cleanly.
let outage_start = *first_error_at.get_or_insert_with(Instant::now);
if let Some(threshold) = exit_after_persistent_disconnect
&& outage_start.elapsed() >= threshold
{
error!(
disconnected_secs = outage_start.elapsed().as_secs(),
threshold_secs = threshold.as_secs(),
"MQTT event loop disconnected past threshold; exiting so the supervisor restarts the process"
);
std::process::exit(1);
}

// Refresh credentials before reconnection attempt if a provider is configured.
// This ensures we use fresh tokens (e.g., OAuth2) for the next connection.
//
Expand Down
20 changes: 20 additions & 0 deletions crates/mqttea/src/client/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ pub struct ClientOptions {
// processed concurrently. If unset, defaults to 1, which is
// effectively sequential processing.
pub max_concurrency: Option<usize>,
// exit_after_persistent_disconnect, if set, makes the event loop
// call std::process::exit(1) once it has been continuously failing
// to poll the broker for at least this duration. The check runs
// when a poll returns an error, so the exit fires on the first
// failed poll at or after the threshold. The recovery path is
// "let the supervisor (e.g. Kubernetes) restart the process with a
// fresh MQTT session" — useful for consumers that depend on the
// broker honoring session resumption and cannot afford to silently
// stay subscribed-to-nothing. If unset, the event loop just keeps
// retrying with backoff (the existing behavior).
pub exit_after_persistent_disconnect: Option<Duration>,
}

impl ClientOptions {
Expand Down Expand Up @@ -106,6 +116,16 @@ impl ClientOptions {
self
}

/// Make the event loop call `std::process::exit(1)` once it has been
/// continuously failing to poll the broker for `threshold`. Intended
/// for binaries that run under a supervisor (Kubernetes, systemd)
/// which will restart them, where a stuck MQTT session is worse than
/// a process restart.
pub fn with_exit_after_persistent_disconnect(mut self, threshold: Duration) -> Self {
self.exit_after_persistent_disconnect = Some(threshold);
self
}

/// Set a credentials provider for dynamic credential fetching.
///
/// Use this for OAuth2 or other token-based authentication where
Expand Down
Loading