Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions splitio/push/segmentworker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import logging
import threading

_LOGGER = logging.getLogger(__name__)

class SegmentWorker(object):
"""Segment Worker for processing updates."""
_centinel = object()

def __init__(self, synchronize_segment, segment_queue):
"""
Class constructor.

:param synchronize_segment: handler to perform segment synchronization on incoming event
:type synchronize_segment: function

:param segment_queue: queue with segment updates notifications
:type segment_queue: queue
"""
self._segment_queue = segment_queue
self._handler = synchronize_segment
self._running = False
self._worker = None

def set_running(self, value):
"""
Enables/Disable mode

:param value: flag for enabling/disabling
:type value: bool
"""
self._running = value

def is_running(self):
"""
Return running
"""
return self._running

def _run(self):
"""
Run worker handler
"""
while self.is_running():
event = self._segment_queue.get()
if not self.is_running():
break
if event == self._centinel:
continue
_LOGGER.debug('Processing segment_update: %s, change_number: %d', event.segment_name, event.change_number)
self._handler(event.segment_name, event.change_number)

def start(self):
"""
Start worker
"""
if self.is_running():
_LOGGER.debug('Worker is already running')
return
_LOGGER.debug('Starting Segment Worker')
self.set_running(True)
self._worker = threading.Thread(target=self._run)
self._worker.setDaemon(True)
self._worker.start()

def stop(self):
"""
Stop worker
"""
_LOGGER.debug('Stopping Segment Worker')
self.set_running(False)
self._segment_queue.put(self._centinel)
72 changes: 72 additions & 0 deletions splitio/push/splitworker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import logging
import threading

_LOGGER = logging.getLogger(__name__)

class SplitWorker(object):
"""Split Worker for processing updates."""
_centinel = object()

def __init__(self, synchronize_split, split_queue):
"""
Class constructor.

:param synchronize_split: handler to perform split synchronization on incoming event
:type synchronize_split: function

:param split_queue: queue with split updates notifications
:type split_queue: queue
"""
self._split_queue = split_queue
self._handler = synchronize_split
self._running = False
self._worker = None

def set_running(self, value):
"""
Enables/Disable mode

:param value: flag for enabling/disabling
:type value: bool
"""
self._running = value

def is_running(self):
"""
Return running
"""
return self._running

def _run(self):
"""
Run worker handler
"""
while self.is_running():
event = self._split_queue.get()
if not self.is_running():
break
if event == self._centinel:
continue
_LOGGER.debug('Processing split_update %d', event.change_number)
self._handler(event.change_number)

def start(self):
"""
Start worker
"""
if self.is_running():
_LOGGER.debug('Worker is already running')
return
_LOGGER.debug('Starting Split Worker')
self.set_running(True)
self._worker = threading.Thread(target=self._run)
self._worker.setDaemon(True)
self._worker.start()

def stop(self):
"""
Stop worker
"""
_LOGGER.debug('Stopping Split Worker')
self.set_running(False)
self._split_queue.put(self._centinel)
36 changes: 36 additions & 0 deletions tests/push/test_segment_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Split Worker tests."""
import time
import queue

from splitio.push.segmentworker import SegmentWorker
from splitio.models.notification import SegmentChangeNotification

change_number_received = None
segment_name_received = None

def handler_sync(segment_name, change_number):
global change_number_received
global segment_name_received
change_number_received = change_number
segment_name_received = segment_name
return


class SegmentWorkerTests(object):
q = queue.Queue()
segment_worker = SegmentWorker(handler_sync, q)

def test_handler(self):
global change_number_received
assert self.segment_worker.is_running() == False
self.segment_worker.start()
assert self.segment_worker.is_running() == True

self.q.put(SegmentChangeNotification('some', 'SEGMENT_UPDATE', 123456789, 'some'))

time.sleep(0.1)
assert change_number_received == 123456789
assert segment_name_received == 'some'

self.segment_worker.stop()
assert self.segment_worker.is_running() == False
32 changes: 32 additions & 0 deletions tests/push/test_split_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Split Worker tests."""
import time
import queue

from splitio.push.splitworker import SplitWorker
from splitio.models.notification import SplitChangeNotification

change_number_received = None

def handler_sync(change_number):
global change_number_received
change_number_received = change_number
return


class SplitWorkerTests(object):
q = queue.Queue()
split_worker = SplitWorker(handler_sync, q)

def test_handler(self):
global change_number_received
assert self.split_worker.is_running() == False
self.split_worker.start()
assert self.split_worker.is_running() == True

self.q.put(SplitChangeNotification('some', 'SPLIT_UPDATE', 123456789))

time.sleep(0.1)
assert change_number_received == 123456789

self.split_worker.stop()
assert self.split_worker.is_running() == False