Create DDS Transport Protocol #1144
base: dev
* raw rospubsub and benchmarks
* typefixes, shm added to the benchmark
* SHM is not so important to tell us every time when it starts
* greptile comments
* Add co-authorship line to commit message filter patterns
* Remove unused contextmanager import
---------
Co-authored-by: Ivan Nikolic <lesh@sysphere.org>
Replace base64 string encoding with native IDL bytearray type to eliminate buffer overflow issues. The original base64 encoding exceeded CycloneDDS's default string size limit (~256 bytes) and caused crashes on messages >= 1KB.
Key changes:
- Use make_idl_struct with bytearray field instead of string
- Convert bytes to bytearray when publishing to DDS
- Convert bytearray back to bytes when receiving from DDS
- Add _DDSMessageListener for async message dispatch
- Implement thread-safe DataWriter/DataReader management
- Add pickle support via __getstate__/__setstate__
Result: All 12 DDS benchmark tests pass (64B to 10MB messages).
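A minimal sketch of two of the changes listed above: the bytes/bytearray conversion at the publish and receive boundaries, and pickle support through `__getstate__`/`__setstate__`. It uses only the standard library and does not touch Cyclone DDS. The names `to_wire`, `from_wire`, and `PicklableTransport` are hypothetical, and the choice of which members to drop from the pickled state is an assumption, not the PR's actual code.

```python
import threading


def to_wire(payload: bytes) -> bytearray:
    """Copy immutable bytes into a bytearray before publishing."""
    return bytearray(payload)


def from_wire(payload: bytearray) -> bytes:
    """Convert a received bytearray back to immutable bytes."""
    return bytes(payload)


class PicklableTransport:
    """Hypothetical object that owns a lock and live per-topic handles."""

    def __init__(self, topic: str) -> None:
        self.topic = topic
        self._lock = threading.Lock()  # locks cannot be pickled
        self._writers: dict = {}       # assumed: live handles are not picklable

    def __getstate__(self) -> dict:
        # Keep only plain configuration; drop the lock and live handles.
        return {"topic": self.topic}

    def __setstate__(self, state: dict) -> None:
        # Rebuild the runtime-only members on the receiving side.
        self.topic = state["topic"]
        self._lock = threading.Lock()
        self._writers = {}
```

With this shape, an unpickled copy starts with an empty handle table and recreates writers and readers lazily on first use.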
…or Message payload.
The double-checked locking pattern avoids lock contention on every call after initial object creation. Initial benchmarking shows this pattern performs better than simple locking for repeated accesses to the same topics.
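The pattern described above can be sketched as follows. This is an illustration under stated assumptions, not the PR's code: `LazyPerTopic` and its `factory` argument are hypothetical names, the unlocked first read relies on dictionary lookups being safe under CPython's GIL, and the factory is assumed never to return `None`.

```python
import threading
from typing import Callable, Dict, Generic, TypeVar

T = TypeVar("T")


class LazyPerTopic(Generic[T]):
    """Create one object per topic on first use, with double-checked locking."""

    def __init__(self, factory: Callable[[str], T]) -> None:
        self._factory = factory
        self._items: Dict[str, T] = {}
        self._lock = threading.Lock()

    def get(self, topic: str) -> T:
        # First check, without the lock: the common path once the object exists.
        item = self._items.get(topic)
        if item is not None:
            return item
        with self._lock:
            # Second check, under the lock: another thread may have created
            # the object while this one was waiting.
            item = self._items.get(topic)
            if item is None:
                item = self._factory(topic)
                self._items[topic] = item
            return item
```

Only the first call per topic pays for the lock; later calls return after a single dictionary lookup.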
Greptile Overview
Greptile Summary
This PR implements DDS (Data Distribution Service) transport using Eclipse Cyclone DDS, providing a new pub/sub transport option alongside existing LCM, ROS, and shared memory transports.
Key additions:
Critical issues found:
Note: Several issues from previous review threads remain unaddressed, including the string formatting error in
Confidence Score: 3/5
Important Files Changed
Sequence Diagram
sequenceDiagram
participant App
participant DDSTransport
participant DDS
participant DDSService
participant DomainParticipant
participant DataWriter
participant DataReader
participant Listener
Note over App,Listener: Initialization and Publishing Flow
App->>DDSTransport: __init__(topic, type)
DDSTransport->>DDS: __init__(qos)
DDS->>DDSService: __init__()
App->>DDSTransport: broadcast(msg)
DDSTransport->>DDSTransport: Check _started flag
DDSTransport->>DDS: start()
DDS->>DDSService: start()
DDSService->>DomainParticipant: Create singleton
DDSTransport->>DDS: publish(topic, msg)
DDS->>DDS: _get_writer(topic)
DDS->>DataWriter: Create if not exists
DDS->>DataWriter: write(message)
Note over App,Listener: Subscription Flow
App->>DDSTransport: subscribe(callback)
DDSTransport->>DDS: subscribe(topic, callback)
DDS->>DDS: _get_listener(topic)
DDS->>Listener: Create listener if not exists
DDS->>DataReader: Create with listener
DDS->>Listener: add_callback(callback)
Note over App,Listener: Message Reception
DataWriter->>DataReader: DDS message arrives
DataReader->>Listener: on_data_available(reader)
Listener->>DataReader: take()
loop For each sample
Listener->>App: callback(sample, topic)
end
5 files reviewed, 3 comments
@greptile
5 files reviewed, no comments
The listener now uses a tuple for callbacks and has thread-safe methods for adding/removing callbacks for a given topic.
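One way to read the change described above is a copy-on-write tuple: writers replace the whole tuple under a lock, and the dispatch path iterates over a snapshot without locking. The sketch below assumes that design; `TopicListener` and `dispatch` are hypothetical names, and the `(sample, topic)` callback signature follows the sequence diagram.

```python
import threading
from typing import Any, Callable, Tuple

Callback = Callable[[Any, str], None]


class TopicListener:
    """Hypothetical per-topic listener with copy-on-write callbacks."""

    def __init__(self, topic: str) -> None:
        self._topic = topic
        self._callbacks: Tuple[Callback, ...] = ()
        self._lock = threading.Lock()

    def add_callback(self, callback: Callback) -> None:
        with self._lock:
            # Build a new tuple instead of mutating the old one in place.
            self._callbacks = self._callbacks + (callback,)

    def remove_callback(self, callback: Callback) -> None:
        with self._lock:
            self._callbacks = tuple(c for c in self._callbacks if c != callback)

    def dispatch(self, sample: Any) -> None:
        # Take a snapshot; add/remove during iteration cannot affect this loop.
        callbacks = self._callbacks
        for callback in callbacks:
            callback(sample, self._topic)
```

Because a tuple is immutable, a callback that unsubscribes itself while it is being invoked does not disturb the delivery already in progress.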
Co-authored-by: Paul Nechifor <paul@nechifor.net>
@greptile
5 files reviewed, 6 comments
@greptile
3 files reviewed, 5 comments
if _participant is not None:
    _participant = None
Setting _participant to None doesn't actually clean up the DomainParticipant resource. Should call .close() or similar cleanup method before setting to None.
- if _participant is not None:
-     _participant = None
+ if _participant is not None:
+     _participant.close()
+     _participant = None
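The suggestion above can be illustrated without a DDS installation. In this sketch `FakeParticipant` is a hypothetical stand-in for the real participant, `ParticipantHolder` is a hypothetical class wrapper around what the PR keeps as a module-level `_participant`, and `close()` is assumed to be the cleanup call, as the review comment proposes.

```python
import threading
from typing import Optional


class FakeParticipant:
    """Hypothetical stand-in for a DDS DomainParticipant; only close() is modelled."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class ParticipantHolder:
    """Owns one shared participant and releases it explicitly on shutdown."""

    def __init__(self) -> None:
        self._participant: Optional[FakeParticipant] = None
        self._lock = threading.Lock()

    def get(self) -> FakeParticipant:
        with self._lock:
            if self._participant is None:
                self._participant = FakeParticipant()
            return self._participant

    def shutdown(self) -> None:
        with self._lock:
            # Detach the reference under the lock so later callers get a
            # fresh participant rather than one that is being closed.
            participant, self._participant = self._participant, None
        if participant is not None:
            participant.close()  # release the resource, not just the name
```

Calling `shutdown()` twice is harmless here, since the second call finds nothing to close.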
dimos/core/transport.py
Outdated
def broadcast(self, _, msg) -> None:  # type: ignore[no-untyped-def]
    with self._start_lock:
        if not self._started:
            self.start()
    self.dds.publish(self.topic, msg)
Lock is released before calling publish(), allowing race condition where start() could be called again by another thread before publish executes. Should keep lock held or check _started again inside the lock.
- def broadcast(self, _, msg) -> None:  # type: ignore[no-untyped-def]
-     with self._start_lock:
-         if not self._started:
-             self.start()
-     self.dds.publish(self.topic, msg)
+ def broadcast(self, _, msg) -> None:  # type: ignore[no-untyped-def]
+     with self._start_lock:
+         if not self._started:
+             self.start()
+         self.dds.publish(self.topic, msg)
dimos/core/transport.py
Outdated
def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None:  # type: ignore[assignment, override]
    with self._start_lock:
        if not self._started:
            self.start()
    return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg))  # type: ignore[return-value]
Same lock release issue - subscribe() releases lock before calling dds.subscribe(), allowing race condition.
- def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None:  # type: ignore[assignment, override]
-     with self._start_lock:
-         if not self._started:
-             self.start()
-     return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg))  # type: ignore[return-value]
+ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None:  # type: ignore[assignment, override]
+     with self._start_lock:
+         if not self._started:
+             self.start()
+         return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg))  # type: ignore[return-value]
dimos/core/transport.py
Outdated
def start(self) -> None:
    with self._start_lock:
        self.dds.start()
        self._started = True
Calling start() recursively while holding _start_lock (an RLock) could lead to nested start calls if start() triggers any code path that calls broadcast() or subscribe(). Check _started flag before calling dds.start().
- def start(self) -> None:
-     with self._start_lock:
-         self.dds.start()
-         self._started = True
+ def start(self) -> None:
+     with self._start_lock:
+         if not self._started:
+             self.dds.start()
+             self._started = True
dimos/core/transport.py
Outdated
def stop(self) -> None:
    with self._start_lock:
        self.dds.stop()
        self._started = False
Check _started flag to avoid calling stop() multiple times
- def stop(self) -> None:
-     with self._start_lock:
-         self.dds.stop()
-         self._started = False
+ def stop(self) -> None:
+     with self._start_lock:
+         if self._started:
+             self.dds.stop()
+             self._started = False
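Taken together, the review suggestions for `broadcast()`, `start()`, and `stop()` amount to a guarded lifecycle behind one re-entrant lock. The following is a self-contained sketch of that combination, not the code in `dimos/core/transport.py`: `FakeDDS` and `TransportSketch` are hypothetical, the call counters exist only for illustration, and `broadcast()` takes just the message.

```python
import threading
from typing import Any, List, Tuple


class FakeDDS:
    """Hypothetical stand-in for the DDS service; counts lifecycle calls."""

    def __init__(self) -> None:
        self.start_calls = 0
        self.stop_calls = 0
        self.published: List[Tuple[str, Any]] = []

    def start(self) -> None:
        self.start_calls += 1

    def stop(self) -> None:
        self.stop_calls += 1

    def publish(self, topic: str, msg: Any) -> None:
        self.published.append((topic, msg))


class TransportSketch:
    def __init__(self, topic: str, dds: FakeDDS) -> None:
        self.topic = topic
        self.dds = dds
        self._started = False
        self._start_lock = threading.RLock()  # re-entrant: broadcast() calls start()

    def start(self) -> None:
        with self._start_lock:
            if not self._started:  # idempotent start
                self.dds.start()
                self._started = True

    def stop(self) -> None:
        with self._start_lock:
            if self._started:  # idempotent stop
                self.dds.stop()
                self._started = False

    def broadcast(self, msg: Any) -> None:
        with self._start_lock:
            if not self._started:
                self.start()
            # Publish while still holding the lock, so a concurrent stop()
            # cannot run between the check and the write.
            self.dds.publish(self.topic, msg)
```

Holding the lock across `publish()` serializes publishers on the same transport, which trades some throughput for the guarantee that a message is never written to a stopped service.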
This merge request implements a DDS (Data Distribution Service) transport layer using CycloneDDS, providing a new high-performance pub/sub transport option.
Quick Start
Note
We currently use Eclipse Cyclone DDS as our DDS implementation. Its IdlStruct feature lets you define DDS topic types in pure Python, eliminating the need for separate IDL files, with automatic serialization support.
Unit Tests/Benchmarks
This builds off of #1036, which was closed because the branch was renamed to miguel/dds_transport.