diff --git a/docs/api/events/events.md b/docs/api/events/events.md index 219c69c8..3992f05b 100644 --- a/docs/api/events/events.md +++ b/docs/api/events/events.md @@ -4,5 +4,4 @@ - Event - SystemEvent - StopEvent - - EventConnectorBuilder - EventHandlers diff --git a/docs/examples/tutorials/event-driven-models.md b/docs/examples/tutorials/event-driven-models.md index b0b99843..03622b22 100644 --- a/docs/examples/tutorials/event-driven-models.md +++ b/docs/examples/tutorials/event-driven-models.md @@ -66,7 +66,7 @@ Finally, we need components to subscribe to these events and process them. Use t Now we can create a [`Process`][plugboard.process.Process] from all these components. The outputs from `CollectLow` and `CollectHigh` are connected to separate [`FileWriter`][plugboard.library.FileWriter] components so that we'll get a CSV file containing the latest high and low values at each step of the simulation. !!! info - We need a few extra lines of code to create connectors for the event-based parts of the model. If you define your process in YAML this will be done automatically for you, but if you are defining the process in code then you will need to use the [`EventConnectorBuilder`][plugboard.events.EventConnectorBuilder] to do this. + We need a few extra lines of code to create connectors for the event-based parts of the model. If you define your process in YAML this will be done automatically for you, but if you are defining the process in code then you will need to use the [`ConnectorBuilder`][plugboard.connector.ConnectorBuilder] to do this. ```python hl_lines="15-17" --8<-- "examples/tutorials/005_events/hello_events.py:main" diff --git a/examples/demos/finance/001_momentum_signal/momentum-signal.ipynb b/examples/demos/finance/001_momentum_signal/momentum-signal.ipynb index 38f32a13..006a1188 100644 --- a/examples/demos/finance/001_momentum_signal/momentum-signal.ipynb +++ b/examples/demos/finance/001_momentum_signal/momentum-signal.ipynb @@ -69,7 +69,6 @@ "import typing as _t\n", "\n", "from plugboard.connector import AsyncioConnector, ConnectorBuilder\n", - "from plugboard.events import EventConnectorBuilder\n", "from plugboard.process import LocalProcess\n", "from plugboard.schemas import ConnectorSpec\n", "\n", @@ -480,9 +479,7 @@ "]\n", "\n", "# Event connectors\n", - "builder = ConnectorBuilder(connector_cls=AsyncioConnector)\n", - "event_builder = EventConnectorBuilder(connector_builder=builder)\n", - "event_connectors = list(event_builder.build(components).values())\n", + "event_connectors = AsyncioConnector.builder().build_event_connectors(components)\n", "\n", "process = LocalProcess(components=components, connectors=connectors + event_connectors)" ] diff --git a/examples/demos/llm/004_image_processing/local-and-remote-image-processing.ipynb b/examples/demos/llm/004_image_processing/local-and-remote-image-processing.ipynb index bc9d4141..a7144c91 100644 --- a/examples/demos/llm/004_image_processing/local-and-remote-image-processing.ipynb +++ b/examples/demos/llm/004_image_processing/local-and-remote-image-processing.ipynb @@ -73,7 +73,7 @@ "from plugboard.schemas import ComponentArgsDict, ConnectorSpec\n", "from plugboard.process import LocalProcess\n", "from plugboard.library import FileReader, FileWriter, LLMImageProcessor\n", - "from plugboard.events import Event, EventConnectorBuilder" + "from plugboard.events import Event" ] }, { @@ -390,9 +390,7 @@ "]\n", "\n", "# Event Connectors\n", - "builder = ConnectorBuilder(connector_cls=AsyncioConnector)\n", - "event_builder = EventConnectorBuilder(connector_builder=builder)\n", - "event_connectors = list(event_builder.build(components).values())\n", + "event_connectors = AsyncioConnector.builder().build_event_connectors(components)\n", "\n", "# Define Process\n", "process = LocalProcess(components=components, connectors=connectors + event_connectors)\n", diff --git a/examples/tutorials/005_events/hello_events.py b/examples/tutorials/005_events/hello_events.py index c2a03f2f..8233dc2e 100644 --- a/examples/tutorials/005_events/hello_events.py +++ b/examples/tutorials/005_events/hello_events.py @@ -9,7 +9,7 @@ from plugboard.component import Component, IOController from plugboard.connector import AsyncioConnector, ConnectorBuilder -from plugboard.events import Event, EventConnectorBuilder, StopEvent +from plugboard.events import Event, StopEvent from plugboard.library import FileWriter from plugboard.process import LocalProcess from plugboard.schemas import ConnectorSpec, ComponentArgsDict @@ -139,9 +139,7 @@ async def main() -> None: connect("collect-high.value", "save-high.value"), connect("collect-low.value", "save-low.value"), ] - connector_builder = ConnectorBuilder(connector_cls=AsyncioConnector) # (2)! - event_connector_builder = EventConnectorBuilder(connector_builder=connector_builder) - event_connectors = list(event_connector_builder.build(components).values()) + event_connectors = AsyncioConnector.builder().build_event_connectors(components) # (3)! process = LocalProcess( components=components, diff --git a/plugboard/connector/connector.py b/plugboard/connector/connector.py index 7a0bec61..ceeb35c3 100644 --- a/plugboard/connector/connector.py +++ b/plugboard/connector/connector.py @@ -6,6 +6,7 @@ import typing as _t from plugboard.connector.channel import Channel +from plugboard.connector.connector_builder import ConnectorBuilder from plugboard.schemas import ConnectorSpec from plugboard.utils import ExportMixin @@ -39,3 +40,8 @@ def dict(self) -> dict[str, _t.Any]: # noqa: D102 "id": self.id, "spec": self.spec.model_dump(), } + + @classmethod + def builder(cls, *args: _t.Any, **kwargs: _t.Any) -> ConnectorBuilder: + """Returns a `ConnectorBuilder` for this `Connector` class.""" + return ConnectorBuilder(cls, *args, **kwargs) diff --git a/plugboard/connector/connector_builder.py b/plugboard/connector/connector_builder.py index 8a2644c0..bf205112 100644 --- a/plugboard/connector/connector_builder.py +++ b/plugboard/connector/connector_builder.py @@ -1,8 +1,14 @@ """Provides `ConnectorBuilder` to build `Connector` objects.""" +from __future__ import annotations + import typing as _t -from plugboard.connector.connector import Connector + +if _t.TYPE_CHECKING: # pragma: no cover + from plugboard.connector.connector import Connector + +from plugboard.connector.event_connector_spec_builder import EventConnectorSpecBuilder from plugboard.schemas import ConnectorSpec @@ -17,3 +23,8 @@ def __init__(self, connector_cls: type[Connector], *args: _t.Any, **kwargs: _t.A def build(self, spec: ConnectorSpec) -> Connector: """Builds a `Connector` object.""" return self._connector_cls(spec, *self._args, **self._kwargs) + + def build_event_connectors(self, components: _t.Iterable[_t.Any]) -> list[Connector]: + """Builds event connectors for the given components.""" + evt_conn_map = EventConnectorSpecBuilder.build(components) + return [self.build(spec=spec) for spec in evt_conn_map.values()] diff --git a/plugboard/connector/event_connector_spec_builder.py b/plugboard/connector/event_connector_spec_builder.py new file mode 100644 index 00000000..e65900a4 --- /dev/null +++ b/plugboard/connector/event_connector_spec_builder.py @@ -0,0 +1,53 @@ +"""Provides `EventConnectorSpecBuilder` utility class which builds event connector specs for components.""" # noqa: E501,W505 + +from __future__ import annotations + +import typing as _t + +from plugboard.events.event import Event +from plugboard.schemas import ConnectorMode, ConnectorSocket, ConnectorSpec + + +if _t.TYPE_CHECKING: + from plugboard.component import Component + + +class EventConnectorSpecBuilder: # pragma: no cover + """`EventConnectorSpecBuilder` constructs connector specs for component event handlers.""" + + _source_descriptor: str = "publishers" + _target_descriptor: str = "subscribers" + + @staticmethod + def build(components: _t.Iterable[Component]) -> dict[str, ConnectorSpec]: + """Returns mapping of connector specs for events handled by components.""" + evt_conn_map: dict[str, ConnectorSpec] = {} + for component in components: + comp_evt_conn_map = EventConnectorSpecBuilder._build_for_component( + evt_conn_map, component + ) + evt_conn_map.update(comp_evt_conn_map) + return evt_conn_map + + @staticmethod + def _build_for_component( + evt_conn_map: dict[str, ConnectorSpec], component: Component + ) -> dict[str, ConnectorSpec]: + component_evts = set(component.io.input_events + component.io.output_events) + return { + evt.type: EventConnectorSpecBuilder._build_for_event(evt.type) + for evt in component_evts + if evt.type not in evt_conn_map + } + + @staticmethod + def _build_for_event(evt_type: str) -> ConnectorSpec: + evt_type_safe = Event.safe_type(evt_type) + source = ConnectorSocket( + entity=evt_type_safe, descriptor=EventConnectorSpecBuilder._source_descriptor + ) + target = ConnectorSocket( + entity=evt_type_safe, descriptor=EventConnectorSpecBuilder._target_descriptor + ) + spec = ConnectorSpec(source=source, target=target, mode=ConnectorMode.PUBSUB) + return spec diff --git a/plugboard/events/__init__.py b/plugboard/events/__init__.py index fe3edbf2..69a5be32 100644 --- a/plugboard/events/__init__.py +++ b/plugboard/events/__init__.py @@ -1,13 +1,11 @@ """Provides models and utilities for handling events.""" from plugboard.events.event import Event, StopEvent, SystemEvent -from plugboard.events.event_connector_builder import EventConnectorBuilder from plugboard.events.event_handlers import EventHandlers __all__ = [ "Event", - "EventConnectorBuilder", "EventHandlers", "StopEvent", "SystemEvent", diff --git a/plugboard/events/event_connector_builder.py b/plugboard/events/event_connector_builder.py deleted file mode 100644 index 9b102131..00000000 --- a/plugboard/events/event_connector_builder.py +++ /dev/null @@ -1,49 +0,0 @@ -"""Provides `EventConnectorBuilder` class which helps build event connectors for components.""" - -from __future__ import annotations - -import typing as _t - -from plugboard.connector import Connector -from plugboard.events.event import Event -from plugboard.schemas import ConnectorMode, ConnectorSocket, ConnectorSpec - - -if _t.TYPE_CHECKING: - from plugboard.component import Component - from plugboard.connector import ConnectorBuilder - - -class EventConnectorBuilder: # pragma: no cover - """`EventConnectorBuilder` constructs connectors for component event handlers.""" - - _source_descriptor: str = "publishers" - _target_descriptor: str = "subscribers" - - def __init__(self, connector_builder: ConnectorBuilder) -> None: - self._connector_builder = connector_builder - - def build(self, components: _t.Iterable[Component]) -> dict[str, Connector]: - """Returns mapping of connectors for events handled by components.""" - evt_conn_map: dict[str, Connector] = {} - for component in components: - comp_evt_conn_map = self._build_for_component(evt_conn_map, component) - evt_conn_map.update(comp_evt_conn_map) - return evt_conn_map - - def _build_for_component( - self, evt_conn_map: dict[str, Connector], component: Component - ) -> dict[str, Connector]: - component_evts = set(component.io.input_events + component.io.output_events) - return { - evt.type: self._build_for_event(evt.type) - for evt in component_evts - if evt.type not in evt_conn_map - } - - def _build_for_event(self, evt_type: str) -> Connector: - evt_type_safe = Event.safe_type(evt_type) - source = ConnectorSocket(entity=evt_type_safe, descriptor=self._source_descriptor) - target = ConnectorSocket(entity=evt_type_safe, descriptor=self._target_descriptor) - spec = ConnectorSpec(source=source, target=target, mode=ConnectorMode.PUBSUB) - return self._connector_builder.build(spec) diff --git a/plugboard/process/process_builder.py b/plugboard/process/process_builder.py index be6bea6d..5501171c 100644 --- a/plugboard/process/process_builder.py +++ b/plugboard/process/process_builder.py @@ -8,7 +8,7 @@ from plugboard.component.utils import ComponentDecoratorHelper from plugboard.connector.connector import Connector from plugboard.connector.connector_builder import ConnectorBuilder -from plugboard.events.event_connector_builder import EventConnectorBuilder +from plugboard.connector.ray_channel import RayConnector from plugboard.process.process import Process from plugboard.schemas import ProcessSpec from plugboard.state import StateBackend @@ -70,16 +70,15 @@ def _build_connectors(cls, spec: ProcessSpec, components: list[Component]) -> li connector_builder = ConnectorBuilder( connector_cls=connector_class, **dict(spec.connector_builder.args) ) - event_connector_builder = EventConnectorBuilder(connector_builder=connector_builder) # TODO: Remove this when https://github.com/plugboard-dev/plugboard/issues/101 is resolved - if spec.type.endswith("RayProcess"): + if connector_class is RayConnector: # pragma: no cover DI.logger.resolve_sync().warning( - "RayProcess does not yet support event-based models. " + "RayConnector does not yet support event-based models. " "Event connectors will not be built." ) event_connectors = [] else: - event_connectors = list(event_connector_builder.build(components).values()) + event_connectors = connector_builder.build_event_connectors(components) spec_connectors = [connector_builder.build(cs) for cs in spec.args.connectors] return sorted( {conn.id: conn for conn in event_connectors + spec_connectors}.values(), diff --git a/tests/integration/test_component_event_handlers.py b/tests/integration/test_component_event_handlers.py index 51e93702..8a27ce34 100644 --- a/tests/integration/test_component_event_handlers.py +++ b/tests/integration/test_component_event_handlers.py @@ -10,7 +10,7 @@ from plugboard.component import Component, IOController from plugboard.connector import AsyncioConnector, Connector, ConnectorBuilder -from plugboard.events import Event, EventConnectorBuilder +from plugboard.events import Event from plugboard.schemas import ConnectorSpec from tests.conftest import zmq_connector_cls @@ -84,10 +84,9 @@ def connector_cls(_connector_cls: _t.Type[Connector]) -> _t.Type[Connector]: @pytest.fixture -def event_connectors(connector_cls: _t.Type[Connector]) -> EventConnectorBuilder: +def connector_builder(connector_cls: _t.Type[Connector]) -> ConnectorBuilder: """Fixture for an event connectors instance.""" - connector_builder = ConnectorBuilder(connector_cls=connector_cls) - return EventConnectorBuilder(connector_builder=connector_builder) + return ConnectorBuilder(connector_cls=connector_cls) @pytest.mark.asyncio @@ -115,7 +114,7 @@ def event_connectors(connector_cls: _t.Type[Connector]) -> EventConnectorBuilder ], ) async def test_component_event_handlers( - io_controller_kwargs: dict, event_connectors: EventConnectorBuilder + io_controller_kwargs: dict, connector_builder: ConnectorBuilder ) -> None: """Test that event handlers are registered and called correctly for components.""" @@ -124,8 +123,8 @@ class _A(A): a = _A(name="a") - event_connectors_map = event_connectors.build([a]) - connectors = list(event_connectors_map.values()) + connectors = connector_builder.build_event_connectors([a]) + event_connectors_map = {conn.spec.source.entity: conn for conn in connectors} await a.io.connect(connectors) @@ -133,7 +132,7 @@ class _A(A): assert a.event_B_count == 0 evt_A = EventTypeA(data=EventTypeAData(x=2), source="test-driver") - chan_A = await event_connectors_map[evt_A.type].connect_send() + chan_A = await event_connectors_map[evt_A.safe_type()].connect_send() await chan_A.send(evt_A) await a.step() @@ -141,7 +140,7 @@ class _A(A): assert a.event_B_count == 0 evt_B = EventTypeB(data=EventTypeBData(y=4), source="test-driver") - chan_B = await event_connectors_map[evt_B.type].connect_send() + chan_B = await event_connectors_map[evt_B.safe_type()].connect_send() await chan_B.send(evt_B) await a.step() @@ -163,7 +162,7 @@ async def field_connectors(connector_cls: _t.Type[Connector]) -> list[Connector] @pytest.mark.asyncio async def test_component_event_handlers_with_field_inputs( - event_connectors: EventConnectorBuilder, + connector_builder: ConnectorBuilder, field_connectors: list[Connector], ) -> None: """Test that event handlers are registered and called correctly for components.""" @@ -178,8 +177,9 @@ class _A(A): a = _A(name="a") - event_connectors_map = event_connectors.build([a]) - connectors = list(event_connectors_map.values()) + field_connectors + event_connectors = connector_builder.build_event_connectors([a]) + event_connectors_map = {conn.spec.source.entity: conn for conn in event_connectors} + connectors = event_connectors + field_connectors # FIXME : With `ZMQConnector` both send and recv side must be connected to avoid hanging. # : See https://github.com/plugboard-dev/plugboard/issues/101. @@ -199,7 +199,7 @@ class _A(A): # After sending one event of type A, the event_A_count should be 2 evt_A = EventTypeA(data=EventTypeAData(x=2), source="test-driver") - chan_A = await event_connectors_map[evt_A.type].connect_send() + chan_A = await event_connectors_map[evt_A.safe_type()].connect_send() await chan_A.send(evt_A) await a.step() @@ -210,7 +210,7 @@ class _A(A): # After sending one event of type B, the event_B_count should be 4 evt_B = EventTypeB(data=EventTypeBData(y=4), source="test-driver") - chan_B = await event_connectors_map[evt_B.type].connect_send() + chan_B = await event_connectors_map[evt_B.safe_type()].connect_send() await chan_B.send(evt_B) await a.step() diff --git a/tests/integration/test_process_stop_cancel.py b/tests/integration/test_process_stop_cancel.py index 2376e0e1..239dad1d 100644 --- a/tests/integration/test_process_stop_cancel.py +++ b/tests/integration/test_process_stop_cancel.py @@ -17,7 +17,7 @@ RabbitMQConnector, RayConnector, ) -from plugboard.events import EventConnectorBuilder, StopEvent +from plugboard.events import StopEvent from plugboard.process import LocalProcess, Process, RayProcess from plugboard.schemas import ConnectorSpec, Status from tests.conftest import ComponentTestHelper, zmq_connector_cls @@ -73,7 +73,6 @@ async def test_process_stop_event( ) -> None: """Tests that an event-driven Process can be stopped gracefully via a StopEvent.""" connector_builder = ConnectorBuilder(connector_cls=connector_cls) - event_connectors = EventConnectorBuilder(connector_builder=connector_builder) max_iters = 25 iters_before_stop = 15 @@ -89,13 +88,14 @@ async def test_process_stop_event( ] field_connectors: list[Connector] = [conn_ab1, conn_ab2, conn_ab3, conn_ab4, conn_ab5] - event_connectors_map = event_connectors.build(components) - connectors: list[Connector] = list(event_connectors_map.values()) + field_connectors + event_connectors = connector_builder.build_event_connectors(components) + event_connectors_map = {conn.spec.source.entity: conn for conn in event_connectors} + connectors: list[Connector] = event_connectors + field_connectors process = process_cls(components, connectors) async with process: - stop_evt_conn = event_connectors_map[StopEvent.type] + stop_evt_conn = event_connectors_map[StopEvent.safe_type()] stop_chan = await stop_evt_conn.connect_send() async def stop_after() -> None: diff --git a/tests/integration/test_process_with_components_run.py b/tests/integration/test_process_with_components_run.py index 5ca4607a..b941cccf 100644 --- a/tests/integration/test_process_with_components_run.py +++ b/tests/integration/test_process_with_components_run.py @@ -21,7 +21,6 @@ RayConnector, ) from plugboard.events import Event -from plugboard.events.event_connector_builder import EventConnectorBuilder from plugboard.exceptions import NotInitialisedError, ProcessStatusError from plugboard.process import LocalProcess, Process, RayProcess from plugboard.schemas import ConnectorSpec, Status @@ -439,8 +438,7 @@ async def test_event_driven_process_shutdown( components = [clock, controller, actuator] connector_builder = ConnectorBuilder(connector_cls=connector_cls) - event_connector_builder = EventConnectorBuilder(connector_builder=connector_builder) - event_connectors = list(event_connector_builder.build(components).values()) + event_connectors = connector_builder.build_event_connectors(components) process = process_cls(components, event_connectors) await process.init() diff --git a/tests/unit/test_diagram.py b/tests/unit/test_diagram.py index 74cc8bc4..7bc560b8 100644 --- a/tests/unit/test_diagram.py +++ b/tests/unit/test_diagram.py @@ -10,7 +10,7 @@ from plugboard.component import Component, IOController as IO from plugboard.connector import AsyncioConnector, ConnectorBuilder from plugboard.diagram import MermaidDiagram -from plugboard.events import Event, EventConnectorBuilder +from plugboard.events import Event from plugboard.process import LocalProcess from plugboard.schemas import ConnectorSpec @@ -65,8 +65,7 @@ def process() -> LocalProcess: AsyncioConnector(spec=ConnectorSpec(source="component-b.c", target="component-c.c")), ] connector_builder = ConnectorBuilder(connector_cls=AsyncioConnector) # (2)! - event_connector_builder = EventConnectorBuilder(connector_builder=connector_builder) - event_connectors = list(event_connector_builder.build(components).values()) + event_connectors = connector_builder.build_event_connectors(components) return LocalProcess(components=components, connectors=connectors + event_connectors)