I am working on an Akka.NET Streams application that uses the Graph API. I would like to provide source data via the OfferAsync method of Source.Queue.
How do I access the ISourceQueueWithComplete.OfferAsync method after creating a graph, so that I can add data to the stream?
Here is the code that I am using to create the graph:
// Create a graph
var runnableGraph = RunnableGraph.FromGraph(GraphDsl.Create(
    builder =>
    {
        // create source
        var sourceQueue = Source.Queue<int>(100, OverflowStrategy.Fail);
        // use builder to configure the graph
        ...
    }));
// run the graph
runnableGraph.Run(materializer);
Here is the code that I want to use to get data for the stream:
while (true)
{
    var events = GetEventsFromExternalSource();
    foreach (var singleEvent in events)
    {
        sourceQueueWithComplete.OfferAsync(singleEvent);
    }
}
You just need to pass the Source.Queue<T> in as an input argument when you initially call GraphDsl.Create. This gives the graph a materialized value type (the ISourceQueueWithComplete<T>), which you can access once the graph has been run.
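A minimal sketch of that approach, assuming the Akka.Streams NuGet package is referenced. The system name "queue-demo", the Sink.ForEach sink, and the three sample values are illustrative placeholders rather than part of the original question; the real graph wiring would replace the single From/To line.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

public static class Program
{
    public static async Task Main()
    {
        // Hypothetical system name, chosen for this sketch only.
        var system = ActorSystem.Create("queue-demo");
        var materializer = system.Materializer();

        // Create the queue source OUTSIDE the builder lambda...
        var source = Source.Queue<int>(100, OverflowStrategy.Fail);

        // ...and pass it to GraphDsl.Create, so that its materialized value
        // (ISourceQueueWithComplete<int>) becomes the graph's materialized value.
        var runnableGraph = RunnableGraph.FromGraph(
            GraphDsl.Create(source, (builder, src) =>
            {
                // Placeholder sink; configure the real graph here.
                var sink = builder.Add(Sink.ForEach<int>(x => Console.WriteLine(x)));
                builder.From(src).To(sink);
                return ClosedShape.Instance;
            }));

        // Run() now returns the queue instead of NotUsed.
        ISourceQueueWithComplete<int> queue = runnableGraph.Run(materializer);

        // Sample values standing in for GetEventsFromExternalSource().
        foreach (var singleEvent in new[] { 1, 2, 3 })
        {
            // Await each offer so the result (enqueued, dropped, failed) is observed.
            await queue.OfferAsync(singleEvent);
        }

        // Signal that no more elements will be offered, then shut down.
        queue.Complete();
        await queue.WatchCompletionAsync();
        await system.Terminate();
    }
}
```

The key design point is that the source is created before the builder lambda runs: a stage created inside the lambda, as in the question, contributes no materialized value to the graph, so there is nothing for Run to hand back.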