Flink. Batch load data. Exception OOM


I am trying to transfer data from one table to another in Flink using SQL in BATCH mode, and I noticed that all of the source data is first cached in memory before it moves further along the pipeline. Because I have a lot of data, I run into an out-of-memory error. Is there a way to read the data in chunks rather than all at once?
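For context, batch execution is enabled before the statements below are run, roughly like this in the SQL client:

-- switch to batch execution mode
SET 'execution.runtime-mode' = 'batch';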

Example of the pipeline:

CREATE TABLE t_destination_nrt (
  `fk_id`        decimal(32, 0) primary key not enforced,
  `fv_text`      string,
  `fd_timestamp` timestamp,
  `fv_desc`      string
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://192.168.1.17:5432/postgres',
  'table-name' = 'nrt.t_destination',
  'username' = 'postgres',
  'password' = 'postgres',
  'scan.fetch-size' = '1000',
  'lookup.cache.ttl' = '60s',
  'lookup.cache.max-rows' = '10000',
  'lookup.max-retries' = '3',
  'sink.buffer-flush.max-rows' = '5000',
  'sink.buffer-flush.interval' = '2s',
  'sink.max-retries' = '3'
)

CREATE TABLE t_source_nrt (
  `fk_id`        decimal(32, 0)  primary key not enforced,
  `fv_text`      string,
  `fd_timestamp` timestamp
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://192.168.1.17:5432/postgres',
  'table-name' = 'nrt.t_source',
  'username' = 'postgres',
  'password' = 'postgres',
  'scan.fetch-size' = '1000',
  'lookup.cache.ttl' = '60s',
  'lookup.cache.max-rows' = '10000',
  'lookup.max-retries' = '3',
  'sink.buffer-flush.max-rows' = '5000',
  'sink.buffer-flush.interval' = '2s',
  'sink.max-retries' = '3'
)

CREATE TABLE t_dict_nrt (
  `fk_id`      decimal(32, 0) primary key not enforced,
  `fv_desc`    string
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://192.168.1.17:5432/postgres',
  'table-name' = 'nrt.t_dict',
  'username' = 'postgres',
  'password' = 'postgres'
)

insert into t_destination_nrt (fk_id, fv_text, fd_timestamp, fv_desc)
select /*+ BROADCAST(dn) */
       sn.fk_id,
       sn.fv_text,
       sn.fd_timestamp,
       dn.fv_desc
  from t_source_nrt         sn
       left join t_dict_nrt dn on sn.fk_id % 3 = dn.fk_id

I am trying to reload data from one table to another using Flink SQL in batch mode. I expected the data to be loaded in chunks, but Flink fully caches the source table before inserting into the destination.
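If the JDBC connector's partitioned scan is the intended way to read in chunks, I would expect the source definition to look something like the sketch below (the partition count and bounds are placeholder values, and I have not verified that this actually avoids the full materialization):

CREATE TABLE t_source_nrt (
  `fk_id`        decimal(32, 0) primary key not enforced,
  `fv_text`      string,
  `fd_timestamp` timestamp
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://192.168.1.17:5432/postgres',
  'table-name' = 'nrt.t_source',
  'username' = 'postgres',
  'password' = 'postgres',
  -- split the read into parallel range queries over fk_id instead of one full scan
  'scan.partition.column' = 'fk_id',
  'scan.partition.num' = '50',              -- placeholder: number of splits
  'scan.partition.lower-bound' = '1',       -- placeholder: min fk_id
  'scan.partition.upper-bound' = '1000000', -- placeholder: max fk_id
  'scan.fetch-size' = '1000'
)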

(Screenshot: Apache Flink Dashboard, job graph)

(Screenshot: Apache Flink Dashboard, task metrics)
