Skip to content

Commit 23abb50

Browse files
SimY4kdavisk6
authored andcommitted
Added configuration for reactive scheduler (OpenFeign#1032)
* Added configuration for reactive scheduler * make final things explicitly final.
1 parent e282510 commit 23abb50

5 files changed

Lines changed: 59 additions & 25 deletions

File tree

reactive/src/main/java/feign/reactive/ReactorFeign.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
package feign.reactive;
1515

1616
import feign.Feign;
17-
import feign.reactive.ReactiveFeign.Builder;
17+
import reactor.core.scheduler.Scheduler;
18+
import reactor.core.scheduler.Schedulers;
1819
import java.lang.reflect.InvocationHandler;
1920
import java.lang.reflect.Method;
2021
import java.util.Map;
@@ -29,24 +30,36 @@ public static Builder builder() {
2930

3031
public static class Builder extends ReactiveFeign.Builder {
3132

33+
private Scheduler scheduler = Schedulers.elastic();
34+
3235
@Override
3336
public Feign build() {
34-
super.invocationHandlerFactory(new ReactorInvocationHandlerFactory());
37+
super.invocationHandlerFactory(new ReactorInvocationHandlerFactory(scheduler));
3538
return super.build();
3639
}
3740

3841
@Override
39-
public Feign.Builder invocationHandlerFactory(
40-
InvocationHandlerFactory invocationHandlerFactory) {
42+
public Builder invocationHandlerFactory(InvocationHandlerFactory invocationHandlerFactory) {
4143
throw new UnsupportedOperationException(
4244
"Invocation Handler Factory overrides are not supported.");
4345
}
46+
47+
public Builder scheduleOn(Scheduler scheduler) {
48+
this.scheduler = scheduler;
49+
return this;
50+
}
4451
}
4552

4653
private static class ReactorInvocationHandlerFactory implements InvocationHandlerFactory {
54+
private final Scheduler scheduler;
55+
56+
private ReactorInvocationHandlerFactory(Scheduler scheduler) {
57+
this.scheduler = scheduler;
58+
}
59+
4760
@Override
4861
public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
49-
return new ReactorInvocationHandler(target, dispatch);
62+
return new ReactorInvocationHandler(target, dispatch, scheduler);
5063
}
5164
}
5265
}

reactive/src/main/java/feign/reactive/ReactorInvocationHandler.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,25 @@
2020
import org.reactivestreams.Publisher;
2121
import reactor.core.publisher.Flux;
2222
import reactor.core.publisher.Mono;
23-
import reactor.core.scheduler.Schedulers;
23+
import reactor.core.scheduler.Scheduler;
2424

2525
public class ReactorInvocationHandler extends ReactiveInvocationHandler {
26+
private final Scheduler scheduler;
2627

2728
ReactorInvocationHandler(Target<?> target,
28-
Map<Method, MethodHandler> dispatch) {
29+
Map<Method, MethodHandler> dispatch,
30+
Scheduler scheduler) {
2931
super(target, dispatch);
32+
this.scheduler = scheduler;
3033
}
3134

3235
@Override
3336
protected Publisher invoke(Method method, MethodHandler methodHandler, Object[] arguments) {
3437
Publisher<?> invocation = this.invokeMethod(methodHandler, arguments);
3538
if (Flux.class.isAssignableFrom(method.getReturnType())) {
36-
return Flux.from(invocation).subscribeOn(Schedulers.elastic());
39+
return Flux.from(invocation).subscribeOn(scheduler);
3740
} else if (Mono.class.isAssignableFrom(method.getReturnType())) {
38-
return Mono.from(invocation).subscribeOn(Schedulers.elastic());
41+
return Mono.from(invocation).subscribeOn(scheduler);
3942
}
4043
throw new IllegalArgumentException(
4144
"Return type " + method.getReturnType().getName() + " is not supported");

reactive/src/main/java/feign/reactive/RxJavaFeign.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import feign.Feign;
2020
import feign.InvocationHandlerFactory;
2121
import feign.Target;
22+
import io.reactivex.Scheduler;
23+
import io.reactivex.schedulers.Schedulers;
2224

2325
public class RxJavaFeign extends ReactiveFeign {
2426

@@ -28,25 +30,36 @@ public static Builder builder() {
2830

2931
public static class Builder extends ReactiveFeign.Builder {
3032

33+
private Scheduler scheduler = Schedulers.trampoline();
34+
3135
@Override
3236
public Feign build() {
33-
super.invocationHandlerFactory(new RxJavaInvocationHandlerFactory());
37+
super.invocationHandlerFactory(new RxJavaInvocationHandlerFactory(scheduler));
3438
return super.build();
3539
}
3640

3741
@Override
38-
public Feign.Builder invocationHandlerFactory(
39-
InvocationHandlerFactory invocationHandlerFactory) {
42+
public Builder invocationHandlerFactory(InvocationHandlerFactory invocationHandlerFactory) {
4043
throw new UnsupportedOperationException(
4144
"Invocation Handler Factory overrides are not supported.");
4245
}
4346

47+
public Builder scheduleOn(Scheduler scheduler) {
48+
this.scheduler = scheduler;
49+
return this;
50+
}
4451
}
4552

4653
private static class RxJavaInvocationHandlerFactory implements InvocationHandlerFactory {
54+
private final Scheduler scheduler;
55+
56+
private RxJavaInvocationHandlerFactory(Scheduler scheduler) {
57+
this.scheduler = scheduler;
58+
}
59+
4760
@Override
4861
public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
49-
return new RxJavaInvocationHandler(target, dispatch);
62+
return new RxJavaInvocationHandler(target, dispatch, scheduler);
5063
}
5164
}
5265

reactive/src/main/java/feign/reactive/RxJavaInvocationHandler.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,24 @@
1616
import feign.InvocationHandlerFactory.MethodHandler;
1717
import feign.Target;
1818
import io.reactivex.Flowable;
19-
import io.reactivex.schedulers.Schedulers;
19+
import io.reactivex.Scheduler;
2020
import java.lang.reflect.Method;
2121
import java.util.Map;
2222
import org.reactivestreams.Publisher;
2323

2424
public class RxJavaInvocationHandler extends ReactiveInvocationHandler {
25+
private final Scheduler scheduler;
2526

2627
RxJavaInvocationHandler(Target<?> target,
27-
Map<Method, MethodHandler> dispatch) {
28+
Map<Method, MethodHandler> dispatch,
29+
Scheduler scheduler) {
2830
super(target, dispatch);
31+
this.scheduler = scheduler;
2932
}
3033

3134
@Override
3235
protected Publisher invoke(Method method, MethodHandler methodHandler, Object[] arguments) {
3336
return Flowable.fromPublisher(this.invokeMethod(methodHandler, arguments))
34-
.observeOn(Schedulers.trampoline());
37+
.observeOn(scheduler);
3538
}
3639
}

reactive/src/test/java/feign/reactive/ReactiveInvocationHandlerTest.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,16 @@
2323
import feign.RequestLine;
2424
import feign.Target;
2525
import io.reactivex.Flowable;
26-
2726
import java.io.IOException;
2827
import java.lang.reflect.Method;
2928
import java.util.Collections;
30-
3129
import org.junit.Before;
3230
import org.junit.Test;
3331
import org.junit.runner.RunWith;
3432
import org.mockito.Mock;
3533
import org.mockito.runners.MockitoJUnitRunner;
3634
import reactor.core.publisher.Mono;
35+
import reactor.core.scheduler.Schedulers;
3736
import reactor.test.StepVerifier;
3837

3938
@RunWith(MockitoJUnitRunner.class)
@@ -57,7 +56,7 @@ public void setUp() throws NoSuchMethodException {
5756
public void invokeOnSubscribeReactor() throws Throwable {
5857
given(this.methodHandler.invoke(any())).willReturn("Result");
5958
ReactorInvocationHandler handler = new ReactorInvocationHandler(this.target,
60-
Collections.singletonMap(method, this.methodHandler));
59+
Collections.singletonMap(method, this.methodHandler), Schedulers.elastic());
6160

6261
Object result = handler.invoke(method, this.methodHandler, new Object[] {});
6362
assertThat(result).isInstanceOf(Mono.class);
@@ -75,7 +74,7 @@ public void invokeOnSubscribeReactor() throws Throwable {
7574
public void invokeOnSubscribeEmptyReactor() throws Throwable {
7675
given(this.methodHandler.invoke(any())).willReturn(null);
7776
ReactorInvocationHandler handler = new ReactorInvocationHandler(this.target,
78-
Collections.singletonMap(method, this.methodHandler));
77+
Collections.singletonMap(method, this.methodHandler), Schedulers.elastic());
7978

8079
Object result = handler.invoke(method, this.methodHandler, new Object[] {});
8180
assertThat(result).isInstanceOf(Mono.class);
@@ -92,7 +91,7 @@ public void invokeOnSubscribeEmptyReactor() throws Throwable {
9291
public void invokeFailureReactor() throws Throwable {
9392
given(this.methodHandler.invoke(any())).willThrow(new IOException("Could Not Decode"));
9493
ReactorInvocationHandler handler = new ReactorInvocationHandler(this.target,
95-
Collections.singletonMap(this.method, this.methodHandler));
94+
Collections.singletonMap(this.method, this.methodHandler), Schedulers.elastic());
9695

9796
Object result = handler.invoke(this.method, this.methodHandler, new Object[] {});
9897
assertThat(result).isInstanceOf(Mono.class);
@@ -111,7 +110,8 @@ public void invokeOnSubscribeRxJava() throws Throwable {
111110
given(this.methodHandler.invoke(any())).willReturn("Result");
112111
RxJavaInvocationHandler handler =
113112
new RxJavaInvocationHandler(this.target,
114-
Collections.singletonMap(this.method, this.methodHandler));
113+
Collections.singletonMap(this.method, this.methodHandler),
114+
io.reactivex.schedulers.Schedulers.trampoline());
115115

116116
Object result = handler.invoke(this.method, this.methodHandler, new Object[] {});
117117
assertThat(result).isInstanceOf(Flowable.class);
@@ -129,8 +129,9 @@ public void invokeOnSubscribeRxJava() throws Throwable {
129129
public void invokeOnSubscribeEmptyRxJava() throws Throwable {
130130
given(this.methodHandler.invoke(any())).willReturn(null);
131131
RxJavaInvocationHandler handler =
132-
new RxJavaInvocationHandler(this.target,
133-
Collections.singletonMap(this.method, this.methodHandler));
132+
new RxJavaInvocationHandler(this.target,
133+
Collections.singletonMap(this.method, this.methodHandler),
134+
io.reactivex.schedulers.Schedulers.trampoline());
134135

135136
Object result = handler.invoke(this.method, this.methodHandler, new Object[] {});
136137
assertThat(result).isInstanceOf(Flowable.class);
@@ -148,7 +149,8 @@ public void invokeFailureRxJava() throws Throwable {
148149
given(this.methodHandler.invoke(any())).willThrow(new IOException("Could Not Decode"));
149150
RxJavaInvocationHandler handler =
150151
new RxJavaInvocationHandler(this.target,
151-
Collections.singletonMap(this.method, this.methodHandler));
152+
Collections.singletonMap(this.method, this.methodHandler),
153+
io.reactivex.schedulers.Schedulers.trampoline());
152154

153155
Object result = handler.invoke(this.method, this.methodHandler, new Object[] {});
154156
assertThat(result).isInstanceOf(Flowable.class);

0 commit comments

Comments
 (0)