Skip to content

publish() and connect() without subscribers drains upstream() #2178

@jonenst

Description

@jonenst

The behavior of publish() and connect() changes a lot depending on whether you have subscribers when you call connect(). With one subscriber, it respects the downstream request. Without a subscriber, it drains the upstream.

Expected Behavior

publish() and connect() without a subscriber should act like when there is a subscriber with no more request

Actual Behavior

publish() and connect() without a subscriber acts like when there is a subscriber doing an unbounded request

Steps to Reproduce

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

public class Aze {

    public static void testit(int req1, int req2) {
        Flux<Integer> f = Flux.<Integer, Integer>generate(() -> 0, (state, sink) -> {
            if (state > 4) {
                sink.error(new RuntimeException("Shouldn't happen"));
            }
            sink.next(state);
            return state + 1;
        }).log();
        ConnectableFlux<Integer> ff = f.publish(2);
        if (req1 > 0) {
            ff.subscribe((x) -> { }, (x) -> {
                System.out.println("ERROR");
            }, () -> { }, (s) -> {
                s.request(req1);
            });
        }
        ff.connect();
        if (req2 > 0) {
            ff.subscribe((x) -> { }, (x) -> {
                System.out.println("ERROR");
            }, () -> { }, (s) -> {
                s.request(req2);
            });
        };
    }
    public static void main(String[] args) throws Exception {
        System.out.println("request 1 before");
        testit(1, -1);
        System.out.println("request 1 after");
        testit(-1, 1);
    }

}
request 1 before
10:43:21.820 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
10:43:21.852 [main] INFO reactor.Flux.Generate.1 - | onSubscribe([Fuseable] FluxGenerate.GenerateSubscription)
10:43:21.858 [main] INFO reactor.Flux.Generate.1 - | request(2)
10:43:21.859 [main] INFO reactor.Flux.Generate.1 - | onNext(0)
10:43:21.860 [main] INFO reactor.Flux.Generate.1 - | request(1)
10:43:21.860 [main] INFO reactor.Flux.Generate.1 - | onNext(1)
10:43:21.860 [main] INFO reactor.Flux.Generate.1 - | onNext(2)
request 1 after
10:43:21.860 [main] INFO reactor.Flux.Generate.2 - | onSubscribe([Fuseable] FluxGenerate.GenerateSubscription)
10:43:21.860 [main] INFO reactor.Flux.Generate.2 - | request(2)
10:43:21.860 [main] INFO reactor.Flux.Generate.2 - | onNext(0)
10:43:21.860 [main] INFO reactor.Flux.Generate.2 - | request(1)
10:43:21.860 [main] INFO reactor.Flux.Generate.2 - | onNext(1)
10:43:21.860 [main] INFO reactor.Flux.Generate.2 - | request(1)
10:43:21.860 [main] INFO reactor.Flux.Generate.2 - | onNext(2)
10:43:21.860 [main] INFO reactor.Flux.Generate.2 - | request(1)
10:43:21.860 [main] INFO reactor.Flux.Generate.2 - | onNext(3)
10:43:21.861 [main] INFO reactor.Flux.Generate.2 - | request(1)
10:43:21.861 [main] INFO reactor.Flux.Generate.2 - | onNext(4)
10:43:21.861 [main] INFO reactor.Flux.Generate.2 - | request(1)
10:43:21.865 [main] ERROR reactor.Flux.Generate.2 - | onError(java.lang.RuntimeException: Shouldn't happen)
10:43:21.868 [main] ERROR reactor.Flux.Generate.2 - 
java.lang.RuntimeException: Shouldn't happen
	at org.gridsuite.notification.server.Aze.lambda$1(Aze.java:14)
	at reactor.core.publisher.FluxGenerate$GenerateSubscription.slowPath(FluxGenerate.java:262)
	at reactor.core.publisher.FluxGenerate$GenerateSubscription.request(FluxGenerate.java:204)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:137)
	at reactor.core.publisher.FluxPublish$PublishSubscriber.onSubscribe(FluxPublish.java:241)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:171)
	at reactor.core.publisher.FluxGenerate.subscribe(FluxGenerate.java:83)
	at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:53)
	at reactor.core.publisher.FluxPublish.connect(FluxPublish.java:100)
	at reactor.core.publisher.ConnectableFlux.connect(ConnectableFlux.java:99)
	at org.gridsuite.notification.server.Aze.testit(Aze.java:30)
	at org.gridsuite.notification.server.Aze.main(Aze.java:46)
10:43:21.868 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 5

Possible Solution

Not sure but I looked into the publish code, at some point it does a "len == cancel" to check if every subscriber cancelled and this is true for no subscriber but maybe should be false ?

Your Environment

  • Reactor version(s) used:
    [INFO] | | - io.projectreactor:reactor-core:jar:3.3.3.RELEASE:compile

  • JVM version (javar -version):
    java version "11.0.5" 2019-10-15 LTS
    Java(TM) SE Runtime Environment 18.9 (build 11.0.5+10-LTS)
    Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.5+10-LTS, mixed mode)

  • OS and version (eg uname -a):
    Linux gm0winl149.bureau.si.interne 5.1.18-200.fc29.x86_64 - Minor cosmetic tweaks for EmitterProcessor #1 SMP Mon Jul 15 16:09:08 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux

Metadata

Metadata

Assignees

No one assigned

    Labels

    status/declinedWe feel we shouldn't currently apply this change/suggestion

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions