diff --git a/splitio/api/impressions.py b/splitio/api/impressions.py index 3f26e8c9..517b8f86 100644 --- a/splitio/api/impressions.py +++ b/splitio/api/impressions.py @@ -7,12 +7,13 @@ from splitio.api import APIException, headers_from_metadata from splitio.api.client import HttpClientException +from splitio.engine.impressions import ImpressionsMode class ImpressionsAPI(object): # pylint: disable=too-few-public-methods """Class that uses an httpClient to communicate with the impressions API.""" - def __init__(self, client, apikey, sdk_metadata): + def __init__(self, client, apikey, sdk_metadata, mode=ImpressionsMode.OPTIMIZED): """ Class constructor. @@ -25,6 +26,7 @@ def __init__(self, client, apikey, sdk_metadata): self._client = client self._apikey = apikey self._metadata = headers_from_metadata(sdk_metadata) + self._metadata['SplitSDKImpressionsMode'] = mode.name @staticmethod def _build_bulk(impressions): @@ -35,19 +37,20 @@ def _build_bulk(impressions): :type impressions: list(splitio.models.impressions.Impression) :return: Dictionary of lists of impressions. - :rtype: dict + :rtype: list """ return [ { - 'testName': test_name, - 'keyImpressions': [ + 'f': test_name, + 'i': [ { - 'keyName': impression.matching_key, - 'treatment': impression.treatment, - 'time': impression.time, - 'changeNumber': impression.change_number, - 'label': impression.label, - 'bucketingKey': impression.bucketing_key + 'k': impression.matching_key, + 't': impression.treatment, + 'm': impression.time, + 'c': impression.change_number, + 'r': impression.label, + 'b': impression.bucketing_key, + 'pt': impression.previous_time } for impression in imps ] @@ -58,6 +61,27 @@ def _build_bulk(impressions): ) ] + @staticmethod + def _build_counters(counters): + """ + Build an impression bulk formatted as the API expects it. + + :param counters: List of impression counters per feature. 
+ :type counters: list[splitio.engine.impressions.Counter.CountPerFeature] + + :return: dict with list of impression count dtos + :rtype: dict + """ + return { + 'pf': [ + { + 'f': pf_count.feature, + 'm': pf_count.timeframe, + 'rc': pf_count.count + } for pf_count in counters + ] + } + def flush_impressions(self, impressions): """ Send impressions to the backend. @@ -80,3 +104,26 @@ def flush_impressions(self, impressions): self._logger.error('Http client is throwing exceptions') self._logger.debug('Error: ', exc_info=True) raise_from(APIException('Impressions not flushed properly.'), exc) + + def flush_counters(self, counters): + """ + Send impressions to the backend. + + :param impressions: Impressions bulk + :type impressions: list + """ + bulk = self._build_counters(counters) + try: + response = self._client.post( + 'events', + '/testImpressions/count', + self._apikey, + body=bulk, + extra_headers=self._metadata + ) + if not 200 <= response.status_code < 300: + raise APIException(response.body, response.status_code) + except HttpClientException as exc: + self._logger.error('Http client is throwing exceptions') + self._logger.debug('Error: ', exc_info=True) + raise_from(APIException('Impressions not flushed properly.'), exc) diff --git a/splitio/client/client.py b/splitio/client/client.py index 894503fb..6ebf1cb4 100644 --- a/splitio/client/client.py +++ b/splitio/client/client.py @@ -11,7 +11,7 @@ from splitio.models.events import Event, EventWrapper from splitio.models.telemetry import get_latency_bucket_index from splitio.client import input_validator -from splitio.client.listener import ImpressionListenerException +from splitio.util import utctime_ms class Client(object): # pylint: disable=too-many-instance-attributes @@ -22,7 +22,7 @@ class Client(object): # pylint: disable=too-many-instance-attributes _METRIC_GET_TREATMENT_WITH_CONFIG = 'sdk.getTreatmentWithConfig' _METRIC_GET_TREATMENTS_WITH_CONFIG = 'sdk.getTreatmentsWithConfig' - def __init__(self, 
factory, labels_enabled=True, impression_listener=None): + def __init__(self, factory, impressions_manager, labels_enabled=True): """ Construct a Client instance. @@ -32,15 +32,15 @@ def __init__(self, factory, labels_enabled=True, impression_listener=None): :param labels_enabled: Whether to store labels on impressions :type labels_enabled: bool - :param impression_listener: impression listener implementation - :type impression_listener: ImpressionListener + :param impressions_manager: impression manager instance + :type impressions_manager: splitio.engine.impressions.Manager :rtype: Client """ self._logger = logging.getLogger(self.__class__.__name__) self._factory = factory self._labels_enabled = labels_enabled - self._impression_listener = impression_listener + self._impressions_manager = impressions_manager self._splitter = Splitter() self._split_storage = factory._get_storage('splits') # pylint: disable=protected-access @@ -68,25 +68,6 @@ def destroyed(self): """Return whether the factory holding this client has been destroyed.""" return self._factory.destroyed - def _send_impression_to_listener(self, impression, attributes): - """ - Send impression result to custom listener. 
- - :param impression: Generated impression - :type impression: Impression - - :param attributes: An optional dictionary of attributes - :type attributes: dict - """ - if self._impression_listener is not None: - try: - self._impression_listener.log_impression(impression, attributes) - except ImpressionListenerException: - self._logger.error( - 'An exception was raised while calling user-custom impression listener' - ) - self._logger.debug('Error', exc_info=True) - def _evaluate_if_ready(self, matching_key, bucketing_key, feature, attributes=None): if not self.ready: return { @@ -135,11 +116,10 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name): result['impression']['label'], result['impression']['change_number'], bucketing_key, - start + utctime_ms(), ) - self._record_stats([impression], start, metric_name) - self._send_impression_to_listener(impression, attributes) + self._record_stats([(impression, attributes)], start, metric_name) return result['treatment'], result['configurations'] except Exception: # pylint: disable=broad-except self._logger.error('Error getting treatment for feature') @@ -152,10 +132,9 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name): Label.EXCEPTION, self._split_storage.get_change_number(), bucketing_key, - start + utctime_ms(), ) - self._record_stats([impression], start, metric_name) - self._send_impression_to_listener(impression, attributes) + self._record_stats([(impression, attributes)], start, metric_name) except Exception: # pylint: disable=broad-except self._logger.error('Error reporting impression into get_treatment exception block') self._logger.debug('Error: ', exc_info=True) @@ -200,7 +179,7 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name) result['impression']['label'], result['impression']['change_number'], bucketing_key, - start) + utctime_ms()) bulk_impressions.append(impression) treatments[feature] = (result['treatment'], 
result['configurations']) @@ -215,9 +194,11 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name) # Register impressions try: if bulk_impressions: - self._record_stats(bulk_impressions, start, self._METRIC_GET_TREATMENTS) - for impression in bulk_impressions: - self._send_impression_to_listener(impression, attributes) + self._record_stats( + [(i, attributes) for i in bulk_impressions], + start, + metric_name + ) except Exception: # pylint: disable=broad-except self._logger.error('%s: An exception when trying to store ' 'impressions.' % method_name) @@ -350,7 +331,7 @@ def _record_stats(self, impressions, start, operation): Record impressions and metrics. :param impressions: Generated impressions - :type impressions: list||Impression + :type impressions: list[tuple[splitio.models.impression.Impression, dict]] :param start: timestamp when get_treatment or get_treatments was called :type start: int @@ -360,7 +341,7 @@ def _record_stats(self, impressions, start, operation): """ try: end = int(round(time.time() * 1000)) - self._impressions_storage.put(impressions) + self._impressions_manager.track(impressions) self._telemetry_storage.inc_latency(operation, get_latency_bucket_index(end - start)) except Exception: # pylint: disable=broad-except self._logger.error('Error recording impressions and metrics') @@ -409,7 +390,7 @@ def track(self, key, traffic_type, event_type, value=None, properties=None): traffic_type_name=traffic_type, event_type_id=event_type, value=value, - timestamp=int(time.time()*1000), + timestamp=utctime_ms(), properties=properties, ) return self._events_storage.put([EventWrapper( diff --git a/splitio/client/config.py b/splitio/client/config.py index 30ea411e..f5ee7a1a 100644 --- a/splitio/client/config.py +++ b/splitio/client/config.py @@ -1,15 +1,24 @@ """Default settings for the Split.IO SDK Python client.""" from __future__ import absolute_import, division, print_function, unicode_literals + import os.path +import 
logging + +from splitio.engine.impressions import ImpressionsMode + + +_LOGGER = logging.getLogger(__name__) + DEFAULT_CONFIG = { + 'operationMode': 'in-memory', 'connectionTimeout': 1500, 'splitSdkMachineName': None, 'splitSdkMachineIp': None, 'featuresRefreshRate': 5, 'segmentsRefreshRate': 60, 'metricsRefreshRate': 60, - 'impressionsRefreshRate': 10, + 'impressionsRefreshRate': 5 * 60, 'impressionsBulkSize': 5000, 'impressionsQueueSize': 10000, 'eventsPushRate': 10, @@ -17,6 +26,7 @@ 'eventsQueueSize': 10000, 'labelsEnabled': True, 'IPAddressesEnabled': True, + 'impressionsMode': 'OPTIMIZED', 'impressionListener': None, 'redisLocalCacheEnabled': False, 'redisLocalCacheTTL': 5, @@ -46,3 +56,70 @@ 'machineIp': None, 'splitFile': os.path.join(os.path.expanduser('~'), '.split') } + + +def _parse_operation_mode(apikey, config): + """ + Process incoming config to determine operation mode. + + :param config: user supplied config + :type config: dict + + :returns: operation mode + :rtype: str + """ + if apikey == 'localhost': + return 'localhost-standalone' + + if 'redisHost' in config or 'redisSentinels' in config: + return 'redis-consumer' + + if 'uwsgiClient' in config: + return 'uwsgi-consumer' + + return 'inmemory-standalone' + +def _sanitize_impressions_mode(mode, refresh_rate=None): + """ + Check supplied impressions mode and adjust refresh rate. + + :param config: default + supplied config + :type config: dict + + :returns: config with sanitized impressions mode & refresh rate + :rtype: config + """ + if not isinstance(mode, ImpressionsMode): + try: + mode = ImpressionsMode(mode.upper()) + except (ValueError, AttributeError): + _LOGGER.warning('unrecognized impressionsMode supplied. 
Defaulting to OPTIMIZED') + mode = ImpressionsMode.OPTIMIZED + + if mode == ImpressionsMode.DEBUG: + refresh_rate = max(1, refresh_rate) if refresh_rate is not None else 60 + else: + refresh_rate = max(60, refresh_rate) if refresh_rate is not None else 5 * 60 + + return mode, refresh_rate + +def sanitize(apikey, config): + """ + Look for inconsistencies or ill-formed configs and tune it accordingly. + + :param apikey: customer's apikey + :type apikey: str + + :param config: DEFAULT + user supplied config + :type config: dict + + :returns: sanitized config + :rtype: dict + """ + config['operationMode'] = _parse_operation_mode(apikey, config) + processed = DEFAULT_CONFIG.copy() + processed.update(config) + imp_mode, imp_rate = _sanitize_impressions_mode(config.get('impressionsMode'), config.get('impressionsRefreshRate')) + processed['impressionsMode'] = imp_mode + processed['impressionsRefreshRate'] = imp_rate + return processed diff --git a/splitio/client/factory.py b/splitio/client/factory.py index 01a0fc0d..fd0e1172 100644 --- a/splitio/client/factory.py +++ b/splitio/client/factory.py @@ -12,9 +12,10 @@ from splitio.client.client import Client from splitio.client import input_validator from splitio.client.manager import SplitManager -from splitio.client.config import DEFAULT_CONFIG +from splitio.client.config import sanitize as sanitize_config from splitio.client import util from splitio.client.listener import ImpressionListenerWrapper +from splitio.engine.impressions import Manager as ImpressionsManager # Storage from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \ @@ -37,7 +38,7 @@ # Tasks from splitio.tasks.split_sync import SplitSynchronizationTask from splitio.tasks.segment_sync import SegmentSynchronizationTask -from splitio.tasks.impressions_sync import ImpressionsSyncTask +from splitio.tasks.impressions_sync import ImpressionsSyncTask, ImpressionsCountSyncTask from splitio.tasks.events_sync import EventsSyncTask from 
splitio.tasks.telemetry_sync import TelemetrySynchronizationTask @@ -73,10 +74,10 @@ def __init__( # pylint: disable=too-many-arguments apikey, storages, labels_enabled, + impressions_manager, apis=None, tasks=None, sdk_ready_flag=None, - impression_listener=None ): """ Class constructor. @@ -91,8 +92,8 @@ def __init__( # pylint: disable=too-many-arguments :type tasks: dict :param sdk_ready_flag: Event to set when the sdk is ready. :type sdk_ready_flag: threading.Event - :param impression_listener: User custom listener to handle impressions locally. - :type impression_listener: splitio.client.listener.ImpressionListener + :param impression_manager: Impressions manager instance + :type impression_listener: ImpressionsManager """ self._apikey = apikey self._logger = logging.getLogger(self.__class__.__name__) @@ -101,7 +102,7 @@ def __init__( # pylint: disable=too-many-arguments self._apis = apis if apis else {} self._tasks = tasks if tasks else {} self._sdk_ready_flag = sdk_ready_flag - self._impression_listener = impression_listener + self._impressions_manager = impressions_manager # If we have a ready flag, it means we have sync tasks that need to finish # before the SDK client becomes ready. @@ -138,7 +139,7 @@ def client(self): This client is only a set of references to structures hold by the factory. Creating one a fast operation and safe to be used anywhere. 
""" - return Client(self, self._labels_enabled, self._impression_listener) + return Client(self, self._impressions_manager, self._labels_enabled) def manager(self): """ @@ -233,13 +234,11 @@ def _wrap_impression_listener(listener, metadata): return None -def _build_in_memory_factory(api_key, config, sdk_url=None, events_url=None): # pylint: disable=too-many-locals +def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None): # pylint: disable=too-many-locals """Build and return a split factory tailored to the supplied config.""" if not input_validator.validate_factory_instantiation(api_key): return None - cfg = DEFAULT_CONFIG.copy() - cfg.update(config) http_client = HttpClient( sdk_url=sdk_url, events_url=events_url, @@ -250,7 +249,7 @@ def _build_in_memory_factory(api_key, config, sdk_url=None, events_url=None): # apis = { 'splits': SplitsAPI(http_client, api_key), 'segments': SegmentsAPI(http_client, api_key), - 'impressions': ImpressionsAPI(http_client, api_key, sdk_metadata), + 'impressions': ImpressionsAPI(http_client, api_key, sdk_metadata, cfg['impressionsMode']), 'events': EventsAPI(http_client, api_key, sdk_metadata), 'telemetry': TelemetryAPI(http_client, api_key, sdk_metadata) } @@ -271,6 +270,12 @@ def _build_in_memory_factory(api_key, config, sdk_url=None, events_url=None): # segments_ready_flag = threading.Event() sdk_ready_flag = threading.Event() + imp_manager = ImpressionsManager( + storages['impressions'].put, + cfg['impressionsMode'], + True, + _wrap_impression_listener(cfg['impressionListener'], sdk_metadata)) + tasks = { 'splits': SplitSynchronizationTask( apis['splits'], @@ -294,6 +299,11 @@ def _build_in_memory_factory(api_key, config, sdk_url=None, events_url=None): # cfg['impressionsBulkSize'] ), + 'impressions_count': ImpressionsCountSyncTask( + apis['impressions'], + imp_manager + ), + 'events': EventsSyncTask( apis['events'], storages['events'], @@ -311,6 +321,7 @@ def _build_in_memory_factory(api_key, config, 
sdk_url=None, events_url=None): # # Start tasks that have no dependencies tasks['splits'].start() tasks['impressions'].start() + tasks['impressions_count'].start() tasks['events'].start() tasks['telemetry'].start() @@ -333,21 +344,13 @@ def segment_ready_task(): segment_completion_thread = threading.Thread(target=segment_ready_task) segment_completion_thread.setDaemon(True) segment_completion_thread.start() - return SplitFactory( - api_key, - storages, - cfg['labelsEnabled'], - apis, - tasks, - sdk_ready_flag, - impression_listener=_wrap_impression_listener(cfg['impressionListener'], sdk_metadata) - ) + return SplitFactory(api_key, storages, cfg['labelsEnabled'], + imp_manager, apis, tasks, sdk_ready_flag) -def _build_redis_factory(api_key, config): + +def _build_redis_factory(api_key, cfg): """Build and return a split factory with redis-based storage.""" - cfg = DEFAULT_CONFIG.copy() - cfg.update(config) sdk_metadata = util.get_metadata(cfg) redis_adapter = redis.build(cfg) cache_enabled = cfg.get('redisLocalCacheEnabled', False) @@ -363,14 +366,13 @@ def _build_redis_factory(api_key, config): api_key, storages, cfg['labelsEnabled'], - impression_listener=_wrap_impression_listener(cfg['impressionListener'], sdk_metadata) + ImpressionsManager(storages['impressions'].put, cfg['impressionsMode'], False, + _wrap_impression_listener(cfg['impressionListener'], sdk_metadata)) ) -def _build_uwsgi_factory(api_key, config): +def _build_uwsgi_factory(api_key, cfg): """Build and return a split factory with redis-based storage.""" - cfg = DEFAULT_CONFIG.copy() - cfg.update(config) sdk_metadata = util.get_metadata(cfg) uwsgi_adapter = get_uwsgi() storages = { @@ -384,14 +386,13 @@ def _build_uwsgi_factory(api_key, config): api_key, storages, cfg['labelsEnabled'], - impression_listener=_wrap_impression_listener(cfg['impressionListener'], sdk_metadata) + ImpressionsManager(storages['impressions'].put, cfg['impressionsMode'], True, + 
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata)) ) -def _build_localhost_factory(config): +def _build_localhost_factory(cfg): """Build and return a localhost factory for testing/development purposes.""" - cfg = DEFAULT_CONFIG.copy() - cfg.update(config) storages = { 'splits': InMemorySplitStorage(), 'segments': InMemorySegmentStorage(), # not used, just to avoid possible future errors. @@ -408,7 +409,15 @@ def _build_localhost_factory(config): ready_event )} tasks['splits'].start() - return SplitFactory('localhost', storages, False, None, tasks, ready_event) + return SplitFactory( + 'localhost', + storages, + False, + ImpressionsManager(storages['impressions'].put, cfg['impressionsMode'], True, None), + None, + tasks, + ready_event + ) def get_factory(api_key, **kwargs): @@ -432,15 +441,15 @@ def get_factory(api_key, **kwargs): "(Singleton pattern) and reusing it throughout your application." ) - config = kwargs.get('config', {}) + config = sanitize_config(api_key, kwargs.get('config', {})) - if api_key == 'localhost': + if config['operationMode'] == 'localhost-standalone': return _build_localhost_factory(config) - if 'redisHost' in config or 'redisSentinels' in config: + if config['operationMode'] == 'redis-consumer': return _build_redis_factory(api_key, config) - if 'uwsgiClient' in config: + if config['operationMode'] == 'uwsgi-consumer': return _build_uwsgi_factory(api_key, config) return _build_in_memory_factory( diff --git a/splitio/engine/impmanager.py b/splitio/engine/impressions.py similarity index 72% rename from splitio/engine/impmanager.py rename to splitio/engine/impressions.py index 2571e9a0..99bc757c 100644 --- a/splitio/engine/impmanager.py +++ b/splitio/engine/impressions.py @@ -9,6 +9,7 @@ from splitio.models.impressions import Impression from splitio.engine.hashfns import murmur_128 from splitio.engine.cache.lru import SimpleLruCache +from splitio.client.listener import ImpressionListenerException from splitio import util @@ 
-64,11 +65,11 @@ def _stringify(self, impression): :returns: a string representation of the impression :rtype: str """ - return self._PATTERN % (impression.matching_key, - impression.feature_name, - impression.treatment, - impression.label, - impression.change_number) + return self._PATTERN % (impression.matching_key if impression.matching_key else 'UNKNOWN', + impression.feature_name if impression.feature_name else 'UNKNOWN', + impression.treatment if impression.treatment else 'UNKNOWN', + impression.label if impression.label else 'UNKNOWN', + impression.change_number if impression.change_number else 0) def process(self, impression): """ @@ -116,7 +117,7 @@ class Counter(object): """Class that counts impressions per timeframe.""" CounterKey = namedtuple('Count', ['feature', 'timeframe']) - CountPerFeature = namedtuple('Count', ['feature', 'timeframe', 'count']) + CountPerFeature = namedtuple('CountPerFeature', ['feature', 'timeframe', 'count']) def __init__(self): """Class constructor.""" @@ -183,19 +184,42 @@ def track(self, impressions): Impressions are analyzed to see if they've been seen before and counted. 
- :param impressions: List of impression objects - :type impressions: list[splitio.models.impression.Impression] + :param impressions: List of impression objects with attributes + :type impressions: list[tuple[splitio.models.impression.Impression, dict]] """ - imps = [self._observer.test_and_set(i) for i in impressions] if self._observer \ + imps = [(self._observer.test_and_set(imp), attrs) for imp, attrs in impressions] if self._observer \ else impressions if self._counter: - self._counter.track(imps) + self._counter.track([imp for imp, _ in imps]) - if self._listener: - for imp in imps: - self._listener.log_impression(imp) + self._send_impressions_to_listener(imps) this_hour = truncate_time(util.utctime_ms()) - self._forwarder(imps if self._counter is None - else [i for i in imps if i.previous_time is None or i.previous_time < this_hour]) # pylint:disable=line-too-long + self._forwarder([imp for imp, _ in imps] if self._counter is None + else [i for i, _ in imps if i.previous_time is None or i.previous_time < this_hour]) + + def get_counts(self): + """ + Return counts of impressions per features. + + :returns: A list of counter objects. + :rtype: list[Counter.CountPerFeature] + """ + return self._counter.pop_all() if self._counter is not None else [] + + def _send_impressions_to_listener(self, impressions): + """ + Send impression result to custom listener. 
+ + :param impressions: List of impression objects with attributes + :type impressions: list[tuple[splitio.models.impression.Impression, dict]] + """ + if self._listener is not None: + try: + for impression, attributes in impressions: + self._listener.log_impression(impression, attributes) + except ImpressionListenerException: + pass +# self._logger.error('An exception was raised while calling user-custom impression listener') +# self._logger.debug('Error', exc_info=True) diff --git a/splitio/tasks/events_sync.py b/splitio/tasks/events_sync.py index a938518d..e0402a2d 100644 --- a/splitio/tasks/events_sync.py +++ b/splitio/tasks/events_sync.py @@ -70,12 +70,9 @@ def _send_events(self): try: self._events_api.flush_events(to_send) - except APIException as exc: - self._logger.error( - 'Exception raised while reporting events: %s -- %d', - exc.message, - exc.status_code - ) + except APIException: + self._logger.error('Exception raised while reporting events') + self._logger.debug('Exception information: ', exc_info=True) self._add_to_failed_queue(to_send) def start(self): diff --git a/splitio/tasks/impressions_sync.py b/splitio/tasks/impressions_sync.py index 075f4547..ada9500d 100644 --- a/splitio/tasks/impressions_sync.py +++ b/splitio/tasks/impressions_sync.py @@ -71,12 +71,9 @@ def _send_impressions(self): try: self._impressions_api.flush_impressions(to_send) - except APIException as exc: - self._logger.error( - 'Exception raised while reporting impressions: %s -- %d', - exc.message, - exc.status_code - ) + except APIException: + self._logger.error('Exception raised while reporting impressions') + self._logger.debug('Exception information: ', exc_info=True) self._add_to_failed_queue(to_send) def start(self): @@ -99,3 +96,57 @@ def is_running(self): def flush(self): """Flush impressions in storage.""" self._task.force_execution() + + +class ImpressionsCountSyncTask(BaseSynchronizationTask): + """Impressions synchronization task uses an asynctask.AsyncTask to send 
impressions."""
+
+    _PERIOD = 30 * 60  # 30 minutes
+
+    def __init__(self, impressions_api, impressions_manager):
+        """
+        Class constructor.
+
+        :param impressions_api: Impressions Api object to send data to the backend
+        :type impressions_api: splitio.api.impressions.ImpressionsAPI
+
+        :param impressions_manager: Impressions manager instance
+        :type impressions_manager: splitio.engine.impressions.Manager
+        """
+        self._logger = logging.getLogger(self.__class__.__name__)
+        self._impressions_api = impressions_api
+        self._impressions_manager = impressions_manager
+        self._task = AsyncTask(self._send_counters, self._PERIOD, on_stop=self._send_counters)
+
+    def _send_counters(self):
+        """Fetch the accumulated impression counts and flush them to the backend."""
+        to_send = self._impressions_manager.get_counts()
+        if not to_send:
+            return
+
+        try:
+            self._impressions_api.flush_counters(to_send)
+        except APIException:
+            self._logger.error('Exception raised while reporting impression counts')
+            self._logger.debug('Exception information: ', exc_info=True)
+
+    def start(self):
+        """Start executing the impressions synchronization task."""
+        self._task.start()
+
+    def stop(self, event=None):
+        """Stop executing the impressions synchronization task."""
+        self._task.stop(event)
+
+    def is_running(self):
+        """
+        Return whether the task is running or not.
+
+        :return: True if the task is running. False otherwise.
+ :rtype: bool + """ + return self._task.running() + + def flush(self): + """Flush impressions in storage.""" + self._task.force_execution() diff --git a/splitio/tasks/segment_sync.py b/splitio/tasks/segment_sync.py index 2fa60b0f..93089b1a 100644 --- a/splitio/tasks/segment_sync.py +++ b/splitio/tasks/segment_sync.py @@ -46,7 +46,8 @@ def _update_segment(self, segment_name): try: segment_changes = self._segment_api.fetch_segment(segment_name, since) except APIException: - self._logger.error('Error fetching segments') + self._logger.error('Exception raised while fetching segment %s', segment_name) + self._logger.debug('Exception information: ', exc_info=True) return False if since == -1: # first time fetching the segment diff --git a/splitio/tasks/split_sync.py b/splitio/tasks/split_sync.py index b6d9cbed..a122913e 100644 --- a/splitio/tasks/split_sync.py +++ b/splitio/tasks/split_sync.py @@ -42,7 +42,8 @@ def _update_splits(self): try: split_changes = self._api.fetch_splits(till) except APIException: - self._logger.error('Failed to fetch split from servers') + self._logger.error('Exception raised while fetching splits') + self._logger.debug('Exception information: ', exc_info=True) return False for split in split_changes.get('splits', []): diff --git a/splitio/tasks/telemetry_sync.py b/splitio/tasks/telemetry_sync.py index 96e084ef..0f5cf9cd 100644 --- a/splitio/tasks/telemetry_sync.py +++ b/splitio/tasks/telemetry_sync.py @@ -37,6 +37,7 @@ def _flush_telemetry(self): self._api.flush_latencies(latencies) except APIException: self._logger.error('Failed send telemetry/latencies to split BE.') + self._logger.debug('Exception information: ', exc_info=True) try: counters = self._storage.pop_counters() @@ -44,6 +45,7 @@ def _flush_telemetry(self): self._api.flush_counters(counters) except APIException: self._logger.error('Failed send telemetry/counters to split BE.') + self._logger.debug('Exception information: ', exc_info=True) try: gauges = self._storage.pop_gauges() 
@@ -51,6 +53,7 @@ def _flush_telemetry(self): self._api.flush_gauges(gauges) except APIException: self._logger.error('Failed send telemetry/gauges to split BE.') + self._logger.debug('Exception information: ', exc_info=True) def start(self): """Start the task.""" diff --git a/splitio/tasks/util/asynctask.py b/splitio/tasks/util/asynctask.py index 88fecc00..91aeeb12 100644 --- a/splitio/tasks/util/asynctask.py +++ b/splitio/tasks/util/asynctask.py @@ -102,11 +102,13 @@ def _execution_wrapper(self): except queue.Empty: # If no message was received, the timeout has expired # and we're ready for a new execution - if not _safe_run(self._main): - _LOGGER.error( - "An error occurred when executing the task. " - "Retrying after perio expires" - ) + pass + + if not _safe_run(self._main): + _LOGGER.error( + "An error occurred when executing the task. " + "Retrying after period expires" + ) finally: self._cleanup() diff --git a/splitio/tasks/uwsgi_wrappers.py b/splitio/tasks/uwsgi_wrappers.py index 73d04120..d6aa774a 100644 --- a/splitio/tasks/uwsgi_wrappers.py +++ b/splitio/tasks/uwsgi_wrappers.py @@ -3,7 +3,7 @@ import logging import time -from splitio.client.config import DEFAULT_CONFIG +from splitio.client.config import sanitize as sanitize_config from splitio.client.util import get_metadata from splitio.storage.adapters.uwsgi_cache import get_uwsgi from splitio.storage.uwsgi import UWSGIEventStorage, UWSGIImpressionStorage, \ @@ -34,9 +34,7 @@ def _get_config(user_config): :return: Calculated configuration. :rtype: dict """ - sdk_config = DEFAULT_CONFIG.copy() - sdk_config.update(user_config) - return sdk_config + return sanitize_config(user_config['apikey'], user_config) def uwsgi_update_splits(user_config): @@ -113,7 +111,8 @@ def uwsgi_report_impressions(user_config): ImpressionsAPI( HttpClient(1500, config.get('sdk_url'), config.get('events_url')), config['apikey'], - metadata + metadata, + config['impressionsMode'] ), storage, None, # Period not needed. 
Task is being triggered manually. diff --git a/splitio/version.py b/splitio/version.py index 7b2b8ec7..81104b12 100644 --- a/splitio/version.py +++ b/splitio/version.py @@ -1 +1 @@ -__version__ = '8.2.1' +__version__ = '8.3.0-rc1' diff --git a/tests/api/test_impressions_api.py b/tests/api/test_impressions_api.py index 63650acc..54d64b1a 100644 --- a/tests/api/test_impressions_api.py +++ b/tests/api/test_impressions_api.py @@ -3,6 +3,7 @@ import pytest from splitio.api import impressions, client, APIException from splitio.models.impressions import Impression +from splitio.engine.impressions import Counter, ImpressionsMode from splitio.client.util import get_metadata from splitio.client.config import DEFAULT_CONFIG from splitio.version import __version__ @@ -16,18 +17,34 @@ class ImpressionsAPITests(object): Impression('k3', 'f1', 'on', 'l1', 123456, 'b1', 321654) ] expectedImpressions = [{ - 'testName': 'f1', - 'keyImpressions': [ - {'keyName': 'k1', 'bucketingKey': 'b1', 'treatment': 'on', 'label': 'l1', 'time': 321654, 'changeNumber': 123456}, - {'keyName': 'k3', 'bucketingKey': 'b1', 'treatment': 'on', 'label': 'l1', 'time': 321654, 'changeNumber': 123456}, + 'f': 'f1', + 'i': [ + {'k': 'k1', 'b': 'b1', 't': 'on', 'r': 'l1', 'm': 321654, 'c': 123456, 'pt': None}, + {'k': 'k3', 'b': 'b1', 't': 'on', 'r': 'l1', 'm': 321654, 'c': 123456, 'pt': None}, ], }, { - 'testName': 'f2', - 'keyImpressions': [ - {'keyName': 'k2', 'bucketingKey': 'b1', 'treatment': 'off', 'label': 'l1', 'time': 321654, 'changeNumber': 123456}, + 'f': 'f2', + 'i': [ + {'k': 'k2', 'b': 'b1', 't': 'off', 'r': 'l1', 'm': 321654, 'c': 123456, 'pt': None}, ] }] + counters = [ + Counter.CountPerFeature('f1', 123, 2), + Counter.CountPerFeature('f2', 123, 123), + Counter.CountPerFeature('f1', 456, 111), + Counter.CountPerFeature('f2', 456, 222) + ] + + expected_counters = { + 'pf': [ + {'f': 'f1', 'm': 123, 'rc': 2}, + {'f': 'f2', 'm': 123, 'rc': 123}, + {'f': 'f1', 'm': 456, 'rc': 111}, + {'f': 'f2', 
'm': 456, 'rc': 222}, + ] + } + def test_post_impressions(self, mocker): """Test impressions posting API call.""" httpclient = mocker.Mock(spec=client.HttpClient) @@ -47,7 +64,8 @@ def test_post_impressions(self, mocker): assert call_made[2]['extra_headers'] == { 'SplitSDKVersion': 'python-%s' % __version__, 'SplitSDKMachineIP': '123.123.123.123', - 'SplitSDKMachineName': 'some_machine_name' + 'SplitSDKMachineName': 'some_machine_name', + 'SplitSDKImpressionsMode': 'OPTIMIZED' } # validate key-value args (body) @@ -69,7 +87,7 @@ def test_post_impressions_ip_address_disabled(self, mocker): cfg = DEFAULT_CONFIG.copy() cfg.update({'IPAddressesEnabled': False}) sdk_metadata = get_metadata(cfg) - impressions_api = impressions.ImpressionsAPI(httpclient, 'some_api_key', sdk_metadata) + impressions_api = impressions.ImpressionsAPI(httpclient, 'some_api_key', sdk_metadata, ImpressionsMode.DEBUG) response = impressions_api.flush_impressions(self.impressions) call_made = httpclient.post.mock_calls[0] @@ -80,7 +98,43 @@ # validate key-value args (headers) assert call_made[2]['extra_headers'] == { 'SplitSDKVersion': 'python-%s' % __version__, + 'SplitSDKImpressionsMode': 'DEBUG' } # validate key-value args (body) assert call_made[2]['body'] == self.expectedImpressions + + def test_post_counters(self, mocker): + """Test counters posting API call.""" + httpclient = mocker.Mock(spec=client.HttpClient) + httpclient.post.return_value = client.HttpResponse(200, '') + cfg = DEFAULT_CONFIG.copy() + cfg.update({'IPAddressesEnabled': True, 'machineName': 'some_machine_name', 'machineIp': '123.123.123.123'}) + sdk_metadata = get_metadata(cfg) + impressions_api = impressions.ImpressionsAPI(httpclient, 'some_api_key', sdk_metadata) + response = impressions_api.flush_counters(self.counters) + + call_made = httpclient.post.mock_calls[0] + + # validate positional arguments + assert call_made[1] == ('events', 
'/testImpressions/count', 'some_api_key') + + # validate key-value args (headers) + assert call_made[2]['extra_headers'] == { + 'SplitSDKVersion': 'python-%s' % __version__, + 'SplitSDKMachineIP': '123.123.123.123', + 'SplitSDKMachineName': 'some_machine_name', + 'SplitSDKImpressionsMode': 'OPTIMIZED' + } + + # validate key-value args (body) + assert call_made[2]['body'] == self.expected_counters + + httpclient.reset_mock() + def raise_exception(*args, **kwargs): + raise client.HttpClientException('some_message') + httpclient.post.side_effect = raise_exception + with pytest.raises(APIException) as exc_info: + response = impressions_api.flush_counters(self.counters) + assert exc_info.type == APIException + assert exc_info.value.message == 'some_message' diff --git a/tests/client/test_client.py b/tests/client/test_client.py index 2546b27d..3bfefe7a 100644 --- a/tests/client/test_client.py +++ b/tests/client/test_client.py @@ -13,6 +13,8 @@ from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \ InMemoryImpressionStorage, InMemoryTelemetryStorage, InMemoryEventStorage from splitio.models import splits, segments +from splitio.engine.impressions import Manager as ImpressionManager + class ClientTests(object): #pylint: disable=too-few-public-methods """Split client test cases.""" @@ -40,10 +42,11 @@ def _get_storage_mock(name): factory._get_storage.side_effect = _get_storage_mock type(factory).destroyed = destroyed_property - mocker.patch('splitio.client.client.time.time', new=lambda: 1) + mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) - client = Client(factory, True, None) + impmanager = mocker.Mock(spec=ImpressionManager) + client = Client(factory, impmanager, True) client._evaluator = mocker.Mock(spec=Evaluator) client._evaluator.evaluate_feature.return_value = { 'treatment': 'on', @@ -54,28 +57,23 @@ def _get_storage_mock(name): }, } 
client._logger = mocker.Mock() - client._send_impression_to_listener = mocker.Mock() assert client.get_treatment('some_key', 'some_feature') == 'on' assert mocker.call( - [Impression('some_key', 'some_feature', 'on', 'some_label', 123, None, 1000)] - ) in impression_storage.put.mock_calls + [(Impression('some_key', 'some_feature', 'on', 'some_label', 123, None, 1000), None)] + ) in impmanager.track.mock_calls assert mocker.call('sdk.getTreatment', 5) in telemetry_storage.inc_latency.mock_calls assert client._logger.mock_calls == [] - assert mocker.call( - Impression('some_key', 'some_feature', 'on', 'some_label', 123, None, 1000), - None - ) in client._send_impression_to_listener.mock_calls # Test with client not ready ready_property = mocker.PropertyMock() ready_property.return_value = False type(factory).ready = ready_property - impression_storage.put.reset_mock() + impmanager.track.reset_mock() assert client.get_treatment('some_key', 'some_feature', {'some_attribute': 1}) == 'control' assert mocker.call( - [Impression('some_key', 'some_feature', 'control', Label.NOT_READY, mocker.ANY, mocker.ANY, mocker.ANY)] - ) in impression_storage.put.mock_calls + [(Impression('some_key', 'some_feature', 'control', Label.NOT_READY, mocker.ANY, mocker.ANY, mocker.ANY), {'some_attribute': 1})] + ) in impmanager.track.mock_calls # Test with exception: ready_property.return_value = True @@ -85,8 +83,8 @@ def _raise(*_): client._evaluator.evaluate_feature.side_effect = _raise assert client.get_treatment('some_key', 'some_feature') == 'control' assert mocker.call( - [Impression('some_key', 'some_feature', 'control', 'exception', -1, None, 1000)] - ) in impression_storage.put.mock_calls + [(Impression('some_key', 'some_feature', 'control', 'exception', -1, None, 1000), None)] + ) in impmanager.track.mock_calls assert len(telemetry_storage.inc_latency.mock_calls) == 3 def test_get_treatment_with_config(self, mocker): @@ -112,10 +110,11 @@ def _get_storage_mock(name): 
factory._get_storage.side_effect = _get_storage_mock type(factory).destroyed = destroyed_property - mocker.patch('splitio.client.client.time.time', new=lambda: 1) + mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) - client = Client(factory, True, None) + impmanager = mocker.Mock(spec=ImpressionManager) + client = Client(factory, impmanager, True) client._evaluator = mocker.Mock(spec=Evaluator) client._evaluator.evaluate_feature.return_value = { 'treatment': 'on', @@ -133,24 +132,21 @@ def _get_storage_mock(name): 'some_feature' ) == ('on', '{"some_config": True}') assert mocker.call( - [Impression('some_key', 'some_feature', 'on', 'some_label', 123, None, 1000)] - ) in impression_storage.put.mock_calls + [(Impression('some_key', 'some_feature', 'on', 'some_label', 123, None, 1000), None)] + ) in impmanager.track.mock_calls assert mocker.call('sdk.getTreatmentWithConfig', 5) in telemetry_storage.inc_latency.mock_calls assert client._logger.mock_calls == [] - assert mocker.call( - Impression('some_key', 'some_feature', 'on', 'some_label', 123, None, 1000), - None - ) in client._send_impression_to_listener.mock_calls # Test with client not ready ready_property = mocker.PropertyMock() ready_property.return_value = False type(factory).ready = ready_property - impression_storage.put.reset_mock() + impmanager.track.reset_mock() assert client.get_treatment_with_config('some_key', 'some_feature', {'some_attribute': 1}) == ('control', None) assert mocker.call( - [Impression('some_key', 'some_feature', 'control', Label.NOT_READY, mocker.ANY, mocker.ANY, mocker.ANY)] - ) in impression_storage.put.mock_calls + [(Impression('some_key', 'some_feature', 'control', Label.NOT_READY, mocker.ANY, mocker.ANY, mocker.ANY), + {'some_attribute': 1})] + ) in impmanager.track.mock_calls # Test with exception: ready_property.return_value = True @@ -160,8 +156,8 @@ def _raise(*_): 
client._evaluator.evaluate_feature.side_effect = _raise assert client.get_treatment_with_config('some_key', 'some_feature') == ('control', None) assert mocker.call( - [Impression('some_key', 'some_feature', 'control', 'exception', -1, None, 1000)] - ) in impression_storage.put.mock_calls + [(Impression('some_key', 'some_feature', 'control', 'exception', -1, None, 1000), None)] + ) in impmanager.track.mock_calls assert len(telemetry_storage.inc_latency.mock_calls) == 3 def test_get_treatments(self, mocker): @@ -187,10 +183,11 @@ def _get_storage_mock(name): factory._get_storage.side_effect = _get_storage_mock type(factory).destroyed = destroyed_property - mocker.patch('splitio.client.client.time.time', new=lambda: 1) + mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) - client = Client(factory, True, None) + impmanager = mocker.Mock(spec=ImpressionManager) + client = Client(factory, impmanager, True) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { 'treatment': 'on', @@ -208,29 +205,21 @@ def _get_storage_mock(name): client._send_impression_to_listener = mocker.Mock() assert client.get_treatments('key', ['f1', 'f2']) == {'f1': 'on', 'f2': 'on'} - impressions_called = impression_storage.put.mock_calls[0][1][0] - assert Impression('key', 'f1', 'on', 'some_label', 123, None, 1000) in impressions_called - assert Impression('key', 'f2', 'on', 'some_label', 123, None, 1000) in impressions_called + impressions_called = impmanager.track.mock_calls[0][1][0] + assert (Impression('key', 'f1', 'on', 'some_label', 123, None, 1000), None) in impressions_called + assert (Impression('key', 'f2', 'on', 'some_label', 123, None, 1000), None) in impressions_called assert mocker.call('sdk.getTreatments', 5) in telemetry_storage.inc_latency.mock_calls assert client._logger.mock_calls == [] - assert mocker.call( - Impression('key', 'f1', 'on', 'some_label', 123, None, 1000), - 
None - ) in client._send_impression_to_listener.mock_calls - assert mocker.call( - Impression('key', 'f2', 'on', 'some_label', 123, None, 1000), - None - ) in client._send_impression_to_listener.mock_calls # Test with client not ready ready_property = mocker.PropertyMock() ready_property.return_value = False type(factory).ready = ready_property - impression_storage.put.reset_mock() + impmanager.track.reset_mock() assert client.get_treatments('some_key', ['some_feature'], {'some_attribute': 1}) == {'some_feature': 'control'} assert mocker.call( - [Impression('some_key', 'some_feature', 'control', Label.NOT_READY, mocker.ANY, mocker.ANY, mocker.ANY)] - ) in impression_storage.put.mock_calls + [(Impression('some_key', 'some_feature', 'control', Label.NOT_READY, mocker.ANY, mocker.ANY, mocker.ANY), {'some_attribute': 1})] + ) in impmanager.track.mock_calls # Test with exception: ready_property.return_value = True @@ -264,10 +253,11 @@ def _get_storage_mock(name): factory._get_storage.side_effect = _get_storage_mock type(factory).destroyed = destroyed_property - mocker.patch('splitio.client.client.time.time', new=lambda: 1) + mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) - client = Client(factory, True, None) + impmanager = mocker.Mock(spec=ImpressionManager) + client = Client(factory, impmanager, True) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { 'treatment': 'on', @@ -282,35 +272,26 @@ def _get_storage_mock(name): 'f2': evaluation } client._logger = mocker.Mock() - client._send_impression_to_listener = mocker.Mock() assert client.get_treatments_with_config('key', ['f1', 'f2']) == { 'f1': ('on', '{"color": "red"}'), 'f2': ('on', '{"color": "red"}') } - impressions_called = impression_storage.put.mock_calls[0][1][0] - assert Impression('key', 'f1', 'on', 'some_label', 123, None, 1000) in impressions_called - assert Impression('key', 'f2', 'on', 
'some_label', 123, None, 1000) in impressions_called - assert mocker.call('sdk.getTreatments', 5) in telemetry_storage.inc_latency.mock_calls + impressions_called = impmanager.track.mock_calls[0][1][0] + assert (Impression('key', 'f1', 'on', 'some_label', 123, None, 1000), None) in impressions_called + assert (Impression('key', 'f2', 'on', 'some_label', 123, None, 1000), None) in impressions_called + assert mocker.call('sdk.getTreatmentsWithConfig', 5) in telemetry_storage.inc_latency.mock_calls assert client._logger.mock_calls == [] - assert mocker.call( - Impression('key', 'f1', 'on', 'some_label', 123, None, 1000), - None - ) in client._send_impression_to_listener.mock_calls - assert mocker.call( - Impression('key', 'f2', 'on', 'some_label', 123, None, 1000), - None - ) in client._send_impression_to_listener.mock_calls # Test with client not ready ready_property = mocker.PropertyMock() ready_property.return_value = False type(factory).ready = ready_property - impression_storage.put.reset_mock() + impmanager.track.reset_mock() assert client.get_treatments_with_config('some_key', ['some_feature'], {'some_attribute': 1}) == {'some_feature': ('control', None)} assert mocker.call( - [Impression('some_key', 'some_feature', 'control', Label.NOT_READY, mocker.ANY, mocker.ANY, mocker.ANY)] - ) in impression_storage.put.mock_calls + [(Impression('some_key', 'some_feature', 'control', Label.NOT_READY, mocker.ANY, mocker.ANY, mocker.ANY), {'some_attribute': 1})] + ) in impmanager.track.mock_calls # Test with exception: ready_property.return_value = True @@ -343,7 +324,8 @@ def _get_storage_mock(name): destroyed_mock = mocker.PropertyMock() type(factory).destroyed = destroyed_mock - client = Client(factory) + impmanager = mocker.Mock(spec=ImpressionManager) + client = Client(factory, impmanager, True) client.destroy() assert factory.destroy.mock_calls == [mocker.call()] assert client.destroyed is not None @@ -371,9 +353,10 @@ def _get_storage_mock(name): 
destroyed_mock.return_value = False type(factory).destroyed = destroyed_mock factory._apikey = 'test' - mocker.patch('splitio.client.client.time.time', new=lambda: 1) + mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) - client = Client(factory) + impmanager = mocker.Mock(spec=ImpressionManager) + client = Client(factory, impmanager, True) assert client.track('key', 'user', 'purchase', 12) is True assert mocker.call([ EventWrapper( diff --git a/tests/client/test_config.py b/tests/client/test_config.py new file mode 100644 index 00000000..0ed0d1b1 --- /dev/null +++ b/tests/client/test_config.py @@ -0,0 +1,46 @@ +"""Configuration unit tests.""" +#pylint: disable=protected-access,no-self-use,line-too-long + +from splitio.client import config +from splitio.engine.impressions import ImpressionsMode + + +class ConfigSanitizationTests(object): + """Configuration sanitization unit tests.""" + + def test_parse_operation_mode(self): + """Make sure operation mode is correctly captured.""" + assert config._parse_operation_mode('some', {}) == 'inmemory-standalone' + assert config._parse_operation_mode('localhost', {}) == 'localhost-standalone' + assert config._parse_operation_mode('some', {'redisHost': 'x'}) == 'redis-consumer' + assert config._parse_operation_mode('some', {'uwsgiClient': True}) == 'uwsgi-consumer' + + def test_sanitize_imp_mode(self): + """Test sanitization of impressions mode.""" + mode, rate = config._sanitize_impressions_mode('OPTIMIZED', 1) + assert mode == ImpressionsMode.OPTIMIZED + assert rate == 60 + + mode, rate = config._sanitize_impressions_mode('DEBUG', 1) + assert mode == ImpressionsMode.DEBUG + assert rate == 1 + + mode, rate = config._sanitize_impressions_mode('debug', 1) + assert mode == ImpressionsMode.DEBUG + assert rate == 1 + + mode, rate = config._sanitize_impressions_mode('ANYTHING', 200) + assert mode == ImpressionsMode.OPTIMIZED + assert rate == 200 + + mode, rate = config._sanitize_impressions_mode(43, -1) + assert 
mode == ImpressionsMode.OPTIMIZED + assert rate == 60 + + mode, rate = config._sanitize_impressions_mode('OPTIMIZED') + assert mode == ImpressionsMode.OPTIMIZED + assert rate == 300 + + mode, rate = config._sanitize_impressions_mode('DEBUG') + assert mode == ImpressionsMode.DEBUG + assert rate == 60 diff --git a/tests/client/test_factory.py b/tests/client/test_factory.py index 0f1cc4a2..c1080731 100644 --- a/tests/client/test_factory.py +++ b/tests/client/test_factory.py @@ -4,7 +4,6 @@ import time import threading -from splitio.client.listener import ImpressionListenerWrapper from splitio.client.factory import get_factory, SplitFactory, _INSTANTIATED_FACTORIES from splitio.client.config import DEFAULT_CONFIG from splitio.storage import redis, inmemmory, uwsgi @@ -15,6 +14,7 @@ from splitio.api.impressions import ImpressionsAPI from splitio.api.events import EventsAPI from splitio.api.telemetry import TelemetryAPI +from splitio.engine.impressions import Manager as ImpressionsManager class SplitFactoryTests(object): @@ -162,7 +162,7 @@ def test_redis_client_creation(self, mocker): max_connections=999 )] assert factory._labels_enabled is False - assert isinstance(factory._impression_listener, ImpressionListenerWrapper) + assert isinstance(factory._impressions_manager, ImpressionsManager) factory.block_until_ready() time.sleep(1) # give a chance for the bg thread to set the ready status assert factory.ready @@ -180,7 +180,6 @@ def test_uwsgi_client_creation(self): assert factory._apis == {} assert factory._tasks == {} assert factory._labels_enabled is True - assert factory._impression_listener is None factory.block_until_ready() time.sleep(1) # give a chance for the bg thread to set the ready status assert factory.ready @@ -346,7 +345,7 @@ def _telemetry_task_init_mock(self, api, storage, refresh_rate): def test_multiple_factories(self, mocker): """Test multiple factories instantiation and tracking.""" def _make_factory_with_apikey(apikey, *_, **__): - return 
SplitFactory(apikey, {}, True) + return SplitFactory(apikey, {}, True, mocker.Mock(spec=ImpressionsManager)) factory_module_logger = mocker.Mock() build_in_memory = mocker.Mock() diff --git a/tests/client/test_input_validator.py b/tests/client/test_input_validator.py index e02961d7..c1911cdc 100644 --- a/tests/client/test_input_validator.py +++ b/tests/client/test_input_validator.py @@ -45,7 +45,7 @@ def _get_storage_mock(storage): factory_destroyed.return_value = False type(factory_mock).destroyed = factory_destroyed - client = Client(factory_mock) + client = Client(factory_mock, mocker.Mock()) client._logger = mocker.Mock() mocker.patch('splitio.client.input_validator._LOGGER', new=client._logger) @@ -273,7 +273,7 @@ def _get_storage_mock(storage): factory_destroyed.return_value = False type(factory_mock).destroyed = factory_destroyed - client = Client(factory_mock) + client = Client(factory_mock, mocker.Mock()) client._logger = mocker.Mock() mocker.patch('splitio.client.input_validator._LOGGER', new=client._logger) @@ -472,7 +472,7 @@ def _get_storage_mock(storage): ] def test_valid_properties(self, mocker): - """Test valid_properties() method""" + """Test valid_properties() method.""" assert input_validator.valid_properties(None) == (True, None, 1024) assert input_validator.valid_properties([]) == (False, None, 0) assert input_validator.valid_properties(True) == (False, None, 0) @@ -525,7 +525,7 @@ def test_track(self, mocker): type(factory_mock).destroyed = factory_destroyed factory_mock._apikey = 'some-test' - client = Client(factory_mock) + client = Client(factory_mock, mocker.Mock()) client._events_storage = mocker.Mock(spec=EventStorage) client._events_storage.put.return_value = True client._logger = mocker.Mock() @@ -795,7 +795,7 @@ def _get_storage_mock(storage): factory_destroyed.return_value = False type(factory_mock).destroyed = factory_destroyed - client = Client(factory_mock) + client = Client(factory_mock, mocker.Mock()) client._logger = 
mocker.Mock() mocker.patch('splitio.client.input_validator._LOGGER', new=client._logger) @@ -921,7 +921,7 @@ def _configs(treatment): return '{"some": "property"}' if treatment == 'default_treatment' else None split_mock.get_configurations_for.side_effect = _configs - client = Client(factory_mock) + client = Client(factory_mock, mocker.Mock()) client._logger = mocker.Mock() mocker.patch('splitio.client.input_validator._LOGGER', new=client._logger) diff --git a/tests/engine/test_impmanager.py b/tests/engine/test_impressions.py similarity index 84% rename from tests/engine/test_impmanager.py rename to tests/engine/test_impressions.py index 46d0b458..9d2e38f2 100644 --- a/tests/engine/test_impmanager.py +++ b/tests/engine/test_impressions.py @@ -1,6 +1,6 @@ """Impression manager, observer & hasher tests.""" from datetime import datetime -from splitio.engine.impmanager import Hasher, Observer, Counter, Manager, \ +from splitio.engine.impressions import Hasher, Observer, Counter, Manager, \ ImpressionsMode, truncate_time from splitio.models.impressions import Impression from splitio.client.listener import ImpressionListenerWrapper @@ -108,18 +108,18 @@ def forwarder(incoming): assert manager._forwarder is forwarder # An impression that hasn't happened in the last hour (pt = None) should be tracked - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), - Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), + (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking the same impression a ms later should call the forwarder function - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == 
[Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking a in impression with a different key makes it to the queue - manager.track([Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)]) + manager.track([(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)] @@ -130,8 +130,8 @@ def forwarder(incoming): utc_time_mock.return_value = utc_now # Track the same impressions but "one hour later" - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), - Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1), @@ -167,19 +167,19 @@ def forwarder(incoming): assert manager._forwarder is forwarder # An impression that hasn't happened in the last hour (pt = None) should be tracked - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), - Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), + (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking the same impression a ms later should call the forwarder function - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == 
[Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2, utc_now-3)] # Tracking a in impression with a different key makes it to the queue - manager.track([Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)]) + manager.track([(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2, utc_now-3), @@ -191,8 +191,8 @@ def forwarder(incoming): utc_time_mock.return_value = utc_now # Track the same impressions but "one hour later" - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), - Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2, old_utc-3), @@ -222,19 +222,19 @@ def forwarder(incoming): assert manager._forwarder is forwarder # An impression that hasn't happened in the last hour (pt = None) should be tracked - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), - Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), + (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking the same impression a ms later should call the forwarder function - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + 
manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)] # Tracking a in impression with a different key makes it to the queue - manager.track([Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)]) + manager.track([(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), @@ -246,8 +246,8 @@ def forwarder(incoming): utc_time_mock.return_value = utc_now # Track the same impressions but "one hour later" - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), - Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2), @@ -275,19 +275,19 @@ def forwarder(incoming): assert manager._forwarder is forwarder # An impression that hasn't happened in the last hour (pt = None) should be tracked - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), - Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), + (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking the same impression a ms later should call the forwarder function - 
manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)] # Tracking a in impression with a different key makes it to the queue - manager.track([Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)]) + manager.track([(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), @@ -299,8 +299,8 @@ def forwarder(incoming): utc_time_mock.return_value = utc_now # Track the same impressions but "one hour later" - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), - Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2), @@ -330,18 +330,18 @@ def forwarder(incoming): assert manager._forwarder is forwarder # An impression that hasn't happened in the last hour (pt = None) should be tracked - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), - Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), + (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking the same impression 
a ms later should call the forwarder function - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking a in impression with a different key makes it to the queue - manager.track([Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)]) + manager.track([(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)] @@ -352,8 +352,8 @@ def forwarder(incoming): utc_time_mock.return_value = utc_now # Track the same impressions but "one hour later" - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), - Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1), @@ -370,12 +370,12 @@ def forwarder(incoming): ]) assert listener.log_impression.mock_calls == [ - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3)), - mocker.call(Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3)), - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2, old_utc-3)), - mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1)), - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1, old_utc-3)), - mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2, old_utc-1)) + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, 
old_utc-3), None), + mocker.call(Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), None), + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2, old_utc-3), None), + mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1), None), + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1, old_utc-3), None), + mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2, old_utc-1), None) ] @@ -400,19 +400,19 @@ def forwarder(incoming): assert manager._forwarder is forwarder # An impression that hasn't happened in the last hour (pt = None) should be tracked - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), - Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), + (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking the same impression a ms later should call the forwarder function - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2, utc_now-3)] # Tracking a in impression with a different key makes it to the queue - manager.track([Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)]) + manager.track([(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2, utc_now-3), @@ -424,8 +424,8 @@ def forwarder(incoming): utc_time_mock.return_value = utc_now # Track 
the same impressions but "one hour later" - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), - Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2, old_utc-3), @@ -436,12 +436,12 @@ def forwarder(incoming): assert len(manager._observer._cache._data) == 3 # distinct impressions seen assert listener.log_impression.mock_calls == [ - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3)), - mocker.call(Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3)), - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2, old_utc-3)), - mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1)), - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1, old_utc-3)), - mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2, old_utc-1)) + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), None), + mocker.call(Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), None), + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2, old_utc-3), None), + mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1), None), + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1, old_utc-3), None), + mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2, old_utc-1), None) ] @@ -466,19 +466,19 @@ def forwarder(incoming): assert manager._forwarder is forwarder # An impression that hasn't happened in the last hour (pt = None) should be tracked - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), - Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)]) + 
manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), + (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking the same impression a ms later should call the forwarder function - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)] # Tracking a in impression with a different key makes it to the queue - manager.track([Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)]) + manager.track([(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), @@ -490,8 +490,8 @@ def forwarder(incoming): utc_time_mock.return_value = utc_now # Track the same impressions but "one hour later" - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), - Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2), @@ -500,12 +500,12 @@ def forwarder(incoming): Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)] assert listener.log_impression.mock_calls == [ - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3)), - mocker.call(Impression('k1', 
'f2', 'on', 'l1', 123, None, old_utc-3)), - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2)), - mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1)), - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1)), - mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)) + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), None), + mocker.call(Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), None), + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2), None), + mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1), None), + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None) ] @@ -530,19 +530,19 @@ def forwarder(incoming): assert manager._forwarder is forwarder # An impression that hasn't happened in the last hour (pt = None) should be tracked - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), - Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), + (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] # Tracking the same impression a ms later should call the forwarder function - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2)] # Tracking a in impression with a different key makes it to the queue - manager.track([Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)]) + 
manager.track([(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), @@ -554,8 +554,8 @@ def forwarder(incoming): utc_time_mock.return_value = utc_now # Track the same impressions but "one hour later" - manager.track([Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), - Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)]) + manager.track([(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None)]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2), @@ -564,11 +564,10 @@ def forwarder(incoming): Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)] assert listener.log_impression.mock_calls == [ - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3)), - mocker.call(Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3)), - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2)), - mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1)), - mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1)), - mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2)) + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), None), + mocker.call(Impression('k1', 'f2', 'on', 'l1', 123, None, old_utc-3), None), + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-2), None), + mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, old_utc-1), None), + mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), + mocker.call(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None) ] - diff --git a/tests/integration/test_client_e2e.py 
b/tests/integration/test_client_e2e.py index 0da42da0..771ef760 100644 --- a/tests/integration/test_client_e2e.py +++ b/tests/integration/test_client_e2e.py @@ -14,6 +14,7 @@ RedisSplitStorage, RedisSegmentStorage, RedisTelemetryStorage from splitio.storage.adapters.redis import RedisAdapter from splitio.models import splits, segments +from splitio.engine.impressions import Manager as ImpressionsManager, ImpressionsMode class InMemoryIntegrationTests(object): @@ -40,13 +41,15 @@ def setup_method(self): data = json.loads(flo.read()) segment_storage.put(segments.from_raw(data)) - self.factory = SplitFactory('some_api_key', { #pylint:disable=attribute-defined-outside-init + storages = { 'splits': split_storage, 'segments': segment_storage, 'impressions': InMemoryImpressionStorage(5000), 'events': InMemoryEventStorage(5000), 'telemetry': InMemoryTelemetryStorage() - }, True) + } + impmanager = ImpressionsManager(storages['impressions'].put, ImpressionsMode.DEBUG) + self.factory = SplitFactory('some_api_key', storages, True, impmanager) #pylint:disable=attribute-defined-outside-init def _validate_last_impressions(self, client, *to_validate): """Validate the last N impressions are present disregarding the order.""" @@ -256,6 +259,219 @@ def test_manager_methods(self): assert len(manager.splits()) == 7 +class InMemoryOptimizedIntegrationTests(object): + """Inmemory storage-based integration tests.""" + + def setup_method(self): + """Prepare storages with test data.""" + split_storage = InMemorySplitStorage() + segment_storage = InMemorySegmentStorage() + + split_fn = os.path.join(os.path.dirname(__file__), 'files', 'splitChanges.json') + with open(split_fn, 'r') as flo: + data = json.loads(flo.read()) + for split in data['splits']: + split_storage.put(splits.from_raw(split)) + + segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentEmployeesChanges.json') + with open(segment_fn, 'r') as flo: + data = json.loads(flo.read()) + 
segment_storage.put(segments.from_raw(data)) + + segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentHumanBeignsChanges.json') + with open(segment_fn, 'r') as flo: + data = json.loads(flo.read()) + segment_storage.put(segments.from_raw(data)) + + storages = { + 'splits': split_storage, + 'segments': segment_storage, + 'impressions': InMemoryImpressionStorage(5000), + 'events': InMemoryEventStorage(5000), + 'telemetry': InMemoryTelemetryStorage() + } + impmanager = ImpressionsManager(storages['impressions'].put, ImpressionsMode.OPTIMIZED, standalone=True) + self.factory = SplitFactory('some_api_key', storages, True, impmanager) #pylint:disable=attribute-defined-outside-init + + def _validate_last_impressions(self, client, *to_validate): + """Validate the last N impressions are present disregarding the order.""" + imp_storage = client._factory._get_storage('impressions') + impressions = imp_storage.pop_many(len(to_validate)) + as_tup_set = set((i.feature_name, i.matching_key, i.treatment) for i in impressions) + assert as_tup_set == set(to_validate) + + def test_get_treatment(self): + """Test client.get_treatment().""" + client = self.factory.client() + + assert client.get_treatment('user1', 'sample_feature') == 'on' + self._validate_last_impressions(client, ('sample_feature', 'user1', 'on')) + client.get_treatment('user1', 'sample_feature') + client.get_treatment('user1', 'sample_feature') + client.get_treatment('user1', 'sample_feature') + + # Only one impression was added, and popped when validating, the rest were ignored + assert self.factory._storages['impressions']._impressions.qsize() == 0 + + assert client.get_treatment('invalidKey', 'sample_feature') == 'off' + self._validate_last_impressions(client, ('sample_feature', 'invalidKey', 'off')) + + assert client.get_treatment('invalidKey', 'invalid_feature') == 'control' + self._validate_last_impressions(client) # No impressions should be present + + # testing a killed feature. 
No matter what the key, must return default treatment + assert client.get_treatment('invalidKey', 'killed_feature') == 'defTreatment' + self._validate_last_impressions(client, ('killed_feature', 'invalidKey', 'defTreatment')) + + # testing ALL matcher + assert client.get_treatment('invalidKey', 'all_feature') == 'on' + self._validate_last_impressions(client, ('all_feature', 'invalidKey', 'on')) + + # testing WHITELIST matcher + assert client.get_treatment('whitelisted_user', 'whitelist_feature') == 'on' + self._validate_last_impressions(client, ('whitelist_feature', 'whitelisted_user', 'on')) + assert client.get_treatment('unwhitelisted_user', 'whitelist_feature') == 'off' + self._validate_last_impressions(client, ('whitelist_feature', 'unwhitelisted_user', 'off')) + + # testing INVALID matcher + assert client.get_treatment('some_user_key', 'invalid_matcher_feature') == 'control' + self._validate_last_impressions(client) # No impressions should be present + + # testing Dependency matcher + assert client.get_treatment('somekey', 'dependency_test') == 'off' + self._validate_last_impressions(client, ('dependency_test', 'somekey', 'off')) + + # testing boolean matcher + assert client.get_treatment('True', 'boolean_test') == 'on' + self._validate_last_impressions(client, ('boolean_test', 'True', 'on')) + + # testing regex matcher + assert client.get_treatment('abc4', 'regex_test') == 'on' + self._validate_last_impressions(client, ('regex_test', 'abc4', 'on')) + + def test_get_treatments(self): + """Test client.get_treatments().""" + client = self.factory.client() + + result = client.get_treatments('user1', ['sample_feature']) + assert len(result) == 1 + assert result['sample_feature'] == 'on' + self._validate_last_impressions(client, ('sample_feature', 'user1', 'on')) + + result = client.get_treatments('invalidKey', ['sample_feature']) + assert len(result) == 1 + assert result['sample_feature'] == 'off' + self._validate_last_impressions(client, ('sample_feature', 
'invalidKey', 'off')) + + result = client.get_treatments('invalidKey', ['invalid_feature']) + assert len(result) == 1 + assert result['invalid_feature'] == 'control' + self._validate_last_impressions(client) + + # testing a killed feature. No matter what the key, must return default treatment + result = client.get_treatments('invalidKey', ['killed_feature']) + assert len(result) == 1 + assert result['killed_feature'] == 'defTreatment' + self._validate_last_impressions(client, ('killed_feature', 'invalidKey', 'defTreatment')) + + # testing ALL matcher + result = client.get_treatments('invalidKey', ['all_feature']) + assert len(result) == 1 + assert result['all_feature'] == 'on' + self._validate_last_impressions(client, ('all_feature', 'invalidKey', 'on')) + + # testing multiple splitNames + result = client.get_treatments('invalidKey', [ + 'all_feature', + 'killed_feature', + 'invalid_feature', + 'sample_feature' + ]) + assert len(result) == 4 + assert result['all_feature'] == 'on' + assert result['killed_feature'] == 'defTreatment' + assert result['invalid_feature'] == 'control' + assert result['sample_feature'] == 'off' + assert self.factory._storages['impressions']._impressions.qsize() == 0 + + def test_get_treatments_with_config(self): + """Test client.get_treatments_with_config().""" + client = self.factory.client() + + result = client.get_treatments_with_config('user1', ['sample_feature']) + assert len(result) == 1 + assert result['sample_feature'] == ('on', '{"size":15,"test":20}') + self._validate_last_impressions(client, ('sample_feature', 'user1', 'on')) + + result = client.get_treatments_with_config('invalidKey', ['sample_feature']) + assert len(result) == 1 + assert result['sample_feature'] == ('off', None) + self._validate_last_impressions(client, ('sample_feature', 'invalidKey', 'off')) + + result = client.get_treatments_with_config('invalidKey', ['invalid_feature']) + assert len(result) == 1 + assert result['invalid_feature'] == ('control', None) + 
self._validate_last_impressions(client) + + # testing a killed feature. No matter what the key, must return default treatment + result = client.get_treatments_with_config('invalidKey', ['killed_feature']) + assert len(result) == 1 + assert result['killed_feature'] == ('defTreatment', '{"size":15,"defTreatment":true}') + self._validate_last_impressions(client, ('killed_feature', 'invalidKey', 'defTreatment')) + + # testing ALL matcher + result = client.get_treatments_with_config('invalidKey', ['all_feature']) + assert len(result) == 1 + assert result['all_feature'] == ('on', None) + self._validate_last_impressions(client, ('all_feature', 'invalidKey', 'on')) + + # testing multiple splitNames + result = client.get_treatments_with_config('invalidKey', [ + 'all_feature', + 'killed_feature', + 'invalid_feature', + 'sample_feature' + ]) + assert len(result) == 4 + + assert result['all_feature'] == ('on', None) + assert result['killed_feature'] == ('defTreatment', '{"size":15,"defTreatment":true}') + assert result['invalid_feature'] == ('control', None) + assert result['sample_feature'] == ('off', None) + assert self.factory._storages['impressions']._impressions.qsize() == 0 + + def test_manager_methods(self): + """Test manager.split/splits.""" + manager = self.factory.manager() + result = manager.split('all_feature') + assert result.name == 'all_feature' + assert result.traffic_type is None + assert result.killed is False + assert len(result.treatments) == 2 + assert result.change_number == 123 + assert result.configs == {} + + result = manager.split('killed_feature') + assert result.name == 'killed_feature' + assert result.traffic_type is None + assert result.killed is True + assert len(result.treatments) == 2 + assert result.change_number == 123 + assert result.configs['defTreatment'] == '{"size":15,"defTreatment":true}' + assert result.configs['off'] == '{"size":15,"test":20}' + + result = manager.split('sample_feature') + assert result.name == 'sample_feature' + 
assert result.traffic_type is None + assert result.killed is False + assert len(result.treatments) == 2 + assert result.change_number == 123 + assert result.configs['on'] == '{"size":15,"test":20}' + + assert len(manager.split_names()) == 7 + assert len(manager.splits()) == 7 + + class RedisIntegrationTests(object): """Redis storage-based integration tests.""" @@ -285,13 +501,15 @@ def setup_method(self): redis_client.sadd(segment_storage._get_key(data['name']), *data['added']) redis_client.set(segment_storage._get_till_key(data['name']), data['till']) - self.factory = SplitFactory('some_api_key', { #pylint:disable=attribute-defined-outside-init + storages = { 'splits': split_storage, 'segments': segment_storage, 'impressions': RedisImpressionsStorage(redis_client, metadata), 'events': RedisEventsStorage(redis_client, metadata), 'telemetry': RedisTelemetryStorage(redis_client, metadata) - }, True) + } + impmanager = ImpressionsManager(storages['impressions'].put, ImpressionsMode.DEBUG) + self.factory = SplitFactory('some_api_key', storages, True, impmanager) #pylint:disable=attribute-defined-outside-init def _validate_last_impressions(self, client, *to_validate): """Validate the last N impressions are present disregarding the order.""" @@ -561,13 +779,15 @@ def setup_method(self): redis_client.sadd(segment_storage._get_key(data['name']), *data['added']) redis_client.set(segment_storage._get_till_key(data['name']), data['till']) - self.factory = SplitFactory('some_api_key', { #pylint:disable=attribute-defined-outside-init + storages = { 'splits': split_storage, 'segments': segment_storage, 'impressions': RedisImpressionsStorage(redis_client, metadata), 'events': RedisEventsStorage(redis_client, metadata), 'telemetry': RedisTelemetryStorage(redis_client, metadata) - }, True) + } + impmanager = ImpressionsManager(storages['impressions'].put, ImpressionsMode.DEBUG) + self.factory = SplitFactory('some_api_key', storages, True, impmanager) 
#pylint:disable=attribute-defined-outside-init class LocalhostIntegrationTests(object): #pylint: disable=too-few-public-methods diff --git a/tests/tasks/test_impressions_sync.py b/tests/tasks/test_impressions_sync.py index 4851abf9..a4c27b9e 100644 --- a/tests/tasks/test_impressions_sync.py +++ b/tests/tasks/test_impressions_sync.py @@ -7,6 +7,9 @@ from splitio.storage import ImpressionStorage from splitio.models.impressions import Impression from splitio.api.impressions import ImpressionsAPI +from splitio.engine.impressions import Manager as ImpressionsManager +from splitio.engine.impressions import Counter + class ImpressionsSyncTests(object): """Impressions Syncrhonization task test cases.""" @@ -36,3 +39,35 @@ def test_normal_operation(self, mocker): stop_event.wait(5) assert stop_event.is_set() assert len(api.flush_impressions.mock_calls) > calls_now + + +class ImpressionsCountSyncTests(object): + """Impressions Count Synchronization task test cases.""" + + def test_normal_operation(self, mocker): + """Test that the task works properly under normal circumstances.""" + manager = mocker.Mock(spec=ImpressionsManager) + + counters = [ + Counter.CountPerFeature('f1', 123, 2), + Counter.CountPerFeature('f2', 123, 123), + Counter.CountPerFeature('f1', 456, 111), + Counter.CountPerFeature('f2', 456, 222) + ] + + manager.get_counts.return_value = counters + api = mocker.Mock(spec=ImpressionsAPI) + api.flush_counters.return_value = HttpResponse(200, '') + impressions_sync.ImpressionsCountSyncTask._PERIOD = 1 + task = impressions_sync.ImpressionsCountSyncTask(api, manager) + task.start() + time.sleep(2) + assert task.is_running() + assert manager.get_counts.mock_calls[0] == mocker.call() + assert api.flush_counters.mock_calls[0] == mocker.call(counters) + stop_event = threading.Event() + calls_now = len(api.flush_counters.mock_calls) + task.stop(stop_event) + stop_event.wait(5) + assert stop_event.is_set() + assert len(api.flush_counters.mock_calls) > calls_now