I am trying to broadcast an incoming Source[ByteString, Any] to two different flows and then fan in (zip) their outputs. However, I get the error "overloaded method ~> with alternatives".
val byteStringSource: Source[ByteString, Any] =
  Source.fromIterator(() => (1 to 10).map(i => ByteString(s"Element $i")).iterator)
val incrementer = Flow[String].map { x =>
  x
}
val multiplier = Flow[String].map { x =>
  x
}
val output = Sink.foreach[(Type1, Type2)] { n1 =>
  println(s"First obj is ${n1._1.toString} & second obj is ${n1._2.toString}")
}
val graph = RunnableGraph.fromGraph(
  GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._
    val broadcast = builder.add(Broadcast[String](2))
    val zip = builder.add(Zip[Type1, Type2]) // fan-in operator
    byteStringSource ~> broadcast
    broadcast.out(0) ~> incrementer ~> zip.in0
    broadcast.out(1) ~> multiplier ~> zip.in1
    zip.out ~> output
    ClosedShape
  }
)
graph.run()
How do I get around this issue?
Your source is of type ByteString, but the broadcast element is of type String, so the ~> operator won't be applicable. For your simple example you can remove ByteString and just use plain strings. If your real situation uses more complex types, you can map the original source into the type that the broadcast will accept, bind the result to a value such as theSource, and just use theSource in your GraphDSL.
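As a minimal sketch of that mapping (not necessarily the exact snippet intended here), the following assumes Akka 2.6+, where an implicit ActorSystem supplies the materializer, and assumes the bytes are UTF-8 text so that `utf8String` is a suitable conversion. The object name `BroadcastZipSketch`, the `run()` helper, the shortened 3-element source, and the use of `Sink.seq` instead of `Sink.foreach` (so the zipped pairs can be inspected) are all illustrative choices, and `Type1`/`Type2` are assumed to both be `String`.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source, Zip}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BroadcastZipSketch {
  // Same kind of source as in the question, shortened to 3 elements.
  val byteStringSource: Source[ByteString, Any] =
    Source.fromIterator(() => (1 to 3).map(i => ByteString(s"Element $i")).iterator)

  // Decode each ByteString so the element type matches Broadcast[String].
  // Assumption: the payload is UTF-8 text.
  val theSource: Source[String, Any] = byteStringSource.map(_.utf8String)

  // Pass-through flows, as in the question.
  val incrementer = Flow[String].map(x => x)
  val multiplier = Flow[String].map(x => x)

  // Passing the sink to create() exposes its materialized Future of results.
  val graph: RunnableGraph[Future[Seq[(String, String)]]] =
    RunnableGraph.fromGraph(
      GraphDSL.create(Sink.seq[(String, String)]) { implicit builder => sink =>
        import GraphDSL.Implicits._
        val broadcast = builder.add(Broadcast[String](2))
        val zip = builder.add(Zip[String, String]()) // fan-in operator
        theSource ~> broadcast // element types now agree: String ~> String
        broadcast.out(0) ~> incrementer ~> zip.in0
        broadcast.out(1) ~> multiplier ~> zip.in1
        zip.out ~> sink
        ClosedShape
      }
    )

  // Hypothetical helper: runs the graph and blocks until the pairs arrive.
  def run(): Seq[(String, String)] = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    try Await.result(graph.run(), 5.seconds)
    finally system.terminate()
  }
}
```

The only change that matters for the compile error is wiring `theSource` (a `Source[String, Any]`) into the broadcast instead of `byteStringSource`. If the downstream flows really need to work on bytes, the alternative is to keep the source as it is and declare `Broadcast[ByteString](2)` with `Flow[ByteString]` stages.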