Unable to query more than once using streams


Currently I can retrieve a response when I curl my endpoint, but only once. Any additional request to my server triggers a stream error: Stream already being consumed, you must either fork() or observe().

My stack: node, express, highlandjs, mongodb.

//server.js     
app.get('/queries', query.calculateTotal);

//my endpoint function

var _ = require('lodash')
var sg = require("reactive-superglue")
var query = sg.mongodb("mongodb://localhost:27017/qatrackerdb").collection("test1")

// Express calls route handlers with (req, res), so the first argument is the request
exports.calculateTotal = function (req, res) {
    query.find()
        .collect()
        .map(function(x) {
            console.log(x)
            return _.size(x)
        })
        .apply(function(x) {
            return res.status(200).json(x)
        })
}

The server log after a second request to my endpoint (curl -i -X GET http://localhost:3000/queries/):

GET /queries/ 200 34.442 ms - 632
GET /queries/ - - ms - -
GET /queries/ 500 2.371 ms - 1998
Error: Stream already being consumed, you must either fork() or observe()
    at Stream._addConsumer 

There is 1 best solution below

Best answer

I don't know highland.js, but my guess is that the error message is giving you the answer: use observe instead of apply. One option is to have calculateTotal return the stream:

query.find()
    .collect()
    .map(function(x) {
        console.log(x)
        return _.size(x)
    })

and have the caller of calculateTotal observe the returned stream:

calculateTotal().observe([observeFunction])

This way, each call returns a stream for the caller to consume. Right now the stream is consumed inside the function, which may be why it complains when you call it again.
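
To picture the single-consumer rule behind that error, here is a minimal sketch in plain JavaScript. It does not use Highland or reactive-superglue: OneShotStream, createQueryStream, countShared and countFresh are hypothetical names invented for the illustration, and the idea that the original code ends up reusing one underlying stream across requests is an assumption, not something confirmed from the library source.

```javascript
// Toy stand-in for a single-consumer stream. This is NOT Highland's
// implementation; it only mimics the "one consumer per stream" rule.
class OneShotStream {
  constructor(items) {
    this.items = items;
    this.consumed = false;
  }

  // Hands all items to the callback as one array; allowed only once.
  toArray(callback) {
    if (this.consumed) {
      throw new Error("Stream already being consumed");
    }
    this.consumed = true;
    callback(this.items.slice());
  }
}

// Hypothetical factory: returns a brand-new stream on every call.
function createQueryStream() {
  return new OneShotStream([{ id: 1 }, { id: 2 }, { id: 3 }]);
}

// Problem pattern: one stream created at load time and shared by all calls.
const shared = createQueryStream();
function countShared() {
  let total;
  shared.toArray(function (docs) { total = docs.length; });
  return total;
}

// Per-call pattern: every call builds and consumes its own stream.
function countFresh() {
  let total;
  createQueryStream().toArray(function (docs) { total = docs.length; });
  return total;
}

console.log(countFresh()); // first call
console.log(countFresh()); // second call gets a fresh stream

console.log(countShared()); // first call consumes the shared stream
try {
  countShared(); // second call finds the stream already consumed
} catch (err) {
  console.log(err.message);
}
```

If the assumption holds for the real code, the analogous change would be to create everything stream-related inside the request handler, so each request gets its own stream, rather than building it once at module load.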