From b93d090622b7a0106116da1d5e1a9b5662508766 Mon Sep 17 00:00:00 2001 From: Martin Redolatti Date: Wed, 7 Oct 2020 16:51:21 -0300 Subject: [PATCH 1/4] initial integration --- splitio/api/impressions.py | 62 ++++- splitio/client/client.py | 46 +--- splitio/client/config.py | 1 + splitio/client/factory.py | 30 ++- .../engine/{impmanager.py => impressions.py} | 50 +++- splitio/tasks/impressions_sync.py | 81 ++++++ tests/api/test_impressions_api.py | 65 ++++- tests/client/test_client.py | 101 ++++---- tests/client/test_factory.py | 5 +- tests/client/test_input_validator.py | 12 +- ...test_impmanager.py => test_impressions.py} | 147 ++++++----- tests/integration/test_client_e2e.py | 232 +++++++++++++++++- 12 files changed, 616 insertions(+), 216 deletions(-) rename splitio/engine/{impmanager.py => impressions.py} (76%) rename tests/engine/{test_impmanager.py => test_impressions.py} (84%) diff --git a/splitio/api/impressions.py b/splitio/api/impressions.py index 3f26e8c9..36ce5b4b 100644 --- a/splitio/api/impressions.py +++ b/splitio/api/impressions.py @@ -35,19 +35,19 @@ def _build_bulk(impressions): :type impressions: list(splitio.models.impressions.Impression) :return: Dictionary of lists of impressions. - :rtype: dict + :rtype: list """ return [ { - 'testName': test_name, - 'keyImpressions': [ + 'f': test_name, + 'i': [ { - 'keyName': impression.matching_key, - 'treatment': impression.treatment, - 'time': impression.time, - 'changeNumber': impression.change_number, - 'label': impression.label, - 'bucketingKey': impression.bucketing_key + 'k': impression.matching_key, + 't': impression.treatment, + 'm': impression.time, + 'c': impression.change_number, + 'r': impression.label, + 'b': impression.bucketing_key } for impression in imps ] @@ -58,6 +58,27 @@ def _build_bulk(impressions): ) ] + @staticmethod + def _build_counters(counters): + """ + Build an impression bulk formatted as the API expects it. + + :param counters: List of impression counters per feature. + :type counters: list[splitio.engine.impressions.Counter.CountPerFeature] + + :return: dict with list of impression count dtos + :rtype: dict + """ + return { + 'pf': [ + { + 'f': pf_count.feature, + 'm': pf_count.timeframe, + 'c': pf_count.count + } for pf_count in counters + ] + } + def flush_impressions(self, impressions): """ Send impressions to the backend. @@ -80,3 +101,26 @@ def flush_impressions(self, impressions): self._logger.error('Http client is throwing exceptions') self._logger.debug('Error: ', exc_info=True) raise_from(APIException('Impressions not flushed properly.'), exc) + + def flush_counters(self, counters): + """ + Send impressions to the backend. + + :param impressions: Impressions bulk + :type impressions: list + """ + bulk = self._build_counters(counters) + try: + response = self._client.post( + 'events', + '/testImpressions/count', + self._apikey, + body=bulk, + extra_headers=self._metadata + ) + if not 200 <= response.status_code < 300: + raise APIException(response.body, response.status_code) + except HttpClientException as exc: + self._logger.error('Http client is throwing exceptions') + self._logger.debug('Error: ', exc_info=True) + raise_from(APIException('Impressions not flushed properly.'), exc) diff --git a/splitio/client/client.py b/splitio/client/client.py index 894503fb..3151114f 100644 --- a/splitio/client/client.py +++ b/splitio/client/client.py @@ -11,7 +11,6 @@ from splitio.models.events import Event, EventWrapper from splitio.models.telemetry import get_latency_bucket_index from splitio.client import input_validator -from splitio.client.listener import ImpressionListenerException class Client(object): # pylint: disable=too-many-instance-attributes @@ -22,7 +21,7 @@ class Client(object): # pylint: disable=too-many-instance-attributes _METRIC_GET_TREATMENT_WITH_CONFIG = 'sdk.getTreatmentWithConfig' _METRIC_GET_TREATMENTS_WITH_CONFIG = 'sdk.getTreatmentsWithConfig' - def __init__(self, factory, labels_enabled=True, impression_listener=None): + def __init__(self, factory, impressions_manager, labels_enabled=True): """ Construct a Client instance. @@ -32,15 +31,15 @@ def __init__(self, factory, labels_enabled=True, impression_listener=None): :param labels_enabled: Whether to store labels on impressions :type labels_enabled: bool - :param impression_listener: impression listener implementation - :type impression_listener: ImpressionListener + :param impressions_manager: impression manager instance + :type impressions_manager: splitio.engine.impressions.Manager :rtype: Client """ self._logger = logging.getLogger(self.__class__.__name__) self._factory = factory self._labels_enabled = labels_enabled - self._impression_listener = impression_listener + self._impressions_manager = impressions_manager self._splitter = Splitter() self._split_storage = factory._get_storage('splits') # pylint: disable=protected-access @@ -68,25 +67,6 @@ def destroyed(self): """Return whether the factory holding this client has been destroyed.""" return self._factory.destroyed - def _send_impression_to_listener(self, impression, attributes): - """ - Send impression result to custom listener. - - :param impression: Generated impression - :type impression: Impression - - :param attributes: An optional dictionary of attributes - :type attributes: dict - """ - if self._impression_listener is not None: - try: - self._impression_listener.log_impression(impression, attributes) - except ImpressionListenerException: - self._logger.error( - 'An exception was raised while calling user-custom impression listener' - ) - self._logger.debug('Error', exc_info=True) - def _evaluate_if_ready(self, matching_key, bucketing_key, feature, attributes=None): if not self.ready: return { @@ -138,8 +118,7 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name): start ) - self._record_stats([impression], start, metric_name) - self._send_impression_to_listener(impression, attributes) + self._record_stats([(impression, attributes)], start, metric_name) return result['treatment'], result['configurations'] except Exception: # pylint: disable=broad-except self._logger.error('Error getting treatment for feature') @@ -154,8 +133,7 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name): bucketing_key, start ) - self._record_stats([impression], start, metric_name) - self._send_impression_to_listener(impression, attributes) + self._record_stats([(impression, attributes)], start, metric_name) except Exception: # pylint: disable=broad-except self._logger.error('Error reporting impression into get_treatment exception block') self._logger.debug('Error: ', exc_info=True) @@ -215,9 +193,11 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name) # Register impressions try: if bulk_impressions: - self._record_stats(bulk_impressions, start, self._METRIC_GET_TREATMENTS) - for impression in bulk_impressions: - self._send_impression_to_listener(impression, attributes) + self._record_stats( + [(i, attributes) for i in bulk_impressions], + start, + metric_name + ) except Exception: # pylint: disable=broad-except self._logger.error('%s: An exception when trying to store ' 'impressions.' % method_name) @@ -350,7 +330,7 @@ def _record_stats(self, impressions, start, operation): Record impressions and metrics. :param impressions: Generated impressions - :type impressions: list||Impression + :type impressions: list[tuple[splitio.models.impression.Impression, dict]] :param start: timestamp when get_treatment or get_treatments was called :type start: int @@ -360,7 +340,7 @@ def _record_stats(self, impressions, start, operation): """ try: end = int(round(time.time() * 1000)) - self._impressions_storage.put(impressions) + self._impressions_manager.track(impressions) self._telemetry_storage.inc_latency(operation, get_latency_bucket_index(end - start)) except Exception: # pylint: disable=broad-except self._logger.error('Error recording impressions and metrics') diff --git a/splitio/client/config.py b/splitio/client/config.py index 30ea411e..db104c18 100644 --- a/splitio/client/config.py +++ b/splitio/client/config.py @@ -17,6 +17,7 @@ 'eventsQueueSize': 10000, 'labelsEnabled': True, 'IPAddressesEnabled': True, + 'impressionsMode': 'OPTIMIZED', 'impressionListener': None, 'redisLocalCacheEnabled': False, 'redisLocalCacheTTL': 5, diff --git a/splitio/client/factory.py b/splitio/client/factory.py index 01a0fc0d..2e900c1a 100644 --- a/splitio/client/factory.py +++ b/splitio/client/factory.py @@ -15,6 +15,7 @@ from splitio.client.config import DEFAULT_CONFIG from splitio.client import util from splitio.client.listener import ImpressionListenerWrapper +from splitio.engine.impressions import Manager as ImpressionsManager # Storage from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \ @@ -73,10 +74,10 @@ def __init__( # pylint: disable=too-many-arguments apikey, storages, labels_enabled, + impressions_manager=None, apis=None, tasks=None, sdk_ready_flag=None, - impression_listener=None ): """ Class constructor. @@ -91,8 +92,8 @@ def __init__( # pylint: disable=too-many-arguments :type tasks: dict :param sdk_ready_flag: Event to set when the sdk is ready. :type sdk_ready_flag: threading.Event - :param impression_listener: User custom listener to handle impressions locally. - :type impression_listener: splitio.client.listener.ImpressionListener + :param impression_manager: Impressions manager instance + :type impression_listener: ImpressionsManager """ self._apikey = apikey self._logger = logging.getLogger(self.__class__.__name__) @@ -101,7 +102,7 @@ def __init__( # pylint: disable=too-many-arguments self._apis = apis if apis else {} self._tasks = tasks if tasks else {} self._sdk_ready_flag = sdk_ready_flag - self._impression_listener = impression_listener + self._impressions_manager = impressions_manager # If we have a ready flag, it means we have sync tasks that need to finish # before the SDK client becomes ready. @@ -138,7 +139,7 @@ def client(self): This client is only a set of references to structures hold by the factory. Creating one a fast operation and safe to be used anywhere. """ - return Client(self, self._labels_enabled, self._impression_listener) + return Client(self, self._labels_enabled, self._impressions_manager) def manager(self): """ @@ -337,10 +338,11 @@ def segment_ready_task(): api_key, storages, cfg['labelsEnabled'], + ImpressionsManager(storages['impressions'].put, cfg['impressionsMode'], True, + _wrap_impression_listener(cfg['impressionListener'], sdk_metadata)), apis, tasks, sdk_ready_flag, - impression_listener=_wrap_impression_listener(cfg['impressionListener'], sdk_metadata) ) @@ -363,7 +365,8 @@ def _build_redis_factory(api_key, config): api_key, storages, cfg['labelsEnabled'], - impression_listener=_wrap_impression_listener(cfg['impressionListener'], sdk_metadata) + ImpressionsManager(storages['impressions'].put, cfg['impressionsMode'], False, + _wrap_impression_listener(cfg['impressionListener'], sdk_metadata)) ) @@ -384,7 +387,8 @@ def _build_uwsgi_factory(api_key, config): api_key, storages, cfg['labelsEnabled'], - impression_listener=_wrap_impression_listener(cfg['impressionListener'], sdk_metadata) + ImpressionsManager(storages['impressions'].put, cfg['impressionsMode'], True, + _wrap_impression_listener(cfg['impressionListener'], sdk_metadata)) ) @@ -408,7 +412,15 @@ def _build_localhost_factory(config): ready_event )} tasks['splits'].start() - return SplitFactory('localhost', storages, False, None, tasks, ready_event) + return SplitFactory( + 'localhost', + storages, + False, + ImpressionsManager(storages['impressions'].put, cfg['impressionsMode'], True, None), + None, + tasks, + ready_event + ) def get_factory(api_key, **kwargs): diff --git a/splitio/engine/impmanager.py b/splitio/engine/impressions.py similarity index 76% rename from splitio/engine/impmanager.py rename to splitio/engine/impressions.py index 2571e9a0..0aabdeba 100644 --- a/splitio/engine/impmanager.py +++ b/splitio/engine/impressions.py @@ -9,6 +9,7 @@ from splitio.models.impressions import Impression from splitio.engine.hashfns import murmur_128 from splitio.engine.cache.lru import SimpleLruCache +from splitio.client.listener import ImpressionListenerException from splitio import util @@ -116,7 +117,7 @@ class Counter(object): """Class that counts impressions per timeframe.""" CounterKey = namedtuple('Count', ['feature', 'timeframe']) - CountPerFeature = namedtuple('Count', ['feature', 'timeframe', 'count']) + CountPerFeature = namedtuple('CountPerFeature', ['feature', 'timeframe', 'count']) def __init__(self): """Class constructor.""" @@ -172,6 +173,12 @@ def __init__(self, forwarder, mode=ImpressionsMode.OPTIMIZED, standalone=True, l :param listener: Optional impressions listener that will capture all seen impressions. :type listener: splitio.client.listener.ImpressionListenerWrapper """ + if not isinstance(mode, ImpressionsMode): + try: + mode = ImpressionsMode(mode) + except ValueError: + mode = ImpressionsMode.OPTIMIZED + self._forwarder = forwarder self._observer = Observer(_IMPRESSION_OBSERVER_CACHE_SIZE) if standalone else None self._counter = Counter() if standalone and mode == ImpressionsMode.OPTIMIZED else None @@ -183,19 +190,42 @@ def track(self, impressions): Impressions are analyzed to see if they've been seen before and counted. - :param impressions: List of impression objects - :type impressions: list[splitio.models.impression.Impression] + :param impressions: List of impression objects with attributes + :type impressions: list[tuple[splitio.models.impression.Impression, dict]] """ - imps = [self._observer.test_and_set(i) for i in impressions] if self._observer \ + imps = [(self._observer.test_and_set(imp), attrs) for imp, attrs in impressions] if self._observer \ else impressions if self._counter: - self._counter.track(imps) + self._counter.track([imp for imp, _ in imps]) - if self._listener: - for imp in imps: - self._listener.log_impression(imp) + self._send_impressions_to_listener(imps) this_hour = truncate_time(util.utctime_ms()) - self._forwarder(imps if self._counter is None - else [i for i in imps if i.previous_time is None or i.previous_time < this_hour]) # pylint:disable=line-too-long + self._forwarder([imp for imp, _ in imps] if self._counter is None + else [i for i, _ in imps if i.previous_time is None or i.previous_time < this_hour]) + + def get_counts(self): + """ + Return counts of impressions per features. + + :returns: A list of counter objects. + :rtype: list[Counter.CountPerFeature] + """ + return self._counter.pop_all() if self._counter is not None else [] + + def _send_impressions_to_listener(self, impressions): + """ + Send impression result to custom listener. + + :param impressions: List of impression objects with attributes + :type impressions: list[tuple[splitio.models.impression.Impression, dict]] + """ + if self._listener is not None: + try: + for impression, attributes in impressions: + self._listener.log_impression(impression, attributes) + except ImpressionListenerException: + pass +# self._logger.error('An exception was raised while calling user-custom impression listener') +# self._logger.debug('Error', exc_info=True) diff --git a/splitio/tasks/impressions_sync.py b/splitio/tasks/impressions_sync.py index 075f4547..4e372ef1 100644 --- a/splitio/tasks/impressions_sync.py +++ b/splitio/tasks/impressions_sync.py @@ -9,6 +9,7 @@ from splitio.api import APIException from splitio.tasks import BaseSynchronizationTask from splitio.tasks.util.asynctask import AsyncTask +from splitio.engine.impressions import Manager as ImpressionsManager class ImpressionsSyncTask(BaseSynchronizationTask): @@ -99,3 +100,83 @@ def is_running(self): def flush(self): """Flush impressions in storage.""" self._task.force_execution() + + +class ImpressionsCountSyncTask(BaseSynchronizationTask): + """Impressions synchronization task uses an asynctask.AsyncTask to send impressions.""" + + def __init__(self, impressions_manager): + """ + Class constructor. + + :param impressions_manager: Impressions manager instance + :type impressions_manager: ImpressionsManager + """ + self._logger = logging.getLogger(self.__class__.__name__) + self._impressions_manager = impressions_manager + self._task = AsyncTask(self._send_impressions, self._period, on_stop=self._send_impressions) + + def _get_failed(self): + """Return up to impressions stored in the failed impressions queue.""" + imps = [] + count = 0 + while count < self._bulk_size: + try: + imps.append(self._failed.get(False)) + count += 1 + except queue.Empty: + # If no more items in queue, break the loop + break + return imps + + def _add_to_failed_queue(self, imps): + """ + Add impressions that were about to be sent to a secondary queue for failed sends. + + :param imps: List of impressions that failed to be pushed. + :type imps: list + """ + for impression in imps: + self._failed.put(impression, False) + + def _send_impressions(self): + """Send impressions from both the failed and new queues.""" + to_send = self._get_failed() + if len(to_send) < self._bulk_size: + # If the amount of previously failed items is less than the bulk + # size, try to complete with new impressions from storage + to_send.extend(self._storage.pop_many(self._bulk_size - len(to_send))) + + if not to_send: + return + + try: + self._impressions_api.flush_impressions(to_send) + except APIException as exc: + self._logger.error( + 'Exception raised while reporting impressions: %s -- %d', + exc.message, + exc.status_code + ) + self._add_to_failed_queue(to_send) + + def start(self): + """Start executing the impressions synchronization task.""" + self._task.start() + + def stop(self, event=None): + """Stop executing the impressions synchronization task.""" + self._task.stop(event) + + def is_running(self): + """ + Return whether the task is running or not. + + :return: True if the task is running. False otherwise. + :rtype: bool + """ + return self._task.running() + + def flush(self): + """Flush impressions in storage.""" + self._task.force_execution() diff --git a/tests/api/test_impressions_api.py b/tests/api/test_impressions_api.py index 63650acc..3428fdfc 100644 --- a/tests/api/test_impressions_api.py +++ b/tests/api/test_impressions_api.py @@ -3,6 +3,7 @@ import pytest from splitio.api import impressions, client, APIException from splitio.models.impressions import Impression +from splitio.engine.impressions import Counter from splitio.client.util import get_metadata from splitio.client.config import DEFAULT_CONFIG from splitio.version import __version__ @@ -16,18 +17,34 @@ class ImpressionsAPITests(object): Impression('k3', 'f1', 'on', 'l1', 123456, 'b1', 321654) ] expectedImpressions = [{ - 'testName': 'f1', - 'keyImpressions': [ - {'keyName': 'k1', 'bucketingKey': 'b1', 'treatment': 'on', 'label': 'l1', 'time': 321654, 'changeNumber': 123456}, - {'keyName': 'k3', 'bucketingKey': 'b1', 'treatment': 'on', 'label': 'l1', 'time': 321654, 'changeNumber': 123456}, + 'f': 'f1', + 'i': [ + {'k': 'k1', 'b': 'b1', 't': 'on', 'r': 'l1', 'm': 321654, 'c': 123456}, + {'k': 'k3', 'b': 'b1', 't': 'on', 'r': 'l1', 'm': 321654, 'c': 123456}, ], }, { - 'testName': 'f2', - 'keyImpressions': [ - {'keyName': 'k2', 'bucketingKey': 'b1', 'treatment': 'off', 'label': 'l1', 'time': 321654, 'changeNumber': 123456}, + 'f': 'f2', + 'i': [ + {'k': 'k2', 'b': 'b1', 't': 'off', 'r': 'l1', 'm': 321654, 'c': 123456}, ] }] + counters = [ + Counter.CountPerFeature('f1', 123, 2), + Counter.CountPerFeature('f2', 123, 123), + Counter.CountPerFeature('f1', 456, 111), + Counter.CountPerFeature('f2', 456, 222) + ] + + expected_counters = { + 'pf': [ + {'f': 'f1', 'm': 123, 'c': 2}, + {'f': 'f2', 'm': 123, 'c': 123}, + {'f': 'f1', 'm': 456, 'c': 111}, + {'f': 'f2', 'm': 456, 'c': 222}, + ] + } + def test_post_impressions(self, mocker): """Test impressions posting API call.""" httpclient = mocker.Mock(spec=client.HttpClient) @@ -84,3 +101,37 @@ def test_post_impressions_ip_address_disabled(self, mocker): # validate key-value args (body) assert call_made[2]['body'] == self.expectedImpressions + + def test_post_counters(self, mocker): + """Test impressions posting API call.""" + httpclient = mocker.Mock(spec=client.HttpClient) + httpclient.post.return_value = client.HttpResponse(200, '') + cfg = DEFAULT_CONFIG.copy() + cfg.update({'IPAddressesEnabled': True, 'machineName': 'some_machine_name', 'machineIp': '123.123.123.123'}) + sdk_metadata = get_metadata(cfg) + impressions_api = impressions.ImpressionsAPI(httpclient, 'some_api_key', sdk_metadata) + response = impressions_api.flush_counters(self.counters) + + call_made = httpclient.post.mock_calls[0] + + # validate positional arguments + assert call_made[1] == ('events', '/testImpressions/count', 'some_api_key') + + # validate key-value args (headers) + assert call_made[2]['extra_headers'] == { + 'SplitSDKVersion': 'python-%s' % __version__, + 'SplitSDKMachineIP': '123.123.123.123', + 'SplitSDKMachineName': 'some_machine_name' + } + + # validate key-value args (body) + assert call_made[2]['body'] == self.expected_counters + + httpclient.reset_mock() + def raise_exception(*args, **kwargs): + raise client.HttpClientException('some_message') + httpclient.post.side_effect = raise_exception + with pytest.raises(APIException) as exc_info: + response = impressions_api.flush_counters(self.counters) + assert exc_info.type == APIException + assert exc_info.value.message == 'some_message' diff --git a/tests/client/test_client.py b/tests/client/test_client.py index 2546b27d..26c7c937 100644 --- a/tests/client/test_client.py +++ b/tests/client/test_client.py @@ -13,6 +13,8 @@ from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \ InMemoryImpressionStorage, InMemoryTelemetryStorage, InMemoryEventStorage from splitio.models import splits, segments +from splitio.engine.impressions import Manager as ImpressionManager + class ClientTests(object): #pylint: disable=too-few-public-methods """Split client test cases.""" @@ -43,7 +45,8 @@ def _get_storage_mock(name): mocker.patch('splitio.client.client.time.time', new=lambda: 1) mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) - client = Client(factory, True, None) + impmanager = mocker.Mock(spec=ImpressionManager) + client = Client(factory, impmanager, True) client._evaluator = mocker.Mock(spec=Evaluator) client._evaluator.evaluate_feature.return_value = { 'treatment': 'on', @@ -54,28 +57,23 @@ def _get_storage_mock(name): }, } client._logger = mocker.Mock() - client._send_impression_to_listener = mocker.Mock() assert client.get_treatment('some_key', 'some_feature') == 'on' assert mocker.call( - [Impression('some_key', 'some_feature', 'on', 'some_label', 123, None, 1000)] - ) in impression_storage.put.mock_calls + [(Impression('some_key', 'some_feature', 'on', 'some_label', 123, None, 1000), None)] + ) in impmanager.track.mock_calls assert mocker.call('sdk.getTreatment', 5) in telemetry_storage.inc_latency.mock_calls assert client._logger.mock_calls == [] - assert mocker.call( - Impression('some_key', 'some_feature', 'on', 'some_label', 123, None, 1000), - None - ) in client._send_impression_to_listener.mock_calls # Test with client not ready ready_property = mocker.PropertyMock() ready_property.return_value = False type(factory).ready = ready_property - impression_storage.put.reset_mock() + impmanager.track.reset_mock() assert client.get_treatment('some_key', 'some_feature', {'some_attribute': 1}) == 'control' assert mocker.call( - [Impression('some_key', 'some_feature', 'control', Label.NOT_READY, mocker.ANY, mocker.ANY, mocker.ANY)] - ) in impression_storage.put.mock_calls + [(Impression('some_key', 'some_feature', 'control', Label.NOT_READY, mocker.ANY, mocker.ANY, mocker.ANY), {'some_attribute': 1})] + ) in impmanager.track.mock_calls # Test with exception: ready_property.return_value = True @@ -85,8 +83,8 @@ def _raise(*_): client._evaluator.evaluate_feature.side_effect = _raise assert client.get_treatment('some_key', 'some_feature') == 'control' assert mocker.call( - [Impression('some_key', 'some_feature', 'control', 'exception', -1, None, 1000)] - ) in impression_storage.put.mock_calls + [(Impression('some_key', 'some_feature', 'control', 'exception', -1, None, 1000), None)] + ) in impmanager.track.mock_calls assert len(telemetry_storage.inc_latency.mock_calls) == 3 def test_get_treatment_with_config(self, mocker): @@ -115,7 +113,8 @@ def _get_storage_mock(name): mocker.patch('splitio.client.client.time.time', new=lambda: 1) mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) - client = Client(factory, True, None) + impmanager = mocker.Mock(spec=ImpressionManager) + client = Client(factory, impmanager, True) client._evaluator = mocker.Mock(spec=Evaluator) client._evaluator.evaluate_feature.return_value = { 'treatment': 'on', @@ -133,24 +132,21 @@ def _get_storage_mock(name): 'some_feature' ) == ('on', '{"some_config": True}') assert mocker.call( - [Impression('some_key', 'some_feature', 'on', 'some_label', 123, None, 1000)] - ) in impression_storage.put.mock_calls + [(Impression('some_key', 'some_feature', 'on', 'some_label', 123, None, 1000), None)] + ) in impmanager.track.mock_calls assert mocker.call('sdk.getTreatmentWithConfig', 5) in telemetry_storage.inc_latency.mock_calls assert client._logger.mock_calls == [] - assert mocker.call( - Impression('some_key', 'some_feature', 'on', 'some_label', 123, None, 1000), - None - ) in client._send_impression_to_listener.mock_calls # Test with client not ready ready_property = mocker.PropertyMock() ready_property.return_value = False type(factory).ready = ready_property - impression_storage.put.reset_mock() + impmanager.track.reset_mock() assert client.get_treatment_with_config('some_key', 'some_feature', {'some_attribute': 1}) == ('control', None) assert mocker.call( - [Impression('some_key', 'some_feature', 'control', Label.NOT_READY, mocker.ANY, mocker.ANY, mocker.ANY)] - ) in impression_storage.put.mock_calls + [(Impression('some_key', 'some_feature', 'control', Label.NOT_READY, mocker.ANY, mocker.ANY, mocker.ANY), + {'some_attribute': 1})] + ) in impmanager.track.mock_calls # Test with exception: ready_property.return_value = True @@ -160,8 +156,8 @@ def _raise(*_): client._evaluator.evaluate_feature.side_effect = _raise assert client.get_treatment_with_config('some_key', 'some_feature') == ('control', None) assert mocker.call( - [Impression('some_key', 'some_feature', 'control', 'exception', -1, None, 1000)] - ) in impression_storage.put.mock_calls + [(Impression('some_key', 'some_feature', 'control', 'exception', -1, None, 1000), None)] + ) in impmanager.track.mock_calls assert len(telemetry_storage.inc_latency.mock_calls) == 3 def test_get_treatments(self, mocker): @@ -190,7 +186,8 @@ def _get_storage_mock(name): mocker.patch('splitio.client.client.time.time', new=lambda: 1) mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) - client = Client(factory, True, None) + impmanager = mocker.Mock(spec=ImpressionManager) + client = Client(factory, impmanager, True) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { 'treatment': 'on', @@ -208,29 +205,21 @@ def _get_storage_mock(name): client._send_impression_to_listener = mocker.Mock() assert client.get_treatments('key', ['f1', 'f2']) == {'f1': 'on', 'f2': 'on'} - impressions_called = impression_storage.put.mock_calls[0][1][0] - assert Impression('key', 'f1', 'on', 'some_label', 123, None, 1000) in impressions_called - assert Impression('key', 'f2', 'on', 'some_label', 123, None, 1000) in impressions_called + impressions_called = impmanager.track.mock_calls[0][1][0] + assert (Impression('key', 'f1', 'on', 'some_label', 123, None, 1000), None) in impressions_called + assert (Impression('key', 'f2', 'on', 'some_label', 123, None, 1000), None) in impressions_called assert mocker.call('sdk.getTreatments', 5) in telemetry_storage.inc_latency.mock_calls assert client._logger.mock_calls == [] - assert mocker.call( - Impression('key', 'f1', 'on', 'some_label', 123, None, 1000), - None - ) in client._send_impression_to_listener.mock_calls - assert mocker.call( - Impression('key', 'f2', 'on', 'some_label', 123, None, 1000), - None - ) in client._send_impression_to_listener.mock_calls # Test with client not ready ready_property = mocker.PropertyMock() ready_property.return_value = False type(factory).ready = ready_property - impression_storage.put.reset_mock() + impmanager.track.reset_mock() assert client.get_treatments('some_key', ['some_feature'], {'some_attribute': 1}) == {'some_feature': 'control'} assert mocker.call( - [Impression('some_key', 'some_feature', 'control', Label.NOT_READY, mocker.ANY, mocker.ANY, mocker.ANY)] - ) in impression_storage.put.mock_calls + [(Impression('some_key', 'some_feature', 'control', Label.NOT_READY, mocker.ANY, mocker.ANY, mocker.ANY), {'some_attribute': 1})] + ) in impmanager.track.mock_calls # Test with exception: ready_property.return_value = True @@ -267,7 +256,8 @@ def _get_storage_mock(name): mocker.patch('splitio.client.client.time.time', new=lambda: 1) mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) - client = Client(factory, True, None) + impmanager = mocker.Mock(spec=ImpressionManager) + client = Client(factory, impmanager, True) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { 'treatment': 'on', @@ -282,35 +272,26 @@ def _get_storage_mock(name): 'f2': evaluation } client._logger = mocker.Mock() - client._send_impression_to_listener = mocker.Mock() assert client.get_treatments_with_config('key', ['f1', 'f2']) == { 'f1': ('on', '{"color": "red"}'), 'f2': ('on', '{"color": "red"}') } - impressions_called = impression_storage.put.mock_calls[0][1][0] - assert Impression('key', 'f1', 'on', 'some_label', 123, None, 1000) in impressions_called - assert Impression('key', 'f2', 'on', 'some_label', 123, None, 1000) in impressions_called - assert mocker.call('sdk.getTreatments', 5) in telemetry_storage.inc_latency.mock_calls + impressions_called = impmanager.track.mock_calls[0][1][0] + assert (Impression('key', 'f1', 'on', 'some_label', 123, None, 1000), None) in impressions_called + assert (Impression('key', 'f2', 'on', 'some_label', 123, None, 1000), None) in impressions_called + assert mocker.call('sdk.getTreatmentsWithConfig', 5) in telemetry_storage.inc_latency.mock_calls assert client._logger.mock_calls == [] - assert mocker.call( - Impression('key', 'f1', 'on', 'some_label', 123, None, 1000), - None - ) in client._send_impression_to_listener.mock_calls - assert mocker.call( - Impression('key', 'f2', 'on', 'some_label', 123, None, 1000), - None - ) in client._send_impression_to_listener.mock_calls # Test with client not ready ready_property = mocker.PropertyMock() ready_property.return_value = False type(factory).ready = ready_property - impression_storage.put.reset_mock() + impmanager.track.reset_mock() assert client.get_treatments_with_config('some_key', ['some_feature'], {'some_attribute': 1}) == {'some_feature': ('control', None)} assert mocker.call( - [Impression('some_key', 'some_feature', 'control', Label.NOT_READY, mocker.ANY, mocker.ANY, mocker.ANY)] - ) in impression_storage.put.mock_calls + [(Impression('some_key', 'some_feature', 'control', Label.NOT_READY, mocker.ANY, mocker.ANY, mocker.ANY), {'some_attribute': 1})] + ) in impmanager.track.mock_calls # Test with exception: ready_property.return_value = True @@ -343,7 +324,8 @@ def _get_storage_mock(name): destroyed_mock = mocker.PropertyMock() type(factory).destroyed = destroyed_mock - client = Client(factory) + impmanager = mocker.Mock(spec=ImpressionManager) + client = Client(factory, impmanager, True) client.destroy() assert factory.destroy.mock_calls == [mocker.call()] assert client.destroyed is not None @@ -373,7 +355,8 @@ def _get_storage_mock(name): factory._apikey = 'test' mocker.patch('splitio.client.client.time.time', new=lambda: 1) - client = Client(factory) + impmanager = mocker.Mock(spec=ImpressionManager) + client = Client(factory, impmanager, True) assert client.track('key', 'user', 'purchase', 12) is True assert mocker.call([ EventWrapper( diff --git a/tests/client/test_factory.py b/tests/client/test_factory.py index 0f1cc4a2..96f60a42 100644 --- a/tests/client/test_factory.py +++ b/tests/client/test_factory.py @@ -4,7 +4,6 @@ import time import threading -from splitio.client.listener import ImpressionListenerWrapper from splitio.client.factory import get_factory, SplitFactory, _INSTANTIATED_FACTORIES from splitio.client.config import DEFAULT_CONFIG from splitio.storage import redis, inmemmory, uwsgi @@ -15,6 +14,7 @@ from splitio.api.impressions import ImpressionsAPI from splitio.api.events import EventsAPI from splitio.api.telemetry import TelemetryAPI +from splitio.engine.impressions import Manager as ImpressionsManager class SplitFactoryTests(object): @@ -162,7 +162,7 @@ def test_redis_client_creation(self, mocker): max_connections=999 )] assert factory._labels_enabled is False - assert isinstance(factory._impression_listener, ImpressionListenerWrapper) + assert isinstance(factory._impressions_manager, ImpressionsManager) factory.block_until_ready() time.sleep(1) # give a chance for the bg thread to set the ready status assert factory.ready @@ -180,7 +180,6 @@ def test_uwsgi_client_creation(self): assert factory._apis == {} assert factory._tasks == {} assert factory._labels_enabled is True - assert factory._impression_listener is None factory.block_until_ready() time.sleep(1) # give a chance for the bg thread to set the ready status assert factory.ready diff --git a/tests/client/test_input_validator.py b/tests/client/test_input_validator.py index e02961d7..c1911cdc 100644 --- a/tests/client/test_input_validator.py +++ b/tests/client/test_input_validator.py @@ -45,7 +45,7 @@ def _get_storage_mock(storage): factory_destroyed.return_value = False type(factory_mock).destroyed = factory_destroyed - client = Client(factory_mock) + client = Client(factory_mock, mocker.Mock()) client._logger = mocker.Mock() mocker.patch('splitio.client.input_validator._LOGGER', new=client._logger) @@ -273,7 +273,7 @@ def _get_storage_mock(storage): factory_destroyed.return_value = False type(factory_mock).destroyed = factory_destroyed - client = Client(factory_mock) + client = Client(factory_mock, mocker.Mock()) client._logger = mocker.Mock() mocker.patch('splitio.client.input_validator._LOGGER', new=client._logger) @@ -472,7 +472,7 @@ def _get_storage_mock(storage): ] def test_valid_properties(self, mocker): - """Test valid_properties() method""" + """Test valid_properties() method.""" assert input_validator.valid_properties(None) == (True, None, 1024) assert input_validator.valid_properties([]) == (False, None, 0) assert input_validator.valid_properties(True) == (False, None, 0) @@ -525,7 +525,7 @@ def test_track(self, mocker): type(factory_mock).destroyed = factory_destroyed factory_mock._apikey = 'some-test' - client = Client(factory_mock) + client = Client(factory_mock, mocker.Mock()) client._events_storage = mocker.Mock(spec=EventStorage) client._events_storage.put.return_value = True client._logger = mocker.Mock() @@ -795,7 +795,7 @@ def _get_storage_mock(storage): factory_destroyed.return_value = False type(factory_mock).destroyed = factory_destroyed - client = Client(factory_mock) + client = Client(factory_mock, mocker.Mock()) client._logger = mocker.Mock() mocker.patch('splitio.client.input_validator._LOGGER', new=client._logger) @@ -921,7 +921,7 @@ def _configs(treatment): return '{"some": "property"}' if treatment == 'default_treatment' else None split_mock.get_configurations_for.side_effect = _configs - client = Client(factory_mock) + client = Client(factory_mock, mocker.Mock()) client._logger = mocker.Mock() mocker.patch('splitio.client.input_validator._LOGGER', new=client._logger) diff --git a/tests/engine/test_impmanager.py b/tests/engine/test_impressions.py similarity index 84% rename from tests/engine/test_impmanager.py rename to tests/engine/test_impressions.py index 46d0b458..9d2e38f2 100644 --- a/tests/engine/test_impmanager.py +++ b/tests/engine/test_impressions.py @@ -1,6 +1,6 @@ """Impression manager, observer & hasher tests.""" from datetime import datetime -from splitio.engine.impmanager import Hasher, Observer, Counter, Manager, \ +from splitio.engine.impressions import Hasher, Observer, Counter, Manager, \ ImpressionsMode, truncate_time from splitio.models.impressions import Impression from splitio.client.listener import ImpressionListenerWrapper @@ -108,18 +108,18 @@ def forwarder(incoming): assert manager._forwarder is forwarder # An impression that hasn't happened in the last hour (pt = None) should be tracked - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), - Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), + (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking the same impression a ms later should call the forwarder function - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking a in impression with a different key makes it to the queue - manager.track([Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)]) + manager.track([(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)] @@ -130,8 +130,8 @@ def forwarder(incoming): utc_time_mock.return_value = utc_now # Track the same impressions but "one hour later" - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), - Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1), @@ -167,19 +167,19 @@ def forwarder(incoming): assert manager._forwarder is forwarder # An impression that hasn't happened in the last hour (pt = None) should be tracked - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), - Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), + (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking the same impression a ms later should call the forwarder function - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2, utc_now-3)] # Tracking a in impression with a different key makes it to the queue - manager.track([Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)]) + manager.track([(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2, utc_now-3), @@ -191,8 +191,8 @@ def forwarder(incoming): utc_time_mock.return_value = utc_now # Track the same impressions but "one hour later" - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), - Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2, old_utc-3), @@ -222,19 +222,19 @@ def forwarder(incoming): assert manager._forwarder is forwarder # An impression that hasn't happened in the last hour (pt = None) should be tracked - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), - Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), + (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking the same impression a ms later should call the forwarder function - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)] # Tracking a in impression with a different key makes it to the queue - manager.track([Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)]) + manager.track([(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), @@ -246,8 +246,8 @@ def forwarder(incoming): utc_time_mock.return_value = utc_now # Track the same impressions but "one hour later" - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), - Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2), @@ -275,19 +275,19 @@ def forwarder(incoming): assert manager._forwarder is forwarder # An impression that hasn't happened in the last hour (pt = None) should be tracked - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), - Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), + (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking the same impression a ms later should call the forwarder function - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)] # Tracking a in impression with a different key makes it to the queue - manager.track([Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)]) + manager.track([(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), @@ -299,8 +299,8 @@ def forwarder(incoming): utc_time_mock.return_value = utc_now # Track the same impressions but "one hour later" - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), - Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2), @@ -330,18 +330,18 @@ def forwarder(incoming): assert manager._forwarder is forwarder # An impression that hasn't happened in the last hour (pt = None) should be tracked - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), - Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), + (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking the same impression a ms later should call the forwarder function - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking a in impression with a different key makes it to the queue - manager.track([Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)]) + manager.track([(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)] @@ -352,8 +352,8 @@ def forwarder(incoming): utc_time_mock.return_value = utc_now # Track the same impressions but "one hour later" - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), - Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1), @@ -370,12 +370,12 @@ def forwarder(incoming): ]) assert listener.log_impression.mock_calls == [ - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3)), - mocker.call(Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3)), - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2, old_utc-3)), - mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1)), - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1, old_utc-3)), - mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2, old_utc-1)) + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), None), + mocker.call(Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), None), + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2, old_utc-3), None), + mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1), None), + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1, old_utc-3), None), + mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2, old_utc-1), None) ] @@ -400,19 +400,19 @@ def forwarder(incoming): assert manager._forwarder is forwarder # An impression that hasn't happened in the last hour (pt = None) should be tracked - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), - Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), + (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking the same impression a ms later should call the forwarder function - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2, utc_now-3)] # Tracking a in impression with a different key makes it to the queue - manager.track([Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)]) + manager.track([(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2, utc_now-3), @@ -424,8 +424,8 @@ def forwarder(incoming): utc_time_mock.return_value = utc_now # Track the same impressions but "one hour later" - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), - Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2, old_utc-3), @@ -436,12 +436,12 @@ def forwarder(incoming): assert len(manager._observer._cache._data) == 3 # distinct impressions seen assert listener.log_impression.mock_calls == [ - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3)), - mocker.call(Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3)), - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2, old_utc-3)), - mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1)), - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1, old_utc-3)), - mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2, old_utc-1)) + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), None), + mocker.call(Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), None), + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2, old_utc-3), None), + mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1), None), + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1, old_utc-3), None), + mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2, old_utc-1), None) ] @@ -466,19 +466,19 @@ def forwarder(incoming): assert manager._forwarder is forwarder # An impression that hasn't happened in the last hour (pt = None) should be tracked - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), - Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), + (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking the same impression a ms later should call the forwarder function - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)] # Tracking a in impression with a different key makes it to the queue - manager.track([Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)]) + manager.track([(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), @@ -490,8 +490,8 @@ def forwarder(incoming): utc_time_mock.return_value = utc_now # Track the same impressions but "one hour later" - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), - Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2), @@ -500,12 +500,12 @@ def forwarder(incoming): Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)] assert listener.log_impression.mock_calls == [ - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3)), - mocker.call(Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3)), - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2)), - mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1)), - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1)), - mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)) + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), None), + mocker.call(Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), None), + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2), None), + mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1), None), + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None) ] @@ -530,19 +530,19 @@ def forwarder(incoming): assert manager._forwarder is forwarder # An impression that hasn't happened in the last hour (pt = None) should be tracked - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), - Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), + (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking the same impression a ms later should call the forwarder function - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)] # Tracking a in impression with a different key makes it to the queue - manager.track([Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)]) + manager.track([(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), @@ -554,8 +554,8 @@ def forwarder(incoming): utc_time_mock.return_value = utc_now # Track the same impressions but "one hour later" - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), - Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2), @@ -564,11 +564,10 @@ def forwarder(incoming): Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)] assert listener.log_impression.mock_calls == [ - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3)), - mocker.call(Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3)), - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2)), - mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1)), - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1)), - mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)) + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), None), + mocker.call(Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), None), + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2), None), + mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1), None), + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None) ] - diff --git a/tests/integration/test_client_e2e.py b/tests/integration/test_client_e2e.py index 0da42da0..ba7f9eba 100644 --- a/tests/integration/test_client_e2e.py +++ b/tests/integration/test_client_e2e.py @@ -14,6 +14,7 @@ RedisSplitStorage, RedisSegmentStorage, RedisTelemetryStorage from splitio.storage.adapters.redis import RedisAdapter from splitio.models import splits, segments +from splitio.engine.impressions import Manager as ImpressionsManager class InMemoryIntegrationTests(object): @@ -40,13 +41,15 @@ def setup_method(self): data = json.loads(flo.read()) segment_storage.put(segments.from_raw(data)) - self.factory = SplitFactory('some_api_key', { #pylint:disable=attribute-defined-outside-init + storages = { 'splits': split_storage, 'segments': segment_storage, 'impressions': InMemoryImpressionStorage(5000), 'events': InMemoryEventStorage(5000), 'telemetry': InMemoryTelemetryStorage() - }, True) + } + impmanager = ImpressionsManager(storages['impressions'].put, 'DEBUG') + self.factory = SplitFactory('some_api_key', storages, impmanager, True) #pylint:disable=attribute-defined-outside-init def _validate_last_impressions(self, client, *to_validate): """Validate the last N impressions are present disregarding the order.""" @@ -256,6 +259,219 @@ def test_manager_methods(self): assert len(manager.splits()) == 7 +class InMemoryOptimizedIntegrationTests(object): + """Inmemory storage-based integration tests.""" + + def setup_method(self): + """Prepare storages with test data.""" + split_storage = InMemorySplitStorage() + segment_storage = InMemorySegmentStorage() + + split_fn = os.path.join(os.path.dirname(__file__), 'files', 'splitChanges.json') + with open(split_fn, 'r') as flo: + data = json.loads(flo.read()) + for split in data['splits']: + split_storage.put(splits.from_raw(split)) + + segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentEmployeesChanges.json') + with open(segment_fn, 'r') as flo: + data = json.loads(flo.read()) + segment_storage.put(segments.from_raw(data)) + + segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentHumanBeignsChanges.json') + with open(segment_fn, 'r') as flo: + data = json.loads(flo.read()) + segment_storage.put(segments.from_raw(data)) + + storages = { + 'splits': split_storage, + 'segments': segment_storage, + 'impressions': InMemoryImpressionStorage(5000), + 'events': InMemoryEventStorage(5000), + 'telemetry': InMemoryTelemetryStorage() + } + impmanager = ImpressionsManager(storages['impressions'].put, 'OPTIMIZED', standalone=True) + self.factory = SplitFactory('some_api_key', storages, impmanager, True) #pylint:disable=attribute-defined-outside-init + + def _validate_last_impressions(self, client, *to_validate): + """Validate the last N impressions are present disregarding the order.""" + imp_storage = client._factory._get_storage('impressions') + impressions = imp_storage.pop_many(len(to_validate)) + as_tup_set = set((i.feature_name, i.matching_key, i.treatment) for i in impressions) + assert as_tup_set == set(to_validate) + + def test_get_treatment(self): + """Test client.get_treatment().""" + client = self.factory.client() + + assert client.get_treatment('user1', 'sample_feature') == 'on' + self._validate_last_impressions(client, ('sample_feature', 'user1', 'on')) + client.get_treatment('user1', 'sample_feature') + client.get_treatment('user1', 'sample_feature') + client.get_treatment('user1', 'sample_feature') + + # Only one impression was added, and popped when validating, the rest were ignored + assert self.factory._storages['impressions']._impressions.qsize() == 0 + + assert client.get_treatment('invalidKey', 'sample_feature') == 'off' + self._validate_last_impressions(client, ('sample_feature', 'invalidKey', 'off')) + + assert client.get_treatment('invalidKey', 'invalid_feature') == 'control' + self._validate_last_impressions(client) # No impressions should be present + + # testing a killed feature. No matter what the key, must return default treatment + assert client.get_treatment('invalidKey', 'killed_feature') == 'defTreatment' + self._validate_last_impressions(client, ('killed_feature', 'invalidKey', 'defTreatment')) + + # testing ALL matcher + assert client.get_treatment('invalidKey', 'all_feature') == 'on' + self._validate_last_impressions(client, ('all_feature', 'invalidKey', 'on')) + + # testing WHITELIST matcher + assert client.get_treatment('whitelisted_user', 'whitelist_feature') == 'on' + self._validate_last_impressions(client, ('whitelist_feature', 'whitelisted_user', 'on')) + assert client.get_treatment('unwhitelisted_user', 'whitelist_feature') == 'off' + self._validate_last_impressions(client, ('whitelist_feature', 'unwhitelisted_user', 'off')) + + # testing INVALID matcher + assert client.get_treatment('some_user_key', 'invalid_matcher_feature') == 'control' + self._validate_last_impressions(client) # No impressions should be present + + # testing Dependency matcher + assert client.get_treatment('somekey', 'dependency_test') == 'off' + self._validate_last_impressions(client, ('dependency_test', 'somekey', 'off')) + + # testing boolean matcher + assert client.get_treatment('True', 'boolean_test') == 'on' + self._validate_last_impressions(client, ('boolean_test', 'True', 'on')) + + # testing regex matcher + assert client.get_treatment('abc4', 'regex_test') == 'on' + self._validate_last_impressions(client, ('regex_test', 'abc4', 'on')) + + def test_get_treatments(self): + """Test client.get_treatments().""" + client = self.factory.client() + + result = client.get_treatments('user1', ['sample_feature']) + assert len(result) == 1 + assert result['sample_feature'] == 'on' + self._validate_last_impressions(client, ('sample_feature', 'user1', 'on')) + + result = client.get_treatments('invalidKey', ['sample_feature']) + assert len(result) == 1 + assert result['sample_feature'] == 'off' + self._validate_last_impressions(client, ('sample_feature', 'invalidKey', 'off')) + + result = client.get_treatments('invalidKey', ['invalid_feature']) + assert len(result) == 1 + assert result['invalid_feature'] == 'control' + self._validate_last_impressions(client) + + # testing a killed feature. No matter what the key, must return default treatment + result = client.get_treatments('invalidKey', ['killed_feature']) + assert len(result) == 1 + assert result['killed_feature'] == 'defTreatment' + self._validate_last_impressions(client, ('killed_feature', 'invalidKey', 'defTreatment')) + + # testing ALL matcher + result = client.get_treatments('invalidKey', ['all_feature']) + assert len(result) == 1 + assert result['all_feature'] == 'on' + self._validate_last_impressions(client, ('all_feature', 'invalidKey', 'on')) + + # testing multiple splitNames + result = client.get_treatments('invalidKey', [ + 'all_feature', + 'killed_feature', + 'invalid_feature', + 'sample_feature' + ]) + assert len(result) == 4 + assert result['all_feature'] == 'on' + assert result['killed_feature'] == 'defTreatment' + assert result['invalid_feature'] == 'control' + assert result['sample_feature'] == 'off' + assert self.factory._storages['impressions']._impressions.qsize() == 0 + + def test_get_treatments_with_config(self): + """Test client.get_treatments_with_config().""" + client = self.factory.client() + + result = client.get_treatments_with_config('user1', ['sample_feature']) + assert len(result) == 1 + assert result['sample_feature'] == ('on', '{"size":15,"test":20}') + self._validate_last_impressions(client, ('sample_feature', 'user1', 'on')) + + result = client.get_treatments_with_config('invalidKey', ['sample_feature']) + assert len(result) == 1 + assert result['sample_feature'] == ('off', None) + self._validate_last_impressions(client, ('sample_feature', 'invalidKey', 'off')) + + result = client.get_treatments_with_config('invalidKey', ['invalid_feature']) + assert len(result) == 1 + assert result['invalid_feature'] == ('control', None) + self._validate_last_impressions(client) + + # testing a killed feature. No matter what the key, must return default treatment + result = client.get_treatments_with_config('invalidKey', ['killed_feature']) + assert len(result) == 1 + assert result['killed_feature'] == ('defTreatment', '{"size":15,"defTreatment":true}') + self._validate_last_impressions(client, ('killed_feature', 'invalidKey', 'defTreatment')) + + # testing ALL matcher + result = client.get_treatments_with_config('invalidKey', ['all_feature']) + assert len(result) == 1 + assert result['all_feature'] == ('on', None) + self._validate_last_impressions(client, ('all_feature', 'invalidKey', 'on')) + + # testing multiple splitNames + result = client.get_treatments_with_config('invalidKey', [ + 'all_feature', + 'killed_feature', + 'invalid_feature', + 'sample_feature' + ]) + assert len(result) == 4 + + assert result['all_feature'] == ('on', None) + assert result['killed_feature'] == ('defTreatment', '{"size":15,"defTreatment":true}') + assert result['invalid_feature'] == ('control', None) + assert result['sample_feature'] == ('off', None) + assert self.factory._storages['impressions']._impressions.qsize() == 0 + + def test_manager_methods(self): + """Test manager.split/splits.""" + manager = self.factory.manager() + result = manager.split('all_feature') + assert result.name == 'all_feature' + assert result.traffic_type is None + assert result.killed is False + assert len(result.treatments) == 2 + assert result.change_number == 123 + assert result.configs == {} + + result = manager.split('killed_feature') + assert result.name == 'killed_feature' + assert result.traffic_type is None + assert result.killed is True + assert len(result.treatments) == 2 + assert result.change_number == 123 + assert result.configs['defTreatment'] == '{"size":15,"defTreatment":true}' + assert result.configs['off'] == '{"size":15,"test":20}' + + result = manager.split('sample_feature') + assert result.name == 'sample_feature' + assert result.traffic_type is None + assert result.killed is False + assert len(result.treatments) == 2 + assert result.change_number == 123 + assert result.configs['on'] == '{"size":15,"test":20}' + + assert len(manager.split_names()) == 7 + assert len(manager.splits()) == 7 + + class RedisIntegrationTests(object): """Redis storage-based integration tests.""" @@ -285,13 +501,15 @@ def setup_method(self): redis_client.sadd(segment_storage._get_key(data['name']), *data['added']) redis_client.set(segment_storage._get_till_key(data['name']), data['till']) - self.factory = SplitFactory('some_api_key', { #pylint:disable=attribute-defined-outside-init + storages = { 'splits': split_storage, 'segments': segment_storage, 'impressions': RedisImpressionsStorage(redis_client, metadata), 'events': RedisEventsStorage(redis_client, metadata), 'telemetry': RedisTelemetryStorage(redis_client, metadata) - }, True) + } + impmanager = ImpressionsManager(storages['impressions'].put, 'DEBUG') + self.factory = SplitFactory('some_api_key', storages, impmanager, True) #pylint:disable=attribute-defined-outside-init def _validate_last_impressions(self, client, *to_validate): """Validate the last N impressions are present disregarding the order.""" @@ -561,13 +779,15 @@ def setup_method(self): redis_client.sadd(segment_storage._get_key(data['name']), *data['added']) redis_client.set(segment_storage._get_till_key(data['name']), data['till']) - self.factory = SplitFactory('some_api_key', { #pylint:disable=attribute-defined-outside-init + storages = { 'splits': split_storage, 'segments': segment_storage, 'impressions': RedisImpressionsStorage(redis_client, metadata), 'events': RedisEventsStorage(redis_client, metadata), 'telemetry': RedisTelemetryStorage(redis_client, metadata) - }, True) + } + impmanager = ImpressionsManager(storages['impressions'].put, 'DEBUG') + self.factory = SplitFactory('some_api_key', storages, impmanager, True) #pylint:disable=attribute-defined-outside-init class LocalhostIntegrationTests(object): #pylint: disable=too-few-public-methods From a0a443d2f38e93756d00610f3ec70840417766c8 Mon Sep 17 00:00:00 2001 From: Martin Redolatti Date: Wed, 7 Oct 2020 20:14:13 -0300 Subject: [PATCH 2/4] update configs --- splitio/client/config.py | 73 +++++++++++++++++++++++++++- splitio/client/factory.py | 63 ++++++++++++------------ splitio/engine/impressions.py | 6 --- splitio/tasks/impressions_sync.py | 51 +++++-------------- splitio/version.py | 2 +- tests/client/test_config.py | 42 ++++++++++++++++ tests/integration/test_client_e2e.py | 10 ++-- tests/tasks/test_impressions_sync.py | 35 +++++++++++++ 8 files changed, 197 insertions(+), 85 deletions(-) create mode 100644 tests/client/test_config.py diff --git a/splitio/client/config.py b/splitio/client/config.py index db104c18..9a204958 100644 --- a/splitio/client/config.py +++ b/splitio/client/config.py @@ -1,15 +1,20 @@ """Default settings for the Split.IO SDK Python client.""" from __future__ import absolute_import, division, print_function, unicode_literals + import os.path +from splitio.engine.impressions import ImpressionsMode + + DEFAULT_CONFIG = { + 'operationMode': 'in-memory', 'connectionTimeout': 1500, 'splitSdkMachineName': None, 'splitSdkMachineIp': None, 'featuresRefreshRate': 5, 'segmentsRefreshRate': 60, 'metricsRefreshRate': 60, - 'impressionsRefreshRate': 10, + 'impressionsRefreshRate': 5 * 60, 'impressionsBulkSize': 5000, 'impressionsQueueSize': 10000, 'eventsPushRate': 10, @@ -47,3 +52,69 @@ 'machineIp': None, 'splitFile': os.path.join(os.path.expanduser('~'), '.split') } + + +def _parse_operation_mode(apikey, config): + """ + Process incoming config to determine operation mode. + + :param config: user supplied config + :type config: dict + + :returns: operation mode + :rtype: str + """ + if apikey == 'localhost': + return 'localhost-standalone' + + if 'redisHost' in config or 'redisSentinels' in config: + return 'redis-consumer' + + if 'uwsgiClient' in config: + return 'uwsgi-consumer' + + return 'inmemory-standalone' + +def _sanitize_impressions_mode(mode, refresh_rate=None): + """ + Check supplied impressions mode and adjust refresh rate. + + :param config: default + supplied config + :type config: dict + + :returns: config with sanitized impressions mode & refresh rate + :rtype: config + """ + if not isinstance(mode, ImpressionsMode): + try: + mode = ImpressionsMode(mode) + except ValueError: + mode = ImpressionsMode.OPTIMIZED + + if mode == ImpressionsMode.DEBUG: + refresh_rate = max(1, refresh_rate) if refresh_rate is not None else 60 + else: + refresh_rate = max(60, refresh_rate) if refresh_rate is not None else 5 * 60 + + return mode, refresh_rate + +def sanitize(apikey, config): + """ + Look for inconsistencies or ill-formed configs and tune it accordingly. + + :param apikey: customer's apikey + :type apikey: str + + :param config: DEFAULT + user supplied config + :type config: dict + + :returns: sanitized config + :rtype: dict + """ + config['operationMode'] = _parse_operation_mode(apikey, config) + processed = DEFAULT_CONFIG.copy() + processed.update(config) + imp_mode, imp_rate = _sanitize_impressions_mode(config.get('impressionsMode'), config.get('impressionsRefreshRate')) + processed['impressionsMode'] = imp_mode + processed['impressionsRefreshRate'] = imp_rate + return processed diff --git a/splitio/client/factory.py b/splitio/client/factory.py index 2e900c1a..8b13fcbb 100644 --- a/splitio/client/factory.py +++ b/splitio/client/factory.py @@ -12,7 +12,7 @@ from splitio.client.client import Client from splitio.client import input_validator from splitio.client.manager import SplitManager -from splitio.client.config import DEFAULT_CONFIG +from splitio.client.config import sanitize as sanitize_config from splitio.client import util from splitio.client.listener import ImpressionListenerWrapper from splitio.engine.impressions import Manager as ImpressionsManager @@ -38,7 +38,7 @@ # Tasks from splitio.tasks.split_sync import SplitSynchronizationTask from splitio.tasks.segment_sync import SegmentSynchronizationTask -from splitio.tasks.impressions_sync import ImpressionsSyncTask +from splitio.tasks.impressions_sync import ImpressionsSyncTask, ImpressionsCountSyncTask from splitio.tasks.events_sync import EventsSyncTask from splitio.tasks.telemetry_sync import TelemetrySynchronizationTask @@ -234,13 +234,11 @@ def _wrap_impression_listener(listener, metadata): return None -def _build_in_memory_factory(api_key, config, sdk_url=None, events_url=None): # pylint: disable=too-many-locals +def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None): # pylint: disable=too-many-locals """Build and return a split factory tailored to the supplied config.""" if not input_validator.validate_factory_instantiation(api_key): return None - cfg = DEFAULT_CONFIG.copy() - cfg.update(config) http_client = HttpClient( sdk_url=sdk_url, events_url=events_url, @@ -272,6 +270,12 @@ def _build_in_memory_factory(api_key, config, sdk_url=None, events_url=None): # segments_ready_flag = threading.Event() sdk_ready_flag = threading.Event() + imp_manager = ImpressionsManager( + storages['impressions'].put, + cfg['impressionsMode'], + True, + _wrap_impression_listener(cfg['impressionListener'], sdk_metadata)) + tasks = { 'splits': SplitSynchronizationTask( apis['splits'], @@ -295,6 +299,11 @@ def _build_in_memory_factory(api_key, config, sdk_url=None, events_url=None): # cfg['impressionsBulkSize'] ), + 'impressions_count': ImpressionsCountSyncTask( + apis['impressions'], + imp_manager + ), + 'events': EventsSyncTask( apis['events'], storages['events'], @@ -312,6 +321,7 @@ def _build_in_memory_factory(api_key, config, sdk_url=None, events_url=None): # # Start tasks that have no dependencies tasks['splits'].start() tasks['impressions'].start() + tasks['impressions_count'].start() tasks['events'].start() tasks['telemetry'].start() @@ -334,22 +344,13 @@ def segment_ready_task(): segment_completion_thread = threading.Thread(target=segment_ready_task) segment_completion_thread.setDaemon(True) segment_completion_thread.start() - return SplitFactory( - api_key, - storages, - cfg['labelsEnabled'], - ImpressionsManager(storages['impressions'].put, cfg['impressionsMode'], True, - _wrap_impression_listener(cfg['impressionListener'], sdk_metadata)), - apis, - tasks, - sdk_ready_flag, - ) + return SplitFactory(api_key, storages, cfg['labelsEnabled'], + imp_manager, apis, tasks, sdk_ready_flag) -def _build_redis_factory(api_key, config): + +def _build_redis_factory(api_key, cfg): """Build and return a split factory with redis-based storage.""" - cfg = DEFAULT_CONFIG.copy() - cfg.update(config) sdk_metadata = util.get_metadata(cfg) redis_adapter = redis.build(cfg) cache_enabled = cfg.get('redisLocalCacheEnabled', False) @@ -370,10 +371,8 @@ def _build_redis_factory(api_key, config): ) -def _build_uwsgi_factory(api_key, config): +def _build_uwsgi_factory(api_key, cfg): """Build and return a split factory with redis-based storage.""" - cfg = DEFAULT_CONFIG.copy() - cfg.update(config) sdk_metadata = util.get_metadata(cfg) uwsgi_adapter = get_uwsgi() storages = { @@ -392,10 +391,8 @@ def _build_uwsgi_factory(api_key, config): ) -def _build_localhost_factory(config): +def _build_localhost_factory(cfg): """Build and return a localhost factory for testing/development purposes.""" - cfg = DEFAULT_CONFIG.copy() - cfg.update(config) storages = { 'splits': InMemorySplitStorage(), 'segments': InMemorySegmentStorage(), # not used, just to avoid possible future errors. @@ -413,12 +410,12 @@ def _build_localhost_factory(config): )} tasks['splits'].start() return SplitFactory( - 'localhost', - storages, - False, + 'localhost', + storages, + False, ImpressionsManager(storages['impressions'].put, cfg['impressionsMode'], True, None), - None, - tasks, + None, + tasks, ready_event ) @@ -444,15 +441,15 @@ def get_factory(api_key, **kwargs): "(Singleton pattern) and reusing it throughout your application." ) - config = kwargs.get('config', {}) + config = sanitize_config(api_key, kwargs.get('config', {})) - if api_key == 'localhost': + if config['operationMode'] == 'localhost-standalone': return _build_localhost_factory(config) - if 'redisHost' in config or 'redisSentinels' in config: + if config['operationMode'] == 'redis-consumer': return _build_redis_factory(api_key, config) - if 'uwsgiClient' in config: + if config['operationMode'] == 'uwsgi-consumer': return _build_uwsgi_factory(api_key, config) return _build_in_memory_factory( diff --git a/splitio/engine/impressions.py b/splitio/engine/impressions.py index 0aabdeba..b9b41783 100644 --- a/splitio/engine/impressions.py +++ b/splitio/engine/impressions.py @@ -173,12 +173,6 @@ def __init__(self, forwarder, mode=ImpressionsMode.OPTIMIZED, standalone=True, l :param listener: Optional impressions listener that will capture all seen impressions. :type listener: splitio.client.listener.ImpressionListenerWrapper """ - if not isinstance(mode, ImpressionsMode): - try: - mode = ImpressionsMode(mode) - except ValueError: - mode = ImpressionsMode.OPTIMIZED - self._forwarder = forwarder self._observer = Observer(_IMPRESSION_OBSERVER_CACHE_SIZE) if standalone else None self._counter = Counter() if standalone and mode == ImpressionsMode.OPTIMIZED else None diff --git a/splitio/tasks/impressions_sync.py b/splitio/tasks/impressions_sync.py index 4e372ef1..ed998059 100644 --- a/splitio/tasks/impressions_sync.py +++ b/splitio/tasks/impressions_sync.py @@ -9,7 +9,6 @@ from splitio.api import APIException from splitio.tasks import BaseSynchronizationTask from splitio.tasks.util.asynctask import AsyncTask -from splitio.engine.impressions import Manager as ImpressionsManager class ImpressionsSyncTask(BaseSynchronizationTask): @@ -105,60 +104,34 @@ def flush(self): class ImpressionsCountSyncTask(BaseSynchronizationTask): """Impressions synchronization task uses an asynctask.AsyncTask to send impressions.""" - def __init__(self, impressions_manager): + _PERIOD = 30 * 60 # 30 minutes + + def __init__(self, impressions_api, impressions_manager): """ Class constructor. + :param impressions_api: Impressions Api object to send data to the backend + :type impressions_api: splitio.api.impressions.ImpressionsAPI + :param impressions_manager: Impressions manager instance - :type impressions_manager: ImpressionsManager + :type impressions_manager: splitio.engine.impressions.Manager """ self._logger = logging.getLogger(self.__class__.__name__) + self._impressions_api = impressions_api self._impressions_manager = impressions_manager - self._task = AsyncTask(self._send_impressions, self._period, on_stop=self._send_impressions) + self._task = AsyncTask(self._send_counters, self._PERIOD, on_stop=self._send_counters) - def _get_failed(self): - """Return up to impressions stored in the failed impressions queue.""" - imps = [] - count = 0 - while count < self._bulk_size: - try: - imps.append(self._failed.get(False)) - count += 1 - except queue.Empty: - # If no more items in queue, break the loop - break - return imps - - def _add_to_failed_queue(self, imps): - """ - Add impressions that were about to be sent to a secondary queue for failed sends. - - :param imps: List of impressions that failed to be pushed. - :type imps: list - """ - for impression in imps: - self._failed.put(impression, False) - - def _send_impressions(self): + def _send_counters(self): """Send impressions from both the failed and new queues.""" - to_send = self._get_failed() - if len(to_send) < self._bulk_size: - # If the amount of previously failed items is less than the bulk - # size, try to complete with new impressions from storage - to_send.extend(self._storage.pop_many(self._bulk_size - len(to_send))) - - if not to_send: - return - + to_send = self._impressions_manager.get_counts() try: - self._impressions_api.flush_impressions(to_send) + self._impressions_api.flush_counters(to_send) except APIException as exc: self._logger.error( 'Exception raised while reporting impressions: %s -- %d', exc.message, exc.status_code ) - self._add_to_failed_queue(to_send) def start(self): """Start executing the impressions synchronization task.""" diff --git a/splitio/version.py b/splitio/version.py index 7b2b8ec7..81104b12 100644 --- a/splitio/version.py +++ b/splitio/version.py @@ -1 +1 @@ -__version__ = '8.2.1' +__version__ = '8.3.0-rc1' diff --git a/tests/client/test_config.py b/tests/client/test_config.py new file mode 100644 index 00000000..37adf9eb --- /dev/null +++ b/tests/client/test_config.py @@ -0,0 +1,42 @@ +"""Configuration unit tests.""" +#pylint: disable=protected-access,no-self-use,line-too-long + +from splitio.client import config +from splitio.engine.impressions import ImpressionsMode + + +class ConfigSanitizationTests(object): + """Inmemory storage-based integration tests.""" + + def test_parse_operation_mode(self): + """Make sure operation mode is correctly captured.""" + assert config._parse_operation_mode('some', {}) == 'inmemory-standalone' + assert config._parse_operation_mode('localhost', {}) == 'localhost-standalone' + assert config._parse_operation_mode('some', {'redisHost': 'x'}) == 'redis-consumer' + assert config._parse_operation_mode('some', {'uwsgiClient': True}) == 'uwsgi-consumer' + + def test_sanitize_imp_mode(self): + """Test sanitization of impressions mode.""" + mode, rate = config._sanitize_impressions_mode('OPTIMIZED', 1) + assert mode == ImpressionsMode.OPTIMIZED + assert rate == 60 + + mode, rate = config._sanitize_impressions_mode('DEBUG', 1) + assert mode == ImpressionsMode.DEBUG + assert rate == 1 + + mode, rate = config._sanitize_impressions_mode('ANYTHING', 200) + assert mode == ImpressionsMode.OPTIMIZED + assert rate == 200 + + mode, rate = config._sanitize_impressions_mode(43, -1) + assert mode == ImpressionsMode.OPTIMIZED + assert rate == 60 + + mode, rate = config._sanitize_impressions_mode('OPTIMIZED') + assert mode == ImpressionsMode.OPTIMIZED + assert rate == 300 + + mode, rate = config._sanitize_impressions_mode('DEBUG') + assert mode == ImpressionsMode.DEBUG + assert rate == 60 diff --git a/tests/integration/test_client_e2e.py b/tests/integration/test_client_e2e.py index ba7f9eba..8ddfa411 100644 --- a/tests/integration/test_client_e2e.py +++ b/tests/integration/test_client_e2e.py @@ -14,7 +14,7 @@ RedisSplitStorage, RedisSegmentStorage, RedisTelemetryStorage from splitio.storage.adapters.redis import RedisAdapter from splitio.models import splits, segments -from splitio.engine.impressions import Manager as ImpressionsManager +from splitio.engine.impressions import Manager as ImpressionsManager, ImpressionsMode class InMemoryIntegrationTests(object): @@ -48,7 +48,7 @@ def setup_method(self): 'events': InMemoryEventStorage(5000), 'telemetry': InMemoryTelemetryStorage() } - impmanager = ImpressionsManager(storages['impressions'].put, 'DEBUG') + impmanager = ImpressionsManager(storages['impressions'].put, ImpressionsMode.DEBUG) self.factory = SplitFactory('some_api_key', storages, impmanager, True) #pylint:disable=attribute-defined-outside-init def _validate_last_impressions(self, client, *to_validate): @@ -290,7 +290,7 @@ def setup_method(self): 'events': InMemoryEventStorage(5000), 'telemetry': InMemoryTelemetryStorage() } - impmanager = ImpressionsManager(storages['impressions'].put, 'OPTIMIZED', standalone=True) + impmanager = ImpressionsManager(storages['impressions'].put, ImpressionsMode.OPTIMIZED, standalone=True) self.factory = SplitFactory('some_api_key', storages, impmanager, True) #pylint:disable=attribute-defined-outside-init def _validate_last_impressions(self, client, *to_validate): @@ -508,7 +508,7 @@ def setup_method(self): 'events': RedisEventsStorage(redis_client, metadata), 'telemetry': RedisTelemetryStorage(redis_client, metadata) } - impmanager = ImpressionsManager(storages['impressions'].put, 'DEBUG') + impmanager = ImpressionsManager(storages['impressions'].put, ImpressionsMode.DEBUG) self.factory = SplitFactory('some_api_key', storages, impmanager, True) #pylint:disable=attribute-defined-outside-init def _validate_last_impressions(self, client, *to_validate): @@ -786,7 +786,7 @@ def setup_method(self): 'events': RedisEventsStorage(redis_client, metadata), 'telemetry': RedisTelemetryStorage(redis_client, metadata) } - impmanager = ImpressionsManager(storages['impressions'].put, 'DEBUG') + impmanager = ImpressionsManager(storages['impressions'].put, ImpressionsMode.DEBUG) self.factory = SplitFactory('some_api_key', storages, impmanager, True) #pylint:disable=attribute-defined-outside-init diff --git a/tests/tasks/test_impressions_sync.py b/tests/tasks/test_impressions_sync.py index 4851abf9..a4c27b9e 100644 --- a/tests/tasks/test_impressions_sync.py +++ b/tests/tasks/test_impressions_sync.py @@ -7,6 +7,9 @@ from splitio.storage import ImpressionStorage from splitio.models.impressions import Impression from splitio.api.impressions import ImpressionsAPI +from splitio.engine.impressions import Manager as ImpressionsManager +from splitio.engine.impressions import Counter + class ImpressionsSyncTests(object): """Impressions Syncrhonization task test cases.""" @@ -36,3 +39,35 @@ def test_normal_operation(self, mocker): stop_event.wait(5) assert stop_event.is_set() assert len(api.flush_impressions.mock_calls) > calls_now + + +class ImpressionsCountSyncTests(object): + """Impressions Syncrhonization task test cases.""" + + def test_normal_operation(self, mocker): + """Test that the task works properly under normal circumstances.""" + manager = mocker.Mock(spec=ImpressionsManager) + + counters = [ + Counter.CountPerFeature('f1', 123, 2), + Counter.CountPerFeature('f2', 123, 123), + Counter.CountPerFeature('f1', 456, 111), + Counter.CountPerFeature('f2', 456, 222) + ] + + manager.get_counts.return_value = counters + api = mocker.Mock(spec=ImpressionsAPI) + api.flush_counters.return_value = HttpResponse(200, '') + impressions_sync.ImpressionsCountSyncTask._PERIOD = 1 + task = impressions_sync.ImpressionsCountSyncTask(api, manager) + task.start() + time.sleep(2) + assert task.is_running() + assert manager.get_counts.mock_calls[0] == mocker.call() + assert api.flush_counters.mock_calls[0] == mocker.call(counters) + stop_event = threading.Event() + calls_now = len(api.flush_counters.mock_calls) + task.stop(stop_event) + stop_event.wait(5) + assert stop_event.is_set() + assert len(api.flush_counters.mock_calls) > calls_now From 19fc89ccb5e4ba28a96431598ac08648477fa534 Mon Sep 17 00:00:00 2001 From: Martin Redolatti Date: Thu, 8 Oct 2020 12:13:20 -0300 Subject: [PATCH 3/4] fix remaining tests --- splitio/api/impressions.py | 9 ++++++--- splitio/client/client.py | 9 +++++---- splitio/client/factory.py | 6 +++--- splitio/engine/impressions.py | 10 +++++----- splitio/tasks/events_sync.py | 9 +++------ splitio/tasks/impressions_sync.py | 23 ++++++++++------------- splitio/tasks/segment_sync.py | 3 ++- splitio/tasks/split_sync.py | 3 ++- splitio/tasks/telemetry_sync.py | 3 +++ splitio/tasks/util/asynctask.py | 12 +++++++----- splitio/tasks/uwsgi_wrappers.py | 9 ++++----- tests/api/test_impressions_api.py | 25 ++++++++++++++----------- tests/client/test_client.py | 10 +++++----- tests/client/test_factory.py | 2 +- tests/integration/test_client_e2e.py | 8 ++++---- 15 files changed, 74 insertions(+), 67 deletions(-) diff --git a/splitio/api/impressions.py b/splitio/api/impressions.py index 36ce5b4b..517b8f86 100644 --- a/splitio/api/impressions.py +++ b/splitio/api/impressions.py @@ -7,12 +7,13 @@ from splitio.api import APIException, headers_from_metadata from splitio.api.client import HttpClientException +from splitio.engine.impressions import ImpressionsMode class ImpressionsAPI(object): # pylint: disable=too-few-public-methods """Class that uses an httpClient to communicate with the impressions API.""" - def __init__(self, client, apikey, sdk_metadata): + def __init__(self, client, apikey, sdk_metadata, mode=ImpressionsMode.OPTIMIZED): """ Class constructor. @@ -25,6 +26,7 @@ def __init__(self, client, apikey, sdk_metadata): self._client = client self._apikey = apikey self._metadata = headers_from_metadata(sdk_metadata) + self._metadata['SplitSDKImpressionsMode'] = mode.name @staticmethod def _build_bulk(impressions): @@ -47,7 +49,8 @@ def _build_bulk(impressions): 'm': impression.time, 'c': impression.change_number, 'r': impression.label, - 'b': impression.bucketing_key + 'b': impression.bucketing_key, + 'pt': impression.previous_time } for impression in imps ] @@ -74,7 +77,7 @@ def _build_counters(counters): { 'f': pf_count.feature, 'm': pf_count.timeframe, - 'c': pf_count.count + 'rc': pf_count.count } for pf_count in counters ] } diff --git a/splitio/client/client.py b/splitio/client/client.py index 3151114f..6ebf1cb4 100644 --- a/splitio/client/client.py +++ b/splitio/client/client.py @@ -11,6 +11,7 @@ from splitio.models.events import Event, EventWrapper from splitio.models.telemetry import get_latency_bucket_index from splitio.client import input_validator +from splitio.util import utctime_ms class Client(object): # pylint: disable=too-many-instance-attributes @@ -115,7 +116,7 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name): result['impression']['label'], result['impression']['change_number'], bucketing_key, - start + utctime_ms(), ) self._record_stats([(impression, attributes)], start, metric_name) @@ -131,7 +132,7 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name): Label.EXCEPTION, self._split_storage.get_change_number(), bucketing_key, - start + utctime_ms(), ) self._record_stats([(impression, attributes)], start, metric_name) except Exception: # pylint: disable=broad-except @@ -178,7 +179,7 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name) result['impression']['label'], result['impression']['change_number'], bucketing_key, - start) + utctime_ms()) bulk_impressions.append(impression) treatments[feature] = (result['treatment'], result['configurations']) @@ -389,7 +390,7 @@ def track(self, key, traffic_type, event_type, value=None, properties=None): traffic_type_name=traffic_type, event_type_id=event_type, value=value, - timestamp=int(time.time()*1000), + timestamp=utctime_ms(), properties=properties, ) return self._events_storage.put([EventWrapper( diff --git a/splitio/client/factory.py b/splitio/client/factory.py index 8b13fcbb..fd0e1172 100644 --- a/splitio/client/factory.py +++ b/splitio/client/factory.py @@ -74,7 +74,7 @@ def __init__( # pylint: disable=too-many-arguments apikey, storages, labels_enabled, - impressions_manager=None, + impressions_manager, apis=None, tasks=None, sdk_ready_flag=None, @@ -139,7 +139,7 @@ def client(self): This client is only a set of references to structures hold by the factory. Creating one a fast operation and safe to be used anywhere. """ - return Client(self, self._labels_enabled, self._impressions_manager) + return Client(self, self._impressions_manager, self._labels_enabled) def manager(self): """ @@ -249,7 +249,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None): # py apis = { 'splits': SplitsAPI(http_client, api_key), 'segments': SegmentsAPI(http_client, api_key), - 'impressions': ImpressionsAPI(http_client, api_key, sdk_metadata), + 'impressions': ImpressionsAPI(http_client, api_key, sdk_metadata, cfg['impressionsMode']), 'events': EventsAPI(http_client, api_key, sdk_metadata), 'telemetry': TelemetryAPI(http_client, api_key, sdk_metadata) } diff --git a/splitio/engine/impressions.py b/splitio/engine/impressions.py index b9b41783..99bc757c 100644 --- a/splitio/engine/impressions.py +++ b/splitio/engine/impressions.py @@ -65,11 +65,11 @@ def _stringify(self, impression): :returns: a string representation of the impression :rtype: str """ - return self._PATTERN % (impression.matching_key, - impression.feature_name, - impression.treatment, - impression.label, - impression.change_number) + return self._PATTERN % (impression.matching_key if impression.matching_key else 'UNKNOWN', + impression.feature_name if impression.feature_name else 'UNKNOWN', + impression.treatment if impression.treatment else 'UNKNOWN', + impression.label if impression.label else 'UNKNOWN', + impression.change_number if impression.change_number else 0) def process(self, impression): """ diff --git a/splitio/tasks/events_sync.py b/splitio/tasks/events_sync.py index a938518d..e0402a2d 100644 --- a/splitio/tasks/events_sync.py +++ b/splitio/tasks/events_sync.py @@ -70,12 +70,9 @@ def _send_events(self): try: self._events_api.flush_events(to_send) - except APIException as exc: - self._logger.error( - 'Exception raised while reporting events: %s -- %d', - exc.message, - exc.status_code - ) + except APIException: + self._logger.error('Exception raised while reporting events') + self._logger.debug('Exception information: ', exc_info=True) self._add_to_failed_queue(to_send) def start(self): diff --git a/splitio/tasks/impressions_sync.py b/splitio/tasks/impressions_sync.py index ed998059..ada9500d 100644 --- a/splitio/tasks/impressions_sync.py +++ b/splitio/tasks/impressions_sync.py @@ -71,12 +71,9 @@ def _send_impressions(self): try: self._impressions_api.flush_impressions(to_send) - except APIException as exc: - self._logger.error( - 'Exception raised while reporting impressions: %s -- %d', - exc.message, - exc.status_code - ) + except APIException: + self._logger.error('Exception raised while reporting impressions') + self._logger.debug('Exception information: ', exc_info=True) self._add_to_failed_queue(to_send) def start(self): @@ -104,7 +101,7 @@ def flush(self): class ImpressionsCountSyncTask(BaseSynchronizationTask): """Impressions synchronization task uses an asynctask.AsyncTask to send impressions.""" - _PERIOD = 30 * 60 # 30 minutes + _PERIOD = 5 # 30 * 60 # 30 minutes def __init__(self, impressions_api, impressions_manager): """ @@ -124,14 +121,14 @@ def __init__(self, impressions_api, impressions_manager): def _send_counters(self): """Send impressions from both the failed and new queues.""" to_send = self._impressions_manager.get_counts() + if not to_send: + return + try: self._impressions_api.flush_counters(to_send) - except APIException as exc: - self._logger.error( - 'Exception raised while reporting impressions: %s -- %d', - exc.message, - exc.status_code - ) + except APIException: + self._logger.error('Exception raised while reporting impression counts') + self._logger.debug('Exception information: ', exc_info=True) def start(self): """Start executing the impressions synchronization task.""" diff --git a/splitio/tasks/segment_sync.py b/splitio/tasks/segment_sync.py index 2fa60b0f..93089b1a 100644 --- a/splitio/tasks/segment_sync.py +++ b/splitio/tasks/segment_sync.py @@ -46,7 +46,8 @@ def _update_segment(self, segment_name): try: segment_changes = self._segment_api.fetch_segment(segment_name, since) except APIException: - self._logger.error('Error fetching segments') + self._logger.error('Exception raised while fetching segment %s', segment_name) + self._logger.debug('Exception information: ', exc_info=True) return False if since == -1: # first time fetching the segment diff --git a/splitio/tasks/split_sync.py b/splitio/tasks/split_sync.py index b6d9cbed..a122913e 100644 --- a/splitio/tasks/split_sync.py +++ b/splitio/tasks/split_sync.py @@ -42,7 +42,8 @@ def _update_splits(self): try: split_changes = self._api.fetch_splits(till) except APIException: - self._logger.error('Failed to fetch split from servers') + self._logger.error('Exception raised while fetching splits') + self._logger.debug('Exception information: ', exc_info=True) return False for split in split_changes.get('splits', []): diff --git a/splitio/tasks/telemetry_sync.py b/splitio/tasks/telemetry_sync.py index 96e084ef..0f5cf9cd 100644 --- a/splitio/tasks/telemetry_sync.py +++ b/splitio/tasks/telemetry_sync.py @@ -37,6 +37,7 @@ def _flush_telemetry(self): self._api.flush_latencies(latencies) except APIException: self._logger.error('Failed send telemetry/latencies to split BE.') + self._logger.debug('Exception information: ', exc_info=True) try: counters = self._storage.pop_counters() @@ -44,6 +45,7 @@ def _flush_telemetry(self): self._api.flush_counters(counters) except APIException: self._logger.error('Failed send telemetry/counters to split BE.') + self._logger.debug('Exception information: ', exc_info=True) try: gauges = self._storage.pop_gauges() @@ -51,6 +53,7 @@ def _flush_telemetry(self): self._api.flush_gauges(gauges) except APIException: self._logger.error('Failed send telemetry/gauges to split BE.') + self._logger.debug('Exception information: ', exc_info=True) def start(self): """Start the task.""" diff --git a/splitio/tasks/util/asynctask.py b/splitio/tasks/util/asynctask.py index 88fecc00..91aeeb12 100644 --- a/splitio/tasks/util/asynctask.py +++ b/splitio/tasks/util/asynctask.py @@ -102,11 +102,13 @@ def _execution_wrapper(self): except queue.Empty: # If no message was received, the timeout has expired # and we're ready for a new execution - if not _safe_run(self._main): - _LOGGER.error( - "An error occurred when executing the task. " - "Retrying after perio expires" - ) + pass + + if not _safe_run(self._main): + _LOGGER.error( + "An error occurred when executing the task. " + "Retrying after perio expires" + ) finally: self._cleanup() diff --git a/splitio/tasks/uwsgi_wrappers.py b/splitio/tasks/uwsgi_wrappers.py index 73d04120..d6aa774a 100644 --- a/splitio/tasks/uwsgi_wrappers.py +++ b/splitio/tasks/uwsgi_wrappers.py @@ -3,7 +3,7 @@ import logging import time -from splitio.client.config import DEFAULT_CONFIG +from splitio.client.config import sanitize as sanitize_config from splitio.client.util import get_metadata from splitio.storage.adapters.uwsgi_cache import get_uwsgi from splitio.storage.uwsgi import UWSGIEventStorage, UWSGIImpressionStorage, \ @@ -34,9 +34,7 @@ def _get_config(user_config): :return: Calculated configuration. :rtype: dict """ - sdk_config = DEFAULT_CONFIG.copy() - sdk_config.update(user_config) - return sdk_config + return sanitize_config(user_config['apikey'], user_config) def uwsgi_update_splits(user_config): @@ -113,7 +111,8 @@ def uwsgi_report_impressions(user_config): ImpressionsAPI( HttpClient(1500, config.get('sdk_url'), config.get('events_url')), config['apikey'], - metadata + metadata, + config['impressionsMode'] ), storage, None, # Period not needed. Task is being triggered manually. diff --git a/tests/api/test_impressions_api.py b/tests/api/test_impressions_api.py index 3428fdfc..54d64b1a 100644 --- a/tests/api/test_impressions_api.py +++ b/tests/api/test_impressions_api.py @@ -3,7 +3,7 @@ import pytest from splitio.api import impressions, client, APIException from splitio.models.impressions import Impression -from splitio.engine.impressions import Counter +from splitio.engine.impressions import Counter, ImpressionsMode from splitio.client.util import get_metadata from splitio.client.config import DEFAULT_CONFIG from splitio.version import __version__ @@ -19,13 +19,13 @@ class ImpressionsAPITests(object): expectedImpressions = [{ 'f': 'f1', 'i': [ - {'k': 'k1', 'b': 'b1', 't': 'on', 'r': 'l1', 'm': 321654, 'c': 123456}, - {'k': 'k3', 'b': 'b1', 't': 'on', 'r': 'l1', 'm': 321654, 'c': 123456}, + {'k': 'k1', 'b': 'b1', 't': 'on', 'r': 'l1', 'm': 321654, 'c': 123456, 'pt': None}, + {'k': 'k3', 'b': 'b1', 't': 'on', 'r': 'l1', 'm': 321654, 'c': 123456, 'pt': None}, ], }, { 'f': 'f2', 'i': [ - {'k': 'k2', 'b': 'b1', 't': 'off', 'r': 'l1', 'm': 321654, 'c': 123456}, + {'k': 'k2', 'b': 'b1', 't': 'off', 'r': 'l1', 'm': 321654, 'c': 123456, 'pt': None}, ] }] @@ -38,10 +38,10 @@ class ImpressionsAPITests(object): expected_counters = { 'pf': [ - {'f': 'f1', 'm': 123, 'c': 2}, - {'f': 'f2', 'm': 123, 'c': 123}, - {'f': 'f1', 'm': 456, 'c': 111}, - {'f': 'f2', 'm': 456, 'c': 222}, + {'f': 'f1', 'm': 123, 'rc': 2}, + {'f': 'f2', 'm': 123, 'rc': 123}, + {'f': 'f1', 'm': 456, 'rc': 111}, + {'f': 'f2', 'm': 456, 'rc': 222}, ] } @@ -64,7 +64,8 @@ def test_post_impressions(self, mocker): assert call_made[2]['extra_headers'] == { 'SplitSDKVersion': 'python-%s' % __version__, 'SplitSDKMachineIP': '123.123.123.123', - 'SplitSDKMachineName': 'some_machine_name' + 'SplitSDKMachineName': 'some_machine_name', + 'SplitSDKImpressionsMode': 'OPTIMIZED' } # validate key-value args (body) @@ -86,7 +87,7 @@ def test_post_impressions_ip_address_disabled(self, mocker): cfg = DEFAULT_CONFIG.copy() cfg.update({'IPAddressesEnabled': False}) sdk_metadata = get_metadata(cfg) - impressions_api = impressions.ImpressionsAPI(httpclient, 'some_api_key', sdk_metadata) + impressions_api = impressions.ImpressionsAPI(httpclient, 'some_api_key', sdk_metadata, ImpressionsMode.DEBUG) response = impressions_api.flush_impressions(self.impressions) call_made = httpclient.post.mock_calls[0] @@ -97,6 +98,7 @@ def test_post_impressions_ip_address_disabled(self, mocker): # validate key-value args (headers) assert call_made[2]['extra_headers'] == { 'SplitSDKVersion': 'python-%s' % __version__, + 'SplitSDKImpressionsMode': 'DEBUG' } # validate key-value args (body) @@ -121,7 +123,8 @@ def test_post_counters(self, mocker): assert call_made[2]['extra_headers'] == { 'SplitSDKVersion': 'python-%s' % __version__, 'SplitSDKMachineIP': '123.123.123.123', - 'SplitSDKMachineName': 'some_machine_name' + 'SplitSDKMachineName': 'some_machine_name', + 'SplitSDKImpressionsMode': 'OPTIMIZED' } # validate key-value args (body) diff --git a/tests/client/test_client.py b/tests/client/test_client.py index 26c7c937..3bfefe7a 100644 --- a/tests/client/test_client.py +++ b/tests/client/test_client.py @@ -42,7 +42,7 @@ def _get_storage_mock(name): factory._get_storage.side_effect = _get_storage_mock type(factory).destroyed = destroyed_property - mocker.patch('splitio.client.client.time.time', new=lambda: 1) + mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) impmanager = mocker.Mock(spec=ImpressionManager) @@ -110,7 +110,7 @@ def _get_storage_mock(name): factory._get_storage.side_effect = _get_storage_mock type(factory).destroyed = destroyed_property - mocker.patch('splitio.client.client.time.time', new=lambda: 1) + mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) impmanager = mocker.Mock(spec=ImpressionManager) @@ -183,7 +183,7 @@ def _get_storage_mock(name): factory._get_storage.side_effect = _get_storage_mock type(factory).destroyed = destroyed_property - mocker.patch('splitio.client.client.time.time', new=lambda: 1) + mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) impmanager = mocker.Mock(spec=ImpressionManager) @@ -253,7 +253,7 @@ def _get_storage_mock(name): factory._get_storage.side_effect = _get_storage_mock type(factory).destroyed = destroyed_property - mocker.patch('splitio.client.client.time.time', new=lambda: 1) + mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) impmanager = mocker.Mock(spec=ImpressionManager) @@ -353,7 +353,7 @@ def _get_storage_mock(name): destroyed_mock.return_value = False type(factory).destroyed = destroyed_mock factory._apikey = 'test' - mocker.patch('splitio.client.client.time.time', new=lambda: 1) + mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) impmanager = mocker.Mock(spec=ImpressionManager) client = Client(factory, impmanager, True) diff --git a/tests/client/test_factory.py b/tests/client/test_factory.py index 96f60a42..c1080731 100644 --- a/tests/client/test_factory.py +++ b/tests/client/test_factory.py @@ -345,7 +345,7 @@ def _telemetry_task_init_mock(self, api, storage, refresh_rate): def test_multiple_factories(self, mocker): """Test multiple factories instantiation and tracking.""" def _make_factory_with_apikey(apikey, *_, **__): - return SplitFactory(apikey, {}, True) + return SplitFactory(apikey, {}, True, mocker.Mock(spec=ImpressionsManager)) factory_module_logger = mocker.Mock() build_in_memory = mocker.Mock() diff --git a/tests/integration/test_client_e2e.py b/tests/integration/test_client_e2e.py index 8ddfa411..771ef760 100644 --- a/tests/integration/test_client_e2e.py +++ b/tests/integration/test_client_e2e.py @@ -49,7 +49,7 @@ def setup_method(self): 'telemetry': InMemoryTelemetryStorage() } impmanager = ImpressionsManager(storages['impressions'].put, ImpressionsMode.DEBUG) - self.factory = SplitFactory('some_api_key', storages, impmanager, True) #pylint:disable=attribute-defined-outside-init + self.factory = SplitFactory('some_api_key', storages, True, impmanager) #pylint:disable=attribute-defined-outside-init def _validate_last_impressions(self, client, *to_validate): """Validate the last N impressions are present disregarding the order.""" @@ -291,7 +291,7 @@ def setup_method(self): 'telemetry': InMemoryTelemetryStorage() } impmanager = ImpressionsManager(storages['impressions'].put, ImpressionsMode.OPTIMIZED, standalone=True) - self.factory = SplitFactory('some_api_key', storages, impmanager, True) #pylint:disable=attribute-defined-outside-init + self.factory = SplitFactory('some_api_key', storages, True, impmanager) #pylint:disable=attribute-defined-outside-init def _validate_last_impressions(self, client, *to_validate): """Validate the last N impressions are present disregarding the order.""" @@ -509,7 +509,7 @@ def setup_method(self): 'telemetry': RedisTelemetryStorage(redis_client, metadata) } impmanager = ImpressionsManager(storages['impressions'].put, ImpressionsMode.DEBUG) - self.factory = SplitFactory('some_api_key', storages, impmanager, True) #pylint:disable=attribute-defined-outside-init + self.factory = SplitFactory('some_api_key', storages, True, impmanager) #pylint:disable=attribute-defined-outside-init def _validate_last_impressions(self, client, *to_validate): """Validate the last N impressions are present disregarding the order.""" @@ -787,7 +787,7 @@ def setup_method(self): 'telemetry': RedisTelemetryStorage(redis_client, metadata) } impmanager = ImpressionsManager(storages['impressions'].put, ImpressionsMode.DEBUG) - self.factory = SplitFactory('some_api_key', storages, impmanager, True) #pylint:disable=attribute-defined-outside-init + self.factory = SplitFactory('some_api_key', storages, True, impmanager) #pylint:disable=attribute-defined-outside-init class LocalhostIntegrationTests(object): #pylint: disable=too-few-public-methods From 5e5bf6fef6705a563fcb5519b7f7970667ee1c8b Mon Sep 17 00:00:00 2001 From: Martin Redolatti Date: Thu, 8 Oct 2020 15:34:15 -0300 Subject: [PATCH 4/4] support lowercase impressions mode --- splitio/client/config.py | 9 +++++++-- tests/client/test_config.py | 4 ++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/splitio/client/config.py b/splitio/client/config.py index 9a204958..f5ee7a1a 100644 --- a/splitio/client/config.py +++ b/splitio/client/config.py @@ -2,10 +2,14 @@ from __future__ import absolute_import, division, print_function, unicode_literals import os.path +import logging from splitio.engine.impressions import ImpressionsMode +_LOGGER = logging.getLogger(__name__) + + DEFAULT_CONFIG = { 'operationMode': 'in-memory', 'connectionTimeout': 1500, @@ -87,8 +91,9 @@ def _sanitize_impressions_mode(mode, refresh_rate=None): """ if not isinstance(mode, ImpressionsMode): try: - mode = ImpressionsMode(mode) - except ValueError: + mode = ImpressionsMode(mode.upper()) + except (ValueError, AttributeError): + _LOGGER.warning('unrecognized impressionsMode supplied. Defaulting to OPTIMIZED') mode = ImpressionsMode.OPTIMIZED if mode == ImpressionsMode.DEBUG: diff --git a/tests/client/test_config.py b/tests/client/test_config.py index 37adf9eb..0ed0d1b1 100644 --- a/tests/client/test_config.py +++ b/tests/client/test_config.py @@ -25,6 +25,10 @@ def test_sanitize_imp_mode(self): assert mode == ImpressionsMode.DEBUG assert rate == 1 + mode, rate = config._sanitize_impressions_mode('debug', 1) + assert mode == ImpressionsMode.DEBUG + assert rate == 1 + mode, rate = config._sanitize_impressions_mode('ANYTHING', 200) assert mode == ImpressionsMode.OPTIMIZED assert rate == 200