Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions rpqueue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class CronTab(object):
SHOULD_QUIT = multiprocessing.Array('i', (0,), lock=False)
REENTRY_RETRY = 5
MINIMUM_DELAY = 1
EXECUTE_TASKS = False

REDIS_CONNECTION_SETTINGS = {}
POOL = None
Expand Down Expand Up @@ -224,6 +225,9 @@ def __call__(self, taskid=None, nowarn=False):
log_handler.debug("You probably intended to call the function: %s, you are half-way there", self.name)
return _ExecutingTask(self, taskid)
def next(self, now=None):
if not EXECUTE_TASKS:
return None

if isinstance(self.delay, CronTab):
return self.delay.next(now)
return self.delay
Expand Down Expand Up @@ -439,6 +443,8 @@ def execute_tasks(queues=None, threads_per_process=1, processes=1, wait_per_thre
Will execute tasks from the (optionally) provided queues until the first
value in the global SHOULD_QUIT is considered false.
'''
global EXECUTE_TASKS
EXECUTE_TASKS = True
signal.signal(signal.SIGUSR1, quit_on_signal)
log_handler.setLevel(logging.DEBUG)
sp = []
Expand Down
Binary file removed tests/__init__.pyc
Binary file not shown.
5 changes: 3 additions & 2 deletions tests/test_rpqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def test_retry_task2(self):
taskr2.execute(0)
time.sleep(.5)
self.assertEquals(saw[0], 0)

taskr.execute(1)
time.sleep(.5)
self.assertEquals(saw[0], 5)
Expand All @@ -106,6 +106,7 @@ def test_exception_no_kill(self):
self.assertEquals(saw[0], 6)

def test_periodic_task(self):
rpqueue.EXECUTE_TASKS = True
# this will cause the task to be enqueued immediately
@rpqueue.periodic_task(1, queue=queue)
def periodic_task(self):
Expand All @@ -115,7 +116,7 @@ def periodic_task(self):
pass
time.sleep(2)
x = saw[0]
self.assertTrue(x > 10, x)
self.assertTrue(x > 0, x)

def test_z_performance(self):
scale = 1000
Expand Down