Transactional KafkaProducer recover from timeout


I am testing a Kotlin KafkaProducer application that uses transactions, covering these scenarios:

  • Happy path: (working fine)

    • begin transaction
    • producer.send
    • commit transaction
    • Assert consumer.poll returns a message
  • Exception on producer: (also working fine)

    • begin transaction
    • producer.send
    • Throw RuntimeException
    • Abort transaction
    • Assert consumer.poll returns no messages
  • Timeout on producer: (Throws a ProducerFencedException)

    • begin transaction
    • producer.send
    • Thread.sleep (to let the transactional producer time out)
    • Assert consumer.poll returns no messages (every producer call after this throws a ProducerFencedException)

I know that org.apache.kafka.common.errors.ProducerFencedException is thrown when a client uses a producer with an old epoch (a "zombie" producer, as the Kafka literature calls it). My question is: how do I recover from a timeout in a transactional producer? Is there a way for my application to detect that this producer is no longer valid and should be reset?

Thanks.

Here is my code (I omitted the successful cases)

  • Main.kt
val producer = Producer()

fun main() {
    happyPath()
    timeoutTestSync()
    happyPath()
}

fun happyPath() {
    val consumer = Consumer()

    assert(consumer.readClean().isEmpty)
    assert(consumer.readDirty().isEmpty)

    producer.produceInTransaction("test${RandomStringUtils.randomAlphanumeric(1)}", false, false)

    assert(consumer.readClean().count() == 1)
}

fun timeoutTestSync() {
    val consumer = Consumer()

    assert(consumer.readClean().isEmpty)
    assert(consumer.readDirty().isEmpty)

    producer.produceInTransaction("test${RandomStringUtils.randomAlphanumeric(1)}", true, false)

    assert(consumer.readDirty().isEmpty)
    assert(consumer.readClean().isEmpty)
}
  • Consumer.kt
private const val INPUT_TOPIC = "output"

class Consumer {

    private val consumer: KafkaConsumer<String, String> = createKafkaConsumer(false)
    private val dirtConsumer: KafkaConsumer<String, String> = createKafkaConsumer(true)

    private fun createKafkaConsumer(dirtRead:Boolean): KafkaConsumer<String, String> {
        val props:MutableMap<String, Any> = HashMap()

        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9099"
        props[ConsumerConfig.CLIENT_ID_CONFIG] = "client-${RandomStringUtils.randomAlphanumeric(3)}"
        props[ConsumerConfig.GROUP_ID_CONFIG] = "group-${RandomStringUtils.randomAlphanumeric(3)}"
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = true
        props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
        if (dirtRead)
            props[ConsumerConfig.ISOLATION_LEVEL_CONFIG] = "read_uncommitted"
        else
            props[ConsumerConfig.ISOLATION_LEVEL_CONFIG] = "read_committed"

        val consumer:KafkaConsumer<String, String> = KafkaConsumer<String, String>(props)
        consumer.subscribe(Collections.singletonList(INPUT_TOPIC))

        return consumer
    }

    fun readClean(): ConsumerRecords<String, String> =
        consumer.poll(Duration.ofMillis(10000))

    fun readDirty(): ConsumerRecords<String, String> =
        dirtConsumer.poll(Duration.ofMillis(10000))
}
  • Producer.kt
private const val OUTPUT_TOPIC = "output"

class Producer {
    private val producer: KafkaProducer<String, String> = createKafkaProducer()

    private fun createKafkaProducer(): KafkaProducer<String, String> {
        val props = Properties()
        props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9099"
        props[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = "true"
        props[ProducerConfig.TRANSACTION_TIMEOUT_CONFIG] = 5000
        props[ProducerConfig.TRANSACTIONAL_ID_CONFIG] = "prod-${RandomStringUtils.randomAlphanumeric(3)}-"
        props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
        props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
        val producerOut:KafkaProducer<String,String> = KafkaProducer(props)
        producerOut.initTransactions()
        return producerOut
    }


    fun produceInTransaction(message:String, delay:Boolean, exception:Boolean) {
        try {
            producer.beginTransaction()

            producer.send(ProducerRecord<String, String>(OUTPUT_TOPIC, "1", message))

            when {
                delay -> {
                    Thread.sleep(10000)
                }
                exception -> {
                    throw RuntimeException("Simulated exception")
                }
            }

            producer.commitTransaction()
        }
        catch (e:RuntimeException) {
            producer.abortTransaction()
            println("Exception in transaction: ${e}")
        }
    }
}
1 Answer

Answer from Rishabh Sharma:

If you expect your transactions to time out, you need to handle ProducerFencedException. The producer cannot tell by itself that it has expired; the exception only surfaces when it tries to complete the ongoing transaction. You can handle the exception in your code: initialise a new producer object and retry with the new producer.
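The pattern described above — catch the fencing exception, close the fenced producer, create a fresh one, and retry the whole transaction — can be sketched generically. `FencedException` and `FakeTransactionalProducer` below are hypothetical stand-ins for `ProducerFencedException` and a transactional `KafkaProducer`, so the sketch runs without a broker; with the real client the same loop applies, substituting the Kafka types:

```kotlin
// Hypothetical stand-in for org.apache.kafka.common.errors.ProducerFencedException.
class FencedException(message: String) : RuntimeException(message)

// Hypothetical stand-in for a transactional KafkaProducer. A fenced instance
// (e.g. one whose transaction timed out) fails every transactional call.
class FakeTransactionalProducer(private val fenced: Boolean) {
    val sent = mutableListOf<String>()
    fun beginTransaction() { if (fenced) throw FencedException("producer fenced") }
    fun send(message: String) { sent.add(message) }
    fun commitTransaction() { if (fenced) throw FencedException("producer fenced") }
    fun close() {}
}

// Retry loop: on a fence, discard the old producer, build a fresh one via the
// factory, and retry the whole transaction. A fenced producer is permanently
// unusable, so it is closed rather than aborted.
fun sendWithRecovery(
    newProducer: () -> FakeTransactionalProducer,
    message: String,
    maxAttempts: Int = 2
): FakeTransactionalProducer {
    var producer = newProducer()
    repeat(maxAttempts) { attempt ->
        try {
            producer.beginTransaction()
            producer.send(message)
            producer.commitTransaction()
            return producer
        } catch (e: FencedException) {
            producer.close()                          // fenced producer is dead
            if (attempt == maxAttempts - 1) throw e   // give up after last attempt
            producer = newProducer()                  // fresh producer, new epoch
        }
    }
    return producer
}
```

Note that on a fence the producer must be closed, not aborted: calling `abortTransaction()` on a fenced producer throws the same exception again, which is exactly what the question's catch block runs into.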

That being said, if your settings are configured correctly, this should be a very rare scenario. You could reference this thread to see the various configuration options.
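For reference, the main knob on the producer side is transaction.timeout.ms: the question's code sets it to 5000 ms and then sleeps for 10000 ms, which guarantees the transaction expires. A configuration sketch, assuming kafka-clients is on the classpath (the id and value below are illustrative, not prescriptive):

```kotlin
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

val props = Properties()
// A stable transactional.id (not a random one per run) is what lets the broker
// fence genuine zombies across application restarts.
props[ProducerConfig.TRANSACTIONAL_ID_CONFIG] = "my-app-producer"
props[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = "true"
// Set well above the longest transaction you expect; must not exceed the
// broker's transaction.max.timeout.ms (default 15 minutes).
props[ProducerConfig.TRANSACTION_TIMEOUT_CONFIG] = 60000
```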