I have a streaming Dataset to which I apply foreachBatch. In each batch I partition the data by two columns and write the result to Parquet files, and I want to limit each file to, say, 5 records.
My expectation is that, inside each partition folder, Spark will keep appending new records to the current (temporary) file and only close it and start a new file once it is full with 5 records.
StreamingQuery query1 = joinedDF
    .writeStream()
    .foreachBatch(
        (VoidFunction2<Dataset<Row>, Long>) (dataset, batchId) -> {
            if (!dataset.isEmpty()) {
                Dataset<Row> filtered = dataset.sort(col("_start_time").desc()).limit(1);
                filtered.show();
                filtered
                    .selectExpr(columnNames)
                    .write()
                    .option("maxRecordsPerFile", 5)
                    .mode("overwrite")
                    .format("parquet")
                    .partitionBy("event_date", "probe")
                    .option("path", "./parquet_results")
                    .option("checkpointLocation", "./path_to_checkpoint_location")
                    .save();
            }
        }
    )
    .start();
Instead, Spark overwrites the output on every batch, so the file still contains only 1 record. How can I achieve the behaviour I expect?
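My guess is that mode("overwrite") is what replaces the previous output on every batch, and that the batch write inside foreachBatch would need to look something like the sketch below instead (append mode, with the checkpointLocation option dropped because I believe it only applies to streaming writes). I am not sure this is correct, or whether maxRecordsPerFile would then keep filling existing files up to 5 records across batches:

    // Sketch only: same write as above, reusing `filtered` and `columnNames`,
    // but with append mode so previous output is not replaced.
    // Unclear to me whether this fills existing files up to 5 records.
    filtered
        .selectExpr(columnNames)
        .write()
        .option("maxRecordsPerFile", 5)
        .mode("append")
        .format("parquet")
        .partitionBy("event_date", "probe")
        .save("./parquet_results");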
I'm new to Spark, so apologies if I'm missing something obvious.