Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions plugwise/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def _detect_low_batteries(self) -> list[str]:
mac_pattern = re.compile(r"(?:[0-9A-F]{2}){8}")
matches = ["Battery", "below"]
if self._notifications:
for msg_id, notification in list(self._notifications.items()):
for msg_id, notification in self._notifications.copy().items():
mac_address: str | None = None
message: str | None = notification.get("message")
warning: str | None = notification.get("warning")
Expand Down Expand Up @@ -232,12 +232,12 @@ def _get_adam_data(self, entity: GwEntityData, data: GwEntityData) -> None:
if self._on_off_device and isinstance(self._heating_valves(), int):
data["binary_sensors"]["heating_state"] = self._heating_valves() != 0
# Add cooling_enabled binary_sensor
if "binary_sensors" in data:
if (
"cooling_enabled" not in data["binary_sensors"]
and self._cooling_present
):
data["binary_sensors"]["cooling_enabled"] = self._cooling_enabled
if (
"binary_sensors" in data
and "cooling_enabled" not in data["binary_sensors"]
and self._cooling_present
):
data["binary_sensors"]["cooling_enabled"] = self._cooling_enabled

# Show the allowed regulation_modes and gateway_modes
if entity["dev_class"] == "gateway":
Expand Down
2 changes: 1 addition & 1 deletion plugwise/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ def _scan_thermostats(self) -> None:
for entity_id, entity in self.gw_entities.items():
self._rank_thermostat(thermo_matching, loc_id, entity_id, entity)

for loc_id, loc_data in list(self._thermo_locs.items()):
for loc_id, loc_data in self._thermo_locs.items():
if loc_data["primary_prio"] != 0:
self._zones[loc_id] = {
"dev_class": "climate",
Expand Down
157 changes: 47 additions & 110 deletions tests/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ async def load_testdata(
content = await testdata_file.read()
return json.loads(content)

async def setup_app(
def setup_app(
self,
broken=False,
timeout=False,
timeout_happened=False,
raise_timeout=False,
fail_auth=False,
stretch=False,
Expand All @@ -108,7 +108,7 @@ async def setup_app(

if broken:
app.router.add_get(CORE_DOMAIN_OBJECTS, self.smile_broken)
elif timeout:
elif timeout_happened:
app.router.add_get(CORE_DOMAIN_OBJECTS, self.smile_timeout)
else:
app.router.add_get(CORE_DOMAIN_OBJECTS, self.smile_domain_objects)
Expand All @@ -132,10 +132,10 @@ async def setup_app(

return app

async def setup_legacy_app(
def setup_legacy_app(
self,
broken=False,
timeout=False,
timeout_happened=False,
raise_timeout=False,
fail_auth=False,
stretch=False,
Expand All @@ -151,7 +151,7 @@ async def setup_legacy_app(

if broken:
app.router.add_get(CORE_LOCATIONS, self.smile_broken)
elif timeout:
elif timeout_happened:
app.router.add_get(CORE_LOCATIONS, self.smile_timeout)
else:
app.router.add_get(CORE_LOCATIONS, self.smile_locations)
Expand Down Expand Up @@ -264,24 +264,27 @@ async def smile_fail_auth(cls, request):
raise aiohttp.web.HTTPUnauthorized()

@staticmethod
def connect_status(broken, timeout, fail_auth):
def connect_status(broken, timeout_happened, fail_auth):
"""Determine assumed status from settings."""
assumed_status = 200
if broken:
assumed_status = 500
if timeout:
if timeout_happened:
assumed_status = 504
if fail_auth:
assumed_status = 401
return assumed_status

async def connect(
self,
function,
broken=False,
timeout=False,
timeout_happened=False,
raise_timeout=False,
real_timeout_value=10,
fail_auth=False,
stretch=False,
url_part=CORE_DOMAIN_OBJECTS,
):
"""Connect to a smile environment and perform basic asserts."""
port = aiohttp.test_utils.unused_port()
Expand All @@ -290,7 +293,7 @@ async def connect(
)

# Happy flow
app = await self.setup_app(broken, timeout, raise_timeout, fail_auth, stretch)
app = function(broken, timeout_happened, raise_timeout, fail_auth, stretch)

server = aiohttp.test_utils.TestServer(
app, port=port, scheme="http", host="127.0.0.1"
Expand All @@ -300,20 +303,20 @@ async def connect(
client = aiohttp.test_utils.TestClient(server)
websession = client.session

url = f"{server.scheme}://{server.host}:{server.port}{CORE_DOMAIN_OBJECTS}"
url = f"{server.scheme}://{server.host}:{server.port}{url_part}"

# Try/exceptpass to accommodate for Timeout of aoihttp
try:
resp = await websession.get(url)
assumed_status = self.connect_status(broken, timeout, fail_auth)
assumed_status = self.connect_status(broken, timeout_happened, fail_auth)
assert resp.status == assumed_status
timeoutpass_result = False
assert timeoutpass_result
except Exception: # pylint: disable=broad-except
timeoutpass_result = True
assert timeoutpass_result

if not broken and not timeout and not fail_auth:
if not broken and not timeout_happened and not fail_auth:
text = await resp.text()
assert "xml" in text

Expand All @@ -340,101 +343,15 @@ async def connect(
websession=websession,
)

if not timeout:
if not timeout_happened:
assert smile._timeout == 30

# Connect to the smile
smile_version = None
try:
smile_version = await smile.connect()
assert smile_version is not None
assert smile._timeout == 10
return server, smile, client
except (
pw_exceptions.ConnectionFailedError,
pw_exceptions.InvalidXMLError,
pw_exceptions.InvalidAuthentication,
) as exception:
assert smile_version is None
await self.disconnect(server, client)
raise exception

async def connect_legacy(
self,
broken=False,
timeout=False,
raise_timeout=False,
fail_auth=False,
stretch=False,
):
"""Connect to a smile environment and perform basic asserts."""
port = aiohttp.test_utils.unused_port()
test_password = "".join(
secrets.choice(string.ascii_lowercase) for _ in range(8)
)

# Happy flow
app = await self.setup_legacy_app(
broken, timeout, raise_timeout, fail_auth, stretch
)

server = aiohttp.test_utils.TestServer(
app, port=port, scheme="http", host="127.0.0.1"
)
await server.start_server()

client = aiohttp.test_utils.TestClient(server)
websession = client.session

url = f"{server.scheme}://{server.host}:{server.port}{CORE_LOCATIONS}"

# Try/exceptpass to accommodate for Timeout of aoihttp
try:
resp = await websession.get(url)
assumed_status = self.connect_status(broken, timeout, fail_auth)
assert resp.status == assumed_status
timeoutpass_result = False
assert timeoutpass_result
except Exception: # pylint: disable=broad-except
timeoutpass_result = True
assert timeoutpass_result

if not broken and not timeout and not fail_auth:
text = await resp.text()
assert "xml" in text

# Test lack of websession
try:
smile = pw_smile.Smile(
host=server.host,
username=pw_constants.DEFAULT_USERNAME,
password=test_password,
port=server.port,
websession=None,
)
lack_of_websession = False
assert lack_of_websession
except Exception: # pylint: disable=broad-except
lack_of_websession = True
assert lack_of_websession

smile = pw_smile.Smile(
host=server.host,
username=pw_constants.DEFAULT_USERNAME,
password=test_password,
port=server.port,
websession=websession,
)

if not timeout:
assert smile._timeout == 30

# Connect to the smile
smile_version = None
try:
smile_version = await smile.connect()
assert smile_version is not None
assert smile._timeout == 30
assert smile._timeout == real_timeout_value
return server, smile, client
except (
pw_exceptions.ConnectionFailedError,
Expand All @@ -453,7 +370,7 @@ async def connect_wrapper(
if fail_auth:
try:
_LOGGER.warning("Connecting to device with invalid credentials:")
await self.connect(fail_auth=fail_auth)
await self.connect(self.setup_app, fail_auth=fail_auth)
_LOGGER.error(" - invalid credentials not handled") # pragma: no cover
raise self.ConnectError # pragma: no cover
except pw_exceptions.InvalidAuthentication as exc:
Expand All @@ -462,53 +379,73 @@ async def connect_wrapper(

if raise_timeout:
_LOGGER.warning("Connecting to device exceeding timeout in handling:")
return await self.connect(raise_timeout=True)
return await self.connect(self.setup_app, raise_timeout=True)

try:
_LOGGER.warning("Connecting to device exceeding timeout in response:")
await self.connect(timeout=True)
await self.connect(self.setup_app, timeout_happened=True)
_LOGGER.error(" - timeout not handled") # pragma: no cover
raise self.ConnectError # pragma: no cover
except pw_exceptions.ConnectionFailedError:
_LOGGER.info(" + successfully passed timeout handling.")

try:
_LOGGER.warning("Connecting to device with missing data:")
await self.connect(broken=True)
await self.connect(self.setup_app, broken=True)
_LOGGER.error(" - broken information not handled") # pragma: no cover
raise self.ConnectError # pragma: no cover
except pw_exceptions.InvalidXMLError:
_LOGGER.info(" + successfully passed XML issue handling.")

_LOGGER.info("Connecting to functioning device:")
return await self.connect(stretch=stretch)
return await self.connect(self.setup_app, stretch=stretch)

async def connect_legacy_wrapper(
self, raise_timeout=False, fail_auth=False, stretch=False
):
"""Wrap connect to try negative testing before positive testing."""
if raise_timeout:
_LOGGER.warning("Connecting to device exceeding timeout in handling:")
return await self.connect_legacy(raise_timeout=True)
return await self.connect(
self.setup_legacy_app,
raise_timeout=True,
real_timeout_value=30,
url_part=CORE_LOCATIONS,
)

try:
_LOGGER.warning("Connecting to device exceeding timeout in response:")
await self.connect_legacy(timeout=True)
await self.connect(
self.setup_legacy_app,
real_timeout_value=30,
timeout_happened=True,
url_part=CORE_LOCATIONS,
)
_LOGGER.error(" - timeout not handled") # pragma: no cover
raise self.ConnectError # pragma: no cover
except pw_exceptions.ConnectionFailedError:
_LOGGER.info(" + successfully passed timeout handling.")

try:
_LOGGER.warning("Connecting to device with missing data:")
await self.connect_legacy(broken=True)
await self.connect(
self.setup_legacy_app,
broken=True,
real_timeout_value=30,
url_part=CORE_LOCATIONS,
)
_LOGGER.error(" - broken information not handled") # pragma: no cover
raise self.ConnectError # pragma: no cover
except pw_exceptions.InvalidXMLError:
_LOGGER.info(" + successfully passed XML issue handling.")

_LOGGER.info("Connecting to functioning device:")
return await self.connect_legacy(stretch=stretch)
return await self.connect(
self.setup_legacy_app,
real_timeout_value=30,
stretch=stretch,
url_part=CORE_LOCATIONS,
)

# Generic disconnect
@classmethod
Expand Down
Loading