11# Scheduler
2- # Support for coroutines using Python generator functions.
2+ # Support for coroutines using either Python generator functions or thread-based coroutines .
33#
44# Copyright (c) 2014 Charles Weir. Shared under the MIT Licence.
55
6- import datetime
7- import logging
8- import sys , traceback
6+ import threading
7+ from Coroutine import Coroutine , StopCoroutineException
98
10- class StopCoroutineException ( Exception ):
11- '''Exception used to stop a coroutine'''
12- pass
9+ class GeneratorCoroutineWrapper (Coroutine ):
10+ '''Internal: Wraps a generator-style coroutine with a thread'''
1311
14- ProgramStartTime = datetime .datetime .now ()
12+ def __init__ (self , generator ):
13+ '''`scheduler` - the main Scheduler object
14+ `generator` - the generator object created by calling the generator function'''
15+ Coroutine .__init__ (self , self .action )
16+ self .generator = generator
17+
18+ def action (self ):
19+ 'The thread entry function - executed within thread `thread`'
20+ for _ in self .generator :
21+ Coroutine .wait ()
22+
23+ def stop (self ):
24+ try :
25+ self .generator .throw (StopCoroutineException ())
26+ except StopCoroutineException :
27+ pass
28+ Coroutine .stop (self )
1529
1630
1731class Scheduler ():
18- ''' This manages an arbitrary number of coroutines (implemented as generator functions), supporting
32+ ''' This manages an arbitrary number of coroutines (including generator functions), supporting
1933 invoking each every *timeMillisBetweenWorkCalls*, and detecting when each has completed.
2034
2135 It supports one special coroutine - the updatorCoroutine, which is invoked before and after all the other ones.
2236 '''
2337
2438 timeMillisBetweenWorkCalls = 50
2539
40+ @staticmethod
41+ def makeCoroutine (coroutineOrGenerator ):
42+ return coroutineOrGenerator if coroutineOrGenerator is Coroutine else GeneratorCoroutineWrapper (coroutineOrGenerator )
43+
44+
2645 @staticmethod
2746 def currentTimeMillis ():
28- 'Answers the time in floating point milliseconds since program start.'
29- global ProgramStartTime
30- c = datetime .datetime .now () - ProgramStartTime
31- return c .days * (3600.0 * 1000 * 24 ) + c .seconds * 1000.0 + c .microseconds / 1000.0
47+ return Coroutine .currentTimeMillis ()
3248
3349 def __init__ (self , timeMillisBetweenWorkCalls = 50 ):
3450 Scheduler .timeMillisBetweenWorkCalls = timeMillisBetweenWorkCalls
3551 self .coroutines = []
3652 self .timeOfLastCall = Scheduler .currentTimeMillis ()
37- self .updateCoroutine = self .nullCoroutine () # for testing - usually replaced.
53+ self .updateCoroutine = Scheduler . makeCoroutine ( self .nullCoroutine () ) # for testing - usually replaced.
3854 #: The most recent exception raised by a coroutine:
3955 self .lastExceptionCaught = Exception ("None" )
4056
41-
4257 def doWork (self ):
4358 'Executes all the coroutines, handling exceptions'
4459
4560 timeNow = Scheduler .currentTimeMillis ()
4661 if timeNow == self .timeOfLastCall : # Ensure each call gets a different timer value.
4762 return
4863 self .timeOfLastCall = timeNow
49- self .updateCoroutine .next ()
64+ self .updateCoroutine .call ()
5065 for coroutine in self .coroutines [:]: # Copy of coroutines, so it doesn't matter removing one
51- try :
52- coroutine .next ()
53- except (StopIteration ):
54- self .coroutines .remove ( coroutine )
55- except Exception as e :
56- self .lastExceptionCaught = e
57- logging .info ( "Scheduler - caught: %r" % (e ) )
58- exc_type , exc_value , exc_traceback = sys .exc_info ()
59- trace = "" .join (traceback .format_tb (exc_traceback ))
60- logging .debug ( "Traceback (latest call first):\n %s" % trace )
61- self .coroutines .remove ( coroutine )
62-
63- self .updateCoroutine .next ()
66+ coroutine .call ()
67+ if not coroutine .is_alive ():
68+ self .coroutines .remove (coroutine )
69+ self .lastExceptionCaught = coroutine .lastExceptionCaught
70+
71+ self .updateCoroutine .call ()
6472
6573 def timeMillisToNextCall (self ):
6674 'Wait time before the next doWork call should be called.'
@@ -71,31 +79,36 @@ def timeMillisToNextCall(self):
7179 def addSensorCoroutine (self , * coroutineList ):
7280 '''Adds one or more new sensor/program coroutines to be scheduled, answering the last one to be added.
7381 Sensor coroutines are scheduled *before* Action coroutines'''
74- self .coroutines [0 :0 ] = coroutineList
75- return coroutineList [- 1 ]
82+ for generatorFunction in coroutineList :
83+ latestAdded = Scheduler .makeCoroutine (generatorFunction )
84+ self .coroutines .insert (0 , latestAdded )
85+ return generatorFunction
7686
7787 def addActionCoroutine (self , * coroutineList ):
7888 '''Adds one or more new motor control coroutines to be scheduled, answering the last coroutine to be added.
7989 Action coroutines are scheduled *after* Sensor coroutines'''
80- self .coroutines .extend ( coroutineList )
81- return coroutineList [- 1 ]
90+ for generatorFunction in coroutineList :
91+ latestAdded = Scheduler .makeCoroutine (generatorFunction )
92+ self .coroutines .append (latestAdded )
93+ return generatorFunction
8294
8395 def setUpdateCoroutine (self , coroutine ):
8496 # Private - set the coroutine that manages the interaction with the BrickPi.
8597 # The coroutine will be invoked once at the start and once at the end of each doWork call.
86- self .updateCoroutine = coroutine
98+ self .updateCoroutine = Scheduler .makeCoroutine (coroutine )
99+
100+ def findCoroutineForGenerator (self , generator ):
101+ return (c for c in self .coroutines if c .generator == generator ).next ()
87102
88103 def stopCoroutine ( self , * coroutineList ):
89104 'Terminates the given one or more coroutines'
90- for coroutine in coroutineList :
91- try :
92- coroutine .throw (StopCoroutineException )
93- except (StopCoroutineException ,StopIteration ): # If the coroutine doesn't catch the exception to tidy up, it comes back here.
94- self .coroutines .remove ( coroutine )
105+ for generator in coroutineList :
106+ coroutine = self .findCoroutineForGenerator (generator )
107+ coroutine .stop ()
95108
96109 def stopAllCoroutines (self ):
97110 'Terminates all coroutines (except the updater one) - rather drastic!'
98- self .stopCoroutine (* self .coroutines [: ]) # Makes a copy of the list - don't want to be changing it.
111+ self .stopCoroutine (* [ c . generator for c in self .coroutines ]) # Makes a copy of the list - don't want to be changing it.
99112
100113 def numCoroutines ( self ):
101114 'Answers the number of active coroutines'
@@ -106,7 +119,7 @@ def stillRunning( self, *coroutineList ):
106119 return any ( c in self .coroutines for c in coroutineList )
107120
108121 #############################################################################################
109- # Coroutines
122+ # Generator-based coroutines. Kept for backward compatibility.
110123 #############################################################################################
111124
112125 @staticmethod
@@ -155,3 +168,4 @@ def waitFor(function, *args ):
155168 'Coroutine that waits until the given function (with optional parameters) returns True.'
156169 while not function (* args ):
157170 yield
171+
0 commit comments