Skip to content

Commit e002023

Browse files
author
Charles Weir
committed
cw: new Coroutine class
1 parent 7f04783 commit e002023

File tree

2 files changed

+392
-0
lines changed

2 files changed

+392
-0
lines changed

BrickPython/Coroutine.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
# Scheduler
2+
# Support for coroutines using Python generator functions.
3+
#
4+
# Copyright (c) 2014 Charles Weir. Shared under the MIT Licence.
5+
6+
import logging
7+
import sys, traceback
8+
import threading
9+
import datetime
10+
11+
class StopCoroutineException( Exception ):
12+
'''Exception used to stop a coroutine'''
13+
pass
14+
15+
ProgramStartTime = datetime.datetime.now()
16+
17+
class Coroutine( threading.Thread ):
18+
def __init__(self, func, *args, **kwargs):
19+
threading.Thread.__init__(self)
20+
self.args = args
21+
self.kwargs = kwargs
22+
self.logger = logging
23+
self.mySemaphore = threading.Semaphore(0)
24+
self.callerSemaphore = threading.Semaphore(0)
25+
self.stopEvent = threading.Event()
26+
self.setDaemon(True) # Daemon threads don't prevent the process from exiting.
27+
self.func = func
28+
self.lastExceptionCaught = None
29+
self.start()
30+
31+
@staticmethod
32+
def currentTimeMillis():
33+
'Answers the time in floating point milliseconds since program start.'
34+
global ProgramStartTime
35+
c = datetime.datetime.now() - ProgramStartTime
36+
return c.days * (3600.0 * 1000 * 24) + c.seconds * 1000.0 + c.microseconds / 1000.0
37+
38+
def run(self):
39+
try:
40+
self.mySemaphore.acquire()
41+
self.func(*self.args,**self.kwargs)
42+
except (StopCoroutineException, StopIteration):
43+
pass
44+
except Exception as e:
45+
self.lastExceptionCaught = e
46+
self.logger.info( "Coroutine - caught exception: %r" % (e) )
47+
exc_type, exc_value, exc_traceback = sys.exc_info()
48+
trace = "".join(traceback.format_tb(exc_traceback))
49+
self.logger.debug( "Traceback (latest call first):\n %s" % trace )
50+
self.callerSemaphore.release()
51+
threading.Thread.run(self) # Does some cleanup.
52+
53+
def call(self):
54+
'Executed from the caller thread. Runs the coroutine until it calls wait. Does nothing if the thread has terminated.'
55+
if self.isAlive():
56+
self.mySemaphore.release()
57+
self.callerSemaphore.acquire()
58+
59+
def stop(self):
60+
'Executed from the caller thread. Stops the coroutine, causing thread to terminate.'
61+
self.stopEvent.set()
62+
self.call()
63+
64+
@staticmethod
65+
def wait():
66+
'Called from within the coroutine to hand back control to the caller thread'
67+
self=threading.currentThread()
68+
self.callerSemaphore.release()
69+
self.mySemaphore.acquire()
70+
if (self.stopEvent.isSet()):
71+
raise StopCoroutineException()
72+
73+
@staticmethod
74+
def waitMilliseconds(timeMillis):
75+
'Called from within the coroutine to wait the given time'
76+
startTime = Coroutine.currentTimeMillis()
77+
while Coroutine.currentTimeMillis() - startTime < timeMillis:
78+
Coroutine.wait()
79+
80+
# while not self.stopEvent.is_set():
81+
82+
#
83+
# self.scheduler.coroutines.remove( self )
84+
# self.scheduler.semaphore.release()
85+
86+
class GeneratorCoroutineWrapper(Coroutine):
87+
'''Internal: Wraps a generator-style coroutine with a thread'''
88+
89+
def __init__(self, scheduler, generator):
90+
'''`scheduler` - the main Scheduler object
91+
`generator` - the generator object created by calling the generator function'''
92+
Coroutine.__init__(self)
93+
self.scheduler = scheduler
94+
self.stopEvent = threading.Event()
95+
self.generator = generator
96+
self.thread = threading.Thread(target=self.action)
97+
self.thread.setDaemon(True) # Daemon threads don't prevent the process from exiting.
98+
self.thread.start()
99+
100+
def action(self):
101+
'The thread entry function - executed within thread `thread`'
102+
try:
103+
self.semaphore.acquire()
104+
while not self.stopEvent.is_set():
105+
self.generator.next()
106+
self.scheduler.semaphore.release()
107+
self.semaphore.acquire()
108+
self.generator.throw(StopCoroutineException)
109+
except (StopCoroutineException, StopIteration):
110+
pass
111+
except Exception as e:
112+
self.scheduler.lastExceptionCaught = e
113+
logging.info( "Scheduler - caught: %r" % (e) )
114+
exc_type, exc_value, exc_traceback = sys.exc_info()
115+
trace = "".join(traceback.format_tb(exc_traceback))
116+
logging.debug( "Traceback (latest call first):\n %s" % trace )
117+
118+
self.scheduler.coroutines.remove( self )
119+
self.scheduler.semaphore.release()
120+
121+
def next(self):
122+
'Runs a bit of processing (next on the generator) - executed from the scheduler thread - returns only when processing has completed'
123+
self.semaphore.release()
124+
self.scheduler.semaphore.acquire()
125+
126+
def stop(self):
127+
'Causes the thread to stop - executed from the scheduler thread'
128+
self.stopEvent.set()
129+

test/TestCoroutine.py

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
# Tests for Scheduler
2+
#
3+
# Copyright (c) 2014 Charles Weir. Shared under the MIT Licence.
4+
5+
# Run tests as
6+
# python TestCoroutine.py
7+
# or, if you've got it installed:
8+
# nosetests
9+
10+
11+
12+
from BrickPython.Coroutine import *
13+
import unittest
14+
import logging
15+
from mock import *
16+
17+
class TestCoroutine(unittest.TestCase):
18+
''' Tests for the Scheduler class, its built-in coroutines, and its coroutine handling.
19+
'''
20+
coroutineCalls = []
21+
@staticmethod
22+
def dummyCoroutineFunc():
23+
for i in range(1, 5):
24+
TestCoroutine.coroutineCalls.append(i)
25+
Coroutine.wait();
26+
27+
@staticmethod
28+
def dummyCoroutineFuncThatDoesCleanup():
29+
for i in range(1, 6):
30+
TestCoroutine.coroutineCalls.append(i)
31+
try:
32+
Coroutine.wait()
33+
finally:
34+
TestCoroutine.coroutineCalls.append( -1 )
35+
36+
@staticmethod
37+
def dummyCoroutineFuncThatThrowsException():
38+
raise Exception("Hello")
39+
40+
def setUp(self):
41+
TestCoroutine.coroutineCalls = []
42+
43+
def tearDown(self):
44+
pass
45+
46+
def testCoroutinesGetCalledUntilDone(self):
47+
# When we start a coroutine
48+
f = TestCoroutine.dummyCoroutineFunc
49+
coroutine = Coroutine( f )
50+
# It's a daemon thread
51+
self.assertTrue( coroutine.isDaemon())
52+
# It doesn't run until we call it.
53+
self.assertEqual(TestCoroutine.coroutineCalls, [] )
54+
# Each call gets one iteration
55+
coroutine.call()
56+
self.assertEqual(TestCoroutine.coroutineCalls, [1] )
57+
# And when we run it until finished
58+
for i in range(0,10):
59+
coroutine.call()
60+
# It has completed
61+
self.assertEqual(TestCoroutine.coroutineCalls, [1,2,3,4] )
62+
self.assertFalse( coroutine.isAlive() )
63+
64+
def testCoroutinesGetStoppedAndCleanedUp(self):
65+
# When we start a coroutine
66+
coroutine = Coroutine(TestCoroutine.dummyCoroutineFuncThatDoesCleanup)
67+
# run it for a bit then stop it
68+
coroutine.call()
69+
coroutine.stop()
70+
# It has stopped and cleaned up
71+
self.assertFalse( coroutine.isAlive())
72+
self.assertEquals( TestCoroutine.coroutineCalls, [1,-1] )
73+
74+
def testCoroutineExceptionLogging(self):
75+
coroutine = Coroutine(TestCoroutine.dummyCoroutineFuncThatThrowsException)
76+
coroutine.logger = Mock()
77+
coroutine.call()
78+
self.assertTrue(coroutine.logger.info.called)
79+
firstParam = coroutine.logger.info.call_args[0][0]
80+
self.assertRegexpMatches(firstParam, "Coroutine - caught exception: .*Exception.*")
81+
self.assertRegexpMatches(coroutine.logger.debug.call_args[0][0], "Traceback.*")
82+
83+
84+
def testCoroutineWaitMilliseconds(self):
85+
def dummyCoroutineFuncWaiting1Sec():
86+
Coroutine.waitMilliseconds(1000)
87+
88+
coroutine = Coroutine(dummyCoroutineFuncWaiting1Sec)
89+
# Doesn't matter not restoring this; tests never use real time:
90+
Coroutine.currentTimeMillis = Mock(side_effect = [1,10,500,1200])
91+
for i in range(1,3):
92+
coroutine.call()
93+
self.assertTrue(coroutine.is_alive(),"Coroutine dead at call %d" % i)
94+
coroutine.call()
95+
self.assertFalse(coroutine.is_alive())
96+
97+
def testCoroutineCanHaveParameters(self):
98+
def func(*args, **kwargs):
99+
print "In coroutine: %r %r" % (args, kwargs)
100+
self.assertEquals(args, (1))
101+
self.assertEquals(kwargs, {"extra": 2})
102+
coroutine = Coroutine(func, 1, extra=2)
103+
coroutine.call()
104+
105+
pass
106+
107+
def testWaitCanPassAndReceiveParameters(self):
108+
pass
109+
110+
def testRunCoroutinesUntilFirstCompletes(self):
111+
pass
112+
113+
def testRunCoroutinesUntilAllComplete(self):
114+
pass
115+
116+
117+
118+
# def testWaitMilliseconds(self):
119+
# # If we wait for 10 ms
120+
# for i in self.scheduler.waitMilliseconds(10):
121+
# pass
122+
# # that's about the time that will have passed:
123+
# assert( self.scheduler.currentTimeMillis() in range(10,12) )
124+
#
125+
# def testRunTillFirstCompletes(self):
126+
# # When we run three coroutines using runTillFirstCompletes:
127+
# for i in self.scheduler.runTillFirstCompletes(TestCoroutine.dummyCoroutine(1,9),
128+
# TestCoroutine.dummyCoroutine(1,2),
129+
# TestCoroutine.dummyCoroutine(1,9) ):
130+
# pass
131+
# # the first to complete stops the others:
132+
# self.assertEquals( TestCoroutine.coroutineCalls, [1,1,1,2] )
133+
# self.assertEquals( self.scheduler.numCoroutines(), 0)
134+
#
135+
# def testRunTillAllComplete( self ):
136+
# # When we run three coroutines using runTillAllComplete:
137+
# for i in self.scheduler.runTillAllComplete( *[TestCoroutine.dummyCoroutine(1,i) for i in [2,3,4]] ):
138+
# pass
139+
# # they all run to completion:
140+
# print TestCoroutine.coroutineCalls
141+
# assert( TestCoroutine.coroutineCalls == [1,1,1,2,2,3] )
142+
# assert( self.scheduler.numCoroutines() == 0)
143+
#
144+
# def testWithTimeout(self):
145+
# # When we run a coroutine with a timeout:
146+
# for i in self.scheduler.withTimeout(10, TestCoroutine.dummyCoroutineThatDoesCleanup(1,99) ):
147+
# pass
148+
# # It completes at around the timeout, and does cleanup:
149+
# print TestCoroutine.coroutineCalls
150+
# self.assertTrue( 0 < TestCoroutine.coroutineCalls[-2] <= 10) # N.b. currentTimeMillis is called more than once per doWork call.
151+
# self.assertEquals( TestCoroutine.coroutineCalls[-1], -1 )
152+
#
153+
# def testTimeMillisToNextCall(self):
154+
# # Given a mock timer, and a different scheduler set up with a known time interval
155+
# scheduler = Scheduler(20)
156+
# # when we have just coroutines that take no time
157+
# scheduler.addActionCoroutine( TestCoroutine.dummyCoroutine() )
158+
# # then the time to next tick is the default less a bit for the timer check calls:
159+
# scheduler.doWork()
160+
# ttnt = scheduler.timeMillisToNextCall()
161+
# assert( ttnt in range(17,20) )
162+
# # when we have an additional coroutine that takes time
163+
# scheduler.addSensorCoroutine( TestCoroutine.dummyCoroutineThatTakesTime() )
164+
# # then the time to next tick is less by the amount of time taken by the coroutine:
165+
# scheduler.doWork()
166+
# ttnt = scheduler.timeMillisToNextCall()
167+
# assert( ttnt in range(7,10) )
168+
# # but when the coroutines take more time than the time interval available
169+
# for i in range(0,2):
170+
# scheduler.addSensorCoroutine( TestCoroutine.dummyCoroutineThatTakesTime() )
171+
# # the time to next tick never gets less than zero
172+
# scheduler.doWork()
173+
# ttnt = scheduler.timeMillisToNextCall()
174+
# assert( ttnt == 0 )
175+
# # and incidentally, we should have all the coroutines still running
176+
# assert( scheduler.numCoroutines() == 4 )
177+
#
178+
# def timeCheckerCoroutine(self):
179+
# # Helper coroutine for testEachCallToACoroutineGetsADifferentTime
180+
# # Checks that each call gets a different time value.
181+
# time = Scheduler.currentTimeMillis()
182+
# while True:
183+
# yield
184+
# newTime = Scheduler.currentTimeMillis()
185+
# self.assertNotEquals( newTime, time )
186+
# time = newTime
187+
#
188+
# def testEachCallToACoroutineGetsADifferentTime(self):
189+
# scheduler = Scheduler()
190+
# Scheduler.currentTimeMillis = Mock( side_effect = [0,0,0,0,0,0,0,0,0,0,1,2,3,4,5] )
191+
# # For any coroutine,
192+
# scheduler.setUpdateCoroutine( self.timeCheckerCoroutine() )
193+
# # We can guarantee that the timer always increments between calls (for speed calculations etc).
194+
# for i in range(1,10):
195+
# scheduler.doWork()
196+
#
197+
# def testTheWaitForCoroutine(self):
198+
# scheduler = Scheduler()
199+
# arrayParameter = []
200+
# # When we create a WaitFor coroutine with a function that takes one parameter (actually an array)
201+
# coroutine = scheduler.waitFor( lambda ap: len(ap) > 0, arrayParameter )
202+
# # It runs
203+
# for i in range(0,5):
204+
# coroutine.next()
205+
# # Until the function returns true.
206+
# arrayParameter.append(1)
207+
# TestCoroutine.checkCoroutineFinished( coroutine )
208+
#
209+
# @staticmethod
210+
# def throwingCoroutine():
211+
# yield
212+
# raise Exception("Hello")
213+
#
214+
# def testExceptionThrownFromCoroutine(self):
215+
# scheduler = Scheduler()
216+
# self.assertIsNotNone(scheduler.lastExceptionCaught)
217+
# scheduler.addActionCoroutine(self.throwingCoroutine())
218+
# for i in range(1,3):
219+
# scheduler.doWork()
220+
# self.assertEquals(scheduler.lastExceptionCaught.message, "Hello")
221+
#
222+
# def testRunTillFirstCompletesWithException(self):
223+
# # When we run three coroutines using runTillFirstCompletes:
224+
# self.scheduler.addActionCoroutine(self.scheduler.runTillFirstCompletes(self.throwingCoroutine(),
225+
# TestCoroutine.dummyCoroutine(1,2),
226+
# TestCoroutine.dummyCoroutine(1,9) ))
227+
# for i in range(1,10):
228+
# self.scheduler.doWork()
229+
# # the first to complete stops the others:
230+
# self.assertEquals( TestCoroutine.coroutineCalls, [1,1] )
231+
# self.assertEquals( self.scheduler.numCoroutines(), 0)
232+
# # and the exception is caught by the Scheduler:
233+
# self.assertEquals(self.scheduler.lastExceptionCaught.message, "Hello")
234+
#
235+
# def testRunTillAllCompleteWithException( self ):
236+
# # When we run three coroutines using runTillAllComplete:
237+
# self.scheduler.addActionCoroutine(self.scheduler.runTillAllComplete(self.throwingCoroutine(),
238+
# TestCoroutine.dummyCoroutine(1,2)))
239+
# for i in range(1,10):
240+
# self.scheduler.doWork()
241+
# # the first to complete stops the others:
242+
# self.assertEquals( TestCoroutine.coroutineCalls, [1] )
243+
# self.assertEquals( self.scheduler.numCoroutines(), 0)
244+
# # and the exception is caught by the Scheduler:
245+
# self.assertEquals(self.scheduler.lastExceptionCaught.message, "Hello")
246+
#
247+
# def testCanCatchExceptionWithinNestedCoroutines(self):
248+
# self.caught = 0
249+
# def outerCoroutine(self):
250+
# try:
251+
# for i in self.throwingCoroutine():
252+
# yield
253+
# except:
254+
# self.caught = 1
255+
# for i in outerCoroutine(self):
256+
# pass
257+
# self.assertEquals(self.caught, 1)
258+
259+
260+
if __name__ == '__main__':
261+
logging.basicConfig(format='%(message)s', level=logging.DEBUG) # Logging is a simple print
262+
unittest.main()
263+

0 commit comments

Comments
 (0)