-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathsave_events_into_outbox.py
More file actions
194 lines (152 loc) · 6.1 KB
/
save_events_into_outbox.py
File metadata and controls
194 lines (152 loc) · 6.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
"""
Example: Saving Events with the Outbox Pattern

This example demonstrates how to save events to an outbox repository within
command handlers, implementing the Outbox pattern for reliable event publishing.
It shows how to ensure transactional consistency when publishing events.

Use case: Guaranteeing event persistence in the same transaction as business logic.
Events are saved to an outbox repository within the command handler transaction.
If the command succeeds, events are persisted and can be published later by a
separate process. This prevents event loss and ensures eventual consistency.

================================================================================
HOW TO RUN THIS EXAMPLE
================================================================================
Run the example:
    python examples/save_events_into_outbox.py

The example will:
    - Execute multiple JoinMeetingCommand commands
    - Save events to outbox repository within each command handler
    - Commit transactions after saving events
    - Query events from outbox by topic
    - Display the number of events stored in the outbox

Note: This example uses MockOutboxedEventRepository for demonstration. In production,
you would use a real database-backed outbox repository (e.g., SQLAlchemyOutboxedEventRepository).
================================================================================
WHAT THIS EXAMPLE DEMONSTRATES
================================================================================
1. Outbox Event Registration:
   - Register event types in OutboxedEventMap
   - Map event names to event types for deserialization
   - Support multiple event types in the same system

2. Dependency Injection:
   - Inject OutboxedEventRepository into command handlers
   - Configure DI container to provide repository instances
   - Use request scope for repository instances

3. Event Persistence:
   - Save events to outbox repository within command handlers
   - Events are persisted in the same transaction as business logic
   - Commit transactions after successful event saving

4. Event Querying:
   - Query events from outbox by topic
   - Retrieve events for publishing by separate process
   - Support filtering and batch retrieval

================================================================================
REQUIREMENTS
================================================================================
Make sure you have installed:
- cqrs (this package)
- di (dependency injection)
- pydantic (for typed payloads)
================================================================================
"""
import asyncio
import functools
import typing
import uuid
from collections import defaultdict
import di
import pydantic
from di import dependent
import cqrs
from cqrs.outbox import mock
from cqrs.requests import bootstrap
# Shared in-memory mapping that stands in for the outbox table in this example.
# The session factory passed to the mock repository returns this same object,
# so main() can inspect what the command handlers stored.
# Missing keys default to an empty list.
OUTBOX_STORAGE: defaultdict[
    uuid.UUID,
    typing.List[cqrs.NotificationEvent],
] = defaultdict(list)
# Factory producing mock outbox repositories. Each repository it builds gets a
# session factory that returns the shared OUTBOX_STORAGE mapping, so all
# instances operate on the same in-memory data.
mock_repository_factory = functools.partial(
    mock.MockOutboxedEventRepository,
    # A plain zero-argument callable is enough; there is nothing to pre-bind.
    session_factory=lambda: OUTBOX_STORAGE,
)
class UserJoinedNotificationPayload(pydantic.BaseModel, frozen=True):
    # Payload carried by the "user_joined_notification" event.
    # frozen=True: instances cannot be mutated after creation.

    # Identifier of the user who joined.
    user_id: str
    # Identifier of the meeting that was joined.
    meeting_id: str
class UserJoinedECSTPayload(pydantic.BaseModel, frozen=True):
    # Payload carried by the "user_joined_ecst" event.
    # frozen=True: instances cannot be mutated after creation.

    # Identifier of the user who joined.
    user_id: str
    # Identifier of the meeting that was joined.
    meeting_id: str
# Register each outbox event name with its typed event class so the event map
# knows which type belongs to which name.

# Notification event published when a user joins a meeting.
cqrs.OutboxedEventMap.register(
    "user_joined_notification",
    cqrs.NotificationEvent[UserJoinedNotificationPayload],
)

# ECST event published for the same occurrence.
cqrs.OutboxedEventMap.register(
    "user_joined_ecst",
    cqrs.NotificationEvent[UserJoinedECSTPayload],
)
class JoinMeetingCommand(cqrs.Request):
    # Command sent through the mediator when a user joins a meeting.

    # Identifier of the joining user.
    user_id: str
    # Identifier of the meeting being joined.
    meeting_id: str
class JoinMeetingCommandHandler(cqrs.RequestHandler[JoinMeetingCommand, None]):
    """Handle JoinMeetingCommand by storing two events in the outbox."""

    def __init__(self, outbox: cqrs.OutboxedEventRepository):
        # Outbox repository the handler writes its events to.
        self.outbox = outbox

    @property
    def events(self):
        # The handler exposes no events here; everything goes to the outbox.
        return []

    async def handle(self, request: JoinMeetingCommand) -> None:
        """
        Record that a user joined a meeting.

        Prints a message, adds one notification event and one ECST event
        for the request to the outbox, then commits the outbox.
        """
        print(f"User {request.user_id} joined meeting {request.meeting_id}")

        notification_event = cqrs.NotificationEvent[UserJoinedNotificationPayload](
            event_name="user_joined_notification",
            topic="user_notification_events",
            payload=UserJoinedNotificationPayload(
                user_id=request.user_id,
                meeting_id=request.meeting_id,
            ),
        )
        ecst_event = cqrs.NotificationEvent[UserJoinedECSTPayload](
            event_name="user_joined_ecst",
            topic="user_ecst_events",
            payload=UserJoinedECSTPayload(
                user_id=request.user_id,
                meeting_id=request.meeting_id,
            ),
        )

        # Notification event first, then the ECST event.
        for pending_event in (notification_event, ecst_event):
            self.outbox.add(pending_event)

        await self.outbox.commit()
def command_mapper(mapper: cqrs.RequestMap) -> None:
    """
    Bind JoinMeetingCommand to JoinMeetingCommandHandler on the given mapper.
    """
    mapper.bind(JoinMeetingCommand, JoinMeetingCommandHandler)
def setup_di() -> di.Container:
    """
    Build the DI container for the example.

    Binds cqrs.OutboxedEventRepository to the mock repository factory with
    request scope and returns the configured container.
    """
    outbox_dependency = dependent.Dependent(mock_repository_factory, scope="request")

    di_container = di.Container()
    di_container.bind(
        di.bind_by_type(outbox_dependency, cqrs.OutboxedEventRepository),
    )
    return di_container
async def main() -> None:
    """
    Run the example end to end.

    Bootstraps a mediator, sends four JoinMeetingCommand requests for the same
    meeting, reads the stored events back by topic, checks the expected counts
    and prints a summary.
    """
    mediator = bootstrap.bootstrap(
        di_container=setup_di(),
        commands_mapper=command_mapper,
    )
    # Separate repository instance used only for reading; its session factory
    # returns the same OUTBOX_STORAGE the handlers write to.
    repository = mock_repository_factory()

    for user_id in ("1", "2", "3", "4"):
        await mediator.send(JoinMeetingCommand(user_id=user_id, meeting_id="1"))

    notification_events = await repository.get_many(
        topic="user_notification_events",
    )
    ecst_events = await repository.get_many(
        topic="user_ecst_events",
    )

    # Each command stores two events, so four commands give eight outbox
    # entries, four per topic.
    assert len(OUTBOX_STORAGE) == 8
    assert len(notification_events) == 4
    assert len(ecst_events) == 4

    # OUTBOX_STORAGE holds events, not users, so the total is reported as events.
    print(f"There are {len(OUTBOX_STORAGE)} events in the outbox")
    print(f"There are {len(notification_events)} notification events in the outbox")
    print(f"There are {len(ecst_events)} ecst events in the outbox")
if __name__ == "__main__":
    # Run the example only when this file is executed as a script.
    asyncio.run(main())