How to access OfferAsync method of Source.Queue when using Akka.Net Graphs?


I am working on an Akka.NET Streams application that uses the Graph DSL. I would like to feed source data into the stream through the OfferAsync method of Source.Queue.

How do I get access to ISourceQueueWithComplete.OfferAsync after creating the graph, so that I can add data to the stream?

Here is the code that I am using to create the graph:


    // Create a graph
    var runnableGraph = RunnableGraph.FromGraph(GraphDsl.Create(
        builder =>
        {
            // create source
            var sourceQueue = Source.Queue<int>(100, OverflowStrategy.Fail);

            // use builder to configure the graph
            ...
        }));

    // run the graph
    runnableGraph.Run(materializer);

Here is the code that I want to use to get data for the stream:

while (true)
{
   var events = GetEventsFromExternalSource();
   foreach(var singleEvent in events)
   {
      sourceQueueWithComplete.OfferAsync(singleEvent);
   }
}
   
Best answer

You just need to pass the Source.Queue<T> to GraphDsl.Create as an input argument when you build the graph. The graph then carries the queue as its materialized value (an ISourceQueueWithComplete<T>), which is handed back to you when the graph is materialized, i.e. run:

using System;
using System.Linq;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

var actorSystem = ActorSystem.Create("Test");

// create source
var sourceQueue = Source.Queue<int>(100, OverflowStrategy.Fail);

// passing sourceQueue to Create makes its materialized value the graph's materialized value
var graph = GraphDsl.Create(sourceQueue, (builder, source) =>
{
    // connected shapes
    var flow = builder.Add(Flow.Create<int>().Select(i => i * 10));
    var sink = builder.Add(Sink.ForEach<int>(i => Console.WriteLine(i)));
    
    builder.From(source).To(flow);
    builder.From(flow).To(sink);
    
    return ClosedShape.Instance;
});

// materializing the graph returns the queue handle
ISourceQueueWithComplete<int> queueSource = actorSystem.Materializer().Materialize(graph);

foreach (var i in Enumerable.Range(0, 10))
{
    await queueSource.OfferAsync(i);
}

Executing this program will result in:

0
10
20
30
40
50
60
70
80
90
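
If you would rather keep the RunnableGraph.FromGraph(...).Run(materializer) shape from the question, the same idea should carry over, because Run returns the graph's materialized value. The following is only a minimal sketch under a few assumptions: C# 9 top-level statements, the Akka.Streams NuGet package, and a trivial source-to-sink graph standing in for your real one. It also inspects the IQueueOfferResult that OfferAsync returns, which the loop in the question ignores, and completes the queue when the producer is done.

```csharp
using System;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

var system = ActorSystem.Create("Test");
var materializer = system.Materializer();

// Pass the queue source to Create so that its materialized value
// becomes the materialized value of the whole graph.
var graph = GraphDsl.Create(
    Source.Queue<int>(100, OverflowStrategy.Fail),
    (builder, source) =>
    {
        // Stand-in topology for illustration: source straight into a printing sink.
        var sink = builder.Add(Sink.ForEach<int>(i => Console.WriteLine(i)));
        builder.From(source).To(sink);
        return ClosedShape.Instance;
    });

// Run returns the materialized value, here the queue handle.
ISourceQueueWithComplete<int> queue =
    RunnableGraph.FromGraph(graph).Run(materializer);

foreach (var i in new[] { 1, 2, 3 })
{
    // Awaiting OfferAsync yields a result that says what happened to the element.
    IQueueOfferResult result = await queue.OfferAsync(i);
    if (!(result is QueueOfferResult.Enqueued))
    {
        Console.WriteLine($"Element {i} was not enqueued: {result}");
    }
}

// Signal that no more elements will be offered, then wait for the queue to finish.
queue.Complete();
await queue.WatchCompletionAsync();
await system.Terminate();
```

In a long-running producer like the while (true) loop in the question, you would keep the queue reference around and skip the Complete call until shutdown. Awaiting each OfferAsync call, rather than firing and forgetting it, is what lets you notice when an element is dropped or the queue has been closed.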