From a71d76b76f66a9f4c9d09038bf314793bc73a214 Mon Sep 17 00:00:00 2001 From: Allen Porter Date: Sun, 29 Jun 2025 17:50:32 -0700 Subject: [PATCH] chore: Refactor authorization header --- roborock/web_api.py | 82 ++++++++++++++++++++++----------------------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/roborock/web_api.py b/roborock/web_api.py index 99a85b57..aafdfb61 100644 --- a/roborock/web_api.py +++ b/roborock/web_api.py @@ -90,39 +90,6 @@ def _get_header_client_id(self): md5.update(self._device_identifier.encode()) return base64.b64encode(md5.digest()).decode() - def _process_extra_hawk_values(self, values: dict | None) -> str: - if values is None: - return "" - else: - sorted_keys = sorted(values.keys()) - result = [] - for key in sorted_keys: - value = values.get(key) - result.append(f"{key}={value}") - return hashlib.md5("&".join(result).encode()).hexdigest() - - def _get_hawk_authentication( - self, rriot: RRiot, url: str, formdata: dict | None = None, params: dict | None = None - ) -> str: - timestamp = math.floor(time.time()) - nonce = secrets.token_urlsafe(6) - formdata_str = self._process_extra_hawk_values(formdata) - params_str = self._process_extra_hawk_values(params) - - prestr = ":".join( - [ - rriot.u, - rriot.s, - nonce, - str(timestamp), - hashlib.md5(url.encode()).hexdigest(), - params_str, - formdata_str, - ] - ) - mac = base64.b64encode(hmac.new(rriot.h.encode(), prestr.encode(), hashlib.sha256).digest()).decode() - return f'Hawk id="{rriot.u}",s="{rriot.s}",ts="{timestamp}",nonce="{nonce}",mac="{mac}"' - async def nc_prepare(self, user_data: UserData, timezone: str) -> dict: """This gets a few critical parameters for adding a device to your account.""" if ( @@ -144,7 +111,7 @@ async def nc_prepare(self, user_data: UserData, timezone: str) -> dict: "post", "/nc/prepare", headers={ - "Authorization": self._get_hawk_authentication( + "Authorization": _get_hawk_authentication( user_data.rriot, "/nc/prepare", {"hid": hid, "tzid": timezone} ), }, @@ -177,7 +144,7 @@ async def add_device(self, user_data: UserData, s: str, t: str) -> dict: "GET", "/user/devices/newadd", headers={ - "Authorization": self._get_hawk_authentication( + "Authorization": _get_hawk_authentication( user_data.rriot, "/user/devices/newadd", params={"s": s, "t": t} ), }, @@ -337,7 +304,7 @@ async def get_home_data(self, user_data: UserData) -> HomeData: rriot.r.a, self.session, { - "Authorization": self._get_hawk_authentication(rriot, f"/user/homes/{str(home_id)}"), + "Authorization": _get_hawk_authentication(rriot, f"/user/homes/{str(home_id)}"), }, ) home_response = await home_request.request("get", "/user/homes/" + str(home_id)) @@ -366,7 +333,7 @@ async def get_home_data_v2(self, user_data: UserData) -> HomeData: rriot.r.a, self.session, { - "Authorization": self._get_hawk_authentication(rriot, "/v2/user/homes/" + str(home_id)), + "Authorization": _get_hawk_authentication(rriot, "/v2/user/homes/" + str(home_id)), }, ) home_response = await home_request.request("get", "/v2/user/homes/" + str(home_id)) @@ -393,7 +360,7 @@ async def get_home_data_v3(self, user_data: UserData) -> HomeData: rriot.r.a, self.session, { - "Authorization": self._get_hawk_authentication(rriot, "/v3/user/homes/" + str(home_id)), + "Authorization": _get_hawk_authentication(rriot, "/v3/user/homes/" + str(home_id)), }, ) home_response = await home_request.request("get", "/v3/user/homes/" + str(home_id)) @@ -416,7 +383,7 @@ async def get_rooms(self, user_data: UserData, home_id: int | None = None) -> li rriot.r.a, self.session, { - "Authorization": self._get_hawk_authentication(rriot, "/v2/user/homes/" + str(home_id)), + "Authorization": _get_hawk_authentication(rriot, "/v2/user/homes/" + str(home_id)), }, ) room_response = await room_request.request("get", f"/user/homes/{str(home_id)}/rooms" + str(home_id)) @@ -441,7 +408,7 @@ async def get_scenes(self, user_data: UserData, device_id: str) -> list[HomeData rriot.r.a, self.session, { - "Authorization": self._get_hawk_authentication(rriot, f"/user/scene/device/{str(device_id)}"), + "Authorization": _get_hawk_authentication(rriot, f"/user/scene/device/{str(device_id)}"), }, ) scenes_response = await scenes_request.request("get", f"/user/scene/device/{str(device_id)}") @@ -463,7 +430,7 @@ async def execute_scene(self, user_data: UserData, scene_id: int) -> None: rriot.r.a, self.session, { - "Authorization": self._get_hawk_authentication(rriot, f"/user/scene/{str(scene_id)}/execute"), + "Authorization": _get_hawk_authentication(rriot, f"/user/scene/{str(scene_id)}/execute"), }, ) execute_scene_response = await execute_scene_request.request("POST", f"/user/scene/{str(scene_id)}/execute") @@ -546,3 +513,36 @@ async def request(self, method: str, url: str, params=None, data=None, headers=N finally: if close_session: await session.close() + + +def _process_extra_hawk_values(values: dict | None) -> str: + if values is None: + return "" + else: + sorted_keys = sorted(values.keys()) + result = [] + for key in sorted_keys: + value = values.get(key) + result.append(f"{key}={value}") + return hashlib.md5("&".join(result).encode()).hexdigest() + + +def _get_hawk_authentication(rriot: RRiot, url: str, formdata: dict | None = None, params: dict | None = None) -> str: + timestamp = math.floor(time.time()) + nonce = secrets.token_urlsafe(6) + formdata_str = _process_extra_hawk_values(formdata) + params_str = _process_extra_hawk_values(params) + + prestr = ":".join( + [ + rriot.u, + rriot.s, + nonce, + str(timestamp), + hashlib.md5(url.encode()).hexdigest(), + params_str, + formdata_str, + ] + ) + mac = base64.b64encode(hmac.new(rriot.h.encode(), prestr.encode(), hashlib.sha256).digest()).decode() + return f'Hawk id="{rriot.u}",s="{rriot.s}",ts="{timestamp}",nonce="{nonce}",mac="{mac}"'