I want to reference the materialized value from the flow. Below is the code snippet, but its not compiling, error:
type mismatch;
found : (akka.NotUsed, scala.concurrent.Future[akka.Done])
required: (Playground.DomainObj, scala.concurrent.Future[akka.Done])
Code:
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import scala.concurrent.Future
import akka.NotUsed
import akka.Done
implicit val actorSystem = ActorSystem("example")
case class DomainObj(name: String, age: Int)
val customFlow1:Flow[String,DomainObj,NotUsed] = Flow[String].map(s => {
DomainObj(s, 50)
})
val customFlow2 = Flow[DomainObj].map(s => {
s.age + 10
})
val printAnySink: Sink[Any, Future[Done]] = Sink.foreach(println)
val c1 = Source.single("John").viaMat(customFlow1)(Keep.right).viaMat(customFlow2)(Keep.left).toMat(printAnySink)(Keep.both)
val res: (DomainObj, Future[Done]) = c1.run()
Find the code in playground: https://scastie.scala-lang.org/P9iSx49cQcaOZfKtVCzTPA
I want to reference the DomainObj after the stream completes/
The materialized value of a
Flow[String, DomainObj, NotUsed]isNotUsed, not aDomainObj, thereforec1's materialized value is(NotUsed, Future[Done]).It looks like the intent here is to capture the
DomainObjwhich is created incustomFlow1. That can be accomplished withNote that
Sink.headeffectively requires thatcustomFlow1can only be used downstream of something that only emits once.