@@ -80,6 +80,8 @@ def __init__(self, user_data: UserData, device_info: DeviceData) -> None:
8080 self ._mutex = Lock ()
8181 self ._decoder : Decoder = create_mqtt_decoder (device_info .device .local_key )
8282 self ._encoder : Encoder = create_mqtt_encoder (device_info .device .local_key )
83+ self .previous_attempt_was_subscribe = False
84+ self ._topic = f"rr/m/o/{ self ._mqtt_user } /{ self ._hashed_user } /{ self .device_info .device .duid } "
8385
8486 def _mqtt_on_connect (
8587 self ,
@@ -105,19 +107,19 @@ def _mqtt_on_connect(
105107 self ._logger .debug ("Failed to notify connect future, not in queue" )
106108 return
107109 self ._logger .info (f"Connected to mqtt { self ._mqtt_host } :{ self ._mqtt_port } " )
108- topic = f"rr/m/o/{ self ._mqtt_user } /{ self ._hashed_user } /{ self .device_info .device .duid } "
109- (result , mid ) = self ._mqtt_client .subscribe (topic )
110+ (result , mid ) = self ._mqtt_client .subscribe (self ._topic )
110111 if result != 0 :
111112 message = f"Failed to subscribe ({ str (rc )} )"
112113 self ._logger .error (message )
113114 if connection_queue :
114115 connection_queue .set_exception (VacuumError (message ))
115116 return
116- self ._logger .info (f"Subscribed to topic { topic } " )
117+ self ._logger .info (f"Subscribed to topic { self . _topic } " )
117118 if connection_queue :
118119 connection_queue .set_result (True )
119120
120121 def _mqtt_on_message (self , * args , ** kwargs ):
122+ self .previous_attempt_was_subscribe = False
121123 client , __ , msg = args
122124 try :
123125 messages = self ._decoder (msg .payload )
@@ -199,4 +201,34 @@ def _send_msg_raw(self, msg: bytes) -> None:
199201 f"rr/m/i/{ self ._mqtt_user } /{ self ._hashed_user } /{ self .device_info .device .duid } " , msg
200202 )
201203 if info .rc != mqtt .MQTT_ERR_SUCCESS :
202- raise RoborockException (f"Failed to publish ({ str (info .rc )} )" )
204+ raise RoborockException (f"Failed to publish ({ mqtt .error_string (info .rc )} )" )
205+
206+ async def validate_connection (self ) -> None :
207+ """Override the default validate connection to try to re-subscribe rather than disconnect."""
208+ if self .previous_attempt_was_subscribe :
209+ # If we have already tried to unsub and resub, and we are still in this state,
210+ # we should just do the normal validate connection.
211+ return await super ().validate_connection ()
212+ try :
213+ if not self .should_keepalive ():
214+ self .previous_attempt_was_subscribe = True
215+ loop = asyncio .get_running_loop ()
216+
217+ self ._logger .info ("Resetting Roborock connection due to keepalive timeout" )
218+ (result , mid ) = await loop .run_in_executor (None , self ._mqtt_client .unsubscribe , self ._topic )
219+
220+ if result != 0 :
221+ message = f"Failed to unsubscribe ({ mqtt .error_string (result )} )"
222+ self ._logger .error (message )
223+ return await super ().validate_connection ()
224+ (result , mid ) = await loop .run_in_executor (None , self ._mqtt_client .subscribe , self ._topic )
225+
226+ if result != 0 :
227+ message = f"Failed to subscribe ({ mqtt .error_string (result )} )"
228+ self ._logger .error (message )
229+ return await super ().validate_connection ()
230+
231+ self ._logger .info (f"Subscribed to topic { self ._topic } " )
232+ except Exception : # noqa
233+ return await super ().validate_connection ()
234+ await self .async_connect ()
0 commit comments