Does ordering matter when committing offsets using a message?

The Confluent-Kafka Python Client provides multiple ways to commit topic offsets:

  • Using a message (using the offset value associated with it)
  • Using a TopicPartition structure (which also has an associated offset)
  • Using commit() with no arguments, which (presumably) uses the value of the offset from the consumer

When using the last option, there is little opportunity for anything to go wrong, because the offset value is tracked automatically. (I had assumed the broker tracks it. In fact, when commit() is called with no arguments, the client uses the current offsets of its own partition assignment. It doesn't actually matter where that data lives; the point is that it is an automated part of the API.)

When using the first option, it would seem that message ordering is important.

If we write a program which consumes a number of messages, processes some of them, and retains the others, we might want to commit just some of the offsets.

  • It only makes sense to commit an offset if all messages with smaller offsets have been consumed and processed
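As an illustration of that condition, here is a minimal sketch of how the "safe" offset could be worked out when some messages are held back. The function name and its arguments are hypothetical, not part of the confluent-kafka API. It assumes a single partition and follows the Kafka convention that the committed value is the offset of the next message to read.

```python
def highest_committable_offset(start_offset, processed_offsets):
    """Return the offset to commit: the first offset not yet processed.

    start_offset is the first offset this consumer was handed, and
    processed_offsets is any iterable of offsets already handled.
    Both names are illustrative assumptions.
    """
    processed = set(processed_offsets)
    next_offset = start_offset
    # Advance only while there is no gap in the processed offsets.
    while next_offset in processed:
        next_offset += 1
    return next_offset


# Offsets 100, 101 and 103 are done; 102 is still being held back.
print(highest_committable_offset(100, [100, 101, 103]))  # prints 102
```

Offset 103 is processed, but it cannot be covered by the commit until 102 is also done.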

Given the above condition, the question then becomes

  • Does the order in which messages are committed matter?

If we commit the message with offset=101, followed by the message with offset=100, what effect does this have? Will the stored offset be "rewound" back to 100? Or does the broker see a request to commit a smaller value and ignore it?

Answer by FreelanceConsultant:

The order in which the messages are committed does appear to matter.

I tested this with three scripts:

  • One produces 3 messages
  • One consumes 3 messages, and commits the first two in reverse order
  • The final script consumes all messages

If I run these scripts in order, the final script starts consuming from the second produced message. The commit issued last is the one that takes effect, even though it carries the smaller offset.

In other words, if the messages produced are:

  • Message 1
  • Message 2
  • Message 3

The final consumer will consume:

  • Message 2
  • Message 3

Not:

  • Message 3

Inverting the order of the commits (first message, then second message) confirms the behaviour. In that case the final consumer will consume:

  • Message 3 only
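The observed behaviour can be summarised with a small toy model. This is only a sketch of what the test suggests, not the broker's actual implementation: the class and function names are made up, and it assumes a fresh single-partition topic whose offsets start at 0. It relies on two things: committing a message stores that message's offset + 1, and a later commit simply overwrites an earlier one.

```python
class ToyOffsetStore:
    """Toy stand-in for a consumer group's stored offset on one partition."""

    def __init__(self):
        self.committed = None  # nothing committed yet

    def commit_message(self, message_offset):
        # Committing a message stores offset + 1 (the next one to read).
        # There is no "only move forward" check: the last write wins.
        self.committed = message_offset + 1

    def resume_from(self, earliest=0):
        # With no stored offset, fall back to the earliest offset,
        # mirroring 'auto.offset.reset': 'earliest'.
        return earliest if self.committed is None else self.committed


def replay(commit_order, messages=("Message 1", "Message 2", "Message 3")):
    """Apply commits in the given order, then list what a new consumer reads."""
    store = ToyOffsetStore()
    for offset in commit_order:
        store.commit_message(offset)
    return list(messages[store.resume_from():])


print(replay([1, 0]))  # reverse order: ['Message 2', 'Message 3']
print(replay([0, 1]))  # in order: ['Message 3']
```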

Code (the producer, then the consumer that commits, then the final consumer):

#!/usr/bin/env python3

from confluent_kafka import Producer

def main():

    topic = 'test_topic'

    producer = create_producer()

    number_of_messages = 3

    for i in range(number_of_messages):

        producer.produce(
            topic=topic,
            key='no_key',
            value=f'message {i} of {number_of_messages}')

    producer.poll(3)
    producer.flush(10)

def create_producer():

    config = {
        'bootstrap.servers': ...,
        'client.id': 'produce_3',
        'enable.idempotence': True,
        'linger.ms': 20,
    }

    producer = Producer(config)

    return producer

if __name__ == '__main__':
    main()

#!/usr/bin/env python3

from confluent_kafka import Consumer

def main():

    topic = 'test_topic'

    consumer = create_consumer()

    consumer.subscribe(topics=[topic])

    message_count = 0
    messages = []

    while True:
        message = consumer.poll(10)

        if message is None:
            print('no message')
        elif message.error():
            print(f'error: {message.error()}')
            break
        else:
            print(f'message: {message.value()}')
            messages.append(message)

            message_count += 1

            if message_count == 2:
                # commit the first two messages in reverse order:
                # the second message first, then the first message
                consumer.commit(message=messages[1], asynchronous=False)
                consumer.commit(message=messages[0], asynchronous=False)

    consumer.close()

def create_consumer():

    config = {
        'bootstrap.servers': ...,
        'client.id': 'consumer_3_commit',
        'group.id': 'consumer_3_commit',
        'enable.auto.commit': False,
        'auto.offset.reset': 'earliest',
    }

    consumer = Consumer(config)

    return consumer

if __name__ == '__main__':
    main()

#!/usr/bin/env python3

from confluent_kafka import Consumer

def main():

    topic = 'test_topic'

    consumer = create_consumer()

    consumer.subscribe(topics=[topic])

    while True:
        message = consumer.poll(10)

        if message is None:
            print('no message')
        elif message.error():
            print(f'error: {message.error()}')
            break
        else:
            print(f'message: {message.value()}')

    consumer.close()

def create_consumer():

    config = {
        'bootstrap.servers': ...,
        'client.id': 'consumer_3_commit',
        'group.id': 'consumer_3_commit',
        'enable.auto.commit': False,
        'auto.offset.reset': 'earliest',
    }

    consumer = Consumer(config)

    return consumer

if __name__ == '__main__':
    main()
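Given this behaviour, one possible way to avoid an accidental rewind is to work out the highest offset per partition first and issue a single commit with a TopicPartition, rather than committing message by message. The sketch below is an assumption about how that could be arranged, not something from the test above. The helper names are hypothetical, and it assumes every message below the highest offset has already been processed.

```python
def offsets_to_commit(processed):
    """Map (topic, partition) -> offset to commit.

    processed is an iterable of (topic, partition, offset) tuples for
    messages that are fully handled, in any order.
    """
    highest = {}
    for topic, partition, offset in processed:
        key = (topic, partition)
        # Kafka expects the offset of the next message to read, hence + 1.
        highest[key] = max(highest.get(key, 0), offset + 1)
    return highest


def commit_once(consumer, processed):
    """Issue one synchronous commit covering all processed messages."""
    # Imported here so offsets_to_commit stays usable without the client.
    from confluent_kafka import TopicPartition

    offsets = [
        TopicPartition(topic, partition, offset)
        for (topic, partition), offset in offsets_to_commit(processed).items()
    ]
    if offsets:
        consumer.commit(offsets=offsets, asynchronous=False)


# The order in which the messages were handled no longer matters.
print(offsets_to_commit([("test_topic", 0, 1), ("test_topic", 0, 0)]))
```

In the consumer script, commit_once(consumer, [(m.topic(), m.partition(), m.offset()) for m in messages]) would replace the two separate commit calls.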