I have an async function that creates a request to an external stream; it needs to download items from the source and store them in my database. It works fine the first time after my web server starts, but subsequent calls to the endpoint do not create a new readable stream - as if it knows it has been called previously. But shouldn't each function call run the code again, with no knowledge of how many times the function was called before?
Below is the function that is triggered by calling my API endpoint. It works perfectly the first time I call it. I expected it to create new instances of the request and stream the data again on subsequent function calls, but it did not.
/**
 * Express handler that opens a request to the upstream endpoint, pipes the
 * response through a JSONStream parser and calls `writeToDatabase()` for each
 * parsed item.
 *
 * The handler answers 200 as soon as the pipeline is wired up; the download
 * and the database writes continue after the HTTP response has been sent, so
 * failures are logged rather than reported to the caller.
 *
 * @param req - Incoming Express request (not read by this handler).
 * @param res - Express response; receives status 200 immediately.
 */
const requestItems = async (req: Request, res: Response): Promise<void> => {
  const stream = request({ url: `${baseUrl}/${endpoint}?${query.toString()}` })
  const parser = JSONStream.parse('*', modifyItem)

  // An 'error' event without a listener is thrown by the emitter and would
  // crash the process, so both ends of the pipe get an explicit handler.
  stream.on('error', (err: unknown) => {
    console.error('Upstream request failed', err)
  })
  parser.on('error', (err: unknown) => {
    console.error('Failed to parse upstream response', err)
  })

  stream.pipe(parser)
  res.sendStatus(200)

  // NOTE(review): the parsed item is not passed to writeToDatabase() — confirm
  // that writeToDatabase picks the item up some other way (e.g. via modifyItem).
  parser.on('data', async (_ad) => {
    // Hold the download back while the current item is being written.
    stream.pause()
    console.log("Inside")
    try {
      await writeToDatabase()
    } catch (err: unknown) {
      // A rejected write must not become an unhandled rejection.
      console.error('Failed to write item to database', err)
    } finally {
      // Always resume, otherwise a single failed write stalls the stream forever.
      stream.resume()
    }
  })

  stream.on('complete', async () => {
    try {
      // Final write once the upstream request reports completion.
      await writeToDatabase()
      console.log('Stream completed')
    } catch (err: unknown) {
      console.error('Failed to write to database after stream completed', err)
    }
  })
}
I am not using fetch to make the HTTP request; instead I import request (which uses the through package). My imports:
import request from 'request'
import { Request, Response } from 'express'
import JSONStream from 'JSONStream'
On the first call to requestItems it steps into parser.on('data') and console.logs "Inside". On subsequent calls it does not step into parser.on('data'), which I expected it to do.
I understand that a stream can only be read once, but I thought that meant once per invocation. What do I need to change for the request to be called on each function call?
The first time I call it, this is what the stream request outputs:
{
_events: [Object: null prototype] { pipe: [Function (anonymous)] },
_eventsCount: 1,
_maxListeners: undefined,
readable: true,
writable: true,
_qs: Querystring {
request: [Circular *1],
lib: {
formats: [Object],
parse: [Function (anonymous)],
stringify: [Function (anonymous)]
},
useQuerystring: undefined,
parseOptions: {},
stringifyOptions: {}
},
_auth: Auth {
request: [Circular *1],
hasAuth: false,
sentAuth: false,
bearerToken: null,
user: null,
pass: null
},
_oauth: OAuth { request: [Circular *1], params: null },
_multipart: Multipart {
request: [Circular *1],
boundary: 'ef9ad6e3-446a-4611-aa1b-ccfde61ac033',
chunked: false,
body: null
},
_redirect: Redirect {
request: [Circular *1],
followRedirect: true,
followRedirects: true,
followAllRedirects: false,
followOriginalHttpMethod: false,
allowRedirect: [Function (anonymous)],
maxRedirects: 10,
redirects: [],
redirectsFollowed: 0,
removeRefererHeader: false
},
_tunnel: Tunnel {
request: [Circular *1],
proxyHeaderWhiteList: [
'accept', 'accept-charset',
'accept-encoding', 'accept-language',
'accept-ranges', 'cache-control',
'content-encoding', 'content-language',
'content-location', 'content-md5',
'content-range', 'content-type',
'connection', 'date',
'expect', 'max-forwards',
'pragma', 'referer',
'te', 'user-agent',
'via'
],
proxyHeaderExclusiveList: []
},
headers: { host: 'x.api.y.com' },
setHeader: [Function (anonymous)],
hasHeader: [Function (anonymous)],
getHeader: [Function (anonymous)],
removeHeader: [Function (anonymous)],
method: 'GET',
localAddress: undefined,
pool: {},
dests: [],
__isRequestRequest: true,
uri: Url {
protocol: 'https:',
slashes: true,
auth: null,
host: 'x.api.y.com',
port: 443,
hostname: 'x.api.y.com',
hash: null,
search: '?date=2024-02-10T10%3A34%3A44',
query: 'date=2024-02-10T10%3A34%3A44',
pathname: '/stream',
path: '/stream?date=2024-02-10T10%3A34%3A44'
},
proxy: null,
tunnel: true,
setHost: true,
originalCookieHeader: undefined,
_disableCookies: true,
_jar: undefined,
port: 443,
host: 'x.api.y.com',
path: '/stream?date=2024-02-10T10%3A34%3A44',
httpModule: {
Agent: [Function: Agent],
globalAgent: Agent {
_events: [Object: null prototype],
_eventsCount: 2,
_maxListeners: undefined,
defaultPort: 443,
protocol: 'https:',
options: [Object: null prototype],
requests: [Object: null prototype] {},
sockets: [Object: null prototype] {},
freeSockets: [Object: null prototype] {},
keepAliveMsecs: 1000,
keepAlive: true,
maxSockets: Infinity,
maxFreeSockets: 256,
scheduling: 'lifo',
maxTotalSockets: Infinity,
totalSocketCount: 0,
maxCachedSessions: 100,
_sessionCache: [Object],
[Symbol(shapeMode)]: false,
[Symbol(kCapture)]: false
},
Server: [Function: Server],
createServer: [Function: createServer],
get: [Function: get],
request: [Function: request]
},
agentClass: [Function: Agent],
agent: Agent {
_events: [Object: null prototype] {
free: [Function (anonymous)],
newListener: [Function: maybeEnableKeylog]
},
_eventsCount: 2,
_maxListeners: undefined,
defaultPort: 443,
protocol: 'https:',
options: [Object: null prototype] {
keepAlive: true,
scheduling: 'lifo',
timeout: 5000,
noDelay: true,
path: null
},
requests: [Object: null prototype] {},
sockets: [Object: null prototype] {},
freeSockets: [Object: null prototype] {},
keepAliveMsecs: 1000,
keepAlive: true,
maxSockets: Infinity,
maxFreeSockets: 256,
scheduling: 'lifo',
maxTotalSockets: Infinity,
totalSocketCount: 0,
maxCachedSessions: 100,
_sessionCache: { map: {}, list: [] },
[Symbol(shapeMode)]: false,
[Symbol(kCapture)]: false
},
[Symbol(shapeMode)]: false,
[Symbol(kCapture)]: false
}
On subsequent calls, this is what the stream request outputs:
{
_events: [Object: null prototype] { pipe: [Function (anonymous)] },
_eventsCount: 1,
_maxListeners: undefined,
readable: true,
writable: true,
_qs: Querystring {
request: [Circular *1],
lib: {
formats: [Object],
parse: [Function (anonymous)],
stringify: [Function (anonymous)]
},
useQuerystring: undefined,
parseOptions: {},
stringifyOptions: {}
},
_auth: Auth {
request: [Circular *1],
hasAuth: false,
sentAuth: false,
bearerToken: null,
user: null,
pass: null
},
_oauth: OAuth { request: [Circular *1], params: null },
_multipart: Multipart {
request: [Circular *1],
boundary: '02ace725-0e14-425d-8f21-f69e9835eef1',
chunked: false,
body: null
},
_redirect: Redirect {
request: [Circular *1],
followRedirect: true,
followRedirects: true,
followAllRedirects: false,
followOriginalHttpMethod: false,
allowRedirect: [Function (anonymous)],
maxRedirects: 10,
redirects: [],
redirectsFollowed: 0,
removeRefererHeader: false
},
_tunnel: Tunnel {
request: [Circular *1],
proxyHeaderWhiteList: [
'accept', 'accept-charset',
'accept-encoding', 'accept-language',
'accept-ranges', 'cache-control',
'content-encoding', 'content-language',
'content-location', 'content-md5',
'content-range', 'content-type',
'connection', 'date',
'expect', 'max-forwards',
'pragma', 'referer',
'te', 'user-agent',
'via'
],
proxyHeaderExclusiveList: []
},
headers: { host: 'x.api.y.com' },
setHeader: [Function (anonymous)],
hasHeader: [Function (anonymous)],
getHeader: [Function (anonymous)],
removeHeader: [Function (anonymous)],
method: 'GET',
localAddress: undefined,
pool: {},
dests: [],
__isRequestRequest: true,
uri: Url {
protocol: 'https:',
slashes: true,
auth: null,
host: 'x.api.y.com',
port: 443,
hostname: 'x.api.y.com',
hash: null,
search: '?date=2024-02-10T10%3A37%3A53',
query: 'date=2024-02-10T10%3A37%3A53',
pathname: '/stream',
path: '/stream?date=2024-02-10T10%3A37%3A53',
href: 'https://x.api.y.com/stream?date=2024-02-10T10%3A37%3A53'
},
proxy: null,
tunnel: true,
setHost: true,
originalCookieHeader: undefined,
_disableCookies: true,
_jar: undefined,
port: 443,
host: 'x.api.y.com',
path: '/stream?date=2024-02-10T10%3A37%3A53',
httpModule: {
Agent: [Function: Agent],
globalAgent: Agent {
_events: [Object: null prototype],
_eventsCount: 2,
_maxListeners: undefined,
defaultPort: 443,
protocol: 'https:',
options: [Object: null prototype],
requests: [Object: null prototype] {},
sockets: [Object: null prototype] {},
freeSockets: [Object: null prototype] {},
keepAliveMsecs: 1000,
keepAlive: true,
maxSockets: Infinity,
maxFreeSockets: 256,
scheduling: 'lifo',
maxTotalSockets: Infinity,
totalSocketCount: 0,
maxCachedSessions: 100,
_sessionCache: [Object],
[Symbol(shapeMode)]: false,
[Symbol(kCapture)]: false
},
Server: [Function: Server],
createServer: [Function: createServer],
get: [Function: get],
request: [Function: request]
},
agentClass: [Function: Agent],
agent: Agent {
_events: [Object: null prototype] {
free: [Function (anonymous)],
newListener: [Function: maybeEnableKeylog]
},
_eventsCount: 2,
_maxListeners: undefined,
defaultPort: 443,
protocol: 'https:',
options: [Object: null prototype] {
keepAlive: true,
scheduling: 'lifo',
timeout: 5000,
noDelay: true,
path: null
},
requests: [Object: null prototype] {},
sockets: [Object: null prototype] {},
freeSockets: [Object: null prototype] {},
keepAliveMsecs: 1000,
keepAlive: true,
maxSockets: Infinity,
maxFreeSockets: 256,
scheduling: 'lifo',
maxTotalSockets: Infinity,
totalSocketCount: 0,
maxCachedSessions: 100,
_sessionCache: { map: [Object], list: [Array] },
[Symbol(shapeMode)]: false,
[Symbol(kCapture)]: false
},
[Symbol(shapeMode)]: false,
[Symbol(kCapture)]: false
}