Skip to content

Commit 4ad95dd

Browse files
authored
fix: Share a HealthManager instance across all mqtt channels (#672)
The motivation is to better isolate single-device timeouts from overall session health. I have not seen this as an explicit issue, but it seems worth being defensive against. This will ensure that a single inactive device will not unnecessarily restart the MQTT session. Any successful RPCs by other devices will reset the counter. Adds explicit logging when a session is reset by the health manager.
1 parent ecde2a3 commit 4ad95dd

File tree

6 files changed

+37
-3
lines changed

6 files changed

+37
-3
lines changed

roborock/devices/mqtt_channel.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from roborock.callbacks import decoder_callback
77
from roborock.data import HomeDataDevice, RRiot, UserData
88
from roborock.exceptions import RoborockException
9+
from roborock.mqtt.health_manager import HealthManager
910
from roborock.mqtt.session import MqttParams, MqttSession, MqttSessionException
1011
from roborock.protocol import create_mqtt_decoder, create_mqtt_encoder
1112
from roborock.roborock_message import RoborockMessage
@@ -42,6 +43,11 @@ def is_connected(self) -> bool:
4243
"""
4344
return self._mqtt_session.connected
4445

46+
@property
47+
def health_manager(self) -> HealthManager:
48+
"""Return the health manager for the session."""
49+
return self._mqtt_session.health_manager
50+
4551
@property
4652
def is_local_connected(self) -> bool:
4753
"""Return true if the channel is connected locally."""

roborock/devices/v1_channel.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,6 @@ def __init__(
181181
self._logger = RoborockLoggerAdapter(device_uid, _LOGGER)
182182
self._security_data = security_data
183183
self._mqtt_channel = mqtt_channel
184-
self._mqtt_health_manager = HealthManager(self._mqtt_channel.restart)
185184
self._local_session = local_session
186185
self._local_channel: LocalChannel | None = None
187186
self._mqtt_unsub: Callable[[], None] | None = None
@@ -272,7 +271,7 @@ def _create_mqtt_rpc_strategy(self, decoder: Callable[[RoborockMessage], Any] =
272271
security_data=self._security_data,
273272
),
274273
decoder=decoder,
275-
health_manager=self._mqtt_health_manager,
274+
health_manager=self._mqtt_channel.health_manager,
276275
)
277276

278277
async def subscribe(self, callback: Callable[[RoborockMessage], None]) -> Callable[[], None]:

roborock/mqtt/health_manager.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@
66
"""
77

88
import datetime
9+
import logging
910
from collections.abc import Awaitable, Callable
1011

12+
_LOGGER = logging.getLogger(__name__)
13+
1114
# Number of consecutive timeouts before considering the connection unhealthy.
1215
TIMEOUT_THRESHOLD = 3
1316

@@ -45,7 +48,13 @@ async def on_timeout(self) -> None:
4548
self._consecutive_timeouts += 1
4649
if self._consecutive_timeouts >= TIMEOUT_THRESHOLD:
4750
now = datetime.datetime.now(datetime.UTC)
48-
if self._last_restart is None or now - self._last_restart >= RESTART_COOLDOWN:
51+
since_last = (now - self._last_restart) if self._last_restart else None
52+
if since_last is None or since_last >= RESTART_COOLDOWN:
53+
_LOGGER.debug(
54+
"Restarting MQTT session after %d consecutive timeouts (duration since last restart %s)",
55+
self._consecutive_timeouts,
56+
since_last,
57+
)
4958
await self._restart()
5059
self._last_restart = now
5160
self._consecutive_timeouts = 0

roborock/mqtt/roborock_session.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
from roborock.callbacks import CallbackMap
2121

22+
from .health_manager import HealthManager
2223
from .session import MqttParams, MqttSession, MqttSessionException, MqttSessionUnauthorized
2324

2425
_LOGGER = logging.getLogger(__name__)
@@ -75,12 +76,18 @@ def __init__(
7576
self._connection_task: asyncio.Task[None] | None = None
7677
self._topic_idle_timeout = topic_idle_timeout
7778
self._idle_timers: dict[str, asyncio.Task[None]] = {}
79+
self._health_manager = HealthManager(self.restart)
7880

7981
@property
8082
def connected(self) -> bool:
8183
"""True if the session is connected to the broker."""
8284
return self._healthy
8385

86+
@property
87+
def health_manager(self) -> HealthManager:
88+
"""Return the health manager for the session."""
89+
return self._health_manager
90+
8491
async def start(self) -> None:
8592
"""Start the MQTT session.
8693
@@ -337,6 +344,11 @@ def connected(self) -> bool:
337344
"""True if the session is connected to the broker."""
338345
return self._session.connected
339346

347+
@property
348+
def health_manager(self) -> HealthManager:
349+
"""Return the health manager for the session."""
350+
return self._session.health_manager
351+
340352
async def _maybe_start(self) -> None:
341353
"""Start the MQTT session if not already started."""
342354
async with self._lock:

roborock/mqtt/session.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from dataclasses import dataclass
66

77
from roborock.exceptions import RoborockException
8+
from roborock.mqtt.health_manager import HealthManager
89

910
DEFAULT_TIMEOUT = 30.0
1011

@@ -40,6 +41,11 @@ class MqttSession(ABC):
4041
def connected(self) -> bool:
4142
"""True if the session is connected to the broker."""
4243

44+
@property
45+
@abstractmethod
46+
def health_manager(self) -> HealthManager:
47+
"""Return the health manager for the session."""
48+
4349
@abstractmethod
4450
async def subscribe(self, device_id: str, callback: Callable[[bytes], None]) -> Callable[[], None]:
4551
"""Invoke the callback when messages are received on the topic.

tests/conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
from roborock import HomeData, UserData
1515
from roborock.data import DeviceData
16+
from roborock.mqtt.health_manager import HealthManager
1617
from roborock.protocols.v1_protocol import LocalProtocolVersion
1718
from roborock.roborock_message import RoborockMessage
1819
from roborock.version_1_apis.roborock_local_client_v1 import RoborockLocalClientV1
@@ -364,6 +365,7 @@ def __init__(self):
364365
self.close = MagicMock(side_effect=self._close)
365366
self.protocol_version = LocalProtocolVersion.V1
366367
self.restart = AsyncMock()
368+
self.health_manager = HealthManager(self.restart)
367369

368370
async def _connect(self) -> None:
369371
self._is_connected = True

0 commit comments

Comments
 (0)