Skip to content

Commit ba57677

Browse files
authored
feat: Implement direct device trait updates from data protocol messages using dps metadata and add corresponding update listeners (#799)
Uses `dps` metadata and add corresponding update listeners. This uses the same dps converter patern used by q10, but does not share code explicitly. This also renames discover_features to start in v1 properties.
1 parent 86b839e commit ba57677

18 files changed

Lines changed: 417 additions & 34 deletions

File tree

docs/DEVICES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ graph LR
357357
1. **Temporary Subscriptions**: Each RPC creates a temporary subscription that matches the request ID
358358
2. **Subscription Reuse**: `MqttSession` keeps subscriptions alive for 60 seconds (or idle timeout) to enable reuse during command bursts
359359
3. **Timeout Handling**: Commands timeout after 10 seconds if no response is received
360-
4. **Multiple Strategies**: `V1Channel` tries local first, then falls back to MQTT if local fails
360+
4. **Multiple Strategies**: `V1Channel` tries connect to both Local faster local commands and MQTT for streaming updates.
361361

362362
## Class Design & Components
363363

roborock/callbacks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def wrapper(value: V) -> None:
2626
try:
2727
callback(value)
2828
except Exception as ex: # noqa: BLE001
29-
logger.error("Uncaught error in callback '%s': %s", callback.__name__, ex)
29+
logger.error("Uncaught error in callback '%s': %s", getattr(callback, "__name__", "Unknown"), ex)
3030

3131
return wrapper
3232

roborock/cli.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ async def _v1_trait(context: RoborockContext, device_id: str, display_func: Call
419419
device = await device_manager.get_device(device_id)
420420
if device.v1_properties is None:
421421
raise RoborockUnsupportedFeature(f"Device {device.name} does not support V1 protocol")
422-
await device.v1_properties.discover_features()
422+
await device.v1_properties.start()
423423
trait = display_func(device.v1_properties)
424424
if trait is None:
425425
raise RoborockUnsupportedFeature("Trait not supported by device")

roborock/data/v1/v1_containers.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,13 +100,14 @@ class FieldNameBase(StrEnum):
100100

101101

102102
class StatusField(FieldNameBase):
103-
"""An enum that represents a field in the `Status` class.
103+
"""An enum that represents a field in the `StatusV2` class.
104104
105105
This is used with `roborock.devices.traits.v1.status.DeviceFeaturesTrait`
106106
to understand if a feature is supported by the device using `is_field_supported`.
107107
108-
The enum values are names of fields in the `Status` class. Each field is annotated
109-
with a metadata value to determine if the field is supported by the device.
108+
The enum values are names of fields in the `StatusV2` class. Each field is
109+
annotated with `dps` metadata to map the field to a `RoborockDataProtocol`
110+
value used to check support against the product schema.
110111
"""
111112

112113
STATE = "state"
@@ -670,8 +671,9 @@ class ConsumableField(FieldNameBase):
670671
This is used with `roborock.devices.traits.v1.status.DeviceFeaturesTrait`
671672
to understand if a feature is supported by the device using `is_field_supported`.
672673
673-
The enum values are names of fields in the `Consumable` class. Each field is annotated
674-
with a metadata value to determine if the field is supported by the device.
674+
The enum values are names of fields in the `Consumable` class. Each field is
675+
annotated with `dps` metadata to map the field to a `RoborockDataProtocol`
676+
value used to check support against the product schema.
675677
"""
676678

677679
MAIN_BRUSH_WORK_TIME = "main_brush_work_time"

roborock/devices/device.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ async def connect(self) -> None:
199199
unsub = await self._channel.subscribe(self._on_message)
200200
try:
201201
if self.v1_properties is not None:
202-
await self.v1_properties.discover_features()
202+
await self.v1_properties.start()
203203
elif self.b01_q10_properties is not None:
204204
await self.b01_q10_properties.start()
205205
except RoborockException:
@@ -216,6 +216,8 @@ async def close(self) -> None:
216216
await self._connect_task
217217
except asyncio.CancelledError:
218218
pass
219+
if self.v1_properties is not None:
220+
self.v1_properties.close()
219221
if self.b01_q10_properties is not None:
220222
await self.b01_q10_properties.close()
221223
if self._unsub:

roborock/devices/device_manager.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ def device_creator(home_data: HomeData, device: HomeDataDevice, product: HomeDat
236236
channel.rpc_channel,
237237
channel.mqtt_rpc_channel,
238238
channel.map_rpc_channel,
239+
channel.add_dps_listener,
239240
web_api,
240241
device_cache=device_cache,
241242
map_parser_config=map_parser_config,

roborock/devices/rpc/v1_channel.py

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from dataclasses import dataclass
1212
from typing import Any, TypeVar
1313

14+
from roborock.callbacks import CallbackList
1415
from roborock.data import HomeDataDevice, NetworkInfo, RoborockBase, UserData
1516
from roborock.devices.cache import DeviceCache
1617
from roborock.devices.transport.channel import Channel
@@ -30,9 +31,10 @@
3031
V1RpcChannel,
3132
create_map_response_decoder,
3233
create_security_data,
34+
decode_data_protocol_message,
3335
decode_rpc_response,
3436
)
35-
from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol
37+
from roborock.roborock_message import RoborockDataProtocol, RoborockMessage, RoborockMessageProtocol
3638
from roborock.roborock_typing import RoborockCommand
3739
from roborock.util import RoborockLoggerAdapter
3840

@@ -188,6 +190,7 @@ def __init__(
188190
self._device_cache = device_cache
189191
self._reconnect_task: asyncio.Task[None] | None = None
190192
self._last_network_info_refresh: datetime.datetime | None = None
193+
self._dps_listeners = CallbackList[dict[RoborockDataProtocol, Any]](self._logger)
191194

192195
@property
193196
def is_connected(self) -> bool:
@@ -305,12 +308,16 @@ async def subscribe(self, callback: Callable[[RoborockMessage], None]) -> Callab
305308
loop = asyncio.get_running_loop()
306309
self._reconnect_task = loop.create_task(self._background_reconnect())
307310

308-
if not self.is_local_connected:
309-
# We were not able to connect locally, so fallback to MQTT and at least
310-
# establish that connection explicitly. If this fails then raise an
311-
# error and let the caller know we failed to subscribe.
311+
# We maintain an active MQTT subscription even when connected locally to receive
312+
# unsolicited status updates (DPS push messages) directly from the cloud.
313+
try:
312314
self._mqtt_unsub = await self._mqtt_channel.subscribe(self._on_mqtt_message)
313-
self._logger.debug("V1Channel connected to device via MQTT")
315+
except RoborockException as err:
316+
if not self.is_local_connected:
317+
# Propagate error if both local and MQTT failed
318+
self._logger.debug("MQTT connection also failed: %s", err)
319+
raise
320+
self._logger.debug("MQTT subscription failed, continuing with local-only connection: %s", err)
314321

315322
def unsub() -> None:
316323
"""Unsubscribe from all messages."""
@@ -328,6 +335,16 @@ def unsub() -> None:
328335
self._callback = callback
329336
return unsub
330337

338+
def add_dps_listener(self, listener: Callable[[dict[RoborockDataProtocol, Any]], None]) -> Callable[[], None]:
339+
"""Add a listener for DPS updates.
340+
341+
This will attach a listener to the existing subscription, invoking
342+
the listener whenever new DPS values arrive from the subscription.
343+
This will only work if a subscription has already been setup, which is
344+
handled by the device start.
345+
"""
346+
return self._dps_listeners.add_callback(listener)
347+
331348
async def _get_networking_info(self, *, prefer_cache: bool = True) -> NetworkInfo:
332349
"""Retrieve networking information for the device.
333350
@@ -428,6 +445,14 @@ def _on_mqtt_message(self, message: RoborockMessage) -> None:
428445
self._logger.debug("V1Channel received MQTT message: %s", message)
429446
if self._callback:
430447
self._callback(message)
448+
try:
449+
datapoints = decode_data_protocol_message(message)
450+
except RoborockException as e:
451+
self._logger.debug("Error decoding data protocol message: %s", e)
452+
return
453+
454+
if datapoints:
455+
self._dps_listeners(datapoints)
431456

432457
def _on_local_message(self, message: RoborockMessage) -> None:
433458
"""Handle incoming local messages."""

roborock/devices/traits/v1/__init__.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,18 @@
5353
"""
5454

5555
import logging
56+
from collections.abc import Callable
5657
from dataclasses import dataclass, field, fields
5758
from typing import Any, get_args
5859

5960
from roborock.data.containers import HomeData, HomeDataProduct, RoborockBase
6061
from roborock.data.v1.v1_code_mappings import RoborockDockTypeCode
6162
from roborock.devices.cache import DeviceCache
6263
from roborock.devices.traits import Trait
64+
from roborock.exceptions import RoborockException
6365
from roborock.map.map_parser import MapParserConfig
64-
from roborock.protocols.v1_protocol import V1RpcChannel
66+
from roborock.protocols.v1_protocol import V1RpcChannel, decode_data_protocol_message
67+
from roborock.roborock_message import RoborockDataProtocol, RoborockMessage
6568
from roborock.web_api import UserWebApiClient
6669

6770
from . import (
@@ -176,6 +179,7 @@ def __init__(
176179
rpc_channel: V1RpcChannel,
177180
mqtt_rpc_channel: V1RpcChannel,
178181
map_rpc_channel: V1RpcChannel,
182+
add_dps_listener: Callable[[Callable[[dict[RoborockDataProtocol, Any]], None]], Callable[[], None]],
179183
web_api: UserWebApiClient,
180184
device_cache: DeviceCache,
181185
map_parser_config: MapParserConfig | None = None,
@@ -189,6 +193,8 @@ def __init__(
189193
self._web_api = web_api
190194
self._device_cache = device_cache
191195
self._region = region
196+
self._unsub: Callable[[], None] | None = None
197+
self._add_dps_listener = add_dps_listener
192198

193199
self.device_features = DeviceFeaturesTrait(product, self._device_cache)
194200
self.status = StatusTrait(self.device_features, region=self._region)
@@ -227,6 +233,29 @@ def _get_rpc_channel(self, trait: V1TraitMixin) -> V1RpcChannel:
227233
else:
228234
return self._rpc_channel
229235

236+
async def start(self) -> None:
237+
"""Start the properties API and discover features."""
238+
if self._unsub:
239+
return
240+
await self.discover_features()
241+
self._unsub = self._add_dps_listener(self._on_dps_update)
242+
243+
def close(self) -> None:
244+
if self._unsub:
245+
self._unsub()
246+
self._unsub = None
247+
248+
def _on_dps_update(self, dps: dict[RoborockDataProtocol, Any]) -> None:
249+
"""Handle incoming messages from the device.
250+
251+
This will notify all traits of the new values. This can be improved in
252+
the future to be dynamic when we have more traits that support dynamic
253+
updates but for now we just invoke them manually.
254+
"""
255+
_LOGGER.debug("Received message from device: %s", dps)
256+
self.status.update_from_dps(dps)
257+
self.consumables.update_from_dps(dps)
258+
230259
async def discover_features(self) -> None:
231260
"""Populate any supported traits that were not initialized in __init__."""
232261
_LOGGER.debug("Starting optional trait discovery")
@@ -330,6 +359,7 @@ def create(
330359
rpc_channel: V1RpcChannel,
331360
mqtt_rpc_channel: V1RpcChannel,
332361
map_rpc_channel: V1RpcChannel,
362+
add_dps_listener: Callable[[Callable[[dict[RoborockDataProtocol, Any]], None]], Callable[[], None]],
333363
web_api: UserWebApiClient,
334364
device_cache: DeviceCache,
335365
map_parser_config: MapParserConfig | None = None,
@@ -343,6 +373,7 @@ def create(
343373
rpc_channel,
344374
mqtt_rpc_channel,
345375
map_rpc_channel,
376+
add_dps_listener,
346377
web_api,
347378
device_cache,
348379
map_parser_config,

roborock/devices/traits/v1/consumeable.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,24 @@
44
periodically, such as filters, brushes, etc.
55
"""
66

7+
import logging
78
from enum import StrEnum
8-
from typing import Self
9+
from typing import Any, Self
910

1011
from roborock.data import Consumable
12+
from roborock.devices.traits.common import DpsDataConverter, TraitUpdateListener
1113
from roborock.devices.traits.v1 import common
14+
from roborock.roborock_message import RoborockDataProtocol
1215
from roborock.roborock_typing import RoborockCommand
1316

1417
__all__ = [
1518
"ConsumableTrait",
1619
]
1720

21+
_LOGGER = logging.getLogger(__name__)
22+
23+
_DPS_CONVERTER = DpsDataConverter.from_dataclass(Consumable)
24+
1825

1926
class ConsumableAttribute(StrEnum):
2027
"""Enum for consumable attributes."""
@@ -35,7 +42,7 @@ def from_str(cls, value: str) -> Self:
3542
raise ValueError(f"Unknown ConsumableAttribute: {value}")
3643

3744

38-
class ConsumableTrait(Consumable, common.V1TraitMixin):
45+
class ConsumableTrait(Consumable, common.V1TraitMixin, TraitUpdateListener):
3946
"""Trait for managing consumable attributes on Roborock devices.
4047
4148
After the first refresh, you can tell what consumables are supported by
@@ -45,7 +52,21 @@ class ConsumableTrait(Consumable, common.V1TraitMixin):
4552
command = RoborockCommand.GET_CONSUMABLE
4653
converter = common.DefaultConverter(Consumable)
4754

55+
def __init__(self) -> None:
56+
"""Initialize the consumable trait."""
57+
super().__init__()
58+
TraitUpdateListener.__init__(self, logger=_LOGGER)
59+
4860
async def reset_consumable(self, consumable: ConsumableAttribute) -> None:
4961
"""Reset a specific consumable attribute on the device."""
5062
await self.rpc_channel.send_command(RoborockCommand.RESET_CONSUMABLE, params=[consumable.value])
5163
await self.refresh()
64+
65+
def update_from_dps(self, decoded_dps: dict[RoborockDataProtocol, Any]) -> None:
66+
"""Update the trait from data protocol push message data.
67+
68+
This handles unsolicited status updates pushed by the device
69+
via RoborockDataProtocol codes (e.g. STATE=121, BATTERY=122).
70+
"""
71+
if _DPS_CONVERTER.update_from_dps(self, decoded_dps):
72+
self._notify_update()

roborock/devices/traits/v1/status.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import logging
12
from functools import cached_property
3+
from typing import Any
24

35
from roborock import (
46
CleanRoutes,
@@ -10,13 +12,19 @@
1012
get_water_mode_mapping,
1113
get_water_modes,
1214
)
15+
from roborock.devices.traits.common import DpsDataConverter, TraitUpdateListener
16+
from roborock.roborock_message import RoborockDataProtocol
1317
from roborock.roborock_typing import RoborockCommand
1418

1519
from . import common
1620
from .device_features import DeviceFeaturesTrait
1721

22+
_LOGGER = logging.getLogger(__name__)
1823

19-
class StatusTrait(StatusV2, common.V1TraitMixin):
24+
_DPS_CONVERTER = DpsDataConverter.from_dataclass(StatusV2)
25+
26+
27+
class StatusTrait(StatusV2, common.V1TraitMixin, TraitUpdateListener):
2028
"""Trait for managing the status of Roborock devices.
2129
2230
The StatusTrait gives you the access to the state of a Roborock vacuum.
@@ -47,6 +55,7 @@ class StatusTrait(StatusV2, common.V1TraitMixin):
4755
def __init__(self, device_feature_trait: DeviceFeaturesTrait, region: str | None = None) -> None:
4856
"""Initialize the StatusTrait."""
4957
super().__init__()
58+
TraitUpdateListener.__init__(self, logger=_LOGGER)
5059
self._device_features_trait = device_feature_trait
5160
self._region = region
5261

@@ -91,3 +100,12 @@ def mop_route_name(self) -> str | None:
91100
if self.mop_mode is None:
92101
return None
93102
return self.mop_route_mapping.get(self.mop_mode)
103+
104+
def update_from_dps(self, decoded_dps: dict[RoborockDataProtocol, Any]) -> None:
105+
"""Update the trait from data protocol push message data.
106+
107+
This handles unsolicited status updates pushed by the device
108+
via RoborockDataProtocol codes (e.g. STATE=121, BATTERY=122).
109+
"""
110+
if _DPS_CONVERTER.update_from_dps(self, decoded_dps):
111+
self._notify_update()

0 commit comments

Comments
 (0)