diff --git a/README.md b/README.md index 97311fe..987d30e 100644 --- a/README.md +++ b/README.md @@ -105,8 +105,31 @@ with SmartPowerClient( ) ``` -Equal-tank-capacitor value is a two-register pair (value + exponent); -`read_capacitance()` returns it as a single float in Farads. +#### Tank-capacitor read / write + +The firmware exposes two tank-capacitor pairs as adjacent (value + exponent) +holding registers: + +- `HOLD_REG_CAP_VAL` (`0x3008`) / `HOLD_REG_CAP_EXP` (`0x3009`) +- `HOLD_REG_SECOND_CAP_VAL` (`0x3012`) / `HOLD_REG_SECOND_CAP_EXP` (`0x3013`) + +For convenience, the client exposes each pair as a single Farads-valued +float: + +```python +c1 = client.read_capacitance() # primary, single float in F +c2 = client.read_second_capacitance() # secondary, single float in F +client.write_capacitance(100e-6) # 100 µF, atomic 2-register write +client.write_second_capacitance(1e-3) # 1 mF +``` + +`write_capacitance` chooses the (`val`, `exp`) pair that maximises uint16 +precision (mantissa in `[6554, 65535]` whenever possible), so a round-trip +through `read_capacitance` preserves the input to ~4 decimal digits. +Negative, `nan`/`inf`, or out-of-range values (exponent outside `[-30, 6]`) +raise `InvalidValueError`. Both reads and writes use a single Modbus +transaction so the value / exponent pair cannot be torn by a concurrent +peer. ### Low-level (raw addresses, no validation) diff --git a/smartpower_modbus/client.py b/smartpower_modbus/client.py index 8ded060..b6a98af 100644 --- a/smartpower_modbus/client.py +++ b/smartpower_modbus/client.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging +import math import threading import warnings from collections.abc import Iterable @@ -92,6 +93,52 @@ def _validate_int16(reg: Register, raw: int, *, physical_value=None) -> None: ) +# Plausibility window for the tank-capacitor exponent. Real capacitors run +# from picofarad to ~Farad scale; clamping past that catches uninitialised +# memory / wrong-register reads while leaving plenty of headroom on either +# side. Used by both the read decoder (``_read_cap_pair``) and the write +# encoder (``_encode_capacitance``). +_CAP_EXP_MIN = -30 +_CAP_EXP_MAX = 6 + + +def _encode_capacitance(value_F: float) -> tuple[int, int]: + """Encode a capacitance in Farads as a (val, exp) pair for the firmware's + two-register storage: ``cap_F == val * 10**exp``, with ``val`` in + ``[0, 65535]`` (uint16) and ``exp`` in ``[_CAP_EXP_MIN, _CAP_EXP_MAX]``. + + Chooses ``exp`` so that ``val`` lands in ``[6554, 65535]`` whenever + possible — that maximises uint16 precision so a round-trip through + :func:`_read_cap_pair` preserves the input to ~4 decimal digits. + + Raises :class:`InvalidValueError` for negative, ``nan``, ``inf``, or + values that cannot be represented within the exponent window. + """ + if not isinstance(value_F, (int, float)) or isinstance(value_F, bool): + raise InvalidValueError( + f"Capacitance must be a real number, got {type(value_F).__name__} {value_F!r}" + ) + if not math.isfinite(value_F) or value_F < 0: + raise InvalidValueError( + f"Capacitance must be a non-negative finite number, got {value_F!r}" + ) + if value_F == 0.0: + return (0, 0) + # exp = ceil(log10(value / uint16_max)) puts ``val`` in [6554, 65535] + # except when rounding pushes val to 65536 — caught below. + exp = math.ceil(math.log10(value_F / 0xFFFF)) + val = round(value_F / (10.0 ** exp)) + if val > 0xFFFF: + exp += 1 + val = round(value_F / (10.0 ** exp)) + if not _CAP_EXP_MIN <= exp <= _CAP_EXP_MAX: + raise InvalidValueError( + f"Capacitance {value_F} F cannot be encoded with exponent in " + f"[{_CAP_EXP_MIN}, {_CAP_EXP_MAX}] (got exp={exp})" + ) + return (val, exp) + + def _coerce_model(value) -> SmartPowerModel: """Accept a SmartPowerModel, a public model name string, a FirmwareBranch (deprecated), or a firmware-branch string (deprecated) and return the @@ -448,6 +495,51 @@ def write_value( # ----- composite: tank-capacitor value+exponent ----- + def _read_cap_pair(self, val_reg: Register, exp_reg: Register) -> float: + """Atomic value+exponent read shared by ``read_capacitance`` and + ``read_second_capacitance``. Reads ``val_reg`` and ``exp_reg`` in + one Modbus transaction (they live at adjacent addresses), decodes + the exponent as int16, and bounds-checks it. + """ + model = self._require_model() + assert_supported(val_reg, model) + assert_supported(exp_reg, model) + # val_reg and exp_reg are firmware-side adjacent (val first); enforce + # so a refactor can't quietly desynchronise the pair. + assert exp_reg.addr == val_reg.addr + 1, ( + f"{val_reg.name} and {exp_reg.name} must be at adjacent addresses" + ) + with self._lock: + raw = self._transport.read_holding(val_reg.addr, 2) + val_raw = raw[0] + exp_raw = signed16(raw[1]) + if not _CAP_EXP_MIN <= exp_raw <= _CAP_EXP_MAX: + raise InvalidValueError( + f"Capacitance exponent {exp_raw} from {exp_reg.name} " + f"is out of plausible range [{_CAP_EXP_MIN}, {_CAP_EXP_MAX}]" + ) + return val_raw * (10.0 ** exp_raw) + + def _write_cap_pair( + self, val_reg: Register, exp_reg: Register, value: float + ) -> None: + """Encode ``value`` (Farads) and write the val+exp pair atomically + via FC 0x10 (Write Multiple Registers). Shared by + ``write_capacitance`` and ``write_second_capacitance``. + """ + model = self._require_model() + assert_supported(val_reg, model) + assert_supported(exp_reg, model) + assert exp_reg.addr == val_reg.addr + 1, ( + f"{val_reg.name} and {exp_reg.name} must be at adjacent addresses" + ) + val, exp = _encode_capacitance(float(value)) + # ``exp`` is a signed int16 on the wire — convert to the uint16 + # wire form. unsigned16() also validates the range as a sanity guard. + exp_wire = unsigned16(exp) + with self._lock: + self._transport.write_holdings(val_reg.addr, [val, exp_wire]) + def read_capacitance(self) -> float: """Read the equal-tank-capacitor value as a single float in Farads. @@ -462,22 +554,49 @@ def read_capacitance(self) -> float: exponent against a sane range — a garbage firmware value of e.g. ``exp=400`` would silently overflow ``10.0 ** exp`` to ``inf``. """ - model = self._require_model() - assert_supported(Register.HOLD_REG_CAP_VAL, model) - assert_supported(Register.HOLD_REG_CAP_EXP, model) - with self._lock: - raw = self._transport.read_holding(Register.HOLD_REG_CAP_VAL.addr, 2) - val_raw = raw[0] - exp_raw = signed16(raw[1]) - # Real-world tank caps land in pico- to milli-farad territory; clamp - # well past that so legitimate firmware values pass while obvious - # garbage (uninitialised memory, wrong register) is rejected. - if not -30 <= exp_raw <= 6: - raise InvalidValueError( - f"Capacitance exponent {exp_raw} from {Register.HOLD_REG_CAP_EXP.name} " - f"is out of plausible range [-30, 6]" - ) - return val_raw * (10.0 ** exp_raw) + return self._read_cap_pair( + Register.HOLD_REG_CAP_VAL, Register.HOLD_REG_CAP_EXP, + ) + + def read_second_capacitance(self) -> float: + """Read the second equal-tank-capacitor value as a single float in Farads. + + Mirror of :meth:`read_capacitance` against the second value/exponent + pair at ``HOLD_REG_SECOND_CAP_VAL`` (``0x3012``) and + ``HOLD_REG_SECOND_CAP_EXP`` (``0x3013``). Same atomicity and + bounds-check semantics. + """ + return self._read_cap_pair( + Register.HOLD_REG_SECOND_CAP_VAL, Register.HOLD_REG_SECOND_CAP_EXP, + ) + + def write_capacitance(self, value: float) -> None: + """Write the equal-tank-capacitor value as a single float in Farads. + + Inverse of :meth:`read_capacitance`. Encodes ``value`` as a (val, + exp) pair and writes both registers atomically via FC 0x10. The + encoder maximises uint16 precision (mantissa lands in + ``[6554, 65535]`` whenever possible) so a round-trip through + :meth:`read_capacitance` preserves ``value`` to ~4 decimal digits. + + Raises :class:`InvalidValueError` for negative, ``nan``/``inf``, + or out-of-range inputs (exponent outside ``[-30, 6]``). + """ + self._write_cap_pair( + Register.HOLD_REG_CAP_VAL, Register.HOLD_REG_CAP_EXP, value, + ) + + def write_second_capacitance(self, value: float) -> None: + """Write the second equal-tank-capacitor value as a single float in Farads. + + Mirror of :meth:`write_capacitance` against the second value/exponent + pair (``HOLD_REG_SECOND_CAP_VAL`` / ``HOLD_REG_SECOND_CAP_EXP``). + """ + self._write_cap_pair( + Register.HOLD_REG_SECOND_CAP_VAL, + Register.HOLD_REG_SECOND_CAP_EXP, + value, + ) def dump(self) -> dict[Register, int | bool]: """Read every register exposed by the configured model. diff --git a/tests/test_capacitance.py b/tests/test_capacitance.py index e1321e6..2adcc8e 100644 --- a/tests/test_capacitance.py +++ b/tests/test_capacitance.py @@ -10,10 +10,21 @@ silently coercing them to ``True``. - B4 — ``identify_model`` does not clobber an explicitly-configured ``self.model`` when the device reports a different one. + +Also covers the second-pair / write-side additions: + +- ``read_second_capacitance`` mirrors ``read_capacitance`` against the + ``HOLD_REG_SECOND_CAP_*`` pair (0x3012 / 0x3013). +- ``write_capacitance`` and ``write_second_capacitance`` encode a Farads + value and emit one atomic FC 0x10 (Write Multiple Registers) write. +- ``_encode_capacitance`` round-trips, maximises uint16 precision, and + rejects bad inputs. """ from __future__ import annotations +import math + import pytest from smartpower_modbus import ( @@ -22,6 +33,7 @@ SmartPowerClient, SmartPowerModel, ) +from smartpower_modbus.client import _encode_capacitance # Re-use the existing fake transport from test_transport.py. With pytest's # default rootdir + tests/__init__.py present, the package form works; the @@ -184,3 +196,202 @@ def test_identify_model_sets_when_unconfigured(fake_client): assert c.model is SmartPowerModel.GEN_2_0 finally: c.close() + + +# ---------- _encode_capacitance: pure unit tests ---------- + +def test_encode_capacitance_microfarad(): + """100 µF must encode to a mantissa in [6554, 65535] so a round-trip + keeps full uint16 precision.""" + val, exp = _encode_capacitance(100e-6) + assert val == 10000 + assert exp == -8 + assert val * (10.0 ** exp) == pytest.approx(100e-6) + + +def test_encode_capacitance_zero(): + """Zero is the only value where the (val, exp) pair is ambiguous; we + canonicalise it to (0, 0).""" + assert _encode_capacitance(0.0) == (0, 0) + + +def test_encode_capacitance_picofarad_round_trip(): + val, exp = _encode_capacitance(1e-12) + assert val * (10.0 ** exp) == pytest.approx(1e-12) + + +@pytest.mark.parametrize("value_F", [1e-12, 1e-9, 1e-6, 100e-6, 1e-3, 1.0, 1.234e-6]) +def test_encode_capacitance_round_trip(value_F): + """Across the plausible capacitor range, encode→decode must preserve + the input value to within float-rounding tolerance.""" + val, exp = _encode_capacitance(value_F) + assert 0 <= val <= 0xFFFF + assert -30 <= exp <= 6 + assert val * (10.0 ** exp) == pytest.approx(value_F, rel=1e-4) + + +def test_encode_capacitance_rejects_negative(): + with pytest.raises(InvalidValueError, match="non-negative"): + _encode_capacitance(-1e-6) + + +def test_encode_capacitance_rejects_nan(): + with pytest.raises(InvalidValueError, match="non-negative"): + _encode_capacitance(math.nan) + + +def test_encode_capacitance_rejects_inf(): + with pytest.raises(InvalidValueError, match="non-negative"): + _encode_capacitance(math.inf) + + +def test_encode_capacitance_rejects_bool(): + """bool is an int subclass — reject it explicitly so write_capacitance(True) + can't sneak through as 1.0 F.""" + with pytest.raises(InvalidValueError, match="real number"): + _encode_capacitance(True) + + +def test_encode_capacitance_rejects_too_small(): + with pytest.raises(InvalidValueError, match=r"exponent in \[-30, 6\]"): + _encode_capacitance(1e-35) + + +def test_encode_capacitance_rejects_too_large(): + with pytest.raises(InvalidValueError, match=r"exponent in \[-30, 6\]"): + _encode_capacitance(1e12) + + +# ---------- read_second_capacitance ---------- + +def test_read_second_capacitance_normal_pair(client, fake_client): + """200 × 10^-5 = 2 mF — typical secondary tank cap.""" + fake_client.script( + "read_holding_registers", + _Resp(registers=[200, _signed16_to_raw(-5)]), + ) + cap = client.read_second_capacitance() + assert cap == pytest.approx(2e-3) + + +def test_read_second_capacitance_uses_single_call_at_0x3012(client, fake_client): + """B2-style atomicity guard: one transaction, address 0x3012, count=2.""" + fake_client.calls.clear() + fake_client.script("read_holding_registers", _Resp(registers=[100, _signed16_to_raw(-6)])) + client.read_second_capacitance() + holding_calls = [c for c in fake_client.calls if c.name == "read_holding_registers"] + assert len(holding_calls) == 1 + call = holding_calls[0] + assert call.address == Register.HOLD_REG_SECOND_CAP_VAL.addr # 0x3012 + assert call.kwargs["count"] == 2 + + +def test_read_second_capacitance_rejects_overflow_exponent(client, fake_client): + fake_client.script("read_holding_registers", _Resp(registers=[1, 400])) + with pytest.raises(InvalidValueError, match="exponent"): + client.read_second_capacitance() + + +# ---------- write_capacitance ---------- + +def test_write_capacitance_emits_single_write_multiple_registers_call(client, fake_client): + """B2-style atomicity: write_capacitance must use one FC 0x10 PDU at + 0x3008 carrying [val, exp_wire] — not two single-register writes.""" + fake_client.calls.clear() + client.write_capacitance(100e-6) + write_calls = [c for c in fake_client.calls if c.name == "write_registers"] + assert len(write_calls) == 1, ( + f"expected 1 write_registers call, got {len(write_calls)}: {write_calls}" + ) + call = write_calls[0] + assert call.address == Register.HOLD_REG_CAP_VAL.addr # 0x3008 + # 100 µF → (10000, -8); exp -8 on the wire is 0xFFF8. + assert call.kwargs["values"] == [10000, 0xFFF8] + + +def test_write_capacitance_uses_no_single_register_writes(client, fake_client): + """Belt-and-braces: no FC 0x06 (write_register) calls should leak out + on the write_capacitance path.""" + fake_client.calls.clear() + client.write_capacitance(1e-3) + single_writes = [c for c in fake_client.calls if c.name == "write_register"] + assert single_writes == [] + + +@pytest.mark.parametrize("value_F", [1e-12, 1e-9, 1e-6, 100e-6, 1e-3, 1.0]) +def test_write_then_read_capacitance_round_trip(value_F, client, fake_client): + """End-to-end: write_capacitance(v) followed by reading the same val+exp + pair back through the fake transport returns approximately v.""" + fake_client.calls.clear() + client.write_capacitance(value_F) + written = fake_client.calls[-1].kwargs["values"] + # Mirror the encoder: exp_wire is unsigned16(exp); decoding goes back + # through signed16. Have the fake reply with the same payload on read. + fake_client.script( + "read_holding_registers", _Resp(registers=list(written)), + ) + assert client.read_capacitance() == pytest.approx(value_F, rel=1e-4) + + +def test_write_capacitance_zero_round_trip(client, fake_client): + fake_client.calls.clear() + client.write_capacitance(0.0) + written = fake_client.calls[-1].kwargs["values"] + assert written == [0, 0] + fake_client.script("read_holding_registers", _Resp(registers=[0, 0])) + assert client.read_capacitance() == 0.0 + + +def test_write_capacitance_rejects_negative(client, fake_client): + fake_client.calls.clear() + with pytest.raises(InvalidValueError, match="non-negative"): + client.write_capacitance(-1e-6) + # And nothing should hit the wire. + assert [c for c in fake_client.calls if c.name == "write_registers"] == [] + + +def test_write_capacitance_rejects_nonfinite(client): + for bad in (math.nan, math.inf, -math.inf): + with pytest.raises(InvalidValueError, match="non-negative"): + client.write_capacitance(bad) + + +def test_write_capacitance_rejects_too_large(client): + with pytest.raises(InvalidValueError, match=r"exponent in \[-30, 6\]"): + client.write_capacitance(1e12) + + +def test_write_capacitance_rejects_too_small(client): + with pytest.raises(InvalidValueError, match=r"exponent in \[-30, 6\]"): + client.write_capacitance(1e-35) + + +def test_write_capacitance_accepts_int_input(client, fake_client): + """Common ergonomic case: caller passes an int (e.g. 1 for 1 F).""" + fake_client.calls.clear() + client.write_capacitance(1) + call = fake_client.calls[-1] + assert call.name == "write_registers" + # 1 F → (10000, -4); -4 wire = 0xFFFC. + assert call.kwargs["values"] == [10000, 0xFFFC] + + +# ---------- write_second_capacitance ---------- + +def test_write_second_capacitance_writes_to_0x3012(client, fake_client): + fake_client.calls.clear() + client.write_second_capacitance(100e-6) + call = fake_client.calls[-1] + assert call.name == "write_registers" + assert call.address == Register.HOLD_REG_SECOND_CAP_VAL.addr # 0x3012 + assert call.kwargs["values"] == [10000, 0xFFF8] + + +def test_write_second_capacitance_round_trip(client, fake_client): + """Symmetry check: writing then reading the second pair returns the + same value (parallel to the first-pair round-trip).""" + fake_client.calls.clear() + client.write_second_capacitance(1e-6) + written = fake_client.calls[-1].kwargs["values"] + fake_client.script("read_holding_registers", _Resp(registers=list(written))) + assert client.read_second_capacitance() == pytest.approx(1e-6, rel=1e-4)