Skip to content
This repository was archived by the owner on May 3, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions TASK.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,4 @@

## 2025-11-11
- [x] Pin Reticulum (RNS) to 1.0.2 and LXMF to 0.9.2 in project dependencies.
- [x] Centralise payload conversion utilities and refactor EmergencyManagement client and gateway to use them.
245 changes: 26 additions & 219 deletions examples/EmergencyManagement/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,18 @@

from __future__ import annotations

from typing import Dict
from typing import List
from typing import Optional

from reticulum_openapi.client import LXMFClient as BaseLXMFClient
from reticulum_openapi.codec_msgpack import from_bytes
from reticulum_openapi.model import dataclass_from_json
from examples.EmergencyManagement.Server.models_emergency import (
DeleteEmergencyActionMessageResult,
DeleteEventResult,
EmergencyActionMessage,
Event,
DeleteEmergencyActionMessageResult,
)

_JSON_DECODE_FAILED = object()


def _decode_json_payload(payload: Optional[bytes], target_type):
"""Attempt to decode a compressed JSON payload into ``target_type``.

Args:
payload (Optional[bytes]): Raw payload returned by the service.
target_type: Dataclass or typing annotation describing the desired
structure.

Returns:
object: Decoded dataclass instance or iterable when successful. When
the payload does not appear to be compressed JSON, returns the
``_JSON_DECODE_FAILED`` sentinel value.
"""

if payload is None:
return _JSON_DECODE_FAILED
if len(payload) < 2 or payload[0] != 0x78:
return _JSON_DECODE_FAILED
try:
return dataclass_from_json(target_type, payload)
except (ValueError, UnicodeDecodeError):
return _JSON_DECODE_FAILED


COMMAND_CREATE_EMERGENCY_ACTION_MESSAGE = "CreateEmergencyActionMessage"
COMMAND_DELETE_EMERGENCY_ACTION_MESSAGE = "DeleteEmergencyActionMessage"
COMMAND_LIST_EMERGENCY_ACTION_MESSAGE = "ListEmergencyActionMessage"
Expand All @@ -55,183 +28,6 @@ def _decode_json_payload(payload: Optional[bytes], target_type):
LXMFClient = BaseLXMFClient


def _decode_emergency_action_message(
payload: Optional[bytes],
) -> EmergencyActionMessage:
"""Return an :class:`EmergencyActionMessage` decoded from MessagePack bytes.

Args:
payload (Optional[bytes]): MessagePack payload returned by the service.

Returns:
EmergencyActionMessage: Dataclass populated from ``payload``.

Raises:
ValueError: If ``payload`` is ``None`` or not a valid MessagePack document.
"""

if payload is None:
raise ValueError("Response payload is required")

json_result = _decode_json_payload(payload, EmergencyActionMessage)
if json_result is not _JSON_DECODE_FAILED:
if json_result is None:
raise ValueError("Decoded payload cannot be null")
return json_result

data = from_bytes(payload)
if not isinstance(data, dict):
raise ValueError("Decoded payload must be a mapping")
return EmergencyActionMessage(**data)


def _decode_optional_emergency_action_message(
payload: Optional[bytes],
) -> Optional[EmergencyActionMessage]:
"""Return an optional :class:`EmergencyActionMessage` decoded from bytes."""

if payload is None:
return None

json_result = _decode_json_payload(payload, EmergencyActionMessage)
if json_result is not _JSON_DECODE_FAILED:
return json_result

data = from_bytes(payload)
if data is None:
return None
if not isinstance(data, dict):
raise ValueError("Decoded payload must be a mapping")
return EmergencyActionMessage(**data)


def _decode_emergency_action_message_list(
payload: Optional[bytes],
) -> List[EmergencyActionMessage]:
"""Return a list of :class:`EmergencyActionMessage` decoded from bytes."""

if payload is None:
return []

json_result = _decode_json_payload(payload, List[EmergencyActionMessage])
if json_result is not _JSON_DECODE_FAILED:
if json_result is None:
return []
return list(json_result)

data = from_bytes(payload)
if data is None:
return []
if not isinstance(data, list):
raise ValueError("Decoded payload must be a list")

messages: List[EmergencyActionMessage] = []
for item in data:
if not isinstance(item, dict):
raise ValueError("Each emergency action payload must be a mapping")
messages.append(EmergencyActionMessage(**item))
return messages


def _decode_delete_emergency_action_message_result(
payload: Optional[bytes],
) -> DeleteEmergencyActionMessageResult:
"""Return the delete emergency action response decoded from bytes."""

if payload is None:
raise ValueError("Response payload is required")

json_result = _decode_json_payload(payload, DeleteEmergencyActionMessageResult)
if json_result is not _JSON_DECODE_FAILED:
if json_result is None:
raise ValueError("Decoded payload cannot be null")
return json_result

data = from_bytes(payload)
if not isinstance(data, dict):
raise ValueError("Decoded payload must be a mapping")
return DeleteEmergencyActionMessageResult(**data)


def _decode_event(payload: Optional[bytes]) -> Event:
"""Return an :class:`Event` decoded from MessagePack bytes."""

if payload is None:
raise ValueError("Response payload is required")

json_result = _decode_json_payload(payload, Event)
if json_result is not _JSON_DECODE_FAILED:
if json_result is None:
raise ValueError("Decoded payload cannot be null")
return json_result

data = from_bytes(payload)

if data is None:
raise ValueError("Decoded payload cannot be null")
if not isinstance(data, dict):
raise ValueError("Decoded payload must be a mapping")
return Event(**data)


def _decode_optional_event(payload: Optional[bytes]) -> Optional[Event]:
"""Return an optional :class:`Event` decoded from MessagePack bytes."""

if payload is None:
return None

json_result = _decode_json_payload(payload, Event)
if json_result is not _JSON_DECODE_FAILED:
return json_result

data = from_bytes(payload)

if data is None:
return None
if not isinstance(data, dict):
raise ValueError("Decoded payload must be a mapping")
return Event(**data)


def _decode_event_list(payload: Optional[bytes]) -> List[Event]:
"""Return a list of :class:`Event` instances decoded from MessagePack."""

if payload is None:
return []

json_result = _decode_json_payload(payload, List[Event])
if json_result is not _JSON_DECODE_FAILED:
if json_result is None:
return []
return list(json_result)

data = from_bytes(payload)

if data is None:
return []
if not isinstance(data, list):
raise ValueError("Decoded payload must be a list")

events: List[Event] = []
for item in data:
if not isinstance(item, dict):
raise ValueError("Each event payload must be a mapping")
events.append(Event(**item))
return events


def _decode_delete_event_response(payload: Optional[bytes]) -> dict:
"""Return the delete event response decoded from MessagePack bytes."""

if payload is None:
raise ValueError("Response payload is required")

data = from_bytes(payload)
if not isinstance(data, dict):
raise ValueError("Decoded payload must be a mapping")
return data


async def create_emergency_action_message(
client: LXMFClient,
server_identity_hash: str,
Expand All @@ -253,8 +49,9 @@ async def create_emergency_action_message(
COMMAND_CREATE_EMERGENCY_ACTION_MESSAGE,
message,
await_response=True,
response_type=EmergencyActionMessage,
)
return _decode_emergency_action_message(response)
return response


async def retrieve_emergency_action_message(
Expand All @@ -278,8 +75,9 @@ async def retrieve_emergency_action_message(
COMMAND_RETRIEVE_EMERGENCY_ACTION_MESSAGE,
callsign,
await_response=True,
response_type=Optional[EmergencyActionMessage],
)
return _decode_optional_emergency_action_message(response)
return response


async def list_emergency_action_messages(
Expand All @@ -293,8 +91,9 @@ async def list_emergency_action_messages(
COMMAND_LIST_EMERGENCY_ACTION_MESSAGE,
None,
await_response=True,
response_type=List[EmergencyActionMessage],
)
return _decode_emergency_action_message_list(response)
return response


async def update_emergency_action_message(
Expand All @@ -309,8 +108,9 @@ async def update_emergency_action_message(
COMMAND_PUT_EMERGENCY_ACTION_MESSAGE,
message,
await_response=True,
response_type=Optional[EmergencyActionMessage],
)
return _decode_optional_emergency_action_message(response)
return response


async def delete_emergency_action_message(
Expand All @@ -325,8 +125,9 @@ async def delete_emergency_action_message(
COMMAND_DELETE_EMERGENCY_ACTION_MESSAGE,
callsign,
await_response=True,
response_type=DeleteEmergencyActionMessageResult,
)
return _decode_delete_emergency_action_message_result(response)
return response


async def create_event(
Expand All @@ -341,8 +142,9 @@ async def create_event(
COMMAND_CREATE_EVENT,
event,
await_response=True,
response_type=Event,
)
return _decode_event(response)
return response


async def retrieve_event(
Expand All @@ -357,8 +159,9 @@ async def retrieve_event(
COMMAND_RETRIEVE_EVENT,
str(uid),
await_response=True,
response_type=Optional[Event],
)
return _decode_optional_event(response)
return response


async def update_event(
Expand All @@ -373,24 +176,27 @@ async def update_event(
COMMAND_PUT_EVENT,
event,
await_response=True,
response_type=Optional[Event],
)
return _decode_optional_event(response)
return response


async def delete_event(
client: LXMFClient,
server_identity_hash: str,
uid: int,
) -> dict:
"""Delete an event and return the raw response payload."""
) -> Dict[str, object]:
"""Delete an event and return the normalised response payload."""

response = await client.send_command(
server_identity_hash,
COMMAND_DELETE_EVENT,
str(uid),
await_response=True,
response_type=DeleteEventResult,
normalise=True,
)
return _decode_delete_event_response(response)
return response


async def list_events(
Expand All @@ -404,5 +210,6 @@ async def list_events(
COMMAND_LIST_EVENT,
None,
await_response=True,
response_type=List[Event],
)
return _decode_event_list(response)
return response
Loading
Loading