From 7097150d111d3e2b393b52812ee578a04633f756 Mon Sep 17 00:00:00 2001 From: Martin Redolatti Date: Tue, 27 Oct 2020 13:31:01 -0300 Subject: [PATCH] add backoff --- splitio/sync/manager.py | 8 +++++-- splitio/util/backoff.py | 32 +++++++++++++++++++++++++++ tests/util/test_backoff.py | 45 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 2 deletions(-) create mode 100644 splitio/util/backoff.py create mode 100644 tests/util/test_backoff.py diff --git a/splitio/sync/manager.py b/splitio/sync/manager.py index 8e91f524..5988cc69 100644 --- a/splitio/sync/manager.py +++ b/splitio/sync/manager.py @@ -1,9 +1,11 @@ """Synchronization manager module.""" import logging +import time from threading import Thread from queue import Queue from splitio.push.manager import PushManager, Status from splitio.api import APIException +from splitio.util.backoff import Backoff _LOGGER = logging.getLogger(__name__) @@ -12,8 +14,7 @@ class Manager(object): """Manager Class.""" - def __init__(self, ready_flag, synchronizer, auth_api, streaming_enabled, - sse_url=None): + def __init__(self, ready_flag, synchronizer, auth_api, streaming_enabled, sse_url=None): # pylint:disable=too-many-arguments """ Construct Manager. @@ -33,6 +34,7 @@ def __init__(self, ready_flag, synchronizer, auth_api, streaming_enabled, self._ready_flag = ready_flag self._synchronizer = synchronizer if self._streaming_enabled: + self._backoff = Backoff() self._queue = Queue() self._push = PushManager(auth_api, synchronizer, self._queue, sse_url) self._push_status_handler = Thread(target=self._streaming_feedback_handler, @@ -78,6 +80,7 @@ def _streaming_feedback_handler(self): self._synchronizer.stop_periodic_fetching() self._push.update_workers_status(True) self._synchronizer.sync_all() + self._backoff.reset() elif status == Status.PUSH_SUBSYSTEM_DOWN: _LOGGER.info('streaming temporarily down. starting periodic fetching') self._push.update_workers_status(False) @@ -86,6 +89,7 @@ def _streaming_feedback_handler(self): _LOGGER.info('error in streaming. restarting flow') self._synchronizer.start_periodic_fetching() self._push.stop(True) + time.sleep(self._backoff.get()) self._push.start() elif status == Status.PUSH_NONRETRYABLE_ERROR: _LOGGER.info('non-recoverable error in streaming. switching to polling.') diff --git a/splitio/util/backoff.py b/splitio/util/backoff.py new file mode 100644 index 00000000..d26ffe50 --- /dev/null +++ b/splitio/util/backoff.py @@ -0,0 +1,32 @@ +"""Exponential Backoff duration calculator.""" + + +class Backoff(object): + """Backoff duration calculator.""" + + MAX_ALLOWED_WAIT = 30 * 60 # half an hour + + def __init__(self, base=1): + """ + Class constructor. + + :param base: basic unit to be multiplied on each iteration (seconds) + :param base: float + """ + self._base = base + self._attempt = 0 + + def get(self): + """ + Return the current time to wait and pre-calculate the next one. + + :returns: time to wait until next retry. + :rtype: float + """ + to_return = min(self._base * (2 ** self._attempt), self.MAX_ALLOWED_WAIT) + self._attempt += 1 + return to_return + + def reset(self): + """Reset the attempt count.""" + self._attempt = 0 diff --git a/tests/util/test_backoff.py b/tests/util/test_backoff.py new file mode 100644 index 00000000..5fffbc33 --- /dev/null +++ b/tests/util/test_backoff.py @@ -0,0 +1,45 @@ +"""Backoff unit tests.""" +from splitio.util.backoff import Backoff + + +class BackOffTests(object): # pylint:disable=too-few-public-methods + """Backoff test cases.""" + + def test_basic_functionality(self): # pylint:disable=no-self-use + """Test basic working.""" + backoff = Backoff() + assert backoff.get() == 1 + assert backoff.get() == 2 + assert backoff.get() == 4 + assert backoff.get() == 8 + assert backoff.get() == 16 + assert backoff.get() == 32 + assert backoff.get() == 64 + assert backoff.get() == 128 + assert backoff.get() == 256 + assert backoff.get() == 512 + assert backoff.get() == 1024 + + # assert that it's limited to 30 minutes + assert backoff.get() == 1800 + assert backoff.get() == 1800 + assert backoff.get() == 1800 + assert backoff.get() == 1800 + + # assert that resetting begins on 1 + backoff.reset() + assert backoff.get() == 1 + assert backoff.get() == 2 + assert backoff.get() == 4 + assert backoff.get() == 8 + assert backoff.get() == 16 + assert backoff.get() == 32 + assert backoff.get() == 64 + assert backoff.get() == 128 + assert backoff.get() == 256 + assert backoff.get() == 512 + assert backoff.get() == 1024 + assert backoff.get() == 1800 + assert backoff.get() == 1800 + assert backoff.get() == 1800 + assert backoff.get() == 1800