I am facing an issue with my Spring Boot application (version 3.2.1) and Embedded Kafka. The problem is that the KafkaTemplate is not successfully sending messages
Here is my test code
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
@ExtendWith(MockitoExtension.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }, controlledShutdown = true)
public class PaymentITest {
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Value("${demo.kafka.topic.payment}")
private String topicName;
@Test
void test() {
String jsonPayload = "{ \"payload\": { \"payment_id\": \"1\" } }";
kafkaTemplate.send(topicName, "key", jsonPayload);
}
}
**The Kafka Consumer **
import fr.test.test.compute.common.event.PaymentEvent;
import fr.test.test.compute.core.domain.port.api.PaymentRequester;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaPaymentConsumer {
private final PaymentRequester paymentRequester;
@KafkaListener(topics = "#{'${demo.kafka.topic.payment}'}",
groupId = "#{'${demo.kafka.group-id}'}")
public void consumePaymentEvents(PaymentEvent paymentEvent) {
paymentRequester.handlePaymentReceivedEvent(paymentEvent.extractModel());
}
}
test/resources/application.yml
spring:
mongodb:
embedded:
storage:
oplogSize: 10
repl-set-name: rs0
version: "5.0.5"
kafka:
consumer:
bootstrap-servers: localhost:9092
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
group-id: kafka-group-id
auto-offset-reset: earliest
enable-auto-commit: false
properties:
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: fr.test.test.compute.config.kafka.KafkaEventDeserializer
producer:
bootstrap.servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: fr.test.test.compute.config.kafka.KafkaEventSerializer
de:
flapdoodle:
mongodb:
embedded:
version: 4.0.2
demo:
kafka:
topic:
payment: payment_topic2
group-id: kafka-group
I already tested with a local kafka broker, the consumer works well !
but when running the test, I added a breakpoint in the consumer, and this last is not being hit ! what could be causing the KafkaTemplate not to send messages (as shown in the picture below) ? Is it an issue related to Kafka Embedded in SpringBoot 3
Here is the Github repository for the code base https://github.com/smaillns/springboot-mongo-kafka
Any suggestions would be appreciated ?
java.util.concurrent.CompletableFuture@bfc918c[Not completed]

Have you tried to comment out
bootstrap-servers: localhost:9092in test resourcesapplication.yml?. I have had similar problem and this helped me, otherwise my app was trying to call my Kafka in Docker.