diff --git a/roborock/cli.py b/roborock/cli.py index 9f02b2de..515f7163 100644 --- a/roborock/cli.py +++ b/roborock/cli.py @@ -50,6 +50,7 @@ from roborock.devices.device import RoborockDevice from roborock.devices.device_manager import DeviceManager, UserParams, create_device_manager from roborock.devices.traits import Trait +from roborock.devices.traits.b01.q10.vacuum import VacuumTrait from roborock.devices.traits.v1 import V1TraitMixin from roborock.devices.traits.v1.consumeable import ConsumableAttribute from roborock.devices.traits.v1.map_content import MapContentTrait @@ -438,6 +439,15 @@ async def _display_v1_trait(context: RoborockContext, device_id: str, display_fu click.echo(dump_json(trait.as_dict())) +async def _q10_vacuum_trait(context: RoborockContext, device_id: str) -> VacuumTrait: + """Get VacuumTrait from Q10 device.""" + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is None: + raise RoborockUnsupportedFeature("Device does not support B01 Q10 protocol. Is it a Q10?") + return device.b01_q10_properties.vacuum + + @session.command() @click.option("--device_id", required=True) @click.pass_context @@ -1141,6 +1151,91 @@ def write_markdown_table(product_features: dict[str, dict[str, any]], all_featur click.echo("Done.") +@session.command() +@click.option("--device_id", required=True, help="Device ID") +@click.pass_context +@async_command +async def q10_vacuum_start(ctx: click.Context, device_id: str) -> None: + """Start vacuum cleaning on Q10 device.""" + context: RoborockContext = ctx.obj + try: + trait = await _q10_vacuum_trait(context, device_id) + await trait.start_clean() + click.echo("Starting vacuum cleaning...") + except RoborockUnsupportedFeature: + click.echo("Device does not support B01 Q10 protocol. Is it a Q10?") + except RoborockException as e: + click.echo(f"Error: {e}") + + +@session.command() +@click.option("--device_id", required=True, help="Device ID") +@click.pass_context +@async_command +async def q10_vacuum_pause(ctx: click.Context, device_id: str) -> None: + """Pause vacuum cleaning on Q10 device.""" + context: RoborockContext = ctx.obj + try: + trait = await _q10_vacuum_trait(context, device_id) + await trait.pause_clean() + click.echo("Pausing vacuum cleaning...") + except RoborockUnsupportedFeature: + click.echo("Device does not support B01 Q10 protocol. Is it a Q10?") + except RoborockException as e: + click.echo(f"Error: {e}") + + +@session.command() +@click.option("--device_id", required=True, help="Device ID") +@click.pass_context +@async_command +async def q10_vacuum_resume(ctx: click.Context, device_id: str) -> None: + """Resume vacuum cleaning on Q10 device.""" + context: RoborockContext = ctx.obj + try: + trait = await _q10_vacuum_trait(context, device_id) + await trait.resume_clean() + click.echo("Resuming vacuum cleaning...") + except RoborockUnsupportedFeature: + click.echo("Device does not support B01 Q10 protocol. Is it a Q10?") + except RoborockException as e: + click.echo(f"Error: {e}") + + +@session.command() +@click.option("--device_id", required=True, help="Device ID") +@click.pass_context +@async_command +async def q10_vacuum_stop(ctx: click.Context, device_id: str) -> None: + """Stop vacuum cleaning on Q10 device.""" + context: RoborockContext = ctx.obj + try: + trait = await _q10_vacuum_trait(context, device_id) + await trait.stop_clean() + click.echo("Stopping vacuum cleaning...") + except RoborockUnsupportedFeature: + click.echo("Device does not support B01 Q10 protocol. Is it a Q10?") + except RoborockException as e: + click.echo(f"Error: {e}") + + +@session.command() +@click.option("--device_id", required=True, help="Device ID") +@click.pass_context +@async_command +async def q10_vacuum_dock(ctx: click.Context, device_id: str) -> None: + """Return vacuum to dock on Q10 device.""" + context: RoborockContext = ctx.obj + try: + trait = await _q10_vacuum_trait(context, device_id) + await trait.return_to_dock() + click.echo("Returning vacuum to dock...") + except RoborockUnsupportedFeature: + click.echo("Device does not support B01 Q10 protocol. Is it a Q10?") + except RoborockException as e: + click.echo(f"Error: {e}") + + cli.add_command(login) cli.add_command(discover) cli.add_command(list_devices) @@ -1170,6 +1265,17 @@ def write_markdown_table(product_features: dict[str, dict[str, any]], all_featur cli.add_command(flow_led_status) cli.add_command(led_status) cli.add_command(network_info) +cli.add_command(q10_vacuum_start) +cli.add_command(q10_vacuum_pause) +cli.add_command(q10_vacuum_resume) +cli.add_command(q10_vacuum_stop) +cli.add_command(q10_vacuum_dock) + +session.add_command(q10_vacuum_start) +session.add_command(q10_vacuum_pause) +session.add_command(q10_vacuum_resume) +session.add_command(q10_vacuum_stop) +session.add_command(q10_vacuum_dock) def main(): diff --git a/roborock/data/b01_q10/b01_q10_code_mappings.py b/roborock/data/b01_q10/b01_q10_code_mappings.py index 69dbc8d9..9f684d45 100644 --- a/roborock/data/b01_q10/b01_q10_code_mappings.py +++ b/roborock/data/b01_q10/b01_q10_code_mappings.py @@ -125,7 +125,7 @@ class YXFanLevel(RoborockModeEnum): NORMAL = "normal", 2 STRONG = "strong", 3 MAX = "max", 4 - SUPER = "super", 5 + SUPER = "super", 8 class YXWaterLevel(RoborockModeEnum): diff --git a/roborock/devices/rpc/b01_q10_channel.py b/roborock/devices/rpc/b01_q10_channel.py index a482e109..a11e13dd 100644 --- a/roborock/devices/rpc/b01_q10_channel.py +++ b/roborock/devices/rpc/b01_q10_channel.py @@ -2,17 +2,23 @@ from __future__ import annotations +import asyncio import logging +from collections.abc import Iterable +from typing import Any from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP from roborock.devices.transport.mqtt_channel import MqttChannel from roborock.exceptions import RoborockException from roborock.protocols.b01_q10_protocol import ( ParamsType, + decode_rpc_response, encode_mqtt_payload, ) +from roborock.roborock_message import RoborockMessage _LOGGER = logging.getLogger(__name__) +_TIMEOUT = 10.0 async def send_command( @@ -34,3 +40,61 @@ async def send_command( ex, ) raise + + +async def send_decoded_command( + mqtt_channel: MqttChannel, + command: B01_Q10_DP, + params: ParamsType, + expected_dps: Iterable[B01_Q10_DP] | None = None, +) -> dict[B01_Q10_DP, Any]: + """Send a command and await the first decoded response. + + Q10 responses are not correlated with a message id, so we filter on + expected datapoints when provided. + """ + roborock_message = encode_mqtt_payload(command, params) + future: asyncio.Future[dict[B01_Q10_DP, Any]] = asyncio.get_running_loop().create_future() + + expected_set = set(expected_dps) if expected_dps is not None else None + + def find_response(response_message: RoborockMessage) -> None: + try: + decoded_dps = decode_rpc_response(response_message) + except RoborockException as ex: + _LOGGER.debug( + "Failed to decode B01 Q10 RPC response (expecting %s): %s: %s", + command, + response_message, + ex, + ) + return + if expected_set and not any(dps in decoded_dps for dps in expected_set): + return + if not future.done(): + future.set_result(decoded_dps) + + unsub = await mqtt_channel.subscribe(find_response) + + _LOGGER.debug("Sending MQTT message: %s", roborock_message) + try: + await mqtt_channel.publish(roborock_message) + return await asyncio.wait_for(future, timeout=_TIMEOUT) + except TimeoutError as ex: + raise RoborockException(f"B01 Q10 command timed out after {_TIMEOUT}s ({command})") from ex + except RoborockException as ex: + _LOGGER.warning( + "Error sending B01 Q10 decoded command (%s): %s", + command, + ex, + ) + raise + except Exception as ex: + _LOGGER.exception( + "Error sending B01 Q10 decoded command (%s): %s", + command, + ex, + ) + raise + finally: + unsub() diff --git a/roborock/devices/traits/b01/q10/__init__.py b/roborock/devices/traits/b01/q10/__init__.py index ac897259..f228b933 100644 --- a/roborock/devices/traits/b01/q10/__init__.py +++ b/roborock/devices/traits/b01/q10/__init__.py @@ -4,10 +4,12 @@ from roborock.devices.transport.mqtt_channel import MqttChannel from .command import CommandTrait +from .status import StatusTrait from .vacuum import VacuumTrait __all__ = [ "Q10PropertiesApi", + "StatusTrait", ] @@ -20,10 +22,14 @@ class Q10PropertiesApi(Trait): vacuum: VacuumTrait """Trait for sending vacuum related commands to Q10 devices.""" + status: StatusTrait + """Trait for reading device status values.""" + def __init__(self, channel: MqttChannel) -> None: """Initialize the B01Props API.""" self.command = CommandTrait(channel) self.vacuum = VacuumTrait(self.command) + self.status = StatusTrait(channel) def create(channel: MqttChannel) -> Q10PropertiesApi: diff --git a/roborock/devices/traits/b01/q10/status.py b/roborock/devices/traits/b01/q10/status.py new file mode 100644 index 00000000..4bb2c4f9 --- /dev/null +++ b/roborock/devices/traits/b01/q10/status.py @@ -0,0 +1,79 @@ +"""Status trait for Q10 B01 devices.""" + +from __future__ import annotations + +from typing import Any, cast + +from roborock.data.b01_q10.b01_q10_code_mappings import ( + B01_Q10_DP, + YXDeviceCleanTask, + YXDeviceState, + YXDeviceWorkMode, + YXFanLevel, + YXWaterLevel, +) +from roborock.devices.rpc.b01_q10_channel import send_decoded_command +from roborock.devices.transport.mqtt_channel import MqttChannel + + +class StatusTrait: + """Trait for requesting and holding Q10 status values.""" + + def __init__(self, channel: MqttChannel) -> None: + self._channel = channel + self._data: dict[B01_Q10_DP, Any] = {} + + @property + def data(self) -> dict[B01_Q10_DP, Any]: + """Return the latest raw status data.""" + return self._data + + async def refresh(self) -> dict[B01_Q10_DP, Any]: + """Refresh status values from the device.""" + decoded = await send_decoded_command( + self._channel, + command=B01_Q10_DP.REQUETDPS, + params={}, + expected_dps={B01_Q10_DP.STATUS, B01_Q10_DP.BATTERY}, + ) + self._data = decoded + return decoded + + @property + def state_code(self) -> int | None: + return self._data.get(B01_Q10_DP.STATUS) + + @property + def state(self) -> YXDeviceState | None: + code = self.state_code + return cast(YXDeviceState | None, YXDeviceState.from_code_optional(code)) if code is not None else None + + @property + def battery(self) -> int | None: + return self._data.get(B01_Q10_DP.BATTERY) + + @property + def fan_level(self) -> YXFanLevel | None: + value = self._data.get(B01_Q10_DP.FUN_LEVEL) + return cast(YXFanLevel | None, YXFanLevel.from_code_optional(value)) if value is not None else None + + @property + def water_level(self) -> YXWaterLevel | None: + value = self._data.get(B01_Q10_DP.WATER_LEVEL) + return cast(YXWaterLevel | None, YXWaterLevel.from_code_optional(value)) if value is not None else None + + @property + def clean_mode(self) -> YXDeviceWorkMode | None: + value = self._data.get(B01_Q10_DP.CLEAN_MODE) + return cast(YXDeviceWorkMode | None, YXDeviceWorkMode.from_code_optional(value)) if value is not None else None + + @property + def clean_task(self) -> YXDeviceCleanTask | None: + value = self._data.get(B01_Q10_DP.CLEAN_TASK_TYPE) + return ( + cast(YXDeviceCleanTask | None, YXDeviceCleanTask.from_code_optional(value)) if value is not None else None + ) + + @property + def cleaning_progress(self) -> int | None: + return self._data.get(B01_Q10_DP.CLEANING_PROGRESS) diff --git a/tests/devices/rpc/test_b01_q10_channel.py b/tests/devices/rpc/test_b01_q10_channel.py new file mode 100644 index 00000000..dfe8a602 --- /dev/null +++ b/tests/devices/rpc/test_b01_q10_channel.py @@ -0,0 +1,201 @@ +"""Tests for the b01_q10_channel.""" + +from unittest.mock import patch + +import pytest + +from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP +from roborock.devices.rpc.b01_q10_channel import send_decoded_command +from roborock.exceptions import RoborockException +from roborock.protocols.b01_q10_protocol import encode_mqtt_payload +from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol +from tests.fixtures.channel_fixtures import FakeChannel + + +@pytest.fixture +def mock_mqtt_channel() -> FakeChannel: + """Fixture for a fake MQTT channel.""" + return FakeChannel() + + +async def test_send_decoded_command_success(mock_mqtt_channel: FakeChannel): + """Test successful command sending and response decoding.""" + # Prepare response data + response_data = { + B01_Q10_DP.STATUS: 1, # sleepstate + B01_Q10_DP.BATTERY: 91, + } + + # Encode response message + encoded = encode_mqtt_payload(B01_Q10_DP.STATUS, {}) + response_message = RoborockMessage( + protocol=RoborockMessageProtocol.RPC_RESPONSE, + payload=encoded.payload, + version=encoded.version, + ) + + # Mock the decode_rpc_response to return test data + with patch("roborock.devices.rpc.b01_q10_channel.decode_rpc_response") as mock_decode: + mock_decode.return_value = response_data + mock_mqtt_channel.response_queue.append(response_message) + + # Call the function + result = await send_decoded_command( + mock_mqtt_channel, # type: ignore[arg-type] + B01_Q10_DP.REQUETDPS, + {}, + expected_dps={B01_Q10_DP.STATUS, B01_Q10_DP.BATTERY}, + ) + + # Assertions + assert result == response_data + mock_mqtt_channel.publish.assert_awaited_once() + mock_mqtt_channel.subscribe.assert_awaited_once() + + +async def test_send_decoded_command_filters_by_expected_dps(mock_mqtt_channel: FakeChannel): + """Test that responses are filtered by expected_dps.""" + # First response doesn't match expected_dps + non_matching_data = {B01_Q10_DP.CLEANING_PROGRESS: 50} + + # Second response matches + matching_data = {B01_Q10_DP.STATUS: 1, B01_Q10_DP.BATTERY: 91} + + encoded1 = encode_mqtt_payload(B01_Q10_DP.CLEANING_PROGRESS, {}) + response1 = RoborockMessage( + protocol=RoborockMessageProtocol.RPC_RESPONSE, + payload=encoded1.payload, + version=encoded1.version, + ) + + encoded2 = encode_mqtt_payload(B01_Q10_DP.STATUS, {}) + response2 = RoborockMessage( + protocol=RoborockMessageProtocol.RPC_RESPONSE, + payload=encoded2.payload, + version=encoded2.version, + ) + + with patch("roborock.devices.rpc.b01_q10_channel.decode_rpc_response") as mock_decode: + mock_decode.side_effect = [non_matching_data, matching_data] + + # Add both responses to queue + mock_mqtt_channel.response_queue.extend([response1, response2]) + + # Call the function with expected_dps + result = await send_decoded_command( + mock_mqtt_channel, # type: ignore[arg-type] + B01_Q10_DP.REQUETDPS, + {}, + expected_dps={B01_Q10_DP.STATUS, B01_Q10_DP.BATTERY}, + ) + + # Should get the matching response, not the first one + assert result == matching_data + + +async def test_send_decoded_command_timeout(): + """Test that command times out if no matching response.""" + mock_mqtt_channel = FakeChannel() + + with patch("roborock.devices.rpc.b01_q10_channel.decode_rpc_response") as mock_decode: + mock_decode.return_value = {B01_Q10_DP.CLEANING_PROGRESS: 50} + + # Don't add any responses to queue + with pytest.raises(RoborockException, match="timed out"): + await send_decoded_command( + mock_mqtt_channel, # type: ignore[arg-type] + B01_Q10_DP.REQUETDPS, + {}, + expected_dps={B01_Q10_DP.STATUS}, # Won't match CLEANING_PROGRESS + ) + + +async def test_send_decoded_command_ignores_decode_errors(mock_mqtt_channel: FakeChannel): + """Test that decode errors are logged but don't fail the command.""" + # First response has decode error, second is valid + valid_data = {B01_Q10_DP.STATUS: 1} + + encoded1 = encode_mqtt_payload(B01_Q10_DP.STATUS, {}) + response1 = RoborockMessage( + protocol=RoborockMessageProtocol.RPC_RESPONSE, + payload=encoded1.payload, + version=encoded1.version, + ) + + encoded2 = encode_mqtt_payload(B01_Q10_DP.STATUS, {}) + response2 = RoborockMessage( + protocol=RoborockMessageProtocol.RPC_RESPONSE, + payload=encoded2.payload, + version=encoded2.version, + ) + + with patch("roborock.devices.rpc.b01_q10_channel.decode_rpc_response") as mock_decode: + # First call raises, second returns valid data + mock_decode.side_effect = [ + RoborockException("Decode error"), + valid_data, + ] + + mock_mqtt_channel.response_queue.extend([response1, response2]) + + # Command should still succeed with second response + result = await send_decoded_command( + mock_mqtt_channel, # type: ignore[arg-type] + B01_Q10_DP.REQUETDPS, + {}, + expected_dps={B01_Q10_DP.STATUS}, + ) + + assert result == valid_data + + +async def test_send_decoded_command_no_expected_dps_filter(): + """Test that without expected_dps, any decoded response is accepted.""" + mock_mqtt_channel = FakeChannel() + + response_data = {B01_Q10_DP.CLEANING_PROGRESS: 50} + + encoded = encode_mqtt_payload(B01_Q10_DP.CLEANING_PROGRESS, {}) + response = RoborockMessage( + protocol=RoborockMessageProtocol.RPC_RESPONSE, + payload=encoded.payload, + version=encoded.version, + ) + + with patch("roborock.devices.rpc.b01_q10_channel.decode_rpc_response") as mock_decode: + mock_decode.return_value = response_data + mock_mqtt_channel.response_queue.append(response) + + # Call without expected_dps + result = await send_decoded_command( + mock_mqtt_channel, # type: ignore[arg-type] + B01_Q10_DP.REQUETDPS, + {}, + ) + + assert result == response_data + + +async def test_send_decoded_command_publishes_message(mock_mqtt_channel: FakeChannel): + """Test that the command is properly published.""" + response_data = {B01_Q10_DP.STATUS: 1} + + encoded = encode_mqtt_payload(B01_Q10_DP.STATUS, {}) + response = RoborockMessage( + protocol=RoborockMessageProtocol.RPC_RESPONSE, + payload=encoded.payload, + version=encoded.version, + ) + + with patch("roborock.devices.rpc.b01_q10_channel.decode_rpc_response") as mock_decode: + mock_decode.return_value = response_data + mock_mqtt_channel.response_queue.append(response) + + await send_decoded_command( + mock_mqtt_channel, # type: ignore[arg-type] + B01_Q10_DP.REQUETDPS, + {}, + ) + + # Verify message was published + assert len(mock_mqtt_channel.published_messages) == 1 diff --git a/tests/devices/traits/b01/q10/test_status.py b/tests/devices/traits/b01/q10/test_status.py new file mode 100644 index 00000000..37de19dd --- /dev/null +++ b/tests/devices/traits/b01/q10/test_status.py @@ -0,0 +1,136 @@ +"""Tests for B01 Q10 status trait.""" + +from typing import Any +from unittest.mock import patch + +import pytest + +from roborock.data.b01_q10.b01_q10_code_mappings import ( + B01_Q10_DP, + YXDeviceCleanTask, + YXDeviceState, + YXDeviceWorkMode, + YXFanLevel, + YXWaterLevel, +) +from roborock.devices.traits.b01.q10 import Q10PropertiesApi +from roborock.devices.traits.b01.q10.status import StatusTrait +from tests.fixtures.channel_fixtures import FakeChannel + + +@pytest.fixture(name="fake_channel") +def fake_channel_fixture() -> FakeChannel: + return FakeChannel() + + +@pytest.fixture(name="q10_api") +def q10_api_fixture(fake_channel: FakeChannel) -> Q10PropertiesApi: + return Q10PropertiesApi(fake_channel) # type: ignore[arg-type] + + +@pytest.fixture(name="status_trait") +def status_trait_fixture(q10_api: Q10PropertiesApi) -> StatusTrait: + return q10_api.status + + +async def test_status_refresh( + status_trait: StatusTrait, + fake_channel: FakeChannel, +) -> None: + """Test refreshing status from device.""" + # Simulate device response with status data + response_data: dict[B01_Q10_DP, Any] = { + B01_Q10_DP.STATUS: 2, # sleep_state + B01_Q10_DP.BATTERY: 91, + B01_Q10_DP.FUN_LEVEL: 8, # custom + B01_Q10_DP.WATER_LEVEL: 1, # medium + B01_Q10_DP.CLEAN_MODE: 2, # standard + B01_Q10_DP.CLEAN_TASK_TYPE: 0, # unknown + B01_Q10_DP.CLEANING_PROGRESS: 75, + } + + # Mock send_decoded_command to return response data + with patch("roborock.devices.traits.b01.q10.status.send_decoded_command") as mock_send: + mock_send.return_value = response_data + result = await status_trait.refresh() + + assert result == response_data + assert status_trait.data == response_data + + +async def test_status_properties( + status_trait: StatusTrait, + fake_channel: FakeChannel, +) -> None: + """Test status property accessors.""" + status_trait._data = { + B01_Q10_DP.STATUS: 2, # sleep_state + B01_Q10_DP.BATTERY: 91, + B01_Q10_DP.FUN_LEVEL: 8, # custom + B01_Q10_DP.WATER_LEVEL: 1, # medium + B01_Q10_DP.CLEAN_MODE: 2, # standard + B01_Q10_DP.CLEAN_TASK_TYPE: 0, # unknown + B01_Q10_DP.CLEANING_PROGRESS: 75, + } + + assert status_trait.state_code == 2 + assert status_trait.state == YXDeviceState.SLEEP_STATE + assert status_trait.battery == 91 + assert status_trait.fan_level == YXFanLevel.CUSTOM # type: ignore[attr-defined] + assert status_trait.water_level == YXWaterLevel.MEDIUM # type: ignore[attr-defined] + assert status_trait.clean_mode == YXDeviceWorkMode.STANDARD # type: ignore[attr-defined] + assert status_trait.clean_task == YXDeviceCleanTask.UNKNOWN + assert status_trait.cleaning_progress == 75 + + +async def test_status_properties_empty( + status_trait: StatusTrait, +) -> None: + """Test status properties when no data is available.""" + assert status_trait.state_code is None + assert status_trait.state is None + assert status_trait.battery is None + assert status_trait.fan_level is None + assert status_trait.water_level is None + assert status_trait.clean_mode is None + assert status_trait.clean_task is None + assert status_trait.cleaning_progress is None + + +async def test_status_enum_mappings( + status_trait: StatusTrait, +) -> None: + """Test all enum mappings for status values.""" + test_cases = [ + (B01_Q10_DP.STATUS, 2, YXDeviceState.SLEEP_STATE, "state"), + (B01_Q10_DP.STATUS, 3, YXDeviceState.STANDBY_STATE, "state"), + (B01_Q10_DP.STATUS, 5, YXDeviceState.CLEANING_STATE, "state"), + (B01_Q10_DP.FUN_LEVEL, 1, YXFanLevel.QUIET, "fan_level"), # type: ignore[attr-defined] + (B01_Q10_DP.FUN_LEVEL, 2, YXFanLevel.NORMAL, "fan_level"), + (B01_Q10_DP.FUN_LEVEL, 3, YXFanLevel.STRONG, "fan_level"), + (B01_Q10_DP.FUN_LEVEL, 4, YXFanLevel.MAX, "fan_level"), + (B01_Q10_DP.FUN_LEVEL, 8, YXFanLevel.CUSTOM, "fan_level"), # type: ignore[attr-defined] + (B01_Q10_DP.WATER_LEVEL, 0, YXWaterLevel.LOW, "water_level"), + (B01_Q10_DP.WATER_LEVEL, 1, YXWaterLevel.MEDIUM, "water_level"), # type: ignore[attr-defined] + (B01_Q10_DP.WATER_LEVEL, 2, YXWaterLevel.HIGH, "water_level"), + (B01_Q10_DP.CLEAN_MODE, 1, YXDeviceWorkMode.QUIET, "clean_mode"), # type: ignore[attr-defined] + (B01_Q10_DP.CLEAN_MODE, 2, YXDeviceWorkMode.STANDARD, "clean_mode"), # type: ignore[attr-defined] + (B01_Q10_DP.CLEAN_MODE, 3, YXDeviceWorkMode.HIGH, "clean_mode"), # type: ignore[attr-defined] + ] + + for dp, code, expected_enum, property_name in test_cases: + status_trait._data = {dp: code} + property_value = getattr(status_trait, property_name) + assert property_value == expected_enum, f"Failed for {property_name} with code {code}" + + +async def test_status_data_property( + status_trait: StatusTrait, +) -> None: + """Test the raw data property.""" + test_data = { + B01_Q10_DP.STATUS: 1, + B01_Q10_DP.BATTERY: 50, + } + status_trait._data = test_data + assert status_trait.data == test_data