Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion dimos/core/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from dimos.protocol.pubsub.impl.lcmpubsub import LCM, JpegLCM, PickleLCM, Topic as LCMTopic
from dimos.protocol.pubsub.impl.rospubsub import DimosROS, ROSTopic
from dimos.protocol.pubsub.impl.shmpubsub import BytesSharedMemory, PickleSharedMemory
from dimos.protocol.pubsub.impl.zenohpubsub import Topic as ZenohTopic, ZenohPubSub

if TYPE_CHECKING:
from collections.abc import Callable
Expand Down Expand Up @@ -319,4 +320,35 @@ def subscribe(
return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg))


class ZenohTransport(PubSubTransport[T]): ...
class ZenohTransport(PubSubTransport[T]):
def __init__(self, topic: str, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__(ZenohTopic(topic))
self.zenoh = ZenohPubSub(**kwargs)
self._started: bool = False
self._start_lock = threading.RLock()

def start(self) -> None:
with self._start_lock:
if not self._started:
self.zenoh.start()
self._started = True

def stop(self) -> None:
with self._start_lock:
if self._started:
self.zenoh.stop()
self._started = False

def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def]
with self._start_lock:
if not self._started:
self.start()
self.zenoh.publish(self.topic, msg)

def subscribe(
self, callback: Callable[[T], None], selfstream: Stream[T] | None = None
) -> Callable[[], None]:
with self._start_lock:
if not self._started:
self.start()
return self.zenoh.subscribe(self.topic, lambda msg, topic: callback(msg))
22 changes: 22 additions & 0 deletions dimos/protocol/pubsub/benchmark/testdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
LCMSharedMemory,
PickleSharedMemory,
)
from dimos.protocol.pubsub.impl.zenohpubsub import Topic as ZenohTopic, ZenohPubSub


def make_data_bytes(size: int) -> bytes:
Expand Down Expand Up @@ -272,6 +273,27 @@ def redis_msggen(size: int) -> tuple[str, Any]:
print("Redis not available")


@contextmanager
def zenoh_pubsub_channel() -> Generator[ZenohPubSub, None, None]:
zenoh_pubsub = ZenohPubSub()
zenoh_pubsub.start()
yield zenoh_pubsub
zenoh_pubsub.stop()


def zenoh_msggen(size: int) -> tuple[ZenohTopic, bytes]:
"""Generate raw bytes for Zenoh pubsub benchmark."""
return (ZenohTopic("benchmark/zenoh"), make_data_bytes(size))


testcases.append(
Case(
pubsub_context=zenoh_pubsub_channel,
msg_gen=zenoh_msggen,
)
)


from dimos.protocol.pubsub.impl.rospubsub import (
ROS_AVAILABLE,
DimosROS,
Expand Down
1 change: 1 addition & 0 deletions dimos/protocol/pubsub/impl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
PickleLCM as PickleLCM,
)
from dimos.protocol.pubsub.impl.memory import Memory as Memory
from dimos.protocol.pubsub.impl.zenohpubsub import ZenohPubSub as ZenohPubSub
109 changes: 109 additions & 0 deletions dimos/protocol/pubsub/impl/zenohpubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
import threading
from typing import TYPE_CHECKING, Any, TypeAlias

from dimos.protocol.pubsub.spec import PubSub
from dimos.protocol.service.zenohservice import ZenohService
from dimos.utils.logging_config import setup_logger

if TYPE_CHECKING:
import zenoh

logger = setup_logger()


@dataclass(frozen=True)
class Topic:
"""Represents a Zenoh topic (key expression)."""

name: str

def __str__(self) -> str:
return self.name


MessageCallback: TypeAlias = Callable[[Any, Topic], None]


class ZenohPubSub(ZenohService, PubSub[Topic, Any]):
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._publishers: dict[Topic, zenoh.Publisher] = {}
self._publisher_lock = threading.Lock()
self._subscribers: list[zenoh.Subscriber] = []
self._subscriber_lock = threading.Lock()

def _get_publisher(self, topic: Topic) -> zenoh.Publisher:
"""Get or create a Publisher for the given topic."""
with self._publisher_lock:
if topic not in self._publishers:
self._publishers[topic] = self.session.declare_publisher(topic.name)
return self._publishers[topic]

def publish(self, topic: Topic, message: bytes | str) -> None:
"""Publish a message to a Zenoh topic."""
publisher = self._get_publisher(topic)
try:
publisher.put(message)
except Exception as e:
logger.error(f"Error publishing to topic {topic}: {e}", exc_info=True)

def subscribe(self, topic: Topic, callback: MessageCallback) -> Callable[[], None]:
"""Subscribe to a Zenoh topic with a callback.

Each call declares its own Zenoh subscriber (Zenoh spawns a
background thread per callback handler). Unsubscribe undeclares it.
"""

def on_sample(sample: zenoh.Sample) -> None:
callback(sample.payload.to_bytes(), topic)

sub = self.session.declare_subscriber(topic.name, on_sample)
with self._subscriber_lock:
self._subscribers.append(sub)

def unsubscribe() -> None:
sub.undeclare()
with self._subscriber_lock:
try:
self._subscribers.remove(sub)
except ValueError:
pass

return unsubscribe

def stop(self) -> None:
"""Stop the Zenoh pub/sub and clean up resources."""
with self._subscriber_lock:
for subscriber in self._subscribers:
subscriber.undeclare()
self._subscribers.clear()
with self._publisher_lock:
for publisher in self._publishers.values():
publisher.undeclare()
self._publishers.clear()
super().stop()


__all__ = [
"MessageCallback",
"Topic",
"ZenohPubSub",
]
11 changes: 11 additions & 0 deletions dimos/protocol/pubsub/test_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from dimos.msgs.geometry_msgs import Vector3
from dimos.protocol.pubsub.impl.lcmpubsub import LCM, Topic
from dimos.protocol.pubsub.impl.memory import Memory
from dimos.protocol.pubsub.impl.zenohpubsub import Topic as ZenohTopic, ZenohPubSub


@contextmanager
Expand Down Expand Up @@ -124,6 +125,16 @@ def lcm_context() -> Generator[LCM, None, None]:
)


@contextmanager
def zenoh_context() -> Generator[ZenohPubSub, None, None]:
zenoh_pubsub = ZenohPubSub()
zenoh_pubsub.start()
yield zenoh_pubsub
zenoh_pubsub.stop()


testdata.append((zenoh_context, ZenohTopic("test/zenoh/spec"), [b"value1", b"value2", b"value3"]))

from dimos.protocol.pubsub.impl.shmpubsub import PickleSharedMemory


Expand Down
1 change: 1 addition & 0 deletions dimos/protocol/service/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dimos.protocol.service.lcmservice import LCMService
from dimos.protocol.service.spec import Configurable as Configurable, Service as Service
from dimos.protocol.service.zenohservice import ZenohService as ZenohService

__all__ = [
"Configurable",
Expand Down
84 changes: 84 additions & 0 deletions dimos/protocol/service/zenohservice.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from dataclasses import dataclass, field
import json
import threading
from typing import Any

import zenoh

from dimos.protocol.service.spec import Service
from dimos.utils.logging_config import setup_logger

logger = setup_logger()

_sessions: dict[str, zenoh.Session] = {}
_sessions_lock = threading.Lock()


@dataclass
class ZenohConfig:
"""Configuration for Zenoh service."""

mode: str = "peer"
connect: list[str] = field(default_factory=list)
listen: list[str] = field(default_factory=list)

@property
def session_key(self) -> str:
"""Produce a hashable key for singleton session lookup."""
return f"{self.mode}|{json.dumps(sorted(self.connect))}|{json.dumps(sorted(self.listen))}"


class ZenohService(Service[ZenohConfig]):
default_config = ZenohConfig

def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)

def start(self) -> None:
"""Start the Zenoh service."""
key = self.config.session_key
with _sessions_lock:
if key not in _sessions:
config = zenoh.Config()
config.insert_json5("mode", json.dumps(self.config.mode))
if self.config.connect:
config.insert_json5("connect/endpoints", json.dumps(self.config.connect))
if self.config.listen:
config.insert_json5("listen/endpoints", json.dumps(self.config.listen))
_sessions[key] = zenoh.open(config)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zenoh sessions in _sessions dict are never closed - sessions accumulate without cleanup. Consider adding reference counting or explicit cleanup in stop().

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zenoh sessions in _sessions dict are never closed when services stop. This matches the DDS pattern but could leak resources if many different session configs are created over time.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

logger.info(f"Zenoh service started in {self.config.mode} mode")
super().start()

def stop(self) -> None:
"""Stop the Zenoh service."""
super().stop()

@property
def session(self) -> zenoh.Session:
"""Get the Zenoh Session instance for this service's config."""
key = self.config.session_key
if key not in _sessions:
raise RuntimeError("Zenoh session not initialized")
return _sessions[key]


__all__ = [
"ZenohConfig",
"ZenohService",
]
39 changes: 38 additions & 1 deletion docs/usage/transports.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ P: box "PubSub" rad 5px
# Descriptions below
text "robot configs" at B.s + (0.1, -0.2in)
text "camera, nav" at M.s + (0, -0.2in)
text "LCM, SHM, ROS" at T.s + (0, -0.2in)
text "LCM, SHM, Zenoh" at T.s + (0, -0.2in)
text "pub/sub API" at P.s + (0, -0.2in)
```

Expand Down Expand Up @@ -289,6 +289,42 @@ shm.stop()
Received: [{'data': [1, 2, 3]}]
```

### Zenoh

Zenoh is a high-performance pub/sub protocol. Topics are untyped key expressions; payloads are raw `bytes` or `str` (serialization is your choice).

```python session=zenoh_demo ansi=false
import json
from dataclasses import dataclass, asdict
from dimos.protocol.pubsub.impl.zenohpubsub import ZenohPubSub, Topic

@dataclass
class SensorReading:
value: float

zenoh = ZenohPubSub()
zenoh.start()

received = []
sensor_topic = Topic(name="sensors/temperature")

zenoh.subscribe(sensor_topic, lambda msg, t: received.append(SensorReading(**json.loads(msg))))
zenoh.publish(sensor_topic, json.dumps(asdict(SensorReading(value=22.5))))

import time
time.sleep(0.1)

print(f"Received: {received}")
zenoh.stop()
```

<!--Result:-->
```
Received: [SensorReading(value=22.5)]
```

Zenoh is interoperable with any Zenoh client (Rust, C, C++, etc.) since it sends raw bytes on the wire. Use any serialization format you like (JSON, Protobuf, LCM binary, etc.) — the transport doesn't impose one.

### DDS Transport

For network communication, DDS uses the Data Distribution Service (DDS) protocol:
Expand Down Expand Up @@ -434,4 +470,5 @@ python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_b
| `LCM` | Robot LAN broadcast (UDP multicast) | Yes | Yes | Best-effort; can drop packets on LAN |
| `Redis` | Network pubsub via Redis server | Yes | Yes | Central broker; adds hop |
| `ROS` | ROS 2 topic communication | Yes | Yes | Integrates with RViz/ROS tools |
| `Zenoh` | High-perf network pubsub | Yes | Yes | Raw bytes/str; no serialization overhead |
| `DDS` | Cyclone DDS without ROS (WIP) | Yes | Yes | WIP |
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies = [
# Transport Protocols
"dimos-lcm",
"PyTurboJPEG==1.8.2",
"eclipse-zenoh>=1.7.2",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check that ZenohTransport implementation in dimos/core/transport.py:322 is complete - currently it's just an empty stub (class ZenohTransport(PubSubTransport[T]): ...)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if docs/usage/transports.md should be updated to document Zenoh transport alongside LCM, SharedMemory, DDS, and ROS transports

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

# Core
"numpy>=1.26.4",
"scipy>=1.15.1",
Expand Down
Loading