Skip to content
This repository was archived by the owner on May 3, 2026. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 75 additions & 9 deletions examples/EmergencyManagement/web_gateway/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from importlib import metadata
from typing import Annotated
from typing import Any
from typing import Awaitable
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
Expand Down Expand Up @@ -117,10 +119,58 @@ def _create_client(settings) -> LXMFClient:


_CLIENT_MANAGER = LXMFClientManager(_SETTINGS_LOADER, client_factory=_create_client)
_CLIENT_MANAGER.register_events(
app, attach_notifications=attach_client_notifications
_CLIENT_MANAGER_ORIGINAL_GET_CLIENT = _CLIENT_MANAGER.get_client
_CLIENT_MANAGER_ORIGINAL_GET_SERVER_IDENTITY = _CLIENT_MANAGER.get_server_identity
_CLIENT_INSTANCE: Optional[LXMFClient] = None
_DEFAULT_SERVER_IDENTITY: Optional[str] = None


def get_shared_client() -> LXMFClient:
"""Return the shared LXMF client, honouring in-process test overrides."""

global _CLIENT_INSTANCE
if _CLIENT_INSTANCE is not None:
return _CLIENT_INSTANCE
client = _CLIENT_MANAGER_ORIGINAL_GET_CLIENT()
_CLIENT_INSTANCE = client
return client


def _manager_get_client_override(self: LXMFClientManager) -> LXMFClient:
"""Return the shared LXMF client via the module-level accessor."""

return get_shared_client()


_CLIENT_MANAGER.get_client = _manager_get_client_override.__get__(
_CLIENT_MANAGER, LXMFClientManager
)
_NOTIFICATION_UNSUBSCRIBER: Optional[Callable[[], Awaitable[None]]] = None


def get_server_identity() -> Optional[str]:
"""Return the configured server identity or a test override."""

if _DEFAULT_SERVER_IDENTITY:
return _DEFAULT_SERVER_IDENTITY
identity = _CLIENT_MANAGER_ORIGINAL_GET_SERVER_IDENTITY()
if identity:
return identity
return None


def _manager_get_server_identity_override(
self: LXMFClientManager,
) -> Optional[str]:
"""Return the LXMF server identity with module-level fallback."""

return get_server_identity()


_CLIENT_MANAGER.get_server_identity = _manager_get_server_identity_override.__get__(
_CLIENT_MANAGER, LXMFClientManager
)
_LINK_MANAGER = LinkManager(_CLIENT_MANAGER.get_client)
_LINK_MANAGER = LinkManager(get_shared_client)
_COMMAND_SPECS: Dict[str, CommandSpec] = {
"eam:create": CommandSpec(
command=COMMAND_CREATE_EAM,
Expand Down Expand Up @@ -202,7 +252,15 @@ def _refresh_interface_status() -> List[Dict[str, Any]]:
async def _startup() -> None:
"""Ensure the LXMF client is ready before serving requests."""

_CLIENT_MANAGER.get_client()
client = get_shared_client()
global _NOTIFICATION_UNSUBSCRIBER
if _NOTIFICATION_UNSUBSCRIBER is None and hasattr(
client, "add_notification_listener"
):
try:
_NOTIFICATION_UNSUBSCRIBER = await attach_client_notifications(client)
except Exception as exc: # pragma: no cover - defensive logging
logger.warning("Failed to attach LXMF notification listener: %s", exc)
interface_status = _refresh_interface_status()
active_interfaces = [
status["name"] for status in interface_status if status.get("online")
Expand All @@ -213,14 +271,23 @@ async def _startup() -> None:
else:
print("[Emergency Gateway] No active Reticulum interfaces reported.")

_LINK_MANAGER.start(_CLIENT_MANAGER.get_server_identity())
_LINK_MANAGER.start(get_server_identity())


@app.on_event("shutdown")
async def _shutdown() -> None:
"""Tear down background tasks on application shutdown."""

await _LINK_MANAGER.stop()
global _NOTIFICATION_UNSUBSCRIBER
if _NOTIFICATION_UNSUBSCRIBER is not None:
try:
await _NOTIFICATION_UNSUBSCRIBER()
finally:
_NOTIFICATION_UNSUBSCRIBER = None
await _CLIENT_MANAGER.shutdown()
global _CLIENT_INSTANCE
_CLIENT_INSTANCE = None


@app.get("/")
Expand All @@ -229,12 +296,13 @@ async def get_gateway_status() -> Dict[str, Any]:

uptime_seconds = (datetime.now(timezone.utc) - _START_TIME).total_seconds()
settings = _CLIENT_MANAGER.get_settings()
server_identity = get_server_identity()
interface_status = _refresh_interface_status()

return {
"version": _GATEWAY_VERSION,
"uptime": _format_uptime(uptime_seconds),
"serverIdentity": settings.server_identity_hash,
"serverIdentity": server_identity,
"clientDisplayName": settings.client_display_name,
"requestTimeoutSeconds": settings.request_timeout_seconds,
"lxmfConfigPath": settings.lxmf_config_path or str(CONFIG_PATH),
Expand Down Expand Up @@ -334,9 +402,7 @@ async def update_event(
) -> JSONResponse:
"""Update an existing event by unique identifier."""

return await context.execute(
"event:update", body=payload, path_params={"uid": uid}
)
return await context.execute("event:update", body=payload, path_params={"uid": uid})


@app.get("/events/{uid}")
Expand Down