Skip to content

Commit b5d5cf6

Browse files
author
Lars Solberg
committed
Celery fixes
1 parent ea5f30e commit b5d5cf6

7 files changed

Lines changed: 68 additions & 23 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ api/data/opa/tests/*.log
1212
.coverage
1313
api/data/opa/tests/coverage.xml
1414
.ropeproject
15+
celerybeat-schedule

api/data/opa/core/plugin.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,12 @@ def get_plugin_manager() -> PluginManager:
429429

430430

431431
def get_component(name: str):
432+
try:
432433
return plugin_manager.optional_components[name]
434+
except KeyError:
435+
raise Exception(
436+
f'Component is not defined (yet?). Defined components are: {list(plugin_manager.optional_components.keys())}'
437+
)
433438

434439

435440
def get_instance(name: str):

api/data/opa/plugins/driver_celery.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@ class celery_setup(Hook):
1414
name = 'driver.celery.setup'
1515
order = -1
1616

17-
def run(self, celery_app, task_candidates):
18-
celery_app.autodiscover_tasks(task_candidates)
17+
def run(self, celery_app):
1918
return celery_app
2019

2120

@@ -33,6 +32,7 @@ def connect(self):
3332
"tasks", backend=self.opts.BACKEND_URL, broker=self.opts.BROKER_URL,
3433
)
3534

36-
celery_app = call_hook('driver.celery.setup', celery_app=celery_app, task_candidates=self.pm.store['task_candidates'])
35+
celery_app.autodiscover_tasks(self.pm.store['task_candidates'])
36+
celery_app = call_hook('driver.celery.setup', celery_app=celery_app)
3737

3838
self.instance = celery_app

api/root/etc/services.d/worker/run

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@
44

55
cd /data
66

7-
exec celery worker -A opa.main ${CELERY_PARAMS}
7+
exec celery ${CELERY_COMMAND:-worker} -A opa.main ${CELERY_PARAMS}

examples/docker-compose/celery-task/docker-compose.yaml

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,39 @@ services:
1010
volumes:
1111
- ./plugins:/plugins
1212

13-
worker:
13+
worker-math:
1414
image: opastack/api:latest
1515
environment:
1616
OPA_PLUGIN_PATHS: "/plugins"
17+
OPA_LOGLEVEL: debug
1718
MODE: "worker"
19+
CELERY_PARAMS: -E -Q math
1820
volumes:
1921
- ./plugins:/plugins
2022

23+
worker-counter:
24+
image: opastack/api:latest
25+
environment:
26+
OPA_PLUGIN_PATHS: "/plugins"
27+
MODE: "worker"
28+
CELERY_PARAMS: -E -Q counter
29+
volumes:
30+
- ./plugins:/plugins
31+
32+
# ### untested
33+
# worker-beat:
34+
# image: opastack/api:latest
35+
# environment:
36+
# OPA_PLUGIN_PATHS: "/plugins"
37+
# MODE: "worker"
38+
# CELERY_COMMAND: beat
39+
# volumes:
40+
# - ./plugins:/plugins
41+
2142
redis:
2243
image: "redis:5"
2344

2445
rabbitmq:
2546
image: "rabbitmq:3.8-management"
26-
ports:
27-
- "15672:15672" # Management port
47+
# ports:
48+
# - "15672:15672" # Management port

examples/docker-compose/celery-task/plugins/celerydemo/__init__.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,31 @@
44
class celery_config(Hook):
55
name = 'driver.celery.setup'
66

7-
def run(self, celery_app, task_candidates):
8-
celery_app.conf.task_routes = {"worker.celery_worker.test_celery": "test-queue"}
7+
def run(self, celery_app):
8+
celery_app.conf.task_routes = {
9+
'celerydemo.tasks.counter': 'counter',
10+
'celerydemo.tasks.divider': 'math',
11+
}
912
celery_app.conf.update(task_track_started=True)
10-
celery_app.autodiscover_tasks(task_candidates)
1113
return celery_app
1214

1315

1416
router = get_router()
1517

1618

17-
@router.get("/add/{num1}/{num2}")
18-
async def root(num1: int, num2: int):
19-
from celerydemo.tasks import test_celery
19+
@router.get('/inc')
20+
def counter(num1: int, num2: int):
21+
from celerydemo.tasks import counter
2022

21-
celery = get_instance('celery')
2223
walrus = get_instance('walrus')
23-
test_celery.delay('abc')
24-
count = str(walrus.get('celery'))
25-
return {"message": "Word received", 'count': count}
24+
current_count = str(walrus.get('celery'))
25+
counter.delay()
26+
return {'status': 'queued', 'current_count': current_count}
27+
28+
29+
@router.get('/div/{num1}/{num2}')
30+
def divider(num1: int, num2: int):
31+
from celerydemo.tasks import divider
32+
33+
divider.delay(num1, num2)
34+
return {'status': 'queued'}

examples/docker-compose/celery-task/plugins/celerydemo/tasks.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,21 @@
66

77
celery = get_instance('celery')
88

9+
910
@celery.task
10-
def test_celery(word: str) -> str:
11-
for i in range(1, 4):
11+
def counter() -> str:
12+
13+
for i in range(4):
1214
sleep(1)
13-
current_task.update_state(state='PROGRESS',
14-
meta={'process_percent': i*10})
15+
current_task.update_state(state='PROGRESS', meta={'process_percent': i * 20})
16+
17+
status = get_instance('walrus').incr('celery')
18+
return f'incremented to {status}'
19+
20+
21+
@celery.task
22+
def divider(num1, num2) -> str:
23+
sleep(4)
1524

16-
get_instance('walrus').incr('celery')
17-
return f"test task return {word}...."
25+
result = num1 / num2
26+
return f'The result was {result}'

0 commit comments

Comments
 (0)