Akka to Scala Cats FS2 migration

"Source" and "OverflowStrategy" are part of Akka Streams. How can I reimplement the code below in FS2?

object Topic {
  def apply: Topic = {
    val (a1, a2) = Source.queue[Message[_]](100, OverflowStrategy.backpressure).preMaterialize()
    val (b1, b2) = Source.queue[Signal](100, OverflowStrategy.backpressure).preMaterialize()
    new Topic(a1, a2, b1, b2)
  }
}

Stanislav Kovalenko

FS2 has several queue constructors, for example Queue.bounded[F, M](maxSize) and Queue.circularBuffer[F, M](maxSize). For backpressure, use Queue.bounded[F, M](maxSize): once the queue holds maxSize elements, enqueuing waits until a consumer dequeues something, which matches the backpressure strategy of the Akka queue.

The full list of methods: https://www.javadoc.io/doc/co.fs2/fs2-core_2.13/2.2.2/fs2/concurrent/Queue$.html

You will have something like this:

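import cats.effect.Concurrent
import cats.implicits._
import fs2.concurrent.Queue

object Topic {
  // Queue.bounded requires Concurrent[F]; this assumes Topic is redefined to take F and the two queues.
  def apply[F[_]: Concurrent]: F[Topic[F]] = for {
    a <- Queue.bounded[F, Message[_]](100) // replaces Source.queue[Message[_]](100, backpressure)
    b <- Queue.bounded[F, Signal](100)     // replaces Source.queue[Signal](100, backpressure)
  } yield new Topic(a, b)
}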
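
In Akka, preMaterialize() returns a pair: the queue you offer elements to and the Source you read from. With an FS2 queue both sides live on the same value: enqueue1 is the producer side and dequeue is the consumer stream. Here is a minimal sketch of that, assuming FS2 2.x with cats-effect 2 (the versions the links in this answer point to; in FS2 3.x the queue lives in cats.effect.std.Queue instead). The object name BoundedQueueDemo and the sizes are made up for illustration.

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext

// Hypothetical demo object, not part of the original question.
object BoundedQueueDemo {
  // cats-effect 2 needs a ContextShift to derive Concurrent[IO].
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  def collected: IO[List[Int]] =
    Queue.bounded[IO, Int](2).flatMap { q =>
      // Producer side: enqueue1 waits while the queue already holds 2 elements.
      val producer = Stream.range(0, 5).covary[IO].evalMap(q.enqueue1)
      // Consumer side: q.dequeue plays the role of the pre-materialized Akka Source.
      q.dequeue.take(5).concurrently(producer).compile.toList
    }

  def main(args: Array[String]): Unit =
    println(collected.unsafeRunSync())
}
```

The producer can never get more than two elements ahead of the consumer, which is the same guarantee OverflowStrategy.backpressure gives you with a buffer of that size.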

You could also consider using the prebuilt Topic type from FS2: https://www.javadoc.io/doc/co.fs2/fs2-core_2.13/2.2.2/fs2/concurrent/Topic.html

Topic allows you to distribute values of type A published by an arbitrary number of publishers to an arbitrary number of subscribers. Topic has built-in back-pressure support, implemented as a maximum bound (maxQueued) on the number of elements a subscriber is allowed to have queued. Once that bound is hit, publishing may semantically block until the lagging subscriber consumes some of its queued elements.
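
To give an idea of how the built-in Topic is used, here is a rough sketch, again assuming the FS2 2.x API from the link above, where the constructor takes an initial value and a new subscriber first sees the most recently published value (in FS2 3.x the constructor no longer takes an initial value). The object name TopicDemo and the string messages are invented for the example.

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream
import fs2.concurrent.Topic
import scala.concurrent.ExecutionContext

// Hypothetical demo object, not part of the original question.
object TopicDemo {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  def received: IO[List[String]] =
    Topic[IO, String]("initial").flatMap { topic =>
      // maxQueued = 10: publishing blocks once this subscriber lags by 10 elements.
      val subscriber = topic.subscribe(10).take(3)
      // Wait until the subscriber is registered, then publish two messages.
      val publisher =
        topic.subscribers.find(_ >= 1).drain ++
          Stream("a", "b").covary[IO].through(topic.publish)
      subscriber.concurrently(publisher).compile.toList
    }

  def main(args: Array[String]): Unit =
    println(received.unsafeRunSync())
}
```

Waiting on topic.subscribers before publishing is only there to keep the demo deterministic; anything published before a subscriber registers is not replayed to it, apart from the latest value.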