Spark dataset.map from stage0 repeats in other stages


This is related to my other post: Spark Java Map processing entire data set in all executors. I have a simple use case:

  1. Read data from DB in 4 partitions

     properties.setProperty("partitionColumn", "num_rows");
     properties.setProperty("lowerBound", "0");
     properties.setProperty("upperBound", getTotalRowCount(ID));
     properties.setProperty("numPartitions", "4");
     properties.setProperty("Driver", driver);
     properties.setProperty("user", user);
     properties.setProperty("password", password);
     Dataset<Row> records = SparkSession.getActiveSession().get().read().jdbc(jdbcUrl, table, properties);
    
  2. Process the record set in a loop to convert/filter/format, etc.:

     Dataset<String> stringRecordSet = dbRecordsSet.map((MapFunction<Row, String>) xmlRow -> {
             return TransformationService.extractXMLBlobToString(xmlRow);
         }, Encoders.STRING());

     Dataset<Row> jsonDataSet = sparkSession.read().json(stringifiedDataSet);
    
  3. simply save this:

    jsonDataSet.write().format("csv").save(filepath);

Issue: My dataset has 40 rows. Step 2 processes each partition in parallel, as expected. But in step 3, when I save the record set to the file system, that stage re-executes step 2 and processes the partitions all over again: the function inside .map (TransformationService.extractXMLBlobToString) is called again for all 40 records. In production my datasets are in the millions. Stage 0 runs the loop that puts my data into the right format and stage 1 is the save, but the save stage reruns the loop, so a dataset of 10 million records is processed twice unnecessarily. Why is this happening? If I add a cache between steps 2 and 3, it doubles the runtime (from 5 minutes to 10 minutes), which doesn't help at all. Can someone please help me understand why the save calls the loop again?
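
One way to confirm that the map really runs twice is to count invocations with a LongAccumulator. This is a minimal diagnostic sketch, not the asker's actual code; it reuses the names from the snippets above (treating stringRecordSet and stringifiedDataSet as the same dataset) and assumes a live SparkSession named sparkSession:

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.util.LongAccumulator;

    // Driver-side counter that the executors update from inside the map function.
    LongAccumulator mapCalls = sparkSession.sparkContext().longAccumulator("extractXMLBlobToString calls");

    Dataset<String> stringRecordSet = dbRecordsSet.map((MapFunction<Row, String>) xmlRow -> {
        mapCalls.add(1);
        return TransformationService.extractXMLBlobToString(xmlRow);
    }, Encoders.STRING());

    // Action 1: the JSON read has no schema, so Spark scans stringRecordSet to infer one.
    Dataset<Row> jsonDataSet = sparkSession.read().json(stringRecordSet);

    // Action 2: the save scans stringRecordSet again.
    jsonDataSet.write().format("csv").save(filepath);

    // With 40 input rows, no caching and no task retries, this prints roughly 80,
    // i.e. every row went through the map function twice.
    System.out.println(mapCalls.value());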


There is 1 answer below.

Answer from Koedlt:

Your map transformation is executed twice because you are calling 2 actions on its result without caching in between. Your two actions are:

  • Dataset<Row> jsonDataSet = sparkSession.read().json(stringifiedDataSet);
    • this is an action because you did not supply a schema to your DataFrameReader, so Spark has to scan the data to infer one, which triggers an immediate job
  • jsonDataSet.write().format("csv").save(filepath);

You could try:

  • caching stringRecordSet. This is the standard solution for these kinds of problems (see the sketch after this list)
  • in this case, you could also turn the sparkSession.read().json(stringifiedDataSet) action into a lazy transformation by supplying a schema, like so: sparkSession.read().schema(schema).json(stringifiedDataSet)
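
Put together, a rough sketch of both suggestions might look like this. It is not the asker's actual code: the StructType fields are placeholders, since the real JSON layout isn't shown, and stringRecordSet/stringifiedDataSet from the question are treated as the same dataset:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    // Option 1: cache the mapped dataset so the expensive map runs only once,
    // no matter how many actions follow.
    stringRecordSet.cache();

    // Option 2: supply an explicit schema so the JSON read no longer has to
    // infer one (the inference pass is what triggers the first execution).
    // The field names and types below are placeholders for the real JSON layout.
    StructType schema = new StructType()
            .add("id", DataTypes.StringType)
            .add("payload", DataTypes.StringType);

    Dataset<Row> jsonDataSet = sparkSession.read().schema(schema).json(stringRecordSet);

    // The save is now the only action, so the map executes a single time.
    jsonDataSet.write().format("csv").save(filepath);

In practice one of the two is usually enough: with an explicit schema the JSON read becomes lazy, so caching only pays off if the mapped data is reused by further actions.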