Skip to content

Commit 7869bd8

Browse files
committed
[WIP] Add class DynamoDBEventStore
DynamoDBEventStore class provides implementation for EventCollector interface and EventReader interface using Amazon DynamoDB.
1 parent 1547510 commit 7869bd8

6 files changed

Lines changed: 384 additions & 0 deletions

File tree

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ dependencies {
1414
compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.8.8'
1515
compile group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.8.8'
1616
compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.8.8'
17+
compile 'com.amazonaws:aws-java-sdk-dynamodb:1.11.529'
1718
testCompile group: 'junit', name: 'junit', version: '4.12'
1819
testCompile group: 'org.mockito', name: 'mockito-all', version: '1.10.19'
1920
testCompile 'org.assertj:assertj-core:3.11.1'

src/main/java/io/loom/eventsourcing/EventReader.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,9 @@
55

66
public interface EventReader {
77
Future<Iterable<Object>> queryEvents(UUID streamId, long fromVersion);
8+
9+
default Future<Iterable<Object>> queryEvents(UUID streamId) {
10+
long fromVersion = 0;
11+
return queryEvents(streamId, fromVersion);
12+
}
813
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package io.loom.eventsourcing.amazon;
2+
3+
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper;
4+
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBQueryExpression;
5+
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
6+
import com.fasterxml.jackson.core.JsonProcessingException;
7+
import com.fasterxml.jackson.databind.ObjectMapper;
8+
import io.loom.eventsourcing.EventCollector;
9+
import io.loom.eventsourcing.EventReader;
10+
11+
import java.io.IOException;
12+
import java.util.ArrayList;
13+
import java.util.HashMap;
14+
import java.util.UUID;
15+
import java.util.concurrent.CompletableFuture;
16+
import java.util.concurrent.Future;
17+
18+
public class DynamoDBEventStore implements EventCollector, EventReader {
19+
private final DynamoDBMapper mapper;
20+
21+
public DynamoDBEventStore(DynamoDBMapper mapper) {
22+
this.mapper = mapper;
23+
}
24+
25+
@Override
26+
public Future<Void> collectEvents(
27+
UUID streamId, long firstVersion, Iterable<Object> events) {
28+
ArrayList<StreamEvent> streamEvents = createStreamEvents(streamId, firstVersion, events);
29+
saveStreamEvents(streamEvents);
30+
return CompletableFuture.completedFuture(null);
31+
}
32+
33+
private ArrayList<StreamEvent> createStreamEvents(UUID streamId, long firstVersion, Iterable<Object> events) {
34+
ArrayList<StreamEvent> streamEvents = new ArrayList<>();
35+
long version = firstVersion;
36+
for (Object event : events) {
37+
streamEvents.add(createStreamEvent(streamId, version++, event));
38+
}
39+
return streamEvents;
40+
}
41+
42+
private StreamEvent createStreamEvent(
43+
UUID streamId, long version, Object event) {
44+
StreamEvent streamEvent = new StreamEvent();
45+
46+
streamEvent.setStreamId(streamId);
47+
streamEvent.setVersion(version);
48+
streamEvent.setEventType(event.getClass().getName());
49+
streamEvent.setEventData(serializeEvent(event));
50+
51+
return streamEvent;
52+
}
53+
54+
private String serializeEvent(Object event) {
55+
try {
56+
ObjectMapper objectMapper = new ObjectMapper();
57+
return objectMapper.writeValueAsString(event);
58+
} catch (JsonProcessingException exception) {
59+
final String message = "Could not serialize the event object. Refer the cause for details.";
60+
throw new RuntimeException(message, exception);
61+
}
62+
}
63+
64+
private void saveStreamEvents(ArrayList<StreamEvent> streamEvents) {
65+
mapper.batchSave(streamEvents);
66+
}
67+
68+
@Override
69+
public Future<Iterable<Object>> queryEvents(UUID streamId, long fromVersion) {
70+
Iterable<StreamEvent> source = fetchStreamEvents(streamId, fromVersion);
71+
ArrayList<Object> events = deserializeEvents(source);
72+
return CompletableFuture.completedFuture(events);
73+
}
74+
75+
private Iterable<StreamEvent> fetchStreamEvents(
76+
UUID streamId, long fromVersion) {
77+
final String expression = "StreamId = :streamId and Version >= :fromVersion";
78+
79+
final HashMap<String, AttributeValue> values = new HashMap<>();
80+
values.put(":streamId", new AttributeValue().withS(streamId.toString()));
81+
values.put(":fromVersion", new AttributeValue().withN(Long.toString(fromVersion)));
82+
83+
final DynamoDBQueryExpression<StreamEvent> queryExpression =
84+
new DynamoDBQueryExpression<StreamEvent>()
85+
.withKeyConditionExpression(expression)
86+
.withExpressionAttributeValues(values);
87+
88+
return mapper.query(StreamEvent.class, queryExpression);
89+
}
90+
91+
private ArrayList<Object> deserializeEvents(Iterable<StreamEvent> source) {
92+
ArrayList<Object> events = new ArrayList<>();
93+
for (StreamEvent s : source) {
94+
events.add(deserializeEvent(s.getEventType(), s.getEventData()));
95+
}
96+
return events;
97+
}
98+
99+
private Object deserializeEvent(String eventType, String eventData) {
100+
try {
101+
ObjectMapper objectMapper = new ObjectMapper();
102+
return objectMapper.readValue(eventData, Class.forName(eventType));
103+
} catch (IOException | ClassNotFoundException exception) {
104+
final String message = "Could not deserialize an event object. Refer the cause for details.";
105+
throw new RuntimeException(message, exception);
106+
}
107+
}
108+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package io.loom.eventsourcing.amazon;
2+
3+
import com.amazonaws.services.dynamodbv2.datamodeling.*;
4+
5+
import java.util.UUID;
6+
7+
@DynamoDBTable(tableName = "StreamEvents")
8+
public class StreamEvent {
9+
private UUID streamId;
10+
private long version;
11+
private String eventType;
12+
private String eventData;
13+
private Long concurrencyVersion;
14+
15+
@DynamoDBHashKey(attributeName = "StreamId")
16+
public UUID getStreamId() {
17+
return streamId;
18+
}
19+
20+
public void setStreamId(UUID streamId) {
21+
this.streamId = streamId;
22+
}
23+
24+
@DynamoDBRangeKey(attributeName = "Version")
25+
public long getVersion() {
26+
return version;
27+
}
28+
29+
public void setVersion(long version) {
30+
this.version = version;
31+
}
32+
33+
@DynamoDBAttribute(attributeName = "EventType")
34+
public String getEventType() {
35+
return eventType;
36+
}
37+
38+
public void setEventType(String eventType) {
39+
this.eventType = eventType;
40+
}
41+
42+
@DynamoDBAttribute(attributeName = "EventData")
43+
public String getEventData() {
44+
return eventData;
45+
}
46+
47+
public void setEventData(String eventData) {
48+
this.eventData = eventData;
49+
}
50+
51+
@DynamoDBVersionAttribute(attributeName = "ConcurrencyVersion")
52+
public Long getConcurrencyVersion() {
53+
return concurrencyVersion;
54+
}
55+
56+
public void setConcurrencyVersion(Long concurrencyVersion) {
57+
this.concurrencyVersion = concurrencyVersion;
58+
}
59+
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package io.loom.eventsourcing.amazon;
2+
3+
import com.fasterxml.jackson.annotation.JsonCreator;
4+
import com.fasterxml.jackson.annotation.JsonProperty;
5+
import org.assertj.core.api.ThrowableAssert;
6+
import org.junit.Test;
7+
8+
import java.util.List;
9+
import java.util.Random;
10+
import java.util.UUID;
11+
import java.util.concurrent.ExecutionException;
12+
13+
import static io.loom.eventsourcing.amazon.LocalDynamoDB.getMapper;
14+
import static java.util.Arrays.asList;
15+
import static java.util.Collections.singletonList;
16+
import static java.util.UUID.randomUUID;
17+
import static java.util.stream.Collectors.toList;
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
21+
public class DynamoDBEventStore_specs {
22+
private static final Random random = new Random();
23+
24+
public static class Event1 {
25+
private final int value;
26+
27+
@JsonCreator
28+
Event1(@JsonProperty("value") int value) {
29+
this.value = value;
30+
}
31+
32+
public int getValue() {
33+
return value;
34+
}
35+
}
36+
37+
public static class Event2 {
38+
private final double value;
39+
40+
@JsonCreator
41+
Event2(@JsonProperty("value") double value) {
42+
this.value = value;
43+
}
44+
45+
public double getValue() {
46+
return value;
47+
}
48+
}
49+
50+
public static class Event3 {
51+
private final String value;
52+
53+
@JsonCreator
54+
Event3(@JsonProperty("value") String value) {
55+
this.value = value;
56+
}
57+
58+
public String getValue() {
59+
return value;
60+
}
61+
}
62+
63+
@Test
64+
public void queryEvents_restores_events_correctly()
65+
throws InterruptedException, ExecutionException {
66+
// Arrange
67+
final DynamoDBEventStore sut = new DynamoDBEventStore(getMapper());
68+
69+
UUID streamId = randomUUID();
70+
long firstVersion = 1;
71+
final List<Object> events = asList(
72+
new Event3(randomUUID().toString()),
73+
new Event1(random.nextInt()),
74+
new Event2(random.nextDouble()));
75+
76+
sut.collectEvents(streamId, firstVersion, events).get();
77+
78+
// Act
79+
final Iterable<Object> actual = sut.queryEvents(streamId).get();
80+
81+
// Assert
82+
assertThat(actual)
83+
.usingFieldByFieldElementComparator()
84+
.containsExactlyElementsOf(events);
85+
}
86+
87+
@Test
88+
public void queryEvents_filters_events_by_stream_id()
89+
throws ExecutionException, InterruptedException {
90+
// Arrange
91+
final DynamoDBEventStore sut = new DynamoDBEventStore(getMapper());
92+
93+
UUID streamId = randomUUID();
94+
long firstVersion = 1;
95+
final List<Object> events = asList(
96+
new Event3(randomUUID().toString()),
97+
new Event1(random.nextInt()),
98+
new Event2(random.nextDouble()));
99+
100+
sut.collectEvents(streamId, firstVersion, events).get();
101+
102+
sut.collectEvents(randomUUID(), firstVersion, events).get();
103+
104+
// Act
105+
final Iterable<Object> actual = sut.queryEvents(streamId).get();
106+
107+
// Assert
108+
assertThat(actual)
109+
.usingFieldByFieldElementComparator()
110+
.containsExactlyElementsOf(events);
111+
}
112+
113+
@Test
114+
public void queryEvents_filters_events_by_version()
115+
throws ExecutionException, InterruptedException {
116+
// Arrange
117+
final DynamoDBEventStore sut = new DynamoDBEventStore(getMapper());
118+
119+
UUID streamId = randomUUID();
120+
long firstVersion = 1;
121+
final List<Object> events = asList(
122+
new Event3(randomUUID().toString()),
123+
new Event1(random.nextInt()),
124+
new Event2(random.nextDouble()));
125+
126+
sut.collectEvents(streamId, firstVersion, events).get();
127+
128+
long fromVersion = 2;
129+
130+
// Act
131+
final Iterable<Object> actual = sut.queryEvents(streamId, fromVersion).get();
132+
133+
// Assert
134+
assertThat(actual)
135+
.usingFieldByFieldElementComparator()
136+
.containsExactlyElementsOf(events.stream().skip(1).collect(toList()));
137+
}
138+
139+
@Test
140+
public void collectEvents_controls_concurrency()
141+
throws ExecutionException, InterruptedException {
142+
// Arrange
143+
final DynamoDBEventStore sut = new DynamoDBEventStore(getMapper());
144+
145+
final UUID streamId = randomUUID();
146+
final long firstVersion = 1;
147+
final List<Object> events = singletonList(new Event1(random.nextInt()));
148+
149+
sut.collectEvents(streamId, firstVersion, events).get();
150+
151+
// Act
152+
final ThrowableAssert.ThrowingCallable action =
153+
() -> sut.collectEvents(streamId, firstVersion, events).get();
154+
155+
// Assert
156+
assertThatThrownBy(action);
157+
}
158+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package io.loom.eventsourcing.amazon;
2+
3+
import com.amazonaws.client.builder.AwsClientBuilder;
4+
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
5+
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
6+
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper;
7+
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapperConfig;
8+
import com.amazonaws.services.dynamodbv2.model.BillingMode;
9+
import com.amazonaws.services.dynamodbv2.model.DeleteTableRequest;
10+
11+
import static com.amazonaws.services.dynamodbv2.util.TableUtils.deleteTableIfExists;
12+
13+
abstract class LocalDynamoDB {
14+
private LocalDynamoDB() { }
15+
16+
private static final AmazonDynamoDB client = createClient();
17+
private static final DynamoDBMapper mapper = createMapper();
18+
19+
private static AmazonDynamoDB createClient() {
20+
return AmazonDynamoDBClientBuilder
21+
.standard()
22+
.withEndpointConfiguration(
23+
new AwsClientBuilder.EndpointConfiguration(
24+
"http://localhost:8000", "ap-northeast-2"))
25+
.build();
26+
}
27+
28+
private static DynamoDBMapper createMapper() {
29+
return new DynamoDBMapper(
30+
client, new DynamoDBMapperConfig.Builder().build());
31+
}
32+
33+
static {
34+
deleteStreamEventsTableIfExists();
35+
createStreamEventsTable();
36+
}
37+
38+
private static void deleteStreamEventsTableIfExists() {
39+
final DeleteTableRequest request =
40+
getMapper().generateDeleteTableRequest(StreamEvent.class);
41+
deleteTableIfExists(client, request);
42+
}
43+
44+
private static void createStreamEventsTable() {
45+
client.createTable(getMapper()
46+
.generateCreateTableRequest(StreamEvent.class)
47+
.withBillingMode(BillingMode.PAY_PER_REQUEST));
48+
}
49+
50+
static DynamoDBMapper getMapper() {
51+
return mapper;
52+
}
53+
}

0 commit comments

Comments
 (0)