@@ -69,6 +69,7 @@ def __init__(
6969 self ._stop = False
7070 self ._backoff = MIN_BACKOFF_INTERVAL
7171 self ._client : aiomqtt .Client | None = None
72+ self ._client_subscribed_topics : set [str ] = set ()
7273 self ._client_lock = asyncio .Lock ()
7374 self ._listeners : CallbackMap [str , bytes ] = CallbackMap (_LOGGER )
7475 self ._connection_task : asyncio .Task [None ] | None = None
@@ -218,7 +219,7 @@ async def _mqtt_client(self, params: MqttParams) -> aiomqtt.Client:
218219 # Re-establish any existing subscriptions
219220 async with self ._client_lock :
220221 self ._client = client
221- for topic in self ._listeners . keys () :
222+ for topic in self ._client_subscribed_topics :
222223 _LOGGER .debug ("Re-establishing subscription to topic %s" , topic )
223224 # TODO: If this fails it will break the whole connection. Make
224225 # this retry again in the background with backoff.
@@ -249,32 +250,42 @@ async def subscribe(self, topic: str, callback: Callable[[bytes], None]) -> Call
249250 unsub = self ._listeners .add_callback (topic , callback )
250251
251252 async with self ._client_lock :
252- if self ._client :
253- _LOGGER .debug ("Establishing subscription to topic %s" , topic )
254- try :
255- await self ._client .subscribe (topic )
256- except MqttError as err :
257- # Clean up the callback if subscription fails
258- unsub ()
259- raise MqttSessionException (f"Error subscribing to topic: { err } " ) from err
260- else :
261- _LOGGER .debug ("Client not connected, will establish subscription later" )
262-
263- def schedule_unsubscribe ():
253+ if topic not in self ._client_subscribed_topics :
254+ self ._client_subscribed_topics .add (topic )
255+ if self ._client :
256+ _LOGGER .debug ("Establishing subscription to topic %s" , topic )
257+ try :
258+ await self ._client .subscribe (topic )
259+ except MqttError as err :
260+ # Clean up the callback if subscription fails
261+ unsub ()
262+ self ._client_subscribed_topics .discard (topic )
263+ raise MqttSessionException (f"Error subscribing to topic: { err } " ) from err
264+ else :
265+ _LOGGER .debug ("Client not connected, will establish subscription later" )
266+
267+ def schedule_unsubscribe () -> None :
264268 async def idle_unsubscribe ():
265269 try :
266270 await asyncio .sleep (self ._topic_idle_timeout .total_seconds ())
267271 # Only unsubscribe if there are no callbacks left for this topic
268272 if not self ._listeners .get_callbacks (topic ):
269273 async with self ._client_lock :
274+ # Check again if we have listeners, in case a subscribe happened
275+ # while we were waiting for the lock or after we popped the timer.
276+ if self ._listeners .get_callbacks (topic ):
277+ _LOGGER .debug ("Skipping unsubscribe for %s, new listeners added" , topic )
278+ return
279+
280+ self ._idle_timers .pop (topic , None )
281+ self ._client_subscribed_topics .discard (topic )
282+
270283 if self ._client :
271284 _LOGGER .debug ("Idle timeout expired, unsubscribing from topic %s" , topic )
272285 try :
273286 await self ._client .unsubscribe (topic )
274287 except MqttError as err :
275288 _LOGGER .warning ("Error unsubscribing from topic %s: %s" , topic , err )
276- # Clean up timer from dict
277- self ._idle_timers .pop (topic , None )
278289 except asyncio .CancelledError :
279290 _LOGGER .debug ("Idle unsubscribe for topic %s cancelled" , topic )
280291
@@ -286,7 +297,10 @@ def delayed_unsub():
286297 unsub () # Remove the callback from CallbackMap
287298 # If no more callbacks for this topic, start idle timer
288299 if not self ._listeners .get_callbacks (topic ):
300+ _LOGGER .debug ("Unsubscribing topic %s, starting idle timer" , topic )
289301 schedule_unsubscribe ()
302+ else :
303+ _LOGGER .debug ("Unsubscribing topic %s, still have active callbacks" , topic )
290304
291305 return delayed_unsub
292306
0 commit comments