diff --git a/TASK.md b/TASK.md index 5c6d2c7..501e133 100644 --- a/TASK.md +++ b/TASK.md @@ -102,3 +102,6 @@ ## 2025-11-11 - [x] Pin Reticulum (RNS) to 1.0.2 and LXMF to 0.9.2 in project dependencies. - [x] Centralise payload conversion utilities and refactor EmergencyManagement client and gateway to use them. + +## 2025-11-12 +- [x] Introduce FastAPI integration helpers for LXMF configuration, dependencies, and command routing. diff --git a/docs/fastapi_integration.md b/docs/fastapi_integration.md new file mode 100644 index 0000000..f683a22 --- /dev/null +++ b/docs/fastapi_integration.md @@ -0,0 +1,104 @@ +# FastAPI Integration Helpers + +The `reticulum_openapi.integrations.fastapi` package centralises shared +infrastructure for building LXMF-aware FastAPI applications. It provides +configuration models, lifecycle management utilities, reusable command execution +helpers, and diagnostics that were previously embedded inside the Emergency +Management example services. + +## Configuration Settings + +Use `LXMFClientSettings` together with `create_settings_loader` (or +`load_lxmf_client_settings`) to populate LXMF client configuration from JSON +files or environment variables. Values mirror the historical +`north_api.config.NorthAPIClientSettings` fields: + +```python +from pathlib import Path +from reticulum_openapi.integrations.fastapi import create_settings_loader + +loader = create_settings_loader( + default_path=Path("./client_config.json"), + env_json_var="NORTH_API_CONFIG_JSON", + env_path_var="NORTH_API_CONFIG_PATH", +) +settings = loader() +``` + +All values are normalised (paths are stripped, RPC keys lower-cased) and an +optional `require_server_identity` flag enforces mandatory server identity +configuration when desired. + +## Managing LXMF Clients + +`LXMFClientManager` wraps client instantiation and lifecycle management. It +produces a singleton LXMF client, handles optional announce broadcasts, and can +attach notification bridges during FastAPI startup/shutdown events: + +```python +from fastapi import FastAPI +from reticulum_openapi.integrations.fastapi import LXMFClientManager +from reticulum_openapi.integrations.fastapi import LXMFClientSettings + +settings = LXMFClientSettings(server_identity_hash="001122...") +manager = LXMFClientManager(lambda: settings) +app = FastAPI() +manager.register_events(app) +``` + +The manager exposes `get_client()` for dependency injection and +`get_server_identity()` for resolving default server targets. The Emergency +Management northbound API now consumes this helper directly. + +## Link Management and Interface Status + +`LinkManager` tracks LXMF link attempts, retries connections with backoff, and +records structured status suitable for status endpoints. Applications can start +or stop the retry loop during FastAPI lifecycle events, and the resulting +`LinkStatus` model integrates with diagnostics endpoints. The +`gather_interface_status()` helper inspects active Reticulum interfaces and is +used by the Emergency Management gateway to publish interface metadata. + +## Command Execution Contexts + +To remove repetitive boilerplate from FastAPI routes, the integration package +introduces `CommandSpec` and `create_command_context_dependency`. A command +context resolves the destination server identity (via query parameter, header, +or configured default), handles dataclass payload preparation, and translates +common LXMF errors into HTTP responses: + +```python +from typing import Annotated, Dict +from fastapi import Depends, FastAPI +from reticulum_openapi.integrations.fastapi import ( + CommandSpec, + LXMFCommandContext, + LXMFClientManager, + create_command_context_dependency, +) + +manager = LXMFClientManager(loader) +command_specs = { + "eam:create": CommandSpec(command="CreateEmergencyActionMessage") +} +CommandContext = Annotated[ + LXMFCommandContext, + Depends(create_command_context_dependency(manager, command_specs)), +] + +@app.post("/emergency-action-messages") +async def create_eam(payload: Dict[str, str], context: CommandContext): + return await context.execute("eam:create", body=payload) +``` + +Routes simply supply command metadata and optional payload overrides. The +Emergency Management gateway has been refactored to use this shared context for +all LXMF interactions. + +## Example Adoption + +Both the Emergency Management FastAPI gateway and the northbound API now rely on +`reticulum_openapi.integrations.fastapi` for configuration, dependency +injection, link status tracking, and command execution. Tests under +`tests/integrations/fastapi/` cover the integration points, ensuring lifecycle +hooks and status reporting remain functional across applications. diff --git a/examples/EmergencyManagement/client/north_api/config.py b/examples/EmergencyManagement/client/north_api/config.py index 095d5b2..afc2acc 100644 --- a/examples/EmergencyManagement/client/north_api/config.py +++ b/examples/EmergencyManagement/client/north_api/config.py @@ -2,129 +2,41 @@ from __future__ import annotations -import json -import os -from functools import lru_cache from pathlib import Path -from typing import Any -from typing import Dict -from typing import Optional -from dotenv import load_dotenv -from pydantic import BaseModel -from pydantic import Field -from pydantic import field_validator +from reticulum_openapi.integrations.fastapi import LXMFClientSettings +from reticulum_openapi.integrations.fastapi import create_settings_loader +from reticulum_openapi.integrations.fastapi import load_lxmf_client_settings -load_dotenv() - CONFIG_JSON_ENV_VAR = "NORTH_API_CONFIG_JSON" CONFIG_PATH_ENV_VAR = "NORTH_API_CONFIG_PATH" -DEFAULT_CONFIG_PATH = Path(__file__).resolve().parent.parent / "client_config.json" - - -class NorthAPIClientSettings(BaseModel): - """Pydantic model describing LXMF client configuration values.""" - - server_identity_hash: str = Field(..., min_length=1) - client_display_name: str = Field("EmergencyClient", min_length=1) - request_timeout_seconds: float = Field(300.0, ge=0.0) - lxmf_config_path: Optional[str] = None - lxmf_storage_path: Optional[str] = None - shared_instance_rpc_key: Optional[str] = None - - @field_validator("server_identity_hash") - def _validate_server_identity_hash(cls, value: str) -> str: - """Ensure the server identity hash contains hexadecimal characters.""" - - cleaned = value.strip() - if not cleaned: - raise ValueError("server_identity_hash cannot be empty") - return cleaned.lower() - - @field_validator("client_display_name") - def _validate_display_name(cls, value: str) -> str: - """Ensure the display name is not blank.""" - - cleaned = value.strip() - if not cleaned: - raise ValueError("client_display_name cannot be empty") - return cleaned - - @field_validator("lxmf_config_path", "lxmf_storage_path", mode="before") - def _normalise_optional_paths(cls, value: Optional[str]) -> Optional[str]: - """Return ``None`` when optional string values are empty.""" - - if value is None: - return None - cleaned = str(value).strip() - return cleaned or None - - @field_validator("shared_instance_rpc_key", mode="before") - def _validate_shared_instance_rpc_key( - cls, value: Optional[str] - ) -> Optional[str]: - """Normalise and validate optional RPC key overrides.""" - - if value is None: - return None +DEFAULT_CONFIG_PATH = ( + Path(__file__).resolve().parent.parent / "client_config.json" +) - cleaned = str(value).strip() - if not cleaned: - return None +NorthAPIClientSettings = LXMFClientSettings - try: - bytes.fromhex(cleaned) - except ValueError as exc: - raise ValueError( - "shared_instance_rpc_key must be a hexadecimal string" - ) from exc - - return cleaned.lower() - - -def _load_config_from_json(raw_json: str) -> Dict[str, Any]: - """Return configuration data parsed from a raw JSON string.""" - - try: - return json.loads(raw_json) - except json.JSONDecodeError as exc: # pragma: no cover - defensive logging - raise ValueError("Invalid JSON supplied via environment variable") from exc - - -def _load_config_from_path(path: Path) -> Dict[str, Any]: - """Return configuration data parsed from a JSON file.""" - - if not path.exists(): - raise FileNotFoundError(f"Configuration file not found: {path}") - with path.open("r", encoding="utf-8") as handle: - return json.load(handle) - - -def _resolve_config_source() -> Dict[str, Any]: - """Return configuration data from environment or default JSON file.""" - - raw_json = os.getenv(CONFIG_JSON_ENV_VAR) - if raw_json: - return _load_config_from_json(raw_json) - - path_override = os.getenv(CONFIG_PATH_ENV_VAR) - config_path = Path(path_override) if path_override else DEFAULT_CONFIG_PATH - return _load_config_from_path(config_path) +_SETTINGS_LOADER = create_settings_loader( + default_path=DEFAULT_CONFIG_PATH, + env_json_var=CONFIG_JSON_ENV_VAR, + env_path_var=CONFIG_PATH_ENV_VAR, + require_server_identity=True, +) def load_config() -> NorthAPIClientSettings: """Create a configuration model populated with LXMF client values.""" - data = _resolve_config_source() - return NorthAPIClientSettings(**data) - + return load_lxmf_client_settings( + default_path=DEFAULT_CONFIG_PATH, + env_json_var=CONFIG_JSON_ENV_VAR, + env_path_var=CONFIG_PATH_ENV_VAR, + require_server_identity=True, + ) -@lru_cache(maxsize=1) -def get_config() -> NorthAPIClientSettings: - """Return a cached configuration instance for dependency injection.""" - return load_config() +get_config = _SETTINGS_LOADER __all__ = ["NorthAPIClientSettings", "get_config", "load_config"] diff --git a/examples/EmergencyManagement/client/north_api/dependencies.py b/examples/EmergencyManagement/client/north_api/dependencies.py index bc03436..f7cc84d 100644 --- a/examples/EmergencyManagement/client/north_api/dependencies.py +++ b/examples/EmergencyManagement/client/north_api/dependencies.py @@ -1,70 +1,35 @@ -"""Dependency wiring for the emergency management north API client.""" +"""FastAPI dependencies for the Emergency Management northbound API.""" from __future__ import annotations -import logging -from typing import Annotated, Optional +from typing import Annotated from fastapi import Depends from fastapi import FastAPI from reticulum_openapi.client import LXMFClient +from reticulum_openapi.integrations.fastapi import LXMFClientManager from .config import NorthAPIClientSettings from .config import get_config -logger = logging.getLogger(__name__) -_client_instance: Optional[LXMFClient] = None - - -def _create_client(settings: NorthAPIClientSettings) -> LXMFClient: - """Instantiate the LXMF client using configuration values.""" - - return LXMFClient( - config_path=settings.lxmf_config_path, - storage_path=settings.lxmf_storage_path, - display_name=settings.client_display_name, - timeout=settings.request_timeout_seconds, - shared_instance_rpc_key=settings.shared_instance_rpc_key, - ) - - -def startup_client() -> LXMFClient: - """Initialise the singleton LXMF client instance if required.""" - - global _client_instance - if _client_instance is None: - settings = get_config() - _client_instance = _create_client(settings) - return _client_instance - - -def shutdown_client() -> None: - """Stop the LXMF client instance and release related resources.""" - - global _client_instance - if _client_instance is None: - return - try: - _client_instance.stop_listening_for_announces() - except Exception: # pragma: no cover - defensive cleanup - logger.debug("Failed to stop announce listener during shutdown", exc_info=True) - _client_instance = None +_client_manager = LXMFClientManager(get_config) def get_lxmf_client() -> LXMFClient: """Return the configured LXMF client instance.""" - if _client_instance is None: - raise RuntimeError("LXMF client has not been initialised") - return _client_instance + return _client_manager.get_client() def get_server_identity_hash() -> str: """Return the configured server identity hash without user interaction.""" - return get_config().server_identity_hash + identity = _client_manager.get_server_identity() + if identity is None: # pragma: no cover - configuration guard + raise RuntimeError("server_identity_hash must be configured") + return identity ServerIdentityHash = Annotated[str, Depends(get_server_identity_hash)] @@ -73,20 +38,13 @@ def get_server_identity_hash() -> str: def register_client_events(app: FastAPI) -> None: """Attach lifecycle events for creating and shutting down the client.""" - @app.on_event("startup") - async def _startup() -> None: - startup_client() - - @app.on_event("shutdown") - async def _shutdown() -> None: - shutdown_client() + _client_manager.register_events(app) __all__ = [ + "NorthAPIClientSettings", "ServerIdentityHash", "get_lxmf_client", "get_server_identity_hash", "register_client_events", - "shutdown_client", - "startup_client", ] diff --git a/examples/EmergencyManagement/web_gateway/app.py b/examples/EmergencyManagement/web_gateway/app.py index 297ab72..b5521bf 100644 --- a/examples/EmergencyManagement/web_gateway/app.py +++ b/examples/EmergencyManagement/web_gateway/app.py @@ -2,27 +2,19 @@ from __future__ import annotations -import asyncio -import json import logging -from contextlib import suppress -from dataclasses import dataclass -from datetime import datetime, timezone -from pathlib import Path -from typing import ( - Any, - Awaitable, - Callable, - Dict, - List, - Optional, -) - -from importlib import metadata import os - -from dotenv import load_dotenv -from fastapi import Depends, FastAPI, Header, HTTPException, Query, status +from datetime import datetime +from datetime import timezone +from importlib import metadata +from typing import Annotated +from typing import Any +from typing import Dict +from typing import List +from typing import Optional + +from fastapi import Depends +from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse @@ -33,29 +25,23 @@ Event, ) from examples.EmergencyManagement.client.client import LXMFClient -from examples.EmergencyManagement.client.client_emergency import ( - CLIENT_DISPLAY_NAME_KEY, - CONFIG_PATH, - DEFAULT_DISPLAY_NAME, - DEFAULT_TIMEOUT_SECONDS, - LXMF_CONFIG_PATH_KEY, - LXMF_STORAGE_PATH_KEY, - REQUEST_TIMEOUT_KEY, - SHARED_INSTANCE_RPC_KEY, - load_client_config, - read_server_identity_from_config, -) +from examples.EmergencyManagement.client.client_emergency import CONFIG_PATH from reticulum_openapi.api.notifications import ( attach_client_notifications, router as notifications_router, ) -from reticulum_openapi.conversion import normalise_response -from reticulum_openapi.conversion import prepare_dataclass_payload +from reticulum_openapi.integrations.fastapi import CommandSpec +from reticulum_openapi.integrations.fastapi import LXMFCommandContext +from reticulum_openapi.integrations.fastapi import LXMFClientManager +from reticulum_openapi.integrations.fastapi import LinkManager +from reticulum_openapi.integrations.fastapi import create_command_context_dependency +from reticulum_openapi.integrations.fastapi import create_settings_loader +from reticulum_openapi.integrations.fastapi import gather_interface_status -from examples.EmergencyManagement.web_gateway.interface_status import ( - gather_interface_status, -) +logger = logging.getLogger(__name__) +CONFIG_JSON_ENV_VAR = "NORTH_API_CONFIG_JSON" +CONFIG_PATH_ENV_VAR = "NORTH_API_CONFIG_PATH" COMMAND_CREATE_EAM = "CreateEmergencyActionMessage" COMMAND_DELETE_EAM = "DeleteEmergencyActionMessage" @@ -70,19 +56,71 @@ COMMAND_RETRIEVE_EVENT = "RetrieveEvent" -ConfigDict = Dict[str, Any] +def _parse_allowed_origins(raw_value: Optional[str]) -> List[str]: + """Return a list of allowed origins parsed from an environment variable.""" + if not raw_value: + return [] + origins = [] + for candidate in raw_value.split(","): + cleaned = candidate.strip() + if cleaned: + origins.append(cleaned) + return origins -@dataclass(frozen=True) -class CommandSpec: - """Describe an LXMF command handled by the gateway.""" - command: str - request_type: Optional[Any] = None - response_type: Optional[Any] = None - path_field: Optional[str] = None +_ALLOWED_ORIGINS: List[str] = _parse_allowed_origins( + os.getenv("EMERGENCY_GATEWAY_ALLOWED_ORIGINS") +) +if not _ALLOWED_ORIGINS: + _ALLOWED_ORIGINS = ["*"] +app = FastAPI(title="Emergency Management Gateway") +app.add_middleware( + CORSMiddleware, + allow_origins=_ALLOWED_ORIGINS, + allow_credentials=False, + allow_methods=["*"], + allow_headers=["*"], +) +app.include_router(notifications_router) + + +def _resolve_gateway_version() -> str: + """Return the installed package version or a development placeholder.""" + + try: + return metadata.version("reticulum-openapi") + except metadata.PackageNotFoundError: + return "0.1.0-dev" + + +_SETTINGS_LOADER = create_settings_loader( + default_path=CONFIG_PATH, + env_json_var=CONFIG_JSON_ENV_VAR, + env_path_var=CONFIG_PATH_ENV_VAR, +) + + +def _create_client(settings) -> LXMFClient: + """Instantiate the shared LXMF client based on configuration data.""" + + client = LXMFClient( + config_path=settings.lxmf_config_path, + storage_path=settings.lxmf_storage_path, + display_name=settings.client_display_name, + timeout=settings.request_timeout_seconds, + shared_instance_rpc_key=settings.shared_instance_rpc_key, + ) + return client + + +_CLIENT_MANAGER = LXMFClientManager(_SETTINGS_LOADER, client_factory=_create_client) +_CLIENT_MANAGER.register_events( + app, attach_notifications=attach_client_notifications +) +_LINK_MANAGER = LinkManager(_CLIENT_MANAGER.get_client) _COMMAND_SPECS: Dict[str, CommandSpec] = { "eam:create": CommandSpec( command=COMMAND_CREATE_EAM, @@ -132,133 +170,24 @@ class CommandSpec: ), } -logger = logging.getLogger(__name__) - -load_dotenv() - -app = FastAPI(title="Emergency Management Gateway") -app.include_router(notifications_router) - - -def _parse_allowed_origins(raw_value: Optional[str]) -> List[str]: - """Return a list of allowed origins parsed from an environment variable.""" - - if not raw_value: - return [] - origins = [] - for candidate in raw_value.split(","): - cleaned = candidate.strip() - if cleaned: - origins.append(cleaned) - return origins - -_ALLOWED_ORIGINS: List[str] = _parse_allowed_origins( - os.getenv("EMERGENCY_GATEWAY_ALLOWED_ORIGINS") +_CommandContextDependency = create_command_context_dependency( + _CLIENT_MANAGER, _COMMAND_SPECS ) -if not _ALLOWED_ORIGINS: - _ALLOWED_ORIGINS = ["*"] +CommandContext = Annotated[LXMFCommandContext, Depends(_CommandContextDependency)] -app.add_middleware( - CORSMiddleware, - allow_origins=_ALLOWED_ORIGINS, - allow_credentials=False, - allow_methods=["*"], - allow_headers=["*"], -) - - -def _resolve_gateway_version() -> str: - """Return the installed package version or a development placeholder.""" - - try: - return metadata.version("reticulum-openapi") - except metadata.PackageNotFoundError: - return "0.1.0-dev" - - -_CONFIG_JSON_ENV_VAR = "NORTH_API_CONFIG_JSON" -_CONFIG_PATH_ENV_VAR = "NORTH_API_CONFIG_PATH" - - -def _load_gateway_config() -> ConfigDict: - """Load configuration data for the gateway. - - Returns: - ConfigDict: Parsed configuration values describing the LXMF client. - """ - - raw_json = os.getenv(_CONFIG_JSON_ENV_VAR) - if raw_json: - try: - parsed = json.loads(raw_json) - except json.JSONDecodeError: - parsed = None - else: - if isinstance(parsed, dict): - return parsed - # Fall back to path-based configuration when JSON is invalid or not a mapping. - - path_override = os.getenv(_CONFIG_PATH_ENV_VAR) - if path_override: - try: - override_path = Path(path_override).expanduser() - except (TypeError, ValueError): - override_path = None - if override_path: - data = load_client_config(override_path) - if data: - return data - - return load_client_config(CONFIG_PATH) - - -_CONFIG_DATA: ConfigDict = _load_gateway_config() -_DEFAULT_SERVER_IDENTITY: Optional[str] = read_server_identity_from_config( - CONFIG_PATH, _CONFIG_DATA -) -_CLIENT_INSTANCE: Optional[LXMFClient] = None _GATEWAY_VERSION: str = _resolve_gateway_version() _START_TIME: datetime = datetime.now(timezone.utc) -_NOTIFICATION_UNSUBSCRIBER: Optional[Callable[[], Awaitable[None]]] = None -_LINK_RETRY_DELAY_SECONDS: float = 5.0 -_LINK_TASK: Optional[asyncio.Task[None]] = None _INTERFACE_STATUS: List[Dict[str, Any]] = [] -def _format_timestamp(value: Optional[datetime]) -> Optional[str]: - """Return an ISO formatted timestamp in UTC when available.""" - - if value is None: - return None - return value.astimezone(timezone.utc).isoformat() - - -@dataclass -class _LinkStatus: - """Track the gateway's most recent LXMF link attempt.""" - - state: str = "pending" - message: Optional[str] = None - server_identity: Optional[str] = None - last_attempt: Optional[datetime] = None - last_success: Optional[datetime] = None - last_error: Optional[str] = None - - def to_dict(self) -> Dict[str, Optional[str]]: - """Return a serialisable mapping describing the link state.""" - - return { - "state": self.state, - "message": self.message, - "serverIdentity": self.server_identity, - "lastAttempt": _format_timestamp(self.last_attempt), - "lastSuccess": _format_timestamp(self.last_success), - "lastError": self.last_error, - } - +def _format_uptime(uptime_seconds: float) -> str: + """Format seconds since startup as an ``HH:MM:SS`` string.""" -_LINK_STATUS = _LinkStatus() + total_seconds = int(max(uptime_seconds, 0)) + hours, remainder = divmod(total_seconds, 3600) + minutes, seconds = divmod(remainder, 60) + return f"{hours:02d}:{minutes:02d}:{seconds:02d}" def _refresh_interface_status() -> List[Dict[str, Any]]: @@ -269,228 +198,29 @@ def _refresh_interface_status() -> List[Dict[str, Any]]: return _INTERFACE_STATUS -def _normalise_optional_path(value: Optional[str]) -> Optional[str]: - """Return a stripped path string or ``None`` when empty.""" - - if isinstance(value, str): - cleaned = value.strip() - return cleaned or None - return None - - -def _normalise_optional_hex(value: Optional[str]) -> Optional[str]: - """Return a stripped hexadecimal string or ``None`` when empty.""" - - if isinstance(value, str): - cleaned = value.strip() - return cleaned or None - return None - - -def _resolve_timeout(config: ConfigDict) -> float: - """Return the timeout value configured for the client.""" - - timeout_setting = config.get(REQUEST_TIMEOUT_KEY) - if isinstance(timeout_setting, (int, float)) and timeout_setting > 0: - return float(timeout_setting) - return DEFAULT_TIMEOUT_SECONDS - - -def _resolve_display_name(config: ConfigDict) -> str: - """Return the configured display name or the default when missing.""" - - display_name = config.get(CLIENT_DISPLAY_NAME_KEY) - if isinstance(display_name, str) and display_name.strip(): - return display_name.strip() - return DEFAULT_DISPLAY_NAME - - -def _create_client_from_config() -> LXMFClient: - """Instantiate the shared LXMF client based on configuration data.""" - - config_path_override = _normalise_optional_path( - _CONFIG_DATA.get(LXMF_CONFIG_PATH_KEY) - ) - storage_path_override = _normalise_optional_path( - _CONFIG_DATA.get(LXMF_STORAGE_PATH_KEY) - ) - rpc_key_override = _normalise_optional_hex( - _CONFIG_DATA.get(SHARED_INSTANCE_RPC_KEY) - ) - timeout_seconds = _resolve_timeout(_CONFIG_DATA) - display_name = _resolve_display_name(_CONFIG_DATA) - - client = LXMFClient( - config_path=config_path_override, - storage_path=storage_path_override, - display_name=display_name, - timeout=timeout_seconds, - shared_instance_rpc_key=rpc_key_override, - ) - client.announce() - return client - - -def get_shared_client() -> LXMFClient: - """Return the shared LXMF client, creating it if necessary.""" - - global _CLIENT_INSTANCE - if _CLIENT_INSTANCE is None: - _CLIENT_INSTANCE = _create_client_from_config() - return _CLIENT_INSTANCE - - -def _record_link_failure(server_identity: str, error: Exception) -> None: - """Update the link status after a failed connection attempt.""" - - _LINK_STATUS.state = "connecting" - _LINK_STATUS.last_error = str(error) - _LINK_STATUS.message = ( - "Link to LXMF server " - f"{server_identity} failed: {error}. " - f"Retrying in {_LINK_RETRY_DELAY_SECONDS:.1f} seconds." - ) - logger.warning("LXMF link to server %s failed: %s", server_identity, error) - - -def _record_link_success(server_identity: str, attempt_time: datetime) -> None: - """Update link status and log a successful connection.""" - - _LINK_STATUS.state = "connected" - _LINK_STATUS.last_success = attempt_time - _LINK_STATUS.last_error = None - message = f"Connected to LXMF server {server_identity}" - _LINK_STATUS.message = message - print(f"[Emergency Gateway] {message}") - logger.info("Established LXMF link with server %s", server_identity) - - -async def _ensure_link_with_retry(client: LXMFClient, server_identity: str) -> None: - """Continuously attempt to connect the LXMF client to the server.""" - - while True: - attempt_time = datetime.now(timezone.utc) - _LINK_STATUS.last_attempt = attempt_time - try: - await client.ensure_link(server_identity) - except asyncio.CancelledError: - raise - except Exception as exc: - _record_link_failure(server_identity, exc) - await asyncio.sleep(_LINK_RETRY_DELAY_SECONDS) - else: - _record_link_success(server_identity, attempt_time) - break - - -def _format_uptime(uptime_seconds: float) -> str: - """Format seconds since startup as an ``HH:MM:SS`` string.""" - - total_seconds = int(max(uptime_seconds, 0)) - hours, remainder = divmod(total_seconds, 3600) - minutes, seconds = divmod(remainder, 60) - return f"{hours:02d}:{minutes:02d}:{seconds:02d}" - - @app.on_event("startup") async def _startup() -> None: """Ensure the LXMF client is ready before serving requests.""" - client = get_shared_client() + _CLIENT_MANAGER.get_client() interface_status = _refresh_interface_status() active_interfaces = [ status["name"] for status in interface_status if status.get("online") ] if active_interfaces: joined = ", ".join(active_interfaces) - print("[Emergency Gateway] Active Reticulum interfaces: " f"{joined}") + print(f"[Emergency Gateway] Active Reticulum interfaces: {joined}") else: print("[Emergency Gateway] No active Reticulum interfaces reported.") - global _LINK_TASK - server_identity = _DEFAULT_SERVER_IDENTITY - if server_identity: - _LINK_STATUS.server_identity = server_identity - _LINK_STATUS.state = "connecting" - _LINK_STATUS.last_error = None - _LINK_STATUS.message = f"Attempting to connect to LXMF server {server_identity}" - if _LINK_TASK is None or _LINK_TASK.done(): - _LINK_TASK = asyncio.create_task( - _ensure_link_with_retry(client, server_identity) - ) - else: - _LINK_STATUS.state = "unconfigured" - _LINK_STATUS.message = "Server identity hash not configured." - global _NOTIFICATION_UNSUBSCRIBER - if _NOTIFICATION_UNSUBSCRIBER is None and hasattr( - client, "add_notification_listener" - ): - _NOTIFICATION_UNSUBSCRIBER = await attach_client_notifications(client) + _LINK_MANAGER.start(_CLIENT_MANAGER.get_server_identity()) @app.on_event("shutdown") async def _shutdown() -> None: - """Tear down the notification bridge on application shutdown.""" - - global _LINK_TASK - if _LINK_TASK is not None: - _LINK_TASK.cancel() - with suppress(asyncio.CancelledError): - await _LINK_TASK - _LINK_TASK = None - - global _NOTIFICATION_UNSUBSCRIBER - if _NOTIFICATION_UNSUBSCRIBER is None: - return - unsubscribe = _NOTIFICATION_UNSUBSCRIBER - _NOTIFICATION_UNSUBSCRIBER = None - await unsubscribe() - - -async def _send_command( - server_identity: str, - command: str, - payload: Optional[object], - response_type: Optional[Any] = None, -) -> JSONResponse: - """Send a command through LXMF and return the decoded response.""" + """Tear down background tasks on application shutdown.""" - client = get_shared_client() - try: - response = await client.send_command( - server_identity, - command, - payload, - await_response=True, - response_type=response_type, - ) - except TimeoutError as exc: - logger.error( - "LXMF gateway command '%s' to server %s timed out: %s", - command, - server_identity, - exc, - ) - raise HTTPException( - status_code=status.HTTP_504_GATEWAY_TIMEOUT, - detail=str(exc), - ) from exc - except ValueError as exc: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=str(exc), - ) from exc - except Exception as exc: # pragma: no cover - defensive path - raise HTTPException( - status_code=status.HTTP_502_BAD_GATEWAY, - detail=str(exc), - ) from exc - - if response is None: - return JSONResponse(content=None) - - normalised = normalise_response(response) - return JSONResponse(content=normalised) + await _LINK_MANAGER.stop() @app.get("/") @@ -498,216 +228,125 @@ async def get_gateway_status() -> Dict[str, Any]: """Return gateway metadata and configuration details.""" uptime_seconds = (datetime.now(timezone.utc) - _START_TIME).total_seconds() - config_path_override = _normalise_optional_path( - _CONFIG_DATA.get(LXMF_CONFIG_PATH_KEY) - ) - storage_path_override = _normalise_optional_path( - _CONFIG_DATA.get(LXMF_STORAGE_PATH_KEY) - ) + settings = _CLIENT_MANAGER.get_settings() interface_status = _refresh_interface_status() return { "version": _GATEWAY_VERSION, "uptime": _format_uptime(uptime_seconds), - "serverIdentity": _DEFAULT_SERVER_IDENTITY, - "clientDisplayName": _resolve_display_name(_CONFIG_DATA), - "requestTimeoutSeconds": _resolve_timeout(_CONFIG_DATA), - "lxmfConfigPath": config_path_override or str(CONFIG_PATH), - "lxmfStoragePath": storage_path_override, + "serverIdentity": settings.server_identity_hash, + "clientDisplayName": settings.client_display_name, + "requestTimeoutSeconds": settings.request_timeout_seconds, + "lxmfConfigPath": settings.lxmf_config_path or str(CONFIG_PATH), + "lxmfStoragePath": settings.lxmf_storage_path, "allowedOrigins": _ALLOWED_ORIGINS, - "linkStatus": _LINK_STATUS.to_dict(), + "linkStatus": _LINK_MANAGER.status.to_dict(), "reticulumInterfaces": interface_status, } -async def _resolve_server_identity( - server_identity_query: Optional[str] = Query(None, alias="server_identity"), - server_identity_header: Optional[str] = Header(None, alias="X-Server-Identity"), -) -> str: - """Determine the destination server identity hash for a request.""" - - candidate = ( - server_identity_query or server_identity_header or _DEFAULT_SERVER_IDENTITY - ) - if candidate is None: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Server identity hash is required", - ) - try: - return LXMFClient._normalise_destination_hex(candidate) - except (TypeError, ValueError) as exc: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=str(exc), - ) from exc - - @app.post("/emergency-action-messages") async def create_emergency_action_message( payload: Dict[str, Any], - server_identity: str = Depends(_resolve_server_identity), + context: CommandContext, ) -> JSONResponse: """Create a new emergency action message via LXMF.""" - spec = _COMMAND_SPECS["eam:create"] - message = prepare_dataclass_payload(spec.request_type, dict(payload)) - return await _send_command( - server_identity, - spec.command, - message, - response_type=spec.response_type, - ) + return await context.execute("eam:create", body=payload) @app.delete("/emergency-action-messages/{callsign}") async def delete_emergency_action_message( callsign: str, - server_identity: str = Depends(_resolve_server_identity), + context: CommandContext, ) -> JSONResponse: """Delete an emergency action message by callsign.""" - spec = _COMMAND_SPECS["eam:delete"] - return await _send_command( - server_identity, - spec.command, - callsign, - response_type=spec.response_type, - ) + return await context.execute("eam:delete", payload=callsign) @app.get("/emergency-action-messages") async def list_emergency_action_messages( - server_identity: str = Depends(_resolve_server_identity), + context: CommandContext, ) -> JSONResponse: - """List stored emergency action messages.""" - - spec = _COMMAND_SPECS["eam:list"] - return await _send_command( - server_identity, - spec.command, - None, - response_type=spec.response_type, - ) + """List emergency action messages stored on the server.""" + + return await context.execute("eam:list") @app.put("/emergency-action-messages/{callsign}") async def update_emergency_action_message( callsign: str, payload: Dict[str, Any], - server_identity: str = Depends(_resolve_server_identity), + context: CommandContext, ) -> JSONResponse: """Update an existing emergency action message.""" - spec = _COMMAND_SPECS["eam:update"] - overrides = {spec.path_field: callsign} if spec.path_field else {"callsign": callsign} - message = prepare_dataclass_payload( - spec.request_type, dict(payload), overrides=overrides - ) - return await _send_command( - server_identity, - spec.command, - message, - response_type=spec.response_type, + return await context.execute( + "eam:update", body=payload, path_params={"callsign": callsign} ) @app.get("/emergency-action-messages/{callsign}") async def retrieve_emergency_action_message( callsign: str, - server_identity: str = Depends(_resolve_server_identity), + context: CommandContext, ) -> JSONResponse: """Retrieve an emergency action message by callsign.""" - spec = _COMMAND_SPECS["eam:retrieve"] - return await _send_command( - server_identity, - spec.command, - callsign, - response_type=spec.response_type, - ) + return await context.execute("eam:retrieve", payload=callsign) @app.post("/events") async def create_event( payload: Dict[str, Any], - server_identity: str = Depends(_resolve_server_identity), + context: CommandContext, ) -> JSONResponse: """Create a new event record via LXMF.""" - spec = _COMMAND_SPECS["event:create"] - event = prepare_dataclass_payload(spec.request_type, dict(payload)) - return await _send_command( - server_identity, - spec.command, - event, - response_type=spec.response_type, - ) + return await context.execute("event:create", body=payload) @app.delete("/events/{uid}") async def delete_event( uid: str, - server_identity: str = Depends(_resolve_server_identity), + context: CommandContext, ) -> JSONResponse: """Delete an event by unique identifier.""" - spec = _COMMAND_SPECS["event:delete"] - return await _send_command( - server_identity, - spec.command, - uid, - response_type=spec.response_type, - ) + return await context.execute("event:delete", payload=uid) @app.get("/events") async def list_events( - server_identity: str = Depends(_resolve_server_identity), + context: CommandContext, ) -> JSONResponse: """List events stored on the server.""" - spec = _COMMAND_SPECS["event:list"] - return await _send_command( - server_identity, - spec.command, - None, - response_type=spec.response_type, - ) + return await context.execute("event:list") @app.put("/events/{uid}") async def update_event( uid: int, payload: Dict[str, Any], - server_identity: str = Depends(_resolve_server_identity), + context: CommandContext, ) -> JSONResponse: """Update an existing event by unique identifier.""" - spec = _COMMAND_SPECS["event:update"] - overrides = {spec.path_field: uid} if spec.path_field else {"uid": uid} - event = prepare_dataclass_payload( - spec.request_type, dict(payload), overrides=overrides - ) - return await _send_command( - server_identity, - spec.command, - event, - response_type=spec.response_type, + return await context.execute( + "event:update", body=payload, path_params={"uid": uid} ) @app.get("/events/{uid}") async def retrieve_event( uid: str, - server_identity: str = Depends(_resolve_server_identity), + context: CommandContext, ) -> JSONResponse: """Retrieve an event by unique identifier.""" - spec = _COMMAND_SPECS["event:retrieve"] - return await _send_command( - server_identity, - spec.command, - uid, - response_type=spec.response_type, - ) + return await context.execute("event:retrieve", payload=uid) + + +__all__ = ["app"] diff --git a/reticulum_openapi/integrations/fastapi/__init__.py b/reticulum_openapi/integrations/fastapi/__init__.py new file mode 100644 index 0000000..363a540 --- /dev/null +++ b/reticulum_openapi/integrations/fastapi/__init__.py @@ -0,0 +1,25 @@ +"""FastAPI integration helpers for Reticulum LXMF clients.""" + +from .commands import CommandSpec +from .commands import LXMFCommandContext +from .commands import create_command_context_dependency +from .dependencies import LXMFClientManager +from .interfaces import gather_interface_status +from .link import LinkManager +from .link import LinkStatus +from .settings import LXMFClientSettings +from .settings import create_settings_loader +from .settings import load_lxmf_client_settings + +__all__ = [ + "CommandSpec", + "LXMFCommandContext", + "LXMFClientManager", + "LinkManager", + "LinkStatus", + "create_command_context_dependency", + "create_settings_loader", + "gather_interface_status", + "load_lxmf_client_settings", + "LXMFClientSettings", +] diff --git a/reticulum_openapi/integrations/fastapi/commands.py b/reticulum_openapi/integrations/fastapi/commands.py new file mode 100644 index 0000000..a57c277 --- /dev/null +++ b/reticulum_openapi/integrations/fastapi/commands.py @@ -0,0 +1,189 @@ +"""Reusable helpers for issuing LXMF commands from FastAPI routers.""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass +from typing import Any +from typing import Callable +from typing import Mapping +from typing import Optional + +from fastapi import Header +from fastapi import HTTPException +from fastapi import Query +from fastapi import status +from fastapi.responses import JSONResponse + +from reticulum_openapi.client import LXMFClient as BaseLXMFClient +from reticulum_openapi.conversion import normalise_response +from reticulum_openapi.conversion import prepare_dataclass_payload + +from .dependencies import LXMFClientManager + + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class CommandSpec: + """Describe an LXMF command handled by a FastAPI endpoint.""" + + command: str + request_type: Optional[Any] = None + response_type: Optional[Any] = None + path_field: Optional[str] = None + + +class LXMFCommandContext: + """Command execution context bound to a specific server identity.""" + + def __init__( + self, + manager: LXMFClientManager, + server_identity: str, + command_specs: Mapping[str, CommandSpec], + ) -> None: + self._manager = manager + self._server_identity = server_identity + self._command_specs = command_specs + + async def execute( + self, + key: str, + *, + body: Optional[Mapping[str, Any]] = None, + payload: Optional[Any] = None, + path_params: Optional[Mapping[str, Any]] = None, + ) -> JSONResponse: + """Send the LXMF command described by ``key`` and normalise the response.""" + + if key not in self._command_specs: + raise KeyError(f"Unknown LXMF command key: {key}") + + spec = self._command_specs[key] + request_payload = self._prepare_payload(spec, body, payload, path_params) + return await self._send_command(spec.command, request_payload, spec.response_type) + + def _prepare_payload( + self, + spec: CommandSpec, + body: Optional[Mapping[str, Any]], + payload: Optional[Any], + path_params: Optional[Mapping[str, Any]], + ) -> Optional[Any]: + """Return the payload to send for the supplied command specification.""" + + if payload is not None: + return payload + + if spec.request_type is not None: + overrides = dict(path_params or {}) + raw_payload = dict(body or {}) + return prepare_dataclass_payload(spec.request_type, raw_payload, overrides=overrides) + + if spec.path_field and path_params and spec.path_field in path_params: + return path_params[spec.path_field] + + if body is None: + return None + + return dict(body) + + async def _send_command( + self, + command: str, + request_payload: Optional[Any], + response_type: Optional[Any], + ) -> JSONResponse: + """Send a command through LXMF and return the decoded response.""" + + client = self._manager.get_client() + try: + response = await client.send_command( + self._server_identity, + command, + request_payload, + await_response=True, + response_type=response_type, + ) + except TimeoutError as exc: + logger.error( + "LXMF gateway command '%s' to server %s timed out: %s", + command, + self._server_identity, + exc, + ) + raise HTTPException( + status_code=status.HTTP_504_GATEWAY_TIMEOUT, + detail=str(exc), + ) from exc + except ValueError as exc: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=str(exc), + ) from exc + except Exception as exc: # pragma: no cover - defensive path + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail=str(exc), + ) from exc + + if response is None: + return JSONResponse(content=None) + + normalised = normalise_response(response) + return JSONResponse(content=normalised) + + +def _resolve_server_identity( + manager: LXMFClientManager, + server_identity_query: Optional[str], + server_identity_header: Optional[str], +) -> str: + """Return the destination server identity hash for a request.""" + + candidate = ( + server_identity_query + or server_identity_header + or manager.get_server_identity() + ) + if candidate is None: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Server identity hash is required", + ) + try: + return BaseLXMFClient._normalise_destination_hex(candidate) + except (TypeError, ValueError) as exc: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=str(exc), + ) from exc + + +def create_command_context_dependency( + manager: LXMFClientManager, + command_specs: Mapping[str, CommandSpec], +) -> Callable[..., LXMFCommandContext]: + """Return a dependency that resolves server identity and command context.""" + + async def _dependency( + server_identity_query: Optional[str] = Query(None, alias="server_identity"), + server_identity_header: Optional[str] = Header( + None, alias="X-Server-Identity" + ), + ) -> LXMFCommandContext: + server_identity = _resolve_server_identity( + manager, server_identity_query, server_identity_header + ) + return LXMFCommandContext(manager, server_identity, command_specs) + + return _dependency + + +__all__ = [ + "CommandSpec", + "LXMFCommandContext", + "create_command_context_dependency", +] diff --git a/reticulum_openapi/integrations/fastapi/dependencies.py b/reticulum_openapi/integrations/fastapi/dependencies.py new file mode 100644 index 0000000..539b255 --- /dev/null +++ b/reticulum_openapi/integrations/fastapi/dependencies.py @@ -0,0 +1,121 @@ +"""FastAPI dependency helpers for managing LXMF client instances.""" + +from __future__ import annotations + +import logging +from contextlib import suppress +from typing import Awaitable +from typing import Callable +from typing import Optional + +from fastapi import FastAPI + +from reticulum_openapi.client import LXMFClient + +from .settings import LXMFClientSettings + + +logger = logging.getLogger(__name__) + + +class LXMFClientManager: + """Manage lifecycle of a shared :class:`~reticulum_openapi.client.LXMFClient`.""" + + def __init__( + self, + settings_loader: Callable[[], LXMFClientSettings], + *, + client_factory: Optional[Callable[[LXMFClientSettings], LXMFClient]] = None, + announce_on_startup: bool = True, + ) -> None: + """Initialise the manager with a settings loader and optional factory.""" + + self._settings_loader = settings_loader + self._client_factory = client_factory or self._default_factory + self._announce_on_startup = announce_on_startup + self._client: Optional[LXMFClient] = None + self._notification_unsubscriber: Optional[Callable[[], Awaitable[None]]] = None + + @staticmethod + def _default_factory(settings: LXMFClientSettings) -> LXMFClient: + """Create a new :class:`LXMFClient` using supplied settings.""" + + client = LXMFClient( + config_path=settings.lxmf_config_path, + storage_path=settings.lxmf_storage_path, + display_name=settings.client_display_name, + timeout=settings.request_timeout_seconds, + shared_instance_rpc_key=settings.shared_instance_rpc_key, + ) + return client + + def get_settings(self) -> LXMFClientSettings: + """Return the cached LXMF client settings.""" + + return self._settings_loader() + + def get_client(self) -> LXMFClient: + """Return the shared LXMF client, creating it if required.""" + + if self._client is None: + settings = self.get_settings() + client = self._client_factory(settings) + if self._announce_on_startup and hasattr(client, "announce"): + client.announce() + self._client = client + return self._client + + def get_server_identity(self) -> Optional[str]: + """Return the configured server identity hash when available.""" + + settings = self.get_settings() + return settings.server_identity_hash + + def set_notification_unsubscriber( + self, unsubscriber: Optional[Callable[[], Awaitable[None]]] + ) -> None: + """Store the callable responsible for removing notification hooks.""" + + self._notification_unsubscriber = unsubscriber + + async def shutdown(self) -> None: + """Tear down the managed client and release resources.""" + + if self._client is None: + return + + client = self._client + self._client = None + + if self._notification_unsubscriber is not None: + unsubscribe = self._notification_unsubscriber + self._notification_unsubscriber = None + with suppress(Exception): # pragma: no cover - defensive cleanup + await unsubscribe() + + with suppress(Exception): # pragma: no cover - defensive cleanup + client.stop_listening_for_announces() + + def register_events( + self, + app: FastAPI, + *, + attach_notifications: Optional[ + Callable[[LXMFClient], Awaitable[Callable[[], Awaitable[None]]]] + ] = None, + ) -> None: + """Register FastAPI events for managing the LXMF client lifecycle.""" + + @app.on_event("startup") + async def _startup() -> None: + client = self.get_client() + if attach_notifications is not None: + unsubscribe = await attach_notifications(client) + self.set_notification_unsubscriber(unsubscribe) + + @app.on_event("shutdown") + async def _shutdown() -> None: + await self.shutdown() + + +__all__ = ["LXMFClientManager"] diff --git a/examples/EmergencyManagement/web_gateway/interface_status.py b/reticulum_openapi/integrations/fastapi/interfaces.py similarity index 87% rename from examples/EmergencyManagement/web_gateway/interface_status.py rename to reticulum_openapi/integrations/fastapi/interfaces.py index 8274860..3fe082f 100644 --- a/examples/EmergencyManagement/web_gateway/interface_status.py +++ b/reticulum_openapi/integrations/fastapi/interfaces.py @@ -1,8 +1,11 @@ -"""Helpers for reporting Reticulum interface status.""" +"""Helpers for inspecting Reticulum interface status for FastAPI gateways.""" from __future__ import annotations -from typing import Any, Dict, List, Optional +from typing import Any +from typing import Dict +from typing import List +from typing import Optional import RNS from RNS.Interfaces import Interface as RNSInterface @@ -39,7 +42,7 @@ def _coerce_optional_int(value: Any) -> Optional[int]: if isinstance(value, (int, float)): try: return int(value) - except (TypeError, ValueError): + except (TypeError, ValueError): # pragma: no cover - defensive cast return None return None @@ -62,3 +65,6 @@ def gather_interface_status() -> List[Dict[str, Any]]: } ) return statuses + + +__all__ = ["gather_interface_status"] diff --git a/reticulum_openapi/integrations/fastapi/link.py b/reticulum_openapi/integrations/fastapi/link.py new file mode 100644 index 0000000..602e316 --- /dev/null +++ b/reticulum_openapi/integrations/fastapi/link.py @@ -0,0 +1,140 @@ +"""Utilities for tracking LXMF link status and retrying connections.""" + +from __future__ import annotations + +import asyncio +import logging +from dataclasses import dataclass +from datetime import datetime +from datetime import timezone +from typing import Callable +from typing import Dict +from typing import Optional + +from reticulum_openapi.client import LXMFClient + + +logger = logging.getLogger(__name__) + + +@dataclass +class LinkStatus: + """Describe the gateway's most recent LXMF link attempt.""" + + state: str = "pending" + message: Optional[str] = None + server_identity: Optional[str] = None + last_attempt: Optional[datetime] = None + last_success: Optional[datetime] = None + last_error: Optional[str] = None + + def to_dict(self) -> Dict[str, Optional[str]]: + """Return a serialisable mapping describing the link state.""" + + def _format_timestamp(value: Optional[datetime]) -> Optional[str]: + if value is None: + return None + return value.astimezone(timezone.utc).isoformat() + + return { + "state": self.state, + "message": self.message, + "serverIdentity": self.server_identity, + "lastAttempt": _format_timestamp(self.last_attempt), + "lastSuccess": _format_timestamp(self.last_success), + "lastError": self.last_error, + } + + +class LinkManager: + """Manage LXMF link retries for a shared client instance.""" + + def __init__( + self, + client_provider: Callable[[], LXMFClient], + *, + retry_delay_seconds: float = 5.0, + ) -> None: + self._client_provider = client_provider + self._retry_delay_seconds = retry_delay_seconds + self._task: Optional[asyncio.Task[None]] = None + self.status = LinkStatus() + + async def _ensure_link_with_retry(self, server_identity: str) -> None: + """Continuously attempt to connect the LXMF client to the server.""" + + while True: + attempt_time = datetime.now(timezone.utc) + self.status.last_attempt = attempt_time + try: + client = self._client_provider() + await client.ensure_link(server_identity) + except asyncio.CancelledError: + raise + except Exception as exc: # pragma: no cover - defensive logging + self._record_link_failure(server_identity, exc) + await asyncio.sleep(self._retry_delay_seconds) + else: + self._record_link_success(server_identity, attempt_time) + break + + def _record_link_failure(self, server_identity: str, error: Exception) -> None: + """Update the link status after a failed connection attempt.""" + + self.status.state = "connecting" + self.status.last_error = str(error) + self.status.message = ( + "Link to LXMF server " + f"{server_identity} failed: {error}. " + f"Retrying in {self._retry_delay_seconds:.1f} seconds." + ) + logger.warning("LXMF link to server %s failed: %s", server_identity, error) + + def _record_link_success( + self, server_identity: str, attempt_time: datetime + ) -> None: + """Update link status and log a successful connection.""" + + self.status.state = "connected" + self.status.last_success = attempt_time + self.status.last_error = None + message = f"Connected to LXMF server {server_identity}" + self.status.message = message + print(f"[Reticulum FastAPI] {message}") + logger.info("Established LXMF link with server %s", server_identity) + + def start(self, server_identity: Optional[str]) -> None: + """Begin the background retry loop for the configured server identity.""" + + if server_identity is None: + self.status.state = "unconfigured" + self.status.message = "Server identity hash not configured." + self.status.server_identity = None + return + + self.status.state = "connecting" + self.status.message = f"Attempting to connect to LXMF server {server_identity}" + self.status.server_identity = server_identity + + if self._task is not None and not self._task.done(): + return + + loop = asyncio.get_running_loop() + self._task = loop.create_task(self._ensure_link_with_retry(server_identity)) + + async def stop(self) -> None: + """Cancel the retry task if it is active.""" + + if self._task is None: + return + + task = self._task + self._task = None + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + +__all__ = ["LinkManager", "LinkStatus"] diff --git a/reticulum_openapi/integrations/fastapi/settings.py b/reticulum_openapi/integrations/fastapi/settings.py new file mode 100644 index 0000000..6b08685 --- /dev/null +++ b/reticulum_openapi/integrations/fastapi/settings.py @@ -0,0 +1,183 @@ +"""Pydantic models and helpers for configuring LXMF FastAPI integrations.""" + +from __future__ import annotations + +import json +import os +from functools import lru_cache +from pathlib import Path +from typing import Any +from typing import Callable +from typing import Dict +from typing import Optional + +from dotenv import load_dotenv +from pydantic import BaseModel +from pydantic import Field +from pydantic import field_validator + + +load_dotenv() + + +class LXMFClientSettings(BaseModel): + """Pydantic model describing LXMF client configuration values.""" + + server_identity_hash: Optional[str] = Field(default=None) + client_display_name: str = Field("LXMFClient", min_length=1) + request_timeout_seconds: float = Field(300.0, ge=0.0) + lxmf_config_path: Optional[str] = None + lxmf_storage_path: Optional[str] = None + shared_instance_rpc_key: Optional[str] = None + + @field_validator("server_identity_hash", mode="before") + @classmethod + def _normalise_identity(cls, value: Optional[str]) -> Optional[str]: + """Return a lowercase hexadecimal identity string when provided.""" + + if value is None: + return None + cleaned = str(value).strip() + if not cleaned: + return None + return cleaned.lower() + + @field_validator("client_display_name") + @classmethod + def _validate_display_name(cls, value: str) -> str: + """Ensure the display name is not blank.""" + + cleaned = value.strip() + if not cleaned: + raise ValueError("client_display_name cannot be empty") + return cleaned + + @field_validator("lxmf_config_path", "lxmf_storage_path", mode="before") + @classmethod + def _normalise_optional_paths( + cls, value: Optional[str] + ) -> Optional[str]: + """Return ``None`` when optional string values are empty.""" + + if value is None: + return None + cleaned = str(value).strip() + return cleaned or None + + @field_validator("shared_instance_rpc_key", mode="before") + @classmethod + def _normalise_shared_instance_key( + cls, value: Optional[str] + ) -> Optional[str]: + """Normalise and validate optional RPC key overrides.""" + + if value is None: + return None + + cleaned = str(value).strip() + if not cleaned: + return None + + try: + bytes.fromhex(cleaned) + except ValueError as exc: # pragma: no cover - defensive validation + raise ValueError( + "shared_instance_rpc_key must be a hexadecimal string" + ) from exc + + return cleaned.lower() + + +def _load_config_from_json(raw_json: str) -> Dict[str, Any]: + """Return configuration data parsed from a raw JSON string.""" + + try: + parsed = json.loads(raw_json) + except json.JSONDecodeError as exc: # pragma: no cover - defensive logging + raise ValueError("Invalid JSON supplied via environment variable") from exc + + if not isinstance(parsed, dict): + raise ValueError("Configuration JSON must decode to a mapping") + + return parsed + + +def _load_config_from_path(path: Path) -> Dict[str, Any]: + """Return configuration data parsed from a JSON file.""" + + if not path.exists(): + raise FileNotFoundError(f"Configuration file not found: {path}") + with path.open("r", encoding="utf-8") as handle: + return json.load(handle) + + +def load_lxmf_client_settings( + *, + default_path: Optional[Path] = None, + env_json_var: str = "LXMF_CLIENT_CONFIG_JSON", + env_path_var: str = "LXMF_CLIENT_CONFIG_PATH", + require_server_identity: bool = False, +) -> LXMFClientSettings: + """Load LXMF client settings from environment variables or a JSON file. + + Args: + default_path (Optional[Path]): Path to load when no overrides are supplied. + env_json_var (str): Name of the environment variable containing raw JSON. + env_path_var (str): Name of the environment variable pointing to a file. + require_server_identity (bool): When ``True`` a missing identity raises. + + Returns: + LXMFClientSettings: Parsed configuration model populated with LXMF values. + """ + + raw_json = os.getenv(env_json_var) + if raw_json: + config_data = _load_config_from_json(raw_json) + else: + path_override = os.getenv(env_path_var) + if path_override: + try: + candidate_path = Path(path_override).expanduser() + except (TypeError, ValueError): + candidate_path = None + if candidate_path is not None: + config_data = _load_config_from_path(candidate_path) + else: + config_data = {} + elif default_path is not None: + config_data = _load_config_from_path(default_path) + else: + config_data = {} + + settings = LXMFClientSettings(**config_data) + if require_server_identity and not settings.server_identity_hash: + raise ValueError("server_identity_hash must be configured") + return settings + + +def create_settings_loader( + *, + default_path: Optional[Path] = None, + env_json_var: str = "LXMF_CLIENT_CONFIG_JSON", + env_path_var: str = "LXMF_CLIENT_CONFIG_PATH", + require_server_identity: bool = False, +) -> Callable[[], LXMFClientSettings]: + """Return a cached callable that loads LXMF client settings.""" + + @lru_cache(maxsize=1) + def _loader() -> LXMFClientSettings: + return load_lxmf_client_settings( + default_path=default_path, + env_json_var=env_json_var, + env_path_var=env_path_var, + require_server_identity=require_server_identity, + ) + + return _loader + + +__all__ = [ + "LXMFClientSettings", + "create_settings_loader", + "load_lxmf_client_settings", +] diff --git a/tests/examples/emergency_management/test_north_api.py b/tests/examples/emergency_management/test_north_api.py index 02438f8..966a7bb 100644 --- a/tests/examples/emergency_management/test_north_api.py +++ b/tests/examples/emergency_management/test_north_api.py @@ -107,7 +107,23 @@ def __init__( def stop_listening_for_announces(self): self.stopped = True - monkeypatch.setattr(deps, "LXMFClient", DummyClient) + def announce(self) -> None: + return None + + def _factory(settings): + return DummyClient( + config_path=settings.lxmf_config_path, + storage_path=settings.lxmf_storage_path, + display_name=settings.client_display_name, + timeout=settings.request_timeout_seconds, + shared_instance_rpc_key=settings.shared_instance_rpc_key, + ) + + deps._client_manager = deps.LXMFClientManager( + deps.get_config, + client_factory=_factory, + announce_on_startup=False, + ) app = FastAPI() deps.register_client_events(app) @@ -125,8 +141,9 @@ def stop_listening_for_announces(self): await handler() assert DummyClient.instance.stopped is True - with pytest.raises(RuntimeError): - deps.get_lxmf_client() + new_client = deps.get_lxmf_client() + assert isinstance(new_client, DummyClient) + assert new_client is not client @pytest.fixture() @@ -143,21 +160,34 @@ def north_api_test_client(monkeypatch): "examples.EmergencyManagement.client.north_api.dependencies" ) - app_module = importlib.reload(app_module) - routes_module = importlib.reload(routes_module) deps_module = importlib.reload(deps_module) stub_client = object() - deps_module._client_instance = None - def _fake_startup() -> None: - deps_module._client_instance = stub_client + class StubManager: + def __init__(self) -> None: + self.client = stub_client + + def get_client(self): + return self.client + + def get_server_identity(self) -> str: + return "0011223344556677" - def _fake_shutdown() -> None: - deps_module._client_instance = None + def register_events(self, app, attach_notifications=None): + @app.on_event("startup") + async def _startup() -> None: + return None + + @app.on_event("shutdown") + async def _shutdown() -> None: + return None + + deps_module._client_manager = StubManager() + + app_module = importlib.reload(app_module) + routes_module = importlib.reload(routes_module) - monkeypatch.setattr(deps_module, "startup_client", _fake_startup) - monkeypatch.setattr(deps_module, "shutdown_client", _fake_shutdown) app_module.app.dependency_overrides[deps_module.get_lxmf_client] = ( lambda: stub_client ) @@ -169,7 +199,6 @@ def _fake_shutdown() -> None: yield client, routes_module, stub_client app_module.app.dependency_overrides.clear() - deps_module._client_instance = None def test_create_event_route_converts_payload(north_api_test_client, monkeypatch): diff --git a/tests/examples/emergency_management/test_web_gateway.py b/tests/examples/emergency_management/test_web_gateway.py index fd95188..a3812cf 100644 --- a/tests/examples/emergency_management/test_web_gateway.py +++ b/tests/examples/emergency_management/test_web_gateway.py @@ -27,7 +27,15 @@ def gateway_app(monkeypatch): """Provide a configured TestClient and captured LXMF client instance.""" - monkeypatch.delenv("NORTH_API_CONFIG_JSON", raising=False) + config_json = json.dumps( + { + "server_identity_hash": SERVER_IDENTITY, + "client_display_name": "JsonConfiguredClient", + "request_timeout_seconds": 12, + "shared_instance_rpc_key": "C0FFEE", + } + ) + monkeypatch.setenv("NORTH_API_CONFIG_JSON", config_json) monkeypatch.delenv("NORTH_API_CONFIG_PATH", raising=False) module = importlib.import_module("examples.EmergencyManagement.web_gateway.app") @@ -48,11 +56,20 @@ def __init__(self, *args, **kwargs) -> None: self.announce_called = False self.kwargs = kwargs self.shared_instance_rpc_key = kwargs.get("shared_instance_rpc_key") + self._listener = None created_clients.append(self) def announce(self) -> None: self.announce_called = True + async def add_notification_listener(self, listener): + self._listener = listener + + async def _unsubscribe() -> None: + self._listener = None + + return _unsubscribe + monkeypatch.setattr(module, "LXMFClient", StubClient) mode_full = RNS.Interfaces.Interface.Interface.MODE_FULL mode_roaming = RNS.Interfaces.Interface.Interface.MODE_ROAMING @@ -67,7 +84,7 @@ def __init__(self, name: str, online: bool, mode: int, bitrate: int) -> None: self.bitrate = bitrate status_module = importlib.import_module( - "examples.EmergencyManagement.web_gateway.interface_status" + "reticulum_openapi.integrations.fastapi.interfaces" ) monkeypatch.setattr( status_module.RNS.Transport, @@ -82,29 +99,23 @@ def __init__(self, name: str, online: bool, mode: int, bitrate: int) -> None: if not created_clients: raise AssertionError("LXMF client was not created on startup") stub = created_clients[0] - assert stub.shared_instance_rpc_key == module._CONFIG_DATA.get( - "shared_instance_rpc_key" - ) + settings = module._CLIENT_MANAGER.get_settings() + assert stub.shared_instance_rpc_key == settings.shared_instance_rpc_key for _ in range(20): - if module._LINK_STATUS.state == "connected": + if module._LINK_MANAGER.status.state == "connected": break time.sleep(0.05) - if module._DEFAULT_SERVER_IDENTITY: - stub.ensure_link.assert_awaited_once_with(module._DEFAULT_SERVER_IDENTITY) - assert module._LINK_STATUS.state == "connected" - assert ( - module._LINK_STATUS.server_identity == module._DEFAULT_SERVER_IDENTITY - ) - assert module._LINK_STATUS.message.startswith("Connected to LXMF") + stub.ensure_link.assert_awaited_once_with(SERVER_IDENTITY) + assert module._LINK_MANAGER.status.state == "connected" + assert module._LINK_MANAGER.status.server_identity == SERVER_IDENTITY + assert module._LINK_MANAGER.status.message.startswith("Connected to LXMF") assert module._INTERFACE_STATUS assert module._INTERFACE_STATUS[0]["name"] == "Local Gateway" stub.send_command.reset_mock() stub.ensure_link.reset_mock() yield module, client, stub - module._CLIENT_INSTANCE = None - module._LINK_TASK = None - module._LINK_STATUS = module._LinkStatus() + module._SETTINGS_LOADER.cache_clear() module._INTERFACE_STATUS = [] @@ -124,10 +135,11 @@ def test_default_identity_uses_json_config(monkeypatch) -> None: module = importlib.import_module("examples.EmergencyManagement.web_gateway.app") module = importlib.reload(module) - assert module._DEFAULT_SERVER_IDENTITY == SERVER_IDENTITY - assert module._CONFIG_DATA["client_display_name"] == "JsonConfiguredClient" - assert module._CONFIG_DATA["request_timeout_seconds"] == 12 - assert module._CONFIG_DATA["shared_instance_rpc_key"] == "C0FFEE" + settings = module._CLIENT_MANAGER.get_settings() + assert module._CLIENT_MANAGER.get_server_identity() == SERVER_IDENTITY + assert settings.client_display_name == "JsonConfiguredClient" + assert settings.request_timeout_seconds == 12 + assert settings.shared_instance_rpc_key == "c0ffee" monkeypatch.delenv("NORTH_API_CONFIG_JSON", raising=False) importlib.reload(module) @@ -395,19 +407,16 @@ def test_gateway_status_returns_version_and_uptime(gateway_app) -> None: assert payload["version"] == module._GATEWAY_VERSION assert isinstance(payload["uptime"], str) assert payload["uptime"].count(":") == 2 - assert payload["serverIdentity"] == module._DEFAULT_SERVER_IDENTITY - assert payload["clientDisplayName"] == module._resolve_display_name( - module._CONFIG_DATA - ) - assert payload["requestTimeoutSeconds"] == module._resolve_timeout( - module._CONFIG_DATA + assert payload["serverIdentity"] == module._CLIENT_MANAGER.get_server_identity() + settings = module._CLIENT_MANAGER.get_settings() + assert payload["clientDisplayName"] == settings.client_display_name + assert payload["requestTimeoutSeconds"] == settings.request_timeout_seconds + assert payload["lxmfConfigPath"] == settings.lxmf_config_path or str( + module.CONFIG_PATH ) - assert payload["lxmfConfigPath"] == str(module.CONFIG_PATH) - assert payload["lxmfStoragePath"] is None + assert payload["lxmfStoragePath"] == settings.lxmf_storage_path assert payload["allowedOrigins"] == module._ALLOWED_ORIGINS - assert payload["linkStatus"]["state"] in {"connected", "unconfigured"} - if payload["linkStatus"]["state"] == "connected": - assert payload["linkStatus"]["message"].startswith("Connected to LXMF") + assert payload["linkStatus"] == module._LINK_MANAGER.status.to_dict() def test_link_failure_reported_in_status(monkeypatch) -> None: @@ -431,21 +440,25 @@ def __init__(self, *args, **kwargs) -> None: def announce(self) -> None: return None + async def add_notification_listener(self, listener): + async def _unsubscribe() -> None: + return None + + return _unsubscribe + monkeypatch.setattr(module, "LXMFClient", FailingClient) - monkeypatch.setattr(module, "_LINK_RETRY_DELAY_SECONDS", 0.01) + module._LINK_MANAGER._retry_delay_seconds = 0.01 with TestClient(module.app): time.sleep(0.05) - assert module._LINK_STATUS.state == "connecting" - assert module._LINK_STATUS.last_error == "no link" - assert "Retrying" in (module._LINK_STATUS.message or "") - assert module._LINK_STATUS.last_attempt is not None + status = module._LINK_MANAGER.status + assert status.state == "connecting" + assert status.last_error == "no link" + assert "Retrying" in (status.message or "") + assert status.last_attempt is not None monkeypatch.delenv("NORTH_API_CONFIG_JSON", raising=False) - module._CLIENT_INSTANCE = None - module._LINK_STATUS = module._LinkStatus() - module._LINK_TASK = None def test_successful_link_prints_console_message(monkeypatch) -> None: @@ -477,15 +490,18 @@ def __init__(self, *args, **kwargs) -> None: def announce(self) -> None: return None + async def add_notification_listener(self, listener): + async def _unsubscribe() -> None: + return None + + return _unsubscribe + monkeypatch.setattr(module, "LXMFClient", SuccessfulClient) with TestClient(module.app): time.sleep(0.05) assert any("Connected to LXMF server" in message for message in printed) - assert module._LINK_STATUS.state == "connected" + assert module._LINK_MANAGER.status.state == "connected" monkeypatch.delenv("NORTH_API_CONFIG_JSON", raising=False) - module._CLIENT_INSTANCE = None - module._LINK_STATUS = module._LinkStatus() - module._LINK_TASK = None diff --git a/tests/integrations/fastapi/test_integration_layer.py b/tests/integrations/fastapi/test_integration_layer.py new file mode 100644 index 0000000..a394165 --- /dev/null +++ b/tests/integrations/fastapi/test_integration_layer.py @@ -0,0 +1,136 @@ +import asyncio +from types import SimpleNamespace +from typing import Awaitable +from typing import Callable + +import pytest +from fastapi import FastAPI +from fastapi import HTTPException +from fastapi import status +from fastapi.testclient import TestClient +from unittest.mock import AsyncMock + +import RNS + +from reticulum_openapi.integrations.fastapi import CommandSpec +from reticulum_openapi.integrations.fastapi import LXMFClientManager +from reticulum_openapi.integrations.fastapi import LinkManager +from reticulum_openapi.integrations.fastapi import create_command_context_dependency +from reticulum_openapi.integrations.fastapi import gather_interface_status +from reticulum_openapi.integrations.fastapi import LXMFClientSettings + + +@pytest.fixture() +def stubbed_interfaces(monkeypatch): + """Provide deterministic Reticulum interfaces for tests.""" + + mode_full = RNS.Interfaces.Interface.Interface.MODE_FULL + mode_roaming = RNS.Interfaces.Interface.Interface.MODE_ROAMING + + class StubInterface: + def __init__(self, name: str, online: bool, mode: int, bitrate: int) -> None: + self.name = name + self.online = online + self.mode = mode + self.bitrate = bitrate + + interfaces = [ + StubInterface("Full Power", True, mode_full, 1_000_000), + StubInterface("Roaming", False, mode_roaming, 62_500), + ] + monkeypatch.setattr(RNS.Transport, "interfaces", interfaces) + return interfaces + + +def test_client_manager_registers_lifecycle(): + """The LXMF client manager should create and tear down the client.""" + + settings = LXMFClientSettings(server_identity_hash="0011") + + created_clients = [] + + class StubClient: + def __init__(self) -> None: + self.announce_called = False + self.stop_called = False + + def announce(self) -> None: + self.announce_called = True + + def stop_listening_for_announces(self) -> None: + self.stop_called = True + + def factory(_: LXMFClientSettings) -> StubClient: + client = StubClient() + created_clients.append(client) + return client + + async def attach_notifications(client: StubClient) -> Callable[[], Awaitable[None]]: + assert client is created_clients[0] + + async def unsubscribe() -> None: + client.stop_called = True + + return unsubscribe + + manager = LXMFClientManager(lambda: settings, client_factory=factory) + + app = FastAPI() + manager.register_events(app, attach_notifications=attach_notifications) + + with TestClient(app): + assert created_clients + assert created_clients[0].announce_called is True + + assert created_clients[0].stop_called is True + + +def test_gather_interface_status_reports_metadata(stubbed_interfaces): + """Interface helper should expose name, type, and status metadata.""" + + statuses = gather_interface_status() + assert len(statuses) == 2 + first = statuses[0] + assert first["name"] == "Full Power" + assert first["online"] is True + assert first["mode"] == "full" + + +@pytest.mark.asyncio() +async def test_link_manager_connects_and_stops(): + """Link manager should attempt to connect and record success.""" + + stub_client = SimpleNamespace(ensure_link=AsyncMock()) + manager = LinkManager(lambda: stub_client, retry_delay_seconds=0.01) + + manager.start("001122") + await asyncio.sleep(0.05) + + stub_client.ensure_link.assert_awaited_once_with("001122") + assert manager.status.state == "connected" + await manager.stop() + + +@pytest.mark.asyncio() +async def test_command_context_translates_timeouts(): + """Command context should convert LXMF timeouts to HTTP errors.""" + + settings = LXMFClientSettings(server_identity_hash="001122") + stub_client = SimpleNamespace( + send_command=AsyncMock(side_effect=TimeoutError("boom")) + ) + manager = LXMFClientManager( + lambda: settings, + client_factory=lambda _: stub_client, + announce_on_startup=False, + ) + + dependency = create_command_context_dependency( + manager, {"test": CommandSpec(command="TestCommand")} + ) + context = await dependency(None, None) + + with pytest.raises(HTTPException) as excinfo: + await context.execute("test") + + assert excinfo.value.status_code == status.HTTP_504_GATEWAY_TIMEOUT