11package io .reflectoring .reactive .batch ;
22
3- import io .reactivex .Flowable ;
4- import io .reactivex .Scheduler ;
5- import io .reactivex .Single ;
6- import io .reactivex .schedulers .Schedulers ;
7- import io .reflectoring .reactive .batch .MessageHandler .Result ;
3+ import io .reactivex .rxjava3 .core .Flowable ;
4+ import io .reactivex .rxjava3 .core .Scheduler ;
5+ import io .reactivex .rxjava3 .core .Single ;
6+ import io .reactivex .rxjava3 .schedulers .Schedulers ;
87import java .util .concurrent .Executors ;
98import java .util .concurrent .LinkedBlockingDeque ;
109import java .util .concurrent .ThreadPoolExecutor ;
1110import java .util .concurrent .TimeUnit ;
12- import org .reactivestreams .Subscriber ;
13- import org .reactivestreams .Subscription ;
1411
1512public class ReactiveBatchProcessor {
1613
1714 private final static Logger logger = new Logger ();
1815
1916 private final int threads ;
2017
18+ private final int threadPoolQueueSize ;
19+
2120 private final MessageHandler messageHandler ;
2221
2322 private final MessageSource messageSource ;
2423
25- public ReactiveBatchProcessor (
24+ ReactiveBatchProcessor (
2625 MessageSource messageSource ,
2726 MessageHandler messageHandler ,
28- int threads ) {
27+ int threads ,
28+ int threadPoolQueueSize ) {
2929 this .messageSource = messageSource ;
3030 this .threads = threads ;
3131 this .messageHandler = messageHandler ;
32+ this .threadPoolQueueSize = threadPoolQueueSize ;
3233 }
3334
34- public void start () {
35+ void start () {
3536
36- Scheduler scheduler = threadPoolScheduler (threads , 10 );
37+ Scheduler scheduler = threadPoolScheduler (threads , threadPoolQueueSize );
3738
3839 messageSource .getMessageBatches ()
3940 .subscribeOn (Schedulers .from (Executors .newSingleThreadExecutor ()))
@@ -42,35 +43,7 @@ public void start() {
4243 .flatMapSingle (m -> Single .defer (() -> Single .just (m )
4344 .map (messageHandler ::handleMessage ))
4445 .subscribeOn (scheduler ))
45- .subscribeWith (subscriber ());
46- }
47-
48- private Subscriber <MessageHandler .Result > subscriber () {
49- return new Subscriber <>() {
50- private Subscription subscription ;
51-
52- @ Override
53- public void onSubscribe (Subscription subscription ) {
54- this .subscription = subscription ;
55- subscription .request (threads );
56- logger .log ("subscribed" );
57- }
58-
59- @ Override
60- public void onNext (Result message ) {
61- subscription .request (threads );
62- }
63-
64- @ Override
65- public void onError (Throwable t ) {
66- logger .log ("error" );
67- }
68-
69- @ Override
70- public void onComplete () {
71- logger .log ("completed" );
72- }
73- };
46+ .subscribeWith (new SimpleSubscriber <>(threads , 1 ));
7447 }
7548
7649 private Scheduler threadPoolScheduler (int poolSize , int queueSize ) {
@@ -79,7 +52,8 @@ private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
7952 poolSize ,
8053 0L ,
8154 TimeUnit .SECONDS ,
82- new LinkedBlockingDeque <>(queueSize )
55+ new LinkedBlockingDeque <>(queueSize ),
56+ new WaitForCapacityPolicy ()
8357 ));
8458 }
8559
0 commit comments