Небольшой проект, в котором реализована асинхронная обработка задач через очередь.
Основная идея — разделить приём задач (API) и их выполнение (worker), чтобы система не блокировалась и могла масштабироваться.
- API принимает задачу
- Сохраняет её в PostgreSQL
- Кладёт
job_idв Redis (очередь) - Worker забирает задачу
- Обрабатывает и обновляет статус + результат
- FastAPI
- PostgreSQL
- Redis
- SQLAlchemy
- Docker Compose
docker compose up --buildПосле запуска:
- API: http://127.0.0.1:8002
- Swagger: http://127.0.0.1:8002/docs
curl -X POST http://127.0.0.1:8002/jobs \
-H "Content-Type: application/json" \
-d '{
"type": "text_analysis",
"payload": {
"text": "hello world hello fastapi redis"
},
"priority": 1
}'curl -X POST http://127.0.0.1:8002/jobs \
-H "Content-Type: application/json" \
-d '{
"type": "text_analysis",
"payload": {},
"priority": 1,
"max_retries": 3
}'curl http://127.0.0.1:8002/jobs/{job_id}- pending
- queued
- processing
- done
- failed
Если задача падает:
- увеличивается
retries - если
retries < max_retries→ задача снова отправляется в очередь - если достигнут лимит →
failed
Очередь реализована через Redis Sorted Set.
- чем меньше значение
priority, тем выше приоритет - worker всегда берёт задачу с наивысшим приоритетом
docker compose up --scale worker=3Несколько worker’ов обрабатывают задачи параллельно.
docker compose logs -f worker- реализовал очередь через Redis
- разделил API и worker
- добавил retry механизм
- сделал приоритетную очередь
- настроил Docker Compose
- реализовал масштабирование worker’ов
- дебажил реальные проблемы (порты, контейнеры, подключения)
- мониторинг (Prometheus / Grafana)
- CI/CD
- аутентификация
- rate limiting
- Kubernetes