The fs2 stream does not interrupt here:
import cats.effect.*
import fs2.Stream
import scala.concurrent.duration.*
import cats.effect.unsafe.implicits.global
val test = for {
  cancel <- Deferred[IO, Either[Throwable, Unit]]
  _ <- (IO.unit.delayBy(5.seconds).map { _ => println("Completing deferred"); cancel.complete(Right(())) }).start
  _ <- Stream.awakeEvery[IO](1.second).map(x => println(x)).interruptWhen(cancel).compile.drain
} yield ()
test.unsafeRunSync()
But it does interrupt if we swap the two lines and start the stream in a fiber instead:
import cats.effect.*
import fs2.Stream
import scala.concurrent.duration.*
import cats.effect.unsafe.implicits.global
val test = for {
  cancel <- Deferred[IO, Either[Throwable, Unit]]
  _ <- Stream.awakeEvery[IO](1.second).map(x => println(x)).interruptWhen(cancel).compile.drain.start
  _ <- (IO.unit.delayBy(5.seconds).map { _ => println("Completing deferred"); cancel.complete(Right(())) })
} yield ()
test.unsafeRunSync()
I wonder why...
The issue is that you are not using `IO` properly.

Remember that an `IO[A]` is just a program description, a value. It does nothing on its own.

When you call `cancel.complete` you are just creating a new program; it does nothing unless you compose it with other programs. And you are composing it inside a `map`, which doesn't really combine the programs, so your `cancel` is lost, and the `start` just creates a fiber that builds such a program and discards it.

In the second example the stream seems to stop, but most likely only by accident: the last generator of a `for` desugars to `map`, not `flatMap`, so `cancel.complete` is still discarded there. The stream runs in a background fiber and goes away when the main program finishes after five seconds.

The quick solution is to use `flatMap` rather than `map` in the first example. But, IMHO, a better solution is using proper combinators like this:

You can see the code running here.
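For reference, here is a minimal sketch of both fixes, not necessarily the code linked above. It assumes Cats Effect 3 and fs2 3. The names `withFlatMap` and `withCombinator` are made up for illustration, and `interruptAfter` is just one combinator that happens to fit this case.

```scala
import cats.effect.*
import fs2.Stream
import scala.concurrent.duration.*
import cats.effect.unsafe.implicits.global // only needed to call unsafeRunSync()

// Fix 1: sequence the effects instead of mapping over them.
// `*>` is a flatMap that ignores the left-hand result, so
// cancel.complete becomes part of the program that the fiber runs.
val withFlatMap: IO[Unit] = for {
  cancel <- Deferred[IO, Either[Throwable, Unit]]
  _ <- (IO.sleep(5.seconds) *>
         IO.println("Completing deferred") *>
         cancel.complete(Right(()))).start
  _ <- Stream.awakeEvery[IO](1.second)
         .evalMap(elapsed => IO.println(elapsed.toString)) // printing as an effect
         .interruptWhen(cancel)
         .compile
         .drain
} yield ()

// Fix 2 (assumption: a plain timeout is all that is needed):
// let fs2 handle the timing, with no Deferred and no manual fiber.
val withCombinator: IO[Unit] =
  Stream.awakeEvery[IO](1.second)
    .evalMap(elapsed => IO.println(elapsed.toString))
    .interruptAfter(5.seconds)
    .compile
    .drain
```

Either value can be run with `.unsafeRunSync()` as in the question. Both should stop after roughly five seconds. The second version avoids the discarded-`IO` problem altogether because there is no hand-written signalling left to get wrong.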