Spark Structured Streaming - How to append records to a file until it reaches a specified number of records?


I have a data stream to which the function foreachBatch is applied. I partition the data on two columns and write the result to Parquet files. I need to limit each file to, say, 5 records.

I expect that inside each partition folder Spark will append new records to the end of a temporary file, then save each file once it is full.

StreamingQuery query1 = joinedDF
        .writeStream()
        .foreachBatch(
                (VoidFunction2<Dataset<Row>, Long>) (dataset, batchId) -> {
                    if (!dataset.isEmpty()) {
                        // keep only the newest record of this micro-batch
                        Dataset<Row> filtered = dataset.sort(col("_start_time").desc()).limit(1);
                        filtered.show();

                        filtered
                                .selectExpr(columnNames)
                                .write()
                                .option("maxRecordsPerFile", 5)
                                .mode("overwrite") // replaces everything at the path on every batch
                                .format("parquet")
                                .partitionBy("event_date", "probe")
                                .option("path", "./parquet_results")
                                .option("checkpointLocation", "./path_to_checkpoint_location")
                                .save();
                    }
                }
        )
        .start();

But instead, Spark overwrites the file, which still contains only one record. How can I achieve the expected behaviour?
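One idea I had is to switch the batch write inside the same foreachBatch from overwrite to append mode, so each micro-batch adds new files instead of replacing the old ones. This is only an untested sketch of that idea (I'm assuming SaveMode.Append is the right mode, and that checkpointLocation belongs on writeStream() rather than on the batch writer), and as far as I understand it would still create a new file per batch rather than fill an existing file up to 5 records:

    // Sketch only (untested): append each micro-batch instead of overwriting.
    // Assumes org.apache.spark.sql.SaveMode is imported; "maxRecordsPerFile"
    // only caps how many records go into a single file within one write job,
    // it does not merge records across separate writes.
    filtered
            .selectExpr(columnNames)
            .write()
            .option("maxRecordsPerFile", 5)
            .mode(SaveMode.Append) // add new files, don't replace old ones
            .format("parquet")
            .partitionBy("event_date", "probe")
            .save("./parquet_results");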

I'm new to Spark. Also, sorry for my English.

