Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 30 additions & 17 deletions splitio/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
from collections import namedtuple
import copy

from splitio.client import input_validator
from splitio.engine.evaluator import Evaluator, CONTROL, EvaluationDataFactory, AsyncEvaluationDataFactory
from splitio.engine.splitters import Splitter
from splitio.models.impressions import Impression, Label, ImpressionDecorated
from splitio.models.events import Event, EventWrapper, SdkEvent
from splitio.models.telemetry import get_latency_bucket_index, MethodExceptionsAndLatencies
from splitio.client import input_validator
from splitio.optional.loaders import asyncio
from splitio.util.time import get_current_epoch_time_ms, utctime_ms


Expand Down Expand Up @@ -40,7 +41,7 @@ class ClientBase(object): # pylint: disable=too-many-instance-attributes
'impressions_disabled': False
}

def __init__(self, factory, recorder, labels_enabled=True, fallback_treatment_calculator=None):
def __init__(self, factory, recorder, events_manager, labels_enabled=True, fallback_treatment_calculator=None):
"""
Construct a Client instance.

Expand All @@ -66,6 +67,7 @@ def __init__(self, factory, recorder, labels_enabled=True, fallback_treatment_ca
self._telemetry_evaluation_producer = self._factory._telemetry_evaluation_producer
self._telemetry_init_producer = self._factory._telemetry_init_producer
self._fallback_treatment_calculator = fallback_treatment_calculator
self._events_manager = events_manager

@property
def ready(self):
Expand Down Expand Up @@ -221,6 +223,23 @@ def _get_fallback_eval_results(self, eval_result, feature):
def _check_impression_label(self, result):
return result['impression']['label'] == None or (result['impression']['label'] != None and result['impression']['label'].find(Label.SPLIT_NOT_FOUND) == -1)

def _validate_sdk_event_info(self, sdk_event, callback_handle):
if not self._check_sdk_event(sdk_event):
return False

if not hasattr(callback_handle, '__call__'):
_LOGGER.warning("Client Event Subscription: The callback handle passed must be of type function, ignoring event subscribing action.")
return False

return True

def _check_sdk_event(self, sdk_event):
if not isinstance(sdk_event, SdkEvent):
_LOGGER.warning("Client Event Subscription: The event passed must be of type SdkEvent, ignoring event subscribing action.")
return False

return True

class Client(ClientBase): # pylint: disable=too-many-instance-attributes
"""Entry point for the split sdk."""

Expand All @@ -239,8 +258,7 @@ def __init__(self, factory, recorder, events_manager, labels_enabled=True, fallb

:rtype: Client
"""
ClientBase.__init__(self, factory, recorder, labels_enabled, fallback_treatment_calculator)
self._events_manager = events_manager
ClientBase.__init__(self, factory, recorder, events_manager, labels_enabled, fallback_treatment_calculator)
self._context_factory = EvaluationDataFactory(factory._get_storage('splits'), factory._get_storage('segments'), factory._get_storage('rule_based_segments'))

def destroy(self):
Expand All @@ -256,17 +274,6 @@ def on(self, sdk_event, callback_handle):
return

self._events_manager.register(sdk_event, callback_handle)

def _validate_sdk_event_info(self, sdk_event, callback_handle):
if not isinstance(sdk_event, SdkEvent):
_LOGGER.warning("Client Event Subscription: The event passed must be of type SdkEvent, ignoring event subscribing action.")
return False

if not hasattr(callback_handle, '__call__'):
_LOGGER.warning("Client Event Subscription: The callback handle passed must be of type function, ignoring event subscribing action.")
return False

return True

def get_treatment(self, key, feature_flag_name, attributes=None, evaluation_options=None):
"""
Expand Down Expand Up @@ -743,7 +750,7 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
class ClientAsync(ClientBase): # pylint: disable=too-many-instance-attributes
"""Entry point for the split sdk."""

def __init__(self, factory, recorder, labels_enabled=True, fallback_treatment_calculator=None):
def __init__(self, factory, recorder, events_manager, labels_enabled=True, fallback_treatment_calculator=None):
"""
Construct a Client instance.

Expand All @@ -758,7 +765,7 @@ def __init__(self, factory, recorder, labels_enabled=True, fallback_treatment_ca

:rtype: Client
"""
ClientBase.__init__(self, factory, recorder, labels_enabled, fallback_treatment_calculator)
ClientBase.__init__(self, factory, recorder, events_manager, labels_enabled, fallback_treatment_calculator)
self._context_factory = AsyncEvaluationDataFactory(factory._get_storage('splits'), factory._get_storage('segments'), factory._get_storage('rule_based_segments'))

async def destroy(self):
Expand All @@ -769,6 +776,12 @@ async def destroy(self):
"""
await self._factory.destroy()

async def on(self, sdk_event, callback_handle):
if not self._validate_sdk_event_info(sdk_event, callback_handle):
return

await self._events_manager.register(sdk_event, callback_handle)

async def get_treatment(self, key, feature_flag_name, attributes=None, evaluation_options=None):
"""
Get the treatment for a feature and key, with an optional dictionary of attributes, for async calls
Expand Down
44 changes: 34 additions & 10 deletions splitio/client/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
TelemetryStorageProducerAsync, TelemetryStorageConsumerAsync
from splitio.engine.impressions.manager import Counter as ImpressionsCounter
from splitio.engine.impressions.unique_keys_tracker import UniqueKeysTracker, UniqueKeysTrackerAsync
from splitio.events.events_manager import EventsManager
from splitio.events.events_manager import EventsManager, EventsManagerAsync
from splitio.events.events_manager_config import EventsManagerConfig
from splitio.events.events_task import EventsTask
from splitio.events.events_task import EventsTask, EventsTaskAsync
from splitio.events.events_delivery import EventsDelivery
from splitio.models.fallback_config import FallbackTreatmentCalculator
from splitio.models.notification import SdkInternalEventNotification
Expand Down Expand Up @@ -352,6 +352,8 @@ def __init__( # pylint: disable=too-many-arguments
storages,
labels_enabled,
recorder,
internal_events_queue,
events_manager,
sync_manager=None,
telemetry_producer=None,
telemetry_init_producer=None,
Expand Down Expand Up @@ -387,6 +389,8 @@ def __init__( # pylint: disable=too-many-arguments
self._telemetry_submitter = telemetry_submitter
self._ready_time = get_current_epoch_time_ms()
_LOGGER.debug("Running in asyncio mode")
self._internal_events_queue = internal_events_queue
self._events_manager = events_manager
self._manager_start_task = manager_start_task
self._status = Status.NOT_INITIALIZED
self._sdk_ready_flag = asyncio.Event()
Expand All @@ -409,6 +413,7 @@ async def _update_status_when_ready_async(self):
_LOGGER.debug(str(e))
self._status = Status.READY
self._sdk_ready_flag.set()
await self._internal_events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_READY, None))

def manager(self):
"""
Expand All @@ -434,6 +439,7 @@ async def block_until_ready(self, timeout=None):
_LOGGER.error("Exception initializing SDK")
_LOGGER.debug(str(e))
await self._telemetry_init_producer.record_bur_time_out()
await self._internal_events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_TIMED_OUT, None))
raise TimeoutException('SDK Initialization: time of %d exceeded' % timeout)

async def destroy(self, destroyed_event=None):
Expand Down Expand Up @@ -481,7 +487,7 @@ def client(self):
This client is only a set of references to structures hold by the factory.
Creating one a fast operation and safe to be used anywhere.
"""
return ClientAsync(self, self._recorder, self._labels_enabled, self._fallback_treatment_calculator)
return ClientAsync(self, self._recorder, self._events_manager, self._labels_enabled, self._fallback_treatment_calculator)

def _wrap_impression_listener(listener, metadata):
"""
Expand Down Expand Up @@ -698,11 +704,14 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=
'events': EventsAPIAsync(http_client, api_key, sdk_metadata, telemetry_runtime_producer),
'telemetry': TelemetryAPIAsync(http_client, api_key, sdk_metadata, telemetry_runtime_producer),
}
internal_events_queue = asyncio.Queue()
events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery())
internal_events_task = EventsTaskAsync(events_manager.notify_internal_event, internal_events_queue)

storages = {
'splits': InMemorySplitStorageAsync(cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
'segments': InMemorySegmentStorageAsync(),
'rule_based_segments': InMemoryRuleBasedSegmentStorageAsync(),
'splits': InMemorySplitStorageAsync(internal_events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
'segments': InMemorySegmentStorageAsync(internal_events_queue),
'rule_based_segments': InMemoryRuleBasedSegmentStorageAsync(internal_events_queue),
'impressions': InMemoryImpressionStorageAsync(cfg['impressionsQueueSize'], telemetry_runtime_producer),
'events': InMemoryEventStorageAsync(cfg['eventsQueueSize'], telemetry_runtime_producer),
}
Expand Down Expand Up @@ -748,6 +757,7 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=
TelemetrySyncTaskAsync(synchronizers.telemetry_sync.synchronize_stats, cfg['metricsRefreshRate']),
unique_keys_task,
clear_filter_task,
internal_events_task
)

synchronizer = SynchronizerAsync(synchronizers, tasks)
Expand All @@ -770,11 +780,12 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=
)

await telemetry_init_producer.record_config(cfg, extra_cfg, total_flag_sets, invalid_flag_sets)
internal_events_task.start()

manager_start_task = asyncio.get_running_loop().create_task(manager.start())

return SplitFactoryAsync(api_key, storages, cfg['labelsEnabled'],
recorder, manager,
recorder, internal_events_queue, events_manager, manager,
telemetry_producer, telemetry_init_producer,
telemetry_submitter, manager_start_task=manager_start_task,
api_client=http_client, fallback_treatment_calculator=FallbackTreatmentCalculator(cfg['fallbackTreatments']))
Expand Down Expand Up @@ -933,12 +944,16 @@ async def _build_redis_factory_async(api_key, cfg):
manager = RedisManagerAsync(synchronizer)
await telemetry_init_producer.record_config(cfg, {}, 0, 0)
manager.start()
internal_events_queue = asyncio.Queue()
events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery())

split_factory = SplitFactoryAsync(
api_key,
storages,
cfg['labelsEnabled'],
recorder,
internal_events_queue,
events_manager,
manager,
telemetry_producer=telemetry_producer,
telemetry_init_producer=telemetry_init_producer,
Expand Down Expand Up @@ -1101,12 +1116,16 @@ async def _build_pluggable_factory_async(api_key, cfg):
manager = RedisManagerAsync(synchronizer)
manager.start()
await telemetry_init_producer.record_config(cfg, {}, 0, 0)
internal_events_queue = asyncio.Queue()
events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery())

split_factory = SplitFactoryAsync(
api_key,
storages,
cfg['labelsEnabled'],
recorder,
internal_events_queue,
events_manager,
manager,
telemetry_producer=telemetry_producer,
telemetry_init_producer=telemetry_init_producer,
Expand Down Expand Up @@ -1205,10 +1224,12 @@ async def _build_localhost_factory_async(cfg):
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()

internal_events_queue = asyncio.Queue()
events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery())
storages = {
'splits': InMemorySplitStorageAsync(),
'segments': InMemorySegmentStorageAsync(), # not used, just to avoid possible future errors.
'rule_based_segments': InMemoryRuleBasedSegmentStorageAsync(),
'splits': InMemorySplitStorageAsync(internal_events_queue),
'segments': InMemorySegmentStorageAsync(internal_events_queue), # not used, just to avoid possible future errors.
'rule_based_segments': InMemoryRuleBasedSegmentStorageAsync(internal_events_queue),
'impressions': LocalhostImpressionsStorageAsync(),
'events': LocalhostEventsStorageAsync(),
}
Expand Down Expand Up @@ -1257,11 +1278,14 @@ async def _build_localhost_factory_async(cfg):
telemetry_evaluation_producer,
telemetry_runtime_producer
)

return SplitFactoryAsync(
'localhost',
storages,
False,
recorder,
internal_events_queue,
events_manager,
manager,
telemetry_producer=telemetry_producer,
telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(),
Expand Down
7 changes: 7 additions & 0 deletions splitio/events/events_delivery.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,10 @@ def deliver(self, sdk_event, event_metadata, event_handler):
except Exception as ex:
_LOGGER.error("Exception when calling handler for Sdk Event %s", sdk_event)
_LOGGER.error(ex)

async def deliver_async(self, sdk_event, event_metadata, event_handler):
try:
await event_handler(event_metadata)
except Exception as ex:
_LOGGER.error("Exception when calling handler for Sdk Event %s", sdk_event)
_LOGGER.error(ex)
Loading