diff --git a/docs/specs/metrics.md b/docs/specs/metrics.md new file mode 100644 index 00000000..86bbf175 --- /dev/null +++ b/docs/specs/metrics.md @@ -0,0 +1,372 @@ +# Metrics Service + +## Description + +The metrics service listens to the bus for metrics, applies processing to these metrics and publishes the result to the cloud. + +## Config + +The metrics service takes a selection of configs, validates them and then applies all parts of the config that are valid. If globally required parts of the config are not valid then the whole service fails (publish_period, all pipelines fail). Any pipeline that uses a failed template also fails. A missing cloud URL means any HTTP protocol metrics also fail and the HTTP publisher cannot be started. + +`publish_period`: The period for which metrics are cached before publishing to the cloud, recorded in seconds + +`cloud_url`: The url used to publish http metrics to + +`templates`: a k-v set of metric pipelines that can be reused in the pipelines section, good for different metrics that use the same pipeline without repetition + +`pipelines`: a k-v set of metric pipelines (`name` to pipeline) +- `name`: The name of the metric e.g. 
`rx_bytes` + +A pipeline is defined as: + +- `template`: An optional argument for using a template (can be modified with further arguments) +- `protocol`: The protocol to be used for publication, either log or http +- `process`: A list of processing blocks to be used on the metric +- - `type`: The process block name +- - additional fields are the arguments for that block (flat, not nested under an `args` key) + +An example config + +```json +"metrics": { + "publish_period": 60, + "cloud_url": "cloud.com", + "templates": { + "network_stat": { + "protocol": "http", + "process": [ + { + "type": "DiffTrigger", + "diff_method": "percent", + "threshold": 5, + "initial_val": 0 + }, + { + "type": "DeltaValue" + } + ] + } + }, + "pipelines": { + "rx_bytes": { + "template": "network_stat" + }, + "sim": { + "protocol": "log", + "process": [ + { + "type": "DiffTrigger", + "diff_method": "any-change" + } + ] + } + } +} +``` + +## Metrics over the Bus + +Metrics received over the bus will need to be in a specific format to be processed. The metric payload must include a `value` field and may also contain a `namespace` field. The namespace is a list of string or integer tokens which denotes the full senml key of that metric. Without a namespace, the metric key will be derived from the bus topic path. + +The bus topic path determines which pipeline config is applied to the metric. The topic is an array of strings/integers that form the metric's endpoint identifier. The exact topic structure depends on the bus namespace conventions used (see runtime model documentation for topic organization). 
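As a sketch of the key derivation described in the paragraph above (the helper name is hypothetical, but the dot-joined endpoint key matches how the service builds its internal per-endpoint keys later in this change):

```lua
-- Sketch: derive the endpoint key for an incoming metric message.
-- An explicit payload.namespace overrides the bus topic; tokens (strings or
-- integers) are joined with dots, as the service does internally.
local function endpoint_key(msg)
  local tokens = msg.payload.namespace or msg.topic
  local parts = {}
  for i, tok in ipairs(tokens) do
    parts[i] = tostring(tok) -- tokens may be strings or integers
  end
  return table.concat(parts, '.')
end

print(endpoint_key({
  topic = { 'obs', 'v1', 'modem', 'metric', 'sim' },
  payload = { value = 'present', namespace = { 'modem', 1 } },
})) -- "modem.1"
```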
An example bus message for a simple metric: + +```lua +{ + topic = { 'obs', 'v1', 'network', 'metric', 'rx_bytes' }, + payload = { + value = 50 + } +} +``` + +Or a metric with a namespace override: + +```lua +{ + topic = { 'obs', 'v1', 'modem', 'metric', 'sim' }, + payload = { + value = 'present', + namespace = { 'modem', 1 } -- overrides the senml key structure + } +} +``` + +To make creating and validating metrics easier, the service will provide an SDK that includes a types module. With the types module the user can construct a metric type; construction validates the supplied arguments and returns either a metric type or an error. The service accepts metric types or plain tables, which it validates by attempting to cast them to a metric type. + +## Configuration & Validation + +The metrics service validates all configuration on receipt and handles invalid configs gracefully: + +### Validation Rules + +- `publish_period`: Must be a positive number (seconds). An invalid value causes the service to reject the entire config. +- `cloud_url`: Must be a string. A missing URL prevents the HTTP protocol from functioning. +- `templates`: Each template must have a valid `protocol` and an optional `process` array. Invalid templates are dropped. +- `pipelines`: Each pipeline must specify a valid `protocol`. Pipelines referencing dropped templates are also dropped. +- `protocol`: Must be one of `http`, `log`, or `bus`. An invalid protocol causes that pipeline to be dropped. +- `process`: Must be an array of process block tables, each with a `type` field; additional fields are the block's arguments. + +### Warning Handling + +When invalid configurations are detected: + +1. **Template Failures**: Invalid templates are removed from the config +2. **Dependent Metrics**: Any pipeline referencing a failed template is also removed +3. **Protocol Errors**: Pipelines with invalid protocols are dropped +4. 
**Graceful Degradation**: Service continues with all valid pipelines; it only fails if no valid pipelines remain or `publish_period` is invalid + +All dropped metrics and templates are logged with detailed warning messages including: +- Specific validation errors for each dropped item +- A summary of total dropped metrics and templates +- A list of affected pipeline names + +This allows partial config updates to succeed even if some pipelines are misconfigured. + +### Mainflux Config Normalization + +The mainflux configuration supports both legacy and current naming conventions: +- `mainflux_id` or `thing_id` (normalized to `thing_id`) +- `mainflux_key` or `thing_key` (normalized to `thing_key`) +- `mainflux_channels` or `channels` (normalized to `channels`) + +Channel metadata is automatically injected by scanning channel names: +- Channels containing "data" get `metadata.channel_type = "data"` +- Channels containing "control" get `metadata.channel_type = "events"` + +The metrics service merges the mainflux config with the metrics `cloud_url` to build the final HTTP publication config. + +## Publication of metrics + +Once the publish period has elapsed, the cache is purged and all k-v pairs are converted to a SenML representation with correct timestamps. There are three possible protocols for sending metrics: + +### Log + +This method iterates over the list of SenML items and logs them using the logging module at info level (`log.info(key, value)`). + +### HTTP + +This method sends the SenML list to the cloud. A cloud config is built by merging the `cloud_url` with the mainflux config. The mainflux config is retrieved from HAL via the `filesystem` capability using the mainflux.cfg file name; it is then decoded from JSON into a Lua table and normalised (`mainflux_id` -> `thing_id`, `mainflux_key` -> `thing_key`). 
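A standalone, illustrative sketch of the normalisation rules listed above (the service's own version lives in the config module as `conf.standardise_config`; this version only assumes the field names given in this spec):

```lua
-- Sketch: normalise a raw mainflux config table (illustrative only).
local function normalise(cfg)
  local out = {
    thing_id  = cfg.thing_id  or cfg.mainflux_id,
    thing_key = cfg.thing_key or cfg.mainflux_key,
    channels  = cfg.channels  or cfg.mainflux_channels or {},
  }
  -- Inject channel metadata by scanning channel names (plain substring match).
  for _, ch in ipairs(out.channels) do
    ch.metadata = ch.metadata or {}
    if ch.name and ch.name:find('data', 1, true) then
      ch.metadata.channel_type = 'data'
    elseif ch.name and ch.name:find('control', 1, true) then
      ch.metadata.channel_type = 'events' -- note: "control" maps to "events"
    end
  end
  return out
end
```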
The mainflux config has the structure: + +```json +{ + "thing_id": "f47ac10b-58cc-4372-a567-0e02b2c3d479", + "thing_key": "550e8400-e29b-41d4-a716-446655440000", + "channels": [ + { + "id": "6ba7b810-9dad-11d1-80b4-00c04fd430c8", + "name": "f47ac10b-58cc-4372-a567-0e02b2c3d479_data", + "metadata": { + "channel_type": "data" + } + }, + { + "id": "6ba7b811-9dad-11d1-80b4-00c04fd430c8", + "name": "f47ac10b-58cc-4372-a567-0e02b2c3d479_control", + "metadata": { + "channel_type": "control" + } + } + ], + "content": "{\"hawkbit\":{\"hawkbit_key\": \"examplehawkbitkey0123456789abcdef\",\"hawkbit_url\": \"https://example.com/hawkbit\"},\"serial\": \"EX1234567890ABCD\",\"networks\":{\"networks\":[{\"name\": \"example-network\",\"ssid\": \"Example WiFi-aB1xY\",\"password\": \"examplepassword123\"}]}}" +} +``` +The merged result is formatted into the structure: + +```lua +{ + url = "cloud.com", + data_id = "6ba7b810-9dad-11d1-80b4-00c04fd430c8", + thing_key = "550e8400-e29b-41d4-a716-446655440000" +} +``` + +The cloud URI is composed as `cloud.url` + `/http/channels/` + `cloud.data_id` + `/messages` + +The authentication header is composed as `Thing ` + `cloud.thing_key` + +The body is a JSON encoding of the SenML list + +HTTP publications can fail when there is no backhaul, boot-time metrics are particularly important, and publications can block the fiber they run on for long periods. For these reasons the HTTP sending runs in a separate fiber, with a queue between the main metrics service fiber and the HTTP fiber. + +**Queue Behavior**: +- Fixed capacity: **10 items** +- On queue full: new metrics are **dropped** and an error is logged +- No retry mechanism for queue-full conditions (older queued metrics are preserved) + +**Network Failure Retry**: +- When an HTTP send fails (network unavailable), the fiber applies exponential backoff +- Sleep duration doubles on each failure: 1s → 2s → 4s → 8s → ... 
→ max 60s +- Retries indefinitely until successful or service shutdown +- Queue-full and network-failure conditions are handled separately: only network failures trigger retries + +### Bus + +For testability, a bus protocol is provided which publishes each metric under the topic `svc/metrics/` followed by the endpoint tokens, for example `svc/metrics/modem/1/sim`. The payload is just the value and timestamp of the metric. + +## Processing + +Each metric goes through a processing pipeline which can be composed of a variable number of process blocks, or none at all. + +### Pipeline Architecture + +The metrics service uses a **shared pipeline with per-endpoint state** model: +- Each configured pipeline (from config) is instantiated **once** as a processing pipeline object +- Each unique metric endpoint (topic path) gets its own **state table** +- The same pipeline logic processes metrics from different endpoints using different state tables +- This allows pipeline code reuse while maintaining isolated state per endpoint + +**State Isolation**: A metric published to namespace `network.wan.rx_bytes` and another to namespace `network.lan.rx_bytes` use the same pipeline logic but maintain separate DiffTrigger/DeltaValue state, preventing cross-endpoint interference. + +All process blocks and the pipeline itself have an interface to reset state when a value has been published. The pipeline and blocks are logic and config only; they require a state table as input for each invocation. + +### ProcessPipeline + +The process pipeline contains all processing blocks and runs them sequentially until either: +1. A block returns a short-circuit flag (stops processing early) +2. All blocks complete successfully + +The pipeline returns: `(value, short_circuit_flag, error)` + +**Short-Circuit Behavior**: When a processing block returns `short_circuit = true`, the pipeline immediately stops processing and the metric is **not stored or published**. 
This is intentional control flow (e.g., DiffTrigger suppressing unchanged values), not an error condition. + +**Reset Behavior**: The pipeline only resets individual block state if a complete run was achieved (no short-circuit). This ensures state is only updated when metrics are actually published. + +**Metric Storage Structure**: Successfully processed metrics are stored in a nested structure: `metric_values[protocol][endpoint] = {value, time}`, allowing the same endpoint to publish via multiple protocols (e.g., both "http" and "log"). + +### DiffTrigger + +The difference trigger is a block which only outputs a value when the difference between the last output value and the current value reaches a threshold. There are three threshold types: + +- `percent`: triggers only when the current value differs from the last output value by at least `threshold` percent +- `absolute`: triggers only when the current value differs from the last output value by at least `threshold` (an absolute difference) +- `any-change`: triggers whenever the current value does not match the last output value + +**Arguments** (fields on the process block table, alongside `type`) + +- `diff_method`: This can be `percent`, `absolute` or `any-change` +- `initial_val`: An optional argument that sets the previous value before any value has been received. 
If this value is not set then the block will always output the first value received +- `threshold`: This argument is only for `percent` and `absolute` types, the threshold that must be hit to trigger an output + +**Behavior Details** + +- **First Value**: The first value received always publishes (triggers) regardless of threshold, unless `initial_val` is specified +- **Reset**: DiffTrigger state is **not affected by reset** - it maintains `last_val` across publish cycles +- **Short-Circuit**: When threshold is not met, returns `short_circuit = true` to prevent publishing + +### TimeTrigger + +The time trigger outputs a value only once a certain time period has elapsed since the last value output. Uses monotonic time to ensure immunity to system clock changes. + +**Arguments** (fields on the process block table, alongside `type`) +- `duration`: the time between value outputs (seconds) + +**Behavior Details** + +- **Timing**: Timer is reset after each successful output, ensuring periodic publishing every `duration` seconds +- **Reset**: TimeTrigger state is **not affected by reset** - timeout persists across publish cycles +- **Short-Circuit**: When timeout has not expired, returns `short_circuit = true` to suppress output + +### DeltaValue + +The delta value block always outputs the difference between the last published value and the current value. This is useful for converting cumulative counters into per-period deltas. 
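The delta/reset semantics can be sketched as follows. The `new_state`/`run`/`reset` trio here is hypothetical shorthand for the state-table interface described under Pipeline Architecture, and assumes the default `initial_val` of 0:

```lua
-- Sketch: DeltaValue semantics (illustrative, not the actual block).
local DeltaValue = {}

function DeltaValue.new_state(initial_val)
  return { last_val = initial_val or 0 }
end

function DeltaValue.run(value, state)
  state.pending = value            -- remember current value for a later reset
  return value - state.last_val    -- always outputs; never short-circuits
end

function DeltaValue.reset(state)
  state.last_val = state.pending   -- reset to the published value, not zero
end

-- A cumulative counter 100 -> 150 -> 180 yields deltas 100, 50, 30:
local s = DeltaValue.new_state()
local d1 = DeltaValue.run(100, s); DeltaValue.reset(s) -- publish succeeded
local d2 = DeltaValue.run(150, s); DeltaValue.reset(s) -- publish succeeded
local d3 = DeltaValue.run(180, s)                      -- not yet published
```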
**Arguments** + +- `initial_val`: An optional initial value to compare the first values against before the first publish; if this is not set the default is 0 + +**Behavior Details** + +- **Calculation**: Returns `current_value - last_published_value` +- **Reset**: On reset, sets `last_val = current_val` (resets to the current value, not zero) +- **Reset Trigger**: Only resets after a successful publish cycle (when the pipeline completes without short-circuiting) +- **Always Publishes**: Does not short-circuit; always returns a delta value + +## Timestamping + +Each metric must have an accurate timestamp. Because the system time at boot is incorrect until the time service has completed an NTP sync, the metrics service holds metrics with a relative timestamp, which is converted into a real timestamp once time has been synced and the publish cache is due to publish (via the publish period). + +### NTP Sync Dependency + +The metrics service **blocks all publishing** until NTP time synchronization is confirmed: + +- Service subscribes to `{'svc', 'time', 'synced'}` bus topic +- When `synced = true` is received: + - **First sync**: Calculates base time offset and schedules first publish + - **Subsequent syncs**: Continues using existing offset +- When `synced = false` is received: + - Publishing is suspended indefinitely + - Publish timer is disabled (`next_publish_time = infinity`) + - Metrics continue to be collected but not published + +### Time Conversion + +We can convert between relative and real time by stamping metrics with monotonic time and recording both the monotonic time and the real time at which the NTP sync was done. 
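As a sketch, the bookkeeping described above might look like the following (field names follow the base-time structure used by the service code in this change; `floor` is applied after the millisecond conversion, as the implementation does):

```lua
-- Sketch: base-time bookkeeping for monotonic -> real-time conversion.
-- base.mono is the monotonic clock value at the service's reference point.

-- On first NTP sync, compute what the wall clock read at base.mono.
local function on_first_sync(base, real_now, mono_now)
  base.real = real_now - (mono_now - base.mono)
end

-- At publish time, convert a metric's monotonic timestamp to real-time ms.
local function to_realtime_millis(base, metric_mono)
  return math.floor((base.real + (metric_mono - base.mono)) * 1000)
end
```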
**Base Time Calculation** (on first NTP sync): +- `base_real = current_realtime - (current_monotime - initial_monotime)` +- This calculates what the real time was at the initial monotonic reference point + +**Metric Timestamp Conversion** (at publish time): +- `metric_timestamp_ms = floor((base_real + (metric_monotime - base_monotime)) * 1000)` +- This converts the metric's monotonic timestamp to real time in milliseconds + +**Important**: If NTP sync is lost and regained, the original base time offset is preserved rather than recalculated. This ensures timestamp consistency across sync interruptions. + +## Testing + +Current automated coverage is in `tests/test_metrics.lua`. + +Run command: + +```bash +cd tests && luajit test_metrics.lua +``` + +Covered test groups: + +- **Processing blocks (`TestProcessing`)** + - `DiffTrigger` behavior for `absolute`, `percent`, and `any-change` + - `DeltaValue` behavior and reset semantics + - `ProcessPipeline` sequencing, reset, and short-circuit behavior + +- **Config module (`TestConfig`)** + - `validate_http_config` valid + invalid inputs + - `merge_config` recursive merge behavior + - `apply_config` builds valid pipeline maps + - `validate_config` hard rejection (`publish_period <= 0`) + - `validate_config` warning path for invalid protocol + - invalid template propagation to dependent pipeline warnings + +- **SenML encoder (`TestSenML`)** + - encode number/string/boolean/time records + - reject unsupported value types + - `encode_r` flattening behavior for nested keys + +- **HTTP publisher module (`TestHttpModule`)** + - `start_http_publisher` builds expected HTTP request shape + - verifies method/header/body composition via mocked `http.request` + +- **Service-level behavior (`TestMetricsService`)** + - bus protocol publish end-to-end + - namespace override routing/topic mapping + - unknown metrics are dropped + - `DiffTrigger` suppresses unchanged values + - `DeltaValue` emits per-period deltas + - config update 
replaces active pipelines + - per-endpoint state isolation across namespaces + - HTTP protocol pipeline enqueues expected Mainflux payload + +Notes: + +- Service tests use virtual time helpers from `tests/test_utils/virtual_time.lua` and `tests/test_utils/time_harness.lua`. +- Service tests subscribe to `{'svc','metrics','#'}` and intentionally filter out `status` lifecycle messages when asserting metric payload publications. + + + + + + + diff --git a/src/configs/config.json b/src/configs/config.json index 1d0ff0f9..bc4ba439 100644 --- a/src/configs/config.json +++ b/src/configs/config.json @@ -36,10 +36,10 @@ }, "metrics": { "cloud_url": "$CLOUD_URL", - "publish_period": 10, + "publish_period": 60, "templates": { "network_stat": { - "protocol": "log", + "protocol": "http", "process": [ { "type": "DiffTrigger", @@ -53,7 +53,7 @@ ] }, "any_change": { - "protocol": "log", + "protocol": "http", "process": [ { "type": "DiffTrigger", @@ -62,7 +62,7 @@ ] }, "percent_5": { - "protocol": "log", + "protocol": "http", "process": [ { "type": "DiffTrigger", @@ -72,7 +72,7 @@ ] }, "absolute_10": { - "protocol": "log", + "protocol": "http", "process": [ { "type": "DiffTrigger", @@ -150,7 +150,7 @@ "template": "any_change" }, "boot_time": { - "protocol": "log", + "protocol": "http", "process": [ { "type": "DiffTrigger", @@ -175,7 +175,7 @@ "template": "percent_5" }, "temperature": { - "protocol": "log", + "protocol": "http", "process": [ { "type": "DiffTrigger", @@ -185,7 +185,7 @@ ] }, "alloc": { - "protocol": "log", + "protocol": "http", "process": [ { "type": "TimeTrigger", @@ -194,7 +194,7 @@ ] }, "internal": { - "protocol": "log", + "protocol": "http", "process": [ { "type": "DiffTrigger", @@ -303,11 +303,11 @@ "template": "any_change" }, "session_start": { - "protocol": "log", + "protocol": "http", "process": [] }, "session_end": { - "protocol": "log", + "protocol": "http", "process": [] }, "hostname": { @@ -317,7 +317,7 @@ "template": "percent_5" }, "power": { - 
"protocol": "log", + "protocol": "http", "process": [ { "type": "DiffTrigger", @@ -331,7 +331,7 @@ "template": "any_change" }, "noise": { - "protocol": "log", + "protocol": "http", "process": [ { "type": "DiffTrigger", diff --git a/src/services/metrics.lua b/src/services/metrics.lua new file mode 100644 index 00000000..4ac59f8f --- /dev/null +++ b/src/services/metrics.lua @@ -0,0 +1,601 @@ +-- services/metrics.lua +-- +-- Metrics service: +-- - subscribes to {'obs', 'v1', '+', 'metric', '+'} for all observable metrics +-- - applies per-pipeline processing (DiffTrigger, DeltaValue, etc.) +-- - maintains per-endpoint processing state (shared pipeline logic, isolated state) +-- - periodically publishes accumulated metrics via http, log, or bus protocol +-- - fetches Mainflux cloud credentials from the HAL filesystem capability +-- +-- Topics consumed: +-- {'obs', 'v1', '+', 'metric', '+'} - incoming metric values +-- {'cfg', 'metrics'} - metrics config (retained) +-- {'svc', 'time', 'synced'} - NTP sync status (retained) +-- {'cap', 'fs', 'configs', 'state'} - HAL filesystem capability readiness +-- +-- Topics produced: +-- {'svc', 'metrics', 'status'} - service lifecycle status (retained) +-- {'svc', 'metrics', ...} - per-metric bus publications (bus protocol) + +local fibers = require 'fibers' +local op = require 'fibers.op' +local sleep = require 'fibers.sleep' +local runtime = require 'fibers.runtime' +local time = require 'fibers.utils.time' +local perform = fibers.perform + +local json = require 'cjson.safe' +local log = require 'services.log' +local external_types = require 'services.hal.types.external' + +local senml = require 'services.metrics.senml' +local http_m = require 'services.metrics.http' +local conf = require 'services.metrics.config' +local types = require 'services.metrics.types' + + +local unpack = unpack or rawget(table, 'unpack') + +local NAME = 'metrics' + +------------------------------------------------------------------------------- +-- 
Topic helpers +------------------------------------------------------------------------------- + +---@param name string +---@return table +local function t_svc_status(name) return { 'svc', name, 'status' } end + +---@param service string +---@param name string +---@return table +local function t_obs_metric(service, name) return { 'obs', 'v1', service, 'metric', name } end + +---@param name string +---@return table +local function t_cfg(name) return { 'cfg', name } end + +---@return table +local function t_time_ntp_synced() return { 'svc', 'time', 'synced' } end + +---@return table +local function t_cap_fs_state() return { 'cap', 'fs', 'configs', 'state' } end + +---@param method string +---@return table +local function t_cap_fs_rpc(method) return { 'cap', 'fs', 'configs', 'rpc', method } end + +---@param tokens table +---@return table +local function t_svc_metrics_bus(tokens) return { 'svc', 'metrics', unpack(tokens) } end + +---@return number +local function now() return runtime.now() end + +---@return number +local function now_real() return time.realtime() end + +---@param conn Connection +---@param name string +---@param state string +---@param extra table? +local function publish_status(conn, name, state, extra) + local payload = { state = state, ts = now() } + if type(extra) == 'table' then + for k, v in pairs(extra) do payload[k] = v end + end + conn:retain(t_svc_status(name), payload) +end + +------------------------------------------------------------------------------- +-- Metric helpers +------------------------------------------------------------------------------- + +--- Validate a topic array (no gaps, no nils, at least one element). 
+---@param topic any +---@return boolean +local function validate_topic(topic) + if type(topic) ~= 'table' then return false end + local count = 0 + for k in pairs(topic) do + if type(k) ~= 'number' or k < 1 or k ~= math.floor(k) then return false end + count = count + 1 + end + if count == 0 then return false end + for i = 1, count do + if topic[i] == nil then return false end + end + return true +end + +--- Shift per-endpoint metric timestamps from monotonic to real-time milliseconds. +--- base_time = { real = wall_clock_at_mono_base, mono = mono_at_base } +---@param base_time BaseTime +---@param metrics table +---@return table +local function set_timestamps_realtime_millis(base_time, metrics) + for _, metric in pairs(metrics) do + metric.time = math.floor((base_time.real + (metric.time - base_time.mono)) * 1000) + end + return metrics +end + +------------------------------------------------------------------------------- +-- Config warnings (pure: no service state) +------------------------------------------------------------------------------- + +--- Log config warnings and prune invalid entries from the raw config in-place. 
+---@param warns table +---@param config table +local function process_config_warnings(warns, config) + if #warns == 0 then return end + + local warn_msgs = {} + local dropped_metrics = {} + local dropped_templates = {} + + for _, warn in ipairs(warns) do + table.insert(warn_msgs, warn.msg) + if warn.endpoint then + if warn.type == 'metric' then + config.pipelines[warn.endpoint] = nil + dropped_metrics[warn.endpoint] = true + elseif warn.type == 'template' then + if config.templates then + config.templates[warn.endpoint] = nil + end + dropped_templates[warn.endpoint] = true + end + end + end + + local summary_parts = {} + + local dm_list = {} + for ep in pairs(dropped_metrics) do dm_list[#dm_list + 1] = ep end + if #dm_list > 0 then + table.insert(summary_parts, string.format( + 'Dropped %d metric(s): %s', #dm_list, table.concat(dm_list, ', '))) + end + + local dt_list = {} + for ep in pairs(dropped_templates) do dt_list[#dt_list + 1] = ep end + if #dt_list > 0 then + table.insert(summary_parts, string.format( + 'Dropped %d template(s): %s', #dt_list, table.concat(dt_list, ', '))) + end + + log.warn(string.format( + 'metrics: config warnings (invalid entries will be dropped):\n\t%s\n\nSummary: %s', + table.concat(warn_msgs, '\n\t'), + table.concat(summary_parts, '; '))) +end + +------------------------------------------------------------------------------- +-- Service state +------------------------------------------------------------------------------- + +---@type ServiceState +local State = { + conn = nil, + name = nil, + http_send_ch = nil, + pipelines_map = {}, + metric_states = {}, + endpoint_to_pipe = {}, + metric_values = {}, + publish_period = nil, + cloud_url = nil, + mainflux_config = nil, + cloud_config = nil, + base_time = nil, +} + +------------------------------------------------------------------------------- +-- Cloud config +------------------------------------------------------------------------------- + +local function rebuild_cloud_config() + 
local mf = State.mainflux_config + if not mf or not State.cloud_url or not mf.thing_key or not mf.channels then + State.cloud_config = nil + return + end + local cfg, cfg_err = types.new.CloudConfig(State.cloud_url, mf.thing_key, mf.channels) + if not cfg then + log.warn('metrics: failed to build cloud config: ' .. tostring(cfg_err)) + State.cloud_config = nil + return + end + State.cloud_config = cfg + log.info('metrics: cloud config ready') +end + +local function fetch_mainflux_config() + local read_opts, opts_err = external_types.new.FilesystemReadOpts('mainflux.cfg') + if not read_opts then + log.warn('metrics: failed to build mainflux.cfg read opts:', tostring(opts_err)) + return + end + + local reply, err = State.conn:call(t_cap_fs_rpc('read'), read_opts) + if not reply then + log.warn('metrics: failed to read mainflux.cfg:', tostring(err)) + return + end + if reply.ok ~= true then + log.warn('metrics: mainflux.cfg read failed:', tostring(reply.reason)) + return + end + + local raw, decode_err = json.decode(reply.reason) + if not raw then + log.warn('metrics: failed to decode mainflux.cfg:', tostring(decode_err)) + return + end + + State.mainflux_config = conf.standardise_config(raw) + log.info('metrics: mainflux config loaded successfully') + rebuild_cloud_config() +end + +------------------------------------------------------------------------------- +-- Protocol publish handlers +------------------------------------------------------------------------------- + +---@param data table +local function bus_publish(data) + for endpoint_str, metric in pairs(data) do + local tokens = {} + for part in endpoint_str:gmatch('[^.]+') do + tokens[#tokens + 1] = part + end + State.conn:publish(t_svc_metrics_bus(tokens), { value = metric.value, time = metric.time }) + end +end + +---@param data table +local function log_publish(data) + for endpoint_str, metric in pairs(data) do + log.info(string.format('metrics: %s = %s (t=%s)', + endpoint_str, tostring(metric.value), 
tostring(metric.time))) + end +end + +---@param data table +local function http_publish(data) + local senml_list, encode_err = senml.encode_r('', data) + if encode_err then + log.error('metrics: SenML encode failed: ' .. tostring(encode_err)) + return + end + if #senml_list == 0 then return end + + local body = json.encode(senml_list) + + local valid, config_err = conf.validate_http_config(State.cloud_config) + if not valid then + log.error('metrics: HTTP publish skipped, invalid cloud config: ' .. tostring(config_err)) + return + end + + local channel_id + for _, ch in ipairs(State.cloud_config.channels) do + if ch.metadata and ch.metadata.channel_type == 'data' then + channel_id = ch.id + break + end + end + if channel_id == nil then + log.error('metrics: HTTP publish failed, no data channel id found') + return + end + + local uri = string.format('%s/http/channels/%s/messages', + State.cloud_config.url, channel_id) + local auth = 'Thing ' .. State.cloud_config.thing_key + + -- Non-blocking enqueue: drop and log if the channel is at capacity. + local full = perform(State.http_send_ch:put_op({ uri = uri, auth = auth, body = body }) + :or_else(function() return true end)) + + if full then + log.error('metrics: HTTP send queue full, dropping publish payload') + end +end + +local publish_fns = { bus = bus_publish, log = log_publish, http = http_publish } + +---@param values table> +local function publish_all(values) + for protocol, pv in pairs(values) do + -- Reset per-endpoint pipeline states for published endpoints. 
+ for endpoint_str, _ in pairs(pv) do + local metric_name = State.endpoint_to_pipe[endpoint_str] + if metric_name then + local pipe_cfg = State.pipelines_map[metric_name] + if pipe_cfg and State.metric_states[endpoint_str] then + pipe_cfg.pipeline:reset(State.metric_states[endpoint_str]) + end + end + end + + pv = set_timestamps_realtime_millis(State.base_time, pv) + + local fn = publish_fns[protocol] + if fn == nil then + log.error('metrics: no publish function for protocol: ' .. tostring(protocol)) + else + fn(pv) + end + end +end + +------------------------------------------------------------------------------- +-- Metric handling +------------------------------------------------------------------------------- + +---@param msg Message? +local function handle_metric(msg) + if not msg then return end + + -- Topic layout: {'obs', 'v1', , 'metric', } + local metric_name = msg.topic and msg.topic[5] + if not metric_name then return end + + local pipe_cfg = State.pipelines_map[metric_name] + if not pipe_cfg then return end -- no matching pipeline, drop silently + + local payload = msg.payload + if type(payload) ~= 'table' then return end + + local value = payload.value + if value == nil then return end + + -- Optional namespace overrides the topic used as the SenML name and state key. + local topic = payload.namespace or msg.topic + if not validate_topic(topic) then + log.warn('metrics: received metric with invalid topic array, skipping') + return + end + + local endpoint_str = table.concat(topic, '.') + + -- Get-or-create per-endpoint processing state. 
+ if not State.metric_states[endpoint_str] then + State.metric_states[endpoint_str] = pipe_cfg.pipeline:new_state() + State.endpoint_to_pipe[endpoint_str] = metric_name + end + + local ret, short, err = pipe_cfg.pipeline:run(value, State.metric_states[endpoint_str]) + if err then + log.error(string.format( + 'metrics: pipeline error for [%s]: %s', endpoint_str, tostring(err))) + return + end + + if not short then + State.metric_values[pipe_cfg.protocol] = State.metric_values[pipe_cfg.protocol] or {} + State.metric_values[pipe_cfg.protocol][endpoint_str] = types.new.MetricSample(ret, now()) + end +end + +------------------------------------------------------------------------------- +-- Config handling +------------------------------------------------------------------------------- + +---@param payload table? +---@return number next_publish_time +local function handle_config(payload) + if not payload then return math.huge end + + local valid, warns, err = conf.validate_config(payload) + if not valid then + log.error('metrics: invalid config received: ' .. tostring(err)) + return math.huge + end + + process_config_warnings(warns, payload) + + local new_pipelines_map, new_publish_period = conf.apply_config(payload) + + if next(new_pipelines_map) == nil then + log.warn('metrics: no valid pipelines after config apply; service will be idle') + end + + -- Cache cloud_url from the metrics config and rebuild cloud_config. + State.cloud_url = payload.cloud_url + rebuild_cloud_config() + + -- Replace all pipeline state (logic may have changed). 
+ State.pipelines_map = new_pipelines_map + State.metric_states = {} + State.endpoint_to_pipe = {} + State.publish_period = new_publish_period + + if State.base_time.synced and State.publish_period then + return now() + State.publish_period + end + return math.huge +end + +---@param synced boolean +---@return boolean first_sync +local function handle_time_sync(synced) + if synced == true then + if not State.base_time.synced then + State.base_time.synced = true + local real = now_real() + local mono = now() + -- Compute the wall-clock time that corresponds to base_time.mono. + State.base_time.real = real - (mono - State.base_time.mono) + return true -- first sync + end + else + State.base_time.synced = false + end + return false +end + +------------------------------------------------------------------------------- +-- Main loop +------------------------------------------------------------------------------- + +---@return boolean ok +local function wait_for_fs_capability() + local sub = State.conn:subscribe( + t_cap_fs_state(), + { queue_len = 10, full = 'drop_oldest' }) + + while true do + local msg, err = perform(sub:recv_op()) + if not msg then + log.warn('metrics: filesystem capability subscription closed:', tostring(err)) + sub:unsubscribe() + return false + end + if msg.payload == 'added' then + sub:unsubscribe() + return true + end + end +end + +local function main() + -- Subscribe to all observable metrics. + local obs_sub = State.conn:subscribe( + t_obs_metric('+', '+'), + { queue_len = 100, full = 'drop_oldest' }) + + -- Subscribe to the metrics config (retained; first message is current config). + local cfg_sub = State.conn:subscribe( + t_cfg(NAME), + { queue_len = 10, full = 'drop_oldest' }) + + -- Subscribe to NTP sync status. 
+ local time_sub = State.conn:subscribe( + t_time_ntp_synced(), + { queue_len = 5, full = 'drop_oldest' }) + + local next_publish_time = math.huge + + while true do + local which, a, b = perform(op.named_choice({ + config = cfg_sub:recv_op(), + metric = obs_sub:recv_op(), + timesync = time_sub:recv_op(), + -- timesync = State.base_time.synced and op.never() or op.always({ payload = true }), + tick = sleep.sleep_until_op(next_publish_time), + })) + + + if which == 'config' then + local msg, err = a, b + if not msg then + log.warn('metrics: config subscription closed:', tostring(err)) + break + end + log.info('metrics: config received, applying') + next_publish_time = handle_config(msg.payload) + -- Re-read mainflux.cfg in case cloud_url or credentials changed. + fetch_mainflux_config() + local next_publish_str = next_publish_time == math.huge and "never" or string.format('%.1fs', (next_publish_time - now())) + log.info('metrics: config applied, next publish in ' .. next_publish_str) + elseif which == 'metric' then + local msg = a + if msg then + handle_metric(msg) + end + elseif which == 'timesync' then + local msg = a + if msg then + local first_sync = handle_time_sync(msg.payload) + if first_sync and State.publish_period then + next_publish_time = now() + State.publish_period + log.info(string.format( + 'metrics: NTP synced, first publish scheduled in %.1fs', State.publish_period)) + elseif first_sync then + log.info('metrics: NTP synced, waiting for config before scheduling publish') + elseif not State.base_time.synced then + next_publish_time = math.huge + log.warn('metrics: NTP sync lost, publishing suspended') + end + end + elseif which == 'tick' then + local values = State.metric_values + State.metric_values = {} + + if State.base_time.synced and State.publish_period then + next_publish_time = now() + State.publish_period + else + next_publish_time = math.huge + end + + local total = 0 + for _, pv in pairs(values) do + for _ in pairs(pv) do total = total + 1 
end + end + if total > 0 then + log.info(string.format('metrics: publishing %d metric(s)', total)) + end + publish_all(values) + end + end + + obs_sub:unsubscribe() + cfg_sub:unsubscribe() + time_sub:unsubscribe() + log.info('metrics: service stopping') +end + +------------------------------------------------------------------------------- +-- Module entry point +------------------------------------------------------------------------------- + +local M = {} + +---@param conn Connection +---@param opts table? +function M.start(conn, opts) + opts = opts or {} + local name = opts.name or NAME + + publish_status(conn, name, 'starting') + + State.conn = conn + State.name = name + State.http_send_ch = http_m.start_http_publisher() + State.pipelines_map = {} + State.metric_states = {} + State.endpoint_to_pipe = {} + State.metric_values = {} + State.publish_period = nil + State.cloud_url = nil + State.mainflux_config = nil + State.cloud_config = nil + State.base_time = types.new.BaseTime(now_real(), now()) + + fibers.current_scope():finally(function(_, st, primary) + local reason = primary or st + log.info(('metrics: scope closed (status: %s, reason: %s)'):format(tostring(st), tostring(primary))) + publish_status(conn, name, 'stopped', reason and { reason = tostring(reason) } or nil) + end) + + log.info('metrics: waiting for filesystem capability') + local fs_ok = wait_for_fs_capability() + if not fs_ok then + publish_status(conn, name, 'error', { reason = 'filesystem capability unavailable' }) + log.error('metrics: filesystem capability unavailable, service cannot start') + return + end + + log.info('metrics: fetching mainflux config') + fetch_mainflux_config() + + publish_status(conn, name, 'running') + log.info('metrics: service is live') + + main() +end + +return M diff --git a/src/services/metrics/config.lua b/src/services/metrics/config.lua new file mode 100644 index 00000000..6363820b --- /dev/null +++ b/src/services/metrics/config.lua @@ -0,0 +1,450 @@ +-- 
services/metrics/config.lua +-- +-- Configuration validation and application for the metrics service. +-- +-- validate_config(config) +-- Returns (ok, warnings, error). Validates structure, protocols, process +-- blocks, templates and pipelines. Invalid templates and any pipelines that +-- reference them are collected as warnings rather than hard failures so that +-- the service can continue with the valid subset. +-- +-- apply_config(config) +-- Returns (pipelines_map, publish_period). +-- pipelines_map[metric_name] = { pipeline, protocol } +-- The pipeline object contains only logic; per-endpoint state is created +-- externally with pipeline:new_state(). + +local log = require 'services.log' +local processing = require 'services.metrics.processing' +local _types = require 'services.metrics.types' -- luacheck: ignore (imported for annotations) + +local VALID_PROTOCOLS = { http = true, log = true, bus = true } +local VALID_PROCESS_TYPES = { DiffTrigger = true, TimeTrigger = true, DeltaValue = true } + +local VALID_TEMPLATE_FIELDS = { + protocol = true, + process = true, +} +local VALID_METRIC_FIELDS = { + protocol = true, + process = true, + template = true, +} + +------------------------------------------------------------------------------- +-- Helpers +------------------------------------------------------------------------------- + +---@param t any +---@return boolean +local function is_array(t) + if type(t) ~= 'table' then return false end + local count = 0 + for k in pairs(t) do + if type(k) ~= 'number' or math.floor(k) ~= k or k < 1 then + return false + end + count = count + 1 + end + for i = 1, count do + if t[i] == nil then return false end + end + return true +end + +---@param base table? +---@param override table? 
+---@return table +local function merge_config(base, override) + if not base then return override or {} end + if not override then return base end + local result = {} + for k, v in pairs(base) do result[k] = v end + for k, v in pairs(override) do + if type(v) == 'table' and type(result[k]) == 'table' then + result[k] = merge_config(result[k], v) + else + result[k] = v + end + end + return result +end + +--- Normalise a raw mainflux config table to a consistent field set. +--- Accepts both legacy (`mainflux_*`) and current (`thing_*`) naming. +---@param config table +---@return table +local function standardise_config(config) + local out = {} + out.thing_key = config.mainflux_key or config.thing_key + out.channels = config.mainflux_channels or config.channels + for _, channel in ipairs(out.channels or {}) do + channel.metadata = channel.metadata or {} + if type(channel.metadata) == 'userdata' then channel.metadata = {} end + if type(channel.name) == 'string' then + if string.find(channel.name, 'data') then + channel.metadata.channel_type = 'data' + elseif string.find(channel.name, 'control') then + channel.metadata.channel_type = 'events' + end + end + end + out.content = config.content + return out +end + +--- Basic sanity-check for the cloud (Mainflux) config used for HTTP publish. +---@param config CloudConfig? +---@return boolean ok +---@return string? 
error +local function validate_http_config(config) + if not config then + return false, 'No cloud config set' + end + if not config.url then + return false, 'No cloud url set' + end + if type(config.url) ~= 'string' then + return false, 'Cloud url is not a string' + end + if not config.thing_key or not config.channels then + return false, 'Cloud thing_key / channels missing' + end + return true, nil +end + +------------------------------------------------------------------------------- +-- Internal validation helpers +------------------------------------------------------------------------------- + +---@param process_block any +---@param endpoint string +---@param index number +---@return string? error +local function validate_process_block(process_block, endpoint, index) + if type(process_block) ~= 'table' then + return string.format('Metric config [%s] process block %d is not a table', + tostring(endpoint), index) + end + if process_block.type == nil then + return string.format('Metric config [%s] process block %d has no type field', + tostring(endpoint), index) + end + if not VALID_PROCESS_TYPES[process_block.type] then + return string.format( + "Metric config [%s] process block %d has invalid type '%s' (valid: %s)", + tostring(endpoint), index, tostring(process_block.type), + table.concat({ 'DiffTrigger', 'TimeTrigger', 'DeltaValue' }, ', ')) + end + return nil +end + +---@param name any +---@param template_config any +---@return table warnings +local function validate_template(name, template_config) + local warnings = {} + + if type(name) ~= 'string' then + table.insert(warnings, { + msg = 'Template name is not a string', + endpoint = name, + type = 'template', + }) + end + if type(template_config) ~= 'table' then + table.insert(warnings, { + msg = string.format('Template config [%s] is not a table', tostring(name)), + endpoint = name, + type = 'template', + }) + return warnings + end + + for field in pairs(template_config) do + if not 
VALID_TEMPLATE_FIELDS[field] then + table.insert(warnings, { + msg = string.format("Template config [%s] has invalid field '%s'", + tostring(name), tostring(field)), + endpoint = name, + type = 'template', + }) + end + end + + if template_config.protocol and not VALID_PROTOCOLS[template_config.protocol] then + table.insert(warnings, { + msg = string.format( + "Template config [%s] has invalid protocol '%s' (valid: http, log, bus)", + tostring(name), tostring(template_config.protocol)), + endpoint = name, + type = 'template', + }) + end + + if template_config.process ~= nil then + if not is_array(template_config.process) then + table.insert(warnings, { + msg = string.format('Template config [%s] process must be an array', tostring(name)), + endpoint = name, + type = 'template', + }) + else + for i, blk in ipairs(template_config.process) do + local err = validate_process_block(blk, name, i) + if err then + table.insert(warnings, { msg = err, endpoint = name, type = 'template' }) + end + end + end + end + + return warnings +end + +---@param endpoint any +---@param metric_config any +---@return table warnings +local function validate_metric(endpoint, metric_config) + local warnings = {} + + if type(endpoint) ~= 'string' then + table.insert(warnings, { + msg = 'Metric endpoint is not a string', + endpoint = endpoint, + type = 'metric', + }) + end + if type(metric_config) ~= 'table' then + table.insert(warnings, { + msg = string.format('Metric config [%s] is not a table', tostring(endpoint)), + endpoint = endpoint, + type = 'metric', + }) + return warnings + end + + for field in pairs(metric_config) do + if not VALID_METRIC_FIELDS[field] then + table.insert(warnings, { + msg = string.format("Metric config [%s] has invalid field '%s'", + tostring(endpoint), tostring(field)), + endpoint = endpoint, + type = 'metric', + }) + end + end + + if metric_config.protocol == nil then + table.insert(warnings, { + msg = string.format('Metric config [%s] has no defined protocol', 
tostring(endpoint)), + endpoint = endpoint, + type = 'metric', + }) + elseif not VALID_PROTOCOLS[metric_config.protocol] then + table.insert(warnings, { + msg = string.format( + "Metric config [%s] has invalid protocol '%s' (valid: http, log, bus)", + tostring(endpoint), tostring(metric_config.protocol)), + endpoint = endpoint, + type = 'metric', + }) + end + + if metric_config.process ~= nil then + if not is_array(metric_config.process) then + table.insert(warnings, { + msg = string.format('Metric config [%s] process must be an array', tostring(endpoint)), + endpoint = endpoint, + type = 'metric', + }) + else + for i, blk in ipairs(metric_config.process) do + local err = validate_process_block(blk, endpoint, i) + if err then + table.insert(warnings, { msg = err, endpoint = endpoint, type = 'metric' }) + end + end + end + end + + return warnings +end + +------------------------------------------------------------------------------- +-- Pipeline builder +------------------------------------------------------------------------------- + +--- Build a ProcessPipeline from a process_config array. +---@param endpoint string +---@param process_config table +---@return ProcessPipeline? +---@return string? error +local function build_metric_pipeline(endpoint, process_config) + local pipeline, pipeline_err = processing.new_process_pipeline() + if not pipeline then + return nil, string.format('Metric config [%s] failed to create pipeline: %s', endpoint, tostring(pipeline_err)) + end + + if process_config == nil then + -- An empty pipeline (pass-through) is valid. 
+ return pipeline, nil + end + + for _, blk_cfg in ipairs(process_config) do + local ptype = blk_cfg.type + if ptype == nil then + return nil, string.format('Metric config [%s] has process block with no type', endpoint) + end + + local proc_class = processing[ptype] + if proc_class == nil then + return nil, string.format('Metric config [%s] has invalid process block type [%s]', + endpoint, tostring(ptype)) + end + + local proc, proc_err = proc_class.new(blk_cfg) + if not proc or proc_err then + return nil, string.format( + 'Metric config [%s] failed to create process block [%s]: %s', + endpoint, tostring(ptype), tostring(proc_err)) + end + + local add_err = pipeline:add(proc) + if add_err then + return nil, add_err + end + end + + return pipeline, nil +end + +------------------------------------------------------------------------------- +-- Public: validate_config +------------------------------------------------------------------------------- + +--- Validate a raw metrics config table. +---@param config table +---@return boolean ok +---@return table warnings +---@return string? error +local function validate_config(config) + local warnings = {} + + if type(config) ~= 'table' then + return false, warnings, 'Invalid configuration message' + end + + if type(config.publish_period) ~= 'number' then + return false, warnings, + 'Publish period must be of number type, found ' .. 
type(config.publish_period) + end + if config.publish_period <= 0 then + return false, warnings, 'Publish period must be greater than 0' + end + + if type(config.pipelines) ~= 'table' then + return false, warnings, 'No metric pipelines defined in config' + end + + local dropped_templates = {} + for name, tmpl in pairs(config.templates or {}) do + local tmpl_warns = validate_template(name, tmpl) + if #tmpl_warns > 0 then + for _, w in ipairs(tmpl_warns) do + table.insert(warnings, w) + end + dropped_templates[name] = true + end + end + + for endpoint, metric_config in pairs(config.pipelines) do + -- Check template existence + if metric_config.template then + if (not config.templates) or (not config.templates[metric_config.template]) then + table.insert(warnings, { + msg = string.format( + 'Metric config [%s] uses template [%s] that does not exist', + tostring(endpoint), tostring(metric_config.template)), + endpoint = endpoint, + type = 'metric', + }) + end + if dropped_templates[metric_config.template] then + table.insert(warnings, { + msg = string.format( + 'Metric config [%s] uses invalid template [%s]', + tostring(endpoint), tostring(metric_config.template)), + endpoint = endpoint, + type = 'metric', + }) + end + end + + -- Merge template then validate the resulting config + local full_cfg = merge_config( + (config.templates and metric_config.template + and config.templates[metric_config.template]) or {}, + metric_config + ) + local metric_warns = validate_metric(endpoint, full_cfg) + for _, w in ipairs(metric_warns) do + table.insert(warnings, w) + end + end + + return true, warnings, nil +end + +------------------------------------------------------------------------------- +-- Public: apply_config +------------------------------------------------------------------------------- + +--- Apply a validated config and return a pipelines_map. +--- Does not create any bus subscriptions. 
+--- +---@param config table +---@return PipelineMap pipelines_map keyed by metric_name +---@return number publish_period +local function apply_config(config) + local publish_period = config.publish_period + local pipelines_map = {} + + for metric_name, metric_config in pairs(config.pipelines) do + local resolved = metric_config + if resolved.template and config.templates and config.templates[resolved.template] then + resolved = merge_config(config.templates[resolved.template], resolved) + end + + local protocol = resolved.protocol + if not protocol or not VALID_PROTOCOLS[protocol] then + log.warn(string.format( + 'metrics/config: skipping pipeline [%s] — invalid or missing protocol', + tostring(metric_name))) + else + local pipeline, pipeline_err = build_metric_pipeline( + metric_name, resolved.process or {}) + if pipeline_err then + log.error(string.format( + 'metrics/config: skipping pipeline [%s] — %s', + tostring(metric_name), pipeline_err)) + else + pipelines_map[metric_name] = { + pipeline = pipeline, + protocol = protocol, + } + end + end + end + + return pipelines_map, publish_period +end + +return { + merge_config = merge_config, + standardise_config = standardise_config, + validate_http_config = validate_http_config, + validate_config = validate_config, + apply_config = apply_config, + build_metric_pipeline = build_metric_pipeline, +} diff --git a/src/services/metrics/http.lua b/src/services/metrics/http.lua new file mode 100644 index 00000000..e02a9628 --- /dev/null +++ b/src/services/metrics/http.lua @@ -0,0 +1,98 @@ +-- services/metrics/http.lua +-- +-- HTTP publisher for the metrics service. +-- +-- Starts a dedicated fiber that drains a bounded channel of HTTP payloads and +-- sends them to the Mainflux cloud endpoint with exponential-backoff retry on +-- network failure. +-- +-- Public API: +-- start_http_publisher() -> channel +-- Must be called from inside a running fiber scope. Returns the send +-- channel (capacity QUEUE_SIZE). 
The caller enqueues payloads with a +-- non-blocking put; if the channel is full the payload is dropped and an +-- error is logged. + +local fibers = require 'fibers' +local op = require 'fibers.op' +local sleep = require 'fibers.sleep' +local channel = require 'fibers.channel' +local request = require 'http.request' +local log = require 'services.log' + +local QUEUE_SIZE = 10 + +--- Send a single HTTP payload to the cloud, retrying with exponential backoff +--- on network failure. Returns only when the send succeeds or the enclosing +--- scope is cancelled. +--- +---@param data table { uri: string, auth: string, body: string } +local function send_http(data) + local uri = data.uri + local body = data.body + local auth = data.auth + + local sleep_duration = 1 + local response_headers + + while not response_headers do + local req = request.new_from_uri(uri) + req.headers:upsert(':method', 'POST') + req.headers:upsert('authorization', auth) + req.headers:upsert('content-type', 'application/senml+json') + req.headers:delete('expect') + req:set_body(body) + + local headers = req:go(10) + response_headers = headers + + if not response_headers then + log.debug(string.format( + 'metrics/http: HTTP publish failed, retrying in %s seconds', + sleep_duration)) + sleep.sleep(sleep_duration) + sleep_duration = math.min(sleep_duration * 2, 60) + end + end + + local status = response_headers:get(':status') + if status ~= '202' then + local parts = {} + for k, v in response_headers:each() do + table.insert(parts, string.format('\t%s: %s', k, v)) + end + log.debug(string.format( + 'metrics/http: HTTP publish failed, response headers:\n%s', + table.concat(parts, '\n'))) + else + log.info('metrics/http: HTTP publish success, status: ' .. status) + end +end + +--- Start the HTTP publisher fiber in the current scope. +--- Returns the send channel. 
Payloads must be enqueued with a non-blocking +--- select (see _http_publish in metrics.lua); if the channel is full the +--- payload should be dropped by the caller. +--- +---@return table channel +local function start_http_publisher() + local send_ch = channel.new(QUEUE_SIZE) + + local scope = fibers.current_scope() + scope:spawn(function() + while true do + local which, payload = fibers.perform(op.named_choice({ + msg = send_ch:get_op(), + })) + if which == 'msg' and payload ~= nil then + send_http(payload) + end + end + end) + + return send_ch +end + +return { + start_http_publisher = start_http_publisher, +} diff --git a/src/services/metrics/processing.lua b/src/services/metrics/processing.lua new file mode 100644 index 00000000..27656f64 --- /dev/null +++ b/src/services/metrics/processing.lua @@ -0,0 +1,297 @@ +-- services/metrics/processing.lua +-- +-- Processing pipeline blocks for the metrics service. +-- +-- Each block class holds only configuration (logic). Runtime state is kept in a +-- separate table created by :new_state() and passed explicitly to :run() and +-- :reset(). This allows a single pipeline object to be shared across many +-- metric endpoints while maintaining fully isolated per-endpoint state. +-- +-- Block interface: +-- block:new_state() -> state_table +-- block:run(value, state) -> value, short_circuit, error +-- block:reset(state) -> nil +-- +-- ProcessPipeline wraps a list of blocks and exposes the same interface: +-- pipeline:new_state() -> state_table {full_run, blocks={...}} +-- pipeline:run(value, state) -> value, short_circuit, error +-- pipeline:reset(state) -> nil (only resets when full_run == true) +-- pipeline:force_reset(state) -> nil (reset unconditionally) +-- pipeline:add(block) -> error? 
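+--
+-- Illustrative usage (a hypothetical caller, for orientation only; the names
+-- below come from this module's exports, the values are made up):
+--
+--   local processing = require 'services.metrics.processing'
+--   local p = processing.new_process_pipeline()
+--   p:add(assert(processing.DiffTrigger.new({ diff_method = 'any-change' })))
+--   local st = p:new_state()
+--   local v, short = p:run(42, st)  -- first value passes: v == 42, short == false
+--   v, short = p:run(42, st)        -- unchanged value short-circuits: short == true
+--   p:reset(st)                     -- resets block state only after a full run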
+ +local runtime = require 'fibers.runtime' + +------------------------------------------------------------------------------- +-- Base Process (documentation only; not used at runtime) +------------------------------------------------------------------------------- + +---@class Process +---@field new_state fun(self: Process): table +---@field run fun(self: Process, value: any, state: table): any, boolean, string? +---@field reset fun(self: Process, state: table) + +------------------------------------------------------------------------------- +-- DiffTrigger +------------------------------------------------------------------------------- + +--- Passes a value only when the change from the last-published value exceeds a +--- threshold. Three diff methods are supported: +--- "absolute" - abs(curr - last) >= threshold +--- "percent" - abs((curr - last) / last) * 100 >= threshold +--- "any-change" - curr ~= last +--- +---@class DiffTrigger +---@field threshold number +---@field diff_fn function +---@field config table +local DiffTrigger = {} +DiffTrigger.__index = DiffTrigger + +local function check_diff_args_valid(config) + if config.initial_val ~= nil and type(config.initial_val) ~= 'number' then + return 'Initial value must be a number' + end + if config.diff_method ~= 'any-change' and type(config.threshold) ~= 'number' then + return 'Threshold must be a number' + end +end + +---@param config table +---@return DiffTrigger? +---@return string? 
error +function DiffTrigger.new(config) + local valid_err = check_diff_args_valid(config) + if valid_err then return nil, valid_err end + + local self = setmetatable({}, DiffTrigger) + self.config = config + self.threshold = config.threshold + + local dm = config.diff_method + if dm == 'absolute' then + self.diff_fn = function(curr, last, threshold) + return math.abs(curr - last) >= threshold + end + elseif dm == 'percent' then + self.diff_fn = function(curr, last, threshold) + return (math.abs((curr - last) / last) * 100) >= threshold + end + elseif dm == 'any-change' then + self.diff_fn = function(curr, last) + return curr ~= last + end + else + return nil, "Diff method must be 'absolute', 'percent' or 'any-change'" + end + return self, nil +end + +---@return table +function DiffTrigger:new_state() + return { + empty = (self.config.initial_val == nil), + last_val = self.config.initial_val or 0, + curr_val = nil, + } +end + +---@param value any +---@param state table +---@return any +---@return boolean short_circuit +---@return string? error +function DiffTrigger:run(value, state) + state.curr_val = value + if state.empty or self.diff_fn(state.curr_val, state.last_val, self.threshold) then + state.last_val = value + state.empty = false + return value, false, nil + end + return nil, true, nil +end + +--- No-op: DiffTrigger does not reset last_val on publish. +---@param state table +function DiffTrigger:reset(state) end -- luacheck: ignore + +------------------------------------------------------------------------------- +-- TimeTrigger +------------------------------------------------------------------------------- + +--- Passes a value only when the elapsed time since the last pass exceeds +--- `duration` seconds. +--- +---@class TimeTrigger +---@field duration number +---@field config table +local TimeTrigger = {} +TimeTrigger.__index = TimeTrigger + +---@param config table +---@return TimeTrigger? +---@return string? 
error +function TimeTrigger.new(config) + if type(config.duration) ~= 'number' then + return nil, 'Duration must be a number' + end + local self = setmetatable({}, TimeTrigger) + self.duration = config.duration + self.config = config + return self, nil +end + +---@return table +function TimeTrigger:new_state() + return { timeout = runtime.now() + self.duration } +end + +---@param value any +---@param state table +---@return any +---@return boolean short_circuit +---@return string? error +function TimeTrigger:run(value, state) + if runtime.now() >= state.timeout then + state.timeout = runtime.now() + self.duration + return value, false, nil + end + return nil, true, nil +end + +---@param state table +function TimeTrigger:reset(state) end -- luacheck: ignore + +------------------------------------------------------------------------------- +-- DeltaValue +------------------------------------------------------------------------------- + +--- Replaces the raw value with the difference from the last-published value. +--- Requires numeric input. +--- +---@class DeltaValue +---@field config table +local DeltaValue = {} +DeltaValue.__index = DeltaValue + +---@param config table +---@return DeltaValue? +---@return string? error +function DeltaValue.new(config) + if config.initial_val ~= nil and type(config.initial_val) ~= 'number' then + return nil, 'Initial value must be a number' + end + local self = setmetatable({}, DeltaValue) + self.config = config + return self, nil +end + +---@return table +function DeltaValue:new_state() + return { + last_val = self.config.initial_val or 0, + curr_val = nil, + } +end + +---@param value any +---@param state table +---@return any +---@return boolean short_circuit +---@return string? 
error +function DeltaValue:run(value, state) + if type(value) ~= 'number' then + return nil, false, 'Value must be a number' + end + local difference = value - state.last_val + state.curr_val = value + return difference, false, nil +end + +--- On reset, advance last_val to curr_val so the next delta is computed from +--- the most-recently-published sample. +---@param state table +function DeltaValue:reset(state) + state.last_val = state.curr_val or 0 +end + +------------------------------------------------------------------------------- +-- ProcessPipeline +------------------------------------------------------------------------------- + +---@class ProcessPipeline +---@field process_blocks table +local ProcessPipeline = {} +ProcessPipeline.__index = ProcessPipeline + +---@return ProcessPipeline +local function new_process_pipeline() + return setmetatable({ process_blocks = {} }, ProcessPipeline) +end + +--- Append a processing block to the pipeline. +---@param block any +---@return string? error +function ProcessPipeline:add(block) + if block == nil then return 'processing block cannot be nil' end + table.insert(self.process_blocks, block) +end + +--- Create a fresh state table for this pipeline (and all its blocks). +---@return table +function ProcessPipeline:new_state() + local state = { full_run = false, blocks = {} } + for i, block in ipairs(self.process_blocks) do + state.blocks[i] = block:new_state() + end + return state +end + +--- Run the pipeline, passing value through each block sequentially. +--- Stops early if any block short-circuits or returns an error. +---@param value any +---@param state table +---@return any +---@return boolean short_circuit +---@return string? 
error +function ProcessPipeline:run(value, state) + local val = value + local short = false + local err = nil + + for i, block in ipairs(self.process_blocks) do + val, short, err = block:run(val, state.blocks[i]) + if err or short then break end + end + + if not short and not err then + state.full_run = true + end + + return val, short, err +end + +--- Reset block states, but only when the pipeline produced a published value +--- (i.e. ran to completion without short-circuiting). +---@param state table +function ProcessPipeline:reset(state) + if state.full_run then + for i, block in ipairs(self.process_blocks) do + block:reset(state.blocks[i]) + end + state.full_run = false + end +end + +--- Reset regardless of whether the pipeline produced a published value. +---@param state table +function ProcessPipeline:force_reset(state) + state.full_run = true + self:reset(state) +end + +return { + DiffTrigger = DiffTrigger, + TimeTrigger = TimeTrigger, + DeltaValue = DeltaValue, + new_process_pipeline = new_process_pipeline, +} diff --git a/src/services/metrics/senml.lua b/src/services/metrics/senml.lua new file mode 100644 index 00000000..c2d72306 --- /dev/null +++ b/src/services/metrics/senml.lua @@ -0,0 +1,83 @@ +-- services/metrics/senml.lua +-- +-- SenML (Sensor Markup Language) encoder for the metrics service. +-- Converts a nested metric-values table into a flat array of SenML objects +-- ready for JSON encoding and HTTP publication. + +--- Encode a single (name, value, time) triple as a SenML record. +---@param topic string +---@param value number|string|boolean +---@param time number? milliseconds since epoch (optional) +---@return SenMLRecord? senml_obj +---@return string? 
error +local function encode(topic, value, time) + if type(topic) ~= 'string' or topic == '' then + return nil, 'topic must be a non-empty string' + end + local vtype = type(value) + if vtype ~= 'number' and vtype ~= 'string' and vtype ~= 'boolean' then + return nil, 'value must be number, string or boolean, found ' .. vtype + end + + local obj = { n = topic } + + if vtype == 'number' then + obj.v = value + elseif vtype == 'string' then + obj.vs = value + elseif vtype == 'boolean' then + obj.vb = value + end + + if time and type(time) == 'number' then + obj.t = time + end + + return obj, nil +end + +--- Recursively encode a nested values table into a flat SenML array. +--- +--- Each leaf may be either: +--- {value = v, time = t} - a metric sample +--- a plain value - treated as {value = v} +--- +--- Tables without both 'value' and 'time' fields are recursed into with the +--- key appended to the topic (dot-separated). The special key '__value' does +--- not append anything to the topic. +--- +---@param base_topic string +---@param values table +---@param output table accumulator array (created automatically on top call) +---@return table? output +---@return string? error +local function encode_r(base_topic, values, output) + for k, v in pairs(values) do + local topic = base_topic + if k ~= '__value' then + if base_topic == '' then + topic = k + else + topic = topic .. '.' .. 
k + end + end + + if type(v) == 'table' and not (v.value and v.time) then + local _, err = encode_r(topic, v, output) + if err then return nil, err end + else + if type(v) ~= 'table' then + v = { value = v } + end + local obj, err = encode(topic, v.value, v.time) + if err then return nil, err end + table.insert(output, obj) + end + end + return output, nil +end + +return { + encode = encode, + encode_r = function(base_topic, values) return encode_r(base_topic, values, {}) end, +} diff --git a/src/services/metrics/types.lua b/src/services/metrics/types.lua new file mode 100644 index 00000000..4e13e1e6 --- /dev/null +++ b/src/services/metrics/types.lua @@ -0,0 +1,175 @@ +-- services/metrics/types.lua +-- +-- Shared type definitions and constructors for the metrics service. +-- Follows the same pattern as services/hal/types/external.lua. + +---@class TypeConstructors +local new = {} + +------------------------------------------------------------------------------- +-- BaseTime +------------------------------------------------------------------------------- + +---@class BaseTime +---@field synced boolean +---@field real number wall-clock seconds at the monotonic base +---@field mono number monotonic seconds at the base +local BaseTime = {} +BaseTime.__index = BaseTime + +---Create a new BaseTime anchored to the supplied real/monotonic pair. +---@param real number +---@param mono number +---@return BaseTime? 
+---@return string error +function new.BaseTime(real, mono) + if type(real) ~= 'number' then + return nil, "real must be a number" + end + if type(mono) ~= 'number' then + return nil, "mono must be a number" + end + return setmetatable({ + synced = false, + real = real, + mono = mono, + }, BaseTime), "" +end + +------------------------------------------------------------------------------- +-- CloudConfig +------------------------------------------------------------------------------- + +---@class CloudChannel +---@field id string +---@field name string +---@field metadata table + +---@class CloudConfig +---@field url string +---@field thing_key string +---@field channels CloudChannel[] +local CloudConfig = {} +CloudConfig.__index = CloudConfig + +---Create a new CloudConfig. +---@param url string +---@param thing_key string +---@param channels CloudChannel[] +---@return CloudConfig? +---@return string error +function new.CloudConfig(url, thing_key, channels) + if type(url) ~= 'string' or url == '' then + return nil, "url must be a non-empty string" + end + if type(thing_key) ~= 'string' or thing_key == '' then + return nil, "thing_key must be a non-empty string" + end + if type(channels) ~= 'table' then + return nil, "channels must be a table" + end + return setmetatable({ + url = url, + thing_key = thing_key, + channels = channels, + }, CloudConfig), "" +end + +------------------------------------------------------------------------------- +-- MetricSample (per-endpoint cache entry) +------------------------------------------------------------------------------- + +---@class MetricSample +---@field value number|string|boolean +---@field time number monotonic seconds when the value was recorded +local MetricSample = {} +MetricSample.__index = MetricSample + +---Create a new MetricSample. +---@param value number|string|boolean +---@param time number +---@return MetricSample? 
+---@return string error +function new.MetricSample(value, time) + local vt = type(value) + if vt ~= 'number' and vt ~= 'string' and vt ~= 'boolean' then + return nil, "value must be number, string or boolean" + end + if type(time) ~= 'number' then + return nil, "time must be a number" + end + return setmetatable({ + value = value, + time = time, + }, MetricSample), "" +end + +------------------------------------------------------------------------------- +-- SenMLRecord (single SenML object ready for JSON encoding) +------------------------------------------------------------------------------- + +---@class SenMLRecord +---@field n string SenML name +---@field v number? numeric value +---@field vs string? string value +---@field vb boolean? boolean value +---@field t number? time in milliseconds since epoch +local SenMLRecord = {} +SenMLRecord.__index = SenMLRecord + +---Create a new SenMLRecord. +---@param name string +---@param value number|string|boolean +---@param time number? milliseconds since epoch +---@return SenMLRecord? +---@return string? error +function new.SenMLRecord(name, value, time) + if type(name) ~= 'string' or name == '' then + return nil, "name must be a non-empty string" + end + local vt = type(value) + if vt ~= 'number' and vt ~= 'string' and vt ~= 'boolean' then + return nil, "value must be number, string or boolean, found " .. 
vt
+ end
+ if time ~= nil and type(time) ~= 'number' then
+ return nil, "time must be a number"
+ end
+
+ local obj = setmetatable({ n = name }, SenMLRecord)
+ if vt == 'number' then obj.v = value end
+ if vt == 'string' then obj.vs = value end
+ if vt == 'boolean' then obj.vb = value end
+ if time then obj.t = time end
+ return obj, nil
+end
+
+-------------------------------------------------------------------------------
+-- ServiceState (the live mutable state table held in metrics.lua)
+-------------------------------------------------------------------------------
+
+---@alias PipelineEntry { pipeline: ProcessPipeline, protocol: string }
+---@alias PipelineMap table<string, PipelineEntry>
+---@alias MetricStates table<string, table>
+---@alias MetricValues table<string, table<string, MetricSample>>
+
+---@class ServiceState
+---@field conn Connection? nil before M.start() is called
+---@field name string? nil before M.start() is called
+---@field http_send_ch Channel? nil before M.start() is called
+---@field pipelines_map PipelineMap
+---@field metric_states MetricStates
+---@field endpoint_to_pipe table
+---@field metric_values MetricValues
+---@field publish_period number?
+---@field cloud_url string?
+---@field mainflux_config table?
+---@field cloud_config CloudConfig?
+---@field base_time BaseTime? nil before M.start() is called
+
+return {
+ new = new,
+ BaseTime = BaseTime,
+ CloudConfig = CloudConfig,
+ MetricSample = MetricSample,
+ SenMLRecord = SenMLRecord,
+}
diff --git a/tests/README.md b/tests/README.md
index 4358d3ca..93d95603 100644
--- a/tests/README.md
+++ b/tests/README.md
@@ -1 +1,12 @@
-This folder will contain tests for `devicecode`
\ No newline at end of file
+This folder will contain tests for `devicecode`.
+
+Deterministic async test helpers live in `tests/test_utils/`.
+
+- `virtual_time.lua` installs a test-controlled monotonic and realtime clock for fibers-based tests.
+- `time_harness.lua` provides general op helpers: + - `try_op_now(op_or_factory)` for non-blocking op probes, + - `wait_op_ticks(op_or_factory, { max_ticks = ... })` for bounded scheduler-turn waits, + - `wait_op_within(clock, timeout_s, op_or_factory, ...)` for virtual-time bounded waits, + plus compatibility helpers for metrics subscriptions. + +For fibers service tests, prefer advancing virtual time and flushing a bounded number of scheduler turns over wall-clock sleeps. This keeps tests focused on logic and avoids timing-dependent flakes across different environments. diff --git a/tests/test_metrics.lua b/tests/test_metrics.lua index 99bed80c..c11ab90d 100644 --- a/tests/test_metrics.lua +++ b/tests/test_metrics.lua @@ -1,535 +1,964 @@ --- Detect if this file is being run as the entry point +-- test_metrics.lua +-- +-- Service-level tests for the metrics service. +-- +-- Each test spins up the full metrics service in a child scope, interacts with +-- it via the bus, and asserts on bus-published results. +-- +-- Run standalone: luajit test_metrics.lua + local this_file = debug.getinfo(1, "S").source:match("@?([^/]+)$") local is_entry_point = arg and arg[0] and arg[0]:match("[^/]+$") == this_file if is_entry_point then - package.path = "../src/lua-fibers/?.lua;" -- fibers submodule src - .. "../src/lua-trie/src/?.lua;" -- trie submodule src - .. "../src/lua-bus/src/?.lua;" -- bus submodule src + package.path = "../vendor/lua-fibers/src/?.lua;" + .. "../vendor/lua-trie/src/?.lua;" + .. "../vendor/lua-bus/src/?.lua;" .. "../src/?.lua;" .. "./test_utils/?.lua;" .. package.path .. 
";/usr/lib/lua/?.lua;/usr/lib/lua/?/init.lua" - _G._TEST = true -- Enable test exports in source code + _G._TEST = true end local luaunit = require 'luaunit' +local fibers = require 'fibers' +local perform = fibers.perform +local op = require 'fibers.op' +local busmod = require 'bus' +local json = require 'cjson.safe' +local virtual_time = require 'virtual_time' +local time_harness = require 'time_harness' + local processing = require 'services.metrics.processing' -local conf = require 'services.metrics.config' -local sc = require "fibers.utils.syscall" -local sleep = require "fibers.sleep" -local fiber = require "fibers.fiber" -local senml = require "services.metrics.senml" +local conf = require 'services.metrics.config' +local senml = require 'services.metrics.senml' + +------------------------------------------------------------------------------- +-- Helpers +------------------------------------------------------------------------------- + +-- A sample mainflux.cfg payload returned by the mock HAL RPC. +local MAINFLUX_CFG = json.encode({ + thing_key = 'test-thing-key', + channels = { + { id = 'ch-data', name = 'test_data', metadata = { channel_type = 'data' } }, + { id = 'ch-control', name = 'test_control', metadata = { channel_type = 'control' } }, + }, +}) + +-- Build a fresh bus and test connection for each test. +local function make_bus() + return busmod.new({ q_length = 100, s_wild = '+', m_wild = '#' }) +end + +local function new_test_clock() + return virtual_time.install({ + monotonic = 0, + realtime = 1700000000, + }) +end + +local function flush_ticks(max_ticks) + time_harness.flush_ticks(max_ticks or 20) +end + +-- The tests subscribe to {'svc', 'metrics', '#'} so they can match +-- dynamically-keyed output topics such as 'svc.metrics.wan.rx_bytes'. +-- The '#' wildcard also captures the service's own lifecycle messages +-- retained at {'svc', 'metrics', 'status'} (e.g. 'starting', 'running', +-- 'stopped'). 
The helpers below skip those status messages so assertions
+-- only see metric payload publishes.
+local function recv_timeout(clock, sub, timeout_s)
+ local step = 0.05
+ local max_ticks = 20
+ local elapsed = 0
+ while true do
+ while true do
+ local ok, msg = time_harness.try_op_now(function() return sub:recv_op() end)
+ if not ok then break end
+ if msg.topic[3] ~= 'status' then return msg end
+ end
+ if elapsed >= timeout_s then return nil end
+ local advance = math.min(step, timeout_s - elapsed)
+ clock:advance(advance)
+ time_harness.flush_ticks(max_ticks)
+ elapsed = elapsed + advance
+ end
+end
+
+local function drain_non_status(sub, max_ticks)
+ max_ticks = max_ticks or 20
+ local messages = {}
+ for tick = 1, max_ticks do
+ local saw_message = false
+ while true do
+ local ok, msg = time_harness.try_op_now(function() return sub:recv_op() end)
+ if not ok then break end
+ saw_message = true
+ if msg.topic[3] ~= 'status' then
+ messages[#messages + 1] = msg
+ end
+ end
+ if not saw_message or tick == max_ticks then break end
+ time_harness.flush_ticks(1)
+ end
+ return messages
+end
+
+-- Start a mock handler for the HAL filesystem read RPC on `test_conn`.
+-- The handler responds with MAINFLUX_CFG to every request, looping until its
+-- endpoint closes. The mock fiber runs in `root_scope`, so it is torn down
+-- when the caller cancels that scope at the end of the test.
+local function start_mock_hal(test_conn, root_scope)
+ local ep = test_conn:bind(
+ { 'cap', 'fs', 'configs', 'rpc', 'read' },
+ { queue_len = 5 })
+
+ root_scope:spawn(function()
+ while true do
+ local req, err = perform(ep:recv_op())
+ if not req then break end
+ -- call_op binds a reply endpoint; deliver to it with publish_one.
+ test_conn:publish_one(req.reply_to, { ok = true, reason = MAINFLUX_CFG })
+ end
+ end)
+end
+
+-- Convenience: start the metrics service in its own child scope.
+-- Returns the scope so the caller can cancel/join it.
+local function start_metrics(bus, root_scope, opts) + local svc_scope = root_scope:child() + svc_scope:spawn(function() + -- Each test needs a fresh require to reset module-level State. + package.loaded['services.metrics'] = nil + local metrics = require 'services.metrics' + local svc_conn = bus:connect() + metrics.start(svc_conn, opts or { name = 'metrics' }) + end) + return svc_scope +end + +-- Cancel a scope and wait for it to finish. +local function stop_scope(svc_scope) + svc_scope:cancel('test done') + perform(svc_scope:join_op()) +end + +-- A minimal valid metrics config with one bus-protocol pipeline. +local function bus_pipeline_config(metric_name, publish_period, process) + return { + publish_period = publish_period or 0.1, + pipelines = { + [metric_name] = { + protocol = 'bus', + process = process or {}, + }, + }, + } +end + +-- A minimal valid metrics config with one log-protocol pipeline. +local function log_pipeline_config(metric_name, publish_period, process) + return { + publish_period = publish_period or 0.1, + pipelines = { + [metric_name] = { + protocol = 'log', + process = process or {}, + }, + }, + } +end + +------------------------------------------------------------------------------- +-- Unit tests: processing blocks +------------------------------------------------------------------------------- TestProcessing = {} function TestProcessing:test_diff_trigger_absolute() - local config = { - threshold = 5, - diff_method = "absolute", - initial_val = 10 - } + local trigger = processing.DiffTrigger.new({ + threshold = 5, + diff_method = 'absolute', + initial_val = 10, + }) + local state = trigger:new_state() - local trigger, trig_err = processing.DiffTrigger.new(config) - luaunit.assertNil(trig_err) - local val, short, err = trigger:run(12) + local val, short, err = trigger:run(12, state) luaunit.assertNil(err) - luaunit.assertEquals(short, true) -- Should short circuit as diff < threshold + luaunit.assertTrue(short) -- diff = 2 < threshold 5, 
short-circuits - val, short, err = trigger:run(16) + val, short, err = trigger:run(16, state) luaunit.assertNil(err) - luaunit.assertEquals(short, false) -- Should pass as diff >= threshold + luaunit.assertFalse(short) -- diff = 6 >= threshold 5 luaunit.assertEquals(val, 16) end function TestProcessing:test_diff_trigger_percent() - local config = { - threshold = 10, -- 10% threshold - diff_method = "percent", - initial_val = 100 - } - - local trigger, trig_err = processing.DiffTrigger.new(config) - luaunit.assertNil(trig_err) + local trigger = processing.DiffTrigger.new({ + threshold = 10, + diff_method = 'percent', + initial_val = 100, + }) + local state = trigger:new_state() - -- 5% change - should short circuit - local val, short, err = trigger:run(105) + local val, short, err = trigger:run(105, state) luaunit.assertNil(err) - luaunit.assertEquals(short, true) + luaunit.assertTrue(short) -- 5% < 10% threshold - -- 15% change - should pass - val, short, err = trigger:run(115) + val, short, err = trigger:run(115, state) luaunit.assertNil(err) - luaunit.assertEquals(short, false) + luaunit.assertFalse(short) -- 15% >= 10% threshold luaunit.assertEquals(val, 115) end function TestProcessing:test_diff_trigger_any_change() - local config = { - diff_method = "any-change", - initial_val = 10 - } - - local trigger, trig_err = processing.DiffTrigger.new(config) - luaunit.assertNil(trig_err) + local trigger = processing.DiffTrigger.new({ + diff_method = 'any-change', + initial_val = 10, + }) + local state = trigger:new_state() - -- Same value - should short circuit - local val, short, err = trigger:run(10) + local val, short, err = trigger:run(10, state) luaunit.assertNil(err) - luaunit.assertEquals(short, true) + luaunit.assertTrue(short) -- same value - -- Any change - should pass - val, short, err = trigger:run(10.1) + val, short, err = trigger:run(10.1, state) luaunit.assertNil(err) - luaunit.assertEquals(short, false) + luaunit.assertFalse(short) -- changed 
luaunit.assertEquals(val, 10.1) end -function TestProcessing:test_time_trigger() - local config = { duration = 0.1 } - local trigger = processing.TimeTrigger.new(config) +function TestProcessing:test_delta_value() + local block = processing.DeltaValue.new({ initial_val = 10 }) + local state = block:new_state() + + local val, short, err = block:run(15, state) + luaunit.assertNil(err) + luaunit.assertFalse(short) + luaunit.assertEquals(val, 5) -- 15 - 10 + + block:reset(state) -- simulate publish: last_val = 15 + + val, short, err = block:run(20, state) + luaunit.assertNil(err) + luaunit.assertEquals(val, 5) -- 20 - 15 +end + +function TestProcessing:test_pipeline_run_and_reset() + local pipeline, err = processing.new_process_pipeline() + luaunit.assertNil(err) + pipeline:add(processing.DeltaValue.new({ initial_val = 10 })) + + local state = pipeline:new_state() - local val, short, err = trigger:run(123) + local val, short + val, short, err = pipeline:run(20, state) luaunit.assertNil(err) - luaunit.assertEquals(short, true) -- Should short circuit initially + luaunit.assertFalse(short) + luaunit.assertEquals(val, 10) -- 20 - 10 - sleep.sleep(0.2) - val, short, err = trigger:run(123) + pipeline:reset(state) -- last_val = 20 + + val, short, err = pipeline:run(25, state) luaunit.assertNil(err) - luaunit.assertEquals(short, false) -- Should pass after duration - luaunit.assertEquals(val, 123) + luaunit.assertEquals(val, 5) -- 25 - 20 end -function TestProcessing:test_delta_value() - local delta = processing.DeltaValue.new({ initial_val = 10 }) +function TestProcessing:test_pipeline_short_circuit() + local pipeline, err = processing.new_process_pipeline() + luaunit.assertNil(err) + pipeline:add(processing.DiffTrigger.new({ + diff_method = 'absolute', threshold = 5, initial_val = 10, + })) + pipeline:add(processing.DeltaValue.new({ initial_val = 10 })) + + local state = pipeline:new_state() - local val, short, err = delta:run(15) + local val, short + val, short, err = 
pipeline:run(20, state) -- diff=10, passes DiffTrigger luaunit.assertNil(err) - luaunit.assertEquals(short, false) - luaunit.assertEquals(val, 5) -- Should return difference + luaunit.assertFalse(short) + luaunit.assertEquals(val, 10) -- DeltaValue: 20-10 - delta:reset() - val, short, err = delta:run(20) + val, short, err = pipeline:run(22, state) -- diff=2 from last(20), short-circuits luaunit.assertNil(err) - luaunit.assertEquals(short, false) - luaunit.assertEquals(val, 5) + luaunit.assertTrue(short) end -function TestProcessing:test_clone_process() - local config = { - threshold = 5, - diff_method = "absolute", - initial_val = 10 - } +------------------------------------------------------------------------------- +-- Unit tests: config module +------------------------------------------------------------------------------- - local trigger, trig_err = processing.DiffTrigger.new(config) - luaunit.assertNil(trig_err) +TestConfig = {} - local clone, clone_err = trigger:clone() - luaunit.assertNil(clone_err) - luaunit.assertNotNil(clone) +function TestConfig:test_validate_http_config_valid() + local ok, err = conf.validate_http_config({ + url = 'http://cloud.example.com', + thing_key = 'key', + channels = { { id = 'ch1', name = 'data' } }, + }) + luaunit.assertTrue(ok) + luaunit.assertNil(err) +end - -- Test that clone behaves the same as original - local val1, short1, err1 = trigger:run(16) - local val2, short2, err2 = clone:run(16) +function TestConfig:test_validate_http_config_nil() + local ok, err = conf.validate_http_config(nil) + luaunit.assertFalse(ok) + luaunit.assertNotNil(err) +end - luaunit.assertEquals(val1, val2) - luaunit.assertEquals(short1, short2) - luaunit.assertEquals(err1, err2) +function TestConfig:test_validate_http_config_missing_url() + local ok, err = conf.validate_http_config({ thing_key = 'k', channels = {} }) + luaunit.assertFalse(ok) + luaunit.assertNotNil(err) end -function TestProcessing:test_process_pipeline() - local pipeline = 
processing.new_process_pipeline() +function TestConfig:test_merge_config() + local merged = conf.merge_config( + { a = 1, nested = { x = 10, y = 20 } }, + { b = 2, nested = { y = 99, z = 30 } } + ) + luaunit.assertEquals(merged.a, 1) + luaunit.assertEquals(merged.b, 2) + luaunit.assertEquals(merged.nested.x, 10) + luaunit.assertEquals(merged.nested.y, 99) -- overridden + luaunit.assertEquals(merged.nested.z, 30) -- added +end + +function TestConfig:test_apply_config_builds_pipeline() + local map, period = conf.apply_config({ + publish_period = 30, + pipelines = { + rx_bytes = { + protocol = 'log', + process = { { type = 'DeltaValue' } }, + }, + }, + }) + luaunit.assertEquals(period, 30) + luaunit.assertNotNil(map.rx_bytes) + luaunit.assertEquals(map.rx_bytes.protocol, 'log') + luaunit.assertNotNil(map.rx_bytes.pipeline) +end - -- Create a pipeline with DiffTrigger and DeltaValue - local diff_trigger = processing.DiffTrigger.new({ - threshold = 5, - diff_method = "absolute", - initial_val = 10 +function TestConfig:test_validate_config_rejects_bad_period() + local ok, _, err = conf.validate_config({ + publish_period = -1, + pipelines = { sim = { protocol = 'log', process = {} } }, }) - local delta_value = processing.DeltaValue.new({ initial_val = 10 }) + luaunit.assertFalse(ok) + luaunit.assertNotNil(err) +end + +function TestConfig:test_validate_config_warns_bad_protocol() + local ok, warns = conf.validate_config({ + publish_period = 10, + pipelines = { sim = { protocol = 'invalid' } }, + }) + luaunit.assertTrue(ok) + luaunit.assertTrue(#warns > 0) +end + +function TestConfig:test_validate_config_propagates_invalid_template_to_pipeline() + local ok, warns, err = conf.validate_config({ + publish_period = 10, + templates = { + bad_template = { + protocol = 'invalid', + }, + }, + pipelines = { + sim = { + template = 'bad_template', + }, + }, + }) + + luaunit.assertTrue(ok) + luaunit.assertNil(err) + + local saw_template_invalid = false + local 
saw_metric_uses_invalid_template = false + local saw_metric_invalid_protocol = false + + for _, w in ipairs(warns) do + if w.type == 'template' + and w.endpoint == 'bad_template' + and string.find(w.msg, "invalid protocol 'invalid'", 1, true) + then + saw_template_invalid = true + end + + if w.type == 'metric' + and w.endpoint == 'sim' + and string.find(w.msg, 'uses invalid template [bad_template]', 1, true) + then + saw_metric_uses_invalid_template = true + end - pipeline:add(diff_trigger) - pipeline:add(delta_value) + if w.type == 'metric' + and w.endpoint == 'sim' + and string.find(w.msg, "invalid protocol 'invalid'", 1, true) + then + saw_metric_invalid_protocol = true + end + end + + luaunit.assertTrue(saw_template_invalid) + luaunit.assertTrue(saw_metric_uses_invalid_template) + luaunit.assertTrue(saw_metric_invalid_protocol) +end + +------------------------------------------------------------------------------- +-- Unit tests: SenML encoder +------------------------------------------------------------------------------- - -- First run should pass through both processes - local val, short, err = pipeline:run(20) +TestSenML = {} + +function TestSenML:test_encode_number() + local rec, err = senml.encode('cpu', 42.5) luaunit.assertNil(err) - luaunit.assertEquals(short, false) - luaunit.assertEquals(val, 10) -- DeltaValue should return difference from initial + luaunit.assertEquals(rec.n, 'cpu') + luaunit.assertEquals(rec.v, 42.5) +end - -- Second run should short circuit at DiffTrigger - val, short, err = pipeline:run(22) +function TestSenML:test_encode_string() + local rec, err = senml.encode('status', 'ok') luaunit.assertNil(err) - luaunit.assertEquals(short, true) + luaunit.assertEquals(rec.vs, 'ok') end -function TestProcessing:test_pipeline_reset() - local pipeline = processing.new_process_pipeline() - local delta_value = processing.DeltaValue.new({ initial_val = 10 }) - pipeline:add(delta_value) +function TestSenML:test_encode_boolean() + local rec, err = 
senml.encode('flag', true) + luaunit.assertNil(err) + luaunit.assertEquals(rec.vb, true) +end - -- Run once to get delta - local val, short, err = pipeline:run(20) +function TestSenML:test_encode_with_time() + local rec, err = senml.encode('t', 1, 1000) luaunit.assertNil(err) - luaunit.assertEquals(val, 10) + luaunit.assertEquals(rec.t, 1000) +end - -- Reset pipeline - pipeline:reset() +function TestSenML:test_encode_invalid_value() + local rec, err = senml.encode('k', {}) + luaunit.assertNil(rec) + luaunit.assertNotNil(err) +end - -- Next run should use the last value as new base - val, short, err = pipeline:run(25) +function TestSenML:test_encode_r_flat() + local recs, err = senml.encode_r('dev', { temp = 23.5, status = 'on' }) luaunit.assertNil(err) - luaunit.assertEquals(val, 5) -- 25 - 20 + luaunit.assertEquals(#recs, 2) + local names = {} + for _, r in ipairs(recs) do names[r.n] = r end + luaunit.assertEquals(names['dev.temp'].v, 23.5) + luaunit.assertEquals(names['dev.status'].vs, 'on') end -TestConfig = {} +------------------------------------------------------------------------------- +-- Unit tests: HTTP publisher module +------------------------------------------------------------------------------- -function TestConfig:test_build_metric_pipeline_via_apply_config() - -- Test pipeline building indirectly through apply_config - -- which is the main way pipelines are created - local mock_conn = { - subscribe = function(self, topic) - return { - next_msg_op = function() end, - unsubscribe = function() end - } - end +TestHttpModule = {} + +function TestHttpModule:test_start_http_publisher_builds_expected_request() + local original_http_request = package.loaded['http.request'] + local original_http_module = package.loaded['services.metrics.http'] + + local captured = { + uri = nil, + method = nil, + auth = nil, + content_type = nil, + expect_header = 'present', + body = nil, + timeout = nil, } - local config = { - templates = {}, - collections = { - 
["test/endpoint"] = { - protocol = "log", - process = { - { - type = "DiffTrigger", - threshold = 5, - diff_method = "absolute" + package.loaded['http.request'] = { + new_from_uri = function(uri) + captured.uri = uri + + local hdr = {} + local req = { + headers = { + upsert = function(_, k, v) hdr[k] = v end, + delete = function(_, k) hdr[k] = nil end, + }, + set_body = function(_, body) + captured.body = body + end, + go = function(_, timeout) + captured.timeout = timeout + captured.method = hdr[':method'] + captured.auth = hdr['authorization'] + captured.content_type = hdr['content-type'] + captured.expect_header = hdr['expect'] + return { + get = function(_, key) + if key == ':status' then return '202' end + return nil + end, + each = function() + return function() return nil end + end, } - } + end, } - }, - publish_period = 60 + + return req + end, } - local metrics, period, cloud_config = conf.apply_config(mock_conn, config, {}) - luaunit.assertEquals(#metrics, 1) - luaunit.assertEquals(period, 60) - luaunit.assertNotNil(metrics[1].base_pipeline) -end + package.loaded['services.metrics.http'] = nil -function TestConfig:test_validate_http_config() - -- Valid config - local valid_config = { - url = "http://example.com", - thing_key = "test_key", - channels = { - { id = "ch1", name = "data", metadata = { channel_type = "data" } } - } - } - local valid, err = conf.validate_http_config(valid_config) - luaunit.assertTrue(valid) - luaunit.assertNil(err) + local st, _, test_err = fibers.run_scope(function(s) + local http_mod = require 'services.metrics.http' - -- Missing url - local invalid_config = { - thing_key = "test_key", - channels = {} - } - valid, err = conf.validate_http_config(invalid_config) - luaunit.assertFalse(valid) - luaunit.assertNotNil(err) + local worker_scope, worker_err = s:child() + luaunit.assertNotNil(worker_scope, tostring(worker_err)) - -- Nil config - valid, err = conf.validate_http_config(nil) - luaunit.assertFalse(valid) - 
luaunit.assertNotNil(err) + local spawn_ok, spawn_err = worker_scope:spawn(function() + local ch = http_mod.start_http_publisher() + + perform(ch:put_op({ + uri = 'http://localhost:18080/http/channels/ch-data/messages', + auth = 'Thing test-thing-key', + body = '[{"n":"sim","vs":"present"}]', + })) + end) + luaunit.assertTrue(spawn_ok, tostring(spawn_err)) + + flush_ticks(20) + + luaunit.assertEquals(captured.uri, + 'http://localhost:18080/http/channels/ch-data/messages') + luaunit.assertEquals(captured.method, 'POST') + luaunit.assertEquals(captured.auth, 'Thing test-thing-key') + luaunit.assertEquals(captured.content_type, 'application/senml+json') + luaunit.assertNil(captured.expect_header) + luaunit.assertEquals(captured.body, '[{"n":"sim","vs":"present"}]') + luaunit.assertEquals(captured.timeout, 10) + + worker_scope:cancel('test done') + perform(worker_scope:join_op()) + end) + + package.loaded['http.request'] = original_http_request + package.loaded['services.metrics.http'] = original_http_module + + if st ~= 'ok' then + error(test_err or ('http module scope failed: ' .. tostring(st))) + end end -function TestConfig:test_merge_config() - local base = { - url = "http://base.com", - thing_key = "base_key", - nested = { - field1 = "value1", - field2 = "value2" - } - } +------------------------------------------------------------------------------- +-- Service-level tests +-- +-- These tests run the full metrics service in a child scope and exercise it +-- end-to-end via the bus. Because they call perform() / sleep, they must run +-- inside a fiber (i.e. inside fibers.run or another existing scope+spawn). 
+-------------------------------------------------------------------------------
-    local override = {
-        thing_key = "override_key",
-        nested = {
-            field2 = "override_value2",
-            field3 = "value3"
-        }
-    }
+TestMetricsService = {}
-    local merged = conf.merge_config(base, override)
-    luaunit.assertEquals(merged.url, "http://base.com")
-    luaunit.assertEquals(merged.thing_key, "override_key")
-    luaunit.assertEquals(merged.nested.field1, "value1")
-    luaunit.assertEquals(merged.nested.field2, "override_value2")
-    luaunit.assertEquals(merged.nested.field3, "value3")
-end
-
-function TestConfig:test_validate_topic_with_nil()
-    -- Test the actual metrics service validate_topic logic by directly calling _handle_metric
-    local metrics_service = require 'services.metrics'
-    local bus_pkg = require 'bus'
-    local context = require "fibers.context"
-
-    -- Create a bus and connection (but don't start the service)
-    local test_bus = bus_pkg.new()
-    local conn = test_bus:connect()
-
-    -- Create a context for the metrics service (needed for logging)
-    local bg_ctx = context.background()
-    local service_ctx = context.with_value(bg_ctx, "service_name", "metrics_test")
-    service_ctx = context.with_value(service_ctx, "fiber_name", "test_fiber")
-
-    -- Set up metrics service state manually without starting it
-    metrics_service.ctx = service_ctx
-    metrics_service.metric_values = {}
-    metrics_service.pipelines = {}
-
-    -- Create a pipeline for a test metric
-    local base_pipeline = processing.new_process_pipeline()
-    local diff_trigger = processing.DiffTrigger.new({
-        threshold = 5,
-        diff_method = "absolute",
-        initial_val = 10
-    })
-    base_pipeline:add(diff_trigger)
-
-    -- Create a metric definition
-    local metric = {
-        protocol = "log",
-        field = nil,
-        rename = nil,
-        base_pipeline = base_pipeline
-    }
+
+function TestMetricsService:setUp()
+    self.clock = nil
+end
-
-    -- Test 1: Valid topic should work and add value to metric_values
-    local valid_msg = bus_pkg.new_msg({"test", "metric"}, 100)
-    metrics_service:_handle_metric(metric, valid_msg)
-    luaunit.assertNotNil(metrics_service.metric_values.log)
-    luaunit.assertNotNil(metrics_service.metric_values.log["test.metric"])
-    luaunit.assertEquals(metrics_service.metric_values.log["test.metric"].value, 100)
-
-    -- Reset for next test
-    metrics_service.metric_values = {}
-
-    -- Test 2: Invalid topic with nil in the middle should be rejected (not added)
-    local invalid_msg_nil = bus_pkg.new_msg({"test", nil, "metric"}, 200)
-    metrics_service:_handle_metric(metric, invalid_msg_nil)
-    -- Should not create any metric values because topic is invalid
-    luaunit.assertTrue((not metrics_service.metric_values.log) or (not metrics_service.metric_values.log["test..metric"]))
-
-    -- Test 3: Invalid topic with gap (sparse array) should be rejected
-    local sparse_topic = {}
-    sparse_topic[1] = "test"
-    sparse_topic[3] = "metric" -- index 2 is missing
-    local sparse_msg = bus_pkg.new_msg(sparse_topic, 300)
-    metrics_service:_handle_metric(metric, sparse_msg)
-    -- Should not create any metric values because topic is invalid
-    luaunit.assertTrue((not metrics_service.metric_values.log) or (not metrics_service.metric_values.log["test.metric"]))
-
-    -- Test 4: Another valid topic should work
-    metrics_service.metric_values = {}
-    local valid_msg2 = bus_pkg.new_msg({"another", "valid", "topic"}, 50)
-    metrics_service:_handle_metric(metric, valid_msg2)
-    luaunit.assertNotNil(metrics_service.metric_values.log)
-    luaunit.assertNotNil(metrics_service.metric_values.log["another.valid.topic"])
-
-    -- Test 5: Empty topic should be rejected
-    metrics_service.metric_values = {}
-    local empty_msg = bus_pkg.new_msg({}, 400)
-    metrics_service:_handle_metric(metric, empty_msg)
-    luaunit.assertNil(metrics_service.metric_values.log)
-
-    -- Clean up
-    conn:disconnect()
+
+function TestMetricsService:tearDown()
+    if self.clock then
+        self.clock:restore()
+        self.clock = nil
+    end
 end
-
-TestSenML = {}
+
+-- Publish a metric config as a retained message and a raw metric, then verify
+-- the processed value is re-published on the bus topic.
+function TestMetricsService:test_metric_published_via_bus()
+    local clock = new_test_clock()
+    self.clock = clock
+    local root = fibers.current_scope()
+    local bus = make_bus()
+    local test_conn = bus:connect()
-
-function TestSenML:test_encode_basic()
-    -- Test encoding a string
-    local result, err = senml.encode("test/topic", "string_value")
-    luaunit.assertNil(err)
-    luaunit.assertEquals(result.n, "test/topic")
-    luaunit.assertEquals(result.vs, "string_value")
+
+    -- Signal HAL filesystem capability ready (retained).
+    test_conn:retain({ 'cap', 'fs', 'configs', 'state' }, 'added')
+    start_mock_hal(test_conn, root)
+    test_conn:retain({ 'svc', 'time', 'synced' }, true)
-
-    -- Test encoding a number
-    result, err = senml.encode("test/topic", 42.5)
-    luaunit.assertNil(err)
-    luaunit.assertEquals(result.n, "test/topic")
-    luaunit.assertEquals(result.v, 42.5)
+
+    -- Subscribe to bus-protocol metric output before starting the service.
+    local result_sub = test_conn:subscribe(
+        { 'svc', 'metrics', '#' },
+        { queue_len = 10, full = 'drop_oldest' })
-
-    -- Test encoding a boolean
-    result, err = senml.encode("test/topic", true)
-    luaunit.assertNil(err)
-    luaunit.assertEquals(result.n, "test/topic")
-    luaunit.assertEquals(result.vb, true)
+
+    -- Publish config: simple pass-through pipeline, publish every 0.1 s.
+    test_conn:retain({ 'cfg', 'metrics' }, bus_pipeline_config('sim', 0.1))
-
-    -- Test encoding with timestamp
-    result, err = senml.encode("test/topic", 42, 1234567890)
-    luaunit.assertNil(err)
-    luaunit.assertEquals(result.t, 1234567890)
-end
+
+    local svc_scope = start_metrics(bus, root)
+    flush_ticks()
-
-function TestSenML:test_encode_invalid_types()
-    -- Test encoding with invalid type
-    local result, err = senml.encode("test/topic", {})
-    luaunit.assertNotNil(err)
-    luaunit.assertNil(result)
+
+    -- topic[5] = 'sim' must match the pipeline name.
+    test_conn:publish(
+        { 'obs', 'v1', 'modem', 'metric', 'sim' },
+        { value = 'present', namespace = { 'modem', 1, 'sim' } })
-
-    -- Test encoding with nil
-    result, err = senml.encode("test/topic", nil)
-    luaunit.assertNotNil(err)
-    luaunit.assertNil(result)
-end
+
+    local msg = recv_timeout(clock, result_sub, 0.5)
-
-function TestSenML:test_encode_r_flat()
-    -- Test encoding a flat table
-    local values = {
-        temperature = 23.5,
-        humidity = 60,
-        status = "online"
-    }
+
+    luaunit.assertNotNil(msg, 'expected bus publish of sim metric')
+    luaunit.assertEquals(msg.payload.value, 'present')
-
-    local result, err = senml.encode_r("device/sensors", values)
-    luaunit.assertNil(err)
-    luaunit.assertEquals(#result, 3)
-
-    -- Check each entry
-    local found = { temp = false, humid = false, status = false }
-    for _, entry in ipairs(result) do
-        if entry.n == "device/sensors.temperature" and entry.v == 23.5 then
-            found.temp = true
-        elseif entry.n == "device/sensors.humidity" and entry.v == 60 then
-            found.humid = true
-        elseif entry.n == "device/sensors.status" and entry.vs == "online" then
-            found.status = true
-        end
-    end
+
+    stop_scope(svc_scope)
+    clock:restore()
+end
-
-    luaunit.assertTrue(found.temp)
-    luaunit.assertTrue(found.humid)
-    luaunit.assertTrue(found.status)
+
+-- When the payload has a `namespace` field it overrides the bus topic used as
+-- the SenML key and the output topic.
+function TestMetricsService:test_namespace_overrides_topic_key()
+    local clock = new_test_clock()
+    self.clock = clock
+    local root = fibers.current_scope()
+    local bus = make_bus()
+    local test_conn = bus:connect()
+
+    test_conn:retain({ 'cap', 'fs', 'configs', 'state' }, 'added')
+    start_mock_hal(test_conn, root)
+    test_conn:retain({ 'svc', 'time', 'synced' }, true)
+
+    local result_sub = test_conn:subscribe(
+        { 'svc', 'metrics', '#' },
+        { queue_len = 10, full = 'drop_oldest' })
+
+    test_conn:retain({ 'cfg', 'metrics' }, bus_pipeline_config('rx_bytes', 0.1))
+    local svc_scope = start_metrics(bus, root)
+    flush_ticks()
+
+    -- Publish with a namespace override: topic key becomes 'wan.rx_bytes'.
+    test_conn:publish(
+        { 'obs', 'v1', 'network', 'metric', 'rx_bytes' },
+        { value = 1024, namespace = { 'wan', 'rx_bytes' } })
+
+    local msg = recv_timeout(clock, result_sub, 0.5)
+
+    luaunit.assertNotNil(msg, 'expected bus publish with namespace key')
+    -- Output topic should be {'svc', 'metrics', 'wan', 'rx_bytes'}
+    luaunit.assertEquals(msg.topic[3], 'wan')
+    luaunit.assertEquals(msg.topic[4], 'rx_bytes')
+    luaunit.assertEquals(msg.payload.value, 1024)
+
+    stop_scope(svc_scope)
+    clock:restore()
 end
-
-function TestSenML:test_encode_r_nested()
-    -- Test encoding a nested table
-    local values = {
-        system = {
-            memory = 8192,
-            cpu = 45.6
-        },
-        network = {
-            status = "connected",
-            speed = 100
-        }
-    }
+
+-- A metric whose name has no matching pipeline must be silently dropped.
+function TestMetricsService:test_unknown_metric_dropped()
+    local clock = new_test_clock()
+    self.clock = clock
+    local root = fibers.current_scope()
+    local bus = make_bus()
+    local test_conn = bus:connect()
-
-    local result, err = senml.encode_r("device", values)
-    luaunit.assertNil(err)
-    luaunit.assertEquals(#result, 4)
-
-    -- Check specific entries
-    local found = { memory = false, cpu = false, net_status = false, speed = false }
-    for _, entry in ipairs(result) do
-        if entry.n == "device.system.memory" and entry.v == 8192 then
-            found.memory = true
-        elseif entry.n == "device.system.cpu" and entry.v == 45.6 then
-            found.cpu = true
-        elseif entry.n == "device.network.status" and entry.vs == "connected" then
-            found.net_status = true
-        elseif entry.n == "device.network.speed" and entry.v == 100 then
-            found.speed = true
-        end
+
+    test_conn:retain({ 'cap', 'fs', 'configs', 'state' }, 'added')
+    start_mock_hal(test_conn, root)
+    test_conn:retain({ 'svc', 'time', 'synced' }, true)
+
+    local result_sub = test_conn:subscribe(
+        { 'svc', 'metrics', '#' },
+        { queue_len = 10, full = 'drop_oldest' })
+
+    -- Config only knows about 'sim'; we will publish 'rx_bytes'.
+    test_conn:retain({ 'cfg', 'metrics' }, bus_pipeline_config('sim', 0.1))
+    local svc_scope = start_metrics(bus, root)
+    flush_ticks()
+
+    test_conn:publish(
+        { 'obs', 'v1', 'network', 'metric', 'rx_bytes' },
+        { value = 9999 })
+
+    clock:advance(0.25)
+    time_harness.flush_ticks(20)
+    local messages = drain_non_status(result_sub)
+
+    luaunit.assertEquals(#messages, 0, 'unexpected publish for unknown metric')
+
+    stop_scope(svc_scope)
+    clock:restore()
+end
+
+-- A DiffTrigger with any-change suppresses the second publish when the value
+-- does not change between publish cycles.
+function TestMetricsService:test_difftrigger_suppresses_unchanged_value()
+    local clock = new_test_clock()
+    self.clock = clock
+    local root = fibers.current_scope()
+    local bus = make_bus()
+    local test_conn = bus:connect()
+
+    test_conn:retain({ 'cap', 'fs', 'configs', 'state' }, 'added')
+    start_mock_hal(test_conn, root)
+    test_conn:retain({ 'svc', 'time', 'synced' }, true)
+
+    local result_sub = test_conn:subscribe(
+        { 'svc', 'metrics', '#' },
+        { queue_len = 10, full = 'drop_oldest' })
+
+    test_conn:retain({ 'cfg', 'metrics' }, bus_pipeline_config('sim', 0.1, {
+        { type = 'DiffTrigger', diff_method = 'any-change' },
+    }))
+    local svc_scope = start_metrics(bus, root)
+    flush_ticks()
+
+    -- First publish: value 'present' — should pass DiffTrigger.
+    test_conn:publish(
+        { 'obs', 'v1', 'modem', 'metric', 'sim' },
+        { value = 'present' })
+
+    local msg1 = recv_timeout(clock, result_sub, 0.4)
+    luaunit.assertNotNil(msg1, 'expected first publish')
+    luaunit.assertEquals(msg1.payload.value, 'present')
+
+    -- Second publish: same value — DiffTrigger must suppress it.
+    test_conn:publish(
+        { 'obs', 'v1', 'modem', 'metric', 'sim' },
+        { value = 'present' })
+
+    clock:advance(0.25)
+    time_harness.flush_ticks(20)
+    local msg2 = nil
+    while true do
+        local ok, m = time_harness.try_op_now(function() return result_sub:recv_op() end)
+        if not ok then break end
+        if m.topic[3] ~= 'status' then msg2 = m; break end
     end
+    luaunit.assertNil(msg2, 'second publish should be suppressed by DiffTrigger')
-
-    luaunit.assertTrue(found.memory)
-    luaunit.assertTrue(found.cpu)
-    luaunit.assertTrue(found.net_status)
-    luaunit.assertTrue(found.speed)
-end
-
-function TestSenML:test_encode_r_with_value_field()
-    -- Test encoding a table with __value field and a subtable
-    local values = {
-        system = {
-            __value = "active", -- This should be at the base topic
-            memory = {
-                __value = "healthy",
-                used = 4096,
-                free = 4096
+
+    stop_scope(svc_scope)
+    clock:restore()
+end
+
+-- DeltaValue transforms a cumulative counter into a per-period delta.
+function TestMetricsService:test_delta_value_pipeline()
+    local clock = new_test_clock()
+    self.clock = clock
+    local root = fibers.current_scope()
+    local bus = make_bus()
+    local test_conn = bus:connect()
+
+    test_conn:retain({ 'cap', 'fs', 'configs', 'state' }, 'added')
+    start_mock_hal(test_conn, root)
+    test_conn:retain({ 'svc', 'time', 'synced' }, true)
+
+    local result_sub = test_conn:subscribe(
+        { 'svc', 'metrics', '#' },
+        { queue_len = 10, full = 'drop_oldest' })
+
+    test_conn:retain({ 'cfg', 'metrics' }, bus_pipeline_config('rx_bytes', 0.1, {
+        { type = 'DeltaValue', initial_val = 0 },
+    }))
+    local svc_scope = start_metrics(bus, root)
+    flush_ticks()
+
+    -- First reading: 1000 bytes; delta from initial 0 = 1000.
+    test_conn:publish(
+        { 'obs', 'v1', 'network', 'metric', 'rx_bytes' },
+        { value = 1000 })
+
+    local msg1 = recv_timeout(clock, result_sub, 0.4)
+    luaunit.assertNotNil(msg1, 'expected first delta publish')
+    luaunit.assertEquals(msg1.payload.value, 1000)
+
+    -- Second reading: 1500 bytes; delta from 1000 = 500.
+    test_conn:publish(
+        { 'obs', 'v1', 'network', 'metric', 'rx_bytes' },
+        { value = 1500 })
+
+    local msg2 = recv_timeout(clock, result_sub, 0.4)
+    luaunit.assertNotNil(msg2, 'expected second delta publish')
+    luaunit.assertEquals(msg2.payload.value, 500)
+
+    stop_scope(svc_scope)
+    clock:restore()
+end
+
+-- HTTP protocol pipelines should enqueue a well-formed Mainflux request.
+function TestMetricsService:test_http_pipeline_enqueues_request_payload()
+    local clock = new_test_clock()
+    self.clock = clock
+    local root = fibers.current_scope()
+    local bus = make_bus()
+    local test_conn = bus:connect()
+
+    test_conn:retain({ 'cap', 'fs', 'configs', 'state' }, 'added')
+    start_mock_hal(test_conn, root)
+    test_conn:retain({ 'svc', 'time', 'synced' }, true)
+
+    local captured = nil
+    local original_http_mod = package.loaded['services.metrics.http']
+    package.loaded['services.metrics.http'] = {
+        start_http_publisher = function()
+            return {
+                put_op = function(_, data)
+                    captured = data
+                    return op.always(true)
+                end,
             }
-    }
+        end,
     }
-
-    local result, err = senml.encode_r("device", values)
-    luaunit.assertNil(err)
+
+    local svc_scope = nil
+    local st, _, test_err = fibers.run_scope(function()
+        test_conn:retain({ 'cfg', 'metrics' }, {
+            publish_period = 0.1,
+            cloud_url = 'http://localhost:18080',
+            pipelines = {
+                sim = {
+                    protocol = 'http',
+                    process = {},
+                },
+            },
+        })
+
+        svc_scope = start_metrics(bus, root)
+        flush_ticks()
+
+        test_conn:publish(
+            { 'obs', 'v1', 'modem', 'metric', 'sim' },
+            { value = 'present', namespace = { 'modem', 1, 'sim' } })
+
+        clock:advance(0.3)
+        time_harness.flush_ticks(20)
+
+        luaunit.assertNotNil(captured, 'expected HTTP payload to be enqueued')
+        luaunit.assertEquals(captured.uri,
+            'http://localhost:18080/http/channels/ch-data/messages')
+        luaunit.assertEquals(captured.auth, 'Thing test-thing-key')
+        luaunit.assertNotNil(captured.body)
+
+        local recs, decode_err = json.decode(captured.body)
+        luaunit.assertNil(decode_err)
+        luaunit.assertEquals(type(recs), 'table')
+        luaunit.assertEquals(#recs, 1)
+        luaunit.assertEquals(recs[1].n, 'modem.1.sim')
+        luaunit.assertEquals(recs[1].vs, 'present')
+    end)
-
-    -- Check for all expected entries
-    local found = { system = false, memory = false, used = false, free = false }
-    for _, entry in ipairs(result) do
-        if entry.n == "device.system" and entry.vs == "active" then
-            found.system = true
-        elseif entry.n == "device.system.memory" and entry.vs == "healthy" then
-            found.memory = true
-        elseif entry.n == "device.system.memory.used" and entry.v == 4096 then
-            found.used = true
-        elseif entry.n == "device.system.memory.free" and entry.v == 4096 then
-            found.free = true
-        end
+
+    if svc_scope then
+        stop_scope(svc_scope)
     end
+    package.loaded['services.metrics.http'] = original_http_mod
+    clock:restore()
-
-    luaunit.assertTrue(found.system)
-    luaunit.assertTrue(found.memory)
-    luaunit.assertTrue(found.used)
-    luaunit.assertTrue(found.free)
-    luaunit.assertEquals(#result, 4)
+
+    if st ~= 'ok' then
+        error(test_err or ('metrics http scope failed: ' .. tostring(st)))
+    end
 end
-
-function TestSenML:test_encode_r_with_value_and_time()
-    -- Test encoding values with explicit value and time fields
-    local values = {
-        temperature = {
-            value = 23.5,
-            time = 1234567890
-        },
-        status = "online"
-    }
+
+-- Receiving a new config replaces pipelines; old metric names are dropped.
+function TestMetricsService:test_config_update_replaces_pipelines()
+    local clock = new_test_clock()
+    self.clock = clock
+    local root = fibers.current_scope()
+    local bus = make_bus()
+    local test_conn = bus:connect()
+
+    test_conn:retain({ 'cap', 'fs', 'configs', 'state' }, 'added')
+    start_mock_hal(test_conn, root)
+    test_conn:retain({ 'svc', 'time', 'synced' }, true)
+
+    local result_sub = test_conn:subscribe(
+        { 'svc', 'metrics', '#' },
+        { queue_len = 20, full = 'drop_oldest' }
+    )
+
+    -- Initial config: pipeline for 'sim'.
+    test_conn:retain({ 'cfg', 'metrics' }, bus_pipeline_config('sim', 0.1))
+    local svc_scope = start_metrics(bus, root)
+    flush_ticks()
+
+    -- Confirm 'sim' publishes under the initial config.
+    test_conn:publish(
+        { 'obs', 'v1', 'modem', 'metric', 'sim' },
+        { value = 'present' }
+    )
+    local msg1 = recv_timeout(clock, result_sub, 0.4)
+    luaunit.assertNotNil(msg1, 'expected sim metric before config update')
+
+    -- Update config: replace 'sim' pipeline with 'rx_bytes'.
+    test_conn:retain({ 'cfg', 'metrics' }, bus_pipeline_config('rx_bytes', 0.1))
+    flush_ticks()
+
+    -- 'rx_bytes' must publish after the config update.
+    test_conn:publish(
+        { 'obs', 'v1', 'network', 'metric', 'rx_bytes' },
+        { value = 42 }
+    )
+    local msg2 = recv_timeout(clock, result_sub, 0.4)
+    luaunit.assertNotNil(msg2, 'expected rx_bytes metric after config update')
+    luaunit.assertEquals(msg2.payload.value, 42)
+
+    stop_scope(svc_scope)
+    clock:restore()
+end
-
-    local result, err = senml.encode_r("sensor", values)
-    luaunit.assertNil(err)
-    luaunit.assertEquals(#result, 2)
-
-    -- Check entries
-    local found = { temp = false, status = false }
-    for _, entry in ipairs(result) do
-        if entry.n == "sensor.temperature" and entry.v == 23.5 and entry.t == 1234567890 then
-            found.temp = true
-        elseif entry.n == "sensor.status" and entry.vs == "online" then
-            found.status = true
+
+-- Two endpoints sharing the same pipeline name maintain isolated processing
+-- state (DeltaValue counters don't bleed across endpoints).
+function TestMetricsService:test_per_endpoint_state_isolation()
+    local clock = new_test_clock()
+    self.clock = clock
+    local root = fibers.current_scope()
+    local bus = make_bus()
+    local test_conn = bus:connect()
+
+    test_conn:retain({ 'cap', 'fs', 'configs', 'state' }, 'added')
+    start_mock_hal(test_conn, root)
+    test_conn:retain({ 'svc', 'time', 'synced' }, true)
+
+    local result_sub = test_conn:subscribe(
+        { 'svc', 'metrics', '#' },
+        { queue_len = 20, full = 'drop_oldest' })
+
+    test_conn:retain({ 'cfg', 'metrics' }, bus_pipeline_config('rx_bytes', 0.1, {
+        { type = 'DeltaValue', initial_val = 0 },
+    }))
+    local svc_scope = start_metrics(bus, root)
+    flush_ticks()
+
+    -- WAN endpoint: 500 bytes → delta = 500.
+    test_conn:publish(
+        { 'obs', 'v1', 'network', 'metric', 'rx_bytes' },
+        { value = 500, namespace = { 'wan', 'rx_bytes' } })
+
+    -- LAN endpoint: 200 bytes → delta = 200 (independent state).
+    test_conn:publish(
+        { 'obs', 'v1', 'network', 'metric', 'rx_bytes' },
+        { value = 200, namespace = { 'lan', 'rx_bytes' } })
+
+    -- Collect both publishes within one tick window.
+    local received = {}
+    for _ = 1, 2 do
+        local msg = recv_timeout(clock, result_sub, 0.4)
+        if msg then
+            local key = table.concat(msg.topic, '.')
+            received[key] = msg.payload.value
         end
     end
-
-    luaunit.assertTrue(found.temp)
-    luaunit.assertTrue(found.status)
+
+    luaunit.assertEquals(received['svc.metrics.wan.rx_bytes'], 500)
+    luaunit.assertEquals(received['svc.metrics.lan.rx_bytes'], 200)
+
+    stop_scope(svc_scope)
+    clock:restore()
 end
-
--- Only run tests if this file is executed directly (not via dofile)
+
+-------------------------------------------------------------------------------
+-- Entry point
+-------------------------------------------------------------------------------
+
 if is_entry_point then
-    fiber.spawn(function ()
-        luaunit.LuaUnit.run()
-        fiber.stop()
+    fibers.run(function()
+        os.exit(luaunit.LuaUnit.run())
     end)
-
-    fiber.main()
 end
diff --git a/tests/test_utils/time_harness.lua b/tests/test_utils/time_harness.lua
new file mode 100644
index 00000000..84896cc7
--- /dev/null
+++ b/tests/test_utils/time_harness.lua
@@ -0,0 +1,113 @@
+local fibers = require 'fibers'
+local runtime = require 'fibers.runtime'
+
+local perform = fibers.perform
+local pack = rawget(table, 'pack') or function(...)
+    return { n = select('#', ...), ... }
+end
+local unpack = rawget(table, 'unpack') or unpack
+
+---@class VirtualClock
+---@field advance fun(self: VirtualClock, dt: number): number
+
+---@class TimeHarnessTickOpts
+---@field max_ticks? integer
+
+---@class TimeHarnessWaitTicksOpts
+---@field max_ticks? integer
+---@field on_miss? fun(tick: integer)
+
+---@class TimeHarnessWaitWithinOpts
+---@field step? number
+---@field max_ticks? integer
+
+local M = {}
+local NOT_READY = {}
+
+---@param op_or_factory any|fun(): any
+---@return any
+local function resolve_op(op_or_factory)
+    if type(op_or_factory) == 'function' then
+        return op_or_factory()
+    end
+    return op_or_factory
+end
+
+---@param op_or_factory any|fun(): any
+---@return boolean
+---@return ...
+function M.try_op_now(op_or_factory)
+    local op = resolve_op(op_or_factory)
+    local out = pack(perform(op:or_else(function() return NOT_READY end)))
+    if out.n == 1 and out[1] == NOT_READY then
+        return false
+    end
+    return true, unpack(out, 1, out.n)
+end
+
+---@param max_ticks? integer
+function M.flush_ticks(max_ticks)
+    max_ticks = max_ticks or 1
+    for _ = 1, max_ticks do
+        runtime.yield()
+    end
+end
+
+---@param op_or_factory any|fun(): any
+---@param opts? TimeHarnessWaitTicksOpts
+---@return boolean
+---@return ...
+function M.wait_op_ticks(op_or_factory, opts)
+    opts = opts or {}
+
+    local max_ticks = opts.max_ticks or 1
+    local on_miss = opts.on_miss
+
+    for tick = 0, max_ticks do
+        local out = pack(M.try_op_now(op_or_factory))
+        if out[1] then
+            return unpack(out, 1, out.n)
+        end
+
+        if tick == max_ticks then break end
+
+        if on_miss then
+            on_miss(tick + 1)
+        else
+            runtime.yield()
+        end
+    end
+
+    return false
+end
+
+---@param clock VirtualClock
+---@param timeout_s number
+---@param op_or_factory any|fun(): any
+---@param opts? TimeHarnessWaitWithinOpts
+---@return boolean
+---@return ...
+function M.wait_op_within(clock, timeout_s, op_or_factory, opts)
+    opts = opts or {}
+
+    local step = opts.step or timeout_s
+    local max_ticks = opts.max_ticks or 1
+    local elapsed = 0
+
+    while true do
+        local out = pack(M.try_op_now(op_or_factory))
+        if out[1] then
+            return unpack(out, 1, out.n)
+        end
+        if elapsed >= timeout_s then
+            return false
+        end
+
+        local advance = math.min(step, timeout_s - elapsed)
+        clock:advance(advance)
+        M.flush_ticks(max_ticks)
+        elapsed = elapsed + advance
+    end
+end
+
+return M
diff --git a/tests/test_utils/virtual_time.lua b/tests/test_utils/virtual_time.lua
new file mode 100644
index 00000000..bfc9436f
--- /dev/null
+++ b/tests/test_utils/virtual_time.lua
@@ -0,0 +1,128 @@
+local runtime = require 'fibers.runtime'
+local time = require 'fibers.utils.time'
+
+---@class VirtualTimeInstallOpts
+---@field monotonic? number
+---@field realtime? number
+---@field follow_realtime? boolean
+
+---@class VirtualClock
+---@field monotonic fun(self: VirtualClock): number
+---@field realtime fun(self: VirtualClock): number
+---@field set_monotonic fun(self: VirtualClock, value: number): number
+---@field set_realtime fun(self: VirtualClock, value: number): number
+---@field advance fun(self: VirtualClock, dt: number): number
+---@field restore fun(self: VirtualClock)
+
+local M = {}
+
+---@type VirtualClock|nil
+local active_clock = nil
+
+---@param value number|nil
+---@param fallback number
+---@return number
+local function now_or(value, fallback)
+    if value ~= nil then return value end
+    return fallback
+end
+
+---@param opts? VirtualTimeInstallOpts
+---@return VirtualClock
+function M.install(opts)
+    if active_clock then
+        error('virtual_time.install: a clock is already installed')
+    end
+
+    opts = opts or {}
+
+    local scheduler = runtime.current_scheduler
+    local original = {
+        monotonic = time.monotonic,
+        realtime = time.realtime,
+        block = time._block,
+        scheduler_time = scheduler.get_time,
+        wheel_now = scheduler.wheel.now,
+    }
+
+    local state = {
+        scheduler = scheduler,
+        original = original,
+        monotonic = now_or(opts.monotonic, scheduler:now()),
+        realtime = now_or(opts.realtime, time.realtime()),
+        follow_realtime = opts.follow_realtime ~= false,
+        restored = false,
+    }
+
+    local function monotonic_now()
+        return state.monotonic
+    end
+
+    local function realtime_now()
+        return state.realtime
+    end
+
+    time.monotonic = monotonic_now
+    time.realtime = realtime_now
+    time._block = function()
+        return true
+    end
+
+    scheduler.get_time = monotonic_now
+    scheduler.wheel.now = state.monotonic
+
+    local clock = {}
+
+    function clock:monotonic()
+        return state.monotonic
+    end
+
+    function clock:realtime()
+        return state.realtime
+    end
+
+    function clock:set_monotonic(value)
+        assert(type(value) == 'number', 'virtual_time.set_monotonic: value must be a number')
+        assert(value >= state.monotonic, 'virtual_time.set_monotonic: cannot move time backwards')
+        state.monotonic = value
+        return state.monotonic
+    end
+
+    function clock:set_realtime(value)
+        assert(type(value) == 'number', 'virtual_time.set_realtime: value must be a number')
+        state.realtime = value
+        return state.realtime
+    end
+
+    function clock:advance(dt)
+        assert(type(dt) == 'number', 'virtual_time.advance: dt must be a number')
+        assert(dt >= 0, 'virtual_time.advance: dt must be non-negative')
+        state.monotonic = state.monotonic + dt
+        if state.follow_realtime then
+            state.realtime = state.realtime + dt
+        end
+        return state.monotonic
+    end
+
+    function clock:restore()
+        if state.restored then return end
+        if active_clock ~= clock then
+            error('virtual_time.restore: attempted to restore a non-active clock')
+        end
+
+        scheduler.get_time = original.scheduler_time
+        scheduler.wheel.now = original.wheel_now
+        time.monotonic = original.monotonic
+        time.realtime = original.realtime
+        time._block = original.block
+
+        state.restored = true
+        active_clock = nil
+    end
+
+    ---@cast clock VirtualClock
+    active_clock = clock
+    return clock
+end
+
+return M