What is the correct way to SUM a total over the full data from both unbounded (Kafka) and bounded (JDBC) sources?


I have a Kafka source with a retention of 7 days, and a JDBC data store that contains all the data. The data are "trades", and each trade has an amount.
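For context, here is roughly how the two source tables are defined in Flink SQL. The column names are the ones used in the queries below; the connector options are simplified placeholders, not my real config:

CREATE TABLE trades_stream (
    poolId STRING,
    amount DECIMAL(38, 18),
    tradeTimestamp BIGINT,
    procTime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = '...',
    'properties.bootstrap.servers' = '...',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

CREATE TABLE trades_store (
    poolId STRING,
    amountUSD DECIMAL(38, 18),
    `timestamp` BIGINT
) WITH (
    'connector' = 'jdbc',
    'url' = '...',
    'table-name' = '...'
);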

I want to create a streaming job that keeps track of the total overall sum of amounts (totalVolume) over the full history of data: everything already in the JDBC store plus all new event data arriving via Kafka.

Currently I have one batch job that calculates the total sum once and stores it in the database, and a streaming job that picks up where the batch job left off, relying on a field named lastVolumeTimestamp, and keeps adding newly incoming amounts. Is this the correct way of doing this, or is there a more robust way of doing it in Flink SQL? I want to make sure no event is missed.

The batch job looks like this:

INSERT INTO
  sink
SELECT
    poolId,
    SUM(amountUSD) as totalVolume,
    MAX(`timestamp`) as totalVolumeTimestamp
FROM
    trades_store -- JDBC backed
GROUP BY poolId
;
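The streaming job also does a lookup join against the current pool totals, registered as pools_store. Roughly (column names are the ones used in the query below, connector options are placeholders):

CREATE TABLE pools_store (
    poolId STRING,
    volumeUSD DECIMAL(38, 18),
    totalVolumeTimestamp BIGINT,
    PRIMARY KEY (poolId) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = '...',
    'table-name' = '...'
);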

And the streaming job looks like this:

CREATE VIEW trades_agg AS
SELECT
    ss.poolId,
    SUM(ss.amount) as totalVolume,
    MAX(ss.tradeTimestamp) as totalVolumeTimestamp,
    COALESCE(LAST_VALUE(pl.volumeUSD), 0) as currentTotalVolume
FROM
    trades_stream ss
LEFT JOIN
    pools_store FOR SYSTEM_TIME AS OF ss.procTime AS pl 
ON
    ss.poolId = pl.poolId
WHERE
    pl.totalVolumeTimestamp IS NULL OR
    ss.tradeTimestamp > pl.totalVolumeTimestamp
GROUP BY ss.poolId, pl.poolId
;

INSERT INTO
    pools_sink
SELECT
    poolId,
    totalVolume + currentTotalVolume as totalVolumeUSD,
    totalVolumeTimestamp
FROM
    trades_agg
WHERE
    totalVolumeTimestamp IS NOT NULL AND
    totalVolumeTimestamp > 0
;
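For completeness, pools_sink is a JDBC table keyed by poolId (again, connector options are placeholders). The primary key matters because the grouped aggregation above produces an updating stream, so the JDBC sink has to run in upsert mode:

CREATE TABLE pools_sink (
    poolId STRING,
    totalVolumeUSD DECIMAL(38, 18),
    totalVolumeTimestamp BIGINT,
    PRIMARY KEY (poolId) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = '...',
    'table-name' = '...'
);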

The most immediate issue is that I have to stop the streaming job whenever I want to re-run the batch job (e.g. to fix some old data).

But the main question is: is this the correct way of using Flink aggregation functions for such a task?
