Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions splitio/client/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,17 @@ def destroy(self, destroyed_event=None):

try:
if self._sync_manager is not None:
self._sync_manager.stop()
if destroyed_event is not None:
destroyed_event.set()
if destroyed_event is not None:

def _wait_for_tasks_to_stop():
self._sync_manager.stop(True)
destroyed_event.set()

wait_thread = threading.Thread(target=_wait_for_tasks_to_stop)
wait_thread.setDaemon(True)
wait_thread.start()
else:
self._sync_manager.stop(False)
finally:
self._status = Status.DESTROYED
with _INSTANTIATED_FACTORIES_LOCK:
Expand Down
11 changes: 8 additions & 3 deletions splitio/sync/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,20 @@ def start(self):
_LOGGER.debug('Exception information: ', exc_info=True)
raise

def stop(self):
"""Stop manager logic."""
def stop(self, blocking):
"""
Stop manager logic.

:param blocking: flag to wait until tasks are stopped
:type blocking: bool
"""
_LOGGER.info('Stopping manager tasks')
if self._streaming_enabled:
self._push_status_handler_active = False
self._queue.put(self._CENTINEL_EVENT)
self._push.stop()
self._synchronizer.stop_periodic_fetching(True)
self._synchronizer.stop_periodic_data_recording()
self._synchronizer.stop_periodic_data_recording(blocking)

def _streaming_feedback_handler(self):
"""
Expand Down
38 changes: 24 additions & 14 deletions splitio/sync/synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def start_periodic_data_recording(self):
pass

@abc.abstractmethod
def stop_periodic_data_recording(self):
def stop_periodic_data_recording(self, blocking):
"""Stop recorders."""
pass

Expand Down Expand Up @@ -284,20 +284,30 @@ def start_periodic_data_recording(self):
self._split_tasks.telemetry_task.start()
self._split_tasks.impressions_count_task.start()

def stop_periodic_data_recording(self):
"""Stop recorders."""
def stop_periodic_data_recording(self, blocking):
"""
Stop recorders.

:param blocking: flag to wait until tasks are stopped
:type blocking: bool
"""
_LOGGER.debug('Stopping periodic data recording')
events = []
for task in [
self._split_tasks.impressions_task,
self._split_tasks.events_task,
self._split_tasks.impressions_count_task
]:
stop_event = threading.Event()
task.stop(stop_event)
events.append(stop_event)
if all(event.wait() for event in events):
_LOGGER.debug('all tasks finished successfully.')
if blocking:
events = []
for task in [
self._split_tasks.impressions_task,
self._split_tasks.events_task,
self._split_tasks.impressions_count_task
]:
stop_event = threading.Event()
task.stop(stop_event)
events.append(stop_event)
if all(event.wait() for event in events):
_LOGGER.debug('all tasks finished successfully.')
else:
self._split_tasks.impressions_task.stop()
self._split_tasks.events_task.stop()
self._split_tasks.impressions_count_task.stop()
self._split_tasks.telemetry_task.stop()

def kill_split(self, split_name, default_treatment, change_number):
Expand Down
220 changes: 115 additions & 105 deletions tests/client/test_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,98 @@ def test_uwsgi_client_creation(self):
def test_destroy(self, mocker):
"""Test that tasks are shutdown and data is flushed when destroy is called."""

def stop_mock():
return

split_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask)
split_async_task_mock.stop.side_effect = stop_mock

def _split_task_init_mock(self, synchronize_splits, period):
self._task = split_async_task_mock
self._period = period
mocker.patch('splitio.client.factory.SplitSynchronizationTask.__init__',
new=_split_task_init_mock)

segment_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask)
segment_async_task_mock.stop.side_effect = stop_mock

def _segment_task_init_mock(self, synchronize_segments, worker_pool, period):
self._task = segment_async_task_mock
self._worker_pool = mocker.Mock()
self._period = period
mocker.patch('splitio.client.factory.SegmentSynchronizationTask.__init__',
new=_segment_task_init_mock)

imp_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask)
imp_async_task_mock.stop.side_effect = stop_mock

def _imppression_task_init_mock(self, synchronize_impressions, period):
self._period = period
self._task = imp_async_task_mock
mocker.patch('splitio.client.factory.ImpressionsSyncTask.__init__',
new=_imppression_task_init_mock)

evt_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask)
evt_async_task_mock.stop.side_effect = stop_mock

def _event_task_init_mock(self, synchronize_events, period):
self._period = period
self._task = evt_async_task_mock
mocker.patch('splitio.client.factory.EventsSyncTask.__init__', new=_event_task_init_mock)

telemetry_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask)
telemetry_async_task_mock.stop.side_effect = stop_mock

def _telemetry_task_init_mock(self, synchronize_counters, period):
self._period = period
self._task = telemetry_async_task_mock
mocker.patch('splitio.client.factory.ImpressionsCountSyncTask.__init__',
new=_telemetry_task_init_mock)

imp_count_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask)
imp_count_async_task_mock.stop.side_effect = stop_mock

def _imppression_count_task_init_mock(self, synchronize_counters):
self._task = imp_count_async_task_mock
mocker.patch('splitio.client.factory.ImpressionsCountSyncTask.__init__',
new=_imppression_count_task_init_mock)

split_sync = mocker.Mock(spec=SplitSynchronizer)
split_sync.synchronize_splits.return_values = None
segment_sync = mocker.Mock(spec=SegmentSynchronizer)
segment_sync.synchronize_segments.return_values = None
syncs = SplitSynchronizers(split_sync, segment_sync, mocker.Mock(),
mocker.Mock(), mocker.Mock(), mocker.Mock())
tasks = SplitTasks(split_async_task_mock, segment_async_task_mock, imp_async_task_mock,
evt_async_task_mock, telemetry_async_task_mock,
imp_count_async_task_mock)

# Setup synchronizer
def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sse_url=None):
synchronizer = Synchronizer(syncs, tasks)
self._ready_flag = ready_flag
self._synchronizer = synchronizer
self._streaming_enabled = False
mocker.patch('splitio.sync.manager.Manager.__init__', new=_split_synchronizer)

# Start factory and make assertions
factory = get_factory('some_api_key')
factory.block_until_ready()
assert factory.ready
assert factory.destroyed is False

factory.destroy()
assert len(imp_async_task_mock.stop.mock_calls) == 1
assert len(evt_async_task_mock.stop.mock_calls) == 1
assert len(telemetry_async_task_mock.stop.mock_calls) == 1
assert len(imp_count_async_task_mock.stop.mock_calls) == 1
assert factory.destroyed is True

def test_destroy_with_event(self, mocker):
"""Test that tasks are shutdown and data is flushed when destroy is called."""

def stop_mock(event):
time.sleep(0.1)
event.set()
return

Expand Down Expand Up @@ -230,7 +321,11 @@ def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sse
assert factory.ready
assert factory.destroyed is False

factory.destroy()
event = threading.Event()
factory.destroy(event)
assert not event.is_set()
time.sleep(1)
assert event.is_set()
assert len(imp_async_task_mock.stop.mock_calls) == 1
assert len(evt_async_task_mock.stop.mock_calls) == 1
assert len(telemetry_async_task_mock.stop.mock_calls) == 1
Expand All @@ -239,8 +334,26 @@ def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sse

def test_multiple_factories(self, mocker):
"""Test multiple factories instantiation and tracking."""
sdk_ready_flag = threading.Event()

def _init(self, ready_flag, some, auth_api, streaming_enabled, sse_url=None):
self._ready_flag = ready_flag
self._synchronizer = mocker.Mock(spec=Synchronizer)
self._streaming_enabled = False
mocker.patch('splitio.sync.manager.Manager.__init__', new=_init)

def _start(self, *args, **kwargs):
sdk_ready_flag.set()
mocker.patch('splitio.sync.manager.Manager.start', new=_start)

def _stop(self, *args, **kwargs):
pass
mocker.patch('splitio.sync.manager.Manager.stop', new=_stop)

mockManager = Manager(sdk_ready_flag, mocker.Mock(), mocker.Mock(), False)

def _make_factory_with_apikey(apikey, *_, **__):
return SplitFactory(apikey, {}, True, mocker.Mock(spec=ImpressionsManager))
return SplitFactory(apikey, {}, True, mocker.Mock(spec=ImpressionsManager), mockManager)

factory_module_logger = mocker.Mock()
build_in_memory = mocker.Mock()
Expand Down Expand Up @@ -303,106 +416,3 @@ def _make_factory_with_apikey(apikey, *_, **__):
factory2.destroy()
factory3.destroy()
factory4.destroy()


'''
def test_destroy_with_event(self, mocker):
"""Test that tasks are shutdown and data is flushed when destroy is called."""
spl_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask)
def _split_task_init_mock(self, api, storage, period, event):
self._task = spl_async_task_mock
self._api = api
self._storage = storage
self._period = period
self._event = event
event.set()
mocker.patch('splitio.client.factory.SplitSynchronizationTask.__init__', new=_split_task_init_mock)

sgm_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask)
worker_pool_mock = mocker.Mock(spec=workerpool.WorkerPool)
def _segment_task_init_mock(self, api, storage, split_storage, period, event):
self._task = sgm_async_task_mock
self._worker_pool = worker_pool_mock
self._api = api
self._segment_storage = storage
self._split_storage = split_storage
self._period = period
self._event = event
event.set()
mocker.patch('splitio.client.factory.SegmentSynchronizationTask.__init__', new=_segment_task_init_mock)

imp_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask)
def _imppression_task_init_mock(self, api, storage, refresh_rate, bulk_size):
self._logger = mocker.Mock()
self._impressions_api = api
self._storage = storage
self._period = refresh_rate
self._task = imp_async_task_mock
self._failed = mocker.Mock()
self._bulk_size = bulk_size
mocker.patch('splitio.client.factory.ImpressionsSyncTask.__init__', new=_imppression_task_init_mock)

evt_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask)
def _event_task_init_mock(self, api, storage, refresh_rate, bulk_size):
self._logger = mocker.Mock()
self._impressions_api = api
self._storage = storage
self._period = refresh_rate
self._task = evt_async_task_mock
self._failed = mocker.Mock()
self._bulk_size = bulk_size
mocker.patch('splitio.client.factory.EventsSyncTask.__init__', new=_event_task_init_mock)

tmt_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask)
def _telemetry_task_init_mock(self, api, storage, refresh_rate):
self._task = tmt_async_task_mock
self._logger = mocker.Mock()
self._api = api
self._storage = storage
self._period = refresh_rate
mocker.patch('splitio.client.factory.TelemetrySynchronizationTask.__init__', new=_telemetry_task_init_mock)

# Start factory and make assertions
factory = get_factory('some_api_key')
assert factory.destroyed is False

factory.block_until_ready()
assert factory.ready

event = threading.Event()
factory.destroy(event)

# When destroy is called an event is created and passed to each task when
# stop() is called. We will extract those events assert their type, and assert that
# by setting them, the main event gets set.
splits_event = spl_async_task_mock.stop.mock_calls[0][1][0]
segments_event = worker_pool_mock.stop.mock_calls[0][1][0] # Segment task stops when wp finishes.
impressions_event = imp_async_task_mock.stop.mock_calls[0][1][0]
events_event = evt_async_task_mock.stop.mock_calls[0][1][0]
telemetry_event = tmt_async_task_mock.stop.mock_calls[0][1][0]

# python2 & 3 compatibility
try:
from threading import _Event as __EVENT_CLASS
except ImportError:
from threading import Event as __EVENT_CLASS

assert isinstance(splits_event, __EVENT_CLASS)
assert isinstance(segments_event, __EVENT_CLASS)
assert isinstance(impressions_event, __EVENT_CLASS)
assert isinstance(events_event, __EVENT_CLASS)
assert isinstance(telemetry_event, __EVENT_CLASS)
assert not event.is_set()

splits_event.set()
segments_event.set()
impressions_event.set()
events_event.set()
telemetry_event.set()

time.sleep(1) # I/O wait to trigger context switch, to give the waiting thread
# a chance to run and set the main event.

assert event.is_set()
assert factory.destroyed
'''
2 changes: 1 addition & 1 deletion tests/sync/test_synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def stop_mock_2():
split_tasks = SplitTasks(mocker.Mock(), mocker.Mock(), impression_task, event_task,
telemetry_task, impression_count_task)
synchronizer = Synchronizer(mocker.Mock(spec=SplitSynchronizers), split_tasks)
synchronizer.stop_periodic_data_recording()
synchronizer.stop_periodic_data_recording(True)

assert len(impression_task.stop.mock_calls) == 1
assert len(impression_count_task.stop.mock_calls) == 1
Expand Down