Skip to content

Commit 08830ce

Browse files
author
Charles Weir
committed
Finished feature threadCoroutines.
2 parents bf2b73f + 98c3f3c commit 08830ce

File tree

5 files changed

+496
-54
lines changed

5 files changed

+496
-54
lines changed

BrickPython/Coroutine.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
# Scheduler
2+
# Support for coroutines using Python generator functions.
3+
#
4+
# Copyright (c) 2014 Charles Weir. Shared under the MIT Licence.
5+
6+
import logging
7+
import sys, traceback
8+
import threading
9+
import datetime
10+
11+
class StopCoroutineException( Exception ):
12+
'''Exception used to stop a coroutine'''
13+
pass
14+
15+
ProgramStartTime = datetime.datetime.now()
16+
17+
class Coroutine( threading.Thread ):
18+
def __init__(self, func, *args, **kwargs):
19+
threading.Thread.__init__(self)
20+
self.args = args
21+
self.kwargs = kwargs
22+
self.logger = logging
23+
self.mySemaphore = threading.Semaphore(0)
24+
self.callerSemaphore = threading.Semaphore(0)
25+
self.stopEvent = threading.Event()
26+
self.setDaemon(True) # Daemon threads don't prevent the process from exiting.
27+
self.func = func
28+
self.lastExceptionCaught = None
29+
self.start()
30+
31+
@staticmethod
32+
def currentTimeMillis():
33+
'Answers the time in floating point milliseconds since program start.'
34+
global ProgramStartTime
35+
c = datetime.datetime.now() - ProgramStartTime
36+
return c.days * (3600.0 * 1000 * 24) + c.seconds * 1000.0 + c.microseconds / 1000.0
37+
38+
def run(self):
39+
self.callResult = None
40+
try:
41+
self.mySemaphore.acquire()
42+
self.func(*self.args,**self.kwargs)
43+
except (StopCoroutineException, StopIteration):
44+
pass
45+
except Exception as e:
46+
self.lastExceptionCaught = e
47+
self.logger.info( "Coroutine - caught exception: %r" % (e) )
48+
exc_type, exc_value, exc_traceback = sys.exc_info()
49+
trace = "".join(traceback.format_tb(exc_traceback))
50+
self.logger.debug( "Traceback (latest call first):\n %s" % trace )
51+
self.stopEvent.set() # Need to tell caller to do a join.
52+
self.callerSemaphore.release()
53+
threading.Thread.run(self) # Does some cleanup.
54+
55+
def call(self, param = None):
56+
'''Executed from the caller thread. Runs the coroutine until it calls wait.
57+
Does nothing if the thread has terminated.
58+
If a parameter is passed, it is returned from the Coroutine.wait() function in the coroutine thread.'''
59+
if self.is_alive():
60+
self.callParam = param
61+
self.mySemaphore.release()
62+
self.callerSemaphore.acquire()
63+
if self.stopEvent.is_set():
64+
self.join() # Ensure that is_alive is false on exit.
65+
# For testing - assertions within coroutines must be passed back to main thread.
66+
if self.lastExceptionCaught != None and isinstance(self.lastExceptionCaught, AssertionError):
67+
raise self.lastExceptionCaught
68+
return self.callResult
69+
70+
def stop(self):
71+
'''Executed from the caller thread. Stops the coroutine, causing its thread to terminate.
72+
On completion the thread has terminated: is_active() is false.
73+
To support this, a coroutine mustn't catch the StopCoroutineException (unless it re-raises it).
74+
'''
75+
self.stopEvent.set()
76+
self.call()
77+
self.join()
78+
79+
@staticmethod
80+
def wait(param = None):
81+
'''Called from within the coroutine to hand back control to the caller thread.
82+
If a parameter is passed, it will be returned from Coroutine.call in the caller thread.
83+
'''
84+
self=threading.currentThread()
85+
self.callResult = param
86+
self.callerSemaphore.release()
87+
self.mySemaphore.acquire()
88+
if (self.stopEvent.isSet()):
89+
raise StopCoroutineException()
90+
return self.callParam
91+
92+
@staticmethod
93+
def waitMilliseconds(timeMillis):
94+
'''Called from within the coroutine to wait the given time.
95+
I.e. Invocations of the coroutine using call() will do nothing until then. '''
96+
startTime = Coroutine.currentTimeMillis()
97+
while Coroutine.currentTimeMillis() - startTime < timeMillis:
98+
Coroutine.wait()
99+
100+
@staticmethod
101+
def runTillFirstCompletes(*coroutines):
102+
def runTillFirstCompletesFunc(*coroutineList):
103+
while all(c.is_alive() for c in coroutineList):
104+
for c in coroutineList:
105+
c.call()
106+
if not c.is_alive():
107+
break
108+
Coroutine.wait()
109+
for c in coroutineList:
110+
if c.is_alive():
111+
c.stop()
112+
113+
result = Coroutine(runTillFirstCompletesFunc, *coroutines)
114+
return result
115+
116+
@staticmethod
117+
def runTillAllComplete(*coroutines):
118+
def runTillAllCompleteFunc(*coroutineList):
119+
while any(c.is_alive() for c in coroutineList):
120+
for c in coroutineList:
121+
c.call()
122+
Coroutine.wait()
123+
124+
result = Coroutine(runTillAllCompleteFunc, *coroutines)
125+
return result
126+
127+
def withTimeout(self, timeoutMillis):
128+
'''Answers this coroutine, decorated with a timeout that stops it if called after timeoutMillis has elapsed.
129+
'''
130+
def timeoutFunc(timeoutMillis):
131+
Coroutine.waitMilliseconds(timeoutMillis)
132+
result = Coroutine.runTillFirstCompletes(self, Coroutine(timeoutFunc, timeoutMillis))
133+
return result
134+

BrickPython/Scheduler.py

Lines changed: 55 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,66 +1,74 @@
11
# Scheduler
2-
# Support for coroutines using Python generator functions.
2+
# Support for coroutines using either Python generator functions or thread-based coroutines.
33
#
44
# Copyright (c) 2014 Charles Weir. Shared under the MIT Licence.
55

6-
import datetime
7-
import logging
8-
import sys, traceback
6+
import threading
7+
from Coroutine import Coroutine, StopCoroutineException
98

10-
class StopCoroutineException( Exception ):
11-
'''Exception used to stop a coroutine'''
12-
pass
9+
class GeneratorCoroutineWrapper(Coroutine):
10+
'''Internal: Wraps a generator-style coroutine with a thread'''
1311

14-
ProgramStartTime = datetime.datetime.now()
12+
def __init__(self, generator):
13+
'''`scheduler` - the main Scheduler object
14+
`generator` - the generator object created by calling the generator function'''
15+
Coroutine.__init__(self, self.action)
16+
self.generator = generator
17+
18+
def action(self):
19+
'The thread entry function - executed within thread `thread`'
20+
for _ in self.generator:
21+
Coroutine.wait()
22+
23+
def stop(self):
24+
try:
25+
self.generator.throw(StopCoroutineException())
26+
except StopCoroutineException:
27+
pass
28+
Coroutine.stop(self)
1529

1630

1731
class Scheduler():
18-
''' This manages an arbitrary number of coroutines (implemented as generator functions), supporting
32+
''' This manages an arbitrary number of coroutines (including generator functions), supporting
1933
invoking each every *timeMillisBetweenWorkCalls*, and detecting when each has completed.
2034
2135
It supports one special coroutine - the updatorCoroutine, which is invoked before and after all the other ones.
2236
'''
2337

2438
timeMillisBetweenWorkCalls = 50
2539

40+
@staticmethod
41+
def makeCoroutine(coroutineOrGenerator):
42+
return coroutineOrGenerator if coroutineOrGenerator is Coroutine else GeneratorCoroutineWrapper(coroutineOrGenerator)
43+
44+
2645
@staticmethod
2746
def currentTimeMillis():
28-
'Answers the time in floating point milliseconds since program start.'
29-
global ProgramStartTime
30-
c = datetime.datetime.now() - ProgramStartTime
31-
return c.days * (3600.0 * 1000 * 24) + c.seconds * 1000.0 + c.microseconds / 1000.0
47+
return Coroutine.currentTimeMillis()
3248

3349
def __init__(self, timeMillisBetweenWorkCalls = 50):
3450
Scheduler.timeMillisBetweenWorkCalls = timeMillisBetweenWorkCalls
3551
self.coroutines = []
3652
self.timeOfLastCall = Scheduler.currentTimeMillis()
37-
self.updateCoroutine = self.nullCoroutine() # for testing - usually replaced.
53+
self.updateCoroutine = Scheduler.makeCoroutine( self.nullCoroutine() ) # for testing - usually replaced.
3854
#: The most recent exception raised by a coroutine:
3955
self.lastExceptionCaught = Exception("None")
4056

41-
4257
def doWork(self):
4358
'Executes all the coroutines, handling exceptions'
4459

4560
timeNow = Scheduler.currentTimeMillis()
4661
if timeNow == self.timeOfLastCall: # Ensure each call gets a different timer value.
4762
return
4863
self.timeOfLastCall = timeNow
49-
self.updateCoroutine.next()
64+
self.updateCoroutine.call()
5065
for coroutine in self.coroutines[:]: # Copy of coroutines, so it doesn't matter removing one
51-
try:
52-
coroutine.next()
53-
except (StopIteration):
54-
self.coroutines.remove( coroutine )
55-
except Exception as e:
56-
self.lastExceptionCaught = e
57-
logging.info( "Scheduler - caught: %r" % (e) )
58-
exc_type, exc_value, exc_traceback = sys.exc_info()
59-
trace = "".join(traceback.format_tb(exc_traceback))
60-
logging.debug( "Traceback (latest call first):\n %s" % trace )
61-
self.coroutines.remove( coroutine )
62-
63-
self.updateCoroutine.next()
66+
coroutine.call()
67+
if not coroutine.is_alive():
68+
self.coroutines.remove(coroutine)
69+
self.lastExceptionCaught = coroutine.lastExceptionCaught
70+
71+
self.updateCoroutine.call()
6472

6573
def timeMillisToNextCall(self):
6674
'Wait time before the next doWork call should be called.'
@@ -71,31 +79,36 @@ def timeMillisToNextCall(self):
7179
def addSensorCoroutine(self, *coroutineList):
7280
'''Adds one or more new sensor/program coroutines to be scheduled, answering the last one to be added.
7381
Sensor coroutines are scheduled *before* Action coroutines'''
74-
self.coroutines[0:0] = coroutineList
75-
return coroutineList[-1]
82+
for generatorFunction in coroutineList:
83+
latestAdded = Scheduler.makeCoroutine(generatorFunction)
84+
self.coroutines.insert(0, latestAdded)
85+
return generatorFunction
7686

7787
def addActionCoroutine(self, *coroutineList):
7888
'''Adds one or more new motor control coroutines to be scheduled, answering the last coroutine to be added.
7989
Action coroutines are scheduled *after* Sensor coroutines'''
80-
self.coroutines.extend( coroutineList )
81-
return coroutineList[-1]
90+
for generatorFunction in coroutineList:
91+
latestAdded = Scheduler.makeCoroutine(generatorFunction)
92+
self.coroutines.append(latestAdded)
93+
return generatorFunction
8294

8395
def setUpdateCoroutine(self, coroutine):
8496
# Private - set the coroutine that manages the interaction with the BrickPi.
8597
# The coroutine will be invoked once at the start and once at the end of each doWork call.
86-
self.updateCoroutine = coroutine
98+
self.updateCoroutine = Scheduler.makeCoroutine(coroutine)
99+
100+
def findCoroutineForGenerator(self, generator):
101+
return (c for c in self.coroutines if c.generator == generator).next()
87102

88103
def stopCoroutine( self, *coroutineList ):
89104
'Terminates the given one or more coroutines'
90-
for coroutine in coroutineList:
91-
try:
92-
coroutine.throw(StopCoroutineException)
93-
except (StopCoroutineException,StopIteration): # If the coroutine doesn't catch the exception to tidy up, it comes back here.
94-
self.coroutines.remove( coroutine )
105+
for generator in coroutineList:
106+
coroutine = self.findCoroutineForGenerator(generator)
107+
coroutine.stop()
95108

96109
def stopAllCoroutines(self):
97110
'Terminates all coroutines (except the updater one) - rather drastic!'
98-
self.stopCoroutine(*self.coroutines[:]) # Makes a copy of the list - don't want to be changing it.
111+
self.stopCoroutine(*[c.generator for c in self.coroutines]) # Makes a copy of the list - don't want to be changing it.
99112

100113
def numCoroutines( self ):
101114
'Answers the number of active coroutines'
@@ -106,7 +119,7 @@ def stillRunning( self, *coroutineList ):
106119
return any( c in self.coroutines for c in coroutineList )
107120

108121
#############################################################################################
109-
# Coroutines
122+
# Generator-based coroutines. Kept for backward compatibility.
110123
#############################################################################################
111124

112125
@staticmethod
@@ -155,3 +168,4 @@ def waitFor(function, *args ):
155168
'Coroutine that waits until the given function (with optional parameters) returns True.'
156169
while not function(*args):
157170
yield
171+

0 commit comments

Comments
 (0)