This is related to my other post: Spark Java Map processing entire data set in all executors. I have a simple use case:
1. Read data from DB in 4 partitions:

```java
properties.setProperty("partitionColumn", "num_rows");
properties.setProperty("lowerBound", "0");
properties.setProperty("upperBound", getTotalRowCount(ID));
properties.setProperty("numPartitions", "4");
properties.setProperty("Driver", driver);
properties.setProperty("user", user);
properties.setProperty("password", password);

Dataset<Row> records = SparkSession.getActiveSession().get().read().jdbc(jdbcUrl, table, properties);
```

2. Process the record set in a loop to convert/filter/format, etc.:

```java
Dataset<String> stringRecordSet = dbRecordsSet.map(
        (MapFunction<Row, String>) xmlRow -> TransformationService.extractXMLBlobToString(xmlRow),
        Encoders.STRING());

Dataset<Row> jsonDataSet = sparkSession.read().json(stringifiedDataSet);
```

3. Simply save it:

```java
jsonDataSet.write().format("csv").save(filepath);
```
Issue: My dataset has 40 rows. Step 2 processes each partition in parallel, in a loop, as expected. But in step 3, when I save the record set to the file system, that stage re-executes step 2 and processes the partitions all over again: the function inside .map (TransformationService.extractXMLBlobToString) is called again for all 40 records. In production my dataset is in the millions. Stage 0 contains the loop that processes my data into the right format; stage 1 is the save, but this stage re-runs the loop, so a dataset of 10 million records is processed twice unnecessarily. Why is this happening? If I add a cache between step 2 and 3 (roughly as sketched below), it doubles the time instead of helping: my processing time went from 5 minutes to 10 minutes. Can someone please help me understand why the save calls the loop again?
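What I tried for the caching is roughly the following; this is just a sketch, assuming a plain .cache() on the mapped Dataset (the exact call I used may differ):

```java
// Sketch of the caching attempt: cache the mapped Dataset before the
// JSON read and the save, so the map should only run once.
Dataset<String> stringRecordSet = dbRecordsSet
        .map((MapFunction<Row, String>) xmlRow -> TransformationService.extractXMLBlobToString(xmlRow),
             Encoders.STRING())
        .cache();
```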
Your `map` transformation is executed twice because you are calling 2 actions on its result without caching in between. Your two actions are:

1. `Dataset<Row> jsonDataSet = sparkSession.read().json(stringifiedDataSet);` — this goes through a `DataFrameReader` object, so it has to infer a schema for your dataframe, necessitating an immediate action.
2. `jsonDataSet.write().format("csv").save(filepath);`

You could try:

1. Caching `stringRecordSet`. This is the default solution for these kinds of problems.
2. Turning the `sparkSession.read().json(stringifiedDataSet);` action into a lazy transformation by supplying it with a schema, like so: `sparkSession.read().schema(schema).json(stringifiedDataSet);`
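Putting both suggestions together, a minimal sketch could look like the following. It assumes Spark 2.2+ (where `DataFrameReader.json` accepts a `Dataset<String>`), reuses `dbRecordsSet`, `sparkSession`, and `filepath` from the question, and the schema fields and `MEMORY_AND_DISK` storage level are placeholders to adapt to your data:

```java
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;

// Option 1: persist the mapped Dataset so the second action reuses the
// materialized rows instead of re-running the map over the JDBC partitions.
Dataset<String> stringRecordSet = dbRecordsSet
        .map((MapFunction<Row, String>) xmlRow -> TransformationService.extractXMLBlobToString(xmlRow),
             Encoders.STRING())
        .persist(StorageLevel.MEMORY_AND_DISK());

// Option 2: give the JSON reader an explicit schema so it does not need an
// eager pass over the data to infer one. The field names below are
// placeholders -- replace them with the real structure of your JSON.
StructType schema = new StructType(new StructField[]{
        DataTypes.createStructField("id", DataTypes.StringType, true),
        DataTypes.createStructField("payload", DataTypes.StringType, true)
});

Dataset<Row> jsonDataSet = sparkSession.read().schema(schema).json(stringRecordSet);
jsonDataSet.write().format("csv").save(filepath);
```

Either option alone should stop the map from running twice; combining them avoids both the schema-inference pass and any recomputation triggered by the save.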