I have the following code (Obviously made as an example):
class RandomClass(
val producer: Producer<String, String>
) {
fun randomFunction(): Boolean {
// Using .get() because I want to make sure it got sent before continuing
producer.send( (...) ).get()
return true
}
}
I build my test by creating a MockProducer and injecting it instead of a real Producer to the class. Then I execute my function and verify the results.
"Some test of random function" {
val mockProducer = MockProducer(
false,
StringSerializer(),
StringSerializer()
)
// Build class with mockproducers
val randomClass = RandomClass(
producer = mockProducer
)
// Execute function
randomClass.randomFunction()
// Verify executions
mockProducer.history().size shouldBe 1
}
The code above will now just hang as it the function call .get() is blocking the thread. Which is to be expected since we have not yet told the MockProducer what to do with the .send() function call.
From what I have learned, it seems like I have to decide what to do with the function call after the .send() has been invoked. Which to me makes absolutely no sense in a test scope as I do not know when the send is actually invoked. From my test I only control when I perform the call to randomClass.randomFunction() and after that the program will execute when available and since the function is blocking means that the test will not continue until the .send() has been completed. Therefore due to the .get() I cannot tell the MockProducer what to do with the .send() invokation after it has happened.
I cite from this guide from Baeldung: https://www.baeldung.com/kafka-mockproducer
(...) Second, we’ll call mockProducer.errorNext(e), so that MockProducer returns an exception for the last send() call.
And I cite from the javadoc for MockProducer.completeNext():
Complete the earliest uncompleted call successfully. Returns: true if there was an uncompleted call to complete
I would like to tell the MockProducer what to do with the next call or n calls ahead of time. This would be comparable to how Mocks with Mockito/Mockk would work. The example below contains what I would like to do:
"Some test of random function" {
val mockProducer = MockProducer(
false,
StringSerializer(),
StringSerializer()
)
// Build class with mockproducers
val randomClass = RandomClass(
producer = mockProducer
)
// I wish to tell the MockProducer ahead of time that
// the next send should be completed successfully
mockProducer.completeNext() // Or .errorNext() if I wish to test an error
// Execute function
randomClass.randomFunction()
// Verify executions
mockProducer.history().size shouldBe 1
}
Is there really no way for me to tell a MockProducer ahead of time how to complete calls to .send()?
I ran into the similar problem today. I use send with a callback instead of send returning a future and want to define the callback result (exceptional or not) in advance.
What you do in addition is that you want to find out the result of the Future right after the send of the message. The result can be evaluated only after the message is sent into Kafka broker, which is done using flush operation. Real (not mock) producer can be configured to flush immediately automatically and it even does it by default since linger.ms producer setting is 0 by default. To simulate flush in MockProducer, you have to call completeNext() errorNext() or flush().
To achieve what you want, you can extend MockProducer and override completeSend() to behave according to what you define before calling send() and also override send() to do flush to avoid blocking the thread if you want to check the Future immediately (although I would either avoid checking the Future result after each sent message, or if you really need it, call flush() after send manually and only then check the result).
Example implementation: