Queueing data structure on top of a MongoDB collection.
mongo-taskqueue stores tasks in a single MongoDB collection and exposes a small API for enqueueing, leasing, and completing work. It is designed for simple worker loops that need scheduling, retries, and deduplication without an extra queue service.
- Single-collection queue with priority ordering
- Delayed enqueue and scheduled execution
- Visibility timeouts with optional heartbeat extension
- Retry tracking with optional exponential backoff
- Dedupe keys (string) to avoid duplicate in-flight tasks
- Global and per-key rate limiting
- Dead-letter collection for discarded tasks
- Sync and async APIs
- CLI for common operations
Install:

    pip install mongo-taskqueue

Async support:

    pip install "mongo-taskqueue[async]"

Basic usage:

    from mongotq import get_task_queue

    queue = get_task_queue(
        database_name="app",
        collection_name="jobs",
        host="mongodb://localhost:27017",
        ttl=-1,
    )
    queue.append({"job": "email", "to": "alice"})
    task = queue.next()
    if task:
        try:
            # do work
            queue.on_success(task)
        except Exception as exc:
            queue.on_failure(task, error_message=str(exc))

get_task_queue(...) accepts:
- ttl: seconds a pending task can live before being marked failed (-1 for no expiry)
- max_retries: maximum failure count before discard
- discard_strategy: "keep" or "remove"
- visibility_timeout: lease duration for pending tasks
- retry_backoff_base and retry_backoff_max: exponential backoff controls
- dead_letter_collection: where discarded tasks are copied when using discard_strategy="remove"
- meta_collection: metadata collection for rate limits
- rate_limit_per_second: global and per-key dequeue limits
- ensure_indexes: create indexes if missing
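This README does not spell out how retry_backoff_base and retry_backoff_max combine. A common scheme, assumed here purely for illustration, doubles the delay after each failure and caps it. The function name and the formula are hypothetical and are not taken from mongo-taskqueue.

```python
def backoff_delay(retries: int, base: float, cap: float) -> float:
    """Delay in seconds before the next attempt (hypothetical formula).

    Assumes delay = base * 2 ** (retries - 1), capped at `cap`.
    """
    if retries < 1:
        return 0.0
    return min(base * 2 ** (retries - 1), cap)


# Delays after the first five failures with base=2s and cap=20s.
delays = [backoff_delay(n, base=2.0, cap=20.0) for n in range(1, 6)]
print(delays)  # [2.0, 4.0, 8.0, 16.0, 20.0]
```

The cap keeps a task that fails repeatedly from being pushed arbitrarily far into the future.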
Statuses are available as constants:
STATUS_NEW, STATUS_PENDING, STATUS_FAILED, STATUS_SUCCESSFUL.
Common transitions:
- next() leases the next available task and marks it pending
- on_success(task) marks it successful
- on_failure(task) increments retries and may reschedule or fail
- on_retry(task) releases a leased task back to new
- refresh() requeues expired leases, expires pending tasks (TTL), and discards tasks over retry limits
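A worker loop built on these transitions might look like the sketch below. It relies only on next(), on_success(task), and on_failure(task, error_message=...) as shown in this README. The names run_worker, handler, and max_tasks are hypothetical, the dict-shaped tasks are an assumption, and FakeQueue is an in-memory stand-in that exists only so the example runs without MongoDB.

```python
from typing import Any, Callable, Optional


def run_worker(queue: Any, handler: Callable[[Any], None], max_tasks: int) -> int:
    """Lease up to `max_tasks` tasks and report each outcome to the queue.

    Returns the number of tasks processed; stops early when next()
    returns nothing.
    """
    processed = 0
    while processed < max_tasks:
        task = queue.next()
        if not task:
            break
        try:
            handler(task)
        except Exception as exc:
            queue.on_failure(task, error_message=str(exc))
        else:
            queue.on_success(task)
        processed += 1
    return processed


class FakeQueue:
    """In-memory stand-in for a task queue (illustration only)."""

    def __init__(self, tasks):
        self.tasks = list(tasks)
        self.succeeded = []
        self.failed = []

    def next(self) -> Optional[dict]:
        return self.tasks.pop(0) if self.tasks else None

    def on_success(self, task):
        self.succeeded.append(task)

    def on_failure(self, task, error_message=""):
        self.failed.append((task, error_message))


def handler(task):
    # Simulate one job type that always fails.
    if task["job"] == "bad":
        raise ValueError("cannot process")


fake = FakeQueue([{"job": "email"}, {"job": "bad"}])
count = run_worker(fake, handler, max_tasks=10)
```

Using try/except/else keeps a failure inside on_success from being reported as a task failure.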
Delayed and scheduled tasks:

    queue.append({"job": "later"}, delay_seconds=30)
    queue.append({"job": "at-time"}, scheduled_at=1710000000.0)

Dedupe keys are indexed for string values only.

    queue.append({"job": "once"}, dedupe_key="job-123")

Duplicate inserts return False.
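To illustrate the dedupe contract (not the library's storage logic), here is an in-memory sketch in which append returns False while a task with the same string key is still in flight. DedupeQueue and its complete method are hypothetical, and the rule that a key becomes reusable once its task finishes is an assumption based on the "in-flight" wording in the feature list.

```python
from typing import Optional


class DedupeQueue:
    """In-memory model of dedupe-key behavior (illustration only)."""

    def __init__(self):
        self.tasks = []
        self.in_flight_keys = set()

    def append(self, payload: dict, dedupe_key: Optional[str] = None) -> bool:
        # Only string keys take part in deduplication.
        if isinstance(dedupe_key, str):
            if dedupe_key in self.in_flight_keys:
                return False  # duplicate insert is rejected
            self.in_flight_keys.add(dedupe_key)
        self.tasks.append((payload, dedupe_key))
        return True

    def complete(self, dedupe_key: str) -> None:
        # Assumption: a finished task frees its key for reuse.
        self.in_flight_keys.discard(dedupe_key)


q = DedupeQueue()
first = q.append({"job": "once"}, dedupe_key="job-123")   # accepted
second = q.append({"job": "once"}, dedupe_key="job-123")  # rejected
q.complete("job-123")
third = q.append({"job": "once"}, dedupe_key="job-123")   # accepted again
```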
Set rate_limit_per_second to throttle dequeue frequency. When a task has a
rateLimitKey, the per-key limit is enforced in addition to the global limit.
If a per-key limit is hit after a task is leased, the task is released and
scheduled slightly in the future.
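The interplay of a global and a per-key limit can be sketched with a minimum-interval check. This is a simplified model under stated assumptions, not the library's algorithm: IntervalLimiter is hypothetical, it takes two separate rates so the per-key effect is visible, and it only decides allow or deny rather than releasing and rescheduling a leased task.

```python
from typing import Dict, Optional


class IntervalLimiter:
    """Allow an event only if enough time has passed globally and per key."""

    def __init__(self, global_per_second: float, key_per_second: float):
        self.global_interval = 1.0 / global_per_second
        self.key_interval = 1.0 / key_per_second
        self.last_global: Optional[float] = None
        self.last_by_key: Dict[str, float] = {}

    def allow(self, now: float, key: Optional[str] = None) -> bool:
        # Global limit applies to every event.
        if self.last_global is not None and now - self.last_global < self.global_interval:
            return False
        # Per-key limit applies in addition to the global one.
        if key is not None:
            last = self.last_by_key.get(key)
            if last is not None and now - last < self.key_interval:
                return False
            self.last_by_key[key] = now
        self.last_global = now
        return True


limiter = IntervalLimiter(global_per_second=2, key_per_second=1)
events = [(0.0, "a"), (0.25, "b"), (0.5, "a"), (0.5, "b"), (1.0, "a")]
results = [limiter.allow(now, key) for now, key in events]
print(results)  # [True, False, False, True, True]
```

The second event is blocked by the global limit, the third by the per-key limit for "a".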
To route discarded tasks to a dead-letter collection:

    queue = get_task_queue(
        database_name="app",
        collection_name="jobs",
        host="mongodb://localhost:27017",
        ttl=-1,
        max_retries=1,
        discard_strategy="remove",
        dead_letter_collection="jobs_dead",
    )

Discarded tasks are copied to the dead-letter collection during refresh().
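As a rough mental model of discard_strategy="remove", the sketch below copies exhausted tasks to a dead-letter list and drops them from the queue. The function name, the "retries" field, and the `>=` comparison against max_retries are all assumptions made for illustration; the real refresh() operates on MongoDB collections.

```python
from typing import Dict, List


def discard_over_limit(
    tasks: List[Dict], dead_letter: List[Dict], max_retries: int
) -> List[Dict]:
    """Copy exhausted tasks to `dead_letter` and return the remaining ones.

    Illustration only; the `>=` threshold is an assumption.
    """
    kept = []
    for task in tasks:
        if task["retries"] >= max_retries:
            dead_letter.append(dict(task))  # copy, then drop from the queue
        else:
            kept.append(task)
    return kept


jobs = [{"job": "a", "retries": 0}, {"job": "b", "retries": 1}]
jobs_dead: List[Dict] = []
jobs = discard_over_limit(jobs, jobs_dead, max_retries=1)
```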
Async usage:

    import asyncio

    from mongotq import AsyncTaskQueue


    async def main():
        queue = AsyncTaskQueue(
            database="app",
            collection="jobs",
            host="mongodb://localhost:27017",
            ttl=-1,
        )
        await queue.append({"job": "async"})
        task = await queue.next()
        if task:
            await queue.on_success(task)


    asyncio.run(main())

The CLI is installed as mongotq.
Environment variables:
- MONGOTQ_HOST (or MONGO_URI)
- MONGOTQ_DATABASE
- MONGOTQ_COLLECTION
- MONGOTQ_TTL
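If you want the same host fallback in your own scripts, a small helper along these lines could work. The precedence (MONGOTQ_HOST first, then MONGO_URI) is an assumption drawn from the order in which the variables are listed, and resolve_host is a hypothetical name, not part of the CLI.

```python
import os
from typing import Mapping, Optional


def resolve_host(env: Mapping[str, str]) -> Optional[str]:
    """Return the MongoDB host from the environment, or None if unset.

    Assumes MONGOTQ_HOST takes precedence over MONGO_URI.
    """
    return env.get("MONGOTQ_HOST") or env.get("MONGO_URI")


# Read from the real process environment.
host = resolve_host(os.environ)
```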
Example:

    mongotq \
      --host mongodb://localhost:27017 \
      --database app \
      --collection jobs \
      append '{"job": "email"}'

Common commands:
- append, next, pop, refresh, size, status
- head, tail, purge, requeue-failed
- heartbeat, dead-letter-count, resolve-anomalies
Tests are designed to run in GitHub Actions. Local runs are skipped unless
GITHUB_ACTIONS=true is set.
MIT. See LICENSE.