Skip to content

Comments

[DRAFT] Create Zenoh Transport Protocol#1296

Open
Kaweees wants to merge 2 commits intodevfrom
miguel/zenoh_transport
Open

[DRAFT] Create Zenoh Transport Protocol#1296
Kaweees wants to merge 2 commits intodevfrom
miguel/zenoh_transport

Conversation

@Kaweees
Copy link
Member

@Kaweees Kaweees commented Feb 19, 2026

This merge request implements Zenoh transport layer, providing a new high-performance pub/sub transport option.

Quick Start

Note

We currently use the python bindings for Eclipse Zenoh as our Zenoh implementation. Its IdlStruct feature lets you define DDS topic types in pure Python, eliminating the need for separate IDL files, with automatic serialization support.

import json
from dataclasses import dataclass, asdict
from dimos.protocol.pubsub.impl.zenohpubsub import ZenohPubSub, Topic

@dataclass
class SensorReading:
    value: float

zenoh = ZenohPubSub()
zenoh.start()

received = []
sensor_topic = Topic(name="sensors/temperature")

zenoh.subscribe(sensor_topic, lambda msg, t: received.append(SensorReading(**json.loads(msg))))
zenoh.publish(sensor_topic, json.dumps(asdict(SensorReading(value=22.5))))

import time
time.sleep(0.1)

print(f"Received: {received}")
zenoh.stop()
Received: [SensorReading(value=22.5)]

Unit Tests/Benchmarks

# Test and benchmark Zenoh transport protocol
uv run python -m pytest -svm tool dimos/protocol/pubsub/benchmark/test_benchmark.py --override-ini="addopts=" -k "Zenoh"
# Test and benchmark all transport protocols
uv run python -m pytest -svm tool dimos/protocol/pubsub/benchmark/test_benchmark.py
image

Related: #927

@Kaweees Kaweees self-assigned this Feb 19, 2026
@greptile-apps
Copy link
Contributor

greptile-apps bot commented Feb 19, 2026

Greptile Summary

This PR adds a complete Zenoh transport layer implementation, providing a high-performance pub/sub protocol option alongside existing LCM, SharedMemory, DDS, and ROS transports.

Key Changes:

  • Implemented ZenohService base class with singleton session management
  • Created ZenohPubSub implementation with proper publisher/subscriber lifecycle management
  • Added ZenohTransport wrapper following existing transport patterns with thread-safe lazy initialization
  • Included comprehensive tests and benchmarks
  • Documented the new transport with usage examples

Implementation Quality:

  • Clean separation of concerns: ZenohService handles session management, ZenohPubSub handles pub/sub operations, ZenohTransport provides the standard transport interface
  • Thread-safe operations with proper locking throughout
  • Proper resource cleanup for publishers and subscribers in stop()
  • Follows established patterns from DDS and LCM transports

Minor Note:

  • Zenoh sessions are stored in a singleton dictionary and never closed (matches DDS pattern), which could theoretically leak resources if many different session configurations are created over the application lifetime

Confidence Score: 5/5

  • This PR is safe to merge with minimal risk
  • The implementation follows established patterns from existing transports (DDS, LCM), includes comprehensive tests and benchmarks, has proper thread safety and resource management, and is well-documented. The session singleton pattern matches the DDS implementation and is appropriate for this use case.
  • No files require special attention

Important Files Changed

Filename Overview
dimos/protocol/service/zenohservice.py Added ZenohService base class with singleton session management, but sessions are never closed
dimos/protocol/pubsub/impl/zenohpubsub.py Implemented ZenohPubSub with proper publisher/subscriber management and cleanup
dimos/core/transport.py ZenohTransport implementation follows existing patterns with proper thread-safe lazy initialization

Sequence Diagram

sequenceDiagram
    participant User
    participant ZenohTransport
    participant ZenohPubSub
    participant ZenohService
    participant Session as zenoh.Session

    User->>ZenohTransport: __init__(topic, **kwargs)
    ZenohTransport->>ZenohPubSub: create instance
    ZenohPubSub->>ZenohService: inherit from

    User->>ZenohTransport: broadcast(msg) or subscribe(callback)
    ZenohTransport->>ZenohTransport: check _started (lazy init)
    alt not started
        ZenohTransport->>ZenohPubSub: start()
        ZenohPubSub->>ZenohService: start()
        ZenohService->>Session: zenoh.open(config)
        Note over ZenohService,Session: Session stored in _sessions dict
    end

    alt broadcast
        ZenohTransport->>ZenohPubSub: publish(topic, msg)
        ZenohPubSub->>ZenohPubSub: _get_publisher(topic)
        ZenohPubSub->>Session: declare_publisher(topic.name)
        ZenohPubSub->>Session: publisher.put(message)
    else subscribe
        ZenohTransport->>ZenohPubSub: subscribe(topic, callback)
        ZenohPubSub->>Session: declare_subscriber(topic.name, on_sample)
        Session-->>ZenohPubSub: subscriber
        Note over Session: Background thread spawned
        Session->>ZenohPubSub: on_sample(sample)
        ZenohPubSub->>User: callback(msg, topic)
    end

    User->>ZenohTransport: stop()
    ZenohTransport->>ZenohPubSub: stop()
    ZenohPubSub->>ZenohPubSub: undeclare all publishers
    ZenohPubSub->>ZenohPubSub: undeclare all subscribers
    Note over Session: Session remains open (singleton)
Loading

Last reviewed commit: 3899edc

Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 file reviewed, 2 comments

Edit Code Review Agent Settings | Greptile

# Transport Protocols
"dimos-lcm",
"PyTurboJPEG==1.8.2",
"eclipse-zenoh>=1.7.2",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check that ZenohTransport implementation in dimos/core/transport.py:322 is complete - currently it's just an empty stub (class ZenohTransport(PubSubTransport[T]): ...)

# Transport Protocols
"dimos-lcm",
"PyTurboJPEG==1.8.2",
"eclipse-zenoh>=1.7.2",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if docs/usage/transports.md should be updated to document Zenoh transport alongside LCM, SharedMemory, DDS, and ROS transports

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

@spomichter
Copy link
Contributor

Where are you handling bridge vs host discovery mode? pretty importatnt for zenoh

@leshy
Copy link
Contributor

leshy commented Feb 20, 2026

Where are you handling bridge vs host discovery mode? pretty importatnt for zenoh

Stash I mentioned those two things briefly but there are also 55 other things that are zenoh. someone familair with transport protocols needs to seriously go through the documentation and think about this integration

@Kaweees
Copy link
Member Author

Kaweees commented Feb 20, 2026

zenoh not close to done without understanding brokers, discovery, their diff transport mechanisms etc, zenoh is not a trivial protocol

Here is what @leshy is referring to. I will extend ZenohConfig to support the different discovery modes

@Kaweees
Copy link
Member Author

Kaweees commented Feb 20, 2026

@greptile

Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

9 files reviewed, 2 comments

Edit Code Review Agent Settings | Greptile

config.insert_json5("connect/endpoints", json.dumps(self.config.connect))
if self.config.listen:
config.insert_json5("listen/endpoints", json.dumps(self.config.listen))
_sessions[key] = zenoh.open(config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zenoh sessions in _sessions dict are never closed - sessions accumulate without cleanup. Consider adding reference counting or explicit cleanup in stop().

config.insert_json5("connect/endpoints", json.dumps(self.config.connect))
if self.config.listen:
config.insert_json5("listen/endpoints", json.dumps(self.config.listen))
_sessions[key] = zenoh.open(config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zenoh sessions in _sessions dict are never closed when services stop. This matches the DDS pattern but could leak resources if many different session configs are created over time.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants