Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions splitio/sync/manager.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""Synchronization manager module."""
import logging
import time
from threading import Thread
from queue import Queue
from splitio.push.manager import PushManager, Status
from splitio.api import APIException
from splitio.util.backoff import Backoff


_LOGGER = logging.getLogger(__name__)
Expand All @@ -12,8 +14,7 @@
class Manager(object):
"""Manager Class."""

def __init__(self, ready_flag, synchronizer, auth_api, streaming_enabled,
sse_url=None):
def __init__(self, ready_flag, synchronizer, auth_api, streaming_enabled, sse_url=None): # pylint:disable=too-many-arguments
"""
Construct Manager.

Expand All @@ -33,6 +34,7 @@ def __init__(self, ready_flag, synchronizer, auth_api, streaming_enabled,
self._ready_flag = ready_flag
self._synchronizer = synchronizer
if self._streaming_enabled:
self._backoff = Backoff()
self._queue = Queue()
self._push = PushManager(auth_api, synchronizer, self._queue, sse_url)
self._push_status_handler = Thread(target=self._streaming_feedback_handler,
Expand Down Expand Up @@ -78,6 +80,7 @@ def _streaming_feedback_handler(self):
self._synchronizer.stop_periodic_fetching()
self._push.update_workers_status(True)
self._synchronizer.sync_all()
self._backoff.reset()
elif status == Status.PUSH_SUBSYSTEM_DOWN:
_LOGGER.info('streaming temporarily down. starting periodic fetching')
self._push.update_workers_status(False)
Expand All @@ -86,6 +89,7 @@ def _streaming_feedback_handler(self):
_LOGGER.info('error in streaming. restarting flow')
self._synchronizer.start_periodic_fetching()
self._push.stop(True)
time.sleep(self._backoff.get())
self._push.start()
elif status == Status.PUSH_NONRETRYABLE_ERROR:
_LOGGER.info('non-recoverable error in streaming. switching to polling.')
Expand Down
32 changes: 32 additions & 0 deletions splitio/util/backoff.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Exponential Backoff duration calculator."""


class Backoff(object):
"""Backoff duration calculator."""

MAX_ALLOWED_WAIT = 30 * 60 # half an hour

def __init__(self, base=1):
"""
Class constructor.

:param base: basic unit to be multiplied on each iteration (seconds)
:param base: float
"""
self._base = base
self._attempt = 0

def get(self):
"""
Return the current time to wait and pre-calculate the next one.

:returns: time to wait until next retry.
:rtype: float
"""
to_return = min(self._base * (2 ** self._attempt), self.MAX_ALLOWED_WAIT)
self._attempt += 1
return to_return

def reset(self):
"""Reset the attempt count."""
self._attempt = 0
45 changes: 45 additions & 0 deletions tests/util/test_backoff.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Backoff unit tests."""
from splitio.util.backoff import Backoff


class BackOffTests(object): # pylint:disable=too-few-public-methods
"""Backoff test cases."""

def test_basic_functionality(self): # pylint:disable=no-self-use
"""Test basic working."""
backoff = Backoff()
assert backoff.get() == 1
assert backoff.get() == 2
assert backoff.get() == 4
assert backoff.get() == 8
assert backoff.get() == 16
assert backoff.get() == 32
assert backoff.get() == 64
assert backoff.get() == 128
assert backoff.get() == 256
assert backoff.get() == 512
assert backoff.get() == 1024

# assert that it's limited to 30 minutes
assert backoff.get() == 1800
assert backoff.get() == 1800
assert backoff.get() == 1800
assert backoff.get() == 1800

# assert that resetting begins on 1
backoff.reset()
assert backoff.get() == 1
assert backoff.get() == 2
assert backoff.get() == 4
assert backoff.get() == 8
assert backoff.get() == 16
assert backoff.get() == 32
assert backoff.get() == 64
assert backoff.get() == 128
assert backoff.get() == 256
assert backoff.get() == 512
assert backoff.get() == 1024
assert backoff.get() == 1800
assert backoff.get() == 1800
assert backoff.get() == 1800
assert backoff.get() == 1800