From 250512bbbabb4114d1d98670676330f757d6a01c Mon Sep 17 00:00:00 2001 From: Ryan Name Date: Fri, 13 Mar 2026 18:14:53 +0000 Subject: [PATCH 01/16] Add TimeDriver and TimeManager implementations for NTP synchronization --- src/services/hal/drivers/time.lua | 288 +++++++++++++++++++++++++++++ src/services/hal/managers/time.lua | 204 ++++++++++++++++++++ 2 files changed, 492 insertions(+) create mode 100644 src/services/hal/drivers/time.lua create mode 100644 src/services/hal/managers/time.lua diff --git a/src/services/hal/drivers/time.lua b/src/services/hal/drivers/time.lua new file mode 100644 index 00000000..63f11d53 --- /dev/null +++ b/src/services/hal/drivers/time.lua @@ -0,0 +1,288 @@ +-- HAL modules +local hal_types = require "services.hal.types.core" +local cap_types = require "services.hal.types.capabilities" + +-- Backend modules +local ubus_backend = require "services.hal.backends.ubus" + +-- Service modules +local log = require "services.log" + +-- Fibers modules +local fibers = require "fibers" +local op = require "fibers.op" +local channel = require "fibers.channel" +local sleep = require "fibers.sleep" +local exec = require "fibers.io.exec" + +-- Other modules +local cjson = require "cjson.safe" +local uuid = require "uuid" + +---@class TimeDriver +---@field id CapabilityId UUID for this time source capability +---@field cap_emit_ch Channel Capability emit channel (Emit messages) +---@field scope Scope Child scope owning the monitor fiber +---@field control_ch Channel RPC control channel (no offerings currently, reserved) +---@field initialised boolean +---@field caps_applied boolean +---@field synced boolean Tracks last known sync state to detect transitions +local TimeDriver = {} +TimeDriver.__index = TimeDriver + +---- Constants ---- + +local DEFAULT_STOP_TIMEOUT = 5 +local CONTROL_Q_LEN = 4 + +---- Internal Utilities ---- + +---Emit a capability state, meta, or event via the cap emit channel. 
+---@param emit_ch Channel +---@param id CapabilityId +---@param mode EmitMode +---@param key string +---@param data any +---@return boolean ok +---@return string? error +local function emit(emit_ch, id, mode, key, data) + local payload, err = hal_types.new.Emit('time', id, mode, key, data) + if not payload then + return false, err + end + emit_ch:put(payload) + return true +end + +---Build a meta payload table for this time source. +---@param accuracy 'low'|'high' +---@return table +local function build_meta(accuracy) + return { + provider = 'hal', + source = 'ntp', + version = 1, + accuracy = accuracy, + } +end + +---- Monitor Fiber ---- + +---Listen to `ubus listen hotplug.ntp` output and emit state/events via the cap +---emit channel. Runs in self.scope. Exits on stream close, read error, or scope +---cancellation. Transition events (synced/unsynced) are non-retained; the current +---sync state is always published as a retained state emit on every hotplug event. +---@return nil +function TimeDriver:_ntpd_monitor() + fibers.current_scope():finally(function() + log.trace("Time Driver: ntpd monitor exiting") + end) + + log.trace("Time Driver: ntpd monitor started") + + -- exec.command is bound to the current scope (self.scope) automatically. + -- When self.scope is cancelled the process is terminated, causing read_line to + -- return nil/error and the loop below exits cleanly. + local listen_cmd = ubus_backend.listen('hotplug.ntp') + local stdout, stream_err = listen_cmd:stdout_stream() + if not stdout then + error("Time Driver: failed to start ubus listen: " .. 
tostring(stream_err)) + end + + while true do + local line, read_err = stdout:read_line() + if read_err then + log.error("Time Driver: ubus listen read error:", read_err) + break + end + if line == nil then + log.warn("Time Driver: ubus listen stream closed unexpectedly") + break + end + + -- ubus listen output is one JSON object per line: + -- { "hotplug.ntp": { "stratum": } } + local decoded = cjson.decode(line) + if not decoded then + log.warn("Time Driver: failed to decode hotplug.ntp event:", line) + else + local ntp_data = decoded["hotplug.ntp"] + if type(ntp_data) == 'table' and type(ntp_data.stratum) == 'number' then + local stratum = ntp_data.stratum + local now_synced = stratum ~= 16 + local was_synced = self.synced + + -- Always update retained state, even if sync status did not change, + -- so that the latest stratum value is always visible to subscribers. + local ok, emit_err = emit( + self.cap_emit_ch, self.id, 'state', 'synced', + { synced = now_synced, stratum = stratum } + ) + if not ok then + log.warn("Time Driver: failed to emit state:", emit_err) + end + + -- Update accuracy metadata only on a sync/unsync transition. + if now_synced ~= was_synced then + local accuracy = now_synced and 'high' or 'low' + local meta_ok, meta_err = emit( + self.cap_emit_ch, self.id, 'meta', 'source', + build_meta(accuracy) + ) + if not meta_ok then + log.warn("Time Driver: failed to emit meta:", meta_err) + end + end + + -- Emit non-retained transition events. 
+ if now_synced and not was_synced then + log.debug("Time Driver: NTP synced, stratum =", stratum) + local ev_ok, ev_err = emit( + self.cap_emit_ch, self.id, 'event', 'synced', + { stratum = stratum } + ) + if not ev_ok then + log.warn("Time Driver: failed to emit synced event:", ev_err) + end + elseif not now_synced and was_synced then + log.debug("Time Driver: NTP unsynced, stratum =", stratum) + local ev_ok, ev_err = emit( + self.cap_emit_ch, self.id, 'event', 'unsynced', + { stratum = stratum } + ) + if not ev_ok then + log.warn("Time Driver: failed to emit unsynced event:", ev_err) + end + end + + self.synced = now_synced + else + log.warn("Time Driver: received unexpected hotplug.ntp payload:", line) + end + end + end +end + +---- Driver Lifecycle ---- + +---Initialise the time driver. Restarts sysntpd and marks the driver as initialised. +---Must be called from inside a fiber. +---@return string error Empty string on success. +function TimeDriver:init() + log.trace("Time Driver: initialising, restarting sysntpd") + + local status, code, _, err = fibers.perform( + exec.command("/etc/init.d/sysntpd", "restart"):run_op() + ) + if status ~= 'exited' or code ~= 0 then + return "sysntpd restart failed: " .. tostring(err or ("exit code " .. tostring(code))) + end + + self.initialised = true + log.trace("Time Driver: sysntpd restarted successfully") + return "" +end + +---Connect the driver to the capability emit channel and return the capability list. +---Must be called after init() and before start(). +---@param cap_emit_ch Channel +---@return Capability[]? capabilities +---@return string error Empty string on success. +function TimeDriver:capabilities(cap_emit_ch) + if not self.initialised then + return nil, "driver not initialised" + end + + self.cap_emit_ch = cap_emit_ch + + local cap, cap_err = cap_types.new.TimeCapability(self.id, self.control_ch) + if not cap then + return nil, "failed to create time capability: " .. 
tostring(cap_err) + end + + self.caps_applied = true + return { cap }, "" +end + +---Start the time driver. Emits initial meta and state, then spawns the NTP monitor +---fiber. Must be called after capabilities(). +---@return boolean ok +---@return string? error +function TimeDriver:start() + if not self.initialised then + return false, "driver not initialised" + end + if not self.caps_applied then + return false, "capabilities not applied" + end + + -- Publish initial meta (accuracy is low; sysntpd has only just restarted). + local meta_ok, meta_err = emit( + self.cap_emit_ch, self.id, 'meta', 'source', + build_meta('low') + ) + if not meta_ok then + log.warn("Time Driver: failed to emit initial meta:", meta_err) + end + + -- Publish initial retained state: not yet synced, stratum unknown. + local state_ok, state_err = emit( + self.cap_emit_ch, self.id, 'state', 'synced', + { synced = false, stratum = nil } + ) + if not state_ok then + log.warn("Time Driver: failed to emit initial state:", state_err) + end + + self.scope:spawn(function() self:_ntpd_monitor() end) + + log.trace("Time Driver: started") + return true, nil +end + +---Stop the time driver. Cancels the driver scope, terminating the NTP monitor fiber +---and any running ubus listen process. +---@param timeout number? Timeout in seconds. Defaults to 5. +---@return boolean ok +---@return string? error +function TimeDriver:stop(timeout) + timeout = timeout or DEFAULT_STOP_TIMEOUT + self.scope:cancel() + + local source = fibers.perform(op.named_choice { + join = self.scope:join_op(), + timeout = sleep.sleep_op(timeout), + }) + + if source == 'timeout' then + return false, "time driver stop timeout" + end + return true, nil +end + +---- Constructor ---- + +---Create a new TimeDriver instance. Generates a UUID for the capability id and +---creates a child scope. Must be called from inside a fiber. +---@return TimeDriver? driver +---@return string error Empty string on success. 
+local function new() + local scope, sc_err = fibers.current_scope():child() + if not scope then + return nil, "failed to create child scope: " .. tostring(sc_err) + end + + return setmetatable({ + id = uuid.new(), + cap_emit_ch = nil, + scope = scope, + control_ch = channel.new(CONTROL_Q_LEN), + initialised = false, + caps_applied = false, + synced = false, + }, TimeDriver), "" +end + +return { + new = new, +} diff --git a/src/services/hal/managers/time.lua b/src/services/hal/managers/time.lua new file mode 100644 index 00000000..efc2475d --- /dev/null +++ b/src/services/hal/managers/time.lua @@ -0,0 +1,204 @@ +-- HAL modules +local time_driver = require "services.hal.drivers.time" +local hal_types = require "services.hal.types.core" + +-- Fibers modules +local fibers = require "fibers" +local op = require "fibers.op" +local sleep = require "fibers.sleep" + +-- Other modules +local log = require "services.log" + +-- Constants + +local STOP_TIMEOUT = 5.0 -- seconds + +---@alias TimeDriverHandle table + +---@class TimeManager +---@field scope Scope +---@field started boolean +---@field driver TimeDriverHandle? +---@field dev_ev_ch Channel +---@field cap_emit_ch Channel +local TimeManager = { + started = false, + driver = nil, + dev_ev_ch = nil, + cap_emit_ch = nil, +} + +---- Internal Utilities ---- + +---Emit a HAL device-added event for the time capability provider. +---@param driver TimeDriverHandle +---@param capabilities Capability[] +local function emit_device_added(driver, capabilities) + local device_event, ev_err = hal_types.new.DeviceEvent( + "added", + "time", + driver.id, + { source = "ntp" }, + capabilities + ) + if not device_event then + log.error("Time Manager: failed to create device-added event:", ev_err) + return + end + TimeManager.dev_ev_ch:put(device_event) +end + +---Emit a HAL device-removed event for the time capability provider. 
+---@param driver TimeDriverHandle +local function emit_device_removed(driver) + local device_event, ev_err = hal_types.new.DeviceEvent( + "removed", + "time", + driver.id, + {} + ) + if not device_event then + log.error("Time Manager: failed to create device-removed event:", ev_err) + return + end + TimeManager.dev_ev_ch:put(device_event) +end + +---Stop the currently running driver (if any) and notify HAL that the device was +---removed. Safe to call when no driver is running. +---@return nil +local function stop_existing_driver() + local prev = TimeManager.driver + if not prev then return end + + log.trace("Time Manager: stopping existing driver") + local ok, stop_err = prev:stop(STOP_TIMEOUT) + if not ok then + log.warn("Time Manager: failed to stop previous driver:", stop_err) + end + + emit_device_removed(prev) + TimeManager.driver = nil +end + +---Initialise, apply capabilities, and start a new TimeDriver. Stops any previously +---running driver first. Called from within a manager-scope fiber so exec and channel +---operations are safe. +---@return nil +local function bring_up_driver() + stop_existing_driver() + + local driver, new_err = time_driver.new() + if not driver then + log.error("Time Manager: failed to create driver:", new_err) + return + end + + local init_err = driver:init() + if init_err ~= "" then + log.error("Time Manager: failed to init driver:", init_err) + return + end + + local capabilities, cap_err = driver:capabilities(TimeManager.cap_emit_ch) + if not capabilities then + log.error("Time Manager: failed to apply capabilities:", cap_err) + return + end + + local ok, start_err = driver:start() + if not ok then + log.error("Time Manager: failed to start driver:", start_err) + return + end + + TimeManager.driver = driver + emit_device_added(driver, capabilities) + log.trace("Time Manager: driver started successfully, capability id =", driver.id) +end + +---- Manager Lifecycle ---- + +---Start the Time Manager. 
Creates a child scope for managing the driver lifetime. +---@param dev_ev_ch Channel Device event channel (DeviceEvent messages to HAL) +---@param cap_emit_ch Channel Capability emit channel (Emit messages to HAL) +---@return string error Empty string on success. +function TimeManager.start(dev_ev_ch, cap_emit_ch) + if TimeManager.started then + return "already started" + end + + local scope, sc_err = fibers.current_scope():child() + if not scope then + return "failed to create child scope: " .. tostring(sc_err) + end + + TimeManager.scope = scope + TimeManager.dev_ev_ch = dev_ev_ch + TimeManager.cap_emit_ch = cap_emit_ch + + scope:finally(function() + local st, primary = scope:status() + if st == 'failed' then + log.error(("Time Manager: error - %s"):format(tostring(primary))) + end + log.trace("Time Manager: stopped") + end) + + TimeManager.started = true + log.trace("Time Manager: started") + return "" +end + +---Stop the Time Manager and its driver. Cancels the manager scope which will +---propagate cancellation to any running driver scope. +---@param timeout number? Timeout in seconds. Defaults to 5. +---@return boolean ok +---@return string error +function TimeManager.stop(timeout) + if not TimeManager.started then + return false, "not started" + end + + timeout = timeout or STOP_TIMEOUT + TimeManager.scope:cancel() + + local source = fibers.perform(op.named_choice { + join = TimeManager.scope:join_op(), + timeout = sleep.sleep_op(timeout), + }) + + if source == 'timeout' then + return false, "time manager stop timeout" + end + + TimeManager.started = false + return true, "" +end + +---Apply time manager configuration. Spawns a fiber to create and start the time +---driver. The time driver requires no user-supplied configuration (sysntpd path is +---fixed), so the config table is accepted for interface consistency but ignored. 
+---@param config table +---@return boolean ok +---@return string error +function TimeManager.apply_config(config) -- luacheck: ignore config + if not TimeManager.started then + return false, "time manager not started" + end + if TimeManager.dev_ev_ch == nil or TimeManager.cap_emit_ch == nil then + return false, "channels not initialized (start must be called first)" + end + + local ok, spawn_err = TimeManager.scope:spawn(function() + bring_up_driver() + end) + if not ok then + return false, "failed to spawn driver initialization: " .. tostring(spawn_err) + end + + return true, "" +end + +return TimeManager From c8ac8674c2aaf357a007abc67fe54a1b8933db5f Mon Sep 17 00:00:00 2001 From: Ryan Name Date: Fri, 13 Mar 2026 18:15:28 +0000 Subject: [PATCH 02/16] Add accuracy estimation for NTP stratum in TimeDriver --- src/services/hal/drivers/time.lua | 55 +++++++++++++++++++++++++------ 1 file changed, 45 insertions(+), 10 deletions(-) diff --git a/src/services/hal/drivers/time.lua b/src/services/hal/drivers/time.lua index 63f11d53..78182899 100644 --- a/src/services/hal/drivers/time.lua +++ b/src/services/hal/drivers/time.lua @@ -54,15 +54,40 @@ local function emit(emit_ch, id, mode, key, data) return true end +---Convert NTP stratum to an estimated absolute accuracy in seconds. +---Returns nil when unsynced (stratum >= 16) or invalid input. +---@param stratum number +---@return number? accuracy_seconds +local function accuracy_for_stratum(stratum) + if type(stratum) ~= 'number' then + return nil + end + if stratum >= 16 then + return nil + end + + -- Coarse operational heuristic: + -- lower stratum generally implies lower clock error. + if stratum <= 1 then + return 0.001 + elseif stratum <= 4 then + return 0.01 + elseif stratum <= 8 then + return 0.1 + else + return 1.0 + end +end + ---Build a meta payload table for this time source. ----@param accuracy 'low'|'high' +---@param accuracy_seconds number? 
---@return table -local function build_meta(accuracy) +local function build_meta(accuracy_seconds) return { provider = 'hal', source = 'ntp', version = 1, - accuracy = accuracy, + accuracy_seconds = accuracy_seconds, } end @@ -111,12 +136,17 @@ function TimeDriver:_ntpd_monitor() local stratum = ntp_data.stratum local now_synced = stratum ~= 16 local was_synced = self.synced + local accuracy_seconds = accuracy_for_stratum(stratum) -- Always update retained state, even if sync status did not change, -- so that the latest stratum value is always visible to subscribers. local ok, emit_err = emit( self.cap_emit_ch, self.id, 'state', 'synced', - { synced = now_synced, stratum = stratum } + { + synced = now_synced, + stratum = stratum, + accuracy_seconds = accuracy_seconds, + } ) if not ok then log.warn("Time Driver: failed to emit state:", emit_err) @@ -124,10 +154,9 @@ function TimeDriver:_ntpd_monitor() -- Update accuracy metadata only on a sync/unsync transition. if now_synced ~= was_synced then - local accuracy = now_synced and 'high' or 'low' local meta_ok, meta_err = emit( self.cap_emit_ch, self.id, 'meta', 'source', - build_meta(accuracy) + build_meta(accuracy_seconds) ) if not meta_ok then log.warn("Time Driver: failed to emit meta:", meta_err) @@ -139,7 +168,10 @@ function TimeDriver:_ntpd_monitor() log.debug("Time Driver: NTP synced, stratum =", stratum) local ev_ok, ev_err = emit( self.cap_emit_ch, self.id, 'event', 'synced', - { stratum = stratum } + { + stratum = stratum, + accuracy_seconds = accuracy_seconds, + } ) if not ev_ok then log.warn("Time Driver: failed to emit synced event:", ev_err) @@ -148,7 +180,10 @@ function TimeDriver:_ntpd_monitor() log.debug("Time Driver: NTP unsynced, stratum =", stratum) local ev_ok, ev_err = emit( self.cap_emit_ch, self.id, 'event', 'unsynced', - { stratum = stratum } + { + stratum = stratum, + accuracy_seconds = accuracy_seconds, + } ) if not ev_ok then log.warn("Time Driver: failed to emit unsynced event:", ev_err) 
@@ -216,10 +251,10 @@ function TimeDriver:start() return false, "capabilities not applied" end - -- Publish initial meta (accuracy is low; sysntpd has only just restarted). + -- Publish initial meta (accuracy unknown until first NTP update). local meta_ok, meta_err = emit( self.cap_emit_ch, self.id, 'meta', 'source', - build_meta('low') + build_meta(nil) ) if not meta_ok then log.warn("Time Driver: failed to emit initial meta:", meta_err) From 94adbcbd961c3454f152b3b90b1706d5bf1027c9 Mon Sep 17 00:00:00 2001 From: Ryan Name Date: Fri, 13 Mar 2026 18:15:41 +0000 Subject: [PATCH 03/16] Refactor capability constructors to remove unused class parameter and set capability type directly --- src/services/hal/types/capabilities.lua | 30 ++++++++++--------------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/src/services/hal/types/capabilities.lua b/src/services/hal/types/capabilities.lua index 12c51c50..7139c1f2 100644 --- a/src/services/hal/types/capabilities.lua +++ b/src/services/hal/types/capabilities.lua @@ -102,42 +102,38 @@ function new.ModemCapability(id, control_ch) return new.Capability('modem', id, control_ch, offerings) end ----@param class CapabilityClass ---@param id CapabilityId ---@param control_ch Channel ---@return Capability? ---@return string error -function new.GeoCapability(class, id, control_ch) +function new.GeoCapability(id, control_ch) local offerings = {} - return new.Capability(class, id, control_ch, offerings) + return new.Capability('geo', id, control_ch, offerings) end ----@param class CapabilityClass ---@param id CapabilityId ---@param control_ch Channel ---@return Capability? 
---@return string error -function new.TimeCapability(class, id, control_ch) +function new.TimeCapability(id, control_ch) local offerings = {} - return new.Capability(class, id, control_ch, offerings) + return new.Capability('time', id, control_ch, offerings) end ----@param class CapabilityClass ---@param id CapabilityId ---@param control_ch Channel ---@return Capability? ---@return string error -function new.NetworkCapability(class, id, control_ch) +function new.NetworkCapability(id, control_ch) local offerings = {} - return new.Capability(class, id, control_ch, offerings) + return new.Capability('network', id, control_ch, offerings) end ----@param class CapabilityClass ---@param id CapabilityId ---@param control_ch Channel ---@return Capability? ---@return string error -function new.WirelessCapability(class, id, control_ch) +function new.WirelessCapability(id, control_ch) local offerings = { 'set_channels', 'set_country', @@ -149,15 +145,14 @@ function new.WirelessCapability(class, id, control_ch) 'clear_radio_config', 'apply' } - return new.Capability(class, id, control_ch, offerings) + return new.Capability('wireless', id, control_ch, offerings) end ----@param class CapabilityClass ---@param id CapabilityId ---@param control_ch Channel ---@return Capability? ---@return string error -function new.BandCapability(class, id, control_ch) +function new.BandCapability(id, control_ch) local offerings = { 'set_log_level', 'set_kicking', @@ -174,19 +169,18 @@ function new.BandCapability(class, id, control_ch) 'set_networking', 'apply' } - return new.Capability(class, id, control_ch, offerings) + return new.Capability('band', id, control_ch, offerings) end ----@param class CapabilityClass ---@param id CapabilityId ---@param control_ch Channel ---@return Capability? 
---@return string error -function new.SerialCapability(class, id, control_ch) +function new.SerialCapability(id, control_ch) local offerings = { 'open', 'close', 'write' } - return new.Capability(class, id, control_ch, offerings) + return new.Capability('serial', id, control_ch, offerings) end ---@param id CapabilityId From 05334c30749feea00093f8351e186762251781f4 Mon Sep 17 00:00:00 2001 From: Ryan Name Date: Mon, 16 Mar 2026 10:20:24 +0000 Subject: [PATCH 04/16] time service --- src/services/time.lua | 192 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 192 insertions(+) create mode 100644 src/services/time.lua diff --git a/src/services/time.lua b/src/services/time.lua new file mode 100644 index 00000000..be7625c7 --- /dev/null +++ b/src/services/time.lua @@ -0,0 +1,192 @@ +-- services/time.lua +-- +-- Time service: +-- - discovers the first HAL time capability via {'cap','time','+','meta','source'} +-- - consumes retained state + sync/unsync events from that capability +-- - publishes retained {'svc','time','synced'} for system-wide time trust state +-- - nudges fibers alarm wall-clock handling when sync transitions occur + +local fibers = require "fibers" +local op = require "fibers.op" +local alarm = require "fibers.alarm" +local time_utils = require "fibers.utils.time" + +local log = require "services.log" + +local perform = fibers.perform +local now = fibers.now + +local M = {} + +---@return table +local function t(...) + return { ... } +end + +---@param conn Connection +---@param name string +---@param state string +---@param extra table? 
+---@return nil +local function publish_status(conn, name, state, extra) + local payload = { state = state, ts = now() } + if type(extra) == 'table' then + for k, v in pairs(extra) do payload[k] = v end + end + conn:retain(t('svc', name, 'status'), payload) +end + +---@param conn Connection +---@param synced boolean +---@return nil +local function publish_synced(conn, synced) + conn:retain(t('svc', 'time', 'synced'), synced) +end + +---@param conn Connection +---@param event_name 'synced'|'unsynced' +---@param payload table? +---@return nil +local function publish_transition_event(conn, event_name, payload) + conn:publish(t('svc', 'time', 'event', event_name), payload or {}) +end + +---@param payload any +---@return boolean? synced +local function synced_from_state_payload(payload) + if type(payload) ~= 'table' then return nil end + if type(payload.synced) ~= 'boolean' then return nil end + return payload.synced +end + +---@param state table +---@param conn Connection +---@param is_synced boolean +---@param payload table? +---@return nil +local function apply_sync_state(state, conn, is_synced, payload) + if state.current_synced ~= is_synced then + publish_synced(conn, is_synced) + if is_synced then + publish_transition_event(conn, 'synced', payload) + else + publish_transition_event(conn, 'unsynced', payload) + end + state.current_synced = is_synced + end + + -- New fibers core has set_time_source/time_changed instead of + -- install_alarm_handler/clock_synced/clock_desynced. 
+ if is_synced then + if not state.time_source_installed then + local ok, err = pcall(alarm.set_time_source, time_utils.realtime) + if ok then + state.time_source_installed = true + else + log.warn("TIME: failed to set alarm time source:", tostring(err)) + end + else + alarm.time_changed() + end + end +end + +---@param conn Connection +---@param cap_id CapabilityId +---@return nil +local function monitor_capability(conn, cap_id) + local sub_state = conn:subscribe(t('cap', 'time', cap_id, 'state', 'synced'), { + queue_len = 10, + full = 'drop_oldest', + }) + local sub_synced = conn:subscribe(t('cap', 'time', cap_id, 'event', 'synced'), { + queue_len = 20, + full = 'drop_oldest', + }) + local sub_unsynced = conn:subscribe(t('cap', 'time', cap_id, 'event', 'unsynced'), { + queue_len = 20, + full = 'drop_oldest', + }) + + local state = { + ---@type boolean? + current_synced = nil, + ---@type boolean + time_source_installed = false, + } + + -- Read retained initial state once, then rely on events for transitions. + do + local msg, err = perform(sub_state:recv_op()) + if msg then + local is_synced = synced_from_state_payload(msg.payload) + if is_synced ~= nil then + apply_sync_state(state, conn, is_synced, msg.payload) + end + else + log.warn("TIME: failed to read initial state:", err) + end + sub_state:unsubscribe() + end + + while true do + local which, msg, err = perform(op.named_choice({ + synced = sub_synced:recv_op(), + unsynced = sub_unsynced:recv_op(), + })) + + if not msg then + log.warn("TIME: capability monitor subscription closed:", err) + return + end + + if which == 'synced' then + apply_sync_state(state, conn, true, msg.payload) + elseif which == 'unsynced' then + apply_sync_state(state, conn, false, msg.payload) + end + end +end + +---@param conn Connection +---@param opts table? 
+---@return nil +function M.start(conn, opts) + opts = opts or {} + local name = opts.name or 'time' + + publish_status(conn, name, 'starting') + + fibers.current_scope():finally(function() + local st, primary = fibers.current_scope():status() + if st == 'failed' then + log.error(("TIME: scope failed - %s"):format(tostring(primary))) + end + publish_status(conn, name, 'stopped', { reason = primary or st }) + end) + + local sub_meta = conn:subscribe(t('cap', 'time', '+', 'meta', 'source'), { + queue_len = 10, + full = 'drop_oldest', + }) + + publish_status(conn, name, 'running') + + while true do + local msg, err = perform(sub_meta:recv_op()) + if not msg then + log.warn("TIME: capability discovery subscription closed:", err) + return + end + + local topic = msg.topic + local cap_id = topic and topic[3] + if cap_id ~= nil then + log.trace("TIME: selected first time capability:", tostring(cap_id)) + monitor_capability(conn, cap_id) + return + end + end +end + +return M From 91410bfc2a91b77fd711d4368a06bc45e891d791 Mon Sep 17 00:00:00 2001 From: Ryan Name Date: Mon, 16 Mar 2026 10:20:41 +0000 Subject: [PATCH 05/16] now installing time hotplug ntp --- src/services/hal/managers/time.lua | 58 ++++++++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/src/services/hal/managers/time.lua b/src/services/hal/managers/time.lua index efc2475d..809438ea 100644 --- a/src/services/hal/managers/time.lua +++ b/src/services/hal/managers/time.lua @@ -6,6 +6,7 @@ local hal_types = require "services.hal.types.core" local fibers = require "fibers" local op = require "fibers.op" local sleep = require "fibers.sleep" +local exec = require "fibers.io.exec" -- Other modules local log = require "services.log" @@ -13,6 +14,8 @@ local log = require "services.log" -- Constants local STOP_TIMEOUT = 5.0 -- seconds +local HOTPLUG_DIR = "/etc/hotplug.d/ntp" +local HOTPLUG_SCRIPT_NAME = "ntp" ---@alias TimeDriverHandle table @@ -20,8 +23,8 @@ local STOP_TIMEOUT = 5.0 
-- seconds ---@field scope Scope ---@field started boolean ---@field driver TimeDriverHandle? ----@field dev_ev_ch Channel ----@field cap_emit_ch Channel +---@field dev_ev_ch Channel? +---@field cap_emit_ch Channel? local TimeManager = { started = false, driver = nil, @@ -82,6 +85,51 @@ local function stop_existing_driver() TimeManager.driver = nil end +---Run a command and require a zero exit status. +---@param ... string argv +---@return boolean ok +---@return string? error +local function run_checked(...) + local status, code, _, err = fibers.perform(exec.command(...):run_op()) + if status ~= 'exited' or code ~= 0 then + return false, tostring(err or ("exit code " .. tostring(code))) + end + return true, nil +end + +---Resolve the directory that contains this manager file. +---@return string dir +local function manager_dir() + local source = debug.getinfo(1, 'S').source or '' + source = source:gsub('^@', '') + return source:match('^(.*)/[^/]+$') or '.' +end + +---Install the NTP hotplug script into /etc/hotplug.d/ntp. +---@return boolean ok +---@return string? error +local function install_ntp_hotplug_script() + local src = manager_dir() .. "/time/" .. HOTPLUG_SCRIPT_NAME + local dst = HOTPLUG_DIR .. "/" .. HOTPLUG_SCRIPT_NAME + + local ok, err = run_checked("mkdir", "-p", HOTPLUG_DIR) + if not ok then + return false, "failed to create hotplug directory: " .. tostring(err) + end + + ok, err = run_checked("cp", src, dst) + if not ok then + return false, "failed to copy hotplug script from " .. src .. ": " .. tostring(err) + end + + ok, err = run_checked("chmod", "+x", dst) + if not ok then + return false, "failed to chmod hotplug script: " .. tostring(err) + end + + return true, nil +end + ---Initialise, apply capabilities, and start a new TimeDriver. Stops any previously ---running driver first. Called from within a manager-scope fiber so exec and channel ---operations are safe. 
@@ -89,6 +137,12 @@ end local function bring_up_driver() stop_existing_driver() + local installed, install_err = install_ntp_hotplug_script() + if not installed then + log.error("Time Manager: failed to install NTP hotplug script:", install_err) + return + end + local driver, new_err = time_driver.new() if not driver then log.error("Time Manager: failed to create driver:", new_err) From 6a8279bc46bd6357cddd14c43cb0af72aaeadbfb Mon Sep 17 00:00:00 2001 From: Ryan Name Date: Mon, 16 Mar 2026 12:24:51 +0000 Subject: [PATCH 06/16] Add detailed logging for time service status and sync state transitions --- src/services/time.lua | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/services/time.lua b/src/services/time.lua index be7625c7..5a1a3757 100644 --- a/src/services/time.lua +++ b/src/services/time.lua @@ -33,6 +33,7 @@ local function publish_status(conn, name, state, extra) if type(extra) == 'table' then for k, v in pairs(extra) do payload[k] = v end end + log.trace(("TIME: service status -> %s"):format(tostring(state))) conn:retain(t('svc', name, 'status'), payload) end @@ -66,6 +67,9 @@ end ---@return nil local function apply_sync_state(state, conn, is_synced, payload) if state.current_synced ~= is_synced then + log.info(( + "TIME: sync state transition %s -> %s" + ):format(tostring(state.current_synced), tostring(is_synced))) publish_synced(conn, is_synced) if is_synced then publish_transition_event(conn, 'synced', payload) @@ -79,9 +83,11 @@ local function apply_sync_state(state, conn, is_synced, payload) -- install_alarm_handler/clock_synced/clock_desynced. 
if is_synced then if not state.time_source_installed then + log.info("TIME: installing alarm time source from realtime clock") local ok, err = pcall(alarm.set_time_source, time_utils.realtime) if ok then state.time_source_installed = true + log.info("TIME: alarm time source installed") else log.warn("TIME: failed to set alarm time source:", tostring(err)) end @@ -95,6 +101,7 @@ end ---@param cap_id CapabilityId ---@return nil local function monitor_capability(conn, cap_id) + log.info(("TIME: starting monitor for capability id=%s"):format(tostring(cap_id))) local sub_state = conn:subscribe(t('cap', 'time', cap_id, 'state', 'synced'), { queue_len = 10, full = 'drop_oldest', @@ -119,9 +126,12 @@ local function monitor_capability(conn, cap_id) do local msg, err = perform(sub_state:recv_op()) if msg then + log.info("TIME: received initial retained sync state") local is_synced = synced_from_state_payload(msg.payload) if is_synced ~= nil then apply_sync_state(state, conn, is_synced, msg.payload) + else + log.warn("TIME: initial retained state payload missing boolean synced field") end else log.warn("TIME: failed to read initial state:", err) @@ -144,6 +154,8 @@ local function monitor_capability(conn, cap_id) apply_sync_state(state, conn, true, msg.payload) elseif which == 'unsynced' then apply_sync_state(state, conn, false, msg.payload) + else + log.warn("TIME: unknown event source in monitor loop:", tostring(which)) end end end @@ -154,6 +166,7 @@ end function M.start(conn, opts) opts = opts or {} local name = opts.name or 'time' + log.trace("TIME: starting") publish_status(conn, name, 'starting') @@ -169,6 +182,7 @@ function M.start(conn, opts) queue_len = 10, full = 'drop_oldest', }) + log.trace("TIME: subscribed to time capability meta announcements") publish_status(conn, name, 'running') @@ -185,6 +199,8 @@ function M.start(conn, opts) log.trace("TIME: selected first time capability:", tostring(cap_id)) monitor_capability(conn, cap_id) return + else + log.warn("TIME: 
capability meta message missing capability id token") end end end From 0d140b5b9ed74e83c6b18cdad39f85e1384f79b5 Mon Sep 17 00:00:00 2001 From: Ryan Name Date: Mon, 16 Mar 2026 12:25:22 +0000 Subject: [PATCH 07/16] Add trace logging for received configuration in TimeManager --- src/services/hal/managers/time.lua | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/services/hal/managers/time.lua b/src/services/hal/managers/time.lua index 809438ea..b17cdac8 100644 --- a/src/services/hal/managers/time.lua +++ b/src/services/hal/managers/time.lua @@ -245,6 +245,8 @@ function TimeManager.apply_config(config) -- luacheck: ignore config return false, "channels not initialized (start must be called first)" end + log.trace("Time manager: recieved config") + local ok, spawn_err = TimeManager.scope:spawn(function() bring_up_driver() end) From 37cf91a00f425293fb69bc44fac5731d2dc2c827 Mon Sep 17 00:00:00 2001 From: Ryan Name Date: Mon, 16 Mar 2026 12:25:37 +0000 Subject: [PATCH 08/16] Add numeric string coercion and NTP event script for UBUS --- src/services/hal/drivers/time.lua | 23 +++++++++++++++++++++++ src/services/hal/managers/time/ntp | 16 ++++++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 src/services/hal/managers/time/ntp diff --git a/src/services/hal/drivers/time.lua b/src/services/hal/drivers/time.lua index 78182899..cdf7c389 100644 --- a/src/services/hal/drivers/time.lua +++ b/src/services/hal/drivers/time.lua @@ -54,6 +54,28 @@ local function emit(emit_ch, id, mode, key, data) return true end +---Recursively convert numeric-looking strings into numbers. 
+---@param value any +---@return any +local function coerce_numeric_strings(value) + if type(value) == 'string' then + local n = tonumber(value) + if n ~= nil then + return n + end + return value + end + + if type(value) == 'table' then + for k, v in pairs(value) do + value[k] = coerce_numeric_strings(v) + end + return value + end + + return value +end + ---Convert NTP stratum to an estimated absolute accuracy in seconds. ---Returns nil when unsynced (stratum >= 16) or invalid input. ---@param stratum number @@ -131,6 +153,7 @@ function TimeDriver:_ntpd_monitor() if not decoded then log.warn("Time Driver: failed to decode hotplug.ntp event:", line) else + decoded = coerce_numeric_strings(decoded) local ntp_data = decoded["hotplug.ntp"] if type(ntp_data) == 'table' and type(ntp_data.stratum) == 'number' then local stratum = ntp_data.stratum diff --git a/src/services/hal/managers/time/ntp b/src/services/hal/managers/time/ntp new file mode 100644 index 00000000..972f5136 --- /dev/null +++ b/src/services/hal/managers/time/ntp @@ -0,0 +1,16 @@ +#!/bin/sh + +# Define the UBUS event name +EVENT_NAME="hotplug.ntp" + +# Create JSON payload for UBUS event +JSON_PAYLOAD="{ + \"action\": \"$ACTION\", + \"freq_drift_ppm\": \"$freq_drift_ppm\", + \"offset\": \"$offset\", + \"stratum\": \"$stratum\", + \"poll_interval\": \"$poll_interval\" +}" + +# Send event to UBUS +ubus send "$EVENT_NAME" "$JSON_PAYLOAD" From e02f3d56a0de896e2ec4d7ae40cf1f6b6e008ef9 Mon Sep 17 00:00:00 2001 From: Ryan Name Date: Mon, 16 Mar 2026 12:28:12 +0000 Subject: [PATCH 09/16] Added time manager config --- src/configs/config.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/configs/config.json b/src/configs/config.json index 1d0ff0f9..a91a41fc 100644 --- a/src/configs/config.json +++ b/src/configs/config.json @@ -10,7 +10,8 @@ "name": "state", "root": "/tmp/dc-states/" } - ] + ], + "time": {} }, "gsm": { "modems": { From 71a944b8299dd15d90f66c2ad8dd16f31c5e0fc7 Mon Sep 17 
00:00:00 2001 From: Ryan Name Date: Tue, 17 Mar 2026 10:06:18 +0000 Subject: [PATCH 10/16] Add initial documentation for Time Service and its capabilities --- docs/specs/time.md | 79 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 docs/specs/time.md diff --git a/docs/specs/time.md b/docs/specs/time.md new file mode 100644 index 00000000..9a983b75 --- /dev/null +++ b/docs/specs/time.md @@ -0,0 +1,79 @@ +# Time Service + +## Description + +The time service listens to time capabilities for sync and unsync events. It uses the events to sync and unsync the alarm module of fibers and broadcast sync/unsync events to the bus for other services to listen to. + +## Time capability + +There is initially only one time capability, provided by the time driver in HAL. In the future we can discover multiple time capabilities and balance multiple sources of time intelligently (e.g. prefer the capability reporting the lowest stratum). + +For now, the time service subscribes to `{'cap', 'time', '+', 'meta', 'source'}` and uses the **first** capability announced. Subsequent announcements are ignored. + +## Bus Outputs + +### Service status (retained) + +Topic: `{'svc', 'time', 'status'}` + +```lua +{ + state = 'starting' | 'running' | 'stopped', + ts = <timestamp>, +} +``` + +### Time synced state (retained) + +Topic: `{'svc', 'time', 'synced'}` + +Payload: `true` or `false` + +Published whenever the overall sync state changes. This is the authoritative "is system time trustworthy" signal for other services. It is retained so services that start later immediately receive the current state. + +### Time transition events (non-retained) + +Topics: +- `{'svc', 'time', 'event', 'synced'}` +- `{'svc', 'time', 'event', 'unsynced'}` + +Published on state transitions for consumers that need edge-triggered behaviour. 
+ +## Service Flow + +```mermaid +flowchart TD + St[Start] --> B(Publish status: starting) + B --> C(Subscribe to cap/time/+/meta/source) + C --> D(Publish status: running) + D --> E{Wait for first time capability meta message or context done} + E -->|context done| Z[Publish status: stopped] + E -->|first capability meta received| F(Extract uuid from meta topic) + F --> G(Subscribe to cap/time/uuid/state/synced) + G --> H(Consume retained state and apply sync state) + H --> I(Subscribe to cap/time/uuid/event/synced) + I --> J(Subscribe to cap/time/uuid/event/unsynced) + J --> K{Wait for state, synced event, unsynced event, or context done} + K -->|synced event| L(Apply synced: retain svc/time/synced=true, emit transition event) + L --> K + K -->|unsynced event| M(Apply unsynced: retain svc/time/synced=false, emit transition event) + M --> K + K -->|state update| N(Apply state payload synced bool) + N --> K + K -->|context done| Z +``` + +The retained `{'cap', 'time', , 'state', 'synced'}` payload is consumed immediately after subscription to initialise sync state before any events arrive. + +With the new fibers alarm API, the service calls: +- `alarm.set_time_source(fibers.utils.time.realtime)` on first synced state +- `alarm.time_changed()` on subsequent synced transitions/events + +There is no direct equivalent of `clock_desynced` in the new API; unsynced updates still propagate over bus outputs. + +## Architecture + +- Everything runs in a single fiber — no child fibers needed. The fiber blocks waiting for the first capability, then transitions directly into the event loop for that capability. +- The service does not interact with the OS directly — all time source information arrives through the capability published by the time driver in HAL. +- Use `finally` to log shutdown reason and publish `stopped` status. 
+ From 19784d0d7b206b1369623ba31283a186b9d88944 Mon Sep 17 00:00:00 2001 From: Ryan Name Date: Tue, 17 Mar 2026 12:33:12 +0000 Subject: [PATCH 11/16] Add unsubscribe logic for capability discovery in time service --- src/services/time.lua | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/services/time.lua b/src/services/time.lua index 5a1a3757..7f80e0f2 100644 --- a/src/services/time.lua +++ b/src/services/time.lua @@ -189,6 +189,7 @@ function M.start(conn, opts) while true do local msg, err = perform(sub_meta:recv_op()) if not msg then + sub_meta:unsubscribe() log.warn("TIME: capability discovery subscription closed:", err) return end @@ -196,6 +197,7 @@ function M.start(conn, opts) local topic = msg.topic local cap_id = topic and topic[3] if cap_id ~= nil then + sub_meta:unsubscribe() log.trace("TIME: selected first time capability:", tostring(cap_id)) monitor_capability(conn, cap_id) return From a884e2691c12d74047b5286117f412a9d5852c02 Mon Sep 17 00:00:00 2001 From: Ryan Name Date: Tue, 17 Mar 2026 12:33:47 +0000 Subject: [PATCH 12/16] Implement TimeBackend interface and OpenWrt-specific NTP monitoring --- src/services/hal/backends/time/contract.lua | 26 +++ src/services/hal/backends/time/provider.lua | 49 +++++ .../backends/time/providers/openwrt/impl.lua | 182 ++++++++++++++++++ .../backends/time/providers/openwrt/init.lua | 48 +++++ src/services/hal/drivers/time.lua | 91 +++------ src/services/hal/types/time.lua | 57 ++++++ 6 files changed, 394 insertions(+), 59 deletions(-) create mode 100644 src/services/hal/backends/time/contract.lua create mode 100644 src/services/hal/backends/time/provider.lua create mode 100644 src/services/hal/backends/time/providers/openwrt/impl.lua create mode 100644 src/services/hal/backends/time/providers/openwrt/init.lua create mode 100644 src/services/hal/types/time.lua diff --git a/src/services/hal/backends/time/contract.lua b/src/services/hal/backends/time/contract.lua new file mode 100644 index 00000000..c1da2caf --- 
/dev/null +++ b/src/services/hal/backends/time/contract.lua @@ -0,0 +1,26 @@ +---@class TimeBackend +---@field start_ntp_monitor fun(self: TimeBackend): boolean, string +---@field ntp_event_op fun(self: TimeBackend): Op +---@field stop fun(self: TimeBackend): boolean, string + +local BACKEND_FUNCTIONS = { + "start_ntp_monitor", + "ntp_event_op", + "stop", +} + +---Check that a time backend provides all required functions. +---@param backend TimeBackend +---@return string error Empty string on success. +local function validate(backend) + for _, func in ipairs(BACKEND_FUNCTIONS) do + if type(backend[func]) ~= "function" then + return "Missing required function: " .. func + end + end + return "" +end + +return { + validate = validate +} diff --git a/src/services/hal/backends/time/provider.lua b/src/services/hal/backends/time/provider.lua new file mode 100644 index 00000000..5377c00c --- /dev/null +++ b/src/services/hal/backends/time/provider.lua @@ -0,0 +1,49 @@ +---Time backend provider factory. +---Selects the appropriate TimeBackend implementation based on the runtime platform. + +local contract = require "services.hal.backends.time.contract" + +local BACKENDS = { + "openwrt" +} + +--- Select and initialize the backend implementation +---@return table backend_impl +local function get_backend_impl() + local backend_impl = nil + for _, backend_name in ipairs(BACKENDS) do + local ok, backend_mod = pcall(require, "services.hal.backends.time.providers." .. backend_name .. ".init") + if ok and type(backend_mod) == "table" and backend_mod.is_supported and backend_mod.is_supported() then + backend_impl = backend_mod.backend + break + end + end + + if backend_impl == nil then + error("No supported time backend found") + end + + return backend_impl +end + +---Create a new TimeBackend instance. +--- +---Detects the platform and instantiates the appropriate backend implementation. +---Fails with an error if no supported backend is found. 
+--- +---@return TimeBackend +local function new() + local backend_impl = get_backend_impl() + local backend = backend_impl.new() + + local iface_err = contract.validate(backend) + if iface_err ~= "" then + error("Time backend does not implement required interface: " .. tostring(iface_err)) + end + + return backend +end + +return { + new = new, +} diff --git a/src/services/hal/backends/time/providers/openwrt/impl.lua b/src/services/hal/backends/time/providers/openwrt/impl.lua new file mode 100644 index 00000000..89f593e2 --- /dev/null +++ b/src/services/hal/backends/time/providers/openwrt/impl.lua @@ -0,0 +1,182 @@ +---OpenWrt-specific TimeBackend implementation using ubus hotplug.ntp events. + +-- Service modules +local log = require "services.log" +local time_types = require "services.hal.types.time" + +-- Fibers modules +local op = require "fibers.op" +local exec = require "fibers.io.exec" + +-- Other modules +local cjson = require "cjson.safe" + +---@class OpenWrtTimeBackend : TimeBackend +---@field ntp_monitor_stream Stream? Current ubus listen stream +---@field ntp_monitor_cmd Command? Current ubus listen command +local OpenWrtTimeBackend = {} +OpenWrtTimeBackend.__index = OpenWrtTimeBackend + +---- Private Utilities ---- + +---Recursively convert numeric-looking strings into numbers. +---@param value any +---@return any +local function coerce_numeric_strings(value) + if type(value) == 'string' then + local n = tonumber(value) + if n ~= nil then + return n + end + return value + end + + if type(value) == 'table' then + for k, v in pairs(value) do + value[k] = coerce_numeric_strings(v) + end + return value + end + + return value +end + +---Parse a single ubus listen hotplug.ntp line into a strongly typed NTPEvent. +--- +---Called as a wrap function on read_line_op(), receiving (line, read_err). 
+---Returns: +--- (NTPEvent, nil) -- success +--- (nil, nil) -- parse error; caller should retry with next line +--- (nil, err_string) -- fatal error (stream closed or read error) +--- +---@param line string? Line from ubus listen (nil on EOF) +---@param read_err any? Read error from the stream (nil on success) +---@return NTPEvent? +---@return string? +local function parse_ntp_event_line(line, read_err) + -- Fatal: stream read error + if read_err ~= nil then + return nil, "read error: " .. tostring(read_err) + end + + -- Fatal: EOF / stream closed + if line == nil or line == "" then + return nil, "stream closed" + end + + local decoded = cjson.decode(line) + if not decoded then + log.warn("OpenWrt Time Backend: failed to decode hotplug.ntp event:", line) + return nil, nil -- non-fatal, retry + end + + decoded = coerce_numeric_strings(decoded) + local ntp_data = decoded["hotplug.ntp"] + if type(ntp_data) ~= 'table' then + log.warn("OpenWrt Time Backend: no hotplug.ntp data in event:", line) + return nil, nil -- non-fatal, retry + end + + if type(ntp_data.stratum) ~= 'number' then + log.warn("OpenWrt Time Backend: stratum field missing or not a number:", line) + return nil, nil -- non-fatal, retry + end + + -- Extract fields with sensible defaults for optional fields + local action = ntp_data.action or "unknown" + local offset = ntp_data.offset or 0 + local freq_drift_ppm = ntp_data.freq_drift_ppm or 0 + + local ntp_event, event_err = time_types.new.NTPEvent( + ntp_data.stratum, + action, + offset, + freq_drift_ppm + ) + if not ntp_event then + log.warn("OpenWrt Time Backend: failed to construct NTPEvent:", event_err) + return nil, nil -- non-fatal, retry + end + + -- Preserve additional backend fields on the event for observability + for k, v in pairs(ntp_data) do + if ntp_event[k] == nil then + ntp_event[k] = v + end + end + + return ntp_event, nil +end + +---- Backend Lifecycle ---- + +---Start monitoring NTP synchronization events via ubus hotplug.ntp. 
+--- +---@return boolean ok +---@return string error Empty string on success. +function OpenWrtTimeBackend:start_ntp_monitor() + if self.ntp_monitor_cmd then + return false, "NTP monitor already running" + end + + -- Start ubus listen command bound to current scope + self.ntp_monitor_cmd = exec.command{ + 'ubus', 'listen', 'hotplug.ntp', + stdin = 'null', + stdout = 'pipe', + stderr = 'null', + } + local stream, stream_err = self.ntp_monitor_cmd:stdout_stream() + if not stream then + return false, "failed to start ubus listen: " .. tostring(stream_err) + end + + self.ntp_monitor_stream = stream + log.trace("OpenWrt Time Backend: NTP monitor started") + return true, "" +end + +---Get an operation that yields the next NTP event from the hotplug.ntp stream. +--- +---Returns (NTPEvent, nil) on success, (nil, nil) on a parse error (caller should +---retry), or (nil, err_string) on a fatal error (stream closed or read error). +--- +---@return Op +function OpenWrtTimeBackend:ntp_event_op() + return op.guard(function() + if not self.ntp_monitor_stream then + error("NTP monitor not started") + end + return self.ntp_monitor_stream:read_line_op():wrap(parse_ntp_event_line) + end) +end + +---Stop the NTP monitor and clean up resources. +--- +---@return boolean ok +---@return string error +function OpenWrtTimeBackend:stop() + if self.ntp_monitor_cmd then + self.ntp_monitor_cmd:kill() + end + self.ntp_monitor_stream = nil + self.ntp_monitor_cmd = nil + log.trace("OpenWrt Time Backend: NTP monitor stopped") + return true, "" +end + +---- Constructor ---- + +---Create a new OpenWrt time backend. 
+--- +---@return OpenWrtTimeBackend +local function new() + return setmetatable({ + ntp_monitor_stream = nil, + ntp_monitor_cmd = nil, + }, OpenWrtTimeBackend) +end + +return { + new = new, +} diff --git a/src/services/hal/backends/time/providers/openwrt/init.lua b/src/services/hal/backends/time/providers/openwrt/init.lua new file mode 100644 index 00000000..17b57a30 --- /dev/null +++ b/src/services/hal/backends/time/providers/openwrt/init.lua @@ -0,0 +1,48 @@ +local file = require "fibers.io.file" +local exec = require "fibers.io.exec" +local fibers = require "fibers" + +local backend = require "services.hal.backends.time.providers.openwrt.impl" + +local function is_linux() + local fh, open_err = file.open("/proc/version", "r") + if not fh or open_err then + return false + end + + local content, read_err = fh:read_all() + fh:close() + if not content or read_err then + return false + end + + return content:lower():find("linux") ~= nil +end + +--- Returns true if `ubus` is available on the system +---@return boolean ok +local function has_ubus() + local cmd = exec.command{ + "ubus", + stdin = "null", + stdout = "pipe", + stderr = "stdout" + } + local _, status, code = fibers.perform(cmd:combined_output_op()) + if status == "exited" and code == 0 then + return true + end + return false +end + +--- Returns true if this is a supported OpenWrt system +---@return boolean +local function is_supported() + local res = is_linux() and has_ubus() + return res +end + +return { + is_supported = is_supported, + backend = backend +} diff --git a/src/services/hal/drivers/time.lua b/src/services/hal/drivers/time.lua index cdf7c389..0f63dfaf 100644 --- a/src/services/hal/drivers/time.lua +++ b/src/services/hal/drivers/time.lua @@ -3,7 +3,7 @@ local hal_types = require "services.hal.types.core" local cap_types = require "services.hal.types.capabilities" -- Backend modules -local ubus_backend = require "services.hal.backends.ubus" +local time_backend_provider = require 
"services.hal.backends.time.provider" -- Service modules local log = require "services.log" @@ -16,13 +16,13 @@ local sleep = require "fibers.sleep" local exec = require "fibers.io.exec" -- Other modules -local cjson = require "cjson.safe" local uuid = require "uuid" ---@class TimeDriver ---@field id CapabilityId UUID for this time source capability ---@field cap_emit_ch Channel Capability emit channel (Emit messages) ---@field scope Scope Child scope owning the monitor fiber +---@field backend TimeBackend Time backend for platform-specific NTP monitoring ---@field control_ch Channel RPC control channel (no offerings currently, reserved) ---@field initialised boolean ---@field caps_applied boolean @@ -54,28 +54,6 @@ local function emit(emit_ch, id, mode, key, data) return true end ----Recursively convert numeric-looking strings into numbers. ----@param value any ----@return any -local function coerce_numeric_strings(value) - if type(value) == 'string' then - local n = tonumber(value) - if n ~= nil then - return n - end - return value - end - - if type(value) == 'table' then - for k, v in pairs(value) do - value[k] = coerce_numeric_strings(v) - end - return value - end - - return value -end - ---Convert NTP stratum to an estimated absolute accuracy in seconds. ---Returns nil when unsynced (stratum >= 16) or invalid input. ---@param stratum number @@ -115,8 +93,8 @@ end ---- Monitor Fiber ---- ----Listen to `ubus listen hotplug.ntp` output and emit state/events via the cap ----emit channel. Runs in self.scope. Exits on stream close, read error, or scope +---Listen to NTP synchronization events via the time backend and emit state/events via +---the cap emit channel. Runs in self.scope. Exits on stream close, read error, or scope ---cancellation. Transition events (synced/unsynced) are non-retained; the current ---sync state is always published as a retained state emit on every hotplug event. 
---@return nil @@ -125,45 +103,37 @@ function TimeDriver:_ntpd_monitor() log.trace("Time Driver: ntpd monitor exiting") end) - log.trace("Time Driver: ntpd monitor started") - - -- exec.command is bound to the current scope (self.scope) automatically. - -- When self.scope is cancelled the process is terminated, causing read_line to - -- return nil/error and the loop below exits cleanly. - local listen_cmd = ubus_backend.listen('hotplug.ntp') - local stdout, stream_err = listen_cmd:stdout_stream() - if not stdout then - error("Time Driver: failed to start ubus listen: " .. tostring(stream_err)) + -- Start the backend monitor here so the ubus command is bound to this fiber's + -- scope (the driver child scope). Cancelling the driver scope will then kill + -- the underlying process automatically. + local ok, start_err = self.backend:start_ntp_monitor() + if not ok then + log.error("Time Driver: failed to start NTP backend:", start_err) + return end + log.trace("Time Driver: ntpd monitor started") + while true do - local line, read_err = stdout:read_line() - if read_err then - log.error("Time Driver: ubus listen read error:", read_err) + local ntp_event, err = fibers.perform(self.backend:ntp_event_op()) + if err ~= nil then + -- Fatal: stream closed or read error + log.warn("Time Driver: NTP event stream closed:", err) break end - if line == nil then - log.warn("Time Driver: ubus listen stream closed unexpectedly") - break - end - - -- ubus listen output is one JSON object per line: - -- { "hotplug.ntp": { "stratum": } } - local decoded = cjson.decode(line) - if not decoded then - log.warn("Time Driver: failed to decode hotplug.ntp event:", line) - else - decoded = coerce_numeric_strings(decoded) - local ntp_data = decoded["hotplug.ntp"] - if type(ntp_data) == 'table' and type(ntp_data.stratum) == 'number' then - local stratum = ntp_data.stratum + if ntp_event then + local stratum = ntp_event.stratum + -- NTPEvent constructor guarantees stratum is a number, but guard 
anyway + if type(stratum) ~= 'number' then + log.warn("Time Driver: received NTP event with invalid stratum:", stratum) + else local now_synced = stratum ~= 16 local was_synced = self.synced local accuracy_seconds = accuracy_for_stratum(stratum) -- Always update retained state, even if sync status did not change, -- so that the latest stratum value is always visible to subscribers. - local ok, emit_err = emit( + local emit_ok, emit_err = emit( self.cap_emit_ch, self.id, 'state', 'synced', { synced = now_synced, @@ -171,7 +141,7 @@ function TimeDriver:_ntpd_monitor() accuracy_seconds = accuracy_seconds, } ) - if not ok then + if not emit_ok then log.warn("Time Driver: failed to emit state:", emit_err) end @@ -214,10 +184,9 @@ function TimeDriver:_ntpd_monitor() end self.synced = now_synced - else - log.warn("Time Driver: received unexpected hotplug.ntp payload:", line) end end + -- ntp_event == nil and err == nil: parse error, already logged by backend, retry end end @@ -320,8 +289,9 @@ end ---- Constructor ---- ----Create a new TimeDriver instance. Generates a UUID for the capability id and ----creates a child scope. Must be called from inside a fiber. +---Create a new TimeDriver instance. Generates a UUID for the capability id, +---creates a child scope, and instantiates the platform-specific time backend. +---Must be called from inside a fiber. ---@return TimeDriver? driver ---@return string error Empty string on success. local function new() @@ -330,10 +300,13 @@ local function new() return nil, "failed to create child scope: " .. 
tostring(sc_err) end + local backend = time_backend_provider.new() + return setmetatable({ id = uuid.new(), cap_emit_ch = nil, scope = scope, + backend = backend, control_ch = channel.new(CONTROL_Q_LEN), initialised = false, caps_applied = false, diff --git a/src/services/hal/types/time.lua b/src/services/hal/types/time.lua new file mode 100644 index 00000000..a83ae329 --- /dev/null +++ b/src/services/hal/types/time.lua @@ -0,0 +1,57 @@ +---@alias NTPAction string + +---@class NTPEvent +---@field stratum number NTP stratum level (0-15: synced, 16: unsynced) +---@field action NTPAction Event action (e.g., "step", "clock", "leap") +---@field offset number Clock offset in seconds +---@field freq_drift_ppm number Frequency drift in parts per million +---@field [string] any Additional fields from backend +local NTPEvent = {} +NTPEvent.__index = NTPEvent + +---@class TimeTypeConstructors +local new = {} + +---Create a new NTPEvent. +--- +---@param stratum number NTP stratum (0-15 synced, 16 unsynced) +---@param action string Event action +---@param offset number Clock offset in seconds +---@param freq_drift_ppm number Frequency drift in ppm +---@return NTPEvent? 
+---@return string error +function new.NTPEvent(stratum, action, offset, freq_drift_ppm) + if type(stratum) ~= 'number' then + return nil, "invalid stratum" + end + + if type(action) ~= 'string' or action == '' then + return nil, "invalid action" + end + + if type(offset) ~= 'number' then + return nil, "invalid offset" + end + + if type(freq_drift_ppm) ~= 'number' then + return nil, "invalid freq_drift_ppm" + end + + local event = setmetatable({ + stratum = stratum, + action = action, + offset = offset, + freq_drift_ppm = freq_drift_ppm, + }, NTPEvent) + return event, "" +end + +---@class TimeBackend +---@field start_ntp_monitor fun(self: TimeBackend): boolean, string +---@field ntp_event_op fun(self: TimeBackend): Op +---@field stop fun(self: TimeBackend): boolean, string + +return { + NTPEvent = NTPEvent, + new = new, +} From 3579b3058fe7cc1c20b031806d7657b4223c9d8a Mon Sep 17 00:00:00 2001 From: Ryan Name Date: Tue, 17 Mar 2026 12:33:56 +0000 Subject: [PATCH 13/16] Fix typo in log message for received config in TimeManager --- src/services/hal/managers/time.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/hal/managers/time.lua b/src/services/hal/managers/time.lua index b17cdac8..c96a77e0 100644 --- a/src/services/hal/managers/time.lua +++ b/src/services/hal/managers/time.lua @@ -245,7 +245,7 @@ function TimeManager.apply_config(config) -- luacheck: ignore config return false, "channels not initialized (start must be called first)" end - log.trace("Time manager: recieved config") + log.trace("Time manager: received config") local ok, spawn_err = TimeManager.scope:spawn(function() bring_up_driver() From b72e875ca2fc91c0d12acc5443b6fe4f9eb7f10a Mon Sep 17 00:00:00 2001 From: Ryan Name Date: Tue, 17 Mar 2026 12:58:29 +0000 Subject: [PATCH 14/16] Update documentation for ubus availability check in OpenWrt time provider --- src/services/hal/backends/time/providers/openwrt/init.lua | 6 +++--- 1 file changed, 3 insertions(+), 3 
deletions(-) diff --git a/src/services/hal/backends/time/providers/openwrt/init.lua b/src/services/hal/backends/time/providers/openwrt/init.lua index 17b57a30..ec563590 100644 --- a/src/services/hal/backends/time/providers/openwrt/init.lua +++ b/src/services/hal/backends/time/providers/openwrt/init.lua @@ -19,14 +19,14 @@ local function is_linux() return content:lower():find("linux") ~= nil end ---- Returns true if `ubus` is available on the system +--- Returns true if `ubus` is available and the daemon is reachable ---@return boolean ok local function has_ubus() local cmd = exec.command{ - "ubus", + "ubus", "list", stdin = "null", stdout = "pipe", - stderr = "stdout" + stderr = "null" } local _, status, code = fibers.perform(cmd:combined_output_op()) if status == "exited" and code == 0 then From 5d6c99ae1fa5b451bf7d344c7758bfacb9f52c8e Mon Sep 17 00:00:00 2001 From: Ryan Name Date: Tue, 17 Mar 2026 13:09:24 +0000 Subject: [PATCH 15/16] Refactor service flow diagram and update subscription logic for time capability handling --- docs/specs/time.md | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/docs/specs/time.md b/docs/specs/time.md index 9a983b75..13db5a50 100644 --- a/docs/specs/time.md +++ b/docs/specs/time.md @@ -46,24 +46,21 @@ flowchart TD St[Start] --> B(Publish status: starting) B --> C(Subscribe to cap/time/+/meta/source) C --> D(Publish status: running) - D --> E{Wait for first time capability meta message or context done} - E -->|context done| Z[Publish status: stopped] + D --> E{Wait for first time capability meta message or scope done} + E -->|scope done| Z[Publish status: stopped] E -->|first capability meta received| F(Extract uuid from meta topic) - F --> G(Subscribe to cap/time/uuid/state/synced) - G --> H(Consume retained state and apply sync state) - H --> I(Subscribe to cap/time/uuid/event/synced) - I --> J(Subscribe to cap/time/uuid/event/unsynced) - J --> K{Wait for state, synced event, unsynced event, or 
context done} + F --> G(Subscribe to cap/time/uuid/state/synced\ncap/time/uuid/event/synced\ncap/time/uuid/event/unsynced) + G --> H(Read single retained state message and apply sync state) + H --> H2(Unsubscribe from state/synced) + H2 --> K{Wait for synced event, unsynced event, or scope done} K -->|synced event| L(Apply synced: retain svc/time/synced=true, emit transition event) L --> K K -->|unsynced event| M(Apply unsynced: retain svc/time/synced=false, emit transition event) M --> K - K -->|state update| N(Apply state payload synced bool) - N --> K - K -->|context done| Z + K -->|scope done| Z ``` -The retained `{'cap', 'time', <uuid>, 'state', 'synced'}` payload is consumed immediately after subscription to initialise sync state before any events arrive. +All three subscriptions are created before any message is read, so no events are lost during initialisation. The retained `{'cap', 'time', <uuid>, 'state', 'synced'}` payload is consumed as a one-shot read to bootstrap sync state, then the state subscription is dropped. Ongoing sync state changes are tracked exclusively through the `event/synced` and `event/unsynced` transition topics. 
With the new fibers alarm API, the service calls: - `alarm.set_time_source(fibers.utils.time.realtime)` on first synced state From 49cef3927a53b22077cb03f6783f9fe09139a5d2 Mon Sep 17 00:00:00 2001 From: Ryan Name Date: Fri, 20 Mar 2026 17:52:12 +0000 Subject: [PATCH 16/16] Add initial implementation of Time Driver HAL with NTP management and event handling --- docs/specs/hal/time_driver.md | 108 ++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 docs/specs/hal/time_driver.md diff --git a/docs/specs/hal/time_driver.md b/docs/specs/hal/time_driver.md new file mode 100644 index 00000000..2c1410de --- /dev/null +++ b/docs/specs/hal/time_driver.md @@ -0,0 +1,108 @@ +# Time Driver (HAL) + +## Description + +The time driver is a HAL component that manages the system NTP daemon (`sysntpd`) and exposes a time source capability to the rest of the system. It is responsible for all direct OS interaction related to time synchronisation: restarting the daemon, monitoring kernel hotplug NTP events via ubus, and publishing the current sync state and accuracy metadata as a capability on the bus. + +The time driver acts as both a driver and a time manager — it owns the full lifecycle of `sysntpd` within the running session. + +## Dependencies + +- **ubus capability** (`{'cap', 'ubus', '1', ...}`): required to listen for `hotplug.ntp` kernel events. The driver waits for the ubus capability to become available before starting NTP. + +## Initialisation + +On startup (once the ubus capability is available): + +1. Restart `sysntpd` via `exec.command("/etc/init.d/sysntpd", "restart"):run()`. +2. Register a ubus listener for `hotplug.ntp` events. +3. Publish initial capability `meta` and `state` (initially `synced = false`). + +## Capability + +The driver publishes a single time source capability. The capability id is a UUID generated at startup. In the future, additional time source capabilities may exist alongside this one. 
+ +### Meta (retained) + +Topic: `{'cap', 'time', <uuid>, 'meta'}` + +```lua +{ + provider = 'hal', + source = 'ntp', -- time source type + version = 1, -- interface version + accuracy_seconds = <number|nil>, -- estimated absolute error in seconds +} +``` + +`accuracy_seconds` is a coarse estimate of absolute clock error and is derived from NTP stratum. Lower values are better. It is `nil` when unsynced (stratum >= 16) or before first sync-quality data is available. + +### State (retained) + +Topic: `{'cap', 'time', <uuid>, 'state'}` + +```lua +{ + synced = true | false, + stratum = <number|nil>, -- last reported stratum, nil before first event + accuracy_seconds = <number|nil>, +} +``` + +Published initially with `synced = false` and then refreshed on every `hotplug.ntp` event (not only on sync/unsync transitions), so the reported stratum stays current. Retained so new subscribers immediately get the current state. + +### Events (non-retained) + +#### synced + +Topic: `{'cap', 'time', <uuid>, 'event', 'synced'}` + +Fired when the NTP daemon transitions from unsynced to synced (stratum < 16). Payload: + +```lua +{ stratum = <number>, accuracy_seconds = <number> } +``` + +#### unsynced + +Topic: `{'cap', 'time', <uuid>, 'event', 'unsynced'}` + +Fired when the NTP daemon transitions from synced to unsynced (stratum == 16 or daemon restarts). Payload: + +```lua +{ stratum = 16, accuracy_seconds = nil } +``` + +These events are non-retained — consumers that need current state should read `{'cap', 'time', <uuid>, 'state'}` first, then subscribe to events for transitions. 
+ +## Service Flow + +```mermaid +flowchart TD + St[Start] --> A(Install alarm handler) + A --> B(Wait for ubus capability) + B -->|ubus cap available| C(Restart sysntpd) + C --> D{sysntpd restart ok?} + D -->|error| E[Log error and stop] + D -->|ok| F(Register ubus listener for hotplug.ntp) + F --> G{Listener registered ok?} + G -->|error| H[Log error and stop] + G -->|ok| I(Publish meta + initial state: synced=false) + I --> J{Wait for hotplug.ntp event or stream closed or context done} + J -->|hotplug.ntp event| K{stratum < 16?} + K -->|yes, was unsynced| L(Publish state synced=true + event/synced) + K -->|no, was synced| M(Publish state synced=false + event/unsynced) + L --> J + M --> J + K -->|no change| J + J -->|stream closed| N[Log warning, stop] + J -->|context done| O(Send stop_stream to ubus) +``` + +## Architecture + +- The driver runs a single main fiber that handles the full lifecycle. No child scope is needed. +- Sync and unsync events are only published on **transitions** — the driver tracks the previous sync state and only emits an event when it changes. +- The retained `state` topic is always updated on any hotplug event regardless of transition, to refresh the stratum value. +- A `finally` block logs the reason for shutdown and performs cleanup (stop_stream if still active). +- The ubus `stream_id` must be stopped cleanly on context cancellation to avoid leaking listener registrations in the ubus driver.