All embedded kafka tests in one kafka broker, one test context


I have a Spring application with some embedded Kafka tests that use @EmbeddedKafka. Each test needs its own topic.

@ExtendWith(SpringExtension::class)
@SpringBootTest(
    classes = [
        Application::class
    ]
)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@EmbeddedKafka(
    partitions = 1,
    topics = [
        "topicName"
    ]
)
@ActiveProfiles("test")
@Import(value = [ApplicationListenerCaseTest.TestConfig::class])
class ApplicationListenerCaseTest {

    @Autowired
    private lateinit var testKafkaTemplate: KafkaTemplate<String, String>

    @Autowired
    private lateinit var kafkaListenerEndpointRegistry: KafkaListenerEndpointRegistry

    @Autowired
    private lateinit var kafkaEmbeddedKafka: EmbeddedKafkaBroker

    override fun getTestKafkaTemplate() = testKafkaTemplate

    @BeforeEach
    fun setup() {
        for (msgListenerContainer in kafkaListenerEndpointRegistry.listenerContainers) {
            ContainerTestUtils.waitForAssignment(msgListenerContainer, kafkaEmbeddedKafka.partitionsPerTopic)
        }
    }

    @Test
    fun test() {
        // ...
    }

    @TestConfiguration
    class TestConfig {

        @Bean
        @Primary
        fun testKafkaTemplate(
            @Value("topicName") topic: String,
            broker: EmbeddedKafkaBroker
        ) = KafkaTemplate(DefaultKafkaProducerFactory<String, String>(KafkaTestUtils.producerProps(broker)))
            .apply { defaultTopic = topic }
    }
}

There are several such tests, and they take a very long time to run. I think the slow execution is because a separate Kafka broker is created for each test.

Is there a way to run all tests in one broker and to avoid wasting time creating a new context for every test?

Answer by Gary Russell:

See the documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#same-broker-multiple-tests

You can use the same broker for multiple test classes with something similar to the following:

public final class EmbeddedKafkaHolder {

    private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, false)
            .brokerListProperty("spring.kafka.bootstrap-servers");

    private static boolean started;

    public static EmbeddedKafkaBroker getEmbeddedKafka() {
        if (!started) {
            try {
                embeddedKafka.afterPropertiesSet();
            }
            catch (Exception e) {
                throw new KafkaException("Embedded broker failed to start", e);
            }
            started = true;
        }
        return embeddedKafka;
    }

    private EmbeddedKafkaHolder() {
        super();
    }

}

This assumes a Spring Boot environment and the embedded broker replaces the bootstrap servers property.

Then, in each test class, you can use something similar to the following:

static {
    EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}

private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();

...

The preceding example provides no mechanism for shutting down the broker(s) when all tests are complete. This could be a problem if, say, you run your tests in a Gradle daemon. You should not use this technique in such a situation, or you should use something to call destroy() on the EmbeddedKafkaBroker when your tests are complete.
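As a rough sketch of one way to add that shutdown step, the holder could register a JVM shutdown hook the first time the broker is started, and synchronize the accessor so that test classes running in parallel cannot start it twice. To keep the sketch compilable without spring-kafka-test, StubBroker below is a hypothetical stand-in for EmbeddedKafkaBroker: its start() corresponds to afterPropertiesSet() and its stop() to destroy(). SharedBrokerHolder and the thread name are likewise made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for EmbeddedKafkaBroker, so this compiles on its own.
// In a real holder, start() would be afterPropertiesSet() and stop() would be destroy().
final class StubBroker {
    final AtomicInteger starts = new AtomicInteger();
    final AtomicInteger stops = new AtomicInteger();

    void start() {
        starts.incrementAndGet();
    }

    void stop() {
        stops.incrementAndGet();
    }
}

final class SharedBrokerHolder {

    private static final StubBroker BROKER = new StubBroker();

    private static boolean started;

    private SharedBrokerHolder() {
    }

    // synchronized: concurrent callers cannot start the broker more than once
    static synchronized StubBroker getBroker() {
        if (!started) {
            BROKER.start();
            // Runs stop() when the JVM that hosts the tests exits.
            Runtime.getRuntime().addShutdownHook(
                    new Thread(BROKER::stop, "shared-broker-shutdown"));
            started = true;
        }
        return BROKER;
    }
}
```

A shutdown hook only fires when the JVM that hosts the tests exits, so this does not help if that JVM is long-lived. In that case something else still has to call destroy() explicitly once the tests are complete.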

Answer by user1801374:

If you are using Spring for Apache Kafka 3.0 or later, you can use GlobalEmbeddedKafkaTestExecutionListener. See also "Using the Same Broker(s) for Multiple Test Classes" in the reference documentation:

Starting with version 3.0, the framework exposes a GlobalEmbeddedKafkaTestExecutionListener for the JUnit Platform; it is disabled by default. This requires JUnit Platform 1.8 or greater. The purpose of this listener is to start one global EmbeddedKafkaBroker for the whole test plan and stop it at the end of the plan. To enable this listener, and therefore have a single global embedded Kafka cluster for all the tests in the project, the spring.kafka.global.embedded.enabled property must be set to true via system properties or JUnit Platform configuration.

Here is an example. If you are running Spring Boot, replace <Application> with the name of your Spring Boot application class, and adapt the dependencies to your build system.

When you run your tests, look for lines like the following in the test output to verify that a single global Kafka broker is used for all tests.

11:27:22.676 [Test worker] INFO org.springframework.kafka.test.junit.GlobalEmbeddedKafkaTestExecutionListener -- Started global Embedded Kafka on: 127.0.0.1:62897

2023-12-05T11:27:34.767+01:00 INFO 24820 --- [ Test worker] GlobalEmbeddedKafkaTestExecutionListener : Stopped global Embedded Kafka.

build.gradle:

...
dependencies {
    ...
    implementation 'org.springframework.kafka:spring-kafka'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    ...
}
...

.../src/test/resources/junit-platform.properties:

spring.kafka.global.embedded.enabled=true

.../src/test/resources/application.yaml:

spring:
  ...
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: fxs
test:
  topic: embedded-test-topic
...

EmbeddedKafkaIntegrationTest.java:

@SpringBootTest(classes = <Application>.class)
class EmbeddedKafkaIntegrationTest {

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void givenEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived() throws Exception {
        String data = "Sending with our own simple KafkaProducer";

        producer.send(topic, data);

        boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
        Assertions.assertTrue(messageConsumed);
        MatcherAssert.assertThat(consumer.getPayload(), Matchers.containsString(data));
    }
}

KafkaConsumer.java:

@Component
public class KafkaConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    private CountDownLatch latch = new CountDownLatch(1);
    private String payload;

    @KafkaListener(topics = "${test.topic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        payload = consumerRecord.toString();
        latch.countDown();
    }

    public void resetLatch() {
        latch = new CountDownLatch(1);
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public String getPayload() {
        return payload;
    }
}

KafkaProducer.java:

@Component
public class KafkaProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}