Reprocessing AWS Kinesis data stream messages using a KCL consumer


A KCL consumer runs on EC2 instances in an Auto Scaling Group (ASG) sized according to the number of provisioned shards in the Kinesis data stream. If the stream has n provisioned shards, at most n EC2 instances can be configured, each consuming messages from one shard, as described in this document.

Messages are currently processed in real time as soon as they arrive in the Kinesis data stream, because the shard iterator type is set to LATEST for the KCL consumer. For more info, check here.

A DynamoDB table (the KCL lease table) is configured for the KCL consumer. It holds a checkpoint entry for each provisioned shard to keep track of which shards of the Kinesis data stream are leased and being processed by the workers of the KCL consumer application.
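
Before changing anything, it can help to see what the lease table currently holds. The following is a minimal read-only sketch that scans the table and maps each shard's lease key to its stored checkpoint. It assumes a low-level boto3 DynamoDB client and the KCL lease-table attribute names `leaseKey` and `checkpoint`; the function name and table name are hypothetical, and the schema may differ between KCL versions.

```python
from typing import Any, Dict


def read_checkpoints(dynamodb: Any, table_name: str) -> Dict[str, str]:
    """Return {leaseKey: checkpoint} for every item in a KCL lease table.

    `dynamodb` is assumed to be a low-level client, e.g. boto3.client("dynamodb").
    The attribute names "leaseKey" and "checkpoint" are assumptions about the
    KCL lease-table schema.
    """
    checkpoints: Dict[str, str] = {}
    kwargs: Dict[str, Any] = {"TableName": table_name}
    while True:
        page = dynamodb.scan(**kwargs)
        for item in page.get("Items", []):
            # Low-level client items are typed, e.g. {"S": "..."} for strings.
            lease_key = item["leaseKey"]["S"]
            checkpoints[lease_key] = item.get("checkpoint", {}).get("S", "")
        # Scan results are paginated; keep going until no LastEvaluatedKey is returned.
        last_key = page.get("LastEvaluatedKey")
        if not last_key:
            return checkpoints
        kwargs["ExclusiveStartKey"] = last_key
```

For example, `read_checkpoints(boto3.client("dynamodb"), "my-kcl-app")`, where `my-kcl-app` is a hypothetical table name (by default, KCL names the lease table after the application).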

We want to reprocess every message still present in the Kinesis data stream, i.e., everything within its data retention period (24 hours by default, configurable up to 365 days). Is there a simple mechanism to do this?

Possible theoretical solutions (may be incorrect or open to improvement):

  • First Approach

    • Stop the KCL consumer workers.
    • Delete the DynamoDB lease table that holds the checkpoint entry for each provisioned shard, so that the workers start picking up messages from the Kinesis data stream again.
    • Restart the KCL consumer service.
  • Second Approach

    • Stop the KCL consumer workers.
    • Edit the checkpoint value of each shard so that it points to an older timestamp. Is there a formula to convert a timestamp into a checkpoint value? I don't know. Alternatively, can we set a placeholder ("dummy") value that the KCL consumer will then overwrite?
    • Restart the KCL consumer service.
  • Any other approach?
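
On the placeholder question: when KCL creates a new lease, it appears to store a sentinel string such as `TRIM_HORIZON` or `LATEST` in the `checkpoint` attribute instead of a sequence number, and a worker picking up that lease starts from the corresponding position. Building on that assumption, a minimal sketch of the second approach overwrites each shard's checkpoint with `TRIM_HORIZON` while all workers are stopped. The attribute names (`leaseKey`, `checkpoint`, `checkpointSubSequenceNumber`), the numeric type of the sub-sequence number, and the sentinel behaviour are assumptions about the KCL lease-table schema. Verify them against your KCL version, and try the sketch on a test stream first.

```python
from typing import Any, Iterable, List

# Sentinel strings assumed to be understood by KCL in the "checkpoint" attribute.
ALLOWED_SENTINELS = {"TRIM_HORIZON", "LATEST"}


def reset_checkpoints(dynamodb: Any, table_name: str, lease_keys: Iterable[str],
                      sentinel: str = "TRIM_HORIZON") -> List[str]:
    """Overwrite the checkpoint of each existing lease with a sentinel value.

    Run only while every KCL worker is stopped; otherwise a running worker
    may write its own checkpoint over the reset value.
    """
    if sentinel not in ALLOWED_SENTINELS:
        raise ValueError(f"unsupported sentinel: {sentinel}")
    updated: List[str] = []
    for lease_key in lease_keys:
        dynamodb.update_item(
            TableName=table_name,
            Key={"leaseKey": {"S": lease_key}},
            # Only touch leases that already exist; never create new items.
            ConditionExpression="attribute_exists(leaseKey)",
            UpdateExpression="SET #cp = :cp, #sub = :sub",
            ExpressionAttributeNames={"#cp": "checkpoint", "#sub": "checkpointSubSequenceNumber"},
            ExpressionAttributeValues={":cp": {"S": sentinel}, ":sub": {"N": "0"}},
        )
        updated.append(lease_key)
    return updated
```

The allow-list is a deliberate guard: writing an arbitrary string into the checkpoint column could leave a worker unable to resume that shard.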

Please feel free to suggest or comment on how we can reprocess the Kinesis data stream messages effectively and safely.

Answer by mdoblado:

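To reprocess all the stream data with your first approach, you would need to change the shard iterator type (the KCL initial position in stream) from LATEST to TRIM_HORIZON before deleting the table and restarting the KCL consumer. KCL applies the initial position only to shards that have no checkpoint yet, so once the table is gone every shard starts from its oldest retained record. With LATEST, you would only process new arrivals to the stream.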
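
The first approach could look roughly like the sketch below. The initial position itself is set in the KCL configuration, for example `initialPositionInStream = TRIM_HORIZON` in a MultiLangDaemon properties file or `InitialPositionInStream.TRIM_HORIZON` in KCL for Java. The function then deletes the lease table and waits until DynamoDB reports it gone. It assumes a boto3 DynamoDB client; the function and table names are hypothetical.

```python
from typing import Any


def delete_lease_table(dynamodb: Any, table_name: str) -> None:
    """Drop the KCL lease table so that no shard has a checkpoint any more.

    Call only after all KCL workers are stopped and the initial position has
    been switched to TRIM_HORIZON in the consumer's configuration.
    """
    dynamodb.delete_table(TableName=table_name)
    # Block until the table no longer exists, so workers are not restarted
    # while the old table is still being deleted.
    dynamodb.get_waiter("table_not_exists").wait(TableName=table_name)
```

After it returns, restart the workers. KCL creates the lease table again if it is missing, and because no checkpoints exist yet, every new lease starts at TRIM_HORIZON.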

The second approach is also possible: you will need to get a shard iterator for every shard, again using the TRIM_HORIZON shard iterator type. You can also specify a timestamp (the AT_TIMESTAMP shard iterator type) if you need to reprocess less data than the retention period of your stream. This AWS reference documentation can be useful.
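
This approach reads the stream directly through the Kinesis API instead of going through the lease table. The minimal sketch below lists every shard and requests a TRIM_HORIZON iterator for each one, or an AT_TIMESTAMP iterator when a start time is given. It assumes a boto3 Kinesis client, and the function name is hypothetical.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional


def shard_iterators(kinesis: Any, stream_name: str,
                    since: Optional[datetime] = None) -> Dict[str, str]:
    """Return {shard_id: shard_iterator} for every shard of the stream.

    With since=None, each iterator starts at TRIM_HORIZON (oldest retained
    record); otherwise AT_TIMESTAMP starts reading from `since` onward.
    `kinesis` is assumed to be boto3.client("kinesis").
    """
    shard_ids: List[str] = []
    kwargs: Dict[str, Any] = {"StreamName": stream_name}
    while True:
        page = kinesis.list_shards(**kwargs)
        shard_ids.extend(shard["ShardId"] for shard in page["Shards"])
        token = page.get("NextToken")
        if not token:
            break
        # ListShards does not accept StreamName together with NextToken.
        kwargs = {"NextToken": token}

    iterators: Dict[str, str] = {}
    for shard_id in shard_ids:
        request: Dict[str, Any] = {"StreamName": stream_name, "ShardId": shard_id}
        if since is None:
            request["ShardIteratorType"] = "TRIM_HORIZON"
        else:
            request["ShardIteratorType"] = "AT_TIMESTAMP"
            request["Timestamp"] = since
        iterators[shard_id] = kinesis.get_shard_iterator(**request)["ShardIterator"]
    return iterators
```

Each iterator can then be passed to `get_records(ShardIterator=...)`, following `NextShardIterator` from each response. Shard iterators expire five minutes after they are returned, so request them just before reading.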