Intuition behind the functor in Stream f m r of the streaming library


A simplified (non-effectful) definition of Stream from the streaming library reads like this:

    data Stream f = Step (f (Stream f))
                  | Return

I'm trying to understand what the motivation for introducing this functor f is.

A typical such f is Of a, with Of defined as

    data Of a b = !a :> b
        deriving Functor

When I read f as Of a in the definition of Stream, this sort of makes sense: Of a b is something like an a followed by more that is obtainable from b. With this interpretation, the f (Stream f) in the definition of Stream reads something like Of a (Stream (Of a)). But in this case a simpler version would have been

    data Stream a = Step a (Stream a)
                  | Return

I struggle to understand why the generalization with this functor f is used. In the introduction the author says that Stream

... can be used to stream successive distinct steps characterized by any functor f

and

the general type Stream f m r expresses a succession of steps ... with a shape determined by the 'functor' parameter f.

But in Streaming.Prelude the only functors I can find are Of a, Stream (Of a) m and Identity. The first one makes a stream of as, the second a stream of streams, the third achieves 'erasure'.

I really don't get it. I can achieve all of these things, i.e. simple stream of as, stream of streams, and erasure, without using this f in Stream.

What does this functor f do that can't be done otherwise?

Answer by danidiaz:

A key feature of streaming's grouping functions is that they don't force you to keep entire groups in memory at any point. They perform grouping in a "streaming" fashion. The moment the start of a new group is detected, you can begin processing the group "downstream".

For example, imagine a function like lines :: Stream (Of Text) IO r -> Stream (Stream (Of Text) IO) IO r. That function will start a new group whenever it detects a newline in the stream, even if the end of the new line hasn't materialized yet.

The type Stream (Stream (Of Text) IO) IO r has the advantage that, to reach the line that comes after the current one, we must consume the current line completely. This is because the "rest of the stream" lies in the result value of the functor parameter. And Streams only give up their results once they have been exhausted.
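To make the "rest of the stream lives in the result" point concrete, here is a minimal sketch using only base. It assumes an effect-free variant of the simplified Stream from the question that also keeps the result type r (the real library type additionally carries a monad m). The names fromList, drain, splitOn and groupsToLists are hypothetical helpers for illustration, not functions from the streaming library.

```haskell
module Main where

-- Simplified, effect-free Stream that keeps the result type r.
-- (Assumption: the real library type also has a monad parameter m.)
data Stream f r = Step (f (Stream f r)) | Return r

data Of a b = !a :> b
infixr 5 :>

-- Hypothetical helper: build a flat stream from a list.
fromList :: [a] -> Stream (Of a) ()
fromList = foldr (\a s -> Step (a :> s)) (Return ())

-- Walk a stream to its end. The result r only becomes available here,
-- after every element has been passed.
drain :: Stream (Of a) r -> ([a], r)
drain (Return r)         = ([], r)
drain (Step (a :> rest)) = let (xs, r) = drain rest in (a : xs, r)

-- Split on a separator. Each group is an inner stream whose *result*
-- is the remainder of the outer stream.
splitOn :: Eq a => a -> Stream (Of a) r -> Stream (Stream (Of a)) r
splitOn sep = outer
  where
    outer (Return r) = Return r
    outer s          = Step (inner s)
    inner (Return r) = Return (Return r)
    inner (Step (a :> rest))
      | a == sep  = Return (outer rest)
      | otherwise = Step (a :> inner rest)

-- The only way to obtain the next group is to drain the current one.
groupsToLists :: Stream (Stream (Of a)) r -> ([[a]], r)
groupsToLists (Return r) = ([], r)
groupsToLists (Step g) =
  let (xs, rest) = drain g
      (xss, r)   = groupsToLists rest
  in (xs : xss, r)

main :: IO ()
main = print (fst (groupsToLists (splitOn 0 (fromList [1, 2, 0, 3, 4 :: Int]))))
-- prints [[1,2],[3,4]]
```

Note that groupsToLists has no way to reach the second group except through the value returned by drain on the first one; the type itself enforces the ordering.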

A type like Stream (Of (Stream (Of a) IO ())) IO () would not force us to exhaust the current line before moving to the next one. We could simply ignore the inner Stream that was yielded and advance the outer Stream. But that "next line" might not even exist yet, as we haven't actually read what came before! Perhaps this scheme could be made to work somehow, but the semantics would be less clear and require more knowledge from the user.
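For contrast, here is a sketch of the "groups as elements" encoding, again in an effect-free simplification that uses only base (the Groups alias and the helpers fromList, toList and skipFirst are hypothetical names for illustration). In this pure setting skipping a group is harmless; the point is only that the type permits it, which in the effectful library would mean stepping past input that has not been read yet.

```haskell
module Main where

-- Same simplified, effect-free Stream as in the question, plus a result type r.
data Stream f r = Step (f (Stream f r)) | Return r

data Of a b = !a :> b
infixr 5 :>

fromList :: [a] -> Stream (Of a) ()
fromList = foldr (\a s -> Step (a :> s)) (Return ())

toList :: Stream (Of a) r -> [a]
toList (Return _)         = []
toList (Step (a :> rest)) = a : toList rest

-- Groups are ordinary *elements* of the outer stream here.
type Groups a = Stream (Of (Stream (Of a) ())) ()

-- Nothing in the types stops us from discarding a group unconsumed:
-- the rest of the outer stream sits right next to it.
skipFirst :: Groups a -> Groups a
skipFirst (Step (_ :> rest)) = rest
skipFirst done               = done

main :: IO ()
main = do
  let groups = fromList [fromList [1, 2], fromList [3, 4 :: Int]]
  print (map toList (toList (skipFirst groups)))
-- prints [[3,4]]
```

With Stream (Stream (Of a)) r there is no analogue of skipFirst that avoids walking the first group, because the remainder is only reachable as that group's result.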


Functions like mapped let us consume "downstream" groups which are defined "upstream". For example, this code reads strings from stdin, where the string "-" is a group separator. The downstream code parses the strings into Ints and shows a message whenever a 5 is encountered. Notice that messages are shown before the group is closed upstream.

    {-# LANGUAGE TypeApplications #-}  -- needed for `read @Int` below

    import Streaming
    import qualified Streaming.Prelude as S

    example :: Stream (Of ()) IO () 
    example = 
        let upstream :: Stream (Stream (Of String) IO) IO ()
            upstream = S.breaks (=="-") S.stdinLn
            downstream :: Stream (Stream (Of String) IO) IO () 
                       -> Stream (Of ()) IO ()
            downstream = S.mapped handleGroup
            handleGroup :: Stream (Of String) IO r ->
                           IO (Of () r)
            handleGroup stream = do
                print "group began" 
                r <- S.effects
                   . S.chain (\_ -> putStrLn "found!") 
                   . S.filter (==5) 
                   . S.map (read @Int) 
                   $ stream 
                print "group ended"
                return (() :> r)
         in downstream upstream

    main :: IO ()
    main = S.effects example
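As a rough illustration of what changing the functor means, here is an effect-free analogue of that kind of transformation, using only base. It is a sketch under assumptions: the simplified Stream has no monad parameter, and mapsPure, sumGroup, groupOf and toList are hypothetical names, not the library's API. The transformation has to work for every x, so it can replace a whole group by a single summary element but cannot inspect or discard the rest of the stream.

```haskell
{-# LANGUAGE RankNTypes #-}
module Main where

-- Simplified, effect-free Stream with a result type r.
data Stream f r = Step (f (Stream f r)) | Return r

data Of a b = !a :> b
infixr 5 :>

instance Functor (Of a) where
  fmap g (a :> b) = a :> g b

instance Functor f => Functor (Stream f) where
  fmap g (Return r)   = Return (g r)
  fmap g (Step layer) = Step (fmap (fmap g) layer)

-- Replace the functor layer of every step (hypothetical pure counterpart
-- of the library's functor-changing maps).
mapsPure :: Functor f => (forall x. f x -> g x) -> Stream f r -> Stream g r
mapsPure _   (Return r)   = Return r
mapsPure phi (Step layer) = Step (phi (fmap (mapsPure phi) layer))

-- Collapse one group into its sum, passing the group's result through untouched.
sumGroup :: Stream (Of Int) x -> Of Int x
sumGroup = go 0
  where
    go acc (Return x)         = acc :> x
    go acc (Step (n :> rest)) = go (acc + n) rest

-- Hypothetical helper: a group with the given elements and the given result.
groupOf :: [a] -> r -> Stream (Of a) r
groupOf xs r = foldr (\a s -> Step (a :> s)) (Return r) xs

toList :: Stream (Of a) r -> [a]
toList (Return _)         = []
toList (Step (a :> rest)) = a : toList rest

main :: IO ()
main = do
  let groups :: Stream (Stream (Of Int)) ()
      groups = Step (groupOf [1, 2] (Step (groupOf [3, 4] (Return ()))))
  print (toList (mapsPure sumGroup groups))
-- prints [3,7]
```

The stream of groups becomes a flat stream with one element per group, which is the same shape change as going from Stream (Stream (Of String) IO) IO () to Stream (Of ()) IO () in the example above.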