How to unit test the intercept method of a RecordIntercept interface from Kafka using Mockito

79 Views Asked by At

I have the class KafkaRecordInterceptor which implements the interface RecordInterceptor<String, Message> as follow

@Component
public class KafkaRecordInterceptor implements RecordInterceptor<String, Message<Object>> {
    public static final ThreadLocal<MessageContext> messageContextThreadLocal = new ThreadLocal<>();
    private final List<MessageContextListener> messageContextListener;
    private final MetricsUtils metricsUtils;
    private final SystemClock systemClock;
...

then I override the intercept method as below

    @Override
    public ConsumerRecord intercept(@NonNull ConsumerRecord consumerRecord, @NonNull Consumer consumer) {

        MessageContext messageContext = new MessageContext();
        messageContext.setStartClock(systemClock.now());
        messageContextThreadLocal.set(messageContext);

        Optional<MessagePriority> messagePriorityOptional =
            Objects.requireNonNull(metricsUtils.getMessagePriority(consumerRecord));

        messageContext.setCorrelationId(KafkaUtils.extractCorrelationId(consumerRecord));
        messageContext
            .setOperationType(((GenericMessage<?>) consumerRecord.value()).getPayload().getClass().getSimpleName());
        messageContext.setPriority(messagePriorityOptional.map(MessagePriority::name).orElse(""));

        messageContextListener.forEach(listener -> listener.onStart(messageContext));

        return Objects.requireNonNull(consumerRecord);
    }

I also override

    @Override
    public void afterRecord(ConsumerRecord record, Consumer consumer) {
        MessageContext messageContext = messageContextThreadLocal.get();
        messageContextListener.forEach(listener -> listener.onClear(messageContext));
        messageContextThreadLocal.remove();
    }

My question is, what is the best way to unit test the interceptor method and afterRecord ?

2

There are 2 best solutions below

0
On

First of all, I think the method of writing the test will vary depending on what the questioner wants to verify through the test. If you think of testing the KafkaRecordIntercept() method in the example, it's hard to write what you want to verify and how you want to write because there are so many things you do with it. In my opinion, why don't we implement MessageContext-related functions through a separate component class, and in the intercept() method, we just call them.

Below is an example.

@Component
@RequiredArgsConstructor
public class KafkaRecordInterceptor implements RecordInterceptor<String, Message<Object>> {

    private final MessageContextManager messageContextManager;
    private final SystemClock systemClock;

    @Override
    public ConsumerRecord<String, Message<Object>> intercept(ConsumerRecord<String, Message<Object>> consumerRecord,
                                                             Consumer<String, Message<Object>> consumer) {
        messageContextManager.manage(consumerRecord, systemClock.now());
        return Objects.requireNonNull(consumerRecord);
    }
}

@Component
@RequiredArgsConstructor
public class MessageContextManager {

    public static final ThreadLocal<MessageContext> messageContextThreadLocal = new ThreadLocal<>();
    private final List<MessageContextListener> messageContextListener;
    private final MetricsUtils metricsUtils;

    public void manage(@NonNull ConsumerRecord consumerRecord, LocalDateTime dateTime) {
        MessageContext messageContext = new MessageContext();
        messageContext.setStartClock(dateTime);
        messageContextThreadLocal.set(messageContext);

        Optional<MessagePriority> messagePriorityOptional =
            Objects.requireNonNull(metricsUtils.getMessagePriority(consumerRecord));

        messageContext.setCorrelationId(KafkaUtils.extractCorrelationId(consumerRecord));
        messageContext
            .setOperationType(((GenericMessage<?>) consumerRecord.value()).getPayload().getClass().getSimpleName());
        messageContext.setPriority(messagePriorityOptional.map(MessagePriority::name).orElse(""));

        messageContextListener.forEach(listener -> listener.onStart(messageContext));
    }
}

For your information, the inclusion of non-handling elements within the logic, such as LocalDateTime.now(), by the developer, can make writing about the test difficult. This is a good way to inject the method as a parameter.

If written as above, KafkaRecordInterceptor can test by mocking the MessageContextManager, and unit tests related to the MessageContext can be solved by creating tests for the MessageContextManager. (mocking MessageContextListener, MetricUtils)

0
On

It looks like you can just new KafkaRecordInterceptor() in the unit test. feed its method calls with mocks for ConsumerRecord and Consumer and then verify in the end expected interaction.