Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/code-cleanup.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
name: code-cleanup
on:
pull_request:
paths-ignore:
- '**.md'

permissions:
contents: write
Expand Down
42 changes: 33 additions & 9 deletions dimos/core/blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import operator
import sys
from types import MappingProxyType
from typing import Any, Literal, get_args, get_origin, get_type_hints
from typing import TYPE_CHECKING, Any, Literal, get_args, get_origin, get_type_hints

if TYPE_CHECKING:
from dimos.protocol.service.system_configurator import SystemConfigurator

from dimos.core.global_config import GlobalConfig, global_config
from dimos.core.module import Module, is_module_type
Expand Down Expand Up @@ -108,7 +111,9 @@ class Blueprint:
remapping_map: Mapping[tuple[type[Module], str], str | type[Module] | type[Spec]] = field(
default_factory=lambda: MappingProxyType({})
)
requirement_checks: tuple[Callable[[], str | None], ...] = field(default_factory=tuple)
requirement_checks: "tuple[Callable[[], str | None] | SystemConfigurator, ...]" = field(
default_factory=tuple
)

@classmethod
def create(cls, module: type[Module], *args: Any, **kwargs: Any) -> "Blueprint":
Expand Down Expand Up @@ -148,7 +153,7 @@ def remappings(
requirement_checks=self.requirement_checks,
)

def requirements(self, *checks: Callable[[], str | None]) -> "Blueprint":
def requirements(self, *checks: "Callable[[], str | None] | SystemConfigurator") -> "Blueprint":
return Blueprint(
blueprints=self.blueprints,
transport_map=self.transport_map,
Expand Down Expand Up @@ -203,16 +208,35 @@ def _is_name_unique(self, name: str) -> bool:
return sum(1 for n, _ in self._all_name_types if n == name) == 1

def _check_requirements(self) -> None:
errors = []
red = "\033[31m"
reset = "\033[0m"
from dimos.protocol.service.system_configurator import (
SystemConfigurator,
configure_system,
lcm_configurators,
)

configurators = list(lcm_configurators())
other_checks: list[Callable[[], str | None]] = []
for check in self.requirement_checks:
error = check()
if error:
errors.append(error)
if isinstance(check, SystemConfigurator):
configurators.append(check)
else:
other_checks.append(check)

if configurators:
try:
configure_system(configurators)
except SystemExit:
labels = [type(c).__name__ for c in configurators]
print(
f"Required system configuration was declined: {', '.join(labels)}",
file=sys.stderr,
)
sys.exit(1)

Comment on lines +225 to 235
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

configuring and checking requirements seem different. I think you should add:

autoconnect(
    ...
).configurators(SystemConfigurator())

Copy link
Contributor Author

@leshy leshy Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered this and it would have been easier but I always like to have less categories then more.

I consider this a category of "function you call - it does either checks or config, if it errors out stuff is not good to run" so this can be pings, clock, network checks, LFS checks etc. From a blueprint perspective that's the way you want to think about the category imo, no need to know what's inside the functions.

Only ugliness here is that configurators are not functions but these classes. I considered keeping the wrapper to turn them into functions external to blueprints.py but this complicates the user facing API.

like

blueprint.requirements(configurator(Clock, Network)) 

There are a few more things with configurators to refactor, I'd like to turn them info functions, so I expect this code to go away from blueprints.. how does this sound?

errors = [e for check in other_checks if (e := check())]
if errors:
red = "\033[31m"
reset = "\033[0m"
for error in errors:
print(f"{red}Error: {error}{reset}", file=sys.stderr)
sys.exit(1)
Expand Down
14 changes: 4 additions & 10 deletions dimos/protocol/pubsub/benchmark/test_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
MsgGen,
PubSubContext,
)
from dimos.utils.human import human_bytes

# Message sizes for throughput benchmarking (powers of 2 from 64B to 10MB)
MSG_SIZES = [
Expand Down Expand Up @@ -56,15 +57,6 @@
RECEIVE_TIMEOUT = 1.0


def size_id(size: int) -> str:
"""Convert byte size to human-readable string for test IDs."""
if size >= 1048576:
return f"{size // 1048576}MB"
if size >= 1024:
return f"{size // 1024}KB"
return f"{size}B"


def pubsub_id(testcase: Case[Any, Any]) -> str:
"""Extract pubsub implementation name from context manager function name."""
name: str = testcase.pubsub_context.__name__
Expand All @@ -86,7 +78,9 @@ def benchmark_results() -> Generator[BenchmarkResults, None, None]:


@pytest.mark.tool
@pytest.mark.parametrize("msg_size", MSG_SIZES, ids=[size_id(s) for s in MSG_SIZES])
@pytest.mark.parametrize(
"msg_size", MSG_SIZES, ids=[human_bytes(s, concise=True, decimals=0) for s in MSG_SIZES]
)
@pytest.mark.parametrize("pubsub_context, msggen", testcases, ids=[pubsub_id(t) for t in testcases])
def test_throughput(
pubsub_context: PubSubContext[Any, Any],
Expand Down
50 changes: 9 additions & 41 deletions dimos/protocol/pubsub/benchmark/type.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from typing import Any, Generic

from dimos.protocol.pubsub.spec import MsgT, PubSub, TopicT
from dimos.utils.human import human_bytes, human_duration, human_number

MsgGen = Callable[[int], tuple[TopicT, MsgT]]

Expand All @@ -41,33 +42,6 @@ def __len__(self) -> int:
TestData = Sequence[Case[Any, Any]]


def _format_mib(value: float) -> str:
"""Format bytes as MiB with intelligent rounding.

>= 10 MiB: integer (e.g., "42")
1-10 MiB: 1 decimal (e.g., "2.5")
< 1 MiB: 2 decimals (e.g., "0.07")
"""
mib = value / (1024**2)
if mib >= 10:
return f"{mib:.0f}"
if mib >= 1:
return f"{mib:.1f}"
return f"{mib:.2f}"


def _format_iec(value: float, concise: bool = False, decimals: int = 2) -> str:
"""Format bytes with IEC units (Ki/Mi/Gi = 1024^1/2/3)"""
k = 1024.0
units = ["B", "K", "M", "G", "T"] if concise else ["B", "KiB", "MiB", "GiB", "TiB"]

for unit in units[:-1]:
if abs(value) < k:
return f"{value:.{decimals}f}{unit}" if concise else f"{value:.{decimals}f} {unit}"
value /= k
return f"{value:.{decimals}f}{units[-1]}" if concise else f"{value:.{decimals}f} {units[-1]}"


@dataclass
class BenchmarkResult:
transport: str
Expand Down Expand Up @@ -133,11 +107,13 @@ def print_summary(self) -> None:
recv_style = "yellow" if r.receive_time > 0.1 else "dim"
table.add_row(
r.transport,
_format_iec(r.msg_size_bytes, decimals=0),
human_bytes(r.msg_size_bytes, decimals=0),
f"{r.msgs_sent:,}",
f"{r.msgs_received:,}",
f"{r.throughput_msgs:,.0f}",
_format_mib(r.throughput_bytes),
(lambda m: f"{m:.2f}" if m < 1 else f"{m:.1f}" if m < 10 else f"{m:.0f}")(
r.throughput_bytes / 1024**2
),
f"[{recv_style}]{r.receive_time * 1000:.0f}ms[/{recv_style}]",
f"[{loss_style}]{r.loss_pct:.1f}%[/{loss_style}]",
)
Expand Down Expand Up @@ -211,7 +187,7 @@ def val_to_color(v: float) -> int:
return gradient[int(t * (len(gradient) - 1))]

reset = "\033[0m"
size_labels = [_format_iec(s, concise=True, decimals=0) for s in sizes]
size_labels = [human_bytes(s, concise=True, decimals=0) for s in sizes]
col_w = max(8, max(len(s) for s in size_labels) + 1)
transport_w = max(len(t) for t in transports) + 1

Expand All @@ -236,31 +212,23 @@ def val_to_color(v: float) -> int:
def print_heatmap(self) -> None:
"""Print msgs/sec heatmap."""

def fmt(v: float) -> str:
return f"{v / 1000:.1f}k" if v >= 1000 else f"{v:.0f}"

self._print_heatmap("Msgs/sec", lambda r: r.throughput_msgs, fmt)
self._print_heatmap("Msgs/sec", lambda r: r.throughput_msgs, human_number)

def print_bandwidth_heatmap(self) -> None:
"""Print bandwidth heatmap."""

def fmt(v: float) -> str:
return _format_iec(v, concise=True, decimals=1)
return human_bytes(v, concise=True, decimals=1)

self._print_heatmap("Bandwidth (IEC)", lambda r: r.throughput_bytes, fmt)

def print_latency_heatmap(self) -> None:
"""Print latency heatmap (time waiting for messages after publishing)."""

def fmt(v: float) -> str:
if v >= 1:
return f"{v:.1f}s"
return f"{v * 1000:.0f}ms"

self._print_heatmap(
"Latency",
lambda r: r.receive_time,
fmt,
lambda v: human_duration(v, signed=False),
high_is_good=False,
)

Expand Down
29 changes: 4 additions & 25 deletions dimos/protocol/service/lcmservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,7 @@
import lcm

from dimos.protocol.service.spec import Service
from dimos.protocol.service.system_configurator import (
BufferConfiguratorLinux,
BufferConfiguratorMacOS,
MaxFileConfiguratorMacOS,
MulticastConfiguratorLinux,
MulticastConfiguratorMacOS,
SystemConfigurator,
configure_system,
)
from dimos.protocol.service.system_configurator import configure_system, lcm_configurators
from dimos.utils.logging_config import setup_logger

logger = setup_logger()
Expand All @@ -46,22 +38,9 @@


def autoconf(check_only: bool = False) -> None:
# check multicast and buffer sizes
system = platform.system()
checks: list[SystemConfigurator] = []
if system == "Linux":
checks = [
MulticastConfiguratorLinux(loopback_interface="lo"),
BufferConfiguratorLinux(),
]
elif system == "Darwin":
checks = [
MulticastConfiguratorMacOS(loopback_interface="lo0"),
BufferConfiguratorMacOS(),
MaxFileConfiguratorMacOS(),
]
else:
logger.error(f"System configuration not supported on {system}")
checks = lcm_configurators()
if not checks:
logger.error(f"System configuration not supported on {platform.system()}")
return
configure_system(checks, check_only=check_only)

Expand Down
75 changes: 75 additions & 0 deletions dimos/protocol/service/system_configurator/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""System configurator package — re-exports for backward compatibility."""

import platform

from dimos.protocol.service.system_configurator.base import (
SystemConfigurator,
configure_system,
sudo_run,
)
from dimos.protocol.service.system_configurator.clock_sync import ClockSyncConfigurator
from dimos.protocol.service.system_configurator.lcm import (
IDEAL_RMEM_SIZE,
BufferConfiguratorLinux,
BufferConfiguratorMacOS,
MaxFileConfiguratorMacOS,
MulticastConfiguratorLinux,
MulticastConfiguratorMacOS,
)


# TODO: This is a configurator API issue and inserted here temporarily
#
# We need to use different configurators based on the underlying OS
#
# We should have separation of concerns, nothing but configurators themselves care about the OS in this context
#
# So configurators with multi-os behavior should be responsible for the right per-OS behaviour, and
# not external systems
#
# We might want to have some sort of recursive configurators
#
def lcm_configurators() -> list[SystemConfigurator]:
"""Return the platform-appropriate LCM system configurators."""
system = platform.system()
if system == "Linux":
return [
MulticastConfiguratorLinux(loopback_interface="lo"),
BufferConfiguratorLinux(),
]
elif system == "Darwin":
return [
MulticastConfiguratorMacOS(loopback_interface="lo0"),
BufferConfiguratorMacOS(),
MaxFileConfiguratorMacOS(), # TODO: this is not LCM related and shouldn't be here at all
]
return []


__all__ = [
"IDEAL_RMEM_SIZE",
"BufferConfiguratorLinux",
"BufferConfiguratorMacOS",
"ClockSyncConfigurator",
"MaxFileConfiguratorMacOS",
"MulticastConfiguratorLinux",
"MulticastConfiguratorMacOS",
"SystemConfigurator",
"configure_system",
"lcm_configurators",
"sudo_run",
]
Loading
Loading