From 7194c0a48ae009390e2b481f88a501af69be6eb4 Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Tue, 20 Jan 2026 21:38:46 -0800 Subject: [PATCH 1/3] added async classes --- splitio/client/client.py | 47 ++-- splitio/client/factory.py | 44 ++- splitio/events/events_delivery.py | 7 + splitio/events/events_manager.py | 176 ++++++++---- splitio/events/events_task.py | 65 ++++- splitio/storage/inmemmory.py | 56 +++- splitio/sync/synchronizer.py | 3 + tests/client/test_client.py | 327 ++++++++++++++++++----- tests/client/test_factory.py | 9 +- tests/client/test_input_validator.py | 100 ++++++- tests/client/test_manager.py | 4 +- tests/engine/test_evaluator.py | 37 +-- tests/events/test_events_delivery.py | 17 ++ tests/events/test_events_manager.py | 103 ++++++- tests/events/test_events_task.py | 69 ++++- tests/integration/test_client_e2e.py | 99 +++++-- tests/push/test_split_worker.py | 5 +- tests/storage/test_inmemory_storage.py | 37 +-- tests/sync/test_segments_synchronizer.py | 4 +- tests/sync/test_splits_synchronizer.py | 30 ++- tests/sync/test_synchronizer.py | 20 +- tests/sync/test_telemetry.py | 5 +- tests/util/test_storage_helper.py | 3 +- 23 files changed, 1016 insertions(+), 251 deletions(-) diff --git a/splitio/client/client.py b/splitio/client/client.py index 0074bfb7..3c61166d 100644 --- a/splitio/client/client.py +++ b/splitio/client/client.py @@ -4,12 +4,13 @@ from collections import namedtuple import copy +from splitio.client import input_validator from splitio.engine.evaluator import Evaluator, CONTROL, EvaluationDataFactory, AsyncEvaluationDataFactory from splitio.engine.splitters import Splitter from splitio.models.impressions import Impression, Label, ImpressionDecorated from splitio.models.events import Event, EventWrapper, SdkEvent from splitio.models.telemetry import get_latency_bucket_index, MethodExceptionsAndLatencies -from splitio.client import input_validator +from splitio.optional.loaders import asyncio from splitio.util.time import get_current_epoch_time_ms, utctime_ms @@ -40,7 +41,7 @@ class ClientBase(object): # pylint: disable=too-many-instance-attributes 'impressions_disabled': False } - def __init__(self, factory, recorder, labels_enabled=True, fallback_treatment_calculator=None): + def __init__(self, factory, recorder, events_manager, labels_enabled=True, fallback_treatment_calculator=None): """ Construct a Client instance. @@ -66,6 +67,7 @@ def __init__(self, factory, recorder, labels_enabled=True, fallback_treatment_ca self._telemetry_evaluation_producer = self._factory._telemetry_evaluation_producer self._telemetry_init_producer = self._factory._telemetry_init_producer self._fallback_treatment_calculator = fallback_treatment_calculator + self._events_manager = events_manager @property def ready(self): @@ -221,6 +223,23 @@ def _get_fallback_eval_results(self, eval_result, feature): def _check_impression_label(self, result): return result['impression']['label'] == None or (result['impression']['label'] != None and result['impression']['label'].find(Label.SPLIT_NOT_FOUND) == -1) + def _validate_sdk_event_info(self, sdk_event, callback_handle): + if not self._check_sdk_event(sdk_event): + return False + + if not hasattr(callback_handle, '__call__'): + _LOGGER.warning("Client Event Subscription: The callback handle passed must be of type function, ignoring event subscribing action.") + return False + + return True + + def _check_sdk_event(self, sdk_event): + if not isinstance(sdk_event, SdkEvent): + _LOGGER.warning("Client Event Subscription: The event passed must be of type SdkEvent, ignoring event subscribing action.") + return False + + return True + class Client(ClientBase): # pylint: disable=too-many-instance-attributes """Entry point for the split sdk.""" @@ -239,8 +258,7 @@ def __init__(self, factory, recorder, events_manager, labels_enabled=True, fallb :rtype: Client """ - ClientBase.__init__(self, factory, recorder, labels_enabled, fallback_treatment_calculator) - self._events_manager = events_manager + ClientBase.__init__(self, factory, recorder, events_manager, labels_enabled, fallback_treatment_calculator) self._context_factory = EvaluationDataFactory(factory._get_storage('splits'), factory._get_storage('segments'), factory._get_storage('rule_based_segments')) def destroy(self): @@ -256,17 +274,6 @@ def on(self, sdk_event, callback_handle): return self._events_manager.register(sdk_event, callback_handle) - - def _validate_sdk_event_info(self, sdk_event, callback_handle): - if not isinstance(sdk_event, SdkEvent): - _LOGGER.warning("Client Event Subscription: The event passed must be of type SdkEvent, ignoring event subscribing action.") - return False - - if not hasattr(callback_handle, '__call__'): - _LOGGER.warning("Client Event Subscription: The callback handle passed must be of type function, ignoring event subscribing action.") - return False - - return True def get_treatment(self, key, feature_flag_name, attributes=None, evaluation_options=None): """ @@ -743,7 +750,7 @@ def track(self, key, traffic_type, event_type, value=None, properties=None): class ClientAsync(ClientBase): # pylint: disable=too-many-instance-attributes """Entry point for the split sdk.""" - def __init__(self, factory, recorder, labels_enabled=True, fallback_treatment_calculator=None): + def __init__(self, factory, recorder, events_manager, labels_enabled=True, fallback_treatment_calculator=None): """ Construct a Client instance. @@ -758,7 +765,7 @@ def __init__(self, factory, recorder, labels_enabled=True, fallback_treatment_ca :rtype: Client """ - ClientBase.__init__(self, factory, recorder, labels_enabled, fallback_treatment_calculator) + ClientBase.__init__(self, factory, recorder, events_manager, labels_enabled, fallback_treatment_calculator) self._context_factory = AsyncEvaluationDataFactory(factory._get_storage('splits'), factory._get_storage('segments'), factory._get_storage('rule_based_segments')) async def destroy(self): @@ -769,6 +776,12 @@ async def destroy(self): """ await self._factory.destroy() + async def on(self, sdk_event, callback_handle): + if not self._validate_sdk_event_info(sdk_event, callback_handle): + return + + await self._events_manager.register(sdk_event, callback_handle) + async def get_treatment(self, key, feature_flag_name, attributes=None, evaluation_options=None): """ Get the treatment for a feature and key, with an optional dictionary of attributes, for async calls diff --git a/splitio/client/factory.py b/splitio/client/factory.py index f5a4711b..670cf6c3 100644 --- a/splitio/client/factory.py +++ b/splitio/client/factory.py @@ -19,9 +19,9 @@ TelemetryStorageProducerAsync, TelemetryStorageConsumerAsync from splitio.engine.impressions.manager import Counter as ImpressionsCounter from splitio.engine.impressions.unique_keys_tracker import UniqueKeysTracker, UniqueKeysTrackerAsync -from splitio.events.events_manager import EventsManager +from splitio.events.events_manager import EventsManager, EventsManagerAsync from splitio.events.events_manager_config import EventsManagerConfig -from splitio.events.events_task import EventsTask +from splitio.events.events_task import EventsTask, EventsTaskAsync from splitio.events.events_delivery import EventsDelivery from splitio.models.fallback_config import FallbackTreatmentCalculator from splitio.models.notification import SdkInternalEventNotification @@ -352,6 +352,8 @@ def __init__( # pylint: disable=too-many-arguments storages, labels_enabled, recorder, + internal_events_queue, + events_manager, sync_manager=None, telemetry_producer=None, telemetry_init_producer=None, @@ -387,6 +389,8 @@ def __init__( # pylint: disable=too-many-arguments self._telemetry_submitter = telemetry_submitter self._ready_time = get_current_epoch_time_ms() _LOGGER.debug("Running in asyncio mode") + self._internal_events_queue = internal_events_queue + self._events_manager = events_manager self._manager_start_task = manager_start_task self._status = Status.NOT_INITIALIZED self._sdk_ready_flag = asyncio.Event() @@ -409,6 +413,7 @@ async def _update_status_when_ready_async(self): _LOGGER.debug(str(e)) self._status = Status.READY self._sdk_ready_flag.set() + await self._internal_events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_READY, None)) def manager(self): """ @@ -434,6 +439,7 @@ async def block_until_ready(self, timeout=None): _LOGGER.error("Exception initializing SDK") _LOGGER.debug(str(e)) await self._telemetry_init_producer.record_bur_time_out() + await self._internal_events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_TIMED_OUT, None)) raise TimeoutException('SDK Initialization: time of %d exceeded' % timeout) async def destroy(self, destroyed_event=None): @@ -481,7 +487,7 @@ def client(self): This client is only a set of references to structures hold by the factory. Creating one a fast operation and safe to be used anywhere. """ - return ClientAsync(self, self._recorder, self._labels_enabled, self._fallback_treatment_calculator) + return ClientAsync(self, self._recorder, self._events_manager, self._labels_enabled, self._fallback_treatment_calculator) def _wrap_impression_listener(listener, metadata): """ @@ -698,11 +704,14 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url= 'events': EventsAPIAsync(http_client, api_key, sdk_metadata, telemetry_runtime_producer), 'telemetry': TelemetryAPIAsync(http_client, api_key, sdk_metadata, telemetry_runtime_producer), } + internal_events_queue = asyncio.Queue() + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) + internal_events_task = EventsTaskAsync(events_manager.notify_internal_event, internal_events_queue) storages = { - 'splits': InMemorySplitStorageAsync(cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []), - 'segments': InMemorySegmentStorageAsync(), - 'rule_based_segments': InMemoryRuleBasedSegmentStorageAsync(), + 'splits': InMemorySplitStorageAsync(internal_events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []), + 'segments': InMemorySegmentStorageAsync(internal_events_queue), + 'rule_based_segments': InMemoryRuleBasedSegmentStorageAsync(internal_events_queue), 'impressions': InMemoryImpressionStorageAsync(cfg['impressionsQueueSize'], telemetry_runtime_producer), 'events': InMemoryEventStorageAsync(cfg['eventsQueueSize'], telemetry_runtime_producer), } @@ -748,6 +757,7 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url= TelemetrySyncTaskAsync(synchronizers.telemetry_sync.synchronize_stats, cfg['metricsRefreshRate']), unique_keys_task, clear_filter_task, + internal_events_task ) synchronizer = SynchronizerAsync(synchronizers, tasks) @@ -770,11 +780,12 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url= ) await telemetry_init_producer.record_config(cfg, extra_cfg, total_flag_sets, invalid_flag_sets) + internal_events_task.start() manager_start_task = asyncio.get_running_loop().create_task(manager.start()) return SplitFactoryAsync(api_key, storages, cfg['labelsEnabled'], - recorder, manager, + recorder, internal_events_queue, events_manager, manager, telemetry_producer, telemetry_init_producer, telemetry_submitter, manager_start_task=manager_start_task, api_client=http_client, fallback_treatment_calculator=FallbackTreatmentCalculator(cfg['fallbackTreatments'])) @@ -933,12 +944,16 @@ async def _build_redis_factory_async(api_key, cfg): manager = RedisManagerAsync(synchronizer) await telemetry_init_producer.record_config(cfg, {}, 0, 0) manager.start() + internal_events_queue = asyncio.Queue() + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) split_factory = SplitFactoryAsync( api_key, storages, cfg['labelsEnabled'], recorder, + internal_events_queue, + events_manager, manager, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_init_producer, @@ -1101,12 +1116,16 @@ async def _build_pluggable_factory_async(api_key, cfg): manager = RedisManagerAsync(synchronizer) manager.start() await telemetry_init_producer.record_config(cfg, {}, 0, 0) + internal_events_queue = asyncio.Queue() + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) split_factory = SplitFactoryAsync( api_key, storages, cfg['labelsEnabled'], recorder, + internal_events_queue, + events_manager, manager, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_init_producer, @@ -1205,10 +1224,12 @@ async def _build_localhost_factory_async(cfg): telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer() + internal_events_queue = asyncio.Queue() + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) storages = { - 'splits': InMemorySplitStorageAsync(), - 'segments': InMemorySegmentStorageAsync(), # not used, just to avoid possible future errors. - 'rule_based_segments': InMemoryRuleBasedSegmentStorageAsync(), + 'splits': InMemorySplitStorageAsync(internal_events_queue), + 'segments': InMemorySegmentStorageAsync(internal_events_queue), # not used, just to avoid possible future errors. + 'rule_based_segments': InMemoryRuleBasedSegmentStorageAsync(internal_events_queue), 'impressions': LocalhostImpressionsStorageAsync(), 'events': LocalhostEventsStorageAsync(), } @@ -1257,11 +1278,14 @@ async def _build_localhost_factory_async(cfg): telemetry_evaluation_producer, telemetry_runtime_producer ) + return SplitFactoryAsync( 'localhost', storages, False, recorder, + internal_events_queue, + events_manager, manager, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), diff --git a/splitio/events/events_delivery.py b/splitio/events/events_delivery.py index 129c14dc..a582d8a0 100644 --- a/splitio/events/events_delivery.py +++ b/splitio/events/events_delivery.py @@ -19,3 +19,10 @@ def deliver(self, sdk_event, event_metadata, event_handler): except Exception as ex: _LOGGER.error("Exception when calling handler for Sdk Event %s", sdk_event) _LOGGER.error(ex) + + async def deliver_async(self, sdk_event, event_metadata, event_handler): + try: + await event_handler(event_metadata) + except Exception as ex: + _LOGGER.error("Exception when calling handler for Sdk Event %s", sdk_event) + _LOGGER.error(ex) diff --git a/splitio/events/events_manager.py b/splitio/events/events_manager.py index 9457e24a..b51a992c 100644 --- a/splitio/events/events_manager.py +++ b/splitio/events/events_manager.py @@ -2,6 +2,7 @@ import threading import logging from collections import namedtuple +from splitio.optional.loaders import asyncio from splitio.events import EventsManagerInterface from splitio.models.events import SdkEvent @@ -11,7 +12,7 @@ ValidSdkEvent = namedtuple('ValidSdkEvent', ['sdk_event', 'valid']) ActiveSubscriptions = namedtuple('ActiveSubscriptions', ['triggered', 'handler']) -class EventsManager(EventsManagerInterface): +class EventsManagerBase(EventsManagerInterface): """Events Manager class.""" def __init__(self, events_configurations, events_delivery): @@ -22,54 +23,19 @@ def __init__(self, events_configurations, events_delivery): self._internal_events_status = {} self._events_delivery = events_delivery self._manager_config = events_configurations - self._lock = threading.RLock() def register(self, sdk_event, event_handler): - if self._active_subscriptions.get(sdk_event) != None and self._get_event_handler(sdk_event) != None: - return - - with self._lock: - # SDK ready already fired - if sdk_event == SdkEvent.SDK_READY and self._event_already_triggered(sdk_event): - self._active_subscriptions[sdk_event] = ActiveSubscriptions(True, event_handler) - _LOGGER.debug("EventsManager: Firing SDK_READY event for new subscription") - self._fire_sdk_event(sdk_event, None) - return - - self._active_subscriptions[sdk_event] = ActiveSubscriptions(False, event_handler) - + pass + def unregister(self, sdk_event): - if self._active_subscriptions.get(sdk_event) == None: - return - - with self._lock: - del self._active_subscriptions[sdk_event] - + pass + def notify_internal_event(self, sdk_internal_event, event_metadata): - with self._lock: - for sorted_event in self._manager_config.evaluation_order: - if sorted_event in self._get_sdk_event_if_applicable(sdk_internal_event): - if self._get_event_handler(sorted_event) != None: - self._fire_sdk_event(sorted_event, event_metadata) - - # if client is not subscribed to SDK_READY - if sorted_event == SdkEvent.SDK_READY and self._get_event_handler(sorted_event) == None: - _LOGGER.debug("EventsManager: Registering SDK_READY event as fired") - self._active_subscriptions[SdkEvent.SDK_READY] = ActiveSubscriptions(True, None) - + pass def destroy(self): - with self._lock: - self._active_subscriptions = {} - self._internal_events_status = {} - - def _fire_sdk_event(self, sdk_event, event_metadata): - _LOGGER.debug("EventsManager: Firing Sdk event %s", sdk_event) - notify_event = threading.Thread(target=self._events_delivery.deliver, args=[sdk_event, event_metadata, self._get_event_handler(sdk_event)], - name='SplitSDKEventNotify', daemon=True) - notify_event.start() - self._set_sdk_event_triggered(sdk_event) - + pass + def _event_already_triggered(self, sdk_event): if self._active_subscriptions.get(sdk_event) != None: return self._active_subscriptions.get(sdk_event).triggered @@ -81,11 +47,10 @@ def _get_internal_event_status(self, sdk_internal_event): return self._internal_events_status[sdk_internal_event] return False - + def _update_internal_event_status(self, sdk_internal_event, status): - with self._lock: - self._internal_events_status[sdk_internal_event] = status - + self._internal_events_status[sdk_internal_event] = status + def _set_sdk_event_triggered(self, sdk_event): if self._active_subscriptions.get(sdk_event) == None: return @@ -94,7 +59,7 @@ def _set_sdk_event_triggered(self, sdk_event): return self._active_subscriptions[sdk_event] = self._active_subscriptions[sdk_event]._replace(triggered = True) - + def _get_event_handler(self, sdk_event): if self._active_subscriptions.get(sdk_event) == None: return None @@ -103,12 +68,11 @@ def _get_event_handler(self, sdk_event): def _get_sdk_event_if_applicable(self, sdk_internal_event): final_sdk_event = ValidSdkEvent(None, False) - self._update_internal_event_status(sdk_internal_event, True) events_to_fire = [] require_any_sdk_event = self._check_require_any(sdk_internal_event) if require_any_sdk_event.valid: - if (not self._set_sdk_event_triggered(require_any_sdk_event.sdk_event) and + if (not self._event_already_triggered(require_any_sdk_event.sdk_event) and self._execution_limit(require_any_sdk_event.sdk_event) == 1) or \ self._execution_limit(require_any_sdk_event.sdk_event) == -1: final_sdk_event = final_sdk_event._replace(sdk_event = require_any_sdk_event.sdk_event, @@ -170,4 +134,114 @@ def _check_require_any(self, sdk_internal_event): valid_sdk_event = valid_sdk_event._replace(valid = True, sdk_event = name) return valid_sdk_event - return valid_sdk_event \ No newline at end of file + return valid_sdk_event + +class EventsManager(EventsManagerBase): + """Events Manager class.""" + + def __init__(self, events_configurations, events_delivery): + """ + Construct Events Manager instance. + """ + EventsManagerBase.__init__(self, events_configurations, events_delivery) + self._lock = threading.RLock() + + def register(self, sdk_event, event_handler): + if self._active_subscriptions.get(sdk_event) != None and self._get_event_handler(sdk_event) != None: + return + + with self._lock: + # SDK ready already fired + if sdk_event == SdkEvent.SDK_READY and self._event_already_triggered(sdk_event): + self._active_subscriptions[sdk_event] = ActiveSubscriptions(True, event_handler) + _LOGGER.debug("EventsManager: Firing SDK_READY event for new subscription") + self._fire_sdk_event(sdk_event, None) + return + + self._active_subscriptions[sdk_event] = ActiveSubscriptions(False, event_handler) + + def unregister(self, sdk_event): + if self._active_subscriptions.get(sdk_event) == None: + return + + with self._lock: + del self._active_subscriptions[sdk_event] + + def notify_internal_event(self, sdk_internal_event, event_metadata): + with self._lock: + self._update_internal_event_status(sdk_internal_event, True) + for sorted_event in self._manager_config.evaluation_order: + if sorted_event in self._get_sdk_event_if_applicable(sdk_internal_event): + if self._get_event_handler(sorted_event) != None: + self._fire_sdk_event(sorted_event, event_metadata) + + # if client is not subscribed to SDK_READY + if sorted_event == SdkEvent.SDK_READY and self._get_event_handler(sorted_event) == None: + _LOGGER.debug("EventsManager: Registering SDK_READY event as fired") + self._active_subscriptions[SdkEvent.SDK_READY] = ActiveSubscriptions(True, None) + + def destroy(self): + with self._lock: + self._active_subscriptions = {} + self._internal_events_status = {} + + def _fire_sdk_event(self, sdk_event, event_metadata): + _LOGGER.debug("EventsManager: Firing Sdk event %s", sdk_event) + notify_event = threading.Thread(target=self._events_delivery.deliver, args=[sdk_event, event_metadata, self._get_event_handler(sdk_event)], + name='SplitSDKEventNotify', daemon=True) + notify_event.start() + self._set_sdk_event_triggered(sdk_event) + +class EventsManagerAsync(EventsManagerBase): + """Events Manager Async class.""" + + def __init__(self, events_configurations, events_delivery): + """ + Construct Events Manager instance. + """ + EventsManagerBase.__init__(self, events_configurations, events_delivery) + self._lock = asyncio.Lock() + + async def register(self, sdk_event, event_handler): + if self._active_subscriptions.get(sdk_event) != None and self._get_event_handler(sdk_event) != None: + return + + async with self._lock: + # SDK ready already fired + if sdk_event == SdkEvent.SDK_READY and self._event_already_triggered(sdk_event): + self._active_subscriptions[sdk_event] = ActiveSubscriptions(True, event_handler) + _LOGGER.debug("EventsManager: Firing SDK_READY event for new subscription") + await self._fire_sdk_event(sdk_event, None) + return + + self._active_subscriptions[sdk_event] = ActiveSubscriptions(False, event_handler) + + async def unregister(self, sdk_event): + if self._active_subscriptions.get(sdk_event) == None: + return + + async with self._lock: + del self._active_subscriptions[sdk_event] + + async def notify_internal_event(self, sdk_internal_event, event_metadata): + async with self._lock: + self._update_internal_event_status(sdk_internal_event, True) + for sorted_event in self._manager_config.evaluation_order: + if sorted_event in self._get_sdk_event_if_applicable(sdk_internal_event): + if self._get_event_handler(sorted_event) != None: + await self._fire_sdk_event(sorted_event, event_metadata) + + # if client is not subscribed to SDK_READY + if sorted_event == SdkEvent.SDK_READY and self._get_event_handler(sorted_event) == None: + _LOGGER.debug("EventsManager: Registering SDK_READY event as fired") + self._active_subscriptions[SdkEvent.SDK_READY] = ActiveSubscriptions(True, None) + + async def destroy(self): + async with self._lock: + self._active_subscriptions = {} + self._internal_events_status = {} + + async def _fire_sdk_event(self, sdk_event, event_metadata): + _LOGGER.debug("EventsManager: Firing Sdk event %s", sdk_event) + asyncio.get_running_loop().create_task(self._events_delivery.deliver_async(sdk_event, event_metadata, self._get_event_handler(sdk_event))) + self._set_sdk_event_triggered(sdk_event) \ No newline at end of file diff --git a/splitio/events/events_task.py b/splitio/events/events_task.py index ea0ffce7..8158dc04 100644 --- a/splitio/events/events_task.py +++ b/splitio/events/events_task.py @@ -3,6 +3,8 @@ import threading import abc +from splitio.optional.loaders import asyncio + _LOGGER = logging.getLogger(__name__) class EventsTaskBase(object, metaclass=abc.ABCMeta): @@ -80,4 +82,65 @@ def stop(self, stop_flag=None): return self._running = False - self._internal_events_queue.put(self._centinel) \ No newline at end of file + self._internal_events_queue.put(self._centinel) + +class EventsTaskAsync(EventsTaskBase): + """sdk internal events processing task.""" + + _centinel = object() + + def __init__(self, notify_internal_events, internal_events_queue): + """ + Class constructor. + + :param synchronize_segment: handler to perform segment synchronization on incoming event + :type synchronize_segment: function + + :param segment_queue: queue with segment updates notifications + :type segment_queue: queue + """ + self._internal_events_queue = internal_events_queue + self._handler = notify_internal_events + self._running = False + self._worker = None + + def is_running(self): + """Return whether the working is running.""" + return self._running + + async def _run(self): + """Run worker handler.""" + while self.is_running(): + event = await self._internal_events_queue.get() + if not self.is_running(): + break + + if event == self._centinel: + continue + + _LOGGER.debug('Processing sdk internal event: %s', event.internal_event) + try: + await self._handler(event.internal_event, event.metadata) + except Exception: + _LOGGER.error('Exception raised in events manager') + _LOGGER.debug('Exception information: ', exc_info=True) + + def start(self): + """Start worker.""" + if self.is_running(): + _LOGGER.debug('SDK Event Worker is already running') + return + + self._running = True + _LOGGER.debug('Starting SDK Event Task worker') + asyncio.get_running_loop().create_task(self._run()) + + async def stop(self, stop_flag=None): + """Stop worker.""" + _LOGGER.debug('Stopping SDK Event Task worker') + if not self.is_running(): + _LOGGER.debug('SDK Event Worker is not running. Ignoring.') + return + + self._running = False + await self._internal_events_queue.put(self._centinel) \ No newline at end of file diff --git a/splitio/storage/inmemmory.py b/splitio/storage/inmemmory.py index 675478d3..bbde8816 100644 --- a/splitio/storage/inmemmory.py +++ b/splitio/storage/inmemmory.py @@ -154,10 +154,11 @@ def update(self, to_add, to_delete, new_change_number): [self._put(add_segment) for add_segment in to_add] [self._remove(delete_segment) for delete_segment in to_delete] self._set_change_number(new_change_number) - self._internal_event_queue.put( - SdkInternalEventNotification( - SdkInternalEvent.RB_SEGMENTS_UPDATED, - EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) + if len(to_add) > 0 or len(to_delete) > 0: + self._internal_event_queue.put( + SdkInternalEventNotification( + SdkInternalEvent.RB_SEGMENTS_UPDATED, + EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) def _put(self, rule_based_segment): """ @@ -244,11 +245,12 @@ def fetch_many(self, segment_names): class InMemoryRuleBasedSegmentStorageAsync(RuleBasedSegmentsStorage): """InMemory implementation of a feature flag storage base.""" - def __init__(self): + def __init__(self, internal_event_queue): """Constructor.""" self._lock = asyncio.Lock() self._rule_based_segments = {} self._change_number = -1 + self._internal_event_queue = internal_event_queue async def clear(self): """ @@ -284,6 +286,11 @@ async def update(self, to_add, to_delete, new_change_number): [await self._put(add_segment) for add_segment in to_add] [await self._remove(delete_segment) for delete_segment in to_delete] await self._set_change_number(new_change_number) + if len(to_add) > 0 or len(to_delete) > 0: + await self._internal_event_queue.put( + SdkInternalEventNotification( + SdkInternalEvent.RB_SEGMENTS_UPDATED, + EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) async def _put(self, rule_based_segment): """ @@ -716,7 +723,7 @@ def is_flag_set_exist(self, flag_set): class InMemorySplitStorageAsync(InMemorySplitStorageBase): """InMemory implementation of a feature flag async storage.""" - def __init__(self, flag_sets=[]): + def __init__(self, internal_event_queue, flag_sets=[]): """Constructor.""" self._lock = asyncio.Lock() self._feature_flags = {} @@ -724,6 +731,7 @@ def __init__(self, flag_sets=[]): self._traffic_types = Counter() self.flag_set = FlagSets(flag_sets) self.flag_set_filter = FlagSetsFilter(flag_sets) + self._internal_event_queue = internal_event_queue async def clear(self): """ @@ -772,6 +780,14 @@ async def update(self, to_add, to_delete, new_change_number): [await self._put(add_feature_flag) for add_feature_flag in to_add] [await self._remove(delete_feature_flag) for delete_feature_flag in to_delete] await self._set_change_number(new_change_number) + to_notify = [] + [to_notify.append(feature.name) for feature in to_add] + to_notify.extend(to_delete) + if len(to_notify) > 0: + await self._internal_event_queue.put( + SdkInternalEventNotification( + SdkInternalEvent.FLAGS_UPDATED, + EventsMetadata(SdkEventType.FLAG_UPDATE, set(to_notify)))) async def _put(self, feature_flag): """ @@ -917,6 +933,11 @@ async def kill_locally(self, feature_flag_name, default_treatment, change_number return feature_flag.local_kill(default_treatment, change_number) await self._put(feature_flag) + await self._internal_event_queue.put( + SdkInternalEventNotification( + SdkInternalEvent.FLAG_KILLED_NOTIFICATION, + EventsMetadata(SdkEventType.FLAG_UPDATE, {feature_flag_name}))) + async def get_segment_names(self): """ @@ -1000,10 +1021,11 @@ def update(self, segment_name, to_add, to_remove, change_number=None): if change_number is not None: self._segments[segment_name].change_number = change_number - self._internal_event_queue.put( - SdkInternalEventNotification( - SdkInternalEvent.SEGMENTS_UPDATED, - EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) + if len(to_add) > 0 or len(to_remove) >0: + self._internal_event_queue.put( + SdkInternalEventNotification( + SdkInternalEvent.SEGMENTS_UPDATED, + EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) def get_change_number(self, segment_name): """ @@ -1081,11 +1103,12 @@ def get_segments_keys_count(self): class InMemorySegmentStorageAsync(SegmentStorage): """In-memory implementation of a segment async storage.""" - def __init__(self): + def __init__(self, internal_event_queue): """Constructor.""" self._segments = {} self._change_numbers = {} self._lock = asyncio.Lock() + self._internal_event_queue = internal_event_queue async def get(self, segment_name): """ @@ -1114,6 +1137,11 @@ async def put(self, segment): """ async with self._lock: self._segments[segment.name] = segment + await self._internal_event_queue.put( + SdkInternalEventNotification( + SdkInternalEvent.SEGMENTS_UPDATED, + EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) + async def update(self, segment_name, to_add, to_remove, change_number=None): """ @@ -1134,6 +1162,12 @@ async def update(self, segment_name, to_add, to_remove, change_number=None): self._segments[segment_name].update(to_add, to_remove) if change_number is not None: self._segments[segment_name].change_number = change_number + if len(to_add) > 0 or len(to_remove) >0: + await self._internal_event_queue.put( + SdkInternalEventNotification( + SdkInternalEvent.SEGMENTS_UPDATED, + EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) + async def get_change_number(self, segment_name): """ diff --git a/splitio/sync/synchronizer.py b/splitio/sync/synchronizer.py index 71194d26..6bbb7fa6 100644 --- a/splitio/sync/synchronizer.py +++ b/splitio/sync/synchronizer.py @@ -654,6 +654,9 @@ async def stop_periodic_data_recording(self, blocking): :type blocking: bool """ _LOGGER.debug('Stopping periodic data recording') + if self._split_tasks.internal_events_task: + await self._split_tasks.internal_events_task.stop() + if blocking: await self._stop_periodic_data_recording() _LOGGER.debug('all tasks finished successfully.') diff --git a/tests/client/test_client.py b/tests/client/test_client.py index 94da58a2..1efd4143 100644 --- a/tests/client/test_client.py +++ b/tests/client/test_client.py @@ -4,10 +4,11 @@ import unittest.mock as mock import pytest import queue +import asyncio from splitio.client.client import Client, _LOGGER as _logger, CONTROL, ClientAsync, EvaluationOptions from splitio.client.factory import SplitFactory, Status as FactoryStatus, SplitFactoryAsync -from splitio.events.events_manager import EventsManager +from splitio.events.events_manager import EventsManager, EventsManagerAsync from splitio.models.fallback_config import FallbackTreatmentsConfiguration, FallbackTreatmentCalculator from splitio.models.fallback_treatment import FallbackTreatment from splitio.models.impressions import Impression, Label @@ -1744,11 +1745,17 @@ class ClientAsyncTests(object): # pylint: disable=too-few-public-methods @pytest.mark.asyncio async def test_get_treatment_async(self, mocker): """Test get_treatment_async execution paths.""" + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + telemetry_storage = await InMemoryTelemetryStorageAsync.create() telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() - rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync() + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) @@ -1764,7 +1771,8 @@ async def test_get_treatment_async(self, mocker): class TelemetrySubmitterMock(): async def synchronize_config(*_): - pass + pass + factory = SplitFactoryAsync(mocker.Mock(), {'splits': split_storage, 'segments': segment_storage, @@ -1773,6 +1781,8 @@ async def synchronize_config(*_): 'events': event_storage}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -1780,7 +1790,7 @@ async def synchronize_config(*_): ) await factory.block_until_ready(1) - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(None)) client._evaluator = mocker.Mock(spec=Evaluator) client._evaluator.eval_with_context.return_value = { 'treatment': 'on', @@ -1815,11 +1825,17 @@ def _raise(*_): @pytest.mark.asyncio async def test_get_treatment_with_config_async(self, mocker): """Test get_treatment execution paths.""" + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + telemetry_storage = await InMemoryTelemetryStorageAsync.create() telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() - rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync() + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) @@ -1838,6 +1854,8 @@ async def test_get_treatment_with_config_async(self, mocker): 'events': event_storage}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -1852,7 +1870,7 @@ async def synchronize_config(*_): mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) await factory.block_until_ready(1) - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(None)) client._evaluator = mocker.Mock(spec=Evaluator) client._evaluator.eval_with_context.return_value = { 'treatment': 'on', @@ -1892,11 +1910,17 @@ def _raise(*_): @pytest.mark.asyncio async def test_get_treatments_async(self, mocker): """Test get_treatment execution paths.""" + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + telemetry_storage = await InMemoryTelemetryStorageAsync.create() telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() - rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync() + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) @@ -1915,6 +1939,8 @@ async def test_get_treatments_async(self, mocker): 'events': event_storage}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -1929,7 +1955,7 @@ async def synchronize_config(*_): mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) await factory.block_until_ready(1) - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(None)) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { 'treatment': 'on', @@ -1972,11 +1998,17 @@ def _raise(*_): @pytest.mark.asyncio async def test_get_treatments_by_flag_set_async(self, mocker): """Test get_treatment execution paths.""" + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + telemetry_storage = await InMemoryTelemetryStorageAsync.create() telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() - rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync() + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) @@ -1995,6 +2027,8 @@ async def test_get_treatments_by_flag_set_async(self, mocker): 'events': event_storage}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -2009,7 +2043,7 @@ async def synchronize_config(*_): mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) await factory.block_until_ready(1) - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(None)) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { 'treatment': 'on', @@ -2052,11 +2086,17 @@ def _raise(*_): @pytest.mark.asyncio async def test_get_treatments_by_flag_sets_async(self, mocker): """Test get_treatment execution paths.""" + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + telemetry_storage = await InMemoryTelemetryStorageAsync.create() telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() - rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync() + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) @@ -2075,6 +2115,8 @@ async def test_get_treatments_by_flag_sets_async(self, mocker): 'events': event_storage}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -2089,7 +2131,7 @@ async def synchronize_config(*_): mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) await factory.block_until_ready(1) - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(None)) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { 'treatment': 'on', @@ -2132,11 +2174,17 @@ def _raise(*_): @pytest.mark.asyncio async def test_get_treatments_with_config(self, mocker): """Test get_treatment execution paths.""" + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + telemetry_storage = await InMemoryTelemetryStorageAsync.create() telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() - rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync() + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) @@ -2154,6 +2202,8 @@ async def test_get_treatments_with_config(self, mocker): 'events': event_storage}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -2168,7 +2218,7 @@ async def synchronize_config(*_): mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) await factory.block_until_ready(1) - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(None)) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { 'treatment': 'on', @@ -2216,11 +2266,17 @@ def _raise(*_): @pytest.mark.asyncio async def test_get_treatments_with_config_by_flag_set(self, mocker): """Test get_treatment execution paths.""" + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + telemetry_storage = await InMemoryTelemetryStorageAsync.create() telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() - rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync() + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) @@ -2238,6 +2294,8 @@ async def test_get_treatments_with_config_by_flag_set(self, mocker): 'events': event_storage}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -2252,7 +2310,7 @@ async def synchronize_config(*_): mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) await factory.block_until_ready(1) - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(None)) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { 'treatment': 'on', @@ -2300,11 +2358,17 @@ def _raise(*_): @pytest.mark.asyncio async def test_get_treatments_with_config_by_flag_sets(self, mocker): """Test get_treatment execution paths.""" + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + telemetry_storage = await InMemoryTelemetryStorageAsync.create() telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() - rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync() + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) @@ -2322,6 +2386,8 @@ async def test_get_treatments_with_config_by_flag_sets(self, mocker): 'events': event_storage}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -2336,7 +2402,7 @@ async def synchronize_config(*_): mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) await factory.block_until_ready(1) - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(None)) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { 'treatment': 'on', @@ -2384,11 +2450,17 @@ def _raise(*_): @pytest.mark.asyncio async def test_impression_toggle_optimized(self, mocker): """Test get_treatment execution paths.""" + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + telemetry_storage = await InMemoryTelemetryStorageAsync.create() telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() - rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync() + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) @@ -2409,6 +2481,8 @@ async def test_impression_toggle_optimized(self, mocker): 'events': event_storage}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -2422,7 +2496,7 @@ async def test_impression_toggle_optimized(self, mocker): from_raw(splits_json['splitChange1_1']['ff']['d'][1]), from_raw(splits_json['splitChange1_1']['ff']['d'][2]) ], [], -1) - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(None)) treatment = await client.get_treatment('some_key', 'SPLIT_1') assert treatment == 'off' treatment = await client.get_treatment('some_key', 'SPLIT_2') @@ -2447,11 +2521,17 @@ async def test_impression_toggle_optimized(self, mocker): @pytest.mark.asyncio async def test_impression_toggle_debug(self, mocker): """Test get_treatment execution paths.""" + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + telemetry_storage = await InMemoryTelemetryStorageAsync.create() telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() - rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync() + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) @@ -2472,6 +2552,8 @@ async def test_impression_toggle_debug(self, mocker): 'events': event_storage}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -2485,7 +2567,7 @@ async def test_impression_toggle_debug(self, mocker): from_raw(splits_json['splitChange1_1']['ff']['d'][1]), from_raw(splits_json['splitChange1_1']['ff']['d'][2]) ], [], -1) - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(None)) assert await client.get_treatment('some_key', 'SPLIT_1') == 'off' assert await client.get_treatment('some_key', 'SPLIT_2') == 'on' assert await client.get_treatment('some_key', 'SPLIT_3') == 'on' @@ -2507,11 +2589,17 @@ async def test_impression_toggle_debug(self, mocker): @pytest.mark.asyncio async def test_impression_toggle_none(self, mocker): """Test get_treatment execution paths.""" + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + telemetry_storage = await InMemoryTelemetryStorageAsync.create() telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() - rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync() + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) @@ -2532,6 +2620,8 @@ async def test_impression_toggle_none(self, mocker): 'events': event_storage}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -2545,7 +2635,7 @@ async def test_impression_toggle_none(self, mocker): from_raw(splits_json['splitChange1_1']['ff']['d'][1]), from_raw(splits_json['splitChange1_1']['ff']['d'][2]) ], [], -1) - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(None)) assert await client.get_treatment('some_key', 'SPLIT_1') == 'off' assert await client.get_treatment('some_key', 'SPLIT_2') == 'on' assert await client.get_treatment('some_key', 'SPLIT_3') == 'on' @@ -2557,7 +2647,13 @@ async def test_impression_toggle_none(self, mocker): @pytest.mark.asyncio async def test_track_async(self, mocker): """Test that destroy/destroyed calls are forwarded to the factory.""" - split_storage = InMemorySplitStorageAsync() + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + + split_storage = InMemorySplitStorageAsync(internal_events_queue) segment_storage = mocker.Mock(spec=SegmentStorage) rb_segment_storage = mocker.Mock(spec=RuleBasedSegmentsStorage) impression_storage = mocker.Mock(spec=ImpressionStorage) @@ -2580,6 +2676,8 @@ async def put(event): 'events': event_storage}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -2596,7 +2694,7 @@ async def synchronize_config(*_): mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) await factory.block_until_ready(1) - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(None)) assert await client.track('key', 'user', 'purchase', 12) is True assert self.events[0] == [EventWrapper( event=Event('key', 'user', 'purchase', 12, 1000, None), @@ -2606,11 +2704,17 @@ async def synchronize_config(*_): @pytest.mark.asyncio async def test_telemetry_not_ready_async(self, mocker): + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + telemetry_storage = await InMemoryTelemetryStorageAsync.create() telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() - rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync() + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = InMemoryEventStorageAsync(10, telemetry_runtime_producer) @@ -2625,6 +2729,8 @@ async def test_telemetry_not_ready_async(self, mocker): 'events': mocker.Mock()}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -2640,7 +2746,7 @@ async def synchronize_config(*_): type(factory).ready = ready_property await factory.block_until_ready(1) - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(None)) assert await client.get_treatment('some_key', 'SPLIT_2') == CONTROL assert(telemetry_storage._tel_config._not_ready == 1) await client.track('key', 'tt', 'ev') @@ -2649,11 +2755,17 @@ async def synchronize_config(*_): @pytest.mark.asyncio async def test_telemetry_record_treatment_exception_async(self, mocker): + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + telemetry_storage = await InMemoryTelemetryStorageAsync.create() telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() - rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync() + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = InMemoryEventStorageAsync(10, telemetry_runtime_producer) @@ -2674,6 +2786,8 @@ async def test_telemetry_record_treatment_exception_async(self, mocker): 'events': event_storage}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -2688,7 +2802,7 @@ async def synchronize_config(*_): ready_property.return_value = True type(factory).ready = ready_property - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(None)) client._evaluator = mocker.Mock() def _raise(*_): raise RuntimeError('something') @@ -2723,11 +2837,17 @@ def _raise(*_): @pytest.mark.asyncio async def test_telemetry_method_latency_async(self, mocker): + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + telemetry_storage = await InMemoryTelemetryStorageAsync.create() telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() - rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync() + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = InMemoryEventStorageAsync(10, telemetry_runtime_producer) @@ -2748,6 +2868,8 @@ async def test_telemetry_method_latency_async(self, mocker): 'events': event_storage}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -2766,7 +2888,7 @@ async def synchronize_config(*_): await factory.block_until_ready(1) except: pass - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(None)) assert await client.get_treatment('key', 'SPLIT_2') == 'on' assert(telemetry_storage._method_latencies._treatment[0] == 1) @@ -2798,7 +2920,13 @@ async def synchronize_config(*_): @pytest.mark.asyncio async def test_telemetry_track_exception_async(self, mocker): - split_storage = InMemorySplitStorageAsync() + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + + split_storage = InMemorySplitStorageAsync(internal_events_queue) segment_storage = mocker.Mock(spec=SegmentStorage) rb_segment_storage = mocker.Mock(spec=RuleBasedSegmentsStorage) impression_storage = mocker.Mock(spec=ImpressionStorage) @@ -2821,6 +2949,8 @@ async def test_telemetry_track_exception_async(self, mocker): 'events': event_storage}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -2836,7 +2966,7 @@ async def exc(*_): recorder.record_track_stats = exc await factory.block_until_ready(1) - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(None)) try: await client.track('key', 'tt', 'ev') except: @@ -2847,11 +2977,17 @@ async def exc(*_): @pytest.mark.asyncio async def test_impressions_properties_async(self, mocker): """Test get_treatment_async execution paths.""" + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + telemetry_storage = await InMemoryTelemetryStorageAsync.create() telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() - rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync() + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) @@ -2876,6 +3012,8 @@ async def synchronize_config(*_): 'events': event_storage}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -2883,7 +3021,7 @@ async def synchronize_config(*_): ) await factory.block_until_ready(1) - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(None)) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { 'treatment': 'on', @@ -2956,6 +3094,12 @@ async def synchronize_config(*_): @pytest.mark.asyncio async def test_fallback_treatment_eval_exception(self, mocker): # using fallback when the evaluator has RuntimeError exception + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + split_storage = mocker.Mock(spec=SplitStorage) segment_storage = mocker.Mock(spec=SegmentStorage) rb_segment_storage = mocker.Mock(spec=RuleBasedSegmentsStorage) @@ -2985,6 +3129,8 @@ async def synchronize_config(*_): 'events': event_storage}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, impmanager, telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -3001,7 +3147,7 @@ async def put(impressions): self.imps = impressions impression_storage.put = put - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(FallbackTreatment("on-global", '{"prop": "val"}')))) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(FallbackTreatment("on-global", '{"prop": "val"}')))) def eval_with_context(*_): raise RuntimeError() @@ -3112,6 +3258,12 @@ async def fetch_many_rbs(*_): @pytest.mark.asyncio async def test_fallback_treatment_exception(self, mocker): # using fallback when the evaluator has RuntimeError exception + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + split_storage = mocker.Mock(spec=SplitStorage) segment_storage = mocker.Mock(spec=SegmentStorage) rb_segment_storage = mocker.Mock(spec=RuleBasedSegmentsStorage) @@ -3136,6 +3288,8 @@ async def test_fallback_treatment_exception(self, mocker): 'events': event_storage}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, impmanager, telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -3156,7 +3310,7 @@ def synchronize_config(*_): pass factory._telemetry_submitter = TelemetrySubmitterMock() - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(FallbackTreatment("on-global")))) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(FallbackTreatment("on-global")))) def eval_with_context(*_): raise Exception() @@ -3204,6 +3358,12 @@ async def context_for(*_): @pytest.mark.asyncio async def test_fallback_treatment_not_ready_impressions(self, mocker): # using fallback when the evaluator has RuntimeError exception + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + split_storage = mocker.Mock(spec=SplitStorage) segment_storage = mocker.Mock(spec=SegmentStorage) rb_segment_storage = mocker.Mock(spec=RuleBasedSegmentsStorage) @@ -3228,6 +3388,8 @@ async def manager_start_task(): 'events': event_storage}, mocker.Mock(), recorder, + internal_events_queue, + events_manager, impmanager, telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -3245,7 +3407,7 @@ def synchronize_config(*_): pass factory._telemetry_submitter = TelemetrySubmitterMock() - client = ClientAsync(factory, recorder, True, FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(FallbackTreatment("on-global")))) + client = ClientAsync(factory, recorder, events_manager, True, FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(FallbackTreatment("on-global")))) ready_property = mocker.PropertyMock() ready_property.return_value = False type(factory).ready = ready_property @@ -3286,4 +3448,29 @@ async def context_for(*_): try: await factory.destroy() except: - pass \ No newline at end of file + pass + + @pytest.mark.asyncio + async def test_events_subscription(self, mocker): + events_manager = mocker.Mock(spec=EventsManagerAsync) + self.event = None + self.handle = None + async def register(sdk_event, callback_handle): + self.event = sdk_event + self.handle = callback_handle + events_manager.register = register + + client = ClientAsync(mocker.Mock(), mocker.Mock(), events_manager, True, FallbackTreatmentCalculator(None)) + await client.on(SdkEvent.SDK_READY, self.event_callback) + assert self.event == SdkEvent.SDK_READY + assert self.handle == self.event_callback + + self.event = None + await client.on("dd", self.event_callback) + assert self.event == None + + await client.on(SdkEvent.SDK_READY, "qwe") + assert self.event == None + + async def event_callback(self, metadata): + pass \ No newline at end of file diff --git a/tests/client/test_factory.py b/tests/client/test_factory.py index 14a6ec27..45e64c72 100644 --- a/tests/client/test_factory.py +++ b/tests/client/test_factory.py @@ -20,6 +20,7 @@ from splitio.engine.evaluator import Evaluator, EvaluationContext from splitio.engine.impressions.strategies import StrategyDebugMode, StrategyNoneMode, StrategyOptimizedMode from splitio.events.events_task import EventsTask +from splitio.events.events_manager import EventsManagerAsync from splitio.models.splits import from_raw from splitio.models.fallback_config import FallbackTreatmentsConfiguration, FallbackTreatmentCalculator from splitio.models.fallback_treatment import FallbackTreatment @@ -1078,8 +1079,14 @@ async def test_pluggable_client_creation_async(self, mocker): @pytest.mark.asyncio async def test_destroy_redis_async(self, mocker): + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + async def _make_factory_with_apikey(apikey, *_, **__): - return SplitFactoryAsync(apikey, {}, True, mocker.Mock(spec=ImpressionsManager), None, mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock()) + return SplitFactoryAsync(apikey, {}, True, mocker.Mock(), internal_events_queue, events_manager, mocker.Mock(spec=ManagerAsync), mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock()) factory_module_logger = mocker.Mock() build_redis = mocker.Mock() diff --git a/tests/client/test_input_validator.py b/tests/client/test_input_validator.py index 2df8964b..e1634f54 100644 --- a/tests/client/test_input_validator.py +++ b/tests/client/test_input_validator.py @@ -1,10 +1,12 @@ """Unit tests for the input_validator module.""" import pytest import logging +import asyncio from splitio.client.factory import SplitFactory, get_factory, SplitFactoryAsync, get_factory_async from splitio.client.client import CONTROL, Client, _LOGGER as _logger, ClientAsync from splitio.client.key import Key +from splitio.events.events_manager import EventsManagerAsync from splitio.storage import SplitStorage, EventStorage, ImpressionStorage, SegmentStorage, RuleBasedSegmentsStorage from splitio.storage.inmemmory import InMemoryTelemetryStorage, InMemoryTelemetryStorageAsync, \ InMemorySplitStorage, InMemorySplitStorageAsync, InMemoryRuleBasedSegmentStorage, InMemoryRuleBasedSegmentStorageAsync @@ -1682,6 +1684,12 @@ class ClientInputValidationAsyncTests(object): @pytest.mark.asyncio async def test_get_treatment(self, mocker): """Test get_treatment validation.""" + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + split_mock = mocker.Mock(spec=Split) default_treatment_mock = mocker.PropertyMock() default_treatment_mock.return_value = 'default_treatment' @@ -1720,6 +1728,8 @@ async def get_change_number(*_): }, mocker.Mock(), recorder, + internal_events_queue, + events_manager, impmanager, telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -1730,7 +1740,7 @@ async def get_change_number(*_): ready_mock.return_value = True type(factory).ready = ready_mock - client = ClientAsync(factory, mocker.Mock(), mocker.Mock(), FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, mocker.Mock(), events_manager, mocker.Mock(), FallbackTreatmentCalculator(None)) async def record_treatment_stats(*_): pass @@ -1941,6 +1951,12 @@ async def fetch_many(*_): @pytest.mark.asyncio async def test_get_treatment_with_config(self, mocker): """Test get_treatment validation.""" + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + split_mock = mocker.Mock(spec=Split) default_treatment_mock = mocker.PropertyMock() default_treatment_mock.return_value = 'default_treatment' @@ -1983,6 +1999,8 @@ async def get_change_number(*_): }, mocker.Mock(), recorder, + internal_events_queue, + events_manager, impmanager, telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -1993,7 +2011,7 @@ async def get_change_number(*_): ready_mock.return_value = True type(factory).ready = ready_mock - client = ClientAsync(factory, mocker.Mock(), mocker.Mock(), FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, mocker.Mock(), events_manager, mocker.Mock(), FallbackTreatmentCalculator(None)) async def record_treatment_stats(*_): pass client._recorder.record_treatment_stats = record_treatment_stats @@ -2203,6 +2221,12 @@ async def fetch_many(*_): @pytest.mark.asyncio async def test_track(self, mocker): """Test track method().""" + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + events_storage_mock = mocker.Mock(spec=EventStorage) async def put(*_): return True @@ -2228,6 +2252,8 @@ async def put(*_): }, mocker.Mock(), recorder, + internal_events_queue, + events_manager, impmanager, telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -2236,7 +2262,7 @@ async def put(*_): ) factory._sdk_key = 'some-test' - client = ClientAsync(factory, recorder, mocker.Mock(), FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, mocker.Mock(), FallbackTreatmentCalculator(None)) client._event_storage = event_storage _logger = mocker.Mock() mocker.patch('splitio.client.input_validator._LOGGER', new=_logger) @@ -2478,6 +2504,12 @@ async def is_valid_traffic_type(*_): @pytest.mark.asyncio async def test_get_treatments(self, mocker): """Test getTreatments() method.""" + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + split_mock = mocker.Mock(spec=Split) default_treatment_mock = mocker.PropertyMock() default_treatment_mock.return_value = 'default_treatment' @@ -2519,6 +2551,8 @@ async def fetch_many_rbs(*_): }, mocker.Mock(), recorder, + internal_events_queue, + events_manager, impmanager, telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -2529,7 +2563,7 @@ async def fetch_many_rbs(*_): ready_mock.return_value = True type(factory).ready = ready_mock - client = ClientAsync(factory, recorder, mocker.Mock(), FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, mocker.Mock(), FallbackTreatmentCalculator(None)) async def record_treatment_stats(*_): pass client._recorder.record_treatment_stats = record_treatment_stats @@ -2643,6 +2677,12 @@ async def fetch_many(*_): @pytest.mark.asyncio async def test_get_treatments_with_config(self, mocker): """Test getTreatments() method.""" + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + split_mock = mocker.Mock(spec=Split) default_treatment_mock = mocker.PropertyMock() default_treatment_mock.return_value = 'default_treatment' @@ -2684,6 +2724,8 @@ async def fetch_many_rbs(*_): }, mocker.Mock(), recorder, + internal_events_queue, + events_manager, impmanager, telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -2696,7 +2738,7 @@ def _configs(treatment): return '{"some": "property"}' if treatment == 'default_treatment' else None split_mock.get_configurations_for.side_effect = _configs - client = ClientAsync(factory, mocker.Mock(), mocker.Mock(), FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, mocker.Mock(), events_manager, mocker.Mock(), FallbackTreatmentCalculator(None)) async def record_treatment_stats(*_): pass client._recorder.record_treatment_stats = record_treatment_stats @@ -2808,6 +2850,12 @@ async def fetch_many(*_): @pytest.mark.asyncio async def test_get_treatments_by_flag_set(self, mocker): + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + split_mock = mocker.Mock(spec=Split) default_treatment_mock = mocker.PropertyMock() default_treatment_mock.return_value = 'default_treatment' @@ -2852,6 +2900,8 @@ async def fetch_many_rbs(*_): }, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -2862,7 +2912,7 @@ async def fetch_many_rbs(*_): ready_mock.return_value = True type(factory).ready = ready_mock - client = ClientAsync(factory, recorder, mocker.Mock(), FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, mocker.Mock(), FallbackTreatmentCalculator(None)) async def record_treatment_stats(*_): pass client._recorder.record_treatment_stats = record_treatment_stats @@ -2954,6 +3004,12 @@ async def get_feature_flags_by_sets(*_): @pytest.mark.asyncio async def test_get_treatments_by_flag_sets(self, mocker): + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + split_mock = mocker.Mock(spec=Split) default_treatment_mock = mocker.PropertyMock() default_treatment_mock.return_value = 'default_treatment' @@ -2999,6 +3055,8 @@ async def get_feature_flags_by_sets(*_): }, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -3009,7 +3067,7 @@ async def get_feature_flags_by_sets(*_): ready_mock.return_value = True type(factory).ready = ready_mock - client = ClientAsync(factory, recorder, mocker.Mock(), FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, mocker.Mock(), FallbackTreatmentCalculator(None)) async def record_treatment_stats(*_): pass client._recorder.record_treatment_stats = record_treatment_stats @@ -3107,6 +3165,12 @@ async def get_feature_flags_by_sets(*_): @pytest.mark.asyncio async def test_get_treatments_with_config_by_flag_set(self, mocker): + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + split_mock = mocker.Mock(spec=Split) def _configs(treatment): return '{"some": "property"}' if treatment == 'default_treatment' else None @@ -3155,6 +3219,8 @@ async def get_feature_flags_by_sets(*_): }, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -3165,7 +3231,7 @@ async def get_feature_flags_by_sets(*_): ready_mock.return_value = True type(factory).ready = ready_mock - client = ClientAsync(factory, recorder, mocker.Mock(), FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, mocker.Mock(), FallbackTreatmentCalculator(None)) async def record_treatment_stats(*_): pass client._recorder.record_treatment_stats = record_treatment_stats @@ -3257,6 +3323,12 @@ async def get_feature_flags_by_sets(*_): @pytest.mark.asyncio async def test_get_treatments_with_config_by_flag_sets(self, mocker): + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + split_mock = mocker.Mock(spec=Split) def _configs(treatment): return '{"some": "property"}' if treatment == 'default_treatment' else None @@ -3305,6 +3377,8 @@ async def get_feature_flags_by_sets(*_): }, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), mocker.Mock(), telemetry_producer, @@ -3316,7 +3390,7 @@ async def get_feature_flags_by_sets(*_): ready_mock.return_value = True type(factory).ready = ready_mock - client = ClientAsync(factory, recorder, mocker.Mock(), FallbackTreatmentCalculator(None)) + client = ClientAsync(factory, recorder, events_manager, mocker.Mock(), FallbackTreatmentCalculator(None)) async def record_treatment_stats(*_): pass client._recorder.record_treatment_stats = record_treatment_stats @@ -3522,6 +3596,12 @@ class ManagerInputValidationAsyncTests(object): #pylint: disable=too-few-public @pytest.mark.asyncio async def test_split_(self, mocker): """Test split input validation.""" + internal_events_queue = asyncio.Queue() + events_manager = mocker.Mock(EventsManagerAsync) + async def notify_internal_event(sdk_internal_event, event_metadata): + pass + events_manager.notify_internal_event = notify_internal_event + storage_mock = mocker.Mock(spec=SplitStorage) split_mock = mocker.Mock(spec=Split) async def get(*_): @@ -3543,6 +3623,8 @@ async def get(*_): }, mocker.Mock(), recorder, + internal_events_queue, + events_manager, mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), diff --git a/tests/client/test_manager.py b/tests/client/test_manager.py index 5cb0d2e1..c5454f67 100644 --- a/tests/client/test_manager.py +++ b/tests/client/test_manager.py @@ -1,6 +1,7 @@ """SDK main manager test module.""" import pytest import queue +import asyncio from splitio.client.factory import SplitFactory from splitio.client.manager import SplitManager, SplitManagerAsync, _LOGGER as _logger @@ -90,9 +91,10 @@ class SplitManagerAsyncTests(object): # pylint: disable=too-few-public-methods @pytest.mark.asyncio async def test_manager_calls(self, mocker): + internal_events_queue = asyncio.Queue() telemetry_storage = InMemoryTelemetryStorageAsync() telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) - storage = InMemorySplitStorageAsync() + storage = InMemorySplitStorageAsync(internal_events_queue) factory = mocker.Mock(spec=SplitFactory) factory._storages = {'split': storage} diff --git a/tests/engine/test_evaluator.py b/tests/engine/test_evaluator.py index dc83cc36..edf510c0 100644 --- a/tests/engine/test_evaluator.py +++ b/tests/engine/test_evaluator.py @@ -5,6 +5,7 @@ import pytest import copy import queue +import asyncio from splitio.models.splits import Split, Status, from_raw, Prerequisites from splitio.models import segments @@ -425,9 +426,11 @@ def test_evaluate_treatment_with_fallback(self, mocker): @pytest.mark.asyncio async def test_evaluate_treatment_with_rbs_in_condition_async(self): e = evaluator.Evaluator(splitters.Splitter()) - splits_storage = InMemorySplitStorageAsync() - rbs_storage = InMemoryRuleBasedSegmentStorageAsync() - segment_storage = InMemorySegmentStorageAsync() + internal_events_queue = asyncio.Queue() + + splits_storage = InMemorySplitStorageAsync(internal_events_queue) + rbs_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) evaluation_facctory = AsyncEvaluationDataFactory(splits_storage, segment_storage, rbs_storage) rbs_segments = os.path.join(os.path.dirname(__file__), 'files', 'rule_base_segments.json') @@ -451,9 +454,10 @@ async def test_using_segment_in_excluded_async(self): with open(rbs_segments, 'r') as flo: data = json.loads(flo.read()) e = evaluator.Evaluator(splitters.Splitter()) - splits_storage = InMemorySplitStorageAsync() - rbs_storage = InMemoryRuleBasedSegmentStorageAsync() - segment_storage = InMemorySegmentStorageAsync() + internal_events_queue = asyncio.Queue() + splits_storage = InMemorySplitStorageAsync(internal_events_queue) + rbs_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) evaluation_facctory = AsyncEvaluationDataFactory(splits_storage, segment_storage, rbs_storage) mocked_split = Split('some', 12345, False, 'off', 'user', Status.ACTIVE, 12, split_conditions, 1.2, 100, 1234, {}, None, False) @@ -476,9 +480,10 @@ async def test_using_rbs_in_excluded_async(self): with open(rbs_segments, 'r') as flo: data = json.loads(flo.read()) e = evaluator.Evaluator(splitters.Splitter()) - splits_storage = InMemorySplitStorageAsync() - rbs_storage = InMemoryRuleBasedSegmentStorageAsync() - segment_storage = InMemorySegmentStorageAsync() + internal_events_queue = asyncio.Queue() + splits_storage = InMemorySplitStorageAsync(internal_events_queue) + rbs_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) evaluation_facctory = AsyncEvaluationDataFactory(splits_storage, segment_storage, rbs_storage) mocked_split = Split('some', 12345, False, 'off', 'user', Status.ACTIVE, 12, split_conditions, 1.2, 100, 1234, {}, None, False) @@ -500,9 +505,10 @@ async def test_prerequisites(self): with open(splits_load, 'r') as flo: data = json.loads(flo.read()) e = evaluator.Evaluator(splitters.Splitter()) - splits_storage = InMemorySplitStorageAsync() - rbs_storage = InMemoryRuleBasedSegmentStorageAsync() - segment_storage = InMemorySegmentStorageAsync() + internal_events_queue = asyncio.Queue() + splits_storage = InMemorySplitStorageAsync(internal_events_queue) + rbs_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) evaluation_facctory = AsyncEvaluationDataFactory(splits_storage, segment_storage, rbs_storage) rbs = rule_based_segments.from_raw(data["rbs"]["d"][0]) @@ -590,9 +596,10 @@ async def test_get_context(self): """Test context.""" mocked_split = Split('some', 12345, False, 'off', 'user', Status.ACTIVE, 12, split_conditions, 1.2, 100, 1234, {}, None, False, [Prerequisites('split2', ['on'])]) split2 = Split('split2', 12345, False, 'off', 'user', Status.ACTIVE, 12, split_conditions, 1.2, 100, 1234, {}, None, False, []) - flag_storage = InMemorySplitStorageAsync([]) - segment_storage = InMemorySegmentStorageAsync() - rbs_segment_storage = InMemoryRuleBasedSegmentStorageAsync() + internal_events_queue = asyncio.Queue() + flag_storage = InMemorySplitStorageAsync(internal_events_queue, []) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) + rbs_segment_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) await flag_storage.update([mocked_split, split2], [], -1) rbs = copy.deepcopy(rbs_raw) rbs['conditions'].append( diff --git a/tests/events/test_events_delivery.py b/tests/events/test_events_delivery.py index fc2d5464..27076de4 100644 --- a/tests/events/test_events_delivery.py +++ b/tests/events/test_events_delivery.py @@ -1,4 +1,6 @@ """EventsManager test module.""" +import pytest + from splitio.models.events import SdkEvent, SdkInternalEvent from splitio.events.events_metadata import EventsMetadata from splitio.events.events_delivery import EventsDelivery @@ -17,11 +19,26 @@ def test_firing_events(self): events_delivery.deliver(SdkEvent.SDK_READY, metadata, self._sdk_ready_callback) assert self.sdk_ready_flag self._verify_metadata(metadata) + + @pytest.mark.asyncio + async def test_firing_events(self): + events_delivery = EventsDelivery() + + metadata = EventsMetadata(SdkEventType.FLAG_UPDATE, { "feature1" }) + self.sdk_ready_flag = False + self.metadata = None + await events_delivery.deliver_async(SdkEvent.SDK_READY, metadata, self._sdk_ready_callback_async) + assert self.sdk_ready_flag + self._verify_metadata(metadata) def _sdk_ready_callback(self, metadata): self.sdk_ready_flag = True self.metadata = metadata + async def _sdk_ready_callback_async(self, metadata): + self.sdk_ready_flag = True + self.metadata = metadata + def _verify_metadata(self, metadata): assert metadata.get_type() == self.metadata.get_type() assert metadata.get_names() == self.metadata.get_names() \ No newline at end of file diff --git a/tests/events/test_events_manager.py b/tests/events/test_events_manager.py index 48c6fa45..35cf6161 100644 --- a/tests/events/test_events_manager.py +++ b/tests/events/test_events_manager.py @@ -1,10 +1,12 @@ """EventsManager test module.""" import pytest +import asyncio + from splitio.models.events import SdkEvent, SdkInternalEvent from splitio.events.events_metadata import EventsMetadata from splitio.events.events_manager_config import EventsManagerConfig from splitio.events.events_delivery import EventsDelivery -from splitio.events.events_manager import EventsManager +from splitio.events.events_manager import EventsManager, EventsManagerAsync from splitio.events.events_metadata import SdkEventType class EventsManagerTests(object): @@ -95,6 +97,105 @@ def _sdk_timeout_callback(self, metadata): self.sdk_timed_out_flag = True self.metadata = metadata + def _verify_metadata(self, metadata): + assert metadata.get_type() == self.metadata.get_type() + assert metadata.get_names() == self.metadata.get_names() + +class EventsManagerAsyncTests(object): + """Tests for EventsManagerAsync.""" + + sdk_ready_flag = False + sdk_timed_out_flag = False + sdk_update_flag = False + metadata = None + + @pytest.mark.asyncio + async def test_firing_events(self): + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) + await events_manager.register(SdkEvent.SDK_READY, self._sdk_ready_callback) + await events_manager.register(SdkEvent.SDK_UPDATE, self._sdk_update_callback) + + metadata = EventsMetadata(SdkEventType.FLAG_UPDATE, { "feature1" }) + await events_manager.notify_internal_event(SdkInternalEvent.FLAGS_UPDATED, metadata) + await events_manager.notify_internal_event(SdkInternalEvent.FLAG_KILLED_NOTIFICATION, metadata) + await events_manager.notify_internal_event(SdkInternalEvent.RB_SEGMENTS_UPDATED, metadata) + await events_manager.notify_internal_event(SdkInternalEvent.SEGMENTS_UPDATED, metadata) + assert not self.sdk_ready_flag + assert not self.sdk_timed_out_flag + assert not self.sdk_update_flag + + self._reset_flags() + await events_manager.notify_internal_event(SdkInternalEvent.SDK_TIMED_OUT, metadata) + assert not self.sdk_ready_flag + assert not self.sdk_timed_out_flag # not registered yet + assert not self.sdk_update_flag + + await events_manager.register(SdkEvent.SDK_READY_TIMED_OUT, self._sdk_timeout_callback) + await events_manager.notify_internal_event(SdkInternalEvent.SDK_TIMED_OUT, metadata) + await asyncio.sleep(.3) + assert not self.sdk_ready_flag + assert self.sdk_timed_out_flag + assert not self.sdk_update_flag + self._verify_metadata(metadata) + + self._reset_flags() + await events_manager.notify_internal_event(SdkInternalEvent.SDK_READY, metadata) + await asyncio.sleep(.3) + assert self.sdk_ready_flag + assert not self.sdk_timed_out_flag + assert not self.sdk_update_flag + self._verify_metadata(metadata) + + self._reset_flags() + await events_manager.notify_internal_event(SdkInternalEvent.RB_SEGMENTS_UPDATED, metadata) + await asyncio.sleep(.3) + assert not self.sdk_ready_flag + assert not self.sdk_timed_out_flag + assert self.sdk_update_flag + self._verify_metadata(metadata) + + self._reset_flags() + await events_manager.notify_internal_event(SdkInternalEvent.FLAG_KILLED_NOTIFICATION, metadata) + await asyncio.sleep(.3) + assert not self.sdk_ready_flag + assert not self.sdk_timed_out_flag + assert self.sdk_update_flag + self._verify_metadata(metadata) + + self._reset_flags() + await events_manager.notify_internal_event(SdkInternalEvent.FLAGS_UPDATED, metadata) + await asyncio.sleep(.3) + assert not self.sdk_ready_flag + assert not self.sdk_timed_out_flag + assert self.sdk_update_flag + self._verify_metadata(metadata) + + self._reset_flags() + await events_manager.notify_internal_event(SdkInternalEvent.SEGMENTS_UPDATED, metadata) + await asyncio.sleep(.3) + assert not self.sdk_ready_flag + assert not self.sdk_timed_out_flag + assert self.sdk_update_flag + self._verify_metadata(metadata) + + def _reset_flags(self): + self.sdk_ready_flag = False + self.sdk_timed_out_flag = False + self.sdk_update_flag = False + self.metadata = None + + async def _sdk_ready_callback(self, metadata): + self.sdk_ready_flag = True + self.metadata = metadata + + async def _sdk_update_callback(self, metadata): + self.sdk_update_flag = True + self.metadata = metadata + + async def _sdk_timeout_callback(self, metadata): + self.sdk_timed_out_flag = True + self.metadata = metadata + def _verify_metadata(self, metadata): assert metadata.get_type() == self.metadata.get_type() assert metadata.get_names() == self.metadata.get_names() \ No newline at end of file diff --git a/tests/events/test_events_task.py b/tests/events/test_events_task.py index 17d23bec..d667f76c 100644 --- a/tests/events/test_events_task.py +++ b/tests/events/test_events_task.py @@ -2,16 +2,17 @@ import pytest import queue import time +import asyncio from splitio.models.events import SdkInternalEvent from splitio.models.notification import SdkInternalEventNotification from splitio.events.events_metadata import EventsMetadata from splitio.events.events_metadata import SdkEventType -from splitio.events.events_task import EventsTask +from splitio.events.events_task import EventsTask, EventsTaskAsync class EventsTaskTests(object): - """Tests for EventsManager.""" + """Tests for EventsTask.""" internal_event = None metadata = None @@ -71,4 +72,68 @@ def _verify_metadata(self, metadata): assert metadata.get_type() == self.metadata.get_type() assert metadata.get_names() == self.metadata.get_names() + +class EventsTaskAsyncTests(object): + """Tests for EventsTaskAsyncr.""" + + internal_event = None + metadata = None + + @pytest.mark.asyncio + async def test_firing_events(self): + events_queue = asyncio.Queue() + events_task = EventsTaskAsync(self._event_callback, events_queue) + + events_task.start() + assert events_task.is_running() + + metadata = EventsMetadata(SdkEventType.FLAG_UPDATE, { "feature1" }) + await events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_READY, metadata)) + await asyncio.sleep(.5) + assert self.internal_event == SdkInternalEvent.SDK_READY + self._verify_metadata(metadata) + + self._reset_flags() + await events_queue.put(SdkInternalEventNotification(SdkInternalEvent.RB_SEGMENTS_UPDATED, metadata)) + await asyncio.sleep(.5) + assert self.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED + self._verify_metadata(metadata) + + await events_task.stop() + await asyncio.sleep(.5) + assert not events_task.is_running() + + @pytest.mark.asyncio + async def test_on_error(self): + events_queue = asyncio.Queue() + + async def handler_sync(internal_event, metadata): + raise Exception('some') + + events_task = EventsTaskAsync(handler_sync, events_queue) + events_task.start() + assert events_task.is_running() + + await events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_READY, None)) + + with pytest.raises(Exception): + events_task._handler() + + assert events_task.is_running() + await events_task.stop() + await asyncio.sleep(1) + assert not events_task.is_running() + + def _reset_flags(self): + self.internal_event = None + self.metadata = None + + async def _event_callback(self, internal_event, metadata): + self.internal_event = internal_event + self.metadata = metadata + + def _verify_metadata(self, metadata): + assert metadata.get_type() == self.metadata.get_type() + assert metadata.get_names() == self.metadata.get_names() + \ No newline at end of file diff --git a/tests/integration/test_client_e2e.py b/tests/integration/test_client_e2e.py index 0b2fe70f..c243951f 100644 --- a/tests/integration/test_client_e2e.py +++ b/tests/integration/test_client_e2e.py @@ -24,9 +24,9 @@ from splitio.engine.impressions.manager import Counter as ImpressionsCounter from splitio.engine.impressions.unique_keys_tracker import UniqueKeysTracker, UniqueKeysTrackerAsync from splitio.events.events_delivery import EventsDelivery -from splitio.events.events_manager import EventsManager +from splitio.events.events_manager import EventsManager, EventsManagerAsync from splitio.events.events_manager_config import EventsManagerConfig -from splitio.events.events_task import EventsTask +from splitio.events.events_task import EventsTask, EventsTaskAsync from splitio.models import splits, segments, rule_based_segments from splitio.models.events import SdkEvent from splitio.models.fallback_config import FallbackTreatmentsConfiguration, FallbackTreatmentCalculator @@ -2608,9 +2608,12 @@ def setup_method(self): async def _setup_method(self): """Prepare storages with test data.""" - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() - rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync() + internal_events_queue = asyncio.Queue() + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) + + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) split_fn = os.path.join(os.path.dirname(__file__), 'files', 'splitChanges.json') with open(split_fn, 'r') as flo: @@ -2651,6 +2654,8 @@ async def _setup_method(self): storages, True, recorder, + internal_events_queue, + events_manager, None, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), @@ -2780,9 +2785,12 @@ def setup_method(self): async def _setup_method(self): """Prepare storages with test data.""" - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() - rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync() + internal_events_queue = asyncio.Queue() + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) + + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) split_fn = os.path.join(os.path.dirname(__file__), 'files', 'splitChanges.json') with open(split_fn, 'r') as flo: data = json.loads(flo.read()) @@ -2823,6 +2831,8 @@ async def _setup_method(self): storages, True, recorder, + internal_events_queue, + events_manager, None, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), @@ -3125,6 +3135,9 @@ def setup_method(self): async def _setup_method(self): """Prepare storages with test data.""" + internal_events_queue = asyncio.Queue() + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) + metadata = SdkMetadata('python-1.2.3', 'some_ip', 'some_name') redis_client = await build_async(DEFAULT_CONFIG.copy()) await self._clear_cache(redis_client) @@ -3177,6 +3190,8 @@ async def _setup_method(self): storages, True, recorder, + internal_events_queue, + events_manager, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), telemetry_submitter=telemetry_submitter, @@ -3348,6 +3363,9 @@ def setup_method(self): async def _setup_method(self): """Prepare storages with test data.""" + internal_events_queue = asyncio.Queue() + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) + metadata = SdkMetadata('python-1.2.3', 'some_ip', 'some_name') redis_client = await build_async(DEFAULT_CONFIG.copy()) await self._clear_cache(redis_client) @@ -3400,6 +3418,8 @@ async def _setup_method(self): storages, True, recorder, + internal_events_queue, + events_manager, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), telemetry_submitter=telemetry_submitter, @@ -3621,6 +3641,9 @@ def setup_method(self): async def _setup_method(self): """Prepare storages with test data.""" + internal_events_queue = asyncio.Queue() + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) + metadata = SdkMetadata('python-1.2.3', 'some_ip', 'some_name') self.pluggable_storage_adapter = StorageMockAdapterAsync() split_storage = PluggableSplitStorageAsync(self.pluggable_storage_adapter, 'myprefix') @@ -3651,6 +3674,8 @@ async def _setup_method(self): storages, True, recorder, + internal_events_queue, + events_manager, RedisManagerAsync(PluggableSynchronizerAsync()), telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), @@ -3850,6 +3875,9 @@ def setup_method(self): async def _setup_method(self): """Prepare storages with test data.""" + internal_events_queue = asyncio.Queue() + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) + metadata = SdkMetadata('python-1.2.3', 'some_ip', 'some_name') self.pluggable_storage_adapter = StorageMockAdapterAsync() split_storage = PluggableSplitStorageAsync(self.pluggable_storage_adapter) @@ -3881,6 +3909,8 @@ async def _setup_method(self): storages, True, recorder, + internal_events_queue, + events_manager, RedisManagerAsync(PluggableSynchronizerAsync()), telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), @@ -4066,6 +4096,9 @@ def setup_method(self): async def _setup_method(self): """Prepare storages with test data.""" + internal_events_queue = asyncio.Queue() + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) + metadata = SdkMetadata('python-1.2.3', 'some_ip', 'some_name') self.pluggable_storage_adapter = StorageMockAdapterAsync() split_storage = PluggableSplitStorageAsync(self.pluggable_storage_adapter) @@ -4117,6 +4150,8 @@ async def _setup_method(self): storages, True, recorder, + internal_events_queue, + events_manager, manager, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), @@ -4303,8 +4338,11 @@ class InMemoryImpressionsToggleIntegrationAsyncTests(object): @pytest.mark.asyncio async def test_optimized(self): - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() + internal_events_queue = asyncio.Queue() + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) + + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) await split_storage.update([splits.from_raw(splits_json['splitChange1_1']['ff']['d'][0]), splits.from_raw(splits_json['splitChange1_1']['ff']['d'][1]), @@ -4319,7 +4357,7 @@ async def test_optimized(self): storages = { 'splits': split_storage, 'segments': segment_storage, - 'rule_based_segments': InMemoryRuleBasedSegmentStorageAsync(), + 'rule_based_segments': InMemoryRuleBasedSegmentStorageAsync(internal_events_queue), 'impressions': InMemoryImpressionStorageAsync(5000, telemetry_runtime_producer), 'events': InMemoryEventStorageAsync(5000, telemetry_runtime_producer), } @@ -4331,6 +4369,8 @@ async def test_optimized(self): storages, True, recorder, + internal_events_queue, + events_manager, None, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), @@ -4364,8 +4404,11 @@ async def test_optimized(self): @pytest.mark.asyncio async def test_debug(self): - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() + internal_events_queue = asyncio.Queue() + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) + + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) await split_storage.update([splits.from_raw(splits_json['splitChange1_1']['ff']['d'][0]), splits.from_raw(splits_json['splitChange1_1']['ff']['d'][1]), @@ -4380,7 +4423,7 @@ async def test_debug(self): storages = { 'splits': split_storage, 'segments': segment_storage, - 'rule_based_segments': InMemoryRuleBasedSegmentStorageAsync(), + 'rule_based_segments': InMemoryRuleBasedSegmentStorageAsync(internal_events_queue), 'impressions': InMemoryImpressionStorageAsync(5000, telemetry_runtime_producer), 'events': InMemoryEventStorageAsync(5000, telemetry_runtime_producer), } @@ -4392,6 +4435,8 @@ async def test_debug(self): storages, True, recorder, + internal_events_queue, + events_manager, None, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), @@ -4425,8 +4470,11 @@ async def test_debug(self): @pytest.mark.asyncio async def test_none(self): - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() + internal_events_queue = asyncio.Queue() + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) + + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) await split_storage.update([splits.from_raw(splits_json['splitChange1_1']['ff']['d'][0]), splits.from_raw(splits_json['splitChange1_1']['ff']['d'][1]), @@ -4441,7 +4489,7 @@ async def test_none(self): storages = { 'splits': split_storage, 'segments': segment_storage, - 'rule_based_segments': InMemoryRuleBasedSegmentStorageAsync(), + 'rule_based_segments': InMemoryRuleBasedSegmentStorageAsync(internal_events_queue), 'impressions': InMemoryImpressionStorageAsync(5000, telemetry_runtime_producer), 'events': InMemoryEventStorageAsync(5000, telemetry_runtime_producer), } @@ -4453,6 +4501,8 @@ async def test_none(self): storages, True, recorder, + internal_events_queue, + events_manager, None, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), @@ -4491,6 +4541,9 @@ class RedisImpressionsToggleIntegrationAsyncTests(object): @pytest.mark.asyncio async def test_optimized(self): + internal_events_queue = asyncio.Queue() + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) + """Prepare storages with test data.""" metadata = SdkMetadata('python-1.2.3', 'some_ip', 'some_name') redis_client = await build_async(DEFAULT_CONFIG.copy()) @@ -4522,6 +4575,8 @@ async def test_optimized(self): storages, True, recorder, + internal_events_queue, + events_manager, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), fallback_treatment_calculator=FallbackTreatmentCalculator(None) @@ -4562,6 +4617,9 @@ async def test_optimized(self): @pytest.mark.asyncio async def test_debug(self): """Prepare storages with test data.""" + internal_events_queue = asyncio.Queue() + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) + metadata = SdkMetadata('python-1.2.3', 'some_ip', 'some_name') redis_client = await build_async(DEFAULT_CONFIG.copy()) split_storage = RedisSplitStorageAsync(redis_client, True) @@ -4592,6 +4650,8 @@ async def test_debug(self): storages, True, recorder, + internal_events_queue, + events_manager, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), fallback_treatment_calculator=FallbackTreatmentCalculator(None) @@ -4632,6 +4692,9 @@ async def test_debug(self): @pytest.mark.asyncio async def test_none(self): """Prepare storages with test data.""" + internal_events_queue = asyncio.Queue() + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) + metadata = SdkMetadata('python-1.2.3', 'some_ip', 'some_name') redis_client = await build_async(DEFAULT_CONFIG.copy()) split_storage = RedisSplitStorageAsync(redis_client, True) @@ -4662,6 +4725,8 @@ async def test_none(self): storages, True, recorder, + internal_events_queue, + events_manager, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), fallback_treatment_calculator=FallbackTreatmentCalculator(None) diff --git a/tests/push/test_split_worker.py b/tests/push/test_split_worker.py index 198372a7..28b5408d 100644 --- a/tests/push/test_split_worker.py +++ b/tests/push/test_split_worker.py @@ -523,8 +523,9 @@ async def update(feature_flag_add, feature_flag_delete, change_number): @pytest.mark.asyncio async def test_fetch_segment(self, mocker): q = asyncio.Queue() - split_storage = InMemorySplitStorageAsync() - segment_storage = InMemorySegmentStorageAsync() + internal_events_queue = asyncio.Queue() + split_storage = InMemorySplitStorageAsync(internal_events_queue) + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) self.segment_name = None async def segment_handler_sync(segment_name, change_number): diff --git a/tests/storage/test_inmemory_storage.py b/tests/storage/test_inmemory_storage.py index a37a1a4d..354da30e 100644 --- a/tests/storage/test_inmemory_storage.py +++ b/tests/storage/test_inmemory_storage.py @@ -4,6 +4,7 @@ import pytest import copy import queue +import asyncio from splitio.models.splits import Split from splitio.models.segments import Segment @@ -413,7 +414,7 @@ class InMemorySplitStorageAsyncTests(object): @pytest.mark.asyncio async def test_storing_retrieving_splits(self, mocker): """Test storing and retrieving splits works.""" - storage = InMemorySplitStorageAsync() + storage = InMemorySplitStorageAsync(asyncio.Queue()) split = mocker.Mock(spec=Split) name_property = mocker.PropertyMock() @@ -448,7 +449,7 @@ async def test_get_splits(self, mocker): type(split1).sets = sets_property type(split2).sets = sets_property - storage = InMemorySplitStorageAsync() + storage = InMemorySplitStorageAsync(asyncio.Queue()) await storage.update([split1, split2], [], -1) splits = await storage.fetch_many(['split1', 'split2', 'split3']) @@ -460,7 +461,7 @@ async def test_get_splits(self, mocker): @pytest.mark.asyncio async def test_store_get_changenumber(self): """Test that storing and retrieving change numbers works.""" - storage = InMemorySplitStorageAsync() + storage = InMemorySplitStorageAsync(asyncio.Queue()) assert await storage.get_change_number() == -1 await storage.update([], [], 5) assert await storage.get_change_number() == 5 @@ -481,7 +482,7 @@ async def test_get_split_names(self, mocker): type(split1).sets = sets_property type(split2).sets = sets_property - storage = InMemorySplitStorageAsync() + storage = InMemorySplitStorageAsync(asyncio.Queue()) await storage.update([split1, split2], [], -1) assert set(await storage.get_split_names()) == set(['split1', 'split2']) @@ -502,7 +503,7 @@ async def test_get_all_splits(self, mocker): type(split1).sets = sets_property type(split2).sets = sets_property - storage = InMemorySplitStorageAsync() + storage = InMemorySplitStorageAsync(asyncio.Queue()) await storage.update([split1, split2], [], -1) all_splits = await storage.get_all_splits() @@ -537,7 +538,7 @@ async def test_is_valid_traffic_type(self, mocker): type(split2).sets = sets_property type(split3).sets = sets_property - storage = InMemorySplitStorageAsync() + storage = InMemorySplitStorageAsync(asyncio.Queue()) await storage.update([split1], [], -1) assert await storage.is_valid_traffic_type('user') is True @@ -566,7 +567,7 @@ async def test_is_valid_traffic_type(self, mocker): @pytest.mark.asyncio async def test_traffic_type_inc_dec_logic(self, mocker): """Test that adding/removing split, handles traffic types correctly.""" - storage = InMemorySplitStorageAsync() + storage = InMemorySplitStorageAsync(asyncio.Queue()) split1 = mocker.Mock() name1_prop = mocker.PropertyMock() @@ -599,7 +600,7 @@ async def test_traffic_type_inc_dec_logic(self, mocker): @pytest.mark.asyncio async def test_kill_locally(self): """Test kill local.""" - storage = InMemorySplitStorageAsync() + storage = InMemorySplitStorageAsync(asyncio.Queue()) split = Split('some_split', 123456789, False, 'some', 'traffic_type', 'ACTIVE', 1) @@ -620,7 +621,7 @@ async def test_kill_locally(self): @pytest.mark.asyncio async def test_flag_sets_with_config_sets(self): - storage = InMemorySplitStorageAsync(['set10', 'set02', 'set05']) + storage = InMemorySplitStorageAsync(asyncio.Queue(), ['set10', 'set02', 'set05']) assert storage.flag_set_filter.flag_sets == {'set10', 'set02', 'set05'} assert storage.flag_set_filter.should_filter @@ -666,7 +667,7 @@ async def test_flag_sets_with_config_sets(self): @pytest.mark.asyncio async def test_flag_sets_withut_config_sets(self): - storage = InMemorySplitStorageAsync() + storage = InMemorySplitStorageAsync(asyncio.Queue()) assert storage.flag_set_filter.flag_sets == set({}) assert not storage.flag_set_filter.should_filter @@ -796,7 +797,7 @@ class InMemorySegmentStorageAsyncTests(object): @pytest.mark.asyncio async def test_segment_storage_retrieval(self, mocker): """Test storing and retrieving segments.""" - storage = InMemorySegmentStorageAsync() + storage = InMemorySegmentStorageAsync(asyncio.Queue()) segment = mocker.Mock(spec=Segment) name_property = mocker.PropertyMock() name_property.return_value = 'some_segment' @@ -809,14 +810,14 @@ async def test_segment_storage_retrieval(self, mocker): @pytest.mark.asyncio async def test_change_number(self, mocker): """Test storing and retrieving segment changeNumber.""" - storage = InMemorySegmentStorageAsync() + storage = InMemorySegmentStorageAsync(asyncio.Queue()) await storage.set_change_number('some_segment', 123) # Change number is not updated if segment doesn't exist assert await storage.get_change_number('some_segment') is None assert await storage.get_change_number('nonexistant-segment') is None # Change number is updated if segment does exist. - storage = InMemorySegmentStorageAsync() + storage = InMemorySegmentStorageAsync(asyncio.Queue()) segment = mocker.Mock(spec=Segment) name_property = mocker.PropertyMock() name_property.return_value = 'some_segment' @@ -828,7 +829,7 @@ async def test_change_number(self, mocker): @pytest.mark.asyncio async def test_segment_contains(self, mocker): """Test using storage to determine whether a key belongs to a segment.""" - storage = InMemorySegmentStorageAsync() + storage = InMemorySegmentStorageAsync(asyncio.Queue()) segment = mocker.Mock(spec=Segment) name_property = mocker.PropertyMock() name_property.return_value = 'some_segment' @@ -841,7 +842,7 @@ async def test_segment_contains(self, mocker): @pytest.mark.asyncio async def test_segment_update(self): """Test updating a segment.""" - storage = InMemorySegmentStorageAsync() + storage = InMemorySegmentStorageAsync(asyncio.Queue()) segment = Segment('some_segment', ['key1', 'key2', 'key3'], 123) await storage.put(segment) assert await storage.get('some_segment') == segment @@ -1973,7 +1974,7 @@ class InMemoryRuleBasedSegmentStorageAsyncTests(object): @pytest.mark.asyncio async def test_storing_retrieving_segments(self, mocker): """Test storing and retrieving splits works.""" - rbs_storage = InMemoryRuleBasedSegmentStorageAsync() + rbs_storage = InMemoryRuleBasedSegmentStorageAsync(asyncio.Queue()) segment1 = mocker.Mock(spec=RuleBasedSegment) name_property = mocker.PropertyMock() @@ -1996,7 +1997,7 @@ async def test_storing_retrieving_segments(self, mocker): @pytest.mark.asyncio async def test_store_get_changenumber(self): """Test that storing and retrieving change numbers works.""" - storage = InMemoryRuleBasedSegmentStorageAsync() + storage = InMemoryRuleBasedSegmentStorageAsync(asyncio.Queue()) assert await storage.get_change_number() == -1 await storage.update([], [], 5) assert await storage.get_change_number() == 5 @@ -2021,7 +2022,7 @@ async def test_contains(self): raw3 = copy.deepcopy(raw) raw3["name"] = "segment3" segment3 = rule_based_segments.from_raw(raw3) - storage = InMemoryRuleBasedSegmentStorageAsync() + storage = InMemoryRuleBasedSegmentStorageAsync(asyncio.Queue()) await storage.update([segment1, segment2, segment3], [], -1) assert await storage.contains(["segment1"]) assert await storage.contains(["segment1", "segment3"]) diff --git a/tests/sync/test_segments_synchronizer.py b/tests/sync/test_segments_synchronizer.py index a3657e98..5b405ef8 100644 --- a/tests/sync/test_segments_synchronizer.py +++ b/tests/sync/test_segments_synchronizer.py @@ -686,7 +686,7 @@ async def get_segment_names(): return ['segmentA', 'segmentB', 'segmentC'] split_storage.get_segment_names = get_segment_names - storage = InMemorySegmentStorageAsync() + storage = InMemorySegmentStorageAsync(asyncio.Queue()) segment_a = {'name': 'segmentA', 'added': ['key1', 'key2', 'key3'], 'removed': [], 'since': -1, 'till': 123} @@ -767,7 +767,7 @@ async def test_reading_json(self, mocker): async with aiofiles.open("./segmentA.json", "w") as f: await f.write('{"name": "segmentA", "added": ["key1", "key2", "key3"], "removed": [],"since": -1, "till": 123}') split_storage = mocker.Mock(spec=InMemorySplitStorageAsync) - storage = InMemorySegmentStorageAsync() + storage = InMemorySegmentStorageAsync(asyncio.Queue()) segments_synchronizer = LocalSegmentSynchronizerAsync('.', split_storage, storage) assert await segments_synchronizer.synchronize_segments(['segmentA']) diff --git a/tests/sync/test_splits_synchronizer.py b/tests/sync/test_splits_synchronizer.py index ca3daa82..b27606a4 100644 --- a/tests/sync/test_splits_synchronizer.py +++ b/tests/sync/test_splits_synchronizer.py @@ -792,8 +792,9 @@ async def clear(): @pytest.mark.asyncio async def test_sync_flag_sets_with_config_sets(self, mocker): """Test split sync with flag sets.""" - storage = InMemorySplitStorageAsync(['set1', 'set2']) - rbs_storage = InMemoryRuleBasedSegmentStorageAsync() + internal_events_queue = asyncio.Queue() + storage = InMemorySplitStorageAsync(internal_events_queue, ['set1', 'set2']) + rbs_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) split = self.splits[0].copy() split['name'] = 'second' @@ -840,8 +841,9 @@ async def get_changes(*args, **kwargs): @pytest.mark.asyncio async def test_sync_flag_sets_without_config_sets(self, mocker): """Test split sync with flag sets.""" - storage = InMemorySplitStorageAsync() - rbs_storage = InMemoryRuleBasedSegmentStorageAsync() + internal_events_queue = asyncio.Queue() + storage = InMemorySplitStorageAsync(internal_events_queue) + rbs_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) split = self.splits[0].copy() split['name'] = 'second' splits1 = [self.splits[0].copy(), split] @@ -1261,8 +1263,9 @@ async def test_synchronize_splits_error(self, mocker): @pytest.mark.asyncio async def test_synchronize_splits(self, mocker): """Test split sync.""" - storage = InMemorySplitStorageAsync() - rbs_storage = InMemoryRuleBasedSegmentStorageAsync() + internal_events_queue = asyncio.Queue() + storage = InMemorySplitStorageAsync(internal_events_queue) + rbs_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) async def read_splits_from_json_file(*args, **kwargs): return self.payload @@ -1306,8 +1309,9 @@ async def read_splits_from_json_file(*args, **kwargs): @pytest.mark.asyncio async def test_sync_flag_sets_with_config_sets(self, mocker): """Test split sync with flag sets.""" - storage = InMemorySplitStorageAsync(['set1', 'set2']) - rbs_storage = InMemoryRuleBasedSegmentStorageAsync() + internal_events_queue = asyncio.Queue() + storage = InMemorySplitStorageAsync(internal_events_queue, ['set1', 'set2']) + rbs_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) split = self.payload["ff"]["d"][0].copy() split['name'] = 'second' @@ -1349,8 +1353,9 @@ async def read_feature_flags_from_json_file(*args, **kwargs): @pytest.mark.asyncio async def test_sync_flag_sets_without_config_sets(self, mocker): """Test split sync with flag sets.""" - storage = InMemorySplitStorageAsync() - rbs_storage = InMemoryRuleBasedSegmentStorageAsync() + internal_events_queue = asyncio.Queue() + storage = InMemorySplitStorageAsync(internal_events_queue) + rbs_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) split = self.payload["ff"]["d"][0].copy() split['name'] = 'second' @@ -1393,8 +1398,9 @@ async def test_reading_json(self, mocker): """Test reading json file.""" async with aiofiles.open("./splits.json", "w") as f: await f.write(json.dumps(self.payload)) - storage = InMemorySplitStorageAsync() - rbs_storage = InMemoryRuleBasedSegmentStorageAsync() + internal_events_queue = asyncio.Queue() + storage = InMemorySplitStorageAsync(internal_events_queue) + rbs_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) split_synchronizer = LocalSplitSynchronizerAsync("./splits.json", storage, rbs_storage, LocalhostMode.JSON) await split_synchronizer.synchronize_splits() diff --git a/tests/sync/test_synchronizer.py b/tests/sync/test_synchronizer.py index 258077d4..179d7978 100644 --- a/tests/sync/test_synchronizer.py +++ b/tests/sync/test_synchronizer.py @@ -2,6 +2,7 @@ import unittest.mock as mock import pytest import queue +import asyncio from splitio.sync.synchronizer import Synchronizer, SynchronizerAsync, SplitTasks, SplitSynchronizers, LocalhostSynchronizer, LocalhostSynchronizerAsync, RedisSynchronizer, RedisSynchronizerAsync from splitio.tasks.split_sync import SplitSynchronizationTask, SplitSynchronizationTaskAsync @@ -502,8 +503,9 @@ async def get_segment_names_rbs(): @pytest.mark.asyncio async def test_synchronize_splits(self, mocker): - split_storage = InMemorySplitStorageAsync() - rbs_storage = InMemoryRuleBasedSegmentStorageAsync() + internal_events_queue = asyncio.Queue() + split_storage = InMemorySplitStorageAsync(internal_events_queue) + rbs_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) split_api = mocker.Mock() async def fetch_splits(change, rb, options): @@ -513,7 +515,7 @@ async def fetch_splits(change, rb, options): split_api.fetch_splits = fetch_splits split_sync = SplitSynchronizerAsync(split_api, split_storage, rbs_storage) - segment_storage = InMemorySegmentStorageAsync() + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) segment_api = mocker.Mock() async def get_change_number(): @@ -545,8 +547,9 @@ async def fetch_segment(segment_name, change, options): @pytest.mark.asyncio async def test_synchronize_splits_calling_segment_sync_once(self, mocker): - split_storage = InMemorySplitStorageAsync() - rbs_storage = InMemoryRuleBasedSegmentStorageAsync() + internal_events_queue = asyncio.Queue() + split_storage = InMemorySplitStorageAsync(internal_events_queue) + rbs_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) async def get_change_number(): return 123 split_storage.get_change_number = get_change_number @@ -580,8 +583,9 @@ async def segment_exist_in_storage(segment): @pytest.mark.asyncio async def test_sync_all(self, mocker): - split_storage = InMemorySplitStorageAsync() - rbs_storage = InMemoryRuleBasedSegmentStorageAsync() + internal_events_queue = asyncio.Queue() + split_storage = InMemorySplitStorageAsync(internal_events_queue) + rbs_storage = InMemoryRuleBasedSegmentStorageAsync(internal_events_queue) async def get_change_number(): return 123 split_storage.get_change_number = get_change_number @@ -612,7 +616,7 @@ async def fetch_splits(change, rb, options): split_api.fetch_splits = fetch_splits split_sync = SplitSynchronizerAsync(split_api, split_storage, rbs_storage) - segment_storage = InMemorySegmentStorageAsync() + segment_storage = InMemorySegmentStorageAsync(internal_events_queue) async def get_change_number(segment): return 123 segment_storage.get_change_number = get_change_number diff --git a/tests/sync/test_telemetry.py b/tests/sync/test_telemetry.py index 5b41b344..dd8119e2 100644 --- a/tests/sync/test_telemetry.py +++ b/tests/sync/test_telemetry.py @@ -2,6 +2,7 @@ import unittest.mock as mock import pytest import queue +import asyncio from splitio.sync.telemetry import TelemetrySynchronizer, TelemetrySynchronizerAsync, InMemoryTelemetrySubmitter, InMemoryTelemetrySubmitterAsync from splitio.engine.telemetry import TelemetryStorageConsumer, TelemetryStorageConsumerAsync @@ -184,9 +185,9 @@ async def test_synchronize_telemetry(self, mocker): api = mocker.Mock(spec=TelemetryAPI) telemetry_storage = await InMemoryTelemetryStorageAsync.create() telemetry_consumer = TelemetryStorageConsumerAsync(telemetry_storage) - split_storage = InMemorySplitStorageAsync() + split_storage = InMemorySplitStorageAsync(asyncio.Queue()) await split_storage.update([Split('split1', 1234, 1, False, 'user', Status.ACTIVE, 123)], [], -1) - segment_storage = InMemorySegmentStorageAsync() + segment_storage = InMemorySegmentStorageAsync(asyncio.Queue()) await segment_storage.put(Segment('segment1', [], 123)) telemetry_submitter = InMemoryTelemetrySubmitterAsync(telemetry_consumer, split_storage, segment_storage, api) diff --git a/tests/util/test_storage_helper.py b/tests/util/test_storage_helper.py index dc75caa0..60e83e8c 100644 --- a/tests/util/test_storage_helper.py +++ b/tests/util/test_storage_helper.py @@ -1,6 +1,7 @@ """Storage Helper tests.""" import pytest import queue +import asyncio from splitio.util.storage_helper import update_feature_flag_storage, get_valid_flag_sets, combine_valid_flag_sets, \ update_rule_based_segment_storage, update_rule_based_segment_storage_async, update_feature_flag_storage_async, \ @@ -201,7 +202,7 @@ def test_get_standard_segment_in_rbs_storage(self, mocker): @pytest.mark.asyncio async def test_get_standard_segment_in_rbs_storage(self, mocker): - storage = InMemoryRuleBasedSegmentStorageAsync() + storage = InMemoryRuleBasedSegmentStorageAsync(asyncio.Queue()) segments = await update_rule_based_segment_storage_async(storage, [self.rbs], 123) assert await get_standard_segment_names_in_rbs_storage_async(storage) == {'excluded_segment', 'employees'} From f2ad152ed3eb99ed3fa304378503ce82f0efcf50 Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Wed, 21 Jan 2026 12:50:24 -0800 Subject: [PATCH 2/3] finish tests --- splitio/events/events_task.py | 2 +- tests/client/test_factory.py | 106 +++++++++++++++- tests/integration/test_client_e2e.py | 168 +++++++++++++++++++++++++ tests/storage/test_inmemory_storage.py | 75 +++++++++++ tests/tasks/util/test_asynctask.py | 4 +- 5 files changed, 351 insertions(+), 4 deletions(-) diff --git a/splitio/events/events_task.py b/splitio/events/events_task.py index 8158dc04..3c7e34f3 100644 --- a/splitio/events/events_task.py +++ b/splitio/events/events_task.py @@ -133,7 +133,7 @@ def start(self): self._running = True _LOGGER.debug('Starting SDK Event Task worker') - asyncio.get_running_loop().create_task(self._run()) + asyncio.get_running_loop().create_task(self._run(), name="EventsTaskWorker") async def stop(self, stop_flag=None): """Stop worker.""" diff --git a/tests/client/test_factory.py b/tests/client/test_factory.py index 45e64c72..64da9541 100644 --- a/tests/client/test_factory.py +++ b/tests/client/test_factory.py @@ -1109,4 +1109,108 @@ async def _make_factory_with_apikey(apikey, *_, **__): await asyncio.sleep(0.5) assert factory.destroyed assert len(build_redis.mock_calls) == 2 - \ No newline at end of file + + @pytest.mark.asyncio + async def test_internal_ready_event_notification(self, mocker): + """Test that a client with in-memory storage is sending internal events correctly.""" + # Setup synchronizer + def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk_matadata, telemetry_runtime_producer, sse_url=None, client_key=None): + synchronizer = mocker.Mock(spec=SynchronizerAsync) + async def sync_all(*_): + return None + synchronizer.sync_all = sync_all + + def start_periodic_fetching(): + pass + synchronizer.start_periodic_fetching = start_periodic_fetching + + def start_periodic_data_recording(): + pass + synchronizer.start_periodic_data_recording = start_periodic_data_recording + + self._ready_flag = ready_flag + self._synchronizer = synchronizer + self._streaming_enabled = False + self._telemetry_runtime_producer = telemetry_runtime_producer + + mocker.patch('splitio.sync.manager.ManagerAsync.__init__', new=_split_synchronizer) + + async def synchronize_config(*_): + await asyncio.sleep(2) + pass + mocker.patch('splitio.sync.telemetry.InMemoryTelemetrySubmitterAsync.synchronize_config', new=synchronize_config) + + async def record_ready_time(*_): + pass + mocker.patch('splitio.models.telemetry.TelemetryConfigAsync.record_ready_time', new=record_ready_time) + + async def record_active_and_redundant_factories(*_): + pass + mocker.patch('splitio.models.telemetry.TelemetryConfigAsync.record_active_and_redundant_factories', new=record_active_and_redundant_factories) + + # Start factory and make assertions + factory = await get_factory_async('some_api_key', config={'streamingEmabled': False}) + for task in asyncio.all_tasks(): + if task.get_name() == "EventsTaskWorker": + task.cancel() + try: + await factory.block_until_ready(3) + except: + pass + await asyncio.sleep(.2) + event = await factory._internal_events_queue.get() + assert event.internal_event == SdkInternalEvent.SDK_READY + assert event.metadata == None + await factory.destroy() + + @pytest.mark.asyncio + async def test_internal_timeout_event_notification(self, mocker): + """Test that a client with in-memory storage is sending internal events correctly.""" + # Setup synchronizer + def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk_matadata, telemetry_runtime_producer, sse_url=None, client_key=None): + synchronizer = mocker.Mock(spec=SynchronizerAsync) + async def sync_all(*_): + return None + synchronizer.sync_all = sync_all + + def start_periodic_fetching(): + pass + synchronizer.start_periodic_fetching = start_periodic_fetching + + def start_periodic_data_recording(): + pass + synchronizer.start_periodic_data_recording = start_periodic_data_recording + + self._ready_flag = ready_flag + self._synchronizer = synchronizer + self._streaming_enabled = False + self._telemetry_runtime_producer = telemetry_runtime_producer + + mocker.patch('splitio.sync.manager.ManagerAsync.__init__', new=_split_synchronizer) + + async def synchronize_config(*_): + await asyncio.sleep(3) + pass + mocker.patch('splitio.sync.telemetry.InMemoryTelemetrySubmitterAsync.synchronize_config', new=synchronize_config) + + async def record_ready_time(*_): + pass + mocker.patch('splitio.models.telemetry.TelemetryConfigAsync.record_ready_time', new=record_ready_time) + + async def record_active_and_redundant_factories(*_): + pass + mocker.patch('splitio.models.telemetry.TelemetryConfigAsync.record_active_and_redundant_factories', new=record_active_and_redundant_factories) + + # Start factory and make assertions + factory = await get_factory_async('some_api_key', config={'streamingEmabled': False}) + for task in asyncio.all_tasks(): + if task.get_name() == "EventsTaskWorker": + task.cancel() + try: + await factory.block_until_ready(1) + except: + pass + event = await factory._internal_events_queue.get() + assert event.internal_event == SdkInternalEvent.SDK_TIMED_OUT + assert event.metadata == None + await factory.destroy() diff --git a/tests/integration/test_client_e2e.py b/tests/integration/test_client_e2e.py index c243951f..7181f141 100644 --- a/tests/integration/test_client_e2e.py +++ b/tests/integration/test_client_e2e.py @@ -2600,6 +2600,174 @@ def _ready_callback(self, metadata): def _timeout_callback(self, metadata): self.timeout_flag = True +class InMemoryEventsNotificationAsyncTests(object): + """Inmemory storage-based events notification tests.""" + + ready_flag = False + timeout_flag = False + + @pytest.mark.asyncio + async def test_sdk_timeout_fire(self): + """Prepare storages with test data.""" + factory2 = await get_factory_async('some_api_key') + client = factory2.client() + await client.on(SdkEvent.SDK_READY_TIMED_OUT, self._timeout_callback) + try: + await factory2.block_until_ready(1) + except Exception as e: + pass + + await asyncio.sleep(1) + assert self.timeout_flag + + """Shut down the factory.""" + await factory2.destroy() + + @pytest.mark.asyncio + async def test_sdk_ready(self): + """Prepare storages with test data.""" + events_queue = asyncio.Queue() + split_storage = InMemorySplitStorageAsync(events_queue) + segment_storage = InMemorySegmentStorageAsync(events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(events_queue) + + split_fn = os.path.join(os.path.dirname(__file__), 'files', 'splitChanges.json') + with open(split_fn, 'r') as flo: + data = json.loads(flo.read()) + for split in data['ff']['d']: + await split_storage.update([splits.from_raw(split)], [], 0) + + for rbs in data['rbs']['d']: + await rb_segment_storage.update([rule_based_segments.from_raw(rbs)], [], 0) + + segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentEmployeesChanges.json') + with open(segment_fn, 'r') as flo: + data = json.loads(flo.read()) + await segment_storage.put(segments.from_raw(data)) + + segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentHumanBeignsChanges.json') + with open(segment_fn, 'r') as flo: + data = json.loads(flo.read()) + await segment_storage.put(segments.from_raw(data)) + + telemetry_storage = await InMemoryTelemetryStorageAsync.create() + telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer() + + storages = { + 'splits': split_storage, + 'segments': segment_storage, + 'rule_based_segments': rb_segment_storage, + 'impressions': InMemoryImpressionStorageAsync(5000, telemetry_runtime_producer), + 'events': InMemoryEventStorageAsync(5000, telemetry_runtime_producer), + } + impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener + recorder = StandardRecorderAsync(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, imp_counter=ImpressionsCounter()) + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) + internal_events_task = EventsTaskAsync(events_manager.notify_internal_event, events_queue) + + # Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception. + try: + factory = SplitFactoryAsync('some_api_key', + storages, + True, + recorder, + events_queue, + events_manager, + None, + telemetry_producer=telemetry_producer, + telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), + fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(None, {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')})) + ) # pylint:disable=attribute-defined-outside-init + internal_events_task.start() + except: + pass + + client = factory.client() + await client.on(SdkEvent.SDK_READY, self._ready_callback) + await factory.block_until_ready(5) + assert self.ready_flag + + """Shut down the factory.""" + await internal_events_task.stop() + await factory.destroy() + + @pytest.mark.asyncio + async def test_sdk_ready_fire_later(self): + """Prepare storages with test data.""" + events_queue = asyncio.Queue() + split_storage = InMemorySplitStorageAsync(events_queue) + segment_storage = InMemorySegmentStorageAsync(events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(events_queue) + + split_fn = os.path.join(os.path.dirname(__file__), 'files', 'splitChanges.json') + with open(split_fn, 'r') as flo: + data = json.loads(flo.read()) + for split in data['ff']['d']: + await split_storage.update([splits.from_raw(split)], [], 0) + + for rbs in data['rbs']['d']: + await rb_segment_storage.update([rule_based_segments.from_raw(rbs)], [], 0) + + segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentEmployeesChanges.json') + with open(segment_fn, 'r') as flo: + data = json.loads(flo.read()) + await segment_storage.put(segments.from_raw(data)) + + segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentHumanBeignsChanges.json') + with open(segment_fn, 'r') as flo: + data = json.loads(flo.read()) + await segment_storage.put(segments.from_raw(data)) + + telemetry_storage = await InMemoryTelemetryStorageAsync.create() + telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer() + + storages = { + 'splits': split_storage, + 'segments': segment_storage, + 'rule_based_segments': rb_segment_storage, + 'impressions': InMemoryImpressionStorageAsync(5000, telemetry_runtime_producer), + 'events': InMemoryEventStorageAsync(5000, telemetry_runtime_producer), + } + impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener + recorder = StandardRecorderAsync(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, imp_counter=ImpressionsCounter()) + events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) + internal_events_task = EventsTaskAsync(events_manager.notify_internal_event, events_queue) + + # Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception. + try: + factory = SplitFactoryAsync('some_api_key', + storages, + True, + recorder, + events_queue, + events_manager, + None, + telemetry_producer=telemetry_producer, + telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), + fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(None, {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')})) + ) # pylint:disable=attribute-defined-outside-init + internal_events_task.start() + except: + pass + + client = factory.client() + await factory.block_until_ready(5) + await client.on(SdkEvent.SDK_READY, self._ready_callback) + + """Shut down the factory.""" + await internal_events_task.stop() + await factory.destroy() + + async def _ready_callback(self, metadata): + self.ready_flag = True + + async def _timeout_callback(self, metadata): + self.timeout_flag = True + class InMemoryIntegrationAsyncTests(object): """Inmemory storage-based integration tests.""" diff --git a/tests/storage/test_inmemory_storage.py b/tests/storage/test_inmemory_storage.py index 354da30e..0f830239 100644 --- a/tests/storage/test_inmemory_storage.py +++ b/tests/storage/test_inmemory_storage.py @@ -708,7 +708,36 @@ async def test_flag_sets_withut_config_sets(self): await storage.update([split3], [], 1) assert await storage.get_feature_flags_by_sets(['set05']) == ['split3'] assert await storage.get_feature_flags_by_sets(['set04', 'set05']) == ['split3'] + + @pytest.mark.asyncio + async def test_internal_event_notification(self, mocker): + """Test retrieving a list of all split names.""" + split1 = mocker.Mock() + name1_prop = mocker.PropertyMock() + name1_prop.return_value = 'split1' + type(split1).name = name1_prop + split2 = mocker.Mock() + name2_prop = mocker.PropertyMock() + name2_prop.return_value = 'split2' + type(split2).name = name2_prop + sets_property = mocker.PropertyMock() + sets_property.return_value = ['set_1'] + type(split1).sets = sets_property + type(split2).sets = sets_property + events_queue = asyncio.Queue() + storage = InMemorySplitStorageAsync(events_queue) + await storage.update([split1, split2], [], -1) + event = await events_queue.get() + assert event.internal_event == SdkInternalEvent.FLAGS_UPDATED + assert event.metadata.get_type() == SdkEventType.FLAG_UPDATE + assert event.metadata.get_names() == {'split1', 'split2'} + await storage.kill_locally('split1', 'default_treatment', 3) + event = await events_queue.get() + assert event.internal_event == SdkInternalEvent.FLAG_KILLED_NOTIFICATION + assert event.metadata.get_type() == SdkEventType.FLAG_UPDATE + assert event.metadata.get_names() == {'split1'} + class InMemorySegmentStorageTests(object): """In memory segment storage tests.""" @@ -855,6 +884,23 @@ async def test_segment_update(self): assert not await storage.segment_contains('some_segment', 'key3') assert await storage.get_change_number('some_segment') == 456 + @pytest.mark.asyncio + async def test_internal_event_notification(self): + """Test updating a segment.""" + events_queue = asyncio.Queue() + storage = InMemorySegmentStorageAsync(events_queue) + segment = Segment('some_segment', ['key1', 'key2', 'key3'], 123) + await storage.put(segment) + event = await events_queue.get() + assert event.internal_event == SdkInternalEvent.SEGMENTS_UPDATED + assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert len(event.metadata.get_names()) == 0 + + await storage.update('some_segment', ['key4', 'key5'], ['key2', 'key3'], 456) + event = await events_queue.get() + assert event.internal_event == SdkInternalEvent.SEGMENTS_UPDATED + assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert len(event.metadata.get_names()) == 0 class InMemoryImpressionsStorageTests(object): """InMemory impressions storage test cases.""" @@ -2027,3 +2073,32 @@ async def test_contains(self): assert await storage.contains(["segment1"]) assert await storage.contains(["segment1", "segment3"]) assert not await storage.contains(["segment5"]) + + @pytest.mark.asyncio + async def test_internal_event_notification(self, mocker): + """Test storing and retrieving splits works.""" + events_queue = asyncio.Queue() + rbs_storage = InMemoryRuleBasedSegmentStorageAsync(events_queue) + + segment1 = mocker.Mock(spec=RuleBasedSegment) + name_property = mocker.PropertyMock() + name_property.return_value = 'some_segment' + type(segment1).name = name_property + + segment2 = mocker.Mock() + name2_prop = mocker.PropertyMock() + name2_prop.return_value = 'segment2' + type(segment2).name = name2_prop + + await rbs_storage.update([segment1, segment2], [], -1) + event = await events_queue.get() + assert event.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED + assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert len(event.metadata.get_names()) == 0 + + await rbs_storage.update([], ['some_segment'], -1) + event = await events_queue.get() + assert event.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED + assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert len(event.metadata.get_names()) == 0 + diff --git a/tests/tasks/util/test_asynctask.py b/tests/tasks/util/test_asynctask.py index 690182ed..b587b9c5 100644 --- a/tests/tasks/util/test_asynctask.py +++ b/tests/tasks/util/test_asynctask.py @@ -92,7 +92,7 @@ def raise_exception(): task.stop(on_stop_event) on_stop_event.wait(1) - assert on_stop_event.isSet() + assert on_stop_event.is_set() assert on_init.mock_calls == [mocker.call()] assert on_stop.mock_calls == [mocker.call()] assert 9 <= len(main_func.mock_calls) <= 10 @@ -113,7 +113,7 @@ def test_force_run(self, mocker): task.stop(on_stop_event) on_stop_event.wait(1) - assert on_stop_event.isSet() + assert on_stop_event.is_set() assert on_init.mock_calls == [mocker.call()] assert on_stop.mock_calls == [mocker.call()] assert len(main_func.mock_calls) == 2 From e7f721aa83531ab88da690e37b59e2731f8ba182 Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Thu, 22 Jan 2026 09:57:45 -0800 Subject: [PATCH 3/3] fixed typo for segment event type --- splitio/client/factory.py | 22 ++++++++++++++-------- splitio/events/events_metadata.py | 2 +- splitio/storage/inmemmory.py | 12 ++++++------ tests/integration/test_streaming_e2e.py | 2 +- tests/storage/test_inmemory_storage.py | 16 ++++++++-------- 5 files changed, 30 insertions(+), 24 deletions(-) diff --git a/splitio/client/factory.py b/splitio/client/factory.py index 670cf6c3..272e6f3f 100644 --- a/splitio/client/factory.py +++ b/splitio/client/factory.py @@ -1145,11 +1145,11 @@ def _build_localhost_factory(cfg): telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer() - events_queue = queue.Queue() + internal_events_queue = queue.Queue() storages = { - 'splits': InMemorySplitStorage(events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []), - 'segments': InMemorySegmentStorage(events_queue), # not used, just to avoid possible future errors. - 'rule_based_segments': InMemoryRuleBasedSegmentStorage(events_queue), + 'splits': InMemorySplitStorage(internal_events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []), + 'segments': InMemorySegmentStorage(internal_events_queue), # not used, just to avoid possible future errors. + 'rule_based_segments': InMemoryRuleBasedSegmentStorage(internal_events_queue), 'impressions': LocalhostImpressionsStorage(), 'events': LocalhostEventsStorage(), } @@ -1162,6 +1162,8 @@ def _build_localhost_factory(cfg): LocalSegmentSynchronizer(cfg['segmentDirectory'], storages['splits'], storages['segments']), None, None, None, ) + events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) + internal_events_task = EventsTask(events_manager.notify_internal_event, internal_events_queue) feature_flag_sync_task = None segment_sync_task = None @@ -1178,6 +1180,7 @@ def _build_localhost_factory(cfg): feature_flag_sync_task, segment_sync_task, None, None, None, + internal_events_task ) sdk_metadata = util.get_metadata(cfg) @@ -1199,8 +1202,7 @@ def _build_localhost_factory(cfg): telemetry_evaluation_producer, telemetry_runtime_producer ) - internal_events_queue = queue.Queue() - events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) + internal_events_task.start() return SplitFactory( 'localhost', @@ -1226,6 +1228,8 @@ async def _build_localhost_factory_async(cfg): internal_events_queue = asyncio.Queue() events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) + internal_events_task = EventsTaskAsync(events_manager.notify_internal_event, internal_events_queue) + storages = { 'splits': InMemorySplitStorageAsync(internal_events_queue), 'segments': InMemorySegmentStorageAsync(internal_events_queue), # not used, just to avoid possible future errors. @@ -1258,6 +1262,7 @@ async def _build_localhost_factory_async(cfg): feature_flag_sync_task, segment_sync_task, None, None, None, + internal_events_task ) sdk_metadata = util.get_metadata(cfg) @@ -1277,8 +1282,9 @@ async def _build_localhost_factory_async(cfg): storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer - ) - + ) + internal_events_task.start() + return SplitFactoryAsync( 'localhost', storages, diff --git a/splitio/events/events_metadata.py b/splitio/events/events_metadata.py index 5d6f4961..0707a8f5 100644 --- a/splitio/events/events_metadata.py +++ b/splitio/events/events_metadata.py @@ -5,7 +5,7 @@ class SdkEventType(Enum): """Public event types""" FLAG_UPDATE = 'FLAG_UPDATE' - SEGMENT_UPDATE = 'SEGMENT_UPDATE' + SEGMENTS_UPDATE = 'SEGMENTS_UPDATE' class EventsMetadata(object): """Events Metadata class.""" diff --git a/splitio/storage/inmemmory.py b/splitio/storage/inmemmory.py index bbde8816..db71f7fd 100644 --- a/splitio/storage/inmemmory.py +++ b/splitio/storage/inmemmory.py @@ -158,7 +158,7 @@ def update(self, to_add, to_delete, new_change_number): self._internal_event_queue.put( SdkInternalEventNotification( SdkInternalEvent.RB_SEGMENTS_UPDATED, - EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) + EventsMetadata(SdkEventType.SEGMENTS_UPDATE, {}))) def _put(self, rule_based_segment): """ @@ -290,7 +290,7 @@ async def update(self, to_add, to_delete, new_change_number): await self._internal_event_queue.put( SdkInternalEventNotification( SdkInternalEvent.RB_SEGMENTS_UPDATED, - EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) + EventsMetadata(SdkEventType.SEGMENTS_UPDATE, {}))) async def _put(self, rule_based_segment): """ @@ -999,7 +999,7 @@ def put(self, segment): self._internal_event_queue.put( SdkInternalEventNotification( SdkInternalEvent.SEGMENTS_UPDATED, - EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) + EventsMetadata(SdkEventType.SEGMENTS_UPDATE, {}))) def update(self, segment_name, to_add, to_remove, change_number=None): """ @@ -1025,7 +1025,7 @@ def update(self, segment_name, to_add, to_remove, change_number=None): self._internal_event_queue.put( SdkInternalEventNotification( SdkInternalEvent.SEGMENTS_UPDATED, - EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) + EventsMetadata(SdkEventType.SEGMENTS_UPDATE, {}))) def get_change_number(self, segment_name): """ @@ -1140,7 +1140,7 @@ async def put(self, segment): await self._internal_event_queue.put( SdkInternalEventNotification( SdkInternalEvent.SEGMENTS_UPDATED, - EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) + EventsMetadata(SdkEventType.SEGMENTS_UPDATE, {}))) async def update(self, segment_name, to_add, to_remove, change_number=None): @@ -1166,7 +1166,7 @@ async def update(self, segment_name, to_add, to_remove, change_number=None): await self._internal_event_queue.put( SdkInternalEventNotification( SdkInternalEvent.SEGMENTS_UPDATED, - EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) + EventsMetadata(SdkEventType.SEGMENTS_UPDATE, {}))) async def get_change_number(self, segment_name): diff --git a/tests/integration/test_streaming_e2e.py b/tests/integration/test_streaming_e2e.py index a673c65c..bb2dc91a 100644 --- a/tests/integration/test_streaming_e2e.py +++ b/tests/integration/test_streaming_e2e.py @@ -128,7 +128,7 @@ def test_happiness(self): sse_server.publish(make_segment_change_event('segment1', 1)) time.sleep(1) assert self.update_flag - assert self.metadata[len(self.metadata)-1].get_type() == SdkEventType.SEGMENT_UPDATE + assert self.metadata[len(self.metadata)-1].get_type() == SdkEventType.SEGMENTS_UPDATE flag = False for meta in self.metadata: if 'split2' in meta.get_names(): diff --git a/tests/storage/test_inmemory_storage.py b/tests/storage/test_inmemory_storage.py index 0f830239..d46980aa 100644 --- a/tests/storage/test_inmemory_storage.py +++ b/tests/storage/test_inmemory_storage.py @@ -811,13 +811,13 @@ def test_internal_event_notification(self): storage.put(segment) event = events_queue.get() assert event.internal_event == SdkInternalEvent.SEGMENTS_UPDATED - assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert event.metadata.get_type() == SdkEventType.SEGMENTS_UPDATE assert len(event.metadata.get_names()) == 0 storage.update('some_segment', ['key4', 'key5'], ['key2', 'key3'], 456) event = events_queue.get() assert event.internal_event == SdkInternalEvent.SEGMENTS_UPDATED - assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert event.metadata.get_type() == SdkEventType.SEGMENTS_UPDATE assert len(event.metadata.get_names()) == 0 class InMemorySegmentStorageAsyncTests(object): @@ -893,13 +893,13 @@ async def test_internal_event_notification(self): await storage.put(segment) event = await events_queue.get() assert event.internal_event == SdkInternalEvent.SEGMENTS_UPDATED - assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert event.metadata.get_type() == SdkEventType.SEGMENTS_UPDATE assert len(event.metadata.get_names()) == 0 await storage.update('some_segment', ['key4', 'key5'], ['key2', 'key3'], 456) event = await events_queue.get() assert event.internal_event == SdkInternalEvent.SEGMENTS_UPDATED - assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert event.metadata.get_type() == SdkEventType.SEGMENTS_UPDATE assert len(event.metadata.get_names()) == 0 class InMemoryImpressionsStorageTests(object): @@ -2006,12 +2006,12 @@ def test_internal_event_notification(self, mocker): rbs_storage.update([segment1, segment2], [], -1) event = events_queue.get() assert event.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED - assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert event.metadata.get_type() == SdkEventType.SEGMENTS_UPDATE assert len(event.metadata.get_names()) == 0 rbs_storage.update([], ['some_segment'], -1) assert event.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED - assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert event.metadata.get_type() == SdkEventType.SEGMENTS_UPDATE assert len(event.metadata.get_names()) == 0 class InMemoryRuleBasedSegmentStorageAsyncTests(object): @@ -2093,12 +2093,12 @@ async def test_internal_event_notification(self, mocker): await rbs_storage.update([segment1, segment2], [], -1) event = await events_queue.get() assert event.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED - assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert event.metadata.get_type() == SdkEventType.SEGMENTS_UPDATE assert len(event.metadata.get_names()) == 0 await rbs_storage.update([], ['some_segment'], -1) event = await events_queue.get() assert event.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED - assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert event.metadata.get_type() == SdkEventType.SEGMENTS_UPDATE assert len(event.metadata.get_names()) == 0