venndr/commanded_mongodb_adapter

Commanded MongoDB Adapter

A MongoDB adapter for Commanded, providing event sourcing capabilities using MongoDB as the underlying event store.

Features

  • Event Sourcing with MongoDB: Store and retrieve events using MongoDB's flexible document model
  • Optimistic Concurrency Control: Ensures stream consistency using expected version checks
  • Event Subscriptions: Subscribe to all events with automatic checkpoint management
  • Snapshot Support: Store and retrieve aggregate snapshots for performance optimization
  • Stream Queries: Efficiently query events by stream with forward streaming
  • Indexes: Automatically creates optimized indexes for event retrieval
  • Connection Pooling: Configurable connection pooling for high-performance applications

Installation

Add commanded_mongodb_adapter to your list of dependencies in mix.exs:

def deps do
  [
    {:commanded, "~> 1.4"},
    {:commanded_mongodb_adapter, "~> 0.1.0"},
    {:mongodb_driver, "~> 1.0"},
    {:jason, "~> 1.3"}  # For JSON serialization
  ]
end

Configuration

Configure Commanded to use the MongoDB adapter in your config/config.exs:

config :my_app, MyApp.EventStore,
  adapter: Commanded.EventStore.Adapters.MongoDB,
  serializer: Commanded.Serialization.JsonSerializer,
  url: "mongodb://localhost:27017/eventstore"

Connection Options

You can use either a connection URL or individual connection parameters:

Using URL

config :my_app, MyApp.EventStore,
  adapter: Commanded.EventStore.Adapters.MongoDB,
  serializer: Commanded.Serialization.JsonSerializer,
  url: "mongodb://username:password@localhost:27017/eventstore?authSource=admin"

Using Individual Parameters

config :my_app, MyApp.EventStore,
  adapter: Commanded.EventStore.Adapters.MongoDB,
  serializer: Commanded.Serialization.JsonSerializer,
  hostname: "localhost",
  port: 27017,
  database: "eventstore",
  username: "user",
  password: "pass",
  auth_source: "admin",
  pool_size: 10,
  timeout: 15_000

Configuration Options

  • :adapter - The adapter module (required, set to Commanded.EventStore.Adapters.MongoDB)
  • :serializer - Event serializer module (required, e.g., Commanded.Serialization.JsonSerializer)
  • :url - MongoDB connection URL (alternative to individual parameters)
  • :hostname - MongoDB hostname (default: "localhost")
  • :port - MongoDB port (default: 27017)
  • :database - Database name (required unless it is given in :url)
  • :username - Authentication username (optional)
  • :password - Authentication password (optional)
  • :auth_source - Authentication database (optional)
  • :pool_size - Connection pool size (default: 10)
  • :timeout - Query timeout in milliseconds (default: 15000)
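
In deployed environments these options are often read from the environment at boot rather than hard-coded. The following is a minimal sketch of a config/runtime.exs, assuming the adapter reads its configuration when the event store starts. The environment variable names (MONGODB_HOST, MONGODB_PORT, MONGODB_DATABASE, MONGODB_POOL_SIZE) are hypothetical; only the option keys come from the list above.

```elixir
# config/runtime.exs
import Config

config :my_app, MyApp.EventStore,
  adapter: Commanded.EventStore.Adapters.MongoDB,
  serializer: Commanded.Serialization.JsonSerializer,
  # The variable names below are assumptions; each falls back to the
  # documented default (or the example value) when unset.
  hostname: System.get_env("MONGODB_HOST", "localhost"),
  port: String.to_integer(System.get_env("MONGODB_PORT", "27017")),
  database: System.get_env("MONGODB_DATABASE", "eventstore"),
  pool_size: String.to_integer(System.get_env("MONGODB_POOL_SIZE", "10"))
```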

Usage

Define Your Event Store

defmodule MyApp.EventStore do
  use Commanded.EventStore,
    adapter: Commanded.EventStore.Adapters.MongoDB,
    serializer: Commanded.Serialization.JsonSerializer

  def init(config) do
    {:ok, config}
  end
end

Add to Your Application Supervisor

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      MyApp.EventStore,
      MyApp.Router
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Appending Events

alias Commanded.EventStore.EventData

events = [
  %EventData{
    event_type: "BankAccountOpened",
    data: %{account_number: "12345", initial_balance: 1000},
    metadata: %{user_id: "user123"}
  }
]

:ok = MyApp.EventStore.append_to_stream(
  "account-12345",
  0,  # expected version (0 for a new stream)
  events
)

Reading Events

# Stream events forward
MyApp.EventStore.stream_forward("account-12345", 0)
|> Enum.each(fn event ->
  IO.inspect(event)
end)
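
Reading a stream forward is typically used to rebuild state by folding the events in order. The sketch below illustrates that fold with plain maps standing in for the events the adapter returns. The module name, the "MoneyDeposited" event type, and the atom-keyed data maps are assumptions for illustration; with a JSON serializer the real payloads may be structs or string-keyed maps.

```elixir
defmodule BankAccountProjection do
  @moduledoc "Hypothetical example: folds events into an account state."

  # Each clause applies one event type to the current state.
  def apply_event(state, %{event_type: "BankAccountOpened", data: data}) do
    %{state | account_number: data.account_number, balance: data.initial_balance}
  end

  def apply_event(state, %{event_type: "MoneyDeposited", data: data}) do
    %{state | balance: state.balance + data.amount}
  end

  # Ignore event types this projection does not care about.
  def apply_event(state, _event), do: state

  # Works on any enumerable of events, including a lazy stream.
  def rebuild(events) do
    Enum.reduce(events, %{account_number: nil, balance: 0}, &apply_event(&2, &1))
  end
end

# Stand-in events; in an application these would come from stream_forward/2.
events = [
  %{event_type: "BankAccountOpened", data: %{account_number: "12345", initial_balance: 1000}},
  %{event_type: "MoneyDeposited", data: %{amount: 250}}
]

state = BankAccountProjection.rebuild(events)
IO.inspect(state)
```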

Subscribing to Events

{:ok, subscription} = MyApp.EventStore.subscribe_to_all_streams(
  "my_subscription",
  self(),
  start_from: :origin
)

# Receive events
receive do
  {:events, events} ->
    Enum.each(events, &process_event/1)
    MyApp.EventStore.ack_event(subscription, List.last(events))
end
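
A bare receive handles one batch; a long-running subscriber is usually a GenServer that handles {:events, events} in handle_info/2. The sketch below is one way to structure that, not part of the adapter. The EventCounter module and its :ack option are hypothetical: the acknowledgement is injected as a function so the example runs without a database.

```elixir
defmodule EventCounter do
  @moduledoc "Hypothetical subscriber that counts received events by type."
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  def counts(pid), do: GenServer.call(pid, :counts)

  @impl true
  def init(opts) do
    # :ack is an assumed option: a 1-arity function called with the last
    # event of each batch. It defaults to a no-op.
    ack = Keyword.get(opts, :ack, fn _event -> :ok end)
    {:ok, %{ack: ack, counts: %{}}}
  end

  @impl true
  def handle_info({:events, [_ | _] = events}, state) do
    counts =
      Enum.reduce(events, state.counts, fn event, acc ->
        Map.update(acc, event.event_type, 1, &(&1 + 1))
      end)

    # Acknowledge only after the whole batch has been processed.
    state.ack.(List.last(events))
    {:noreply, %{state | counts: counts}}
  end

  # Ignore empty batches and any other message.
  def handle_info(_other, state), do: {:noreply, state}

  @impl true
  def handle_call(:counts, _from, state), do: {:reply, state.counts, state}
end

# Simulate a delivery by sending the message by hand.
{:ok, pid} = EventCounter.start_link([])
send(pid, {:events, [%{event_type: "BankAccountOpened"}, %{event_type: "BankAccountOpened"}]})
IO.inspect(EventCounter.counts(pid))
```

In an application, such a process would likely create the subscription from init/1 with self() as the subscriber and keep the returned subscription in its state for acknowledgements.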

Snapshots

alias Commanded.EventStore.SnapshotData

# Record a snapshot
snapshot = %SnapshotData{
  source_uuid: "account-12345",
  source_version: 10,
  source_type: "BankAccount",
  data: %{balance: 5000},
  metadata: %{}
}

:ok = MyApp.EventStore.record_snapshot(snapshot)

# Read a snapshot
{:ok, snapshot} = MyApp.EventStore.read_snapshot("account-12345")
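
A snapshot pays off when state is rebuilt from it plus only the events recorded after its version. The sketch below shows that replay step on plain maps. SnapshotReplay and the deposit function are hypothetical helpers; the field names source_version, data, and stream_version follow the structures described in this README.

```elixir
defmodule SnapshotReplay do
  # Hypothetical helper: start from the snapshot's state and apply only
  # the events with a stream version newer than the snapshot.
  def rebuild(snapshot, events, apply_fun) do
    events
    |> Enum.filter(&(&1.stream_version > snapshot.source_version))
    |> Enum.reduce(snapshot.data, fn event, state -> apply_fun.(state, event) end)
  end
end

snapshot = %{source_version: 10, data: %{balance: 5000}}

events = [
  # Version 10 is already reflected in the snapshot and is skipped.
  %{stream_version: 10, data: %{amount: 100}},
  %{stream_version: 11, data: %{amount: 200}},
  %{stream_version: 12, data: %{amount: 300}}
]

deposit = fn state, event -> %{state | balance: state.balance + event.data.amount} end

state = SnapshotReplay.rebuild(snapshot, events, deposit)
IO.inspect(state)
```

In practice the filter can be avoided by reading the stream from the version after the snapshot, assuming the start version passed to stream_forward is inclusive.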

MongoDB Collections

The adapter creates and manages the following MongoDB collections:

events

Stores all events with global ordering.

Fields:

  • event_id - UUID for the event
  • stream_uuid - Stream identifier
  • stream_version - Version within the stream
  • event_number - Global event number (unique, sequential)
  • event_type - Event type string
  • data - Serialized event data
  • metadata - Serialized event metadata
  • correlation_id - Optional correlation ID
  • causation_id - Optional causation ID
  • created_at - Timestamp

Indexes:

  • {stream_uuid: 1, stream_version: 1} - unique
  • {event_number: 1} - unique
  • {stream_uuid: 1}

streams

Tracks stream metadata and current versions for concurrency control.

Fields:

  • stream_uuid - Stream identifier (unique)
  • stream_version - Current version
  • created_at - Creation timestamp
  • updated_at - Last update timestamp

Indexes:

  • {stream_uuid: 1} - unique

snapshots

Stores aggregate snapshots.

Fields:

  • source_uuid - Aggregate identifier
  • source_version - Version at snapshot
  • source_type - Aggregate type
  • data - Serialized snapshot data
  • metadata - Serialized snapshot metadata
  • created_at - Timestamp

Indexes:

  • {source_uuid: 1} - unique

subscriptions

Tracks subscription progress (checkpoints).

Fields:

  • subscription_name - Subscription identifier (unique)
  • last_seen_event_number - Last acknowledged event number
  • updated_at - Last update timestamp

Indexes:

  • {subscription_name: 1} - unique

Architecture

Event Storage

Events are stored in the events collection with both stream-local versioning (stream_version) and global ordering (event_number). This dual versioning enables:

  • Optimistic concurrency control using stream versions
  • Global event ordering for subscriptions and $all stream queries
  • Efficient stream queries using compound indexes

Subscriptions

Subscriptions are implemented using a polling mechanism:

  1. Each subscription runs in its own GenServer process
  2. Events are polled at regular intervals (default: 1 second)
  3. Checkpoints are stored in the subscriptions collection
  4. Events are sent to subscribers as {:events, [events]} messages
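
The four steps above can be sketched as a small GenServer. This is an illustration of the polling pattern only, not the adapter's actual code: PollingSubscription, the :fetch function, and the in-memory checkpoint are assumptions, and a real implementation would read from the events collection and persist the checkpoint to the subscriptions collection after acknowledgement.

```elixir
defmodule PollingSubscription do
  @moduledoc "Illustrative polling subscription; not the adapter's implementation."
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    state = %{
      subscriber: Keyword.fetch!(opts, :subscriber),
      # fetch: fn last_seen -> events with event_number > last_seen end
      fetch: Keyword.fetch!(opts, :fetch),
      interval: Keyword.get(opts, :interval, 1_000),
      last_seen: Keyword.get(opts, :last_seen, 0)
    }

    # Poll once immediately, then at every interval.
    send(self(), :poll)
    {:ok, state}
  end

  @impl true
  def handle_info(:poll, state) do
    events = state.fetch.(state.last_seen)
    if events != [], do: send(state.subscriber, {:events, events})

    # Advance the checkpoint in memory to the highest event number seen.
    last_seen = Enum.reduce(events, state.last_seen, &max(&1.event_number, &2))

    Process.send_after(self(), :poll, state.interval)
    {:noreply, %{state | last_seen: last_seen}}
  end
end

# A fixed list stands in for the events collection.
log = for n <- 1..3, do: %{event_number: n, event_type: "Example"}
fetch = fn last_seen -> Enum.filter(log, &(&1.event_number > last_seen)) end

{:ok, _pid} = PollingSubscription.start_link(subscriber: self(), fetch: fetch, interval: 50)

received =
  receive do
    {:events, events} -> Enum.map(events, & &1.event_number)
  after
    1_000 -> :timeout
  end

IO.inspect(received)
```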

Concurrency Control

The adapter relies on MongoDB's unique indexes and atomic operations to ensure:

  • Stream versioning - Each event in a stream has a unique version
  • Global ordering - Each event has a unique global event number
  • Atomic appends - MongoDB's atomic operations ensure consistency
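
To make the expected-version check concrete, here is an in-memory model of an append that assigns both a stream version and a global event number. It is a sketch of the idea only: InMemoryStore is hypothetical, and the {:error, :wrong_expected_version} tuple follows Commanded's convention and is assumed, not confirmed, for this adapter. In MongoDB the same guarantee would come from the unique indexes rejecting a duplicate version.

```elixir
defmodule InMemoryStore do
  @moduledoc "Illustrative model of expected-version appends; not the adapter's code."

  def new, do: %{streams: %{}, events: [], next_event_number: 1}

  def append(store, stream_uuid, expected_version, new_events) do
    current = Map.get(store.streams, stream_uuid, 0)

    if current != expected_version do
      # Another writer got there first, or the caller's view is stale.
      {:error, :wrong_expected_version}
    else
      {recorded, next_number} =
        new_events
        |> Enum.with_index(1)
        |> Enum.map_reduce(store.next_event_number, fn {event, offset}, number ->
          entry =
            Map.merge(event, %{
              stream_uuid: stream_uuid,
              stream_version: current + offset,
              event_number: number
            })

          {entry, number + 1}
        end)

      {:ok,
       %{
         store
         | streams: Map.put(store.streams, stream_uuid, current + length(new_events)),
           events: store.events ++ recorded,
           next_event_number: next_number
       }}
    end
  end
end

store = InMemoryStore.new()
{:ok, store} = InMemoryStore.append(store, "account-1", 0, [%{event_type: "Opened"}])
{:ok, store} = InMemoryStore.append(store, "account-2", 0, [%{event_type: "Opened"}])

# A writer that still believes "account-1" is at version 0 is rejected.
stale = InMemoryStore.append(store, "account-1", 0, [%{event_type: "Deposited"}])
IO.inspect(stale)

numbering = Enum.map(store.events, &{&1.stream_uuid, &1.stream_version, &1.event_number})
IO.inspect(numbering)
```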

Limitations

  • Polling-based subscriptions: Unlike event stores that push notifications, this adapter polls for new events at a configurable interval (1 second by default), so delivery can lag by up to one interval
  • No built-in clustering: The adapter does not coordinate nodes itself; for clustered deployments, ensure MongoDB is configured for replication
  • Subscription replay: Large event stores may take time to catch up when starting a new subscription from :origin

Performance Considerations

  • Indexes: The adapter creates indexes automatically, but building them on a large existing collection may take time
  • Pool size: Adjust :pool_size based on your application's concurrency needs
  • Batch sizes: Event streaming uses batches of 1000 events by default
  • Polling interval: The subscription polling interval is set to 1 second by default

Development

Running Tests

mix deps.get
mix test

Running with Docker

Start MongoDB using Docker:

docker run -d -p 27017:27017 --name mongodb mongo:latest

Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/my-feature)
  3. Commit your changes (git commit -am 'Add my feature')
  4. Push to the branch (git push origin feature/my-feature)
  5. Create a new Pull Request

License

MIT License. See LICENSE for details.

Acknowledgments

Resources
