diff --git a/splitio/client/factory.py b/splitio/client/factory.py index 42102f19..7c33a5b2 100644 --- a/splitio/client/factory.py +++ b/splitio/client/factory.py @@ -202,9 +202,17 @@ def destroy(self, destroyed_event=None): try: if self._sync_manager is not None: - self._sync_manager.stop() - if destroyed_event is not None: - destroyed_event.set() + if destroyed_event is not None: + + def _wait_for_tasks_to_stop(): + self._sync_manager.stop(True) + destroyed_event.set() + + wait_thread = threading.Thread(target=_wait_for_tasks_to_stop) + wait_thread.setDaemon(True) + wait_thread.start() + else: + self._sync_manager.stop(False) finally: self._status = Status.DESTROYED with _INSTANTIATED_FACTORIES_LOCK: diff --git a/splitio/sync/manager.py b/splitio/sync/manager.py index 108da6f2..06e20d64 100644 --- a/splitio/sync/manager.py +++ b/splitio/sync/manager.py @@ -61,15 +61,20 @@ def start(self): _LOGGER.debug('Exception information: ', exc_info=True) raise - def stop(self): - """Stop manager logic.""" + def stop(self, blocking): + """ + Stop manager logic. + + :param blocking: flag to wait until tasks are stopped + :type blocking: bool + """ _LOGGER.info('Stopping manager tasks') if self._streaming_enabled: self._push_status_handler_active = False self._queue.put(self._CENTINEL_EVENT) self._push.stop() self._synchronizer.stop_periodic_fetching(True) - self._synchronizer.stop_periodic_data_recording() + self._synchronizer.stop_periodic_data_recording(blocking) def _streaming_feedback_handler(self): """ diff --git a/splitio/sync/synchronizer.py b/splitio/sync/synchronizer.py index 3f53e66c..032d5555 100644 --- a/splitio/sync/synchronizer.py +++ b/splitio/sync/synchronizer.py @@ -185,7 +185,7 @@ def start_periodic_data_recording(self): pass @abc.abstractmethod - def stop_periodic_data_recording(self): + def stop_periodic_data_recording(self, blocking): """Stop recorders.""" pass @@ -284,20 +284,30 @@ def start_periodic_data_recording(self): self._split_tasks.telemetry_task.start() self._split_tasks.impressions_count_task.start() - def stop_periodic_data_recording(self): - """Stop recorders.""" + def stop_periodic_data_recording(self, blocking): + """ + Stop recorders. + + :param blocking: flag to wait until tasks are stopped + :type blocking: bool + """ _LOGGER.debug('Stopping periodic data recording') - events = [] - for task in [ - self._split_tasks.impressions_task, - self._split_tasks.events_task, - self._split_tasks.impressions_count_task - ]: - stop_event = threading.Event() - task.stop(stop_event) - events.append(stop_event) - if all(event.wait() for event in events): - _LOGGER.debug('all tasks finished successfully.') + if blocking: + events = [] + for task in [ + self._split_tasks.impressions_task, + self._split_tasks.events_task, + self._split_tasks.impressions_count_task + ]: + stop_event = threading.Event() + task.stop(stop_event) + events.append(stop_event) + if all(event.wait() for event in events): + _LOGGER.debug('all tasks finished successfully.') + else: + self._split_tasks.impressions_task.stop() + self._split_tasks.events_task.stop() + self._split_tasks.impressions_count_task.stop() self._split_tasks.telemetry_task.stop() def kill_split(self, split_name, default_treatment, change_number): diff --git a/tests/client/test_factory.py b/tests/client/test_factory.py index 2c31eb61..a3bad1d0 100644 --- a/tests/client/test_factory.py +++ b/tests/client/test_factory.py @@ -146,7 +146,98 @@ def test_uwsgi_client_creation(self): def test_destroy(self, mocker): """Test that tasks are shutdown and data is flushed when destroy is called.""" + def stop_mock(): + return + + split_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask) + split_async_task_mock.stop.side_effect = stop_mock + + def _split_task_init_mock(self, synchronize_splits, period): + self._task = split_async_task_mock + self._period = period + mocker.patch('splitio.client.factory.SplitSynchronizationTask.__init__', + new=_split_task_init_mock) + + segment_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask) + segment_async_task_mock.stop.side_effect = stop_mock + + def _segment_task_init_mock(self, synchronize_segments, worker_pool, period): + self._task = segment_async_task_mock + self._worker_pool = mocker.Mock() + self._period = period + mocker.patch('splitio.client.factory.SegmentSynchronizationTask.__init__', + new=_segment_task_init_mock) + + imp_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask) + imp_async_task_mock.stop.side_effect = stop_mock + + def _imppression_task_init_mock(self, synchronize_impressions, period): + self._period = period + self._task = imp_async_task_mock + mocker.patch('splitio.client.factory.ImpressionsSyncTask.__init__', + new=_imppression_task_init_mock) + + evt_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask) + evt_async_task_mock.stop.side_effect = stop_mock + + def _event_task_init_mock(self, synchronize_events, period): + self._period = period + self._task = evt_async_task_mock + mocker.patch('splitio.client.factory.EventsSyncTask.__init__', new=_event_task_init_mock) + + telemetry_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask) + telemetry_async_task_mock.stop.side_effect = stop_mock + + def _telemetry_task_init_mock(self, synchronize_counters, period): + self._period = period + self._task = telemetry_async_task_mock + mocker.patch('splitio.client.factory.ImpressionsCountSyncTask.__init__', + new=_telemetry_task_init_mock) + + imp_count_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask) + imp_count_async_task_mock.stop.side_effect = stop_mock + + def _imppression_count_task_init_mock(self, synchronize_counters): + self._task = imp_count_async_task_mock + mocker.patch('splitio.client.factory.ImpressionsCountSyncTask.__init__', + new=_imppression_count_task_init_mock) + + split_sync = mocker.Mock(spec=SplitSynchronizer) + split_sync.synchronize_splits.return_values = None + segment_sync = mocker.Mock(spec=SegmentSynchronizer) + segment_sync.synchronize_segments.return_values = None + syncs = SplitSynchronizers(split_sync, segment_sync, mocker.Mock(), + mocker.Mock(), mocker.Mock(), mocker.Mock()) + tasks = SplitTasks(split_async_task_mock, segment_async_task_mock, imp_async_task_mock, + evt_async_task_mock, telemetry_async_task_mock, + imp_count_async_task_mock) + + # Setup synchronizer + def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sse_url=None): + synchronizer = Synchronizer(syncs, tasks) + self._ready_flag = ready_flag + self._synchronizer = synchronizer + self._streaming_enabled = False + mocker.patch('splitio.sync.manager.Manager.__init__', new=_split_synchronizer) + + # Start factory and make assertions + factory = get_factory('some_api_key') + factory.block_until_ready() + assert factory.ready + assert factory.destroyed is False + + factory.destroy() + assert len(imp_async_task_mock.stop.mock_calls) == 1 + assert len(evt_async_task_mock.stop.mock_calls) == 1 + assert len(telemetry_async_task_mock.stop.mock_calls) == 1 + assert len(imp_count_async_task_mock.stop.mock_calls) == 1 + assert factory.destroyed is True + + def test_destroy_with_event(self, mocker): + """Test that tasks are shutdown and data is flushed when destroy is called.""" + def stop_mock(event): + time.sleep(0.1) event.set() return @@ -230,7 +321,11 @@ def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sse assert factory.ready assert factory.destroyed is False - factory.destroy() + event = threading.Event() + factory.destroy(event) + assert not event.is_set() + time.sleep(1) + assert event.is_set() assert len(imp_async_task_mock.stop.mock_calls) == 1 assert len(evt_async_task_mock.stop.mock_calls) == 1 assert len(telemetry_async_task_mock.stop.mock_calls) == 1 @@ -239,8 +334,26 @@ def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sse def test_multiple_factories(self, mocker): """Test multiple factories instantiation and tracking.""" + sdk_ready_flag = threading.Event() + + def _init(self, ready_flag, some, auth_api, streaming_enabled, sse_url=None): + self._ready_flag = ready_flag + self._synchronizer = mocker.Mock(spec=Synchronizer) + self._streaming_enabled = False + mocker.patch('splitio.sync.manager.Manager.__init__', new=_init) + + def _start(self, *args, **kwargs): + sdk_ready_flag.set() + mocker.patch('splitio.sync.manager.Manager.start', new=_start) + + def _stop(self, *args, **kwargs): + pass + mocker.patch('splitio.sync.manager.Manager.stop', new=_stop) + + mockManager = Manager(sdk_ready_flag, mocker.Mock(), mocker.Mock(), False) + def _make_factory_with_apikey(apikey, *_, **__): - return SplitFactory(apikey, {}, True, mocker.Mock(spec=ImpressionsManager)) + return SplitFactory(apikey, {}, True, mocker.Mock(spec=ImpressionsManager), mockManager) factory_module_logger = mocker.Mock() build_in_memory = mocker.Mock() @@ -303,106 +416,3 @@ def _make_factory_with_apikey(apikey, *_, **__): factory2.destroy() factory3.destroy() factory4.destroy() - - -''' - def test_destroy_with_event(self, mocker): - """Test that tasks are shutdown and data is flushed when destroy is called.""" - spl_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask) - def _split_task_init_mock(self, api, storage, period, event): - self._task = spl_async_task_mock - self._api = api - self._storage = storage - self._period = period - self._event = event - event.set() - mocker.patch('splitio.client.factory.SplitSynchronizationTask.__init__', new=_split_task_init_mock) - - sgm_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask) - worker_pool_mock = mocker.Mock(spec=workerpool.WorkerPool) - def _segment_task_init_mock(self, api, storage, split_storage, period, event): - self._task = sgm_async_task_mock - self._worker_pool = worker_pool_mock - self._api = api - self._segment_storage = storage - self._split_storage = split_storage - self._period = period - self._event = event - event.set() - mocker.patch('splitio.client.factory.SegmentSynchronizationTask.__init__', new=_segment_task_init_mock) - - imp_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask) - def _imppression_task_init_mock(self, api, storage, refresh_rate, bulk_size): - self._logger = mocker.Mock() - self._impressions_api = api - self._storage = storage - self._period = refresh_rate - self._task = imp_async_task_mock - self._failed = mocker.Mock() - self._bulk_size = bulk_size - mocker.patch('splitio.client.factory.ImpressionsSyncTask.__init__', new=_imppression_task_init_mock) - - evt_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask) - def _event_task_init_mock(self, api, storage, refresh_rate, bulk_size): - self._logger = mocker.Mock() - self._impressions_api = api - self._storage = storage - self._period = refresh_rate - self._task = evt_async_task_mock - self._failed = mocker.Mock() - self._bulk_size = bulk_size - mocker.patch('splitio.client.factory.EventsSyncTask.__init__', new=_event_task_init_mock) - - tmt_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask) - def _telemetry_task_init_mock(self, api, storage, refresh_rate): - self._task = tmt_async_task_mock - self._logger = mocker.Mock() - self._api = api - self._storage = storage - self._period = refresh_rate - mocker.patch('splitio.client.factory.TelemetrySynchronizationTask.__init__', new=_telemetry_task_init_mock) - - # Start factory and make assertions - factory = get_factory('some_api_key') - assert factory.destroyed is False - - factory.block_until_ready() - assert factory.ready - - event = threading.Event() - factory.destroy(event) - - # When destroy is called an event is created and passed to each task when - # stop() is called. We will extract those events assert their type, and assert that - # by setting them, the main event gets set. - splits_event = spl_async_task_mock.stop.mock_calls[0][1][0] - segments_event = worker_pool_mock.stop.mock_calls[0][1][0] # Segment task stops when wp finishes. - impressions_event = imp_async_task_mock.stop.mock_calls[0][1][0] - events_event = evt_async_task_mock.stop.mock_calls[0][1][0] - telemetry_event = tmt_async_task_mock.stop.mock_calls[0][1][0] - - # python2 & 3 compatibility - try: - from threading import _Event as __EVENT_CLASS - except ImportError: - from threading import Event as __EVENT_CLASS - - assert isinstance(splits_event, __EVENT_CLASS) - assert isinstance(segments_event, __EVENT_CLASS) - assert isinstance(impressions_event, __EVENT_CLASS) - assert isinstance(events_event, __EVENT_CLASS) - assert isinstance(telemetry_event, __EVENT_CLASS) - assert not event.is_set() - - splits_event.set() - segments_event.set() - impressions_event.set() - events_event.set() - telemetry_event.set() - - time.sleep(1) # I/O wait to trigger context switch, to give the waiting thread - # a chance to run and set the main event. - - assert event.is_set() - assert factory.destroyed -''' diff --git a/tests/sync/test_synchronizer.py b/tests/sync/test_synchronizer.py index 90e26cc0..22769646 100644 --- a/tests/sync/test_synchronizer.py +++ b/tests/sync/test_synchronizer.py @@ -168,7 +168,7 @@ def stop_mock_2(): split_tasks = SplitTasks(mocker.Mock(), mocker.Mock(), impression_task, event_task, telemetry_task, impression_count_task) synchronizer = Synchronizer(mocker.Mock(spec=SplitSynchronizers), split_tasks) - synchronizer.stop_periodic_data_recording() + synchronizer.stop_periodic_data_recording(True) assert len(impression_task.stop.mock_calls) == 1 assert len(impression_count_task.stop.mock_calls) == 1