Ngx-Mqtt for Angular - client connection to broker breaks from time to time. Browser refresh required

Our project is on Angular 16, with the npm package "ngx-mqtt": "^16.1.0".

We have an admin screen which auto-connects to the HiveMQ MQTT broker in the constructor:

constructor(
    private readonly mqttNotificationsService: MqttNotificationsService,
    private readonly config: AppConfigService,
) {
    if (this.config.getConfig('mqttMessagesEnabled')) {
        this.mqttNotificationsService.startConnection();
    }        
}

But from time to time this connection breaks: my notifications icon just shows 0 messages, and the only fix is a hard refresh of the browser. After the refresh, the browser client reconnects to the MQTT broker successfully and the latest messages are retrieved.

Here are the startConnection, restartMqtt and destroyConnection methods, as well as the hubListener function:

startConnection(): void {
      if (!this.config.getConfig('mqttMessagesEnabled')) {
          return;
      }

      this.topicsService
          .getTopics(-1, '')
          .pipe(
              tap((topics) => {
                  this.topics = topics;
              }),
          )
          .subscribe({
              next: (_) => {
                  this.connection = this.getBrokerConnection();
                  try {
                      this.mqttService.connect(this.connection);
                      this.hubListener();
                  } catch (error) {
                      console.log('mqtt.connect error', error);
                  }
              },
          });
  }

  restartMqtt(): void {
      this.destroyConnection();
      timer(1000).pipe(
          tap((_) => {
              this.startConnection();
          }),
      ).subscribe();
  }
  
  destroyConnection() {
      this.mqttConnected = false;
      if (this.subscriptions) {
          this.subscriptions.forEach((sub) => sub.unsubscribe());
      }
      this.subscriptions = [];
      this.subscribeMultiple = [];
      try {
          this.mqttService?.disconnect(true);
      } catch (error) {
          console.log('Disconnect failed', error.toString());
      }
  }

  // ON CONNECT!
    public hubListener = () => {
        this.mqttService.onConnect
            .pipe(
                tap((conn) => {
                    // see mqtt-packet interface IConnackPacket
                    console.log(`Mqtt OnConnect returned: ${conn.cmd}`); // 'CONNACK' = Conn Acknowledgement
                    this.mqttConnected = true;
                    this.subscribeToTopics();
                }),
            )
            .subscribe();
        this.mqttService.onClose.subscribe((_) => (this.mqttConnected = false));
        this.mqttService.onError
            .pipe(
                tap((result) => {
                    console.log('Mqtt onError event fired: ', result);
                }),
            )
            .subscribe();
        this.mqttService.onOffline
            .pipe(
                tap((result) => {
                    this.mqttConnected = false;
                    console.log('Mqtt onOffline event fired: ', result);
                }),
            )
            .subscribe();
        this.mqttService.onMessage // Broker published a msg; cmd: 'publish'
            .subscribe();
        this.mqttService.onSuback // Subscribe Acknowledgement to Mqtt topic
            .subscribe();
        this.mqttService.onPacketsend.subscribe(); // resulting from a 'pingreq' packet to broker
    };

It's a bit difficult to reproduce, but at some point my broker connection appears to be broken: even if I switch pages and come back, I still cannot retrieve the latest messages.

Thank you.

Answer by bob.mazzo:

I ended up moving the hubListener() call out of startConnection() and into the constructor(). This way, even if this.mqttService.onConnect fires multiple times (for what reason I don't know), hubListener() is not called again, so the event handlers are set up only once.

All is good now, and my MQTT notifications are always present.

    constructor(
        public mqttService: MqttService,
        private readonly topicsService: MqttTopicsService,
        private readonly toastr: ToastrService,
        private readonly configService: ConfigService,
    ) {

        this.configService.mqttNotification$ = this.mqttReceivedSubject.asObservable();
        this.configService.siteConfigMonitoringEvent$.pipe(
            filter((monitoringEnabled) => monitoringEnabled),
            tap((enabled) => {
                this.siteConfigMonitoringEnabled = enabled;
                this.restartMqtt();
            }),

        ).subscribe();

        this.hubListener(); // Now my onConnect handler is set up only once!

    }
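
One plausible reading of why this helps: in the original code, every call to startConnection() (including each restartMqtt()) ran hubListener() again, and every run added another subscriber to the same event streams, so a single connect event could trigger subscribeToTopics() several times. The sketch below only illustrates that effect and an alternative "attach once" guard. FakeStream, NotificationsSketch, hubListenerUnguarded and hubListenerOnce are hypothetical names made up for the illustration. FakeStream is a plain TypeScript stand-in for an observable such as mqttService.onConnect, not part of ngx-mqtt.

```typescript
// Hypothetical stand-in for an event stream such as mqttService.onConnect.
// It only models one thing: every subscribe() adds one more handler.
class FakeStream<T> {
  private handlers: Array<(value: T) => void> = [];

  subscribe(handler: (value: T) => void): void {
    this.handlers.push(handler);
  }

  emit(value: T): void {
    // Iterate over a copy so handlers added during emit do not run in this round.
    for (const handler of [...this.handlers]) {
      handler(value);
    }
  }
}

// Hypothetical, stripped-down version of the notifications service.
class NotificationsSketch {
  subscribeCalls = 0; // how many times subscribeToTopics() has run
  private listenersAttached = false;

  constructor(private readonly onConnect: FakeStream<string>) {}

  // Mirrors the original pattern: called on every (re)start, so handlers pile up.
  hubListenerUnguarded(): void {
    this.onConnect.subscribe(() => this.subscribeToTopics());
  }

  // Guarded variant: can be called from any code path, but attaches only once.
  hubListenerOnce(): void {
    if (this.listenersAttached) {
      return;
    }
    this.listenersAttached = true;
    this.onConnect.subscribe(() => this.subscribeToTopics());
  }

  private subscribeToTopics(): void {
    this.subscribeCalls++;
  }
}

// Three starts without a guard: one connect event runs the handler three times.
const unguardedStream = new FakeStream<string>();
const unguarded = new NotificationsSketch(unguardedStream);
unguarded.hubListenerUnguarded(); // initial start
unguarded.hubListenerUnguarded(); // first restart
unguarded.hubListenerUnguarded(); // second restart
unguardedStream.emit('connack');
console.log(unguarded.subscribeCalls); // 3

// Three starts with the guard: the handler runs once per connect event.
const guardedStream = new FakeStream<string>();
const guarded = new NotificationsSketch(guardedStream);
guarded.hubListenerOnce();
guarded.hubListenerOnce();
guarded.hubListenerOnce();
guardedStream.emit('connack');
console.log(guarded.subscribeCalls); // 1
```

Calling hubListener() once from the constructor, as above, gets the same result as the guard without the extra flag. The guard is only worth considering if the call has to stay inside a method that can run more than once.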