I'm using the streaming package.
I want to use the result of a step defined with S.store as a parameter to a following step in the pipeline, while preserving constant memory usage. The stream myStream is loaded and parsed from a file.
I have the following example that typechecks:

    import Control.Monad.IO.Class (MonadIO)
    import Data.Function ((&))
    import qualified Data.Map.Strict as M
    import qualified Streaming.Prelude as S

    data A = MkA deriving (Show)

    -- Consumes the stream and builds a map.
    insertA :: MonadIO m => S.Stream (S.Of A) m r -> m (M.Map String Int)
    insertA = undefined

    -- Consumes the stream, using the map for lookups.
    insertB :: MonadIO m => M.Map String Int -> S.Stream (S.Of A) m r -> m Int
    insertB = undefined

    myStream :: S.Stream (S.Of A) IO r
    myStream = undefined

    run :: IO ()
    run =
      myStream
        & S.store insertA
        & insertB M.empty
        >>= print
However, the line & insertB M.empty passes an empty map, whereas I want to use the map produced by the previous step, insertA.
The insertB function then uses this map for lookups.
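To see where the map ends up, here is a minimal sketch that uses S.sum_ as a stand-in for insertA (an assumption for illustration only). S.store passes a copy of the stream to the fold, the elements keep flowing downstream, and the fold's result becomes the return value of the resulting stream. S.toList makes both visible: the list of elements, and the stored result paired with it only after the last element.

```haskell
import qualified Streaming.Prelude as S

-- S.sum_ stands in for insertA: a fold that consumes the whole stream.
-- After S.store, the elements are still streamed, and the fold's result becomes
-- the return value (the r in Stream (Of a) m r) of the resulting stream.
storeDemo :: IO (S.Of [Int] Int)
storeDemo = S.toList (S.store S.sum_ (S.each [1, 2, 3 :: Int]))

main :: IO ()
main = storeDemo >>= print
```

The map built by insertA sits in exactly that return-value position, so it only exists once the stream has been fully consumed, which is why it cannot be handed to insertB before insertB starts reading.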
The solution I can think of is the following:
    run :: IO ()
    run =
      myStream
        & S.store insertA
        & ( \e -> do
              resultMap <- S.effects e -- run e to get the map returned by insertA
              insertB resultMap e      -- consume e again, using that map
          )
        >>= print
Questions
Does this preserve streaming benefits like running in constant memory?
How does it work behind the scenes, given that the whole stream has to be processed to get the Map? Does it pass over the same stream multiple times, i.e. load it from the file twice, to keep memory constant?
If that is the case (the file is loaded twice), what if the source of the stream is not a parsed file but some data stream that can be read only once?
Is there another elegant solution to this problem that keeps the benefits of streaming when the next step in a pipeline needs the result of the previous one?
There's a problem with the proposed code here:
The problem is that you are "running" the same stream twice, and that is usually problematic for IO-based streams. For example, imagine myStream reads from a file handle. By the time we invoke insertB for the second pass, effects will have already reached end-of-file! Any further reads from the handle won't return any data.

Of course, we could read the same file twice with two different streams. That preserves streaming, but requires two passes.
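The file-handle case can be reproduced with a small sketch. It assumes hypothetical stand-ins countLines (for insertA) and sumCounts (for insertB) working on the lines of a throwaway file example.txt, and builds the stream with S.fromHandle. sameHandle consumes one stream value over one handle twice, so the second pass sees end-of-file. twoPasses opens the file twice and builds a separate stream each time.

```haskell
import qualified Data.Map.Strict as M
import qualified Streaming.Prelude as S
import System.IO (IOMode (ReadMode), withFile)

-- Hypothetical stand-in for insertA: counts how often each line occurs.
countLines :: Monad m => S.Stream (S.Of String) m r -> m (M.Map String Int)
countLines = S.fold_ (\m l -> M.insertWith (+) l 1 m) M.empty id

-- Hypothetical stand-in for insertB: looks up every line in the map and sums the counts.
sumCounts :: Monad m => M.Map String Int -> S.Stream (S.Of String) m r -> m Int
sumCounts m = S.fold_ (\acc l -> acc + M.findWithDefault 0 l m) 0 id

-- One handle, one stream value, consumed twice.
-- The first pass reads to end-of-file, so the second pass sees no lines at all.
sameHandle :: FilePath -> IO Int
sameHandle path = withFile path ReadMode $ \h -> do
  let lns = S.fromHandle h
  m <- countLines lns
  sumCounts m lns

-- Two handles, two streams: the file is read twice, one line at a time.
twoPasses :: FilePath -> IO Int
twoPasses path = do
  m <- withFile path ReadMode (countLines . S.fromHandle)
  withFile path ReadMode (sumCounts m . S.fromHandle)

demo :: IO (Int, Int)
demo = do
  writeFile "example.txt" "a\nb\na\n"
  single <- sameHandle "example.txt"
  double <- twoPasses "example.txt"
  pure (single, double)

main :: IO ()
main = demo >>= print
```

With this input, sameHandle yields 0 because the second pass finds nothing left to read, while twoPasses yields 5. Memory in twoPasses does not grow with the length of the file, although the Map itself still grows with the number of distinct lines.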
It should be noted, though, that for certain base monads with built-in resource management, like resourcet, you can run the same Stream value twice, because the stream code is "smart" enough to allocate and deallocate the underlying resources each time the stream is run. For example, the version of the Stream type present in linear-base supports the function readFile, which returns a Stream working in a resource-aware IO.

That said, I'm not a fan of hiding such repeated reads of a file in a streaming pipeline; it seems confusing to me.
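The resource-aware idea can be sketched with resourcet's allocate and release instead of linear-base; this swap is made only for illustration, and linear-base's readFile is not used here. linesOf is a hypothetical helper whose stream opens the file when it starts running and releases it when it finishes. With it, the question's pattern of running the stored stream once with S.effects and again with insertB works, at the cost of reading the file twice. countLines and sumCounts are hypothetical stand-ins for insertA and insertB.

```haskell
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Resource (ResourceT, allocate, release, runResourceT)
import qualified Data.Map.Strict as M
import qualified Streaming.Prelude as S
import System.IO (IOMode (ReadMode), hClose, openFile)

-- Hypothetical stand-in for insertA: counts how often each line occurs.
countLines :: Monad m => S.Stream (S.Of String) m r -> m (M.Map String Int)
countLines = S.fold_ (\m l -> M.insertWith (+) l 1 m) M.empty id

-- Hypothetical stand-in for insertB: looks up every line in the map and sums the counts.
sumCounts :: Monad m => M.Map String Int -> S.Stream (S.Of String) m r -> m Int
sumCounts m = S.fold_ (\acc l -> acc + M.findWithDefault 0 l m) 0 id

-- Hypothetical helper: every time this stream is run, it opens the file,
-- streams its lines and releases the handle. If the stream is abandoned early
-- or an exception occurs, runResourceT closes the handle instead.
linesOf :: FilePath -> S.Stream (S.Of String) (ResourceT IO) ()
linesOf path = do
  (key, h) <- lift (allocate (openFile path ReadMode) hClose)
  S.fromHandle h
  lift (release key)

-- The question's pattern, unchanged: the stored stream is run twice,
-- which here means the file is opened and read twice.
twoRuns :: FilePath -> IO Int
twoRuns path = runResourceT $ do
  let e = S.store countLines (linesOf path)
  resultMap <- S.effects e
  sumCounts resultMap e

main :: IO ()
main = do
  writeFile "example.txt" "a\nb\na\n"
  twoRuns "example.txt" >>= print
```

Nothing in twoRuns itself shows that the file is read twice; that hidden re-read is exactly the kind of confusion described above.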