fix: add a health manager for restarting unhealthy mqtt connections #605
Pull request overview
This PR adds a health monitoring system for MQTT connections to automatically restart sessions when repeated timeouts indicate an unhealthy connection. The implementation includes a callback-based health manager that tracks consecutive timeouts and triggers restarts with cooldown protection.
Key Changes
- Introduced `HealthManager` class that monitors MQTT connection health by tracking consecutive timeouts and triggering restarts after a threshold (3 timeouts) with a 30-minute cooldown period
- Refactored `RoborockMqttSession` to support explicit restarts by splitting the connection logic into separate reconnect loop and connection tasks
- Integrated health manager into V1 RPC channel to automatically restart MQTT sessions when command timeouts occur
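The timeout-tracking behavior described in the first key change can be sketched roughly as follows. This is a minimal illustration, not the code from `roborock/mqtt/health_manager.py`: `TIMEOUT_THRESHOLD`, `on_timeout` and `_consecutive_timeouts` appear in the PR, while the class name `HealthManagerSketch`, the `RESTART_COOLDOWN` constant, the injectable `now` clock, the `_last_restart` attribute and the `on_success` signature are assumptions made for the example.

```python
import asyncio
import datetime
from collections.abc import Awaitable, Callable

# Threshold value taken from the PR.
TIMEOUT_THRESHOLD = 3
# 30-minute cooldown taken from the PR summary; the constant name is assumed.
RESTART_COOLDOWN = datetime.timedelta(minutes=30)


class HealthManagerSketch:
    """Hypothetical stand-in for the PR's HealthManager."""

    def __init__(
        self,
        restart: Callable[[], Awaitable[None]],
        now: Callable[[], datetime.datetime] = datetime.datetime.now,
    ) -> None:
        self._restart = restart
        self._now = now  # injectable clock (assumption, eases testing)
        self._consecutive_timeouts = 0
        self._last_restart = None

    async def on_success(self) -> None:
        # Any successful command breaks the run of consecutive timeouts.
        self._consecutive_timeouts = 0

    async def on_timeout(self) -> None:
        self._consecutive_timeouts += 1
        if self._consecutive_timeouts < TIMEOUT_THRESHOLD:
            return
        now = self._now()
        in_cooldown = (
            self._last_restart is not None
            and now - self._last_restart < RESTART_COOLDOWN
        )
        if in_cooldown:
            # Counter stays at or above the threshold during the cooldown.
            return
        self._last_restart = now
        self._consecutive_timeouts = 0
        await self._restart()
```

In this sketch the counter is only reset by a success or by an actual restart, so timeouts that arrive during the cooldown keep it at or above the threshold.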
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| roborock/mqtt/health_manager.py | New health manager class that tracks timeouts and triggers restarts with cooldown |
| roborock/mqtt/session.py | Added abstract restart method to the MqttSession interface |
| roborock/mqtt/roborock_session.py | Refactored connection handling to support restarts, splitting logic into reconnect loop and connection tasks |
| roborock/devices/mqtt_channel.py | Added restart method that delegates to the underlying MQTT session |
| roborock/devices/v1_rpc_channel.py | Integrated health manager to track RPC timeouts and trigger session restarts |
| tests/mqtt/test_health_manager.py | Comprehensive tests for health manager timeout tracking, success resets, and cooldown behavior |
| tests/mqtt/test_roborock_session.py | Added test for session restart functionality and fast backoff fixture |
| tests/conftest.py | Added restart mock method to FakeChannel test fixture |
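The split into a reconnect loop and a separate connection task, as described for `roborock/mqtt/roborock_session.py`, might look roughly like the sketch below. It is an illustration of the general pattern only: the class `RestartableSessionSketch`, its `run_connection` argument, the `backoff` parameter and the `close` method are hypothetical and do not reflect the actual `RoborockMqttSession` code.

```python
import asyncio
from collections.abc import Callable, Coroutine
from typing import Any


class RestartableSessionSketch:
    """Hypothetical session: an outer reconnect loop owns one connection task."""

    def __init__(
        self,
        run_connection: Callable[[], Coroutine[Any, Any, None]],
        backoff: float = 0.01,
    ) -> None:
        self._run_connection = run_connection  # runs until the connection ends
        self._backoff = backoff
        self._reconnect_task = None
        self._connection_task = None

    async def start(self) -> None:
        self._reconnect_task = asyncio.create_task(self._reconnect_loop())

    async def restart(self) -> None:
        # Cancel only the current connection; the loop then reconnects.
        if self._connection_task is not None:
            self._connection_task.cancel()

    async def close(self) -> None:
        tasks = [
            t for t in (self._reconnect_task, self._connection_task)
            if t is not None
        ]
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

    async def _reconnect_loop(self) -> None:
        while True:
            self._connection_task = asyncio.create_task(self._run_connection())
            # asyncio.wait does not re-raise the task's outcome, so a
            # cancelled or failed connection simply falls through to a retry.
            await asyncio.wait([self._connection_task])
            if not self._connection_task.cancelled():
                # Retrieve any exception so it is not reported as unhandled.
                self._connection_task.exception()
            await asyncio.sleep(self._backoff)
```

Keeping the connection in its own task means `restart` can cancel it without tearing down the loop that is responsible for reconnecting.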
| # Advance time past cooldown | ||
| mock_datetime.datetime.now.return_value = now + datetime.timedelta(minutes=31) | ||
|
|
||
| # Trigger timeouts again | ||
| await health_manager.on_timeout() | ||
| await health_manager.on_timeout() | ||
| await health_manager.on_timeout() |
Copilot
AI
Nov 29, 2025
[nitpick] After the restart is triggered and the cooldown timer is set (lines 51-55), the consecutive timeout counter is reset to 0. When three more timeouts occur (lines 61-63), the counter reaches the threshold of 3 again, but restart isn't called due to the cooldown check.
However, the consecutive timeout counter remains at 3 after line 64. When time advances past the cooldown (line 67) and the test triggers three more timeouts (lines 70-72), the counter would go from 3 to 6, not from 0 to 3.
This works in the current test because the implementation still triggers a restart when _consecutive_timeouts >= TIMEOUT_THRESHOLD, but it's testing a slightly different scenario than intended. Consider adding an assertion after line 64 or resetting expectations to make the test behavior clearer:
# After cooldown period, counter is still at 3, so even one timeout would trigger restart
# Advance time past cooldown
mock_datetime.datetime.now.return_value = now + datetime.timedelta(minutes=31)
await health_manager.on_timeout() # Counter now at 4, triggers restart
restart.assert_called_once()
| # Advance time past cooldown | |
| mock_datetime.datetime.now.return_value = now + datetime.timedelta(minutes=31) | |
| # Trigger timeouts again | |
| await health_manager.on_timeout() | |
| await health_manager.on_timeout() | |
| await health_manager.on_timeout() | |
| # The consecutive timeout counter is now at 3 | |
| assert health_manager._consecutive_timeouts == 3 | |
| # Advance time past cooldown | |
| mock_datetime.datetime.now.return_value = now + datetime.timedelta(minutes=31) | |
| # Even a single timeout now triggers restart | |
| await health_manager.on_timeout() |
| from collections.abc import Awaitable, Callable | ||
|
|
||
| # Number of consecutive timeouts before considering the connection unhealthy. | ||
| TIMEOUT_THRESHOLD = 3 |
This would technically trigger immediately with gathered function calls. But anything more than this is probably too much complexity.
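The concern about gathered calls can be illustrated with a small, self-contained sketch. It assumes a simplified counter (`TimeoutCounter`) and a stand-in `send_command` helper, both hypothetical; only `TIMEOUT_THRESHOLD = 3` comes from the PR. Three commands issued together with `asyncio.gather` all time out at about the same moment, so the threshold is reached in a single burst.

```python
import asyncio

TIMEOUT_THRESHOLD = 3  # value from the PR


class TimeoutCounter:
    """Simplified, hypothetical counter without cooldown handling."""

    def __init__(self) -> None:
        self.consecutive = 0
        self.restarts = 0

    async def on_timeout(self) -> None:
        self.consecutive += 1
        if self.consecutive >= TIMEOUT_THRESHOLD:
            self.consecutive = 0
            self.restarts += 1


async def send_command(counter: TimeoutCounter, timeout: float = 0.01) -> None:
    try:
        # Stand-in for waiting on a response that never arrives.
        await asyncio.wait_for(asyncio.sleep(1), timeout)
    except asyncio.TimeoutError:
        await counter.on_timeout()


async def main() -> int:
    counter = TimeoutCounter()
    # Three concurrent commands count as three "consecutive" timeouts.
    await asyncio.gather(*(send_command(counter) for _ in range(3)))
    return counter.restarts
```

Under these assumptions, one batch of three concurrent commands is enough to trigger a restart, even though only a single timeout window has elapsed.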
A potential follow-up could be to keep track of the last timeout: if the timeout is more than, say, 15 seconds ago, it increases the increment. That could be a follow-up PR though; I don't want to slow this one down as we are on a time crunch.
I think we're currently sending most commands serially, but yeah this kind of heuristic is hard. i was also considering if we could do it entirely in the mqtt session but hard when you can't correlate incoming and outgoing messages to know if something really did timeout.
Not sure if i get the timeout point you're making but interested in following up.
This adds the ability to restart an MQTT session and adds a callback-based health manager that restarts the session when the number of repeated timeouts for the MQTT channel reaches a threshold.