Skip to content
This repository was archived by the owner on May 3, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,20 @@ The Emergency Management example exposes both a Reticulum‑backed service and a
gateway implemented with FastAPI. Start the Reticulum service first:

```bash
python examples/EmergencyManagement/Server/server_emergency.py
python examples/EmergencyManagement/Server/server_emergency.py \
--config-path ~/.reticulum \
--storage-path ~/.reticulum/lxmf \
--display-name "Emergency Ops" \
--auth-token YOUR_TOKEN \
--link-keepalive-interval 30 \
--database-path ./emergency.db
```

All flags are optional; omit them to fall back to the defaults embedded in the
example. At startup the server prints the announced identity hash, command
destination hash, and the active configuration so that clients can copy the
values without digging through the code.

### Starting the HTTP gateway

Launch the FastAPI gateway with `uvicorn` to serve the OpenAPI on
Expand Down
1 change: 1 addition & 0 deletions TASK.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,6 @@
## 2025-09-26
- [x] Derive EmergencyManagement database configuration from the server module path and expose runtime overrides.
- [x] Add sortable tables to the EmergencyManagement web UI for messages and events.
- [x] Extend EmergencyManagement server CLI to accept runtime overrides and document usage.


11 changes: 11 additions & 0 deletions docs/emergency_management_architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ associated event reports. The upcoming FastAPI gateway will reuse the existing
LXMF service stack while presenting a northbound REST API for web and mobile
clients.

### Runtime configuration

The LXMF service located at
`examples/EmergencyManagement/Server/server_emergency.py` accepts command-line
flags so operators can point the process at different Reticulum configurations,
LXMF storage directories, database locations, and authentication tokens without
editing the source code. The same CLI also controls the announced display name
and link keepalive interval. When the service starts it prints a summary of the
effective settings alongside the identity and destination hashes used by LXMF
clients and gateways.

## Emergency Action Message Operations

| Operation | LXMF Command | Controller Method | Input Dataclass | Output |
Expand Down
163 changes: 144 additions & 19 deletions examples/EmergencyManagement/Server/server_emergency.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import sys
from contextlib import suppress
from pathlib import Path
from typing import Optional, Sequence
from typing import Any, Dict, Optional, Sequence


def _ensure_standard_library_on_path() -> None:
Expand Down Expand Up @@ -147,26 +147,146 @@ def _sync_handler(*_: int, **__: object) -> None:
EmergencyService = None


def _resolve_database_override(argv: Optional[Sequence[str]]) -> Optional[str]:
"""Parse ``argv`` for optional database overrides."""
def _build_arg_parser() -> argparse.ArgumentParser:
"""Construct the command-line parser for the server module."""

if argv is None:
return None
parser = argparse.ArgumentParser(
description="Run the Emergency Management LXMF service.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--config-path",
dest="config_path",
help="Path to the Reticulum configuration directory.",
)
parser.add_argument(
"--storage-path",
dest="storage_path",
help="Directory used for LXMF storage and message persistence.",
)
parser.add_argument(
"--display-name",
dest="display_name",
help="Display name announced for the LXMF identity.",
)
parser.add_argument(
"--auth-token",
dest="auth_token",
help="Auth token required from clients when sending commands.",
)
parser.add_argument(
"--link-keepalive-interval",
dest="link_keepalive_interval",
type=float,
help="Seconds between LXMF link keepalive packets.",
)
parser.add_argument(
"--database-url",
dest="database_url",
help="SQLAlchemy database URL override.",
)
parser.add_argument(
"--database-path",
dest="database_path",
help="Filesystem path to a SQLite database file.",
)
parser.add_argument(
"--database",
dest="database",
help="Backward compatible alias for --database-path.",
)
return parser


def _parse_args(argv: Optional[Sequence[str]]) -> argparse.Namespace:
"""Parse ``argv`` into configuration options for the service."""

parser = _build_arg_parser()
return parser.parse_args(argv)

parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("--database")
parser.add_argument("--database-path")
parser.add_argument("--database-url")
parsed, _ = parser.parse_known_args(list(argv))

for candidate in (parsed.database_url, parsed.database_path, parsed.database):
if candidate:
return candidate
def _select_database_override(args: argparse.Namespace) -> Optional[str]:
"""Determine the preferred database override from parsed arguments."""

for attr in ("database_url", "database_path", "database"):
value = getattr(args, attr, None)
if value:
return value

return None


async def main(argv: Optional[Sequence[str]] = None) -> None:
def _prepare_service_kwargs(args: argparse.Namespace) -> Dict[str, Any]:
"""Extract keyword arguments for ``EmergencyService`` from ``args``."""

candidate_kwargs: Dict[str, Any] = {
"config_path": getattr(args, "config_path", None),
"storage_path": getattr(args, "storage_path", None),
"display_name": getattr(args, "display_name", None),
"auth_token": getattr(args, "auth_token", None),
"link_keepalive_interval": getattr(args, "link_keepalive_interval", None),
}
return {
key: value
for key, value in candidate_kwargs.items()
if value is not None
}


def _format_hash(value: Optional[bytes]) -> str:
"""Convert a destination or identity hash into a human readable string."""

if not value:
return "n/a"

return value.hex().upper()


def _emit_startup_summary(
service: Any,
database_url: str,
args: argparse.Namespace,
) -> None:
"""Print a summary of the active runtime configuration to stdout."""

identity_hash = _format_hash(
getattr(getattr(service, "source_identity", None), "hash", None)
)
destination_hash = _format_hash(
getattr(getattr(service, "destination", None), "hash", None)
)
link_destination = getattr(service, "link_destination", None)
if link_destination is None:
link_hash = "disabled"
else:
link_hash = _format_hash(getattr(link_destination, "hash", None))

display_name = getattr(args, "display_name", None) or "ReticulumOpenAPI"
storage_path = getattr(args, "storage_path", None) or "default"
config_path = getattr(args, "config_path", None) or "default"
auth_token = "set" if getattr(args, "auth_token", None) else "not set"
keepalive = getattr(args, "link_keepalive_interval", None)
keepalive_display = keepalive if keepalive is not None else "default"

summary_lines = [
"Emergency Management service is running.",
f" Identity hash: {identity_hash}",
f" Command destination: {destination_hash}",
f" Link destination: {link_hash}",
f" Reticulum config: {config_path}",
f" LXMF storage: {storage_path}",
f" Display name: {display_name}",
f" Auth token: {auth_token}",
f" Link keepalive interval: {keepalive_display}",
f" Database URL: {database_url}",
]
print("\n".join(summary_lines))


async def main(
options: Optional[argparse.Namespace] = None,
argv: Optional[Sequence[str]] = None,
) -> None:
"""Run the emergency management service until interrupted.

Returns:
Expand All @@ -183,19 +303,24 @@ async def main(argv: Optional[Sequence[str]] = None) -> None:
):
raise RuntimeError("Emergency service dependencies failed to load")

if argv is None:
argv = sys.argv[1:]
if options is None:
if argv is None:
argv = sys.argv[1:]
options = _parse_args(list(argv))

_configure_environment()
override = _resolve_database_override(argv)
override = _select_database_override(options)
configured_url = configure_database(override)
await init_db(configured_url)
async with EmergencyService() as svc:
service_kwargs = _prepare_service_kwargs(options)
async with EmergencyService(**service_kwargs) as svc:
svc.announce()
_emit_startup_summary(svc, configured_url, options)
stop_event = asyncio.Event()
_register_shutdown_signals(stop_event)
await stop_event.wait()


if __name__ == "__main__":
asyncio.run(main())
parsed_args = _parse_args(sys.argv[1:])
asyncio.run(main(parsed_args))
120 changes: 120 additions & 0 deletions tests/examples/emergency_management/test_server_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""Tests for the Emergency Management server CLI helpers."""

from __future__ import annotations

from types import SimpleNamespace

import pytest

from examples.EmergencyManagement.Server import server_emergency


class _StubService:
"""Async context manager that captures invocation parameters."""

def __init__(self, **kwargs):
self.kwargs = kwargs
self.announced = False
self.destination = SimpleNamespace(hash=b"\x01" * 16)
self.source_identity = SimpleNamespace(hash=b"\x02" * 16)
self.link_destination = SimpleNamespace(hash=b"\x03" * 16)

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc, tb):
return False

def announce(self):
self.announced = True


@pytest.mark.asyncio
async def test_main_threads_cli_arguments(monkeypatch, capsys):
"""Parsed CLI options should propagate into the service and database calls."""

configure_calls = []
init_calls = []

class _Factory(_StubService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self_created["instance"] = self

self_created = {}

monkeypatch.setattr(
server_emergency,
"_ensure_dependencies_loaded",
lambda: None,
)
monkeypatch.setattr(server_emergency, "EmergencyService", _Factory)

def _fake_configure(url):
configure_calls.append(url)
return "configured://database"

async def _fake_init(url):
init_calls.append(url)

monkeypatch.setattr(server_emergency, "configure_database", _fake_configure)
monkeypatch.setattr(server_emergency, "init_db", _fake_init)
monkeypatch.setattr(
server_emergency,
"_register_shutdown_signals",
lambda stop_event: stop_event.set(),
)

options = server_emergency._parse_args(
[
"--config-path",
"/var/lib/reticulum",
"--storage-path",
"/var/lib/lxmf",
"--display-name",
"Emergency Ops",
"--auth-token",
"secret-token",
"--link-keepalive-interval",
"15.5",
"--database-path",
"/tmp/emergency.db",
]
)

await server_emergency.main(options=options)

stub_service = self_created["instance"]
assert stub_service.announced is True
assert stub_service.kwargs == {
"config_path": "/var/lib/reticulum",
"storage_path": "/var/lib/lxmf",
"display_name": "Emergency Ops",
"auth_token": "secret-token",
"link_keepalive_interval": 15.5,
}
assert configure_calls == ["/tmp/emergency.db"]
assert init_calls == ["configured://database"]

output = capsys.readouterr().out
assert "Emergency Management service is running." in output
assert "configured://database" in output
assert "01010101010101010101010101010101" in output


def test_database_override_priority():
"""The CLI should prefer explicit URLs over filesystem paths."""

options = server_emergency._parse_args(
[
"--database",
"./fallback.db",
"--database-path",
"./preferred.db",
"--database-url",
"postgresql+asyncpg://user:pass@host/db",
]
)

override = server_emergency._select_database_override(options)
assert override == "postgresql+asyncpg://user:pass@host/db"