I set up a Kafka S3 sink connector to consume messages from a Kafka topic and write them to MinIO in Parquet format, then query the data from Dremio to verify the integrity of the pipeline. The Kafka topic has 12 partitions, and each partition contains a varying number of records.
What I've found is that:

- If I set flush.size=1, I get every record in MinIO, one record per Parquet file, and the Dremio query returns the correct record count.
- If I set flush.size > 1, I don't get the exact total number of records in MinIO or in the Dremio query; I always get fewer. The larger flush.size is, the more records appear to be skipped, and if flush.size is large enough, entire partitions appear to be skipped as well.
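For reference, a connector configuration along these lines reproduces the setup described above. This is a sketch: the connector name, topic, bucket, and MinIO endpoint are placeholders, while the class and property names come from Confluent's S3 sink connector.

```properties
name=s3-parquet-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
# Placeholder topic and bucket names
topics=my-topic
s3.bucket.name=my-bucket
# MinIO's S3-compatible endpoint (placeholder URL)
store.url=http://minio:9000
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
# With flush.size=1, every record lands in its own Parquet file;
# with flush.size > 1, records sit in the buffer until it fills.
flush.size=1000
```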
I understand that the connector is probably not actually skipping records: it is waiting for more new records to fill the buffer before flushing to S3. But this won't work if the data is end-of-day (EOD); would I have to wait up to 24 hours for yesterday's data to be dumped to MinIO?
I am looking for a parameter that triggers a time-out and forces a flush to S3. I tried rotate.interval.ms, but it only checks the timestamp span between the first and last records in the buffer; it will not time out and force a flush if no new record arrives in Kafka. Is there any parameter that does this? It seems that all the rotate-interval parameters, whether span-based or scheduled, expect a new record to trigger the evaluation of the flush condition. That doesn't serve the purpose I described: we want to time out and force a flush without depending on a new record being processed.
rotate.schedule.interval.ms works. I had made a typo in the sink properties: I put rotate.scheduled.interval.ms. Once I corrected the typo, the connector asked me to specify a timezone, and then everything worked as expected: I got all 10071 records across all 12 partitions.
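For anyone hitting the same issue, the relevant properties ended up looking like this (the interval value is illustrative; note the property is rotate.schedule.interval.ms, not rotate.scheduled.interval.ms):

```properties
# Wall-clock-based scheduled rotation: flushes open files every 10 minutes,
# even if no new record arrives on the topic.
rotate.schedule.interval.ms=600000
# Required when rotate.schedule.interval.ms is set
timezone=UTC
# flush.size still caps the maximum number of records per file
flush.size=1000
```

With this in place, a file is committed to MinIO either when the buffer reaches flush.size records or when the scheduled interval elapses, whichever comes first.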