A MongoDB adapter for Commanded, providing event sourcing capabilities using MongoDB as the underlying event store.
- Event Sourcing with MongoDB: Store and retrieve events using MongoDB's flexible document model
- Optimistic Concurrency Control: Ensures stream consistency using expected version checks
- Event Subscriptions: Subscribe to all events with automatic checkpoint management
- Snapshot Support: Store and retrieve aggregate snapshots for performance optimization
- Stream Queries: Efficiently query events by stream with forward streaming
- Indexes: Automatically creates optimized indexes for event retrieval
- Connection Pooling: Configurable connection pooling for high-performance applications
Add `commanded_mongodb_adapter` to your list of dependencies in `mix.exs`:
```elixir
def deps do
  [
    {:commanded, "~> 1.4"},
    {:commanded_mongodb_adapter, "~> 0.1.0"},
    {:mongodb_driver, "~> 1.0"},
    {:jason, "~> 1.3"} # For JSON serialization
  ]
end
```

Configure Commanded to use the MongoDB adapter in your `config/config.exs`:
```elixir
config :my_app, MyApp.EventStore,
  adapter: Commanded.EventStore.Adapters.MongoDB,
  serializer: Commanded.Serialization.JsonSerializer,
  url: "mongodb://localhost:27017/eventstore"
```

You can use either a connection URL or individual connection parameters:
```elixir
# Using a connection URL
config :my_app, MyApp.EventStore,
  adapter: Commanded.EventStore.Adapters.MongoDB,
  serializer: Commanded.Serialization.JsonSerializer,
  url: "mongodb://username:password@localhost:27017/eventstore?authSource=admin"
```

```elixir
# Using individual connection parameters
config :my_app, MyApp.EventStore,
  adapter: Commanded.EventStore.Adapters.MongoDB,
  serializer: Commanded.Serialization.JsonSerializer,
  hostname: "localhost",
  port: 27017,
  database: "eventstore",
  username: "user",
  password: "pass",
  auth_source: "admin",
  pool_size: 10,
  timeout: 15_000
```

- `:adapter` - The adapter module (required, set to `Commanded.EventStore.Adapters.MongoDB`)
- `:serializer` - Event serializer module (required, e.g. `Commanded.Serialization.JsonSerializer`)
- `:url` - MongoDB connection URL (alternative to individual parameters)
- `:hostname` - MongoDB hostname (default: `"localhost"`)
- `:port` - MongoDB port (default: `27017`)
- `:database` - Database name (required)
- `:username` - Authentication username (optional)
- `:password` - Authentication password (optional)
- `:auth_source` - Authentication database (optional)
- `:pool_size` - Connection pool size (default: `10`)
- `:timeout` - Query timeout in milliseconds (default: `15000`)
```elixir
defmodule MyApp.EventStore do
  use Commanded.EventStore,
    adapter: Commanded.EventStore.Adapters.MongoDB,
    serializer: Commanded.Serialization.JsonSerializer

  def init(config) do
    {:ok, config}
  end
end
```

Add the event store to your application's supervision tree:

```elixir
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      MyApp.EventStore,
      MyApp.Router
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
```

Append events to a stream:

```elixir
alias Commanded.EventStore.EventData

events = [
  %EventData{
    event_type: "BankAccountOpened",
    data: %{account_number: "12345", initial_balance: 1000},
    metadata: %{user_id: "user123"}
  }
]

:ok = MyApp.EventStore.append_to_stream(
  "account-12345",
  0, # expected version
  events
)
```

Read events back:

```elixir
# Stream events forward
MyApp.EventStore.stream_forward("account-12345", 0)
|> Enum.each(fn event ->
  IO.inspect(event)
end)
```

Subscribe to all streams:

```elixir
{:ok, subscription} = MyApp.EventStore.subscribe_to_all_streams(
  "my_subscription",
  self(),
  start_from: :origin
)

# Receive events
receive do
  {:events, events} ->
    Enum.each(events, &process_event/1)
    MyApp.EventStore.ack_event(subscription, List.last(events))
end
```

Record and read snapshots:

```elixir
alias Commanded.EventStore.SnapshotData

# Record a snapshot
snapshot = %SnapshotData{
  source_uuid: "account-12345",
  source_version: 10,
  source_type: "BankAccount",
  data: %{balance: 5000},
  metadata: %{}
}

:ok = MyApp.EventStore.record_snapshot(snapshot)

# Read a snapshot
{:ok, snapshot} = MyApp.EventStore.read_snapshot("account-12345")
```

The adapter creates and manages the following MongoDB collections:
`events`: Stores all events with global ordering.

Fields:

- `event_id` - UUID for the event
- `stream_uuid` - Stream identifier
- `stream_version` - Version within the stream
- `event_number` - Global event number (unique, sequential)
- `event_type` - Event type string
- `data` - Serialized event data
- `metadata` - Serialized event metadata
- `correlation_id` - Optional correlation ID
- `causation_id` - Optional causation ID
- `created_at` - Timestamp

Indexes:

- `{stream_uuid: 1, stream_version: 1}` - unique
- `{event_number: 1}` - unique
- `{stream_uuid: 1}`
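For illustration, a stored event document might look like this. All values are hypothetical, and the exact encoding of `data`/`metadata` depends on the configured serializer:

```elixir
# Hypothetical event document as stored in the events collection.
# Values are illustrative, not produced by the adapter.
%{
  "event_id" => "8f14e45f-ea8a-4a1c-9d6b-2c3a5e7b9d01",
  "stream_uuid" => "account-12345",
  "stream_version" => 1,
  "event_number" => 42,
  "event_type" => "BankAccountOpened",
  "data" => "{\"account_number\":\"12345\",\"initial_balance\":1000}",
  "metadata" => "{\"user_id\":\"user123\"}",
  "correlation_id" => nil,
  "causation_id" => nil,
  "created_at" => ~U[2024-01-01 12:00:00Z]
}
```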
`streams`: Tracks stream metadata and current versions for concurrency control.

Fields:

- `stream_uuid` - Stream identifier (unique)
- `stream_version` - Current version
- `created_at` - Creation timestamp
- `updated_at` - Last update timestamp

Indexes:

- `{stream_uuid: 1}` - unique
`snapshots`: Stores aggregate snapshots.

Fields:

- `source_uuid` - Aggregate identifier
- `source_version` - Version at snapshot
- `source_type` - Aggregate type
- `data` - Serialized snapshot data
- `metadata` - Serialized snapshot metadata
- `created_at` - Timestamp

Indexes:

- `{source_uuid: 1}` - unique
`subscriptions`: Tracks subscription progress (checkpoints).

Fields:

- `subscription_name` - Subscription identifier (unique)
- `last_seen_event_number` - Last acknowledged event number
- `updated_at` - Last update timestamp

Indexes:

- `{subscription_name: 1}` - unique
Events are stored in the `events` collection with both stream-local versioning (`stream_version`) and global ordering (`event_number`). This dual versioning enables:

- Optimistic concurrency control using stream versions
- Global event ordering for subscriptions and `$all` stream queries
- Efficient stream queries using compound indexes
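The interplay of the two version fields can be illustrated with a simplified append sketch. This is not the adapter's actual implementation: it assumes a started `mongodb_driver` connection (`conn`) and one `streams` document per stream, elides global event-number allocation, and return shapes may vary slightly across driver versions:

```elixir
# Simplified sketch of optimistic concurrency on append (illustrative only).
defmodule AppendSketch do
  def append(conn, stream_uuid, expected_version, event_docs) do
    # Atomically bump the stream version only if it still equals the
    # caller's expected version; no matching document means another
    # writer got there first.
    filter = %{"stream_uuid" => stream_uuid, "stream_version" => expected_version}
    update = %{"$inc" => %{"stream_version" => length(event_docs)}}

    case Mongo.find_one_and_update(conn, "streams", filter, update) do
      {:ok, nil} -> {:error, :wrong_expected_version}
      {:ok, _stream} -> Mongo.insert_many(conn, "events", event_docs)
      error -> error
    end
  end
end
```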
Subscriptions are implemented using a polling mechanism:

- Each subscription runs in its own GenServer process
- Events are polled at regular intervals (default: 1 second)
- Checkpoints are stored in the `subscriptions` collection
- Events are sent to subscribers as `{:events, [events]}` messages
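The polling loop described above can be sketched as a bare GenServer. This is illustrative only; `fetch_events_after/1` is a placeholder standing in for a query on the events collection filtered by `event_number` greater than the checkpoint:

```elixir
# Illustrative polling subscription (not the adapter's real implementation).
defmodule PollingSubscriptionSketch do
  use GenServer

  @poll_interval 1_000

  def init(state) do
    Process.send_after(self(), :poll, @poll_interval)
    {:ok, state}
  end

  def handle_info(:poll, state) do
    state =
      case fetch_events_after(state.last_seen) do
        [] ->
          state

        events ->
          # Deliver new events and advance the in-memory checkpoint;
          # the adapter persists it to the subscriptions collection.
          send(state.subscriber, {:events, events})
          %{state | last_seen: last_event_number(events)}
      end

    Process.send_after(self(), :poll, @poll_interval)
    {:noreply, state}
  end

  # Placeholders for the MongoDB queries described in the text.
  defp fetch_events_after(_last_seen), do: []
  defp last_event_number(events), do: List.last(events)["event_number"]
end
```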
The adapter uses MongoDB's unique indexes to ensure:
- Stream versioning - Each event in a stream has a unique version
- Global ordering - Each event has a unique global event number
- Atomic appends - MongoDB's atomic operations ensure consistency
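These guarantees rest on the unique indexes listed earlier. As a sketch, they could be created by hand with `mongodb_driver` (the adapter does this automatically on startup, so this only shows the shape of the indexes; `conn` is assumed to be a started connection, and the index names are illustrative):

```elixir
# Sketch: the unique indexes backing the consistency guarantees.
indexes = [
  %{key: %{stream_uuid: 1, stream_version: 1}, name: "stream_version_unique", unique: true},
  %{key: %{event_number: 1}, name: "event_number_unique", unique: true}
]

Mongo.create_indexes(conn, "events", indexes)
```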
Limitations:

- Polling-based subscriptions: Unlike some event stores with push-based notifications, this adapter uses polling (configurable interval)
- No built-in clustering: For clustered deployments, ensure MongoDB is configured for replication
- Subscription replay: Large event stores may take time to catch up when starting a new subscription from `:origin`
- Indexes: The adapter creates indexes automatically, but large collections may take time to index

Performance tuning:

- Pool size: Adjust `:pool_size` based on your application's concurrency needs
- Batch sizes: Event streaming uses batches of 1000 events by default
- Polling interval: The subscription polling interval is set to 1 second by default
To run the tests:

```shell
mix deps.get
mix test
```

Start MongoDB using Docker:

```shell
docker run -d -p 27017:27017 --name mongodb mongo:latest
```

To contribute:

- Fork the repository
- Create your feature branch (`git checkout -b feature/my-feature`)
- Commit your changes (`git commit -am 'Add my feature'`)
- Push to the branch (`git push origin feature/my-feature`)
- Create a new Pull Request
MIT License. See LICENSE for details.
- Commanded - CQRS/ES framework for Elixir
- MongoDB Elixir Driver - MongoDB driver for Elixir
- Inspired by the EventStore adapter architecture