Flink Hudi is not merging small Parquet files


Based on the Hudi documentation, a CoW table should take care of automatic parquet file sizing. However, it just doesn't work. Could anyone please assist?

CREATE TABLE hudi_table(
    ts BIGINT,
    uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
    rider VARCHAR(20),
    driver VARCHAR(20),
    fare DOUBLE,
    city VARCHAR(20)
)
WITH (
  'connector' = 'hudi',
  'path' = 'file:///opt/flink/hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation'='insert',
  'hoodie.parquet.small.file.limit'='500000',
  'hoodie.parquet.max.file.size'= '1000000'
);

When I insert multiple records one by one, a separate parquet file (about 420 KB) is produced for each commit and the files are never merged (note that small.file.limit is set to 500 KB and max.file.size to 1 MB).

INSERT INTO hudi_table
VALUES
(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco');
...

There are 2 best solutions below

Swapnil Khante

It seems that you are using a Spark-specific config for a Flink pipeline. The Flink config for max file size is write.parquet.max.file.size.
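For reference, here is a minimal sketch of the table definition using the Flink-specific option instead of the hoodie.parquet.* keys. This is only a sketch: if I remember correctly, write.parquet.max.file.size takes a value in MB rather than bytes, so verify the name and unit against the Hudi Flink configuration for your version.

CREATE TABLE hudi_table(
    ts BIGINT,
    uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
    rider VARCHAR(20),
    driver VARCHAR(20),
    fare DOUBLE,
    city VARCHAR(20)
)
WITH (
  'connector' = 'hudi',
  'path' = 'file:///opt/flink/hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'insert',
  -- Flink-specific target file size; the value is assumed to be in MB
  'write.parquet.max.file.size' = '1'
);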

Also, if you want to improve query performance when working with small files, you can make use of clustering. Clustering in Hudi helps you stitch many small files into larger ones.
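Below is a sketch of what enabling clustering from the Flink writer could look like. The clustering.* option names are written from memory of the Hudi Flink configuration, so treat them as assumptions and check them against the version you are running.

CREATE TABLE hudi_table_clustered(
    ts BIGINT,
    uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
    rider VARCHAR(20),
    driver VARCHAR(20),
    fare DOUBLE,
    city VARCHAR(20)
)
WITH (
  'connector' = 'hudi',
  'path' = 'file:///opt/flink/hudi_clustered',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'insert',
  -- assumed Hudi Flink clustering options: schedule plans, run them async,
  -- and trigger scheduling every 4 delta commits
  'clustering.schedule.enabled' = 'true',
  'clustering.async.enabled' = 'true',
  'clustering.delta_commits' = '4'
);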

oceansize

As Swapnil pointed out in the answer above, the Flink setting for parquet max file size is different.

Regarding commits: in streaming mode (for example, when using the Flink datagen connector to generate an infinite source of data), they are only performed during Flink checkpoints. So it is mandatory to enable checkpointing (in Flink SQL this looks like SET 'execution.checkpointing.interval' = '1m';). Otherwise, even though parquet files are written, no commits are produced and it is impossible to read any data from the Hudi table. I was unable to find this in the official documentation, which is quite confusing.
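For completeness, a minimal Flink SQL session sketch with checkpointing enabled (the SET statement is the standard Flink SQL client syntax; the 1 minute interval is just an example value):

-- enable periodic checkpoints so the Hudi writer can actually commit;
-- without this, parquet files are written but the data never becomes readable
SET 'execution.checkpointing.interval' = '1m';

INSERT INTO hudi_table
VALUES
(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco');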

Hope this is useful to someone.