Kafkacat consume between timestamps gives wrong results when counting records


I want to count the number of messages in a given Kafka topic between two timestamps. I tried doing this using kafkacat, using the following command:

# START_DATE = 01.04.2022 02:00:00Z
# END_DATE = 01.04.2022 02:05:00Z
$ kafkacat -C -b broker:9092 -t mytopic -o s@1648778400000 -o e@1648778700000 -p 0 -f '[ts %T] [partition %p] [offset %o] %k\n' -e -c 1
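For reference, the millisecond values passed to s@ and e@ can be derived from the two dates as in the sketch below. It assumes GNU date (the -d option behaves differently on BSD/macOS), and the helper name to_epoch_ms is hypothetical, introduced only for illustration.

```shell
# Convert a UTC date string to epoch milliseconds.
# Assumption: GNU date, which accepts -d "<date string>".
to_epoch_ms() {
  echo "$(( $(date -u -d "$1" +%s) * 1000 ))"
}

START_MS=$(to_epoch_ms '2022-04-01 02:00:00')
END_MS=$(to_epoch_ms '2022-04-01 02:05:00')

# prints: 1648778400000 1648778700000
echo "$START_MS $END_MS"
```

These match the values used in the command above, so the timestamps themselves are not the source of the unexpected output.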

In fact, this is the same approach that is listed as the answer in a very similar question.

According to kafkacat --help:

Consumer options:
  -o <offset>        Offset to start consuming from:
                     beginning | end | stored |
                     <value>  (absolute offset) |
                     -<value> (relative offset from end)
                     s@<value> (timestamp in ms to start at)
                     e@<value> (timestamp in ms to stop at (not included))

Accordingly, I would expect the above command to give me the first record whose timestamp is at or after s@<value> and before e@<value>. Instead, it gives me a record with a timestamp prior to s@<value> (in fact, it is simply the first record in partition 0):

# output of above command
[ts 1648692486141] [partition 0] [offset 2] 643b0013-b3e1-47a5-a9d3-7478c0e91ca4

Am I misunderstanding the consumer options s@<value> and e@<value>?

Kafkacat version:

Version 1.5.0 (JSON, librdkafka 1.2.1 builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer)

Additionally, I'm seeing some odd behaviour even with just s@<value>. For example:

kafkacat -C -b broker:9092 -t mytopic -o s@1648778400000 -p 0 -f '[ts %T] [partition %p] [offset %o] %k\n' -e -c 1

should, as I understand it, output the first record with record.timestamp ≥ 1648778400000. The actual output is different:

[ts 1648692486141] [partition 0] [offset 2] 643b0013-b3e1-47a5-a9d3-7478c0e91ca4

and contains a timestamp prior to the one I set (31.03.2022 02:08:06Z vs. 01.04.2022 02:00:00Z).

The output is the same when I test with docker run edenhill/kcat:1.7.1 (the results above came from the Ubuntu kafkacat package).


There is 1 best solution below

OneCricketeer On

I don't think you can provide -o multiple times. Therefore, your options include something like:

-o e@1648778700000 -p 0  -c 1

This reads one message from partition 0 whose timestamp is less than 1648778700000.


To properly consume between timestamps, find the offsets for the start timestamp, commit them to a consumer group, then start a consumer in that group and stop at your end timestamp.
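As a separate client-side cross-check (not the consumer-group approach described above), the record timestamps can be printed with the %T format token from the question and counted locally. This is only a sketch: the function name count_in_range is hypothetical, and the kafkacat invocation in the comment reuses the question's broker, topic and partition.

```shell
# Count timestamps read from stdin (one epoch-ms value per line)
# that fall in the half-open range [start_ms, end_ms).
count_in_range() {
  awk -v s="$1" -v e="$2" \
    '($1 + 0) >= (s + 0) && ($1 + 0) < (e + 0) { n++ } END { print n + 0 }'
}

# Intended usage against a live broker (assumed names from the question):
#   kafkacat -C -b broker:9092 -t mytopic -p 0 -o s@1648778400000 -e -q -f '%T\n' \
#     | count_in_range 1648778400000 1648778700000

# Local demonstration with four sample timestamps; prints 2
printf '%s\n' 1648692486141 1648778400000 1648778500000 1648778700000 \
  | count_in_range 1648778400000 1648778700000
```

Because the filter is applied to whatever the consumer emits, records that arrive from before the start timestamp (as in the question's output) are simply not counted. The trade-off is that the consumer may read to the end of the partition, which can be slow on large topics.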