diff --git a/splitio/push/segmentworker.py b/splitio/push/segmentworker.py new file mode 100644 index 00000000..36f83af8 --- /dev/null +++ b/splitio/push/segmentworker.py @@ -0,0 +1,72 @@ +import logging +import threading + +_LOGGER = logging.getLogger(__name__) + +class SegmentWorker(object): + """Segment Worker for processing updates.""" + _centinel = object() + + def __init__(self, synchronize_segment, segment_queue): + """ + Class constructor. + + :param synchronize_segment: handler to perform segment synchronization on incoming event + :type synchronize_segment: function + + :param segment_queue: queue with segment updates notifications + :type segment_queue: queue + """ + self._segment_queue = segment_queue + self._handler = synchronize_segment + self._running = False + self._worker = None + + def set_running(self, value): + """ + Enables/Disable mode + + :param value: flag for enabling/disabling + :type value: bool + """ + self._running = value + + def is_running(self): + """ + Return running + """ + return self._running + + def _run(self): + """ + Run worker handler + """ + while self.is_running(): + event = self._segment_queue.get() + if not self.is_running(): + break + if event == self._centinel: + continue + _LOGGER.debug('Processing segment_update: %s, change_number: %d', event.segment_name, event.change_number) + self._handler(event.segment_name, event.change_number) + + def start(self): + """ + Start worker + """ + if self.is_running(): + _LOGGER.debug('Worker is already running') + return + _LOGGER.debug('Starting Segment Worker') + self.set_running(True) + self._worker = threading.Thread(target=self._run) + self._worker.setDaemon(True) + self._worker.start() + + def stop(self): + """ + Stop worker + """ + _LOGGER.debug('Stopping Segment Worker') + self.set_running(False) + self._segment_queue.put(self._centinel) diff --git a/splitio/push/splitworker.py b/splitio/push/splitworker.py new file mode 100644 index 00000000..ae692845 --- /dev/null +++ b/splitio/push/splitworker.py @@ -0,0 +1,72 @@ +import logging +import threading + +_LOGGER = logging.getLogger(__name__) + +class SplitWorker(object): + """Split Worker for processing updates.""" + _centinel = object() + + def __init__(self, synchronize_split, split_queue): + """ + Class constructor. + + :param synchronize_split: handler to perform split synchronization on incoming event + :type synchronize_split: function + + :param split_queue: queue with split updates notifications + :type split_queue: queue + """ + self._split_queue = split_queue + self._handler = synchronize_split + self._running = False + self._worker = None + + def set_running(self, value): + """ + Enables/Disable mode + + :param value: flag for enabling/disabling + :type value: bool + """ + self._running = value + + def is_running(self): + """ + Return running + """ + return self._running + + def _run(self): + """ + Run worker handler + """ + while self.is_running(): + event = self._split_queue.get() + if not self.is_running(): + break + if event == self._centinel: + continue + _LOGGER.debug('Processing split_update %d', event.change_number) + self._handler(event.change_number) + + def start(self): + """ + Start worker + """ + if self.is_running(): + _LOGGER.debug('Worker is already running') + return + _LOGGER.debug('Starting Split Worker') + self.set_running(True) + self._worker = threading.Thread(target=self._run) + self._worker.setDaemon(True) + self._worker.start() + + def stop(self): + """ + Stop worker + """ + _LOGGER.debug('Stopping Split Worker') + self.set_running(False) + self._split_queue.put(self._centinel) diff --git a/tests/push/test_segment_worker.py b/tests/push/test_segment_worker.py new file mode 100644 index 00000000..6c41f133 --- /dev/null +++ b/tests/push/test_segment_worker.py @@ -0,0 +1,36 @@ +"""Split Worker tests.""" +import time +import queue + +from splitio.push.segmentworker import SegmentWorker +from splitio.models.notification import SegmentChangeNotification + +change_number_received = None +segment_name_received = None + +def handler_sync(segment_name, change_number): + global change_number_received + global segment_name_received + change_number_received = change_number + segment_name_received = segment_name + return + + +class SegmentWorkerTests(object): + q = queue.Queue() + segment_worker = SegmentWorker(handler_sync, q) + + def test_handler(self): + global change_number_received + assert self.segment_worker.is_running() == False + self.segment_worker.start() + assert self.segment_worker.is_running() == True + + self.q.put(SegmentChangeNotification('some', 'SEGMENT_UPDATE', 123456789, 'some')) + + time.sleep(0.1) + assert change_number_received == 123456789 + assert segment_name_received == 'some' + + self.segment_worker.stop() + assert self.segment_worker.is_running() == False diff --git a/tests/push/test_split_worker.py b/tests/push/test_split_worker.py new file mode 100644 index 00000000..05b84741 --- /dev/null +++ b/tests/push/test_split_worker.py @@ -0,0 +1,32 @@ +"""Split Worker tests.""" +import time +import queue + +from splitio.push.splitworker import SplitWorker +from splitio.models.notification import SplitChangeNotification + +change_number_received = None + +def handler_sync(change_number): + global change_number_received + change_number_received = change_number + return + + +class SplitWorkerTests(object): + q = queue.Queue() + split_worker = SplitWorker(handler_sync, q) + + def test_handler(self): + global change_number_received + assert self.split_worker.is_running() == False + self.split_worker.start() + assert self.split_worker.is_running() == True + + self.q.put(SplitChangeNotification('some', 'SPLIT_UPDATE', 123456789)) + + time.sleep(0.1) + assert change_number_received == 123456789 + + self.split_worker.stop() + assert self.split_worker.is_running() == False