Dynamic piping with FRP

316 Views Asked by At

Consider a problem:

  • split file by lines
  • write lines to a result file
  • if a result file exceeds some size create a new result file

For example, if I have a 4 GB file and the split size is 1 GB, the result is four files of 1 GB each.

I'm looking for a solution with something like Rx*/Bacon or any other similar library in any language.

1

There is 1 best solution below

0
On

My solution in Coffee with Highland.js:

_ = require('underscore')
H = require('highland')
fs = require('fs')
debug = require('debug')
log = debug('main')
assert = require('assert')

# Wrap each raw chunk in an object so later stages can attach more fields to it.
wrapChunk = (buffer) -> {buffer}

# Highland stream over the input file's chunks, each emitted as {buffer}.
readS = H(fs.createReadStream('walmart.dump')).map(wrapChunk)
# Size threshold that the running chunk-length total is compared against
# before switching to a new output file.
MAX_SIZE = 10 ** 7
# Index used in the next output file's name; incremented on every open.
counter = 0
# Underlying fs write stream of the most recently opened output file.
lastFile = null

# Open the next numbered output file under result/ and return a writer for it.
#
# Returns a function `(chunk) -> HighlandStream`: calling it yields a stream
# built by H.wrapCallback around the file's `write` method.
nextStream = ()->
  # End the previously opened file before moving on; otherwise every output
  # file stays open for the rest of the run. Chunks already handed to
  # `write` are still flushed by `end()`.
  lastFile?.end()
  lastFile = fs.createWriteStream("result/data#{counter}.txt")
  wrapper = H.wrapCallback(lastFile.write.bind(lastFile))
  counter += 1
  return wrapper


debug('profile')('start')
# Create the namespaced loggers once instead of on every chunk.
scanLog = debug('scan')
notifyLog = debug('notify')

# Fold over the chunk stream, carrying the running size of the current output
# file and the writer for it. The accumulator object is mutated and re-emitted
# for every chunk (it is not copied). Highland's scan also emits the seed
# value, which has no `buffer`, so the trailing filter drops that element.
s = readS.scan({
    size: 0
    stream: nextStream()
  }, (acc, {buffer}) ->
  scanLog(acc, buffer)
  acc.size += buffer.length
  acc.buffer = buffer
  if acc.size > MAX_SIZE
      notifyLog(counter - 1, acc.size)
      # The chunk that crossed the limit is the first one written to the new
      # file, so the new running size starts at its length, not at zero.
      acc.size = buffer.length
      acc.stream = nextStream()
  log(acc)
  return acc
).filter((x)->x.buffer?)

# Logger created once rather than per element.
flatMapLog = debug('flatMap')

# Write every chunk to the writer currently attached to it, one write at a
# time: flatMap waits for each write's stream to end before taking the next
# element. The former bare `s.parallel 4` statement was dropped: its returned
# stream was discarded, so it never took part in the pipeline.
s.flatMap((x) ->
  flatMapLog(x)
  x.stream(x.buffer)
)
.done -> debug('profile')('finish')

walmart.dump is a text file containing 6 GB of text. Splitting it into 649 files takes:

  profile start +0ms
  profile finish +53s