diff --git a/docs/specs/hal/time_driver.md b/docs/specs/hal/time_driver.md new file mode 100644 index 00000000..2c1410de --- /dev/null +++ b/docs/specs/hal/time_driver.md @@ -0,0 +1,108 @@ +# Time Driver (HAL) + +## Description + +The time driver is a HAL component that manages the system NTP daemon (`sysntpd`) and exposes a time source capability to the rest of the system. It is responsible for all direct OS interaction related to time synchronisation: restarting the daemon, monitoring kernel hotplug NTP events via ubus, and publishing the current sync state and accuracy metadata as a capability on the bus. + +The time driver acts as both a driver and a time manager — it owns the full lifecycle of `sysntpd` within the running session. + +## Dependencies + +- **ubus capability** (`{'cap', 'ubus', '1', ...}`): required to listen for `hotplug.ntp` kernel events. The driver waits for the ubus capability to become available before starting NTP. + +## Initialisation + +On startup (once the ubus capability is available): + +1. Restart `sysntpd` via `exec.command("/etc/init.d/sysntpd", "restart"):run()`. +2. Register a ubus listener for `hotplug.ntp` events. +3. Publish initial capability `meta` and `state` (initially `synced = false`). + +## Capability + +The driver publishes a single time source capability. The capability id is a UUID generated at startup. In the future, additional time source capabilities may exist alongside this one. + +### Meta (retained) + +Topic: `{'cap', 'time', , 'meta'}` + +```lua +{ + provider = 'hal', + source = 'ntp', -- time source type + version = 1, -- interface version + accuracy_seconds = , -- estimated absolute error in seconds +} +``` + +`accuracy_seconds` is a coarse estimate of absolute clock error and is derived from NTP stratum. Lower values are better. It is `nil` when unsynced (stratum >= 16) or before first sync-quality data is available. 
+ +### State (retained) + +Topic: `{'cap', 'time', , 'state'}` + +```lua +{ + synced = true | false, + stratum = , -- last reported stratum, nil before first event + accuracy_seconds = , +} +``` + +Published on every `hotplug.ntp` event, including events that do not change the sync state, so the reported stratum stays current. Retained so new subscribers immediately get the current state. + +### Events (non-retained) + +#### synced + +Topic: `{'cap', 'time', , 'event', 'synced'}` + +Fired when the NTP daemon transitions from unsynced to synced (stratum < 16). Payload: + +```lua +{ stratum = , accuracy_seconds = } +``` + +#### unsynced + +Topic: `{'cap', 'time', , 'event', 'unsynced'}` + +Fired when the NTP daemon transitions from synced to unsynced (stratum == 16 or daemon restarts). Payload: + +```lua +{ stratum = 16, accuracy_seconds = nil } +``` + +These events are non-retained, so consumers that need the current state should read the retained `state` topic of this capability first, then subscribe to events for transitions. + +## Service Flow + +```mermaid +flowchart TD + St[Start] --> A(Install alarm handler) + A --> B(Wait for ubus capability) + B -->|ubus cap available| C(Restart sysntpd) + C --> D{sysntpd restart ok?} + D -->|error| E[Log error and stop] + D -->|ok| F(Register ubus listener for hotplug.ntp) + F --> G{Listener registered ok?} + G -->|error| H[Log error and stop] + G -->|ok| I(Publish meta + initial state: synced=false) + I --> J{Wait for hotplug.ntp event or stream closed or context done} + J -->|hotplug.ntp event| K{stratum < 16?} + K -->|yes, was unsynced| L(Publish state synced=true + event/synced) + K -->|no, was synced| M(Publish state synced=false + event/unsynced) + L --> J + M --> J + K -->|no change| J + J -->|stream closed| N[Log warning, stop] + J -->|context done| O(Send stop_stream to ubus) +``` + +## Architecture + +- The driver runs a single main fiber that handles the full lifecycle. No child scope is needed. 
+- Sync and unsync events are only published on **transitions** — the driver tracks the previous sync state and only emits an event when it changes. +- The retained `state` topic is always updated on any hotplug event regardless of transition, to refresh the stratum value. +- A `finally` block logs the reason for shutdown and performs cleanup (stop_stream if still active). +- The ubus `stream_id` must be stopped cleanly on context cancellation to avoid leaking listener registrations in the ubus driver. diff --git a/docs/specs/time.md b/docs/specs/time.md new file mode 100644 index 00000000..13db5a50 --- /dev/null +++ b/docs/specs/time.md @@ -0,0 +1,76 @@ +# Time Service + +## Description + +The time service listens to time capabilities for sync and unsync events. It uses these events to sync and unsync the fibers alarm module, and it broadcasts sync/unsync events on the bus for other services to consume. + +## Time capability + +There is initially only one time capability, provided by the time driver in HAL. In the future we can discover multiple time capabilities and balance multiple sources of time intelligently (e.g. prefer the capability reporting the lowest stratum). + +For now, the time service subscribes to `{'cap', 'time', '+', 'meta', 'source'}` and uses the **first** capability announced. Subsequent announcements are ignored. + +## Bus Outputs + +### Service status (retained) + +Topic: `{'svc', 'time', 'status'}` + +```lua +{ + state = 'starting' | 'running' | 'stopped', + ts = , +} +``` + +### Time synced state (retained) + +Topic: `{'svc', 'time', 'synced'}` + +Payload: `true` or `false` + +Published whenever the overall sync state changes. This is the authoritative "is system time trustworthy" signal for other services. It is retained so services that start later immediately receive the current state. 
+ +### Time transition events (non-retained) + +Topics: +- `{'svc', 'time', 'event', 'synced'}` +- `{'svc', 'time', 'event', 'unsynced'}` + +Published on state transitions for consumers that need edge-triggered behaviour. + +## Service Flow + +```mermaid +flowchart TD + St[Start] --> B(Publish status: starting) + B --> C(Subscribe to cap/time/+/meta/source) + C --> D(Publish status: running) + D --> E{Wait for first time capability meta message or scope done} + E -->|scope done| Z[Publish status: stopped] + E -->|first capability meta received| F(Extract uuid from meta topic) + F --> G(Subscribe to cap/time/uuid/state/synced\ncap/time/uuid/event/synced\ncap/time/uuid/event/unsynced) + G --> H(Read single retained state message and apply sync state) + H --> H2(Unsubscribe from state/synced) + H2 --> K{Wait for synced event, unsynced event, or scope done} + K -->|synced event| L(Apply synced: retain svc/time/synced=true, emit transition event) + L --> K + K -->|unsynced event| M(Apply unsynced: retain svc/time/synced=false, emit transition event) + M --> K + K -->|scope done| Z +``` + +All three subscriptions are created before any message is read, so no events are lost during initialisation. The retained `{'cap', 'time', , 'state', 'synced'}` payload is consumed as a one-shot read to bootstrap sync state, then the state subscription is dropped. Ongoing sync state changes are tracked exclusively through the `event/synced` and `event/unsynced` transition topics. + +With the new fibers alarm API, the service calls: +- `alarm.set_time_source(fibers.utils.time.realtime)` on first synced state +- `alarm.time_changed()` on subsequent synced transitions/events + +There is no direct equivalent of `clock_desynced` in the new API; unsynced updates still propagate over bus outputs. + +## Architecture + +- Everything runs in a single fiber — no child fibers needed. 
The fiber blocks waiting for the first capability, then transitions directly into the event loop for that capability. +- The service does not interact with the OS directly — all time source information arrives through the capability published by the time driver in HAL. +- Use `finally` to log shutdown reason and publish `stopped` status. + diff --git a/src/configs/config.json b/src/configs/config.json index 1d0ff0f9..a91a41fc 100644 --- a/src/configs/config.json +++ b/src/configs/config.json @@ -10,7 +10,8 @@ "name": "state", "root": "/tmp/dc-states/" } - ] + ], + "time": {} }, "gsm": { "modems": { diff --git a/src/services/hal/backends/time/contract.lua b/src/services/hal/backends/time/contract.lua new file mode 100644 index 00000000..c1da2caf --- /dev/null +++ b/src/services/hal/backends/time/contract.lua @@ -0,0 +1,26 @@ +---@class TimeBackend +---@field start_ntp_monitor fun(self: TimeBackend): boolean, string +---@field ntp_event_op fun(self: TimeBackend): Op +---@field stop fun(self: TimeBackend): boolean, string + +local BACKEND_FUNCTIONS = { + "start_ntp_monitor", + "ntp_event_op", + "stop", +} + +---Check that a time backend provides all required functions. +---@param backend TimeBackend +---@return string error Empty string on success. +local function validate(backend) + for _, func in ipairs(BACKEND_FUNCTIONS) do + if type(backend[func]) ~= "function" then + return "Missing required function: " .. func + end + end + return "" +end + +return { + validate = validate +} diff --git a/src/services/hal/backends/time/provider.lua b/src/services/hal/backends/time/provider.lua new file mode 100644 index 00000000..5377c00c --- /dev/null +++ b/src/services/hal/backends/time/provider.lua @@ -0,0 +1,49 @@ +---Time backend provider factory. +---Selects the appropriate TimeBackend implementation based on the runtime platform. 
+ +local contract = require "services.hal.backends.time.contract" + +local BACKENDS = { + "openwrt" +} + +--- Select and initialize the backend implementation +---@return table backend_impl +local function get_backend_impl() + local backend_impl = nil + for _, backend_name in ipairs(BACKENDS) do + local ok, backend_mod = pcall(require, "services.hal.backends.time.providers." .. backend_name .. ".init") + if ok and type(backend_mod) == "table" and backend_mod.is_supported and backend_mod.is_supported() then + backend_impl = backend_mod.backend + break + end + end + + if backend_impl == nil then + error("No supported time backend found") + end + + return backend_impl +end + +---Create a new TimeBackend instance. +--- +---Detects the platform and instantiates the appropriate backend implementation. +---Fails with an error if no supported backend is found. +--- +---@return TimeBackend +local function new() + local backend_impl = get_backend_impl() + local backend = backend_impl.new() + + local iface_err = contract.validate(backend) + if iface_err ~= "" then + error("Time backend does not implement required interface: " .. tostring(iface_err)) + end + + return backend +end + +return { + new = new, +} diff --git a/src/services/hal/backends/time/providers/openwrt/impl.lua b/src/services/hal/backends/time/providers/openwrt/impl.lua new file mode 100644 index 00000000..89f593e2 --- /dev/null +++ b/src/services/hal/backends/time/providers/openwrt/impl.lua @@ -0,0 +1,182 @@ +---OpenWrt-specific TimeBackend implementation using ubus hotplug.ntp events. + +-- Service modules +local log = require "services.log" +local time_types = require "services.hal.types.time" + +-- Fibers modules +local op = require "fibers.op" +local exec = require "fibers.io.exec" + +-- Other modules +local cjson = require "cjson.safe" + +---@class OpenWrtTimeBackend : TimeBackend +---@field ntp_monitor_stream Stream? Current ubus listen stream +---@field ntp_monitor_cmd Command? 
Current ubus listen command +local OpenWrtTimeBackend = {} +OpenWrtTimeBackend.__index = OpenWrtTimeBackend + +---- Private Utilities ---- + +---Recursively convert numeric-looking strings into numbers. +---@param value any +---@return any +local function coerce_numeric_strings(value) + if type(value) == 'string' then + local n = tonumber(value) + if n ~= nil then + return n + end + return value + end + + if type(value) == 'table' then + for k, v in pairs(value) do + value[k] = coerce_numeric_strings(v) + end + return value + end + + return value +end + +---Parse a single ubus listen hotplug.ntp line into a strongly typed NTPEvent. +--- +---Called as a wrap function on read_line_op(), receiving (line, read_err). +---Returns: +--- (NTPEvent, nil) -- success +--- (nil, nil) -- parse error; caller should retry with next line +--- (nil, err_string) -- fatal error (stream closed or read error) +--- +---@param line string? Line from ubus listen (nil on EOF) +---@param read_err any? Read error from the stream (nil on success) +---@return NTPEvent? +---@return string? +local function parse_ntp_event_line(line, read_err) + -- Fatal: stream read error + if read_err ~= nil then + return nil, "read error: " .. 
tostring(read_err) + end + + -- Fatal: EOF / stream closed + if line == nil or line == "" then + return nil, "stream closed" + end + + local decoded = cjson.decode(line) + if not decoded then + log.warn("OpenWrt Time Backend: failed to decode hotplug.ntp event:", line) + return nil, nil -- non-fatal, retry + end + + decoded = coerce_numeric_strings(decoded) + local ntp_data = decoded["hotplug.ntp"] + if type(ntp_data) ~= 'table' then + log.warn("OpenWrt Time Backend: no hotplug.ntp data in event:", line) + return nil, nil -- non-fatal, retry + end + + if type(ntp_data.stratum) ~= 'number' then + log.warn("OpenWrt Time Backend: stratum field missing or not a number:", line) + return nil, nil -- non-fatal, retry + end + + -- Extract fields with sensible defaults for optional fields + local action = ntp_data.action or "unknown" + local offset = ntp_data.offset or 0 + local freq_drift_ppm = ntp_data.freq_drift_ppm or 0 + + local ntp_event, event_err = time_types.new.NTPEvent( + ntp_data.stratum, + action, + offset, + freq_drift_ppm + ) + if not ntp_event then + log.warn("OpenWrt Time Backend: failed to construct NTPEvent:", event_err) + return nil, nil -- non-fatal, retry + end + + -- Preserve additional backend fields on the event for observability + for k, v in pairs(ntp_data) do + if ntp_event[k] == nil then + ntp_event[k] = v + end + end + + return ntp_event, nil +end + +---- Backend Lifecycle ---- + +---Start monitoring NTP synchronization events via ubus hotplug.ntp. +--- +---@return boolean ok +---@return string error Empty string on success. 
+function OpenWrtTimeBackend:start_ntp_monitor() + if self.ntp_monitor_cmd then + return false, "NTP monitor already running" + end + + -- Start ubus listen command bound to current scope + self.ntp_monitor_cmd = exec.command{ + 'ubus', 'listen', 'hotplug.ntp', + stdin = 'null', + stdout = 'pipe', + stderr = 'null', + } + local stream, stream_err = self.ntp_monitor_cmd:stdout_stream() + if not stream then + return false, "failed to start ubus listen: " .. tostring(stream_err) + end + + self.ntp_monitor_stream = stream + log.trace("OpenWrt Time Backend: NTP monitor started") + return true, "" +end + +---Get an operation that yields the next NTP event from the hotplug.ntp stream. +--- +---Returns (NTPEvent, nil) on success, (nil, nil) on a parse error (caller should +---retry), or (nil, err_string) on a fatal error (stream closed or read error). +--- +---@return Op +function OpenWrtTimeBackend:ntp_event_op() + return op.guard(function() + if not self.ntp_monitor_stream then + error("NTP monitor not started") + end + return self.ntp_monitor_stream:read_line_op():wrap(parse_ntp_event_line) + end) +end + +---Stop the NTP monitor and clean up resources. +--- +---@return boolean ok +---@return string error +function OpenWrtTimeBackend:stop() + if self.ntp_monitor_cmd then + self.ntp_monitor_cmd:kill() + end + self.ntp_monitor_stream = nil + self.ntp_monitor_cmd = nil + log.trace("OpenWrt Time Backend: NTP monitor stopped") + return true, "" +end + +---- Constructor ---- + +---Create a new OpenWrt time backend. 
+--- +---@return OpenWrtTimeBackend +local function new() + return setmetatable({ + ntp_monitor_stream = nil, + ntp_monitor_cmd = nil, + }, OpenWrtTimeBackend) +end + +return { + new = new, +} diff --git a/src/services/hal/backends/time/providers/openwrt/init.lua b/src/services/hal/backends/time/providers/openwrt/init.lua new file mode 100644 index 00000000..ec563590 --- /dev/null +++ b/src/services/hal/backends/time/providers/openwrt/init.lua @@ -0,0 +1,48 @@ +local file = require "fibers.io.file" +local exec = require "fibers.io.exec" +local fibers = require "fibers" + +local backend = require "services.hal.backends.time.providers.openwrt.impl" + +local function is_linux() + local fh, open_err = file.open("/proc/version", "r") + if not fh or open_err then + return false + end + + local content, read_err = fh:read_all() + fh:close() + if not content or read_err then + return false + end + + return content:lower():find("linux") ~= nil +end + +--- Returns true if `ubus` is available and the daemon is reachable +---@return boolean ok +local function has_ubus() + local cmd = exec.command{ + "ubus", "list", + stdin = "null", + stdout = "pipe", + stderr = "null" + } + local _, status, code = fibers.perform(cmd:combined_output_op()) + if status == "exited" and code == 0 then + return true + end + return false +end + +--- Returns true if this is a supported OpenWrt system +---@return boolean +local function is_supported() + local res = is_linux() and has_ubus() + return res +end + +return { + is_supported = is_supported, + backend = backend +} diff --git a/src/services/hal/drivers/time.lua b/src/services/hal/drivers/time.lua new file mode 100644 index 00000000..0f63dfaf --- /dev/null +++ b/src/services/hal/drivers/time.lua @@ -0,0 +1,319 @@ +-- HAL modules +local hal_types = require "services.hal.types.core" +local cap_types = require "services.hal.types.capabilities" + +-- Backend modules +local time_backend_provider = require "services.hal.backends.time.provider" + +-- 
Service modules +local log = require "services.log" + +-- Fibers modules +local fibers = require "fibers" +local op = require "fibers.op" +local channel = require "fibers.channel" +local sleep = require "fibers.sleep" +local exec = require "fibers.io.exec" + +-- Other modules +local uuid = require "uuid" + +---@class TimeDriver +---@field id CapabilityId UUID for this time source capability +---@field cap_emit_ch Channel Capability emit channel (Emit messages) +---@field scope Scope Child scope owning the monitor fiber +---@field backend TimeBackend Time backend for platform-specific NTP monitoring +---@field control_ch Channel RPC control channel (no offerings currently, reserved) +---@field initialised boolean +---@field caps_applied boolean +---@field synced boolean Tracks last known sync state to detect transitions +local TimeDriver = {} +TimeDriver.__index = TimeDriver + +---- Constants ---- + +local DEFAULT_STOP_TIMEOUT = 5 +local CONTROL_Q_LEN = 4 + +---- Internal Utilities ---- + +---Emit a capability state, meta, or event via the cap emit channel. +---@param emit_ch Channel +---@param id CapabilityId +---@param mode EmitMode +---@param key string +---@param data any +---@return boolean ok +---@return string? error +local function emit(emit_ch, id, mode, key, data) + local payload, err = hal_types.new.Emit('time', id, mode, key, data) + if not payload then + return false, err + end + emit_ch:put(payload) + return true +end + +---Convert NTP stratum to an estimated absolute accuracy in seconds. +---Returns nil when unsynced (stratum >= 16) or invalid input. +---@param stratum number +---@return number? accuracy_seconds +local function accuracy_for_stratum(stratum) + if type(stratum) ~= 'number' then + return nil + end + if stratum >= 16 then + return nil + end + + -- Coarse operational heuristic: + -- lower stratum generally implies lower clock error. 
+ if stratum <= 1 then + return 0.001 + elseif stratum <= 4 then + return 0.01 + elseif stratum <= 8 then + return 0.1 + else + return 1.0 + end +end + +---Build a meta payload table for this time source. +---@param accuracy_seconds number? +---@return table +local function build_meta(accuracy_seconds) + return { + provider = 'hal', + source = 'ntp', + version = 1, + accuracy_seconds = accuracy_seconds, + } +end + +---- Monitor Fiber ---- + +---Listen to NTP synchronization events via the time backend and emit state/events via +---the cap emit channel. Runs in self.scope. Exits on stream close, read error, or scope +---cancellation. Transition events (synced/unsynced) are non-retained; the current +---sync state is always published as a retained state emit on every hotplug event. +---@return nil +function TimeDriver:_ntpd_monitor() + fibers.current_scope():finally(function() + log.trace("Time Driver: ntpd monitor exiting") + end) + + -- Start the backend monitor here so the ubus command is bound to this fiber's + -- scope (the driver child scope). Cancelling the driver scope will then kill + -- the underlying process automatically. 
+ local ok, start_err = self.backend:start_ntp_monitor() + if not ok then + log.error("Time Driver: failed to start NTP backend:", start_err) + return + end + + log.trace("Time Driver: ntpd monitor started") + + while true do + local ntp_event, err = fibers.perform(self.backend:ntp_event_op()) + if err ~= nil then + -- Fatal: stream closed or read error + log.warn("Time Driver: NTP event stream closed:", err) + break + end + if ntp_event then + local stratum = ntp_event.stratum + -- NTPEvent constructor guarantees stratum is a number, but guard anyway + if type(stratum) ~= 'number' then + log.warn("Time Driver: received NTP event with invalid stratum:", stratum) + else + local now_synced = stratum < 16 -- unsynced is stratum >= 16, as in accuracy_for_stratum + local was_synced = self.synced + local accuracy_seconds = accuracy_for_stratum(stratum) + + -- Always update retained state, even if sync status did not change, + -- so that the latest stratum value is always visible to subscribers. + local emit_ok, emit_err = emit( + self.cap_emit_ch, self.id, 'state', 'synced', + { + synced = now_synced, + stratum = stratum, + accuracy_seconds = accuracy_seconds, + } + ) + if not emit_ok then + log.warn("Time Driver: failed to emit state:", emit_err) + end + + -- Update accuracy metadata only on a sync/unsync transition. + if now_synced ~= was_synced then + local meta_ok, meta_err = emit( + self.cap_emit_ch, self.id, 'meta', 'source', + build_meta(accuracy_seconds) + ) + if not meta_ok then + log.warn("Time Driver: failed to emit meta:", meta_err) + end + end + + -- Emit non-retained transition events. 
+ if now_synced and not was_synced then + log.debug("Time Driver: NTP synced, stratum =", stratum) + local ev_ok, ev_err = emit( + self.cap_emit_ch, self.id, 'event', 'synced', + { + stratum = stratum, + accuracy_seconds = accuracy_seconds, + } + ) + if not ev_ok then + log.warn("Time Driver: failed to emit synced event:", ev_err) + end + elseif not now_synced and was_synced then + log.debug("Time Driver: NTP unsynced, stratum =", stratum) + local ev_ok, ev_err = emit( + self.cap_emit_ch, self.id, 'event', 'unsynced', + { + stratum = stratum, + accuracy_seconds = accuracy_seconds, + } + ) + if not ev_ok then + log.warn("Time Driver: failed to emit unsynced event:", ev_err) + end + end + + self.synced = now_synced + end + end + -- ntp_event == nil and err == nil: parse error, already logged by backend, retry + end +end + +---- Driver Lifecycle ---- + +---Initialise the time driver. Restarts sysntpd and marks the driver as initialised. +---Must be called from inside a fiber. +---@return string error Empty string on success. +function TimeDriver:init() + log.trace("Time Driver: initialising, restarting sysntpd") + + local status, code, _, err = fibers.perform( + exec.command("/etc/init.d/sysntpd", "restart"):run_op() + ) + if status ~= 'exited' or code ~= 0 then + return "sysntpd restart failed: " .. tostring(err or ("exit code " .. tostring(code))) + end + + self.initialised = true + log.trace("Time Driver: sysntpd restarted successfully") + return "" +end + +---Connect the driver to the capability emit channel and return the capability list. +---Must be called after init() and before start(). +---@param cap_emit_ch Channel +---@return Capability[]? capabilities +---@return string error Empty string on success. 
+function TimeDriver:capabilities(cap_emit_ch) + if not self.initialised then + return nil, "driver not initialised" + end + + self.cap_emit_ch = cap_emit_ch + + local cap, cap_err = cap_types.new.TimeCapability(self.id, self.control_ch) + if not cap then + return nil, "failed to create time capability: " .. tostring(cap_err) + end + + self.caps_applied = true + return { cap }, "" +end + +---Start the time driver. Emits initial meta and state, then spawns the NTP monitor +---fiber. Must be called after capabilities(). +---@return boolean ok +---@return string? error +function TimeDriver:start() + if not self.initialised then + return false, "driver not initialised" + end + if not self.caps_applied then + return false, "capabilities not applied" + end + + -- Publish initial meta (accuracy unknown until first NTP update). + local meta_ok, meta_err = emit( + self.cap_emit_ch, self.id, 'meta', 'source', + build_meta(nil) + ) + if not meta_ok then + log.warn("Time Driver: failed to emit initial meta:", meta_err) + end + + -- Publish initial retained state: not yet synced, stratum unknown. + local state_ok, state_err = emit( + self.cap_emit_ch, self.id, 'state', 'synced', + { synced = false, stratum = nil } + ) + if not state_ok then + log.warn("Time Driver: failed to emit initial state:", state_err) + end + + self.scope:spawn(function() self:_ntpd_monitor() end) + + log.trace("Time Driver: started") + return true, nil +end + +---Stop the time driver. Cancels the driver scope, terminating the NTP monitor fiber +---and any running ubus listen process. +---@param timeout number? Timeout in seconds. Defaults to 5. +---@return boolean ok +---@return string? 
error +function TimeDriver:stop(timeout) + timeout = timeout or DEFAULT_STOP_TIMEOUT + self.scope:cancel() + + local source = fibers.perform(op.named_choice { + join = self.scope:join_op(), + timeout = sleep.sleep_op(timeout), + }) + + if source == 'timeout' then + return false, "time driver stop timeout" + end + return true, nil +end + +---- Constructor ---- + +---Create a new TimeDriver instance. Generates a UUID for the capability id, +---creates a child scope, and instantiates the platform-specific time backend. +---Must be called from inside a fiber. +---@return TimeDriver? driver +---@return string error Empty string on success. +local function new() + local scope, sc_err = fibers.current_scope():child() + if not scope then + return nil, "failed to create child scope: " .. tostring(sc_err) + end + + local backend = time_backend_provider.new() + + return setmetatable({ + id = uuid.new(), + cap_emit_ch = nil, + scope = scope, + backend = backend, + control_ch = channel.new(CONTROL_Q_LEN), + initialised = false, + caps_applied = false, + synced = false, + }, TimeDriver), "" +end + +return { + new = new, +} diff --git a/src/services/hal/managers/time.lua b/src/services/hal/managers/time.lua new file mode 100644 index 00000000..c96a77e0 --- /dev/null +++ b/src/services/hal/managers/time.lua @@ -0,0 +1,260 @@ +-- HAL modules +local time_driver = require "services.hal.drivers.time" +local hal_types = require "services.hal.types.core" + +-- Fibers modules +local fibers = require "fibers" +local op = require "fibers.op" +local sleep = require "fibers.sleep" +local exec = require "fibers.io.exec" + +-- Other modules +local log = require "services.log" + +-- Constants + +local STOP_TIMEOUT = 5.0 -- seconds +local HOTPLUG_DIR = "/etc/hotplug.d/ntp" +local HOTPLUG_SCRIPT_NAME = "ntp" + +---@alias TimeDriverHandle table + +---@class TimeManager +---@field scope Scope +---@field started boolean +---@field driver TimeDriverHandle? +---@field dev_ev_ch Channel? 
+---@field cap_emit_ch Channel? +local TimeManager = { + started = false, + driver = nil, + dev_ev_ch = nil, + cap_emit_ch = nil, +} + +---- Internal Utilities ---- + +---Emit a HAL device-added event for the time capability provider. +---@param driver TimeDriverHandle +---@param capabilities Capability[] +local function emit_device_added(driver, capabilities) + local device_event, ev_err = hal_types.new.DeviceEvent( + "added", + "time", + driver.id, + { source = "ntp" }, + capabilities + ) + if not device_event then + log.error("Time Manager: failed to create device-added event:", ev_err) + return + end + TimeManager.dev_ev_ch:put(device_event) +end + +---Emit a HAL device-removed event for the time capability provider. +---@param driver TimeDriverHandle +local function emit_device_removed(driver) + local device_event, ev_err = hal_types.new.DeviceEvent( + "removed", + "time", + driver.id, + {} + ) + if not device_event then + log.error("Time Manager: failed to create device-removed event:", ev_err) + return + end + TimeManager.dev_ev_ch:put(device_event) +end + +---Stop the currently running driver (if any) and notify HAL that the device was +---removed. Safe to call when no driver is running. +---@return nil +local function stop_existing_driver() + local prev = TimeManager.driver + if not prev then return end + + log.trace("Time Manager: stopping existing driver") + local ok, stop_err = prev:stop(STOP_TIMEOUT) + if not ok then + log.warn("Time Manager: failed to stop previous driver:", stop_err) + end + + emit_device_removed(prev) + TimeManager.driver = nil +end + +---Run a command and require a zero exit status. +---@param ... string argv +---@return boolean ok +---@return string? error +local function run_checked(...) + local status, code, _, err = fibers.perform(exec.command(...):run_op()) + if status ~= 'exited' or code ~= 0 then + return false, tostring(err or ("exit code " .. 
tostring(code))) + end + return true, nil +end + +---Resolve the directory that contains this manager file. +---@return string dir +local function manager_dir() + local source = debug.getinfo(1, 'S').source or '' + source = source:gsub('^@', '') + return source:match('^(.*)/[^/]+$') or '.' +end + +---Install the NTP hotplug script into /etc/hotplug.d/ntp. +---@return boolean ok +---@return string? error +local function install_ntp_hotplug_script() + local src = manager_dir() .. "/time/" .. HOTPLUG_SCRIPT_NAME + local dst = HOTPLUG_DIR .. "/" .. HOTPLUG_SCRIPT_NAME + + local ok, err = run_checked("mkdir", "-p", HOTPLUG_DIR) + if not ok then + return false, "failed to create hotplug directory: " .. tostring(err) + end + + ok, err = run_checked("cp", src, dst) + if not ok then + return false, "failed to copy hotplug script from " .. src .. ": " .. tostring(err) + end + + ok, err = run_checked("chmod", "+x", dst) + if not ok then + return false, "failed to chmod hotplug script: " .. tostring(err) + end + + return true, nil +end + +---Initialise, apply capabilities, and start a new TimeDriver. Stops any previously +---running driver first. Called from within a manager-scope fiber so exec and channel +---operations are safe. 
+---@return nil +local function bring_up_driver() + stop_existing_driver() + + local installed, install_err = install_ntp_hotplug_script() + if not installed then + log.error("Time Manager: failed to install NTP hotplug script:", install_err) + return + end + + local driver, new_err = time_driver.new() + if not driver then + log.error("Time Manager: failed to create driver:", new_err) + return + end + + local init_err = driver:init() + if init_err ~= "" then + log.error("Time Manager: failed to init driver:", init_err) + return + end + + local capabilities, cap_err = driver:capabilities(TimeManager.cap_emit_ch) + if not capabilities then + log.error("Time Manager: failed to apply capabilities:", cap_err) + return + end + + local ok, start_err = driver:start() + if not ok then + log.error("Time Manager: failed to start driver:", start_err) + return + end + + TimeManager.driver = driver + emit_device_added(driver, capabilities) + log.trace("Time Manager: driver started successfully, capability id =", driver.id) +end + +---- Manager Lifecycle ---- + +---Start the Time Manager. Creates a child scope for managing the driver lifetime. +---@param dev_ev_ch Channel Device event channel (DeviceEvent messages to HAL) +---@param cap_emit_ch Channel Capability emit channel (Emit messages to HAL) +---@return string error Empty string on success. +function TimeManager.start(dev_ev_ch, cap_emit_ch) + if TimeManager.started then + return "already started" + end + + local scope, sc_err = fibers.current_scope():child() + if not scope then + return "failed to create child scope: " .. 
tostring(sc_err)
+    end
+
+    TimeManager.scope = scope
+    TimeManager.dev_ev_ch = dev_ev_ch
+    TimeManager.cap_emit_ch = cap_emit_ch
+
+    scope:finally(function()
+        local st, primary = scope:status()
+        if st == 'failed' then
+            log.error(("Time Manager: error - %s"):format(tostring(primary)))
+        end
+        log.trace("Time Manager: stopped")
+    end)
+
+    TimeManager.started = true
+    log.trace("Time Manager: started")
+    return ""
+end
+
+---Stop the Time Manager and its driver. Cancels the manager scope which will
+---propagate cancellation to any running driver scope.
+---@param timeout number? Timeout in seconds. Defaults to 5.
+---@return boolean ok
+---@return string error
+function TimeManager.stop(timeout)
+    if not TimeManager.started then
+        return false, "not started"
+    end
+
+    timeout = timeout or STOP_TIMEOUT
+    TimeManager.scope:cancel()
+
+    local source = fibers.perform(op.named_choice {
+        join = TimeManager.scope:join_op(),
+        timeout = sleep.sleep_op(timeout),
+    })
+
+    if source == 'timeout' then
+        return false, "time manager stop timeout"
+    end
+
+    TimeManager.started = false
+    return true, ""
+end
+
+---Apply time manager configuration. Spawns a fiber to create and start the time
+---driver. The time driver requires no user-supplied configuration (sysntpd path is
+---fixed), so the config table is accepted for interface consistency but ignored.
+---@param config table
+---@return boolean ok
+---@return string error
+function TimeManager.apply_config(config) -- luacheck: ignore config
+    if not TimeManager.started then
+        return false, "time manager not started"
+    end
+    if TimeManager.dev_ev_ch == nil or TimeManager.cap_emit_ch == nil then
+        return false, "channels not initialized (start must be called first)"
+    end
+
+    log.trace("Time Manager: received config")
+
+    local ok, spawn_err = TimeManager.scope:spawn(function()
+        bring_up_driver()
+    end)
+    if not ok then
+        return false, "failed to spawn driver initialization: " ..
tostring(spawn_err)
+    end
+
+    return true, ""
+end
+
+return TimeManager
diff --git a/src/services/hal/managers/time/ntp b/src/services/hal/managers/time/ntp
new file mode 100644
index 00000000..972f5136
--- /dev/null
+++ b/src/services/hal/managers/time/ntp
@@ -0,0 +1,16 @@
+#!/bin/sh
+
+# Define the UBUS event name
+EVENT_NAME="hotplug.ntp"
+
+# Create JSON payload for UBUS event
+JSON_PAYLOAD="{
+    \"action\": \"$ACTION\",
+    \"freq_drift_ppm\": \"$freq_drift_ppm\",
+    \"offset\": \"$offset\",
+    \"stratum\": \"$stratum\",
+    \"poll_interval\": \"$poll_interval\"
+}"
+
+# Send event to UBUS
+ubus send "$EVENT_NAME" "$JSON_PAYLOAD"
diff --git a/src/services/hal/types/capabilities.lua b/src/services/hal/types/capabilities.lua
index 12c51c50..7139c1f2 100644
--- a/src/services/hal/types/capabilities.lua
+++ b/src/services/hal/types/capabilities.lua
@@ -102,42 +102,38 @@ function new.ModemCapability(id, control_ch)
     return new.Capability('modem', id, control_ch, offerings)
 end
 
----@param class CapabilityClass
 ---@param id CapabilityId
 ---@param control_ch Channel
 ---@return Capability?
 ---@return string error
-function new.GeoCapability(class, id, control_ch)
+function new.GeoCapability(id, control_ch)
     local offerings = {}
-    return new.Capability(class, id, control_ch, offerings)
+    return new.Capability('geo', id, control_ch, offerings)
 end
 
----@param class CapabilityClass
 ---@param id CapabilityId
 ---@param control_ch Channel
 ---@return Capability?
 ---@return string error
-function new.TimeCapability(class, id, control_ch)
+function new.TimeCapability(id, control_ch)
     local offerings = {}
-    return new.Capability(class, id, control_ch, offerings)
+    return new.Capability('time', id, control_ch, offerings)
 end
 
----@param class CapabilityClass
 ---@param id CapabilityId
 ---@param control_ch Channel
 ---@return Capability?
 ---@return string error
-function new.NetworkCapability(class, id, control_ch)
+function new.NetworkCapability(id, control_ch)
     local offerings = {}
-    return new.Capability(class, id, control_ch, offerings)
+    return new.Capability('network', id, control_ch, offerings)
 end
 
----@param class CapabilityClass
 ---@param id CapabilityId
 ---@param control_ch Channel
 ---@return Capability?
 ---@return string error
-function new.WirelessCapability(class, id, control_ch)
+function new.WirelessCapability(id, control_ch)
     local offerings = {
         'set_channels',
         'set_country',
@@ -149,15 +145,14 @@ function new.WirelessCapability(class, id, control_ch)
         'clear_radio_config',
         'apply'
     }
-    return new.Capability(class, id, control_ch, offerings)
+    return new.Capability('wireless', id, control_ch, offerings)
 end
 
----@param class CapabilityClass
 ---@param id CapabilityId
 ---@param control_ch Channel
 ---@return Capability?
 ---@return string error
-function new.BandCapability(class, id, control_ch)
+function new.BandCapability(id, control_ch)
     local offerings = {
         'set_log_level',
         'set_kicking',
@@ -174,19 +169,18 @@ function new.BandCapability(class, id, control_ch)
         'set_networking',
         'apply'
     }
-    return new.Capability(class, id, control_ch, offerings)
+    return new.Capability('band', id, control_ch, offerings)
 end
 
----@param class CapabilityClass
 ---@param id CapabilityId
 ---@param control_ch Channel
 ---@return Capability?
 ---@return string error
-function new.SerialCapability(class, id, control_ch)
+function new.SerialCapability(id, control_ch)
     local offerings = {
         'open', 'close', 'write'
     }
-    return new.Capability(class, id, control_ch, offerings)
+    return new.Capability('serial', id, control_ch, offerings)
 end
 
 ---@param id CapabilityId
diff --git a/src/services/hal/types/time.lua b/src/services/hal/types/time.lua
new file mode 100644
index 00000000..a83ae329
--- /dev/null
+++ b/src/services/hal/types/time.lua
@@ -0,0 +1,57 @@
+---@alias NTPAction string
+
+---@class NTPEvent
+---@field stratum number NTP stratum level (0-15: synced, 16: unsynced)
+---@field action NTPAction Event action (e.g., "step", "clock", "leap")
+---@field offset number Clock offset in seconds
+---@field freq_drift_ppm number Frequency drift in parts per million
+---@field [string] any Additional fields from backend
+local NTPEvent = {}
+NTPEvent.__index = NTPEvent
+
+---@class TimeTypeConstructors
+local new = {}
+
+---Create a new NTPEvent.
+---
+---@param stratum number NTP stratum (0-15 synced, 16 unsynced)
+---@param action string Event action
+---@param offset number Clock offset in seconds
+---@param freq_drift_ppm number Frequency drift in ppm
+---@return NTPEvent?
+---@return string error
+function new.NTPEvent(stratum, action, offset, freq_drift_ppm)
+    if type(stratum) ~= 'number' then
+        return nil, "invalid stratum"
+    end
+
+    if type(action) ~= 'string' or action == '' then
+        return nil, "invalid action"
+    end
+
+    if type(offset) ~= 'number' then
+        return nil, "invalid offset"
+    end
+
+    if type(freq_drift_ppm) ~= 'number' then
+        return nil, "invalid freq_drift_ppm"
+    end
+
+    local event = setmetatable({
+        stratum = stratum,
+        action = action,
+        offset = offset,
+        freq_drift_ppm = freq_drift_ppm,
+    }, NTPEvent)
+    return event, ""
+end
+
+---@class TimeBackend
+---@field start_ntp_monitor fun(self: TimeBackend): boolean, string
+---@field ntp_event_op fun(self: TimeBackend): Op
+---@field stop fun(self: TimeBackend): boolean, string
+
+return {
+    NTPEvent = NTPEvent,
+    new = new,
+}
diff --git a/src/services/time.lua b/src/services/time.lua
new file mode 100644
index 00000000..7f80e0f2
--- /dev/null
+++ b/src/services/time.lua
@@ -0,0 +1,210 @@
+-- services/time.lua
+--
+-- Time service:
+--   - discovers the first HAL time capability via {'cap','time','+','meta','source'}
+--   - consumes retained state + sync/unsync events from that capability
+--   - publishes retained {'svc','time','synced'} for system-wide time trust state
+--   - nudges fibers alarm wall-clock handling when sync transitions occur
+
+local fibers = require "fibers"
+local op = require "fibers.op"
+local alarm = require "fibers.alarm"
+local time_utils = require "fibers.utils.time"
+
+local log = require "services.log"
+
+local perform = fibers.perform
+local now = fibers.now
+
+local M = {}
+
+---@return table
+local function t(...)
+    return { ... }
+end
+
+---@param conn Connection
+---@param name string
+---@param state string
+---@param extra table?
+---@return nil
+local function publish_status(conn, name, state, extra)
+    local payload = { state = state, ts = now() }
+    if type(extra) == 'table' then
+        for k, v in pairs(extra) do payload[k] = v end
+    end
+    log.trace(("TIME: service status -> %s"):format(tostring(state)))
+    conn:retain(t('svc', name, 'status'), payload)
+end
+
+---@param conn Connection
+---@param synced boolean
+---@return nil
+local function publish_synced(conn, synced)
+    conn:retain(t('svc', 'time', 'synced'), synced)
+end
+
+---@param conn Connection
+---@param event_name 'synced'|'unsynced'
+---@param payload table?
+---@return nil
+local function publish_transition_event(conn, event_name, payload)
+    conn:publish(t('svc', 'time', 'event', event_name), payload or {})
+end
+
+---@param payload any
+---@return boolean? synced
+local function synced_from_state_payload(payload)
+    if type(payload) ~= 'table' then return nil end
+    if type(payload.synced) ~= 'boolean' then return nil end
+    return payload.synced
+end
+
+---@param state table
+---@param conn Connection
+---@param is_synced boolean
+---@param payload table?
+---@return nil
+local function apply_sync_state(state, conn, is_synced, payload)
+    if state.current_synced ~= is_synced then
+        log.info((
+            "TIME: sync state transition %s -> %s"
+        ):format(tostring(state.current_synced), tostring(is_synced)))
+        publish_synced(conn, is_synced)
+        if is_synced then
+            publish_transition_event(conn, 'synced', payload)
+        else
+            publish_transition_event(conn, 'unsynced', payload)
+        end
+        state.current_synced = is_synced
+    end
+
+    -- New fibers core has set_time_source/time_changed instead of
+    -- install_alarm_handler/clock_synced/clock_desynced.
+    if is_synced then
+        if not state.time_source_installed then
+            log.info("TIME: installing alarm time source from realtime clock")
+            local ok, err = pcall(alarm.set_time_source, time_utils.realtime)
+            if ok then
+                state.time_source_installed = true
+                log.info("TIME: alarm time source installed")
+            else
+                log.warn("TIME: failed to set alarm time source:", tostring(err))
+            end
+        else
+            alarm.time_changed()
+        end
+    end
+end
+
+---@param conn Connection
+---@param cap_id CapabilityId
+---@return nil
+local function monitor_capability(conn, cap_id)
+    log.info(("TIME: starting monitor for capability id=%s"):format(tostring(cap_id)))
+    local sub_state = conn:subscribe(t('cap', 'time', cap_id, 'state', 'synced'), {
+        queue_len = 10,
+        full = 'drop_oldest',
+    })
+    local sub_synced = conn:subscribe(t('cap', 'time', cap_id, 'event', 'synced'), {
+        queue_len = 20,
+        full = 'drop_oldest',
+    })
+    local sub_unsynced = conn:subscribe(t('cap', 'time', cap_id, 'event', 'unsynced'), {
+        queue_len = 20,
+        full = 'drop_oldest',
+    })
+
+    local state = {
+        ---@type boolean?
+        current_synced = nil,
+        ---@type boolean
+        time_source_installed = false,
+    }
+
+    -- Read retained initial state once, then rely on events for transitions.
+    do
+        local msg, err = perform(sub_state:recv_op())
+        if msg then
+            log.info("TIME: received initial retained sync state")
+            local is_synced = synced_from_state_payload(msg.payload)
+            if is_synced ~= nil then
+                apply_sync_state(state, conn, is_synced, msg.payload)
+            else
+                log.warn("TIME: initial retained state payload missing boolean synced field")
+            end
+        else
+            log.warn("TIME: failed to read initial state:", err)
+        end
+        sub_state:unsubscribe()
+    end
+
+    while true do
+        local which, msg, err = perform(op.named_choice({
+            synced = sub_synced:recv_op(),
+            unsynced = sub_unsynced:recv_op(),
+        }))
+
+        if not msg then
+            log.warn("TIME: capability monitor subscription closed:", err)
+            return
+        end
+
+        if which == 'synced' then
+            apply_sync_state(state, conn, true, msg.payload)
+        elseif which == 'unsynced' then
+            apply_sync_state(state, conn, false, msg.payload)
+        else
+            log.warn("TIME: unknown event source in monitor loop:", tostring(which))
+        end
+    end
+end
+
+---@param conn Connection
+---@param opts table?
+---@return nil
+function M.start(conn, opts)
+    opts = opts or {}
+    local name = opts.name or 'time'
+    log.trace("TIME: starting")
+
+    publish_status(conn, name, 'starting')
+
+    fibers.current_scope():finally(function()
+        local st, primary = fibers.current_scope():status()
+        if st == 'failed' then
+            log.error(("TIME: scope failed - %s"):format(tostring(primary)))
+        end
+        publish_status(conn, name, 'stopped', { reason = primary or st })
+    end)
+
+    local sub_meta = conn:subscribe(t('cap', 'time', '+', 'meta', 'source'), {
+        queue_len = 10,
+        full = 'drop_oldest',
+    })
+    log.trace("TIME: subscribed to time capability meta announcements")
+
+    publish_status(conn, name, 'running')
+
+    while true do
+        local msg, err = perform(sub_meta:recv_op())
+        if not msg then
+            sub_meta:unsubscribe()
+            log.warn("TIME: capability discovery subscription closed:", err)
+            return
+        end
+
+        local topic = msg.topic
+        local cap_id = topic and topic[3]
+        if cap_id ~= nil then
+            sub_meta:unsubscribe()
+            log.trace("TIME: selected first time capability:", tostring(cap_id))
+            monitor_capability(conn, cap_id)
+            return
+        else
+            log.warn("TIME: capability meta message missing capability id token")
+        end
+    end
+end
+
+return M
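
Review note: the edge-triggered rule in `apply_sync_state` (publish only when `current_synced` actually changes) can be exercised without the bus or the fibers runtime. The following is a minimal sketch under stated assumptions, not code from this PR: `new_fake_conn` and `apply` are hypothetical names, the fake connection only records `retain`/`publish` calls, and logging and alarm handling are left out.

```lua
-- Hypothetical stand-in for the bus connection: records calls instead of sending them.
local function new_fake_conn()
    local conn = { retained = {}, published = {} }
    function conn:retain(topic, payload)
        self.retained[#self.retained + 1] = { topic = topic, payload = payload }
    end
    function conn:publish(topic, payload)
        self.published[#self.published + 1] = { topic = topic, payload = payload }
    end
    return conn
end

-- Hypothetical reduction of apply_sync_state: same transition rule,
-- without logging or alarm time-source handling.
local function apply(state, conn, is_synced, payload)
    if state.current_synced == is_synced then
        return false -- no transition, so nothing is published
    end
    conn:retain({ 'svc', 'time', 'synced' }, is_synced)
    local event_name = is_synced and 'synced' or 'unsynced'
    conn:publish({ 'svc', 'time', 'event', event_name }, payload or {})
    state.current_synced = is_synced
    return true
end

local state = { current_synced = nil }
local conn = new_fake_conn()

apply(state, conn, false, { synced = false }) -- retained initial state: nil -> false
apply(state, conn, true, { stratum = 3 })     -- first synced event: false -> true
apply(state, conn, true, { stratum = 2 })     -- repeated synced event: suppressed
apply(state, conn, false, { stratum = 16 })   -- unsynced event: true -> false

print(#conn.retained, #conn.published)
```

In this sketch the initial `nil -> false` step counts as a transition, which mirrors the `~=` comparison in the service code: the first retained state always produces one retained `synced` value and one event, and only later duplicates are dropped.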