11"""Modules for communicating with specific Roborock devices over MQTT."""
22
3- import asyncio
43import logging
54from collections .abc import Callable
6- from json import JSONDecodeError
75
86from roborock .containers import HomeDataDevice , RRiot , UserData
97from roborock .exceptions import RoborockException
10- from roborock .mqtt .session import MqttParams , MqttSession
8+ from roborock .mqtt .session import MqttParams , MqttSession , MqttSessionException
119from roborock .protocol import create_mqtt_decoder , create_mqtt_encoder
1210from roborock .roborock_message import RoborockMessage
1311
@@ -30,17 +28,16 @@ def __init__(self, mqtt_session: MqttSession, duid: str, local_key: str, rriot:
3028 self ._rriot = rriot
3129 self ._mqtt_params = mqtt_params
3230
33- # RPC support
34- self ._waiting_queue : dict [int , asyncio .Future [RoborockMessage ]] = {}
3531 self ._decoder = create_mqtt_decoder (local_key )
3632 self ._encoder = create_mqtt_encoder (local_key )
37- self ._queue_lock = asyncio .Lock ()
38- self ._mqtt_unsub : Callable [[], None ] | None = None
3933
4034 @property
4135 def is_connected (self ) -> bool :
42- """Return true if the channel is connected."""
43- return (self ._mqtt_unsub is not None ) and self ._mqtt_session .connected
36+ """Return true if the channel is connected.
37+
38+ This passes through the underlying MQTT session's connected state.
39+ """
40+ return self ._mqtt_session .connected
4441
4542 @property
4643 def _publish_topic (self ) -> str :
@@ -57,9 +54,6 @@ async def subscribe(self, callback: Callable[[RoborockMessage], None]) -> Callab
5754
5855 The callback will be called with the message payload when a message is received.
5956
60- All messages received will be processed through the provided callback, even
61- those sent in response to the `send_command` command.
62-
6357 Returns a callable that can be used to unsubscribe from the topic.
6458 """
6559
@@ -69,75 +63,29 @@ def message_handler(payload: bytes) -> None:
6963 return
7064 for message in messages :
7165 _LOGGER .debug ("Received message: %s" , message )
72- if message .version != b"B01" :
73- asyncio .create_task (self ._resolve_future_with_lock (message ))
7466 try :
7567 callback (message )
7668 except Exception as e :
7769 _LOGGER .exception ("Uncaught error in message handler callback: %s" , e )
7870
79- self ._mqtt_unsub = await self ._mqtt_session .subscribe (self ._subscribe_topic , message_handler )
80-
81- def unsub_wrapper () -> None :
82- if self ._mqtt_unsub is not None :
83- self ._mqtt_unsub ()
84- self ._mqtt_unsub = None
85-
86- return unsub_wrapper
87-
88- async def _resolve_future_with_lock (self , message : RoborockMessage ) -> None :
89- """Resolve waiting future with proper locking."""
90- if (request_id := message .get_request_id ()) is None :
91- _LOGGER .debug ("Received message with no request_id" )
92- return
93- async with self ._queue_lock :
94- if (future := self ._waiting_queue .pop (request_id , None )) is not None :
95- future .set_result (message )
96- else :
97- _LOGGER .debug ("Received message with no waiting handler: request_id=%s" , request_id )
71+ return await self ._mqtt_session .subscribe (self ._subscribe_topic , message_handler )
9872
99- async def send_message_no_wait (self , message : RoborockMessage ) -> None :
100- """Send a command message without waiting for a response."""
101- try :
102- encoded_msg = self ._encoder (message )
103- await self ._mqtt_session .publish (self ._publish_topic , encoded_msg )
104- except Exception :
105- logging .exception ("Uncaught error sending command" )
106- raise
107-
108- async def send_message (self , message : RoborockMessage , timeout : float = 10.0 ) -> RoborockMessage :
109- """Send a command message and wait for the response message.
73+ async def publish (self , message : RoborockMessage ) -> None :
74+ """Publish a command message.
11075
111- Returns the raw response message - caller is responsible for parsing.
76+ The caller is responsible for handling any responses and associating them
77+ with the incoming request.
11278 """
113- try :
114- if (request_id := message .get_request_id ()) is None :
115- raise RoborockException ("Message must have a request_id for RPC calls" )
116- except (ValueError , JSONDecodeError ) as err :
117- _LOGGER .exception ("Error getting request_id from message: %s" , err )
118- raise RoborockException (f"Invalid message format, Message must have a request_id: { err } " ) from err
119-
120- future : asyncio .Future [RoborockMessage ] = asyncio .Future ()
121- async with self ._queue_lock :
122- if request_id in self ._waiting_queue :
123- raise RoborockException (f"Request ID { request_id } already pending, cannot send command" )
124- self ._waiting_queue [request_id ] = future
125-
12679 try :
12780 encoded_msg = self ._encoder (message )
128- await self ._mqtt_session .publish (self ._publish_topic , encoded_msg )
129-
130- return await asyncio .wait_for (future , timeout = timeout )
131-
132- except asyncio .TimeoutError as ex :
133- async with self ._queue_lock :
134- self ._waiting_queue .pop (request_id , None )
135- raise RoborockException (f"Command timed out after { timeout } s" ) from ex
136- except Exception :
137- logging .exception ("Uncaught error sending command" )
138- async with self ._queue_lock :
139- self ._waiting_queue .pop (request_id , None )
140- raise
81+ except Exception as e :
82+ _LOGGER .exception ("Error encoding MQTT message: %s" , e )
83+ raise RoborockException (f"Failed to encode MQTT message: { e } " ) from e
84+ try :
85+ return await self ._mqtt_session .publish (self ._publish_topic , encoded_msg )
86+ except MqttSessionException as e :
87+ _LOGGER .exception ("Error publishing MQTT message: %s" , e )
88+ raise RoborockException (f"Failed to publish MQTT message: { e } " ) from e
14189
14290
14391def create_mqtt_channel (
0 commit comments