Puzzled by Flink window state


I'm currently confused about windows and state. Suppose I have a program that counts user access data every minute, computing a sum in each one-minute window, and I enable checkpointing for fault tolerance with a 30-second checkpoint interval. Then at 01:00 the program crashes. In theory it can only be restored to the state captured at 00:30, but no window had fired at 00:30, so after restoring we get the Kafka offsets as of 00:30 together with the window state accumulated since 00:00. Is there any problem with my understanding?

Here is my program; the Flink version is 1.14: flink graph

The stream-building configuration is:

config.getDelayMaxDuration() = 60000, config.getAggregateWindowMillisecond() = 60000, and the checkpoint interval is 30s.
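For reference, a 30-second checkpoint interval is usually enabled on the execution environment roughly like this (a sketch only; the actual environment setup is not shown in the job below):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // take a checkpoint every 30 seconds
    env.enableCheckpointing(30_000L);
    // EXACTLY_ONCE is the default checkpointing mode
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);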

SingleOutputStreamOperator<BaseResult> wordCountSampleStream = subStream
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<MetricEvent>forBoundedOutOfOrderness(config.getDelayMaxDuration())
                        .withTimestampAssigner(new MetricEventTimestampAssigner())
                        .withIdleness(config.getWindowIdlenessTime()))
        .setParallelism(CommonJobConfig.getParallelismOfSubJob("WORD_COUNT_SAMPLE_TEST"))
        .flatMap(new WordCountToResultFlatMapFunction(config))
        .setParallelism(CommonJobConfig.getParallelismOfSubJob("WORD_COUNT_SAMPLE_TEST"))
        .keyBy(new BaseResultKeySelector())
        .window(TumblingEventTimeWindows.of(Time.milliseconds(config.getAggregateWindowMillisecond())))
        .apply(new WordCountWindowFunction(config))
        .setParallelism(CommonJobConfig.getParallelismOfSubJob("WORD_COUNT_SAMPLE_TEST"));

wordCountSampleStream.addSink(sink)
        .setParallelism(CommonJobConfig.getParallelismOfSubJob("WORD_COUNT_SAMPLE_TEST"));

window apply function:

public class WordCountWindowFunction extends RichWindowFunction<BaseResult, BaseResult, String, TimeWindow> {

    private StreamingConfig config;

    private Logger logger = LoggerFactory.getLogger(WordCountWindowFunction.class);

    public WordCountWindowFunction(StreamingConfig config) {
        this.config = config;
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public void apply(String s, TimeWindow window, Iterable<BaseResult> input, Collector<BaseResult> out) throws Exception {
        WordCountEventPortrait result = new WordCountEventPortrait();
        // floor the window start to a multiple of the aggregation window size
        long curWindowTimestamp = window.getStart() / config.getAggregateWindowMillisecond() * config.getAggregateWindowMillisecond();
        result.setDatasource("word_count_test");
        result.setTimeSlot(curWindowTimestamp);
        for (BaseResult sub : input) {
            logger.info("in window cur sub is {} ", sub);
            WordCountEventPortrait curInvoke = (WordCountEventPortrait) sub;
            result.setTotalCount(result.getTotalCount() + curInvoke.getTotalCount());
            result.setWord(curInvoke.getWord());
        }
        logger.info("out window result is {} ", result);
        out.collect(result);
    }
}

sink function:

public class ClickHouseRichSinkFunction extends RichSinkFunction<BaseResult> implements CheckpointedFunction {

    private ConcurrentHashMap<String, SinkBatchInsertHelper<BaseResult>> tempResult = new ConcurrentHashMap<>();
    private ClickHouseDataSource dataSource;
    private Logger logger = LoggerFactory.getLogger(ClickHouseRichSinkFunction.class);

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        for (Map.Entry<String, SinkBatchInsertHelper<BaseResult>> helper : tempResult.entrySet()) {
            helper.getValue().insertAllTempData();
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("user", CommonJobConfig.CLICKHOUSE_USER);
        properties.setProperty("password", CommonJobConfig.CLICKHOUSE_PASSWORD);
        dataSource = new ClickHouseDataSource(CommonJobConfig.CLICKHOUSE_JDBC_URL, properties);
    }

    @Override
    public void close() {
        AtomicInteger totalCount = new AtomicInteger();
        tempResult.values().forEach(it -> {
            totalCount.addAndGet(it.getTempList().size());
            batchSaveBaseResult(it.getTempList());
            it.getTempList().clear();
        });
    }

    @Override
    public void invoke(BaseResult value, Context context) {
        tempResult.compute(value.getDatasource(), (datasource, baseResults) -> {
            if (baseResults == null) {
                baseResults = new SinkBatchInsertHelper<>(CommonJobConfig.COMMON_BATCH_INSERT_COUNT,
                        needToInsert -> batchSaveBaseResult(needToInsert),
                        CommonJobConfig.BATCH_INSERT_INTERVAL_MS);
            }
            baseResults.tempInsertSingle(value);
            return baseResults;
        });
    }

    private void batchSaveBaseResult(List<BaseResult> list) {
        if (list.isEmpty()) {
            return;
        }
        String sql = list.get(0).getPreparedSQL();
        // close both the connection and the statement to avoid leaking connections
        try (Connection connection = dataSource.getConnection();
             PreparedStatement ps = connection.prepareStatement(sql)) {
            for (BaseResult curResult : list) {
                curResult.addParamsToPreparedStatement(ps);
                ps.addBatch();
            }
            ps.executeBatch();
        } catch (SQLException error) {
            logger.error("has exception during batch insert, datasource is {} ", list.get(0).getDatasource(), error);
        }
    }
}

batch insert helper:

public class SinkBatchInsertHelper<T> {

    private List<T> waitToInsert;

    private ReentrantLock lock;

    private int bulkActions;

    private AtomicInteger tempActions;

    private Consumer<List<T>> consumer;

    private AtomicLong lastSendTimestamp;

    private long sendInterval;

    private Logger logger = LoggerFactory.getLogger(SinkBatchInsertHelper.class);

    public SinkBatchInsertHelper(int bulkActions, Consumer<List<T>> consumer, long sendInterval) {
        this.waitToInsert = new ArrayList<>();
        this.lock = new ReentrantLock();
        this.bulkActions = bulkActions;
        this.tempActions = new AtomicInteger(0);
        this.consumer = consumer;
        this.sendInterval = sendInterval;
        this.lastSendTimestamp = new AtomicLong(0);
    }

    public void tempInsertSingle(T data) {
        lock.lock();
        try {
            waitToInsert.add(data);
            if (tempActions.incrementAndGet() >= bulkActions || ((System.currentTimeMillis() - lastSendTimestamp.get()) >= sendInterval)) {
                batchInsert();
            }
        } finally {
            lastSendTimestamp.set(System.currentTimeMillis());
            lock.unlock();
        }
    }

    public long insertAllTempData() {
        lock.lock();
        try {
            long result = tempActions.get();
            if (tempActions.get() > 0) {
                batchInsert();
            }
            return result;
        } finally {
            lock.unlock();
        }
    }

    private void batchInsert() {
        for (T t : waitToInsert) {
            logger.info("batch insert data:{}", t);
        }
        consumer.accept(waitToInsert);
        waitToInsert.clear();
        tempActions.set(0);
    }

    public int getTempActions() {
        return tempActions.get();
    }

    public List<T> getTempList() {
        lock.lock();
        try {
            return waitToInsert;
        } finally {
            lock.unlock();
        }
    }
}

The resulting phenomenon is: suppose I cancel the job at 00:31:30; when I restart it, the statistics for 00:31:00 are lower than expected. By printing records I found the cause: when the sink wrote the data for 00:30:00, the Kafka consumer had actually already consumed data past 00:31:00, but that part of the data had not been written to ClickHouse, and it was not replayed into the window after the restart, so it was lost. The statistics do not return to normal until 00:32:00.
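For reference, one common way to avoid losing rows that are still buffered in a sink is to keep the pending batch in operator state, so it is snapshotted together with the Kafka offsets and restored after a failure. The following is only a minimal sketch under that assumption, not the sink above; the class name, field names, and batch size are illustrative:

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    import java.util.ArrayList;
    import java.util.List;

    // Sketch only: a sink that keeps its pending batch in Flink operator state,
    // so the batch is checkpointed together with the Kafka offsets and restored on recovery.
    public class BufferingClickHouseSink extends RichSinkFunction<BaseResult> implements CheckpointedFunction {

        private final List<BaseResult> pending = new ArrayList<>();
        private transient ListState<BaseResult> checkpointedPending;

        @Override
        public void invoke(BaseResult value, Context context) {
            pending.add(value);
            if (pending.size() >= 500) {   // illustrative batch size
                flush();
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // persist whatever is still buffered (alternatively flush here, as the sink above does)
            checkpointedPending.clear();
            checkpointedPending.addAll(pending);
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            checkpointedPending = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("pending-results", BaseResult.class));
            if (context.isRestored()) {
                for (BaseResult restored : checkpointedPending.get()) {
                    pending.add(restored);  // re-buffer rows from the last successful checkpoint
                }
            }
        }

        private void flush() {
            // batch insert into ClickHouse here (omitted, see batchSaveBaseResult above), then clear the buffer
            pending.clear();
        }
    }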


1 Answer

David Anderson:

There's no need to be concerned about synchronizing checkpointing with your windows; the two are independent of each other.

Whenever a checkpoint is taken it will include

  • the Kafka offset for each partition
  • the in-progress window state, resulting from having processed the events up through those offsets

https://www.youtube.com/watch?v=zkWa2ZDjwb4 walks through this in a bit more detail.
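One related point for the cancel-and-restart case described in the question: the offsets and window state in a checkpoint are only brought back if the restarted job is actually resumed from that checkpoint (or from a savepoint). A minimal sketch, assuming the standard Flink 1.14 APIs, of retaining the last checkpoint when a job is cancelled so the resubmitted job can be pointed at it:

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    // retain the last completed checkpoint when the job is cancelled, so it can be
    // resumed later with: flink run -s <pathToCheckpointMetadata> ...
    checkpointConfig.enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);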