Why does my RxJS Observable chain not re-execute upon subsequent emissions?

32 Views Asked by At

I am working with RxJS in an Angular service, and I've encountered a peculiar issue with one of my observable chains. Specifically, I have two observable chains that are intended to enrich a stream of notifications with user data. The first observable chain logs and maps the results correctly and can be triggered multiple times without issue. However, the second observable chain, which includes a concatMap operator to combine and enrich notifications with user data, only executes once and does not respond to subsequent emissions from the source observable.

Here's the observable chain that works as expected and allows for repeated executions:

private enriched_sns$ = this.sn_results$.asObservable().pipe(
  tap((results) => console.log('%cResults', 'color: cyan', results)),
  concatMap((results) => {
    console.log('before', results);
    return results;
  }),
  tap((results) => console.log('after', results)),
  map((results) => results),
);

And here's the problematic observable chain that only runs once:

private enriched_sns$ = this.sn_results$.asObservable().pipe(
  tap((results) => console.log('%cResults', 'color: cyan', results)),
  concatMap((results) => {
    console.log('before', results);
    const enrichedNotifications$ = zip(
      results.map((result: StreamNotificationResult) => {
        return this.read_user_data_service.readUserData(result.activities[0].actor.id).pipe(
          map((user) => {
            const enriched_notification: StreamEnrichedNotification = {
                user,
                notification: result,
            };
            return enriched_notification;
          }),
        );
      }),
    );
    // Combine the original results and the enriched notifications
    return combineLatest([of(results), enrichedNotifications$]);
  }),
  tap((results) => console.log('after', results)),
  map((results) => results),
);

I suspect the issue might be related to how I'm using concatMap or possibly how the observables are combined and returned, but I'm not sure why it's only executing once and not reacting to subsequent emissions from this.sn_results$.

Has anyone encountered a similar issue or can spot what might be causing this behavior? Any insights or suggestions on how to ensure the second observable chain reacts to every emission from the source observable would be greatly appreciated.

1

There are 1 best solutions below

1
syahiruddin On

I discovered that the reason why readUserData() was not returning anything on the next emission is because I was using combineLatest() instead of zip(). After switching to zip(), I was able to receive the data on the second emission because zip() waits for all observables to complete before returning the result.

private enriched_sns$ = this.sn_results$.asObservable().pipe(
  tap((results) => console.log('%cResults', 'color: cyan', results)),
  concatMap((results) => {
    console.log('before', results);
    const enrichedNotifications$ = zip(
      results.map((result: StreamNotificationResult) => {
        return this.read_user_data_service.readUserData(result.activities[0].actor.id).pipe(
          map((user) => {
            const enriched_notification: StreamEnrichedNotification = {
                user,
                notification: result,
            };
            return enriched_notification;
          }),
        );
      }),
    );
    // Combine the original results and the enriched notifications
    return zip([of(results), enrichedNotifications$]); // Now it will wait for all observable to complete
  }),
  tap((results) => console.log('after', results)),
  map((results) => results),
);