Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
'futures>=3.0.5;python_version<"3"'
]

with open(path.join(path.abspath(path.dirname(__file__)),
'splitio', 'version.py')) as f:
with open(path.join(path.abspath(path.dirname(__file__)), 'splitio', 'version.py')) as f:
exec(f.read()) # pylint: disable=exec-used

setup(
Expand All @@ -38,11 +37,13 @@
license='Apache License 2.0',
install_requires=INSTALL_REQUIRES,
tests_require=TESTS_REQUIRES,
# dependency_links=['https://github.com/splitio/mmh3cffi/tarball/feature/development#egg=mmh3cffi-0.2.0'],
extras_require={
'test': TESTS_REQUIRES,
'redis': ['redis>=2.10.5'],
'uwsgi': ['uwsgi>=2.0.0'],
'cpphash': ['mmh3cffi>=0.1.5']
# 'cpphash': ['mmh3cffi==0.2.0']
'cpphash': ['mmh3cffi@git+https://github.com/splitio/mmh3cffi@development#egg=mmh3cffi']
},
setup_requires=['pytest-runner'],
classifiers=[
Expand Down
67 changes: 57 additions & 10 deletions splitio/api/impressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@

from splitio.api import APIException, headers_from_metadata
from splitio.api.client import HttpClientException
from splitio.engine.impressions import ImpressionsMode


class ImpressionsAPI(object): # pylint: disable=too-few-public-methods
"""Class that uses an httpClient to communicate with the impressions API."""

def __init__(self, client, apikey, sdk_metadata):
def __init__(self, client, apikey, sdk_metadata, mode=ImpressionsMode.OPTIMIZED):
"""
Class constructor.

Expand All @@ -25,6 +26,7 @@ def __init__(self, client, apikey, sdk_metadata):
self._client = client
self._apikey = apikey
self._metadata = headers_from_metadata(sdk_metadata)
self._metadata['SplitSDKImpressionsMode'] = mode.name

@staticmethod
def _build_bulk(impressions):
Expand All @@ -35,19 +37,20 @@ def _build_bulk(impressions):
:type impressions: list(splitio.models.impressions.Impression)

:return: Dictionary of lists of impressions.
:rtype: dict
:rtype: list
"""
return [
{
'testName': test_name,
'keyImpressions': [
'f': test_name,
'i': [
{
'keyName': impression.matching_key,
'treatment': impression.treatment,
'time': impression.time,
'changeNumber': impression.change_number,
'label': impression.label,
'bucketingKey': impression.bucketing_key
'k': impression.matching_key,
't': impression.treatment,
'm': impression.time,
'c': impression.change_number,
'r': impression.label,
'b': impression.bucketing_key,
'pt': impression.previous_time
}
for impression in imps
]
Expand All @@ -58,6 +61,27 @@ def _build_bulk(impressions):
)
]

@staticmethod
def _build_counters(counters):
"""
Build an impression bulk formatted as the API expects it.

:param counters: List of impression counters per feature.
:type counters: list[splitio.engine.impressions.Counter.CountPerFeature]

:return: dict with list of impression count dtos
:rtype: dict
"""
return {
'pf': [
{
'f': pf_count.feature,
'm': pf_count.timeframe,
'rc': pf_count.count
} for pf_count in counters
]
}

def flush_impressions(self, impressions):
"""
Send impressions to the backend.
Expand All @@ -80,3 +104,26 @@ def flush_impressions(self, impressions):
self._logger.error('Http client is throwing exceptions')
self._logger.debug('Error: ', exc_info=True)
raise_from(APIException('Impressions not flushed properly.'), exc)

def flush_counters(self, counters):
"""
Send impressions to the backend.

:param impressions: Impressions bulk
:type impressions: list
"""
bulk = self._build_counters(counters)
try:
response = self._client.post(
'events',
'/testImpressions/count',
self._apikey,
body=bulk,
extra_headers=self._metadata
)
if not 200 <= response.status_code < 300:
raise APIException(response.body, response.status_code)
except HttpClientException as exc:
self._logger.error('Http client is throwing exceptions')
self._logger.debug('Error: ', exc_info=True)
raise_from(APIException('Impressions not flushed properly.'), exc)
55 changes: 18 additions & 37 deletions splitio/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from splitio.models.events import Event, EventWrapper
from splitio.models.telemetry import get_latency_bucket_index
from splitio.client import input_validator
from splitio.client.listener import ImpressionListenerException
from splitio.util import utctime_ms


class Client(object): # pylint: disable=too-many-instance-attributes
Expand All @@ -22,7 +22,7 @@ class Client(object): # pylint: disable=too-many-instance-attributes
_METRIC_GET_TREATMENT_WITH_CONFIG = 'sdk.getTreatmentWithConfig'
_METRIC_GET_TREATMENTS_WITH_CONFIG = 'sdk.getTreatmentsWithConfig'

def __init__(self, factory, labels_enabled=True, impression_listener=None):
def __init__(self, factory, impressions_manager, labels_enabled=True):
"""
Construct a Client instance.

Expand All @@ -32,15 +32,15 @@ def __init__(self, factory, labels_enabled=True, impression_listener=None):
:param labels_enabled: Whether to store labels on impressions
:type labels_enabled: bool

:param impression_listener: impression listener implementation
:type impression_listener: ImpressionListener
:param impressions_manager: impression manager instance
:type impressions_manager: splitio.engine.impressions.Manager

:rtype: Client
"""
self._logger = logging.getLogger(self.__class__.__name__)
self._factory = factory
self._labels_enabled = labels_enabled
self._impression_listener = impression_listener
self._impressions_manager = impressions_manager

self._splitter = Splitter()
self._split_storage = factory._get_storage('splits') # pylint: disable=protected-access
Expand Down Expand Up @@ -68,25 +68,6 @@ def destroyed(self):
"""Return whether the factory holding this client has been destroyed."""
return self._factory.destroyed

def _send_impression_to_listener(self, impression, attributes):
"""
Send impression result to custom listener.

:param impression: Generated impression
:type impression: Impression

:param attributes: An optional dictionary of attributes
:type attributes: dict
"""
if self._impression_listener is not None:
try:
self._impression_listener.log_impression(impression, attributes)
except ImpressionListenerException:
self._logger.error(
'An exception was raised while calling user-custom impression listener'
)
self._logger.debug('Error', exc_info=True)

def _evaluate_if_ready(self, matching_key, bucketing_key, feature, attributes=None):
if not self.ready:
return {
Expand Down Expand Up @@ -135,11 +116,10 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name):
result['impression']['label'],
result['impression']['change_number'],
bucketing_key,
start
utctime_ms(),
)

self._record_stats([impression], start, metric_name)
self._send_impression_to_listener(impression, attributes)
self._record_stats([(impression, attributes)], start, metric_name)
return result['treatment'], result['configurations']
except Exception: # pylint: disable=broad-except
self._logger.error('Error getting treatment for feature')
Expand All @@ -152,10 +132,9 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name):
Label.EXCEPTION,
self._split_storage.get_change_number(),
bucketing_key,
start
utctime_ms(),
)
self._record_stats([impression], start, metric_name)
self._send_impression_to_listener(impression, attributes)
self._record_stats([(impression, attributes)], start, metric_name)
except Exception: # pylint: disable=broad-except
self._logger.error('Error reporting impression into get_treatment exception block')
self._logger.debug('Error: ', exc_info=True)
Expand Down Expand Up @@ -200,7 +179,7 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name)
result['impression']['label'],
result['impression']['change_number'],
bucketing_key,
start)
utctime_ms())

bulk_impressions.append(impression)
treatments[feature] = (result['treatment'], result['configurations'])
Expand All @@ -215,9 +194,11 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name)
# Register impressions
try:
if bulk_impressions:
self._record_stats(bulk_impressions, start, self._METRIC_GET_TREATMENTS)
for impression in bulk_impressions:
self._send_impression_to_listener(impression, attributes)
self._record_stats(
[(i, attributes) for i in bulk_impressions],
start,
metric_name
)
except Exception: # pylint: disable=broad-except
self._logger.error('%s: An exception when trying to store '
'impressions.' % method_name)
Expand Down Expand Up @@ -350,7 +331,7 @@ def _record_stats(self, impressions, start, operation):
Record impressions and metrics.

:param impressions: Generated impressions
:type impressions: list||Impression
:type impressions: list[tuple[splitio.models.impression.Impression, dict]]

:param start: timestamp when get_treatment or get_treatments was called
:type start: int
Expand All @@ -360,7 +341,7 @@ def _record_stats(self, impressions, start, operation):
"""
try:
end = int(round(time.time() * 1000))
self._impressions_storage.put(impressions)
self._impressions_manager.track(impressions)
self._telemetry_storage.inc_latency(operation, get_latency_bucket_index(end - start))
except Exception: # pylint: disable=broad-except
self._logger.error('Error recording impressions and metrics')
Expand Down Expand Up @@ -409,7 +390,7 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
traffic_type_name=traffic_type,
event_type_id=event_type,
value=value,
timestamp=int(time.time()*1000),
timestamp=utctime_ms(),
properties=properties,
)
return self._events_storage.put([EventWrapper(
Expand Down
82 changes: 81 additions & 1 deletion splitio/client/config.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
"""Default settings for the Split.IO SDK Python client."""
from __future__ import absolute_import, division, print_function, unicode_literals

import os.path
import logging

from splitio.engine.impressions import ImpressionsMode


_LOGGER = logging.getLogger(__name__)


DEFAULT_CONFIG = {
'operationMode': 'in-memory',
'connectionTimeout': 1500,
'splitSdkMachineName': None,
'splitSdkMachineIp': None,
'featuresRefreshRate': 5,
'segmentsRefreshRate': 60,
'metricsRefreshRate': 60,
'impressionsRefreshRate': 10,
'impressionsRefreshRate': 5 * 60,
'impressionsBulkSize': 5000,
'impressionsQueueSize': 10000,
'eventsPushRate': 10,
'eventsBulkSize': 5000,
'eventsQueueSize': 10000,
'labelsEnabled': True,
'IPAddressesEnabled': True,
'impressionsMode': 'OPTIMIZED',
'impressionListener': None,
'redisLocalCacheEnabled': False,
'redisLocalCacheTTL': 5,
Expand Down Expand Up @@ -46,3 +56,73 @@
'machineIp': None,
'splitFile': os.path.join(os.path.expanduser('~'), '.split')
}


def _parse_operation_mode(apikey, config):
"""
Process incoming config to determine operation mode.

:param config: user supplied config
:type config: dict

:returns: operation mode
:rtype: str
"""
if apikey == 'localhost':
return 'localhost-standalone'

if 'redisHost' in config or 'redisSentinels' in config:
return 'redis-consumer'

if 'uwsgiClient' in config:
return 'uwsgi-consumer'

return 'inmemory-standalone'

def _sanitize_impressions_mode(mode, refresh_rate=None):
"""
Check supplied impressions mode and adjust refresh rate.

:param config: default + supplied config
:type config: dict

:returns: config with sanitized impressions mode & refresh rate
:rtype: config
"""
if not isinstance(mode, ImpressionsMode):
try:
mode = ImpressionsMode(mode.upper())
except (ValueError, AttributeError):
_LOGGER.warning('You passed an invalid impressionsMode, impressionsMode should be '
'one of the following values: `debug` or `optimized`. '
'Defaulting to `optimized` mode.')
mode = ImpressionsMode.OPTIMIZED

if mode == ImpressionsMode.DEBUG:
refresh_rate = max(1, refresh_rate) if refresh_rate is not None else 60
else:
refresh_rate = max(60, refresh_rate) if refresh_rate is not None else 5 * 60

return mode, refresh_rate

def sanitize(apikey, config):
"""
Look for inconsistencies or ill-formed configs and tune it accordingly.

:param apikey: customer's apikey
:type apikey: str

:param config: DEFAULT + user supplied config
:type config: dict

:returns: sanitized config
:rtype: dict
"""
config['operationMode'] = _parse_operation_mode(apikey, config)
processed = DEFAULT_CONFIG.copy()
processed.update(config)
imp_mode, imp_rate = _sanitize_impressions_mode(config.get('impressionsMode'),
config.get('impressionsRefreshRate'))
processed['impressionsMode'] = imp_mode
processed['impressionsRefreshRate'] = imp_rate
return processed
Loading