Skip to content
This repository was archived by the owner on May 3, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[flake8]
max-line-length = 120
exclude = .git,__pycache__,build,dist
14 changes: 14 additions & 0 deletions .github/workflows/python.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
name: CI
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.10'
- run: python -m pip install --upgrade pip
- run: pip install -r requirements.txt
- run: flake8 reticulum_openapi examples tests
- run: pytest
327 changes: 15 additions & 312 deletions README.md

Large diffs are not rendered by default.

200 changes: 200 additions & 0 deletions docs/protocol_design.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion examples/EmergencyManagement/Server/server_emergency.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async def main():
svc.add_route("PatchEvent", evc.PatchEvent, Event)
svc.add_route("RetrieveEvent", evc.RetrieveEvent)
svc.announce()
svc.start()
await svc.start()

if __name__ == "__main__":
asyncio.run(main())
17 changes: 17 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "reticulum-openapi"
version = "0.1.0"
description = "Reticulum LXMF-based OpenAPI framework"
readme = "README.md"
requires-python = ">=3.8"
license = {text = "MIT"}
dependencies = [
"RNS",
"LXMF",
"SQLAlchemy",
"jsonschema"
]
6 changes: 6 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
RNS
LXMF
SQLAlchemy
jsonschema
pytest
flake8
73 changes: 73 additions & 0 deletions reticulum_openapi/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import asyncio
import RNS, LXMF
from typing import Optional, Dict, Callable, Type
from .model import dataclass_to_json, dataclass_from_json

class LXMFClient:
"""Simple client for sending commands and awaiting responses."""

def __init__(self, config_path: str = None, storage_path: str = None,
identity: RNS.Identity = None, display_name: str = "OpenAPIClient",
auth_token: str = None, timeout: float = 10.0):
self.reticulum = RNS.Reticulum(config_path)
storage_path = storage_path or (RNS.Reticulum.storagepath + "/lxmf_client")
self.router = LXMF.LXMRouter(storagepath=storage_path)
self.router.register_delivery_callback(self._callback)
if identity is None:
identity = RNS.Identity()
self.identity = identity
self.source_identity = self.router.register_delivery_identity(
identity, display_name=display_name, stamp_cost=0
)
self._loop = asyncio.get_event_loop()
self._futures: Dict[str, asyncio.Future] = {}
self.auth_token = auth_token
self.timeout = timeout

def _callback(self, message: LXMF.LXMessage):
title = message.title
future = self._futures.pop(title, None)
if future is not None and not future.done():
future.set_result(message.content)

async def send_command(self, dest_hex: str, command: str, payload_obj=None,
await_response: bool = True, response_title: Optional[str] = None):
dest_hash = bytes.fromhex(dest_hex)
if not RNS.Transport.has_path(dest_hash):
RNS.Transport.request_path(dest_hash)
for _ in range(50):
if RNS.Transport.has_path(dest_hash):
break
await asyncio.sleep(0.1)
dest_identity = RNS.Identity.recall(dest_hash) or RNS.Identity.recall(dest_hash, create=True)
if payload_obj is None:
content_bytes = b''
elif isinstance(payload_obj, bytes):
content_bytes = payload_obj
else:
data = dataclass_to_json(payload_obj)
if self.auth_token:
import json, zlib
obj = dataclass_from_json(type(payload_obj), data)
obj_dict = obj.__dict__
obj_dict['auth_token'] = self.auth_token
data = zlib.compress(json.dumps(obj_dict).encode('utf-8'))
content_bytes = data
lxmsg = LXMF.LXMessage(
RNS.Destination(dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery"),
self.source_identity, content_bytes, command
)
future = None
if await_response:
response_title = response_title or f"{command}_response"
future = self._loop.create_future()
self._futures[response_title] = future
self.router.handle_outbound(lxmsg)
if future:
try:
resp = await asyncio.wait_for(future, timeout=self.timeout)
return resp
except asyncio.TimeoutError:
self._futures.pop(response_title, None)
raise TimeoutError("No response received")
return None
57 changes: 35 additions & 22 deletions reticulum_openapi/service.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
# reticulum_openapi/service.py
import asyncio
import time
import json
import zlib
import RNS, LXMF
from typing import Callable, Dict, Optional, Type
from jsonschema import validate, ValidationError
from .model import dataclass_from_json, dataclass_to_json

class LXMFService:
def __init__(self, config_path: str = None, storage_path: str = None,
identity: RNS.Identity = None, display_name: str = "ReticulumOpenAPI",
stamp_cost: int = 0):
def __init__(self, config_path: str = None, storage_path: str = None,
identity: RNS.Identity = None, display_name: str = "ReticulumOpenAPI",
stamp_cost: int = 0, auth_token: str = None,
max_payload_size: int = 32_000):
"""
Initialize the LXMF Service dispatcher.
:param config_path: Path to Reticulum config directory (None for default).
Expand All @@ -33,16 +38,19 @@ def __init__(self, config_path: str = None, storage_path: str = None,
# Routing table: command -> (handler_coroutine, payload_type)
self._routes: Dict[str, (Callable, Optional[Type])] = {}
self._loop = asyncio.get_event_loop()
self.auth_token = auth_token
self.max_payload_size = max_payload_size
RNS.log(f"LXMFService initialized (Identity hash: {RNS.prettyhexrep(self.source_identity.hash)})")

def add_route(self, command: str, handler: Callable, payload_type: Optional[Type] = None):
def add_route(self, command: str, handler: Callable, payload_type: Optional[Type] = None,
payload_schema: dict = None):
"""
Register a handler for a given command name.
:param command: Command string (should match LXMF message title).
:param handler: Async function to handle the command.
:param payload_type: Dataclass type for request payload, or None for raw dict/bytes.
"""
self._routes[command] = (handler, payload_type)
self._routes[command] = (handler, payload_type, payload_schema)
RNS.log(f"Route registered: '{command}' -> {handler}")

def _lxmf_delivery_callback(self, message: LXMF.LXMessage):
Expand All @@ -61,9 +69,12 @@ def _lxmf_delivery_callback(self, message: LXMF.LXMessage):
if cmd not in self._routes:
RNS.log(f"No route found for command: {cmd}")
return
handler, payload_type = self._routes[cmd]
handler, payload_type, payload_schema = self._routes[cmd]
# Decode payload
if payload_bytes:
if len(payload_bytes) > self.max_payload_size:
RNS.log(f"Payload for {cmd} exceeds maximum size")
return
if payload_type:
try:
# Parse bytes into the expected dataclass
Expand All @@ -74,7 +85,6 @@ def _lxmf_delivery_callback(self, message: LXMF.LXMessage):
else:
# If no type provided, just decode JSON to dict
try:
import json, zlib
json_bytes = zlib.decompress(payload_bytes)
payload_obj = json.loads(json_bytes.decode('utf-8'))
except zlib.error:
Expand All @@ -83,6 +93,16 @@ def _lxmf_delivery_callback(self, message: LXMF.LXMessage):
except Exception as e:
RNS.log(f"Invalid JSON payload for {cmd}: {e}")
return
if payload_schema is not None:
try:
validate(payload_obj, payload_schema)
except ValidationError as e:
RNS.log(f"Schema validation failed for {cmd}: {e.message}")
return
if self.auth_token and isinstance(payload_obj, dict):
if payload_obj.get('auth_token') != self.auth_token:
RNS.log("Authentication failed for message")
return
else:
payload_obj = None # No payload content
# Dispatch to handler asynchronously
Expand Down Expand Up @@ -145,7 +165,7 @@ def _send_lxmf(self, dest_identity: RNS.Identity, title: str, content_bytes: byt
# Dispatch the message via the router
self.router.handle_outbound(lxmessage)

def send_message(self, dest_hex: str, command: str, payload_obj=None, await_path: bool = True):
async def send_message(self, dest_hex: str, command: str, payload_obj=None, await_path: bool = True):
"""
Public method to send a command to another LXMF node (by hex hash of its identity).
This can be used by clients or by the server to send outbound notifications.
Expand All @@ -160,10 +180,10 @@ def send_message(self, dest_hex: str, command: str, payload_obj=None, await_path
if await_path and not RNS.Transport.has_path(dest_hash):
RNS.log("Destination not in routing table, requesting path...")
RNS.Transport.request_path(dest_hash)
# Wait a short while for an announce (up to 5 seconds)
attempts = 0
while attempts < 50 and not RNS.Transport.has_path(dest_hash):
time.sleep(0.1); attempts += 1
await asyncio.sleep(0.1)
attempts += 1
# Recall or create Identity object for destination
dest_identity = RNS.Identity.recall(dest_hash)
if dest_identity is None:
Expand All @@ -188,18 +208,11 @@ def announce(self):
except Exception as e:
RNS.log(f"Announcement failed: {e}")

def start(self):
"""
Start the service. In this context, the Reticulum network and LXMF router
are already running after init, so this can simply log and perhaps run an event loop.
"""
async def start(self):
"""Run the service until cancelled."""
RNS.log("LXMFService started and listening for messages...")
# Optionally, block the main thread to keep the service alive
try:
# Keep alive indefinitely; Reticulum threads will handle I/O.
import time
while True:
time.sleep(1)
except KeyboardInterrupt:
RNS.log("Service stopping (KeyboardInterrupt).")
# Cleanup could go here if needed
await asyncio.sleep(1)
except asyncio.CancelledError:
RNS.log("Service stopping (Cancelled)")
14 changes: 14 additions & 0 deletions tests/test_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from dataclasses import dataclass
from reticulum_openapi.model import dataclass_to_json, dataclass_from_json

@dataclass
class Item:
name: str
value: int


def test_serialization_roundtrip():
item = Item(name="foo", value=42)
data = dataclass_to_json(item)
obj = dataclass_from_json(Item, data)
assert obj == item
Loading