Converting a ConnectableFlux into a hot publisher via the share operator is not working


I have the program below, which first creates an interval Flux that takes 5 elements and subscribes to it.
After that, I convert it to a ConnectableFlux using the replay operator with autoConnect(2), and then convert it to a hot publisher with share(). However, no subsequent subscription receives any data, no matter how many subscribers I add. (sleep is used here to show the effect of the hot stream.)

        Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(5);
        flux.subscribe();
        flux = flux.replay().autoConnect(2).share();   //hot publisher     
        flux.subscribe(aLong -> System.out.println("first " + aLong));   //no data
        sleep(2000);
        flux.subscribe(aLong -> System.out.println("second " + aLong));  //no data

The catch is that if I set autoConnect to 1 (effectively making it a normal flux), the expected behaviour is observed (output shown below).

Output:
first 0
first 1
first 2
second 2
first 3
second 3
first 4
second 4

Please clarify, or correct me if I am wrong.

Best answer, by G SriHAri:

    Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(2);
    flux.subscribe(aLong -> System.out.println("source " + aLong));
    flux = flux.replay().autoConnect(2).share();   //hot publisher
    flux.subscribe(aLong -> System.out.println("first " + aLong));
    sleep(2000);
    flux.subscribe(aLong -> System.out.println("second " + aLong));

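Output: source 0 source 1 (only "source" is printed. It subscribes directly to the original cold flux, whereas share() subscribes to the autoConnect(2) flux just once no matter how many subscribers share() itself has, so the replay never sees 2 subscribers and never connects.)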

    Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(2);
    flux.replay().autoConnect(3).share()
        .subscribe(aLong -> System.out.println("source " + aLong));
    flux.subscribe(aLong -> System.out.println("first " + aLong));
    sleep(2000);
    flux.subscribe(aLong -> System.out.println("second " + aLong));

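Output: first 0 first 1 second 0 second 1 ("source" is not printed. It is the only subscriber behind autoConnect(3), so the replay never connects, while "first" and "second" each subscribe to the original cold flux and get their own independent sequence.)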

    Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(2).replay().autoConnect(3);
    flux.share().subscribe(aLong -> System.out.println("source " + aLong));
    flux.subscribe(aLong -> System.out.println("first " + aLong));
    sleep(2000);
    flux.subscribe(aLong -> System.out.println("second " + aLong));

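Output: source 0 first 0 second 0 source 1 first 1 second 1 (all three are printed. The single share() subscription plus "first" and "second" add up to 3 subscribers on the autoConnect(3) flux, so it connects once "second" subscribes.)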

Compare these 3 scenarios and their outputs: what matters here is where and when you subscribe and where you call share().
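
To make the subscriber counting behind the three scenarios explicit, here is a toy model in plain Java. It is only a sketch, not Reactor code: AutoConnectModel, ShareModel and SubscriberCountDemo are hypothetical names invented for illustration. It assumes just two things: autoConnect(n) connects once it has seen n subscribers, and share() makes a single upstream subscription when its first subscriber arrives. Cancellation and reconnection are ignored.

```java
// Toy model only: these classes are hypothetical stand-ins, not Reactor types.

/** Stand-in for replay().autoConnect(n): connected once n subscribers have arrived. */
final class AutoConnectModel {
    private final int threshold;
    private int subscribers;

    AutoConnectModel(int threshold) {
        this.threshold = threshold;
    }

    void subscribe() {
        subscribers++;
    }

    boolean isConnected() {
        return subscribers >= threshold;
    }
}

/** Stand-in for share(): one upstream subscription, made when its first subscriber arrives. */
final class ShareModel {
    private final AutoConnectModel upstream;
    private int subscribers;

    ShareModel(AutoConnectModel upstream) {
        this.upstream = upstream;
    }

    void subscribe() {
        if (subscribers++ == 0) {
            upstream.subscribe(); // later subscribers reuse this single upstream subscription
        }
    }
}

final class SubscriberCountDemo {
    /** The question's setup: two subscribers behind share(), upstream threshold of 2. */
    static boolean connectsThroughShare() {
        AutoConnectModel replay = new AutoConnectModel(2);
        ShareModel shared = new ShareModel(replay);
        shared.subscribe(); // "first"
        shared.subscribe(); // "second"
        return replay.isConnected();
    }

    /** Same threshold, but both subscribers attach to the autoConnect stage directly. */
    static boolean connectsDirectly() {
        AutoConnectModel replay = new AutoConnectModel(2);
        replay.subscribe(); // "first"
        replay.subscribe(); // "second"
        return replay.isConnected();
    }

    public static void main(String[] args) {
        System.out.println("through share(): " + connectsThroughShare()); // false
        System.out.println("direct: " + connectsDirectly());              // true
    }
}
```

In Reactor terms, this suggests two ways out: subscribe to the autoConnect(2) flux directly without share() (with replay(), a late subscriber then also receives the earlier elements), or keep share() and lower the threshold to autoConnect(1), which is what the question already observed to work.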