From 59953191e4873adb818879db4a55a38f11c6f455 Mon Sep 17 00:00:00 2001 From: Matias Melograno Date: Fri, 16 Oct 2020 17:08:25 -0300 Subject: [PATCH 1/3] added workers for segments and splits --- splitio/push/segmentworker.py | 71 +++++++++++++++++++++++++++++++ splitio/push/splitworker.py | 71 +++++++++++++++++++++++++++++++ tests/push/test_segment_worker.py | 36 ++++++++++++++++ tests/push/test_split_worker.py | 32 ++++++++++++++ 4 files changed, 210 insertions(+) create mode 100644 splitio/push/segmentworker.py create mode 100644 splitio/push/splitworker.py create mode 100644 tests/push/test_segment_worker.py create mode 100644 tests/push/test_split_worker.py diff --git a/splitio/push/segmentworker.py b/splitio/push/segmentworker.py new file mode 100644 index 00000000..7b15521f --- /dev/null +++ b/splitio/push/segmentworker.py @@ -0,0 +1,71 @@ +import logging +import threading + + +class SegmentWorker(object): + """Segment Worker for processing updates.""" + _centinel = object() + + def __init__(self, synchronize_segment, segment_queue): + """ + Class constructor. + + :param synchronize_segment: handler to perform segment synchronization on incoming event + :type synchronize_segment: function + + :param segment_queue: queue with segment updates notifications + :type segment_queue: queue + """ + self._segment_queue = segment_queue + self._handler = synchronize_segment + self._running = False + self._worker = None + self._logger = logging.getLogger(self.__class__.__name__) + + def set_running(self, value): + """ + Enables/Disable mode + + :param value: flag for enabling/disabling + :type value: bool + """ + self._running = value + + def is_running(self): + """ + Return running + """ + return self._running + + def _run(self): + """ + Run worker handler + """ + while self.is_running(): + event = self._segment_queue.get() + if not self.is_running(): + break + if event == self._centinel: + continue + self._logger.debug('Processing segment_update: %s, change_number: %d', event.segment_name, event.change_number) + self._handler(event.segment_name, event.change_number) + + def start(self): + """ + Start worker + """ + if self.is_running(): + self._logger.debug('Worker is already running') + return + self._logger.debug('Starting Segment Worker') + self.set_running(True) + self._worker = threading.Thread(target=self._run, daemon=True) + self._worker.start() + + def stop(self): + """ + Stop worker + """ + self._logger.debug('Stopping Segment Worker') + self.set_running(False) + self._segment_queue.put(self._centinel) diff --git a/splitio/push/splitworker.py b/splitio/push/splitworker.py new file mode 100644 index 00000000..2c232f8e --- /dev/null +++ b/splitio/push/splitworker.py @@ -0,0 +1,71 @@ +import logging +import threading + + +class SplitWorker(object): + """Split Worker for processing updates.""" + _centinel = object() + + def __init__(self, synchronize_split, split_queue): + """ + Class constructor. + + :param synchronize_split: handler to perform split synchronization on incoming event + :type synchronize_split: function + + :param split_queue: queue with split updates notifications + :type split_queue: queue + """ + self._split_queue = split_queue + self._handler = synchronize_split + self._running = False + self._worker = None + self._logger = logging.getLogger(self.__class__.__name__) + + def set_running(self, value): + """ + Enables/Disable mode + + :param value: flag for enabling/disabling + :type value: bool + """ + self._running = value + + def is_running(self): + """ + Return running + """ + return self._running + + def _run(self): + """ + Run worker handler + """ + while self.is_running(): + event = self._split_queue.get() + if not self.is_running(): + break + if event == self._centinel: + continue + self._logger.debug('Processing split_update %d', event.change_number) + self._handler(event.change_number) + + def start(self): + """ + Start worker + """ + if self.is_running(): + self._logger.debug('Worker is already running') + return + self._logger.debug('Starting Split Worker') + self.set_running(True) + self._worker = threading.Thread(target=self._run, daemon=True) + self._worker.start() + + def stop(self): + """ + Stop worker + """ + self._logger.debug('Stopping Split Worker') + self.set_running(False) + self._split_queue.put(self._centinel) diff --git a/tests/push/test_segment_worker.py b/tests/push/test_segment_worker.py new file mode 100644 index 00000000..6c41f133 --- /dev/null +++ b/tests/push/test_segment_worker.py @@ -0,0 +1,36 @@ +"""Split Worker tests.""" +import time +import queue + +from splitio.push.segmentworker import SegmentWorker +from splitio.models.notification import SegmentChangeNotification + +change_number_received = None +segment_name_received = None + +def handler_sync(segment_name, change_number): + global change_number_received + global segment_name_received + change_number_received = change_number + segment_name_received = segment_name + return + + +class SegmentWorkerTests(object): + q = queue.Queue() + segment_worker = SegmentWorker(handler_sync, q) + + def test_handler(self): + global change_number_received + assert self.segment_worker.is_running() == False + self.segment_worker.start() + assert self.segment_worker.is_running() == True + + self.q.put(SegmentChangeNotification('some', 'SEGMENT_UPDATE', 123456789, 'some')) + + time.sleep(0.1) + assert change_number_received == 123456789 + assert segment_name_received == 'some' + + self.segment_worker.stop() + assert self.segment_worker.is_running() == False diff --git a/tests/push/test_split_worker.py b/tests/push/test_split_worker.py new file mode 100644 index 00000000..05b84741 --- /dev/null +++ b/tests/push/test_split_worker.py @@ -0,0 +1,32 @@ +"""Split Worker tests.""" +import time +import queue + +from splitio.push.splitworker import SplitWorker +from splitio.models.notification import SplitChangeNotification + +change_number_received = None + +def handler_sync(change_number): + global change_number_received + change_number_received = change_number + return + + +class SplitWorkerTests(object): + q = queue.Queue() + split_worker = SplitWorker(handler_sync, q) + + def test_handler(self): + global change_number_received + assert self.split_worker.is_running() == False + self.split_worker.start() + assert self.split_worker.is_running() == True + + self.q.put(SplitChangeNotification('some', 'SPLIT_UPDATE', 123456789)) + + time.sleep(0.1) + assert change_number_received == 123456789 + + self.split_worker.stop() + assert self.split_worker.is_running() == False From b491cbd58b24ad0b109b105d92fc5b8aa3ed806f Mon Sep 17 00:00:00 2001 From: Matias Melograno Date: Fri, 16 Oct 2020 17:33:42 -0300 Subject: [PATCH 2/3] fixed 2.7 issue --- splitio/push/segmentworker.py | 3 ++- splitio/push/splitworker.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/splitio/push/segmentworker.py b/splitio/push/segmentworker.py index 7b15521f..6bce48a8 100644 --- a/splitio/push/segmentworker.py +++ b/splitio/push/segmentworker.py @@ -59,7 +59,8 @@ def start(self): return self._logger.debug('Starting Segment Worker') self.set_running(True) - self._worker = threading.Thread(target=self._run, daemon=True) + self._worker = threading.Thread(target=self._run) + self._worker.setDaemon(True) self._worker.start() def stop(self): diff --git a/splitio/push/splitworker.py b/splitio/push/splitworker.py index 2c232f8e..00f0e28d 100644 --- a/splitio/push/splitworker.py +++ b/splitio/push/splitworker.py @@ -59,7 +59,8 @@ def start(self): return self._logger.debug('Starting Split Worker') self.set_running(True) - self._worker = threading.Thread(target=self._run, daemon=True) + self._worker = threading.Thread(target=self._run) + self._worker.setDaemon(True) self._worker.start() def stop(self): From dcdaf6c5d4f544724912c4df146722cf8aef9b2e Mon Sep 17 00:00:00 2001 From: Matias Melograno Date: Tue, 20 Oct 2020 11:57:51 -0300 Subject: [PATCH 3/3] general logger --- splitio/push/segmentworker.py | 10 +++++----- splitio/push/splitworker.py | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/splitio/push/segmentworker.py b/splitio/push/segmentworker.py index 6bce48a8..36f83af8 100644 --- a/splitio/push/segmentworker.py +++ b/splitio/push/segmentworker.py @@ -1,6 +1,7 @@ import logging import threading +_LOGGER = logging.getLogger(__name__) class SegmentWorker(object): """Segment Worker for processing updates.""" @@ -20,7 +21,6 @@ def __init__(self, synchronize_segment, segment_queue): self._handler = synchronize_segment self._running = False self._worker = None - self._logger = logging.getLogger(self.__class__.__name__) def set_running(self, value): """ @@ -47,7 +47,7 @@ def _run(self): break if event == self._centinel: continue - self._logger.debug('Processing segment_update: %s, change_number: %d', event.segment_name, event.change_number) + _LOGGER.debug('Processing segment_update: %s, change_number: %d', event.segment_name, event.change_number) self._handler(event.segment_name, event.change_number) def start(self): @@ -55,9 +55,9 @@ def start(self): Start worker """ if self.is_running(): - self._logger.debug('Worker is already running') + _LOGGER.debug('Worker is already running') return - self._logger.debug('Starting Segment Worker') + _LOGGER.debug('Starting Segment Worker') self.set_running(True) self._worker = threading.Thread(target=self._run) self._worker.setDaemon(True) @@ -67,6 +67,6 @@ def stop(self): """ Stop worker """ - self._logger.debug('Stopping Segment Worker') + _LOGGER.debug('Stopping Segment Worker') self.set_running(False) self._segment_queue.put(self._centinel) diff --git a/splitio/push/splitworker.py b/splitio/push/splitworker.py index 00f0e28d..ae692845 100644 --- a/splitio/push/splitworker.py +++ b/splitio/push/splitworker.py @@ -1,6 +1,7 @@ import logging import threading +_LOGGER = logging.getLogger(__name__) class SplitWorker(object): """Split Worker for processing updates.""" @@ -20,7 +21,6 @@ def __init__(self, synchronize_split, split_queue): self._handler = synchronize_split self._running = False self._worker = None - self._logger = logging.getLogger(self.__class__.__name__) def set_running(self, value): """ @@ -47,7 +47,7 @@ def _run(self): break if event == self._centinel: continue - self._logger.debug('Processing split_update %d', event.change_number) + _LOGGER.debug('Processing split_update %d', event.change_number) self._handler(event.change_number) def start(self): @@ -55,9 +55,9 @@ def start(self): Start worker """ if self.is_running(): - self._logger.debug('Worker is already running') + _LOGGER.debug('Worker is already running') return - self._logger.debug('Starting Split Worker') + _LOGGER.debug('Starting Split Worker') self.set_running(True) self._worker = threading.Thread(target=self._run) self._worker.setDaemon(True) @@ -67,6 +67,6 @@ def stop(self): """ Stop worker """ - self._logger.debug('Stopping Split Worker') + _LOGGER.debug('Stopping Split Worker') self.set_running(False) self._split_queue.put(self._centinel)