I'm working on an application that uses KafkaJS for message processing. Currently, when an error occurs while processing a message, I call process.exit to terminate the process. However, this approach either stops the whole process (and the service won't restart itself to try handling that message again), or it skips the remaining messages without retrying the failed one before its offset is committed.
I'm looking for a way to implement a retry mechanism in my KafkaJS consumer. The goal is to retry processing a message a certain number of times before committing its offset. If all retries fail, I would then handle the error accordingly (probably by logging).
Here is an example of my current setup:
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ /* Kafka configuration */ });
const consumer = kafka.consumer({ /* Consumer configuration */ });

await consumer.connect();
await consumer.subscribe({ /* Topic configuration */ });

await consumer.run({
  eachMessage: async ({ message }) => {
    try {
      // Message processing logic
    } catch (e: unknown) {
      logger.error(e);
      await consumer.disconnect();
      process.exit();
    }
  },
});
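For reference, this is roughly the direction I'm imagining: wrap my own retry loop around the processing logic inside eachMessage, so the handler only resolves (and the offset only gets committed) once processing either succeeds or I've given up. This is just a minimal sketch; processMessage, the client/group/topic names, and the retry counts are placeholders rather than my real code.

import { Kafka, EachMessagePayload } from 'kafkajs';

// Placeholder client and consumer configuration, not my real settings
const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'my-group' });

const MAX_RETRIES = 3;

// Placeholder for my real processing logic, which may throw application errors
const processMessage = async (payload: EachMessagePayload): Promise<void> => {
  // ...
};

const run = async (): Promise<void> => {
  await consumer.connect();
  // kafkajs v2-style subscribe; 'my-topic' is a placeholder
  await consumer.subscribe({ topics: ['my-topic'] });

  await consumer.run({
    eachMessage: async (payload) => {
      for (let attempt = 1; attempt <= MAX_RETRIES; attempt += 1) {
        try {
          await processMessage(payload);
          return; // success: the handler resolves and the offset is committed as usual
        } catch (e) {
          if (attempt === MAX_RETRIES) {
            // All retries failed: log and move on instead of exiting the process
            console.error(`Giving up after ${MAX_RETRIES} attempts`, e);
            return;
          }
          // Simple linear backoff before the next attempt
          await new Promise((resolve) => setTimeout(resolve, attempt * 1000));
        }
      }
    },
  });
};

run().catch(console.error);

One thing I'm unsure about is whether long backoff delays inside eachMessage could make the consumer miss heartbeats and trigger a rebalance; I assume calling the heartbeat() callback from the payload between attempts would help, but I haven't verified that.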
I've tried catching the KafkaJSNumberOfRetriesExceeded error from kafkajs, but that only covers kafkajs's own internal errors, not application-level errors from my processing logic.
I think the best approach might be to implement a dead-letter queue (DLQ), but I'm not sure.
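If a DLQ is the right direction, I imagine it would look roughly like the sketch below: once the retries are exhausted, republish the original message to a separate dead-letter topic and let the main consumer commit and move on. The sendToDlq helper, the `<topic>.dlq` naming, and the header names are all just my assumptions.

import { Kafka, EachMessagePayload } from 'kafkajs';

// Placeholder client configuration, not my real settings
const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'] });
const producer = kafka.producer();

// Call once during startup, alongside consumer.connect()
export const connectDlqProducer = async (): Promise<void> => {
  await producer.connect();
};

// Hypothetical helper: forward a failed message to a dead-letter topic
export const sendToDlq = async (
  { topic, message }: EachMessagePayload,
  error: unknown,
): Promise<void> => {
  await producer.send({
    // "<topic>.dlq" is just an assumed naming convention
    topic: `${topic}.dlq`,
    messages: [
      {
        key: message.key,
        value: message.value,
        headers: {
          ...message.headers,
          'x-original-topic': topic,
          'x-error-message': error instanceof Error ? error.message : String(error),
        },
      },
    ],
  });
};

// In the retry sketch above, once the final attempt fails, I would call
//   await sendToDlq(payload, e);
// and then return, so the offset still gets committed and the failed message
// no longer blocks the partition.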