How does Flink handle Kafka offsets in checkpointing when the app fails partway through processing?


Does the KafkaSink, which implements TwoPhaseCommittingSink, support checkpointing implicitly, or do I have to add any code for it?

I have a simple Flink application that reads from a KafkaSource. I want the Kafka consumer offsets handled so that if my app breaks down, it recovers from a checkpoint without data loss or duplication.

Say I am reading offsets 1 to 100 and my Flink application fails at offset 87. After restoring from a checkpoint, it should resume from offset 87, not from earliest or latest. As of now, it always restarts from offset 1 if I have no delay in processing. But if I add Thread.sleep(5000) in .map(), it works as expected and tries to read from offset 87.

As I read from this post - https://stackoverflow.com/a/70382650/4951838

Since I am using .print(), which does not support exactly-once semantics, it takes 2-3 seconds for the checkpoint to sync with the Kafka offsets.

So if I use a KafkaSink with an exactly-once guarantee, will it solve my issue?

Checkpoint config -

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.setParallelism(1);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointStorage("SomeS3Url");
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        1, // number of restart attempts
        Time.of(1, TimeUnit.SECONDS) // delay between attempts
));

Kafka consumer -

KafkaSource<Long> dataSource = KafkaSource.<Long>builder()
        .setBootstrapServers(brokers)
        .setTopics("Alerts.Checkpoint")
        .setGroupId("checkpoint-alertss-1")
        .setProperties(prop)
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
        .setClientIdPrefix("Alerts_01_temp_2")
        .setDeserializer("custom deserializer") // placeholder for a KafkaRecordDeserializationSchema
        .build();

As you can see below, I want to fail the application when the consumer reaches a certain offset.

DataStreamSource<Long> alertStream = env.fromSource(dataSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

alertStream
    .map(new MapFunction<Long, String>() {
        @Override
        public String map(Long value) throws Exception {
            if (value == 87) { // value is the offset of the message
                throw new Exception("Simulated failure at offset 87");
            }
            return "Offset - " + value;
        }
    })
    .print();

1 Answer

Martijn Visser (accepted answer):

The KafkaSink natively supports Flink's snapshotting mechanism, and no additional code is required to enable that. You shouldn't handle offset management manually in your code.
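As a sketch of what this looks like in practice: a KafkaSink built with DeliveryGuarantee.EXACTLY_ONCE participates in Flink's checkpointing automatically, committing Kafka transactions on checkpoint completion. The output topic name and transactional-id prefix below are illustrative placeholders, not taken from the question.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// Sketch: an exactly-once KafkaSink; no extra checkpointing code is needed,
// the sink hooks into the enabled checkpoint cycle on its own.
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("Alerts.Checkpoint.Out") // placeholder output topic
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // EXACTLY_ONCE requires a transactional-id prefix; also keep the
        // producer's transaction.timeout.ms below the broker's
        // transaction.max.timeout.ms.
        .setTransactionalIdPrefix("alerts-checkpoint-sink")
        .build();

// Replace .print() with the transactional sink:
// alertStream.map(...).sinkTo(sink);
```

Note that downstream consumers of the output topic must read with isolation.level=read_committed to see only committed (checkpointed) records.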