-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMerossCloudletMonitor.py
More file actions
49 lines (39 loc) · 1.68 KB
/
MerossCloudletMonitor.py
File metadata and controls
49 lines (39 loc) · 1.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import asyncio
import os
from meross_iot.controller.mixins.electricity import ElectricityMixin
from meross_iot.http_api import MerossHttpClient
from meross_iot.manager import MerossManager
from time import sleep
EMAIL = os.environ.get('MEROSS_EMAIL') or "YOUR_MEROSS_CLOUD_EMAIL"
PASSWORD = os.environ.get('MEROSS_PASSWORD') or "YOUR_MEROSS_CLOUD_PASSWORD"
async def main():
# Setup the HTTP client API from user-password
http_api_client = await MerossHttpClient.async_from_user_password(email=EMAIL, password=PASSWORD)
# Setup and start the device manager
manager = MerossManager(http_client=http_api_client)
await manager.async_init()
# Retrieve all the devices that implement the electricity mixin
await manager.async_device_discovery()
devs = manager.find_devices(device_class=ElectricityMixin)
if len(devs) < 1:
# print("No electricity-capable device found...")
print("")
else:
dev = devs[0]
while True:
# Read the electricity power/voltage/current
instant_consumption = await dev.async_get_instant_metrics()
# print(f"Current consumption data: {instant_consumption}")
out = open("/tmp/instant_consumption.power", "w")
out.write(f"{instant_consumption.power}")
out.close()
sleep(10)
# Close the manager and logout from http_api
manager.close()
await http_api_client.async_logout()
if __name__ == '__main__':
# On Windows + Python 3.8, you should uncomment the following
# asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()