Suppose we have a conduit stream and we'd like to process each incoming element asynchronously.
Assume
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (MonadIO(..))
-- async
import Control.Concurrent.Async (Concurrently(..), Async, wait, withAsync, mapConcurrently)
-- conduit
import qualified Data.Conduit as C (ConduitT, runConduit)
import Data.Conduit ((.|))
import qualified Data.Conduit.Combinators as C (sinkLazy, map, mapM)
import qualified Data.Conduit.List as C (sourceList)
This first implementation notConcurrently doesn't do the job since wait blocks the execution of the conduit at each element.
notConcurrently :: MonadIO m => (a -> IO b) -> C.ConduitT a b m ()
notConcurrently io = C.mapM (\x -> liftIO $ withAsync (io x) wait)
Another possibility is to pass the Async down the conduit and then wait at the end:
concurrentlyC2 :: MonadIO m => (t -> IO a) -> C.ConduitT t (Async a) m ()
concurrentlyC2 io = C.mapM $ \x -> liftIO $ withAsync (io x) pure
awaitSink :: (MonadIO m) => C.ConduitT (Async a) o m [a]
awaitSink = do
xs <- C.sinkList
liftIO $ traverse wait xs
The problem here is that we need to tell the main thread to wait for all the child asyncs otherwise they are canceled. Any suggestions?
some test code :
blip :: MonadIO m => Int -> m ()
blip i = liftIO $ do
_ <- threadDelay 1000000
putStrLn $ unwords ["hello from thread", show i]
test1 :: IO [()]
test1 = C.runConduit $ C.sourceList [1..10] .| concurrentlyC2 blip .| awaitSink