Skip to content

Commit ec2bdc9

Browse files
committed
Reactive Batch Processing Example
1 parent 5fe5dec commit ec2bdc9

11 files changed

Lines changed: 198 additions & 125 deletions

File tree

build-all.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ build_gradle_module() {
3030

3131
chmod +x gradlew
3232

33+
build_gradle_module "reactive"
3334
build_gradle_module "junit/assumptions"
3435
build_gradle_module "logging"
3536
build_gradle_module "pact/pact-feign-consumer"

reactive/build.gradle

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ repositories {
1515
}
1616

1717
dependencies {
18-
compile 'io.reactivex.rxjava2:rxjava:2.2.17'
19-
testCompile 'org.junit.jupiter:junit-jupiter-engine:5.0.1'
20-
testCompile 'org.awaitility:awaitility:3.0.0'
18+
implementation 'io.reactivex.rxjava2:rxjava:2.2.17'
19+
testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.0.1'
20+
testImplementation 'org.awaitility:awaitility:3.0.0'
2121
}
2222

2323
test {
1.43 KB
Binary file not shown.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-6.0.1-bin.zip
34
zipStoreBase=GRADLE_USER_HOME
45
zipStorePath=wrapper/dists
5-
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.4-bin.zip

reactive/gradlew

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,12 @@
11
package io.reflectoring.reactive.batch;
22

3-
import io.reactivex.Single;
3+
public interface MessageHandler {
44

5-
import java.util.concurrent.atomic.AtomicInteger;
5+
enum Result {
6+
SUCCESS,
7+
FAILURE
8+
}
69

7-
public class MessageHandler {
10+
Result handleMessage(Message message);
811

9-
private final AtomicInteger processedMessages = new AtomicInteger();
10-
11-
private Logger logger = new Logger();
12-
13-
enum Result {
14-
SUCCESS,
15-
FAILURE
16-
}
17-
18-
public Result handleMessage(Message message) {
19-
logger.log(String.format("handling message %s", message));
20-
sleep(500);
21-
this.processedMessages.getAndAdd(1);
22-
return Result.SUCCESS;
23-
}
24-
25-
private void sleep(long millis) {
26-
try {
27-
Thread.sleep(millis);
28-
} catch (InterruptedException e) {
29-
throw new RuntimeException(e);
30-
}
31-
}
32-
33-
public Integer getProcessedMessages() {
34-
return processedMessages.get();
35-
}
3612
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.reflectoring.reactive.batch;
2+
3+
import io.reactivex.Flowable;
4+
5+
public interface MessageSource {
6+
7+
Flowable<MessageBatch> getMessageBatches();
8+
9+
}

reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java

Lines changed: 71 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -5,95 +5,82 @@
55
import io.reactivex.Single;
66
import io.reactivex.schedulers.Schedulers;
77
import io.reflectoring.reactive.batch.MessageHandler.Result;
8-
import org.reactivestreams.Subscriber;
9-
import org.reactivestreams.Subscription;
10-
11-
import java.util.ArrayList;
12-
import java.util.List;
8+
import java.util.concurrent.Executors;
139
import java.util.concurrent.LinkedBlockingDeque;
1410
import java.util.concurrent.ThreadPoolExecutor;
1511
import java.util.concurrent.TimeUnit;
12+
import org.reactivestreams.Subscriber;
13+
import org.reactivestreams.Subscription;
1614

1715
public class ReactiveBatchProcessor {
1816

19-
private final static Logger logger = new Logger();
20-
21-
private final int MESSAGE_BATCHES = 10;
22-
23-
private final int BATCH_SIZE = 3;
24-
25-
private final int THREADS = 4;
26-
27-
private final MessageHandler messageHandler = new MessageHandler();
28-
29-
public void start() {
30-
31-
Scheduler threadPoolScheduler = threadPoolScheduler(THREADS, 10);
32-
33-
retrieveMessageBatches()
34-
.doOnNext(batch -> logger.log(batch.toString()))
35-
.flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
36-
.flatMapSingle(message -> Single.defer(() -> Single.just(messageHandler.handleMessage(message)))
37-
.doOnSuccess(result -> logger.log("message handled"))
38-
.subscribeOn(threadPoolScheduler))
39-
.subscribeWith(subscriber());
40-
}
41-
42-
private Subscriber<MessageHandler.Result> subscriber() {
43-
return new Subscriber<>() {
44-
private Subscription subscription;
45-
46-
@Override
47-
public void onSubscribe(Subscription subscription) {
48-
this.subscription = subscription;
49-
subscription.request(THREADS);
50-
logger.log("subscribed");
51-
}
52-
53-
@Override
54-
public void onNext(Result message) {
55-
subscription.request(THREADS);
56-
}
57-
58-
@Override
59-
public void onError(Throwable t) {
60-
logger.log("error");
61-
}
62-
63-
@Override
64-
public void onComplete() {
65-
logger.log("completed");
66-
}
67-
};
68-
}
69-
70-
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
71-
return Schedulers.from(new ThreadPoolExecutor(
72-
poolSize,
73-
poolSize,
74-
0L,
75-
TimeUnit.SECONDS,
76-
new LinkedBlockingDeque<>(queueSize),
77-
new RetryRejectedExecutionHandler()
78-
));
79-
}
80-
81-
public boolean allMessagesProcessed() {
82-
return this.messageHandler.getProcessedMessages() == MESSAGE_BATCHES * BATCH_SIZE;
83-
}
84-
85-
private Flowable<MessageBatch> retrieveMessageBatches() {
86-
return Flowable.range(1, MESSAGE_BATCHES)
87-
.map(this::messageBatch);
88-
}
89-
90-
private MessageBatch messageBatch(int batchNumber) {
91-
List<Message> messages = new ArrayList<>();
92-
for (int i = 1; i <= BATCH_SIZE; i++) {
93-
messages.add(new Message(String.format("%d-%d", batchNumber, i)));
94-
}
95-
return new MessageBatch(messages);
96-
}
97-
17+
private final static Logger logger = new Logger();
18+
19+
private final int threads;
20+
21+
private final MessageHandler messageHandler;
22+
23+
private final MessageSource messageSource;
24+
25+
public ReactiveBatchProcessor(
26+
MessageSource messageSource,
27+
MessageHandler messageHandler,
28+
int threads) {
29+
this.messageSource = messageSource;
30+
this.threads = threads;
31+
this.messageHandler = messageHandler;
32+
}
33+
34+
public void start() {
35+
36+
Scheduler scheduler = threadPoolScheduler(threads, 10);
37+
38+
messageSource.getMessageBatches()
39+
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
40+
.doOnNext(batch -> logger.log(batch.toString()))
41+
.flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
42+
.flatMapSingle(m -> Single.defer(() -> Single.just(m)
43+
.map(messageHandler::handleMessage))
44+
.subscribeOn(scheduler))
45+
.subscribeWith(subscriber());
46+
}
47+
48+
private Subscriber<MessageHandler.Result> subscriber() {
49+
return new Subscriber<>() {
50+
private Subscription subscription;
51+
52+
@Override
53+
public void onSubscribe(Subscription subscription) {
54+
this.subscription = subscription;
55+
subscription.request(threads);
56+
logger.log("subscribed");
57+
}
58+
59+
@Override
60+
public void onNext(Result message) {
61+
subscription.request(threads);
62+
}
63+
64+
@Override
65+
public void onError(Throwable t) {
66+
logger.log("error");
67+
}
68+
69+
@Override
70+
public void onComplete() {
71+
logger.log("completed");
72+
}
73+
};
74+
}
75+
76+
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
77+
return Schedulers.from(new ThreadPoolExecutor(
78+
poolSize,
79+
poolSize,
80+
0L,
81+
TimeUnit.SECONDS,
82+
new LinkedBlockingDeque<>(queueSize)
83+
));
84+
}
9885

9986
}
Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,39 @@
11
package io.reflectoring;
22

33
import static org.awaitility.Awaitility.await;
4-
import static org.junit.jupiter.api.Assertions.assertTrue;
4+
import static org.junit.jupiter.api.Assertions.assertEquals;
55

6+
import io.reflectoring.reactive.batch.MessageSource;
67
import io.reflectoring.reactive.batch.ReactiveBatchProcessor;
78
import java.util.concurrent.TimeUnit;
89
import org.junit.jupiter.api.Test;
910

1011
public class ReactiveBatchProcessorTest {
1112

1213
@Test
13-
public void test() {
14-
ReactiveBatchProcessor processor = new ReactiveBatchProcessor();
14+
public void allMessagesAreProcessedOnMultipleThreads() {
15+
16+
int batches = 3;
17+
int batchSize = 4;
18+
int threads = 5;
19+
20+
MessageSource messageSource = new TestMessageSource(batches, batchSize);
21+
TestMessageHandler messageHandler = new TestMessageHandler();
22+
23+
ReactiveBatchProcessor processor = new ReactiveBatchProcessor(
24+
messageSource,
25+
messageHandler,
26+
threads
27+
);
1528

1629
processor.start();
1730

1831
await()
1932
.atMost(10, TimeUnit.SECONDS)
2033
.pollInterval(1, TimeUnit.SECONDS)
21-
.untilAsserted(() -> assertTrue(processor.allMessagesProcessed()));
34+
.untilAsserted(() -> assertEquals(batches * batchSize, messageHandler.getProcessedMessages()));
2235

36+
assertEquals(threads, messageHandler.threadNames().size());
2337
}
2438

2539
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package io.reflectoring;
2+
3+
import io.reflectoring.reactive.batch.Logger;
4+
import io.reflectoring.reactive.batch.Message;
5+
import io.reflectoring.reactive.batch.MessageHandler;
6+
import java.util.HashSet;
7+
import java.util.Set;
8+
import java.util.concurrent.atomic.AtomicInteger;
9+
import java.util.concurrent.atomic.AtomicReference;
10+
11+
public class TestMessageHandler implements MessageHandler {
12+
13+
private final AtomicInteger processedMessages = new AtomicInteger();
14+
15+
private final AtomicReference<Set<String>> threadNames = new AtomicReference<>(new HashSet<>());
16+
17+
private Logger logger = new Logger();
18+
19+
@Override
20+
public Result handleMessage(Message message) {
21+
sleep(500);
22+
logger.log(String.format("processed message %s", message));
23+
threadNames.get().add(Thread.currentThread().getName());
24+
processedMessages.addAndGet(1);
25+
return Result.SUCCESS;
26+
}
27+
28+
private void sleep(long millis) {
29+
try {
30+
Thread.sleep(millis);
31+
} catch (InterruptedException e) {
32+
throw new RuntimeException(e);
33+
}
34+
}
35+
36+
public int getProcessedMessages() {
37+
return processedMessages.get();
38+
}
39+
40+
public Set<String> threadNames() {
41+
return threadNames.get();
42+
}
43+
}

0 commit comments

Comments
 (0)