Akka Streams flatMapConcat that stops previous sources as new ones are created

42 Views Asked by At

I have a use case where I have a Source that provides an "address" of a service. Then I want to map that address to a source that streams data from that service. Now, in case I will get a new address I'd like to create a new Source for that address and continue streaming from the second source, while stopping the previous source (because there is no guaranty what is going to happen to the last address/service/source).

So far I found that flatMapConcat is the closest to what I need, but I'd like to stop the previous sources and keep the latest one.

In a way I'd like to have:

AddressSource
  .flatMatLatest(address => StreamingSource.from(address))
  // at this point we should be receiving elements produced by the latest StreamingSource
  .to(Sink...)
1

There are 1 best solutions below

0
Levi Ramsey On

Admittedly, this is only mentally compiled, but something like this should work:

sourceOfAddresses
  .statefulMap(() => Option.empty[KillSwitch])(
    { (lastEmittedKillswitch, address) =>
      lastEmittedKillswitch.foreach(_.shutdown)

      // this just happens to match the desired ordering of the tuple, woohoo!
      buildSourceForAddress(address)
        .viaMat(Killswitches.single)(Keep.right)
        .preMaterialize()  // will need the materializer in implicit scope for this to work
    },
    _ => None
  )
  .detach // in combination with the shutdown above, ensure perpetual demand
  .flatMapConcat(identity)

Basically:

  • build every source you're passing to flatMapConcat with a killswitch and prematerialize that source so that you get access to the killswitch
  • save that killswitch before passing to flatMapConcat
  • when you get a new address, if there's a saved killswitch, activate it

The killswitch will complete the emitted source, causing flatMapConcat to demand another source, which is readily available (since it was emitted immediately after triggering the killswitch) in the detach (basically a buffer of 1). Since the detach is now empty, demand is signaled through the statefulMap to sourceOfAddresses.