diff --git a/roborock/devices/traits/b01/q10/__init__.py b/roborock/devices/traits/b01/q10/__init__.py index 9073bf60..ac897259 100644 --- a/roborock/devices/traits/b01/q10/__init__.py +++ b/roborock/devices/traits/b01/q10/__init__.py @@ -1,12 +1,10 @@ """Traits for Q10 B01 devices.""" -from typing import Any - -from roborock.devices.rpc.b01_q7_channel import send_decoded_command from roborock.devices.traits import Trait from roborock.devices.transport.mqtt_channel import MqttChannel from .command import CommandTrait +from .vacuum import VacuumTrait __all__ = [ "Q10PropertiesApi", @@ -19,9 +17,13 @@ class Q10PropertiesApi(Trait): command: CommandTrait """Trait for sending commands to Q10 devices.""" + vacuum: VacuumTrait + """Trait for sending vacuum related commands to Q10 devices.""" + def __init__(self, channel: MqttChannel) -> None: """Initialize the B01Props API.""" self.command = CommandTrait(channel) + self.vacuum = VacuumTrait(self.command) def create(channel: MqttChannel) -> Q10PropertiesApi: diff --git a/roborock/devices/traits/b01/q10/vacuum.py b/roborock/devices/traits/b01/q10/vacuum.py new file mode 100644 index 00000000..1b7104c6 --- /dev/null +++ b/roborock/devices/traits/b01/q10/vacuum.py @@ -0,0 +1,56 @@ +"""Traits for Q10 B01 devices.""" + +from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP + +from .command import CommandTrait + + +class VacuumTrait: + """Trait for sending vacuum commands. + + This is a wrapper around the CommandTrait for sending vacuum related + commands to Q10 devices. + """ + + def __init__(self, command: CommandTrait) -> None: + """Initialize the VacuumTrait.""" + self._command = command + + async def start_clean(self) -> None: + """Start cleaning.""" + await self._command.send( + command=B01_Q10_DP.START_CLEAN, + # TODO: figure out other commands + # 1 = start cleaning + # 2 = "electoral" clean, also has "clean_parameters" + # 4 = fast create map + params={"cmd": 1}, + ) + + async def pause_clean(self) -> None: + """Pause cleaning.""" + await self._command.send( + command=B01_Q10_DP.PAUSE, + params={}, + ) + + async def resume_clean(self) -> None: + """Resume cleaning.""" + await self._command.send( + command=B01_Q10_DP.RESUME, + params={}, + ) + + async def stop_clean(self) -> None: + """Stop cleaning.""" + await self._command.send( + command=B01_Q10_DP.STOP, + params={}, + ) + + async def return_to_dock(self) -> None: + """Return to dock.""" + await self._command.send( + command=B01_Q10_DP.START_DOCK_TASK, + params={}, + ) diff --git a/tests/devices/traits/b01/q10/test_vacuum.py b/tests/devices/traits/b01/q10/test_vacuum.py new file mode 100644 index 00000000..394dc55a --- /dev/null +++ b/tests/devices/traits/b01/q10/test_vacuum.py @@ -0,0 +1,50 @@ +import json +from collections.abc import Awaitable, Callable +from typing import Any + +import pytest + +from roborock.devices.traits.b01.q10 import Q10PropertiesApi +from roborock.devices.traits.b01.q10.vacuum import VacuumTrait +from tests.fixtures.channel_fixtures import FakeChannel + + +@pytest.fixture(name="fake_channel") +def fake_channel_fixture() -> FakeChannel: + return FakeChannel() + + +@pytest.fixture(name="q10_api") +def q10_api_fixture(fake_channel: FakeChannel) -> Q10PropertiesApi: + return Q10PropertiesApi(fake_channel) # type: ignore[arg-type] + + +@pytest.fixture(name="vacuumm") +def vacuumm_fixture(q10_api: Q10PropertiesApi) -> VacuumTrait: + return q10_api.vacuum + + +@pytest.mark.parametrize( + ("command_fn", "expected_payload"), + [ + (lambda x: x.start_clean(), {"201": {"cmd": 1}}), + (lambda x: x.pause_clean(), {"204": {}}), + (lambda x: x.resume_clean(), {"205": {}}), + (lambda x: x.stop_clean(), {"206": {}}), + (lambda x: x.return_to_dock(), {"203": {}}), + ], +) +async def test_vacuum_commands( + vacuumm: VacuumTrait, + fake_channel: FakeChannel, + command_fn: Callable[[VacuumTrait], Awaitable[None]], + expected_payload: dict[str, Any], +) -> None: + """Test sending a vacuum start command.""" + await command_fn(vacuumm) + + assert len(fake_channel.published_messages) == 1 + message = fake_channel.published_messages[0] + assert message.payload + payload_data = json.loads(message.payload.decode()) + assert payload_data == {"dps": expected_payload}