Skip to content
This repository was archived by the owner on May 3, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 257 additions & 19 deletions examples/EmergencyManagement/web_gateway/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
from contextlib import suppress
from dataclasses import dataclass, fields, is_dataclass
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import (
Any,
Awaitable,
Callable,
Dict,
List,
Literal,
Optional,
Type,
TypeVar,
Expand All @@ -33,6 +35,8 @@
from fastapi.responses import JSONResponse

from examples.EmergencyManagement.Server.models_emergency import (
DeleteEmergencyActionMessageResult,
DeleteEventResult,
EmergencyActionMessage,
Event,
)
Expand Down Expand Up @@ -77,6 +81,66 @@
ConfigDict = Dict[str, Any]
T = TypeVar("T")


@dataclass(frozen=True)
class CommandSpec:
"""Describe an LXMF command handled by the gateway."""

command: str
request_type: Optional[Any] = None
response_type: Optional[Any] = None
path_field: Optional[str] = None


_COMMAND_SPECS: Dict[str, CommandSpec] = {
"eam:create": CommandSpec(
command=COMMAND_CREATE_EAM,
request_type=EmergencyActionMessage,
response_type=EmergencyActionMessage,
),
"eam:update": CommandSpec(
command=COMMAND_PUT_EAM,
request_type=EmergencyActionMessage,
response_type=Optional[EmergencyActionMessage],
path_field="callsign",
),
"eam:list": CommandSpec(
command=COMMAND_LIST_EAM,
response_type=List[EmergencyActionMessage],
),
"eam:retrieve": CommandSpec(
command=COMMAND_RETRIEVE_EAM,
response_type=Optional[EmergencyActionMessage],
),
"eam:delete": CommandSpec(
command=COMMAND_DELETE_EAM,
response_type=DeleteEmergencyActionMessageResult,
),
"event:create": CommandSpec(
command=COMMAND_CREATE_EVENT,
request_type=Event,
response_type=Event,
),
"event:update": CommandSpec(
command=COMMAND_PUT_EVENT,
request_type=Event,
response_type=Optional[Event],
path_field="uid",
),
"event:list": CommandSpec(
command=COMMAND_LIST_EVENT,
response_type=List[Event],
),
"event:retrieve": CommandSpec(
command=COMMAND_RETRIEVE_EVENT,
response_type=Optional[Event],
),
"event:delete": CommandSpec(
command=COMMAND_DELETE_EVENT,
response_type=DeleteEventResult,
),
}

logger = logging.getLogger(__name__)

load_dotenv()
Expand Down Expand Up @@ -395,6 +459,51 @@ async def _shutdown() -> None:
def _convert_value(expected_type: Type[Any], value: Any) -> Any:
"""Recursively convert JSON values to dataclass field types."""

if expected_type is Any or expected_type is object:
return value
if expected_type is str:
if isinstance(value, str):
return value
if isinstance(value, (bytes, bytearray, memoryview)):
try:
return bytes(value).decode("utf-8")
except UnicodeDecodeError as exc:
raise ValueError("Unable to decode bytes to string") from exc
raise TypeError(f"Expected string for type {expected_type}")
if expected_type is int:
if isinstance(value, bool):
raise TypeError("Boolean value is not a valid integer")
if isinstance(value, int):
return value
if isinstance(value, float) and value.is_integer():
return int(value)
if isinstance(value, str):
cleaned = value.strip()
try:
return int(cleaned, 10)
except ValueError as exc:
raise ValueError(f"Unable to convert '{value}' to int") from exc
raise TypeError(f"Expected integer for type {expected_type}")
if expected_type is float:
if isinstance(value, (int, float)):
return float(value)
if isinstance(value, str):
cleaned = value.strip()
try:
return float(cleaned)
except ValueError as exc:
raise ValueError(f"Unable to convert '{value}' to float") from exc
raise TypeError(f"Expected float for type {expected_type}")
if expected_type is bool:
if isinstance(value, bool):
return value
if isinstance(value, str):
lowered = value.strip().lower()
if lowered in {"true", "1", "yes", "on"}:
return True
if lowered in {"false", "0", "no", "off"}:
return False
raise TypeError(f"Expected boolean for type {expected_type}")
origin = get_origin(expected_type)
if origin is Union:
for arg in get_args(expected_type):
Expand All @@ -407,6 +516,13 @@ def _convert_value(expected_type: Type[Any], value: Any) -> Any:
except (TypeError, ValueError):
continue
raise ValueError(f"Unable to match value {value!r} to type {expected_type}")
if origin is Literal:
allowed = get_args(expected_type)
if value in allowed:
return value
raise ValueError(
f"Value {value!r} is not permitted for literal {expected_type}"
)
if origin in (list, List):
if not isinstance(value, list):
raise TypeError(f"Expected list for type {expected_type}")
Expand Down Expand Up @@ -438,6 +554,7 @@ async def _send_command(
server_identity: str,
command: str,
payload: Optional[object],
response_type: Optional[Any] = None,
) -> JSONResponse:
"""Send a command through LXMF and return the decoded response."""

Expand Down Expand Up @@ -480,7 +597,70 @@ async def _send_command(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=str(exc),
) from exc
return JSONResponse(content=data)

converted = data
if response_type is not None:
try:
converted = _convert_value(response_type, data)
except (TypeError, ValueError) as exc:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Unable to decode response payload: {exc}",
) from exc

normalised = _normalise_response(converted)
return JSONResponse(content=normalised)


def _prepare_payload(
spec: CommandSpec,
payload: Optional[Dict[str, Any]] = None,
*,
overrides: Optional[Dict[str, Any]] = None,
) -> Any:
"""Return the payload shaped for the LXMF command described by ``spec``."""

if spec.request_type is None:
if payload is not None and overrides:
merged: Dict[str, Any] = dict(payload)
merged.update(overrides)
return merged
if payload is not None:
return payload
if overrides is not None:
if len(overrides) == 1:
return next(iter(overrides.values()))
return dict(overrides)
return None

data: Dict[str, Any] = {}
if payload is not None:
data.update(payload)
if overrides is not None:
data.update(overrides)
return _build_dataclass(spec.request_type, data)


def _normalise_response(value: Any) -> Any:
"""Convert dataclasses, enums, and iterables into JSON-serialisable data."""

if value is None:
return None
if is_dataclass(value):
result: Dict[str, Any] = {}
for field in fields(value):
field_value = getattr(value, field.name)
if field_value is None:
continue
result[field.name] = _normalise_response(field_value)
return result
if isinstance(value, Enum):
return value.value
if isinstance(value, dict):
return {str(key): _normalise_response(item) for key, item in value.items()}
if isinstance(value, (list, tuple, set)):
return [_normalise_response(item) for item in value]
return value


@app.get("/")
Expand Down Expand Up @@ -540,8 +720,14 @@ async def create_emergency_action_message(
) -> JSONResponse:
"""Create a new emergency action message via LXMF."""

message = _build_dataclass(EmergencyActionMessage, payload)
return await _send_command(server_identity, COMMAND_CREATE_EAM, message)
spec = _COMMAND_SPECS["eam:create"]
message = _prepare_payload(spec, dict(payload))
return await _send_command(
server_identity,
spec.command,
message,
response_type=spec.response_type,
)


@app.delete("/emergency-action-messages/{callsign}")
Expand All @@ -551,7 +737,13 @@ async def delete_emergency_action_message(
) -> JSONResponse:
"""Delete an emergency action message by callsign."""

return await _send_command(server_identity, COMMAND_DELETE_EAM, callsign)
spec = _COMMAND_SPECS["eam:delete"]
return await _send_command(
server_identity,
spec.command,
callsign,
response_type=spec.response_type,
)


@app.get("/emergency-action-messages")
Expand All @@ -560,7 +752,13 @@ async def list_emergency_action_messages(
) -> JSONResponse:
"""List stored emergency action messages."""

return await _send_command(server_identity, COMMAND_LIST_EAM, None)
spec = _COMMAND_SPECS["eam:list"]
return await _send_command(
server_identity,
spec.command,
None,
response_type=spec.response_type,
)


@app.put("/emergency-action-messages/{callsign}")
Expand All @@ -571,10 +769,15 @@ async def update_emergency_action_message(
) -> JSONResponse:
"""Update an existing emergency action message."""

payload = dict(payload)
payload["callsign"] = callsign
message = _build_dataclass(EmergencyActionMessage, payload)
return await _send_command(server_identity, COMMAND_PUT_EAM, message)
spec = _COMMAND_SPECS["eam:update"]
overrides = {spec.path_field: callsign} if spec.path_field else {"callsign": callsign}
message = _prepare_payload(spec, dict(payload), overrides=overrides)
return await _send_command(
server_identity,
spec.command,
message,
response_type=spec.response_type,
)


@app.get("/emergency-action-messages/{callsign}")
Expand All @@ -584,7 +787,13 @@ async def retrieve_emergency_action_message(
) -> JSONResponse:
"""Retrieve an emergency action message by callsign."""

return await _send_command(server_identity, COMMAND_RETRIEVE_EAM, callsign)
spec = _COMMAND_SPECS["eam:retrieve"]
return await _send_command(
server_identity,
spec.command,
callsign,
response_type=spec.response_type,
)


@app.post("/events")
Expand All @@ -594,8 +803,14 @@ async def create_event(
) -> JSONResponse:
"""Create a new event record via LXMF."""

event = _build_dataclass(Event, payload)
return await _send_command(server_identity, COMMAND_CREATE_EVENT, event)
spec = _COMMAND_SPECS["event:create"]
event = _prepare_payload(spec, dict(payload))
return await _send_command(
server_identity,
spec.command,
event,
response_type=spec.response_type,
)


@app.delete("/events/{uid}")
Expand All @@ -605,7 +820,13 @@ async def delete_event(
) -> JSONResponse:
"""Delete an event by unique identifier."""

return await _send_command(server_identity, COMMAND_DELETE_EVENT, uid)
spec = _COMMAND_SPECS["event:delete"]
return await _send_command(
server_identity,
spec.command,
uid,
response_type=spec.response_type,
)


@app.get("/events")
Expand All @@ -614,7 +835,13 @@ async def list_events(
) -> JSONResponse:
"""List events stored on the server."""

return await _send_command(server_identity, COMMAND_LIST_EVENT, None)
spec = _COMMAND_SPECS["event:list"]
return await _send_command(
server_identity,
spec.command,
None,
response_type=spec.response_type,
)


@app.put("/events/{uid}")
Expand All @@ -625,10 +852,15 @@ async def update_event(
) -> JSONResponse:
"""Update an existing event by unique identifier."""

payload = dict(payload)
payload["uid"] = uid
event = _build_dataclass(Event, payload)
return await _send_command(server_identity, COMMAND_PUT_EVENT, event)
spec = _COMMAND_SPECS["event:update"]
overrides = {spec.path_field: uid} if spec.path_field else {"uid": uid}
event = _prepare_payload(spec, dict(payload), overrides=overrides)
return await _send_command(
server_identity,
spec.command,
event,
response_type=spec.response_type,
)


@app.get("/events/{uid}")
Expand All @@ -638,4 +870,10 @@ async def retrieve_event(
) -> JSONResponse:
"""Retrieve an event by unique identifier."""

return await _send_command(server_identity, COMMAND_RETRIEVE_EVENT, uid)
spec = _COMMAND_SPECS["event:retrieve"]
return await _send_command(
server_identity,
spec.command,
uid,
response_type=spec.response_type,
)
Loading
Loading