|
| 1 | +# Tests for Scheduler |
| 2 | +# |
| 3 | +# Copyright (c) 2014 Charles Weir. Shared under the MIT Licence. |
| 4 | + |
| 5 | +# Run tests as |
| 6 | +# python TestCoroutine.py |
| 7 | +# or, if you've got it installed: |
| 8 | +# nosetests |
| 9 | + |
| 10 | + |
| 11 | + |
| 12 | +from BrickPython.Coroutine import * |
| 13 | +import unittest |
| 14 | +import logging |
| 15 | +from mock import * |
| 16 | + |
| 17 | +class TestCoroutine(unittest.TestCase): |
| 18 | + ''' Tests for the Scheduler class, its built-in coroutines, and its coroutine handling. |
| 19 | + ''' |
| 20 | + coroutineCalls = [] |
| 21 | + @staticmethod |
| 22 | + def dummyCoroutineFunc(): |
| 23 | + for i in range(1, 5): |
| 24 | + TestCoroutine.coroutineCalls.append(i) |
| 25 | + Coroutine.wait(); |
| 26 | + |
| 27 | + @staticmethod |
| 28 | + def dummyCoroutineFuncThatDoesCleanup(): |
| 29 | + for i in range(1, 6): |
| 30 | + TestCoroutine.coroutineCalls.append(i) |
| 31 | + try: |
| 32 | + Coroutine.wait() |
| 33 | + finally: |
| 34 | + TestCoroutine.coroutineCalls.append( -1 ) |
| 35 | + |
| 36 | + @staticmethod |
| 37 | + def dummyCoroutineFuncThatThrowsException(): |
| 38 | + raise Exception("Hello") |
| 39 | + |
| 40 | + def setUp(self): |
| 41 | + TestCoroutine.coroutineCalls = [] |
| 42 | + |
| 43 | + def tearDown(self): |
| 44 | + pass |
| 45 | + |
| 46 | + def testCoroutinesGetCalledUntilDone(self): |
| 47 | + # When we start a coroutine |
| 48 | + f = TestCoroutine.dummyCoroutineFunc |
| 49 | + coroutine = Coroutine( f ) |
| 50 | + # It's a daemon thread |
| 51 | + self.assertTrue( coroutine.isDaemon()) |
| 52 | + # It doesn't run until we call it. |
| 53 | + self.assertEqual(TestCoroutine.coroutineCalls, [] ) |
| 54 | + # Each call gets one iteration |
| 55 | + coroutine.call() |
| 56 | + self.assertEqual(TestCoroutine.coroutineCalls, [1] ) |
| 57 | + # And when we run it until finished |
| 58 | + for i in range(0,10): |
| 59 | + coroutine.call() |
| 60 | + # It has completed |
| 61 | + self.assertEqual(TestCoroutine.coroutineCalls, [1,2,3,4] ) |
| 62 | + self.assertFalse( coroutine.isAlive() ) |
| 63 | + |
| 64 | + def testCoroutinesGetStoppedAndCleanedUp(self): |
| 65 | + # When we start a coroutine |
| 66 | + coroutine = Coroutine(TestCoroutine.dummyCoroutineFuncThatDoesCleanup) |
| 67 | + # run it for a bit then stop it |
| 68 | + coroutine.call() |
| 69 | + coroutine.stop() |
| 70 | + # It has stopped and cleaned up |
| 71 | + self.assertFalse( coroutine.isAlive()) |
| 72 | + self.assertEquals( TestCoroutine.coroutineCalls, [1,-1] ) |
| 73 | + |
| 74 | + def testCoroutineExceptionLogging(self): |
| 75 | + coroutine = Coroutine(TestCoroutine.dummyCoroutineFuncThatThrowsException) |
| 76 | + coroutine.logger = Mock() |
| 77 | + coroutine.call() |
| 78 | + self.assertTrue(coroutine.logger.info.called) |
| 79 | + firstParam = coroutine.logger.info.call_args[0][0] |
| 80 | + self.assertRegexpMatches(firstParam, "Coroutine - caught exception: .*Exception.*") |
| 81 | + self.assertRegexpMatches(coroutine.logger.debug.call_args[0][0], "Traceback.*") |
| 82 | + |
| 83 | + |
| 84 | + def testCoroutineWaitMilliseconds(self): |
| 85 | + def dummyCoroutineFuncWaiting1Sec(): |
| 86 | + Coroutine.waitMilliseconds(1000) |
| 87 | + |
| 88 | + coroutine = Coroutine(dummyCoroutineFuncWaiting1Sec) |
| 89 | + # Doesn't matter not restoring this; tests never use real time: |
| 90 | + Coroutine.currentTimeMillis = Mock(side_effect = [1,10,500,1200]) |
| 91 | + for i in range(1,3): |
| 92 | + coroutine.call() |
| 93 | + self.assertTrue(coroutine.is_alive(),"Coroutine dead at call %d" % i) |
| 94 | + coroutine.call() |
| 95 | + self.assertFalse(coroutine.is_alive()) |
| 96 | + |
| 97 | + def testCoroutineCanHaveParameters(self): |
| 98 | + def func(*args, **kwargs): |
| 99 | + print "In coroutine: %r %r" % (args, kwargs) |
| 100 | + self.assertEquals(args, (1)) |
| 101 | + self.assertEquals(kwargs, {"extra": 2}) |
| 102 | + coroutine = Coroutine(func, 1, extra=2) |
| 103 | + coroutine.call() |
| 104 | + |
| 105 | + pass |
| 106 | + |
| 107 | + def testWaitCanPassAndReceiveParameters(self): |
| 108 | + pass |
| 109 | + |
| 110 | + def testRunCoroutinesUntilFirstCompletes(self): |
| 111 | + pass |
| 112 | + |
| 113 | + def testRunCoroutinesUntilAllComplete(self): |
| 114 | + pass |
| 115 | + |
| 116 | + |
| 117 | + |
| 118 | +# def testWaitMilliseconds(self): |
| 119 | +# # If we wait for 10 ms |
| 120 | +# for i in self.scheduler.waitMilliseconds(10): |
| 121 | +# pass |
| 122 | +# # that's about the time that will have passed: |
| 123 | +# assert( self.scheduler.currentTimeMillis() in range(10,12) ) |
| 124 | +# |
| 125 | +# def testRunTillFirstCompletes(self): |
| 126 | +# # When we run three coroutines using runTillFirstCompletes: |
| 127 | +# for i in self.scheduler.runTillFirstCompletes(TestCoroutine.dummyCoroutine(1,9), |
| 128 | +# TestCoroutine.dummyCoroutine(1,2), |
| 129 | +# TestCoroutine.dummyCoroutine(1,9) ): |
| 130 | +# pass |
| 131 | +# # the first to complete stops the others: |
| 132 | +# self.assertEquals( TestCoroutine.coroutineCalls, [1,1,1,2] ) |
| 133 | +# self.assertEquals( self.scheduler.numCoroutines(), 0) |
| 134 | +# |
| 135 | +# def testRunTillAllComplete( self ): |
| 136 | +# # When we run three coroutines using runTillAllComplete: |
| 137 | +# for i in self.scheduler.runTillAllComplete( *[TestCoroutine.dummyCoroutine(1,i) for i in [2,3,4]] ): |
| 138 | +# pass |
| 139 | +# # they all run to completion: |
| 140 | +# print TestCoroutine.coroutineCalls |
| 141 | +# assert( TestCoroutine.coroutineCalls == [1,1,1,2,2,3] ) |
| 142 | +# assert( self.scheduler.numCoroutines() == 0) |
| 143 | +# |
| 144 | +# def testWithTimeout(self): |
| 145 | +# # When we run a coroutine with a timeout: |
| 146 | +# for i in self.scheduler.withTimeout(10, TestCoroutine.dummyCoroutineThatDoesCleanup(1,99) ): |
| 147 | +# pass |
| 148 | +# # It completes at around the timeout, and does cleanup: |
| 149 | +# print TestCoroutine.coroutineCalls |
| 150 | +# self.assertTrue( 0 < TestCoroutine.coroutineCalls[-2] <= 10) # N.b. currentTimeMillis is called more than once per doWork call. |
| 151 | +# self.assertEquals( TestCoroutine.coroutineCalls[-1], -1 ) |
| 152 | +# |
| 153 | +# def testTimeMillisToNextCall(self): |
| 154 | +# # Given a mock timer, and a different scheduler set up with a known time interval |
| 155 | +# scheduler = Scheduler(20) |
| 156 | +# # when we have just coroutines that take no time |
| 157 | +# scheduler.addActionCoroutine( TestCoroutine.dummyCoroutine() ) |
| 158 | +# # then the time to next tick is the default less a bit for the timer check calls: |
| 159 | +# scheduler.doWork() |
| 160 | +# ttnt = scheduler.timeMillisToNextCall() |
| 161 | +# assert( ttnt in range(17,20) ) |
| 162 | +# # when we have an additional coroutine that takes time |
| 163 | +# scheduler.addSensorCoroutine( TestCoroutine.dummyCoroutineThatTakesTime() ) |
| 164 | +# # then the time to next tick is less by the amount of time taken by the coroutine: |
| 165 | +# scheduler.doWork() |
| 166 | +# ttnt = scheduler.timeMillisToNextCall() |
| 167 | +# assert( ttnt in range(7,10) ) |
| 168 | +# # but when the coroutines take more time than the time interval available |
| 169 | +# for i in range(0,2): |
| 170 | +# scheduler.addSensorCoroutine( TestCoroutine.dummyCoroutineThatTakesTime() ) |
| 171 | +# # the time to next tick never gets less than zero |
| 172 | +# scheduler.doWork() |
| 173 | +# ttnt = scheduler.timeMillisToNextCall() |
| 174 | +# assert( ttnt == 0 ) |
| 175 | +# # and incidentally, we should have all the coroutines still running |
| 176 | +# assert( scheduler.numCoroutines() == 4 ) |
| 177 | +# |
| 178 | +# def timeCheckerCoroutine(self): |
| 179 | +# # Helper coroutine for testEachCallToACoroutineGetsADifferentTime |
| 180 | +# # Checks that each call gets a different time value. |
| 181 | +# time = Scheduler.currentTimeMillis() |
| 182 | +# while True: |
| 183 | +# yield |
| 184 | +# newTime = Scheduler.currentTimeMillis() |
| 185 | +# self.assertNotEquals( newTime, time ) |
| 186 | +# time = newTime |
| 187 | +# |
| 188 | +# def testEachCallToACoroutineGetsADifferentTime(self): |
| 189 | +# scheduler = Scheduler() |
| 190 | +# Scheduler.currentTimeMillis = Mock( side_effect = [0,0,0,0,0,0,0,0,0,0,1,2,3,4,5] ) |
| 191 | +# # For any coroutine, |
| 192 | +# scheduler.setUpdateCoroutine( self.timeCheckerCoroutine() ) |
| 193 | +# # We can guarantee that the timer always increments between calls (for speed calculations etc). |
| 194 | +# for i in range(1,10): |
| 195 | +# scheduler.doWork() |
| 196 | +# |
| 197 | +# def testTheWaitForCoroutine(self): |
| 198 | +# scheduler = Scheduler() |
| 199 | +# arrayParameter = [] |
| 200 | +# # When we create a WaitFor coroutine with a function that takes one parameter (actually an array) |
| 201 | +# coroutine = scheduler.waitFor( lambda ap: len(ap) > 0, arrayParameter ) |
| 202 | +# # It runs |
| 203 | +# for i in range(0,5): |
| 204 | +# coroutine.next() |
| 205 | +# # Until the function returns true. |
| 206 | +# arrayParameter.append(1) |
| 207 | +# TestCoroutine.checkCoroutineFinished( coroutine ) |
| 208 | +# |
| 209 | +# @staticmethod |
| 210 | +# def throwingCoroutine(): |
| 211 | +# yield |
| 212 | +# raise Exception("Hello") |
| 213 | +# |
| 214 | +# def testExceptionThrownFromCoroutine(self): |
| 215 | +# scheduler = Scheduler() |
| 216 | +# self.assertIsNotNone(scheduler.lastExceptionCaught) |
| 217 | +# scheduler.addActionCoroutine(self.throwingCoroutine()) |
| 218 | +# for i in range(1,3): |
| 219 | +# scheduler.doWork() |
| 220 | +# self.assertEquals(scheduler.lastExceptionCaught.message, "Hello") |
| 221 | +# |
| 222 | +# def testRunTillFirstCompletesWithException(self): |
| 223 | +# # When we run three coroutines using runTillFirstCompletes: |
| 224 | +# self.scheduler.addActionCoroutine(self.scheduler.runTillFirstCompletes(self.throwingCoroutine(), |
| 225 | +# TestCoroutine.dummyCoroutine(1,2), |
| 226 | +# TestCoroutine.dummyCoroutine(1,9) )) |
| 227 | +# for i in range(1,10): |
| 228 | +# self.scheduler.doWork() |
| 229 | +# # the first to complete stops the others: |
| 230 | +# self.assertEquals( TestCoroutine.coroutineCalls, [1,1] ) |
| 231 | +# self.assertEquals( self.scheduler.numCoroutines(), 0) |
| 232 | +# # and the exception is caught by the Scheduler: |
| 233 | +# self.assertEquals(self.scheduler.lastExceptionCaught.message, "Hello") |
| 234 | +# |
| 235 | +# def testRunTillAllCompleteWithException( self ): |
| 236 | +# # When we run three coroutines using runTillAllComplete: |
| 237 | +# self.scheduler.addActionCoroutine(self.scheduler.runTillAllComplete(self.throwingCoroutine(), |
| 238 | +# TestCoroutine.dummyCoroutine(1,2))) |
| 239 | +# for i in range(1,10): |
| 240 | +# self.scheduler.doWork() |
| 241 | +# # the first to complete stops the others: |
| 242 | +# self.assertEquals( TestCoroutine.coroutineCalls, [1] ) |
| 243 | +# self.assertEquals( self.scheduler.numCoroutines(), 0) |
| 244 | +# # and the exception is caught by the Scheduler: |
| 245 | +# self.assertEquals(self.scheduler.lastExceptionCaught.message, "Hello") |
| 246 | +# |
| 247 | +# def testCanCatchExceptionWithinNestedCoroutines(self): |
| 248 | +# self.caught = 0 |
| 249 | +# def outerCoroutine(self): |
| 250 | +# try: |
| 251 | +# for i in self.throwingCoroutine(): |
| 252 | +# yield |
| 253 | +# except: |
| 254 | +# self.caught = 1 |
| 255 | +# for i in outerCoroutine(self): |
| 256 | +# pass |
| 257 | +# self.assertEquals(self.caught, 1) |
| 258 | + |
| 259 | + |
| 260 | +if __name__ == '__main__': |
| 261 | + logging.basicConfig(format='%(message)s', level=logging.DEBUG) # Logging is a simple print |
| 262 | + unittest.main() |
| 263 | + |
0 commit comments