fail to consume kafka events with nestjs

3.8k Views Asked by At

I wanted to consume Kafka messages and thus I've invoked the emit() message first which successfully stored a message in Kafka.

I can see that by running the batch file: bin/windows/kafka-console-consumer.bat --topic signals --from-beginning --bootstrap-server localhost:9092

and the new message is logged. {"botId":"TKS","emailToken":"38fhsf29h","pair":"xy","size":100}

This is my nodejs/nestjs service file. What am I missing in my script - since the same script writes successfully to Kafka? I've tried @MessagePattern and @EventPattern to consume the message. Any ideas?

import { Injectable, OnModuleInit } from '@nestjs/common';
import { TVSignal } from './tvsignal';
import { Client, ClientKafka, EventPattern, MessagePattern, Payload, Transport } from "@nestjs/microservices";
import { Logger } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';

@Injectable()
export class AppService implements OnModuleInit {

  constructor(private eventEmitter: EventEmitter2) {}

  @Client({
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: 'trading-signal-receiver',
        brokers: ['localhost:9092'],
      },
      consumer: {
        groupId: 'signals',
        allowAutoTopicCreation: true
      },
      producer: {
      }
    }
  })
  kafkaClient: ClientKafka;


  async onModuleInit() {
    this.kafkaClient.subscribeToResponseOf('signals');
    await this.kafkaClient.connect();
    Logger.log("consumer assignments: " + JSON.stringify(this.kafkaClient.getConsumerAssignments()))

  }
  
  // DOES NOT WORK
  @EventPattern('signals')
  async handleEntityCreated(payload: any) {
    Logger.log("RECEIVED NEW: "+ JSON.stringify(payload));
  }

  getTradingSignals(): string {
    return ""
  }

  // DOES NOT WORK
  @MessagePattern('signals') // Our topic name
  getMessage(@Payload() message) {
    Logger.log("RECEIVED KAFKA MSG" + message.value);
    return 'Hello World';
  }


    // WORKS
    storeSignal(signal: TVSignal){
    Logger.log("STORED: " + JSON.stringify(signal))  
    Logger.log("consumer assignments: " + JSON.stringify(this.kafkaClient.getConsumerAssignments()))
    // send message to OnEvent
    this.eventEmitter.emit('signal.saved', signal);
    // store obj in kafka
    return this.kafkaClient.emit('signals', signal); // args - topic, message
    
  }

  

}
2

There are 2 best solutions below

1
feder On BEST ANSWER

There reason why I could not consume them, was because I missed out on calling app.startAllMicroservices() (KAFKA is one of them).

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { microserviceConfig} from "./msKafkaConfig";

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  app.enableCors();   
  app.connectMicroservice(microserviceConfig);
  await app.startAllMicroservices();
  
  await app.listen(3000);



}
bootstrap();
0
Maximiliano Olivero On

Here is the KafkaOptions you can attach to the connection in app instance. Basically we set the configuration for the consumer as microservice.

import { Transport, KafkaOptions } from '@nestjs/microservices';

const {
  streaming_kafka_msk_notification_bootstrap_brokers: brokerNotification,
} = process.env;

export const KAFKA = {
  topic: process.env.streaming_kafka_msk_notification_topic_name,
  broker: process.env.streaming_kafka_msk_notification_bootstrap_brokers,
};

export const microServiceKafka: KafkaOptions = {
  transport: Transport.KAFKA,
  options: {
    consumer: {
      groupId: 'notifications-ms',
    },
    client: {
      brokers: brokerNotification.split(','),
    },
    subscribe: {
      fromBeginning: true,
    },
  },
};