I'm trying to parse a file with a million lines, where each line is a JSON string with some information about a book (author, contents, etc.). I'm using iota to load the file, as my program throws an OutOfMemoryError if I try to use slurp. I'm also using cheshire to parse the strings. The program simply loads the file and counts the distinct words across all books.
My first attempt used pmap to do the heavy work; I figured this would essentially make use of all my CPU cores.
(ns multicore-parsing.core
  (:require [cheshire.core :as json]
            [iota :as io]
            [clojure.string :as string]
            [clojure.core.reducers :as r]))

(defn words-pmap
  [filename]
  (letfn [(parse-with-keywords [str]
            (json/parse-string str true))
          (words [book]
            (string/split (:contents book) #"\s+"))]
    (->>
     (io/vec filename)
     (pmap parse-with-keywords)
     (pmap words)
     (r/reduce #(apply conj %1 %2) #{})
     (count))))
While it does seem to use all cores, each core rarely uses more than 50% of its capacity. My guess is that this has to do with the batch size of pmap, so I looked around and stumbled across a relatively old question in which some comments refer to the clojure.core.reducers library.
I decided to rewrite the function using reducers/map:
(defn words-reducers
  [filename]
  (letfn [(parse-with-keywords [str]
            (json/parse-string str true))
          (words [book]
            (string/split (:contents book) #"\s+"))]
    (->>
     (io/vec filename)
     (r/map parse-with-keywords)
     (r/map words)
     (r/reduce #(apply conj %1 %2) #{})
     (count))))
But the CPU usage is worse, and it even takes longer to finish than the previous implementation:
multicore-parsing.core=> (time (words-pmap "./dummy_data.txt"))
"Elapsed time: 20899.088919 msecs"
546
multicore-parsing.core=> (time (words-reducers "./dummy_data.txt"))
"Elapsed time: 28790.976455 msecs"
546
What am I doing wrong? Is mmap loading + reducers the correct approach when parsing a large file?
EDIT: this is the file I'm using.
EDIT2: Here are the timings with iota/seq instead of iota/vec:
multicore-parsing.core=> (time (words-reducers "./dummy_data.txt"))
"Elapsed time: 160981.224565 msecs"
546
multicore-parsing.core=> (time (words-pmap "./dummy_data.txt"))
"Elapsed time: 160296.482722 msecs"
546
I don't believe that reducers are going to be the right solution for you, as they don't cope with lazy sequences at all well (a reducer will give correct results with a lazy sequence, but won't parallelise well).
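To illustrate the point: in clojure.core.reducers it is r/fold, not r/reduce, that can split work across threads, and it only does so for foldable collections such as vectors. Given a lazy sequence it falls back to an ordinary sequential reduce. Here is a minimal sketch, assuming each line is already plain text (distinct-words is a made-up name for illustration, and the JSON parsing step is left out):

```clojure
(require '[clojure.core.reducers :as r]
         '[clojure.set :as set]
         '[clojure.string :as string])

;; Hypothetical helper: collects the distinct whitespace-separated words
;; found in a collection of lines.
(defn distinct-words [lines]
  (r/fold set/union                        ; combine: merges partial sets; (set/union) => #{}
          (fn [acc line]                   ; reduce: adds one line's words to a partial set
            (into acc (string/split line #"\s+")))
          lines))

;; A vector is foldable, so a large one can be processed in parallel chunks.
(distinct-words ["the quick fox" "the lazy dog"])

;; A lazy sequence gives the same answer, but is reduced sequentially.
(distinct-words (map identity ["the quick fox" "the lazy dog"]))
```

Both calls return the same set; only the vector version has a chance of running in parallel, and then only once the input is larger than fold's default partition size of 512 elements.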
You might want to take a look at this sample code from the book Seven Concurrency Models in Seven Weeks (disclaimer: I am the author) which solves a similar problem (counting the number of times each word appears on Wikipedia).
Given a list of Wikipedia pages, this function counts the words sequentially (get-words returns a sequence of words from a page):
This is a parallel version using pmap which does run faster, but only around 1.5x faster:
The reason it only goes around 1.5x faster is because the reduce becomes a bottleneck: it's calling (partial merge-with +) once for each page. Merging batches of 100 pages improves the performance to around 3.2x on a 4-core machine:
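The three variants described above can be sketched roughly as follows. This is an illustrative sketch rather than the book's exact listings: the function names are assumptions, and get-words here is a hypothetical stand-in that simply lower-cases a page and pulls out runs of word characters.

```clojure
(require '[clojure.string :as string])

;; Hypothetical stand-in for get-words: returns a sequence of words from a page.
(defn get-words [page]
  (re-seq #"\w+" (string/lower-case page)))

;; Sequential: one pass over every word of every page.
(defn count-words-sequential [pages]
  (frequencies (mapcat get-words pages)))

;; Parallel per page: each page is counted in parallel, but the reduce
;; then calls (partial merge-with +) once for every page.
(defn count-words-parallel [pages]
  (reduce (partial merge-with +)
          (pmap #(frequencies (get-words %)) pages)))

;; Parallel per batch: pages are counted 100 at a time, so far fewer
;; maps have to be merged at the end.
(defn count-words-batched [pages]
  (reduce (partial merge-with +)
          (pmap count-words-sequential (partition-all 100 pages))))
```

All three return the same map from word to count. The batch size of 100 is the one quoted above; the best value for another data set would need to be measured.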