Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 57 additions & 10 deletions splitio/api/impressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@

from splitio.api import APIException, headers_from_metadata
from splitio.api.client import HttpClientException
from splitio.engine.impressions import ImpressionsMode


class ImpressionsAPI(object): # pylint: disable=too-few-public-methods
"""Class that uses an httpClient to communicate with the impressions API."""

def __init__(self, client, apikey, sdk_metadata):
def __init__(self, client, apikey, sdk_metadata, mode=ImpressionsMode.OPTIMIZED):
"""
Class constructor.

Expand All @@ -25,6 +26,7 @@ def __init__(self, client, apikey, sdk_metadata):
self._client = client
self._apikey = apikey
self._metadata = headers_from_metadata(sdk_metadata)
self._metadata['SplitSDKImpressionsMode'] = mode.name

@staticmethod
def _build_bulk(impressions):
Expand All @@ -35,19 +37,20 @@ def _build_bulk(impressions):
:type impressions: list(splitio.models.impressions.Impression)

:return: Dictionary of lists of impressions.
:rtype: dict
:rtype: list
"""
return [
{
'testName': test_name,
'keyImpressions': [
'f': test_name,
'i': [
{
'keyName': impression.matching_key,
'treatment': impression.treatment,
'time': impression.time,
'changeNumber': impression.change_number,
'label': impression.label,
'bucketingKey': impression.bucketing_key
'k': impression.matching_key,
't': impression.treatment,
'm': impression.time,
'c': impression.change_number,
'r': impression.label,
'b': impression.bucketing_key,
'pt': impression.previous_time
}
for impression in imps
]
Expand All @@ -58,6 +61,27 @@ def _build_bulk(impressions):
)
]

@staticmethod
def _build_counters(counters):
"""
Build an impression bulk formatted as the API expects it.

:param counters: List of impression counters per feature.
:type counters: list[splitio.engine.impressions.Counter.CountPerFeature]

:return: dict with list of impression count dtos
:rtype: dict
"""
return {
'pf': [
{
'f': pf_count.feature,
'm': pf_count.timeframe,
'rc': pf_count.count
} for pf_count in counters
]
}

def flush_impressions(self, impressions):
"""
Send impressions to the backend.
Expand All @@ -80,3 +104,26 @@ def flush_impressions(self, impressions):
self._logger.error('Http client is throwing exceptions')
self._logger.debug('Error: ', exc_info=True)
raise_from(APIException('Impressions not flushed properly.'), exc)

def flush_counters(self, counters):
"""
Send impressions to the backend.

:param impressions: Impressions bulk
:type impressions: list
"""
bulk = self._build_counters(counters)
try:
response = self._client.post(
'events',
'/testImpressions/count',
self._apikey,
body=bulk,
extra_headers=self._metadata
)
if not 200 <= response.status_code < 300:
raise APIException(response.body, response.status_code)
except HttpClientException as exc:
self._logger.error('Http client is throwing exceptions')
self._logger.debug('Error: ', exc_info=True)
raise_from(APIException('Impressions not flushed properly.'), exc)
55 changes: 18 additions & 37 deletions splitio/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from splitio.models.events import Event, EventWrapper
from splitio.models.telemetry import get_latency_bucket_index
from splitio.client import input_validator
from splitio.client.listener import ImpressionListenerException
from splitio.util import utctime_ms


class Client(object): # pylint: disable=too-many-instance-attributes
Expand All @@ -22,7 +22,7 @@ class Client(object): # pylint: disable=too-many-instance-attributes
_METRIC_GET_TREATMENT_WITH_CONFIG = 'sdk.getTreatmentWithConfig'
_METRIC_GET_TREATMENTS_WITH_CONFIG = 'sdk.getTreatmentsWithConfig'

def __init__(self, factory, labels_enabled=True, impression_listener=None):
def __init__(self, factory, impressions_manager, labels_enabled=True):
"""
Construct a Client instance.

Expand All @@ -32,15 +32,15 @@ def __init__(self, factory, labels_enabled=True, impression_listener=None):
:param labels_enabled: Whether to store labels on impressions
:type labels_enabled: bool

:param impression_listener: impression listener implementation
:type impression_listener: ImpressionListener
:param impressions_manager: impression manager instance
:type impressions_manager: splitio.engine.impressions.Manager

:rtype: Client
"""
self._logger = logging.getLogger(self.__class__.__name__)
self._factory = factory
self._labels_enabled = labels_enabled
self._impression_listener = impression_listener
self._impressions_manager = impressions_manager

self._splitter = Splitter()
self._split_storage = factory._get_storage('splits') # pylint: disable=protected-access
Expand Down Expand Up @@ -68,25 +68,6 @@ def destroyed(self):
"""Return whether the factory holding this client has been destroyed."""
return self._factory.destroyed

def _send_impression_to_listener(self, impression, attributes):
"""
Send impression result to custom listener.

:param impression: Generated impression
:type impression: Impression

:param attributes: An optional dictionary of attributes
:type attributes: dict
"""
if self._impression_listener is not None:
try:
self._impression_listener.log_impression(impression, attributes)
except ImpressionListenerException:
self._logger.error(
'An exception was raised while calling user-custom impression listener'
)
self._logger.debug('Error', exc_info=True)

def _evaluate_if_ready(self, matching_key, bucketing_key, feature, attributes=None):
if not self.ready:
return {
Expand Down Expand Up @@ -135,11 +116,10 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name):
result['impression']['label'],
result['impression']['change_number'],
bucketing_key,
start
utctime_ms(),
)

self._record_stats([impression], start, metric_name)
self._send_impression_to_listener(impression, attributes)
self._record_stats([(impression, attributes)], start, metric_name)
return result['treatment'], result['configurations']
except Exception: # pylint: disable=broad-except
self._logger.error('Error getting treatment for feature')
Expand All @@ -152,10 +132,9 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name):
Label.EXCEPTION,
self._split_storage.get_change_number(),
bucketing_key,
start
utctime_ms(),
)
self._record_stats([impression], start, metric_name)
self._send_impression_to_listener(impression, attributes)
self._record_stats([(impression, attributes)], start, metric_name)
except Exception: # pylint: disable=broad-except
self._logger.error('Error reporting impression into get_treatment exception block')
self._logger.debug('Error: ', exc_info=True)
Expand Down Expand Up @@ -200,7 +179,7 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name)
result['impression']['label'],
result['impression']['change_number'],
bucketing_key,
start)
utctime_ms())

bulk_impressions.append(impression)
treatments[feature] = (result['treatment'], result['configurations'])
Expand All @@ -215,9 +194,11 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name)
# Register impressions
try:
if bulk_impressions:
self._record_stats(bulk_impressions, start, self._METRIC_GET_TREATMENTS)
for impression in bulk_impressions:
self._send_impression_to_listener(impression, attributes)
self._record_stats(
[(i, attributes) for i in bulk_impressions],
start,
metric_name
)
except Exception: # pylint: disable=broad-except
self._logger.error('%s: An exception when trying to store '
'impressions.' % method_name)
Expand Down Expand Up @@ -350,7 +331,7 @@ def _record_stats(self, impressions, start, operation):
Record impressions and metrics.

:param impressions: Generated impressions
:type impressions: list||Impression
:type impressions: list[tuple[splitio.models.impression.Impression, dict]]

:param start: timestamp when get_treatment or get_treatments was called
:type start: int
Expand All @@ -360,7 +341,7 @@ def _record_stats(self, impressions, start, operation):
"""
try:
end = int(round(time.time() * 1000))
self._impressions_storage.put(impressions)
self._impressions_manager.track(impressions)
self._telemetry_storage.inc_latency(operation, get_latency_bucket_index(end - start))
except Exception: # pylint: disable=broad-except
self._logger.error('Error recording impressions and metrics')
Expand Down Expand Up @@ -409,7 +390,7 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
traffic_type_name=traffic_type,
event_type_id=event_type,
value=value,
timestamp=int(time.time()*1000),
timestamp=utctime_ms(),
properties=properties,
)
return self._events_storage.put([EventWrapper(
Expand Down
79 changes: 78 additions & 1 deletion splitio/client/config.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
"""Default settings for the Split.IO SDK Python client."""
from __future__ import absolute_import, division, print_function, unicode_literals

import os.path
import logging

from splitio.engine.impressions import ImpressionsMode


_LOGGER = logging.getLogger(__name__)


DEFAULT_CONFIG = {
'operationMode': 'in-memory',
'connectionTimeout': 1500,
'splitSdkMachineName': None,
'splitSdkMachineIp': None,
'featuresRefreshRate': 5,
'segmentsRefreshRate': 60,
'metricsRefreshRate': 60,
'impressionsRefreshRate': 10,
'impressionsRefreshRate': 5 * 60,
'impressionsBulkSize': 5000,
'impressionsQueueSize': 10000,
'eventsPushRate': 10,
'eventsBulkSize': 5000,
'eventsQueueSize': 10000,
'labelsEnabled': True,
'IPAddressesEnabled': True,
'impressionsMode': 'OPTIMIZED',
'impressionListener': None,
'redisLocalCacheEnabled': False,
'redisLocalCacheTTL': 5,
Expand Down Expand Up @@ -46,3 +56,70 @@
'machineIp': None,
'splitFile': os.path.join(os.path.expanduser('~'), '.split')
}


def _parse_operation_mode(apikey, config):
"""
Process incoming config to determine operation mode.

:param config: user supplied config
:type config: dict

:returns: operation mode
:rtype: str
"""
if apikey == 'localhost':
return 'localhost-standalone'

if 'redisHost' in config or 'redisSentinels' in config:
return 'redis-consumer'

if 'uwsgiClient' in config:
return 'uwsgi-consumer'

return 'inmemory-standalone'

def _sanitize_impressions_mode(mode, refresh_rate=None):
"""
Check supplied impressions mode and adjust refresh rate.

:param config: default + supplied config
:type config: dict

:returns: config with sanitized impressions mode & refresh rate
:rtype: config
"""
if not isinstance(mode, ImpressionsMode):
try:
mode = ImpressionsMode(mode.upper())
except (ValueError, AttributeError):
_LOGGER.warning('unrecognized impressionsMode supplied. Defaulting to OPTIMIZED')
mode = ImpressionsMode.OPTIMIZED

if mode == ImpressionsMode.DEBUG:
refresh_rate = max(1, refresh_rate) if refresh_rate is not None else 60
else:
refresh_rate = max(60, refresh_rate) if refresh_rate is not None else 5 * 60

return mode, refresh_rate

def sanitize(apikey, config):
"""
Look for inconsistencies or ill-formed configs and tune it accordingly.

:param apikey: customer's apikey
:type apikey: str

:param config: DEFAULT + user supplied config
:type config: dict

:returns: sanitized config
:rtype: dict
"""
config['operationMode'] = _parse_operation_mode(apikey, config)
processed = DEFAULT_CONFIG.copy()
processed.update(config)
imp_mode, imp_rate = _sanitize_impressions_mode(config.get('impressionsMode'), config.get('impressionsRefreshRate'))
processed['impressionsMode'] = imp_mode
processed['impressionsRefreshRate'] = imp_rate
return processed
Loading