I am using Mongoose version 7.1.0 in a Node.js application. I have set up a change stream on a MongoDB collection to listen for insert operations. When an insert operation occurs, I send a notification to certain users.
Here is the relevant part of my code:
let resumeTokenRis;
const ris = mongoose.connection.db.collection("ris");
const changeStreamRis = ris.watch([], { resumeAfter: resumeTokenRis });
changeStreamRis.on("change", async (change) => {
if (change.operationType === "insert") {
resumeTokenRis = change._id;
console.log("RI Inserted");
const ri = change.fullDocument;
let userIds = ri.issueStat.taggedUser?.map(userId => userId.toString()) || [];
userIds = [...new Set(userIds)].map(id => new ObjectId(id));
const userTokens = await NotificationToken.find({ userId: { $in: userIds } });
const user = await User.findById(ri.user);
let tokens = userTokens.flatMap(userToken => userToken.tokens);
if (tokens.length === 0) {
console.log("No tokens found for this user.");
return;
}
const message = {
notification: {
title: `New RI Assignment`,
body: `Assigned by ${user.name} with ${ri.priority} priority.`,
},
data: {
id: `${ri.refId}`,
type: "RI",
},
tokens: tokens,
};
admin.messaging().sendEachForMulticast(message)
.then((response) => {
console.log(response);
response.responses.forEach((res, index) => {
if (!res.success) {
const failedToken = tokens[index];
console.log(`Failed token ID: ${failedToken}`);
// Delete outdated token
NotificationToken.updateOne(
{ userId: { $in: userIds } },
{ $pull: { tokens: failedToken } }
)
.then(() => console.log(`Token ${failedToken} deleted successfully`))
.catch((error) => console.error(`Failed to delete token ${failedToken}: ${error}`));
}
});
})
.catch((error) => {
console.log(error);
});
}
});
The problem I'm facing is that the change stream sometimes stops listening for changes. I suspect this might be due to inactivity, but I'm not sure. There are no error messages in my console when this happens.
I want the change stream to keep running continuously, even if there are no changes for a long time. How can I keep the change stream continuously running? Any help would be appreciated.