11# Scheduler
2- # Support for coroutines using Python generator functions.
2+ # Support for coroutines using either Python generator functions or thread-based coroutines .
33#
44# Copyright (c) 2014 Charles Weir. Shared under the MIT Licence.
55
6- import datetime
7- import logging
8- import sys , traceback
96import threading
10-
11- class StopCoroutineException ( Exception ):
12- '''Exception used to stop a coroutine'''
13- pass
14-
15- ProgramStartTime = datetime .datetime .now ()
16-
17- class Coroutine ():
18- def __init__ (self ):
19- #: Semaphore is one when blocked, 0 when running
20- self .semaphore = threading .Semaphore (0 )
21-
22- def action (self ):
23- pass
24-
7+ from Coroutine import Coroutine , StopCoroutineException
258
269class GeneratorCoroutineWrapper (Coroutine ):
2710 '''Internal: Wraps a generator-style coroutine with a thread'''
2811
29- def __init__ (self , scheduler , generator ):
12+ def __init__ (self , generator ):
3013 '''`scheduler` - the main Scheduler object
3114 `generator` - the generator object created by calling the generator function'''
32- Coroutine .__init__ (self )
33- self .scheduler = scheduler
34- self .stopEvent = threading .Event ()
15+ Coroutine .__init__ (self , self .action )
3516 self .generator = generator
36- self .thread = threading .Thread (target = self .action )
37- self .thread .setDaemon (True ) # Daemon threads don't prevent the process from exiting.
38- self .thread .start ()
3917
4018 def action (self ):
4119 'The thread entry function - executed within thread `thread`'
42- try :
43- self .semaphore .acquire ()
44- while not self .stopEvent .is_set ():
45- self .generator .next ()
46- self .scheduler .semaphore .release ()
47- self .semaphore .acquire ()
48- self .generator .throw (StopCoroutineException )
49- except (StopCoroutineException , StopIteration ):
50- pass
51- except Exception as e :
52- self .scheduler .lastExceptionCaught = e
53- logging .info ( "Scheduler - caught: %r" % (e ) )
54- exc_type , exc_value , exc_traceback = sys .exc_info ()
55- trace = "" .join (traceback .format_tb (exc_traceback ))
56- logging .debug ( "Traceback (latest call first):\n %s" % trace )
57-
58- self .scheduler .coroutines .remove ( self )
59- self .scheduler .semaphore .release ()
60-
61- def next (self ):
62- 'Runs a bit of processing (next on the generator) - executed from the scheduler thread - returns only when processing has completed'
63- self .semaphore .release ()
64- self .scheduler .semaphore .acquire ()
20+ for _ in self .generator :
21+ Coroutine .wait ()
6522
6623 def stop (self ):
67- 'Causes the thread to stop - executed from the scheduler thread'
68- self .stopEvent .set ()
24+ try :
25+ self .generator .throw (StopCoroutineException ())
26+ except StopCoroutineException :
27+ pass
28+ Coroutine .stop (self )
6929
7030
7131class Scheduler ():
72- ''' This manages an arbitrary number of coroutines (implemented as generator functions), supporting
32+ ''' This manages an arbitrary number of coroutines (including generator functions), supporting
7333 invoking each every *timeMillisBetweenWorkCalls*, and detecting when each has completed.
7434
7535 It supports one special coroutine - the updatorCoroutine, which is invoked before and after all the other ones.
7636 '''
7737
7838 timeMillisBetweenWorkCalls = 50
7939
40+ @staticmethod
41+ def makeCoroutine (coroutineOrGenerator ):
42+ return coroutineOrGenerator if coroutineOrGenerator is Coroutine else GeneratorCoroutineWrapper (coroutineOrGenerator )
43+
44+
8045 @staticmethod
8146 def currentTimeMillis ():
82- 'Answers the time in floating point milliseconds since program start.'
83- global ProgramStartTime
84- c = datetime .datetime .now () - ProgramStartTime
85- return c .days * (3600.0 * 1000 * 24 ) + c .seconds * 1000.0 + c .microseconds / 1000.0
47+ return Coroutine .currentTimeMillis ()
8648
8749 def __init__ (self , timeMillisBetweenWorkCalls = 50 ):
8850 Scheduler .timeMillisBetweenWorkCalls = timeMillisBetweenWorkCalls
8951 self .coroutines = []
9052 self .timeOfLastCall = Scheduler .currentTimeMillis ()
91- self .updateCoroutine = GeneratorCoroutineWrapper ( self , self .nullCoroutine () ) # for testing - usually replaced.
53+ self .updateCoroutine = Scheduler . makeCoroutine ( self .nullCoroutine () ) # for testing - usually replaced.
9254 #: The most recent exception raised by a coroutine:
9355 self .lastExceptionCaught = Exception ("None" )
94- #: Semaphore is one when blocked (running a coroutine), 0 when active
95- self .semaphore = threading .Semaphore (0 )
96-
9756
9857 def doWork (self ):
9958 'Executes all the coroutines, handling exceptions'
@@ -102,11 +61,14 @@ def doWork(self):
10261 if timeNow == self .timeOfLastCall : # Ensure each call gets a different timer value.
10362 return
10463 self .timeOfLastCall = timeNow
105- self .updateCoroutine .next ()
64+ self .updateCoroutine .call ()
10665 for coroutine in self .coroutines [:]: # Copy of coroutines, so it doesn't matter removing one
107- coroutine .next ()
66+ coroutine .call ()
67+ if not coroutine .is_alive ():
68+ self .coroutines .remove (coroutine )
69+ self .lastExceptionCaught = coroutine .lastExceptionCaught
10870
109- self .updateCoroutine .next ()
71+ self .updateCoroutine .call ()
11072
11173 def timeMillisToNextCall (self ):
11274 'Wait time before the next doWork call should be called.'
@@ -118,22 +80,22 @@ def addSensorCoroutine(self, *coroutineList):
11880 '''Adds one or more new sensor/program coroutines to be scheduled, answering the last one to be added.
11981 Sensor coroutines are scheduled *before* Action coroutines'''
12082 for generatorFunction in coroutineList :
121- latestAdded = GeneratorCoroutineWrapper ( self , generatorFunction )
83+ latestAdded = Scheduler . makeCoroutine ( generatorFunction )
12284 self .coroutines .insert (0 , latestAdded )
12385 return generatorFunction
12486
12587 def addActionCoroutine (self , * coroutineList ):
12688 '''Adds one or more new motor control coroutines to be scheduled, answering the last coroutine to be added.
12789 Action coroutines are scheduled *after* Sensor coroutines'''
12890 for generatorFunction in coroutineList :
129- latestAdded = GeneratorCoroutineWrapper ( self , generatorFunction )
91+ latestAdded = Scheduler . makeCoroutine ( generatorFunction )
13092 self .coroutines .append (latestAdded )
13193 return generatorFunction
13294
13395 def setUpdateCoroutine (self , coroutine ):
13496 # Private - set the coroutine that manages the interaction with the BrickPi.
13597 # The coroutine will be invoked once at the start and once at the end of each doWork call.
136- self .updateCoroutine = GeneratorCoroutineWrapper ( self , coroutine )
98+ self .updateCoroutine = Scheduler . makeCoroutine ( coroutine )
13799
138100 def findCoroutineForGenerator (self , generator ):
139101 return (c for c in self .coroutines if c .generator == generator ).next ()
@@ -157,7 +119,7 @@ def stillRunning( self, *coroutineList ):
157119 return any ( c in self .coroutines for c in coroutineList )
158120
159121 #############################################################################################
160- # Coroutines
122+ # Generator-based coroutines. Kept for backward compatibility.
161123 #############################################################################################
162124
163125 @staticmethod
0 commit comments