Conversation

@Kaweees
Member

@Kaweees Kaweees commented Jan 28, 2026

This pull request implements a DDS (Data Distribution Service) transport layer using Eclipse Cyclone DDS, providing a new high-performance pub/sub transport option.

Quick Start

Note

We currently use Eclipse Cyclone DDS as our DDS implementation. Its IdlStruct feature lets you define DDS topic types in pure Python, eliminating the need for separate IDL files, with automatic serialization support.

import time
from dataclasses import dataclass

from cyclonedds.idl import IdlStruct
from dimos.protocol.pubsub.ddspubsub import DDS, Topic

# Define the DDS topic type in pure Python; no separate IDL file is needed.
@dataclass
class SensorReading(IdlStruct):
    value: float

dds = DDS()
dds.start()

received = []
sensor_topic = Topic(name="sensors/temperature", data_type=SensorReading)

# Subscriber callbacks receive the message and the topic it arrived on.
dds.subscribe(sensor_topic, lambda msg, t: received.append(msg))
dds.publish(sensor_topic, SensorReading(value=22.5))

# Delivery is asynchronous, so wait briefly before reading the results.
time.sleep(0.1)

print(f"Received: {received}")
dds.stop()
Received: [SensorReading(value=22.5)]

Unit Tests/Benchmarks

uv run pytest -svm tool dimos/protocol/pubsub/benchmark/test_benchmark.py --override-ini="addopts=" -k "DDS"
uv run pytest -svm tool dimos/protocol/pubsub/benchmark/test_benchmark.py

This builds on #1036, which was closed because the branch was renamed to miguel/dds_transport.

Kaweees and others added 22 commits January 15, 2026 18:06
* raw rospubsub and benchmarks

* typefixes, shm added to the benchmark

* SHM is not so important to tell us every time when it starts

* greptile comments

* Add co-authorship line to commit message filter patterns

* Remove unused contextmanager import

---------

Co-authored-by: Ivan Nikolic <lesh@sysphere.org>
Replace base64 string encoding with native IDL bytearray type to eliminate
buffer overflow issues. The original base64 encoding exceeded CycloneDDS's
default string size limit (~256 bytes) and caused crashes on messages >= 1KB.

Key changes:
- Use make_idl_struct with bytearray field instead of string
- Convert bytes to bytearray when publishing to DDS
- Convert bytearray back to bytes when receiving from DDS
- Add _DDSMessageListener for async message dispatch
- Implement thread-safe DataWriter/DataReader management
- Add pickle support via __getstate__/__setstate__

Result: All 12 DDS benchmark tests pass (64B to 10MB messages).
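
The pickle support mentioned in the key changes can be illustrated with a minimal sketch. It assumes the object holds a lock and live per-topic handles, neither of which can be pickled; `PicklableBus` and its fields are hypothetical stand-ins, not the PR's actual class.

```python
import pickle
import threading


class PicklableBus:
    """Hypothetical stand-in for a pub/sub object that owns a lock."""

    def __init__(self, domain_id: int = 0) -> None:
        self.domain_id = domain_id
        self._lock = threading.Lock()  # locks cannot be pickled
        self._writers = {}             # live handles are process-local

    def __getstate__(self) -> dict:
        # Keep only plain configuration; drop the lock and live handles.
        state = self.__dict__.copy()
        del state["_lock"]
        del state["_writers"]
        return state

    def __setstate__(self, state: dict) -> None:
        # Restore configuration, then rebuild the process-local resources.
        self.__dict__.update(state)
        self._lock = threading.Lock()
        self._writers = {}


original = PicklableBus(domain_id=7)
original._writers["sensors/temperature"] = object()

# Round-trip through pickle: configuration survives, handles are recreated empty.
clone = pickle.loads(pickle.dumps(original))
print(clone.domain_id, clone._writers)
```

In this sketch the clone starts with no writers, so they would be recreated lazily on first use in the receiving process.
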
The double-checked locking pattern avoids lock contention on every
call after initial object creation. Initial benchmarking shows this
pattern performs better than simple locking for repeated accesses
to the same topics.
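
A minimal sketch of the double-checked locking pattern described in this commit message follows. `WriterCache` and `make_writer` are hypothetical names, and the unlocked first check assumes that a single `dict.get` is safe to call concurrently, as it is in CPython.

```python
import threading


class WriterCache:
    """Hypothetical per-topic cache; `factory` stands in for DataWriter creation."""

    def __init__(self, factory) -> None:
        self._factory = factory
        self._lock = threading.Lock()
        self._writers = {}

    def get(self, topic: str):
        # First check, without the lock: the common case once the topic is known.
        writer = self._writers.get(topic)
        if writer is not None:
            return writer
        with self._lock:
            # Second check, under the lock: another thread may have created it.
            writer = self._writers.get(topic)
            if writer is None:
                writer = self._factory(topic)
                self._writers[topic] = writer
            return writer


created = []


def make_writer(topic: str):
    created.append(topic)  # record every construction
    return object()


cache = WriterCache(make_writer)
threads = [
    threading.Thread(target=cache.get, args=("sensors/temperature",))
    for _ in range(8)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(created)
```

Only the first access to a topic pays for the lock; later calls return from the unlocked fast path.
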
@greptile-apps

greptile-apps bot commented Jan 28, 2026

Greptile Overview

Greptile Summary

This PR implements DDS (Data Distribution Service) transport using Eclipse Cyclone DDS, providing a new pub/sub transport option alongside existing LCM, ROS, and shared memory transports.

Key additions:

  • DDSService - Manages singleton DomainParticipant for DDS domain coordination
  • DDS class - PubSub implementation with thread-safe DataWriter/DataReader management
  • DDSTransport - Transport layer integration with auto-start on first use
  • Benchmark test cases comparing DDS performance against other transports
  • Documentation and examples for usage

Critical issues found:

  • DDSService.stop() doesn't properly clean up the DomainParticipant resource (lines 62-63)
  • DDSTransport has race conditions in broadcast() and subscribe() where locks are released before calling DDS methods (lines 280-290)
  • Missing idempotency checks in start() and stop() methods could lead to double-initialization

Note: Several issues from previous review threads remain unaddressed, including the string formatting error in ddspubsub.py and duplicate imports in transport.py.

Confidence Score: 3/5

  • This PR has multiple race conditions and resource cleanup issues that could cause instability in multi-threaded environments
  • Score reflects several critical concurrency bugs in DDSTransport (lock release before method calls) and missing resource cleanup in DDSService, plus unresolved issues from previous review threads
  • Pay close attention to dimos/core/transport.py (race conditions in broadcast/subscribe) and dimos/protocol/service/ddsservice.py (missing resource cleanup)

Important Files Changed

| Filename | Overview |
| --- | --- |
| dimos/protocol/pubsub/ddspubsub.py | New DDS pubsub implementation with thread-safe listener pattern |
| dimos/protocol/service/ddsservice.py | DDS service with global singleton participant, missing cleanup logic |
| dimos/core/transport.py | Added DDSTransport class with thread-safe initialization |

Sequence Diagram

sequenceDiagram
    participant App
    participant DDSTransport
    participant DDS
    participant DDSService
    participant DomainParticipant
    participant DataWriter
    participant DataReader
    participant Listener

    Note over App,Listener: Initialization and Publishing Flow
    App->>DDSTransport: __init__(topic, type)
    DDSTransport->>DDS: __init__(qos)
    DDS->>DDSService: __init__()
    
    App->>DDSTransport: broadcast(msg)
    DDSTransport->>DDSTransport: Check _started flag
    DDSTransport->>DDS: start()
    DDS->>DDSService: start()
    DDSService->>DomainParticipant: Create singleton
    
    DDSTransport->>DDS: publish(topic, msg)
    DDS->>DDS: _get_writer(topic)
    DDS->>DataWriter: Create if not exists
    DDS->>DataWriter: write(message)

    Note over App,Listener: Subscription Flow
    App->>DDSTransport: subscribe(callback)
    DDSTransport->>DDS: subscribe(topic, callback)
    DDS->>DDS: _get_listener(topic)
    DDS->>Listener: Create listener if not exists
    DDS->>DataReader: Create with listener
    DDS->>Listener: add_callback(callback)
    
    Note over App,Listener: Message Reception
    DataWriter->>DataReader: DDS message arrives
    DataReader->>Listener: on_data_available(reader)
    Listener->>DataReader: take()
    loop For each sample
        Listener->>App: callback(sample, topic)
    end

@greptile-apps greptile-apps bot left a comment

5 files reviewed, 3 comments

@Kaweees
Member Author

Kaweees commented Jan 28, 2026

@greptile

@greptile-apps greptile-apps bot left a comment

5 files reviewed, no comments

@Kaweees Kaweees self-assigned this Jan 28, 2026
Kaweees and others added 4 commits January 31, 2026 20:55
The listener now uses a tuple for callbacks and has thread-safe methods for adding/removing callbacks for a given topic.
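
The tuple-based callback handling described in this commit can be sketched as follows. `TopicListener` is a hypothetical illustration, not the PR's `_DDSMessageListener`: writers replace the tuple under a lock, while dispatch iterates over whichever tuple it read, so adding or removing a callback never mutates a sequence that is being iterated.

```python
import threading
from typing import Any, Callable

Callback = Callable[[Any, str], None]


class TopicListener:
    """Hypothetical listener that keeps its callbacks in an immutable tuple."""

    def __init__(self, topic: str) -> None:
        self._topic = topic
        self._lock = threading.Lock()
        self._callbacks = ()

    def add_callback(self, callback: Callback) -> None:
        with self._lock:
            # Build a new tuple rather than mutating the existing one.
            self._callbacks = self._callbacks + (callback,)

    def remove_callback(self, callback: Callback) -> None:
        with self._lock:
            self._callbacks = tuple(cb for cb in self._callbacks if cb != callback)

    def dispatch(self, sample: Any) -> None:
        # Iterate over a snapshot; concurrent add/remove cannot disturb it.
        for callback in self._callbacks:
            callback(sample, self._topic)


received = []


def on_sample(sample: Any, topic: str) -> None:
    received.append((topic, sample))


listener = TopicListener("sensors/temperature")
listener.add_callback(on_sample)
listener.dispatch(22.5)          # delivered
listener.remove_callback(on_sample)
listener.dispatch(23.0)          # no subscribers left, nothing delivered

print(received)
```
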
Kaweees and others added 3 commits February 2, 2026 13:16
Co-authored-by: Paul Nechifor <paul@nechifor.net>
Co-authored-by: Paul Nechifor <paul@nechifor.net>
Co-authored-by: Paul Nechifor <paul@nechifor.net>
@Kaweees
Member Author

Kaweees commented Feb 2, 2026

@greptile

@greptile-apps greptile-apps bot left a comment

5 files reviewed, 6 comments

@Kaweees
Member Author

Kaweees commented Feb 3, 2026

@greptile

@greptile-apps greptile-apps bot left a comment

3 files reviewed, 5 comments

Comment on lines 62 to 63
if _participant is not None:
    _participant = None

Setting _participant to None doesn't actually clean up the DomainParticipant resource. Should call .close() or similar cleanup method before setting to None.

Suggested change
- if _participant is not None:
-     _participant = None
+ if _participant is not None:
+     _participant.close()
+     _participant = None
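
The release-then-clear pattern in this suggestion can be sketched with a module-level singleton. `FakeParticipant` and its `close()` method are hypothetical; the review itself only says ".close() or similar", so the actual cleanup call on the DDS participant is an assumption here.

```python
import threading


class FakeParticipant:
    """Hypothetical handle; the real participant's cleanup call may differ."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


_participant = None
_participant_lock = threading.Lock()


def get_participant() -> FakeParticipant:
    """Create the shared participant on first use."""
    global _participant
    with _participant_lock:
        if _participant is None:
            _participant = FakeParticipant()
        return _participant


def release_participant() -> None:
    """Release the underlying resource before dropping the reference."""
    global _participant
    with _participant_lock:
        if _participant is not None:
            _participant.close()
            _participant = None


first = get_participant()
release_participant()
second = get_participant()  # a fresh participant after release

print(first.closed, second.closed)
```
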

Comment on lines 280 to 284
def broadcast(self, _, msg) -> None:  # type: ignore[no-untyped-def]
    with self._start_lock:
        if not self._started:
            self.start()
    self.dds.publish(self.topic, msg)

Lock is released before calling publish(), allowing race condition where start() could be called again by another thread before publish executes. Should keep lock held or check _started again inside the lock.

Suggested change
- def broadcast(self, _, msg) -> None:  # type: ignore[no-untyped-def]
-     with self._start_lock:
-         if not self._started:
-             self.start()
-     self.dds.publish(self.topic, msg)
+ def broadcast(self, _, msg) -> None:  # type: ignore[no-untyped-def]
+     with self._start_lock:
+         if not self._started:
+             self.start()
+         self.dds.publish(self.topic, msg)
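
A self-contained sketch of the suggested shape, with publish kept inside the lock, is shown below. `RecordingBus` and `LazyStartTransport` are hypothetical stand-ins for the DDS object and DDSTransport; the bus raises if it is asked to publish while stopped, which makes the window the review describes observable.

```python
import threading


class RecordingBus:
    """Hypothetical backend standing in for the DDS object."""

    def __init__(self) -> None:
        self.running = False
        self.start_calls = 0
        self.sent = []

    def start(self) -> None:
        self.start_calls += 1
        self.running = True

    def stop(self) -> None:
        self.running = False

    def publish(self, topic: str, msg) -> None:
        if not self.running:
            raise RuntimeError("publish() called on a stopped bus")
        self.sent.append((topic, msg))


class LazyStartTransport:
    def __init__(self, topic: str, bus: RecordingBus) -> None:
        self.topic = topic
        self.bus = bus
        self._started = False
        self._start_lock = threading.RLock()

    def broadcast(self, msg) -> None:
        with self._start_lock:
            if not self._started:
                self.bus.start()
                self._started = True
            # Still inside the lock: stop() cannot run between the check and the publish.
            self.bus.publish(self.topic, msg)

    def stop(self) -> None:
        with self._start_lock:
            if self._started:
                self.bus.stop()
                self._started = False


bus = RecordingBus()
transport = LazyStartTransport("sensors/temperature", bus)
threads = [threading.Thread(target=transport.broadcast, args=(i,)) for i in range(16)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(bus.start_calls, len(bus.sent))
```

Holding the lock across publish serializes all broadcasts on this transport, so the trade-off is throughput for a simpler start/stop invariant.
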

Comment on lines 286 to 290
def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None:  # type: ignore[assignment, override]
    with self._start_lock:
        if not self._started:
            self.start()
    return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg))  # type: ignore[return-value]

Same lock release issue - subscribe() releases lock before calling dds.subscribe(), allowing race condition.

Suggested change
- def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None:  # type: ignore[assignment, override]
-     with self._start_lock:
-         if not self._started:
-             self.start()
-     return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg))  # type: ignore[return-value]
+ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None:  # type: ignore[assignment, override]
+     with self._start_lock:
+         if not self._started:
+             self.start()
+         return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg))  # type: ignore[return-value]

Comment on lines 270 to 273
def start(self) -> None:
    with self._start_lock:
        self.dds.start()
        self._started = True

Calling start() recursively while holding _start_lock (an RLock) could lead to nested start calls if start() triggers any code path that calls broadcast() or subscribe(). Check _started flag before calling dds.start().

Suggested change
- def start(self) -> None:
-     with self._start_lock:
-         self.dds.start()
-         self._started = True
+ def start(self) -> None:
+     with self._start_lock:
+         if not self._started:
+             self.dds.start()
+             self._started = True

Comment on lines 275 to 278
def stop(self) -> None:
    with self._start_lock:
        self.dds.stop()
        self._started = False

Check the _started flag first so that dds.stop() is not called more than once.

Suggested change
- def stop(self) -> None:
-     with self._start_lock:
-         self.dds.stop()
-         self._started = False
+ def stop(self) -> None:
+     with self._start_lock:
+         if self._started:
+             self.dds.stop()
+             self._started = False
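
The idempotency checks suggested for start() and stop() can be exercised with a small sketch. `CountingBackend` and `GuardedService` are hypothetical names; the backend only counts calls so that repeated start/stop requests can be shown to reach it once per transition.

```python
import threading


class CountingBackend:
    """Hypothetical backend that records how often it is started and stopped."""

    def __init__(self) -> None:
        self.starts = 0
        self.stops = 0

    def start(self) -> None:
        self.starts += 1

    def stop(self) -> None:
        self.stops += 1


class GuardedService:
    def __init__(self, backend: CountingBackend) -> None:
        self._backend = backend
        self._lock = threading.RLock()
        self._started = False

    def start(self) -> None:
        with self._lock:
            if self._started:
                return  # already running: do nothing
            self._backend.start()
            self._started = True

    def stop(self) -> None:
        with self._lock:
            if not self._started:
                return  # already stopped: do nothing
            self._backend.stop()
            self._started = False


backend = CountingBackend()
service = GuardedService(backend)
service.start()
service.start()  # ignored
service.stop()
service.stop()   # ignored
service.start()  # a restart after stop is still allowed

print(backend.starts, backend.stops)
```
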


5 participants