Skip to content

Commit 34211bf

Browse files
author
Mukul Sharma
committed
Reflect 76 - Resolved merge conflicts
2 parents 8ca0e6e + 6e0af82 commit 34211bf

49 files changed

Lines changed: 811 additions & 137 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

aws/aws-hello-world/Dockerfile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
FROM openjdk:8-jdk-alpine
22
ARG JAR_FILE=build/libs/*.jar
33
COPY ${JAR_FILE} app.jar
4-
ENTRYPOINT ["java","-jar","/app.jar"]
4+
ENTRYPOINT ["java","-jar","/app.jar"]
5+
EXPOSE 8080

build-all.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ build_gradle_module() {
3131
chmod +x gradlew
3232

3333
build_gradle_module "solid"
34+
build_gradle_module "spring-boot/data-migration/flyway"
3435
build_gradle_module "reactive"
3536
build_gradle_module "junit/assumptions"
3637
build_gradle_module "logging"

reactive/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ repositories {
1515
}
1616

1717
dependencies {
18-
implementation 'io.reactivex.rxjava2:rxjava:2.2.17'
18+
implementation 'io.reactivex.rxjava3:rxjava:3.0.0-RC9'
1919
testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.0.1'
2020
testImplementation 'org.awaitility:awaitility:3.0.0'
2121
}

reactive/src/main/java/io/reflectoring/reactive/batch/Logger.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package io.reflectoring.reactive.batch;
22

3-
public class Logger {
3+
class Logger {
44

5-
public void log(String string) {
5+
void log(String string) {
66
System.out.println(String.format("%s %s: %s", System.currentTimeMillis(), Thread.currentThread().getName(), string));
77
}
88

reactive/src/main/java/io/reflectoring/reactive/batch/Message.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package io.reflectoring.reactive.batch;
22

3-
public class Message {
3+
class Message {
44

55
private final String content;
66

7-
public Message(String content) {
7+
Message(String content) {
88
this.content = content;
99
}
1010

reactive/src/main/java/io/reflectoring/reactive/batch/MessageBatch.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
import java.util.Collections;
44
import java.util.List;
55

6-
public class MessageBatch {
6+
class MessageBatch {
77

88
private final List<Message> messages;
99

10-
public MessageBatch(List<Message> messages) {
10+
MessageBatch(List<Message> messages) {
1111
this.messages = messages;
1212
}
1313

reactive/src/main/java/io/reflectoring/reactive/batch/MessageHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.reflectoring.reactive.batch;
22

3-
public interface MessageHandler {
3+
interface MessageHandler {
44

55
enum Result {
66
SUCCESS,

reactive/src/main/java/io/reflectoring/reactive/batch/MessageSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package io.reflectoring.reactive.batch;
22

3-
import io.reactivex.Flowable;
3+
import io.reactivex.rxjava3.core.Flowable;
44

5-
public interface MessageSource {
5+
interface MessageSource {
66

77
Flowable<MessageBatch> getMessageBatches();
88

reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java

Lines changed: 15 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,40 @@
11
package io.reflectoring.reactive.batch;
22

3-
import io.reactivex.Flowable;
4-
import io.reactivex.Scheduler;
5-
import io.reactivex.Single;
6-
import io.reactivex.schedulers.Schedulers;
7-
import io.reflectoring.reactive.batch.MessageHandler.Result;
3+
import io.reactivex.rxjava3.core.Flowable;
4+
import io.reactivex.rxjava3.core.Scheduler;
5+
import io.reactivex.rxjava3.core.Single;
6+
import io.reactivex.rxjava3.schedulers.Schedulers;
87
import java.util.concurrent.Executors;
98
import java.util.concurrent.LinkedBlockingDeque;
109
import java.util.concurrent.ThreadPoolExecutor;
1110
import java.util.concurrent.TimeUnit;
12-
import org.reactivestreams.Subscriber;
13-
import org.reactivestreams.Subscription;
1411

1512
public class ReactiveBatchProcessor {
1613

1714
private final static Logger logger = new Logger();
1815

1916
private final int threads;
2017

18+
private final int threadPoolQueueSize;
19+
2120
private final MessageHandler messageHandler;
2221

2322
private final MessageSource messageSource;
2423

25-
public ReactiveBatchProcessor(
24+
ReactiveBatchProcessor(
2625
MessageSource messageSource,
2726
MessageHandler messageHandler,
28-
int threads) {
27+
int threads,
28+
int threadPoolQueueSize) {
2929
this.messageSource = messageSource;
3030
this.threads = threads;
3131
this.messageHandler = messageHandler;
32+
this.threadPoolQueueSize = threadPoolQueueSize;
3233
}
3334

34-
public void start() {
35+
void start() {
3536

36-
Scheduler scheduler = threadPoolScheduler(threads, 10);
37+
Scheduler scheduler = threadPoolScheduler(threads, threadPoolQueueSize);
3738

3839
messageSource.getMessageBatches()
3940
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
@@ -42,35 +43,7 @@ public void start() {
4243
.flatMapSingle(m -> Single.defer(() -> Single.just(m)
4344
.map(messageHandler::handleMessage))
4445
.subscribeOn(scheduler))
45-
.subscribeWith(subscriber());
46-
}
47-
48-
private Subscriber<MessageHandler.Result> subscriber() {
49-
return new Subscriber<>() {
50-
private Subscription subscription;
51-
52-
@Override
53-
public void onSubscribe(Subscription subscription) {
54-
this.subscription = subscription;
55-
subscription.request(threads);
56-
logger.log("subscribed");
57-
}
58-
59-
@Override
60-
public void onNext(Result message) {
61-
subscription.request(threads);
62-
}
63-
64-
@Override
65-
public void onError(Throwable t) {
66-
logger.log("error");
67-
}
68-
69-
@Override
70-
public void onComplete() {
71-
logger.log("completed");
72-
}
73-
};
46+
.subscribeWith(new SimpleSubscriber<>(threads, 1));
7447
}
7548

7649
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
@@ -79,7 +52,8 @@ private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
7952
poolSize,
8053
0L,
8154
TimeUnit.SECONDS,
82-
new LinkedBlockingDeque<>(queueSize)
55+
new LinkedBlockingDeque<>(queueSize),
56+
new WaitForCapacityPolicy()
8357
));
8458
}
8559

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package io.reflectoring.reactive.batch;
2+
3+
import io.reactivex.rxjava3.core.Flowable;
4+
import io.reactivex.rxjava3.core.Scheduler;
5+
import io.reactivex.rxjava3.core.Single;
6+
import io.reactivex.rxjava3.schedulers.Schedulers;
7+
import java.util.concurrent.Executors;
8+
import java.util.concurrent.LinkedBlockingDeque;
9+
import java.util.concurrent.ThreadPoolExecutor;
10+
import java.util.concurrent.TimeUnit;
11+
12+
public class ReactiveBatchProcessorV1 {
13+
14+
private final static Logger logger = new Logger();
15+
16+
private final int threads;
17+
18+
private final int threadPoolQueueSize;
19+
20+
private final MessageHandler messageHandler;
21+
22+
private final MessageSource messageSource;
23+
24+
public ReactiveBatchProcessorV1(
25+
MessageSource messageSource,
26+
MessageHandler messageHandler,
27+
int threads,
28+
int threadPoolQueueSize) {
29+
this.messageSource = messageSource;
30+
this.threads = threads;
31+
this.messageHandler = messageHandler;
32+
this.threadPoolQueueSize = threadPoolQueueSize;
33+
}
34+
35+
public void start() {
36+
// WARNING: this code doesn't work as expected
37+
messageSource.getMessageBatches()
38+
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
39+
.doOnNext(batch -> logger.log(batch.toString()))
40+
.flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
41+
.flatMapSingle(m -> Single.just(messageHandler.handleMessage(m))
42+
.subscribeOn(threadPoolScheduler(threads, threadPoolQueueSize)))
43+
.subscribeWith(new SimpleSubscriber<>(threads, 1));
44+
}
45+
46+
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
47+
return Schedulers.from(new ThreadPoolExecutor(
48+
poolSize,
49+
poolSize,
50+
0L,
51+
TimeUnit.SECONDS,
52+
new LinkedBlockingDeque<>(queueSize)
53+
));
54+
}
55+
56+
}

0 commit comments

Comments
 (0)