Adapters make Workflow infrastructure-agnostic by providing standardised interfaces for different technology stacks. This guide explains how adapters work, which ones are available, and how to choose the right combination for your needs.
💡 Getting Started with SQL? Check out the Database Setup Guide for complete MariaDB/MySQL and PostgreSQL setup instructions with connection strings, schema creation, and performance tuning.
Workflow uses the adapter pattern to decouple core workflow logic from infrastructure concerns. Each adapter type serves a specific purpose:
┌─────────────────────────────────────────────────────────────┐
│ Workflow Core │
├─────────────────────────────────────────────────────────────┤
│ EventStreamer │ RecordStore │ RoleScheduler │ TimeoutStore │
│ Interface │ Interface │ Interface │ Interface │
├─────────────────┼───────────────┼───────────────┼──────────────┤
│ • Kafka │ • PostgreSQL │ • Rink │ • SQL │
│ • Reflex │ • MySQL │ • etcd │ • Redis │
│ • Memory │ • Memory │ • Memory │ • Memory │
└─────────────────┴───────────────┴───────────────┴──────────────┘
Purpose: Publish and consume workflow events for step coordination.
Interface:
type EventStreamer interface {
NewSender(ctx context.Context, topic string) (EventSender, error)
NewReceiver(ctx context.Context, topic string, name string, opts ...ReceiverOption) (EventReceiver, error)
}
type EventSender interface {
Send(ctx context.Context, foreignID string, statusType int, headers map[Header]string) error
Close() error
}
type EventReceiver interface {
Recv(ctx context.Context) (*Event, Ack, error)
Close() error
}Available Adapters:
| Adapter | Use Case | Install |
|---|---|---|
| kafkastreamer | Production event streaming | go get github.com/luno/workflow/adapters/kafkastreamer |
| reflexstreamer | Luno's Reflex event sourcing | go get github.com/luno/workflow/adapters/reflexstreamer |
| memstreamer | Development and testing | Built-in |
Example:
// Kafka for production
kafkaConfig := sarama.NewConfig()
kafkaConfig.Producer.RequiredAcks = sarama.WaitForAll
streamer := kafkastreamer.New([]string{"kafka:9092"}, kafkaConfig)
// Memory for development
streamer := memstreamer.New()Purpose: Persist workflow run state with transactional guarantees.
Interface:
type RecordStore interface {
Store(ctx context.Context, record *Record) error
Lookup(ctx context.Context, runID string) (*Record, error)
Latest(ctx context.Context, workflowName, foreignID string) (*Record, error)
List(ctx context.Context, workflowName string, offsetID int64, limit int, order OrderType, filters ...RecordFilter) ([]Record, error)
// Outbox pattern support
ListOutboxEvents(ctx context.Context, workflowName string, limit int64) ([]OutboxEvent, error)
DeleteOutboxEvent(ctx context.Context, id string) error
}Available Adapters:
| Adapter | Use Case | Install |
|---|---|---|
| sqlstore | Production with SQL databases | go get github.com/luno/workflow/adapters/sqlstore |
| memrecordstore | Development and testing | Built-in |
Requirements:
- ACID Transactions: Required for transactional outbox pattern
- Query Support: Must support filtering, sorting, and pagination
- Schema Management: Must handle workflow schema evolution
Example:
// PostgreSQL for production
db, err := sql.Open("postgres", "postgres://user:pass@host/db")
store := sqlstore.New(db, db, "workflow_records", "workflow_outbox")
// MariaDB/MySQL for production
db, err := sql.Open("mysql", "user:pass@tcp(localhost:3306)/workflow_db?parseTime=true")
store := sqlstore.New(db, db, "workflow_records", "workflow_outbox")
// Memory for development
store := memrecordstore.New()Purpose: Coordinate distributed execution ensuring only one instance of each role runs at a time.
Interface:
type RoleScheduler interface {
Await(ctx context.Context, role string) (context.Context, context.CancelFunc, error)
}Available Adapters:
| Adapter | Use Case | Install |
|---|---|---|
| rinkrolescheduler | Production distributed coordination | go get github.com/luno/workflow/adapters/rinkrolescheduler |
| memrolescheduler | Single-instance development | Built-in |
Example:
// Rink for production distributed systems
rinkConfig := rink.Config{
Endpoints: []string{"rink-1:8080", "rink-2:8080"},
}
scheduler := rinkrolescheduler.New(rinkConfig)
// Memory for single instance
scheduler := memrolescheduler.New()Purpose: Schedule durable timeouts that survive process restarts.
Interface:
type TimeoutStore interface {
Store(ctx context.Context, timeout Timeout) error
List(ctx context.Context, workflowName string, status Status) ([]Timeout, error)
Complete(ctx context.Context, id string) error
}Available Adapters:
| Adapter | Use Case | Install |
|---|---|---|
| sqltimeout | Production durable timeouts | go get github.com/luno/workflow/adapters/sqltimeout |
| memtimeoutstore | Development and testing | Built-in |
Example:
// SQL for production
timeoutStore := sqltimeout.New(db)
// Built with timeout support
wf := b.Build(
eventStreamer, recordStore, roleScheduler,
workflow.WithTimeoutStore(timeoutStore),
)Goal: Fast feedback, easy debugging, minimal setup.
func NewDevelopmentWorkflow() *workflow.Workflow[Order, OrderStatus] {
return b.Build(
memstreamer.New(),
memrecordstore.New(),
memrolescheduler.New(),
// No timeout store needed for development
)
}Characteristics:
- ✅ Zero infrastructure dependencies
- ✅ Fast startup/teardown
- ✅ Perfect for unit tests
- ❌ No persistence across restarts
- ❌ Single instance only
Goal: Production-like environment for integration testing.
func NewStagingWorkflow() *workflow.Workflow[Order, OrderStatus] {
db := setupDatabase()
return b.Build(
kafkastreamer.New(kafkaBrokers, kafkaConfig),
sqlstore.New(db, "workflow_records", "workflow_outbox"),
rinkrolescheduler.New(rinkConfig),
workflow.WithTimeoutStore(sqltimeout.New(db)),
)
}Characteristics:
- ✅ Full production adapters
- ✅ Persistent storage
- ✅ Multi-instance testing
⚠️ Shared infrastructure with other services
Goal: Maximum reliability, scalability, and observability.
func NewProductionWorkflow() *workflow.Workflow[Order, OrderStatus] {
// Production database with connection pooling
db := setupProductionDB()
// Kafka with optimal configuration
kafkaConfig := &sarama.Config{
Producer.RequiredAcks: sarama.WaitForAll,
Producer.Retry.Max: 5,
Consumer.Group.Rebalance.Strategy: sarama.BalanceStrategyRoundRobin,
}
return b.Build(
kafkastreamer.New(kafkaBrokers, kafkaConfig),
sqlstore.New(db, "workflow_records", "workflow_outbox"),
rinkrolescheduler.New(rinkConfig),
workflow.WithTimeoutStore(sqltimeout.New(db)),
workflow.WithDefaultOptions(
workflow.ParallelCount(5),
workflow.ErrBackOff(time.Minute),
workflow.PauseAfterErrCount(3),
),
)
}All adapter implementations should be tested using the provided adapter test suites:
func TestMyEventStreamer(t *testing.T) {
streamer := myeventstreamer.New(config)
adaptertest.TestEventStreamer(t, streamer)
}func TestMyRecordStore(t *testing.T) {
store := myrecordstore.New(config)
adaptertest.RunRecordStoreTest(t, store)
}func TestMyRoleScheduler(t *testing.T) {
scheduler := myrolescheduler.New(config)
adaptertest.RunRoleSchedulerTest(t, scheduler)
}type MyEventStreamer struct {
config Config
}
func (s *MyEventStreamer) NewSender(ctx context.Context, topic string) (workflow.EventSender, error) {
return &MySender{
client: s.client,
topic: topic,
}, nil
}
func (s *MyEventStreamer) NewReceiver(ctx context.Context, topic string, name string, opts ...workflow.ReceiverOption) (workflow.EventReceiver, error) {
return &MyReceiver{
client: s.client,
topic: topic,
groupName: name,
}, nil
}
type MySender struct {
client MyClient
topic string
}
func (s *MySender) Send(ctx context.Context, foreignID string, statusType int, headers map[workflow.Header]string) error {
return s.client.Publish(ctx, s.topic, foreignID, statusType, headers)
}
func (s *MySender) Close() error {
return s.client.Close()
}
type MyReceiver struct {
client MyClient
topic string
groupName string
}
func (r *MyReceiver) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, error) {
msg, err := r.client.PollMessage(ctx, r.topic, r.groupName)
if err != nil {
return nil, nil, err
}
event := &workflow.Event{
ID: msg.ID,
ForeignID: msg.ForeignID,
Type: msg.Type,
// ... map other fields
}
ack := func() error {
return r.client.AckMessage(ctx, msg.ID)
}
return event, ack, nil
}
func (r *MyReceiver) Close() error {
return r.client.Close()
}type MyRecordStore struct {
db MyDatabase
}
func (s *MyRecordStore) Store(ctx context.Context, record *workflow.Record) error {
tx, err := s.db.BeginTx(ctx)
if err != nil {
return err
}
defer tx.Rollback()
// Store the record
if err := s.storeRecord(tx, record); err != nil {
return err
}
// Store outbox events
if err := s.storeOutboxEvents(tx, record.OutboxEvents); err != nil {
return err
}
return tx.Commit()
}
func (s *MyRecordStore) Lookup(ctx context.Context, runID string) (*workflow.Record, error) {
// Query record by run ID
row := s.db.QueryRowContext(ctx, "SELECT ... FROM records WHERE run_id = ?", runID)
return s.scanRecord(row)
}
// Implement other interface methods...kafkaConfig := &sarama.Config{
// Producer settings
Producer.RequiredAcks: sarama.WaitForAll, // Durability
Producer.Retry.Max: 5, // Retries
Producer.Flush.Frequency: 100 * time.Millisecond, // Batching
Producer.Flush.Messages: 100, // Batch size
Producer.Compression: sarama.CompressionSnappy, // Compression
// Consumer settings
Consumer.Offsets.Initial: sarama.OffsetOldest, // Start from beginning
Consumer.Fetch.Min: 1024, // Min fetch size
Consumer.Fetch.Max: 1024 * 1024, // Max fetch size
Consumer.Group.Heartbeat.Interval: 3 * time.Second, // Heartbeat
Consumer.Group.Session.Timeout: 10 * time.Second, // Session timeout
}-- Indexes for workflow_records
CREATE INDEX idx_workflow_records_workflow_foreign ON workflow_records(workflow_name, foreign_id);
CREATE INDEX idx_workflow_records_status ON workflow_records(workflow_name, status);
CREATE INDEX idx_workflow_records_run_state ON workflow_records(run_state);
CREATE INDEX idx_workflow_records_updated_at ON workflow_records(updated_at);
-- Indexes for workflow_outbox
CREATE INDEX idx_workflow_outbox_workflow_created ON workflow_outbox(workflow_name, created_at);
-- Connection pool settings
max_connections = 100
shared_buffers = '256MB'
effective_cache_size = '1GB'// Configure workflow options for memory efficiency
workflow.WithDefaultOptions(
workflow.ParallelCount(5), // Don't over-parallelize
workflow.PollingFrequency(500*time.Millisecond), // Reduce polling frequency
workflow.ErrBackOff(time.Minute), // Longer backoff reduces load
)Some adapters provide additional monitoring capabilities:
import "github.com/luno/workflow/adapters/webui"
// Add HTTP handlers for workflow monitoring
http.Handle("/", webui.HomeHandlerFunc(webui.Paths{
List: "/api/list",
ObjectData: "/api/object",
}))
http.HandleFunc("/api/list", webui.ListHandlerFunc(recordStore))
http.HandleFunc("/api/object", webui.ObjectDataHandlerFunc(recordStore))import "github.com/luno/workflow/adapters/jlog"
// Use structured logging
logger := jlog.New()
wf := b.Build(
eventStreamer, recordStore, roleScheduler,
workflow.WithLogger(logger),
)- Replace adapters in build configuration
- Migrate data if needed (usually not, since development uses memory)
- Update configuration for production settings
- Test thoroughly with production-like load
- Deploy new version with new adapter
- Let existing events drain from old system
- Switch traffic to new system
- Decommission old system
- Schema changes: Use migration scripts
- Data migration: Export/import if changing database types
- Zero-downtime: Use blue/green deployment pattern
- Use production adapters in staging: Catch integration issues early
- Test adapter combinations: Some combinations may have unexpected behavior
- Monitor adapter performance: Each adapter adds latency and failure points
- Keep adapters updated: Security and performance improvements
- Implement health checks: Verify adapter connectivity and performance
- Plan for failure: What happens if an adapter becomes unavailable?
Adapters are the foundation of Workflow's flexibility. Choose the right combination for your needs and scale them as your requirements grow.
- Configuration - Tune adapter and workflow performance
- Deployment - Production deployment patterns
- Monitoring - Monitor adapter health and performance