From 5b63c232708d866425ed98ca620bd2d23586ad9d Mon Sep 17 00:00:00 2001 From: Herklos Date: Thu, 29 Jan 2026 15:54:37 +0100 Subject: [PATCH] Add social collectors --- octobot_backtesting/api/__init__.py | 7 + .../api/social_data_collector.py | 103 ++++++++++ octobot_backtesting/collectors/__init__.py | 4 + .../exchanges/exchange_collector.py | 4 +- .../collectors/social/__init__.py | 10 + .../abstract_social_history_collector.py | 20 ++ .../social/abstract_social_live_collector.py | 20 ++ .../collectors/social/social_collector.py | 105 +++++++++- octobot_backtesting/constants.py | 1 + octobot_backtesting/data/data_file_manager.py | 39 +++- octobot_backtesting/enums.py | 9 + octobot_backtesting/importers/__init__.py | 4 + .../importers/social/__init__.py | 8 + .../importers/social/social_importer.py | 188 ++++++++++++++++++ 14 files changed, 514 insertions(+), 8 deletions(-) create mode 100644 octobot_backtesting/api/social_data_collector.py create mode 100644 octobot_backtesting/collectors/social/abstract_social_history_collector.py create mode 100644 octobot_backtesting/collectors/social/abstract_social_live_collector.py diff --git a/octobot_backtesting/api/__init__.py b/octobot_backtesting/api/__init__.py index c9767d6..3a547f6 100644 --- a/octobot_backtesting/api/__init__.py +++ b/octobot_backtesting/api/__init__.py @@ -19,6 +19,7 @@ from octobot_backtesting.api import importer from octobot_backtesting.api import backtesting from octobot_backtesting.api import exchange_data_collector +from octobot_backtesting.api import social_data_collector from octobot_backtesting.api.data_file_converters import ( convert_data_file, @@ -72,6 +73,10 @@ get_data_collector_progress, is_data_collector_finished, ) +from octobot_backtesting.api.social_data_collector import ( + social_historical_data_collector_factory, + social_live_data_collector_factory, +) __all__ = [ "convert_data_file", @@ -116,4 +121,6 @@ "is_data_collector_in_progress", "get_data_collector_progress", "is_data_collector_finished", + "social_historical_data_collector_factory", + "social_live_data_collector_factory", ] diff --git a/octobot_backtesting/api/social_data_collector.py b/octobot_backtesting/api/social_data_collector.py new file mode 100644 index 0000000..53d887d --- /dev/null +++ b/octobot_backtesting/api/social_data_collector.py @@ -0,0 +1,103 @@ +# Drakkar-Software OctoBot-Backtesting +# Copyright (c) Drakkar-Software, All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 3.0 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. +import octobot_backtesting.collectors as collectors +import octobot_commons.tentacles_management as tentacles_management + + +def social_historical_data_collector_factory(social_name, + tentacles_setup_config, + sources=None, + symbols=None, + start_timestamp=None, + end_timestamp=None, + config=None): + """ + Factory function to create a social history data collector. + :param social_name: Name of the service (e.g., "TwitterService", "TelegramService") + :param tentacles_setup_config: Tentacles setup configuration + :param sources: Optional list of sources/channels to collect from + :param symbols: Optional list of symbols to filter by + :param start_timestamp: Optional start timestamp in milliseconds + :param end_timestamp: Optional end timestamp in milliseconds + :param config: Optional configuration dict + :return: SocialHistoryDataCollector instance + """ + return _social_collector_factory(collectors.AbstractSocialHistoryCollector, + social_name, + tentacles_setup_config, + sources, + symbols, + start_timestamp, + end_timestamp, + config) + + +def social_live_data_collector_factory(social_name, + tentacles_setup_config, + sources=None, + symbols=None, + service_feed_class=None, + channel_name=None, + config=None): + """ + Factory function to create a social live data collector. + :param social_name: Name of the service (e.g., "TwitterService", "TelegramService") + :param tentacles_setup_config: Tentacles setup configuration + :param sources: Optional list of sources/channels to collect from + :param symbols: Optional list of symbols to filter by + :param service_feed_class: Optional service feed class to subscribe to + :param channel_name: Optional channel name to subscribe to directly + :param config: Optional configuration dict + :return: SocialLiveDataCollector instance + """ + collector_class = tentacles_management.get_single_deepest_child_class( + collectors.AbstractSocialLiveCollector + ) + collector_instance = collector_class( + config or {}, + social_name, + tentacles_setup_config, + sources=sources, + symbols=symbols, + service_feed_class=service_feed_class, + channel_name=channel_name + ) + return collector_instance + + +def _social_collector_factory(collector_parent_class, social_name, tentacles_setup_config, + sources, symbols, start_timestamp, end_timestamp, config): + collector_class = tentacles_management.get_single_deepest_child_class(collector_parent_class) + collector_instance = collector_class( + config or {}, + social_name, + tentacles_setup_config, + sources=sources, + symbols=symbols, + use_all_available_sources=sources is None, + start_timestamp=start_timestamp, + end_timestamp=end_timestamp + ) + return collector_instance + + +from octobot_backtesting.api.exchange_data_collector import ( + initialize_and_run_data_collector, + stop_data_collector, + is_data_collector_in_progress, + get_data_collector_progress, + is_data_collector_finished, +) diff --git a/octobot_backtesting/collectors/__init__.py b/octobot_backtesting/collectors/__init__.py index 3ff89c4..3d8a6a1 100644 --- a/octobot_backtesting/collectors/__init__.py +++ b/octobot_backtesting/collectors/__init__.py @@ -30,6 +30,8 @@ from octobot_backtesting.collectors import social from octobot_backtesting.collectors.social import ( SocialDataCollector, + AbstractSocialHistoryCollector, + AbstractSocialLiveCollector, ) @@ -40,4 +42,6 @@ "AbstractExchangeHistoryCollector", "AbstractExchangeLiveCollector", "SocialDataCollector", + "AbstractSocialHistoryCollector", + "AbstractSocialLiveCollector", ] diff --git a/octobot_backtesting/collectors/exchanges/exchange_collector.py b/octobot_backtesting/collectors/exchanges/exchange_collector.py index 2462b76..85bf190 100644 --- a/octobot_backtesting/collectors/exchanges/exchange_collector.py +++ b/octobot_backtesting/collectors/exchanges/exchange_collector.py @@ -21,6 +21,7 @@ import octobot_commons.constants as commons_constants import octobot_backtesting.collectors.data_collector as data_collector +import octobot_backtesting.constants as constants import octobot_backtesting.enums as enums import octobot_backtesting.importers as importers @@ -88,7 +89,8 @@ def _load_timeframes_if_necessary(self): async def _create_description(self): await self.database.insert(enums.DataTables.DESCRIPTION, timestamp=time.time(), - version=self.VERSION, + version=constants.CURRENT_VERSION, + type=enums.DataType.EXCHANGE.value, exchange=self.exchange_name, symbols=json.dumps([symbol.symbol_str for symbol in self.symbols]), time_frames=json.dumps([tf.value for tf in self.time_frames]), diff --git a/octobot_backtesting/collectors/social/__init__.py b/octobot_backtesting/collectors/social/__init__.py index 93d8b45..d3af113 100644 --- a/octobot_backtesting/collectors/social/__init__.py +++ b/octobot_backtesting/collectors/social/__init__.py @@ -15,11 +15,21 @@ # License along with this library. from octobot_backtesting.collectors.social import social_collector +from octobot_backtesting.collectors.social import abstract_social_history_collector +from octobot_backtesting.collectors.social import abstract_social_live_collector from octobot_backtesting.collectors.social.social_collector import ( SocialDataCollector, ) +from octobot_backtesting.collectors.social.abstract_social_history_collector import ( + AbstractSocialHistoryCollector, +) +from octobot_backtesting.collectors.social.abstract_social_live_collector import ( + AbstractSocialLiveCollector, +) __all__ = [ "SocialDataCollector", + "AbstractSocialHistoryCollector", + "AbstractSocialLiveCollector", ] diff --git a/octobot_backtesting/collectors/social/abstract_social_history_collector.py b/octobot_backtesting/collectors/social/abstract_social_history_collector.py new file mode 100644 index 0000000..71c7572 --- /dev/null +++ b/octobot_backtesting/collectors/social/abstract_social_history_collector.py @@ -0,0 +1,20 @@ +# Drakkar-Software OctoBot-Backtesting +# Copyright (c) Drakkar-Software, All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 3.0 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. +import octobot_backtesting.collectors.social.social_collector as social_collector + + +class AbstractSocialHistoryCollector(social_collector.SocialDataCollector): + pass diff --git a/octobot_backtesting/collectors/social/abstract_social_live_collector.py b/octobot_backtesting/collectors/social/abstract_social_live_collector.py new file mode 100644 index 0000000..9d38827 --- /dev/null +++ b/octobot_backtesting/collectors/social/abstract_social_live_collector.py @@ -0,0 +1,20 @@ +# Drakkar-Software OctoBot-Backtesting +# Copyright (c) Drakkar-Software, All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 3.0 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. +import octobot_backtesting.collectors.social.social_collector as social_collector + + +class AbstractSocialLiveCollector(social_collector.SocialDataCollector): + pass diff --git a/octobot_backtesting/collectors/social/social_collector.py b/octobot_backtesting/collectors/social/social_collector.py index 2e2a6fa..ddcc9a2 100644 --- a/octobot_backtesting/collectors/social/social_collector.py +++ b/octobot_backtesting/collectors/social/social_collector.py @@ -13,18 +13,111 @@ # # You should have received a copy of the GNU Lesser General Public # License along with this library. -import octobot_backtesting.collectors as collectors +import json +import logging +import abc +import time +import octobot_backtesting.collectors.data_collector as data_collector +import octobot_backtesting.constants as constants +import octobot_backtesting.enums as enums +import octobot_backtesting.importers as importers -class SocialDataCollector(collectors.DataCollector): - # IMPORTER = SocialDataImporter +try: + import octobot_services.constants as services_constants +except ImportError: + logging.error("SocialDataCollector requires OctoBot-Services package installed") - def __init__(self, config, social_name): - super().__init__(config) + +class SocialDataCollector(data_collector.DataCollector): + VERSION = constants.CURRENT_VERSION + IMPORTER = importers.SocialDataImporter + + def __init__(self, config, social_name, tentacles_setup_config=None, sources=None, symbols=None, + use_all_available_sources=False, + data_format=enums.DataFormats.REGULAR_COLLECTOR_DATA, + start_timestamp=None, end_timestamp=None): + super().__init__(config, data_format=data_format) self.social_name = social_name + self.tentacles_setup_config = tentacles_setup_config + self.sources = sources if sources else [] + self.symbols = symbols if symbols else [] + self.use_all_available_sources = use_all_available_sources + self.start_timestamp = start_timestamp + self.end_timestamp = end_timestamp + self.current_step_index = 0 + self.total_steps = 0 + self.current_step_percent = 0 self.set_file_path() + def get_current_step_index(self): + return self.current_step_index + + def get_total_steps(self): + return self.total_steps + + def get_current_step_percent(self): + return self.current_step_percent + + @abc.abstractmethod + def _load_all_available_sources(self): + raise NotImplementedError("_load_all_available_sources is not implemented") + async def initialize(self): self.create_database() + await self.database.initialize() + + # set config from params + if self.sources: + self.config.setdefault("sources", self.sources) + # get service config if available + existing_service_config = self.config.get(services_constants.CONFIG_CATEGORY_SERVICES, {}).get(self.social_name, {}) + self.config[services_constants.CONFIG_CATEGORY_SERVICES] = {self.social_name: existing_service_config} + if self.symbols: + self.config.setdefault("symbols", [str(symbol) for symbol in self.symbols]) + + def _load_sources_if_necessary(self): + if self.use_all_available_sources: + self._load_all_available_sources() + if self.sources: + self.config["sources"] = self.sources + + async def _create_description(self): + await self.database.insert(enums.DataTables.DESCRIPTION, + timestamp=time.time(), + version=self.VERSION, + type=enums.DataType.SOCIAL.value, + service_name=self.social_name, + sources=json.dumps(self.sources) if self.sources else json.dumps([]), + symbols=json.dumps([str(symbol) for symbol in self.symbols]) if self.symbols else json.dumps([]), + start_timestamp=int(self.start_timestamp/1000) if self.start_timestamp else 0, + end_timestamp=int(self.end_timestamp/1000) if self.end_timestamp + else int(time.time()) if self.start_timestamp else 0) + + async def save_event(self, timestamp, service_name, channel=None, symbol=None, payload=None, multiple=False): + if not multiple: + await self.database.insert(enums.SocialDataTables.SOCIAL_EVENTS, timestamp, + service_name=service_name, + channel=channel if channel else "", + symbol=symbol if symbol else "", + payload=json.dumps(payload)) + else: + # When multiple=True, timestamp should be a list, and varying fields should be lists + # service_name stays constant, channel and symbol can be lists or single values + channel_list = channel if isinstance(channel, list) else [channel if channel else "" for _ in payload] + symbol_list = symbol if isinstance(symbol, list) else [symbol if symbol else "" for _ in payload] + await self.database.insert_all(enums.SocialDataTables.SOCIAL_EVENTS, timestamp=timestamp, + service_name=service_name, + channel=channel_list, + symbol=symbol_list, + payload=[json.dumps(p) for p in payload]) - # TODO initialize with service + async def delete_all(self, table, service_name, channel=None, symbol=None): + kwargs = { + "service_name": service_name, + } + if channel: + kwargs["channel"] = channel + if symbol: + kwargs["symbol"] = symbol + await self.database.delete(table, **kwargs) diff --git a/octobot_backtesting/constants.py b/octobot_backtesting/constants.py index 3ecfc29..649dbef 100644 --- a/octobot_backtesting/constants.py +++ b/octobot_backtesting/constants.py @@ -27,6 +27,7 @@ BACKTESTING_DATA_FILE_EXT = ".data" BACKTESTING_DATA_FILE_TEMP_EXT = ".part" BACKTESTING_DATA_FILE_SEPARATOR = "_" +CURRENT_VERSION = "2.0" BACKTESTING_DATA_FILE_TIME_WRITE_FORMAT = '%Y%m%d_%H%M%S' BACKTESTING_DATA_FILE_TIME_READ_FORMAT = BACKTESTING_DATA_FILE_TIME_WRITE_FORMAT.replace("_", "") BACKTESTING_DATA_FILE_TIME_DISPLAY_FORMAT = '%d %B %Y at %H:%M:%S' diff --git a/octobot_backtesting/data/data_file_manager.py b/octobot_backtesting/data/data_file_manager.py index ba137e4..a42748f 100644 --- a/octobot_backtesting/data/data_file_manager.py +++ b/octobot_backtesting/data/data_file_manager.py @@ -54,7 +54,44 @@ def get_date(time_info) -> str: async def get_database_description(database): description = (await database.select(enums.DataTables.DESCRIPTION, size=1))[0] version = description[1] - if version == "1.0": + if version == "2.0": + data_type = description[2] + if data_type == enums.DataType.EXCHANGE.value: + symbols = json.loads(description[4]) + time_frames = [common_enums.TimeFrames(tf) for tf in json.loads(description[5])] + candles_count = (await database.select_count( + enums.ExchangeDataTables.OHLCV, ["*"], + time_frame=tmf_manager.find_min_time_frame(time_frames).value + ))[0][0] + candles_length = int(candles_count / len(symbols)) if symbols else 0 + return { + enums.DataFormatKeys.TIMESTAMP.value: description[0], + enums.DataFormatKeys.VERSION.value: description[1], + enums.DataFormatKeys.EXCHANGE.value: description[3], + enums.DataFormatKeys.SYMBOLS.value: symbols, + enums.DataFormatKeys.TIME_FRAMES.value: time_frames, + enums.DataFormatKeys.START_TIMESTAMP.value: description[6], + enums.DataFormatKeys.END_TIMESTAMP.value: description[7], + enums.DataFormatKeys.CANDLES_LENGTH.value: candles_length, + } + elif data_type == enums.DataType.SOCIAL.value: + symbols = [] + if description[5]: + try: + symbols = json.loads(description[5]) + except (json.JSONDecodeError, TypeError): + pass + return { + enums.DataFormatKeys.TIMESTAMP.value: description[0], + enums.DataFormatKeys.VERSION.value: description[1], + enums.DataFormatKeys.EXCHANGE.value: description[3] if len(description) > 3 else "social", + enums.DataFormatKeys.SYMBOLS.value: symbols if isinstance(symbols, list) else [], + enums.DataFormatKeys.TIME_FRAMES.value: [], + enums.DataFormatKeys.START_TIMESTAMP.value: description[6] if len(description) > 6 else 0, + enums.DataFormatKeys.END_TIMESTAMP.value: description[7] if len(description) > 7 else 0, + enums.DataFormatKeys.CANDLES_LENGTH.value: 0, + } + elif version == "1.0": return { enums.DataFormatKeys.TIMESTAMP.value: description[0], enums.DataFormatKeys.VERSION.value: description[1], diff --git a/octobot_backtesting/enums.py b/octobot_backtesting/enums.py index baf3d23..d024e96 100644 --- a/octobot_backtesting/enums.py +++ b/octobot_backtesting/enums.py @@ -42,6 +42,11 @@ class ReportFormat(enum.Enum): SYMBOLS_WITH_TF = "symbols_with_time_frames_frames" +class DataType(enum.Enum): + EXCHANGE = "exchange" + SOCIAL = "social" + + class DataTables(enum.Enum): DESCRIPTION = "description" @@ -53,3 +58,7 @@ class ExchangeDataTables(enum.Enum): KLINE = "kline" TICKER = "ticker" FUNDING = "funding" + + +class SocialDataTables(enum.Enum): + SOCIAL_EVENTS = "social_events" diff --git a/octobot_backtesting/importers/__init__.py b/octobot_backtesting/importers/__init__.py index 3a12a80..6c4e6a9 100644 --- a/octobot_backtesting/importers/__init__.py +++ b/octobot_backtesting/importers/__init__.py @@ -31,10 +31,14 @@ import_recent_trades, import_klines, ) +from octobot_backtesting.importers.social import ( + SocialDataImporter, +) __all__ = [ "DataImporter", "ExchangeDataImporter", + "SocialDataImporter", "get_operations_from_timestamps", "import_ohlcvs", "import_tickers", diff --git a/octobot_backtesting/importers/social/__init__.py b/octobot_backtesting/importers/social/__init__.py index 8b6791d..3834995 100644 --- a/octobot_backtesting/importers/social/__init__.py +++ b/octobot_backtesting/importers/social/__init__.py @@ -15,3 +15,11 @@ # License along with this library. from octobot_backtesting.importers.social import social_importer + +from octobot_backtesting.importers.social.social_importer import ( + SocialDataImporter, +) + +__all__ = [ + "SocialDataImporter", +] diff --git a/octobot_backtesting/importers/social/social_importer.py b/octobot_backtesting/importers/social/social_importer.py index 5335228..ad04072 100644 --- a/octobot_backtesting/importers/social/social_importer.py +++ b/octobot_backtesting/importers/social/social_importer.py @@ -13,3 +13,191 @@ # # You should have received a copy of the GNU Lesser General Public # License along with this library. +import json + +import octobot_commons.constants as common_constants +import octobot_commons.errors as common_errors +import octobot_commons.databases as databases + +import octobot_backtesting.constants as constants +import octobot_backtesting.data as data +import octobot_backtesting.enums as enums +import octobot_backtesting.importers as importers +import octobot_backtesting.importers.exchanges.util as importers_util + + +class SocialDataImporter(importers.DataImporter): + def __init__(self, config, file_path): + super().__init__(config, file_path) + + self.service_name = None + self.sources = [] + self.symbols = [] + self.available_data_types = [] + self.has_all_events_history = False + + async def initialize(self) -> None: + self.load_database() + await self.database.initialize() + + # load description + description = await self._get_database_description() + self.service_name = description.get("service_name") + self.sources = description.get("sources", []) + self.symbols = description.get("symbols", []) + self.has_all_events_history = bool(description.get("start_timestamp", 0)) + await self._init_available_data_types() + + self.logger.info(f"Loaded {self.service_name} data file with " + f"sources: {', '.join(self.sources) if self.sources else 'all'}, " + f"symbols: {', '.join(self.symbols) if self.symbols else 'all'}") + + async def _get_database_description(self): + row = (await self.database.select(enums.DataTables.DESCRIPTION, size=1))[0] + if row[2] == enums.DataType.SOCIAL.value: + def _list(v): + try: + out = json.loads(v) if v else [] + return out if isinstance(out, list) else [] + except (json.JSONDecodeError, TypeError): + return [] + return { + "timestamp": row[0], + "version": row[1], + "type": row[2], + "service_name": row[3] if len(row) > 3 else "", + "sources": _list(row[4]) if len(row) > 4 else [], + "symbols": _list(row[5]) if len(row) > 5 else [], + "start_timestamp": row[6] if len(row) > 6 else 0, + "end_timestamp": row[7] if len(row) > 7 else 0, + } + return { + "timestamp": row[0], + "version": row[1], + "service_name": row[2] if len(row) > 2 else "unknown", + "sources": [], + "symbols": [], + "start_timestamp": 0, + "end_timestamp": 0, + } + + async def start(self) -> None: + pass + + async def get_data_timestamp_interval(self, time_frame=None): + """Get timestamp interval for social events""" + minimum_timestamp: float = 0.0 + maximum_timestamp: float = 0.0 + + if enums.SocialDataTables.SOCIAL_EVENTS in self.available_data_types: + try: + min_timestamp = (await self.database.select_min(enums.SocialDataTables.SOCIAL_EVENTS, + [databases.SQLiteDatabase.TIMESTAMP_COLUMN]))[0][0] + max_timestamp = (await self.database.select_max(enums.SocialDataTables.SOCIAL_EVENTS, + [databases.SQLiteDatabase.TIMESTAMP_COLUMN]))[0][0] + if min_timestamp and max_timestamp: + minimum_timestamp = min_timestamp + maximum_timestamp = max_timestamp + except (IndexError, common_errors.DatabaseNotFoundError): + pass + + return minimum_timestamp, maximum_timestamp + + async def _init_available_data_types(self): + self.available_data_types = [table for table in enums.SocialDataTables + if await self.database.check_table_exists(table) + and await self.database.check_table_not_empty(table)] + + async def _get_from_db( + self, service_name, table, + channel=None, + symbol=None, + limit=databases.SQLiteDatabase.DEFAULT_SIZE, + timestamps=None, + operations=None + ): + kwargs = {"service_name": service_name} + if channel: + kwargs["channel"] = channel + if symbol: + kwargs["symbol"] = symbol + + if timestamps: + return await self.database.select_from_timestamp( + table, size=limit, + timestamps=timestamps, + operations=operations, + **kwargs + ) + return await self.database.select( + table, size=limit, + **kwargs + ) + + async def get_social_events(self, service_name=None, channel=None, symbol=None, + limit=databases.SQLiteDatabase.DEFAULT_SIZE, + timestamps=None, + operations=None): + """ + Get social events from database. + :param service_name: Filter by service name (defaults to self.service_name) + :param channel: Filter by channel + :param symbol: Filter by symbol + :param limit: Maximum number of events to return + :param timestamps: List of timestamps to filter by + :param operations: Operations for timestamp filtering + :return: List of event dicts + """ + service_name = service_name or self.service_name + events = await self._get_from_db( + service_name, enums.SocialDataTables.SOCIAL_EVENTS, + channel=channel, + symbol=symbol, + limit=limit, + timestamps=timestamps, + operations=operations + ) + # Parse JSON payloads + result = [] + for event in events: + event_dict = { + "timestamp": event[0], + "service_name": event[1], + "channel": event[2], + "symbol": event[3], + "payload": json.loads(event[4]) if len(event) > 4 else {} + } + result.append(event_dict) + return result + + async def get_social_events_from_timestamps(self, service_name=None, channel=None, symbol=None, + limit=databases.SQLiteDatabase.DEFAULT_SIZE, + inferior_timestamp=-1, superior_timestamp=-1): + """ + Reads social events history from database and populates a local ChronologicalReadDatabaseCache. + Warning: can't read data from before last given inferior_timestamp unless associated cache is reset + """ + return await self._get_from_cache(service_name, channel, symbol, enums.SocialDataTables.SOCIAL_EVENTS, + inferior_timestamp, superior_timestamp, self.get_social_events, limit) + + async def _get_from_cache(self, service_name, channel, symbol, data_type, + inferior_timestamp, superior_timestamp, set_cache_method, limit): + cache_key = (service_name, channel, symbol, data_type) + if not self.chronological_cache.has(cache_key): + # ignore superior timestamp to select everything starting from inferior_timestamp and cache it + select_superior_timestamp = -1 + timestamps, operations = importers_util.get_operations_from_timestamps( + select_superior_timestamp, + inferior_timestamp + ) + # initializer without time_frame args are not expecting the time_frame argument, remove it + # ignore the limit param as it might reduce the available cache and give false later select results + init_cache_method_args = ( + service_name, channel, symbol, databases.SQLiteDatabase.DEFAULT_SIZE, timestamps, operations + ) + self.chronological_cache.set( + await set_cache_method(*init_cache_method_args), + 0, + cache_key + ) + return self.chronological_cache.get(inferior_timestamp, superior_timestamp, cache_key)