Getting "overloaded method ~> with alternatives" with Akka Broadcast


I am trying to broadcast an incoming Source[ByteString, Any] to two different flows and then fan in (zip) their outputs. However, I'm getting the compiler error "overloaded method ~> with alternatives".

val byteStringSource: Source[ByteString, Any] = Source.fromIterator(() => (1 to 10).map(i => ByteString(s"Element $i")).iterator)

  val incrementer = Flow[String].map{ x =>
    x
  }
  val multiplier = Flow[String].map{ x =>
    x
  }

  val output = Sink.foreach[(Type1, Type2)] { n1 =>
    println(s"First obj is ${(n1._1.toString)} & second obj is ${n1._2.toString}")
  }

  val graph = RunnableGraph.fromGraph(
    GraphDSL.create() {implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[String](2))
      val zip = builder.add(Zip[Type1, Type2])//fan-in operator

      byteStringSource ~> broadcast
      broadcast.out(0) ~> incrementer ~> zip.in0
      broadcast.out(1) ~> multiplier ~> zip.in1

      zip.out ~> output
      ClosedShape
    }
  )
  graph.run()

How do I get around this issue?

Answer by webuster

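Your source emits ByteString elements, but the broadcast is declared as Broadcast[String]. None of the overloads of ~> accepts that combination of element types, which is why the compiler reports "overloaded method ~> with alternatives".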

For your simple example you can remove ByteString and just use plain strings. If your real code needs more complex types, map the original source to the element type that the broadcast accepts, for example:

val theSource = byteStringSource.map(byteString => ...)

Then use theSource in place of byteStringSource inside the GraphDSL block.
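As a rough sketch of how the whole graph could look after that change: the version below decodes each ByteString with utf8String before it reaches the broadcast. Several things here are assumptions rather than part of the question: Type1 and Type2 are not defined there, so both are taken to be String; the names BroadcastZipExample, buildGraph and stringSource are made up for illustration; the sink is passed into GraphDSL.create so that its Future[Done] is available for shutting down; and Akka 2.6 or later is assumed, where an implicit ActorSystem is enough to run a stream.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source, Zip}
import akka.util.ByteString

import scala.concurrent.Future

object BroadcastZipExample {

  // Hypothetical helper: builds the broadcast/zip graph for any ByteString source.
  def buildGraph(byteStringSource: Source[ByteString, Any]): RunnableGraph[Future[Done]] = {
    // Decode each ByteString so the element type matches Broadcast[String].
    val stringSource = byteStringSource.map(_.utf8String)

    // Identity flows, as in the question.
    val incrementer = Flow[String].map(x => x)
    val multiplier = Flow[String].map(x => x)

    // Assumption: Type1 and Type2 from the question are both String.
    val output = Sink.foreach[(String, String)] { case (first, second) =>
      println(s"First obj is $first & second obj is $second")
    }

    // Passing the sink to create() exposes its Future[Done] as the materialized value.
    RunnableGraph.fromGraph(
      GraphDSL.create(output) { implicit builder => out =>
        import GraphDSL.Implicits._

        val broadcast = builder.add(Broadcast[String](2))
        val zip = builder.add(Zip[String, String]()) // fan-in operator

        stringSource ~> broadcast
        broadcast.out(0) ~> incrementer ~> zip.in0
        broadcast.out(1) ~> multiplier ~> zip.in1
        zip.out ~> out

        ClosedShape
      }
    )
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("broadcast-zip")
    import system.dispatcher

    val byteStringSource: Source[ByteString, Any] =
      Source.fromIterator(() => (1 to 10).map(i => ByteString(s"Element $i")).iterator)

    // Shut the actor system down once the sink has seen every element.
    buildGraph(byteStringSource).run().onComplete(_ => system.terminate())
  }
}
```

Every type parameter along the path now agrees (Source[String, _], Broadcast[String], Flow[String, String, _], Zip[String, String]), so each ~> resolves to a single overload. If the two branches really do produce different types, the same idea applies: the Zip and Sink type parameters have to match what each flow emits.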