Monitoring RxJS subscriptions application wide


I'm searching for a way to monitor RxJS subscriptions to detect memory leaks in our application. The application is run in a browser.

My approach so far was to wrap the prototype methods Observable.subscribe and Subscription.unsubscribe, but that does not work: the counter is not accurate. It shows negative numbers, so I guess unsubscribe is called several times in some cases.

Are you aware of any library, hack or method to monitor RxJS subscriptions application-wide without altering each subscription in the production code? A TypeScript transformer would also be an option.

My first approach was this:


  let subscriptionCounter = 0

  Object.defineProperty(Observable.prototype, '__subscribe', { value: Observable.prototype.subscribe })

  Object.defineProperty(Observable.prototype, 'subscribe', { value: function <T> (observerOrNext?: Partial<Observer<T>> | ((value: T) => void)): Subscription {
    subscriptionCounter++
    console.log(subscriptionCounter, 'subscribe');
    return (this as Observable<T>)['__subscribe'](observerOrNext)
  }})

  Object.defineProperty(Subscription.prototype, '__unsubscribe', { value: Subscription.prototype.unsubscribe })

  Object.defineProperty(Subscription.prototype, 'unsubscribe', { value: function() {
    subscriptionCounter--
    console.log(subscriptionCounter, 'unsubscribe');
    (this as Subscription)['__unsubscribe']()
  }})

Answer by maxime1992:

I think it should work with the following:

let subscriptionCounter = 0;

Object.defineProperty(Observable.prototype, '__subscribe', {
  value: Observable.prototype.subscribe,
});

Object.defineProperty(Observable.prototype, 'subscribe', {
  value: function <T>(
    observerOrNext?: Partial<Observer<T>> | ((value: T) => void)
  ): Subscription {
    subscriptionCounter++;
    console.log(subscriptionCounter, 'subscribe');

    const originalSubscribe = (
      this['__subscribe'] as Observable<T>['subscribe']
    ).bind(this);

    if (!observerOrNext) {
      return originalSubscribe(observerOrNext);
    }

    if (typeof observerOrNext === 'function') {
      return originalSubscribe({
        next: (v) => {
          observerOrNext(v);
        },
        complete: () => {
          subscriptionCounter--;
          console.log(subscriptionCounter, 'complete');
        },
        error: () => {
          subscriptionCounter--;
          console.log(subscriptionCounter, 'error');
        },
      });
    } else {
      return originalSubscribe({
        next: (v) => {
          observerOrNext.next?.(v);
        },
        complete: () => {
          subscriptionCounter--;
          console.log(subscriptionCounter, 'complete');
          observerOrNext.complete?.();
        },
        error: (e) => {
          subscriptionCounter--;
          console.log(subscriptionCounter, 'error');
          observerOrNext.error?.(e);
        },
      });
    }
  },
});

This ensures that the counter takes into account not only complete but also error.

I have made a Stackblitz to demo this, with a small example chain of three steps:

rx(
  from(['value 1', 'value 2', 'value 3']),
  map((name) => `Hello, ${name}!`),
  repeat(2)
).subscribe(console.log);

And this gives the following output:

1 subscribe
2 subscribe
3 subscribe
Hello, value 1!
Hello, value 2!
Hello, value 3!
2 complete
1 complete
2 subscribe
3 subscribe
Hello, value 1!
Hello, value 2!
Hello, value 3!
2 complete
1 complete
0 complete

As you can see, we start at 0 and the first subscription gives a count of 1, while at the end, we get a total count of 0, as expected.

What's interesting to notice here is what happens between log lines 7-10, after the first 3 values were logged. Because the from has emitted everything at this stage, it calls complete, which cascades down and closes the entire stream. So first from completes (the total is 2), then map (the total is 1), and then we get to the repeat, which resubscribes to the streams above. Instead of reaching a total of 0, the two upstream subscriptions start again and the total goes back up to 3. Once the second pass is done, we count down 2, 1, 0 as expected.

Another way to check for memory leaks is to launch your app, open the dev tools, and go to the Performance tab. Press the record button, play with your app just like a regular user would, and then stop the recording. You'll then see something like this:

[Image: screenshot of a Performance tab recording]

This recording was just of the Stackblitz page I had open, but that doesn't matter. The important bits to look for are:

  • In blue: the JS heap. This is the memory used by your app. It should increase initially, of course, and if you load "permanent" data into your app (for example, you make an HTTP call and cache the result forever), you shouldn't expect the heap to go down. For everything else, though, it should. If navigating somewhere creates a new component that loads data from an HTTP call, and you never save that data outside the component, then once the component is destroyed the heap should go back down to where it was before the component was created. To make sure you're not just seeing memory that the browser hasn't garbage collected yet, you can click this button to manually trigger a garbage collection:

[Image: screenshot of the garbage collection button]

  • Kind of off topic here, but you can also keep an eye on the nodes, that is, the HTML nodes. If for any reason you're holding onto a node, for example in a directive, and you don't clean up the reference when the directive is destroyed, this can lead to a memory leak that keeps old DOM nodes in memory. For a real use case where I used this feature: with Angular, I realized I had a DOM node memory leak when using a ControlValueAccessor. It was hurting my app pretty badly, as memory would increase quite quickly and slow everything down until it crashed. See the full post I made here: https://github.com/angular/angular/issues/20007#issuecomment-682831198

If you wish to learn more about the performance panel, here's the official documentation.

Answer by Picci:

Consider that Subscriptions are objects whose goal is to store "finalizers" and call them when the unsubscribe method is invoked on the Subscription itself.

Finalizers are functions of type TeardownLogic, which are returned by the function passed to the constructor of Observable when an Observable is built.

Finalizers have the goal to clean up any resource that the Observable may be holding for its correct functioning.

Subscriptions can be either "open" or "closed", and this state is held in the instance property closed of the Subscription class.
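To make the description above concrete, here is a minimal, dependency-free sketch of a subscription that stores finalizers and tracks its closed state. MiniSubscription is a hypothetical stand-in written for illustration, not the RxJS class; it only mirrors the behaviour described here.

```typescript
// Hypothetical stand-in for RxJS's Subscription, reduced to finalizers and `closed`.
type Finalizer = () => void;

class MiniSubscription {
  closed = false;
  private finalizers: Finalizer[] = [];

  add(finalizer: Finalizer): void {
    if (this.closed) {
      // Already closed: run the finalizer right away instead of storing it.
      finalizer();
    } else {
      this.finalizers.push(finalizer);
    }
  }

  unsubscribe(): void {
    if (this.closed) {
      return; // Repeated calls are no-ops.
    }
    this.closed = true;
    const toRun = this.finalizers;
    this.finalizers = [];
    toRun.forEach((finalizer) => finalizer());
  }
}

// Usage: each finalizer runs exactly once, even if unsubscribe is called twice.
const released: string[] = [];
const sub = new MiniSubscription();
sub.add(() => released.push('timer'));
sub.add(() => released.push('socket'));
sub.unsubscribe();
sub.unsubscribe();
```

After the two calls, released holds each resource name once and sub.closed is true.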

Having said that, if you want to monitor how many Subscriptions are active in your app, you have to monitor how many instances of type Subscription are not closed. This can be done by wrapping the methods and decreasing the counter in unsubscribe only if the Subscription is not already closed.
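A rough sketch of that guard might look like the following. To keep it runnable without RxJS installed, it patches a hypothetical FakeSubscription class instead of the real Subscription.prototype, and trackedSubscribe stands in for the patched Observable.prototype.subscribe; both names are assumptions made for this example. In a real app, Subscriptions that are created without going through subscribe would still need separate handling.

```typescript
// Hypothetical stand-in for RxJS's Subscription; only `closed` and `unsubscribe` matter here.
class FakeSubscription {
  closed = false;
  unsubscribe(): void {
    this.closed = true;
  }
}

let activeCount = 0;

// Stand-in for the patched subscribe: count every new subscription.
function trackedSubscribe(): FakeSubscription {
  activeCount++;
  return new FakeSubscription();
}

// Wrap unsubscribe on the prototype, but decrement only on the open -> closed transition.
const originalUnsubscribe = FakeSubscription.prototype.unsubscribe;
FakeSubscription.prototype.unsubscribe = function (this: FakeSubscription): void {
  const wasOpen = !this.closed;
  originalUnsubscribe.call(this);
  if (wasOpen) {
    activeCount--;
  }
};

const first = trackedSubscribe();
const second = trackedSubscribe();
first.unsubscribe();
first.unsubscribe(); // Ignored: already closed, so the counter is not decremented again.
const countAfterFirst = activeCount;
second.unsubscribe();
const countAfterSecond = activeCount;
```

With this guard, the repeated unsubscribe call no longer pushes the counter below zero.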

Consider though that you avoid memory leaks as long as your finalizers actually free up the resources held by the Observables. If the finalizers are not correct, then you end up with memory leaks even if you unsubscribe.
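As an illustration of that last point, the sketch below compares a correct finalizer with an incorrect one. Ticker, listenCorrectly and listenLeaky are hypothetical names invented for this example; the two functions play the role of the function passed to the Observable constructor, which returns the finalizer.

```typescript
type TickListener = (value: number) => void;

// Hypothetical long-lived source that keeps a reference to every listener.
class Ticker {
  readonly listeners = new Set<TickListener>();

  emit(value: number): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

// Correct: the returned finalizer releases what subscribing acquired.
function listenCorrectly(ticker: Ticker, listener: TickListener): () => void {
  ticker.listeners.add(listener);
  return () => {
    ticker.listeners.delete(listener);
  };
}

// Incorrect: the finalizer runs, but the listener stays registered.
function listenLeaky(ticker: Ticker, listener: TickListener): () => void {
  ticker.listeners.add(listener);
  return () => {
    // Bug: nothing is removed here.
  };
}

const ticker = new Ticker();
const finalizeGood = listenCorrectly(ticker, () => {});
const finalizeLeaky = listenLeaky(ticker, () => {});

// Both "unsubscribe" calls happen, yet one listener is still held by the source.
finalizeGood();
finalizeLeaky();
const stillRegistered = ticker.listeners.size;
```

A subscription counter would report zero in both cases, which is why counting subscriptions alone cannot prove the absence of leaks.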