diff --git a/.gitignore b/.gitignore index c1c7385..1053beb 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,4 @@ __pycache__/* test/nl.py secrets.json logfile.log +lghorizon.log diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..8ec29e5 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,15 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python Debugger: Debug LGHorizon", + "type": "debugpy", + "request": "launch", + "program": "main.py", + "console": "integratedTerminal" + } + ] +} diff --git a/README.md b/README.md index 150de8d..f6d6111 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,153 @@ # LG Horizon Api -Python library to control multiple LG Horizon boxes +# LG Horizon API Python Library + +A Python library to interact with and control LG Horizon set-top boxes. This library provides functionalities for authentication, real-time device status monitoring via MQTT, and various control commands for your Horizon devices. + +## Features + +- **Authentication**: Supports authentication using username/password or a refresh token. The library automatically handles access token refreshing. +- **Device Management**: Discover and manage multiple LG Horizon set-top boxes associated with your account. +- **Real-time Status**: Monitor device status (online/running/standby) and current playback information (channel, show, VOD, recording, app) through MQTT. +- **Channel Information**: Retrieve a list of available channels and profile-specific favorite channels. +- **Recording Management**: + - Get a list of all recordings. + - Retrieve recordings for specific shows. + - Check recording quota and usage. +- **Device Control**: Send various commands to your set-top box: + - Power on/off. + - Play, pause, stop, rewind, fast forward. + - Change channels (up/down, direct channel selection). + - Record current program. + - Set player position for VOD/recordings. + - Display custom messages on the TV screen. + - Send emulated remote control key presses. +- **Robustness**: Includes automatic MQTT reconnection with exponential backoff and token refresh logic to maintain a stable connection. + +## Installation + +```bash +pip install lghorizon-python # (Replace with actual package name if different) +``` + +## Usage + +Here's a basic example of how to use the library to connect to your LG Horizon devices and monitor their state: + +First, create a `secrets.json` file in the root of your project with your LG Horizon credentials: + +```json +{ + "username": "your_username", + "password": "your_password", + "country": "nl" // e.g., "nl" for Netherlands, "be" for Belgium +} +``` + +Then, you can use the library as follows: + +```python +import asyncio +import json +import logging +import aiohttp + +from lghorizon.lghorizon_api import LGHorizonApi +from lghorizon.lghorizon_models import LGHorizonAuth + +_LOGGER = logging.getLogger(__name__) + +async def main(): + logging.basicConfig(level=logging.INFO) # Set to DEBUG for more verbose output + + with open("secrets.json", encoding="utf-8") as f: + secrets = json.load(f) + username = secrets.get("username") + password = secrets.get("password") + country = secrets.get("country", "nl") + + async with aiohttp.ClientSession() as session: + auth = LGHorizonAuth(session, country, username=username, password=password) + api = LGHorizonApi(auth) + + async def device_state_changed_callback(device_id: str): + device = devices[device_id] + _LOGGER.info( + f"Device {device.device_friendly_name} ({device.device_id}) state changed:\n" + f" State: {device.device_state.state.value}\n" + f" UI State: {device.device_state.ui_state_type.value}\n" + f" Source Type: {device.device_state.source_type.value}\n" + f" Channel: {device.device_state.channel_name or 'N/A'} ({device.device_state.channel_id or 'N/A'})\n" + f" Show: {device.device_state.show_title or 'N/A'}\n" + f" Episode: {device.device_state.episode_title or 'N/A'}\n" + f" Position: {device.device_state.position or 'N/A'} / {device.device_state.duration or 'N/A'}\n" + ) + + try: + _LOGGER.info("Initializing LG Horizon API...") + await api.initialize() + devices = await api.get_devices() + + for device in devices.values(): + _LOGGER.info(f"Registering callback for device: {device.device_friendly_name}") + await device.set_callback(device_state_changed_callback) + + _LOGGER.info("API initialized. Monitoring device states. Press Ctrl+C to exit.") + # Keep the script running to receive MQTT updates + while True: + await asyncio.sleep(3600) # Sleep for a long time, MQTT callbacks will still fire + + except Exception as e: + _LOGGER.error(f"An error occurred: {e}", exc_info=True) + finally: + _LOGGER.info("Disconnecting from LG Horizon API.") + await api.disconnect() + _LOGGER.info("Disconnected.") + +if __name__ == "__main__": + asyncio.run(main()) +``` + +## Authentication + +The `LGHorizonAuth` class handles authentication. You can initialize it with a username and password, or directly with a refresh token if you have one. The library automatically refreshes access tokens as needed. + +```python +# Using username and password +auth = LGHorizonAuth(session, "nl", username="your_username", password="your_password") + +# Using a refresh token (e.g., if you've saved it from a previous session) +# auth = LGHorizonAuth(session, "nl", refresh_token="your_refresh_token") +``` + +You can also set a callback to receive the updated refresh token when it's refreshed, allowing you to persist it for future sessions: + +```python +def token_updated_callback(new_refresh_token: str): + print(f"New refresh token received: {new_refresh_token}") + # Here you would typically save this new_refresh_token + # to your secrets.json or other persistent storage. + +# After initializing LGHorizonApi: +# api.set_token_refresh_callback(token_updated_callback) +``` + +## Error Handling + +The library defines custom exceptions for common error scenarios: + +- `LGHorizonApiError`: Base exception for all API-related errors. +- `LGHorizonApiConnectionError`: Raised for network or connection issues. +- `LGHorizonApiUnauthorizedError`: Raised when authentication fails (e.g., invalid credentials). +- `LGHorizonApiLockedError`: A specific type of `LGHorizonApiUnauthorizedError` indicating a locked account. + +These exceptions allow for more granular error handling in your application. + +## Development + +To run the example script (`main.py`) from the repository: + +1. Clone this repository. +2. Install dependencies: `pip install -r requirements.txt` (ensure `requirements.txt` is up-to-date). +3. Create a `secrets.json` file as described in the Usage section. +4. Run `python main.py`. diff --git a/lghorizon/__init__.py b/lghorizon/__init__.py index 2636e8b..1156804 100644 --- a/lghorizon/__init__.py +++ b/lghorizon/__init__.py @@ -1,41 +1,71 @@ """Python client for LG Horizon.""" from .lghorizon_api import LGHorizonApi -from .models import ( - LGHorizonBox, - LGHorizonRecordingListSeasonShow, +from .lghorizon_device import LGHorizonDevice +from .lghorizon_models import ( + LGHorizonAuth, + LGHorizonChannel, + LGHorizonCustomer, + LGHorizonDeviceState, + LGHorizonProfile, + LGHorizonRecording, + LGHorizonRecordingList, + LGHorizonShowRecordingList, + LGHorizonRecordingSeason, LGHorizonRecordingSingle, LGHorizonRecordingShow, - LGHorizonRecordingEpisode, - LGHorizonCustomer, + LGHorizonRecordingQuota, + LGHorizonRecordingType, + LGHorizonUIStateType, + LGHorizonMessageType, + LGHorizonRunningState, + LGHorizonRecordingSource, + LGHorizonRecordingState, + LGHorizonSourceType, + LGHorizonPlayerState, + LGHorizonAppsState, + LGHorizonUIState, + LGHorizonProfileOptions, + LGHorizonServicesConfig, ) from .exceptions import ( - LGHorizonApiUnauthorizedError, + LGHorizonApiError, LGHorizonApiConnectionError, + LGHorizonApiUnauthorizedError, LGHorizonApiLockedError, ) -from .const import ( - ONLINE_RUNNING, - ONLINE_STANDBY, - RECORDING_TYPE_SHOW, - RECORDING_TYPE_SEASON, - RECORDING_TYPE_SINGLE, -) __all__ = [ "LGHorizonApi", - "LGHorizonBox", - "LGHorizonRecordingListSeasonShow", - "LGHorizonRecordingSingle", - "LGHorizonRecordingShow", - "LGHorizonRecordingEpisode", + "LGHorizonDevice", + "LGHorizonAuth", + "LGHorizonChannel", "LGHorizonCustomer", - "LGHorizonApiUnauthorizedError", + "LGHorizonDeviceState", + "LGHorizonProfile", + "LGHorizonApiError", "LGHorizonApiConnectionError", + "LGHorizonApiUnauthorizedError", "LGHorizonApiLockedError", - "ONLINE_RUNNING", - "ONLINE_STANDBY", - "RECORDING_TYPE_SHOW", - "RECORDING_TYPE_SEASON", - "RECORDING_TYPE_SINGLE", -] # noqa + "LGHorizonRecordingList", + "LGHorizonRecordingSeason", + "LGHorizonRecordingSingle", + "LGHorizonRecordingShow", + "LGHorizonRecordingQuota", + "LGHorizonRecordingType", + "LGHorizonUIStateType", + "LGHorizonMessageType", + "LGHorizonRunningState", + "LGHorizonRecordingSource", + "LGHorizonRecordingState", + "LGHorizonSourceType", + "LGHorizonPlayerState", + "LGHorizonAppsState", + "LGHorizonUIState", + "LGHorizonProfileOptions", + "LGHorizonProfile", + "LGHorizonAuth", + "LGHorizonServicesConfig", + "LGHorizonRecording", + "LGHorizonShowRecordingList", +] diff --git a/lghorizon/const.py b/lghorizon/const.py index c78e4aa..94caa71 100644 --- a/lghorizon/const.py +++ b/lghorizon/const.py @@ -38,11 +38,18 @@ BE_AUTH_URL = "https://login.prd.telenet.be/openid/login.do" +PLATFORM_TYPES = { + "EOS": {"manufacturer": "Arris", "model": "DCX960"}, + "EOS2": {"manufacturer": "HUMAX", "model": "2008C-STB-TN"}, + "HORIZON": {"manufacturer": "Arris", "model": "DCX960"}, + "APOLLO": {"manufacturer": "Arris", "model": "VIP5002W"}, +} + COUNTRY_SETTINGS = { "nl": { "api_url": "https://spark-prod-nl.gnp.cloud.ziggogo.tv", "mqtt_url": "obomsg.prod.nl.horizon.tv", - "use_oauth": False, + "use_refreshtoken": False, "channels": [ { "channelId": "NL_000073_019506", @@ -106,7 +113,7 @@ }, "be-nl-preprod": { "api_url": "https://spark-preprod-be.gnp.cloud.telenet.tv", - "use_oauth": True, + "use_refreshtoken": True, "oauth_username_fieldname": "j_username", "oauth_password_fieldname": "j_password", "oauth_add_accept_header": False, @@ -131,13 +138,13 @@ }, "ie": { "api_url": "https://spark-prod-ie.gnp.cloud.virginmediatv.ie", - "use_oauth": False, + "use_refreshtoken": False, "channels": [], "language": "en", }, "pl": { "api_url": "https://spark-prod-pl.gnp.cloud.upctv.pl", - "use_oauth": False, + "use_refreshtoken": False, "channels": [], "language": "pl", "platform_types": { diff --git a/lghorizon/exceptions.py b/lghorizon/exceptions.py index 740d341..012de05 100644 --- a/lghorizon/exceptions.py +++ b/lghorizon/exceptions.py @@ -6,12 +6,12 @@ class LGHorizonApiError(Exception): class LGHorizonApiConnectionError(LGHorizonApiError): - """Generic LGHorizon exception.""" + """Exception for connection-related errors with the LG Horizon API.""" class LGHorizonApiUnauthorizedError(Exception): - """Generic LGHorizon exception.""" + """Exception for unauthorized access to the LG Horizon API.""" class LGHorizonApiLockedError(LGHorizonApiUnauthorizedError): - """Generic LGHorizon exception.""" + """Exception for locked account errors with the LG Horizon API.""" diff --git a/lghorizon/helpers.py b/lghorizon/helpers.py index c7baab3..32f0f12 100644 --- a/lghorizon/helpers.py +++ b/lghorizon/helpers.py @@ -3,7 +3,7 @@ import random -def make_id(string_length=10): +async def make_id(string_length=10): """Create an id with given length.""" letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" return "".join(random.choice(letters) for i in range(string_length)) diff --git a/lghorizon/lghorizon_api.py b/lghorizon/lghorizon_api.py index 695ce03..31462d7 100644 --- a/lghorizon/lghorizon_api.py +++ b/lghorizon/lghorizon_api.py @@ -1,539 +1,300 @@ -"""Python client for LGHorizon.""" -# pylint: disable=broad-exception-caught -# pylint: disable=line-too-long +"""LG Horizon API client.""" import logging -import json -import re - -from typing import Any, Callable, Dict, List -import backoff - -from requests import Session, exceptions as request_exceptions - -from .exceptions import ( - LGHorizonApiUnauthorizedError, - LGHorizonApiConnectionError, - LGHorizonApiLockedError, -) - -from .models import ( - LGHorizonAuth, - LGHorizonBox, - LGHorizonMqttClient, - LGHorizonCustomer, - LGHorizonChannel, - LGHorizonReplayEvent, - LGHorizonRecordingSingle, - LGHorizonVod, - LGHorizonApp, - LGHorizonBaseRecording, - LGHorizonRecordingListSeasonShow, - LGHorizonRecordingEpisode, - LGHorizonRecordingShow, -) - -from .const import ( - COUNTRY_SETTINGS, - BOX_PLAY_STATE_BUFFER, - BOX_PLAY_STATE_CHANNEL, - BOX_PLAY_STATE_DVR, - BOX_PLAY_STATE_REPLAY, - BOX_PLAY_STATE_VOD, - RECORDING_TYPE_SINGLE, - RECORDING_TYPE_SEASON, - RECORDING_TYPE_SHOW, +from typing import Any, Dict, cast, Callable, Optional + +from .lghorizon_device import LGHorizonDevice +from .lghorizon_models import LGHorizonChannel +from .lghorizon_models import LGHorizonAuth +from .lghorizon_models import LGHorizonCustomer +from .lghorizon_mqtt_client import LGHorizonMqttClient +from .lghorizon_models import LGHorizonServicesConfig +from .lghorizon_models import LGHorizonEntitlements +from .lghorizon_models import LGHorizonProfile +from .lghorizon_models import LGHorizonMessageType +from .lghorizon_message_factory import LGHorizonMessageFactory +from .lghorizon_models import LGHorizonStatusMessage, LGHorizonUIStatusMessage +from .lghorizon_models import LGHorizonRunningState +from .lghorizon_models import ( + LGHorizonRecordingList, + LGHorizonRecordingQuota, + LGHorizonShowRecordingList, ) +from .lghorizon_recording_factory import LGHorizonRecordingFactory +from .lghorizon_device_state_processor import LGHorizonDeviceStateProcessor -_logger = logging.getLogger(__name__) -_supported_platforms = ["EOS", "EOS2", "HORIZON", "APOLLO"] +_LOGGER = logging.getLogger(__name__) class LGHorizonApi: - """Main class for handling connections with LGHorizon Settop boxes.""" - - _auth: LGHorizonAuth = None - _session: Session = None - settop_boxes: Dict[str, LGHorizonBox] = None - customer: LGHorizonCustomer = None - _mqtt_client: LGHorizonMqttClient = None - _channels: Dict[str, LGHorizonChannel] = None - _country_settings = None - _country_code: str = None - recording_capacity: int = None - _entitlements: List[str] = None - _identifier: str = None - _config: str = None - _refresh_callback: Callable = None - _profile_id: str = None - - def __init__( - self, - username: str, - password: str, - country_code: str = "nl", - identifier: str = None, - refresh_token=None, - profile_id=None, - ) -> None: - """Create LGHorizon API.""" - self.username = username - self.password = password - self.refresh_token = refresh_token - self._session = Session() - self._country_settings = COUNTRY_SETTINGS[country_code] - self._country_code = country_code - self._auth = LGHorizonAuth() - self.settop_boxes = {} - self._channels = {} - self._entitlements = [] - self._identifier = identifier + """LG Horizon API client.""" + + _mqtt_client: LGHorizonMqttClient | None + auth: LGHorizonAuth + _service_config: LGHorizonServicesConfig + _customer: LGHorizonCustomer + _channels: Dict[str, LGHorizonChannel] + _entitlements: LGHorizonEntitlements + _profile_id: str + _initialized: bool = False + _devices: Dict[str, LGHorizonDevice] = {} + _message_factory: LGHorizonMessageFactory = LGHorizonMessageFactory() + _device_state_processor: LGHorizonDeviceStateProcessor | None + _recording_factory: LGHorizonRecordingFactory = LGHorizonRecordingFactory() + + def __init__(self, auth: LGHorizonAuth, profile_id: str = "") -> None: + """Initialize LG Horizon API client.""" + """Initialize LG Horizon API client. + + Args: + auth: The authentication object for API requests. + profile_id: The ID of the user profile to use (optional). + """ + self.auth = auth self._profile_id = profile_id - - def _authorize(self) -> None: - ctry_code = self._country_code[0:2] - if ctry_code in ("gb", "ch", "be"): - self._authorize_with_refresh_token() - else: - self._authorize_default() - - def _authorize_default(self) -> None: - _logger.debug("Authorizing") - auth_url = f"{self._country_settings['api_url']}/auth-service/v1/authorization" - auth_headers = {"x-device-code": "web"} - auth_payload = {"password": self.password, "username": self.username} - try: - auth_response = self._session.post( - auth_url, headers=auth_headers, json=auth_payload - ) - except Exception as ex: - raise LGHorizonApiConnectionError("Unknown connection failure") from ex - - if not auth_response.ok: - error_json = auth_response.json() - error = error_json["error"] - if error and error["statusCode"] == 97401: - raise LGHorizonApiUnauthorizedError("Invalid credentials") - elif error and error["statusCode"] == 97117: - raise LGHorizonApiLockedError("Account locked") - elif error: - raise LGHorizonApiConnectionError(error["message"]) - else: - raise LGHorizonApiConnectionError("Unknown connection error") - - self._auth.fill(auth_response.json()) - _logger.debug("Authorization succeeded") - - def _authorize_with_refresh_token(self) -> None: - """Handle authorizzationg using request token.""" - _logger.debug("Authorizing via refresh") - refresh_url = ( - f"{self._country_settings['api_url']}/auth-service/v1/authorization/refresh" + self._channels = {} + self._device_state_processor = None + self._mqtt_client = None + self._initialized = False + + async def initialize(self) -> None: + """Initialize the API client.""" + self._service_config = await self.auth.get_service_config() + self._customer = await self._get_customer_info() + if self._profile_id == "": + self._profile_id = list(self._customer.profiles.keys())[0] + await self._refresh_entitlements() + await self._refresh_channels() + self._mqtt_client = await self._create_mqtt_client() + await self._mqtt_client.connect() + await self._register_devices() + self._device_state_processor = LGHorizonDeviceStateProcessor( + self.auth, self._channels, self._customer, self._profile_id ) - headers = {"content-type": "application/json", "charset": "utf-8"} - payload = '{"refreshToken":"' + self.refresh_token + '"}' - - try: - auth_response = self._session.post( - refresh_url, headers=headers, data=payload - ) - except Exception as ex: - raise LGHorizonApiConnectionError("Unknown connection failure") from ex - - if not auth_response.ok: - _logger.debug("response %s", auth_response) - error_json = auth_response.json() - error = None - if "error" in error_json: - error = error_json["error"] - if error and error["statusCode"] == 97401: - raise LGHorizonApiUnauthorizedError("Invalid credentials") - elif error: - raise LGHorizonApiConnectionError(error["message"]) - else: - raise LGHorizonApiConnectionError("Unknown connection error") - - self._auth.fill(auth_response.json()) - self.refresh_token = self._auth.refresh_token - self._session.cookies["ACCESSTOKEN"] = self._auth.access_token - - if self._refresh_callback: - self._refresh_callback() - - _logger.debug("Authorization succeeded") - - def set_callback(self, refresh_callback: Callable) -> None: - """Set the refresh callback.""" - self._refresh_callback = refresh_callback - - def _authorize_telenet(self): - """Authorize telenet users.""" - try: - login_session = Session() - # Step 1 - Get Authorization data - _logger.debug("Step 1 - Get Authorization data") - auth_url = ( - f"{self._country_settings['api_url']}/auth-service/v1/sso/authorization" - ) - auth_response = login_session.get(auth_url) - if not auth_response.ok: - raise LGHorizonApiConnectionError("Can't connect to authorization URL") - auth_response_json = auth_response.json() - authorization_uri = auth_response_json["authorizationUri"] - authorization_validity_token = auth_response_json["validityToken"] + self._initialized = True - # Step 2 - Get Authorization cookie - _logger.debug("Step 2 - Get Authorization cookie") + async def set_token_refresh_callback( + self, token_refresh_callback: Callable[str, None] + ) -> None: + """Set the token refresh callback.""" + self.auth.token_refresh_callback = token_refresh_callback - auth_cookie_response = login_session.get(authorization_uri) - if not auth_cookie_response.ok: - raise LGHorizonApiConnectionError("Can't connect to authorization URL") + async def get_devices(self) -> dict[str, LGHorizonDevice]: + """Get devices.""" + if not self._initialized: + raise RuntimeError("LGHorizonApi not initialized") - _logger.debug("Step 3 - Login") + return self._devices - username_fieldname = self._country_settings["oauth_username_fieldname"] - pasword_fieldname = self._country_settings["oauth_password_fieldname"] + async def get_profiles(self) -> dict[str, LGHorizonProfile]: + """Get profile IDs.""" + if not self._initialized: + raise RuntimeError("LGHorizonApi not initialized") - payload = { - username_fieldname: self.username, - pasword_fieldname: self.password, - "rememberme": "true", - } + return self._customer.profiles - login_response = login_session.post( - self._country_settings["oauth_url"], payload, allow_redirects=False + async def get_profile_channels( + self, profile_id: Optional[str] = None + ) -> Dict[str, LGHorizonChannel]: + """Returns channels to display baed on profile.""" + # Attempt to retrieve the profile by the given profile_id + if not profile_id: + profile_id = self._profile_id + profile = self._customer.profiles.get(profile_id) + + # If the specified profile is not found, and there are other profiles available, + # default to the first profile in the customer's list if available. + if not profile and self._customer.profiles: + _LOGGER.debug( + "Profile with ID '%s' not found. Defaulting to first available profile.", + profile_id, ) - if not login_response.ok: - raise LGHorizonApiConnectionError("Can't connect to authorization URL") - redirect_url = login_response.headers[ - self._country_settings["oauth_redirect_header"] - ] - - if self._identifier is not None: - redirect_url += f"&dtv_identifier={self._identifier}" - redirect_response = login_session.get(redirect_url, allow_redirects=False) - success_url = redirect_response.headers[ - self._country_settings["oauth_redirect_header"] - ] - code_matches = re.findall(r"code=(.*)&", success_url) - - authorization_code = code_matches[0] - - new_payload = { - "authorizationGrant": { - "authorizationCode": authorization_code, - "validityToken": authorization_validity_token, - } - } - headers = { - "content-type": "application/json", + profile = list(self._customer.profiles.values())[0] + + # If a profile is found and it has favorite channels, filter the main channels list. + if profile and profile.favorite_channels: + _LOGGER.debug("Returning favorite channels for profile '%s'.", profile.name) + # Use a set for faster lookup of favorite channel IDs + profile_channel_ids = set(profile.favorite_channels) + return { + channel.id: channel + for channel in self._channels.values() + if channel.id in profile_channel_ids } - post_result = login_session.post( - auth_url, json.dumps(new_payload), headers=headers + + # If no profile is found (even after defaulting) or the profile has no favorite channels, + # return all available channels. + _LOGGER.debug("No specific profile channels found, returning all channels.") + return self._channels + + async def _register_devices(self) -> None: + """Register devices.""" + _LOGGER.debug("Registering devices...") + self._devices = {} + channels = await self.get_profile_channels(self._profile_id) + for raw_box in self._customer.assigned_devices: + _LOGGER.debug("Creating box for device: %s", raw_box) + if self._device_state_processor is None: + self._device_state_processor = LGHorizonDeviceStateProcessor( + self.auth, self._channels, self._customer, self._profile_id + ) + device = LGHorizonDevice( + raw_box, + self._mqtt_client, + self._device_state_processor, + self.auth, + channels, ) - self._auth.fill(post_result.json()) - self._session.cookies["ACCESSTOKEN"] = self._auth.access_token - except Exception: - pass - - def _obtain_mqtt_token(self): - _logger.debug("Obtain mqtt token...") - mqtt_auth_url = self._config["authorizationService"]["URL"] - mqtt_response = self._do_api_call(f"{mqtt_auth_url}/v1/mqtt/token") - self._auth.mqttToken = mqtt_response["token"] - _logger.debug("MQTT token: %s", self._auth.mqttToken) - - @backoff.on_exception( - backoff.expo, - BaseException, - jitter=None, - max_tries=3, - logger=_logger, - giveup=lambda e: isinstance( - e, (LGHorizonApiLockedError, LGHorizonApiUnauthorizedError) - ), - ) - def connect(self) -> None: - """Start connection process.""" - self._config = self._get_config(self._country_code) - _logger.debug("Connect to API") - self._authorize() - self._obtain_mqtt_token() - self._mqtt_client = LGHorizonMqttClient( - self._auth, - self._config["mqttBroker"]["URL"], + self._devices[device.device_id] = device + + async def disconnect(self) -> None: + """Disconnect the client.""" + if self._mqtt_client: + await self._mqtt_client.disconnect() + self._initialized = False + + async def _create_mqtt_client(self) -> LGHorizonMqttClient: + """Create and configure the MQTT client. + + Returns: An initialized LGHorizonMqttClient instance. + """ + mqtt_client = await LGHorizonMqttClient.create( + self.auth, self._on_mqtt_connected, self._on_mqtt_message, ) + return mqtt_client + + async def _on_mqtt_connected(self): + """MQTT connected callback.""" + await self._mqtt_client.subscribe("#") + await self._mqtt_client.subscribe(self.auth.household_id) + # await self._mqtt_client.subscribe(self.auth.household_id + "/#") + # await self._mqtt_client.subscribe(self.auth.household_id + "/+/#") + await self._mqtt_client.subscribe( + self.auth.household_id + "/" + self._mqtt_client.client_id + ) + await self._mqtt_client.subscribe(self.auth.household_id + "/+/status") + await self._mqtt_client.subscribe( + self.auth.household_id + "/+/networkRecordings" + ) + await self._mqtt_client.subscribe( + self.auth.household_id + "/+/networkRecordings/capacity" + ) + await self._mqtt_client.subscribe(self.auth.household_id + "/+/localRecordings") + await self._mqtt_client.subscribe( + self.auth.household_id + "/+/localRecordings/capacity" + ) + await self._mqtt_client.subscribe(self.auth.household_id + "/watchlistService") + await self._mqtt_client.subscribe(self.auth.household_id + "/purchaseService") + await self._mqtt_client.subscribe( + self.auth.household_id + "/personalizationService" + ) + await self._mqtt_client.subscribe(self.auth.household_id + "/recordingStatus") + await self._mqtt_client.subscribe( + self.auth.household_id + "/recordingStatus/lastUserAction" + ) - self._register_customer_and_boxes() - self._mqtt_client.connect() - - def disconnect(self): - """Disconnect.""" - _logger.debug("Disconnect from API") - if not self._mqtt_client or not self._mqtt_client.is_connected: - return - self._mqtt_client.disconnect() - - def _on_mqtt_connected(self) -> None: - _logger.debug("Connected to MQTT server. Registering all boxes...") - box: LGHorizonBox - for box in self.settop_boxes.values(): - box.register_mqtt() - - def _on_mqtt_message(self, message: str, topic: str) -> None: - if "action" in message and message["action"] == "OPS.getProfilesUpdate": - self._update_customer() - elif "source" in message: - device_id = message["source"] - if not isinstance(device_id, str): - _logger.debug("ignoring message - not a string") - return - if device_id not in self.settop_boxes: - return - try: - if "deviceType" in message and message["deviceType"] == "STB": - self.settop_boxes[device_id].update_state(message) - if "status" in message: - self._handle_box_update(device_id, message) - - except Exception: - _logger.exception("Could not handle status message") - _logger.warning("Full message: %s", str(message)) - self.settop_boxes[device_id].playing_info.reset() - self.settop_boxes[device_id].playing_info.set_paused(False) - elif "CPE.capacity" in message: - splitted_topic = topic.split("/") - if len(splitted_topic) != 4: - return - device_id = splitted_topic[1] - if device_id not in self.settop_boxes: - return - self.settop_boxes[device_id].update_recording_capacity(message) - - def _handle_box_update(self, device_id: str, raw_message: Any) -> None: - status_payload = raw_message["status"] - if "uiStatus" not in status_payload: - return - ui_status = status_payload["uiStatus"] - if ui_status == "mainUI": - player_state = status_payload["playerState"] - if "sourceType" not in player_state or "source" not in player_state: - return - source_type = player_state["sourceType"] - state_source = player_state["source"] - self.settop_boxes[device_id].playing_info.set_paused( - player_state["speed"] == 0 - ) - if ( - source_type - in ( - BOX_PLAY_STATE_CHANNEL, - BOX_PLAY_STATE_BUFFER, - BOX_PLAY_STATE_REPLAY, - ) - and "eventId" in state_source - ): - event_id = state_source["eventId"] - raw_replay_event = self._do_api_call( - f"{self._config['linearService']['URL']}/v2/replayEvent/{event_id}?returnLinearContent=true&language={self._country_settings['language']}" - ) - replay_event = LGHorizonReplayEvent(raw_replay_event) - channel = self._channels[replay_event.channel_id] - self.settop_boxes[device_id].update_with_replay_event( - source_type, replay_event, channel - ) - elif source_type == BOX_PLAY_STATE_DVR: - recording_id = state_source["recordingId"] - session_start_time = state_source["sessionStartTime"] - session_end_time = state_source["sessionEndTime"] - last_speed_change_time = player_state["lastSpeedChangeTime"] - relative_position = player_state["relativePosition"] - raw_recording = self._do_api_call( - f"{self._config['recordingService']['URL']}/customers/{self._auth.household_id}/details/single/{recording_id}?profileId=4504e28d-c1cb-4284-810b-f5eaab06f034&language={self._country_settings['language']}" - ) - recording = LGHorizonRecordingSingle(raw_recording) - channel = self._channels[recording.channel_id] - self.settop_boxes[device_id].update_with_recording( - source_type, - recording, - channel, - session_start_time, - session_end_time, - last_speed_change_time, - relative_position, - ) - elif source_type == BOX_PLAY_STATE_VOD: - title_id = state_source["titleId"] - last_speed_change_time = player_state["lastSpeedChangeTime"] - relative_position = player_state["relativePosition"] - raw_vod = self._do_api_call( - f"{self._config['vodService']['URL']}/v2/detailscreen/{title_id}?language={self._country_settings['language']}&profileId=4504e28d-c1cb-4284-810b-f5eaab06f034&cityId={self.customer.city_id}" - ) - vod = LGHorizonVod(raw_vod) - self.settop_boxes[device_id].update_with_vod( - source_type, vod, last_speed_change_time, relative_position - ) - elif ui_status == "apps": - app = LGHorizonApp(status_payload["appsState"]) - self.settop_boxes[device_id].update_with_app("app", app) - - @backoff.on_exception( - backoff.expo, LGHorizonApiConnectionError, max_tries=3, logger=_logger - ) - def _do_api_call(self, url: str) -> str: - _logger.info("Executing API call to %s", url) - try: - api_response = self._session.get(url) - api_response.raise_for_status() - json_response = api_response.json() - except request_exceptions.HTTPError as http_ex: - self._authorize() - raise LGHorizonApiConnectionError( - f"Unable to call {url}. Error:{str(http_ex)}" - ) from http_ex - _logger.debug("Result API call: %s", json_response) - return json_response - - def _register_customer_and_boxes(self): - self._update_customer() - self._get_channels() - if len(self.customer.settop_boxes) == 0: - _logger.warning("No boxes found.") - return - _logger.info("Registering boxes") - for device in self.customer.settop_boxes: - platform_type = device["platformType"] - if platform_type not in _supported_platforms: - continue - if ( - "platform_types" in self._country_settings - and platform_type in self._country_settings["platform_types"] - ): - platform_type = self._country_settings["platform_types"][platform_type] - else: - platform_type = None - box = LGHorizonBox( - device, platform_type, self._mqtt_client, self._auth, self._channels - ) - self.settop_boxes[box.device_id] = box - _logger.info("Box %s registered...", box.device_id) - - def _update_customer(self): - _logger.info("Get customer data") - personalisation_result = self._do_api_call( - f"{self._config['personalizationService']['URL']}/v1/customer/{self._auth.household_id}?with=profiles%2Cdevices" + async def _on_mqtt_message(self, mqtt_message: dict, mqtt_topic: str): + """MQTT message callback.""" + message = await self._message_factory.create_message(mqtt_topic, mqtt_message) + match message.message_type: + case LGHorizonMessageType.STATUS: + message.__class__ = LGHorizonStatusMessage + status_message = cast(LGHorizonStatusMessage, message) + device = self._devices.get(status_message.source, None) + if not device: + return + await device.handle_status_message(status_message) + case LGHorizonMessageType.UI_STATUS: + message.__class__ = LGHorizonUIStatusMessage + ui_status_message = cast(LGHorizonUIStatusMessage, message) + device = self._devices.get(ui_status_message.source, None) + if not device: + return + if ( + not device.device_state.state + == LGHorizonRunningState.ONLINE_RUNNING + ): + return + await device.handle_ui_status_message(ui_status_message) + + async def _get_customer_info(self) -> LGHorizonCustomer: + service_url = await self._service_config.get_service_url( + "personalizationService" ) - _logger.debug("Personalisation result: %s ", personalisation_result) - self.customer = LGHorizonCustomer(personalisation_result) - - def _get_channels(self): - self._update_entitlements() - _logger.info("Retrieving channels...") - channels_result = self._do_api_call( - f"{self._config['linearService']['URL']}/v2/channels?cityId={self.customer.city_id}&language={self._country_settings['language']}&productClass=Orion-DASH" + result = await self.auth.request( + service_url, + f"/v1/customer/{self.auth.household_id}?with=profiles%2Cdevices", ) - for channel in channels_result: - if "isRadio" in channel and channel["isRadio"]: - continue + return LGHorizonCustomer(result) + + async def _refresh_entitlements(self) -> Any: + """Retrieve entitlements.""" + _LOGGER.debug("Retrieving entitlements...") + service_url = await self._service_config.get_service_url("purchaseService") + result = await self.auth.request( + service_url, + f"/v2/customers/{self.auth.household_id}/entitlements?enableDaypass=true", + ) + self._entitlements = LGHorizonEntitlements(result) + + async def _refresh_channels(self): + """Retrieve channels.""" + _LOGGER.debug("Retrieving channels...") + service_url = await self._service_config.get_service_url("linearService") + lang = await self._customer.get_profile_lang(self._profile_id) + channels_json = await self.auth.request( + service_url, + f"/v2/channels?cityId={self._customer.city_id}&language={lang}&productClass=Orion-DASH", + ) + for channel_json in channels_json: + channel = LGHorizonChannel(channel_json) common_entitlements = list( - set(self._entitlements) & set(channel["linearProducts"]) + set(self._entitlements.entitlement_ids) & set(channel.linear_products) ) + if len(common_entitlements) == 0: continue - channel_id = channel["id"] - self._channels[channel_id] = LGHorizonChannel(channel) - _logger.info("%s retrieved.", len(self._channels)) - def get_display_channels(self): - """Returns channels to display baed on profile.""" - all_channels = self._channels.values() - if not self._profile_id or self._profile_id not in self.customer.profiles: - return all_channels - profile_channel_ids = self.customer.profiles[self._profile_id].favorite_channels - if len(profile_channel_ids) == 0: - return all_channels - - return [ - channel for channel in all_channels if channel.id in profile_channel_ids - ] - - def _get_replay_event(self, listing_id) -> Any: - """Get listing.""" - _logger.info("Retrieving replay event details...") - response = self._do_api_call( - f"{self._config['linearService']['URL']}/v2/replayEvent/{listing_id}?returnLinearContent=true&language={self._country_settings['language']}" - ) - _logger.info("Replay event details retrieved") - return response - - def get_recording_capacity(self) -> int: - """Returns remaining recording capacity""" - ctry_code = self._country_code[0:2] - if ctry_code == "gb": - _logger.debug("GB: not supported") - return None - try: - _logger.info("Retrieving recordingcapacity...") - quota_content = self._do_api_call( - f"{self._config['recordingService']['URL']}/customers/{self._auth.household_id}/quota" - ) - if "quota" not in quota_content and "occupied" not in quota_content: - _logger.error("Unable to fetch recording capacity...") - return None - capacity = (quota_content["occupied"] / quota_content["quota"]) * 100 - self.recording_capacity = round(capacity) - _logger.debug("Remaining recordingcapacity %s %%", self.recording_capacity) - return self.recording_capacity - except Exception: - _logger.error("Unable to fetch recording capacity...") - return None - - def get_recordings(self) -> List[LGHorizonBaseRecording]: - """Returns recordings.""" - _logger.info("Retrieving recordings...") - recording_content = self._do_api_call( - f"{self._config['recordingService']['URL']}/customers/{self._auth.household_id}/recordings?sort=time&sortOrder=desc&language={self._country_settings['language']}" + self._channels[channel.id] = channel + + async def get_all_recordings(self) -> LGHorizonRecordingList: + """Retrieve all recordings.""" + _LOGGER.debug("Retrieving recordings...") + service_url = await self._service_config.get_service_url("recordingService") + lang = await self._customer.get_profile_lang(self._profile_id) + recordings_json = await self.auth.request( + service_url, + f"/customers/{self.auth.household_id}/recordings?isAdult=false&offset=0&limit=100&sort=time&sortOrder=desc&profileId={self._profile_id}&language={lang}", ) - recordings = [] - for recording_data_item in recording_content["data"]: - recording_type = recording_data_item["type"] - if recording_type == RECORDING_TYPE_SINGLE: - recordings.append(LGHorizonRecordingSingle(recording_data_item)) - elif recording_type in (RECORDING_TYPE_SEASON, RECORDING_TYPE_SHOW): - recordings.append(LGHorizonRecordingListSeasonShow(recording_data_item)) - _logger.info("%s recordings retrieved...", len(recordings)) + recordings = await self._recording_factory.create_recordings(recordings_json) return recordings - def get_recording_show(self, show_id: str) -> list[LGHorizonRecordingSingle]: - """Returns show recording""" - _logger.info("Retrieving show recordings...") - show_recording_content = self._do_api_call( - f"{self._config['recordingService']['URL']}/customers/{self._auth.household_id}/episodes/shows/{show_id}?source=recording&language=nl&sort=time&sortOrder=asc" + async def get_show_recordings( + self, show_id: str, channel_id: str + ) -> LGHorizonShowRecordingList: # type: ignore[valid-type] + """Retrieve all recordings.""" + _LOGGER.debug("Retrieving recordings fro show...") + service_url = await self._service_config.get_service_url("recordingService") + lang = await self._customer.get_profile_lang(self._profile_id) + episodes_json = await self.auth.request( + service_url, + f"/customers/{self.auth.household_id}/episodes/shows/{show_id}?source=recording&isAdult=false&offset=0&limit=100&profileId={self._profile_id}&language={lang}&channelId={channel_id}&sort=time&sortOrder=asc", ) - recordings = [] - for item in show_recording_content["data"]: - if item["source"] == "show": - recordings.append(LGHorizonRecordingShow(item)) - else: - recordings.append(LGHorizonRecordingEpisode(item)) - _logger.info("%s showrecordings retrieved...", len(recordings)) + recordings = await self._recording_factory.create_episodes(episodes_json) return recordings - def _update_entitlements(self) -> None: - _logger.info("Retrieving entitlements...") - entitlements_json = self._do_api_call( - f"{self._config['purchaseService']['URL']}/v2/customers/{self._auth.household_id}/entitlements?enableDaypass=true" + async def get_recording_quota(self) -> LGHorizonRecordingQuota: + """Refresh recording quota.""" + _LOGGER.debug("Refreshing recording quota...") + service_url = await self._service_config.get_service_url("recordingService") + quota_json = await self.auth.request( + service_url, + f"/customers/{self.auth.household_id}/quota", ) - self._entitlements.clear() - for entitlement in entitlements_json["entitlements"]: - self._entitlements.append(entitlement["id"]) - - def _get_config(self, country_code: str): - base_country_code = country_code[0:2] - config_url = f"{self._country_settings['api_url']}/{base_country_code}/en/config-service/conf/web/backoffice.json" - result = self._do_api_call(config_url) - _logger.debug(result) - return result + return LGHorizonRecordingQuota(quota_json) + + +__all__ = ["LGHorizonApi", "LGHorizonAuth"] diff --git a/lghorizon/lghorizon_device.py b/lghorizon/lghorizon_device.py new file mode 100644 index 0000000..2c20d7f --- /dev/null +++ b/lghorizon/lghorizon_device.py @@ -0,0 +1,409 @@ +"""LG Horizon Device.""" + +from __future__ import annotations +import asyncio +import json +import logging +from typing import Any, Callable, Coroutine, Dict, Optional +from .lghorizon_models import ( + LGHorizonRunningState, + LGHorizonStatusMessage, + LGHorizonUIStatusMessage, + LGHorizonDeviceState, + LGHorizonAuth, + LGHorizonChannel, +) + +from .exceptions import LGHorizonApiConnectionError +from .helpers import make_id +from .lghorizon_device_state_processor import LGHorizonDeviceStateProcessor +from .lghorizon_mqtt_client import LGHorizonMqttClient +from .const import ( + MEDIA_KEY_CHANNEL_DOWN, + MEDIA_KEY_CHANNEL_UP, + MEDIA_KEY_ENTER, + MEDIA_KEY_FAST_FORWARD, + MEDIA_KEY_PLAY_PAUSE, + MEDIA_KEY_POWER, + MEDIA_KEY_RECORD, + MEDIA_KEY_REWIND, + MEDIA_KEY_STOP, + ONLINE_RUNNING, + PLATFORM_TYPES, +) + +_LOGGER = logging.getLogger(__name__) + + +class LGHorizonDevice: + """The LG Horizon device (set-top box).""" + + _device_id: str + _hashed_cpe_id: str + _device_friendly_name: str + _platform_type: str + _device_state: LGHorizonDeviceState + _manufacturer: Optional[str] + _model: Optional[str] + _recording_capacity: Optional[int] + _device_state_processor: LGHorizonDeviceStateProcessor + _mqtt_client: LGHorizonMqttClient + _change_callback: Callable[[str], Coroutine[Any, Any, Any]] + _auth: LGHorizonAuth + _channels: Dict[str, LGHorizonChannel] + _last_ui_message_timestamp: int = 0 + + def __init__( + self, + device_json, + mqtt_client: LGHorizonMqttClient, + device_state_processor: LGHorizonDeviceStateProcessor, + auth: LGHorizonAuth, + channels: Dict[str, LGHorizonChannel], + ): + """Initialize the LG Horizon device.""" + self._device_id = device_json["deviceId"] + self._hashed_cpe_id = device_json["hashedCPEId"] + self._device_friendly_name = device_json["settings"]["deviceFriendlyName"] + self._platform_type = device_json.get("platformType") + self._mqtt_client = mqtt_client + self._auth = auth + self._channels = channels + self._device_state = LGHorizonDeviceState() # Initialize state + self._manufacturer = None + self._model = None + self._recording_capacity = None + self._device_state_processor = device_state_processor + + @property + def device_id(self) -> str: + """Return the device ID.""" + return self._device_id + + @property + def platform_type(self) -> str: + """Return the device ID.""" + return self._platform_type + + @property + def manufacturer(self) -> str: + """Return the manufacturer of the settop box.""" + platform_info = PLATFORM_TYPES.get(self._platform_type, dict()) + return platform_info.get("manufacturer", "unknown") + + @property + def model(self) -> str: + """Return the model of the settop box.""" + platform_info = PLATFORM_TYPES.get(self._platform_type, dict()) + return platform_info.get("model", "unknown") + + @property + def is_available(self) -> bool: + """Return the availability of the settop box.""" + return self._device_state.state in ( + LGHorizonRunningState.ONLINE_RUNNING, + LGHorizonRunningState.ONLINE_STANDBY, + ) + + @property + def hashed_cpe_id(self) -> str: + """Return the hashed CPE ID.""" + return self._hashed_cpe_id + + @property + def device_friendly_name(self) -> str: + """Return the device friendly name.""" + return self._device_friendly_name + + @property + def device_state(self) -> LGHorizonDeviceState: + """Return the current playing information.""" + return self._device_state + + @property + def recording_capacity(self) -> Optional[int]: + """Return the recording capacity used.""" + return self._recording_capacity + + @recording_capacity.setter + def recording_capacity(self, value: int) -> None: + """Set the recording capacity used.""" + self._recording_capacity = value + + @property + def last_ui_message_timestamp(self) -> int: + """Return the last ui message timestamp.""" + return self._last_ui_message_timestamp + + @last_ui_message_timestamp.setter + def last_ui_message_timestamp(self, value: int) -> None: + """Set the last ui message timestamp.""" + self._last_ui_message_timestamp = value + + async def update_channels(self, channels: Dict[str, LGHorizonChannel]): + """Update the channels list.""" + self._channels = channels + + async def register_mqtt(self) -> None: + """Register the mqtt connection.""" + topic = f"{self._auth.household_id}/{self._mqtt_client.client_id}/status" + payload = { + "source": self._mqtt_client.client_id, + "state": ONLINE_RUNNING, + "deviceType": "HGO", + } + await self._mqtt_client.publish_message(topic, json.dumps(payload)) + + async def set_callback( + self, change_callback: Callable[[str], Coroutine[Any, Any, Any]] + ) -> None: + """Set a callback function to be called when the device state changes. + + Args: + change_callback: An asynchronous callable that takes the device ID + as an argument. + """ + self._change_callback = change_callback + await self.register_mqtt() # type: ignore [assignment] # Callback can be None + + async def handle_status_message( + self, status_message: LGHorizonStatusMessage + ) -> None: + """Handle an incoming status message from the set-top box. + + Args: + status_message: The status message received from the device. + """ + old_running_state = self.device_state.state + new_running_state = status_message.running_state + if ( + old_running_state == new_running_state + ): # Access backing field for comparison + return + await self._device_state_processor.process_state( + self.device_state, status_message + ) # Use the setter + if self._device_state.state == LGHorizonRunningState.ONLINE_RUNNING: + await self._request_settop_box_state() + + await self._trigger_callback() + await self._request_settop_box_recording_capacity() + + async def handle_ui_status_message( + self, status_message: LGHorizonUIStatusMessage + ) -> None: + """Handle UI status message.""" + + await self._device_state_processor.process_ui_state( + self.device_state, status_message + ) + self.last_ui_message_timestamp = status_message.message_timestamp + await self._trigger_callback() + + async def update_recording_capacity(self, payload) -> None: + """Updates the recording capacity.""" + if "CPE.capacity" not in payload or "used" not in payload: + return + self.recording_capacity = payload["used"] # Use the setter + + async def _trigger_callback(self): + """Trigger the registered callback function. + + This method is called when the device's state changes and a callback is set. + """ + if self._change_callback is not None: + _LOGGER.debug("Callback called from box %s", self.device_id) + await self._change_callback(self.device_id) + + async def turn_on(self) -> None: + """Turn the settop box on.""" + + if self._device_state.state == LGHorizonRunningState.ONLINE_STANDBY: + await self.send_key_to_box(MEDIA_KEY_POWER) + + async def turn_off(self) -> None: + """Turn the settop box off.""" + if self._device_state.state == LGHorizonRunningState.ONLINE_RUNNING: + await self.send_key_to_box(MEDIA_KEY_POWER) + await self._device_state.reset() + + async def pause(self) -> None: + """Pause the given settopbox.""" + if ( + self._device_state.state == LGHorizonRunningState.ONLINE_RUNNING + and not self._device_state.paused + ): + await self.send_key_to_box(MEDIA_KEY_PLAY_PAUSE) + + async def play(self) -> None: + """Resume the settopbox.""" + if ( + self._device_state.state == LGHorizonRunningState.ONLINE_RUNNING + and self._device_state.paused + ): + await self.send_key_to_box(MEDIA_KEY_PLAY_PAUSE) + + async def stop(self) -> None: + """Stop the settopbox.""" + if self._device_state.state == LGHorizonRunningState.ONLINE_RUNNING: + await self.send_key_to_box(MEDIA_KEY_STOP) + + async def next_channel(self): + """Select the next channel for given settop box.""" + if self._device_state.state == LGHorizonRunningState.ONLINE_RUNNING: + await self.send_key_to_box(MEDIA_KEY_CHANNEL_UP) + + async def previous_channel(self) -> None: + """Select the previous channel for given settop box.""" + if self._device_state.state == LGHorizonRunningState.ONLINE_RUNNING: + await self.send_key_to_box(MEDIA_KEY_CHANNEL_DOWN) + + async def press_enter(self) -> None: + """Press enter on the settop box.""" + if self._device_state.state == LGHorizonRunningState.ONLINE_RUNNING: + await self.send_key_to_box(MEDIA_KEY_ENTER) + + async def rewind(self) -> None: + """Rewind the settop box.""" + if self._device_state.state == LGHorizonRunningState.ONLINE_RUNNING: + await self.send_key_to_box(MEDIA_KEY_REWIND) + + async def fast_forward(self) -> None: + """Fast forward the settop box.""" + if self._device_state.state == LGHorizonRunningState.ONLINE_RUNNING: + await self.send_key_to_box(MEDIA_KEY_FAST_FORWARD) + + async def record(self): + """Record on the settop box.""" + if self._device_state.state == LGHorizonRunningState.ONLINE_RUNNING: + await self.send_key_to_box(MEDIA_KEY_RECORD) + + async def set_player_position(self, position: int) -> None: + """Set the player position on the settop box.""" + payload = { + "source": self.device_id, + "type": "CPE.setPlayerPosition", + "runtimeType": "setPlayerposition", + "id": await make_id(), + "version": "1.3.11", + "status": {"relativePosition": position}, + } + payload_str = json.dumps(payload) + await self._mqtt_client.publish_message( + f"{self._auth.household_id}/{self.device_id}", payload_str + ) + + async def display_message(self, sourceType: str, message: str) -> None: + """Display a message on the set-top box and repeat it for longer visibility. + + # We sturen de payload 3 keer met een kortere tussentijd + + This method sends the message payload multiple times to ensure it stays + visible on the screen for a longer duration, as the display time for + such messages is typically short. + + Args: + sourceType: The type of source for the message (e.g., "linear"). + message: The message string to display. + """ + for i in range(3): + payload = { + "id": await make_id(8), + "type": "CPE.pushToTV", + "source": { + "clientId": self._mqtt_client.client_id, + "friendlyDeviceName": f"\n\n{message}", + }, + "status": { + "sourceType": sourceType, + "source": {"channelId": "1234"}, + "title": "Nieuwe melding", + "relativePosition": 0, + "speed": 1, + }, + } + + await self._mqtt_client.publish_message( + f"{self._auth.household_id}/{self.device_id}", json.dumps(payload) + ) + + # Omdat de melding 3 seconden blijft staan, wachten we 3 seconden + # voor de volgende 'refresh'. + if i < 2: + await asyncio.sleep(3) + + async def set_channel(self, source: str) -> None: + """Change te channel from the settopbox.""" + channel = [src for src in self._channels.values() if src.title == source][0] + payload = { + "id": await make_id(8), + "type": "CPE.pushToTV", + "source": { + "clientId": self._mqtt_client.client_id, + "friendlyDeviceName": "Home Assistant", + }, + "status": { + "sourceType": "linear", + "source": {"channelId": channel.id}, + "relativePosition": 0, + "speed": 1, + }, + } + + await self._mqtt_client.publish_message( + f"{self._auth.household_id}/{self.device_id}", json.dumps(payload) + ) + + async def play_recording(self, recording_id): + """Play recording.""" + payload = { + "id": await make_id(8), + "type": "CPE.pushToTV", + "source": { + "clientId": self._mqtt_client.client_id, + "friendlyDeviceName": "Home Assistant", + }, + "status": { + "sourceType": "nDVR", + "source": {"recordingId": recording_id}, + "relativePosition": 0, + }, + } + + await self._mqtt_client.publish_message( + f"{self._auth.household_id}/{self.device_id}", json.dumps(payload) + ) + + async def send_key_to_box(self, key: str) -> None: + """Send emulated (remote) key press to settopbox.""" + payload_dict = { + "type": "CPE.KeyEvent", + "runtimeType": "key", + "id": "ha", + "source": self.device_id.lower(), + "status": {"w3cKey": key, "eventType": "keyDownUp"}, + } + payload = json.dumps(payload_dict) + await self._mqtt_client.publish_message( + f"{self._auth.household_id}/{self.device_id}", payload + ) + + async def _request_settop_box_state(self) -> None: + """Send mqtt message to receive state from settop box.""" + topic = f"{self._auth.household_id}/{self.device_id}" + payload = { + "id": await make_id(8), + "type": "CPE.getUiStatus", + "source": self._mqtt_client.client_id, + } + await self._mqtt_client.publish_message(topic, json.dumps(payload)) + + async def _request_settop_box_recording_capacity(self) -> None: + """Send mqtt message to receive state from settop box.""" + topic = f"{self._auth.household_id}/{self.device_id}" + payload = { + "id": await make_id(8), + "type": "CPE.capacity", + "source": self._mqtt_client.client_id, + } + await self._mqtt_client.publish_message(topic, json.dumps(payload)) diff --git a/lghorizon/lghorizon_device_state_processor.py b/lghorizon/lghorizon_device_state_processor.py new file mode 100644 index 0000000..e9d9bc3 --- /dev/null +++ b/lghorizon/lghorizon_device_state_processor.py @@ -0,0 +1,365 @@ +"""LG Horizon device (set-top box) model.""" + +import random +import json +import urllib.parse +from datetime import datetime as dt, timezone + +import time + +from typing import cast, Dict, Optional + +from .lghorizon_models import LGHorizonDeviceState, LGHorizonRunningState +from .lghorizon_models import LGHorizonStatusMessage, LGHorizonUIStatusMessage +from .lghorizon_models import ( + LGHorizonSourceType, + LGHorizonLinearSource, + LGHorizonVODSource, + LGHorizonReplaySource, + LGHorizonNDVRSource, + LGHorizonReviewBufferSource, + LGHorizonRecordingSource, +) +from .lghorizon_models import LGHorizonAuth +from .lghorizon_models import LGHorizonReplayEvent, LGHorizonVOD, LGHorizonVODType + +from .lghorizon_models import LGHorizonRecordingSingle +from .lghorizon_models import LGHorizonChannel +from .lghorizon_models import ( + LGHorizonUIStateType, + LGHorizonAppsState, + LGHorizonPlayerState, +) +from .lghorizon_models import LGHorizonCustomer + + +class LGHorizonDeviceStateProcessor: + """Process incoming device state messages""" + + def __init__( + self, + auth: LGHorizonAuth, + channels: Dict[str, LGHorizonChannel], + customer: LGHorizonCustomer, + profile_id: str, + ): + self._auth = auth + self._channels = channels + self._customer = customer + self._profile_id = profile_id + + async def process_state( + self, device_state: LGHorizonDeviceState, status_message: LGHorizonStatusMessage + ) -> None: + """Process the device state based on the status message.""" + await device_state.reset() + device_state.state = status_message.running_state + + async def process_ui_state( + self, + device_state: LGHorizonDeviceState, + ui_status_message: LGHorizonUIStatusMessage, + ) -> None: + """Process the device state based on the UI status message.""" + await device_state.reset() + if ( + ui_status_message.ui_state is None + or device_state.state == LGHorizonRunningState.ONLINE_STANDBY + ): + await device_state.reset() + return + + if ui_status_message.ui_state is None: + return + match ui_status_message.ui_state.ui_status: + case LGHorizonUIStateType.MAINUI: + if ui_status_message.ui_state.player_state is None: + return + await self._process_main_ui_state( + device_state, ui_status_message.ui_state.player_state + ) + case LGHorizonUIStateType.APPS: + if ui_status_message.ui_state.apps_state is None: + return + await self._process_apps_state( + device_state, ui_status_message.ui_state.apps_state + ) + + if ui_status_message.ui_state.ui_status == LGHorizonUIStateType.APPS: + return + + if ui_status_message.ui_state.player_state is None: + return + + async def _process_main_ui_state( + self, + device_state: LGHorizonDeviceState, + player_state: LGHorizonPlayerState, + ) -> None: + if player_state is None: + return + await device_state.reset() + device_state.source_type = player_state.source_type + device_state.ui_state_type = LGHorizonUIStateType.MAINUI + device_state.speed = player_state.speed + + match player_state.source_type: + case LGHorizonSourceType.LINEAR: + await self._process_linear_state(device_state, player_state) + case LGHorizonSourceType.REVIEWBUFFER: + await self._process_reviewbuffer_state(device_state, player_state) + case LGHorizonSourceType.REPLAY: + await self._process_replay_state(device_state, player_state) + case LGHorizonSourceType.VOD: + await self._process_vod_state(device_state, player_state) + case LGHorizonSourceType.NDVR: + await self._process_ndvr_state(device_state, player_state) + + async def _process_apps_state( + self, + device_state: LGHorizonDeviceState, + apps_state: LGHorizonAppsState, + ) -> None: + device_state.id = apps_state.id + device_state.show_title = apps_state.app_name + device_state.image = apps_state.logo_path + device_state.ui_state_type = LGHorizonUIStateType.APPS + + async def _process_linear_state( + self, + device_state: LGHorizonDeviceState, + player_state: LGHorizonPlayerState, + ) -> None: + """Process the device state based on the UI status message.""" + if player_state.source is None: + return + player_state.source.__class__ = LGHorizonLinearSource + source = cast(LGHorizonLinearSource, player_state.source) + service_config = await self._auth.get_service_config() + service_url = await service_config.get_service_url("linearService") + lang = await self._customer.get_profile_lang(self._profile_id) + service_path = f"/v2/replayEvent/{source.event_id}?returnLinearContent=true&language={lang}" + + event_json = await self._auth.request( + service_url, + service_path, + ) + replay_event = LGHorizonReplayEvent(event_json) + device_state.id = replay_event.event_id + channel = self._channels[replay_event.channel_id] + device_state.source_type = source.source_type + device_state.channel_id = channel.id + device_state.channel_name = channel.title + device_state.episode_title = replay_event.episode_name + device_state.season_number = replay_event.season_number + device_state.episode_number = replay_event.episode_number + device_state.show_title = replay_event.title + now_in_ms = int(time.time() * 1000) + + device_state.last_position_update = int(time.time() * 1000) + device_state.start_time = replay_event.start_time + device_state.end_time = replay_event.end_time + device_state.duration = replay_event.end_time - replay_event.start_time + device_state.position = now_in_ms - int(replay_event.start_time * 1000) + + # Add random number to url to force refresh + join_param = "?" + if join_param in channel.stream_image: + join_param = "&" + image_url = ( + f"{channel.stream_image}{join_param}{str(random.randrange(1000000))}" + ) + device_state.image = image_url + + async def _process_reviewbuffer_state( + self, + device_state: LGHorizonDeviceState, + player_state: LGHorizonPlayerState, + ) -> None: + """Process the device state based on the UI status message.""" + if player_state.source is None: + return + player_state.source.__class__ = LGHorizonReviewBufferSource + source = cast(LGHorizonReviewBufferSource, player_state.source) + service_config = await self._auth.get_service_config() + service_url = await service_config.get_service_url("linearService") + lang = await self._customer.get_profile_lang(self._profile_id) + service_path = f"/v2/replayEvent/{source.event_id}?returnLinearContent=true&language={lang}" + + event_json = await self._auth.request( + service_url, + service_path, + ) + replay_event = LGHorizonReplayEvent(event_json) + device_state.id = replay_event.event_id + channel = self._channels[replay_event.channel_id] + device_state.source_type = source.source_type + device_state.channel_id = channel.id + device_state.channel_name = channel.title + device_state.episode_title = replay_event.episode_name + device_state.season_number = replay_event.season_number + device_state.episode_number = replay_event.episode_number + device_state.show_title = replay_event.title + device_state.last_position_update = player_state.last_speed_change_time + device_state.position = player_state.relative_position + device_state.start_time = replay_event.start_time + device_state.end_time = replay_event.end_time + device_state.duration = replay_event.end_time - replay_event.start_time + # Add random number to url to force refresh + join_param = "?" + if join_param in channel.stream_image: + join_param = "&" + image_url = ( + f"{channel.stream_image}{join_param}{str(random.randrange(1000000))}" + ) + device_state.image = image_url + + async def _process_replay_state( + self, + device_state: LGHorizonDeviceState, + player_state: LGHorizonPlayerState, + ) -> None: + """Process the device state based on the UI status message.""" + if player_state.source is None: + return + player_state.source.__class__ = LGHorizonReplaySource + source = cast(LGHorizonReplaySource, player_state.source) + service_config = await self._auth.get_service_config() + service_url = await service_config.get_service_url("linearService") + lang = await self._customer.get_profile_lang(self._profile_id) + service_path = f"/v2/replayEvent/{source.event_id}?returnLinearContent=true&language={lang}" + + event_json = await self._auth.request( + service_url, + service_path, + ) + replay_event = LGHorizonReplayEvent(event_json) + device_state.id = replay_event.event_id + # Iets met buffer doen + channel = self._channels[replay_event.channel_id] + padding = channel.replay_pre_padding + channel.replay_post_padding + device_state.source_type = source.source_type + device_state.channel_id = channel.id + device_state.episode_title = replay_event.episode_name + device_state.season_number = replay_event.season_number + device_state.episode_number = replay_event.episode_number + device_state.show_title = replay_event.title + device_state.last_position_update = int(time.time() * 1000) + device_state.start_time = replay_event.start_time + device_state.end_time = replay_event.end_time + device_state.duration = ( + replay_event.end_time - replay_event.start_time + padding + ) + device_state.position = ( + player_state.relative_position + channel.replay_pre_padding + ) + # Add random number to url to force refresh + device_state.image = await self._get_intent_image_url(replay_event.event_id) + + async def _process_vod_state( + self, + device_state: LGHorizonDeviceState, + player_state: LGHorizonPlayerState, + ) -> None: + """Process the device state based on the UI status message.""" + if player_state.source is None: + return + player_state.source.__class__ = LGHorizonVODSource + source = cast(LGHorizonVODSource, player_state.source) + service_config = await self._auth.get_service_config() + service_url = await service_config.get_service_url("vodService") + lang = await self._customer.get_profile_lang(self._profile_id) + service_path = f"/v2/detailscreen/{source.title_id}?language={lang}&profileId={self._profile_id}&cityId={self._customer.city_id}" + + vod_json = await self._auth.request( + service_url, + service_path, + ) + vod = LGHorizonVOD(vod_json) + device_state.id = vod.id + if vod.vod_type == LGHorizonVODType.EPISODE: + device_state.show_title = vod.series_title + device_state.episode_title = vod.title + device_state.season_number = vod.season + device_state.episode_number = vod.episode + else: + device_state.show_title = vod.title + + device_state.duration = vod.duration + device_state.last_position_update = int(time.time() * 1000) + device_state.position = player_state.relative_position + + device_state.image = await self._get_intent_image_url(vod.id) + + async def _process_ndvr_state( + self, device_state: LGHorizonDeviceState, player_state: LGHorizonPlayerState + ) -> None: + """Process the device state based on the UI status message.""" + if player_state.source is None: + return + player_state.source.__class__ = LGHorizonNDVRSource + source = cast(LGHorizonNDVRSource, player_state.source) + service_config = await self._auth.get_service_config() + service_url = await service_config.get_service_url("recordingService") + lang = await self._customer.get_profile_lang(self._profile_id) + service_path = f"/customers/{self._customer.customer_id}/details/single/{source.recording_id}?profileId={self._profile_id}&language={lang}" + recording_json = await self._auth.request( + service_url, + service_path, + ) + recording = LGHorizonRecordingSingle(recording_json) + device_state.id = recording.id + device_state.channel_id = recording.channel_id + if recording.channel_id: + channel = self._channels[recording.channel_id] + device_state.channel_name = channel.title + + device_state.episode_title = recording.episode_title + device_state.season_number = recording.season_number + device_state.episode_number = recording.episode_number + device_state.last_position_update = player_state.last_speed_change_time + device_state.position = player_state.relative_position + if recording.start_time: + device_state.start_time = int( + dt.fromisoformat( + recording.start_time.replace("Z", "+00:00") + ).timestamp() + ) + if recording.end_time: + device_state.end_time = int( + dt.fromisoformat(recording.end_time.replace("Z", "+00:00")).timestamp() + ) + if recording.start_time and recording.end_time: + device_state.duration = device_state.end_time - device_state.start_time + if recording.source == LGHorizonRecordingSource.SHOW: + device_state.show_title = recording.title + else: + device_state.show_title = recording.show_title + + device_state.image = await self._get_intent_image_url(recording.id) + + async def _get_intent_image_url(self, intent_id: str) -> Optional[str]: + """Get intent image url.""" + service_config = await self._auth.get_service_config() + intents_url = await service_config.get_service_url("imageService") + intents_path = "/intent" + body_json = [ + { + "id": intent_id, + "intents": ["detailedBackground", "posterTile"], + } + ] + intents_body = urllib.parse.quote( + json.dumps(body_json, separators=(",", ":"), indent=None), safe="~" + ) + + # Construct the full path with the URL-encoded JSON as a query parameter + full_intents_path = f"{intents_path}?jsonBody={intents_body}" + intents_result = await self._auth.request(intents_url, full_intents_path) + if ( + "intents" in intents_result[0] + and len(intents_result[0]["intents"]) > 0 + and intents_result[0]["intents"][0]["url"] + ): + return intents_result[0]["intents"][0]["url"] + return None diff --git a/lghorizon/lghorizon_message_factory.py b/lghorizon/lghorizon_message_factory.py new file mode 100644 index 0000000..007995d --- /dev/null +++ b/lghorizon/lghorizon_message_factory.py @@ -0,0 +1,38 @@ +"LG Horizon Message Factory." + +from .lghorizon_models import ( + LGHorizonMessage, + LGHorizonStatusMessage, + LGHorizonUnknownMessage, + LGHorizonUIStatusMessage, + LGHorizonMessageType, # Import LGHorizonMessageType from here +) + + +class LGHorizonMessageFactory: + """Handle incoming MQTT messages for LG Horizon devices.""" + + def __init__(self): + """Initialize the LG Horizon Message Factory.""" + + async def create_message(self, topic: str, payload: dict) -> LGHorizonMessage: + """Create an LG Horizon message based on the topic and payload.""" + message_type = await self._get_message_type(topic, payload) + match message_type: + case LGHorizonMessageType.STATUS: + return LGHorizonStatusMessage(payload, topic) + case LGHorizonMessageType.UI_STATUS: + return LGHorizonUIStatusMessage(payload, topic) + case LGHorizonMessageType.UNKNOWN: + return LGHorizonUnknownMessage(payload, topic) + + async def _get_message_type( + self, topic: str, payload: dict + ) -> LGHorizonMessageType: + """Determine the message type based on topic and payload.""" + if "status" in topic: + return LGHorizonMessageType.STATUS + if "type" in payload: + if payload["type"] == "CPE.uiStatus": + return LGHorizonMessageType.UI_STATUS + return LGHorizonMessageType.UNKNOWN diff --git a/lghorizon/lghorizon_models.py b/lghorizon/lghorizon_models.py new file mode 100644 index 0000000..4c85ac1 --- /dev/null +++ b/lghorizon/lghorizon_models.py @@ -0,0 +1,1512 @@ +"""LG Horizon Model.""" + +from __future__ import annotations + +import json +import logging +import time +from abc import ABC, abstractmethod +from datetime import datetime +from enum import Enum +from typing import Any, Dict, List, Optional, Callable + +import backoff +from aiohttp import ClientResponseError, ClientSession + +from .const import ( + COUNTRY_SETTINGS, +) +from .exceptions import LGHorizonApiConnectionError, LGHorizonApiUnauthorizedError + + +_LOGGER = logging.getLogger(__name__) + + +class LGHorizonRunningState(Enum): + """Running state of horizon box.""" + + UNKNOWN = "UNKNOWN" + ONLINE_RUNNING = "ONLINE_RUNNING" + ONLINE_STANDBY = "ONLINE_STANDBY" + + +class LGHorizonMessageType(Enum): + """Enumeration of LG Horizon message types.""" + + UNKNOWN = 0 + STATUS = 1 + UI_STATUS = 2 + + +class LGHorizonRecordingSource(Enum): + """LGHorizon recording.""" + + SHOW = "show" + UNKNOWN = "unknown" + + +class LGHorizonRecordingState(Enum): + """Enumeration of LG Horizon recording states.""" + + RECORDED = "recorded" + UNKNOWN = "unknown" + + +class LGHorizonRecordingType(Enum): # type: ignore[no-redef] + """Enumeration of LG Horizon recording states.""" + + SINGLE = "single" + SEASON = "season" + SHOW = "show" + UNKNOWN = "unknown" + + +class LGHorizonUIStateType(Enum): + """Enumeration of LG Horizon UI State types.""" + + MAINUI = "mainUI" + APPS = "apps" + UNKNOWN = "unknown" + + +class LGHorizonMessage(ABC): + """Abstract base class for LG Horizon messages.""" + + @property + def topic(self) -> str: + """Return the topic of the message.""" + return self._topic + + @property + def payload(self) -> dict: + """Return the payload of the message.""" + return self._payload + + @property + @abstractmethod + def message_type(self) -> LGHorizonMessageType | None: + """Return the message type.""" + + @abstractmethod + def __init__(self, topic: str, payload: dict) -> None: + """Abstract base class for LG Horizon messages.""" + self._topic = topic + """Initialize the abstract base class for LG Horizon messages. + + Args: + topic: The MQTT topic of the message. + payload: The dictionary payload of the message. + """ + self._payload = payload + + def __repr__(self) -> str: + """Return a string representation of the message.""" + return f"LGHorizonStatusMessage(topic='{self._topic}', payload={json.dumps(self._payload, indent=2)})" + + +class LGHorizonStatusMessage(LGHorizonMessage): + """Represents an LG Horizon status message received via MQTT.""" + + def __init__(self, payload: dict, topic: str) -> None: + """Initialize an LG Horizon status message.""" + super().__init__(topic, payload) + + @property + def message_type(self) -> LGHorizonMessageType: + """Return the message type from the payload, if available.""" + return LGHorizonMessageType.STATUS + + @property + def source(self) -> str: + """Return the device ID from the payload, if available.""" + return self._payload.get("source", "unknown") + + @property + def running_state(self) -> LGHorizonRunningState: + """Return the device ID from the payload, if available.""" + return LGHorizonRunningState[self._payload.get("state", "unknown").upper()] + + +class LGHorizonSourceType(Enum): + """Enumeration of LG Horizon source types.""" + + LINEAR = "linear" + REVIEWBUFFER = "reviewBuffer" + NDVR = "nDVR" + REPLAY = "replay" + VOD = "VOD" + UNKNOWN = "unknown" + + +class LGHorizonSource(ABC): + """Abstract base class for LG Horizon sources.""" + + def __init__(self, raw_json: dict) -> None: + """Initialize the LG Horizon source.""" + self._raw_json = raw_json + + @property + @abstractmethod + def source_type(self) -> LGHorizonSourceType: # type: ignore[no-redef] + """Return the message type.""" + + +class LGHorizonLinearSource(LGHorizonSource): + """Represent the Linear Source of an LG Horizon device.""" + + @property + def channel_id(self) -> str: + """Return the source type.""" + return self._raw_json.get("channelId", "") + + @property + def event_id(self) -> str: + """Return the event ID.""" + return self._raw_json.get("eventId", "") + + @property + def source_type(self) -> LGHorizonSourceType: + return LGHorizonSourceType.LINEAR + + +class LGHorizonReviewBufferSource(LGHorizonSource): + """Represent the ReviewBuffer Source of an LG Horizon device.""" + + @property + def channel_id(self) -> str: + """Return the source type.""" + return self._raw_json.get("channelId", "") + + @property + def event_id(self) -> str: + """Return the event ID.""" + return self._raw_json.get("eventId", "") + + @property + def source_type(self) -> LGHorizonSourceType: + return LGHorizonSourceType.REVIEWBUFFER + + +class LGHorizonNDVRSource(LGHorizonSource): + """Represent the Network Digital Video Recorder (NDVR) Source of an LG Horizon device.""" + + @property + def recording_id(self) -> str: + """Return the recording ID.""" + return self._raw_json.get("recordingId", "") + + @property + def channel_id(self) -> str: + """Return the channel ID.""" + return self._raw_json.get("channelId", "") + + @property + def source_type(self) -> LGHorizonSourceType: + return LGHorizonSourceType.NDVR + + +class LGHorizonVODSource(LGHorizonSource): + """Represent the VOD Source of an LG Horizon device.""" + + @property + def title_id(self) -> str: + """Return the title ID.""" + return self._raw_json.get("titleId", "") + + @property + def start_intro_time(self) -> int: + """Return the start intro time.""" + return self._raw_json.get("startIntroTime", 0) + + @property + def end_intro_time(self) -> int: + """Return the end intro time.""" + return self._raw_json.get("endIntroTime", 0) + + @property + def source_type(self) -> LGHorizonSourceType: + return LGHorizonSourceType.VOD + + +class LGHorizonReplaySource(LGHorizonSource): + """Represent the Replay Source of an LG Horizon device.""" + + @property + def event_id(self) -> str: + """Return the title ID.""" + return self._raw_json.get("eventId", "") + + @property + def source_type(self) -> LGHorizonSourceType: + """Return the source type.""" + return LGHorizonSourceType.REPLAY + + +class LGHorizonUnknownSource(LGHorizonSource): + """Represent an unknown source type of an LG Horizon device.""" + + @property + def source_type(self) -> LGHorizonSourceType: + return LGHorizonSourceType.UNKNOWN + + +class LGHorizonPlayerState: + """Represent the Player State of an LG Horizon device.""" + + def __init__(self, raw_json: dict) -> None: + """Initialize the Player State.""" + self._raw_json = raw_json + + @property + def source_type(self) -> LGHorizonSourceType: + """Return the source type.""" + return LGHorizonSourceType[self._raw_json.get("sourceType", "unknown").upper()] + + @property + def speed(self) -> int: + """Return the Player State dictionary.""" + return self._raw_json.get("speed", 0) + + @property + def last_speed_change_time( + self, + ) -> int: + """Return the last speed change time.""" + return self._raw_json.get("lastSpeedChangeTime", 0.0) + + @property + def relative_position( + self, + ) -> int: + """Return the last speed change time.""" + return self._raw_json.get("relativePosition", 0.0) + + @property + def source(self) -> LGHorizonSource | None: # Added None to the return type + """Return the last speed change time.""" + if "source" in self._raw_json: + match self.source_type: + case LGHorizonSourceType.LINEAR: + return LGHorizonLinearSource(self._raw_json["source"]) + case LGHorizonSourceType.VOD: + return LGHorizonVODSource(self._raw_json["source"]) + case LGHorizonSourceType.REPLAY: + return LGHorizonReplaySource(self._raw_json["source"]) + case LGHorizonSourceType.NDVR: + return LGHorizonNDVRSource(self._raw_json["source"]) + case LGHorizonSourceType.REVIEWBUFFER: + return LGHorizonReviewBufferSource(self._raw_json["source"]) + + return LGHorizonUnknownSource(self._raw_json["source"]) + + +class LGHorizonAppsState: + """Represent the Apps State of an LG Horizon device.""" + + def __init__(self, raw_json: dict) -> None: + """Initialize the Apps state.""" + self._raw_json = raw_json + + @property + def id(self) -> str: + """Return the id.""" + return self._raw_json.get("id", "") + + @property + def app_name(self) -> str: + """Return the app name.""" + return self._raw_json.get("appName", "") + + @property + def logo_path(self) -> str: + """Return the logo path.""" + return self._raw_json.get("logoPath", "") + + +class LGHorizonUIState: + """Represent the UI State of an LG Horizon device.""" + + _player_state: LGHorizonPlayerState | None = None + _apps_state: LGHorizonAppsState | None = None + + def __init__(self, raw_json: dict) -> None: + """Initialize the State.""" + self._raw_json = raw_json + """Initialize the UI State. + + Args: + raw_json: The raw JSON dictionary containing UI state information. + """ + + @property + def ui_status(self) -> LGHorizonUIStateType: + """Return the UI status dictionary.""" + return LGHorizonUIStateType[self._raw_json.get("uiStatus", "unknown").upper()] + + @property + def player_state( + self, + ) -> LGHorizonPlayerState | None: # Added None to the return type + """Return the UI status dictionary.""" + # Check if _player_state is None and if "playerState" key exists in raw_json + if self._player_state is None and "playerState" in self._raw_json: + self._player_state = LGHorizonPlayerState( + self._raw_json["playerState"] + ) # Access directly as existence is checked + return self._player_state + + @property + def apps_state( + self, + ) -> LGHorizonAppsState | None: # Added None to the return type + """Return the UI status dictionary.""" + # Check if _player_state is None and if "playerState" key exists in raw_json + if self._apps_state is None and "appsState" in self._raw_json: + self._apps_state = LGHorizonAppsState( + self._raw_json["appsState"] + ) # Access directly as existence is checked + return self._apps_state + + +class LGHorizonUIStatusMessage(LGHorizonMessage): + """Represents an LG Horizon UI status message received via MQTT.""" + + _status: LGHorizonUIState | None = None + + def __init__(self, payload: dict, topic: str) -> None: + """Initialize an LG Horizon UI status message.""" + super().__init__(topic, payload) + + @property + def message_type(self) -> LGHorizonMessageType: + """Return the message type from the payload, if available.""" + return LGHorizonMessageType.UI_STATUS + + @property + def source(self) -> str: + """Return the device ID from the payload, if available.""" + return self._payload.get("source", "unknown") + + @property + def message_timestamp(self) -> int: + """Return the device ID from the payload, if available.""" + return self._payload.get("messageTimeStamp", 0) + + @property + def ui_state(self) -> LGHorizonUIState | None: + """Return the device ID from the payload, if available.""" + if not self._status and "status" in self._payload: + self._status = LGHorizonUIState(self._payload["status"]) + return self._status + + +class LGHorizonUnknownMessage(LGHorizonMessage): + """Represents an unknown LG Horizon message received via MQTT.""" + + def __init__(self, payload: dict, topic: str) -> None: + """Initialize an LG Horizon unknown message.""" + super().__init__(topic, payload) + + @property + def message_type(self) -> LGHorizonMessageType: + """Return the message type from the payload, if available.""" + return LGHorizonMessageType.UNKNOWN + + +class LGHorizonProfileOptions: + """LGHorizon profile options.""" + + def __init__(self, options_payload: dict): + """Initialize a profile options.""" + self._options_payload = options_payload + + @property + def lang(self) -> str: + """Return the language.""" + return self._options_payload["lang"] + + +class LGHorizonProfile: + """LGHorizon profile.""" + + _options: LGHorizonProfileOptions + _profile_payload: dict + + def __init__(self, profile_payload: dict): + """Initialize a profile.""" + self._profile_payload = profile_payload + self._options = LGHorizonProfileOptions(self._profile_payload["options"]) + + @property + def id(self) -> str: + """Return the profile id.""" + return self._profile_payload["profileId"] + + @property + def name(self) -> str: + """Return the profile name.""" + return self._profile_payload["name"] + + @property + def favorite_channels(self) -> list[str]: + """Return the favorite channels.""" + return self._profile_payload.get("favoriteChannels", []) + + @property + def options(self) -> LGHorizonProfileOptions: + """Return the profile options.""" + return self._options + + +class LGHorizonAuth: + """Class to make authenticated requests.""" + + _websession: ClientSession + _refresh_token: str + _access_token: Optional[str] + _username: str + _password: str + _household_id: str + _token_expiry: Optional[int] + _country_code: str + _host: str + _use_refresh_token: bool + _token_refresh_callback: Callable[str, None] | None # pyright: ignore[reportInvalidTypeForm] + + def __init__( + self, + websession: ClientSession, + country_code: str, + refresh_token: str = "", + username: str = "", + password: str = "", + ) -> None: + """Initialize the auth with refresh token.""" + self._websession = websession + self._refresh_token = refresh_token + self._access_token = None + self._username = username + self._password = password + self._household_id = "" + self._token_expiry = None + self._country_code = country_code + self._host = COUNTRY_SETTINGS[country_code]["api_url"] + self._use_refresh_token = COUNTRY_SETTINGS[country_code]["use_refreshtoken"] + self._service_config = None + self._token_refresh_callback = None + + @property + def websession(self) -> ClientSession: + """Return the aiohttp client session.""" + return self._websession + + @property + def refresh_token(self) -> str: + """Return the refresh token.""" + return self._refresh_token + + @refresh_token.setter + def refresh_token(self, value: str) -> None: + """Set the refresh token.""" + self._refresh_token = value + + @property + def access_token(self) -> Optional[str]: + """Return the access token.""" + return self._access_token + + @access_token.setter + def access_token(self, value: Optional[str]) -> None: + """Set the access token.""" + self._access_token = value + + @property + def username(self) -> str: + """Return the username.""" + return self._username + + @username.setter + def username(self, value: str) -> None: + """Set the username.""" + self._username = value + + @property + def password(self) -> str: + """Return the password.""" + return self._password + + @password.setter + def password(self, value: str) -> None: + """Set the password.""" + self._password = value + + @property + def household_id(self) -> str: + """Return the household ID.""" + return self._household_id + + @household_id.setter + def household_id(self, value: str) -> None: + """Set the household ID.""" + self._household_id = value + + @property + def token_expiry(self) -> Optional[int]: + """Return the token expiry timestamp.""" + return self._token_expiry + + @token_expiry.setter + def token_expiry(self, value: Optional[int]) -> None: + """Set the token expiry timestamp.""" + self._token_expiry = value + + @property + def country_code(self) -> str: + """Return the country code.""" + return self._country_code + + async def is_token_expiring(self) -> bool: + """Check if the token is expiring within one day.""" + if not self.access_token or not self.token_expiry: + return True + current_unix_time = int(time.time()) + return current_unix_time >= (self.token_expiry - 86400) + + async def fetch_access_token(self) -> None: + """Fetch the access token.""" + _LOGGER.debug("Fetching access token") + headers = dict() + headers["content-type"] = "application/json" + headers["charset"] = "utf-8" + + if not self._use_refresh_token and self.access_token is None: + payload = {"password": self.password, "username": self.username} + headers["x-device-code"] = "web" + auth_url_path = "/auth-service/v1/authorization" + else: + payload = {"refreshToken": self.refresh_token} + auth_url_path = "/auth-service/v1/authorization/refresh" + try: # Use properties and backing fields + auth_response = await self.websession.post( + f"{self._host}{auth_url_path}", + json=payload, + headers=headers, + ) + except Exception as ex: + raise LGHorizonApiConnectionError from ex + auth_json = await auth_response.json() + if not auth_response.ok: + error = None + if "error" in auth_json: + error = auth_json["error"] + if error and error["statusCode"] == 97401: + raise LGHorizonApiUnauthorizedError("Invalid credentials") + elif error: + raise LGHorizonApiConnectionError(error["message"]) + else: + raise LGHorizonApiConnectionError("Unknown connection error") + + self.household_id = auth_json["householdId"] + self.access_token = auth_json["accessToken"] + self.refresh_token = auth_json["refreshToken"] + if self._token_refresh_callback: + self._token_refresh_callback(self.refresh_token) + self.username = auth_json["username"] + self.token_expiry = auth_json["refreshTokenExpiry"] + + @backoff.on_exception(backoff.expo, LGHorizonApiConnectionError, max_tries=3) + async def request(self, host: str, path: str, params=None, **kwargs) -> Any: + """Make a request.""" + if headers := kwargs.pop("headers", {}): + headers = dict(headers) + request_url = f"{host}{path}" + if await self.is_token_expiring(): # Use property + _LOGGER.debug("Access token is expiring, fetching a new one") + await self.fetch_access_token() + try: + web_response = await self.websession.request( + "GET", request_url, **kwargs, headers=headers, params=params + ) + web_response.raise_for_status() + json_response = await web_response.json() + _LOGGER.debug( + "Response from %s:\n %s", + request_url, + json.dumps(json_response, indent=2), + ) + return json_response + except ClientResponseError as cre: + _LOGGER.error("Error response from %s: %s", request_url, str(cre)) + if cre.status == 401: + await self.fetch_access_token() + raise LGHorizonApiConnectionError( + f"Unable to call {request_url}. Error:{str(cre)}" + ) from cre + + except Exception as ex: + _LOGGER.error("Error calling %s: %s", request_url, str(ex)) + raise LGHorizonApiConnectionError( + f"Unable to call {request_url}. Error:{str(ex)}" + ) from ex + + async def get_mqtt_token(self) -> Any: + """Get the MQTT token.""" + _LOGGER.debug("Fetching MQTT token") + config = await self.get_service_config() + service_url = await config.get_service_url("authorizationService") + result = await self.request( + service_url, + "/v1/mqtt/token", + ) + return result["token"] + + async def get_service_config(self): + """Get the service configuration.""" + _LOGGER.debug("Fetching service configuration") + if self._service_config is None: # Use property and backing field + base_country_code = self.country_code[0:2] + result = await self.request( + self._host, + f"/{base_country_code}/en/config-service/conf/web/backoffice.json", + ) + self._service_config = LGHorizonServicesConfig(result) + + return self._service_config + + +class LGHorizonChannel: + """Class to represent a channel.""" + + def __init__(self, channel_json): + """Initialize a channel.""" + self.channel_json = channel_json + + @property + def id(self) -> str: + """Returns the id.""" + return self.channel_json["id"] + + @property + def channel_number(self) -> str: + """Returns the channel number.""" + return self.channel_json["logicalChannelNumber"] + + @property + def replay_pre_padding(self) -> int: + """Returns the channel number.""" + return self.channel_json.get("replayPrePadding", 0) + + @property + def replay_post_padding(self) -> int: + """Returns the channel number.""" + return self.channel_json.get("replayPostPadding", 0) + + @property + def is_radio(self) -> bool: + """Returns if the channel is a radio channel.""" + return self.channel_json.get("isRadio", False) + + @property + def title(self) -> str: + """Returns the title.""" + return self.channel_json["name"] + + @property + def logo_image(self) -> str: + """Returns the logo image.""" + if "logo" in self.channel_json and "focused" in self.channel_json["logo"]: + return self.channel_json["logo"]["focused"] + return "" + + @property + def linear_products(self) -> list[str]: + """Returns the linear products.""" + return self.channel_json.get("linearProducts", []) + + @property + def stream_image(self) -> str: + """Returns the stream image.""" + image_stream = self.channel_json["imageStream"] + if "full" in image_stream: + return image_stream["full"] + if "small" in image_stream: + return image_stream["small"] + if "logo" in self.channel_json and "focused" in self.channel_json["logo"]: + return self.channel_json["logo"]["focused"] + return "" + + +class LGHorizonServicesConfig: + """Handle LG Horizon configuration and service URLs.""" + + def __init__(self, config_data: dict[str, Any]) -> None: + """Initialize LG Horizon config. + + Args: + config_data: Configuration dictionary with service endpoints + """ + self._config = config_data + + async def get_service_url(self, service_name: str) -> str: + """Get the URL for a specific service. + + Args: + service_name: Name of the service (e.g., 'authService', 'recordingService') + + Returns: + URL for the service + + Raises: + ValueError: If the service or its URL is not found + """ + if service_name in self._config and "URL" in self._config[service_name]: + return self._config[service_name]["URL"] + raise ValueError(f"Service URL for '{service_name}' not found in configuration") + + async def get_all_services(self) -> dict[str, str]: + """Get all available services and their URLs. + + Returns: + Dictionary mapping service names to URLs + """ + return { + name: url + for name, service in self._config.items() + if isinstance(service, dict) and (url := service.get("URL")) + } + + async def __getattr__(self, name: str) -> Optional[str]: + """Access service URLs as attributes. + + Example: config.authService returns the auth service URL + + Args: + name: Service name + + Returns: + URL for the service or None if not found + """ + if name.startswith("_"): + raise AttributeError( + f"'{type(self).__name__}' object has no attribute '{name}'" + ) + return await self.get_service_url(name) + + def __repr__(self) -> str: + """Return string representation.""" + services = list(self._config.keys()) + return f"LGHorizonConfig({len(services)} services)" + + +class LGHorizonCustomer: + """LGHorizon customer.""" + + _profiles: Dict[str, LGHorizonProfile] = {} + + def __init__(self, json_payload: dict): + """Initialize a customer.""" + self._json_payload = json_payload + + @property + def customer_id(self) -> str: + """Return the customer id.""" + return self._json_payload["customerId"] + + @property + def hashed_customer_id(self) -> str: + """Return the hashed customer id.""" + return self._json_payload["hashedCustomerId"] + + @property + def country_id(self) -> str: + """Return the country id.""" + return self._json_payload["countryId"] + + @property + def city_id(self) -> int: + """Return the city id.""" + return self._json_payload["cityId"] + + @property + def assigned_devices(self) -> list[str]: + """Return the assigned set-top boxes.""" + return self._json_payload.get("assignedDevices", []) + + @property + def profiles(self) -> Dict[str, LGHorizonProfile]: + """Return the profiles.""" + if not self._profiles or self._profiles == {}: + self._profiles = { + p["profileId"]: LGHorizonProfile(p) + for p in self._json_payload.get("profiles", []) + } + return self._profiles + + async def get_profile_lang(self, profile_id: str) -> str: + """Return the profile language.""" + if profile_id not in self.profiles: + return "nl" + return self.profiles[profile_id].options.lang + + +class LGHorizonDeviceState: + """Represent current state of a box.""" + + _id: Optional[str] + _channel_id: Optional[str] + _channel_name: Optional[str] + _show_title: Optional[str] + _episode_title: Optional[str] + _season_number: Optional[int] + _episode_number: Optional[int] + _image: Optional[str] + _source_type: LGHorizonSourceType + _paused: bool + _duration: Optional[float] + _position: Optional[float] + _last_position_update: Optional[datetime] + _state: LGHorizonRunningState + _speed: Optional[int] + _start_time: Optional[int] + _end_time: Optional[int] + + def __init__(self) -> None: + """Initialize the playing info.""" + self._channel_id = None + self._show_title = None + self._episode_title = None + self._season_number = None + self._episode_number = None + self._image = None + self._source_type = LGHorizonSourceType.UNKNOWN + self._ui_state_type = LGHorizonUIStateType.UNKNOWN + self._paused = False + self._duration = None + self._position = None + self._last_position_update = None + self._state = LGHorizonRunningState.UNKNOWN + self._speed = None + self._channel_name = None + self._id = None + self._start_time = None + self._end_time = None + + @property + def state(self) -> LGHorizonRunningState: + """Return the channel ID.""" + return self._state + + @state.setter + def state(self, value: LGHorizonRunningState) -> None: + """Set the channel ID.""" + self._state = value + + @property + def channel_id(self) -> Optional[str]: + """Return the channel ID.""" + return self._channel_id + + @channel_id.setter + def channel_id(self, value: Optional[str]) -> None: + """Set the channel ID.""" + self._channel_id = value + + @property + def id(self) -> Optional[str]: + """Return the channel ID.""" + return self._id + + @id.setter + def id(self, value: Optional[str]) -> None: + """Set the channel ID.""" + self._id = value + + @property + def channel_name(self) -> Optional[str]: + """Return the channel ID.""" + return self._channel_name + + @channel_name.setter + def channel_name(self, value: Optional[str]) -> None: + """Set the channel ID.""" + self._channel_name = value + + @property + def show_title(self) -> Optional[str]: + """Return the title.""" + return self._show_title + + @show_title.setter + def show_title(self, value: Optional[str]) -> None: + """Set the title.""" + self._show_title = value + + @property + def app_name(self) -> Optional[str]: + """Return the title.""" + return self._app_name + + @app_name.setter + def app_name(self, value: Optional[str]) -> None: + """Set the title.""" + self._app_name = value + + @property + def episode_title(self) -> Optional[str]: + """Return the title.""" + return self._episode_title + + @episode_title.setter + def episode_title(self, value: Optional[str]) -> None: + """Set the title.""" + self._episode_title = value + + @property + def episode_number(self) -> Optional[int]: + """Return the title.""" + return self._episode_number + + @episode_number.setter + def episode_number(self, value: Optional[int]) -> None: + """Set the title.""" + self._episode_number = value + + @property + def season_number(self) -> Optional[int]: + """Return the title.""" + return self._season_number + + @season_number.setter + def season_number(self, value: Optional[int]) -> None: + """Set the title.""" + self._season_number = value + + @property + def start_time(self) -> Optional[int]: + """Return the title.""" + return self._start_time + + @start_time.setter + def start_time(self, value: Optional[int]) -> None: + """Set the title.""" + self._start_time = value + + @property + def end_time(self) -> Optional[int]: + """Return the title.""" + return self._end_time + + @end_time.setter + def end_time(self, value: Optional[int]) -> None: + """Set the title.""" + self._end_time = value + + @property + def image(self) -> Optional[str]: + """Return the image URL.""" + return self._image + + @image.setter + def image(self, value: Optional[str]) -> None: + """Set the image URL.""" + self._image = value + + @property + def source_type(self) -> LGHorizonSourceType: + """Return the source type.""" + return self._source_type + + @source_type.setter + def source_type(self, value: LGHorizonSourceType) -> None: + """Set the source type.""" + self._source_type = value + + @property + def ui_state_type(self) -> LGHorizonUIStateType: + """Return the source type.""" + return self._ui_state_type + + @ui_state_type.setter + def ui_state_type(self, value: LGHorizonUIStateType) -> None: + """Set the source type.""" + self._ui_state_type = value + + @property + def paused(self) -> bool: + """Return if the media is paused.""" + if self.speed is None: + return False + return self.speed == 0 + + @property + def duration(self) -> Optional[float]: + """Return the duration of the media.""" + return self._duration + + @duration.setter + def duration(self, value: Optional[float]) -> None: + """Set the duration of the media.""" + self._duration = value + + @property + def position(self) -> Optional[float]: + """Return the current position in the media.""" + return self._position + + @position.setter + def position(self, value: Optional[float]) -> None: + """Set the current position in the media.""" + self._position = value + + @property + def last_position_update(self) -> Optional[int]: + """Return the last time the position was updated.""" + return self._last_position_update + + @last_position_update.setter + def last_position_update(self, value: Optional[int]) -> None: + """Set the last position update time.""" + self._last_position_update = value + + async def reset_progress(self) -> None: + """Reset the progress-related attributes.""" + self.last_position_update = None + self.duration = None + self.position = None + + @property + def speed(self) -> Optional[int]: + """Return the speed.""" + return self._speed + + @speed.setter + def speed(self, value: int | None) -> None: + """Set the channel ID.""" + self._speed = value + + async def reset(self) -> None: + """Reset all playing information.""" + self.channel_id = None + self.episode_number = None + self.season_number = None + self.episode_title = None + self.show_title = None + self.app_name = None + self.image = None + self.source_type = LGHorizonSourceType.UNKNOWN + self.speed = None + self.channel_name = None + self.id = None + self.start_time = None + self.end_time = None + await self.reset_progress() + + +class LGHorizonEntitlements: + """Class to represent entitlements.""" + + def __init__(self, entitlements_json): + """Initialize entitlements.""" + self.entitlements_json = entitlements_json + + @property + def entitlements(self): + """Returns the entitlements.""" + return self.entitlements_json.get("entitlements", []) + + @property + def entitlement_ids(self) -> list[str]: + """Returns a list of entitlement IDs.""" + return [e["id"] for e in self.entitlements if "id" in e] + + +class LGHorizonReplayEvent: + """LGhorizon replay event.""" + + def __init__(self, raw_json: dict): + """Initialize an LG Horizon replay event.""" + self._raw_json = raw_json + + @property + def episode_number(self) -> Optional[int]: + """Return the episode number.""" + return self._raw_json.get("episodeNumber") + + @property + def channel_id(self) -> str: + """Return the channel ID.""" + return self._raw_json["channelId"] + + @property + def event_id(self) -> str: + """Return the event ID.""" + return self._raw_json["eventId"] + + @property + def season_number(self) -> Optional[int]: + """Return the season number.""" + return self._raw_json.get("seasonNumber") + + @property + def start_time(self) -> Optional[int]: + """Return the season number.""" + return self._raw_json.get("startTime", None) + + @property + def end_time(self) -> Optional[int]: + """Return the season number.""" + return self._raw_json.get("endTime", None) + + @property + def title(self) -> str: + """Return the title of the event.""" + return self._raw_json["title"] + + @property + def episode_name(self) -> Optional[str]: + """Return the episode name.""" + return self._raw_json.get("episodeName", None) + + @property + def full_episode_title(self) -> Optional[str]: + """Return the full episode title.""" + + if not self.season_number and not self.episode_number: + return None + full_title = f"""S{self.season_number:02d}E{self.episode_number:02d}""" + if self.episode_name: + full_title += f": {self.episode_name}" + return full_title + + +class LGHorizonVODType(Enum): + """Enumeration of LG Horizon VOD types.""" + + ASSET = "ASSET" + EPISODE = "EPISODE" + UNKNOWN = "UNKNOWN" + + +class LGHorizonVOD: + """LGHorizon video on demand.""" + + def __init__(self, vod_json) -> None: + """Initialize an LG Horizon VOD object. + + Args: + vod_json: The raw JSON dictionary containing VOD information. + """ + self._vod_json = vod_json + + @property + def vod_type(self) -> LGHorizonVODType: + """Return the ID of the VOD.""" + return LGHorizonVODType[self._vod_json.get("type", "unknown").upper()] + + @property + def id(self) -> str: + """Return the ID of the VOD.""" + return self._vod_json["id"] + + @property + def season(self) -> Optional[int]: + """Return the season number of the recording.""" + return self._vod_json.get("season", None) + + @property + def episode(self) -> Optional[int]: + """Return the episode number of the recording.""" + return self._vod_json.get("episode", None) + + @property + def title(self) -> str: + """Return the ID of the VOD.""" + return self._vod_json["title"] + + @property + def series_title(self) -> Optional[str]: + """Return the series title of the VOD.""" + return self._vod_json.get("seriesTitle", None) + + @property + def duration(self) -> float: + """Return the duration of the VOD.""" + return self._vod_json["duration"] + + +class LGHOrizonRelevantEpisode: + """Represents a relevant episode within a recording season or show.""" + + def __init__(self, episode_json: dict) -> None: + """Abstract base class for LG Horizon recordings.""" + self._episode_json = episode_json + + @property + def recording_state(self) -> LGHorizonRecordingState: + """Return the recording state.""" + return LGHorizonRecordingState[ + self._episode_json.get("recordingState", "unknown").upper() + ] + + @property + def season_number(self) -> Optional[int]: + """Return the season number of the recording.""" + return self._episode_json.get("seasonNumber", None) + + @property + def episode_number(self) -> Optional[int]: + """Return the episode number of the recording.""" + return self._episode_json.get("episodeNumber", None) + + +class LGHorizonRecording(ABC): + """Abstract base class for LG Horizon recordings.""" + + @property + def recording_payload(self) -> dict: + """Return the payload of the message.""" + return self._recording_payload + + @property + def recording_state(self) -> LGHorizonRecordingState: + """Return the recording state.""" + return LGHorizonRecordingState[ + self._recording_payload.get("recordingState", "unknown").upper() + ] + + @property + def source(self) -> LGHorizonRecordingSource: + """Return the recording source.""" + return LGHorizonRecordingSource[ + self._recording_payload.get("source", "unknown").upper() + ] + + @property + def type(self) -> LGHorizonRecordingType: + """Return the recording source.""" + return LGHorizonRecordingType[ + self._recording_payload.get("type", "unknown").upper() + ] + + @property + def id(self) -> str: + """Return the ID of the recording.""" + return self._recording_payload["id"] + + @property + def title(self) -> str: + """Return the title of the recording.""" + return self._recording_payload.get("title", "unknown") + + @property + def channel_id(self) -> str: + """Return the channel ID of the recording.""" + return self._recording_payload["channelId"] + + @property + def poster_url(self) -> Optional[str]: + """Return the title of the recording.""" + poster = self._recording_payload.get("poster") + if poster: + return poster.get("url") + return None + + def __init__(self, recording_payload: dict) -> None: + """Abstract base class for LG Horizon recordings. + Args: + recording_payload: The raw JSON dictionary containing recording information. + """ + self._recording_payload = recording_payload + + +class LGHorizonRecordingSingle(LGHorizonRecording): + """LGHorizon recording.""" + + @property + def episode_title(self) -> Optional[str]: + """Return the episode title of the recording.""" + return self._recording_payload.get("episodeTitle", None) + + @property + def episode_id(self) -> Optional[str]: + """Return the episode title of the recording.""" + return self._recording_payload.get("episodeId", None) + + @property + def season_number(self) -> Optional[int]: + """Return the season number of the recording.""" + return self._recording_payload.get("seasonNumber", None) + + @property + def episode_number(self) -> Optional[int]: + """Return the episode number of the recording.""" + return self._recording_payload.get("episodeNumber", None) + + @property + def show_id(self) -> Optional[str]: + """Return the show ID of the recording.""" + return self._recording_payload.get("showId", None) + + @property + def show_title(self) -> Optional[str]: + """Return the show ID of the recording.""" + return self._recording_payload.get("showTitle", None) + + @property + def season_id(self) -> Optional[str]: + """Return the season ID of the recording.""" + return self._recording_payload.get("seasonId", None) + + @property + def channel_id(self) -> Optional[str]: + """Return the channel ID of the recording.""" + return self._recording_payload.get("channelId", None) + + @property + def duration(self) -> Optional[int]: + """Return the title.""" + return self.recording_payload.get("duration", None) + + @property + def start_time(self) -> Optional[int]: + """Return the title.""" + return self.recording_payload.get("startTime", None) + + @property + def end_time(self) -> Optional[int]: + """Return the title.""" + return self.recording_payload.get("endTime", None) + + +class LGHorizonRecordingSeason(LGHorizonRecording): + """Represents an LG Horizon recording season.""" + + _most_relevant_epsode: Optional[LGHOrizonRelevantEpisode] + + def __init__(self, payload: dict) -> None: + """Abstract base class for LG Horizon recordings.""" + super().__init__(payload) + episode_payload = payload.get("mostRelevantEpisode") + if episode_payload: + self._most_relevant_epsode = LGHOrizonRelevantEpisode(episode_payload) + + @property + def no_of_episodes(self) -> int: + """Return the number of episodes in the season.""" + return self._recording_payload.get("noOfEpisodes", 0) + + @property + def season_title(self) -> str: + """Return the season title of the recording.""" + return self._recording_payload.get("seasonTitle", "") + + @property + def show_id(self) -> str: + """Return the season title of the recording.""" + return self._recording_payload.get("showId", "") + + @property + def most_relevant_episode(self) -> Optional[LGHOrizonRelevantEpisode]: + """Return the most relevant episode of the season.""" + return self._most_relevant_epsode + + +class LGHorizonRecordingShow(LGHorizonRecording): + """Represents an LG Horizon recording show.""" + + _most_relevant_epsode: Optional[LGHOrizonRelevantEpisode] + + def __init__(self, payload: dict) -> None: + """Abstract base class for LG Horizon recordings.""" + super().__init__(payload) + episode_payload = payload.get("mostRelevantEpisode") + if episode_payload: + self._most_relevant_epsode = LGHOrizonRelevantEpisode(episode_payload) + + @property + def no_of_episodes(self) -> int: + """Return the number of episodes in the season.""" + return self._recording_payload.get("noOfEpisodes", 0) + + @property + def most_relevant_episode(self) -> Optional[LGHOrizonRelevantEpisode]: + """Return the most relevant episode of the season.""" + return self._most_relevant_epsode + + +class LGHorizonRecordingList: + """Represents a list of LG Horizon recordings.""" + + @property + def total(self) -> int: + """Return the total number of recordings.""" + return len(self._recordings) + + def __init__(self, recordings: List[LGHorizonRecording]) -> None: + """Initialize an LG Horizon recording list. + + Args: + recordings: A list of LGHorizonRecording objects. + """ + self._recordings = recordings + + @property + def recordings(self) -> List[LGHorizonRecording]: + """Return the total number of recordings.""" + return self._recordings + + +class LGHorizonShowRecordingList(LGHorizonRecordingList): + """LGHorizon recording.""" + + def __init__( + self, + show_title: Optional[str], + show_image, + recordings: List[LGHorizonRecording], + ) -> None: + """Initialize an LG Horizon show recording list. + + Args: + show_title: The title of the show. + show_image: The image URL for the show. + recordings: A list of LGHorizonRecording objects belonging to the show. + """ + super().__init__(recordings) + self._show_title = show_title + self._show_image = show_image + + @property + def show_title(self) -> str: + """Title of the show.""" + return self._show_title + + @property + def show_image(self) -> Optional[str]: + """Image of the show.""" + return self._show_image + + +class LGHorizonRecordingQuota: + """LGHorizon recording quota.""" + + def __init__(self, quota_json: dict) -> None: + """Initialize the recording quota.""" + self._quota_json = quota_json + + @property + def quota(self) -> int: + """Return the total space in MB.""" + return self._quota_json.get("quota", 0) + + @property + def occupied(self) -> int: + """Return the used space in MB.""" + return self._quota_json.get("occupied", 0) + + @property + def percentage_used(self) -> float: + """Return the percentage of space used.""" + if self.quota == 0: + return 0.0 + return (self.occupied / self.quota) * 100 diff --git a/lghorizon/lghorizon_mqtt_client.py b/lghorizon/lghorizon_mqtt_client.py new file mode 100644 index 0000000..1d8e695 --- /dev/null +++ b/lghorizon/lghorizon_mqtt_client.py @@ -0,0 +1,335 @@ +import asyncio +import json +import logging +from typing import Any, Callable, Coroutine + +import paho.mqtt.client as mqtt + +from .helpers import make_id +from .lghorizon_models import LGHorizonAuth + +_logger = logging.getLogger(__name__) + + +class LGHorizonMqttClient: + """Asynchronous-friendly wrapper around Paho MQTT.""" + + def __init__( + self, + auth: LGHorizonAuth, + on_connected_callback: Callable[[], Coroutine[Any, Any, Any]], + on_message_callback: Callable[[dict, str], Coroutine[Any, Any, Any]], + loop: asyncio.AbstractEventLoop, + ) -> None: + self._auth = auth + """Initialize the LGHorizonMqttClient. + + Args: + auth: The authentication object for obtaining MQTT tokens. + on_connected_callback: An async callback function for MQTT connection events. + on_message_callback: An async callback function for MQTT message events. + loop: The asyncio event loop. + """ + self._on_connected_callback = on_connected_callback + self._on_message_callback = on_message_callback + self._loop = loop + + self._mqtt_client: mqtt.Client | None = None + self._mqtt_broker_url: str = "" + self._mqtt_token: str = "" + self.client_id: str = "" + self._reconnect_task: asyncio.Task | None = None + self._disconnect_requested: bool = False + + # FIFO queues + self._message_queue: asyncio.Queue = asyncio.Queue() + self._publish_queue: asyncio.Queue = asyncio.Queue() + + # Worker tasks + self._message_worker_task: asyncio.Task | None = None + self._publish_worker_task: asyncio.Task | None = None + + @property + def is_connected(self) -> bool: + return self._mqtt_client is not None and self._mqtt_client.is_connected() + + @classmethod + async def create( + cls, + auth: LGHorizonAuth, + on_connected_callback: Callable[[], Coroutine[Any, Any, Any]], + on_message_callback: Callable[[dict, str], Coroutine[Any, Any, Any]], + ) -> "LGHorizonMqttClient": + """Asynchronously create and initialize an LGHorizonMqttClient instance.""" + + loop = asyncio.get_running_loop() + instance = cls(auth, on_connected_callback, on_message_callback, loop) + + # Service config ophalen + service_config = await auth.get_service_config() + mqtt_broker_url = await service_config.get_service_url("mqttBroker") + instance._mqtt_broker_url = mqtt_broker_url.replace("wss://", "").replace( + ":443/mqtt", "" + ) + + instance.client_id = await make_id() + + # Paho client + instance._mqtt_client = mqtt.Client( + client_id=instance.client_id, + transport="websockets", + ) + instance._mqtt_client.ws_set_options( + headers={"Sec-WebSocket-Protocol": "mqtt, mqttv3.1, mqttv3.11"} + ) + + # Token ophalen + instance._mqtt_token = await auth.get_mqtt_token() + instance._mqtt_client.username_pw_set( + auth.household_id, + instance._mqtt_token, + ) + + # TLS instellen (blocking → executor) + await loop.run_in_executor(None, instance._mqtt_client.tls_set) + + instance._mqtt_client.enable_logger(_logger) + instance._mqtt_client.on_connect = instance._on_connect + instance._mqtt_client.on_message = instance._on_message + instance._mqtt_client.on_disconnect = instance._on_disconnect + + return instance + + async def connect(self) -> None: + """Connect the MQTT client to the broker asynchronously.""" + if not self._mqtt_client: + raise RuntimeError("MQTT client not initialized") + + if self.is_connected: + _logger.debug("MQTT client is already connected.") + return + + self._disconnect_requested = False # Reset flag for new connection attempt + + # Cancel any ongoing reconnect task if connect() is called manually + if self._reconnect_task and not self._reconnect_task.done(): + _logger.debug("Cancelling existing reconnect task before manual connect.") + self._reconnect_task.cancel() + self._reconnect_task = None + + _logger.info("Attempting initial MQTT connection...") + # Blocking connect → executor + await self._loop.run_in_executor( + None, + self._mqtt_client.connect, + self._mqtt_broker_url, + 443, + ) + + # Start Paho thread + self._mqtt_client.loop_start() + + # Start workers + self._message_worker_task = asyncio.create_task(self._message_worker()) + self._publish_worker_task = asyncio.create_task(self._publish_worker()) + + async def disconnect(self) -> None: + """Disconnect the MQTT client from the broker asynchronously.""" + if not self._mqtt_client: + return + + # Stop workers + if self._message_worker_task: + self._message_worker_task.cancel() + self._message_worker_task = None + + if self._publish_worker_task: + self._publish_worker_task.cancel() + self._publish_worker_task = None + + # Blocking disconnect → executor + await self._loop.run_in_executor(None, self._mqtt_client.disconnect) + self._mqtt_client.loop_stop() + + async def subscribe(self, topic: str) -> None: + """Subscribe to an MQTT topic. + + Args: + topic: The MQTT topic to subscribe to. + """ + if not self._mqtt_client: + raise RuntimeError("MQTT client not initialized") + + self._mqtt_client.subscribe(topic) + + async def publish_message(self, topic: str, json_payload: str) -> None: + """Queue an MQTT message for publishing. + + Args: + topic: The MQTT topic to publish to. + json_payload: The JSON payload as a string. + """ + await self._publish_queue.put((topic, json_payload)) + + # ------------------------- + # INTERNAL CALLBACKS + # ------------------------- + + def _on_connect(self, client, userdata, flags, result_code): + """Callback for when the MQTT client connects to the broker. + + Args: + client: The Paho MQTT client instance. + userdata: User data passed to the client. + flags: Response flags from the broker. + result_code: The connection result code. + """ + if result_code == 0: + _logger.info("MQTT client connected successfully.") + # If a reconnect task was running, it means we successfully reconnected. + # Cancel it as we are now connected. + if self._reconnect_task: + self._reconnect_task.cancel() + self._reconnect_task = None # Clear the reference + asyncio.run_coroutine_threadsafe( + self._on_connected_callback(), + self._loop, + ) + elif result_code == 5: + _logger.warning( + "MQTT connection failed: Token expired. Attempting to refresh token and reconnect." + ) + # Schedule the token refresh and reconnect in the main event loop + asyncio.run_coroutine_threadsafe( + self._handle_token_refresh_and_reconnect(), self._loop + ) + else: + _logger.error("MQTT connect error: %s", result_code) + # For other errors, Paho's _on_disconnect will typically be called, + # which will then trigger the general reconnect loop. + + async def _handle_token_refresh_and_reconnect(self): + """Refreshes the MQTT token and attempts to reconnect the client.""" + try: + # Get new token + self._mqtt_token = await self._auth.get_mqtt_token() + self._mqtt_client.username_pw_set( + self._auth.household_id, + self._mqtt_token, + ) + _logger.info("MQTT token refreshed. Attempting to reconnect.") + # Call connect. If it fails, _on_disconnect will be triggered, + # and the _reconnect_loop will take over. + await self.connect() + except Exception as e: + _logger.error("Failed to refresh MQTT token or initiate reconnect: %s", e) + # If token refresh itself fails, or connect() raises an exception + # before _on_disconnect can be called, ensure reconnect loop starts. + if not self._disconnect_requested and ( + not self._reconnect_task or self._reconnect_task.done() + ): + _logger.info( + "Scheduling MQTT reconnect after token refresh/connect failure." + ) + self._reconnect_task = asyncio.create_task(self._reconnect_loop()) + + def _on_message(self, client, userdata, message): + """Callback for when an MQTT message is received. + + Args: + client: The Paho MQTT client instance. + userdata: User data passed to the client. + message: The MQTTMessage object containing topic and payload. + """ + asyncio.run_coroutine_threadsafe( + self._message_queue.put((message.topic, message.payload)), + self._loop, + ) + + def _on_disconnect(self, client, userdata, result_code): + """Callback for when the MQTT client disconnects from the broker. + + Args: + client: The Paho MQTT client instance. + userdata: User data passed to the client. + result_code: The disconnection result code. + """ + _logger.warning("MQTT disconnected with result code: %s", result_code) + if not self._disconnect_requested: + _logger.info("Unexpected MQTT disconnection. Initiating reconnect loop.") + if not self._reconnect_task or self._reconnect_task.done(): + self._reconnect_task = asyncio.run_coroutine_threadsafe( + self._reconnect_loop(), self._loop + ) + else: + _logger.debug("Reconnect loop already active.") + else: + _logger.info("MQTT disconnected as requested.") + + async def _reconnect_loop(self): + """Manages the MQTT reconnection process with exponential backoff.""" + retries = 0 + while not self._disconnect_requested: + if self.is_connected: + _logger.debug( + "MQTT client reconnected within loop, stopping reconnect attempts." + ) + break # Already connected, stop trying + + delay = min(2**retries, 60) # Exponential backoff, max 60 seconds + _logger.debug( + "Waiting %s seconds before MQTT reconnect attempt %s", + delay, + retries + 1, + ) + await asyncio.sleep(delay) + + try: + _logger.info("Attempting MQTT reconnect...") + await self.connect() + # If connect() succeeds, _on_connect will be called, which will cancel this task. + # If connect() fails, _on_disconnect will be called again, and this loop continues. + break # If connect() doesn't raise, assume it's handled by _on_connect + except Exception as e: + _logger.error("MQTT reconnect attempt failed: %s", e) + retries += 1 + self._reconnect_task = None # Clear task when loop finishes or is cancelled. + + # ------------------------- + # MESSAGE WORKER (FIFO) + # ------------------------- + + async def _message_worker(self): + """Worker task to process incoming MQTT messages from the queue.""" + while True: + topic, payload = await self._message_queue.get() + + try: + json_payload = json.loads(payload) + await self._on_message_callback(json_payload, topic) + except Exception: + _logger.exception("Error processing MQTT message") + + self._message_queue.task_done() + + # ------------------------- + # PUBLISH WORKER (FIFO) + # ------------------------- + + async def _publish_worker(self): + """Worker task to process outgoing MQTT publish commands from the queue.""" + while True: + topic, payload = await self._publish_queue.get() + + try: + # Wacht tot MQTT echt connected is + while not self.is_connected: + await asyncio.sleep(0.1) + + # Publish is non-blocking + self._mqtt_client.publish(topic, payload, qos=2) + + except Exception: + _logger.exception("Error publishing MQTT message") + + self._publish_queue.task_done() diff --git a/lghorizon/lghorizon_recording_factory.py b/lghorizon/lghorizon_recording_factory.py new file mode 100644 index 0000000..d08d811 --- /dev/null +++ b/lghorizon/lghorizon_recording_factory.py @@ -0,0 +1,55 @@ +from typing import Optional +from .lghorizon_models import ( + LGHorizonRecordingList, + LGHorizonRecordingSingle, + LGHorizonRecordingSeason, + LGHorizonRecordingShow, + LGHorizonRecordingType, + LGHorizonShowRecordingList, +) + + +class LGHorizonRecordingFactory: + """Factory to create LGHorizonRecording objects.""" + + async def create_recordings(self, recording_json: dict) -> LGHorizonRecordingList: + """Create a LGHorizonRecording object based on the recording type.""" + recording_list = [] + for recording in recording_json["data"]: + recording_type = LGHorizonRecordingType[ + recording.get("type", "unknown").upper() + ] + match recording_type: + case LGHorizonRecordingType.SINGLE: + recording_single = LGHorizonRecordingSingle(recording) + recording_list.append(recording_single) + case LGHorizonRecordingType.SEASON: + recording_season = LGHorizonRecordingSeason(recording) + recording_list.append(recording_season) + case LGHorizonRecordingType.SHOW: + recording_show = LGHorizonRecordingShow(recording) + recording_list.append(recording_show) + case LGHorizonRecordingType.UNKNOWN: + pass + + return LGHorizonRecordingList(recording_list) + + async def create_episodes(self, episode_json: dict) -> LGHorizonShowRecordingList: + """Create a LGHorizonRecording list based for episodes.""" + recording_list = [] + show_title: Optional[str] = None + if "images" in episode_json: + images = episode_json["images"] + show_image = next( + (img["url"] for img in images if img.get("type") == "titleTreatment"), + images[0]["url"] if images else None, + ) + else: + show_image = None + + for recording in episode_json["data"]: + recording_single = LGHorizonRecordingSingle(recording) + if show_title is None: + show_title = recording_single.show_title or recording_single.title + recording_list.append(recording_single) + return LGHorizonShowRecordingList(show_title, show_image, recording_list) diff --git a/lghorizon/models.py b/lghorizon/models.py deleted file mode 100644 index 36f2ab8..0000000 --- a/lghorizon/models.py +++ /dev/null @@ -1,768 +0,0 @@ -"""Models for LGHorizon API.""" - -# pylint: disable=broad-exception-caught -# pylint: disable=broad-exception-raised -from datetime import datetime -from typing import Callable, Dict -import json -import logging -import paho.mqtt.client as mqtt - -from .const import ( - BOX_PLAY_STATE_CHANNEL, - ONLINE_STANDBY, - ONLINE_RUNNING, - MEDIA_KEY_POWER, - MEDIA_KEY_PLAY_PAUSE, - MEDIA_KEY_STOP, - MEDIA_KEY_CHANNEL_UP, - MEDIA_KEY_CHANNEL_DOWN, - MEDIA_KEY_ENTER, - MEDIA_KEY_REWIND, - MEDIA_KEY_FAST_FORWARD, - MEDIA_KEY_RECORD, - RECORDING_TYPE_SEASON, -) - -from .helpers import make_id - -_logger = logging.getLogger(__name__) - - -class LGHorizonAuth: - """Class to hold LGHorizon authentication.""" - - household_id: str - access_token: str - refresh_token: str - refresh_token_expiry: datetime - username: str - mqtt_token: str = None - access_token: str = None - - def __init__(self): - """Initialize a session.""" - - def fill(self, auth_json) -> None: - """Fill the object.""" - self.household_id = auth_json["householdId"] - self.access_token = auth_json["accessToken"] - self.refresh_token = auth_json["refreshToken"] - self.username = auth_json["username"] - try: - self.refresh_token_expiry = datetime.fromtimestamp( - auth_json["refreshTokenExpiry"] - ) - except ValueError: - # VM uses milliseconds for the expiry time. - # If the year is too high to be valid, it assumes it's milliseconds and divides it - self.refresh_token_expiry = datetime.fromtimestamp( - auth_json["refreshTokenExpiry"] // 1000 - ) - - def is_expired(self) -> bool: - """Check if refresh token is expired.""" - return self.refresh_token_expiry - - -class LGHorizonPlayingInfo: - """Represent current state of a box.""" - - channel_id: str = None - title: str = None - image: str = None - source_type: str = None - paused: bool = False - channel_title: str = None - duration: float = None - position: float = None - last_position_update: datetime = None - - def __init__(self): - """Initialize the playing info.""" - - def set_paused(self, paused: bool): - """Set pause state.""" - self.paused = paused - - def set_channel(self, channel_id): - """Set channel.""" - self.channel_id = channel_id - - def set_title(self, title): - """Set title.""" - self.title = title - - def set_channel_title(self, title): - """Set channel title.""" - self.channel_title = title - - def set_image(self, image): - """Set image.""" - self.image = image - - def set_source_type(self, source_type): - """Set source type.""" - self.source_type = source_type - - def set_duration(self, duration: float): - """Set duration.""" - self.duration = duration - - def set_position(self, position: float): - """Set position.""" - self.position = position - - def set_last_position_update(self, last_position_update: datetime): - """Set last position update.""" - self.last_position_update = last_position_update - - def reset_progress(self): - """Reset the progress.""" - self.last_position_update = None - self.duration = None - self.position = None - - def reset(self): - """Reset the channel""" - self.channel_id = None - self.title = None - self.image = None - self.source_type = None - self.paused = False - self.channel_title = None - self.reset_progress() - - -class LGHorizonChannel: - """Represent a channel.""" - - id: str - title: str - stream_image: str - logo_image: str - channel_number: str - - def __init__(self, channel_json): - """Initialize a channel.""" - self.id = channel_json["id"] - self.title = channel_json["name"] - self.stream_image = self.get_stream_image(channel_json) - if "logo" in channel_json and "focused" in channel_json["logo"]: - self.logo_image = channel_json["logo"]["focused"] - else: - self.logo_image = "" - self.channel_number = channel_json["logicalChannelNumber"] - - def get_stream_image(self, channel_json) -> str: - """Returns the stream image.""" - image_stream = channel_json["imageStream"] - if "full" in image_stream: - return image_stream["full"] - if "small" in image_stream: - return image_stream["small"] - if "logo" in channel_json and "focused" in channel_json["logo"]: - return channel_json["logo"]["focused"] - return "" - - -class LGHorizonReplayEvent: - """LGhorizon replay event.""" - - episode_number: int = None - channel_id: str = None - event_id: str = None - season_number: int = None - title: str = None - episode_name: str = None - - def __init__(self, raw_json: str): - self.channel_id = raw_json["channelId"] - self.event_id = raw_json["eventId"] - self.title = raw_json["title"] - if "episodeName" in raw_json: - self.episode_name = raw_json["episodeName"] - if "episodeNumber" in raw_json: - self.episode_number = raw_json["episodeNumber"] - if "seasonNumber" in raw_json: - self.season_number = raw_json["seasonNumber"] - - -class LGHorizonBaseRecording: - """LgHorizon base recording.""" - - recording_id: str = None - title: str = None - image: str = None - recording_type: str = None - channel_id: str = None - - def __init__( - self, - recording_id: str, - title: str, - image: str, - channel_id: str, - recording_type: str, - ) -> None: - self.recording_id = recording_id - self.title = title - self.image = image - self.channel_id = channel_id - self.recording_type = recording_type - - -class LGHorizonRecordingSingle(LGHorizonBaseRecording): - """Represents a single recording.""" - - season_number: int = None - episode_number: int = None - - def __init__(self, recording_json): - """Init the single recording.""" - poster_url = None - if "poster" in recording_json and "url" in recording_json["poster"]: - poster_url = recording_json["poster"]["url"] - LGHorizonBaseRecording.__init__( - self, - recording_json["id"], - recording_json["title"], - poster_url, - recording_json["channelId"], - recording_json["type"], - ) - if "seasonNumber" in recording_json: - self.season_number = recording_json["seasonNumber"] - if "episodeNumber" in recording_json: - self.episode_number = recording_json["episodeNumber"] - - -class LGHorizonRecordingEpisode: - """Represents a single recording.""" - - episode_id: str = None - episode_title: str = None - season_number: int = None - episode_number: int = None - show_title: str = None - recording_state: str = None - image: str = None - - def __init__(self, recording_json): - """Init the single recording.""" - self.episode_id = recording_json["episodeId"] - self.episode_title = recording_json["episodeTitle"] - self.show_title = recording_json["showTitle"] - self.recording_state = recording_json["recordingState"] - if "seasonNumber" in recording_json: - self.season_number = recording_json["seasonNumber"] - if "episodeNumber" in recording_json: - self.episode_number = recording_json["episodeNumber"] - if "poster" in recording_json and "url" in recording_json["poster"]: - self.image = recording_json["poster"]["url"] - - -class LGHorizonRecordingShow: - """Represents a single recording.""" - - episode_id: str = None - show_title: str = None - season_number: int = None - episode_number: int = None - recording_state: str = None - image: str = None - - def __init__(self, recording_json): - """Init the single recording.""" - self.episode_id = recording_json["episodeId"] - self.show_title = recording_json["showTitle"] - self.recording_state = recording_json["recordingState"] - if "seasonNumber" in recording_json: - self.season_number = recording_json["seasonNumber"] - if "episodeNumber" in recording_json: - self.episode_number = recording_json["episodeNumber"] - if "poster" in recording_json and "url" in recording_json["poster"]: - self.image = recording_json["poster"]["url"] - - -class LGHorizonRecordingListSeasonShow(LGHorizonBaseRecording): - """LGHorizon Season show list.""" - - show_id: str = None - - def __init__(self, recording_season_json): - """Init the single recording.""" - - poster_url = None - if ( - "poster" in recording_season_json - and "url" in recording_season_json["poster"] - ): - poster_url = recording_season_json["poster"]["url"] - LGHorizonBaseRecording.__init__( - self, - recording_season_json["id"], - recording_season_json["title"], - poster_url, - recording_season_json["channelId"], - recording_season_json["type"], - ) - if self.recording_type == RECORDING_TYPE_SEASON: - self.show_id = recording_season_json["showId"] - else: - self.show_id = recording_season_json["id"] - - -class LGHorizonVod: - """LGHorizon video on demand.""" - - title: str = None - image: str = None - duration: float = None - - def __init__(self, vod_json) -> None: - self.title = vod_json["title"] - self.duration = vod_json["duration"] - - -class LGHorizonApp: - """LGHorizon App.""" - - title: str = None - image: str = None - - def __init__(self, app_state_json: str) -> None: - self.title = app_state_json["appName"] - self.image = app_state_json["logoPath"] - if not self.image.startswith("http:"): - self.image = "https:" + self.image - - -class LGHorizonMqttClient: - """LGHorizon MQTT client.""" - - _broker_url: str = None - _mqtt_client: mqtt.Client - _auth: LGHorizonAuth - client_id: str = None - _on_connected_callback: Callable = None - _on_message_callback: Callable[[str, str], None] = None - - @property - def is_connected(self): - """Is client connected.""" - return self._mqtt_client.is_connected - - def __init__( - self, - auth: LGHorizonAuth, - mqtt_broker_url: str, - on_connected_callback: Callable = None, - on_message_callback: Callable[[str], None] = None, - ): - self._auth = auth - self._broker_url = mqtt_broker_url.replace("wss://", "").replace( - ":443/mqtt", "" - ) - self.client_id = make_id() - self._mqtt_client = mqtt.Client( - client_id=self.client_id, - transport="websockets", - ) - - self._mqtt_client.ws_set_options( - headers={"Sec-WebSocket-Protocol": "mqtt, mqttv3.1, mqttv3.11"} - ) - self._mqtt_client.username_pw_set( - self._auth.household_id, self._auth.mqtt_token - ) - self._mqtt_client.tls_set() - self._mqtt_client.enable_logger(_logger) - self._mqtt_client.on_connect = self._on_mqtt_connect - self._on_connected_callback = on_connected_callback - self._on_message_callback = on_message_callback - - def _on_mqtt_connect(self, client, userdata, flags, result_code): # pylint: disable=unused-argument - if result_code == 0: - self._mqtt_client.on_message = self._on_client_message - self._mqtt_client.subscribe(self._auth.household_id) - self._mqtt_client.subscribe(self._auth.household_id + "/#") - self._mqtt_client.subscribe(self._auth.household_id + "/" + self.client_id) - self._mqtt_client.subscribe(self._auth.household_id + "/+/status") - self._mqtt_client.subscribe( - self._auth.household_id + "/+/networkRecordings" - ) - self._mqtt_client.subscribe( - self._auth.household_id + "/+/networkRecordings/capacity" - ) - self._mqtt_client.subscribe(self._auth.household_id + "/+/localRecordings") - self._mqtt_client.subscribe( - self._auth.household_id + "/+/localRecordings/capacity" - ) - self._mqtt_client.subscribe(self._auth.household_id + "/watchlistService") - self._mqtt_client.subscribe(self._auth.household_id + "/purchaseService") - self._mqtt_client.subscribe( - self._auth.household_id + "/personalizationService" - ) - self._mqtt_client.subscribe(self._auth.household_id + "/recordingStatus") - self._mqtt_client.subscribe( - self._auth.household_id + "/recordingStatus/lastUserAction" - ) - if self._on_connected_callback: - self._on_connected_callback() - elif result_code == 5: - self._mqtt_client.username_pw_set( - self._auth.household_id, self._auth.mqtt_token - ) - self.connect() - else: - _logger.error( - "Cannot connect to MQTT server with resultCode: %s", result_code - ) - - def connect(self) -> None: - """Connect the client.""" - self._mqtt_client.connect(self._broker_url, 443) - self._mqtt_client.loop_start() - - def _on_client_message(self, client, userdata, message): # pylint: disable=unused-argument - """Handle messages received by mqtt client.""" - _logger.debug("Received MQTT message. Topic: %s", message.topic) - json_payload = json.loads(message.payload) - _logger.debug("Message: %s", json_payload) - if self._on_message_callback: - self._on_message_callback(json_payload, message.topic) - - def publish_message(self, topic: str, json_payload: str) -> None: - """Publish a MQTT message.""" - self._mqtt_client.publish(topic, json_payload, qos=2) - - def disconnect(self) -> None: - """Disconnect the client.""" - if self._mqtt_client.is_connected(): - self._mqtt_client.disconnect() - - -class LGHorizonBox: - """The LGHorizon box.""" - - device_id: str = None - hashed_cpe_id: str = None - device_friendly_name: str = None - state: str = None - playing_info: LGHorizonPlayingInfo = None - manufacturer: str = None - model: str = None - recording_capacity: int = None - - _mqtt_client: LGHorizonMqttClient - _change_callback: Callable = None - _auth: LGHorizonAuth = None - _channels: Dict[str, LGHorizonChannel] = None - _message_stamp = None - - def __init__( - self, - box_json: str, - platform_type: Dict[str, str], - mqtt_client: LGHorizonMqttClient, - auth: LGHorizonAuth, - channels: Dict[str, LGHorizonChannel], - ): - self.device_id = box_json["deviceId"] - self.hashed_cpe_id = box_json["hashedCPEId"] - self.device_friendly_name = box_json["settings"]["deviceFriendlyName"] - self._mqtt_client = mqtt_client - self._auth = auth - self._channels = channels - self.playing_info = LGHorizonPlayingInfo() - if platform_type: - self.manufacturer = platform_type["manufacturer"] - self.model = platform_type["model"] - - def update_channels(self, channels: Dict[str, LGHorizonChannel]): - """Update the channels list.""" - self._channels = channels - - def register_mqtt(self) -> None: - """Register the mqtt connection.""" - if not self._mqtt_client.is_connected: - raise Exception("MQTT client not connected.") - topic = f"{self._auth.household_id}/{self._mqtt_client.client_id}/status" - payload = { - "source": self._mqtt_client.client_id, - "state": ONLINE_RUNNING, - "deviceType": "HGO", - } - self._mqtt_client.publish_message(topic, json.dumps(payload)) - - def set_callback(self, change_callback: Callable) -> None: - """Set a callback function.""" - self._change_callback = change_callback - - def update_state(self, payload): - """Register a new settop box.""" - state = payload["state"] - if self.state == state: - return - self.state = state - if state == ONLINE_STANDBY: - self.playing_info.reset() - if self._change_callback: - self._change_callback(self.device_id) - else: - self._request_settop_box_state() - self._request_settop_box_recording_capacity() - - def update_recording_capacity(self, payload) -> None: - """Updates the recording capacity.""" - if "CPE.capacity" not in payload or "used" not in payload: - return - self.recording_capacity = payload["used"] - - def update_with_replay_event( - self, source_type: str, event: LGHorizonReplayEvent, channel: LGHorizonChannel - ) -> None: - """Update box with replay event.""" - self.playing_info.set_source_type(source_type) - self.playing_info.set_channel(channel.id) - self.playing_info.set_channel_title(channel.title) - title = event.title - if event.episode_name: - title += f": {event.episode_name}" - self.playing_info.set_title(title) - self.playing_info.set_image(channel.stream_image) - self.playing_info.reset_progress() - self._trigger_callback() - - def update_with_recording( - self, - source_type: str, - recording: LGHorizonRecordingSingle, - channel: LGHorizonChannel, - start: float, - end: float, - last_speed_change: float, - relative_position: float, - ) -> None: - """Update box with recording.""" - self.playing_info.set_source_type(source_type) - self.playing_info.set_channel(channel.id) - self.playing_info.set_channel_title(channel.title) - self.playing_info.set_title(f"{recording.title}") - self.playing_info.set_image(recording.image) - start_dt = datetime.fromtimestamp(start / 1000.0) - end_dt = datetime.fromtimestamp(end / 1000.0) - duration = (end_dt - start_dt).total_seconds() - self.playing_info.set_duration(duration) - self.playing_info.set_position(relative_position / 1000.0) - last_update_dt = datetime.fromtimestamp(last_speed_change / 1000.0) - self.playing_info.set_last_position_update(last_update_dt) - self._trigger_callback() - - def update_with_vod( - self, - source_type: str, - vod: LGHorizonVod, - last_speed_change: float, - relative_position: float, - ) -> None: - """Update box with vod.""" - self.playing_info.set_source_type(source_type) - self.playing_info.set_channel(None) - self.playing_info.set_channel_title(None) - self.playing_info.set_title(vod.title) - self.playing_info.set_image(None) - self.playing_info.set_duration(vod.duration) - self.playing_info.set_position(relative_position / 1000.0) - last_update_dt = datetime.fromtimestamp(last_speed_change / 1000.0) - self.playing_info.set_last_position_update(last_update_dt) - self._trigger_callback() - - def update_with_app(self, source_type: str, app: LGHorizonApp) -> None: - """Update box with app.""" - self.playing_info.set_source_type(source_type) - self.playing_info.set_channel(None) - self.playing_info.set_channel_title(app.title) - self.playing_info.set_title(app.title) - self.playing_info.set_image(app.image) - self.playing_info.reset_progress() - self._trigger_callback() - - def _trigger_callback(self): - if self._change_callback: - _logger.debug("Callback called from box %s", self.device_id) - self._change_callback(self.device_id) - - def turn_on(self) -> None: - """Turn the settop box on.""" - - if self.state == ONLINE_STANDBY: - self.send_key_to_box(MEDIA_KEY_POWER) - - def turn_off(self) -> None: - """Turn the settop box off.""" - if self.state == ONLINE_RUNNING: - self.send_key_to_box(MEDIA_KEY_POWER) - self.playing_info.reset() - - def pause(self) -> None: - """Pause the given settopbox.""" - if self.state == ONLINE_RUNNING and not self.playing_info.paused: - self.send_key_to_box(MEDIA_KEY_PLAY_PAUSE) - - def play(self) -> None: - """Resume the settopbox.""" - if self.state == ONLINE_RUNNING and self.playing_info.paused: - self.send_key_to_box(MEDIA_KEY_PLAY_PAUSE) - - def stop(self) -> None: - """Stop the settopbox.""" - if self.state == ONLINE_RUNNING: - self.send_key_to_box(MEDIA_KEY_STOP) - - def next_channel(self): - """Select the next channel for given settop box.""" - if self.state == ONLINE_RUNNING: - self.send_key_to_box(MEDIA_KEY_CHANNEL_UP) - - def previous_channel(self) -> None: - """Select the previous channel for given settop box.""" - if self.state == ONLINE_RUNNING: - self.send_key_to_box(MEDIA_KEY_CHANNEL_DOWN) - - def press_enter(self) -> None: - """Press enter on the settop box.""" - if self.state == ONLINE_RUNNING: - self.send_key_to_box(MEDIA_KEY_ENTER) - - def rewind(self) -> None: - """Rewind the settop box.""" - if self.state == ONLINE_RUNNING: - self.send_key_to_box(MEDIA_KEY_REWIND) - - def fast_forward(self) -> None: - """Fast forward the settop box.""" - if self.state == ONLINE_RUNNING: - self.send_key_to_box(MEDIA_KEY_FAST_FORWARD) - - def record(self): - """Record on the settop box.""" - if self.state == ONLINE_RUNNING: - self.send_key_to_box(MEDIA_KEY_RECORD) - - def is_available(self) -> bool: - """Return the availability of the settop box.""" - return self.state == ONLINE_RUNNING or self.state == ONLINE_STANDBY - - def set_channel(self, source: str) -> None: - """Change te channel from the settopbox.""" - channel = [src for src in self._channels.values() if src.title == source][0] - payload = ( - '{"id":"' - + make_id(8) - + '","type":"CPE.pushToTV","source":{"clientId":"' - + self._mqtt_client.client_id - + '","friendlyDeviceName":"Home Assistant"},' - + '"status":{"sourceType":"linear","source":{"channelId":"' - + channel.id - + '"},"relativePosition":0,"speed":1}}' - ) - - self._mqtt_client.publish_message( - f"{self._auth.household_id}/{self.device_id}", payload - ) - - def play_recording(self, recording_id): - """Play recording.""" - payload = ( - '{"id":"' - + make_id(8) - + '","type":"CPE.pushToTV","source":{"clientId":"' - + self._mqtt_client.client_id - + '","friendlyDeviceName":"Home Assistant"},' - + '"status":{"sourceType":"nDVR","source":{"recordingId":"' - + recording_id - + '"},"relativePosition":0}}' - ) - self._mqtt_client.publish_message( - f"{self._auth.household_id}/{self.device_id}", payload - ) - - def send_key_to_box(self, key: str) -> None: - """Send emulated (remote) key press to settopbox.""" - payload_dict = { - "type": "CPE.KeyEvent", - "runtimeType": "key", - "id": "ha", - "source": self.device_id.lower(), - "status": {"w3cKey": key, "eventType": "keyDownUp"}, - } - payload = json.dumps(payload_dict) - self._mqtt_client.publish_message( - f"{self._auth.household_id}/{self.device_id}", payload - ) - - def _set_unknown_channel_info(self) -> None: - """Set unknown channel info.""" - _logger.warning("Couldn't set channel. Channel info set to unknown...") - self.playing_info.set_source_type(BOX_PLAY_STATE_CHANNEL) - self.playing_info.set_channel(None) - self.playing_info.set_title("No information available") - self.playing_info.set_image(None) - self.playing_info.set_paused(False) - - def _request_settop_box_state(self) -> None: - """Send mqtt message to receive state from settop box.""" - topic = f"{self._auth.household_id}/{self.device_id}" - payload = { - "id": make_id(8), - "type": "CPE.getUiStatus", - "source": self._mqtt_client.client_id, - } - self._mqtt_client.publish_message(topic, json.dumps(payload)) - - def _request_settop_box_recording_capacity(self) -> None: - """Send mqtt message to receive state from settop box.""" - topic = f"{self._auth.household_id}/{self.device_id}" - payload = { - "id": make_id(8), - "type": "CPE.capacity", - "source": self._mqtt_client.client_id, - } - self._mqtt_client.publish_message(topic, json.dumps(payload)) - - -class LGHorizonProfile: - """LGHorizon profile.""" - - profile_id: str = None - name: str = None - favorite_channels: list[str] = None - - def __init__(self, json_payload): - self.profile_id = json_payload["profileId"] - self.name = json_payload["name"] - self.favorite_channels = json_payload["favoriteChannels"] - - -class LGHorizonCustomer: - """LGHorizon customer""" - - customer_id: str = None - hashed_customer_id: str = None - country_id: str = None - city_id: int = 0 - settop_boxes: list[str] = None - profiles: Dict[str, LGHorizonProfile] = {} - - def __init__(self, json_payload): - self.customer_id = json_payload["customerId"] - self.hashed_customer_id = json_payload["hashedCustomerId"] - self.country_id = json_payload["countryId"] - self.city_id = json_payload["cityId"] - if "assignedDevices" in json_payload: - self.settop_boxes = json_payload["assignedDevices"] - if "profiles" in json_payload: - for profile in json_payload["profiles"]: - self.profiles[profile["profileId"]] = LGHorizonProfile(profile) diff --git a/main.py b/main.py new file mode 100644 index 0000000..90aa922 --- /dev/null +++ b/main.py @@ -0,0 +1,85 @@ +"""Main class to test working of LG Horizon API""" + +import asyncio +import json +import logging +import sys # Import sys for stdin + +import aiohttp +import traceback + +from lghorizon.lghorizon_api import LGHorizonApi +from lghorizon.lghorizon_models import LGHorizonAuth + +# Define an asyncio Event to signal shutdown +shutdown_event = asyncio.Event() + + +async def read_input_and_signal_shutdown(): + """Reads a line from stdin and sets the shutdown event.""" + _LOGGER.info("Press Enter to gracefully shut down...") + # run_in_executor is used to run blocking I/O operations in a separate thread + # so it doesn't block the asyncio event loop. + await asyncio.get_event_loop().run_in_executor(None, sys.stdin.readline) + _LOGGER.info("Enter pressed, signaling shutdown.") + shutdown_event.set() + + +_LOGGER = logging.getLogger(__name__) + + +async def main(): + """Main function to run the LG Horizon API test script.""" + logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + filename="lghorizon.log", + filemode="w", + ) + + logging.info("Starting LG Horizon test script") + with open("secrets.json", encoding="utf-8") as f: + secrets = json.load(f) + username = secrets.get("username") + password = secrets.get("password") + country = secrets.get("country", "nl") + + async with aiohttp.ClientSession() as session: + auth = LGHorizonAuth(session, country, username=username, password=password) + api = LGHorizonApi(auth) + + # Start the input reader task + input_task = asyncio.create_task(read_input_and_signal_shutdown()) + + async def device_callback(device_id: str): + device = devices[device_id] + print( + f"Device {device.device_id} state changed. Status:\n\nName: {device.device_friendly_name}\nState: {device.device_state.state.value}\nChannel: {device.device_state.channel_name} ({device.device_state.channel_id})\nShow: {device.device_state.show_title}\nEpisode: {device.device_state.episode_title}\nSource type: {device.device_state.source_type.value}\nlast pos update: {device.device_state.last_position_update}\npos: {device.device_state.position}\nstart time: {device.device_state.start_time}\nend time: {device.device_state.end_time}\n\n", + ) + + try: + await api.initialize() + devices = await api.get_devices() + for device in devices.values(): + await device.set_callback(device_callback) + + quota = await api.get_recording_quota() + print(f"Recording occupancy: {quota.percentage_used}") + # Wait until the shutdown event is set + await shutdown_event.wait() + + except Exception as e: + print(f"An error occurred: {e}") + _LOGGER.error("An error occurred: %s", e, exc_info=True) + finally: + _LOGGER.info("Shutting down API and cancelling input task.") + input_task.cancel() + try: + await input_task # Await to let it clean up if it was cancelled + except asyncio.CancelledError: + pass # Expected if cancelled + await api.disconnect() + _LOGGER.info("Shutdown complete.") + + +asyncio.run(main()) diff --git a/renovate.json b/renovate.json index 9a3152d..1cc015e 100644 --- a/renovate.json +++ b/renovate.json @@ -1,7 +1,7 @@ { "$schema": "https://docs.renovatebot.com/renovate-schema.json", - "extends": [ - "config:recommended", - ":dependencyDashboard" - ] + "extends": ["config:recommended", ":dependencyDashboard"], + "constraints": { + "python": "==3.13.*" + } } diff --git a/test.py b/test.py deleted file mode 100644 index 88548fe..0000000 --- a/test.py +++ /dev/null @@ -1,84 +0,0 @@ -""" "Test the component.""" - -import json -import logging -import time -from lghorizon import LGHorizonApi - -api: LGHorizonApi - -logging.basicConfig( - level=logging.DEBUG, - format="%(asctime)s - %(levelname)s - %(message)s", -) - -_Logger = logging.getLogger() - -file_handler = logging.FileHandler("logfile.log", mode="w") -file_handler.setLevel(logging.DEBUG) -_Logger.addHandler(file_handler) - -console_handler = logging.StreamHandler() -console_handler.setLevel(logging.DEBUG) -_Logger.addHandler(console_handler) - -secrets: dict[str, str] = None - - -def read_secrets(file_path): - """Read secrets from file.""" - try: - with open(file_path, "r", encoding="UTF-8") as file: - return json.load(file) - except FileNotFoundError: - print(f"Error: Secrets file not found at {file_path}") - return {} - except json.JSONDecodeError: - print(f"Error: Unable to decode JSON in {file_path}") - return {} - - -def event_loop(): - """Default event loop.""" - while True: - time.sleep(1) # Simulate some work - - # Check for a breaking condition - if break_condition(): - break - - -def break_condition(): - """Break event loop on conditions.""" - # Implement your breaking condition logic here - return False # Change this condition based on your requirements - - -if __name__ == "__main__": - try: - secrets = read_secrets("secrets.json") - - refresh_token: str = None - if "refresh_token" in secrets: - refresh_token = secrets["refresh_token"] - - profile_id: str = None - if "profile_id" in secrets: - profile_id = secrets["profile_id"] - - api = LGHorizonApi( - secrets["username"], - secrets["password"], - secrets["country"], - # identifier="DTV3907048", - refresh_token=refresh_token, - profile_id=profile_id, - ) - api.connect() - event_loop() - except KeyboardInterrupt: - print("\nScript interrupted by user.") - finally: - print("Script is exiting.") - if api: - api.disconnect()