I have a stream of directories from the readdirp module.

I want to:
- search for a file using a regex (e.g. `README.*`) in each directory
- read the first line of that file that does not start with a `#`
- print out each directory and this first non-heading line of the README in the directory.

I am trying to do this using streams and highland.js.
I am stuck trying to process a stream of all files inside each directory.

    fs = require 'fs'
    h = require 'highland'
    readdirp = require 'readdirp'

    dirStream = readdirp root: root, depth: 0, entryType: 'directories'
    dirStream = h(dirStream)
      .filter (entry) -> entry.stat.isDirectory()
      .map (entry) ->
        # Search all files in the directory for README.
        fileStream = readdirp root: entry.fullPath, depth: 0, entryType: 'files', fileFilter: '!.DS_Store'
        fileStream = h(fileStream).filter (entry) -> /README\..*/.test entry.name
        fileStream.each (file) ->
          readmeStream = fs.createReadStream file.fullPath
          h(readmeStream)
            .split()
            .takeUntil (line) -> not line.startsWith('#') and line isnt ''
            .last(1)
            .toArray (comment) ->
              # TODO: How do I access `comment` asynchronously to include in the return value of the map?
        return {name: entry.name, comment: comment}

It's best to think of Highland streams as immutable: operations like `filter` and `map` return new streams that depend on the old stream, rather than modifying the old stream.

Also, Highland methods are lazy: you should only call `each` or `toArray` when you absolutely need the data right now.

The standard way of asynchronously mapping a stream is `flatMap`. It's like `map`, but the function you give it should return a stream. The stream you get from `flatMap` is the concatenation of all the returned streams. Because the new stream depends on all the returned streams in order, it can be used to sequence asynchronous processes.

I'd modify your example to the following (clarified some variable names):
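
A minimal sketch of that restructuring, pieced together from the type walkthrough that follows, so treat it as illustrative rather than definitive. Assumptions: `root` is defined elsewhere, as in the question; readdirp is the older options-object API used in the question, with entries carrying `name`, `fullPath` and `stat`; and Highland's `find` stands in for the question's `takeUntil`/`last` pair, since it yields the first line that passes the test. The name `readmeComments` is made up for this sketch.

```coffeescript
fs = require 'fs'
h = require 'highland'
readdirp = require 'readdirp'

# Assumption: `root` is defined elsewhere, as in the question.
readmeComments = h(readdirp root: root, depth: 0, entryType: 'directories')
  .filter (dir) -> dir.stat.isDirectory()
  .flatMap (dir) ->
    # Stream of README.* files directly inside this directory.
    h(readdirp root: dir.fullPath, depth: 0, entryType: 'files', fileFilter: '!.DS_Store')
      .filter (file) -> /README\..*/.test file.name
      .flatMap (file) ->
        # Stream of lines; keep the first non-empty line that is not a heading.
        h(fs.createReadStream file.fullPath, encoding: 'utf8')
          .split()
          .find (line) -> line isnt '' and not line.startsWith '#'
          .map (comment) -> {name: dir.name, comment: comment}
```

Each function passed to `flatMap` returns a stream instead of calling `each` or `toArray`, so the asynchronous file reads stay inside the pipeline and nothing has to be pulled out of a callback.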

Let's take a walk through the types in this code. First, note that `flatMap` has type (in Haskellish notation) `Stream a → (a → Stream b) → Stream b`, i.e. it takes a stream containing some things of type `a`, and a function expecting things of type `a` and returning streams containing `b`s, and returns a stream containing `b`s. It's standard for collection types (such as stream and array) to implement `flatMap` as concatenating the returned collections.

Let's say the initial stream of directory entries has type `Stream Directory`. The `filter` doesn't change the type, so the `flatMap` will be `Stream Directory → (Directory → Stream b) → Stream b`. We'll see what the function returns.

Call the stream of matching files inside each directory a `Stream File`, so the second `flatMap` is `Stream File → (File → Stream b) → Stream b`.

The stream read from each file is a `Stream String`. `split`, `takeUntil` and `last` don't change that, so what does the `map` do? `map` is very similar to `flatMap`: its type is `Stream a → (a → b) → Stream b`. In this case `a` is `String` and `b` is an object type `{name : String, comment : String}`. Then `map` returns a stream of that object, which is what the overall `flatMap` function returns. Step up, and `b` in the second `flatMap` is the object, so the first `flatMap`'s function also returns a stream of the object, so the entire stream is a `Stream {name : String, comment : String}`.

Note that because of Highland's laziness, this doesn't actually start any streaming or processing. You need to use `each` or `toArray` to cause a thunk and start the pipeline. In `each`, the callback will be called with your object. Depending on what you want to do with the comments, it might be best to `flatMap` some more (if you're writing them to a file for example).

Well, I didn't mean to write an essay. Hope this helps.
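
As a follow-up to the point about laziness, here is a small sketch of what consuming such a stream could look like. The objects are made-up sample data standing in for the real pipeline's output, and the names `results` and `formatted` are hypothetical.

```coffeescript
h = require 'highland'

# Made-up sample data standing in for the real pipeline's output.
results = h [
  {name: 'project-a', comment: 'A sample project.'}
  {name: 'project-b', comment: 'Another sample project.'}
]

# Building on the stream is lazy: this `map` callback has not run yet.
formatted = results.map (r) -> "#{r.name}: #{r.comment}"

# `each` consumes the stream, which is what starts the pipeline.
formatted.each (line) -> console.log line
```

Keeping the `each` call at the very end means every earlier step stays composable, so you can still add another `map` or `flatMap` before deciding how to consume the results.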