diff --git a/.github/workflows/code-cleanup.yml b/.github/workflows/code-cleanup.yml index 745f3852c1..5070747b77 100644 --- a/.github/workflows/code-cleanup.yml +++ b/.github/workflows/code-cleanup.yml @@ -1,8 +1,6 @@ name: code-cleanup on: pull_request: - paths-ignore: - - '**.md' permissions: contents: write diff --git a/dimos/core/blueprints.py b/dimos/core/blueprints.py index 605517e6cf..d1d039ceb8 100644 --- a/dimos/core/blueprints.py +++ b/dimos/core/blueprints.py @@ -21,7 +21,10 @@ import operator import sys from types import MappingProxyType -from typing import Any, Literal, get_args, get_origin, get_type_hints +from typing import TYPE_CHECKING, Any, Literal, get_args, get_origin, get_type_hints + +if TYPE_CHECKING: + from dimos.protocol.service.system_configurator import SystemConfigurator from dimos.core.global_config import GlobalConfig, global_config from dimos.core.module import Module, is_module_type @@ -108,7 +111,9 @@ class Blueprint: remapping_map: Mapping[tuple[type[Module], str], str | type[Module] | type[Spec]] = field( default_factory=lambda: MappingProxyType({}) ) - requirement_checks: tuple[Callable[[], str | None], ...] = field(default_factory=tuple) + requirement_checks: "tuple[Callable[[], str | None] | SystemConfigurator, ...]" = field( + default_factory=tuple + ) @classmethod def create(cls, module: type[Module], *args: Any, **kwargs: Any) -> "Blueprint": @@ -148,7 +153,7 @@ def remappings( requirement_checks=self.requirement_checks, ) - def requirements(self, *checks: Callable[[], str | None]) -> "Blueprint": + def requirements(self, *checks: "Callable[[], str | None] | SystemConfigurator") -> "Blueprint": return Blueprint( blueprints=self.blueprints, transport_map=self.transport_map, @@ -203,16 +208,35 @@ def _is_name_unique(self, name: str) -> bool: return sum(1 for n, _ in self._all_name_types if n == name) == 1 def _check_requirements(self) -> None: - errors = [] - red = "\033[31m" - reset = "\033[0m" + from dimos.protocol.service.system_configurator import ( + SystemConfigurator, + configure_system, + lcm_configurators, + ) + configurators = list(lcm_configurators()) + other_checks: list[Callable[[], str | None]] = [] for check in self.requirement_checks: - error = check() - if error: - errors.append(error) + if isinstance(check, SystemConfigurator): + configurators.append(check) + else: + other_checks.append(check) + + if configurators: + try: + configure_system(configurators) + except SystemExit: + labels = [type(c).__name__ for c in configurators] + print( + f"Required system configuration was declined: {', '.join(labels)}", + file=sys.stderr, + ) + sys.exit(1) + errors = [e for check in other_checks if (e := check())] if errors: + red = "\033[31m" + reset = "\033[0m" for error in errors: print(f"{red}Error: {error}{reset}", file=sys.stderr) sys.exit(1) diff --git a/dimos/protocol/pubsub/benchmark/test_benchmark.py b/dimos/protocol/pubsub/benchmark/test_benchmark.py index 39a4421c35..4ab2fdb479 100644 --- a/dimos/protocol/pubsub/benchmark/test_benchmark.py +++ b/dimos/protocol/pubsub/benchmark/test_benchmark.py @@ -29,6 +29,7 @@ MsgGen, PubSubContext, ) +from dimos.utils.human import human_bytes # Message sizes for throughput benchmarking (powers of 2 from 64B to 10MB) MSG_SIZES = [ @@ -56,15 +57,6 @@ RECEIVE_TIMEOUT = 1.0 -def size_id(size: int) -> str: - """Convert byte size to human-readable string for test IDs.""" - if size >= 1048576: - return f"{size // 1048576}MB" - if size >= 1024: - return f"{size // 1024}KB" - return f"{size}B" - - def pubsub_id(testcase: Case[Any, Any]) -> str: """Extract pubsub implementation name from context manager function name.""" name: str = testcase.pubsub_context.__name__ @@ -86,7 +78,9 @@ def benchmark_results() -> Generator[BenchmarkResults, None, None]: @pytest.mark.tool -@pytest.mark.parametrize("msg_size", MSG_SIZES, ids=[size_id(s) for s in MSG_SIZES]) +@pytest.mark.parametrize( + "msg_size", MSG_SIZES, ids=[human_bytes(s, concise=True, decimals=0) for s in MSG_SIZES] +) @pytest.mark.parametrize("pubsub_context, msggen", testcases, ids=[pubsub_id(t) for t in testcases]) def test_throughput( pubsub_context: PubSubContext[Any, Any], diff --git a/dimos/protocol/pubsub/benchmark/type.py b/dimos/protocol/pubsub/benchmark/type.py index a9ef80fe7a..4b64a08561 100644 --- a/dimos/protocol/pubsub/benchmark/type.py +++ b/dimos/protocol/pubsub/benchmark/type.py @@ -20,6 +20,7 @@ from typing import Any, Generic from dimos.protocol.pubsub.spec import MsgT, PubSub, TopicT +from dimos.utils.human import human_bytes, human_duration, human_number MsgGen = Callable[[int], tuple[TopicT, MsgT]] @@ -41,33 +42,6 @@ def __len__(self) -> int: TestData = Sequence[Case[Any, Any]] -def _format_mib(value: float) -> str: - """Format bytes as MiB with intelligent rounding. - - >= 10 MiB: integer (e.g., "42") - 1-10 MiB: 1 decimal (e.g., "2.5") - < 1 MiB: 2 decimals (e.g., "0.07") - """ - mib = value / (1024**2) - if mib >= 10: - return f"{mib:.0f}" - if mib >= 1: - return f"{mib:.1f}" - return f"{mib:.2f}" - - -def _format_iec(value: float, concise: bool = False, decimals: int = 2) -> str: - """Format bytes with IEC units (Ki/Mi/Gi = 1024^1/2/3)""" - k = 1024.0 - units = ["B", "K", "M", "G", "T"] if concise else ["B", "KiB", "MiB", "GiB", "TiB"] - - for unit in units[:-1]: - if abs(value) < k: - return f"{value:.{decimals}f}{unit}" if concise else f"{value:.{decimals}f} {unit}" - value /= k - return f"{value:.{decimals}f}{units[-1]}" if concise else f"{value:.{decimals}f} {units[-1]}" - - @dataclass class BenchmarkResult: transport: str @@ -133,11 +107,13 @@ def print_summary(self) -> None: recv_style = "yellow" if r.receive_time > 0.1 else "dim" table.add_row( r.transport, - _format_iec(r.msg_size_bytes, decimals=0), + human_bytes(r.msg_size_bytes, decimals=0), f"{r.msgs_sent:,}", f"{r.msgs_received:,}", f"{r.throughput_msgs:,.0f}", - _format_mib(r.throughput_bytes), + (lambda m: f"{m:.2f}" if m < 1 else f"{m:.1f}" if m < 10 else f"{m:.0f}")( + r.throughput_bytes / 1024**2 + ), f"[{recv_style}]{r.receive_time * 1000:.0f}ms[/{recv_style}]", f"[{loss_style}]{r.loss_pct:.1f}%[/{loss_style}]", ) @@ -211,7 +187,7 @@ def val_to_color(v: float) -> int: return gradient[int(t * (len(gradient) - 1))] reset = "\033[0m" - size_labels = [_format_iec(s, concise=True, decimals=0) for s in sizes] + size_labels = [human_bytes(s, concise=True, decimals=0) for s in sizes] col_w = max(8, max(len(s) for s in size_labels) + 1) transport_w = max(len(t) for t in transports) + 1 @@ -236,31 +212,23 @@ def val_to_color(v: float) -> int: def print_heatmap(self) -> None: """Print msgs/sec heatmap.""" - def fmt(v: float) -> str: - return f"{v / 1000:.1f}k" if v >= 1000 else f"{v:.0f}" - - self._print_heatmap("Msgs/sec", lambda r: r.throughput_msgs, fmt) + self._print_heatmap("Msgs/sec", lambda r: r.throughput_msgs, human_number) def print_bandwidth_heatmap(self) -> None: """Print bandwidth heatmap.""" def fmt(v: float) -> str: - return _format_iec(v, concise=True, decimals=1) + return human_bytes(v, concise=True, decimals=1) self._print_heatmap("Bandwidth (IEC)", lambda r: r.throughput_bytes, fmt) def print_latency_heatmap(self) -> None: """Print latency heatmap (time waiting for messages after publishing).""" - def fmt(v: float) -> str: - if v >= 1: - return f"{v:.1f}s" - return f"{v * 1000:.0f}ms" - self._print_heatmap( "Latency", lambda r: r.receive_time, - fmt, + lambda v: human_duration(v, signed=False), high_is_good=False, ) diff --git a/dimos/protocol/service/lcmservice.py b/dimos/protocol/service/lcmservice.py index 4655780fb3..f414ce9e23 100644 --- a/dimos/protocol/service/lcmservice.py +++ b/dimos/protocol/service/lcmservice.py @@ -24,15 +24,7 @@ import lcm from dimos.protocol.service.spec import Service -from dimos.protocol.service.system_configurator import ( - BufferConfiguratorLinux, - BufferConfiguratorMacOS, - MaxFileConfiguratorMacOS, - MulticastConfiguratorLinux, - MulticastConfiguratorMacOS, - SystemConfigurator, - configure_system, -) +from dimos.protocol.service.system_configurator import configure_system, lcm_configurators from dimos.utils.logging_config import setup_logger logger = setup_logger() @@ -46,22 +38,9 @@ def autoconf(check_only: bool = False) -> None: - # check multicast and buffer sizes - system = platform.system() - checks: list[SystemConfigurator] = [] - if system == "Linux": - checks = [ - MulticastConfiguratorLinux(loopback_interface="lo"), - BufferConfiguratorLinux(), - ] - elif system == "Darwin": - checks = [ - MulticastConfiguratorMacOS(loopback_interface="lo0"), - BufferConfiguratorMacOS(), - MaxFileConfiguratorMacOS(), - ] - else: - logger.error(f"System configuration not supported on {system}") + checks = lcm_configurators() + if not checks: + logger.error(f"System configuration not supported on {platform.system()}") return configure_system(checks, check_only=check_only) diff --git a/dimos/protocol/service/system_configurator/__init__.py b/dimos/protocol/service/system_configurator/__init__.py new file mode 100644 index 0000000000..b49c76876a --- /dev/null +++ b/dimos/protocol/service/system_configurator/__init__.py @@ -0,0 +1,75 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""System configurator package — re-exports for backward compatibility.""" + +import platform + +from dimos.protocol.service.system_configurator.base import ( + SystemConfigurator, + configure_system, + sudo_run, +) +from dimos.protocol.service.system_configurator.clock_sync import ClockSyncConfigurator +from dimos.protocol.service.system_configurator.lcm import ( + IDEAL_RMEM_SIZE, + BufferConfiguratorLinux, + BufferConfiguratorMacOS, + MaxFileConfiguratorMacOS, + MulticastConfiguratorLinux, + MulticastConfiguratorMacOS, +) + + +# TODO: This is a configurator API issue and inserted here temporarily +# +# We need to use different configurators based on the underlying OS +# +# We should have separation of concerns, nothing but configurators themselves care about the OS in this context +# +# So configurators with multi-os behavior should be responsible for the right per-OS behaviour, and +# not external systems +# +# We might want to have some sort of recursive configurators +# +def lcm_configurators() -> list[SystemConfigurator]: + """Return the platform-appropriate LCM system configurators.""" + system = platform.system() + if system == "Linux": + return [ + MulticastConfiguratorLinux(loopback_interface="lo"), + BufferConfiguratorLinux(), + ] + elif system == "Darwin": + return [ + MulticastConfiguratorMacOS(loopback_interface="lo0"), + BufferConfiguratorMacOS(), + MaxFileConfiguratorMacOS(), # TODO: this is not LCM related and shouldn't be here at all + ] + return [] + + +__all__ = [ + "IDEAL_RMEM_SIZE", + "BufferConfiguratorLinux", + "BufferConfiguratorMacOS", + "ClockSyncConfigurator", + "MaxFileConfiguratorMacOS", + "MulticastConfiguratorLinux", + "MulticastConfiguratorMacOS", + "SystemConfigurator", + "configure_system", + "lcm_configurators", + "sudo_run", +] diff --git a/dimos/protocol/service/system_configurator/base.py b/dimos/protocol/service/system_configurator/base.py new file mode 100644 index 0000000000..c221af890f --- /dev/null +++ b/dimos/protocol/service/system_configurator/base.py @@ -0,0 +1,135 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from abc import ABC, abstractmethod +from functools import cache +import logging +import os +import subprocess +from typing import Any + +import typer + +logger = logging.getLogger(__name__) + +# ----------------------------- sudo helpers ----------------------------- + + +@cache +def _is_root_user() -> bool: + try: + return os.geteuid() == 0 + except AttributeError: + return False + + +def sudo_run(*args: Any, **kwargs: Any) -> subprocess.CompletedProcess[str]: + if _is_root_user(): + return subprocess.run(list(args), **kwargs) + return subprocess.run(["sudo", *args], **kwargs) + + +def _read_sysctl_int(name: str) -> int | None: + try: + result = subprocess.run(["sysctl", name], capture_output=True, text=True) + if result.returncode != 0: + logger.error( + "[sysctl] `sysctl %s` rc=%d stderr=%r", name, result.returncode, result.stderr + ) + return None + + text = result.stdout.strip().replace(":", "=") + if "=" not in text: + logger.error("[sysctl] unexpected output for %s: %r", name, text) + return None + + return int(text.split("=", 1)[1].strip()) + except Exception as error: + logger.error("[sysctl] reading %s: %s", name, error) + return None + + +def _write_sysctl_int(name: str, value: int) -> None: + sudo_run("sysctl", "-w", f"{name}={value}", check=True, text=True, capture_output=False) + + +# -------------------------- base class for system config checks/requirements -------------------------- + + +class SystemConfigurator(ABC): + critical: bool = False + + @abstractmethod + def check(self) -> bool: + """Return True if configured. Log errors and return False on uncertainty.""" + raise NotImplementedError + + @abstractmethod + def explanation(self) -> str | None: + """ + Return a human-readable summary of what would be done (sudo commands) if not configured. + Return None when no changes are needed. + """ + raise NotImplementedError + + @abstractmethod + def fix(self) -> None: + """Apply fixes (may attempt sudo, catch, and apply fallback measures if needed).""" + raise NotImplementedError + + +# ----------------------------- generic enforcement of system configs ----------------------------- + + +def configure_system(checks: list[SystemConfigurator], check_only: bool = False) -> None: + if os.environ.get("CI"): + logger.info("CI environment detected: skipping system configuration.") + return + + # run checks + failing = [check for check in checks if not check.check()] + if not failing: + return + + # ask for permission to modify system + explanations: list[str] = [msg for check in failing if (msg := check.explanation()) is not None] + + if explanations: + logger.warning("System configuration changes are recommended/required:\n") + logger.warning("\n\n".join(explanations)) + + if check_only: + return + + if not typer.confirm("\nApply these changes now?"): + if any(check.critical for check in failing): + raise SystemExit(1) + return + + for check in failing: + try: + check.fix() + except subprocess.CalledProcessError as error: + if check.critical: + logger.error("Critical fix failed rc=%d", error.returncode) + logger.error("stdout: %s", error.stdout) + logger.error("stderr: %s", error.stderr) + raise + logger.warning("Optional improvement failed: rc=%d", error.returncode) + logger.warning("stdout: %s", error.stdout) + logger.warning("stderr: %s", error.stderr) + + logger.info("System configuration completed.") diff --git a/dimos/protocol/service/system_configurator/clock_sync.py b/dimos/protocol/service/system_configurator/clock_sync.py new file mode 100644 index 0000000000..4dbefcec9e --- /dev/null +++ b/dimos/protocol/service/system_configurator/clock_sync.py @@ -0,0 +1,125 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import platform +import shutil +import socket +import struct +import time + +from dimos.protocol.service.system_configurator.base import SystemConfigurator, sudo_run +from dimos.utils.human import human_duration + + +class ClockSyncConfigurator(SystemConfigurator): + """Check that the local clock is within MAX_OFFSET_SECONDS of NTP time. + + Uses a pure-Python NTP query (RFC 4330 SNTPv4) so there are no external + dependencies. If the NTP server is unreachable the check *passes* — we + don't want unrelated network issues to block robot startup. + """ + + critical = False + MAX_OFFSET_SECONDS = 0.2 # 200 ms per issue spec + NTP_SERVER = "pool.ntp.org" + NTP_PORT = 123 + NTP_TIMEOUT = 2 # seconds + + def __init__(self) -> None: + self._offset: float | None = None # seconds, filled by check() + self._fix_cmd: list[str] = [] # resolved by check() + + @staticmethod + def _ntp_offset(server: str = "pool.ntp.org", port: int = 123, timeout: float = 2) -> float: + """Return clock offset in seconds (local - NTP). Raises on failure.""" + + # Minimal SNTPv4 request: LI=0, VN=4, Mode=3 → first byte = 0x23 + msg = b"\x23" + b"\x00" * 47 + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.settimeout(timeout) + try: + t1 = time.time() + sock.sendto(msg, (server, port)) + data, _ = sock.recvfrom(1024) + t4 = time.time() + finally: + sock.close() + + if len(data) < 48: + raise ValueError(f"NTP response too short ({len(data)} bytes)") + + # Transmit Timestamp starts at byte 40 (seconds at 40, fraction at 44) + ntp_secs: int = struct.unpack("!I", data[40:44])[0] + ntp_frac: int = struct.unpack("!I", data[44:48])[0] + # NTP epoch is 1900-01-01; Unix epoch is 1970-01-01 + ntp_time: float = ntp_secs - 2208988800 + ntp_frac / (2**32) + + # Simplified offset: assume symmetric delay + t_server = ntp_time + rtt = t4 - t1 + offset: float = t_server - (t1 + rtt / 2) + return offset + + def _resolve_fix_cmd(self) -> list[str]: + """Determine the best available NTP sync command for this platform.""" + system = platform.system() + if system == "Darwin": + return ["sntp", "-sS", self.NTP_SERVER] + if system == "Linux": + if shutil.which("ntpdate"): + return ["ntpdate", self.NTP_SERVER] + if shutil.which("sntp"): + return ["sntp", "-sS", self.NTP_SERVER] + if self._offset is not None: + new_time = time.time() - self._offset + return ["date", "-s", f"@{new_time:.3f}"] + return [] + + def check(self) -> bool: + try: + self._offset = self._ntp_offset(self.NTP_SERVER, self.NTP_PORT, self.NTP_TIMEOUT) + except (TimeoutError, OSError, ValueError) as exc: + print(f"[clock-sync] NTP query failed ({exc}); assuming clock is OK") + self._offset = None + return True # graceful degradation — don't block on network issues + + if abs(self._offset) <= self.MAX_OFFSET_SECONDS: + return True + + self._fix_cmd = self._resolve_fix_cmd() + return False + + def explanation(self) -> str | None: + if self._offset is None: + return None + if self._fix_cmd: + cmd = f"sudo {' '.join(self._fix_cmd)}" + else: + cmd = "(no NTP tool found — install ntpdate or sntp, then re-run)" + hint = "" + if platform.system() == "Linux": + hint = "\n (Alternatively, you can install systemd-timesyncd.service)" + return f"- clock sync: local clock is off by {human_duration(self._offset)}: {cmd}{hint}" + + def fix(self) -> None: + if not self._fix_cmd: + print(f"[clock-sync] No automatic fix available on {platform.system()}") + return + cmd = list(self._fix_cmd) + # Recompute the corrected time at fix-time (not stale from check-time) + if cmd[:2] == ["date", "-s"] and self._offset is not None: + cmd[2] = f"@{time.time() - self._offset:.3f}" + sudo_run(*cmd, check=True, text=True, capture_output=True) diff --git a/dimos/protocol/service/system_configurator.py b/dimos/protocol/service/system_configurator/lcm.py similarity index 73% rename from dimos/protocol/service/system_configurator.py rename to dimos/protocol/service/system_configurator/lcm.py index 44b8c45276..0443ce5ccb 100644 --- a/dimos/protocol/service/system_configurator.py +++ b/dimos/protocol/service/system_configurator/lcm.py @@ -14,129 +14,16 @@ from __future__ import annotations -from abc import ABC, abstractmethod -from functools import cache -import os import re import resource import subprocess -from typing import Any - -# ----------------------------- sudo helpers ----------------------------- - - -@cache -def _is_root_user() -> bool: - try: - return os.geteuid() == 0 - except AttributeError: - return False - - -def sudo_run(*args: Any, **kwargs: Any) -> subprocess.CompletedProcess[str]: - if _is_root_user(): - return subprocess.run(list(args), **kwargs) - return subprocess.run(["sudo", *args], **kwargs) - - -def _read_sysctl_int(name: str) -> int | None: - try: - result = subprocess.run(["sysctl", name], capture_output=True, text=True) - if result.returncode != 0: - print( - f"[sysctl] ERROR: `sysctl {name}` rc={result.returncode} stderr={result.stderr!r}" - ) - return None - - text = result.stdout.strip().replace(":", "=") - if "=" not in text: - print(f"[sysctl] ERROR: unexpected output for {name}: {text!r}") - return None - - return int(text.split("=", 1)[1].strip()) - except Exception as error: - print(f"[sysctl] ERROR: reading {name}: {error}") - return None - - -def _write_sysctl_int(name: str, value: int) -> None: - sudo_run("sysctl", "-w", f"{name}={value}", check=True, text=True, capture_output=False) - - -# -------------------------- base class for system config checks/requirements -------------------------- - - -class SystemConfigurator(ABC): - critical: bool = False - - @abstractmethod - def check(self) -> bool: - """Return True if configured. Log errors and return False on uncertainty.""" - raise NotImplementedError - - @abstractmethod - def explanation(self) -> str | None: - """ - Return a human-readable summary of what would be done (sudo commands) if not configured. - Return None when no changes are needed. - """ - raise NotImplementedError - - @abstractmethod - def fix(self) -> None: - """Apply fixes (may attempt sudo, catch, and apply fallback measures if needed).""" - raise NotImplementedError - - -# ----------------------------- generic enforcement of system configs ----------------------------- - - -def configure_system(checks: list[SystemConfigurator], check_only: bool = False) -> None: - if os.environ.get("CI"): - print("CI environment detected: skipping system configuration.") - return - - # run checks - failing = [check for check in checks if not check.check()] - if not failing: - return - - # ask for permission to modify system - explanations: list[str] = [msg for check in failing if (msg := check.explanation()) is not None] - - if explanations: - print("System configuration changes are recommended/required:\n") - print("\n\n".join(explanations)) - print() - - if check_only: - return - - try: - answer = input("Apply these changes now? [y/N]: ").strip().lower() - except (EOFError, KeyboardInterrupt): - answer = "" - - if answer not in ("y", "yes"): - if any(check.critical for check in failing): - raise SystemExit(1) - return - - for check in failing: - try: - check.fix() - except subprocess.CalledProcessError as error: - if check.critical: - print(f"Critical fix failed rc={error.returncode}") - print(f"stdout: {error.stdout}") - print(f"stderr: {error.stderr}") - raise - print(f"Optional improvement failed: rc={error.returncode}") - print(f"stdout: {error.stdout}") - print(f"stderr: {error.stderr}") - - print("System configuration completed.") +from dimos.protocol.service.system_configurator.base import ( + SystemConfigurator, + _read_sysctl_int, + _write_sysctl_int, + sudo_run, +) # ------------------------------ specific checks: multicast ------------------------------ @@ -306,7 +193,7 @@ def check(self) -> bool: def explanation(self) -> str | None: lines = [] for key, target in self.needs: - lines.append(f"- socket buffer optimization: sudo sysctl -w {key}={target}") + lines.append(f"- socket buffer optimization for LCM: sudo sysctl -w {key}={target}") return "\n".join(lines) def fix(self) -> None: @@ -343,7 +230,7 @@ def check(self) -> bool: def explanation(self) -> str | None: lines = [] for key, target in self.needs: - lines.append(f"- sudo sysctl -w {key}={target}") + lines.append(f"- socket buffer optimization for LCM: sudo sysctl -w {key}={target}") return "\n".join(lines) def fix(self) -> None: @@ -383,11 +270,15 @@ def check(self) -> bool: def explanation(self) -> str | None: lines = [] if self.can_fix_without_sudo: - lines.append(f"- Raise soft file count limit to {self.target} (no sudo required)") + lines.append( + f"- Raise soft file count limit to {self.target} for LCM (no sudo required)" + ) else: - lines.append(f"- Raise soft file count limit to {min(self.target, self.current_hard)}") lines.append( - f"- Raise hard limit via: sudo launchctl limit maxfiles {self.target} {self.target}" + f"- Raise soft file count limit to {min(self.target, self.current_hard)} for LCM" + ) + lines.append( + f"- Raise hard limit via: sudo launchctl limit maxfiles {self.target} {self.target} for LCM" ) return "\n".join(lines) diff --git a/dimos/protocol/service/test_lcmservice.py b/dimos/protocol/service/test_lcmservice.py index 4231302426..fdd2340e54 100644 --- a/dimos/protocol/service/test_lcmservice.py +++ b/dimos/protocol/service/test_lcmservice.py @@ -36,7 +36,9 @@ class TestConfigureSystemForLcm: def test_creates_linux_checks_on_linux(self) -> None: - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Linux"): + with patch( + "dimos.protocol.service.system_configurator.platform.system", return_value="Linux" + ): with patch("dimos.protocol.service.lcmservice.configure_system") as mock_configure: autoconf() mock_configure.assert_called_once() @@ -47,7 +49,9 @@ def test_creates_linux_checks_on_linux(self) -> None: assert checks[0].loopback_interface == "lo" def test_creates_macos_checks_on_darwin(self) -> None: - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Darwin"): + with patch( + "dimos.protocol.service.system_configurator.platform.system", return_value="Darwin" + ): with patch("dimos.protocol.service.lcmservice.configure_system") as mock_configure: autoconf() mock_configure.assert_called_once() @@ -59,14 +63,18 @@ def test_creates_macos_checks_on_darwin(self) -> None: assert checks[0].loopback_interface == "lo0" def test_passes_check_only_flag(self) -> None: - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Linux"): + with patch( + "dimos.protocol.service.system_configurator.platform.system", return_value="Linux" + ): with patch("dimos.protocol.service.lcmservice.configure_system") as mock_configure: autoconf(check_only=True) mock_configure.assert_called_once() assert mock_configure.call_args[1]["check_only"] is True def test_logs_error_on_unsupported_system(self) -> None: - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Windows"): + with patch( + "dimos.protocol.service.system_configurator.platform.system", return_value="Windows" + ): with patch("dimos.protocol.service.lcmservice.configure_system") as mock_configure: with patch("dimos.protocol.service.lcmservice.logger") as mock_logger: autoconf() diff --git a/dimos/protocol/service/test_system_configurator.py b/dimos/protocol/service/test_system_configurator.py index 07f8ede64c..e4180b7c12 100644 --- a/dimos/protocol/service/test_system_configurator.py +++ b/dimos/protocol/service/test_system_configurator.py @@ -14,6 +14,7 @@ import os import resource +import struct from unittest.mock import MagicMock, patch import pytest @@ -22,15 +23,18 @@ IDEAL_RMEM_SIZE, BufferConfiguratorLinux, BufferConfiguratorMacOS, + ClockSyncConfigurator, MaxFileConfiguratorMacOS, MulticastConfiguratorLinux, MulticastConfiguratorMacOS, SystemConfigurator, + configure_system, + sudo_run, +) +from dimos.protocol.service.system_configurator.base import ( _is_root_user, _read_sysctl_int, _write_sysctl_int, - configure_system, - sudo_run, ) # ----------------------------- Helper function tests ----------------------------- @@ -163,32 +167,25 @@ def test_check_only_mode_does_not_fix(self) -> None: def test_prompts_user_and_fixes_on_yes(self) -> None: with patch.dict(os.environ, {"CI": ""}, clear=False): mock_check = MockConfigurator(passes=False) - with patch("builtins.input", return_value="y"): + with patch("typer.confirm", return_value=True): configure_system([mock_check]) assert mock_check.fix_called def test_does_not_fix_on_no(self) -> None: with patch.dict(os.environ, {"CI": ""}, clear=False): mock_check = MockConfigurator(passes=False) - with patch("builtins.input", return_value="n"): + with patch("typer.confirm", return_value=False): configure_system([mock_check]) assert not mock_check.fix_called def test_exits_on_no_with_critical_check(self) -> None: with patch.dict(os.environ, {"CI": ""}, clear=False): mock_check = MockConfigurator(passes=False, is_critical=True) - with patch("builtins.input", return_value="n"): + with patch("typer.confirm", return_value=False): with pytest.raises(SystemExit) as exc_info: configure_system([mock_check]) assert exc_info.value.code == 1 - def test_handles_eof_error_on_input(self) -> None: - with patch.dict(os.environ, {"CI": ""}, clear=False): - mock_check = MockConfigurator(passes=False) - with patch("builtins.input", side_effect=EOFError): - configure_system([mock_check]) - assert not mock_check.fix_called - # ----------------------------- MulticastConfiguratorLinux tests ----------------------------- @@ -314,14 +311,14 @@ def test_fix_runs_route_command(self) -> None: class TestBufferConfiguratorLinux: def test_check_returns_true_when_buffers_sufficient(self) -> None: configurator = BufferConfiguratorLinux() - with patch("dimos.protocol.service.system_configurator._read_sysctl_int") as mock_read: + with patch("dimos.protocol.service.system_configurator.lcm._read_sysctl_int") as mock_read: mock_read.return_value = IDEAL_RMEM_SIZE assert configurator.check() is True assert configurator.needs == [] def test_check_returns_false_when_rmem_max_low(self) -> None: configurator = BufferConfiguratorLinux() - with patch("dimos.protocol.service.system_configurator._read_sysctl_int") as mock_read: + with patch("dimos.protocol.service.system_configurator.lcm._read_sysctl_int") as mock_read: mock_read.side_effect = [1048576, IDEAL_RMEM_SIZE] # rmem_max low assert configurator.check() is False assert len(configurator.needs) == 1 @@ -329,7 +326,7 @@ def test_check_returns_false_when_rmem_max_low(self) -> None: def test_check_returns_false_when_both_low(self) -> None: configurator = BufferConfiguratorLinux() - with patch("dimos.protocol.service.system_configurator._read_sysctl_int") as mock_read: + with patch("dimos.protocol.service.system_configurator.lcm._read_sysctl_int") as mock_read: mock_read.return_value = 1048576 # Both low assert configurator.check() is False assert len(configurator.needs) == 2 @@ -344,7 +341,9 @@ def test_explanation_lists_needed_changes(self) -> None: def test_fix_writes_needed_values(self) -> None: configurator = BufferConfiguratorLinux() configurator.needs = [("net.core.rmem_max", IDEAL_RMEM_SIZE)] - with patch("dimos.protocol.service.system_configurator._write_sysctl_int") as mock_write: + with patch( + "dimos.protocol.service.system_configurator.lcm._write_sysctl_int" + ) as mock_write: configurator.fix() mock_write.assert_called_once_with("net.core.rmem_max", IDEAL_RMEM_SIZE) @@ -355,7 +354,7 @@ def test_fix_writes_needed_values(self) -> None: class TestBufferConfiguratorMacOS: def test_check_returns_true_when_buffers_sufficient(self) -> None: configurator = BufferConfiguratorMacOS() - with patch("dimos.protocol.service.system_configurator._read_sysctl_int") as mock_read: + with patch("dimos.protocol.service.system_configurator.lcm._read_sysctl_int") as mock_read: mock_read.side_effect = [ BufferConfiguratorMacOS.TARGET_BUFFER_SIZE, BufferConfiguratorMacOS.TARGET_RECVSPACE, @@ -366,7 +365,7 @@ def test_check_returns_true_when_buffers_sufficient(self) -> None: def test_check_returns_false_when_values_low(self) -> None: configurator = BufferConfiguratorMacOS() - with patch("dimos.protocol.service.system_configurator._read_sysctl_int") as mock_read: + with patch("dimos.protocol.service.system_configurator.lcm._read_sysctl_int") as mock_read: mock_read.return_value = 1024 # All low assert configurator.check() is False assert len(configurator.needs) == 3 @@ -384,7 +383,9 @@ def test_fix_writes_needed_values(self) -> None: configurator.needs = [ ("kern.ipc.maxsockbuf", BufferConfiguratorMacOS.TARGET_BUFFER_SIZE), ] - with patch("dimos.protocol.service.system_configurator._write_sysctl_int") as mock_write: + with patch( + "dimos.protocol.service.system_configurator.lcm._write_sysctl_int" + ) as mock_write: configurator.fix() mock_write.assert_called_once_with( "kern.ipc.maxsockbuf", BufferConfiguratorMacOS.TARGET_BUFFER_SIZE @@ -480,3 +481,220 @@ def test_fix_raises_on_setrlimit_error(self) -> None: with patch("resource.setrlimit", side_effect=ValueError("test error")): with pytest.raises(ValueError): configurator.fix() + + +# ----------------------------- ClockSyncConfigurator tests ----------------------------- + + +class TestClockSyncConfigurator: + def test_check_passes_when_offset_within_threshold(self) -> None: + configurator = ClockSyncConfigurator() + with patch.object(ClockSyncConfigurator, "_ntp_offset", return_value=0.05): # 50ms + assert configurator.check() is True + assert configurator._offset == 0.05 + + def test_check_fails_when_offset_exceeds_threshold(self) -> None: + configurator = ClockSyncConfigurator() + with patch.object(ClockSyncConfigurator, "_ntp_offset", return_value=0.5): # 500ms + assert configurator.check() is False + assert configurator._offset == 0.5 + + def test_check_fails_with_negative_offset(self) -> None: + configurator = ClockSyncConfigurator() + with patch.object(ClockSyncConfigurator, "_ntp_offset", return_value=-0.5): # -500ms + assert configurator.check() is False + + def test_check_passes_when_ntp_unreachable(self) -> None: + configurator = ClockSyncConfigurator() + with patch.object( + ClockSyncConfigurator, "_ntp_offset", side_effect=OSError("Network unreachable") + ): + assert configurator.check() is True + assert configurator._offset is None + + def test_check_passes_on_socket_timeout(self) -> None: + configurator = ClockSyncConfigurator() + with patch.object( + ClockSyncConfigurator, "_ntp_offset", side_effect=TimeoutError("timed out") + ): + assert configurator.check() is True + + def test_check_passes_on_malformed_response(self) -> None: + configurator = ClockSyncConfigurator() + with patch.object( + ClockSyncConfigurator, "_ntp_offset", side_effect=ValueError("NTP response too short") + ): + assert configurator.check() is True + + def test_is_not_critical(self) -> None: + configurator = ClockSyncConfigurator() + assert configurator.critical is False + + def test_explanation_on_linux_with_ntpdate(self) -> None: + configurator = ClockSyncConfigurator() + configurator._offset = 0.5 # 500ms + with ( + patch( + "dimos.protocol.service.system_configurator.clock_sync.platform.system", + return_value="Linux", + ), + patch( + "dimos.protocol.service.system_configurator.clock_sync.shutil.which", + return_value="/usr/bin/ntpdate", + ), + ): + configurator._fix_cmd = configurator._resolve_fix_cmd() + explanation = configurator.explanation() + assert explanation is not None + assert "+500.0 ms" in explanation or "+0.5 s" in explanation + assert "ntpdate" in explanation + assert "systemd-timesyncd" in explanation + + def test_explanation_on_linux_no_ntp_tools(self) -> None: + configurator = ClockSyncConfigurator() + configurator._offset = 0.5 + with ( + patch( + "dimos.protocol.service.system_configurator.clock_sync.platform.system", + return_value="Linux", + ), + patch( + "dimos.protocol.service.system_configurator.clock_sync.shutil.which", + return_value=None, + ), + ): + configurator._fix_cmd = configurator._resolve_fix_cmd() + explanation = configurator.explanation() + # Falls back to `date -s` when ntpdate/sntp unavailable + assert "date -s" in explanation + assert "systemd-timesyncd" in explanation + + def test_explanation_on_macos(self) -> None: + configurator = ClockSyncConfigurator() + configurator._offset = -0.3 # -300ms + with patch( + "dimos.protocol.service.system_configurator.clock_sync.platform.system", + return_value="Darwin", + ): + configurator._fix_cmd = configurator._resolve_fix_cmd() + explanation = configurator.explanation() + assert explanation is not None + assert "-300.0 ms" in explanation + assert "sntp" in explanation + + def test_explanation_returns_none_when_ntp_unreachable(self) -> None: + configurator = ClockSyncConfigurator() + configurator._offset = None + assert configurator.explanation() is None + + def test_fix_on_linux_with_ntpdate(self) -> None: + _is_root_user.cache_clear() + configurator = ClockSyncConfigurator() + with ( + patch( + "dimos.protocol.service.system_configurator.clock_sync.platform.system", + return_value="Linux", + ), + patch( + "dimos.protocol.service.system_configurator.clock_sync.shutil.which", + side_effect=lambda cmd: "/usr/bin/ntpdate" if cmd == "ntpdate" else None, + ), + patch("os.geteuid", return_value=0), + patch("subprocess.run") as mock_run, + ): + configurator._fix_cmd = configurator._resolve_fix_cmd() + mock_run.return_value = MagicMock(returncode=0) + configurator.fix() + assert mock_run.call_count == 1 + assert "ntpdate" in mock_run.call_args_list[0][0][0] + + def test_fix_on_linux_sntp_fallback(self) -> None: + _is_root_user.cache_clear() + configurator = ClockSyncConfigurator() + with ( + patch( + "dimos.protocol.service.system_configurator.clock_sync.platform.system", + return_value="Linux", + ), + patch( + "dimos.protocol.service.system_configurator.clock_sync.shutil.which", + side_effect=lambda cmd: "/usr/bin/sntp" if cmd == "sntp" else None, + ), + patch("os.geteuid", return_value=0), + patch("subprocess.run") as mock_run, + ): + configurator._fix_cmd = configurator._resolve_fix_cmd() + mock_run.return_value = MagicMock(returncode=0) + configurator.fix() + assert mock_run.call_count == 1 + assert "sntp" in mock_run.call_args_list[0][0][0] + + def test_fix_on_linux_date_fallback(self) -> None: + _is_root_user.cache_clear() + configurator = ClockSyncConfigurator() + configurator._offset = 1.0 + with ( + patch( + "dimos.protocol.service.system_configurator.clock_sync.platform.system", + return_value="Linux", + ), + patch( + "dimos.protocol.service.system_configurator.clock_sync.shutil.which", + return_value=None, + ), + patch("os.geteuid", return_value=0), + patch("subprocess.run") as mock_run, + ): + configurator._fix_cmd = configurator._resolve_fix_cmd() + mock_run.return_value = MagicMock(returncode=0) + configurator.fix() + assert mock_run.call_count == 1 + assert "date" in mock_run.call_args_list[0][0][0] + + def test_fix_on_macos(self) -> None: + _is_root_user.cache_clear() + configurator = ClockSyncConfigurator() + with ( + patch( + "dimos.protocol.service.system_configurator.clock_sync.platform.system", + return_value="Darwin", + ), + patch("os.geteuid", return_value=0), + patch("subprocess.run") as mock_run, + ): + configurator._fix_cmd = configurator._resolve_fix_cmd() + mock_run.return_value = MagicMock(returncode=0) + configurator.fix() + assert mock_run.call_count == 1 + args = mock_run.call_args[0][0] + assert "sntp" in args + + def test_ntp_offset_with_mocked_socket(self) -> None: + # Build a minimal NTP response with a known transmit timestamp + # NTP epoch offset: 2208988800 seconds between 1900 and 1970 + fake_time = 1700000000.0 # a Unix timestamp + ntp_secs = int(fake_time) + 2208988800 + ntp_frac = 0 + response = b"\x00" * 40 + struct.pack("!II", ntp_secs, ntp_frac) + + with patch("socket.socket") as mock_socket_cls: + mock_sock = MagicMock() + mock_socket_cls.return_value = mock_sock + mock_sock.recvfrom.return_value = (response, ("pool.ntp.org", 123)) + + with patch("time.time", side_effect=[fake_time, fake_time + 0.01]): + offset = ClockSyncConfigurator._ntp_offset("pool.ntp.org", 123, 2) + # With zero RTT offset and matching times, offset should be close to 0 + # t1=fake_time, t4=fake_time+0.01, server=fake_time + # offset = fake_time - (fake_time + 0.005) = -0.005 + assert abs(offset - (-0.005)) < 0.001 + + def test_ntp_offset_raises_on_short_response(self) -> None: + with patch("socket.socket") as mock_socket_cls: + mock_sock = MagicMock() + mock_socket_cls.return_value = mock_sock + mock_sock.recvfrom.return_value = (b"\x00" * 10, ("pool.ntp.org", 123)) + + with patch("time.time", return_value=1700000000.0): + with pytest.raises(ValueError, match="too short"): + ClockSyncConfigurator._ntp_offset() diff --git a/dimos/robot/cli/dimos.py b/dimos/robot/cli/dimos.py index 3979138216..03e9a91349 100644 --- a/dimos/robot/cli/dimos.py +++ b/dimos/robot/cli/dimos.py @@ -102,7 +102,6 @@ def run( ) -> None: """Start a robot blueprint""" from dimos.core.blueprints import autoconnect - from dimos.protocol import pubsub from dimos.robot.get_all_blueprints import get_by_name from dimos.utils.logging_config import setup_exception_handler @@ -110,7 +109,6 @@ def run( cli_config_overrides: dict[str, Any] = ctx.obj global_config.update(**cli_config_overrides) - pubsub.lcm.autoconf() # type: ignore[attr-defined] blueprint = autoconnect(*map(get_by_name, robot_types)) dimos = blueprint.build(cli_config_overrides=cli_config_overrides) diff --git a/dimos/robot/unitree/g1/blueprints/basic/unitree_g1_basic.py b/dimos/robot/unitree/g1/blueprints/basic/unitree_g1_basic.py index 1fb591e895..f2ec866b89 100644 --- a/dimos/robot/unitree/g1/blueprints/basic/unitree_g1_basic.py +++ b/dimos/robot/unitree/g1/blueprints/basic/unitree_g1_basic.py @@ -17,6 +17,7 @@ from dimos.core.blueprints import autoconnect from dimos.navigation.rosnav import ros_nav +from dimos.protocol.service.system_configurator import ClockSyncConfigurator from dimos.robot.unitree.g1.blueprints.primitive.uintree_g1_primitive_no_nav import ( uintree_g1_primitive_no_nav, ) @@ -26,6 +27,6 @@ uintree_g1_primitive_no_nav, g1_connection(), ros_nav(), -) +).requirements(ClockSyncConfigurator()) __all__ = ["unitree_g1_basic"] diff --git a/dimos/robot/unitree/go2/blueprints/basic/unitree_go2_basic.py b/dimos/robot/unitree/go2/blueprints/basic/unitree_go2_basic.py index cfd53abe51..8f31ac7206 100644 --- a/dimos/robot/unitree/go2/blueprints/basic/unitree_go2_basic.py +++ b/dimos/robot/unitree/go2/blueprints/basic/unitree_go2_basic.py @@ -22,6 +22,7 @@ from dimos.core.transport import pSHMTransport from dimos.msgs.sensor_msgs import Image from dimos.protocol.pubsub.impl.lcmpubsub import LCM +from dimos.protocol.service.system_configurator import ClockSyncConfigurator from dimos.robot.unitree.go2.connection import go2_connection from dimos.web.websocket_vis.websocket_vis_module import websocket_vis @@ -93,11 +94,17 @@ case _: with_vis = _transports_base -unitree_go2_basic = autoconnect( - with_vis, - go2_connection(), - websocket_vis(), -).global_config(n_dask_workers=4, robot_model="unitree_go2") +unitree_go2_basic = ( + autoconnect( + with_vis, + go2_connection(), + websocket_vis(), + ) + .global_config(n_dask_workers=4, robot_model="unitree_go2") + .requirements( + ClockSyncConfigurator(), + ) +) __all__ = [ "unitree_go2_basic", diff --git a/dimos/utils/cli/lcmspy/lcmspy.py b/dimos/utils/cli/lcmspy/lcmspy.py index 5493e53024..2df800591f 100755 --- a/dimos/utils/cli/lcmspy/lcmspy.py +++ b/dimos/utils/cli/lcmspy/lcmspy.py @@ -14,30 +14,11 @@ from collections import deque from dataclasses import dataclass -from enum import Enum import threading import time from dimos.protocol.service.lcmservice import LCMConfig, LCMService - - -class BandwidthUnit(Enum): - BP = "B" - KBP = "kB" - MBP = "MB" - GBP = "GB" - - -def human_readable_bytes(bytes_value: float, round_to: int = 2) -> tuple[float, BandwidthUnit]: - """Convert bytes to human-readable format with appropriate units""" - if bytes_value >= 1024**3: # GB - return round(bytes_value / (1024**3), round_to), BandwidthUnit.GBP - elif bytes_value >= 1024**2: # MB - return round(bytes_value / (1024**2), round_to), BandwidthUnit.MBP - elif bytes_value >= 1024: # KB - return round(bytes_value / 1024, round_to), BandwidthUnit.KBP - else: - return round(bytes_value, round_to), BandwidthUnit.BP +from dimos.utils.human import human_bytes class Topic: @@ -88,12 +69,10 @@ def kbps(self, time_window: float) -> float: total_kbytes = total_bytes / 1000 # Convert bytes to kB return total_kbytes / time_window # type: ignore[no-any-return] - def kbps_hr(self, time_window: float, round_to: int = 2) -> tuple[float, BandwidthUnit]: + def kbps_hr(self, time_window: float) -> str: """Return human-readable bandwidth with appropriate units""" - kbps_val = self.kbps(time_window) - # Convert kB/s to B/s for human_readable_bytes - bps = kbps_val * 1000 - return human_readable_bytes(bps, round_to) + bps = self.kbps(time_window) * 1000 + return human_bytes(bps) + "/s" # avg msg size in the last n seconds def size(self, time_window: float) -> float: @@ -107,10 +86,9 @@ def total_traffic(self) -> int: """Return total traffic passed in bytes since the beginning""" return self.total_traffic_bytes - def total_traffic_hr(self) -> tuple[float, BandwidthUnit]: + def total_traffic_hr(self) -> str: """Return human-readable total traffic with appropriate units""" - total_bytes = self.total_traffic() - return human_readable_bytes(total_bytes) + return human_bytes(self.total_traffic()) def __str__(self) -> str: return f"topic({self.name})" diff --git a/dimos/utils/cli/lcmspy/run_lcmspy.py b/dimos/utils/cli/lcmspy/run_lcmspy.py index f3d31b48ba..5be0bf28b5 100644 --- a/dimos/utils/cli/lcmspy/run_lcmspy.py +++ b/dimos/utils/cli/lcmspy/run_lcmspy.py @@ -106,14 +106,12 @@ def refresh_table(self) -> None: for t in topics: freq = t.freq(5.0) kbps = t.kbps(5.0) - bw_val, bw_unit = t.kbps_hr(5.0) - total_val, total_unit = t.total_traffic_hr() self.table.add_row( # type: ignore[union-attr] topic_text(t.name), Text(f"{freq:.1f}", style=gradient(10, freq)), - Text(f"{bw_val} {bw_unit.value}/s", style=gradient(1024 * 3, kbps)), - Text(f"{total_val} {total_unit.value}"), + Text(t.kbps_hr(5.0), style=gradient(1024 * 3, kbps)), + Text(t.total_traffic_hr()), ) diff --git a/dimos/utils/human.py b/dimos/utils/human.py new file mode 100644 index 0000000000..c90c575048 --- /dev/null +++ b/dimos/utils/human.py @@ -0,0 +1,66 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Human-readable formatters for durations, byte sizes, and numbers.""" + +from __future__ import annotations + + +def human_duration(seconds: float, signed: bool = True) -> str: + """Format a duration in seconds to a human-readable string. + + Examples: ``"25.1 ms"``, ``"1.3 s"``, ``"4m 12s"``, ``"2h 3m"``. + + When *signed* is True (the default), the result is prefixed with ``+`` + or ``-``. Set *signed=False* for unsigned values like latencies. + """ + sign = ("+" if seconds >= 0 else "-") if signed else "" + s = abs(seconds) + if s < 1: + return f"{sign}{s * 1000:.1f} ms" + if s < 60: + return f"{sign}{s:.1f} s" + m, s = divmod(s, 60) + if m < 60: + return f"{sign}{int(m)}m {int(s)}s" + h, m = divmod(m, 60) + return f"{sign}{int(h)}h {int(m)}m" + + +def human_number(value: float, decimals: int = 1) -> str: + """Format a number with SI suffixes (k, M, G, ...). + + Examples: ``"42"``, ``"1.5k"``, ``"3.2M"``. + """ + for unit in ("", "k", "M", "G", "T"): + if abs(value) < 1000: + return f"{value:.{decimals}f}{unit}" if unit else f"{value:.0f}" + value /= 1000 + return f"{value:.{decimals}f}P" + + +def human_bytes(value: float, concise: bool = False, decimals: int = 2) -> str: + """Format bytes with IEC units (1024-based: KiB, MiB, GiB, ...). + + *concise=True* uses short suffixes without a space (``"1.50K"``). + *decimals* controls the number of decimal places. + """ + k = 1024.0 + units = ["B", "K", "M", "G", "T"] if concise else ["B", "KiB", "MiB", "GiB", "TiB"] + + for unit in units[:-1]: + if abs(value) < k: + return f"{value:.{decimals}f}{unit}" if concise else f"{value:.{decimals}f} {unit}" + value /= k + return f"{value:.{decimals}f}{units[-1]}" if concise else f"{value:.{decimals}f} {units[-1]}"