Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion roborock/data/code_mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class RoborockModeEnum(StrEnum):
"""A custom StrEnum that also stores an integer code for each member."""

code: int
"""The integer code associated with the enum member."""

def __new__(cls, value: str, code: int) -> RoborockModeEnum:
"""Creates a new enum member."""
Expand All @@ -68,7 +69,18 @@ def from_code(cls, code: int) -> RoborockModeEnum:
for member in cls:
if member.code == code:
return member
raise ValueError(f"{code} is not a valid code for {cls.__name__}")
message = f"{code} is not a valid code for {cls.__name__}"
if message not in completed_warnings:
completed_warnings.add(message)
_LOGGER.warning(message)
raise ValueError(message)

@classmethod
def from_code_optional(cls, code: int) -> RoborockModeEnum | None:
try:
return cls.from_code(code)
except ValueError:
return None
Comment on lines +78 to +83
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine with this - how we handled it on RoborockEnum though is that we had a missing attribute that would automatically be set when you give it a invalid value. Fine either direction though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to add it that way but it doesn't work well with the enum constructor (it started saying code was required) and I couldn't really figure it out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it works based on the constructor but not sure if that works here given we're doing from_code. Happy to change it just not sure what to do exactly.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure either.

What we did before was override missing to handle if the value was missing. But to actually take advantage of it, every instance has to have a specific key added, which isn't ideal. As far as I am aware there is no way to do it from a parent class automatically as enums don't inherit members.

I have no issue with this approach, we will just have to play it by ear and see if we need to change how we handle anything.


@classmethod
def from_value(cls, value: str) -> RoborockModeEnum:
Expand Down
88 changes: 88 additions & 0 deletions roborock/protocols/b01_q10_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""Roborock B01 Protocol encoding and decoding."""

import json
import logging
from typing import Any

from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP
from roborock.exceptions import RoborockException
from roborock.roborock_message import (
RoborockMessage,
RoborockMessageProtocol,
)

_LOGGER = logging.getLogger(__name__)

B01_VERSION = b"B01"
ParamsType = list | dict | int | None


def encode_mqtt_payload(command: B01_Q10_DP, params: ParamsType) -> RoborockMessage:
"""Encode payload for B01 Q10 commands over MQTT.

This does not perform any special encoding for the command parameters and expects
them to already be in a request specific format.
"""
dps_data = {
"dps": {
# Important: some commands use falsy values so only default to `{}` when params is actually None.
command.code: params if params is not None else {},
}
}
return RoborockMessage(
protocol=RoborockMessageProtocol.RPC_REQUEST,
version=B01_VERSION,
payload=json.dumps(dps_data).encode("utf-8"),
)


def _convert_datapoints(datapoints: dict[str, Any], message: RoborockMessage) -> dict[B01_Q10_DP, Any]:
"""Convert the 'dps' dictionary keys from strings to B01_Q10_DP enums."""
result: dict[B01_Q10_DP, Any] = {}
for key, value in datapoints.items():
try:
code = int(key)
except ValueError as e:
raise ValueError(f"dps key is not a valid integer: {e} for {message.payload!r}") from e
if (dps := B01_Q10_DP.from_code_optional(code)) is not None:
# Update from_code to use `Self` on newer python version to remove this type ignore
result[dps] = value # type: ignore[index]
return result


def decode_rpc_response(message: RoborockMessage) -> dict[B01_Q10_DP, Any]:
"""Decode a B01 Q10 RPC_RESPONSE message.

This does not perform any special decoding for the response body, but does
convert the 'dps' keys from strings to B01_Q10_DP enums.
"""
if not message.payload:
raise RoborockException("Invalid B01 message format: missing payload")
try:
payload = json.loads(message.payload.decode())
except (json.JSONDecodeError, UnicodeDecodeError) as e:
raise RoborockException(f"Invalid B01 json payload: {e} for {message.payload!r}") from e

if (datapoints := payload.get("dps")) is None:
raise RoborockException(f"Invalid B01 json payload: missing 'dps' for {message.payload!r}")
if not isinstance(datapoints, dict):
raise RoborockException(f"Invalid B01 message format: 'dps' should be a dictionary for {message.payload!r}")

try:
result = _convert_datapoints(datapoints, message)
except ValueError as e:
raise RoborockException(f"Invalid B01 message format: {e}") from e

# The COMMON response contains nested datapoints need conversion. To simplify
# response handling at higher levels we flatten these into the main result.
if B01_Q10_DP.COMMON in result:
common_result = result.pop(B01_Q10_DP.COMMON)
if not isinstance(common_result, dict):
raise RoborockException(f"Invalid dpCommon format: expected dict, got {type(common_result).__name__}")
try:
common_dps_result = _convert_datapoints(common_result, message)
except ValueError as e:
raise RoborockException(f"Invalid dpCommon format: {e}") from e
Comment on lines +82 to +85
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm unconvinced we should error out here. i.e. a new data point is added, rather than erroring out, we should be made aware of it, but if there are other dps that we DO have, we should continue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, updated to add logic into the enum parsing to handle logging unknown codes and to support optional parsing. Updated the tests to exercise this case with a mix of known and unknown DPS keys.

result.update(common_dps_result)

return result
5 changes: 5 additions & 0 deletions tests/data/test_code_mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ def test_invalid_from_code() -> None:
B01_Q10_DP.from_code(999999)


def test_invalid_from_code_optional() -> None:
"""Test invalid from_code_optional method."""
assert B01_Q10_DP.from_code_optional(999999) is None


def test_from_name() -> None:
"""Test from_name method."""
assert B01_Q10_DP.START_CLEAN == B01_Q10_DP.from_name("START_CLEAN")
Expand Down
91 changes: 91 additions & 0 deletions tests/protocols/__snapshots__/test_b01_q10_protocol.ambr
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# serializer version: 1
# name: test_decode_rpc_payload[dpBattery]
'''
{
"dpBattery": 100
}
'''
# ---
# name: test_decode_rpc_payload[dpRequetdps]
'''
{
"dpStatus": 8,
"dpBattery": 100,
"dpfunLevel": 2,
"dpWaterLevel": 1,
"dpMainBrushLife": 0,
"dpSideBrushLife": 0,
"dpFilterLife": 0,
"dpCleanCount": 1,
"dpCleanMode": 1,
"dpCleanTaskType": 0,
"dpBackType": 5,
"dpBreakpointClean": 0,
"dpValleyPointCharging": false,
"dpRobotCountryCode": "us",
"dpUserPlan": 0,
"dpNotDisturb": 1,
"dpVolume": 74,
"dpTotalCleanArea": 0,
"dpTotalCleanCount": 0,
"dpTotalCleanTime": 0,
"dpDustSwitch": 1,
"dpMopState": 1,
"dpAutoBoost": 0,
"dpChildLock": 0,
"dpDustSetting": 0,
"dpMapSaveSwitch": true,
"dpRecendCleanRecord": false,
"dpCleanTime": 0,
"dpMultiMapSwitch": 1,
"dpSensorLife": 0,
"dpCleanArea": 0,
"dpCarpetCleanType": 0,
"dpCleanLine": 0,
"dpTimeZone": {
"timeZoneCity": "America/Los_Angeles",
"timeZoneSec": -28800
},
"dpAreaUnit": 0,
"dpNetInfo": {
"ipAdress": "1.1.1.2",
"mac": "99:AA:88:BB:77:CC",
"signal": -50,
"wifiName": "wifi-network-name"
},
"dpRobotType": 1,
"dpLineLaserObstacleAvoidance": 1,
"dpCleanProgess": 100,
"dpGroundClean": 0,
"dpFault": 0,
"dpNotDisturbExpand": {
"disturb_dust_enable": 1,
"disturb_light": 1,
"disturb_resume_clean": 1,
"disturb_voice": 1
},
"dpTimerType": 1,
"dpAddCleanState": 0
}
'''
# ---
# name: test_decode_rpc_payload[dpStatus-dpCleanTaskType]
'''
{
"dpStatus": 8,
"dpCleanTaskType": 0
}
'''
# ---
# name: test_encode_mqtt_payload[dpRequetdps-None]
b'{"dps": {"102": {}}}'
# ---
# name: test_encode_mqtt_payload[dpRequetdps-params0]
b'{"dps": {"102": {}}}'
# ---
# name: test_encode_mqtt_payload[dpStartClean-params2]
b'{"dps": {"201": {"cmd": 1}}}'
# ---
# name: test_encode_mqtt_payload[dpWaterLevel-2]
b'{"dps": {"124": 2}}'
# ---
116 changes: 116 additions & 0 deletions tests/protocols/test_b01_q10_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""Tests for the B01 protocol message encoding and decoding."""

import json
import pathlib
from collections.abc import Generator
from typing import Any

import pytest
from freezegun import freeze_time
from syrupy import SnapshotAssertion

from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP, YXWaterLevel
from roborock.exceptions import RoborockException
from roborock.protocols.b01_q10_protocol import (
decode_rpc_response,
encode_mqtt_payload,
)
from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol

TESTDATA_PATH = pathlib.Path("tests/protocols/testdata/b01_q10_protocol/")
TESTDATA_FILES = list(TESTDATA_PATH.glob("*.json"))
TESTDATA_IDS = [x.stem for x in TESTDATA_FILES]


@pytest.fixture(autouse=True)
def fixed_time_fixture() -> Generator[None, None, None]:
"""Fixture to freeze time for predictable request IDs."""
with freeze_time("2025-01-20T12:00:00"):
yield


@pytest.mark.parametrize("filename", TESTDATA_FILES, ids=TESTDATA_IDS)
def test_decode_rpc_payload(filename: str, snapshot: SnapshotAssertion) -> None:
"""Test decoding a B01 RPC response protocol message."""
with open(filename, "rb") as f:
payload = f.read()

message = RoborockMessage(
protocol=RoborockMessageProtocol.RPC_RESPONSE,
payload=payload,
seq=12750,
version=b"B01",
random=97431,
timestamp=1652547161,
)

decoded_message = decode_rpc_response(message)
assert json.dumps(decoded_message, indent=2) == snapshot


@pytest.mark.parametrize(
("payload", "expected_error_message"),
[
(b"", "missing payload"),
(b"n", "Invalid B01 json payload"),
(b"{}", "missing 'dps'"),
(b'{"dps": []}', "'dps' should be a dictionary"),
(b'{"dps": {"not_a_number": 123}}', "dps key is not a valid integer"),
(b'{"dps": {"101": 123}}', "Invalid dpCommon format: expected dict"),
(b'{"dps": {"101": {"not_a_number": 123}}}', "Invalid dpCommon format: dps key is not a valid intege"),
],
)
def test_decode_invalid_rpc_payload(payload: bytes, expected_error_message: str) -> None:
"""Test decoding a B01 RPC response protocol message."""
message = RoborockMessage(
protocol=RoborockMessageProtocol.RPC_RESPONSE,
payload=payload,
seq=12750,
version=b"B01",
random=97431,
timestamp=1652547161,
)
with pytest.raises(RoborockException, match=expected_error_message):
decode_rpc_response(message)


def test_decode_unknown_dps_code() -> None:
"""Test decoding a B01 RPC response protocol message."""
message = RoborockMessage(
protocol=RoborockMessageProtocol.RPC_RESPONSE,
payload=b'{"dps": {"909090": 123, "122":100}}',
seq=12750,
version=b"B01",
random=97431,
timestamp=1652547161,
)

decoded_message = decode_rpc_response(message)
assert decoded_message == {
B01_Q10_DP.BATTERY: 100,
}


@pytest.mark.parametrize(
("command", "params"),
[
(B01_Q10_DP.REQUETDPS, {}),
(B01_Q10_DP.REQUETDPS, None),
(B01_Q10_DP.START_CLEAN, {"cmd": 1}),
(B01_Q10_DP.WATER_LEVEL, YXWaterLevel.MIDDLE.code),
],
)
def test_encode_mqtt_payload(command: B01_Q10_DP, params: dict[str, Any], snapshot) -> None:
"""Test encoding of MQTT payload for B01 Q10 commands."""

message = encode_mqtt_payload(command, params)
assert isinstance(message, RoborockMessage)
assert message.protocol == RoborockMessageProtocol.RPC_REQUEST
assert message.version == b"B01"
assert message.payload is not None

# Snapshot the raw payload to ensure stable encoding. We verify it is
# valid json
assert snapshot == message.payload

json.loads(message.payload.decode())
1 change: 1 addition & 0 deletions tests/protocols/testdata/b01_q10_protocol/dpBattery.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"dps":{"122":100},"t":1766800902}
1 change: 1 addition & 0 deletions tests/protocols/testdata/b01_q10_protocol/dpRequetdps.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"dps":{"101":{"104":0,"105":false,"109":"us","207":0,"25":1,"26":74,"29":0,"30":0,"31":0,"37":1,"40":1,"45":0,"47":0,"50":0,"51":true,"53":false,"6":0,"60":1,"67":0,"7":0,"76":0,"78":0,"79":{"timeZoneCity":"America/Los_Angeles","timeZoneSec":-28800},"80":0,"81":{"ipAdress":"1.1.1.2","mac":"99:AA:88:BB:77:CC","signal":-50,"wifiName":"wifi-network-name"},"83":1,"86":1,"87":100,"88":0,"90":0,"92":{"disturb_dust_enable":1,"disturb_light":1,"disturb_resume_clean":1,"disturb_voice":1},"93":1,"96":0},"121":8,"122":100,"123":2,"124":1,"125":0,"126":0,"127":0,"136":1,"137":1,"138":0,"139":5},"t":1766802312}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"dps":{"121":8,"138":0},"t":1766800904}