Error encountering: "[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: Client network socket disconnected before secure TLS connection was established","retryCount":0,"retryTime":301}"
Kafka is working fine, I'm able to use producer with no error, but when I start executing inside loop for some testing purposes, I encounter the problem stated above.
And here is another thing, when i remove the producer.disconnect and run again (inside loop again), I got no error.
Can someone explain to me why it is happening ?
For more better understading, here is my code:
Producer:
const { Kafka, Partitioners } = require('kafkajs');
const logger = require('../logger');
class Producer {
#config;
#producerInstance;
/**
* Kafka Config Object (Producer)
* @type {
* {
* topic: string,
* kafka: object,
* producer: object
* producerSendOptions: object
* }
* }
*/
constructor(kafkaConfig = {}) {
this.#config = kafkaConfig;
this.#producerInstance = this.#initProducer();
}
#initProducer() {
const kafka = new Kafka(this.#config.kafka);
const producer = kafka.producer({
...this.#config?.producer,
createPartitioner: Partitioners.DefaultPartitioner,
});
return producer;
}
async produceEvent(events = {}) {
try {
await this.#producerInstance.connect();
await this.#producerInstance.send({
topic: this.#config.topic,
acks: -1,
messages: [events],
});
await this.#producerInstance.disconnect();
return events;
} catch (error) {
logger.error(error.message);
}
}
}
module.exports = {
Producer,
};
Somewhere in other files, I'm calling above class like this:
Class TestProducer {
#sampleProducer;
constructor(){
this.#sampleClient = new SampleClient();
}
async testFunction() {
const sampleMessage = // any message i want to produce
const iterations = Array.from({ length: 5 });
for (const _ of iterations) {
await this.SampleClient.produceEvent(sampleMessage);
}
}
}
SampleClient file:
const { Producer } = require('../common/kafka/kafka-producer');
const { CONFIG_KAFKA } = require('../common/kafkaconfig');
class SampleClient extends Producer {
constructor(config = CONFIG_KAFKA) {
super(config);
}
}
module.exports = { SampleClient };
Before posting this, I already tried/test multiple times, and it is consistent it is not just intermittent.
- With no loop execution (one time only), I got no error
- Inside loop, i got error stated above
- Inside loop but i comment the producer.disconnect, I got no error