Async processing of Conduit elements


Suppose we have a conduit stream and we'd like to process each incoming element asynchronously.

Assume the following imports:

import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (MonadIO(..))
-- async
import Control.Concurrent.Async (Concurrently(..), Async, wait, withAsync, mapConcurrently)
-- conduit
import qualified Data.Conduit as C (ConduitT, runConduit)
import Data.Conduit ((.|))
import qualified Data.Conduit.Combinators as C (sinkList, map, mapM)
import qualified Data.Conduit.List as C (sourceList)

This first implementation, notConcurrently, doesn't do the job, since wait blocks the conduit at each element, so the elements are still processed one at a time.

notConcurrently :: MonadIO m => (a -> IO b) -> C.ConduitT a b m ()
notConcurrently io = C.mapM (\x -> liftIO $ withAsync (io x) wait)

Another possibility is to pass the Async down the conduit and then wait at the end:

concurrentlyC2 :: MonadIO m => (t -> IO a) -> C.ConduitT t (Async a) m ()
concurrentlyC2 io = C.mapM $ \x -> liftIO $ withAsync (io x) pure

awaitSink :: (MonadIO m) => C.ConduitT (Async a) o m [a]
awaitSink = do
  xs <- C.sinkList
  liftIO $ traverse wait xs

The problem here is that withAsync cancels the child thread as soon as its inner function returns, so we need to tell the main thread to wait for all the child asyncs; otherwise they are canceled before they finish. Any suggestions?
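To make the cancellation concrete, here is a minimal standalone sketch (not part of the code above) that lets an Async escape from withAsync the same way concurrentlyC2 does. The name escaped and the one-second delay are made up for illustration. Since withAsync cancels the child once its inner function returns, waiting on the escaped handle is expected to report AsyncCancelled rather than a result.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, AsyncCancelled (..), waitCatch, withAsync)
import Control.Exception (fromException)

-- Hypothetical helper: the Async handle escapes the scope of withAsync,
-- just like in concurrentlyC2 (the inner function is simply `pure`).
escaped :: IO (Async Int)
escaped = withAsync (threadDelay 1000000 >> pure 42) pure

main :: IO ()
main = do
  a <- escaped
  -- waitCatch returns the child's exception instead of rethrowing it.
  r <- waitCatch a
  case r of
    Left e
      | Just AsyncCancelled <- fromException e -> putStrLn "child was cancelled"
      | otherwise -> putStrLn ("child failed: " ++ show e)
    Right n -> putStrLn ("child finished with " ++ show n)
```

This is why awaitSink cannot recover the results: by the time it calls wait, every handle it receives already belongs to a cancelled thread.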


Some test code:

blip :: MonadIO m => Int -> m ()
blip i = liftIO $ do
  _ <- threadDelay 1000000
  putStrLn $ unwords ["hello from thread", show i]


test1 :: IO [()]
test1 = C.runConduit $ C.sourceList [1..10] .| concurrentlyC2 blip .| awaitSink
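One possible direction, offered as a sketch rather than a definitive answer: fork each element with async instead of withAsync, since async does not cancel the child when the call returns, and then wait on all handles in the sink. The names spawnC, awaitAll and work below are made up for this example. The sketch assumes unbounded concurrency is acceptable and that the whole stream fits in memory. Note that, unlike withAsync, plain async does not cancel the children if the waiting thread itself dies.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async, wait)
import Control.Monad.IO.Class (MonadIO (..))
import Data.Conduit (ConduitT, runConduit, (.|))
import qualified Data.Conduit.Combinators as CC

-- Fork one thread per element. `async` returns immediately and leaves
-- the child running, so the conduit is not blocked at each element.
spawnC :: MonadIO m => (a -> IO b) -> ConduitT a (Async b) m ()
spawnC io = CC.mapM (liftIO . async . io)

-- Collect every handle first, then wait for the results in stream order.
awaitAll :: MonadIO m => ConduitT (Async b) o m [b]
awaitAll = CC.sinkList >>= liftIO . traverse wait

main :: IO ()
main = do
  rs <- runConduit $ CC.yieldMany [1 .. 5 :: Int] .| spawnC work .| awaitAll
  print rs
  where
    -- Hypothetical workload: sleep briefly, then double the input.
    work i = threadDelay 100000 >> pure (i * 2)
```

Because all threads are forked before the first wait, the total running time should be close to that of a single work call rather than the sum of all of them. If the number of elements is large, some form of bounding (for example a semaphore around io) would probably be needed.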