I am working with RxJS in an Angular service, and I've encountered a peculiar issue with one of my observable chains. Specifically, I have two observable chains that are intended to enrich a stream of notifications with user data. The first observable chain logs and maps the results correctly and can be triggered multiple times without issue. However, the second observable chain, which includes a concatMap operator to combine and enrich notifications with user data, only executes once and does not respond to subsequent emissions from the source observable.
Here's the observable chain that works as expected and allows for repeated executions:
private enriched_sns$ = this.sn_results$.asObservable().pipe(
tap((results) => console.log('%cResults', 'color: cyan', results)),
concatMap((results) => {
console.log('before', results);
return results;
}),
tap((results) => console.log('after', results)),
map((results) => results),
);
And here's the problematic observable chain that only runs once:
private enriched_sns$ = this.sn_results$.asObservable().pipe(
tap((results) => console.log('%cResults', 'color: cyan', results)),
concatMap((results) => {
console.log('before', results);
const enrichedNotifications$ = zip(
results.map((result: StreamNotificationResult) => {
return this.read_user_data_service.readUserData(result.activities[0].actor.id).pipe(
map((user) => {
const enriched_notification: StreamEnrichedNotification = {
user,
notification: result,
};
return enriched_notification;
}),
);
}),
);
// Combine the original results and the enriched notifications
return combineLatest([of(results), enrichedNotifications$]);
}),
tap((results) => console.log('after', results)),
map((results) => results),
);
I suspect the issue might be related to how I'm using concatMap or possibly how the observables are combined and returned, but I'm not sure why it's only executing once and not reacting to subsequent emissions from this.sn_results$.
Has anyone encountered a similar issue or can spot what might be causing this behavior? Any insights or suggestions on how to ensure the second observable chain reacts to every emission from the source observable would be greatly appreciated.
I discovered that the reason why
readUserData()was not returning anything on the next emission is because I was usingcombineLatest()instead ofzip(). After switching tozip(), I was able to receive the data on the second emission becausezip()waits for all observables to complete before returning the result.