From 0a40a7b012c055e65b42376d6f4a7b4b1ccff1ef Mon Sep 17 00:00:00 2001 From: Allen Porter Date: Sat, 6 Dec 2025 09:27:52 -0800 Subject: [PATCH 1/4] fix: Fix mqtt rate limiting and broken local connections This ensures that Ensure V1RpcChannels respect current device state. --- roborock/devices/v1_channel.py | 36 +++++++++++------ tests/devices/test_v1_channel.py | 67 ++++++++++++++++++++++++++------ 2 files changed, 80 insertions(+), 23 deletions(-) diff --git a/roborock/devices/v1_channel.py b/roborock/devices/v1_channel.py index 15b51a83..3c63049f 100644 --- a/roborock/devices/v1_channel.py +++ b/roborock/devices/v1_channel.py @@ -70,9 +70,9 @@ class RpcStrategy: class RpcChannel(V1RpcChannel): """Provides an RPC interface around a pub/sub transport channel.""" - def __init__(self, rpc_strategies: list[RpcStrategy]) -> None: + def __init__(self, rpc_strategies_cb: Callable[[], list[RpcStrategy]]) -> None: """Initialize the RpcChannel with on ordered list of strategies.""" - self._rpc_strategies = rpc_strategies + self._rpc_strategies_cb = rpc_strategies_cb async def send_command( self, @@ -86,7 +86,7 @@ async def send_command( # Try each channel in order until one succeeds last_exception = None - for strategy in self._rpc_strategies: + for strategy in self._rpc_strategies_cb(): try: decoded_response = await self._send_rpc(strategy, request) except RoborockException as e: @@ -203,23 +203,35 @@ def is_mqtt_connected(self) -> bool: @property def rpc_channel(self) -> V1RpcChannel: - """Return the combined RPC channel that prefers local with a fallback to MQTT.""" - strategies = [] - if local_rpc_strategy := self._create_local_rpc_strategy(): - strategies.append(local_rpc_strategy) - strategies.append(self._create_mqtt_rpc_strategy()) - return RpcChannel(strategies) + """Return the combined RPC channel that prefers local with a fallback to MQTT. + + The returned V1RpcChannel may be long lived and will respect the + current connection state of the underlying channels. + """ + + def rpc_strategies_cb() -> list[RpcStrategy]: + strategies = [] + if local_rpc_strategy := self._create_local_rpc_strategy(): + strategies.append(local_rpc_strategy) + strategies.append(self._create_mqtt_rpc_strategy()) + return strategies + + return RpcChannel(rpc_strategies_cb) @property def mqtt_rpc_channel(self) -> V1RpcChannel: - """Return the MQTT-only RPC channel.""" - return RpcChannel([self._create_mqtt_rpc_strategy()]) + """Return the MQTT-only RPC channel. + + The returned V1RpcChannel may be long lived and will respect the + current connection state of the underlying channels. + """ + return RpcChannel(lambda: [self._create_mqtt_rpc_strategy()]) @property def map_rpc_channel(self) -> V1RpcChannel: """Return the map RPC channel used for fetching map content.""" decoder = create_map_response_decoder(security_data=self._security_data) - return RpcChannel([self._create_mqtt_rpc_strategy(decoder)]) + return RpcChannel(lambda: [self._create_mqtt_rpc_strategy(decoder)]) def _create_local_rpc_strategy(self) -> RpcStrategy | None: """Create the RPC strategy for local transport.""" diff --git a/tests/devices/test_v1_channel.py b/tests/devices/test_v1_channel.py index 794c8620..857c0b9e 100644 --- a/tests/devices/test_v1_channel.py +++ b/tests/devices/test_v1_channel.py @@ -22,7 +22,7 @@ create_mqtt_decoder, create_mqtt_encoder, ) -from roborock.protocols.v1_protocol import MapResponse, SecurityData +from roborock.protocols.v1_protocol import MapResponse, SecurityData, V1RpcChannel from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol from roborock.roborock_typing import RoborockCommand @@ -141,6 +141,29 @@ def setup_v1_channel( ) +@pytest.fixture(name="rpc_channel") +def setup_rpc_channel(v1_channel: V1Channel) -> V1RpcChannel: + """Fixture to set up the RPC channel for tests. + + We expect tests to use this to send commands via the V1Channel since we + want to exercise the behavior that the V1RpcChannel is long lived and + respects the current state of the underlying channels. + """ + return v1_channel.rpc_channel + + +@pytest.fixture(name="mqtt_rpc_channel") +def setup_mqtt_rpc_channel(v1_channel: V1Channel) -> V1RpcChannel: + """Fixture to set up the MQTT RPC channel for tests.""" + return v1_channel.mqtt_rpc_channel + + +@pytest.fixture(name="map_rpc_channel") +def setup_map_rpc_channel(v1_channel: V1Channel) -> V1RpcChannel: + """Fixture to set up the Map RPC channel for tests.""" + return v1_channel.map_rpc_channel + + @pytest.fixture(name="warning_caplog") def setup_warning_caplog(caplog: pytest.LogCaptureFixture) -> pytest.LogCaptureFixture: """Fixture to capture warning messages.""" @@ -274,6 +297,7 @@ async def test_v1_channel_send_command_local_preferred( v1_channel: V1Channel, mock_mqtt_channel: Mock, mock_local_channel: Mock, + rpc_channel: V1RpcChannel, ) -> None: """Test command sending prefers local connection when available.""" # Establish connections @@ -282,7 +306,7 @@ async def test_v1_channel_send_command_local_preferred( # Send command mock_local_channel.response_queue.append(TEST_RESPONSE) - result = await v1_channel.rpc_channel.send_command( + result = await rpc_channel.send_command( RoborockCommand.GET_STATUS, response_type=S5MaxStatus, ) @@ -295,6 +319,7 @@ async def test_v1_channel_send_command_local_fails( v1_channel: V1Channel, mock_mqtt_channel: Mock, mock_local_channel: Mock, + rpc_channel: V1RpcChannel, ) -> None: """Test case where sending with local connection fails, falling back to MQTT.""" @@ -310,7 +335,7 @@ async def test_v1_channel_send_command_local_fails( mock_mqtt_channel.response_queue.append(TEST_RESPONSE) # Send command - result = await v1_channel.rpc_channel.send_command( + result = await rpc_channel.send_command( RoborockCommand.GET_STATUS, response_type=S5MaxStatus, ) @@ -327,21 +352,37 @@ async def test_v1_channel_send_command_local_fails( assert mock_mqtt_channel.published_messages[-1].protocol == RoborockMessageProtocol.RPC_REQUEST -async def test_v1_channel_send_decoded_command_mqtt_only( +@pytest.mark.parametrize( + ("local_channel_side_effect", "local_channel_responses", "mock_mqtt_channel_responses"), + [ + # Local fails immediately, MQTT succeeds + (RoborockException("Local failed"), [], [TEST_RESPONSE]), + # Local returns no response, MQTT succeeds + (None, [], [TEST_RESPONSE]), + # Local returns invalid response, MQTT succeeds + # (None, [RoborockMessage(protocol=RoborockMessageProtocol.RPC_RESPONSE, payload=b"invalid")], [TEST_RESPONSE]), + ], +) +async def test_v1_channel_send_pick_first_available( v1_channel: V1Channel, + rpc_channel: V1RpcChannel, mock_mqtt_channel: Mock, mock_local_channel: Mock, + local_channel_side_effect: Exception | None, + local_channel_responses: list[RoborockMessage], + mock_mqtt_channel_responses: list[RoborockMessage], ) -> None: """Test command sending works with MQTT only.""" # Setup: only MQTT connection mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE) - mock_local_channel.connect.side_effect = RoborockException("No local") + mock_local_channel.connect.side_effect = local_channel_side_effect await v1_channel.subscribe(Mock()) # Send command - mock_mqtt_channel.response_queue.append(TEST_RESPONSE) - result = await v1_channel.rpc_channel.send_command( + mock_mqtt_channel.response_queue.extend(mock_mqtt_channel_responses) + mock_local_channel.response_queue.extend(local_channel_responses) + result = await rpc_channel.send_command( RoborockCommand.GET_STATUS, response_type=S5MaxStatus, ) @@ -352,6 +393,7 @@ async def test_v1_channel_send_decoded_command_mqtt_only( async def test_v1_channel_send_decoded_command_with_params( v1_channel: V1Channel, + rpc_channel: V1RpcChannel, mock_mqtt_channel: Mock, mock_local_channel: Mock, ) -> None: @@ -363,7 +405,7 @@ async def test_v1_channel_send_decoded_command_with_params( # Send command with params mock_local_channel.response_queue.append(TEST_RESPONSE) test_params = {"volume": 80} - await v1_channel.rpc_channel.send_command( + await rpc_channel.send_command( RoborockCommand.CHANGE_SOUND_VOLUME, response_type=S5MaxStatus, params=test_params, @@ -492,6 +534,8 @@ async def test_v1_channel_local_connect_network_info_failure_fallback_to_cache( async def test_v1_channel_command_encoding_validation( v1_channel: V1Channel, + mqtt_rpc_channel: V1RpcChannel, + rpc_channel: V1RpcChannel, mock_mqtt_channel: Mock, mock_local_channel: Mock, ) -> None: @@ -501,13 +545,13 @@ async def test_v1_channel_command_encoding_validation( # Send mqtt command and capture the request mock_mqtt_channel.response_queue.append(TEST_RESPONSE) - await v1_channel.mqtt_rpc_channel.send_command(RoborockCommand.CHANGE_SOUND_VOLUME, params={"volume": 50}) + await mqtt_rpc_channel.send_command(RoborockCommand.CHANGE_SOUND_VOLUME, params={"volume": 50}) assert mock_mqtt_channel.published_messages mqtt_message = mock_mqtt_channel.published_messages[0] # Send local command and capture the request mock_local_channel.response_queue.append(TEST_RESPONSE_2) - await v1_channel.rpc_channel.send_command(RoborockCommand.CHANGE_SOUND_VOLUME, params={"volume": 50}) + await rpc_channel.send_command(RoborockCommand.CHANGE_SOUND_VOLUME, params={"volume": 50}) assert mock_local_channel.published_messages local_message = mock_local_channel.published_messages[0] @@ -522,6 +566,7 @@ async def test_v1_channel_command_encoding_validation( async def test_v1_channel_send_map_command( v1_channel: V1Channel, + map_rpc_channel: V1RpcChannel, mock_mqtt_channel: Mock, mock_create_map_response_decoder: Mock, ) -> None: @@ -546,7 +591,7 @@ async def test_v1_channel_send_map_command( mock_mqtt_channel.response_queue.append(map_response_message) # Send the command and get the result - result = await v1_channel.map_rpc_channel.send_command(RoborockCommand.GET_MAP_V1) + result = await map_rpc_channel.send_command(RoborockCommand.GET_MAP_V1) # Verify the result is the data from our mocked decoder assert result == decompressed_map_data From 418eb9f34037e77bd19083ab89c640ba327bde74 Mon Sep 17 00:00:00 2001 From: Allen Porter Date: Sat, 6 Dec 2025 10:26:06 -0800 Subject: [PATCH 2/4] chore: Update roborock/devices/v1_channel.py Fix typo Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- roborock/devices/v1_channel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/roborock/devices/v1_channel.py b/roborock/devices/v1_channel.py index 3c63049f..5235415b 100644 --- a/roborock/devices/v1_channel.py +++ b/roborock/devices/v1_channel.py @@ -71,7 +71,7 @@ class RpcChannel(V1RpcChannel): """Provides an RPC interface around a pub/sub transport channel.""" def __init__(self, rpc_strategies_cb: Callable[[], list[RpcStrategy]]) -> None: - """Initialize the RpcChannel with on ordered list of strategies.""" + """Initialize the RpcChannel with an ordered list of strategies.""" self._rpc_strategies_cb = rpc_strategies_cb async def send_command( From 352a0b30f3ae49cde9fc5f2391cc0ed05801d111 Mon Sep 17 00:00:00 2001 From: Allen Porter Date: Sat, 6 Dec 2025 10:27:50 -0800 Subject: [PATCH 3/4] chore: add back test case and add test ids --- tests/devices/test_v1_channel.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/devices/test_v1_channel.py b/tests/devices/test_v1_channel.py index 857c0b9e..3a88f375 100644 --- a/tests/devices/test_v1_channel.py +++ b/tests/devices/test_v1_channel.py @@ -355,13 +355,15 @@ async def test_v1_channel_send_command_local_fails( @pytest.mark.parametrize( ("local_channel_side_effect", "local_channel_responses", "mock_mqtt_channel_responses"), [ - # Local fails immediately, MQTT succeeds (RoborockException("Local failed"), [], [TEST_RESPONSE]), - # Local returns no response, MQTT succeeds (None, [], [TEST_RESPONSE]), - # Local returns invalid response, MQTT succeeds - # (None, [RoborockMessage(protocol=RoborockMessageProtocol.RPC_RESPONSE, payload=b"invalid")], [TEST_RESPONSE]), + (None, [RoborockMessage(protocol=RoborockMessageProtocol.RPC_RESPONSE, payload=b"invalid")], [TEST_RESPONSE]), ], + ids=[ + "local-fails-mqtt-succeeds", + "local-no-response-mqtt-succeeds", + "local-invalid-response-mqtt-succeeds", + ] ) async def test_v1_channel_send_pick_first_available( v1_channel: V1Channel, From 855d0f14a10b96d07acd6dff640b837f2eb90fd5 Mon Sep 17 00:00:00 2001 From: Allen Porter Date: Sat, 6 Dec 2025 10:31:04 -0800 Subject: [PATCH 4/4] chore: fix lint errors --- tests/devices/test_v1_channel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/devices/test_v1_channel.py b/tests/devices/test_v1_channel.py index 3a88f375..2b87a358 100644 --- a/tests/devices/test_v1_channel.py +++ b/tests/devices/test_v1_channel.py @@ -363,7 +363,7 @@ async def test_v1_channel_send_command_local_fails( "local-fails-mqtt-succeeds", "local-no-response-mqtt-succeeds", "local-invalid-response-mqtt-succeeds", - ] + ], ) async def test_v1_channel_send_pick_first_available( v1_channel: V1Channel,