
queue is a queue and workflow library with pluggable backends and runtime extensions.
Installation
go get github.com/goforj/queue

Quick Start
import (
"context"
"fmt"
"github.com/goforj/queue"
)
func main() {
q, _ := queue.NewWorkerpool(
queue.WithWorkers(2), // optional; default: runtime.NumCPU() (min 1)
)
type EmailPayload struct {
To string `json:"to"`
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
var payload EmailPayload
_ = m.Bind(&payload)
fmt.Println("send to", payload.To)
return nil
})
_ = q.StartWorkers(context.Background())
defer q.Shutdown(context.Background())
_, _ = q.Dispatch(
queue.NewJob("emails:send").
Payload(EmailPayload{To: "[email protected]"}),
)
}

Drivers
| Driver / Backend | Mode | Notes | Durable | Async | Delay | Unique | Backoff | Timeout | Native Stats | Queue Admin |
|---|---|---|---|---|---|---|---|---|---|---|
| null | Drop-only | Discards dispatched jobs; useful for disabled queue modes and smoke tests. | - | - | - | - | - | - | - | - |
| sync | Inline (caller) | Deterministic local execution with no external infra. | - | - | - | ✓ | - | ✓ | - | - |
| workerpool | In-process pool | Local async behavior without external broker/database. | - | ✓ | ✓ | ✓ | ✓ | ✓ | - | - |
| mysql | SQL durable queue | MySQL driver module (driver/mysqlqueue) built on shared SQL queue core. | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | - |
| postgres | SQL durable queue | Postgres driver module (driver/postgresqueue) built on shared SQL queue core. | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | - |
| sqlite | SQL durable queue | SQLite driver module (driver/sqlitequeue) built on shared SQL queue core. | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | - |
| redis | Redis/Asynq | Production Redis backend (Asynq semantics). | ✓ | ✓ | ✓ | ✓ | - | ✓ | ✓ | ✓ |
| nats | Broker target | NATS transport with queue-subject routing. | - | ✓ | ✓ | ✓ | ✓ | ✓ | - | - |
| sqs | Broker target | AWS SQS transport with endpoint overrides for localstack/testing. | - | ✓ | ✓ | ✓ | ✓ | ✓ | - | - |
| rabbitmq | Broker target | RabbitMQ transport and worker consumption. | - | ✓ | ✓ | ✓ | ✓ | ✓ | - | - |
SQL-backed queues (sqlite, mysql, postgres) are durable and convenient, but they trade throughput for operational simplicity. They default to 1 worker, and increasing concurrency may require DB tuning (indexes, connection pool, lock contention). Prefer broker-backed drivers for higher-throughput workloads.

Queue Admin status: the cross-driver admin contract is defined in core (ListJobs, RetryJob, CancelJob, DeleteJob, ClearQueue, QueueHistory), but full queue admin operations are currently implemented only for Redis. Other drivers return ErrQueueAdminUnsupported for unsupported admin actions.
Driver constructor quick examples
Use root constructors for in-process backends, and driver-module constructors for external backends. See the Driver Constructors API section below for full constructor shapes (New(...) and NewWithConfig(...)). Driver backends live in separate packages so applications only import/link the optional backend dependencies they actually use (smaller builds, less dependency overhead, cleaner deploys).
package main
import (
"github.com/goforj/queue"
"github.com/goforj/queue/driver/mysqlqueue"
"github.com/goforj/queue/driver/natsqueue"
"github.com/goforj/queue/driver/postgresqueue"
"github.com/goforj/queue/driver/rabbitmqqueue"
"github.com/goforj/queue/driver/redisqueue"
"github.com/goforj/queue/driver/sqlitequeue"
"github.com/goforj/queue/driver/sqsqueue"
)
func main() {
queue.NewSync() // in-process sync
queue.NewWorkerpool() // in-process worker pool
queue.NewNull() // drop-only / disabled mode
sqlitequeue.New("file:queue.db?_busy_timeout=5000") // SQL durable queue (SQLite)
mysqlqueue.New("user:pass@tcp(127.0.0.1:3306)/app") // SQL durable queue (MySQL)
postgresqueue.New("postgres://user:[email protected]:5432/app?sslmode=disable") // SQL durable queue (Postgres)
redisqueue.New("127.0.0.1:6379") // Redis/Asynq
natsqueue.New("nats://127.0.0.1:4222") // NATS
sqsqueue.New("us-east-1") // SQS
rabbitmqqueue.New("amqp://guest:[email protected]:5672/") // RabbitMQ
}

Quick Start (Advanced: Workflows)
import (
"context"
"github.com/goforj/queue"
)
type EmailPayload struct {
ID int `json:"id"`
}
func main() {
q, _ := queue.NewWorkerpool()
q.Register("reports:generate", func(ctx context.Context, m queue.Message) error {
return nil
})
q.Register("reports:upload", func(ctx context.Context, m queue.Message) error {
var payload EmailPayload
if err := m.Bind(&payload); err != nil {
return err
}
return nil
})
q.Register("users:notify_report_ready", func(ctx context.Context, m queue.Message) error {
return nil
})
_ = q.StartWorkers(context.Background())
defer q.Shutdown(context.Background())
chainID, _ := q.Chain(
// 1) generate report data
queue.NewJob("reports:generate").Payload(map[string]any{"report_id": "rpt_123"}),
// 2) upload report artifact after generate succeeds
queue.NewJob("reports:upload").Payload(EmailPayload{ID: 123}),
// 3) notify user only after upload succeeds
queue.NewJob("users:notify_report_ready").Payload(map[string]any{"user_id": 123}),
).OnQueue("critical").Dispatch(context.Background())
_ = chainID
}

Run as a Worker Service
Use Run(ctx) for long-lived workers: it starts processing, waits for shutdown signals, and performs graceful termination.
import (
"context"
"log"
"os/signal"
"syscall"
"github.com/goforj/queue"
)
func main() {
q, _ := queue.NewWorkerpool()
// Register handlers before starting workers.
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
return nil
})
// Create a context that is canceled on SIGINT/SIGTERM (Ctrl+C, container stop).
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
// Run starts workers, blocks until ctx is canceled, then gracefully shuts down.
if err := q.Run(ctx); err != nil {
log.Fatal(err)
}
}

Core Concepts
Job: Typed work unit for app handlers.
_, _ = q.Dispatch(
queue.NewJob("emails:send").Payload(EmailPayload{To: "[email protected]"}),
)

Chain: Ordered workflow (A then B then C).
_, _ = q.Chain(
queue.NewJob("reports:generate"),
queue.NewJob("reports:upload"),
queue.NewJob("users:notify_report_ready"),
).Dispatch(context.Background())

Batch: Parallel workflow with callbacks.
_, _ = q.Batch(
queue.NewJob("emails:send"),
queue.NewJob("sms:send"),
).Then(queue.NewJob("notifications:done")).Dispatch(context.Background())

Middleware: Cross-cutting execution policy.
q, _ := queue.New(
queue.Config{Driver: queue.DriverWorkerpool},
queue.WithMiddleware(audit, skipMaintenance, fatalValidation),
)

Events: Lifecycle hooks and observability.
q, _ := queue.New(
queue.Config{Driver: queue.DriverWorkerpool, Observer: queue.NewStatsCollector()},
)

Backends: Driver/runtime transport selection.
q, _ := queue.NewWorkerpool()
rq, _ := redisqueue.New("127.0.0.1:6379")
_, _ = q, rq

Job builder options
// Define a struct for your job payload.
type EmailPayload struct {
ID int `json:"id"`
To string `json:"to"`
}
// Fluent builder pattern for job options.
job := queue.NewJob("emails:send").
// Payload can be bytes, structs, maps, or JSON-marshalable values.
// Default payload is empty.
Payload(EmailPayload{ID: 123, To: "[email protected]"}).
// OnQueue sets the queue name.
// Default is empty; broker-style drivers expect an explicit queue.
OnQueue("default").
// Timeout sets per-job execution timeout.
// Default is unset; some drivers may apply driver/runtime defaults.
Timeout(20 * time.Second).
// Retry sets max retries.
// Default is 0, which means one total attempt.
Retry(3).
// Backoff sets retry delay.
// Default is unset; Redis dispatch returns ErrBackoffUnsupported.
Backoff(500 * time.Millisecond).
// Delay schedules first execution in the future.
// Default is 0 (run immediately).
Delay(2 * time.Second).
// UniqueFor deduplicates Type+Payload for a TTL window.
// Default is 0 (no dedupe).
UniqueFor(45 * time.Second)
// Dispatch the job to the queue.
_, _ = q.Dispatch(job)
// In handlers, use Bind to decode payload into a struct.
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
var payload EmailPayload
if err := m.Bind(&payload); err != nil {
return err
}
return nil
})

Benchmarks
Run local + integration-backed benchmarks (requires Docker/testcontainers):
cd docs && GOWORK=off INTEGRATION_BACKEND=all GOCACHE=/tmp/queue-gocache go test -tags=benchrender ./bench -run '^TestRenderBenchmarks$'

Latency (ns/op)
Throughput (ops/s)
Allocated Bytes (B/op)
Allocations (allocs/op)
Tables
| Class | Driver | ns/op | ops/s | B/op | allocs/op |
|---|---|---|---|---|---|
| External | nats | 774 | 1291823 | 1258 | 13 |
| External | redis | 95295 | 10494 | 2113 | 33 |
| External | rabbitmq | 165780 | 6032 | 1882 | 57 |
| External | sqlite | 202380 | 4941 | 1931 | 47 |
| External | postgres | 1056731 | 946 | 3809 | 78 |
| External | sqs | 1873911 | 534 | 94784 | 1082 |
| External | mysql | 2286406 | 437 | 3303 | 62 |
| Local | null | 37 | 26673780 | 128 | 1 |
| Local | sync | 282 | 3539823 | 408 | 6 |
| Local | workerpool | 650 | 1538462 | 456 | 7 |
Middleware
Use queue.WithMiddleware(...) to apply cross-cutting workflow behavior to workflow job execution (logging, filtering, and error policy).
Common patterns:
- wrap handler execution (before/after logging, timing, tracing)
- skip jobs conditionally (maintenance mode, feature flags)
- convert matched errors into terminal failures (no retry)
var errValidation = errors.New("validation failed")
maintenanceMode := false
audit := queue.MiddlewareFunc(func(ctx context.Context, m queue.Message, next queue.Next) error {
log.Printf("start job=%s", m.JobType)
err := next(ctx, m)
log.Printf("done job=%s err=%v", m.JobType, err)
return err
})
skipMaintenance := queue.SkipWhen{
Predicate: func(context.Context, queue.Message) bool {
return maintenanceMode
},
}
fatalValidation := queue.FailOnError{
When: func(err error) bool {
return errors.Is(err, errValidation)
},
}
q, _ := queue.New(
queue.Config{Driver: queue.DriverWorkerpool},
queue.WithMiddleware(audit, skipMaintenance, fatalValidation),
)
_ = q

Observability
Use queue.Observer implementations to capture normalized runtime events across drivers.
collector := queue.NewStatsCollector()
observer := queue.MultiObserver(
collector,
queue.ObserverFunc(func(event queue.Event) {
_ = event.Kind
}),
)
q, _ := queue.New(queue.Config{
Driver: queue.DriverWorkerpool,
Observer: observer,
})
_ = q

Distributed counters and source of truth
- StatsCollector counters are process-local and event-driven.
- In multi-process deployments, aggregate metrics externally (OTel/Prometheus/etc.).
- Prefer backend-native stats when available.
- queue.SupportsNativeStats(q) indicates native driver snapshot support.
- queue.Snapshot(ctx, q, collector) merges native + collector where possible.
Compose observers
events := make(chan queue.Event, 100)
collector := queue.NewStatsCollector()
observer := queue.MultiObserver(
collector,
queue.ChannelObserver{
Events: events,
DropIfFull: true,
},
queue.ObserverFunc(func(e queue.Event) {
_ = e
}),
)
q, _ := queue.New(queue.Config{
Driver: queue.DriverWorkerpool,
Observer: observer,
})
_ = q

Kitchen sink event logging (runtime + workflow)
Runnable example: examples/observeall/main.go
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
runtimeObserver := queue.ObserverFunc(func(event queue.Event) {
attemptInfo := fmt.Sprintf("attempt=%d/%d", event.Attempt, event.MaxRetry+1)
jobInfo := fmt.Sprintf("job=%s key=%s queue=%s driver=%s", event.JobType, event.JobKey, event.Queue, event.Driver)
switch event.Kind {
case queue.EventEnqueueAccepted:
logger.Info("Accepted dispatch", "msg", fmt.Sprintf("Accepted %s", jobInfo), "scheduled", event.Scheduled, "at", event.Time.Format(time.RFC3339Nano))
case queue.EventEnqueueRejected:
logger.Error("Dispatch failed", "msg", fmt.Sprintf("Rejected %s", jobInfo), "error", event.Err)
case queue.EventEnqueueDuplicate:
logger.Warn("Skipped duplicate job", "msg", fmt.Sprintf("Duplicate %s", jobInfo))
case queue.EventEnqueueCanceled:
logger.Warn("Canceled dispatch", "msg", fmt.Sprintf("Canceled %s", jobInfo), "error", event.Err)
case queue.EventProcessStarted:
logger.Info("Started processing job", "msg", fmt.Sprintf("Started %s (%s)", jobInfo, attemptInfo), "at", event.Time.Format(time.RFC3339Nano))
case queue.EventProcessSucceeded:
logger.Info("Processed job", "msg", fmt.Sprintf("Processed %s in %s (%s)", jobInfo, event.Duration, attemptInfo))
case queue.EventProcessFailed:
logger.Error("Processing failed", "msg", fmt.Sprintf("Failed %s after %s (%s)", jobInfo, event.Duration, attemptInfo), "error", event.Err)
case queue.EventProcessRetried:
logger.Warn("Retrying job", "msg", fmt.Sprintf("Retry scheduled for %s (%s)", jobInfo, attemptInfo), "error", event.Err)
case queue.EventProcessArchived:
logger.Error("Archived failed job", "msg", fmt.Sprintf("Archived %s after final failure (%s)", jobInfo, attemptInfo), "error", event.Err)
case queue.EventQueuePaused:
logger.Info("Paused queue", "msg", fmt.Sprintf("Paused queue=%s driver=%s", event.Queue, event.Driver))
case queue.EventQueueResumed:
logger.Info("Resumed queue", "msg", fmt.Sprintf("Resumed queue=%s driver=%s", event.Queue, event.Driver))
default:
logger.Info("Queue event", "msg", fmt.Sprintf("kind=%s %s", event.Kind, jobInfo))
}
})
workflowObserver := queue.WorkflowObserverFunc(func(event queue.WorkflowEvent) {
logger.Info("workflow event",
"kind", event.Kind,
"dispatch_id", event.DispatchID,
"job_id", event.JobID,
"chain_id", event.ChainID,
"batch_id", event.BatchID,
"job_type", event.JobType,
"queue", event.Queue,
"attempt", event.Attempt,
"duration", event.Duration,
"err", event.Err,
)
})
q, _ := queue.New(
queue.Config{
Driver: queue.DriverSync,
Observer: runtimeObserver,
},
queue.WithObserver(workflowObserver),
)
_ = q

Events reference
| Type | EventKind | Meaning |
|---|---|---|
| queue | enqueue_accepted | Job accepted by driver for enqueue. |
| queue | enqueue_rejected | Job enqueue failed. |
| queue | enqueue_duplicate | Duplicate job rejected due to uniqueness key. |
| queue | enqueue_canceled | Context cancellation prevented enqueue. |
| queue | process_started | Worker began processing job. |
| queue | process_succeeded | Handler returned success. |
| queue | process_failed | Handler returned error. |
| queue | process_retried | Driver scheduled retry attempt. |
| queue | process_archived | Job moved to terminal failure state. |
| queue | queue_paused | Queue was paused (driver supports pause). |
| queue | queue_resumed | Queue was resumed. |
| workflow | dispatch_started | Workflow runtime accepted a dispatch request and created a dispatch record. |
| workflow | dispatch_succeeded | Dispatch was successfully enqueued to the underlying queue runtime. |
| workflow | dispatch_failed | Dispatch failed before job execution could start. |
| workflow | job_started | A workflow job handler started execution. |
| workflow | job_succeeded | A workflow job handler completed successfully. |
| workflow | job_failed | A workflow job handler returned an error. |
| workflow | chain_started | A chain workflow was created and started. |
| workflow | chain_advanced | Chain progressed from one node to the next node. |
| workflow | chain_completed | Chain reached terminal success. |
| workflow | chain_failed | Chain reached terminal failure. |
| workflow | batch_started | A batch workflow was created and started. |
| workflow | batch_progressed | Batch state changed as jobs completed/failed. |
| workflow | batch_completed | Batch reached terminal success (or allowed-failure completion). |
| workflow | batch_failed | Batch reached terminal failure. |
| workflow | batch_cancelled | Batch was cancelled before normal completion. |
| workflow | callback_started | Chain/batch callback execution started. |
| workflow | callback_succeeded | Chain/batch callback completed successfully. |
| workflow | callback_failed | Chain/batch callback returned an error. |
Examples
Runnable examples live in the separate examples module (./examples). They are not included when applications import github.com/goforj/queue, which keeps dependency graphs and build/link overhead smaller.
Admin Support
Queue admin APIs are part of the core contract so additional drivers can implement them over time. At this time, full admin operations (ListJobs, RetryJob, CancelJob, DeleteJob, ClearQueue) are Redis-only. Use queue.SupportsQueueAdmin(q) (or handle queue.ErrQueueAdminUnsupported) to gate admin workflows per runtime.
API reference
The API section below is autogenerated; do not edit between the markers.
API Index
API
Admin
CancelJob
CancelJob cancels a job when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
err = queue.CancelJob(context.Background(), q, "job-id")
_ = err

Queue.CancelJob
CancelJob cancels a job via queue admin capability when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
if !queue.SupportsQueueAdmin(q) {
return
}
err = q.CancelJob(context.Background(), "job-id")
_ = errClearQueue
ClearQueue clears queue jobs when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
err = queue.ClearQueue(context.Background(), q, "default")
_ = errQueue.ClearQueue
ClearQueue clears queue jobs via queue admin capability when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
if !queue.SupportsQueueAdmin(q) {
return
}
err = q.ClearQueue(context.Background(), "default")
_ = errDeleteJob
DeleteJob deletes a job when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
err = queue.DeleteJob(context.Background(), q, "default", "job-id")
_ = errQueue.DeleteJob
DeleteJob deletes a job via queue admin capability when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
if !queue.SupportsQueueAdmin(q) {
return
}
err = q.DeleteJob(context.Background(), "default", "job-id")
_ = errHistory
History returns queue history points via queue admin capability when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
points, err := q.History(context.Background(), "default", queue.QueueHistoryHour)
_ = errListJobs
ListJobs lists jobs for a queue and state when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
_, err = queue.ListJobs(context.Background(), q, queue.ListJobsOptions{
Queue: "default",
State: queue.JobStatePending,
})
_ = errQueue.ListJobs
ListJobs lists jobs via queue admin capability when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
_, err = q.ListJobs(context.Background(), queue.ListJobsOptions{
Queue: "default",
State: queue.JobStatePending,
})
_ = errNormalize
Normalize returns a safe options payload with defaults applied.
opts := queue.ListJobsOptions{Queue: "", State: "", Page: 0, PageSize: 1000}
normalized := opts.Normalize()
fmt.Println(normalized.Queue, normalized.State, normalized.Page, normalized.PageSize)
// Output: default pending 1 500

QueueHistory
QueueHistory returns queue history points when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
_, err = queue.QueueHistory(context.Background(), q, "default", queue.QueueHistoryHour)
_ = errRetryJob
RetryJob retries (runs now) a job when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
err = queue.RetryJob(context.Background(), q, "default", "job-id")
_ = errQueue.RetryJob
RetryJob retries (runs now) a job via queue admin capability when supported.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
if !queue.SupportsQueueAdmin(q) {
return
}
err = q.RetryJob(context.Background(), "default", "job-id")
_ = errSinglePointHistory
SinglePointHistory converts a snapshot into a single current-history point. This helper is intended for driver modules that do not expose historical buckets.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Processed: 12, Failed: 1},
},
}
points := queue.SinglePointHistory(snapshot, "default")
fmt.Println(len(points), points[0].Processed, points[0].Failed)
// Output: 1 12 1

SupportsQueueAdmin
SupportsQueueAdmin reports whether queue admin operations are available.
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
fmt.Println(queue.SupportsQueueAdmin(q))
// Output: trueTimelineHistoryFromSnapshot
TimelineHistoryFromSnapshot records queue counters and returns windowed points. This is intended for drivers that don't expose native multi-point history.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Processed: 5, Failed: 1},
},
}
points := queue.TimelineHistoryFromSnapshot(snapshot, "default", queue.QueueHistoryHour)
fmt.Println(len(points) >= 1)
// Output: true

Constructors
queue.New
New creates the high-level Queue API based on Config.Driver.
q, err := queue.New(queue.Config{Driver: queue.DriverWorkerpool})
if err != nil {
return
}
type EmailPayload struct {
ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
var payload EmailPayload
if err := m.Bind(&payload); err != nil {
return err
}
_ = payload
return nil
})
_ = q.WithWorkers(1).StartWorkers(context.Background()) // optional; default: runtime.NumCPU() (min 1)
defer q.Shutdown(context.Background())
_, _ = q.Dispatch(
queue.NewJob("emails:send").
Payload(EmailPayload{ID: 1}).
OnQueue("default"),
)

NewNull
NewNull creates a Queue on the null backend.
q, err := queue.NewNull()
if err != nil {
return
}NewStatsCollector
NewStatsCollector creates an event collector for queue counters.
collector := queue.NewStatsCollector()NewSync
NewSync creates a Queue on the synchronous in-process backend.
q, err := queue.NewSync()
if err != nil {
return
}NewWorkerpool
NewWorkerpool creates a Queue on the in-process workerpool backend.
q, err := queue.NewWorkerpool()
if err != nil {
return
}Job
Backoff
Backoff sets delay between retries.
job := queue.NewJob("emails:send").Backoff(500 * time.Millisecond)Bind
Bind unmarshals job payload JSON into dst.
type EmailPayload struct {
ID int `json:"id"`
To string `json:"to"`
}
job := queue.NewJob("emails:send").Payload(EmailPayload{
ID: 1,
To: "[email protected]",
})
var payload EmailPayload
if err := job.Bind(&payload); err != nil {
return
}
_ = payload.To

Delay
Delay defers execution by duration.
job := queue.NewJob("emails:send").Delay(300 * time.Millisecond)NewJob
NewJob creates a job value with a required job type.
job := queue.NewJob("emails:send")OnQueue
OnQueue sets the target queue name.
job := queue.NewJob("emails:send").OnQueue("critical")Payload
Payload sets job payload from common value types.
Example: payload bytes
jobBytes := queue.NewJob("emails:send").Payload([]byte(`{"id":1}`))Example: payload struct
type Meta struct {
Nested bool `json:"nested"`
}
type EmailPayload struct {
ID int `json:"id"`
To string `json:"to"`
Meta Meta `json:"meta"`
}
jobStruct := queue.NewJob("emails:send").Payload(EmailPayload{
ID: 1,
To: "[email protected]",
Meta: Meta{Nested: true},
})Example: payload map
jobMap := queue.NewJob("emails:send").Payload(map[string]any{
"id": 1,
"to": "[email protected]",
"meta": map[string]any{"nested": true},
})PayloadBytes
PayloadBytes returns a copy of job payload bytes.
job := queue.NewJob("emails:send").Payload([]byte(`{"id":1}`))
payload := job.PayloadBytes()PayloadJSON
PayloadJSON marshals payload as JSON.
job := queue.NewJob("emails:send").PayloadJSON(map[string]int{"id": 1})Retry
Retry sets max retry attempts.
job := queue.NewJob("emails:send").Retry(4)Timeout
Timeout sets per-job execution timeout.
job := queue.NewJob("emails:send").Timeout(10 * time.Second)UniqueFor
UniqueFor enables uniqueness dedupe within the given TTL.
job := queue.NewJob("emails:send").UniqueFor(45 * time.Second)Observability
Active
Active returns active count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Active: 2},
},
}
fmt.Println(snapshot.Active("default"))
// Output: 2

Archived
Archived returns archived count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Archived: 7},
},
}
fmt.Println(snapshot.Archived("default"))
// Output: 7Failed
Failed returns failed count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Failed: 2},
},
}
fmt.Println(snapshot.Failed("default"))
// Output: 2MultiObserver
MultiObserver fans out events to multiple observers.
events := make(chan queue.Event, 2)
observer := queue.MultiObserver(
queue.ChannelObserver{Events: events},
queue.ObserverFunc(func(queue.Event) {}),
)
observer.Observe(queue.Event{Kind: queue.EventEnqueueAccepted})
fmt.Println(len(events))
// Output: 1ChannelObserver.Observe
Observe forwards an event to the configured channel.
ch := make(chan queue.Event, 1)
observer := queue.ChannelObserver{Events: ch}
observer.Observe(queue.Event{Kind: queue.EventProcessStarted, Queue: "default"})
event := <-chObserver.Observe
Observe handles a queue runtime event.
var observer queue.Observer
observer.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "default",
})ObserverFunc.Observe
Observe calls the wrapped function.
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
observer := queue.ObserverFunc(func(event queue.Event) {
logger.Info("queue event",
"kind", event.Kind,
"driver", event.Driver,
"queue", event.Queue,
"job_type", event.JobType,
"attempt", event.Attempt,
"max_retry", event.MaxRetry,
"duration", event.Duration,
"err", event.Err,
)
})
observer.Observe(queue.Event{
Kind: queue.EventProcessSucceeded,
Driver: queue.DriverSync,
Queue: "default",
JobType: "emails:send",
})StatsCollector.Observe
Observe records an event and updates normalized counters.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})Pause
Pause pauses queue consumption for drivers that support it.
q, _ := queue.NewSync()
_ = queue.Pause(context.Background(), q, "default")
snapshot, _ := queue.Snapshot(context.Background(), q, nil)
fmt.Println(snapshot.Paused("default"))
// Output: 1Paused
Paused returns paused count for a queue.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventQueuePaused,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
snapshot := collector.Snapshot()
fmt.Println(snapshot.Paused("default"))
// Output: 1Pending
Pending returns pending count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Pending: 3},
},
}
fmt.Println(snapshot.Pending("default"))
// Output: 3Processed
Processed returns processed count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Processed: 11},
},
}
fmt.Println(snapshot.Processed("default"))
// Output: 11StatsSnapshot.Queue
Queue returns queue counters for a queue name.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
snapshot := collector.Snapshot()
counters, ok := snapshot.Queue("default")
fmt.Println(ok, counters.Pending)
// Output: true 1Queues
Queues returns sorted queue names present in the snapshot.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "critical",
Time: time.Now(),
})
snapshot := collector.Snapshot()
names := snapshot.Queues()
fmt.Println(len(names), names[0])
// Output: 1 criticalReady
Ready validates backend readiness for the provided queue runtime.
q, _ := queue.NewSync()
fmt.Println(queue.Ready(context.Background(), q) == nil)
// true

Resume
Resume resumes queue consumption for drivers that support it.
q, _ := queue.NewSync()
_ = queue.Pause(context.Background(), q, "default")
_ = queue.Resume(context.Background(), q, "default")
snapshot, _ := queue.Snapshot(context.Background(), q, nil)
fmt.Println(snapshot.Paused("default"))
// Output: 0RetryCount
RetryCount returns retry count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Retry: 1},
},
}
fmt.Println(snapshot.RetryCount("default"))
// Output: 1SafeObserve
SafeObserve delivers an event to an observer and recovers observer panics.
This is an advanced helper intended for driver-module implementations.
Scheduled
Scheduled returns scheduled count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Scheduled: 4},
},
}
fmt.Println(snapshot.Scheduled("default"))
// Output: 4Snapshot
Snapshot returns driver-native stats, falling back to collector data.
q, _ := queue.NewSync()
snapshot, _ := q.Stats(context.Background())
_, ok := snapshot.Queue("default")
fmt.Println(ok)
// Output: trueStatsCollector.Snapshot
Snapshot returns a copy of collected counters.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
collector.Observe(queue.Event{
Kind: queue.EventProcessStarted,
Driver: queue.DriverSync,
Queue: "default",
JobKey: "job-1",
Time: time.Now(),
})
collector.Observe(queue.Event{
Kind: queue.EventProcessSucceeded,
Driver: queue.DriverSync,
Queue: "default",
JobKey: "job-1",
Duration: 12 * time.Millisecond,
Time: time.Now(),
})
snapshot := collector.Snapshot()
counters, _ := snapshot.Queue("default")
throughput, _ := snapshot.Throughput("default")
fmt.Printf("queues=%v\n", snapshot.Queues())
fmt.Printf("counters=%+v\n", counters)
fmt.Printf("hour=%+v\n", throughput.Hour)
// Output:
// queues=[default]
// counters={Pending:0 Active:0 Scheduled:0 Retry:0 Archived:0 Processed:1 Failed:0 Paused:0 AvgWait:0s AvgRun:12ms}
// hour={Processed:1 Failed:0}SupportsNativeStats
SupportsNativeStats reports whether a queue runtime exposes native stats snapshots.
q, _ := queue.NewSync()
fmt.Println(queue.SupportsNativeStats(q))
// Output: trueSupportsPause
SupportsPause reports whether a queue runtime supports Pause/Resume.
q, _ := queue.NewSync()
fmt.Println(queue.SupportsPause(q))
// Output: trueThroughput
Throughput returns rolling throughput windows for a queue name.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventProcessSucceeded,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
snapshot := collector.Snapshot()
throughput, ok := snapshot.Throughput("default")
fmt.Printf("ok=%v hour=%+v day=%+v week=%+v\n", ok, throughput.Hour, throughput.Day, throughput.Week)
// Output: ok=true hour={Processed:1 Failed:0} day={Processed:1 Failed:0} week={Processed:1 Failed:0}

Queue
Batch
Batch creates a batch builder for fan-out workflow execution.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
_, _ = q.Batch(
queue.NewJob("emails:send").Payload(map[string]any{"id": 1}),
queue.NewJob("emails:send").Payload(map[string]any{"id": 2}),
).Name("send-emails").OnQueue("default").Dispatch(context.Background())Chain
Chain creates a chain builder for sequential workflow execution.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("first", func(ctx context.Context, m queue.Message) error { return nil })
q.Register("second", func(ctx context.Context, m queue.Message) error { return nil })
_, _ = q.Chain(
queue.NewJob("first"),
queue.NewJob("second"),
).OnQueue("default").Dispatch(context.Background())Queue.Dispatch
Dispatch enqueues a high-level job using context.Background.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
job := queue.NewJob("emails:send").Payload(map[string]any{"id": 1}).OnQueue("default")
_, _ = q.Dispatch(job)Queue.DispatchCtx
DispatchCtx enqueues a high-level job using the provided context.
Queue.Driver
Driver reports the configured backend driver for the underlying queue runtime.
q, err := queue.NewSync()
if err != nil {
return
}
fmt.Println(q.Driver())
// Output: sync
FindBatch
FindBatch returns current batch state by ID.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
batchID, err := q.Batch(queue.NewJob("emails:send")).Dispatch(context.Background())
if err != nil {
return
}
_, _ = q.FindBatch(context.Background(), batchID)
FindChain
FindChain returns current chain state by ID.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("first", func(ctx context.Context, m queue.Message) error { return nil })
chainID, err := q.Chain(queue.NewJob("first")).Dispatch(context.Background())
if err != nil {
return
}
_, _ = q.FindChain(context.Background(), chainID)
Queue.Pause
Pause pauses consumption for a queue when supported by the underlying driver. See the README "Queue Backends" table for Pause/Resume support and docs/backend-guarantees.md (Capability Matrix) for broader backend differences.
q, err := queue.NewSync()
if err != nil {
return
}
if queue.SupportsPause(q) {
_ = q.Pause(context.Background(), "default")
}
Prune
Prune deletes old workflow state records.
q, err := queue.NewSync()
if err != nil {
return
}
_ = q.Prune(context.Background(), time.Now().Add(-24*time.Hour))
Queue.Ready
Ready validates queue backend readiness for dispatch/worker operation.
q, err := queue.NewSync()
if err != nil {
return
}
fmt.Println(q.Ready(context.Background()) == nil)
// true
Queue.Register
Register binds a handler for a high-level job type.
q, err := queue.NewSync()
if err != nil {
return
}
type EmailPayload struct {
ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
var payload EmailPayload
if err := m.Bind(&payload); err != nil {
return err
}
_ = payload
return nil
})
Queue.Resume
Resume resumes consumption for a queue when supported by the underlying driver.
q, err := queue.NewSync()
if err != nil {
return
}
if queue.SupportsPause(q) {
_ = q.Resume(context.Background(), "default")
}
Run
Run starts worker processing, blocks until ctx is canceled, then gracefully shuts down.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q, err := queue.NewWorkerpool()
if err != nil {
return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
go func() {
time.Sleep(100 * time.Millisecond)
cancel()
}()
_ = q.Run(ctx)
Queue.Shutdown
Shutdown drains workers and closes underlying resources.
q, err := queue.NewWorkerpool()
if err != nil {
return
}
_ = q.StartWorkers(context.Background())
_ = q.Shutdown(context.Background())
Queue.StartWorkers
StartWorkers starts worker processing.
q, err := queue.NewWorkerpool()
if err != nil {
return
}
_ = q.StartWorkers(context.Background())
Stats
Stats returns a normalized snapshot when supported by the underlying driver.
q, err := queue.NewSync()
if err != nil {
return
}
if queue.SupportsNativeStats(q) {
_, _ = q.Stats(context.Background())
}WithClock
WithClock overrides the workflow runtime clock.
q, err := queue.New(
queue.Config{Driver: queue.DriverSync},
queue.WithClock(func() time.Time { return time.Unix(0, 0) }),
)
if err != nil {
return
}WithMiddleware
WithMiddleware appends queue workflow middleware.
mw := queue.MiddlewareFunc(func(ctx context.Context, m queue.Message, next queue.Next) error {
return next(ctx, m)
})
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithMiddleware(mw))
if err != nil {
return
}WithObserver
WithObserver installs a workflow lifecycle observer.
observer := queue.WorkflowObserverFunc(func(event queue.WorkflowEvent) {
_ = event.Kind
})
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithObserver(observer))
if err != nil {
return
}WithStore
WithStore overrides the workflow orchestration store.
var store queue.WorkflowStore
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithStore(store))
if err != nil {
return
}WithWorkers
WithWorkers sets desired worker concurrency before StartWorkers. It applies to high-level queue constructors (for example NewWorkerpool/New/NewSync).
q, err := queue.NewWorkerpool(
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}Queue.WithWorkers
WithWorkers sets desired worker concurrency before StartWorkers.
q, err := queue.NewWorkerpool()
if err != nil {
return
}
q.WithWorkers(4) // optional; default: runtime.NumCPU() (min 1)
Testing
FakeQueue.AssertCount
AssertCount fails when dispatch count is not expected.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertCount(t, 1)FakeQueue.AssertDispatched
AssertDispatched fails when jobType was not dispatched.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertDispatched(t, "emails:send")FakeQueue.AssertDispatchedOn
AssertDispatchedOn fails when jobType was not dispatched on queueName.
fake := queue.NewFake()
_ = fake.Dispatch(
queue.NewJob("emails:send").
OnQueue("critical"),
)
fake.AssertDispatchedOn(t, "critical", "emails:send")FakeQueue.AssertDispatchedTimes
AssertDispatchedTimes fails when jobType dispatch count does not match expected.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertDispatchedTimes(t, "emails:send", 2)FakeQueue.AssertNotDispatched
AssertNotDispatched fails when jobType was dispatched.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertNotDispatched(t, "emails:cancel")FakeQueue.AssertNothingDispatched
AssertNothingDispatched fails when any dispatch was recorded.
fake := queue.NewFake()
fake.AssertNothingDispatched(t)FakeQueue.Dispatch
Dispatch records a typed job payload in-memory using the fake default queue.
fake := queue.NewFake()
err := fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
FakeQueue.DispatchCtx
DispatchCtx submits a typed job payload using the provided context.
fake := queue.NewFake()
ctx := context.Background()
err := fake.DispatchCtx(ctx, queue.NewJob("emails:send").OnQueue("default"))
fmt.Println(err == nil)
// Output: true
FakeQueue.Driver
Driver returns the active queue driver.
fake := queue.NewFake()
driver := fake.Driver()NewFake
NewFake creates a queue fake that records dispatches and provides assertions.
fake := queue.NewFake()
_ = fake.Dispatch(
queue.NewJob("emails:send").
Payload(map[string]any{"id": 1}).
OnQueue("critical"),
)
records := fake.Records()
fmt.Println(len(records), records[0].Queue, records[0].Job.Type)
// Output: 1 critical emails:send
FakeQueue.Ready
Ready validates fake queue readiness.
fake := queue.NewFake()
fmt.Println(fake.Ready(context.Background()) == nil)
// true
FakeQueue.Records
Records returns a copy of all dispatch records.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
records := fake.Records()
fmt.Println(len(records), records[0].Job.Type)
// Output: 1 emails:send
FakeQueue.Register
Register associates a handler with a job type.
fake := queue.NewFake()
fake.Register("emails:send", func(context.Context, queue.Job) error { return nil })FakeQueue.Reset
Reset clears all recorded dispatches.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
fmt.Println(len(fake.Records()))
fake.Reset()
fmt.Println(len(fake.Records()))
// Output:
// 1
// 0
FakeQueue.Shutdown
Shutdown drains running work and releases resources.
fake := queue.NewFake()
err := fake.Shutdown(context.Background())FakeQueue.StartWorkers
StartWorkers starts worker execution.
fake := queue.NewFake()
err := fake.StartWorkers(context.Background())FakeQueue.Workers
Workers sets desired worker concurrency before StartWorkers.
fake := queue.NewFake()
q := fake.Workers(4)
fmt.Println(q != nil)
// Output: true
Driver Constructors
mysqlqueue
mysqlqueue.New
New creates a high-level Queue using the MySQL SQL backend.
q, err := mysqlqueue.New(
"user:pass@tcp(127.0.0.1:3306)/queue?parseTime=true",
queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
return
}mysqlqueue.NewWithConfig
NewWithConfig creates a high-level Queue using an explicit MySQL SQL driver config.
q, err := mysqlqueue.NewWithConfig(
mysqlqueue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
DB: nil, // optional; provide *sql.DB instead of DSN
DSN: "user:pass@tcp(127.0.0.1:3306)/queue?parseTime=true", // optional if DB is set
ProcessingRecoveryGrace: 2 * time.Second, // default if <=0: 2s
ProcessingLeaseNoTimeout: 5 * time.Minute, // default if <=0: 5m
},
queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
return
}natsqueue
natsqueue.New
New creates a high-level Queue using the NATS backend.
q, err := natsqueue.New(
"nats://127.0.0.1:4222",
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}natsqueue.NewWithConfig
NewWithConfig creates a high-level Queue using an explicit NATS driver config.
q, err := natsqueue.NewWithConfig(
natsqueue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
URL: "nats://127.0.0.1:4222", // required
},
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}postgresqueue
postgresqueue.New
New creates a high-level Queue using the Postgres SQL backend.
q, err := postgresqueue.New(
"postgres://user:[email protected]:5432/queue?sslmode=disable",
queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
return
}postgresqueue.NewWithConfig
NewWithConfig creates a high-level Queue using an explicit Postgres SQL driver config.
q, err := postgresqueue.NewWithConfig(
postgresqueue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
DB: nil, // optional; provide *sql.DB instead of DSN
DSN: "postgres://user:[email protected]:5432/queue?sslmode=disable", // optional if DB is set
ProcessingRecoveryGrace: 2 * time.Second, // default if <=0: 2s
ProcessingLeaseNoTimeout: 5 * time.Minute, // default if <=0: 5m
},
queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
return
}rabbitmqqueue
rabbitmqqueue.New
New creates a high-level Queue using the RabbitMQ backend.
q, err := rabbitmqqueue.New(
"amqp://guest:[email protected]:5672/",
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}rabbitmqqueue.NewWithConfig
NewWithConfig creates a high-level Queue using an explicit RabbitMQ driver config.
q, err := rabbitmqqueue.NewWithConfig(
rabbitmqqueue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
URL: "amqp://guest:[email protected]:5672/", // required
},
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}redisqueue
redisqueue.New
New creates a high-level Queue using the Redis backend.
q, err := redisqueue.New(
"127.0.0.1:6379",
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}redisqueue.NewWithConfig
NewWithConfig creates a high-level Queue using an explicit Redis driver config.
q, err := redisqueue.NewWithConfig(
redisqueue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
Addr: "127.0.0.1:6379", // required
Password: "", // optional; default empty
DB: 0, // optional; default 0
ServerLogger: nil, // optional; default backend logger
ServerLogLevel: redisqueue.ServerLogLevelDefault, // optional
},
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}sqlitequeue
sqlitequeue.New
New creates a high-level Queue using the SQLite SQL backend.
q, err := sqlitequeue.New(
"file:queue.db?_busy_timeout=5000",
queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
return
}sqlitequeue.NewWithConfig
NewWithConfig creates a high-level Queue using an explicit SQLite SQL driver config.
q, err := sqlitequeue.NewWithConfig(
sqlitequeue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
DB: nil, // optional; provide *sql.DB instead of DSN
DSN: "file:queue.db?_busy_timeout=5000", // optional if DB is set
ProcessingRecoveryGrace: 2 * time.Second, // default if <=0: 2s
ProcessingLeaseNoTimeout: 5 * time.Minute, // default if <=0: 5m
},
queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
return
}sqsqueue
sqsqueue.New
New creates a high-level Queue using the SQS backend.
q, err := sqsqueue.New(
"us-east-1",
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}sqsqueue.NewWithConfig
NewWithConfig creates a high-level Queue using an explicit SQS driver config.
q, err := sqsqueue.NewWithConfig(
sqsqueue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
Region: "us-east-1", // default if empty: "us-east-1"
Endpoint: "", // optional; set for LocalStack/custom endpoint
AccessKey: "", // optional; static credentials
SecretKey: "", // optional; static credentials
},
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}Testing API
Examples in this section assume they are used inside tests and t is a *testing.T (or testing.TB).
Fake.AssertBatchCount
AssertBatchCount fails if total recorded workflow batch count does not match n.
f := queuefake.New()
_, _ = f.Workflow().Batch(bus.NewJob("a", nil)).Dispatch(context.Background())
f.AssertBatchCount(t, 1)Fake.AssertBatched
AssertBatched fails unless at least one recorded workflow batch matches predicate.
f := queuefake.New()
_, _ = f.Workflow().Batch(bus.NewJob("a", nil), bus.NewJob("b", nil)).Dispatch(context.Background())
f.AssertBatched(t, func(spec bus.BatchSpec) bool { return len(spec.JobTypes) == 2 })Fake.AssertChained
AssertChained fails if no recorded workflow chain matches expected job type order.
f := queuefake.New()
_, _ = f.Workflow().Chain(bus.NewJob("a", nil), bus.NewJob("b", nil)).Dispatch(context.Background())
f.AssertChained(t, []string{"a", "b"})Fake.AssertCount
AssertCount fails when total dispatch count is not expected.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("a"))
_ = q.Dispatch(queue.NewJob("b"))
f.AssertCount(t, 2)Fake.AssertDispatched
AssertDispatched fails when jobType was not dispatched.
f := queuefake.New()
_ = f.Queue().Dispatch(queue.NewJob("emails:send"))
f.AssertDispatched(t, "emails:send")Fake.AssertDispatchedOn
AssertDispatchedOn fails when jobType was not dispatched on queueName.
f := queuefake.New()
_ = f.Queue().Dispatch(queue.NewJob("emails:send").OnQueue("critical"))
f.AssertDispatchedOn(t, "critical", "emails:send")Fake.AssertDispatchedTimes
AssertDispatchedTimes fails when jobType dispatch count does not match expected.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send"))
_ = q.Dispatch(queue.NewJob("emails:send"))
f.AssertDispatchedTimes(t, "emails:send", 2)Fake.AssertNotDispatched
AssertNotDispatched fails when jobType was dispatched.
f := queuefake.New()
f.AssertNotDispatched(t, "emails:send")Fake.AssertNothingBatched
AssertNothingBatched fails if any workflow batch was recorded.
f := queuefake.New()
f.AssertNothingBatched(t)Fake.AssertNothingDispatched
AssertNothingDispatched fails when any dispatch was recorded.
f := queuefake.New()
f.AssertNothingDispatched(t)Fake.AssertNothingWorkflowDispatched
AssertNothingWorkflowDispatched fails when any workflow dispatch was recorded.
f := queuefake.New()
f.AssertNothingWorkflowDispatched(t)Fake.AssertWorkflowDispatched
AssertWorkflowDispatched fails when jobType was not workflow-dispatched.
f := queuefake.New()
_, _ = f.Workflow().Chain(bus.NewJob("a", nil)).Dispatch(context.Background())
f.AssertWorkflowDispatched(t, "a")Fake.AssertWorkflowDispatchedOn
AssertWorkflowDispatchedOn fails when jobType was not workflow-dispatched on queueName.
f := queuefake.New()
_, _ = f.Workflow().Chain(bus.NewJob("a", nil)).OnQueue("critical").Dispatch(context.Background())
f.AssertWorkflowDispatchedOn(t, "critical", "a")Fake.AssertWorkflowDispatchedTimes
AssertWorkflowDispatchedTimes fails when workflow dispatch count for jobType does not match expected.
f := queuefake.New()
wf := f.Workflow()
_, _ = wf.Chain(bus.NewJob("a", nil)).Dispatch(context.Background())
_, _ = wf.Chain(bus.NewJob("a", nil)).Dispatch(context.Background())
f.AssertWorkflowDispatchedTimes(t, "a", 2)Fake.AssertWorkflowNotDispatched
AssertWorkflowNotDispatched fails when jobType was workflow-dispatched.
f := queuefake.New()
f.AssertWorkflowNotDispatched(t, "emails:send")Fake.Count
Count returns the total number of recorded dispatches.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("a"))
_ = q.Dispatch(queue.NewJob("b"))
_ = f.Count()Fake.CountJob
CountJob returns how many times a job type was dispatched.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send"))
_ = q.Dispatch(queue.NewJob("emails:send"))
_ = f.CountJob("emails:send")Fake.CountOn
CountOn returns how many times a job type was dispatched on a queue.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send").OnQueue("critical"))
_ = f.CountOn("critical", "emails:send")queuefake.New
New creates a fake queue harness backed by queue.NewFake().
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
f.AssertDispatched(t, "emails:send")
f.AssertCount(t, 1)Fake.Queue
Queue returns the queue fake to inject into code under test.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send").OnQueue("default"))Fake.Records
Records returns a copy of recorded dispatches.
f := queuefake.New()
_ = f.Queue().Dispatch(queue.NewJob("emails:send"))
records := f.Records()Fake.Reset
Reset clears recorded dispatches.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send"))
f.Reset()
f.AssertNothingDispatched(t)Fake.Workflow
Workflow returns the workflow/orchestration fake for chain/batch assertions.
f := queuefake.New()
wf := f.Workflow()
_, _ = wf.Chain(
bus.NewJob("a", nil),
bus.NewJob("b", nil),
).Dispatch(context.Background())
f.AssertChained(t, []string{"a", "b"})Contributing
Testing
Unit tests (root module):
go test ./...
Integration tests (separate integration module):
go test -tags=integration ./integration/...
Select specific backends with INTEGRATION_BACKEND (comma-separated), for example:
INTEGRATION_BACKEND=sqlite go test -tags=integration ./integration/...
INTEGRATION_BACKEND=redis,rabbitmq go test -tags=integration ./integration/... -count=1
INTEGRATION_BACKEND=all go test -tags=integration ./integration/... -count=1
Matrix status and backend integration notes are tracked in docs/integration-scenarios.md.
