From b5a972843047cba74f4210f876cc6a0f82ff6a47 Mon Sep 17 00:00:00 2001 From: Carl Quirion Date: Sat, 31 Jan 2026 14:03:03 -0500 Subject: [PATCH 1/3] Implement automatic persisted query for getLocation --- pyhilo/graphql.py | 54 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 43 insertions(+), 11 deletions(-) diff --git a/pyhilo/graphql.py b/pyhilo/graphql.py index 548dd0d..86a49ca 100644 --- a/pyhilo/graphql.py +++ b/pyhilo/graphql.py @@ -1,8 +1,10 @@ import asyncio +import hashlib import logging import ssl from typing import Any, Dict, List, Optional +import aiohttp from gql import Client, gql from gql.transport.aiohttp import AIOHTTPTransport from gql.transport.websockets import WebsocketsTransport @@ -533,18 +535,48 @@ async def async_init(self) -> None: async def call_get_location_query(self, location_hilo_id: str) -> None: """This functions calls the digital-twin and requests location id""" access_token = await self._get_access_token() - transport = AIOHTTPTransport( - url=f"https://{PLATFORM_HOST}/api/digital-twin/v3/graphql", - headers={"Authorization": f"Bearer {access_token}"}, - ) - client = Client(transport=transport, fetch_schema_from_transport=True) - query = gql(self.QUERY_GET_LOCATION) + url = f"https://{PLATFORM_HOST}/api/digital-twin/v3/graphql" + headers = {"Authorization": f"Bearer {access_token}"} - async with client as session: - result = await session.execute( - query, variable_values={"locationHiloId": location_hilo_id} - ) - self._handle_query_result(result) + query = self.QUERY_GET_LOCATION + query_hash = hashlib.sha256(query.encode("utf-8")).hexdigest() + + payload = { + "extensions": { + "persistedQuery": { + "version": 1, + "sha256Hash": query_hash, + } + }, + "variables": {"locationHiloId": location_hilo_id}, + } + + async with aiohttp.ClientSession(headers=headers) as session: + async with session.post(url, json=payload) as response: + try: + response_json = await response.json() + except Exception as e: + LOG.error("Error parsing response: %s", e) + return + + if "errors" in response_json: + for error in response_json["errors"]: + if error.get("message") == "PersistedQueryNotFound": + payload["query"] = query + async with session.post(url, json=payload) as response: + try: + response_json = await response.json() + except Exception as e: + LOG.error("Error parsing response on retry: %s", e) + return + break + + if "errors" in response_json: + LOG.error("GraphQL errors: %s", response_json["errors"]) + return + + if "data" in response_json: + self._handle_query_result(response_json["data"]) async def subscribe_to_device_updated( self, location_hilo_id: str, callback: callable = None From cb576b9ae9aac690a8e7a2f26a49ca54aaf2406b Mon Sep 17 00:00:00 2001 From: Carl Quirion Date: Sat, 31 Jan 2026 15:06:03 -0500 Subject: [PATCH 2/3] http sse instead of websocket apq for subscriptions http/2 --- pyhilo/graphql.py | 195 +++++++++++++++++++++++++++------------------- pyproject.toml | 6 +- requirements.txt | 2 + 3 files changed, 120 insertions(+), 83 deletions(-) diff --git a/pyhilo/graphql.py b/pyhilo/graphql.py index 86a49ca..8935a20 100644 --- a/pyhilo/graphql.py +++ b/pyhilo/graphql.py @@ -1,13 +1,11 @@ import asyncio import hashlib +import json import logging -import ssl -from typing import Any, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional -import aiohttp -from gql import Client, gql -from gql.transport.aiohttp import AIOHTTPTransport -from gql.transport.websockets import WebsocketsTransport +import httpx +from httpx_sse import aconnect_sse from pyhilo import API from pyhilo.const import LOG, PLATFORM_HOST @@ -551,24 +549,28 @@ async def call_get_location_query(self, location_hilo_id: str) -> None: "variables": {"locationHiloId": location_hilo_id}, } - async with aiohttp.ClientSession(headers=headers) as session: - async with session.post(url, json=payload) as response: - try: - response_json = await response.json() - except Exception as e: - LOG.error("Error parsing response: %s", e) - return + async with httpx.AsyncClient(http2=True) as client: + try: + response = await client.post(url, json=payload, headers=headers) + response.raise_for_status() + response_json = response.json() + except Exception as e: + LOG.error("Error parsing response: %s", e) + return if "errors" in response_json: for error in response_json["errors"]: if error.get("message") == "PersistedQueryNotFound": payload["query"] = query - async with session.post(url, json=payload) as response: - try: - response_json = await response.json() - except Exception as e: - LOG.error("Error parsing response on retry: %s", e) - return + try: + response = await client.post( + url, json=payload, headers=headers + ) + response.raise_for_status() + response_json = response.json() + except Exception as e: + LOG.error("Error parsing response on retry: %s", e) + return break if "errors" in response_json: @@ -582,74 +584,107 @@ async def subscribe_to_device_updated( self, location_hilo_id: str, callback: callable = None ) -> None: LOG.debug("subscribe_to_device_updated called") + await self._listen_to_sse( + self.SUBSCRIPTION_DEVICE_UPDATED, + {"locationHiloId": location_hilo_id}, + self._handle_device_subscription_result, + callback, + location_hilo_id, + ) + + async def subscribe_to_location_updated( + self, location_hilo_id: str, callback: callable = None + ) -> None: + LOG.debug("subscribe_to_location_updated called") + await self._listen_to_sse( + self.SUBSCRIPTION_LOCATION_UPDATED, + {"locationHiloId": location_hilo_id}, + self._handle_location_subscription_result, + callback, + location_hilo_id, + ) - # Setting log level to suppress keepalive messages on gql transport - logging.getLogger("gql.transport.websockets").setLevel(logging.WARNING) - - # - loop = asyncio.get_event_loop() - ssl_context = await loop.run_in_executor(None, ssl.create_default_context) - - while True: # Loop to reconnect if the connection is lost - LOG.debug("subscribe_to_device_updated while true") - access_token = await self._get_access_token() - transport = WebsocketsTransport( - url=f"wss://{PLATFORM_HOST}/api/digital-twin/v3/graphql?access_token={access_token}", - ssl=ssl_context, - ) - client = Client(transport=transport, fetch_schema_from_transport=True) - query = gql(self.SUBSCRIPTION_DEVICE_UPDATED) + async def _listen_to_sse( + self, + query: str, + variables: Dict[str, Any], + handler: Callable[[Dict[str, Any]], str], + callback: Optional[Callable[[str], None]] = None, + location_hilo_id: str = None, + ) -> None: + query_hash = hashlib.sha256(query.encode("utf-8")).hexdigest() + payload = { + "extensions": { + "persistedQuery": { + "version": 1, + "sha256Hash": query_hash, + } + }, + "variables": variables, + } + + while True: try: - async with client as session: - async for result in session.subscribe( - query, variable_values={"locationHiloId": location_hilo_id} - ): - LOG.debug( - "subscribe_to_device_updated: Received subscription result %s", - result, - ) - device_hilo_id = self._handle_device_subscription_result(result) - if callback: - callback(device_hilo_id) + access_token = await self._get_access_token() + url = f"https://{PLATFORM_HOST}/api/digital-twin/v3/graphql" + headers = {"Authorization": f"Bearer {access_token}"} + + retry_with_full_query = False + + async with httpx.AsyncClient(http2=True, timeout=None) as client: + async with aconnect_sse( + client, "POST", url, json=payload, headers=headers + ) as event_source: + async for sse in event_source.aiter_sse(): + if not sse.data: + continue + try: + data = json.loads(sse.data) + except json.JSONDecodeError: + continue + + if "errors" in data: + if any( + e.get("message") == "PersistedQueryNotFound" + for e in data["errors"] + ): + retry_with_full_query = True + break + LOG.error( + "GraphQL Subscription Errors: %s", data["errors"] + ) + continue + + if "data" in data: + LOG.debug( + "Received subscription result %s", data["data"] + ) + result = handler(data["data"]) + if callback: + callback(result) + + if retry_with_full_query: + payload["query"] = query + continue + except Exception as e: LOG.debug( - "subscribe_to_device_updated: Connection lost: %s. Reconnecting in 5 seconds...", - e, + "Subscription connection lost: %s. Reconnecting in 5 seconds...", e ) await asyncio.sleep(5) - try: - await self.call_get_location_query(location_hilo_id) - LOG.debug( - "subscribe_to_device_updated, call_get_location_query success" - ) - - except Exception as e2: - LOG.error( - "subscribe_to_device_updated, exception while reconnecting, retrying: %s", - e2, - ) + # Reset payload to APQ only on reconnect + if "query" in payload: + del payload["query"] - async def subscribe_to_location_updated( - self, location_hilo_id: str, callback: callable = None - ) -> None: - access_token = await self._get_access_token() - transport = WebsocketsTransport( - url=f"wss://{PLATFORM_HOST}/api/digital-twin/v3/graphql?access_token={access_token}" - ) - client = Client(transport=transport, fetch_schema_from_transport=True) - query = gql(self.SUBSCRIPTION_LOCATION_UPDATED) - try: - async with client as session: - async for result in session.subscribe( - query, variable_values={"locationHiloId": location_hilo_id} - ): - LOG.debug("Received subscription result %s", result) - device_hilo_id = self._handle_location_subscription_result(result) - callback(device_hilo_id) - except asyncio.CancelledError: - LOG.debug("Subscription cancelled.") - asyncio.sleep(1) - await self.subscribe_to_location_updated(location_hilo_id) + if location_hilo_id: + try: + await self.call_get_location_query(location_hilo_id) + LOG.debug("call_get_location_query success after reconnect") + except Exception as e2: + LOG.error( + "exception while RE-connecting, retrying: %s", + e2, + ) async def _get_access_token(self) -> str: """Get the access token.""" diff --git a/pyproject.toml b/pyproject.toml index e4b7e7a..1e724dc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,7 +40,7 @@ exclude = ".venv/.*" [tool.poetry] name = "python-hilo" -version = "2026.1.2" +version = "2026.1.3" description = "A Python3, async interface to the Hilo API" readme = "README.md" authors = ["David Vallee Delisle "] @@ -73,9 +73,9 @@ backoff = ">=1.11.1" python-dateutil = ">=2.8.2" python = "^3.9.0" voluptuous = ">=0.13.1" -websockets = ">=8.1,<16.0" -gql = ">=3.5.2,<5.0.0" pyyaml = "^6.0.2" +httpx = {version = ">=0.20.0", extras = ["http2"]} +httpx-sse = ">=0.4.0" [poetry.group.dev.dependencies] Sphinx = "^7.1.2" diff --git a/requirements.txt b/requirements.txt index 0688b89..20593bb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,8 @@ backoff>=2.2.1 charset-normalizer>=3.1.0 distro>=1.8.0 frozenlist>=1.3.3 +httpx[http2]>=0.20.0 +httpx-sse>=0.4.0 idna>=3.4 multidict>=6.0.4 python-dateutil>=2.8.2 From 217717bac5f8b34ad6ed25dea36a3ea6488855a6 Mon Sep 17 00:00:00 2001 From: "Ian C." <108159253+ic-dev21@users.noreply.github.com> Date: Sun, 1 Feb 2026 12:31:45 -0500 Subject: [PATCH 3/3] Fix Calver --- pyhilo/const.py | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyhilo/const.py b/pyhilo/const.py index bd61da9..f1a7870 100755 --- a/pyhilo/const.py +++ b/pyhilo/const.py @@ -11,7 +11,7 @@ LOG: Final = logging.getLogger(__package__) DEFAULT_STATE_FILE: Final = "hilo_state.yaml" REQUEST_RETRY: Final = 9 -PYHILO_VERSION: Final = "2026.1.02" +PYHILO_VERSION: Final = "2026.2.01" # TODO: Find a way to keep previous line in sync with pyproject.toml automatically CONTENT_TYPE_FORM: Final = "application/x-www-form-urlencoded" diff --git a/pyproject.toml b/pyproject.toml index 1e724dc..f9f1566 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,7 +40,7 @@ exclude = ".venv/.*" [tool.poetry] name = "python-hilo" -version = "2026.1.3" +version = "2026.2.1" description = "A Python3, async interface to the Hilo API" readme = "README.md" authors = ["David Vallee Delisle "]