@@ -27,13 +27,9 @@ def __init__(self, user_data: UserData, device_info: DeviceData):
2727 self ._logger = RoborockLoggerAdapter (device_info .device .name , _LOGGER )
2828 self ._mqtt_endpoint = base64 .b64encode (Utils .md5 (user_data .rriot .k .encode ())[8 :14 ]).decode ()
2929 rriot = user_data .rriot
30- self ._mqtt_user = rriot .u
31- self ._hashed_user = md5hex (self ._mqtt_user + ":" + rriot .k )[2 :10 ]
30+ hashed_user = md5hex (rriot .u + ":" + rriot .k )[2 :10 ]
3231 url = urlparse (rriot .r .m )
33- self ._mqtt_host = str (url )
34- self ._mqtt_port = url .port
3532 mqtt_password = rriot .s
36- self ._hashed_password = md5hex (mqtt_password + ":" + rriot .k )[16 :]
3733
3834 self ._local_endpoint = "abc"
3935 self ._nonce = secrets .token_bytes (16 )
@@ -43,6 +39,18 @@ def __init__(self, user_data: UserData, device_info: DeviceData):
4339 self ._dnd_trait : DndTrait | None = self .determine_supported_traits (DndTrait )
4440 self ._consumable_trait : ConsumableTrait | None = self .determine_supported_traits (ConsumableTrait )
4541 self ._status_type : type [Status ] = ModelStatus .get (device_info .model , S7MaxVStatus )
42+ # TODO: One per client EVER
43+ self .session = RoborockMqttSession (
44+ MqttParams (
45+ host = str (url .hostname ),
46+ port = url .port ,
47+ tls = True ,
48+ username = hashed_user ,
49+ password = md5hex (rriot .s + ":" + rriot .k )[16 :],
50+ )
51+ )
52+ self .input_topic = f"rr/m/i/{ rriot .u } /{ hashed_user } /{ device_info .duid } "
53+ self .output_topic = f"rr/m/o/{ rriot .u } /{ hashed_user } /{ device_info .duid } "
4654
4755 def determine_supported_traits (self , trait : type [DeviceTrait ]):
4856 def _send_command (
@@ -59,16 +67,9 @@ def _send_command(
5967
6068 async def connect (self ):
6169 """Connect via MQTT and Local if possible."""
62-
63- MqttParams (
64- host = self ._mqtt_host ,
65- port = self ._mqtt_port ,
66- tls = True ,
67- username = self ._hashed_user ,
68- password = self ._hashed_password ,
69- )
70- await self .manager .subscribe (self .user_data , self .device_info , self .on_message )
71- await self .update ()
70+ if not self .session .connected :
71+ await self .session .start ()
72+ await self .session .subscribe (self .output_topic , callback = self .on_message )
7273
7374 async def update (self ):
7475 for trait in self ._all_supported_traits :
@@ -117,39 +118,41 @@ async def send_message(
117118 local_key = self .device_info .device .local_key
118119 msg = MessageParser .build (roborock_message , local_key , False )
119120 if use_cloud :
120- await self .manager .publish (self .user_data , self . device_info , msg )
121+ await self .session .publish (self .input_topic , msg )
121122 else :
122123 # Handle doing local commands
123124 pass
124125
125- def on_message (self , message : RoborockMessage ):
126- message_payload = message .get_payload ()
127- message_id = message .get_request_id ()
128- for data_point_number , data_point in message_payload .get ("dps" ).items ():
129- if data_point_number == "102" :
130- data_point_response = json .loads (data_point )
131- result = data_point_response .get ("result" )
132- if isinstance (result , list ) and len (result ) == 1 :
133- result = result [0 ]
134- if result and (trait := self ._message_id_types .get (message_id )) is not None :
135- trait .on_message (result )
136- if (error := result .get ("error" )) is not None :
137- print (error )
138- print ()
139- # If message is command not supported - remove from self.update_commands
140-
141- # If message is an error - log it?
142-
143- # If message is 'ok' - ignore it
144-
145- # If message is anything else - store ids, and map back to id to determine message type.
146- # Then update self.data
147-
148- # If we haven't received a message in X seconds, the device is likely offline. I think we can continue the connection,
149- # but we should have some way to mark ourselves as unavailable.
150-
151- # This should also probably be split with on_cloud_message and on_local_message.
152- print (message )
126+ def on_message (self , message_bytes : bytes ):
127+ messages = MessageParser .parse (message_bytes , self .device_info .device .local_key )[0 ]
128+ for message in messages :
129+ message_payload = message .get_payload ()
130+ message_id = message .get_request_id ()
131+ for data_point_number , data_point in message_payload .get ("dps" ).items ():
132+ if data_point_number == "102" :
133+ data_point_response = json .loads (data_point )
134+ result = data_point_response .get ("result" )
135+ if isinstance (result , list ) and len (result ) == 1 :
136+ result = result [0 ]
137+ if result and (trait := self ._message_id_types .get (message_id )) is not None :
138+ trait .on_message (result )
139+ if (error := result .get ("error" )) is not None :
140+ print (error )
141+ print ()
142+ # If message is command not supported - remove from self.update_commands
143+
144+ # If message is an error - log it?
145+
146+ # If message is 'ok' - ignore it
147+
148+ # If message is anything else - store ids, and map back to id to determine message type.
149+ # Then update self.data
150+
151+ # If we haven't received a message in X seconds, the device is likely offline. I think we can continue the connection,
152+ # but we should have some way to mark ourselves as unavailable.
153+
154+ # This should also probably be split with on_cloud_message and on_local_message.
155+ print (message )
153156
154157 @property
155158 def dnd (self ) -> DndTrait | None :
0 commit comments