I have been playing with Elixir's Flow for a while now, and recently I tried to parallelize my workflow using Flow and `Repo.stream`:
```elixir
endless_db_stream = MyRepo.stream(some_query)

MyRepo.transaction(fn ->
  endless_db_stream
  |> Flow.from_enumerable()
  |> Flow.each(&process/1)
  |> Flow.run()
end)
```
But it just doesn't work. After some research I stumbled onto this comment from José Valim, which basically says that `Repo.stream` isn't really compatible with GenStage, and I believe it is also incompatible with Flow, since Flow is built on top of GenStage.
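A likely reason, sketched under assumptions: Ecto requires `Repo.stream` to be enumerated inside a transaction, and that transaction belongs to the process that opened it. `Flow.from_enumerable/1` enumerates its input in separate producer processes, which have no transaction of their own. The stdlib-only sketch below is an analogy, not Ecto's implementation: the hypothetical `TxAnalogy` module fakes a "transaction" with the process dictionary key `:fake_tx` and shows the same stream succeeding in the caller and failing in another process.

```elixir
# Analogy only: a "transaction" is faked with a process dictionary entry,
# which (like a real transaction) is visible only to the process that set it.
defmodule TxAnalogy do
  # Hypothetical stream that can only be enumerated by the process that
  # "opened the transaction".
  def stream do
    Stream.map(1..3, fn row ->
      case Process.get(:fake_tx) do
        nil -> raise "no transaction in this process"
        tx -> {tx, row}
      end
    end)
  end

  # Enumerating in the same process that opened the "transaction" works.
  def in_caller do
    Process.put(:fake_tx, :tx_1)
    Enum.to_list(stream())
  end

  # Enumerating in another process (as a Flow producer would) fails,
  # because the new process has an empty process dictionary.
  def in_other_process do
    Process.put(:fake_tx, :tx_1)

    task =
      Task.async(fn ->
        try do
          Enum.to_list(stream())
        rescue
          e in RuntimeError -> {:error, e.message}
        end
      end)

    Task.await(task)
  end
end

IO.inspect(TxAnalogy.in_caller())
IO.inspect(TxAnalogy.in_other_process())
```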
My question is: has anyone used PostgreSQL as an unbounded data source for Flow before?
P.S. In the same GitHub thread there is a "hack" that wraps `Repo.stream` in a GenStage, which then acts as a producer, but I am looking for a more streamlined approach, since I was planning to use Flow rather than GenStage directly.
There are two ways we have implemented this in our projects.
The simpler way: use resource IDs as the data source. List all the resource IDs first, then fetch each resource separately inside the flow.
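A minimal sketch of the ID-based approach, assuming a hypothetical in-memory table in place of the database. To keep it runnable without dependencies it uses the standard library's `Task.async_stream/3` instead of Flow; the shape is the same: a bounded list of IDs goes in, and each worker process fetches its own resource, so no shared transaction is needed. `IdSource`, `fetch/1` and `process/1` are illustrative names, not part of any library.

```elixir
defmodule IdSource do
  # Hypothetical in-memory "table" standing in for the database.
  @rows %{1 => "a", 2 => "b", 3 => "c", 4 => "d"}

  # Step 1: list every ID up front (a cheap, bounded query in a real app).
  def list_ids, do: @rows |> Map.keys() |> Enum.sort()

  # Step 2: fetch one resource by ID. In a real app this would be a
  # standalone query (e.g. Repo.get/2) run in the worker process itself.
  def fetch(id), do: {id, Map.fetch!(@rows, id)}

  # Placeholder for the real per-resource work.
  def process(resource), do: resource

  # Fetch and process each ID concurrently; results keep input order
  # because Task.async_stream/3 is ordered by default.
  def run do
    list_ids()
    |> Task.async_stream(fn id -> id |> fetch() |> process() end, max_concurrency: 2)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

IO.inspect(IdSource.run())
```

With Flow, the list of IDs would instead be passed to `Flow.from_enumerable/1`, with the per-ID fetch done in `Flow.map/2`.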
The normal way: use `Stream.resource/3` to build a custom data source. With `Stream.resource/3` you can create a stream that runs a paginated query, fetching a batch of resources at a time. See more detail here: How to build Streams in Elixir easily with Stream.resource/3 Awesomeness
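A sketch of a paginated `Stream.resource/3` source, assuming a hypothetical in-memory table and keyset-style pagination (the "last seen ID" cursor is my choice for illustration; the linked article may paginate differently). `PagedSource` and `fetch_page/2` are hypothetical; in a real app `fetch_page/2` would run an ordinary Ecto query. Because each page is its own standalone query rather than part of one long transaction, the stream can be enumerated from any process, including a Flow producer.

```elixir
defmodule PagedSource do
  # Hypothetical in-memory "table"; a real fetch_page/2 would run a
  # paginated query such as "WHERE id > ^last_id ORDER BY id LIMIT ^limit".
  @rows Enum.to_list(1..10)

  # Returns up to `limit` rows whose ID is greater than `last_id`.
  def fetch_page(last_id, limit) do
    @rows
    |> Enum.filter(&(&1 > last_id))
    |> Enum.take(limit)
  end

  # Lazy stream that fetches one page per step, so only one page is
  # held in memory at a time.
  def stream(page_size) do
    Stream.resource(
      # start_fun: the cursor is the last ID seen so far
      fn -> 0 end,
      # next_fun: emit the next page, or halt when no rows come back
      fn last_id ->
        case fetch_page(last_id, page_size) do
          [] -> {:halt, last_id}
          rows -> {rows, List.last(rows)}
        end
      end,
      # after_fun: nothing to clean up for an in-memory source
      fn _last_id -> :ok end
    )
  end
end

IO.inspect(PagedSource.stream(3) |> Enum.to_list())
```

The resulting stream can then replace `Repo.stream` in the original pipeline, e.g. `PagedSource.stream(500) |> Flow.from_enumerable() |> Flow.each(&process/1) |> Flow.run()`, without wrapping it in `MyRepo.transaction/1`.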