AWS Kinesis Client Java: Setting up TRIM_HORIZON Position in Stream does not work


I am running a test system that spawns a Kinesis producer which starts writing messages, e.g.: 1 through 100 to a stream with two shards.

During that cycle, a consumer starts reading messages from the stream. I noticed that the consumer only reads the LATEST messages, that is, those that arrive after it starts running; for example, it starts reading at message 43. I tried configuring the Worker to use the TRIM_HORIZON policy, but this does not seem to work.

KinesisClientLibConfiguration kinesisConfig = new KinesisClientLibConfiguration("MediaPlan", "randeepstream",
    DefaultAWSCredentialsProviderChain.getInstance(),
    "consumer1")
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
final Worker w = new Worker.Builder()
    .recordProcessorFactory(rpf)
    .config(kinesisConfig)
    .build();
new Thread(() -> w.run()).start();

My consumer's processor is set up as:

public class ConsumerRecordProcessorImpl implements IRecordProcessor {

    public void initialize(InitializationInput initializationInput) {
        log.info("Setting up consumer with shard {} starting at {}", initializationInput.getShardId(),
                initializationInput.getExtendedSequenceNumber());
    }

    public void processRecords(ProcessRecordsInput processRecordsInput) {
        ...
    }
}

I would expect to see a message like:

Setting up consumer with shard shardId-000000000000 starting at TRIM_HORIZON 0

but instead I get:

Setting up consumer with shard shardId-000000000000 starting at LATEST 0

How do I get my consumer to stop reading the latest and read all unprocessed messages?

Tejas Garde On

Here is an example using amazon-kinesis-client lib v2.

You will have to use the Scheduler (software.amazon.kinesis.coordinator), which reads records in the background, and pass a retrieval config to it as follows:

RetrievalConfig retrievalConfig = setRetrievalConfig();

Scheduler scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        retrievalConfig);

private RetrievalConfig setRetrievalConfig(){
    InitialPositionInStreamExtended initialPositionInStreamExtended = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
    RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
    retrievalConfig.initialPositionInStreamExtended(initialPositionInStreamExtended);
    return retrievalConfig;
}

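Notice the InitialPositionInStream.TRIM_HORIZON: when no checkpoint exists for a shard, it tells the scheduler to start from the oldest record still retained in the stream instead of from the newest one. When a checkpoint does exist, the consumer resumes after the last checkpointed position. So even if the consumer is down while the producer keeps running, all the records produced during the consumer's downtime will be consumed once it restarts, as long as they are still within the stream's retention period.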
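
To make the difference between the two initial positions concrete, here is a minimal sketch that models a single shard as an in-memory list. ToyShard, StartAt, startIndex and readFrom are hypothetical names made up for this illustration; none of them are part of the KCL API, and the real library works with sequence numbers and shard iterators rather than list indexes.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one shard. This is NOT the KCL API, only an illustration
// of what the two initial positions mean for a consumer with no checkpoint.
final class ToyShard {
    enum StartAt { TRIM_HORIZON, LATEST }

    private final List<Integer> records = new ArrayList<>();

    // Producer side: append one message to the shard.
    void put(int message) {
        records.add(message);
    }

    // Index at which a brand-new consumer would begin reading.
    // TRIM_HORIZON: the oldest retained record; LATEST: just past the newest one.
    int startIndex(StartAt position) {
        return position == StartAt.TRIM_HORIZON ? 0 : records.size();
    }

    // Every record the consumer sees if it began at the given index.
    List<Integer> readFrom(int startIndex) {
        return new ArrayList<>(records.subList(startIndex, records.size()));
    }
}
```

With 42 messages written before the consumer starts and 58 more written afterwards, the index captured with LATEST at start-up yields messages 43 through 100, while the index captured with TRIM_HORIZON yields all 100. That matches the behaviour described in the question.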

NOTE: configsBuilder is an instance of ConfigsBuilder (software.amazon.kinesis.common).

UPDATE: The stored position for a shard is not updated unless you call the checkpoint() API after processing the data you received from Kinesis. Once you call checkpoint(), the position of the last processed record is written to DynamoDB, and KCL resumes processing from that position.
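
As a rough sketch of the rule described in the update, the snippet below models the DynamoDB lease table as a plain map. ToyLeaseTable and resolveStart are hypothetical names used only for illustration; this is a simplified model under the assumption that a stored checkpoint always takes precedence over the configured initial position, not the actual KCL implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Simplified model of how a start position is chosen. NOT the KCL implementation.
final class ToyLeaseTable {
    // shardId -> last stored checkpoint (stand-in for the DynamoDB lease table)
    private final Map<String, String> checkpoints = new HashMap<>();

    // Corresponds loosely to calling checkpoint() after processing records.
    void checkpoint(String shardId, String sequenceNumber) {
        checkpoints.put(shardId, sequenceNumber);
    }

    // A stored checkpoint wins; the configured initial position is only a fallback.
    String resolveStart(String shardId, String configuredInitialPosition) {
        return Optional.ofNullable(checkpoints.get(shardId))
                .orElse(configuredInitialPosition);
    }
}
```

If this model is right, it would also be one plausible explanation for the "starting at LATEST 0" log line in the question: a lease stored as LATEST by an earlier run under the same application name would still be used after the configuration is changed to TRIM_HORIZON. Using a new application name, or clearing the lease table, would make the configured initial position take effect again.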