Here's a basic demonstration of a hot publisher in Project Reactor
@Test
void testSimpleFlux() {
Flux<Integer> intFlux = Flux.just(1, 2, 3, 4, 5)
.publish()
.autoConnect()
.log();
intFlux.subscribeOn(Schedulers.newParallel("Thread-one"))
.subscribe(System.out::println);
intFlux.subscribeOn(Schedulers.newParallel("Thread-two"))
.subscribe(System.out::println);
}
What I expected:
The first subscriber consumes all the elements (it subscribed and triggered emission first), while the second subscriber consumes only some of the elements since it comes late for the party (for example, only 2, 3, 4, and 5)
What actually happened:
The first subscriber got nothing, while the second subscriber consumed every integer twice
08:10:04.095 [Thread-one-1] INFO reactor.Flux.AutoConnect.1 -- onSubscribe(FluxPublish.PublishInner)
08:10:04.095 [Thread-two-2] INFO reactor.Flux.AutoConnect.1 -- onSubscribe(FluxPublish.PublishInner)
08:10:04.106 [Thread-two-2] INFO reactor.Flux.AutoConnect.1 -- request(unbounded)
08:10:04.106 [Thread-one-1] INFO reactor.Flux.AutoConnect.1 -- request(unbounded)
08:10:04.108 [Thread-two-2] INFO reactor.Flux.AutoConnect.1 -- onNext(1)
1
08:10:04.109 [Thread-two-2] INFO reactor.Flux.AutoConnect.1 -- onNext(1)
1
08:10:04.109 [Thread-two-2] INFO reactor.Flux.AutoConnect.1 -- onNext(2)
2
08:10:04.109 [Thread-two-2] INFO reactor.Flux.AutoConnect.1 -- onNext(2)
2
08:10:04.109 [Thread-two-2] INFO reactor.Flux.AutoConnect.1 -- onNext(3)
3
08:10:04.109 [Thread-two-2] INFO reactor.Flux.AutoConnect.1 -- onNext(3)
3
08:10:04.110 [Thread-two-2] INFO reactor.Flux.AutoConnect.1 -- onNext(4)
4
08:10:04.110 [Thread-two-2] INFO reactor.Flux.AutoConnect.1 -- onNext(4)
4
08:10:04.110 [Thread-two-2] INFO reactor.Flux.AutoConnect.1 -- onNext(5)
5
08:10:04.111 [Thread-two-2] INFO reactor.Flux.AutoConnect.1 -- onNext(5)
5
08:10:04.111 [Thread-two-2] INFO reactor.Flux.AutoConnect.1 -- onComplete()
08:10:04.112 [Thread-two-2] INFO reactor.Flux.AutoConnect.1 -- onComplete()
How could you explain it?