-
Notifications
You must be signed in to change notification settings - Fork 64
feat: add clean record for Q7 #745
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,9 @@ | ||
| import datetime | ||
| import json | ||
| from dataclasses import dataclass, field | ||
| from functools import cached_property | ||
|
|
||
| from ...exceptions import RoborockException | ||
| from ..containers import RoborockBase | ||
| from .b01_q7_code_mappings import ( | ||
| B01Fault, | ||
|
|
@@ -203,3 +207,82 @@ def wind_name(self) -> str | None: | |
| def work_mode_name(self) -> str | None: | ||
| """Returns the name of the current work mode.""" | ||
| return self.work_mode.value if self.work_mode is not None else None | ||
|
|
||
|
|
||
| @dataclass | ||
| class CleanRecordDetail(RoborockBase): | ||
| """Represents a single clean record detail (from `record_list[].detail`).""" | ||
|
|
||
| record_start_time: int | None = None | ||
| method: int | None = None | ||
| record_use_time: int | None = None | ||
| clean_count: int | None = None | ||
| # This is seemingly returned in meters (non-squared) | ||
| record_clean_area: int | None = None | ||
| record_clean_mode: int | None = None | ||
| record_clean_way: int | None = None | ||
| record_task_status: int | None = None | ||
| record_faultcode: int | None = None | ||
| record_dust_num: int | None = None | ||
| clean_current_map: int | None = None | ||
| record_map_url: str | None = None | ||
|
|
||
| @property | ||
| def start_datetime(self) -> datetime.datetime | None: | ||
| """Convert the start datetime into a datetime object.""" | ||
| if self.record_start_time is not None: | ||
| return datetime.datetime.fromtimestamp(self.record_start_time).astimezone(datetime.UTC) | ||
| return None | ||
|
|
||
| @property | ||
| def square_meters_area_cleaned(self) -> float | None: | ||
| """Returns the area cleaned in square meters.""" | ||
| if self.record_clean_area is not None: | ||
| return self.record_clean_area / 100 | ||
| return None | ||
|
|
||
|
|
||
| @dataclass | ||
| class CleanRecordListItem(RoborockBase): | ||
| """Represents an entry in the clean record list returned by `service.get_record_list`.""" | ||
|
|
||
| url: str | None = None | ||
| detail: str | None = None | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a comment here explaining what's up with this field? It looks like its not really usable and folks are not expected to use this directly and should only access the summary for the last detail. As an alternative you could put the json parsing here as something like
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add a cached property |
||
|
|
||
| @cached_property | ||
| def detail_parsed(self) -> CleanRecordDetail | None: | ||
| """Parse and return the detail as a CleanRecordDetail object.""" | ||
| if self.detail is None: | ||
| return None | ||
| try: | ||
| parsed = json.loads(self.detail) | ||
| except json.JSONDecodeError as ex: | ||
| raise RoborockException(f"Invalid B01 record detail JSON: {self.detail!r}") from ex | ||
| return CleanRecordDetail.from_dict(parsed) | ||
|
|
||
|
|
||
| @dataclass | ||
| class CleanRecordList(RoborockBase): | ||
| """Represents the clean record list response from `service.get_record_list`.""" | ||
|
|
||
| total_area: int | None = None | ||
| total_time: int | None = None # stored in seconds | ||
| total_count: int | None = None | ||
| record_list: list[CleanRecordListItem] = field(default_factory=list) | ||
|
|
||
| @property | ||
| def square_meters_area_cleaned(self) -> float | None: | ||
| """Returns the area cleaned in square meters.""" | ||
| if self.total_area is not None: | ||
| return self.total_area / 100 | ||
| return None | ||
|
|
||
|
|
||
| @dataclass | ||
| class CleanRecordSummary(RoborockBase): | ||
| """Represents clean record totals for B01/Q7 devices.""" | ||
|
|
||
| total_time: int | None = None | ||
| total_area: int | None = None | ||
| total_count: int | None = None | ||
| last_record_detail: CleanRecordDetail | None = None | ||
Lash-L marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| """Clean summary / clean records trait for B01 Q7 devices. | ||
|
|
||
| For B01/Q7, the Roborock app uses `service.get_record_list` which returns totals | ||
| and a `record_list` whose items contain a JSON string in `detail`. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import logging | ||
|
|
||
| from roborock import CleanRecordDetail, CleanRecordList, CleanRecordSummary | ||
| from roborock.devices.rpc.b01_q7_channel import send_decoded_command | ||
| from roborock.devices.traits import Trait | ||
| from roborock.devices.transport.mqtt_channel import MqttChannel | ||
| from roborock.exceptions import RoborockException | ||
| from roborock.protocols.b01_q7_protocol import Q7RequestMessage | ||
| from roborock.roborock_typing import RoborockB01Q7Methods | ||
|
|
||
| __all__ = [ | ||
| "CleanSummaryTrait", | ||
| ] | ||
|
|
||
| _LOGGER = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class CleanSummaryTrait(CleanRecordSummary, Trait): | ||
| """B01/Q7 clean summary + clean record access (via record list service).""" | ||
|
|
||
| def __init__(self, channel: MqttChannel) -> None: | ||
Lash-L marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """Initialize the clean summary trait. | ||
|
|
||
| Args: | ||
| channel: MQTT channel used to communicate with the device. | ||
| """ | ||
| super().__init__() | ||
| self._channel = channel | ||
|
|
||
| async def refresh(self) -> None: | ||
| """Refresh totals and last record detail from the device.""" | ||
| record_list = await self._get_record_list() | ||
|
|
||
| self.total_time = record_list.total_time | ||
| self.total_area = record_list.total_area | ||
| self.total_count = record_list.total_count | ||
|
|
||
| details = await self._get_clean_record_details(record_list=record_list) | ||
| self.last_record_detail = details[0] if details else None | ||
|
|
||
| async def _get_record_list(self) -> CleanRecordList: | ||
| """Fetch the raw device clean record list (`service.get_record_list`).""" | ||
| result = await send_decoded_command( | ||
| self._channel, | ||
| Q7RequestMessage(dps=10000, command=RoborockB01Q7Methods.GET_RECORD_LIST, params={}), | ||
| ) | ||
|
|
||
| if not isinstance(result, dict): | ||
| raise TypeError(f"Unexpected response type for GET_RECORD_LIST: {type(result).__name__}: {result!r}") | ||
| return CleanRecordList.from_dict(result) | ||
|
|
||
| async def _get_clean_record_details(self, *, record_list: CleanRecordList) -> list[CleanRecordDetail]: | ||
| """Return parsed record detail objects (newest-first).""" | ||
| details: list[CleanRecordDetail] = [] | ||
| for item in record_list.record_list: | ||
| try: | ||
| parsed = item.detail_parsed | ||
| except RoborockException as ex: | ||
| # Rather than failing if something goes wrong here, we should fail and log to tell the user. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given there is already an exception catch in While typically i think it is good to not swallow exceptions down deep, i'm just thinking since in practice you're ignoring parse errors then just make it less code. |
||
| _LOGGER.debug("Failed to parse record detail: %s", ex) | ||
| continue | ||
| if parsed is not None: | ||
| details.append(parsed) | ||
|
|
||
| # The server returns the newest record at the end of record_list; reverse so newest is first (index 0). | ||
| details.reverse() | ||
| return details | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| import json | ||
| from typing import Any | ||
|
|
||
| from Crypto.Cipher import AES | ||
| from Crypto.Util.Padding import pad | ||
|
|
||
| from roborock.devices.traits.b01.q7 import Q7PropertiesApi | ||
| from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol | ||
| from tests.fixtures.channel_fixtures import FakeChannel | ||
|
|
||
|
|
||
| class B01MessageBuilder: | ||
| """Helper class to build B01 RPC response messages for tests.""" | ||
|
|
||
| def __init__(self) -> None: | ||
| self.msg_id = 123456789 | ||
| self.seq = 2020 | ||
|
|
||
| def build(self, data: dict[str, Any] | str, code: int | None = None) -> RoborockMessage: | ||
| """Build an encoded B01 RPC response message.""" | ||
| message: dict[str, Any] = { | ||
| "msgId": str(self.msg_id), | ||
| "data": data, | ||
| } | ||
| if code is not None: | ||
| message["code"] = code | ||
| return self._build_dps(message) | ||
|
|
||
| def _build_dps(self, message: dict[str, Any] | str) -> RoborockMessage: | ||
| """Build an encoded B01 RPC response message.""" | ||
| dps_payload = {"dps": {"10000": json.dumps(message)}} | ||
| self.seq += 1 | ||
| return RoborockMessage( | ||
| protocol=RoborockMessageProtocol.RPC_RESPONSE, | ||
| payload=pad( | ||
| json.dumps(dps_payload).encode(), | ||
| AES.block_size, | ||
| ), | ||
| version=b"B01", | ||
| seq=self.seq, | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| import math | ||
| import time | ||
| from collections.abc import Generator | ||
| from unittest.mock import patch | ||
|
|
||
| import pytest | ||
|
|
||
| from roborock.devices.traits.b01.q7 import Q7PropertiesApi | ||
| from tests.fixtures.channel_fixtures import FakeChannel | ||
|
|
||
| from . import B01MessageBuilder | ||
|
|
||
|
|
||
| @pytest.fixture(name="fake_channel") | ||
| def fake_channel_fixture() -> FakeChannel: | ||
| return FakeChannel() | ||
|
|
||
|
|
||
| @pytest.fixture(name="q7_api") | ||
| def q7_api_fixture(fake_channel: FakeChannel) -> Q7PropertiesApi: | ||
| return Q7PropertiesApi(fake_channel) # type: ignore[arg-type] | ||
|
|
||
|
|
||
| @pytest.fixture(name="expected_msg_id", autouse=True) | ||
| def next_message_id_fixture() -> Generator[int, None, None]: | ||
| """Fixture to patch get_next_int to return the expected message ID. | ||
|
|
||
| We pick an arbitrary number, but just need it to ensure we can craft a fake | ||
| response with the message id matched to the outgoing RPC. | ||
| """ | ||
| expected_msg_id = math.floor(time.time()) | ||
|
|
||
| # Patch get_next_int to return our expected msg_id so the channel waits for it | ||
| with patch("roborock.protocols.b01_q7_protocol.get_next_int", return_value=expected_msg_id): | ||
| yield expected_msg_id | ||
|
|
||
|
|
||
| @pytest.fixture(name="message_builder") | ||
| def message_builder_fixture(expected_msg_id: int) -> B01MessageBuilder: | ||
| builder = B01MessageBuilder() | ||
| builder.msg_id = expected_msg_id | ||
| return builder |
Uh oh!
There was an error while loading. Please reload this page.