I am testing a Kotlin KafkaProducer application that uses transactions, covering the following scenarios:
Happy path (working fine):
- begin transaction
- producer.send
- commit transaction
- Assert consumer.poll has a message
Exception in the producer (also working fine):
- begin transaction
- producer.send
- Throw RuntimeException
- Abort transaction
- Assert consumer.poll has no messages
Timeout in the producer (throws a ProducerFencedException):
- begin transaction
- producer.send
- Thread.sleep (to let the transactional producer time out)
- Assert consumer.poll has no messages (everything after this throws a ProducerFencedException)
I know that org.apache.kafka.common.errors.ProducerFencedException is thrown when the client uses a producer with an old epoch (a "zombie" producer, in Kafka terminology). My question is: how do I recover from a timeout in a transactional producer? Is there a way for my code to know that this producer is no longer valid and should be reset?
Thanks.
Here is my code (I omitted the successful cases)
- Main.kt
```kotlin
import org.apache.commons.lang3.RandomStringUtils

// One transactional producer shared by all scenarios.
val producer = Producer()

fun main() {
    happyPath()
    timeoutTestSync()
    happyPath()
}

fun happyPath() {
    val consumer = Consumer()
    assert(consumer.readClean().isEmpty)
    assert(consumer.readDirty().isEmpty)
    producer.produceInTransaction("test${RandomStringUtils.randomAlphanumeric(1)}", delay = false, exception = false)
    // The committed message must be visible to the read_committed consumer.
    assert(consumer.readClean().count() == 1)
}

fun timeoutTestSync() {
    val consumer = Consumer()
    assert(consumer.readClean().isEmpty)
    assert(consumer.readDirty().isEmpty)
    // delay = true: sleep past transaction.timeout.ms before committing.
    producer.produceInTransaction("test${RandomStringUtils.randomAlphanumeric(1)}", delay = true, exception = false)
    assert(consumer.readDirty().isEmpty)
    assert(consumer.readClean().isEmpty)
}
```
- Consumer.kt
```kotlin
import org.apache.commons.lang3.RandomStringUtils
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration

private const val INPUT_TOPIC = "output"

class Consumer {
    // read_committed: only sees records from committed transactions.
    private val consumer: KafkaConsumer<String, String> = createKafkaConsumer(false)
    // read_uncommitted: also sees records from open or aborted transactions.
    private val dirtyConsumer: KafkaConsumer<String, String> = createKafkaConsumer(true)

    private fun createKafkaConsumer(dirtyRead: Boolean): KafkaConsumer<String, String> {
        val props: MutableMap<String, Any> = HashMap()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9099"
        props[ConsumerConfig.CLIENT_ID_CONFIG] = "client-${RandomStringUtils.randomAlphanumeric(3)}"
        props[ConsumerConfig.GROUP_ID_CONFIG] = "group-${RandomStringUtils.randomAlphanumeric(3)}"
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = true
        props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
        props[ConsumerConfig.ISOLATION_LEVEL_CONFIG] = if (dirtyRead) "read_uncommitted" else "read_committed"
        val consumer = KafkaConsumer<String, String>(props)
        consumer.subscribe(listOf(INPUT_TOPIC))
        return consumer
    }

    fun readClean(): ConsumerRecords<String, String> = consumer.poll(Duration.ofMillis(10000))

    fun readDirty(): ConsumerRecords<String, String> = dirtyConsumer.poll(Duration.ofMillis(10000))
}
```
- Producer.kt
```kotlin
import org.apache.commons.lang3.RandomStringUtils
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

private const val OUTPUT_TOPIC = "output"

class Producer {
    private val producer: KafkaProducer<String, String> = createKafkaProducer()

    private fun createKafkaProducer(): KafkaProducer<String, String> {
        val props = Properties()
        props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9099"
        props[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = "true"
        // The transaction coordinator aborts transactions left open longer than this (ms).
        props[ProducerConfig.TRANSACTION_TIMEOUT_CONFIG] = 5000
        props[ProducerConfig.TRANSACTIONAL_ID_CONFIG] = "prod-${RandomStringUtils.randomAlphanumeric(3)}-"
        props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
        props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
        val producerOut = KafkaProducer<String, String>(props)
        producerOut.initTransactions()
        return producerOut
    }

    fun produceInTransaction(message: String, delay: Boolean, exception: Boolean) {
        try {
            producer.beginTransaction()
            producer.send(ProducerRecord(OUTPUT_TOPIC, "1", message))
            when {
                // Sleep longer than transaction.timeout.ms so the transaction times out.
                delay -> Thread.sleep(10000)
                exception -> throw RuntimeException("Simulated exception")
            }
            producer.commitTransaction()
        } catch (e: RuntimeException) {
            // ProducerFencedException is also a RuntimeException, so it lands here too.
            producer.abortTransaction()
            println("Exception in transaction: $e")
        }
    }
}
```
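If you expect your transactions to time out, you need to handle ProducerFencedException. The producer cannot know by itself that its transaction has expired; the exception occurs when it tries to commit the ongoing transaction. You can handle the exception in your code: close the fenced producer, initialise a new producer object (including its initTransactions() call) and retry with the new producer. Since ProducerFencedException is itself a RuntimeException, your catch block also calls abortTransaction() on the fenced producer, and that call fails too, which is why everything after the timeout keeps throwing. That being said, if your settings are configured correctly, this should be a very rare scenario. You could reference this thread to see various configuration options.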
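A minimal sketch of the close-and-recreate approach described above. To keep it runnable without a broker, it assumes a hypothetical `TxProducer` interface that mirrors the transactional `KafkaProducer` calls from the question and a hypothetical `FencedException` standing in for `ProducerFencedException`. The error split loosely follows the transactional example in the `KafkaProducer` Javadoc (fenced: close the producer; other errors: `abortTransaction()`), with the answer's extra step of building a replacement producer through a factory. `RecoveringProducer`, `FakeProducer` and the `maxAttempts` retry limit are illustrative names, not Kafka APIs.

```kotlin
// Hypothetical stand-in for org.apache.kafka.common.errors.ProducerFencedException.
class FencedException(message: String) : RuntimeException(message)

// Hypothetical minimal view of the transactional KafkaProducer calls used in the question.
interface TxProducer {
    fun beginTransaction()
    fun send(topic: String, key: String, value: String)
    fun commitTransaction()
    fun abortTransaction()
    fun close()
}

// Holds one producer at a time and replaces it once it has been fenced.
class RecoveringProducer(private val factory: () -> TxProducer) {
    private var producer: TxProducer = factory()

    // How many times the producer had to be replaced.
    var recreations = 0
        private set

    fun produceInTransaction(topic: String, key: String, value: String, maxAttempts: Int = 2): Boolean {
        repeat(maxAttempts) {
            try {
                producer.beginTransaction()
                producer.send(topic, key, value)
                producer.commitTransaction()
                return true // repeat is inline, so this returns from produceInTransaction
            } catch (e: FencedException) {
                // Fenced: the old producer is unusable, so skip abortTransaction() and replace it.
                producer.close()
                producer = factory() // a real factory would create a KafkaProducer and call initTransactions()
                recreations++
            } catch (e: RuntimeException) {
                // Other errors: abort this transaction and retry with the same producer.
                producer.abortTransaction()
            }
        }
        return false
    }
}

// Hypothetical fake: when `fenced` is true, commit fails as it would after a transaction timeout.
class FakeProducer(private val fenced: Boolean, private val events: MutableList<String>) : TxProducer {
    override fun beginTransaction() { events += "begin" }
    override fun send(topic: String, key: String, value: String) { events += "send:$value" }
    override fun commitTransaction() {
        if (fenced) throw FencedException("stale producer epoch")
        events += "commit"
    }
    override fun abortTransaction() { events += "abort" }
    override fun close() { events += "close" }
}

// Usage: the first producer is "timed out"; its replacement commits successfully.
val events = mutableListOf<String>()
var created = 0
val rp = RecoveringProducer { FakeProducer(fenced = created++ == 0, events = events) }
val ok = rp.produceInTransaction("output", "1", "hello")
```

Here the fenced producer is closed rather than aborted, a second producer is created, and the retried transaction commits. Against a real broker, the factory would build the `KafkaProducer` the same way `createKafkaProducer()` does in the question.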