I performed the bellow test to try to figure out how ksqldb will group by records from topic with 3 partitions. Is there any parameter that tells to ksqldb to make the windowing by partitions to avoid losing data due to expiration period ?
CREATE STREAM ratings (title VARCHAR, release_year INT, rating DOUBLE, timestamp VARCHAR)
WITH (kafka_topic='ratings',
timestamp='timestamp',
timestamp_format='yyyy-MM-dd HH:mm:ss',
partitions=3,
value_format='json');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('AsjPxJDrFr', 1988, 9.2, '2023-05-18 15:35:01');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('sBaOVCofGF', 1975, 9.2, '2023-05-18 15:35:02');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('aOhNlSgVqW', 2010, 9.2, '2023-05-18 15:35:03');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('dKeObIdQAm', 2005, 8.9, '2023-05-18 15:35:04');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('tFMgeaJAvP', 1992, 8.9, '2023-05-18 15:35:05');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('LZGnwpXOHv', 1978, 6.8, '2023-05-19 15:35:06');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('CzKbTdHAsO', 2008, 6.8, '2023-05-19 15:35:07');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('zbeSPRfGMO', 2015, 5.9, '2023-05-19 15:35:08');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('WZaKjixEMn', 1984, 5.9, '2023-05-19 15:35:09');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('MhQdtsCvxR', 2001, 6.8, '2023-05-20 15:35:10');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('kXjvNTmOpD', 1999, 6.8, '2023-05-20 15:35:11');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('GfBuHqXmKr', 2003, 6.8, '2023-05-20 15:35:12');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('oKRPfYhXJL', 1987, 6.8, '2023-05-20 15:35:13');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('zVHjCbrkYD', 2012, 8.8, '2023-05-21 15:35:14');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('mALOfdzBwQ', 1972, 8.8, '2023-05-21 15:35:15');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('eRcIZVWnSa', 2007, 6.5, '2023-05-21 15:35:16');
SET 'auto.offset.reset' = 'earliest';
SELECT rating,
COUNT(*) AS rating_count,
TIMESTAMPTOSTRING(WINDOWSTART, 'yyy-MM-dd HH:mm:ss', 'UTC') as window_start,
TIMESTAMPTOSTRING(WINDOWEND, 'yyy-MM-dd HH:mm:ss', 'UTC') as window_end
FROM ratings
WINDOW TUMBLING (SIZE 1 DAY, RETENTION 30 DAYS)
GROUP BY rating
EMIT CHANGES;
+-------------------------------------------+-------------------------------------------+-------------------------------------------+-------------------------------------------+
|RATING |RATING_COUNT |WINDOW_START |WINDOW_END |
+-------------------------------------------+-------------------------------------------+-------------------------------------------+-------------------------------------------+
|8.8 |1 |2023-05-21 00:00:00 |2023-05-22 00:00:00 |
|8.8 |2 |2023-05-21 00:00:00 |2023-05-22 00:00:00 |
|9.2 |1 |2023-05-18 00:00:00 |2023-05-19 00:00:00 |
|6.8 |1 |2023-05-19 00:00:00 |2023-05-20 00:00:00 |
|6.8 |1 |2023-05-20 00:00:00 |2023-05-21 00:00:00 |
|6.5 |1 |2023-05-21 00:00:00 |2023-05-22 00:00:00 |
|5.9 |1 |2023-05-19 00:00:00 |2023-05-20 00:00:00 |
|5.9 |2 |2023-05-19 00:00:00 |2023-05-20 00:00:00 |
Skipped records:
ksqldb-server | [2023-05-21 11:26:22,141] WARN Skipping record for expired window. topic=[_confluent-ksql-default_transient_transient_RATINGS_9159540547105289197_1684668380893-Aggregate-GroupBy-repartition] partition=[1] offset=[2] timestamp=[1684424104000] window=[1684368000000,1684454400000) expiration=[1684683315000] streamTime=[1684683315000] (org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate:142)
ksqldb-server | [2023-05-21 11:26:22,206] WARN Skipping record for expired window. topic=[_confluent-ksql-default_transient_transient_RATINGS_9159540547105289197_1684668380893-Aggregate-GroupBy-repartition] partition=[2] offset=[4] timestamp=[1684424101000] window=[1684368000000,1684454400000) expiration=[1684683316000] streamTime=[1684683316000] (org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate:142)
ksqldb-server | [2023-05-21 11:26:22,208] WARN Skipping record for expired window. topic=[_confluent-ksql-default_transient_transient_RATINGS_9159540547105289197_1684668380893-Aggregate-GroupBy-repartition] partition=[2] offset=[5] timestamp=[1684424102000] window=[1684368000000,1684454400000) expiration=[1684683316000] streamTime=[1684683316000] (org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate:142)
ksqldb-server | [2023-05-21 11:26:22,209] WARN Skipping record for expired window. topic=[_confluent-ksql-default_transient_transient_RATINGS_9159540547105289197_1684668380893-Aggregate-GroupBy-repartition] partition=[2] offset=[6] timestamp=[1684596910000] window=[1684540800000,1684627200000) expiration=[1684683316000] streamTime=[1684683316000] (org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate:142)
ksqldb-server | [2023-05-21 11:26:22,212] WARN Skipping record for expired window. topic=[_confluent-ksql-default_transient_transient_RATINGS_9159540547105289197_1684668380893-Aggregate-GroupBy-repartition] partition=[2] offset=[7] timestamp=[1684596911000] window=[1684540800000,1684627200000) expiration=[1684683316000] streamTime=[1684683316000] (org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate:142)
ksqldb-server | [2023-05-21 11:26:22,218] WARN Skipping record for expired window. topic=[_confluent-ksql-default_transient_transient_RATINGS_9159540547105289197_1684668380893-Aggregate-GroupBy-repartition] partition=[2] offset=[8] timestamp=[1684596913000] window=[1684540800000,1684627200000) expiration=[1684683316000] streamTime=[1684683316000] (org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate:142)
ksqldb-server | [2023-05-21 11:26:22,306] WARN Skipping record for expired window. topic=[_confluent-ksql-default_transient_transient_RATINGS_9159540547105289197_1684668380893-Aggregate-GroupBy-repartition] partition=[2] offset=[9] timestamp=[1684510506000] window=[1684454400000,1684540800000) expiration=[1684683316000] streamTime=[1684683316000] (org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate:142)
ksqldb-server | [2023-05-21 11:26:22,310] WARN Skipping record for expired window. topic=[_confluent-ksql-default_transient_transient_RATINGS_9159540547105289197_1684668380893-Aggregate-GroupBy-repartition] partition=[1] offset=[3] timestamp=[1684424105000] window=[1684368000000,1684454400000) expiration=[1684683315000] streamTime=[1684683315000] (org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate:142)