From b42202dacb1be6fcd3e9e8751b80079251184c65 Mon Sep 17 00:00:00 2001 From: Allen Porter Date: Thu, 4 Dec 2025 21:32:45 -0800 Subject: [PATCH 1/2] chore: Update devices documentation with design details This documents the current code base as is with a higher level overview. This may deserve a better home after we've removed the original API, but seems OK for now. --- roborock/devices/README.md | 425 ++++++++++++++++++++++++++++++++++++- 1 file changed, 416 insertions(+), 9 deletions(-) diff --git a/roborock/devices/README.md b/roborock/devices/README.md index 9496b49d..769a7d64 100644 --- a/roborock/devices/README.md +++ b/roborock/devices/README.md @@ -4,6 +4,78 @@ The devices module provides functionality to discover Roborock devices on the network. This section documents the full lifecycle of device discovery across Cloud and Network. +## Architecture Overview + +The library is organized into distinct layers, each with a specific responsibility: + +```mermaid +graph TB + subgraph "Application Layer" + User[Application Code] + end + + subgraph "Device Management Layer" + DM[DeviceManager] + Traits[Device Traits] + end + + subgraph "Channel Layer" + V1C[V1Channel] + RPC[RpcChannel] + MC[MqttChannel] + LC[LocalChannel] + end + + subgraph "Session Layer" + MS[MqttSession] + LS[LocalSession Factory] + end + + subgraph "Protocol Layer" + Proto[Protocol Encoders/Decoders] + V1P[V1 Protocol] + A01P[A01 Protocol] + B01P[B01 Protocol] + end + + subgraph "Transport Layer" + MQTT[MQTT Broker] + TCP[TCP Socket] + end + + User --> DM + User --> Traits + DM --> V1C + Traits --> RPC + V1C --> RPC + RPC --> MC + RPC --> LC + MC --> MS + LC --> LS + MS --> Proto + LC --> Proto + Proto --> V1P + Proto --> A01P + Proto --> B01P + MS --> MQTT + LC --> TCP + + style User fill:#e1f5ff + style DM fill:#fff4e1 + style V1C fill:#ffe1e1 + style RPC fill:#ffe1e1 + style MS fill:#e1ffe1 + style Proto fill:#f0e1ff +``` + +### Layer Responsibilities + +1. 
**Device Management Layer**: High-level device discovery and lifecycle management +2. **Channel Layer**: Request/response correlation and connection management +3. **Session Layer**: Connection pooling and subscription management +4. **Protocol Layer**: Message encoding/decoding for different device versions +5. **Transport Layer**: Low-level MQTT and TCP communication + ## Init account setup ### Login @@ -42,6 +114,64 @@ that a newer version of the API should be used. ## Device Connections +### Connection Flow + +The following diagram shows how a device connection is established and how commands flow through the system: + +```mermaid +sequenceDiagram + participant App as Application + participant DM as DeviceManager + participant V1C as V1Channel + participant RPC as RpcChannel + participant MC as MqttChannel + participant LC as LocalChannel + participant MS as MqttSession + participant Broker as MQTT Broker + participant Device as Vacuum Device + + App->>DM: create_device_manager() + DM->>MS: Create MQTT Session + MS->>Broker: Connect + Broker-->>MS: Connected + + App->>DM: get_devices() + DM->>V1C: Create V1Channel + V1C->>MC: Create MqttChannel + V1C->>LC: Create LocalChannel (if available) + + Note over V1C: Subscribe to device topics + V1C->>MC: subscribe() + MC->>MS: subscribe(topic, callback) + MS->>Broker: SUBSCRIBE + + Note over V1C: Fetch network info via MQTT + V1C->>RPC: send_command(GET_NETWORK_INFO) + RPC->>MC: publish(request) + MC->>MS: publish(topic, message) + MS->>Broker: PUBLISH + Broker->>Device: Command + Device->>Broker: Response + Broker->>MS: Message + MS->>MC: callback(message) + MC->>RPC: decoded message + RPC-->>V1C: NetworkInfo + + Note over V1C: Connect locally using IP from NetworkInfo + V1C->>LC: connect() + LC->>Device: TCP Connect :58867 + Device-->>LC: Connected + + Note over V1C: Send command (prefers local) + App->>V1C: send_command(GET_STATUS) + V1C->>RPC: send_command() + RPC->>LC: publish(request) [Try local first] + 
LC->>Device: Command + Device->>LC: Response + LC->>RPC: decoded message + RPC-->>App: Status +``` + ### MQTT connection - Initial device information must be obtained from MQTT @@ -53,16 +183,180 @@ that a newer version of the API should be used. - Incoming and outgoing messages are decoded/encoded using the device `local_key` - Otherwise all commands may be performed locally. -## Local connection +### Local connection - We can use the `ip` from the `NetworkingInfo` to find the device - The local connection is preferred to for improved latency and reducing load on the cloud servers to avoid rate limiting. - Connections are made using a normal TCP socket on port `58867` - Incoming and outgoing messages are decoded/encoded using the device `local_key` -- Messages received on the stream may be partially received so we keep a running as messages are partially decoded +- Messages received on the stream may be partially received so we keep a running buffer as messages are partially decoded + +### RPC Pattern + +The library uses a publish/subscribe model for both MQTT and local connections, with an RPC abstraction on top: + +```mermaid +graph LR + subgraph "RPC Layer" + A[send_command] -->|1. Create request| B[Encoder] + B -->|2. Subscribe for response| C[Channel.subscribe] + B -->|3. Publish request| D[Channel.publish] + C -->|4. Wait for match| E[find_response callback] + E -->|5. Match request_id| F[Future.set_result] + F -->|6. Return| G[Command Result] + end + + subgraph "Channel Layer" + C --> H[Subscription Map] + D --> I[Transport] + I --> J[Device] + J --> K[Incoming Messages] + K --> H + H --> E + end +``` + +**Key Design Points:** + +1. **Temporary Subscriptions**: Each RPC creates a temporary subscription that matches the request ID +2. **Subscription Reuse**: `MqttSession` keeps subscriptions alive for 60 seconds (or idle timeout) to enable reuse during command bursts +3. **Timeout Handling**: Commands timeout after 10 seconds if no response is received +4. 
**Multiple Strategies**: `V1Channel` tries local first, then falls back to MQTT if local fails ## Design +### Current Architecture + +The current design separates concerns into distinct layers: + +```mermaid +classDiagram + class Channel { + <> + +subscribe(callback) Callable + +publish(message) + +is_connected() bool + } + + class MqttChannel { + -MqttSession session + -duid: str + -local_key: str + +subscribe(callback) + +publish(message) + } + + class LocalChannel { + -host: str + -transport: Transport + -local_key: str + +connect() + +subscribe(callback) + +publish(message) + +close() + } + + class V1Channel { + -MqttChannel mqtt_channel + -LocalChannel local_channel + -RpcChannel rpc_channel + +send_command(method, params) + +subscribe(callback) + } + + class RpcChannel { + -List~RpcStrategy~ strategies + +send_command(method, params) + } + + class RpcStrategy { + +name: str + +channel: Channel + +encoder: Callable + +decoder: Callable + +health_manager: HealthManager + } + + class MqttSession { + -Client client + -dict listeners + -dict idle_timers + +subscribe(topic, callback) + +publish(topic, payload) + +close() + } + + Channel <|-- MqttChannel + Channel <|-- LocalChannel + Channel <|-- V1Channel + MqttChannel --> MqttSession + V1Channel --> MqttChannel + V1Channel --> LocalChannel + V1Channel --> RpcChannel + RpcChannel --> RpcStrategy + RpcStrategy --> Channel +``` + +### Key Components + +#### Channel Interface + +The `Channel` abstraction provides a uniform interface for both MQTT and local connections: + +- **`subscribe(callback)`**: Register a callback for incoming messages +- **`publish(message)`**: Send a message to the device +- **`is_connected`**: Check connection status + +This abstraction allows the RPC layer to work identically over both transports. 
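As a rough illustration of how a request/response layer can sit on top of a `subscribe`/`publish` interface like this one, the sketch below matches a response to its request by ID using a temporary subscription and a future. `InMemoryChannel`, the free function `send_command`, and the dictionary message shape are hypothetical stand-ins chosen for this example; they are not the library's classes or wire format. The 10 second default mirrors the command timeout described in this README.

```python
import asyncio
import itertools
from collections.abc import Callable
from typing import Any

Message = dict[str, Any]


class InMemoryChannel:
    """Hypothetical channel that answers every published request immediately."""

    def __init__(self) -> None:
        self._callbacks: list[Callable[[Message], None]] = []

    @property
    def is_connected(self) -> bool:
        return True

    async def subscribe(self, callback: Callable[[Message], None]) -> Callable[[], None]:
        # Register the callback and hand back a function that removes it again.
        self._callbacks.append(callback)
        return lambda: self._callbacks.remove(callback)

    async def publish(self, message: Message) -> None:
        # Pretend the device replied; a real channel would write to MQTT or TCP.
        response = {"id": message["id"], "result": f"ok:{message['method']}"}
        for callback in list(self._callbacks):
            callback(response)


_request_ids = itertools.count(1)


async def send_command(channel: InMemoryChannel, method: str, timeout: float = 10.0) -> Any:
    """Publish a request and wait for the response that carries the same id."""
    request_id = next(_request_ids)
    future: asyncio.Future[Any] = asyncio.get_running_loop().create_future()

    def find_response(message: Message) -> None:
        # Ignore traffic for other requests; resolve the future on a match.
        if message.get("id") == request_id and not future.done():
            future.set_result(message["result"])

    unsubscribe = await channel.subscribe(find_response)
    try:
        await channel.publish({"id": request_id, "method": method})
        return await asyncio.wait_for(future, timeout=timeout)
    finally:
        unsubscribe()  # The subscription lives only for this one request.


result = asyncio.run(send_command(InMemoryChannel(), "get_status"))
print(result)
```

Because the sketch depends only on `subscribe` and `publish`, the same `send_command` would work against any object with that shape, which is the property the channel abstraction is meant to provide.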
+ +#### MqttSession + +The `MqttSession` manages a shared MQTT connection for all devices: + +- **Subscription Pooling**: Multiple callbacks can subscribe to the same topic +- **Idle Timeout**: Keeps subscriptions alive for 10 seconds after the last callback unsubscribes +- **Reconnection**: Automatically reconnects and re-establishes subscriptions on connection loss +- **Thread-Safe**: Uses asyncio primitives for safe concurrent access + +#### RpcChannel with Multiple Strategies + +The `RpcChannel` implements the request/response pattern over pub/sub channels: + +```python +# Example: V1Channel tries local first, then MQTT +strategies = [ + RpcStrategy(name="local", channel=local_channel, ...), + RpcStrategy(name="mqtt", channel=mqtt_channel, ...), +] +rpc_channel = RpcChannel(strategies) +``` + +For each command: +1. Try the first strategy (local) +2. If it fails, try the next strategy (MQTT) +3. Return the first successful result + +#### Health Management + +Each strategy can have a `HealthManager` that tracks success/failure: + +- **Exponential Backoff**: After failures, wait before retrying +- **Automatic Recovery**: Periodically attempt to restore failed connections +- **Network Info Refresh**: Refresh local IP addresses after extended periods + +### Protocol Versions + +Different device models use different protocol versions: + +| Protocol | Devices | Encoding | +|----------|---------|----------| +| V1 | Most vacuum robots | JSON RPC with AES encryption | +| A01 | Dyad, Zeo | DPS-based protocol | +| B01 | Some newer models | DPS-based protocol | +| L01 | Local protocol variant | Binary protocol negotiation | + +The protocol layer handles encoding/decoding transparently based on the device's `pv` field. + ### Prior API Issues - Complex Inheritance Hierarchy: Multiple inheritance with classes like RoborockMqttClientV1 inheriting from both RoborockMqttClient and RoborockClientV1 @@ -77,9 +371,9 @@ that a newer version of the API should be used. 
- Manual Connection Management: Users need to manually set up both MQTT and local clients as shown in the README example -## Design Changes +### Design Goals -- Prefer a single unfieid client that handles both MQTT and local connections internally. +- Prefer a single unified client that handles both MQTT and local connections internally. - Home and device discovery (fetching home data and device setup) will be behind a single API. @@ -87,14 +381,127 @@ that a newer version of the API should be used. - The clients should be working in terms of devices. We need to detect capabilities for each device and not expose details about API versions. -- Reliability issues: The current Home Assistant integration has issues with reliability and needs to be simplified. It may be that there are bugs with the exception handling and it's too heavy the cloud APIs and could benefit from more seamless caching. +- Reliability issues: The current Home Assistant integration has issues with reliability and needs to be simplified. It may be that there are bugs with the exception handling and it's too heavy on the cloud APIs and could benefit from more seamless caching. + +### Migration from Legacy APIs + +The library previously had: +- Separate `RoborockMqttClientV1` and `RoborockLocalClientV1` classes +- Manual connection management +- Callback-heavy design with `on_message_received` +- Complex inheritance hierarchies + +The new design: +- `DeviceManager` handles all connection management +- `V1Channel` automatically manages both MQTT and local +- Asyncio-first with minimal callbacks +- Clear separation of concerns through layers +- Users work with devices, not raw clients + +**Note**: Legacy APIs in `version_1_apis/` and `version_a01_apis/` are deprecated and will be removed. + ## Implementation Details -- We don't really need to worry about backwards compatibility for the new set of APIs. 
+### Code Organization + +``` +roborock/ +├── devices/ # Device management and channels +│ ├── device_manager.py # High-level device lifecycle +│ ├── channel.py # Base Channel interface +│ ├── mqtt_channel.py # MQTT channel implementation +│ ├── local_channel.py # Local TCP channel implementation +│ ├── v1_channel.py # V1 protocol channel with RPC strategies +│ ├── a01_channel.py # A01 protocol helpers +│ ├── b01_channel.py # B01 protocol helpers +│ └── traits/ # Device-specific command traits +│ └── v1/ # V1 device traits +│ ├── __init__.py # Trait initialization +│ ├── clean.py # Cleaning commands +│ ├── map.py # Map management +│ └── ... +├── mqtt/ # MQTT session management +│ ├── session.py # Base session interface +│ └── roborock_session.py # MQTT session with idle timeout +├── protocols/ # Protocol encoders/decoders +│ ├── v1_protocol.py # V1 JSON RPC protocol +│ ├── a01_protocol.py # A01 protocol +│ ├── b01_protocol.py # B01 protocol +│ └── ... +└── data/ # Data containers and mappings + ├── containers.py # Status, HomeData, etc. 
+ └── v1/ # V1-specific data structures +``` + +### Threading Model + +The library is **asyncio-only** with no threads: + +- All I/O is non-blocking using `asyncio` +- No thread synchronization needed (single event loop) +- Callbacks are executed in the event loop +- Use `asyncio.create_task()` for background work + +### Error Handling + +```mermaid +graph TD + A[send_command] --> B{Local Available?} + B -->|Yes| C[Try Local] + B -->|No| D[Try MQTT] + C --> E{Success?} + E -->|Yes| F[Return Result] + E -->|No| G{Timeout?} + G -->|Yes| H[Update Health Manager] + H --> D + G -->|No| I{Connection Error?} + I -->|Yes| J[Mark Connection Failed] + J --> D + I -->|No| D + D --> K{Success?} + K -->|Yes| F + K -->|No| L[Raise RoborockException] +``` + +**Exception Types:** + +- `RoborockException`: Base exception for all library errors +- `RoborockConnectionException`: Connection-related failures +- `RoborockTimeout`: Command timeout (10 seconds) + +### Caching Strategy + +To reduce API calls and improve reliability: + +1. **Home Data**: Cached on disk, refreshed periodically +2. **Network Info**: Cached for 12 hours +3. **Device Capabilities**: Detected once and cached +4. **MQTT Subscriptions**: Kept alive for 60 seconds (idle timeout) + +### Testing -- We'll have a `RoborockManager` responsible for managing the connections and getting devices. +The test suite uses mocking extensively to avoid real devices: -- Caching can be persisted to disk. The caller can implement the cache storage themselves, but we need to give them an API to do so. +- `Mock` and `AsyncMock` for channels and sessions +- Fake message generators (`mqtt_packet.gen_publish()`) +- Snapshot testing for complex data structures +- Time-based tests use small timeouts (10-50ms) for speed -- Users don't really choose between cloud vs local. However, we will want to allow the caller to know if its using the locale connection so we can show a warnings. 
+Example test structure: +```python +@pytest.fixture +def mock_mqtt_channel(): + """Mock MQTT channel that simulates responses.""" + channel = AsyncMock(spec=MqttChannel) + channel.response_queue = [] + + async def publish_side_effect(message): + # Simulate device response + if channel.response_queue: + response = channel.response_queue.pop(0) + await callback(response) + + channel.publish.side_effect = publish_side_effect + return channel +``` From 452ae62f574685ce8aad431051276bc80a61156c Mon Sep 17 00:00:00 2001 From: Allen Porter Date: Thu, 4 Dec 2025 21:33:11 -0800 Subject: [PATCH 2/2] chore: fix lint errors in readme --- roborock/devices/README.md | 48 ++++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/roborock/devices/README.md b/roborock/devices/README.md index 769a7d64..38637b98 100644 --- a/roborock/devices/README.md +++ b/roborock/devices/README.md @@ -13,36 +13,36 @@ graph TB subgraph "Application Layer" User[Application Code] end - + subgraph "Device Management Layer" DM[DeviceManager] Traits[Device Traits] end - + subgraph "Channel Layer" V1C[V1Channel] RPC[RpcChannel] MC[MqttChannel] LC[LocalChannel] end - + subgraph "Session Layer" MS[MqttSession] LS[LocalSession Factory] end - + subgraph "Protocol Layer" Proto[Protocol Encoders/Decoders] V1P[V1 Protocol] A01P[A01 Protocol] B01P[B01 Protocol] end - + subgraph "Transport Layer" MQTT[MQTT Broker] TCP[TCP Socket] end - + User --> DM User --> Traits DM --> V1C @@ -59,7 +59,7 @@ graph TB Proto --> B01P MS --> MQTT LC --> TCP - + style User fill:#e1f5ff style DM fill:#fff4e1 style V1C fill:#ffe1e1 @@ -129,22 +129,22 @@ sequenceDiagram participant MS as MqttSession participant Broker as MQTT Broker participant Device as Vacuum Device - + App->>DM: create_device_manager() DM->>MS: Create MQTT Session MS->>Broker: Connect Broker-->>MS: Connected - + App->>DM: get_devices() DM->>V1C: Create V1Channel V1C->>MC: Create MqttChannel V1C->>LC: Create LocalChannel 
(if available) - + Note over V1C: Subscribe to device topics V1C->>MC: subscribe() MC->>MS: subscribe(topic, callback) MS->>Broker: SUBSCRIBE - + Note over V1C: Fetch network info via MQTT V1C->>RPC: send_command(GET_NETWORK_INFO) RPC->>MC: publish(request) @@ -156,12 +156,12 @@ sequenceDiagram MS->>MC: callback(message) MC->>RPC: decoded message RPC-->>V1C: NetworkInfo - + Note over V1C: Connect locally using IP from NetworkInfo V1C->>LC: connect() LC->>Device: TCP Connect :58867 Device-->>LC: Connected - + Note over V1C: Send command (prefers local) App->>V1C: send_command(GET_STATUS) V1C->>RPC: send_command() @@ -172,6 +172,8 @@ sequenceDiagram RPC-->>App: Status ``` + + ### MQTT connection - Initial device information must be obtained from MQTT @@ -205,7 +207,7 @@ graph LR E -->|5. Match request_id| F[Future.set_result] F -->|6. Return| G[Command Result] end - + subgraph "Channel Layer" C --> H[Subscription Map] D --> I[Transport] @@ -237,7 +239,7 @@ classDiagram +publish(message) +is_connected() bool } - + class MqttChannel { -MqttSession session -duid: str @@ -245,7 +247,7 @@ classDiagram +subscribe(callback) +publish(message) } - + class LocalChannel { -host: str -transport: Transport @@ -255,7 +257,7 @@ classDiagram +publish(message) +close() } - + class V1Channel { -MqttChannel mqtt_channel -LocalChannel local_channel @@ -263,12 +265,12 @@ classDiagram +send_command(method, params) +subscribe(callback) } - + class RpcChannel { -List~RpcStrategy~ strategies +send_command(method, params) } - + class RpcStrategy { +name: str +channel: Channel @@ -276,7 +278,7 @@ classDiagram +decoder: Callable +health_manager: HealthManager } - + class MqttSession { -Client client -dict listeners @@ -285,7 +287,7 @@ classDiagram +publish(topic, payload) +close() } - + Channel <|-- MqttChannel Channel <|-- LocalChannel Channel <|-- V1Channel @@ -495,13 +497,13 @@ def mock_mqtt_channel(): """Mock MQTT channel that simulates responses.""" channel = AsyncMock(spec=MqttChannel) 
channel.response_queue = [] - + async def publish_side_effect(message): # Simulate device response if channel.response_queue: response = channel.response_queue.pop(0) await callback(response) - + channel.publish.side_effect = publish_side_effect return channel ```
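The fixture above awaits a `callback` that the snippet itself never defines. One possible self-contained variant is sketched below: it records the callback passed to `subscribe` and replays queued responses to it on `publish`. The helper name `make_mock_channel`, the synchronous callback signature, and the byte-string messages are assumptions made for this example, and the mock is built without `spec=MqttChannel` so that it runs without the library installed.

```python
import asyncio
from unittest.mock import AsyncMock


def make_mock_channel() -> AsyncMock:
    """Build a mock channel that replays queued responses to its subscribers."""
    channel = AsyncMock()  # Hypothetical: the README version passes spec=MqttChannel.
    channel.response_queue = []
    subscribers = []

    async def subscribe_side_effect(callback):
        # Remember the callback so publish() has something to deliver to.
        subscribers.append(callback)
        return lambda: subscribers.remove(callback)

    async def publish_side_effect(message):
        # Simulate the device answering with the next queued response, if any.
        if channel.response_queue:
            response = channel.response_queue.pop(0)
            for callback in list(subscribers):
                callback(response)

    channel.subscribe.side_effect = subscribe_side_effect
    channel.publish.side_effect = publish_side_effect
    return channel


async def demo() -> list[bytes]:
    channel = make_mock_channel()
    channel.response_queue.append(b"response-1")
    received: list[bytes] = []
    await channel.subscribe(received.append)
    await channel.publish(b"request-1")
    return received


received = asyncio.run(demo())
print(received)
```

In a pytest suite the body of `make_mock_channel` could be wrapped in a `@pytest.fixture`, as in the snippet above.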