Skip to content

Commit b6d167f

Browse files
committed
finished reactive example
1 parent ec2bdc9 commit b6d167f

7 files changed

Lines changed: 228 additions & 41 deletions

File tree

reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java

Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,36 +4,37 @@
44
import io.reactivex.Scheduler;
55
import io.reactivex.Single;
66
import io.reactivex.schedulers.Schedulers;
7-
import io.reflectoring.reactive.batch.MessageHandler.Result;
87
import java.util.concurrent.Executors;
98
import java.util.concurrent.LinkedBlockingDeque;
109
import java.util.concurrent.ThreadPoolExecutor;
1110
import java.util.concurrent.TimeUnit;
12-
import org.reactivestreams.Subscriber;
13-
import org.reactivestreams.Subscription;
1411

1512
public class ReactiveBatchProcessor {
1613

1714
private final static Logger logger = new Logger();
1815

1916
private final int threads;
2017

18+
private final int threadPoolQueueSize;
19+
2120
private final MessageHandler messageHandler;
2221

2322
private final MessageSource messageSource;
2423

2524
public ReactiveBatchProcessor(
2625
MessageSource messageSource,
2726
MessageHandler messageHandler,
28-
int threads) {
27+
int threads,
28+
int threadPoolQueueSize) {
2929
this.messageSource = messageSource;
3030
this.threads = threads;
3131
this.messageHandler = messageHandler;
32+
this.threadPoolQueueSize = threadPoolQueueSize;
3233
}
3334

3435
public void start() {
3536

36-
Scheduler scheduler = threadPoolScheduler(threads, 10);
37+
Scheduler scheduler = threadPoolScheduler(threads, threadPoolQueueSize);
3738

3839
messageSource.getMessageBatches()
3940
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
@@ -42,35 +43,7 @@ public void start() {
4243
.flatMapSingle(m -> Single.defer(() -> Single.just(m)
4344
.map(messageHandler::handleMessage))
4445
.subscribeOn(scheduler))
45-
.subscribeWith(subscriber());
46-
}
47-
48-
private Subscriber<MessageHandler.Result> subscriber() {
49-
return new Subscriber<>() {
50-
private Subscription subscription;
51-
52-
@Override
53-
public void onSubscribe(Subscription subscription) {
54-
this.subscription = subscription;
55-
subscription.request(threads);
56-
logger.log("subscribed");
57-
}
58-
59-
@Override
60-
public void onNext(Result message) {
61-
subscription.request(threads);
62-
}
63-
64-
@Override
65-
public void onError(Throwable t) {
66-
logger.log("error");
67-
}
68-
69-
@Override
70-
public void onComplete() {
71-
logger.log("completed");
72-
}
73-
};
46+
.subscribeWith(new SimpleSubscriber<>(threads, 1));
7447
}
7548

7649
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
@@ -79,7 +52,8 @@ private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
7952
poolSize,
8053
0L,
8154
TimeUnit.SECONDS,
82-
new LinkedBlockingDeque<>(queueSize)
55+
new LinkedBlockingDeque<>(queueSize),
56+
new WaitForCapacityPolicy()
8357
));
8458
}
8559

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package io.reflectoring.reactive.batch;
2+
3+
import io.reactivex.Flowable;
4+
import io.reactivex.Scheduler;
5+
import io.reactivex.Single;
6+
import io.reactivex.schedulers.Schedulers;
7+
import java.util.concurrent.Executors;
8+
import java.util.concurrent.LinkedBlockingDeque;
9+
import java.util.concurrent.ThreadPoolExecutor;
10+
import java.util.concurrent.TimeUnit;
11+
12+
public class ReactiveBatchProcessorV1 {
13+
14+
private final static Logger logger = new Logger();
15+
16+
private final int threads;
17+
18+
private final int threadPoolQueueSize;
19+
20+
private final MessageHandler messageHandler;
21+
22+
private final MessageSource messageSource;
23+
24+
public ReactiveBatchProcessorV1(
25+
MessageSource messageSource,
26+
MessageHandler messageHandler,
27+
int threads,
28+
int threadPoolQueueSize) {
29+
this.messageSource = messageSource;
30+
this.threads = threads;
31+
this.messageHandler = messageHandler;
32+
this.threadPoolQueueSize = threadPoolQueueSize;
33+
}
34+
35+
public void start() {
36+
messageSource.getMessageBatches()
37+
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
38+
.doOnNext(batch -> logger.log(batch.toString()))
39+
.flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
40+
.flatMapSingle(m -> Single.just(messageHandler.handleMessage(m))
41+
.subscribeOn(threadPoolScheduler(threads, threadPoolQueueSize)))
42+
.subscribeWith(new SimpleSubscriber<>(threads, 1));
43+
}
44+
45+
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
46+
return Schedulers.from(new ThreadPoolExecutor(
47+
poolSize,
48+
poolSize,
49+
0L,
50+
TimeUnit.SECONDS,
51+
new LinkedBlockingDeque<>(queueSize)
52+
));
53+
}
54+
55+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package io.reflectoring.reactive.batch;
2+
3+
import io.reactivex.Flowable;
4+
import io.reactivex.Scheduler;
5+
import io.reactivex.Single;
6+
import io.reactivex.schedulers.Schedulers;
7+
import java.util.concurrent.Executors;
8+
import java.util.concurrent.LinkedBlockingDeque;
9+
import java.util.concurrent.ThreadPoolExecutor;
10+
import java.util.concurrent.TimeUnit;
11+
12+
public class ReactiveBatchProcessorV2 {
13+
14+
private final static Logger logger = new Logger();
15+
16+
private final int threads;
17+
18+
private final int threadPoolQueueSize;
19+
20+
private final MessageHandler messageHandler;
21+
22+
private final MessageSource messageSource;
23+
24+
public ReactiveBatchProcessorV2(
25+
MessageSource messageSource,
26+
MessageHandler messageHandler,
27+
int threads,
28+
int threadPoolQueueSize) {
29+
this.messageSource = messageSource;
30+
this.threads = threads;
31+
this.messageHandler = messageHandler;
32+
this.threadPoolQueueSize = threadPoolQueueSize;
33+
}
34+
35+
public void start() {
36+
messageSource.getMessageBatches()
37+
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
38+
.doOnNext(batch -> logger.log(batch.toString()))
39+
.flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
40+
.flatMapSingle(m -> Single.defer(() -> Single.just(messageHandler.handleMessage(m)))
41+
.subscribeOn(threadPoolScheduler(threads, threadPoolQueueSize)))
42+
.subscribeWith(new SimpleSubscriber<>(threads, 1));
43+
}
44+
45+
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
46+
return Schedulers.from(new ThreadPoolExecutor(
47+
poolSize,
48+
poolSize,
49+
0L,
50+
TimeUnit.SECONDS,
51+
new LinkedBlockingDeque<>(queueSize)
52+
));
53+
}
54+
55+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package io.reflectoring.reactive.batch;
2+
3+
import io.reactivex.Flowable;
4+
import io.reactivex.Scheduler;
5+
import io.reactivex.Single;
6+
import io.reactivex.schedulers.Schedulers;
7+
import java.util.concurrent.Executors;
8+
import java.util.concurrent.LinkedBlockingDeque;
9+
import java.util.concurrent.ThreadPoolExecutor;
10+
import java.util.concurrent.TimeUnit;
11+
12+
public class ReactiveBatchProcessorV3 {
13+
14+
private final static Logger logger = new Logger();
15+
16+
private final int threads;
17+
18+
private final int threadPoolQueueSize;
19+
20+
private final MessageHandler messageHandler;
21+
22+
private final MessageSource messageSource;
23+
24+
public ReactiveBatchProcessorV3(
25+
MessageSource messageSource,
26+
MessageHandler messageHandler,
27+
int threads,
28+
int threadPoolQueueSize) {
29+
this.messageSource = messageSource;
30+
this.threads = threads;
31+
this.messageHandler = messageHandler;
32+
this.threadPoolQueueSize = threadPoolQueueSize;
33+
}
34+
35+
public void start() {
36+
Scheduler scheduler = threadPoolScheduler(threads, threadPoolQueueSize);
37+
38+
messageSource.getMessageBatches()
39+
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
40+
.doOnNext(batch -> logger.log(batch.toString()))
41+
.flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
42+
.flatMapSingle(m -> Single.defer(() -> Single.just(messageHandler.handleMessage(m)))
43+
.subscribeOn(scheduler))
44+
.subscribeWith(new SimpleSubscriber<>(threads, 1));
45+
}
46+
47+
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
48+
return Schedulers.from(new ThreadPoolExecutor(
49+
poolSize,
50+
poolSize,
51+
0L,
52+
TimeUnit.SECONDS,
53+
new LinkedBlockingDeque<>(queueSize)
54+
));
55+
}
56+
57+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package io.reflectoring.reactive.batch;
2+
3+
import org.reactivestreams.Subscriber;
4+
import org.reactivestreams.Subscription;
5+
6+
public class SimpleSubscriber<T> implements Subscriber<T> {
7+
8+
private final Logger logger = new Logger();
9+
10+
private final int initialFetchCount;
11+
private final int onNextFetchCount;
12+
13+
private Subscription subscription;
14+
15+
public SimpleSubscriber(int initialFetchCount, int onNextFetchCount) {
16+
this.initialFetchCount = initialFetchCount;
17+
this.onNextFetchCount = onNextFetchCount;
18+
}
19+
20+
@Override
21+
public void onSubscribe(Subscription subscription) {
22+
this.subscription = subscription;
23+
subscription.request(initialFetchCount);
24+
logger.log("subscribed");
25+
}
26+
27+
@Override
28+
public void onNext(T message) {
29+
subscription.request(onNextFetchCount);
30+
}
31+
32+
@Override
33+
public void onError(Throwable t) {
34+
logger.log("error");
35+
}
36+
37+
@Override
38+
public void onComplete() {
39+
logger.log("completed");
40+
}
41+
42+
}

reactive/src/main/java/io/reflectoring/reactive/batch/RetryRejectedExecutionHandler.java renamed to reactive/src/main/java/io/reflectoring/reactive/batch/WaitForCapacityPolicy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import java.util.concurrent.RejectedExecutionHandler;
55
import java.util.concurrent.ThreadPoolExecutor;
66

7-
public class RetryRejectedExecutionHandler implements RejectedExecutionHandler {
7+
public class WaitForCapacityPolicy implements RejectedExecutionHandler {
88

99
@Override
1010
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {

reactive/src/test/java/io/reflectoring/ReactiveBatchProcessorTest.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55

66
import io.reflectoring.reactive.batch.MessageSource;
77
import io.reflectoring.reactive.batch.ReactiveBatchProcessor;
8+
import io.reflectoring.reactive.batch.ReactiveBatchProcessorV1;
9+
import io.reflectoring.reactive.batch.ReactiveBatchProcessorV2;
10+
import io.reflectoring.reactive.batch.ReactiveBatchProcessorV3;
811
import java.util.concurrent.TimeUnit;
912
import org.junit.jupiter.api.Test;
1013

@@ -13,18 +16,19 @@ public class ReactiveBatchProcessorTest {
1316
@Test
1417
public void allMessagesAreProcessedOnMultipleThreads() {
1518

16-
int batches = 3;
17-
int batchSize = 4;
18-
int threads = 5;
19+
int batches = 10;
20+
int batchSize = 3;
21+
int threads = 2;
22+
int threadPoolQueueSize = 10;
1923

2024
MessageSource messageSource = new TestMessageSource(batches, batchSize);
2125
TestMessageHandler messageHandler = new TestMessageHandler();
2226

2327
ReactiveBatchProcessor processor = new ReactiveBatchProcessor(
2428
messageSource,
2529
messageHandler,
26-
threads
27-
);
30+
threads,
31+
threadPoolQueueSize);
2832

2933
processor.start();
3034

0 commit comments

Comments
 (0)