I have a use case where I have a Source that provides an "address" of a service. Then I want to map that address to a source that streams data from that service. Now, in case I will get a new address I'd like to create a new Source for that address and continue streaming from the second source, while stopping the previous source (because there is no guaranty what is going to happen to the last address/service/source).
So far I found that flatMapConcat is the closest to what I need, but I'd like to stop the previous sources and keep the latest one.
In a way I'd like to have:
AddressSource
.flatMatLatest(address => StreamingSource.from(address))
// at this point we should be receiving elements produced by the latest StreamingSource
.to(Sink...)
Admittedly, this is only mentally compiled, but something like this should work:
Basically:
flatMapConcatwith a killswitch and prematerialize that source so that you get access to the killswitchflatMapConcatThe killswitch will complete the emitted source, causing
flatMapConcatto demand another source, which is readily available (since it was emitted immediately after triggering the killswitch) in thedetach(basically a buffer of 1). Since thedetachis now empty, demand is signaled through thestatefulMaptosourceOfAddresses.