Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/api/events/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@
- Event
- SystemEvent
- StopEvent
- EventConnectorBuilder
- EventHandlers
2 changes: 1 addition & 1 deletion docs/examples/tutorials/event-driven-models.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Finally, we need components to subscribe to these events and process them. Use t
Now we can create a [`Process`][plugboard.process.Process] from all these components. The outputs from `CollectLow` and `CollectHigh` are connected to separate [`FileWriter`][plugboard.library.FileWriter] components so that we'll get a CSV file containing the latest high and low values at each step of the simulation.

!!! info
We need a few extra lines of code to create connectors for the event-based parts of the model. If you define your process in YAML this will be done automatically for you, but if you are defining the process in code then you will need to use the [`EventConnectorBuilder`][plugboard.events.EventConnectorBuilder] to do this.
We need a few extra lines of code to create connectors for the event-based parts of the model. If you define your process in YAML this will be done automatically for you, but if you are defining the process in code then you will need to use the [`ConnectorBuilder`][plugboard.connector.ConnectorBuilder] to do this.

```python hl_lines="15-17"
--8<-- "examples/tutorials/005_events/hello_events.py:main"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
"import typing as _t\n",
"\n",
"from plugboard.connector import AsyncioConnector, ConnectorBuilder\n",
"from plugboard.events import EventConnectorBuilder\n",
"from plugboard.process import LocalProcess\n",
"from plugboard.schemas import ConnectorSpec\n",
"\n",
Expand Down Expand Up @@ -480,9 +479,7 @@
"]\n",
"\n",
"# Event connectors\n",
"builder = ConnectorBuilder(connector_cls=AsyncioConnector)\n",
"event_builder = EventConnectorBuilder(connector_builder=builder)\n",
"event_connectors = list(event_builder.build(components).values())\n",
"event_connectors = AsyncioConnector.builder().build_event_connectors(components)\n",
"\n",
"process = LocalProcess(components=components, connectors=connectors + event_connectors)"
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
"from plugboard.schemas import ComponentArgsDict, ConnectorSpec\n",
"from plugboard.process import LocalProcess\n",
"from plugboard.library import FileReader, FileWriter, LLMImageProcessor\n",
"from plugboard.events import Event, EventConnectorBuilder"
"from plugboard.events import Event"
]
},
{
Expand Down Expand Up @@ -390,9 +390,7 @@
"]\n",
"\n",
"# Event Connectors\n",
"builder = ConnectorBuilder(connector_cls=AsyncioConnector)\n",
"event_builder = EventConnectorBuilder(connector_builder=builder)\n",
"event_connectors = list(event_builder.build(components).values())\n",
"event_connectors = AsyncioConnector.builder().build_event_connectors(components)\n",
"\n",
"# Define Process\n",
"process = LocalProcess(components=components, connectors=connectors + event_connectors)\n",
Expand Down
6 changes: 2 additions & 4 deletions examples/tutorials/005_events/hello_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from plugboard.component import Component, IOController
from plugboard.connector import AsyncioConnector, ConnectorBuilder
from plugboard.events import Event, EventConnectorBuilder, StopEvent
from plugboard.events import Event, StopEvent
from plugboard.library import FileWriter
from plugboard.process import LocalProcess
from plugboard.schemas import ConnectorSpec, ComponentArgsDict
Expand Down Expand Up @@ -139,9 +139,7 @@ async def main() -> None:
connect("collect-high.value", "save-high.value"),
connect("collect-low.value", "save-low.value"),
]
connector_builder = ConnectorBuilder(connector_cls=AsyncioConnector) # (2)!
event_connector_builder = EventConnectorBuilder(connector_builder=connector_builder)
event_connectors = list(event_connector_builder.build(components).values())
event_connectors = AsyncioConnector.builder().build_event_connectors(components) # (3)!

process = LocalProcess(
components=components,
Expand Down
6 changes: 6 additions & 0 deletions plugboard/connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import typing as _t

from plugboard.connector.channel import Channel
from plugboard.connector.connector_builder import ConnectorBuilder
from plugboard.schemas import ConnectorSpec
from plugboard.utils import ExportMixin

Expand Down Expand Up @@ -39,3 +40,8 @@ def dict(self) -> dict[str, _t.Any]: # noqa: D102
"id": self.id,
"spec": self.spec.model_dump(),
}

@classmethod
def builder(cls, *args: _t.Any, **kwargs: _t.Any) -> ConnectorBuilder:
"""Returns a `ConnectorBuilder` for this `Connector` class."""
return ConnectorBuilder(cls, *args, **kwargs)
13 changes: 12 additions & 1 deletion plugboard/connector/connector_builder.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
"""Provides `ConnectorBuilder` to build `Connector` objects."""

from __future__ import annotations

import typing as _t

from plugboard.connector.connector import Connector

if _t.TYPE_CHECKING: # pragma: no cover
from plugboard.connector.connector import Connector

from plugboard.connector.event_connector_spec_builder import EventConnectorSpecBuilder
from plugboard.schemas import ConnectorSpec


Expand All @@ -17,3 +23,8 @@ def __init__(self, connector_cls: type[Connector], *args: _t.Any, **kwargs: _t.A
def build(self, spec: ConnectorSpec) -> Connector:
"""Builds a `Connector` object."""
return self._connector_cls(spec, *self._args, **self._kwargs)

def build_event_connectors(self, components: _t.Iterable[_t.Any]) -> list[Connector]:
"""Builds event connectors for the given components."""
evt_conn_map = EventConnectorSpecBuilder.build(components)
return [self.build(spec=spec) for spec in evt_conn_map.values()]
53 changes: 53 additions & 0 deletions plugboard/connector/event_connector_spec_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""Provides `EventConnectorSpecBuilder` utility class which builds event connector specs for components.""" # noqa: E501,W505

from __future__ import annotations

import typing as _t

from plugboard.events.event import Event
from plugboard.schemas import ConnectorMode, ConnectorSocket, ConnectorSpec


if _t.TYPE_CHECKING:
from plugboard.component import Component


class EventConnectorSpecBuilder: # pragma: no cover
"""`EventConnectorSpecBuilder` constructs connector specs for component event handlers."""

_source_descriptor: str = "publishers"
_target_descriptor: str = "subscribers"

@staticmethod
def build(components: _t.Iterable[Component]) -> dict[str, ConnectorSpec]:
"""Returns mapping of connector specs for events handled by components."""
evt_conn_map: dict[str, ConnectorSpec] = {}
for component in components:
comp_evt_conn_map = EventConnectorSpecBuilder._build_for_component(
evt_conn_map, component
)
evt_conn_map.update(comp_evt_conn_map)
return evt_conn_map

@staticmethod
def _build_for_component(
evt_conn_map: dict[str, ConnectorSpec], component: Component
) -> dict[str, ConnectorSpec]:
component_evts = set(component.io.input_events + component.io.output_events)
return {
evt.type: EventConnectorSpecBuilder._build_for_event(evt.type)
for evt in component_evts
if evt.type not in evt_conn_map
}

@staticmethod
def _build_for_event(evt_type: str) -> ConnectorSpec:
evt_type_safe = Event.safe_type(evt_type)
source = ConnectorSocket(
entity=evt_type_safe, descriptor=EventConnectorSpecBuilder._source_descriptor
)
target = ConnectorSocket(
entity=evt_type_safe, descriptor=EventConnectorSpecBuilder._target_descriptor
)
spec = ConnectorSpec(source=source, target=target, mode=ConnectorMode.PUBSUB)
return spec
2 changes: 0 additions & 2 deletions plugboard/events/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
"""Provides models and utilities for handling events."""

from plugboard.events.event import Event, StopEvent, SystemEvent
from plugboard.events.event_connector_builder import EventConnectorBuilder
from plugboard.events.event_handlers import EventHandlers


__all__ = [
"Event",
"EventConnectorBuilder",
"EventHandlers",
"StopEvent",
"SystemEvent",
Expand Down
49 changes: 0 additions & 49 deletions plugboard/events/event_connector_builder.py

This file was deleted.

9 changes: 4 additions & 5 deletions plugboard/process/process_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from plugboard.component.utils import ComponentDecoratorHelper
from plugboard.connector.connector import Connector
from plugboard.connector.connector_builder import ConnectorBuilder
from plugboard.events.event_connector_builder import EventConnectorBuilder
from plugboard.connector.ray_channel import RayConnector
from plugboard.process.process import Process
from plugboard.schemas import ProcessSpec
from plugboard.state import StateBackend
Expand Down Expand Up @@ -70,16 +70,15 @@ def _build_connectors(cls, spec: ProcessSpec, components: list[Component]) -> li
connector_builder = ConnectorBuilder(
connector_cls=connector_class, **dict(spec.connector_builder.args)
)
event_connector_builder = EventConnectorBuilder(connector_builder=connector_builder)
# TODO: Remove this when https://github.com/plugboard-dev/plugboard/issues/101 is resolved
if spec.type.endswith("RayProcess"):
if connector_class is RayConnector: # pragma: no cover
DI.logger.resolve_sync().warning(
"RayProcess does not yet support event-based models. "
"RayConnector does not yet support event-based models. "
"Event connectors will not be built."
)
event_connectors = []
else:
event_connectors = list(event_connector_builder.build(components).values())
event_connectors = connector_builder.build_event_connectors(components)
spec_connectors = [connector_builder.build(cs) for cs in spec.args.connectors]
return sorted(
{conn.id: conn for conn in event_connectors + spec_connectors}.values(),
Expand Down
28 changes: 14 additions & 14 deletions tests/integration/test_component_event_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from plugboard.component import Component, IOController
from plugboard.connector import AsyncioConnector, Connector, ConnectorBuilder
from plugboard.events import Event, EventConnectorBuilder
from plugboard.events import Event
from plugboard.schemas import ConnectorSpec
from tests.conftest import zmq_connector_cls

Expand Down Expand Up @@ -84,10 +84,9 @@ def connector_cls(_connector_cls: _t.Type[Connector]) -> _t.Type[Connector]:


@pytest.fixture
def event_connectors(connector_cls: _t.Type[Connector]) -> EventConnectorBuilder:
def connector_builder(connector_cls: _t.Type[Connector]) -> ConnectorBuilder:
"""Fixture for an event connectors instance."""
connector_builder = ConnectorBuilder(connector_cls=connector_cls)
return EventConnectorBuilder(connector_builder=connector_builder)
return ConnectorBuilder(connector_cls=connector_cls)


@pytest.mark.asyncio
Expand Down Expand Up @@ -115,7 +114,7 @@ def event_connectors(connector_cls: _t.Type[Connector]) -> EventConnectorBuilder
],
)
async def test_component_event_handlers(
io_controller_kwargs: dict, event_connectors: EventConnectorBuilder
io_controller_kwargs: dict, connector_builder: ConnectorBuilder
) -> None:
"""Test that event handlers are registered and called correctly for components."""

Expand All @@ -124,24 +123,24 @@ class _A(A):

a = _A(name="a")

event_connectors_map = event_connectors.build([a])
connectors = list(event_connectors_map.values())
connectors = connector_builder.build_event_connectors([a])
event_connectors_map = {conn.spec.source.entity: conn for conn in connectors}

await a.io.connect(connectors)

assert a.event_A_count == 0
assert a.event_B_count == 0

evt_A = EventTypeA(data=EventTypeAData(x=2), source="test-driver")
chan_A = await event_connectors_map[evt_A.type].connect_send()
chan_A = await event_connectors_map[evt_A.safe_type()].connect_send()
await chan_A.send(evt_A)
await a.step()

assert a.event_A_count == 2
assert a.event_B_count == 0

evt_B = EventTypeB(data=EventTypeBData(y=4), source="test-driver")
chan_B = await event_connectors_map[evt_B.type].connect_send()
chan_B = await event_connectors_map[evt_B.safe_type()].connect_send()
await chan_B.send(evt_B)
await a.step()

Expand All @@ -163,7 +162,7 @@ async def field_connectors(connector_cls: _t.Type[Connector]) -> list[Connector]

@pytest.mark.asyncio
async def test_component_event_handlers_with_field_inputs(
event_connectors: EventConnectorBuilder,
connector_builder: ConnectorBuilder,
field_connectors: list[Connector],
) -> None:
"""Test that event handlers are registered and called correctly for components."""
Expand All @@ -178,8 +177,9 @@ class _A(A):

a = _A(name="a")

event_connectors_map = event_connectors.build([a])
connectors = list(event_connectors_map.values()) + field_connectors
event_connectors = connector_builder.build_event_connectors([a])
event_connectors_map = {conn.spec.source.entity: conn for conn in event_connectors}
connectors = event_connectors + field_connectors

# FIXME : With `ZMQConnector` both send and recv side must be connected to avoid hanging.
# : See https://github.com/plugboard-dev/plugboard/issues/101.
Expand All @@ -199,7 +199,7 @@ class _A(A):

# After sending one event of type A, the event_A_count should be 2
evt_A = EventTypeA(data=EventTypeAData(x=2), source="test-driver")
chan_A = await event_connectors_map[evt_A.type].connect_send()
chan_A = await event_connectors_map[evt_A.safe_type()].connect_send()
await chan_A.send(evt_A)
await a.step()

Expand All @@ -210,7 +210,7 @@ class _A(A):

# After sending one event of type B, the event_B_count should be 4
evt_B = EventTypeB(data=EventTypeBData(y=4), source="test-driver")
chan_B = await event_connectors_map[evt_B.type].connect_send()
chan_B = await event_connectors_map[evt_B.safe_type()].connect_send()
await chan_B.send(evt_B)
await a.step()

Expand Down
10 changes: 5 additions & 5 deletions tests/integration/test_process_stop_cancel.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
RabbitMQConnector,
RayConnector,
)
from plugboard.events import EventConnectorBuilder, StopEvent
from plugboard.events import StopEvent
from plugboard.process import LocalProcess, Process, RayProcess
from plugboard.schemas import ConnectorSpec, Status
from tests.conftest import ComponentTestHelper, zmq_connector_cls
Expand Down Expand Up @@ -73,7 +73,6 @@ async def test_process_stop_event(
) -> None:
"""Tests that an event-driven Process can be stopped gracefully via a StopEvent."""
connector_builder = ConnectorBuilder(connector_cls=connector_cls)
event_connectors = EventConnectorBuilder(connector_builder=connector_builder)

max_iters = 25
iters_before_stop = 15
Expand All @@ -89,13 +88,14 @@ async def test_process_stop_event(
]
field_connectors: list[Connector] = [conn_ab1, conn_ab2, conn_ab3, conn_ab4, conn_ab5]

event_connectors_map = event_connectors.build(components)
connectors: list[Connector] = list(event_connectors_map.values()) + field_connectors
event_connectors = connector_builder.build_event_connectors(components)
event_connectors_map = {conn.spec.source.entity: conn for conn in event_connectors}
connectors: list[Connector] = event_connectors + field_connectors

process = process_cls(components, connectors)

async with process:
stop_evt_conn = event_connectors_map[StopEvent.type]
stop_evt_conn = event_connectors_map[StopEvent.safe_type()]
stop_chan = await stop_evt_conn.connect_send()

async def stop_after() -> None:
Expand Down
Loading
Loading