Skip to content

Commit 2aa79bb

Browse files
author
Lars Solberg
committed
poc for celery
1 parent 7d9cc56 commit 2aa79bb

13 files changed

Lines changed: 194 additions & 93 deletions

File tree

api/data/opa/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import json
22
import logging
3+
from typing import Dict
34
from dynaconf import LazySettings
45

56
config = LazySettings()
@@ -25,3 +26,5 @@ def init_configuration():
2526
logging.debug(f'Using configuration environemnt={config.ENV_FOR_DYNACONF}')
2627
logging.debug('Configuration is:')
2728
logging.debug(json.dumps(config.as_dict(internal=False), indent=2))
29+
30+
state: Dict = {}

api/data/opa/core/plugin.py

Lines changed: 83 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,13 @@ class Driver(BasePlugin):
2626
opts: Dict[str, Any]
2727
pm: 'PluginManager'
2828

29-
def __init__(self, opts=None):
29+
def __init__(self, opts=None, load='auto'):
3030
self.opts = opts or {}
31+
self.load = load
3132

3233
def _pre_connection_check(self, connectionstatus, load):
3334
if connectionstatus == False: # if no host found
34-
if load == 'yes':
35+
if self.load == 'yes':
3536
raise Exception(
3637
f'Connect pre-check failed for {self.name}, as if the host is not there? Options {self.opts}'
3738
)
@@ -41,14 +42,14 @@ def _pre_connection_check(self, connectionstatus, load):
4142
if hasattr(self, 'validate'):
4243
return True
4344

44-
def initialize(self, load=None):
45+
def initialize(self):
4546
connectionstatus = self.connect()
46-
if self._pre_connection_check(connectionstatus, load):
47+
if self._pre_connection_check(connectionstatus, self.load):
4748
self.validate()
4849

49-
async def initialize_async(self, load=None):
50+
async def initialize_async(self):
5051
connectionstatus = await self.connect()
51-
if self._pre_connection_check(connectionstatus, load):
52+
if self._pre_connection_check(connectionstatus, self.load):
5253
await self.validate()
5354

5455
def get_instance(self):
@@ -70,8 +71,9 @@ class Setup(BasePlugin):
7071
...
7172

7273

73-
def get_defined_plugins(mod):
74-
returndata = {'hook-definitions': [], 'hooks': [], 'drivers': [], 'setup': []}
74+
def get_defined_plugins(mod, plugin_types=None):
75+
plugin_types = plugin_types or ['hook-definitions', 'hooks', 'drivers', 'setup']
76+
returndata = defaultdict(list)
7577

7678
for name, obj in inspect.getmembers(mod, inspect.isclass):
7779
if mod.__name__ != obj.__module__:
@@ -81,13 +83,13 @@ def get_defined_plugins(mod):
8183
if obj is Hook or obj is Driver or obj is Setup:
8284
continue
8385

84-
if issubclass(obj, Hook):
86+
if issubclass(obj, Hook) and 'hooks' in plugin_types:
8587
returndata['hooks'].append(obj)
86-
elif issubclass(obj, HookDefinition):
88+
elif issubclass(obj, HookDefinition) and 'hook-definitions' in plugin_types:
8789
returndata['hook-definitions'].append(obj)
88-
elif issubclass(obj, Driver):
90+
elif issubclass(obj, Driver) and 'drivers' in plugin_types:
8991
returndata['drivers'].append(obj)
90-
elif issubclass(obj, Setup):
92+
elif issubclass(obj, Setup) and 'setup' in plugin_types:
9193
returndata['setup'].append(obj)
9294
return returndata
9395

@@ -100,13 +102,17 @@ class PluginManager:
100102
hooks: Dict[str, List[Hook]]
101103
hook_definitions: Dict[str, Hook]
102104

105+
store: Dict[str, Any]
106+
103107
def __init__(self):
104108
self.status = defaultdict(dict)
105109
self.drivers = {}
106110
self.optional_components = {}
107111
self.hooks = defaultdict(list)
108112
self.hook_definitions = {}
109113

114+
self.store = {'task_candidates': []}
115+
110116
def post_hook(self):
111117
final_hooks: Dict[str, Hook] = {}
112118
for hook_name, hooks in self.hooks.items():
@@ -143,13 +149,7 @@ def register_driver(self, driver: Driver):
143149
logging.debug(f'Registered driver {name}')
144150
self.drivers[name] = driver
145151

146-
async def load_components(self):
147-
"""
148-
Preload the components that we are going to use.
149-
The components will be available using singletons that represent connections.
150-
We will therefor reuse each components connection and connect once per
151-
definition in config.OPTIONAL_COMPONENTS
152-
"""
152+
def _preload_drivers(self):
153153
for name, values in config.OPTIONAL_COMPONENTS.items():
154154
name = name.lower()
155155
load = values.get('LOAD', 'auto')
@@ -164,18 +164,30 @@ async def load_components(self):
164164
f'Invalid driver specified ({drivername}), no way to handle it'
165165
)
166166

167-
driverinstance = driver(opts=values.get('OPTS', {}))
167+
driverinstance = driver(opts=values.get('OPTS', {}), load=load)
168168
driverinstance.pm = self
169-
170-
if asyncio.iscoroutinefunction(driverinstance.connect):
171-
await driverinstance.initialize_async(load=load)
172-
else:
173-
driverinstance.initialize(load=load)
174169
self.optional_components[name] = driverinstance
175170

176171
logging.info(
177172
f'Connecting to {name} with driver {drivername}, using {driverinstance.opts}'
178173
)
174+
yield driverinstance
175+
176+
177+
async def load_components(self):
178+
for driverinstance in self._preload_drivers():
179+
if asyncio.iscoroutinefunction(driverinstance.connect):
180+
await driverinstance.initialize_async()
181+
else:
182+
driverinstance.initialize()
183+
184+
def load_sync_components_global(self):
185+
for driverinstance in self._preload_drivers():
186+
if asyncio.iscoroutinefunction(driverinstance.connect):
187+
logging.debug(f'Driver {driverinstance.name} is async, wont load')
188+
else:
189+
driverinstance.initialize()
190+
179191

180192
def register_hook_definition(self, obj):
181193
try:
@@ -238,14 +250,7 @@ async def call_async(self, name, *args, **kwargs):
238250

239251
return await self.hooks[name].run(*args, **kwargs)
240252

241-
242-
plugin_manager: PluginManager
243-
244-
245-
async def startup(app):
246-
global plugin_manager
247-
plugin_manager = PluginManager() # Singleton used around the app
248-
253+
def _get_plugindata():
249254
"""
250255
Plugins are imported from multiple paths with these rules:
251256
* First with a unique name wins
@@ -284,12 +289,17 @@ async def startup(app):
284289
sys.path = unique(sys_paths)
285290

286291
plugins_to_load = defaultdict(list)
292+
task_candidates = []
287293

288294
for plugin in pkgutil.iter_modules(PLUGIN_PATHS):
289295
allow_match = os.path.join(plugin.module_finder.path, plugin.name)
296+
tasks_candidate = False
290297

291298
if plugin.ispkg:
292299
metafile = os.path.join(allow_match, 'meta.json')
300+
301+
if os.path.exists(os.path.join(allow_match, 'tasks.py')):
302+
tasks_candidate = True
293303
else:
294304
metafile = f'{allow_match}-meta.json'
295305

@@ -345,24 +355,60 @@ async def startup(app):
345355
logging.info(f'Loading plugin: {plugin.name}')
346356
mod = import_module(plugin.name)
347357

358+
if tasks_candidate:
359+
task_candidates.append(plugin.name)
360+
348361
defined_plugins = get_defined_plugins(mod)
349362
for pt in ['hook-definitions', 'hooks', 'drivers', 'setup']:
350363
plugins_to_load[pt] += defined_plugins[pt]
351364

352-
for hook_definition in plugins_to_load['hook-definitions']:
365+
return {'plugins_to_load': plugins_to_load, 'task_candidates': task_candidates}
366+
367+
368+
plugin_manager: PluginManager
369+
370+
371+
async def startup(app):
372+
global plugin_manager
373+
plugin_manager = PluginManager()
374+
375+
plugin_manager.store.update(**_get_plugindata())
376+
377+
for hook_definition in plugin_manager.store['plugins_to_load']['hook-definitions']:
353378
plugin_manager.register_hook_definition(hook_definition)
354379

355-
for hook in plugins_to_load['hooks']:
380+
for hook in plugin_manager.store['plugins_to_load']['hooks']:
356381
plugin_manager.register_hook(hook)
357382
plugin_manager.post_hook()
358383

359-
for driver in plugins_to_load['drivers']:
384+
for driver in plugin_manager.store['plugins_to_load']['drivers']:
360385
plugin_manager.register_driver(driver)
361386
await plugin_manager.load_components()
362387

363-
for setup in plugins_to_load['setup']:
388+
for setup in plugin_manager.store['plugins_to_load']['setup']:
364389
plugin_manager.run_setup(setup, {'app': app, 'pm': plugin_manager})
365390

391+
def startup_worker():
392+
"""
393+
This function is dedicated to celery worker startup. We can't use the regular one
394+
because it is async, and it is tailored to fastapi
395+
"""
396+
global plugin_manager
397+
plugin_manager = PluginManager()
398+
399+
plugin_manager.store.update(**_get_plugindata())
400+
401+
for hook_definition in plugin_manager.store['plugins_to_load']['hook-definitions']:
402+
plugin_manager.register_hook_definition(hook_definition)
403+
404+
for hook in plugin_manager.store['plugins_to_load']['hooks']:
405+
plugin_manager.register_hook(hook)
406+
plugin_manager.post_hook()
407+
408+
for driver in plugin_manager.store['plugins_to_load']['drivers']:
409+
plugin_manager.register_driver(driver)
410+
plugin_manager.load_sync_components_global()
411+
366412

367413
async def shutdown():
368414
pass

api/data/opa/main.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,23 @@
11
import sys
22

3+
from typing import Any
4+
35
from starlette.middleware.cors import CORSMiddleware
46
from fastapi import Depends, FastAPI, Header, HTTPException
57

6-
from opa import config, init_configuration
8+
from opa import config, init_configuration, state
79
from opa.core import plugin
810

911
app: FastAPI
1012

11-
1213
async def plugin_startup():
1314
"""
1415
This function is called after app is available (via on_startup)
1516
"""
1617
await plugin.startup(app)
1718

1819

19-
def start_app():
20+
def start_api():
2021
global app
2122
init_configuration()
2223

@@ -43,6 +44,17 @@ def start_app():
4344

4445
return app
4546

47+
def start_worker():
48+
global celery
49+
init_configuration()
50+
plugin.startup_worker()
51+
52+
# We must export main.celery for the worker to be happy
53+
celery = plugin.plugin_manager.optional_components['celery'].instance
4654

4755
if 'uvicorn' in sys.argv[0]:
48-
start_app()
56+
state['runner'] = 'uvicorn'
57+
start_api()
58+
elif 'celery' in sys.argv[0]:
59+
state['runner'] = 'celery'
60+
start_worker()

api/data/opa/plugins/dev-helpers.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,8 @@
1010

1111
if config.PTVSD:
1212
import ptvsd
13+
from opa import state
1314

14-
ptvsd.enable_attach(('0.0.0.0', 5678))
15+
# Wont work in celery mode...
16+
if state['runner'] == 'uvicorn':
17+
ptvsd.enable_attach(('0.0.0.0', 5678))

api/data/opa/plugins/driver_celery.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@ class celery_setup(Hook):
1414
name = 'driver.celery.setup'
1515
order = -1
1616

17-
def run(self, celery_app):
18-
print('running hook')
19-
celery_app.conf.task_routes = {"worker.celery_worker.test_celery": "test-queue"}
20-
celery_app.conf.update(task_track_started=True)
17+
def run(self, celery_app, task_candidates):
18+
celery_app.autodiscover_tasks(task_candidates)
2119
return celery_app
2220

2321

@@ -35,6 +33,6 @@ def connect(self):
3533
"tasks", backend=self.opts.BACKEND_URL, broker=self.opts.BROKER_URL,
3634
)
3735

38-
celery_app = self.pm.call('driver.celery.setup', celery_app)
36+
celery_app = self.pm.call('driver.celery.setup', celery_app=celery_app, task_candidates=self.pm.store['task_candidates'])
3937

4038
self.instance = celery_app

api/data/opa/tests/conftest.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,33 +20,33 @@ def fix_syspath():
2020

2121
@pytest.fixture(scope="function")
2222
def app():
23-
with TestClient(main.start_app()) as client:
23+
with TestClient(main.start_api()) as client:
2424
yield client
2525

2626

2727
@pytest.fixture(scope="function")
2828
def app_c1(monkeypatch):
2929
monkeypatch.setenv('ENV', 'testing_1')
30-
with TestClient(main.start_app()) as client:
30+
with TestClient(main.start_api()) as client:
3131
yield client
3232

3333

3434
@pytest.fixture(scope="function")
3535
def app_c2(monkeypatch):
3636
monkeypatch.setenv('ENV', 'testing_2')
37-
with TestClient(main.start_app()) as client:
37+
with TestClient(main.start_api()) as client:
3838
yield client
3939

4040

4141
@pytest.fixture(scope="function")
4242
def app_dev(monkeypatch):
4343
monkeypatch.setenv('ENV', 'dev')
44-
with TestClient(main.start_app()) as client:
44+
with TestClient(main.start_api()) as client:
4545
yield client
4646

4747

4848
@pytest.fixture(scope="function")
4949
def app_examples(monkeypatch):
5050
monkeypatch.setenv('ENV', 'testing_examples')
51-
with TestClient(main.start_app()) as client:
51+
with TestClient(main.start_api()) as client:
5252
yield client

api/data/opa/tests/test_optional_components.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ def test_bogus_mongo(monkeypatch):
1313
monkeypatch.setenv('ENV', 'testing_optional_components_mongo_bogus')
1414

1515
with pytest.raises(OperationFailure, match=r".*Authentication failed.*"):
16-
with TestClient(main.start_app()):
16+
with TestClient(main.start_api()):
1717
pass
1818

1919

@@ -24,20 +24,20 @@ def test_bogus_redis(monkeypatch):
2424
ConnectionError,
2525
match=r".*Error 111 connecting to mongo:6379. Connection refused.*",
2626
):
27-
with TestClient(main.start_app()):
27+
with TestClient(main.start_api()):
2828
pass
2929

3030

3131
def test_nonexist_redis_auto(monkeypatch):
3232
monkeypatch.setenv('ENV', 'testing_optional_components_redis_nonexisting_auto')
3333

34-
with TestClient(main.start_app()):
34+
with TestClient(main.start_api()):
3535
pass
3636

3737

3838
def test_nonexist_redis_required(monkeypatch):
3939
monkeypatch.setenv('ENV', 'testing_optional_components_redis_nonexisting_required')
4040

4141
with pytest.raises(Exception, match=r".*Connect pre-check failed for.*"):
42-
with TestClient(main.start_app()):
42+
with TestClient(main.start_api()):
4343
pass

0 commit comments

Comments
 (0)