Skip to content

Commit fa550e7

Browse files
committed
Add a service wrapper to the domain class to better handle concurrency.
1 parent afff0fb commit fa550e7

File tree

13 files changed

+538
-86
lines changed

13 files changed

+538
-86
lines changed

model/_utils/__init__.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#
2+
# This file is protected by Copyright. Please refer to the COPYRIGHT file
3+
# distributed with this source distribution.
4+
#
5+
# This file is part of REDHAWK rtl-demo-app.
6+
#
7+
# REDHAWK rtl-demo-app is free software: you can redistribute it and/or modify it under
8+
# the terms of the GNU Lesser General Public License as published by the Free
9+
# Software Foundation, either version 3 of the License, or (at your option) any
10+
# later version.
11+
#
12+
# REDHAWK rtl-demo-app is distributed in the hope that it will be useful, but WITHOUT
13+
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14+
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15+
# details.
16+
#
17+
# You should have received a copy of the GNU Lesser General Public License
18+
# along with this program. If not, see http://www.gnu.org/licenses/.
19+
#

model/_utils/tasking.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
#
2+
# This file is protected by Copyright. Please refer to the COPYRIGHT file
3+
# distributed with this source distribution.
4+
#
5+
# This file is part of REDHAWK rtl-demo-app.
6+
#
7+
# REDHAWK rtl-demo-app is free software: you can redistribute it and/or modify it under
8+
# the terms of the GNU Lesser General Public License as published by the Free
9+
# Software Foundation, either version 3 of the License, or (at your option) any
10+
# later version.
11+
#
12+
# REDHAWK rtl-demo-app is distributed in the hope that it will be useful, but WITHOUT
13+
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14+
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15+
# details.
16+
#
17+
# You should have received a copy of the GNU Lesser General Public License
18+
# along with this program. If not, see http://www.gnu.org/licenses/.
19+
#
20+
from functools import wraps, partial
21+
from tornado import gen, concurrent
22+
from tornado import ioloop
23+
24+
# Suppressed known DeprecationWarning for the futures backport
25+
import warnings, exceptions
26+
warnings.filterwarnings("ignore", "The futures package has been deprecated.*", exceptions.DeprecationWarning, "futures")
27+
import futures
28+
29+
import logging
30+
import sys
31+
from futures import ThreadPoolExecutor
32+
33+
EXECUTOR = ThreadPoolExecutor(100)
34+
35+
_LINE = '%'*40
36+
37+
38+
def safe_return_future(func):
39+
'''
40+
Identical to tornado.gen.return_future plus
41+
thread safety. Executes the callback in
42+
the ioloop thread
43+
'''
44+
@wraps(func)
45+
def exec_func(*args, **kwargs):
46+
47+
future = concurrent.TracebackFuture()
48+
49+
# accept optional callback
50+
callback = kwargs.pop('callback', None)
51+
if callback:
52+
future.add_done_callback(callback)
53+
54+
try:
55+
56+
io_loop = kwargs.pop('ioloop', None)
57+
if not io_loop:
58+
io_loop = ioloop.IOLoop.current()
59+
60+
def _ioloop_callback(val):
61+
# print "set result to %s" % val
62+
future.set_result(val)
63+
64+
def _callback(val):
65+
# set the result in the ioloop thread
66+
# print "callback val is %s " % val
67+
io_loop.add_callback(_ioloop_callback, val)
68+
69+
# print "Func %s " % func
70+
func(callback=_callback, *args, **kwargs)
71+
except Exception:
72+
future.set_exc_info(sys.exc_info())
73+
74+
return future
75+
76+
exec_func.__doc__ = \
77+
("%s\nsafe_return_future() wrapped function.\n" +
78+
"Runs asynchronously and returns a Future.\n" +
79+
"See _utils.concurrent for more info\n%s\n%s") % (_LINE, _LINE, exec_func.__doc__)
80+
return exec_func
81+
82+
83+
84+
def background_task(func):
85+
86+
@wraps(func)
87+
def exec_background(*args, **kwargs):
88+
'''
89+
Executes a function in a background thread
90+
and returns a Future invoked when the thread completes.
91+
Useful for IO Bound processes that block. For CPU
92+
bound processes consider using celery, DO NOT execute
93+
CPU Bound tasks in the tornado process!
94+
95+
io_loop is the optional ioloop used to invoke the callback
96+
in the processing thread. This is useful for unit tests
97+
that do not use the singleton ioloop. If set to none,
98+
IOLoop.current() is returned
99+
'''
100+
# traceback future maintains python stack in exception
101+
future = concurrent.TracebackFuture()
102+
103+
# use explicit ioloop for unit testing
104+
# Ref: https://github.com/tornadoweb/tornado/issues/663
105+
io_loop = kwargs.pop('ioloop', None)
106+
if not io_loop:
107+
io_loop = ioloop.IOLoop.current()
108+
109+
# accept optional callback
110+
callback = kwargs.pop('callback', None)
111+
if callback:
112+
future.add_done_callback(callback)
113+
114+
def _do_task(*args, **kwargs):
115+
try:
116+
rtn = func(*args, **kwargs)
117+
io_loop.add_callback(future.set_result, rtn)
118+
except Exception, e:
119+
logging.debug("Callback exception", exc_info=True)
120+
io_loop.add_callback(future.set_exc_info, sys.exc_info())
121+
122+
EXECUTOR.submit(partial(_do_task, *args, **kwargs))
123+
return future
124+
125+
exec_background.__doc__ = \
126+
("%s\nbackground_task() wrapped function.\n" +
127+
"Runs asynchronously and returns a Future.\n" +
128+
"See _utils.concurrent for more info\n%s\n%s") % (_LINE, _LINE, exec_background.__doc__)
129+
return exec_background

model/_utils/test_concurrent.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
#!/usr/bin/env python
2+
#
3+
# This file is protected by Copyright. Please refer to the COPYRIGHT file
4+
# distributed with this source distribution.
5+
#
6+
# This file is part of REDHAWK rtl-demo-app.
7+
#
8+
# REDHAWK rtl-demo-app is free software: you can redistribute it and/or modify it under
9+
# the terms of the GNU Lesser General Public License as published by the Free
10+
# Software Foundation, either version 3 of the License, or (at your option) any
11+
# later version.
12+
#
13+
# REDHAWK rtl-demo-app is distributed in the hope that it will be useful, but WITHOUT
14+
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15+
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
16+
# details.
17+
#
18+
# You should have received a copy of the GNU Lesser General Public License
19+
# along with this program. If not, see http://www.gnu.org/licenses/.
20+
#
21+
22+
import unittest
23+
import sys
24+
import logging
25+
import time
26+
27+
from tornado import gen
28+
from tornado.testing import AsyncTestCase, LogTrapTestCase, main, gen_test
29+
30+
from futures import ThreadPoolExecutor
31+
32+
import tasking
33+
34+
EXECUTOR = ThreadPoolExecutor(4)
35+
36+
# all method returning suite is required by tornado.testing.main()
37+
def all():
38+
return unittest.TestLoader().loadTestsFromModule(__import__(__name__))
39+
40+
41+
class FuturesTest(AsyncTestCase):
42+
43+
@tasking.background_task
44+
def _sleepfunc(self, input, duration=1, exception=False):
45+
logging.debug("_sleepfun start")
46+
time.sleep(duration)
47+
if exception:
48+
raise input
49+
logging.debug("_sleepfun end")
50+
return input
51+
52+
53+
@gen_test
54+
def test_background_future(self):
55+
'''
56+
Runs a thread that sleeps in the background and generates a Future to indicate it's
57+
done or raised an exception.
58+
'''
59+
f = yield self._sleepfunc("the input is 1", 1)
60+
logging.debug("The future is %s", f)
61+
self.assertEquals("the input is 1", f)
62+
63+
@gen_test
64+
def test_background_future_except1(self):
65+
'''
66+
Runs a thread that sleeps in the background and generates a Future to indicate it's
67+
done or raised an exception.
68+
'''
69+
try:
70+
f = yield self._sleepfunc(ValueError('ignore me'), 1, exception=True)
71+
self.fail("Expecting ValueError")
72+
except ValueError:
73+
logging.info("Manual inspection of stack trace required:", exc_info=1)
74+
75+
def test_background_future_exception(self):
76+
'''
77+
Runs a thread that sleeps in the background and generates a Future to indicate it's
78+
done or raised an exception.
79+
'''
80+
f = self._sleepfunc(ValueError('ignore me'), 1, exception=True)
81+
logging.debug("The future is %s", f)
82+
self.io_loop.add_future(f, self.stop)
83+
f2 = self.wait()
84+
# print "RESULT IS '%s'" % self.wait()
85+
self.assertEquals(ValueError, type(f2.exception()))
86+
87+
88+
@gen_test
89+
def test_callback_future(self):
90+
91+
@tasking.safe_return_future
92+
def cbfunc(input, duration=1, callback=None):
93+
logging.debug("input=%s, callback=%s", input, callback)
94+
def background(i):
95+
logging.debug("invoking cbfunc")
96+
time.sleep(duration)
97+
callback(i)
98+
EXECUTOR.submit(background, input)
99+
100+
f = yield cbfunc("the input is 1", 1)
101+
logging.debug("The future is %s", f)
102+
self.assertEquals("the input is 1", f)
103+
104+
105+
#TODO: Add tests for callback options
106+
107+
108+
if __name__ == '__main__':
109+
110+
# to enable logging, use --logging=debug
111+
main()

model/domain.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
from ossie.utils import redhawk
2626
from ossie.utils.redhawk.channels import ODMListener
2727

28-
__author__ = 'rpcanno'
29-
3028

3129
def scan_domains():
3230
return redhawk.scan()
@@ -94,13 +92,13 @@ def properties(self):
9492
props = self.domMgr_ptr.query([]) # TODO: self.domMgr_ptr._properties
9593
return props
9694

97-
def _domain(self):
95+
def get_domain_info(self):
9896
if self.domMgr_ptr:
9997
return self.domMgr_ptr
10098
raise ResourceNotFound('domain', self.name)
10199

102100
def find_app(self, app_id=None):
103-
_dom = self._domain()
101+
_dom = self.get_domain_info()
104102
apps = _dom.apps
105103

106104
if not app_id:
@@ -123,7 +121,7 @@ def find_component(self, app_id, comp_id=None):
123121
raise ResourceNotFound('component', comp_id)
124122

125123
def find_device_manager(self, device_manager_id=None):
126-
_dom = self._domain()
124+
_dom = self.get_domain_info()
127125

128126
if not device_manager_id:
129127
return _dom.devMgrs
@@ -170,7 +168,7 @@ def components(self, app_id):
170168
return comps_dict
171169

172170
def launch(self, app_name):
173-
_dom = self._domain()
171+
_dom = self.get_domain_info()
174172
try:
175173
app = _dom.createApplication(app_name)
176174
return app._get_identifier()
@@ -186,7 +184,7 @@ def release(self, app_id):
186184
raise WaveformReleaseError(app_id, str(e))
187185

188186
def available_apps(self):
189-
_dom = self._domain()
187+
_dom = self.get_domain_info()
190188
sads_full_path = _dom.catalogSads()
191189
sads = _dom._sads
192190
sad_ret = []

0 commit comments

Comments
 (0)